Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plan clone refactor #1278

Merged
merged 7 commits into from Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
98 changes: 42 additions & 56 deletions src/execution_plan/execution_plan.c
Expand Up @@ -718,13 +718,13 @@ static ExecutionPlan *_ExecutionPlan_UnionPlans(AST *ast) {

/* Placeholder for each execution plan, these all will be joined
* via a single UNION operation. */
ExecutionPlan **plans = array_new(ExecutionPlan *, union_count);
ExecutionPlan *plans[union_count];

for(int i = 0; i < union_count; i++) {
// Create an AST segment from which we will build an execution plan.
end_offset = union_indices[i];
AST *ast_segment = AST_NewSegment(ast, start_offset, end_offset);
plans = array_append(plans, NewExecutionPlan());
plans[i] = NewExecutionPlan();
AST_Free(ast_segment); // Free the AST segment.

// Next segment starts where this one ends.
Expand All @@ -745,9 +745,7 @@ static ExecutionPlan *_ExecutionPlan_UnionPlans(AST *ast) {
* right stream: [Scan]->[Project]
* [Union]->[Distinct]->[Result] */
ExecutionPlan *plan = ExecutionPlan_NewEmptyExecutionPlan();
plan->segments = plans;
plan->record_map = raxNew();
plan->is_union = true;

OpBase *results_op = NewResultsOp(plan);
OpBase *parent = results_op;
Expand Down Expand Up @@ -825,7 +823,7 @@ ExecutionPlan *NewExecutionPlan(void) {
segment_indices = array_append(segment_indices, clause_count);

uint segment_count = array_len(segment_indices);
ExecutionPlan **segments = array_new(ExecutionPlan *, segment_count);
ExecutionPlan *segments[segment_count];
AST *ast_segments[segment_count];
start_offset = 0;
for(int i = 0; i < segment_count; i++) {
Expand All @@ -837,7 +835,7 @@ ExecutionPlan *NewExecutionPlan(void) {
ExecutionPlan *segment = ExecutionPlan_NewEmptyExecutionPlan();
ExecutionPlan_PopulateExecutionPlan(segment);
segment->ast_segment = ast_segment;
segments = array_append(segments, segment);
segments[i] = segment;
start_offset = end_offset;
}

Expand Down Expand Up @@ -873,15 +871,13 @@ ExecutionPlan *NewExecutionPlan(void) {

array_free(segment_indices);

ExecutionPlan *plan = array_pop(segments);
ExecutionPlan *plan = segments[segment_count - 1];
// The root operation is OpResults only if the query culminates in a RETURN or CALL clause.
if(query_has_return || last_clause_type == CYPHER_AST_CALL) {
OpBase *results_op = NewResultsOp(plan);
_ExecutionPlan_UpdateRoot(plan, results_op);
}

plan->segments = segments;

return plan;
}

Expand Down Expand Up @@ -1063,27 +1059,24 @@ ResultSet *ExecutionPlan_Profile(ExecutionPlan *plan) {
return rs;
}

static void _ExecutionPlan_FreeOperations(OpBase *op) {
for(int i = 0; i < op->childCount; i++) {
_ExecutionPlan_FreeOperations(op->children[i]);
}
OpBase_Free(op);
void ExecutionPlan_IncreaseRefCount(ExecutionPlan *plan) {
ASSERT(plan);
__atomic_fetch_add(&plan->ref_count, 1, __ATOMIC_RELAXED);
}

static void _ExecutionPlan_FreeSubPlan(ExecutionPlan *plan) {
if(plan == NULL) return;
int ExecutionPlan_DecRefCount(ExecutionPlan *plan) {
ASSERT(plan);
return __atomic_sub_fetch(&plan->ref_count, 1, __ATOMIC_RELAXED);
}

if(plan->segments) {
uint segment_count = array_len(plan->segments);
for(int i = 0; i < segment_count; i++) _ExecutionPlan_FreeSubPlan(plan->segments[i]);
array_free(plan->segments);
}

static void _ExecutionPlan_FreeInternals(ExecutionPlan *plan) {
if(plan == NULL) return;

if(plan->connected_components) {
uint connected_component_count = array_len(plan->connected_components);
for(uint i = 0; i < connected_component_count; i ++) QueryGraph_Free(plan->connected_components[i]);
array_free(plan->connected_components);
plan->connected_components = NULL;
}

QueryGraph_Free(plan->query_graph);
Expand All @@ -1093,46 +1086,39 @@ static void _ExecutionPlan_FreeSubPlan(ExecutionPlan *plan) {
rm_free(plan);
}

void ExecutionPlan_IncreaseRefCount(ExecutionPlan *plan) {
ASSERT(plan);
__atomic_fetch_add(&plan->ref_count, 1, __ATOMIC_RELAXED);
}
// Free an op tree and its associated ExecutionPlan segments.
static ExecutionPlan *_ExecutionPlan_FreeOpTree(OpBase *op) {
if(op == NULL) return NULL;
ExecutionPlan *child_plan = NULL;
ExecutionPlan *prev_child_plan = NULL;
// Store a reference to the current plan.
ExecutionPlan *current_plan = (ExecutionPlan *)op->plan;
for(uint i = 0; i < op->childCount; i ++) {
child_plan = _ExecutionPlan_FreeOpTree(op->children[i]);
// In most cases all children will share the same plan, but if they don't
// (for an operation like UNION) then free the now-obsolete previous child plan.
if(prev_child_plan != child_plan) {
_ExecutionPlan_FreeInternals(prev_child_plan);
prev_child_plan = child_plan;
}
}

int ExecutionPlan_DecRefCount(ExecutionPlan *plan) {
ASSERT(plan);
return __atomic_sub_fetch(&plan->ref_count, 1, __ATOMIC_RELAXED);
}
// Free this op.
OpBase_Free(op);

// Free each ExecutionPlan segment once all ops associated with it have been freed.
if(current_plan != child_plan) _ExecutionPlan_FreeInternals(child_plan);

return current_plan;
}
void ExecutionPlan_Free(ExecutionPlan *plan) {
if(plan == NULL) return;
if(ExecutionPlan_DecRefCount(plan) >= 0) return;

if(plan->root) {
_ExecutionPlan_FreeOperations(plan->root);
plan->root = NULL;
}
// Free all ops and ExecutionPlan segments.
_ExecutionPlan_FreeOpTree(plan->root);

/* All segments but the last should have everything but
* their operation chain freed.
* The last segment is the actual plan passed as an argument to this function.
* TODO this logic isn't ideal, try to improve. */
if(plan->segments) {
uint segment_count = array_len(plan->segments);
for(int i = 0; i < segment_count; i++) _ExecutionPlan_FreeSubPlan(plan->segments[i]);
array_free(plan->segments);
}

if(plan->connected_components) {
uint connected_component_count = array_len(plan->connected_components);
for(uint i = 0; i < connected_component_count; i ++) QueryGraph_Free(plan->connected_components[i]);
array_free(plan->connected_components);
plan->connected_components = NULL;
}

QueryGraph_Free(plan->query_graph);
if(plan->record_map) raxFree(plan->record_map);
if(plan->record_pool) ObjectPool_Free(plan->record_pool);
if(plan->ast_segment) AST_Free(plan->ast_segment);
rm_free(plan);
// Free the final ExecutionPlan segment.
_ExecutionPlan_FreeInternals(plan);
}

2 changes: 0 additions & 2 deletions src/execution_plan/execution_plan.h
Expand Up @@ -21,10 +21,8 @@ struct ExecutionPlan {
QueryGraph *query_graph; // QueryGraph representing all graph entities in this segment.
FT_FilterNode *filter_tree; // FilterTree containing filters to be applied to this segment.
QueryGraph **connected_components; // Array of all connected components in this segment.
ExecutionPlan **segments; // Partial execution plans scoped to a subset of operations.
ObjectPool *record_pool;
bool prepared; // Indicates if the execution plan is ready for execute.
bool is_union; // Indicates if the execution plan is a union of execution plans.
int ref_count; // Number of active references.
};

Expand Down
124 changes: 51 additions & 73 deletions src/execution_plan/execution_plan_clone.c
Expand Up @@ -5,95 +5,73 @@
*/

#include "execution_plan_clone.h"
#include "../RG.h"
#include "../query_ctx.h"
#include "../util/rax_extensions.h"

// Clones an execution plan operations, with respect to the original execution plan segment.
static OpBase *_ExecutionPlan_CloneOperations(const ExecutionPlan *orig_plan,
ExecutionPlan *clone_plan, const OpBase *op) {
// If there is no op, or the op is a part of a different segment, return NULL.
if(!op || op->plan != orig_plan) return NULL;
static ExecutionPlan *_ClonePlanInternals(const ExecutionPlan *template) {
ExecutionPlan *clone = ExecutionPlan_NewEmptyExecutionPlan();

// Clone the op.
OpBase *clone = OpBase_Clone(clone_plan, op);
// Clone the op's children and add them the cloned children array.
for(uint i = 0; i < op->childCount; i++) {
OpBase *cloned_child = _ExecutionPlan_CloneOperations(orig_plan, clone_plan,
op->children[i]);
// Assumption: all the children are either in the same segment or on a different segment.
if(!cloned_child) break;
ExecutionPlan_AddOp(clone, cloned_child);
clone->record_map = raxClone(template->record_map);
if(template->ast_segment) clone->ast_segment = AST_ShallowCopy(template->ast_segment);
if(template->query_graph) clone->query_graph = QueryGraph_Clone(template->query_graph);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can avoid both query_graph and connected_components if we introduce ref count to the QueryGraph object.

// TODO improve QueryGraph logic so that we do not need to store or clone connected_components.
if(template->connected_components) {
array_clone_with_cb(clone->connected_components, template->connected_components, QueryGraph_Clone);
}

// Temporarily set the thread-local AST to be the one referenced by this ExecutionPlan segment.
QueryCtx_SetAST(clone->ast_segment);

return clone;
}

// Merge cloned execution plan segments, with respect to the plan type (union or not).
static void _ExecutionPlan_MergeSegments(ExecutionPlan *plan) {
// No need to merge segments if there aren't any.
uint segment_count = array_len(plan->segments);
if(segment_count == 0) return;

if(plan->is_union) {
// Locate the join operation.
OpBase *join_op = ExecutionPlan_LocateOp(plan->root, OPType_JOIN);
assert(join_op);
// Each segment is a sub execution plan that needs to be joined.
for(int i = 0; i < segment_count; i++) {
ExecutionPlan *sub_plan = plan->segments[i];
ExecutionPlan_AddOp(join_op, sub_plan->root);
}
static OpBase *_CloneOpTree(OpBase *template_parent, OpBase *template_current,
OpBase *clone_parent) {
const ExecutionPlan *plan_segment;
if(!template_parent || (template_current->plan != template_parent->plan)) {
/* If this is the first operation or it was built using a different ExecutionPlan
* segment than its parent, clone the ExecutionPlan segment. */
plan_segment = _ClonePlanInternals(template_current->plan);
} else {
array_append(plan->segments, plan);
segment_count = array_len(plan->segments);
// Plan is not union, concatenate the segments.
OpBase *connecting_op;
// segments[0] is the first segment of the execution.
OpBase *prev_root = plan->segments[0]->root;
for(uint i = 1; i < segment_count; i++) {
ExecutionPlan *current_segment = plan->segments[i];
connecting_op = ExecutionPlan_LocateOpMatchingType(current_segment->root, PROJECT_OPS,
PROJECT_OP_COUNT);
assert(connecting_op->childCount == 0);
ExecutionPlan_AddOp(connecting_op, prev_root);
prev_root = current_segment->root;
}
array_pop(plan->segments);
// This op was built as part of the same segment as its parent, don't change ExecutionPlans.
plan_segment = clone_parent->plan;
}
}

/* This function clones execution plan by cloning each segment in the execution plan as a unit.
* Each segment has its own filter tree, record mapping, query graphs and ast segment, that compose
* a single logical execution unit, together with the segment operations.
* The ast segment is shallow copied while all the other objects are deep cloned.
*/
ExecutionPlan *ExecutionPlan_Clone(const ExecutionPlan *template) {
if(template == NULL) return NULL;
// Verify that the execution plan template is not prepared yet.
assert(template->prepared == false && "Execution plan cloning should be only on templates");
// Allocate an empty execution plan.
ExecutionPlan *clone = ExecutionPlan_NewEmptyExecutionPlan();
// Clone the current operation.
OpBase *clone_current = OpBase_Clone(plan_segment, template_current);

clone->is_union = template->is_union;
clone->record_map = raxClone(template->record_map);
if(template->ast_segment) clone->ast_segment = AST_ShallowCopy(template->ast_segment);
if(template->query_graph) clone->query_graph = QueryGraph_Clone(template->query_graph);
if(template->connected_components) {
array_clone_with_cb(clone->connected_components, template->connected_components, QueryGraph_Clone);
for(uint i = 0; i < template_current->childCount; i++) {
// Recursively visit and clone the op's children.
OpBase *child_op = _CloneOpTree(template_current, template_current->children[i], clone_current);
ExecutionPlan_AddOp(clone_current, child_op);
}

// The execution plan segment clone requires the specific AST segment for referenced entities.
return clone_current;
}

static ExecutionPlan *_ExecutionPlan_Clone(const ExecutionPlan *template) {
OpBase *clone_root = _CloneOpTree(NULL, template->root, NULL);
// The "master" execution plan is the one constructed with the root op.
ExecutionPlan *clone = (ExecutionPlan *)clone_root->plan;
// The root op is currently NULL; set it now.
clone->root = clone_root;
Copy link
Collaborator

@swilly22 swilly22 Aug 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't clone_root->plan->root is clone_root ?
in which case we can return clone ?
Redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This turns out to not be redundant, the root pointer is never touched by the cloning or the New op routines.


return clone;
}

/* This function clones the input ExecutionPlan by recursively visiting its tree of ops.
* When an op is encountered that was constructed as part of a different ExecutionPlan segment, that segment
* and its internal members (FilterTree, record mapping, query graphs, and AST segment) are also cloned. */
ExecutionPlan *ExecutionPlan_Clone(const ExecutionPlan *template) {
ASSERT(template != NULL);
// Store the original AST pointer.
AST *master_ast = QueryCtx_GetAST();
QueryCtx_SetAST(clone->ast_segment);
// Clone each operation in the template relevant segment.
clone->root = _ExecutionPlan_CloneOperations(template, clone, template->root);
// After clone, restore master ast.
// Verify that the execution plan template is not prepared yet.
ASSERT(template->prepared == false && "Execution plan cloning should be only on templates");
ExecutionPlan *clone = _ExecutionPlan_Clone(template);
// Restore the original AST pointer.
QueryCtx_SetAST(master_ast);

if(template->segments) {
array_clone_with_cb(clone->segments, template->segments, ExecutionPlan_Clone);
}
// Merge the segments
_ExecutionPlan_MergeSegments(clone);
return clone;
}

1 change: 1 addition & 0 deletions src/execution_plan/execution_plan_modify.c
Expand Up @@ -107,6 +107,7 @@ void ExecutionPlan_ReplaceOp(ExecutionPlan *plan, OpBase *a, OpBase *b) {

void ExecutionPlan_RemoveOp(ExecutionPlan *plan, OpBase *op) {
if(op->parent == NULL) {
ExecutionPlan *plan = (ExecutionPlan *)op->plan;
// Removing execution plan root.
assert(op->childCount == 1);
// Assign child as new root.
Expand Down
11 changes: 1 addition & 10 deletions src/execution_plan/optimizations/optimizer.c
Expand Up @@ -8,7 +8,7 @@
#include "./optimizations.h"
#include "../../query_ctx.h"

void _optimizePlan(ExecutionPlan *plan) {
void optimizePlan(ExecutionPlan *plan) {
// Tries to compact filter trees, and remove redundant filters.
compactFilters(plan);

Expand Down Expand Up @@ -48,12 +48,3 @@ void _optimizePlan(ExecutionPlan *plan) {
reduceCount(plan);
}

void optimizePlan(ExecutionPlan *plan) {
/* Handle UNION of execution plans. */
if(plan->is_union) {
uint segment_count = array_len(plan->segments);
for(uint i = 0; i < segment_count; i++) _optimizePlan(plan->segments[i]);
} else {
_optimizePlan(plan);
}
}
17 changes: 12 additions & 5 deletions src/execution_plan/optimizations/reduce_count.c
Expand Up @@ -112,10 +112,13 @@ bool _reduceNodeCount(ExecutionPlan *plan) {
OpBase *opProject = NewProjectOp(opAggregate->op.plan, exps);

// New execution plan: "Project -> Results"
ExecutionPlan_RemoveOp(plan, (OpBase *)opScan);
ExecutionPlan *disconnected_plan = (ExecutionPlan *)opScan->plan;
ExecutionPlan_RemoveOp(disconnected_plan, opScan);
OpBase_Free(opScan);
// The plan segment that the scan and traverse op had been built with is now disconnected and should be freed.
ExecutionPlan_Free(disconnected_plan);

ExecutionPlan_RemoveOp(plan, (OpBase *)opAggregate);
ExecutionPlan_RemoveOp(disconnected_plan, (OpBase *)opAggregate);
OpBase_Free((OpBase *)opAggregate);

ExecutionPlan_AddOp((OpBase *)opResult, opProject);
Expand Down Expand Up @@ -246,13 +249,17 @@ void _reduceEdgeCount(ExecutionPlan *plan) {
OpBase *opProject = NewProjectOp(opAggregate->op.plan, exps);

// New execution plan: "Project -> Results"
ExecutionPlan_RemoveOp(plan, (OpBase *)opScan);
ExecutionPlan *disconnected_plan = (ExecutionPlan *)opScan->plan;
ExecutionPlan_RemoveOp(disconnected_plan, opScan);
OpBase_Free(opScan);

ExecutionPlan_RemoveOp(plan, (OpBase *)opTraverse);
ExecutionPlan_RemoveOp(disconnected_plan, (OpBase *)opTraverse);
OpBase_Free(opTraverse);

ExecutionPlan_RemoveOp(plan, (OpBase *)opAggregate);
// The plan segment that the scan and traverse op had been built with is now disconnected and should be freed.
ExecutionPlan_Free(disconnected_plan);

ExecutionPlan_RemoveOp(disconnected_plan, (OpBase *)opAggregate);
OpBase_Free((OpBase *)opAggregate);

ExecutionPlan_AddOp((OpBase *)opResult, opProject);
Expand Down