Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into plan-clone-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffreylovitz committed Aug 13, 2020
2 parents c6f65c4 + de1892c commit 1ac1230
Show file tree
Hide file tree
Showing 36 changed files with 2,826 additions and 94 deletions.
23 changes: 19 additions & 4 deletions docs/commands.md
Expand Up @@ -580,9 +580,11 @@ This section contains information on all supported functions from the Cypher que
## Predicate functions
|Function | Description|
| ------- |:-----------|
|exists() | Returns true if the specified property exists in the node or relationship. |
| Function | Description |
| ------- | :----------- |
| exists() | Returns true if the specified property exists in the node or relationship. |
| [any()](#existential-comprehension-functions) | Returns true if the inner WHERE predicate holds true for any element in the input array. |
| [all()](#existential-comprehension-functions) | Returns true if the inner WHERE predicate holds true for all elements in the input array. |
## Scalar functions
Expand All @@ -592,7 +594,7 @@ This section contains information on all supported functions from the Cypher que
| labels() | Returns a string representation of the label of a node. |
| timestamp() | Returns the the amount of milliseconds since epoch. |
| type() | Returns a string representation of the type of a relation. |
| list comprehensions | [See documentation](#list-comprehensions.md) |
| list comprehensions | [See documentation](#list-comprehensions) |
## Aggregating functions
Expand Down Expand Up @@ -676,6 +678,19 @@ The following query collects all paths of any length, then for each produces an
MATCH p=()-[*]->() RETURN [node IN nodes(p) WHERE node.rank > 10 | node.name]
```
#### Existential comprehension functions
The functions `any()` and `all()` use a simplified form of the list comprehension syntax and return a boolean value.
```sh
any(element IN array WHERE condition)
```
They can operate on any form of input array, but are particularly useful for path filtering. The following query collects all paths of any length in which all traversed edges have a weight less than 3:
```sh
MATCH p=()-[*]->() WHERE all(edge IN relationships(p) WHERE edge.weight < 3) RETURN p
```
## Procedures
Procedures are invoked using the syntax:
```sh
Expand Down
18 changes: 18 additions & 0 deletions docs/configuration.md
Expand Up @@ -65,3 +65,21 @@ If enabled, RedisGraph will maintain transposed copies of relationship matrices.
```
$ redis-server --loadmodule ./redisgraph.so MAINTAIN_TRANSPOSED_MATRICES no
```

# Query Configurations

Some configurations may be set per query in the form of additional arguments after the query string. All per-query configurations are off by default unless using a language-specific client, which may establish its own defaults.

## Query Timeout

The query flag `timeout` allows the user to specify the maximum runtime allowed for a query in milliseconds. This configuration can only be set for read queries to avoid leaving the graph in an inconsistent state.

`timeout` may still return partial results followed by an error message indicating the timeout.

### Example

Retrieve all paths in a graph with a timeout of 1000 milliseconds.

```
GRAPH.QUERY wikipedia "MATCH p=()-[*]->() RETURN p" timeout 1000
```
2 changes: 2 additions & 0 deletions docs/cypher_support.md
Expand Up @@ -138,6 +138,8 @@ We do not support any of these properties at the type level, meaning nodes and r

### Predicate functions
+ exists
+ any
+ all

### Expression functions
+ case...when
Expand Down
138 changes: 113 additions & 25 deletions src/arithmetic/comprehension_funcs/comprehension_funcs.c
Expand Up @@ -14,13 +14,13 @@
#include "../../util/rax_extensions.h"
#include "../../execution_plan/record.h"

// Routine for freeing a list comprehension's private data.
// Routine for freeing a comprehension function's private data.
void ListComprehension_Free(void *ctx_ptr) {
ListComprehensionCtx *ctx = ctx_ptr;

// If this list comprehension has a filter tree, free it.
// If this comprehension has a filter tree, free it.
if(ctx->ft) FilterTree_Free(ctx->ft);
// If this list comprehension has an eval routine, free it.
// If this comprehension has an eval routine, free it.
if(ctx->eval_exp) AR_EXP_Free(ctx->eval_exp);

if(ctx->local_record) {
Expand All @@ -32,7 +32,7 @@ void ListComprehension_Free(void *ctx_ptr) {
rm_free(ctx);
}

// Routine for cloning a list comprehension's private data.
// Routine for cloning a comprehension function's private data.
void *ListComprehension_Clone(void *orig) {
ListComprehensionCtx *ctx = orig;
// Allocate space for the clone.
Expand All @@ -43,46 +43,114 @@ void *ListComprehension_Clone(void *orig) {
ctx_clone->variable_idx = ctx->variable_idx;
ctx_clone->local_record = NULL;

// Clone the predicate filter tree.
// Clone the predicate filter tree, if present.
ctx_clone->ft = FilterTree_Clone(ctx->ft);
// Clone the eval routine.
// Clone the eval routine, if present.
ctx_clone->eval_exp = AR_EXP_Clone(ctx->eval_exp);

return ctx_clone;
}

SIValue AR_LIST_COMPREHENSION(SIValue *argv, int argc) {
static void _PopulateComprehensionCtx(ListComprehensionCtx *ctx, Record outer_record) {
rax *local_record_map = raxClone(outer_record->mapping);
intptr_t id = raxSize(local_record_map);
int rc = raxTryInsert(local_record_map, (unsigned char *)ctx->variable_str,
strlen(ctx->variable_str), (void *)id, NULL);
if(rc == 0) {
// The local variable's name shadows an outer variable, emit an error.
QueryCtx_SetError("Variable '%s' redefined inside of list comprehension");
QueryCtx_RaiseRuntimeException();
}

ctx->local_record = Record_New(local_record_map);

// This could just be assigned to 'id', but for safety we'll use a Record lookup.
ctx->variable_idx = Record_GetEntryIdx(ctx->local_record, ctx->variable_str);
ASSERT(ctx->variable_idx != INVALID_INDEX);
}


SIValue AR_ANY(SIValue *argv, int argc) {
if(SI_TYPE(argv[0]) == T_NULL) return SI_NullVal();
/* List comprehensions are invoked with three children:
/* ANY comprehensions are invoked with three children:
* The list to iterate over.
* The current Record.
* The function context. */
SIValue list = argv[0];
Record outer_record = argv[1].ptrval;
ListComprehensionCtx *ctx = argv[2].ptrval;

// On the first invocation, build the local Record.
if(ctx->local_record == NULL) _PopulateComprehensionCtx(ctx, outer_record);
Record r = ctx->local_record;
if(r == NULL) {
// On the first invocation, build the local Record.
rax *local_record_map = raxClone(outer_record->mapping);
intptr_t id = raxSize(local_record_map);
int rc = raxTryInsert(local_record_map, (unsigned char *)ctx->variable_str,
strlen(ctx->variable_str), (void *)id, NULL);
if(rc == 0) {
// The local variable's name shadows an outer variable, emit an error.
QueryCtx_SetError("Variable '%s' redefined inside of list comprehension");
QueryCtx_RaiseRuntimeException();
return SI_NullVal();
}

r = Record_New(local_record_map);
ctx->local_record = r;
// Populate the local Record with the contents of the outer Record.
Record_Clone(outer_record, r);

uint len = SIArray_Length(list);
for(uint i = 0; i < len; i++) {
// Retrieve the current element.
SIValue current_elem = SIArray_Get(list, i);
// Add the current element to the record at its allocated position.
Record_AddScalar(r, ctx->variable_idx, current_elem);

// If any element in an ANY function passes the predicate, return true.
if(FilterTree_applyFilters(ctx->ft, r)) return SI_BoolVal(true);

// This could just be assigned to 'id', but for safety we'll use a Record lookup.
ctx->variable_idx = Record_GetEntryIdx(r, ctx->variable_str);
ASSERT(ctx->variable_idx != INVALID_INDEX);
}

// No element passed, return false.
return SI_BoolVal(false);
}

SIValue AR_ALL(SIValue *argv, int argc) {
if(SI_TYPE(argv[0]) == T_NULL) return SI_NullVal();
/* ALL comprehensions are invoked with three children:
* The list to iterate over.
* The current Record.
* The function context. */
SIValue list = argv[0];
Record outer_record = argv[1].ptrval;
ListComprehensionCtx *ctx = argv[2].ptrval;

// On the first invocation, build the local Record.
if(ctx->local_record == NULL) _PopulateComprehensionCtx(ctx, outer_record);
Record r = ctx->local_record;

// Populate the local Record with the contents of the outer Record.
Record_Clone(outer_record, r);

uint len = SIArray_Length(list);
for(uint i = 0; i < len; i++) {
// Retrieve the current element.
SIValue current_elem = SIArray_Get(list, i);
// Add the current element to the record at its allocated position.
Record_AddScalar(r, ctx->variable_idx, current_elem);

// If any element in an ALL function does not pass the predicate, return false.
if(!FilterTree_applyFilters(ctx->ft, r)) return SI_BoolVal(false);

}

// All elements passed, return true.
return SI_BoolVal(true);
}


SIValue AR_LIST_COMPREHENSION(SIValue *argv, int argc) {
if(SI_TYPE(argv[0]) == T_NULL) return SI_NullVal();
/* List comprehensions are invoked with three children:
* The list to iterate over.
* The current Record.
* The function context. */
SIValue list = argv[0];
Record outer_record = argv[1].ptrval;
ListComprehensionCtx *ctx = argv[2].ptrval;

// On the first invocation, build the local Record.
if(ctx->local_record == NULL) _PopulateComprehensionCtx(ctx, outer_record);
Record r = ctx->local_record;

// Populate the local Record with the contents of the outer Record.
Record_Clone(outer_record, r);
// Instantiate the array to be returned.
Expand Down Expand Up @@ -117,6 +185,26 @@ void Register_ComprehensionFuncs() {
SIType *types;
AR_FuncDesc *func_desc;

types = array_new(SIType, 3);
types = array_append(types, T_ARRAY | T_NULL);
types = array_append(types, T_PTR);
types = array_append(types, T_PTR);
func_desc = AR_FuncDescNew("any", AR_ANY, 3, 3, types, true);
AR_SetPrivateDataRoutines(func_desc, ListComprehension_Free, ListComprehension_Clone);
AR_RegFunc(func_desc);
func_desc->bfree = ListComprehension_Free;
func_desc->bclone = ListComprehension_Clone;

types = array_new(SIType, 3);
types = array_append(types, T_ARRAY | T_NULL);
types = array_append(types, T_PTR);
types = array_append(types, T_PTR);
func_desc = AR_FuncDescNew("all", AR_ALL, 3, 3, types, true);
AR_SetPrivateDataRoutines(func_desc, ListComprehension_Free, ListComprehension_Clone);
AR_RegFunc(func_desc);
func_desc->bfree = ListComprehension_Free;
func_desc->bclone = ListComprehension_Clone;

types = array_new(SIType, 3);
types = array_append(types, T_ARRAY | T_NULL);
types = array_append(types, T_PTR);
Expand Down
31 changes: 23 additions & 8 deletions src/ast/ast_build_ar_exp.c
Expand Up @@ -371,10 +371,13 @@ static AR_ExpNode *_AR_ExpNodeFromParameter(const cypher_astnode_t *param) {
return AR_EXP_NewParameterOperandNode(identifier);
}

static AR_ExpNode *_AR_ExpNodeFromListComprehension(const cypher_astnode_t *comp_exp) {
const char *func_name = "LIST_COMPREHENSION";
// Build an operation node to represent the list comprehension.
AR_ExpNode *op = AR_EXP_NewOpNode(func_name, 2);
static AR_ExpNode *_AR_ExpNodeFromComprehensionFunction(const cypher_astnode_t *comp_exp,
cypher_astnode_type_t type) {
// Set the appropriate function name according to the node type.
const char *func_name;
if(type == CYPHER_AST_ANY) func_name = "ANY";
else if(type == CYPHER_AST_ALL) func_name = "ALL";
else func_name = "LIST_COMPREHENSION";

/* Using the sample query:
* WITH [1,2,3] AS arr RETURN [val IN arr WHERE val % 2 = 1 | val * 2] AS comp
Expand All @@ -397,13 +400,24 @@ static AR_ExpNode *_AR_ExpNodeFromListComprehension(const cypher_astnode_t *comp
/* The predicate node is the set of WHERE conditions in the comprehension, if any. */
const cypher_astnode_t *predicate_node = cypher_ast_list_comprehension_get_predicate(comp_exp);
// Build a FilterTree to represent this predicate.
if(predicate_node) AST_ConvertFilters(&ctx->ft, predicate_node);
if(predicate_node) {
AST_ConvertFilters(&ctx->ft, predicate_node);
} else if(type != CYPHER_AST_LIST_COMPREHENSION) {
// Functions like any() and all() must have a predicate node.
QueryCtx_SetError("'%s' function requires a WHERE predicate", func_name);
rm_free(ctx);
return AR_EXP_NewConstOperandNode(SI_NullVal());
}

/* Construct the operator node that will generate updated values, if one is provided.
* In the above query, this will be an operation node representing "val * 2". */
* In the above query, this will be an operation node representing "val * 2".
* This will always be NULL for comprehensions like any() and all(). */
const cypher_astnode_t *eval_node = cypher_ast_list_comprehension_get_eval(comp_exp);
if(eval_node) ctx->eval_exp = _AR_EXP_FromExpression(eval_node);

// Build an operation node to represent the list comprehension.
AR_ExpNode *op = AR_EXP_NewOpNode(func_name, 2);

// Add the context to the function descriptor as the function's private data.
op->op.f = AR_SetPrivateData(op->op.f, ctx);

Expand Down Expand Up @@ -480,8 +494,9 @@ static AR_ExpNode *_AR_EXP_FromExpression(const cypher_astnode_t *expr) {
return _AR_ExpNodeFromGraphEntity(expr);
} else if(type == CYPHER_AST_PARAMETER) {
return _AR_ExpNodeFromParameter(expr);
} else if(type == CYPHER_AST_LIST_COMPREHENSION) {
return _AR_ExpNodeFromListComprehension(expr);
} else if(type == CYPHER_AST_LIST_COMPREHENSION || type == CYPHER_AST_ANY ||
type == CYPHER_AST_ALL) {
return _AR_ExpNodeFromComprehensionFunction(expr, type);
} else {
/*
Unhandled types:
Expand Down
5 changes: 4 additions & 1 deletion src/ast/ast_build_filter_tree.c
Expand Up @@ -296,7 +296,10 @@ void AST_ConvertFilters(FT_FilterNode **root, const cypher_astnode_t *entity) {
node = _convertBinaryOperator(entity);
} else if(type == CYPHER_AST_UNARY_OPERATOR) {
node = _convertUnaryOperator(entity);
} else if(type == CYPHER_AST_APPLY_OPERATOR) {
} else if(type == CYPHER_AST_APPLY_OPERATOR ||
type == CYPHER_AST_ANY ||
type == CYPHER_AST_ALL ||
type == CYPHER_AST_LIST_COMPREHENSION) {
node = _convertApplyOperator(entity);
} else if(type == CYPHER_AST_TRUE) {
node = _convertTrueOperator();
Expand Down
2 changes: 1 addition & 1 deletion src/ast/ast_validations.c
Expand Up @@ -62,7 +62,7 @@ static void _AST_GetIdentifiers(const cypher_astnode_t *node, rax *identifiers)
_AST_GetIdentifiers(child, identifiers);
}

if(type == CYPHER_AST_LIST_COMPREHENSION) {
if(type == CYPHER_AST_LIST_COMPREHENSION || type == CYPHER_AST_ANY || type == CYPHER_AST_ALL) {
// A list comprehension has a local variable that should only be accessed within its scope;
// do not leave it in the identifiers map.
const cypher_astnode_t *variable_node = cypher_ast_list_comprehension_get_identifier(node);
Expand Down
4 changes: 2 additions & 2 deletions src/ast/cypher_whitelist.c
Expand Up @@ -101,8 +101,8 @@ static void _buildTypesWhitelist(void) {
// CYPHER_AST_FILTER, // Deprecated, will not be supported
// CYPHER_AST_EXTRACT, // Deprecated, will not be supported
// CYPHER_AST_REDUCE,
// CYPHER_AST_ALL,
// CYPHER_AST_ANY,
CYPHER_AST_ALL,
CYPHER_AST_ANY,
// CYPHER_AST_SINGLE,
// CYPHER_AST_NONE,
CYPHER_AST_COLLECTION,
Expand Down
3 changes: 3 additions & 0 deletions src/commands/cmd_bulk_insert.c
Expand Up @@ -13,6 +13,8 @@
void _MGraph_BulkInsert(void *args) {
// Establish thread-safe environment for batch insertion
CommandCtx *command_ctx = (CommandCtx *)args;
CommandCtx_TrackCtx(command_ctx);

RedisModuleCtx *ctx = CommandCtx_GetRedisCtx(command_ctx);

RedisModuleString **argv = command_ctx->argv + 1; // skip "GRAPH.BULK"
Expand Down Expand Up @@ -108,3 +110,4 @@ int MGraph_BulkInsert(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_ReplicateVerbatim(ctx);
return REDISMODULE_OK;
}

0 comments on commit 1ac1230

Please sign in to comment.