Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
devel
-----

* Make cluster traversal executor with DFS receive neighbours in batches
to be able to stop the retrieval early in case a limit has been reached.

* Make single-server traversal executor with DFS receive neighbours in batches
to be able to stop the retrieval early in case a limit has been reached.

Expand Down
100 changes: 57 additions & 43 deletions arangod/Cluster/TraverserEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ static const std::string TYPE = "type";
#endif

auto EdgeCursorForMultipleVertices::rearm() -> bool {
if (_nextVertex == _vertices.end()) {
if (_nextVertex == _vertices.size()) {
return false;
}
_cursor->rearm(*_nextVertex, _depth);
_cursor->rearm(_vertices[_nextVertex], _depth);
_nextVertex++;
return true;
}
auto EdgeCursorForMultipleVertices::hasMore() -> bool {
return _cursor->hasMore() || rearm();
return _cursor->hasMore() || _nextVertex != _vertices.size();
}

BaseEngine::BaseEngine(TRI_vocbase_t& vocbase, aql::QueryContext& query,
Expand Down Expand Up @@ -315,41 +315,29 @@ BaseTraverserEngine::BaseTraverserEngine(TRI_vocbase_t& vocbase,

BaseTraverserEngine::~BaseTraverserEngine() = default;

void BaseTraverserEngine::rearm(size_t depth, uint64_t batchSize,
std::vector<std::string> vertices,
VPackSlice variables) {
size_t BaseTraverserEngine::createNewCursor(size_t depth, uint64_t batchSize,
std::vector<std::string> vertices,
VPackSlice variables) {
injectVariables(variables);
_cursor = EdgeCursorForMultipleVertices{
_nextCursorId, depth, batchSize, std::move(vertices), getCursor(depth)};
_cursors.emplace_back(EdgeCursorForMultipleVertices{
_nextCursorId, depth, batchSize, std::move(vertices), getCursor(depth),
VPackBuilder{variables}});
auto id = _nextCursorId;
_nextCursorId++;
return id;
}

graph::EdgeCursor* BaseTraverserEngine::getCursor(uint64_t currentDepth) {
graph::EdgeCursor* cursor = nullptr;
if (_opts->hasSpecificCursorForDepth(currentDepth)) {
auto it = _depthSpecificCursors.find(currentDepth);
if (it == _depthSpecificCursors.end()) {
it = _depthSpecificCursors
.emplace(currentDepth, _opts->buildCursor(currentDepth))
.first;
}
TRI_ASSERT(it != _depthSpecificCursors.end());
cursor = it->second.get();
} else {
if (_generalCursor == nullptr) {
_generalCursor = _opts->buildCursor(currentDepth);
}
TRI_ASSERT(_generalCursor != nullptr);
cursor = _generalCursor.get();
}
return cursor;
std::unique_ptr<graph::EdgeCursor> BaseTraverserEngine::getCursor(
uint64_t currentDepth) {
return _opts->buildCursor(currentDepth);
}

void BaseTraverserEngine::allEdges(std::vector<std::string> const& vertices,
size_t depth, VPackBuilder& builder) {
size_t depth, VPackSlice variables,
VPackBuilder& builder) {
auto outputVertex = [this](VPackBuilder& builder, std::string_view vertex,
size_t depth) {
graph::EdgeCursor* cursor = getCursor(depth);
auto cursor = getCursor(depth);
cursor->rearm(vertex, depth);
_nextCursorId++;

Expand All @@ -373,6 +361,7 @@ void BaseTraverserEngine::allEdges(std::vector<std::string> const& vertices,
});
};

injectVariables(variables);
builder.openObject();
builder.add(VPackValue(StaticStrings::GraphQueryEdges));
builder.openArray(true);
Expand All @@ -385,22 +374,43 @@ void BaseTraverserEngine::allEdges(std::vector<std::string> const& vertices,
builder.close();
}

Result BaseTraverserEngine::nextEdgeBatch(size_t batchId,
Result BaseTraverserEngine::nextEdgeBatch(size_t cursorId, size_t batchId,
VPackBuilder& builder) {
TRI_ASSERT(_cursor.has_value());
if (_cursor->_nextBatch != batchId) {
return Result{TRI_ERROR_HTTP_BAD_PARAMETER, ""};
if (_cursors.empty()) {
return Result{
TRI_ERROR_HTTP_BAD_PARAMETER,
fmt::format("cursor id {} does not exist in traverser engine {}",
cursorId, engineId())};
}
auto& cursor = _cursors.back();
if (cursorId != cursor._cursorId) {
return Result{
TRI_ERROR_HTTP_BAD_PARAMETER,
fmt::format(
"cursor id {} is not on top of cursor stack in traverser engine {}",
cursorId, engineId())};
}
if (cursor._nextBatch != batchId) {
return Result{TRI_ERROR_HTTP_BAD_PARAMETER,
fmt::format("batch id {} is not next batch for cursor id {} "
"in traverser engine {}",
batchId, cursorId, engineId())};
}
uint64_t count = 0;
auto batchSize = _cursor->_batchSize;
auto batchSize = cursor._batchSize;

// TODO: unclear whether the variables have to be re-injected here for every
// batch, or whether injecting them once on cursor creation (in
// BaseTraverserEngine::createNewCursor) is sufficient.
Comment on lines +402 to +404
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

injectVariables(cursor._variables.slice());

builder.add(VPackValue(StaticStrings::GraphQueryEdges));
builder.openArray(true);
while (count != batchSize && _cursor->hasMore()) {
auto vertex = _cursor->_cursor->currentVertex();
auto depth = _cursor->_cursor->currentDepth();
while (count < batchSize && cursor.hasMore()) {
auto vertex = cursor._cursor->currentVertex();
auto depth = cursor._cursor->currentDepth();

_cursor->_cursor->nextBatch(
cursor._cursor->nextBatch(
[&](EdgeDocumentToken&& eid, VPackSlice edge, size_t cursorId) {
if (edge.isString()) {
edge = lookupToken(eid);
Expand All @@ -420,14 +430,18 @@ Result BaseTraverserEngine::nextEdgeBatch(size_t batchId,
count++;
},
batchSize);
if (!cursor._cursor->hasMore()) {
cursor.rearm();
}
}
builder.close();
builder.add("done", VPackValue(not _cursor->hasMore()));
builder.add("cursorId", VPackValue(_cursor->_cursorId));
builder.add("batchId", VPackValue(_cursor->_nextBatch));
builder.add("done", VPackValue(not cursor.hasMore()));
builder.add("cursorId", VPackValue(cursor._cursorId));
builder.add("batchId", VPackValue(cursor._nextBatch));

if (count > 0) {
_cursor->_nextBatch++;
cursor._nextBatch++;
if (not cursor.hasMore()) {
_cursors.pop_back();
}
return {};
}
Expand Down
31 changes: 16 additions & 15 deletions arangod/Cluster/TraverserEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "Aql/Collections.h"
#include "Basics/MemoryTypes/MemoryTypes.h"
#include "Graph/EdgeDocumentToken.h"
#include "Graph/Cursors/EdgeCursor.h"

struct TRI_vocbase_t;

Expand All @@ -51,7 +52,6 @@ class VariableGenerator;

namespace graph {
struct BaseOptions;
class EdgeCursor;
struct ShortestPathOptions;
} // namespace graph

Expand All @@ -68,19 +68,22 @@ struct EdgeCursorForMultipleVertices {
size_t _depth;
uint64_t _batchSize;
std::vector<std::string> _vertices;
std::vector<std::string>::iterator _nextVertex;
graph::EdgeCursor* _cursor;
size_t _nextVertex;
std::unique_ptr<graph::EdgeCursor> _cursor;
size_t _nextBatch = 0;
VPackBuilder _variables;
EdgeCursorForMultipleVertices(size_t cursorId, size_t depth,
uint64_t batchSize,
std::vector<std::string> vertices,
graph::EdgeCursor* cursor)
std::unique_ptr<graph::EdgeCursor> cursor,
VPackBuilder variables)
: _cursorId{cursorId},
_depth{depth},
_batchSize{batchSize},
_vertices{std::move(vertices)},
_nextVertex{_vertices.begin()},
_cursor{cursor} {
_nextVertex{0},
_cursor{std::move(cursor)},
_variables{std::move(variables)} {
TRI_ASSERT(_cursor != nullptr);
rearm();
}
Expand Down Expand Up @@ -143,12 +146,13 @@ class BaseTraverserEngine : public BaseEngine {

// old behaviour
void allEdges(std::vector<std::string> const& vertices, size_t depth,
VPackBuilder& builder);
VPackSlice variables, VPackBuilder& builder);

// new behaviour
void rearm(size_t depth, uint64_t batchSize,
std::vector<std::string> vertices, VPackSlice variables);
Result nextEdgeBatch(size_t batchId, VPackBuilder& builder);
size_t createNewCursor(size_t depth, uint64_t batchSize,
std::vector<std::string> vertices,
VPackSlice variables);
Result nextEdgeBatch(size_t cursorId, size_t batchId, VPackBuilder& builder);
void addAndClearStatistics(VPackBuilder& builder);

virtual void smartSearch(arangodb::velocypack::Slice,
Expand All @@ -167,16 +171,13 @@ class BaseTraverserEngine : public BaseEngine {
aql::VariableGenerator const* variables() const;

graph::BaseOptions const& options() const override;
std::optional<EdgeCursorForMultipleVertices> _cursor;
std::vector<EdgeCursorForMultipleVertices> _cursors;
size_t _nextCursorId = 0;

protected:
graph::EdgeCursor* getCursor(uint64_t currentDepth);
std::unique_ptr<graph::EdgeCursor> getCursor(uint64_t currentDepth);

std::unique_ptr<traverser::TraverserOptions> _opts;
std::unordered_map<uint64_t, std::unique_ptr<graph::EdgeCursor>>
_depthSpecificCursors;
std::unique_ptr<graph::EdgeCursor> _generalCursor;
aql::VariableGenerator const* _variables;
};

Expand Down
15 changes: 12 additions & 3 deletions arangod/Graph/Enumerators/OneSidedEnumerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ auto OneSidedEnumerator<Configuration>::computeNeighbourhoodOfNextVertex()
auto stepsAdded = _provider.expandToNextBatch(
step, posPrevious, [&](Step n) -> void { _queue.append({n}); });
if (not stepsAdded) { // means that NextBatch iterator is exhausted
_queue.pop(); // now we can pop NextBatch item safely
// only pop if expandToNextBatch did not add anything to the queue in the
// meantime!
_queue.pop(); // now we can pop NextBatch item safely
}
return;
}
Expand Down Expand Up @@ -176,8 +178,15 @@ auto OneSidedEnumerator<Configuration>::computeNeighbourhoodOfNextVertex()
smartExpand(step, posPrevious, res);
} else {
if (step.getDepth() < _options.getMaxDepth() && !res.isPruned()) {
// currently batching only works with cluster case
if (_queue.isBatched() && ServerState::instance()->isSingleServer()) {
bool batching = false;
if (_queue.isBatched()) {
batching = true;
}
if constexpr (std::is_same_v<decltype(_provider),
enterprise::SmartGraphProvider<Step>>) {
batching = false;
}
if (batching) {
_provider.addExpansionIterator(
step, [&]() -> void { _queue.append({Expansion{posPrevious}}); });
} else {
Expand Down
Loading