Skip to content

Commit

Permalink
Fix bidirectional ExpandInto, shared edge populating logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffreylovitz committed Apr 1, 2020
1 parent 19ee859 commit d9171d9
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 219 deletions.
108 changes: 19 additions & 89 deletions src/execution_plan/ops/op_conditional_traverse.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,58 +18,8 @@ static OpResult CondTraverseReset(OpBase *opBase);
static OpBase *CondTraverseClone(const ExecutionPlan *plan, const OpBase *opBase);
static void CondTraverseFree(OpBase *opBase);

static void _setupTraversedRelations(CondTraverse *op, QGEdge *e) {
uint reltype_count = array_len(e->reltypeIDs);
if(reltype_count > 0) {
array_clone(op->edgeRelationTypes, e->reltypeIDs);
op->edgeRelationCount = reltype_count;
} else {
op->edgeRelationCount = 1;
op->edgeRelationTypes = array_new(int, 1);
op->edgeRelationTypes = array_append(op->edgeRelationTypes, GRAPH_NO_RELATION);
}
}

// Updates query graph edge.
static int _CondTraverse_SetEdge(CondTraverse *op, Record r) {
// Consumed edges connecting current source and destination nodes.
if(!array_len(op->edges)) return 0;

Edge *e = op->edges + (array_len(op->edges) - 1);
Record_AddEdge(r, op->edgeIdx, *e);
array_pop(op->edges);
return 1;
}

// Collect edges between the source and destination nodes.
static void __CondTraverse_CollectEdges(CondTraverse *op, int src, int dest) {
Node *srcNode = Record_GetNode(op->r, src);
Node *destNode = Record_GetNode(op->r, dest);
for(int i = 0; i < op->edgeRelationCount; i++) {
Graph_GetEdgesConnectingNodes(op->graph,
ENTITY_GET_ID(srcNode),
ENTITY_GET_ID(destNode),
op->edgeRelationTypes[i],
&op->edges);
}
}

