From eb8b9301ea80a8b5e059d3deab044a1547a2f6f7 Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Tue, 12 Mar 2024 11:36:57 +0200 Subject: [PATCH 1/6] [WIP] load csv op --- src/execution_plan/ops/op.h | 5 +- src/execution_plan/ops/op_load_csv.c | 177 +++++++++++++++++++++++++++ src/execution_plan/ops/op_load_csv.h | 28 +++++ 3 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 src/execution_plan/ops/op_load_csv.c create mode 100644 src/execution_plan/ops/op_load_csv.h diff --git a/src/execution_plan/ops/op.h b/src/execution_plan/ops/op.h index a6288e769..a4e7506d4 100644 --- a/src/execution_plan/ops/op.h +++ b/src/execution_plan/ops/op.h @@ -55,6 +55,7 @@ typedef enum { OPType_OR_APPLY_MULTIPLEXER, OPType_AND_APPLY_MULTIPLEXER, OPType_OPTIONAL, + OPType_LOAD_CSV, } OPType; typedef enum { @@ -204,8 +205,8 @@ uint OpBase_ChildCount // returns the i'th child of the op OpBase *OpBase_GetChild ( - OpBase *join, // op - uint i // child index + OpBase *op, // op + uint i // child index ); // mark alias as being modified by operation diff --git a/src/execution_plan/ops/op_load_csv.c b/src/execution_plan/ops/op_load_csv.c new file mode 100644 index 000000000..0e4953d78 --- /dev/null +++ b/src/execution_plan/ops/op_load_csv.c @@ -0,0 +1,177 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +#include "RG.h" +#include "op_load_csv.h" +#include "../../datatypes/array.h" + +// forward declarations +static OpResult LoadCSVInit(OpBase *opBase); +static Record LoadCSVConsume(OpBase *opBase); +static Record LoadCSVConsumeFromChild(OpBase *opBase); +static OpBase *LoadCSVClone(const ExecutionPlan *plan, const OpBase *opBase); +static void LoadCSVFree(OpBase *opBase); + +// create a new load CSV operation +OpBase *NewLoadCSVOp +( + const ExecutionPlan *plan, // execution plan + const char *path, // path to CSV file + const char *alias // CSV row alias +) { + ASSERT(plan != NULL); + ASSERT(path != NULL); + ASSERT(alias != NULL); + + OpLoadCSV *op = rm_calloc(1, sizeof(OpLoadCSV)); + + op->path = strdup(path); + op->alias = strdup(alias); + + // Set our Op operations + OpBase_Init((OpBase *)op, OPType_LOAD_CSV, "Load CSV", LoadCSVInit, + LoadCSVConsume, NULL, NULL, LoadCSVClone, LoadCSVFree, false, plan); + + op->recIdx = OpBase_Modifies((OpBase *)op, alias); + + return (OpBase *)op; +} + +static OpResult LoadCSVInit +( + OpBase *opBase +) { + // update consume function in case operation has a child + if(OpBase_ChildCount(opBase) > 0) { + OpLoadCSV *op = (OpLoadCSV*)opBase; + op->child = OpBase_GetChild(opBase, 0); + OpBase_UpdateConsume(opBase, LoadCSVConsumeFromChild); + } + + return OP_OK; +} + +static const char *A = "AAA"; +static const char *B = "BB"; +static const char *C = "C"; + +static bool _CSV_GetRow +( + OpLoadCSV *op, + SIValue *row +) { + ASSERT(op != NULL); + ASSERT(row != NULL); + + SIValue _row = SIArray_New(3); + + SIArray_Append(&_row, SI_ConstStringVal(A)); + SIArray_Append(&_row, SI_ConstStringVal(B)); + SIArray_Append(&_row, SI_ConstStringVal(C)); + + *row = _row; + + return true; +} + +// load CSV consume function in case this operation in not a tap +static Record LoadCSVConsumeFromChild +( + OpBase *opBase +) { + ASSERT(opBase != NULL); + + OpLoadCSV *op = (OpLoadCSV*)opBase; + +pull_from_child: + // in case a record is missing ask child to provide one + if(op->child_record == NULL) { + op->child_record = OpBase_Consume(op->child); + + // child failed to provide record, depleted + if(op->child_record == 
NULL) { + return NULL; + } + } + + // must have a record at this point + ASSERT(op->child_record != NULL); + + // get a new CSV row + SIValue row; + if(!_CSV_GetRow(op, &row)) { + // failed to get a CSV row + // reset CSV reader and free current child record + OpBase_DeleteRecord(op->child_record); + op->child_record = NULL; + + // try to get a new record from child + goto pull_from_child; + } + + // managed to get a new CSV row + // update record and return to caller + Record r = OpBase_DeepCloneRecord(op->child_record); + Record_AddScalar(r, op->recIdx, row); + + return r; +} + +// load CSV consume function in the case this operation is a tap +static Record LoadCSVConsume +( + OpBase *opBase +) { + ASSERT(opBase != NULL); + + OpLoadCSV *op = (OpLoadCSV*)opBase; + + SIValue row; + Record r = NULL; + + if(_CSV_GetRow(op, &row)) { + r = OpBase_CreateRecord(opBase); + Record_AddScalar(r, op->recIdx, row); + } + + return r; +} + +static inline OpBase *LoadCSVClone +( + const ExecutionPlan *plan, + const OpBase *opBase +) { + ASSERT(opBase->type == OPType_LOAD_CSV); + + OpLoadCSV *op = (OpLoadCSV*)opBase; + return NewLoadCSVOp(plan, op->path, op->alias); +} + +// free Load CSV operation +static void LoadCSVFree +( + OpBase *opBase +) { + ASSERT(opBase != NULL); + + OpLoadCSV *op = (OpLoadCSV*)opBase; + + if(op->path != NULL) { + rm_free(op->path); + op->path = NULL; + } + + if(op->alias != NULL) { + rm_free(op->alias); + op->alias = NULL; + } + + if(op->child_record != NULL) { + OpBase_DeleteRecord(op->child_record); + op->child_record = NULL; + } +} + diff --git a/src/execution_plan/ops/op_load_csv.h b/src/execution_plan/ops/op_load_csv.h new file mode 100644 index 000000000..6d289f042 --- /dev/null +++ b/src/execution_plan/ops/op_load_csv.h @@ -0,0 +1,28 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). 
+ */ + +#pragma once + +#include "op.h" +#include "../execution_plan.h" + +// load CSV +typedef struct { + OpBase op; // op base must be the first field in this struct + char *path; // full path to CSV file + char *alias; // CSV row alias + int recIdx; // record index to populate with CSV row + OpBase *child; // child operation + Record child_record; // child record +} OpLoadCSV; + +// creates a new load CSV operation +OpBase *NewLoadCSVOp +( + const ExecutionPlan *plan, // execution plan + const char *path, // full path to CSV file + const char *alias // CSV row alias +); + From 92db8e4552a8a43be982887d8f811cb58255b443 Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Wed, 13 Mar 2024 10:21:55 +0200 Subject: [PATCH 2/6] [WIP] load csv plan --- src/ast/ast_validations.c | 19 ++++- src/errors/error_msgs.h | 12 ++-- src/errors/errors.c | 4 +- .../execution_plan_construct.c | 27 +++++++ src/execution_plan/ops/op_load_csv.c | 71 +++++++++++++++++-- src/execution_plan/ops/op_load_csv.h | 6 +- src/execution_plan/ops/ops.h | 1 + 7 files changed, 124 insertions(+), 16 deletions(-) diff --git a/src/ast/ast_validations.c b/src/ast/ast_validations.c index e07ac298d..c2104aa90 100644 --- a/src/ast/ast_validations.c +++ b/src/ast/ast_validations.c @@ -504,6 +504,23 @@ static VISITOR_STRATEGY _Validate_pattern_comprehension return VISITOR_CONTINUE; } +// validate LOAD CSV clause +static VISITOR_STRATEGY _Validate_load_csv +( + const cypher_astnode_t *n, // ast-node (LOAD CSV) + bool start, // first traversal + ast_visitor *visitor // visitor +) { + validations_ctx *vctx = AST_Visitor_GetContext(visitor); + + const cypher_astnode_t *node = cypher_ast_load_csv_get_identifier(n); + const char *alias = cypher_ast_identifier_get_name(node); + + _IdentifierAdd(vctx, alias, NULL); + + return VISITOR_CONTINUE; +} + // validate that an identifier is bound static VISITOR_STRATEGY _Validate_identifier ( @@ -2253,6 +2270,7 @@ bool AST_ValidationsMappingInit(void) { validations_mapping[CYPHER_AST_REMOVE] = _Validate_REMOVE_Clause; validations_mapping[CYPHER_AST_REDUCE] = _Validate_reduce; validations_mapping[CYPHER_AST_FOREACH] = _Validate_FOREACH_Clause; + validations_mapping[CYPHER_AST_LOAD_CSV] = _Validate_load_csv; validations_mapping[CYPHER_AST_IDENTIFIER] = _Validate_identifier; validations_mapping[CYPHER_AST_PROJECTION] = _Validate_projection; validations_mapping[CYPHER_AST_NAMED_PATH] = _Validate_named_path; @@ -2276,7 +2294,6 @@ bool AST_ValidationsMappingInit(void) { validations_mapping[CYPHER_AST_FILTER] = _visit_break; validations_mapping[CYPHER_AST_EXTRACT] = _visit_break; validations_mapping[CYPHER_AST_COMMAND] = _visit_break; - validations_mapping[CYPHER_AST_LOAD_CSV] = _visit_break; validations_mapping[CYPHER_AST_MATCH_HINT] = _visit_break; validations_mapping[CYPHER_AST_USING_JOIN] = _visit_break; validations_mapping[CYPHER_AST_USING_SCAN] = _visit_break; diff --git a/src/errors/error_msgs.h b/src/errors/error_msgs.h index f2b9cf7b3..ff47ca279 100644 --- a/src/errors/error_msgs.h +++ b/src/errors/error_msgs.h @@ -19,8 +19,8 @@ #define EMSG_SHORTESTPATH_SINGLE_RELATIONSHIP "shortestPath requires a path containing a single relationship" #define EMSG_SHORTESTPATH_MINIMAL_LENGTH "shortestPath does not support a minimal length different from 0 or 1" #define EMSG_SHORTESTPATH_MAX_HOPS "Maximum number of hops must be greater than or equal to minimum number of hops" -#define EMSG_SHORTESTPATH_UNDIRECTED "RedisGraph does not currently support undirected shortestPath traversals" -#define 
EMSG_SHORTESTPATH_RELATIONSHIP_FILTERS "RedisGraph does not currently support filters on relationships in shortestPath" +#define EMSG_SHORTESTPATH_UNDIRECTED "FalkorDB does not currently support undirected shortestPath traversals" +#define EMSG_SHORTESTPATH_RELATIONSHIP_FILTERS "FalkorDB does not currently support filters on relationships in shortestPath" #define EMSG_SHORTESTPATH_NODE_FILTERS "Node filters may not be introduced in shortestPath" #define EMSG_FUNCTION_REQUIER_PREDICATE "'%s' function requires a WHERE predicate" #define EMSG_NESTED_AGGREGATION "Can't use aggregate functions inside of aggregate functions." @@ -65,7 +65,7 @@ #define EMSG_CALLSUBQUERY_INVALID_REFERENCES "WITH imports in CALL {} must consist of only simple references to outside variables" #define EMSG_VAIABLE_ALREADY_DECLARED_IN_OUTER_SCOPE "Variable `%s` already declared in outer scope" #define EMSG_DELETE_INVALID_ARGUMENTS "DELETE can only be called on nodes, paths and relationships" -#define EMSG_SET_LHS_NON_ALIAS "RedisGraph does not currently support non-alias references on the left-hand side of SET expressions" +#define EMSG_SET_LHS_NON_ALIAS "FalkorDB does not currently support non-alias references on the left-hand side of SET expressions" #define EMSG_UNION_COMBINATION "Invalid combination of UNION and UNION ALL." #define EMSG_FOREACH_INVALID_BODY "Error: Only updating clauses may reside in FOREACH" #define EMSG_QUERY_INVALID_LAST_CLAUSE "Query cannot conclude with %s (must be a RETURN clause, an update clause, a procedure call or a non-returning subquery)" @@ -75,8 +75,8 @@ #define EMSG_MISSING_WITH_AFTER_MATCH "A WITH clause is required to introduce a MATCH clause after an OPTIONAL MATCH." #define EMSG_UNSUPPORTED_QUERY_TYPE "Encountered unsupported query type '%s'" #define EMSG_EMPTY_QUERY "Error: empty query." 
-#define EMSG_ALLSHORTESTPATH_SUPPORT "RedisGraph support allShortestPaths only in match clauses" -#define EMSG_SHORTESTPATH_SUPPORT "RedisGraph currently only supports shortestPaths in WITH or RETURN clauses" +#define EMSG_ALLSHORTESTPATH_SUPPORT "FalkorDB support allShortestPaths only in match clauses" +#define EMSG_SHORTESTPATH_SUPPORT "FalkorDB currently only supports shortestPaths in WITH or RETURN clauses" #define EMSG_EXPLAIN_PROFILE_USAGE "Please use GRAPH.%s 'key' 'query' command instead of GRAPH.QUERY 'key' '%s query'" #define EMSG_DUPLICATE_PARAMETERS "Duplicated parameter: %s" #define EMSG_PARSER_ERROR "errMsg: %s line: %u, column: %u, offset: %zu errCtx: %s errCtxOffset: %zu" @@ -87,7 +87,7 @@ #define EMSG_COULD_NOT_PARSE_QUERY "Error: could not parse query" #define EMSG_UNABLE_TO_RESOLVE_FILTER_ALIAS "Unable to resolve filtered alias '%s'" #define EMSG_TYPE_MISMATCH "Type mismatch: expected %s but was %s" -#define EMSG_REDISGRAPH_SUPPORT "RedisGraph does not currently support %s" +#define EMSG_FALKORDB_SUPPORT "FalkorDB does not currently support %s" #define EMSG_INVALID_PROPERTY_VALUE "Property values can only be of primitive types or arrays of primitive types" #define EMSG_DIVISION_BY_ZERO "Division by zero" #define EMSG_ALLSHORTESTPATH_SRC_DST_RESLOVED "Source and destination must already be resolved to call allShortestPaths" diff --git a/src/errors/errors.c b/src/errors/errors.c index 2d2d30d63..ac4566976 100644 --- a/src/errors/errors.c +++ b/src/errors/errors.c @@ -165,13 +165,13 @@ void Error_UnsupportedASTNodeType(const cypher_astnode_t *node) { cypher_astnode_type_t type = cypher_astnode_type(node); const char *type_str = cypher_astnode_typestr(type); - ErrorCtx_SetError(EMSG_REDISGRAPH_SUPPORT, type_str); + ErrorCtx_SetError(EMSG_FALKORDB_SUPPORT, type_str); } void Error_UnsupportedASTOperator(const cypher_operator_t *op) { ASSERT(op != NULL); - ErrorCtx_SetError(EMSG_REDISGRAPH_SUPPORT, op->str); + ErrorCtx_SetError(EMSG_FALKORDB_SUPPORT, op->str); } inline void Error_InvalidPropertyValue(void) { diff --git a/src/execution_plan/execution_plan_build/execution_plan_construct.c b/src/execution_plan/execution_plan_build/execution_plan_construct.c index b3f0e7dce..5400a092e 100644 --- a/src/execution_plan/execution_plan_build/execution_plan_construct.c +++ b/src/execution_plan/execution_plan_build/execution_plan_construct.c @@ -170,6 +170,31 @@ static inline void _buildUnwindOp(ExecutionPlan *plan, const cypher_astnode_t *c ExecutionPlan_UpdateRoot(plan, op); } +static void _buildLoadCSVOp +( + ExecutionPlan *plan, + const cypher_astnode_t *clause +) { + ASSERT(plan != NULL); + ASSERT(clause != NULL); + + // extract information from AST + + // with headers + bool with_headers = cypher_ast_load_csv_has_with_headers(clause); + + // URI expression + const cypher_astnode_t *node = cypher_ast_load_csv_get_url(clause); + AR_ExpNode *exp = AR_EXP_FromASTNode(node); + + // alias + node = cypher_ast_load_csv_get_identifier(clause); + const char *alias = cypher_ast_identifier_get_name(node); + + OpBase *op = NewLoadCSVOp(plan, exp, alias, with_headers); + ExecutionPlan_UpdateRoot(plan, op); +} + static inline void _buildUpdateOp(GraphContext *gc, ExecutionPlan *plan, const cypher_astnode_t *clause) { rax *update_exps = AST_PrepareUpdateOp(gc, clause); @@ -368,6 +393,8 @@ void ExecutionPlanSegment_ConvertClause _buildForeachOp(plan, clause, gc); } else if(t == CYPHER_AST_CALL_SUBQUERY) { buildCallSubqueryPlan(plan, clause); + } else if(t == CYPHER_AST_LOAD_CSV) { + 
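		// LOAD CSV clause: _buildLoadCSVOp extracts the WITH HEADERS flag,
		// the URI expression and the row alias, builds the Load CSV
		// operation and makes it the new root of the plan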
_buildLoadCSVOp(plan, clause); } else { assert(false && "unhandeled clause"); } diff --git a/src/execution_plan/ops/op_load_csv.c b/src/execution_plan/ops/op_load_csv.c index 0e4953d78..696c5f104 100644 --- a/src/execution_plan/ops/op_load_csv.c +++ b/src/execution_plan/ops/op_load_csv.c @@ -12,22 +12,23 @@ static OpResult LoadCSVInit(OpBase *opBase); static Record LoadCSVConsume(OpBase *opBase); static Record LoadCSVConsumeFromChild(OpBase *opBase); static OpBase *LoadCSVClone(const ExecutionPlan *plan, const OpBase *opBase); +static OpResult LoadCSVReset(OpBase *opBase); static void LoadCSVFree(OpBase *opBase); // create a new load CSV operation OpBase *NewLoadCSVOp ( const ExecutionPlan *plan, // execution plan - const char *path, // path to CSV file + AR_ExpNode *exp, // CSV URI path expression const char *alias // CSV row alias ) { + ASSERT(exp != NULL); ASSERT(plan != NULL); - ASSERT(path != NULL); ASSERT(alias != NULL); OpLoadCSV *op = rm_calloc(1, sizeof(OpLoadCSV)); - op->path = strdup(path); + op->exp = exp; op->alias = strdup(alias); // Set our Op operations @@ -53,6 +54,26 @@ static OpResult LoadCSVInit return OP_OK; } +// evaluate path expression +// expression must evaluate to string representing a valid URI +// if that's not the case an exception is raised +static bool _compute_path +( + OpLoadCSV *op +) { + ASSERT(op != NULL); + + SIValue v = AR_EXP_Evaluate(op->exp, NULL); + if(SI_TYPE(v) != T_STRING) { + ErrorCtx_RaiseRuntimeException("path to CSV must be a string"); + return false; + } + + op->path = v.stringval; + + return true; +} + static const char *A = "AAA"; static const char *B = "BB"; static const char *C = "C"; @@ -86,6 +107,13 @@ static Record LoadCSVConsumeFromChild OpLoadCSV *op = (OpLoadCSV*)opBase; pull_from_child: + + // first call, evaluate CSV path + if(op->path == NULL && !_compute_path(op)) { + // failed to evaluate CSV path, quickly return + return NULL; + } + // in case a record is missing ask child to provide one if(op->child_record == NULL) { op->child_record = OpBase_Consume(op->child); @@ -107,6 +135,10 @@ static Record LoadCSVConsumeFromChild OpBase_DeleteRecord(op->child_record); op->child_record = NULL; + // free CSV path, just in case it relies on record data + rm_free(op->path); + op->path = NULL; + // try to get a new record from child goto pull_from_child; } @@ -128,6 +160,12 @@ static Record LoadCSVConsume OpLoadCSV *op = (OpLoadCSV*)opBase; + // first call, evaluate CSV path + if(op->path == NULL && !_compute_path(op)) { + // failed to evaluate CSV path, quickly return + return NULL; + } + SIValue row; Record r = NULL; @@ -147,7 +185,25 @@ static inline OpBase *LoadCSVClone ASSERT(opBase->type == OPType_LOAD_CSV); OpLoadCSV *op = (OpLoadCSV*)opBase; - return NewLoadCSVOp(plan, op->path, op->alias); + return NewLoadCSVOp(plan, AR_EXP_Clone(op->exp), op->alias); +} + +static OpResult LoadCSVReset ( + OpBase *opBase +) { + OpLoadCSV *op = (OpLoadCSV*)opBase; + + if(op->path != NULL) { + rm_free(op->path); + op->path = NULL; + } + + if(op->child_record != NULL) { + OpBase_DeleteRecord(op->child_record); + op->child_record = NULL; + } + + return OP_OK; } // free Load CSV operation @@ -159,9 +215,14 @@ static void LoadCSVFree OpLoadCSV *op = (OpLoadCSV*)opBase; + if(op->exp != NULL) { + AR_EXP_Free(op->exp); + op->exp = NULL; + } + if(op->path != NULL) { rm_free(op->path); - op->path = NULL; + op->path = NULL; } if(op->alias != NULL) { diff --git a/src/execution_plan/ops/op_load_csv.h b/src/execution_plan/ops/op_load_csv.h index 
6d289f042..bb713f73a 100644 --- a/src/execution_plan/ops/op_load_csv.h +++ b/src/execution_plan/ops/op_load_csv.h @@ -11,7 +11,8 @@ // load CSV typedef struct { OpBase op; // op base must be the first field in this struct - char *path; // full path to CSV file + AR_ExpNode *exp; // expression evaluated to CSV path + char *path; // CSV path char *alias; // CSV row alias int recIdx; // record index to populate with CSV row OpBase *child; // child operation @@ -22,7 +23,8 @@ typedef struct { OpBase *NewLoadCSVOp ( const ExecutionPlan *plan, // execution plan - const char *path, // full path to CSV file + AR_ExpNode *exp, // CSV URI path expression const char *alias // CSV row alias + bool with_headers // CSV contains header row ); diff --git a/src/execution_plan/ops/ops.h b/src/execution_plan/ops/ops.h index 32e959604..2f3cc04c7 100644 --- a/src/execution_plan/ops/ops.h +++ b/src/execution_plan/ops/ops.h @@ -23,6 +23,7 @@ #include "op_foreach.h" #include "op_optional.h" #include "op_argument.h" +#include "op_load_csv.h" #include "op_distinct.h" #include "op_aggregate.h" #include "op_semi_apply.h" From 38ecd529aace92bb6578e01a9d569cbbd63faced Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Sun, 17 Mar 2024 20:42:44 +0200 Subject: [PATCH 3/6] [WIP] load csv op --- src/execution_plan/ops/op_load_csv.c | 155 +++++++++++++++------------ src/execution_plan/ops/op_load_csv.h | 6 +- 2 files changed, 93 insertions(+), 68 deletions(-) diff --git a/src/execution_plan/ops/op_load_csv.c b/src/execution_plan/ops/op_load_csv.c index 696c5f104..976141aa0 100644 --- a/src/execution_plan/ops/op_load_csv.c +++ b/src/execution_plan/ops/op_load_csv.c @@ -10,17 +10,66 @@ // forward declarations static OpResult LoadCSVInit(OpBase *opBase); static Record LoadCSVConsume(OpBase *opBase); +static Record LoadCSVConsumeDepleted(OpBase *opBase); static Record LoadCSVConsumeFromChild(OpBase *opBase); static OpBase *LoadCSVClone(const ExecutionPlan *plan, const OpBase *opBase); static OpResult LoadCSVReset(OpBase *opBase); static void LoadCSVFree(OpBase *opBase); +// evaluate path expression +// expression must evaluate to string representing a valid URI +// if that's not the case an exception is raised +static bool _compute_path +( + OpLoadCSV *op, + Record r +) { + ASSERT(op != NULL); + + op->path = AR_EXP_Evaluate(op->exp, r); + if(SI_TYPE(op->path) != T_STRING) { + ErrorCtx_RaiseRuntimeException("path to CSV must be a string"); + return false; + } + + return true; +} + +// mock data +static char *A = "AAA"; +static char *B = "BB"; +static char *C = "C"; + +// get a single CSV row +static bool _CSV_GetRow +( + OpLoadCSV *op, // load CSV operation + SIValue *row // row to populate +) { + ASSERT(op != NULL); + ASSERT(row != NULL); + + if(!op->mock) return false; + + SIValue _row = SIArray_New(3); + + SIArray_Append(&_row, SI_ConstStringVal(A)); + SIArray_Append(&_row, SI_ConstStringVal(B)); + SIArray_Append(&_row, SI_ConstStringVal(C)); + + *row = _row; + + op->mock = false; + return true; +} + // create a new load CSV operation OpBase *NewLoadCSVOp ( const ExecutionPlan *plan, // execution plan AR_ExpNode *exp, // CSV URI path expression - const char *alias // CSV row alias + const char *alias, // CSV row alias + bool with_headers // CSV contains header row ) { ASSERT(exp != NULL); ASSERT(plan != NULL); @@ -28,8 +77,11 @@ OpBase *NewLoadCSVOp OpLoadCSV *op = rm_calloc(1, sizeof(OpLoadCSV)); - op->exp = exp; - op->alias = strdup(alias); + op->exp = exp; + op->path = SI_NullVal(); + op->mock = true; + op->alias = 
strdup(alias); + op->with_headers = with_headers; // Set our Op operations OpBase_Init((OpBase *)op, OPType_LOAD_CSV, "Load CSV", LoadCSVInit, @@ -40,61 +92,40 @@ OpBase *NewLoadCSVOp return (OpBase *)op; } +// initialize operation static OpResult LoadCSVInit ( OpBase *opBase ) { + // set operation consume function + + OpLoadCSV *op = (OpLoadCSV*)opBase; // update consume function in case operation has a child if(OpBase_ChildCount(opBase) > 0) { - OpLoadCSV *op = (OpLoadCSV*)opBase; op->child = OpBase_GetChild(opBase, 0); OpBase_UpdateConsume(opBase, LoadCSVConsumeFromChild); - } - - return OP_OK; -} -// evaluate path expression -// expression must evaluate to string representing a valid URI -// if that's not the case an exception is raised -static bool _compute_path -( - OpLoadCSV *op -) { - ASSERT(op != NULL); - - SIValue v = AR_EXP_Evaluate(op->exp, NULL); - if(SI_TYPE(v) != T_STRING) { - ErrorCtx_RaiseRuntimeException("path to CSV must be a string"); - return false; + return OP_OK; } - op->path = v.stringval; + // no child operation evaluate path expression + Record r = OpBase_CreateRecord(opBase); + if(!_compute_path(op, r)) { + // failed to evaluate CSV path + // update consume function + OpBase_DeleteRecord(r); + OpBase_UpdateConsume(opBase, LoadCSVConsumeDepleted); + } - return true; + return OP_OK; } -static const char *A = "AAA"; -static const char *B = "BB"; -static const char *C = "C"; - -static bool _CSV_GetRow +// simply return NULL indicating operation depleted +static Record LoadCSVConsumeDepleted ( - OpLoadCSV *op, - SIValue *row + OpBase *opBase ) { - ASSERT(op != NULL); - ASSERT(row != NULL); - - SIValue _row = SIArray_New(3); - - SIArray_Append(&_row, SI_ConstStringVal(A)); - SIArray_Append(&_row, SI_ConstStringVal(B)); - SIArray_Append(&_row, SI_ConstStringVal(C)); - - *row = _row; - - return true; + return NULL; } // load CSV consume function in case this operation in not a tap @@ -108,12 +139,6 @@ static Record LoadCSVConsumeFromChild pull_from_child: - // first call, evaluate CSV path - if(op->path == NULL && !_compute_path(op)) { - // failed to evaluate CSV path, quickly return - return NULL; - } - // in case a record is missing ask child to provide one if(op->child_record == NULL) { op->child_record = OpBase_Consume(op->child); @@ -122,6 +147,14 @@ static Record LoadCSVConsumeFromChild if(op->child_record == NULL) { return NULL; } + + // first call, evaluate CSV path + if(!_compute_path(op, op->child_record)) { + // failed to evaluate CSV path, quickly return + return NULL; + } + + op->mock = true; } // must have a record at this point @@ -136,8 +169,8 @@ static Record LoadCSVConsumeFromChild op->child_record = NULL; // free CSV path, just in case it relies on record data - rm_free(op->path); - op->path = NULL; + SIValue_Free(op->path); + op->path = SI_NullVal(); // try to get a new record from child goto pull_from_child; @@ -160,12 +193,6 @@ static Record LoadCSVConsume OpLoadCSV *op = (OpLoadCSV*)opBase; - // first call, evaluate CSV path - if(op->path == NULL && !_compute_path(op)) { - // failed to evaluate CSV path, quickly return - return NULL; - } - SIValue row; Record r = NULL; @@ -185,7 +212,8 @@ static inline OpBase *LoadCSVClone ASSERT(opBase->type == OPType_LOAD_CSV); OpLoadCSV *op = (OpLoadCSV*)opBase; - return NewLoadCSVOp(plan, AR_EXP_Clone(op->exp), op->alias); + return NewLoadCSVOp(plan, AR_EXP_Clone(op->exp), op->alias, + op->with_headers); } static OpResult LoadCSVReset ( @@ -193,10 +221,8 @@ static OpResult LoadCSVReset ( ) { OpLoadCSV *op = 
(OpLoadCSV*)opBase; - if(op->path != NULL) { - rm_free(op->path); - op->path = NULL; - } + SIValue_Free(op->path); + op->path = SI_NullVal(); if(op->child_record != NULL) { OpBase_DeleteRecord(op->child_record); @@ -215,16 +241,13 @@ static void LoadCSVFree OpLoadCSV *op = (OpLoadCSV*)opBase; + SIValue_Free(op->path); + if(op->exp != NULL) { AR_EXP_Free(op->exp); op->exp = NULL; } - if(op->path != NULL) { - rm_free(op->path); - op->path = NULL; - } - if(op->alias != NULL) { rm_free(op->alias); op->alias = NULL; diff --git a/src/execution_plan/ops/op_load_csv.h b/src/execution_plan/ops/op_load_csv.h index bb713f73a..9a3fb218c 100644 --- a/src/execution_plan/ops/op_load_csv.h +++ b/src/execution_plan/ops/op_load_csv.h @@ -12,9 +12,11 @@ typedef struct { OpBase op; // op base must be the first field in this struct AR_ExpNode *exp; // expression evaluated to CSV path - char *path; // CSV path + SIValue path; // CSV path char *alias; // CSV row alias int recIdx; // record index to populate with CSV row + bool mock; // mock CSV row + bool with_headers; // CSV contains header row OpBase *child; // child operation Record child_record; // child record } OpLoadCSV; @@ -24,7 +26,7 @@ OpBase *NewLoadCSVOp ( const ExecutionPlan *plan, // execution plan AR_ExpNode *exp, // CSV URI path expression - const char *alias // CSV row alias + const char *alias, // CSV row alias bool with_headers // CSV contains header row ); From 6092938c4c5f84db280f32e89a4c3b9e3bdb28d4 Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Sun, 17 Mar 2024 21:06:39 +0200 Subject: [PATCH 4/6] [WIP] test LOAD CSV --- tests/flow/test_load_csv.py | 54 +++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 tests/flow/test_load_csv.py diff --git a/tests/flow/test_load_csv.py b/tests/flow/test_load_csv.py new file mode 100644 index 000000000..5574943f2 --- /dev/null +++ b/tests/flow/test_load_csv.py @@ -0,0 +1,54 @@ +from common import * +import re + +GRAPH_ID = "load_csv" + +class testLoadCSV(): + def __init__(self): + self.env, self.db = Env() + self.graph = self.db.select_graph(GRAPH_ID) + + def test01_invalid_call(self): + queries = ["LOAD CSV FROM a AS row RETURN row", + "LOAD CSV FROM 2 AS row RETURN row", + "LOAD CSV FROM $arr AS row RETURN row", + "WITH 2 AS x LOAD CSV FROM x AS row RETURN row"] + + for q in queries: + try: + self.graph.query(q, {'arr': []}) + self.env.assertFalse(True) + except Exception as e: + self.env.assertEquals(str(e), "path to CSV must be a string") + + def test02_project_csv_rows(self): + # project all rows in a CSV file + q = """LOAD CSV FROM 'data.csv' AS row + RETURN row""" + + result = self.graph.query(q).result_set + self.env.assertEquals(result[0][0], ['AAA', 'BB', 'C']) + + def test03_load_csv_multiple_times(self): + # project the same CSV multiple times + q = """UNWIND [1,2,3] AS x + LOAD CSV FROM 'data.csv' AS row + RETURN x, row + ORDER BY x""" + + result = self.graph.query(q).result_set + self.env.assertEquals(result[0], [1, ['AAA', 'BB', 'C']]) + self.env.assertEquals(result[1], [2, ['AAA', 'BB', 'C']]) + self.env.assertEquals(result[2], [3, ['AAA', 'BB', 'C']]) + + def test04_dynamic_csv_path(self): + # project all rows in a CSV file + q = """UNWIND ['a', 'b'] AS x + LOAD CSV FROM x + '.csv' AS row + RETURN x, row + ORDER BY x""" + + result = self.graph.query(q).result_set + self.env.assertEquals(result[0], ['a', ['AAA', 'BB', 'C']]) + self.env.assertEquals(result[1], ['b', ['AAA', 'BB', 'C']]) + From 0a7e71d43c4bffecfcebdb27673feb55823576b0 Mon Sep 17 
00:00:00 2001 From: Roi Lipman Date: Tue, 2 Apr 2024 08:55:35 +0300 Subject: [PATCH 5/6] [WIP] support WITH HEADERS --- src/csv_reader/csv_reader.h | 51 ++++++++ src/datatypes/array.c | 127 +++++++++++++----- src/datatypes/array.h | 187 +++++++++++++-------------- src/datatypes/map.c | 20 +++ src/datatypes/map.h | 9 ++ src/execution_plan/ops/op_load_csv.c | 142 +++++++++++++++++--- src/execution_plan/ops/op_load_csv.h | 5 +- tests/flow/test_load_csv.py | 166 ++++++++++++++++++++---- 8 files changed, 540 insertions(+), 167 deletions(-) create mode 100644 src/csv_reader/csv_reader.h diff --git a/src/csv_reader/csv_reader.h b/src/csv_reader/csv_reader.h new file mode 100644 index 000000000..1796cad26 --- /dev/null +++ b/src/csv_reader/csv_reader.h @@ -0,0 +1,51 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +#pragma once + +#include +#include + +typedef struct Opaque_CSVReader *CSVReader; + +// create a new CSV reader +CSVReader CSVReader_New +( + const char *file_name, // URI to CSV + bool has_headers, // first row is a header row + char delimiter // column delimiter character +); + +// returns the number of columns in CSV file +size_t CSVReader_ColumnCount +( + const CSVReader reader // CSV reader +); + +// extracts the header row +// length of 'values' and 'lengths' arrays must be the same +// returns true on success false otherwise +bool CSVReader_GetHeaders +( + const CSVReader reader, // CSV reader + const char **values, // header values + const size_t *lengths // length of each value +); + +// extract the current row +// length of 'values' and 'lengths' arrays must be the same +// returns true on success false indicates either an error or EOF +bool CSVReader_GetRow +( + const CSVReader reader, // CSV reader + const char **values, // row values + const size_t *lengths // length of each value +); + +// free CSV reader +void CSVReader_Free +( + CSVReader reader // CSV reader to free +); diff --git a/src/datatypes/array.c b/src/datatypes/array.c index b438c4169..4b17224fc 100644 --- a/src/datatypes/array.c +++ b/src/datatypes/array.c @@ -10,7 +10,12 @@ #include #include "xxhash.h" -SIValue SIArray_New(uint32_t initialCapacity) { +// initialize a new SIValue array type with given capacity +// returns initialized array +SIValue SIArray_New +( + u_int32_t initialCapacity // initial capacity +) { SIValue siarray; siarray.array = array_new(SIValue, initialCapacity); siarray.type = T_ARRAY; @@ -18,31 +23,68 @@ SIValue SIArray_New(uint32_t initialCapacity) { return siarray; } -void SIArray_Append(SIValue *siarray, SIValue value) { +// creates an SIArray from a raw 'arr.h' array +// SIArray takes ownership over 'raw' +SIValue SIArray_FromRaw +( + SIValue **raw // raw array +) { + ASSERT(raw != NULL); + ASSERT(*raw != NULL); + + SIValue siarray; + + siarray.array = *raw; + siarray.type = T_ARRAY; + siarray.allocation = M_SELF; + + *raw = NULL; + + return siarray; +} + +// appends a new SIValue to a given array +void SIArray_Append +( + SIValue *siarray, // pointer to array + SIValue value // new value +) { // clone and persist incase of pointer values SIValue clone = SI_CloneValue(value); // append array_append(siarray->array, clone); } -SIValue SIArray_Get(SIValue siarray, uint32_t index) { +// returns a volatile copy of the SIValue from an array in a given index +// if index is out of bound, SI_NullVal is returned +// caller is expected either to not free the returned value or take ownership on +// its own copy +// returns 
the value in the requested index +SIValue SIArray_Get +( + SIValue siarray, // siarray: array + u_int32_t index // index: index +) { // check index if(index >= SIArray_Length(siarray)) return SI_NullVal(); return SI_ShareValue(siarray.array[index]); } -uint32_t SIArray_Length(SIValue siarray) { +// get the array length +u_int32_t SIArray_Length +( + SIValue siarray // array to return length of +) { return array_len(siarray.array); } -/** - * @brief Returns true if any of the types in 't' are contained in the array - or its nested array children, if any - * @param siarray: array - * @param t: bitmap of types to search for - * @retval a boolean indicating whether any types were matched - */ -bool SIArray_ContainsType(SIValue siarray, SIType t) { +// returns true if any of the types in 't' are contained in the array +// or its nested array children, if any +bool SIArray_ContainsType +( + SIValue siarray, // array to inspect + SIType t // bitmap of types to search for +) { uint array_len = SIArray_Length(siarray); for(uint i = 0; i < array_len; i++) { SIValue elem = siarray.array[i]; @@ -57,14 +99,13 @@ bool SIArray_ContainsType(SIValue siarray, SIType t) { return false; } -/** - * @brief Returns true if the array contains an element equals to 'value' - * @param siarray: array - * @param value: value to search for - * @param comparedNull: indicate if there was a null comparison during the array scan - * @retval a boolean indicating whether value was found in siarray - */ -bool SIArray_ContainsValue(SIValue siarray, SIValue value, bool *comparedNull) { +// returns true if the array contains an element equals to 'value' +bool SIArray_ContainsValue +( + SIValue siarray, // array to search + SIValue value, // value to search for + bool *comparedNull // indicate if there was a null comparison +) { // indicate if there was a null comparison during the array scan if(comparedNull) *comparedNull = false; uint array_len = SIArray_Length(siarray); @@ -81,7 +122,12 @@ bool SIArray_ContainsValue(SIValue siarray, SIValue value, bool *comparedNull) { return false; } -bool SIArray_AllOfType(SIValue siarray, SIType t) { +// returns true if all of the elements in the array are of type 't' +bool SIArray_AllOfType +( + SIValue siarray, // array to inspect + SIType t // type to compare against +) { uint array_len = SIArray_Length(siarray); for(uint i = 0; i < array_len; i++) { SIValue elem = siarray.array[i]; @@ -111,11 +157,11 @@ static int _siarray_compare_func_desc return SIValue_Compare(*(SIValue*)b, *(SIValue*)a, NULL); } -// sorts the array in place in ascending\descending order +// sorts the array in place void SIArray_Sort ( - SIValue siarray, - bool ascending + SIValue siarray, // array to sort + bool ascending // sorting order ) { uint32_t arrayLen = SIArray_Length(siarray); @@ -128,7 +174,11 @@ void SIArray_Sort } } -SIValue SIArray_Clone(SIValue siarray) { +// clones an array, caller needs to free the array +SIValue SIArray_Clone +( + SIValue siarray // array to clone +) { uint arrayLen = SIArray_Length(siarray); SIValue newArray = SIArray_New(arrayLen); for(uint i = 0; i < arrayLen; i++) { @@ -137,12 +187,13 @@ SIValue SIArray_Clone(SIValue siarray) { return newArray; } +// prints an array into a given buffer void SIArray_ToString ( - SIValue list, - char **buf, - size_t *bufferLen, - size_t *bytesWritten + SIValue siarray, // array to print + char **buf, // print buffer + size_t *bufferLen, // print buffer length + size_t *bytesWritten // the actual number of bytes written to the buffer ) { 
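	// grow the print buffer if there is no room left before writing the opening '['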
if(*bufferLen - *bytesWritten < 64) { *bufferLen += 64; @@ -151,10 +202,10 @@ void SIArray_ToString // open array with "[" *bytesWritten += snprintf(*buf + *bytesWritten, *bufferLen, "["); - uint arrayLen = SIArray_Length(list); + uint arrayLen = SIArray_Length(siarray); for(uint i = 0; i < arrayLen; i ++) { // write the next value - SIValue_ToString(list.array[i], buf, bufferLen, bytesWritten); + SIValue_ToString(siarray.array[i], buf, bufferLen, bytesWritten); // if it is not the last element, add ", " if(i != arrayLen - 1) { if(*bufferLen - *bytesWritten < 64) { @@ -174,9 +225,11 @@ void SIArray_ToString *bytesWritten += snprintf(*buf + *bytesWritten, *bufferLen, "]"); } -// this method referenced by Java ArrayList.hashCode() method, which takes -// into account the hasing of nested values -XXH64_hash_t SIArray_HashCode(SIValue siarray) { + // returns the array hash code. +XXH64_hash_t SIArray_HashCode +( + SIValue siarray +) { SIType t = T_ARRAY; XXH64_hash_t hashCode = XXH64(&t, sizeof(t), 0); @@ -210,7 +263,11 @@ SIValue SIArray_FromBinary return arr; } -void SIArray_Free(SIValue siarray) { +// free an array +void SIArray_Free +( + SIValue siarray // array to free +) { uint arrayLen = SIArray_Length(siarray); for(uint i = 0; i < arrayLen; i++) { SIValue value = siarray.array[i]; diff --git a/src/datatypes/array.h b/src/datatypes/array.h index d0b83407b..b41c72f34 100644 --- a/src/datatypes/array.h +++ b/src/datatypes/array.h @@ -8,94 +8,94 @@ #include "../value.h" -/** - * @brief Initialize a new SIValue array type with given capacity - * @param initialCapacity: - * @retval Initialized array - */ -SIValue SIArray_New(u_int32_t initialCapacity); - -/** - * @brief Appends a new SIValue to a given array - * @param siarray: pointer to array - * @param value: new value - */ -void SIArray_Append(SIValue *siarray, SIValue value); - -/** - * @brief Returns a volatile copy of the SIValue from an array in a given index - * @note If index is out of bound, SI_NullVal is returned - * Caller is expected either to not free the returned value or take ownership on - * its own copy - * @param siarray: array - * @param index: index - * @retval The value in the requested index - */ -SIValue SIArray_Get(SIValue siarray, u_int32_t index); - -/** - * @brief Returns the array length - * @param siarray: - * @retval array length - */ -u_int32_t SIArray_Length(SIValue siarray); - -/** - * @brief Returns true if any of the types in 't' are contained in the array - or its nested array children, if any - * @param siarray: array - * @param t: bitmap of types to search for - * @retval a boolean indicating whether any types were matched - */ -bool SIArray_ContainsType(SIValue siarray, SIType t); - -/** - * @brief Returns true if the array contains an element equals to 'value' - * @param siarray: array - * @param value: value to search for - * @param comparedNull: indicate if there was a null comparison during the array scan - * @retval a boolean indicating whether value was found in siarray - */ -bool SIArray_ContainsValue(SIValue siarray, SIValue value, bool *comparedNull); - -/** - * @brief Returns true if all of the elements in the array are of type 't' - * @param siarray: array - * @param t: type to compare - * @retval a boolean indicating whether all elements are of type 't' - */ -bool SIArray_AllOfType(SIValue siarray, SIType t); - -/** - * @brief Sorts the array in place - * @param siarray: array to sort - * @param ascending: sort order - */ -void SIArray_Sort(SIValue siarray, bool ascending); - -/** - * 
@brief Returns a copy of the array - * @note The caller needs to free the array - * @param siarray: - * @retval A clone of the given array - */ -SIValue SIArray_Clone(SIValue siarray); - -/** - * @brief Prints an array into a given buffer - * @param list: array to print - * @param buf: print buffer (pointer to pointer to allow re allocation) - * @param len: print buffer length - * @param bytesWritten: the actual number of bytes written to the buffer - */ -void SIArray_ToString(SIValue list, char **buf, size_t *bufferLen, size_t *bytesWritten); - -/** - * @brief Returns the array hash code. - * @param siarray: SIArray. - * @retval The array hashCode. - */ -XXH64_hash_t SIArray_HashCode(SIValue siarray); +// initialize a new SIValue array type with given capacity +// returns initialized array +SIValue SIArray_New +( + u_int32_t initialCapacity // initial capacity +); + +// creates an SIArray from a raw 'arr.h' array +// SIArray takes ownership over 'raw' +SIValue SIArray_FromRaw +( + SIValue **raw // raw array +); + +// appends a new SIValue to a given array +void SIArray_Append +( + SIValue *siarray, // pointer to array + SIValue value // new value +); + +// returns a volatile copy of the SIValue from an array in a given index +// if index is out of bound, SI_NullVal is returned +// caller is expected either to not free the returned value or take ownership on +// its own copy +// returns the value in the requested index +SIValue SIArray_Get +( + SIValue siarray, // siarray: array + u_int32_t index // index: index +); + +// get the array length +u_int32_t SIArray_Length +( + SIValue siarray // array to return length of +); + +// returns true if any of the types in 't' are contained in the array +// or its nested array children, if any +bool SIArray_ContainsType +( + SIValue siarray, // array to inspect + SIType t // bitmap of types to search for +); + +// returns true if the array contains an element equals to 'value' +bool SIArray_ContainsValue +( + SIValue siarray, // array to search + SIValue value, // value to search for + bool *comparedNull // indicate if there was a null comparison +); + +// returns true if all of the elements in the array are of type 't' +bool SIArray_AllOfType +( + SIValue siarray, // array to inspect + SIType t // type to compare against +); + +// sorts the array in place +void SIArray_Sort +( + SIValue siarray, // array to sort + bool ascending // sorting order +); + +// clones an array, caller needs to free the array +SIValue SIArray_Clone +( + SIValue siarray // array to clone +); + +// prints an array into a given buffer +void SIArray_ToString +( + SIValue siarray, // array to print + char **buf, // print buffer + size_t *bufferLen, // print buffer length + size_t *bytesWritten // the actual number of bytes written to the buffer +); + + // returns the array hash code. 
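// the hash follows the Java ArrayList.hashCode() scheme,
// combining the hashes of nested values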
+XXH64_hash_t SIArray_HashCode +( + SIValue siarray // array to hash +); // creates an array from its binary representation // this is the reverse of SIArray_ToBinary @@ -106,10 +106,9 @@ SIValue SIArray_FromBinary FILE *stream // stream containing binary representation of an array ); -/** - * @brief delete an array - * @param siarray: - * @retval None - */ -void SIArray_Free(SIValue siarray); +// free an array +void SIArray_Free +( + SIValue siarray // array to free +); diff --git a/src/datatypes/map.c b/src/datatypes/map.c index d7ca1fd77..0859f847a 100644 --- a/src/datatypes/map.c +++ b/src/datatypes/map.c @@ -74,6 +74,26 @@ SIValue Map_New return map; } +// create a map from keys and values arrays +// keys and values are both of length n +SIValue Map_FromArrays +( + const SIValue *keys, // keys + const SIValue *values, // values + uint n // arrays length +) { + ASSERT(keys != NULL); + ASSERT(values != NULL); + + SIValue map = Map_New(n); + + for(uint i = 0; i < n; i++) { + array_append(map.map, Pair_New(keys[i], values[i])); + } + + return map; +} + // clone map SIValue Map_Clone ( diff --git a/src/datatypes/map.h b/src/datatypes/map.h index 9c6f8d403..fa3a44455 100644 --- a/src/datatypes/map.h +++ b/src/datatypes/map.h @@ -37,6 +37,15 @@ SIValue Map_New uint capacity // map initial capacity ); +// create a map from keys and values arrays +// keys and values are both of length n +SIValue Map_FromArrays +( + const SIValue *keys, // keys + const SIValue *values, // values + uint n // arrays length +); + // clones map SIValue Map_Clone ( diff --git a/src/execution_plan/ops/op_load_csv.c b/src/execution_plan/ops/op_load_csv.c index 976141aa0..bbffb5056 100644 --- a/src/execution_plan/ops/op_load_csv.c +++ b/src/execution_plan/ops/op_load_csv.c @@ -5,6 +5,7 @@ #include "RG.h" #include "op_load_csv.h" +#include "../../datatypes/map.h" #include "../../datatypes/array.h" // forward declarations @@ -35,10 +36,57 @@ static bool _compute_path return true; } -// mock data -static char *A = "AAA"; -static char *B = "BB"; -static char *C = "C"; +// initialize CSV reader +static bool _Init_CSVReader +( + OpLoadCSV *op // load CSV operation +) { + ASSERT(op != NULL); + + // free old reader + if(op->reader != NULL) { + CSVReader_Free(op->reader); + } + + // initialize a new CSV reader + op->reader = CSVReader_New(op->path.stringval, op->with_headers, ','); + op->ncols = CSVReader_ColumnCount(op->reader); + + //-------------------------------------------------------------------------- + // save headers + //-------------------------------------------------------------------------- + + if(op->with_headers) { + // free old headers + if(op->headers != NULL) { + for(int i = 0; i < op->ncols; i++) { + SIValue_Free(op->headers[i]); + } + rm_free(op->headers); + } + + char *columns[op->ncols]; + size_t lengths[op->ncols]; + if(!CSVReader_GetHeaders(op->reader, (const char**)&columns, lengths)) { + ErrorCtx_RaiseRuntimeException("failed to read headers row"); + return false; + } + + // save headers + op->headers = rm_malloc(sizeof(SIValue) * op->ncols); + for(int i = 0; i < op->ncols; i++) { + size_t l = lengths[i]; + char *s = rm_malloc(l+1); + memcpy(s, columns[i], l); + s[l] = '\0'; // nullify + + // add value to row array + op->headers[i] = SI_TransferStringVal(s); + } + } + + return true; +} // get a single CSV row static bool _CSV_GetRow @@ -49,17 +97,50 @@ static bool _CSV_GetRow ASSERT(op != NULL); ASSERT(row != NULL); - if(!op->mock) return false; + char *values[op->ncols]; + size_t lengths[op->ncols]; 
- SIValue _row = SIArray_New(3); + // try to get a new row from CSV + if(!CSVReader_GetRow(op->reader, (const char**)&values, lengths)) { + // reached the end of the file + return false; + } + + //-------------------------------------------------------------------------- + // copy values + //-------------------------------------------------------------------------- - SIArray_Append(&_row, SI_ConstStringVal(A)); - SIArray_Append(&_row, SI_ConstStringVal(B)); - SIArray_Append(&_row, SI_ConstStringVal(C)); + if(op->with_headers) { + SIValue _row[op->ncols]; - *row = _row; + for(int i = 0; i < op->ncols; i++) { + size_t l = lengths[i]; + char *s = rm_malloc(l+1); + memcpy(s, values[i], l); + s[l] = '\0'; // nullify + + // add value to row array + _row[i] = SI_TransferStringVal(s); + } + + *row = Map_FromArrays(op->headers, _row, op->ncols); + } else { + SIValue *_row = array_new(SIValue, op->ncols); + + for(int i = 0; i < op->ncols; i++) { + size_t l = lengths[i]; + char *s = rm_malloc(l+1); + memcpy(s, values[i], l); + s[l] = '\0'; // nullify + + // add value to row array + array_append(_row, SI_TransferStringVal(s)); + } + + *row = SIArray_FromRaw(&_row); + ASSERT(_row == NULL); + } - op->mock = false; return true; } @@ -79,7 +160,6 @@ OpBase *NewLoadCSVOp op->exp = exp; op->path = SI_NullVal(); - op->mock = true; op->alias = strdup(alias); op->with_headers = with_headers; @@ -97,7 +177,7 @@ static OpResult LoadCSVInit ( OpBase *opBase ) { - // set operation consume function + // set operation's consume function OpLoadCSV *op = (OpLoadCSV*)opBase; // update consume function in case operation has a child @@ -108,12 +188,22 @@ static OpResult LoadCSVInit return OP_OK; } + //-------------------------------------------------------------------------- // no child operation evaluate path expression + //-------------------------------------------------------------------------- + + // try to evaluate expression Record r = OpBase_CreateRecord(opBase); if(!_compute_path(op, r)) { // failed to evaluate CSV path // update consume function - OpBase_DeleteRecord(r); + OpBase_UpdateConsume(opBase, LoadCSVConsumeDepleted); + } + OpBase_DeleteRecord(r); + + if(!_Init_CSVReader(op)) { + // failed to init CSV + // update consume function OpBase_UpdateConsume(opBase, LoadCSVConsumeDepleted); } @@ -154,10 +244,14 @@ static Record LoadCSVConsumeFromChild return NULL; } - op->mock = true; + // create a new CSV reader + if(!_Init_CSVReader(op)) { + return NULL; + } } // must have a record at this point + ASSERT(op->reader != NULL); ASSERT(op->child_record != NULL); // get a new CSV row @@ -229,6 +323,11 @@ static OpResult LoadCSVReset ( op->child_record = NULL; } + if(op->reader != NULL) { + CSVReader_Free(op->reader); + op->reader = NULL; + } + return OP_OK; } @@ -257,5 +356,18 @@ static void LoadCSVFree OpBase_DeleteRecord(op->child_record); op->child_record = NULL; } + + if(op->reader != NULL) { + CSVReader_Free(op->reader); + op->reader = NULL; + } + + if(op->headers != NULL) { + for(int i = 0; i < op->ncols; i++) { + SIValue_Free(op->headers[i]); + } + rm_free(op->headers); + op->headers = NULL; + } } diff --git a/src/execution_plan/ops/op_load_csv.h b/src/execution_plan/ops/op_load_csv.h index 9a3fb218c..37a24c4b7 100644 --- a/src/execution_plan/ops/op_load_csv.h +++ b/src/execution_plan/ops/op_load_csv.h @@ -7,16 +7,19 @@ #include "op.h" #include "../execution_plan.h" +#include "../../csv_reader/csv_reader.h" // load CSV typedef struct { OpBase op; // op base must be the first field in this struct + CSVReader 
reader; // CSV reader AR_ExpNode *exp; // expression evaluated to CSV path SIValue path; // CSV path char *alias; // CSV row alias int recIdx; // record index to populate with CSV row - bool mock; // mock CSV row + size_t ncols; // number of columns bool with_headers; // CSV contains header row + SIValue *headers; // header columns OpBase *child; // child operation Record child_record; // child record } OpLoadCSV; diff --git a/tests/flow/test_load_csv.py b/tests/flow/test_load_csv.py index 5574943f2..af6309195 100644 --- a/tests/flow/test_load_csv.py +++ b/tests/flow/test_load_csv.py @@ -1,54 +1,176 @@ +import csv from common import * -import re GRAPH_ID = "load_csv" +EMPTY_CSV = "empty.csv" +EMPTY_CSV_RELATIVE_PATH = "./tests/flow/" + EMPTY_CSV +EMPTY_CSV_HEADER = [] +EMPTY_CSV_DATA = [] + +SHORT_CSV_WITH_HEADERS = "short_with_header.csv" +SHORT_CSV_WITH_HEADERS_RELATIVE_PATH = "./tests/flow/" + SHORT_CSV_WITH_HEADERS +SHORT_CSV_WITH_HEADERS_HEADER = [["First Name", "Last Name"]] +SHORT_CSV_WITH_HEADERS_DATA = [["Roi", "Lipman"], + ["Hila", "Lipman"], + ["Adam", "Lipman"], + ["Yoav", "Lipman"]] + +SHORT_CSV_WITHOUT_HEADERS = "short_without_header.csv" +SHORT_CSV_WITHOUT_HEADERS_RELATIVE_PATH = "./tests/flow/" + SHORT_CSV_WITHOUT_HEADERS +SHORT_CSV_WITHOUT_HEADERS_HEADER = [] +SHORT_CSV_WITHOUT_HEADERS_DATA = [["Roi", "Lipman"], + ["Hila", "Lipman"], + ["Adam", "Lipman"], + ["Yoav", "Lipman"]] + +# write a CSV file using 'name' as the file name +# 'header' [optional] as the first row +# 'data' [optional] CSV rows +def create_csv_file(name, header, data, delimiter=','): + with open(name, 'w') as f: + writer = csv.writer(f) + data = header + data + writer.writerows(data) + +# create an empty CSV file +def create_empty_csv(): + name = EMPTY_CSV + DATA = EMPTY_CSV_DATA + HEADER = EMPTY_CSV_HEADER + create_csv_file(name, HEADER, DATA) + + return name + +# create a short CSV file with a header row +def create_short_csv_with_header(): + name = SHORT_CSV_WITH_HEADERS + DATA = SHORT_CSV_WITH_HEADERS_DATA + HEADER = SHORT_CSV_WITH_HEADERS_HEADER + + create_csv_file(name, HEADER, DATA) + + return name + +# create a short CSV file without a header row +def create_short_csv_without_header(): + name = SHORT_CSV_WITHOUT_HEADERS + DATA = SHORT_CSV_WITHOUT_HEADERS_DATA + HEADER = SHORT_CSV_WITHOUT_HEADERS_HEADER + + create_csv_file(name, HEADER, DATA) + + return name + class testLoadCSV(): def __init__(self): self.env, self.db = Env() self.graph = self.db.select_graph(GRAPH_ID) + + # create CSV files + create_empty_csv() + create_short_csv_with_header() + create_short_csv_without_header() + # test invalid invocations of the LOAD CSV command def test01_invalid_call(self): queries = ["LOAD CSV FROM a AS row RETURN row", + "LOAD CSV WITH HEADERS FROM a AS row RETURN row", + "LOAD CSV FROM 2 AS row RETURN row", + "LOAD CSV WITH HEADERS FROM 2 AS row RETURN row", + "LOAD CSV FROM $arr AS row RETURN row", - "WITH 2 AS x LOAD CSV FROM x AS row RETURN row"] + "LOAD CSV WITH HEADERS FROM $arr AS row RETURN row", + + "WITH 2 AS x LOAD CSV FROM x AS row RETURN row", + "WITH 2 AS x LOAD CSV WITH HEADERS FROM x AS row RETURN row" + ] for q in queries: try: self.graph.query(q, {'arr': []}) self.env.assertFalse(True) except Exception as e: - self.env.assertEquals(str(e), "path to CSV must be a string") + continue def test02_project_csv_rows(self): + g = self.graph + # project all rows in a CSV file - q = """LOAD CSV FROM 'data.csv' AS row + q = """LOAD CSV FROM $file AS row RETURN row""" - result = 
self.graph.query(q).result_set - self.env.assertEquals(result[0][0], ['AAA', 'BB', 'C']) + datasets = [(EMPTY_CSV_RELATIVE_PATH, []), + (SHORT_CSV_WITH_HEADERS_RELATIVE_PATH, [*SHORT_CSV_WITH_HEADERS_HEADER, *SHORT_CSV_WITH_HEADERS_DATA]), + (SHORT_CSV_WITHOUT_HEADERS_RELATIVE_PATH, [*SHORT_CSV_WITHOUT_HEADERS_HEADER, *SHORT_CSV_WITHOUT_HEADERS_DATA])] - def test03_load_csv_multiple_times(self): - # project the same CSV multiple times - q = """UNWIND [1,2,3] AS x - LOAD CSV FROM 'data.csv' AS row - RETURN x, row - ORDER BY x""" + for dataset in datasets: + # project all rows from CSV file + file_name = dataset[0] + expected = dataset[1] - result = self.graph.query(q).result_set - self.env.assertEquals(result[0], [1, ['AAA', 'BB', 'C']]) - self.env.assertEquals(result[1], [2, ['AAA', 'BB', 'C']]) - self.env.assertEquals(result[2], [3, ['AAA', 'BB', 'C']]) + result = g.query(q, {'file': file_name}).result_set + for i, row in enumerate(result): + # validate result + self.env.assertEquals(row[0], expected[i]) + + def test03_project_csv_as_map(self): + g = self.graph - def test04_dynamic_csv_path(self): # project all rows in a CSV file - q = """UNWIND ['a', 'b'] AS x - LOAD CSV FROM x + '.csv' AS row + q = """LOAD CSV WITH HEADERS FROM $file AS row + RETURN row""" + + datasets = [(EMPTY_CSV_RELATIVE_PATH, EMPTY_CSV_HEADER, EMPTY_CSV_DATA), + (SHORT_CSV_WITHOUT_HEADERS_RELATIVE_PATH, SHORT_CSV_WITHOUT_HEADERS_DATA[0], SHORT_CSV_WITHOUT_HEADERS_DATA[1:]), + (SHORT_CSV_WITH_HEADERS_RELATIVE_PATH, SHORT_CSV_WITH_HEADERS_HEADER[0], SHORT_CSV_WITH_HEADERS_DATA)] + + for dataset in datasets: + file = dataset[0] + columns = dataset[1] + data = dataset[2] + + expected = [] + for row in data: + obj = {} + for idx, column in enumerate(columns): + obj[column] = row[idx] + expected.append(obj) + + result = g.query(q, {'file': file}).result_set + self.env.assertEquals(result, expected) + + def _test04_load_csv_multiple_times(self): + # project the same CSV multiple times + q = """UNWIND range(0, 3) AS x + LOAD CSV FROM $file AS row RETURN x, row ORDER BY x""" - result = self.graph.query(q).result_set - self.env.assertEquals(result[0], ['a', ['AAA', 'BB', 'C']]) - self.env.assertEquals(result[1], ['b', ['AAA', 'BB', 'C']]) + result = self.graph.query(q, {'file': SHORT_CSV_WITHOUT_HEADERS}).result_set + + expected = [] + for i in range(3): + for row in SHORT_CSV_WITHOUT_HEADERS_DATA: + expected.append([i, row]) + + self.env.assertEquals(result, expected) + + def _test05_load_multiple_files(self): + g = self.graph + + # project multiple CSV files + q = """LOAD CSV FROM $file_1 AS row + WITH collect(row) as file_1_rows + LOAD CSV FROM $file_2 AS row + RETURN file_1_rows, collect(row) as file_2_rows + """ + result = g.query(q, {'file_1': SHORT_CSV_WITHOUT_HEADERS, + 'file_2': SHORT_CSV_WITH_HEADERS}).result_set + + file_1_rows = SHORT_CSV_WITHOUT_HEADERS_DATA + file_2_rows = [SHORT_CSV_WITH_HEADERS_HEADER + SHORT_CSV_WITH_HEADERS_DATA] + self.env.assertEquals(result[0], file_1_rows) + self.env.assertEquals(result[1], file_2_rows) From 90f0dcb588c79ac24f576bec599fcfea119e105d Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Sun, 7 Apr 2024 13:47:31 +0300 Subject: [PATCH 6/6] handle malformed csv files --- src/csv_reader/csv_reader.h | 7 ++- src/errors/error_msgs.h | 5 ++- src/execution_plan/ops/op_load_csv.c | 25 ++++++++--- tests/flow/test_load_csv.py | 67 +++++++++++++++++++++------- 4 files changed, 81 insertions(+), 23 deletions(-) diff --git a/src/csv_reader/csv_reader.h b/src/csv_reader/csv_reader.h index 
1796cad26..a902116e3 100644 --- a/src/csv_reader/csv_reader.h +++ b/src/csv_reader/csv_reader.h @@ -36,8 +36,11 @@ bool CSVReader_GetHeaders // extract the current row // length of 'values' and 'lengths' arrays must be the same -// returns true on success false indicates either an error or EOF -bool CSVReader_GetRow +// returns: +// 0 success +// -1 failed to get row +// -2 end of file +int CSVReader_GetRow ( const CSVReader reader, // CSV reader const char **values, // row values diff --git a/src/errors/error_msgs.h b/src/errors/error_msgs.h index ff47ca279..5c241fed2 100644 --- a/src/errors/error_msgs.h +++ b/src/errors/error_msgs.h @@ -125,4 +125,7 @@ #define EMSG_INDEX_CANT_RECONFIG "Can not override index configuration" #define EMSG_REMOVE_INVALID_INPUT "REMOVE operates on either a node, relationship or a map" #define EMSG_VECTOR_DIMENSION_MISMATCH "Vector dimension mismatch, expected %d but got %d" - +#define EMSG_INVALID_CSV_PATH "path to CSV must be a string" +#define EMSG_FAILED_TO_LOAD_CSV "failed to open CSV file: %s" +#define EMSG_FAILED_TO_READ_CSV_ROW "Failed to get next row" +#define EMSG_FAILED_TO_READ_CSV_HEADERS "failed to read headers row" diff --git a/src/execution_plan/ops/op_load_csv.c b/src/execution_plan/ops/op_load_csv.c index bbffb5056..3007e9384 100644 --- a/src/execution_plan/ops/op_load_csv.c +++ b/src/execution_plan/ops/op_load_csv.c @@ -29,7 +29,7 @@ static bool _compute_path op->path = AR_EXP_Evaluate(op->exp, r); if(SI_TYPE(op->path) != T_STRING) { - ErrorCtx_RaiseRuntimeException("path to CSV must be a string"); + ErrorCtx_RaiseRuntimeException(EMSG_INVALID_CSV_PATH); return false; } @@ -49,8 +49,16 @@ static bool _Init_CSVReader } // initialize a new CSV reader - op->reader = CSVReader_New(op->path.stringval, op->with_headers, ','); - op->ncols = CSVReader_ColumnCount(op->reader); + const char *path = op->path.stringval; + op->reader = CSVReader_New(path, op->with_headers, ','); + + // raise exception if we've failed to initialize a new CSV reader + if(op->reader == NULL) { + ErrorCtx_RaiseRuntimeException(EMSG_FAILED_TO_LOAD_CSV, path); + return false; + } + + op->ncols = CSVReader_ColumnCount(op->reader); //-------------------------------------------------------------------------- // save headers @@ -68,7 +76,7 @@ static bool _Init_CSVReader char *columns[op->ncols]; size_t lengths[op->ncols]; if(!CSVReader_GetHeaders(op->reader, (const char**)&columns, lengths)) { - ErrorCtx_RaiseRuntimeException("failed to read headers row"); + ErrorCtx_RaiseRuntimeException(EMSG_FAILED_TO_READ_CSV_HEADERS); return false; } @@ -101,11 +109,18 @@ static bool _CSV_GetRow size_t lengths[op->ncols]; // try to get a new row from CSV - if(!CSVReader_GetRow(op->reader, (const char**)&values, lengths)) { + int res = CSVReader_GetRow(op->reader, (const char**)&values, lengths); + if(res == -1) { + ErrorCtx_RaiseRuntimeException(EMSG_FAILED_TO_READ_CSV_ROW); + return false; + } else if (res == -2) { // reached the end of the file return false; } + // 0 indicates success + ASSERT(res == 0); + //-------------------------------------------------------------------------- // copy values //-------------------------------------------------------------------------- diff --git a/tests/flow/test_load_csv.py b/tests/flow/test_load_csv.py index af6309195..4249c1577 100644 --- a/tests/flow/test_load_csv.py +++ b/tests/flow/test_load_csv.py @@ -1,5 +1,6 @@ import csv from common import * +from collections import OrderedDict GRAPH_ID = "load_csv" @@ -24,6 +25,14 @@ ["Adam", "Lipman"], 
["Yoav", "Lipman"]] +MALFORMED_CSV = "malformed.csv" +MALFORMED_CSV_RELATIVE_PATH = "./tests/flow/" + MALFORMED_CSV +MALFORMED_CSV_HEADER = [["FirstName", "LastName"]] +MALFORMED_CSV_DATA = [["Roi", "Lipman"], + ["Hila", "Lipman"], + ["Adam", "Lipman"], + ["Yoav", "Lipman", "Extra"]] + # write a CSV file using 'name' as the file name # 'header' [optional] as the first row # 'data' [optional] CSV rows @@ -62,6 +71,16 @@ def create_short_csv_without_header(): return name +# create a malformed CSV file +def create_malformed_csv(): + name = MALFORMED_CSV + DATA = MALFORMED_CSV_DATA + HEADER = MALFORMED_CSV_HEADER + + create_csv_file(name, HEADER, DATA) + + return name + class testLoadCSV(): def __init__(self): self.env, self.db = Env() @@ -69,6 +88,7 @@ def __init__(self): # create CSV files create_empty_csv() + create_malformed_csv() create_short_csv_with_header() create_short_csv_without_header() @@ -94,7 +114,25 @@ def test01_invalid_call(self): except Exception as e: continue - def test02_project_csv_rows(self): + def test02_none_existing_csv_file(self): + q = "LOAD CSV FROM 'none_existing.csv' AS row RETURN row" + try: + self.graph.query(q) + self.env.assertFalse(True) + except Exception as e: + # failed to open CSV file: a + pass + + def test03_malformed_csv(self): + q = "LOAD CSV FROM $file AS row RETURN row" + try: + self.graph.query(q, {'file': MALFORMED_CSV_RELATIVE_PATH}) + self.env.assertFalse(True) + except Exception as e: + # failed to process malformed csv + pass + + def test04_project_csv_rows(self): g = self.graph # project all rows in a CSV file @@ -115,7 +153,7 @@ def test02_project_csv_rows(self): # validate result self.env.assertEquals(row[0], expected[i]) - def test03_project_csv_as_map(self): + def test05_project_csv_as_map(self): g = self.graph # project all rows in a CSV file @@ -133,31 +171,30 @@ def test03_project_csv_as_map(self): expected = [] for row in data: - obj = {} + obj = OrderedDict() for idx, column in enumerate(columns): obj[column] = row[idx] - expected.append(obj) + expected.append([obj]) result = g.query(q, {'file': file}).result_set self.env.assertEquals(result, expected) - def _test04_load_csv_multiple_times(self): + def test06_load_csv_multiple_times(self): # project the same CSV multiple times q = """UNWIND range(0, 3) AS x LOAD CSV FROM $file AS row - RETURN x, row - ORDER BY x""" + RETURN x, row""" - result = self.graph.query(q, {'file': SHORT_CSV_WITHOUT_HEADERS}).result_set + result = self.graph.query(q, {'file': SHORT_CSV_WITHOUT_HEADERS_RELATIVE_PATH}).result_set expected = [] - for i in range(3): + for i in range(4): for row in SHORT_CSV_WITHOUT_HEADERS_DATA: expected.append([i, row]) self.env.assertEquals(result, expected) - def _test05_load_multiple_files(self): + def test07_load_multiple_files(self): g = self.graph # project multiple CSV files @@ -166,11 +203,11 @@ def _test05_load_multiple_files(self): LOAD CSV FROM $file_2 AS row RETURN file_1_rows, collect(row) as file_2_rows """ - result = g.query(q, {'file_1': SHORT_CSV_WITHOUT_HEADERS, - 'file_2': SHORT_CSV_WITH_HEADERS}).result_set + result = g.query(q, {'file_1': SHORT_CSV_WITHOUT_HEADERS_RELATIVE_PATH, + 'file_2': SHORT_CSV_WITH_HEADERS_RELATIVE_PATH}).result_set file_1_rows = SHORT_CSV_WITHOUT_HEADERS_DATA - file_2_rows = [SHORT_CSV_WITH_HEADERS_HEADER + SHORT_CSV_WITH_HEADERS_DATA] + file_2_rows = SHORT_CSV_WITH_HEADERS_HEADER + SHORT_CSV_WITH_HEADERS_DATA - self.env.assertEquals(result[0], file_1_rows) - self.env.assertEquals(result[1], file_2_rows) + 
self.env.assertEquals(result[0][0], file_1_rows)
+        self.env.assertEquals(result[0][1], file_2_rows)
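
Note: the patch series declares the CSV reader API in src/csv_reader/csv_reader.h but does not include its implementation, so the sketch below is only an illustration of the intended calling sequence, inferred from the header comments and from _Init_CSVReader/_CSV_GetRow in op_load_csv.c. The helper name print_csv is not part of the patch.

#include <stdio.h>
#include <stdbool.h>
#include <stddef.h>
#include "csv_reader.h"

// illustration only: the CSV reader implementation is not part of this patch excerpt
// reads a CSV file with the declared reader API and prints each row
// returns 0 on success, -1 on failure
int print_csv(const char *path, bool has_headers) {
	CSVReader reader = CSVReader_New(path, has_headers, ',');
	if(reader == NULL) {
		// mirrors EMSG_FAILED_TO_LOAD_CSV in op_load_csv.c
		fprintf(stderr, "failed to open CSV file: %s\n", path);
		return -1;
	}

	size_t ncols = CSVReader_ColumnCount(reader);
	if(ncols == 0) {
		// e.g. an empty file, nothing to print
		CSVReader_Free(reader);
		return 0;
	}

	const char *values[ncols];  // cell pointers for a single row
	size_t lengths[ncols];      // matching cell lengths

	// when the first row is a header row, consume it first
	if(has_headers && !CSVReader_GetHeaders(reader, values, lengths)) {
		CSVReader_Free(reader);
		return -1;
	}

	// as of the last patch GetRow returns 0 on success,
	// -1 on a malformed row and -2 at end of file
	int res;
	while((res = CSVReader_GetRow(reader, values, lengths)) == 0) {
		for(size_t i = 0; i < ncols; i++) {
			printf("%.*s%s", (int)lengths[i], values[i],
					i + 1 < ncols ? "," : "\n");
		}
	}

	CSVReader_Free(reader);
	return (res == -2) ? 0 : -1;  // EOF is the expected way to stop
}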