Skip to content

Commit

Permalink
PR fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffreylovitz committed Apr 2, 2020
1 parent d9171d9 commit 216f1e7
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 76 deletions.
19 changes: 11 additions & 8 deletions src/execution_plan/ops/op_conditional_traverse.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,11 @@ OpBase *NewCondTraverseOp(const ExecutionPlan *plan, Graph *g, AlgebraicExpressi

const char *edge = AlgebraicExpression_Edge(ae);
if(edge) {
op->setEdge = true;
/* This operation will populate an edge in the Record.
* Prepare all necessary information for collecting matching edges. */
uint edge_idx = OpBase_Modifies((OpBase *)op, edge);
QGEdge *e = QueryGraph_GetEdgeByAlias(plan->query_graph, edge);
Traverse_NewEdgeData(&op->edge_data, ae, e, edge_idx);
op->edge_ctx = Traverse_NewEdgeCtx(ae, e, edge_idx);
}

return (OpBase *)op;
Expand All @@ -115,7 +114,7 @@ static Record CondTraverseConsume(OpBase *opBase) {

/* If we're required to update an edge and have one queued, we can return early.
* Otherwise, try to get a new pair of source and destination nodes. */
if(op->setEdge && Traverse_SetEdge(&op->edge_data, op->r)) return OpBase_CloneRecord(op->r);
if(op->edge_ctx && Traverse_SetEdge(op->edge_ctx, op->r)) return OpBase_CloneRecord(op->r);

bool depleted = true;
NodeID src_id = INVALID_ENTITY_ID;
Expand Down Expand Up @@ -154,12 +153,12 @@ static Record CondTraverseConsume(OpBase *opBase) {
Node *destNode = Record_GetNode(op->r, op->destNodeIdx);
Graph_GetNode(op->graph, dest_id, destNode);

if(op->setEdge) {
if(op->edge_ctx) {
Node *srcNode = Record_GetNode(op->r, op->srcNodeIdx);
// Collect all appropriate edges connecting the current pair of endpoints.
Traverse_CollectEdges(&op->edge_data, ENTITY_GET_ID(srcNode), ENTITY_GET_ID(destNode));
Traverse_CollectEdges(op->edge_ctx, ENTITY_GET_ID(srcNode), ENTITY_GET_ID(destNode));
// We're guaranteed to have at least one edge.
Traverse_SetEdge(&op->edge_data, op->r);
Traverse_SetEdge(op->edge_ctx, op->r);
}

return OpBase_CloneRecord(op->r);
Expand All @@ -174,7 +173,8 @@ static OpResult CondTraverseReset(OpBase *ctx) {
for(uint i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);
op->recordCount = 0;

if(op->setEdge) array_clear(op->edge_data.edges);
if(op->edge_ctx) Traverse_ResetEdgeCtx(op->edge_ctx);

if(op->iter) {
GxB_MatrixTupleIter_free(op->iter);
op->iter = NULL;
Expand Down Expand Up @@ -212,7 +212,10 @@ static void CondTraverseFree(OpBase *ctx) {
op->ae = NULL;
}

if(op->setEdge) Traverse_FreeEdgeData(&op->edge_data);
if(op->edge_ctx) {
Traverse_FreeEdgeCtx(op->edge_ctx);
op->edge_ctx = NULL;
}

if(op->records) {
for(uint i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);
Expand Down
3 changes: 1 addition & 2 deletions src/execution_plan/ops/op_conditional_traverse.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ typedef struct {
AlgebraicExpression *ae;
GrB_Matrix F; // Filter matrix.
GrB_Matrix M; // Algebraic expression result.
bool setEdge; // Edge needs to be set in the Record.
EdgeTraverseData edge_data; // Edge collection data if the edge needs to be set.
EdgeTraverseCtx *edge_ctx; // Edge collection data if the edge needs to be set.
GxB_MatrixTupleIter *iter; // Iterator over M.
int srcNodeIdx; // Source node index into record.
int destNodeIdx; // Destination node index into record.
Expand Down
22 changes: 13 additions & 9 deletions src/execution_plan/ops/op_expand_into.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ OpBase *NewExpandIntoOp(const ExecutionPlan *plan, Graph *g, AlgebraicExpression

const char *edge = AlgebraicExpression_Edge(ae);
if(edge) {
op->setEdge = true;
/* This operation will populate an edge in the Record.
* Prepare all necessary information for collecting matching edges. */
uint edge_idx = OpBase_Modifies((OpBase *)op, edge);
QGEdge *e = QueryGraph_GetEdgeByAlias(plan->query_graph, edge);
Traverse_NewEdgeData(&op->edge_data, ae, e, edge_idx);
op->edge_ctx = Traverse_NewEdgeCtx(ae, e, edge_idx);
}

return (OpBase *)op;
Expand All @@ -107,7 +108,7 @@ static OpResult ExpandIntoInit(OpBase *opBase) {
static Record _handoff(OpExpandInto *op) {
/* If we're required to update an edge and have one queued, we can return early.
* Otherwise, try to get a new pair of source and destination nodes. */
if(op->setEdge && Traverse_SetEdge(&op->edge_data, op->r)) return OpBase_CloneRecord(op->r);
if(op->edge_ctx && Traverse_SetEdge(op->edge_ctx, op->r)) return OpBase_CloneRecord(op->r);

/* Find a record where both record's source and destination
* nodes are connected. */
Expand All @@ -125,13 +126,13 @@ static Record _handoff(OpExpandInto *op) {
// Src is not connected to dest.
if(res != GrB_SUCCESS) continue;

// If we're here, src is connected to dest.
if(op->setEdge) {
// If we're here, src is connected to dest. Update the edge if necessary.
if(op->edge_ctx) {
Node *srcNode = Record_GetNode(op->r, op->srcNodeIdx);
// Collect all appropriate edges connecting the current pair of endpoints.
Traverse_CollectEdges(&op->edge_data, ENTITY_GET_ID(srcNode), destId);
Traverse_CollectEdges(op->edge_ctx, ENTITY_GET_ID(srcNode), destId);
// Add an edge to the Record.
Traverse_SetEdge(&op->edge_data, op->r);
Traverse_SetEdge(op->edge_ctx, op->r);
return OpBase_CloneRecord(op->r);
}

Expand Down Expand Up @@ -186,7 +187,7 @@ static OpResult ExpandIntoReset(OpBase *ctx) {
}
op->recordCount = 0;

if(op->setEdge) array_clear(op->edge_data.edges);
if(op->edge_ctx) Traverse_ResetEdgeCtx(op->edge_ctx);
if(op->F != GrB_NULL) GrB_Matrix_clear(op->F);
return OP_OK;
}
Expand Down Expand Up @@ -215,7 +216,10 @@ static void ExpandIntoFree(OpBase *ctx) {
op->ae = NULL;
}

if(op->setEdge) Traverse_FreeEdgeData(&op->edge_data);
if(op->edge_ctx) {
Traverse_FreeEdgeCtx(op->edge_ctx);
op->edge_ctx = NULL;
}

if(op->records) {
for(uint i = 0; i < op->recordsCap; i++) {
Expand Down
3 changes: 1 addition & 2 deletions src/execution_plan/ops/op_expand_into.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ typedef struct {
AlgebraicExpression *ae;
GrB_Matrix F; // Filter matrix.
GrB_Matrix M; // Algebraic expression result.
bool setEdge; // Edge needs to be set in the Record.
EdgeTraverseData edge_data; // Edge collection data if the edge needs to be set.
EdgeTraverseCtx *edge_ctx; // Edge collection data if the edge needs to be set.
int srcNodeIdx; // Source node index into record.
int destNodeIdx; // Destination node index into record.
uint recordCount; // Number of held records.
Expand Down
75 changes: 38 additions & 37 deletions src/execution_plan/ops/shared/traverse_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,89 +8,90 @@
#include "../../../query_ctx.h"

// Collect edges between the source and destination nodes.
static void _Traverse_CollectEdges(EdgeTraverseData *edge_data, NodeID src, NodeID dest) {
static void _Traverse_CollectEdges(EdgeTraverseCtx *edge_ctx, NodeID src, NodeID dest) {
Graph *g = QueryCtx_GetGraph();
uint count = array_len(edge_data->edgeRelationTypes);
uint count = array_len(edge_ctx->edgeRelationTypes);
for(uint i = 0; i < count; i++) {
Graph_GetEdgesConnectingNodes(g,
src,
dest,
edge_data->edgeRelationTypes[i],
&edge_data->edges);
edge_ctx->edgeRelationTypes[i],
&edge_ctx->edges);
}
}

/* Collects traversed edge relations.
* e.g. [e:R0|R1]
* edge_data->edgeRelationTypes will hold both R0 and R1 IDs.
* edge_ctx->edgeRelationTypes will hold both R0 and R1 IDs.
* in the case where no relationship types are specified
* edge_data->edgeRelationTypes will contain GRAPH_NO_RELATION. */
static void _Traverse_SetRelationTypes(EdgeTraverseData *edge_data, QGEdge *e) {
* edge_ctx->edgeRelationTypes will contain GRAPH_NO_RELATION. */
static void _Traverse_SetRelationTypes(EdgeTraverseCtx *edge_ctx, QGEdge *e) {
uint reltype_count = array_len(e->reltypeIDs);
if(reltype_count > 0) {
array_clone(edge_data->edgeRelationTypes, e->reltypeIDs);
array_clone(edge_ctx->edgeRelationTypes, e->reltypeIDs);
} else {
edge_data->edgeRelationTypes = array_new(int, 1);
edge_data->edgeRelationTypes = array_append(edge_data->edgeRelationTypes, GRAPH_NO_RELATION);
edge_ctx->edgeRelationTypes = array_new(int, 1);
edge_ctx->edgeRelationTypes = array_append(edge_ctx->edgeRelationTypes, GRAPH_NO_RELATION);
}
}

void Traverse_NewEdgeData(EdgeTraverseData *edge_data, AlgebraicExpression *ae,
QGEdge *e, int idx) {
edge_data->edges = array_new(Edge, 32); // Instantiate array to collect matching edges.
_Traverse_SetRelationTypes(edge_data, e); // Build the array of relation type IDs.
edge_data->edgeIdx = idx;
EdgeTraverseCtx *Traverse_NewEdgeCtx(AlgebraicExpression *ae, QGEdge *e, int idx) {
EdgeTraverseCtx *edge_ctx = rm_malloc(sizeof(EdgeTraverseCtx));
edge_ctx->edges = array_new(Edge, 32); // Instantiate array to collect matching edges.
_Traverse_SetRelationTypes(edge_ctx, e); // Build the array of relation type IDs.
edge_ctx->edgeIdx = idx;
// Determine the edge directions we need to collect.
if(e->bidirectional) {
// Bidirectional edges matching incoming and outgoing edges.
edge_data->direction = GRAPH_EDGE_DIR_BOTH;
edge_ctx->direction = GRAPH_EDGE_DIR_BOTH;
} else if(AlgebraicExpression_ContainsOp(ae, AL_EXP_TRANSPOSE)) {
/* If this operation traverses a transposed edge, the source and destination nodes
* will be swapped in the Record. */
edge_data->direction = GRAPH_EDGE_DIR_INCOMING;
edge_ctx->direction = GRAPH_EDGE_DIR_INCOMING;
} else {
// The default traversal direction is outgoing.
edge_data->direction = GRAPH_EDGE_DIR_OUTGOING;
edge_ctx->direction = GRAPH_EDGE_DIR_OUTGOING;
}
return edge_ctx;
}

// Collect edges between the source and destination nodes matching the op's traversal direction.
void Traverse_CollectEdges(EdgeTraverseData *edge_data, NodeID src, NodeID dest) {
switch(edge_data->direction) {
void Traverse_CollectEdges(EdgeTraverseCtx *edge_ctx, NodeID src, NodeID dest) {
switch(edge_ctx->direction) {
case GRAPH_EDGE_DIR_OUTGOING:
_Traverse_CollectEdges(edge_data, src, dest);
_Traverse_CollectEdges(edge_ctx, src, dest);
return;
case GRAPH_EDGE_DIR_INCOMING:
// If we're traversing incoming edges, swap the source and destination.
_Traverse_CollectEdges(edge_data, dest, src);
_Traverse_CollectEdges(edge_ctx, dest, src);
return;
case GRAPH_EDGE_DIR_BOTH:
// If we're traversing in both directions, collect edges in both directions.
_Traverse_CollectEdges(edge_data, src, dest);
_Traverse_CollectEdges(edge_data, dest, src);
_Traverse_CollectEdges(edge_ctx, src, dest);
_Traverse_CollectEdges(edge_ctx, dest, src);
return;
}
}

bool Traverse_SetEdge(EdgeTraverseData *edge_data, Record r) {
bool Traverse_SetEdge(EdgeTraverseCtx *edge_ctx, Record r) {
// Return false if all edges have been consumed.
if(!array_len(edge_data->edges)) return false;
if(!array_len(edge_ctx->edges)) return false;

// Pop an edge and add it to the Record.
Edge e = array_pop(edge_data->edges);
Record_AddEdge(r, edge_data->edgeIdx, e);
Edge e = array_pop(edge_ctx->edges);
Record_AddEdge(r, edge_ctx->edgeIdx, e);
return true;
}

void Traverse_FreeEdgeData(EdgeTraverseData *edge_data) {
if(edge_data->edges) {
array_free(edge_data->edges);
edge_data->edges = NULL;
}
void Traverse_ResetEdgeCtx(EdgeTraverseCtx *edge_ctx) {
array_clear(edge_ctx->edges);
}

if(edge_data->edgeRelationTypes) {
array_free(edge_data->edgeRelationTypes);
edge_data->edgeRelationTypes = NULL;
}
void Traverse_FreeEdgeCtx(EdgeTraverseCtx *edge_ctx) {
if(!edge_ctx) return;

array_free(edge_ctx->edges);
array_free(edge_ctx->edgeRelationTypes);
rm_free(edge_ctx);
}

22 changes: 13 additions & 9 deletions src/execution_plan/ops/shared/traverse_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@
#include "../../execution_plan.h"
#include "../../../arithmetic/algebraic_expression.h"

/* Container struct for traversing and populating referenced edges in
* traversal ops like CondTraverse and ExpandInto. */
typedef struct {
int *edgeRelationTypes; // The relation type IDs that should be collected.
Edge *edges; // Flexible array of all matching edges for the current endpoints.
int edgeIdx; // The Record index for the referenced edge.
GRAPH_EDGE_DIR direction; // The direction of the referenced edge being traversed.
} EdgeTraverseData;
} EdgeTraverseCtx;

// Initialize an EdgeTraverseData struct to populate edges appropriately for traversal operations.
void Traverse_NewEdgeData(EdgeTraverseData *edge_data, AlgebraicExpression *ae,
QGEdge *e, int idx);
// Initialize an EdgeTraverseCtx struct to populate edges appropriately for traversal operations.
EdgeTraverseCtx *Traverse_NewEdgeCtx(AlgebraicExpression *ae, QGEdge *e, int idx);

// Collect all appropriate edges between the given endpoints.
void Traverse_CollectEdges(EdgeTraverseData *edge_data, NodeID src, NodeID dest);
void Traverse_CollectEdges(EdgeTraverseCtx *edge_ctx, NodeID src, NodeID dest);

// If a matching edge is available, pop it and add it to the Record.
bool Traverse_SetEdge(EdgeTraverseData *edge_data, Record r);
// Remove a matching edge from the edges array if one is available and set it in the Record.
bool Traverse_SetEdge(EdgeTraverseCtx *edge_ctx, Record r);

// Free an EdgeTraverseData struct.
void Traverse_FreeEdgeData(EdgeTraverseData *edge_data);
// Reset contained edges within an EdgeTraverseCtx.
void Traverse_ResetEdgeCtx(EdgeTraverseCtx *edge_ctx);

// Free an EdgeTraverseCtx struct.
void Traverse_FreeEdgeCtx(EdgeTraverseCtx *edge_ctx);

10 changes: 5 additions & 5 deletions src/execution_plan/optimizations/reduce_count.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ void _reduceEdgeCount(ExecutionPlan *plan) {

// If type is specified, count only labeled entities.
CondTraverse *condTraverse = (CondTraverse *)opTraverse;
int edgeRelationCount = array_len(condTraverse->edge_data.edgeRelationTypes);

// The traversal op doesn't contain information about the traversed edge, cannot apply optimization.
if(edgeRelationCount == 0) return;
if(!condTraverse->edge_ctx) return;

uint edgeRelationCount = array_len(condTraverse->edge_ctx->edgeRelationTypes);

uint64_t edges = 0;
for(int i = 0; i < edgeRelationCount; i++) {
int relType = condTraverse->edge_data.edgeRelationTypes[i];
for(uint i = 0; i < edgeRelationCount; i++) {
int relType = condTraverse->edge_ctx->edgeRelationTypes[i];
switch(relType) {
case GRAPH_NO_RELATION:
// Should be the only relationship type mentioned, -[]->
Expand Down
8 changes: 4 additions & 4 deletions tests/flow/test_bidirectional_traversals.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ def populate_acyclic_graph(self):
nodes.append(node)
acyclic_graph.add_node(node)

edge = Edge(nodes[0], "E", nodes[1])
edge = Edge(nodes[0], "E", nodes[1], properties={"val": 0})
acyclic_graph.add_edge(edge)

edge = Edge(nodes[1], "E", nodes[2])
edge = Edge(nodes[1], "E", nodes[2], properties={"val": 1})
acyclic_graph.add_edge(edge)

acyclic_graph.commit()
Expand Down Expand Up @@ -328,7 +328,7 @@ def test12_bidirectional_expand_into(self):
self.env.assertEquals(actual_result.result_set, traverse_result.result_set)

# Test undirected traversals with a referenced edge.
query = """MATCH (a), (b) WITH a, b MATCH (a)-[e:E]-(b) RETURN ID(e), a.val, b.val ORDER BY ID(e), a.val, b.val"""
query = """MATCH (a), (b) WITH a, b MATCH (a)-[e:E]-(b) RETURN e.val, a.val, b.val ORDER BY e.val, a.val, b.val"""
actual_result = acyclic_graph.query(query)
expected_result = [[0, 'v1', 'v2'],
[0, 'v2', 'v1'],
Expand All @@ -337,6 +337,6 @@ def test12_bidirectional_expand_into(self):
self.env.assertEquals(actual_result.result_set, expected_result)

# Verify result against the equivalent conditional traversal.
query = """MATCH (a)-[e:E]-(b) RETURN ID(e), a.val, b.val ORDER BY ID(e), a.val, b.val"""
query = """MATCH (a)-[e:E]-(b) RETURN e.val, a.val, b.val ORDER BY e.val, a.val, b.val"""
traverse_result = acyclic_graph.query(query)
self.env.assertEquals(actual_result.result_set, traverse_result.result_set)

0 comments on commit 216f1e7

Please sign in to comment.