// Collect edges between the source and destination nodes matching the op's traversal direction.
static void _CondTraverse_CollectEdges(CondTraverse *op, int src, int dest) {
switch(op->direction) {
case GRAPH_EDGE_DIR_OUTGOING:
__CondTraverse_CollectEdges(op, op->srcNodeIdx, op->destNodeIdx);
return;
case GRAPH_EDGE_DIR_INCOMING:
// If we're traversing incoming edges, swap the source and destination.
__CondTraverse_CollectEdges(op, op->destNodeIdx, op->srcNodeIdx);
return;
case GRAPH_EDGE_DIR_BOTH:
// If we're traversing in both directions, collect edges in both directions.
__CondTraverse_CollectEdges(op, op->srcNodeIdx, op->destNodeIdx);
__CondTraverse_CollectEdges(op, op->destNodeIdx, op->srcNodeIdx);
return;
}
static inline int CondTraverseToString(const OpBase *ctx, char *buf, uint buf_len) {
return TraversalToString(ctx, buf, buf_len, ((const CondTraverse *)ctx)->ae);
}

static void _populate_filter_matrix(CondTraverse *op) {
Expand Down Expand Up @@ -117,22 +67,15 @@ void _traverse(CondTraverse *op) {
GrB_Matrix_clear(op->F);
}

static inline int CondTraverseToString(const OpBase *ctx, char *buf, uint buf_len) {
return TraversalToString(ctx, buf, buf_len, ((const CondTraverse *)ctx)->ae);
}

OpBase *NewCondTraverseOp(const ExecutionPlan *plan, Graph *g, AlgebraicExpression *ae) {
CondTraverse *op = rm_calloc(1, sizeof(CondTraverse));
op->graph = g;
op->ae = ae;
op->r = NULL;
op->iter = NULL;
op->edges = NULL;
op->F = GrB_NULL;
op->M = GrB_NULL;
op->recordCount = 0;
op->direction = GRAPH_EDGE_DIR_OUTGOING;
op->edgeRelationTypes = NULL;

// Set our Op operations
OpBase_Init((OpBase *)op, OPType_CONDITIONAL_TRAVERSE, "Conditional Traverse", CondTraverseInit,
Expand All @@ -145,18 +88,11 @@ OpBase *NewCondTraverseOp(const ExecutionPlan *plan, Graph *g, AlgebraicExpressi
const char *edge = AlgebraicExpression_Edge(ae);
if(edge) {
op->setEdge = true;
op->edges = array_new(Edge, 32);
/* This operation will populate an edge in the Record.
* Prepare all necessary information for collecting matching edges. */
uint edge_idx = OpBase_Modifies((OpBase *)op, edge);
QGEdge *e = QueryGraph_GetEdgeByAlias(plan->query_graph, edge);
_setupTraversedRelations(op, e);
op->edgeIdx = OpBase_Modifies((OpBase *)op, edge);
// Determine the edge directions we need to collect.
if(e->bidirectional) {
op->direction = GRAPH_EDGE_DIR_BOTH;
} else if(AlgebraicExpression_ContainsOp(ae, AL_EXP_TRANSPOSE)) {
/* If this operation traverses a transposed edge, the source and destination nodes
* will be swapped in the Record. */
op->direction = GRAPH_EDGE_DIR_INCOMING;
}
Traverse_NewEdgeData(&op->edge_data, ae, e, edge_idx);
}

return (OpBase *)op;
Expand All @@ -170,16 +106,16 @@ static OpResult CondTraverseInit(OpBase *opBase) {
return OP_OK;
}

/* CondTraverseConsume next operation
* each call will update the graph
* returns OP_DEPLETED when no additional updates are available */
/* Each call to CondTraverseConsume emits a Record containing the
* traversal's endpoints and, if required, an edge.
* Returns NULL once all traversals have been performed. */
static Record CondTraverseConsume(OpBase *opBase) {
CondTraverse *op = (CondTraverse *)opBase;
OpBase *child = op->op.children[0];

/* If we're required to update an edge and have one queued, we can return early.
* Otherwise, try to get a new pair of source and destination nodes. */
if(op->setEdge && _CondTraverse_SetEdge(op, op->r)) return OpBase_CloneRecord(op->r);
if(op->setEdge && Traverse_SetEdge(&op->edge_data, op->r)) return OpBase_CloneRecord(op->r);

bool depleted = true;
NodeID src_id = INVALID_ENTITY_ID;
Expand All @@ -194,7 +130,7 @@ static Record CondTraverseConsume(OpBase *opBase) {
/* Run out of tuples, try to get new data.
* Free old records. */
op->r = NULL;
for(int i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);
for(uint i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);

// Ask child operations for data.
for(op->recordCount = 0; op->recordCount < op->recordsCap; op->recordCount++) {
Expand All @@ -219,9 +155,11 @@ static Record CondTraverseConsume(OpBase *opBase) {
Graph_GetNode(op->graph, dest_id, destNode);

if(op->setEdge) {
_CondTraverse_CollectEdges(op, op->destNodeIdx, op->srcNodeIdx);
Node *srcNode = Record_GetNode(op->r, op->srcNodeIdx);
// Collect all appropriate edges connecting the current pair of endpoints.
Traverse_CollectEdges(&op->edge_data, ENTITY_GET_ID(srcNode), ENTITY_GET_ID(destNode));
// We're guaranteed to have at least one edge.
_CondTraverse_SetEdge(op, op->r);
Traverse_SetEdge(&op->edge_data, op->r);
}

return OpBase_CloneRecord(op->r);
Expand All @@ -233,10 +171,10 @@ static OpResult CondTraverseReset(OpBase *ctx) {
// Do not explicitly free op->r, as the same pointer is also held
// in the op->records array and as such will be freed there.
op->r = NULL;
for(int i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);
for(uint i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);
op->recordCount = 0;

if(op->edges) array_clear(op->edges);
if(op->setEdge) array_clear(op->edge_data.edges);
if(op->iter) {
GxB_MatrixTupleIter_free(op->iter);
op->iter = NULL;
Expand Down Expand Up @@ -269,23 +207,15 @@ static void CondTraverseFree(OpBase *ctx) {
op->M = GrB_NULL;
}

if(op->edges) {
array_free(op->edges);
op->edges = NULL;
}

if(op->ae) {
AlgebraicExpression_Free(op->ae);
op->ae = NULL;
}

if(op->edgeRelationTypes) {
array_free(op->edgeRelationTypes);
op->edgeRelationTypes = NULL;
}
if(op->setEdge) Traverse_FreeEdgeData(&op->edge_data);

if(op->records) {
for(int i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);
for(uint i = 0; i < op->recordCount; i++) OpBase_DeleteRecord(op->records[i]);
rm_free(op->records);
op->records = NULL;
}
Expand Down
19 changes: 8 additions & 11 deletions src/execution_plan/ops/op_conditional_traverse.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma once

#include "op.h"
#include "shared/traverse_functions.h"
#include "../execution_plan.h"
#include "../../arithmetic/algebraic_expression.h"
#include "../../../deps/GraphBLAS/Include/GraphBLAS.h"
Expand All @@ -18,19 +19,15 @@ typedef struct {
AlgebraicExpression *ae;
GrB_Matrix F; // Filter matrix.
GrB_Matrix M; // Algebraic expression result.
int *edgeRelationTypes; // One or more relation types.
int edgeRelationCount; // length of edgeRelationTypes.
bool setEdge; // Edge needs to be set.
Edge *edges; // Discovered edges.
bool setEdge; // Edge needs to be set in the Record.
EdgeTraverseData edge_data; // Edge collection data if the edge needs to be set.
GxB_MatrixTupleIter *iter; // Iterator over M.
int srcNodeIdx; // Index into record.
int destNodeIdx; // Index into record.
int edgeIdx; // Index into record.
int recordsCap; // Max number of records to process.
int recordCount; // Number of records to process.
GRAPH_EDGE_DIR direction; // The direction of the referenced edge being traversed.
int srcNodeIdx; // Source node index into record.
int destNodeIdx; // Destination node index into record.
uint recordCount; // Number of held records.
uint recordsCap; // Max number of records to process.
Record *records; // Array of records.
Record r; // Current selected record.
Record r; // Currently selected record.
} CondTraverse;

/* Creates a new Traverse operation */
Expand Down
Loading

0 comments on commit d9171d9

Please sign in to comment.