cp 2.12.3 (#3124)
* Call {} partial improvements (#3123)

* place eager_types in op.h

* eager ops array replace

* fix match op construction

* fix Match op construction

* touchup

* refactor recursive call to a loop

* fix reduce-scans optimization to not look into call {}

* fix

* fix build_match_op_tree

* fix leak

* fix leak

* documentation for call {}

* touchup

* revert important fixes

* touchups

* add example

---------

Co-authored-by: Avi Avni <avi.avni@gmail.com>

* Call {} improvements (#3115)

* place eager_types in op.h

* eager ops array replace

* fix match op construction

* fix Match op construction

* touchup

* refactor recursive call to a loop

* fix reduce-scans optimization to not look into call {}

* fix

* fix build_match_op_tree

* fix leak

* fix leak

* documentation for call {}

* touchup

* remove connected components from exec-plan

* fix

* touchup

* fix leak

---------

Co-authored-by: Avi Avni <avi.avni@gmail.com>
raz-mon and AviAvni committed Jun 28, 2023
1 parent 1ae86cd commit 522a8a9
Showing 17 changed files with 286 additions and 78 deletions.
151 changes: 142 additions & 9 deletions docs/commands/graph.query.md
@@ -54,6 +54,7 @@ The syntax is based on [Cypher](http://www.opencypher.org/). [Most](https://redi
* [UNION](#union)
* [UNWIND](#unwind)
* [FOREACH](#foreach)
* [CALL {}](#call-{})

#### MATCH

@@ -788,6 +789,24 @@ GRAPH.QUERY DEMO_GRAPH
"MATCH (u:User) WITH u AS nonrecent ORDER BY u.lastVisit LIMIT 3 SET nonrecent.should_contact = true"
```
#### UNION
The UNION clause is used to combine the result of multiple queries.
UNION combines the results of two or more queries into a single result set that includes all the rows that belong to all queries in the union.
The number and the names of the columns must be identical in all queries combined by using UNION.
To keep all the result rows, use UNION ALL.
Using just UNION will combine and remove duplicates from the result set.
```sh
GRAPH.QUERY DEMO_GRAPH
"MATCH (n:Actor) RETURN n.name AS name
UNION ALL
MATCH (n:Movie) RETURN n.title AS name"
```
#### UNWIND
The UNWIND clause breaks down a given list into a sequence of records; each contains a single element in the list.
@@ -835,23 +854,137 @@ GRAPH.QUERY DEMO_GRAPH
FOREACH(do_perform IN CASE WHEN b IS NULL THEN [1] ELSE [] END | MERGE (h)-[b2:BUYS_FROM]->(s:SUPPLIER {supplies_bread: true}) SET b2.direct = false)"
```
#### CALL {}
(Since RedisGraph v2.12)
The CALL {} (subquery) clause executes a subquery locally, within its own scope, which enables many convenient and efficient operations on a graph.
The subquery is executed once for each record in the input stream.
The subquery may be returning or non-returning. A returning subquery may change the number of records, while a non-returning subquery does not.
The variables in scope before the CALL {} clause remain available after it, together with the variables returned by the subquery (in the case of a returning subquery).
Bound variables may be imported **only** in an opening `WITH` clause, via simple projections (e.g. `WITH n, m`) or via `WITH *` (which imports all bound variables). Note that importing as few variables as possible results in **higher performance**. Likewise, the returned variables may not override existing variables in the scope.
The CALL {} clause serves several purposes, such as post-`UNION` processing, a local environment for aggregations and actions on every input row, efficient operations using a limited namespace (via imports), and performing side effects using non-returning subqueries. Let's see some examples.
* Post-`UNION` processing.
We can easily get the cheapest and most expensive items in a store and set their `keep_surveillance` property to `true` (to keep monitoring the 'interesting' items) using post-`UNION` processing:
```sh
GRAPH.QUERY DEMO_GRAPH
"CALL {
MATCH (s:Store {name: 'Walmart'})-[:SELLS]->(i:Item)
RETURN i AS item
ORDER BY item.price ASC
LIMIT 1
UNION
MATCH (s:Store {name: 'Walmart'})-[:SELLS]->(i:Item)
RETURN i AS item
ORDER BY item.price DESC
LIMIT 1
}
SET item.keep_surveillance = true
RETURN item.name AS name, item.price AS price"
```
We can utilize post-`UNION` processing to perform aggregations over differently-matched entities. For example, we can count the number of customers and vendors that a store interacts with:
```sh
GRAPH.QUERY DEMO_GRAPH
"CALL {
MATCH (s:Store {name: 'Walmart'})-[:SELLS_TO]->(c:Customer)
RETURN c AS interface
UNION
MATCH (s:Store {name: 'Walmart'})-[:BUYS_FROM]->(v:Vendor)
RETURN v AS interface
}
RETURN count(interface) AS interfaces"
```
* Local environment for aggregations and actions on every input row.
Another key feature of the CALL {} clause is the ability to perform isolated aggregations on every input row. For example, we can count the number of times each item was bought:
```sh
GRAPH.QUERY DEMO_GRAPH
"MATCH (item:Item)
CALL {
WITH item
MATCH (item)-[s:SOLD_TO]->(c:Customer)
RETURN count(s) AS item_sales
}
RETURN item.name AS name, item_sales AS sales"
```
<!-- * Observe changes from previous executions (on previous records).
We can form useful structures and connections like linked-lists via the CALL {} clause. Let's form a linked-list of all items in a store, from the cheapest to the priciest:
```sh
MATCH (i:Item)
WITH i order BY i.price ASC LIMIT 1
SET i:HEAD
WITH i
MATCH (next_item:Item) WHERE NOT next_item:HEAD
WITH next_item ORDER BY next_item.price ASC
CALL {
WITH next_item
MATCH (curr_head:HEAD)
REMOVE curr_head:HEAD
SET next_item:HEAD
CREATE (curr_head)-[:IS_CHEAPER_THAN]->(next_item)
}
``` -->
* Efficient operations using a limited namespace (via imports).
Given a query with a relatively large namespace (many bound variables), we can run a subquery on a subset of that namespace and thereby improve performance significantly. Let's look at an example.
With a CALL {} clause:
```sh
GRAPH.QUERY DEMO_GRAPH
"MATCH (n:N), (m:M), (x:X), (y:Y), (z:Z), (e:E), (q:Q)
CALL {
WITH n
MATCH (temp:TEMP)
SET temp.v = n.v
}
RETURN n, m, x, y, z, e, q"
```
Runtime: 99 ms.
Without a CALL {} clause:
```sh
GRAPH.QUERY DEMO_GRAPH
"MATCH (n:N), (m:M), (x:X), (y:Y), (z:Z), (e:E), (q:Q)
MATCH (temp:TEMP)
SET temp.v = n.v
RETURN n, m, x, y, z, e, q"
```
Runtime: 256 ms.
* Side-effects.
We can conveniently perform side effects using non-returning subqueries. For example, we can mark a subgroup of nodes in the graph that share some property. Let's mark all the items that were sold more than 100 times as popular items, and return **all** items:
```sh
GRAPH.QUERY DEMO_GRAPH
"MATCH (item:Item)
CALL {
WITH item
MATCH (item)-[s:SOLD_TO]->(c:Customer)
WITH item, count(s) AS item_sales
WHERE item_sales > 100
SET item.popular = true
}
RETURN item"
```
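The record-count rule stated for CALL {} (a returning subquery may change the number of records, a non-returning one may not) can be modelled with a small C sketch. This is a hypothetical model and not RedisGraph code: `subquery_fn`, `call_returning`, `call_non_returning` and `rows_equal_to_value` are names invented for illustration, and a subquery is reduced to a callback that reports how many rows it produced for one input record.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical model: a subquery is a callback that reports how many rows
// it produced for a single input record.
typedef size_t (*subquery_fn)(int input_record);

// Returning subquery: each input record is replaced by the rows the subquery
// returned for it, so the output size is the sum of the per-record counts.
size_t call_returning(const int *records, size_t n, subquery_fn sub) {
    assert(sub != NULL);
    size_t out = 0;
    for (size_t i = 0; i < n; i++) out += sub(records[i]);
    return out;
}

// Non-returning subquery: it still runs once per input record (for its side
// effects), but every input record passes through exactly once.
size_t call_non_returning(const int *records, size_t n, subquery_fn sub) {
    assert(sub != NULL);
    for (size_t i = 0; i < n; i++) (void)sub(records[i]);
    return n;
}

// Example subquery: yields as many rows as the record's value (zero or more).
size_t rows_equal_to_value(int input_record) {
    return input_record > 0 ? (size_t)input_record : 0;
}
```

With input records `{0, 2, 3}`, the returning variant yields five rows in this model, while the non-returning variant yields three, one per input record.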
### Functions
2 changes: 2 additions & 0 deletions src/ast/ast_rewrite_call_subquery.c
@@ -658,6 +658,8 @@ bool AST_RewriteCallSubquery
array_free_cb(inter_names, rm_free);

// add a `WITH *` clause before every call {} clause
// this is an optimization that decreases the mapping size of the plan up to
// the CallSubquery operation
rewritten |= _add_star_projections((cypher_astnode_t *)root);

return rewritten;
8 changes: 0 additions & 8 deletions src/execution_plan/execution_plan.c
@@ -530,14 +530,6 @@ static void _ExecutionPlan_FreeInternals
) {
if(plan == NULL) return;

if(plan->connected_components) {
uint connected_component_count = array_len(plan->connected_components);
for(uint i = 0; i < connected_component_count; i ++) {
QueryGraph_Free(plan->connected_components[i]);
}
array_free(plan->connected_components);
}

if(plan->query_graph) {
QueryGraph_Free(plan->query_graph);
}
1 change: 0 additions & 1 deletion src/execution_plan/execution_plan.h
@@ -19,7 +19,6 @@ struct ExecutionPlan {
AST *ast_segment; // The segment which the current ExecutionPlan segment is built from.
rax *record_map; // Mapping between identifiers and record indices.
QueryGraph *query_graph; // QueryGraph representing all graph entities in this segment.
QueryGraph **connected_components; // Array of all connected components in this segment.
ObjectPool *record_pool;
bool prepared; // Indicates if the execution plan is ready for execute.
};
44 changes: 12 additions & 32 deletions src/execution_plan/execution_plan_build/build_match_op_tree.c
@@ -27,7 +27,7 @@ static void _ExecutionPlan_ProcessQueryGraph
// so that we can order traversals properly
FT_FilterNode *ft = AST_BuildFilterTree(ast);
QueryGraph **connectedComponents = QueryGraph_ConnectedComponents(qg);
plan->connected_components = connectedComponents;

// if we have already constructed any ops
// the plan's record map contains all variables bound at this time
uint connectedComponentsCount = array_len(connectedComponents);
@@ -155,6 +155,11 @@ static void _ExecutionPlan_ProcessQueryGraph
ExecutionPlan_UpdateRoot(plan, root);
}
}

for(uint i = 0; i < connectedComponentsCount; i++) {
QueryGraph_Free(connectedComponents[i]);
}
array_free(connectedComponents);
FilterTree_Free(ft);
}
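The hunks above stop storing the connected components on the plan and instead free them at the end of the function that built them. A minimal sketch of that ownership pattern follows, assuming a toy `ToyGraph` type in place of `QueryGraph`; `ToyGraph_New`, `ToyGraph_Free`, `process_components` and the `live_graphs` counter are hypothetical names used only to make the lifetime visible.

```c
#include <assert.h>
#include <stdlib.h>

// Toy stand-in for QueryGraph (hypothetical, for illustration only).
typedef struct { int node_count; } ToyGraph;

// Number of ToyGraph objects allocated and not yet freed.
int live_graphs = 0;

ToyGraph *ToyGraph_New(int node_count) {
    ToyGraph *g = malloc(sizeof(*g));
    assert(g != NULL);
    g->node_count = node_count;
    live_graphs++;
    return g;
}

void ToyGraph_Free(ToyGraph *g) {
    if (g == NULL) return;
    free(g);
    live_graphs--;
}

// Builds `count` components, consumes them, and releases both the components
// and the array before returning. Nothing outlives the function, so no other
// structure has to remember to free or clone them later.
int process_components(int count) {
    assert(count > 0);
    ToyGraph **components = malloc(sizeof(*components) * (size_t)count);
    assert(components != NULL);
    for (int i = 0; i < count; i++) components[i] = ToyGraph_New(i + 1);

    // "Use" the components: here we only sum their node counts.
    int total_nodes = 0;
    for (int i = 0; i < count; i++) total_nodes += components[i]->node_count;

    for (int i = 0; i < count; i++) ToyGraph_Free(components[i]);
    free(components);
    return total_nodes;
}
```

After `process_components` returns, `live_graphs` is back to zero, which is the property the local cleanup loop is meant to guarantee.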

@@ -169,7 +174,7 @@ static void _buildOptionalMatchOps(ExecutionPlan *plan, AST *ast, const cypher_a
bound_vars = raxNew();
// Rather than cloning the record map, collect the bound variables along with their
// parser-generated constant strings.
ExecutionPlan_BoundVariables(plan->root, bound_vars);
ExecutionPlan_BoundVariables(plan->root, bound_vars, plan);
// Collect the variable names from bound_vars to populate the Argument op we will build.
arguments = (const char **)raxValues(bound_vars);
raxFree(bound_vars);
@@ -200,46 +205,21 @@ void buildMatchOpTree(ExecutionPlan *plan, AST *ast, const cypher_astnode_t *cla
return;
}

// only add at most one set of traversals per plan
// TODO Revisit and improve this logic
if(plan->root && ExecutionPlan_LocateOpMatchingTypes(plan->root, SCAN_OPS, SCAN_OP_COUNT)) {
return;
}

//--------------------------------------------------------------------------
// Extract mandatory patterns
//--------------------------------------------------------------------------

uint mandatory_match_count = 0; // Number of mandatory patterns
const cypher_astnode_t **match_clauses = AST_GetClauses(ast, CYPHER_AST_MATCH);
uint match_clause_count = array_len(match_clauses);
const cypher_astnode_t *patterns[match_clause_count];
const cypher_astnode_t *mandatory_matches[match_clause_count];

for(uint i = 0; i < match_clause_count; i++) {
const cypher_astnode_t *match_clause = match_clauses[i];
if(cypher_ast_match_is_optional(match_clause)) continue;
mandatory_matches[mandatory_match_count] = match_clause;
patterns[mandatory_match_count] = cypher_ast_match_get_pattern(match_clause);
mandatory_match_count++;
}
const cypher_astnode_t *pattern = cypher_ast_match_get_pattern(clause);

// collect the QueryGraph entities referenced in the clauses being converted
QueryGraph *qg = plan->query_graph;
QueryGraph *sub_qg = QueryGraph_ExtractPatterns(qg, patterns,
mandatory_match_count);
QueryGraph *sub_qg =
QueryGraph_ExtractPatterns(plan->query_graph, &pattern, 1);

_ExecutionPlan_ProcessQueryGraph(plan, sub_qg, ast);
if(ErrorCtx_EncounteredError()) goto cleanup;

// Build the FilterTree to model any WHERE predicates on these clauses and place ops appropriately.
FT_FilterNode *sub_ft = AST_BuildFilterTreeFromClauses(ast, mandatory_matches,
mandatory_match_count);
FT_FilterNode *sub_ft = AST_BuildFilterTreeFromClauses(ast,
&clause, 1);
ExecutionPlan_PlaceFilterOps(plan, plan->root, NULL, sub_ft);

// Clean up
cleanup:
QueryGraph_Free(sub_qg);
array_free(match_clauses);
}

Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ void buildMergeOp(ExecutionPlan *plan, AST *ast, const cypher_astnode_t *clause,
bound_vars = raxNew();
// Rather than cloning the record map, collect the bound variables along with their
// parser-generated constant strings.
ExecutionPlan_BoundVariables(plan->root, bound_vars);
ExecutionPlan_BoundVariables(plan->root, bound_vars, plan);
// Collect the variable names from bound_vars to populate the Argument ops we will build.
arguments = (const char **)raxValues(bound_vars);
}
Original file line number Diff line number Diff line change
@@ -64,7 +64,8 @@ void buildPatternComprehensionOps
if(root->childCount > 0) {
// get the bound variable to use when building the traversal ops
rax *bound_vars = raxNew();
ExecutionPlan_BoundVariables(root->children[0], bound_vars);
ExecutionPlan_BoundVariables(root->children[0], bound_vars,
root->children[0]->plan);
arguments = (const char **)raxValues(bound_vars);
raxFree(bound_vars);
}
@@ -187,7 +188,8 @@ void buildPatternPathOps
if(root->childCount > 0) {
// get the bound variable to use when building the traversal ops
rax *bound_vars = raxNew();
ExecutionPlan_BoundVariables(root->children[0], bound_vars);
ExecutionPlan_BoundVariables(root->children[0], bound_vars,
root->children[0]->plan);
arguments = (const char **)raxValues(bound_vars);
raxFree(bound_vars);
}
14 changes: 6 additions & 8 deletions src/execution_plan/execution_plan_build/execution_plan_util.c
@@ -11,10 +11,7 @@ bool ExecutionPlan_isEager
(
OpBase *root
) {
OPType eager_types[] = {OPType_CREATE, OPType_UPDATE, OPType_FOREACH,
OPType_MERGE, OPType_SORT, OPType_AGGREGATE};

return ExecutionPlan_LocateOpMatchingTypes(root, eager_types, 6) != NULL;
return ExecutionPlan_LocateOpMatchingTypes(root, EAGER_OPERATIONS, 6) != NULL;
}
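The hunk above swaps a function-local `eager_types` array for a shared `EAGER_OPERATIONS` array while still passing the literal count `6`. One possible way to keep such a shared array and its length in sync is sketched below. This is not the RedisGraph definition: `ToyOpType`, `TOY_EAGER_OPERATIONS`, `TOY_EAGER_OP_COUNT` and `toy_is_eager` are hypothetical names, and the enum only mirrors the six op kinds listed in the removed lines.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical op types (the real OPType enum lives in the project's op.h).
typedef enum {
    TOY_OP_SCAN, TOY_OP_FILTER, TOY_OP_CREATE, TOY_OP_UPDATE,
    TOY_OP_FOREACH, TOY_OP_MERGE, TOY_OP_SORT, TOY_OP_AGGREGATE
} ToyOpType;

// Single shared definition of the eager operations.
static const ToyOpType TOY_EAGER_OPERATIONS[] = {
    TOY_OP_CREATE, TOY_OP_UPDATE, TOY_OP_FOREACH,
    TOY_OP_MERGE, TOY_OP_SORT, TOY_OP_AGGREGATE
};

// Derive the element count from the array itself so it cannot drift when an
// entry is added or removed.
#define TOY_EAGER_OP_COUNT \
    (sizeof(TOY_EAGER_OPERATIONS) / sizeof(TOY_EAGER_OPERATIONS[0]))

// Returns true if `type` is one of the eager operations.
bool toy_is_eager(ToyOpType type) {
    for (size_t i = 0; i < TOY_EAGER_OP_COUNT; i++) {
        if (TOY_EAGER_OPERATIONS[i] == type) return true;
    }
    return false;
}
```

The `sizeof` idiom only works where the array definition is visible, so in this sketch the array and the macro would have to live in the same header.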

OpBase *ExecutionPlan_LocateOpResolvingAlias
@@ -166,7 +163,7 @@ OpBase *ExecutionPlan_LocateReferencesExcludingOps
// If we've reached a blacklisted op, all variables in its subtree are
// considered to be modified by it, as we can't recurse farther.
rax *bound_vars = raxNew();
ExecutionPlan_BoundVariables(root, bound_vars);
ExecutionPlan_BoundVariables(root, bound_vars, root->plan);
modifies = (char **)raxKeys(bound_vars);
raxFree(bound_vars);
} else {
@@ -269,10 +266,11 @@ uint ExecutionPlan_CollectUpwards
void ExecutionPlan_BoundVariables
(
const OpBase *op,
rax *modifiers
rax *modifiers,
const ExecutionPlan *plan
) {
ASSERT(op != NULL && modifiers != NULL);
if(op->modifies) {
if(op->modifies && op->plan == plan) {
uint modifies_count = array_len(op->modifies);
for(uint i = 0; i < modifies_count; i++) {
const char *modified = op->modifies[i];
@@ -287,6 +285,6 @@ void ExecutionPlan_BoundVariables
if(op->type == OPType_PROJECT || op->type == OPType_AGGREGATE) return;

for(int i = 0; i < op->childCount; i++) {
ExecutionPlan_BoundVariables(op->children[i], modifiers);
ExecutionPlan_BoundVariables(op->children[i], modifiers, plan);
}
}
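The new `plan` parameter restricts the collected variables to ops that belong to the given plan, so variables introduced by ops of another plan segment in the same op tree (for example the body of a call {} subquery) are skipped. A self-contained sketch of that traversal on a toy op tree is shown below. All types and names (`ToyOp`, `ToyVarSet`, `toy_bound_variables`, the integer `plan_id` standing in for the `op->plan` pointer, one variable per op) are simplifications assumed for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_CHILDREN 4
#define TOY_MAX_VARS 16

typedef enum { TOY_SCAN, TOY_PROJECT, TOY_CALL_SUBQUERY } ToyOpKind;

typedef struct ToyOp {
    ToyOpKind kind;
    int plan_id;               // stands in for the op->plan pointer
    const char *modifies;      // variable introduced by this op, or NULL
    struct ToyOp *children[TOY_MAX_CHILDREN];
    int child_count;
} ToyOp;

typedef struct {
    const char *names[TOY_MAX_VARS];
    int count;
} ToyVarSet;

// Adds `name` to the set unless it is already present.
static void var_set_add(ToyVarSet *set, const char *name) {
    for (int i = 0; i < set->count; i++) {
        if (strcmp(set->names[i], name) == 0) return;
    }
    assert(set->count < TOY_MAX_VARS);
    set->names[set->count++] = name;
}

// Collects the variables bound at `op`, counting only ops that belong to
// `plan_id`. Ops of other plans are still traversed but contribute nothing.
void toy_bound_variables(const ToyOp *op, ToyVarSet *out, int plan_id) {
    assert(op != NULL && out != NULL);
    if (op->modifies != NULL && op->plan_id == plan_id) {
        var_set_add(out, op->modifies);
    }
    // A projection resets the scope: nothing below it stays visible.
    if (op->kind == TOY_PROJECT) return;
    for (int i = 0; i < op->child_count; i++) {
        toy_bound_variables(op->children[i], out, plan_id);
    }
}
```

For a root op in plan 1 with one child scan in plan 1 (binding `n`) and one child scan in plan 2 (binding `temp`), collecting for plan 1 yields only `n` in this sketch.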
Original file line number Diff line number Diff line change
@@ -129,5 +129,6 @@ uint ExecutionPlan_CollectUpwards
void ExecutionPlan_BoundVariables
(
const OpBase *op,
rax *modifiers
rax *modifiers,
const ExecutionPlan *plan
);
3 changes: 2 additions & 1 deletion src/execution_plan/execution_plan_build/reduce_to_apply.c
@@ -98,7 +98,8 @@ static OpBase *_ReduceFilterToOp(ExecutionPlan *plan, const char **vars,
void ExecutionPlan_ReduceFilterToApply(ExecutionPlan *plan, OpFilter *filter) {
// Collect all variables that are bound at this position in the op tree.
rax *bound_vars = raxNew();
ExecutionPlan_BoundVariables((OpBase *)filter, bound_vars);
ExecutionPlan_BoundVariables((OpBase *)filter, bound_vars,
((OpBase *)filter)->plan);
// Collect the variable names from bound_vars to populate the Argument ops we will build.
const char **vars = (const char **)raxValues(bound_vars);

4 changes: 0 additions & 4 deletions src/execution_plan/execution_plan_clone.c
@@ -20,10 +20,6 @@ static ExecutionPlan *_ClonePlanInternals(const ExecutionPlan *template) {
QueryGraph_ResolveUnknownRelIDs(template->query_graph);
clone->query_graph = QueryGraph_Clone(template->query_graph);
}
// TODO improve QueryGraph logic so that we do not need to store or clone connected_components.
if(template->connected_components) {
array_clone_with_cb(clone->connected_components, template->connected_components, QueryGraph_Clone);
}

return clone;
}