Skip to content

Commit

Permalink
[WIP] load csv op
Browse files Browse the repository at this point in the history
  • Loading branch information
swilly22 committed Mar 17, 2024
1 parent 92db8e4 commit 38ecd52
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 68 deletions.
155 changes: 89 additions & 66 deletions src/execution_plan/ops/op_load_csv.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,78 @@
// forward declarations
static OpResult LoadCSVInit(OpBase *opBase);
static Record LoadCSVConsume(OpBase *opBase);
static Record LoadCSVConsumeDepleted(OpBase *opBase);
static Record LoadCSVConsumeFromChild(OpBase *opBase);
static OpBase *LoadCSVClone(const ExecutionPlan *plan, const OpBase *opBase);
static OpResult LoadCSVReset(OpBase *opBase);
static void LoadCSVFree(OpBase *opBase);

// evaluate path expression
// expression must evaluate to string representing a valid URI
// if that's not the case an exception is raised
static bool _compute_path
(
OpLoadCSV *op,
Record r
) {
ASSERT(op != NULL);

op->path = AR_EXP_Evaluate(op->exp, r);
if(SI_TYPE(op->path) != T_STRING) {
ErrorCtx_RaiseRuntimeException("path to CSV must be a string");
return false;
}

return true;
}

// mock data
static char *A = "AAA";
static char *B = "BB";
static char *C = "C";

// get a single CSV row
static bool _CSV_GetRow
(
OpLoadCSV *op, // load CSV operation
SIValue *row // row to populate
) {
ASSERT(op != NULL);
ASSERT(row != NULL);

if(!op->mock) return false;

SIValue _row = SIArray_New(3);

SIArray_Append(&_row, SI_ConstStringVal(A));
SIArray_Append(&_row, SI_ConstStringVal(B));
SIArray_Append(&_row, SI_ConstStringVal(C));

*row = _row;

op->mock = false;
return true;
}

// create a new load CSV operation
OpBase *NewLoadCSVOp
(
const ExecutionPlan *plan, // execution plan
AR_ExpNode *exp, // CSV URI path expression
const char *alias // CSV row alias
const char *alias, // CSV row alias
bool with_headers // CSV contains header row
) {
ASSERT(exp != NULL);
ASSERT(plan != NULL);
ASSERT(alias != NULL);

OpLoadCSV *op = rm_calloc(1, sizeof(OpLoadCSV));

op->exp = exp;
op->alias = strdup(alias);
op->exp = exp;
op->path = SI_NullVal();
op->mock = true;
op->alias = strdup(alias);
op->with_headers = with_headers;

// Set our Op operations
OpBase_Init((OpBase *)op, OPType_LOAD_CSV, "Load CSV", LoadCSVInit,
Expand All @@ -40,61 +92,40 @@ OpBase *NewLoadCSVOp
return (OpBase *)op;
}

// initialize operation
static OpResult LoadCSVInit
(
OpBase *opBase
) {
// set operation consume function

OpLoadCSV *op = (OpLoadCSV*)opBase;
// update consume function in case operation has a child
if(OpBase_ChildCount(opBase) > 0) {
OpLoadCSV *op = (OpLoadCSV*)opBase;
op->child = OpBase_GetChild(opBase, 0);
OpBase_UpdateConsume(opBase, LoadCSVConsumeFromChild);
}

return OP_OK;
}

// evaluate path expression
// expression must evaluate to string representing a valid URI
// if that's not the case an exception is raised
static bool _compute_path
(
OpLoadCSV *op
) {
ASSERT(op != NULL);

SIValue v = AR_EXP_Evaluate(op->exp, NULL);
if(SI_TYPE(v) != T_STRING) {
ErrorCtx_RaiseRuntimeException("path to CSV must be a string");
return false;
return OP_OK;
}

op->path = v.stringval;
// no child operation evaluate path expression
Record r = OpBase_CreateRecord(opBase);
if(!_compute_path(op, r)) {
// failed to evaluate CSV path
// update consume function
OpBase_DeleteRecord(r);
OpBase_UpdateConsume(opBase, LoadCSVConsumeDepleted);
}

return true;
return OP_OK;
}

static const char *A = "AAA";
static const char *B = "BB";
static const char *C = "C";

static bool _CSV_GetRow
// simply return NULL indicating operation depleted
static Record LoadCSVConsumeDepleted
(
OpLoadCSV *op,
SIValue *row
OpBase *opBase
) {
ASSERT(op != NULL);
ASSERT(row != NULL);

SIValue _row = SIArray_New(3);

SIArray_Append(&_row, SI_ConstStringVal(A));
SIArray_Append(&_row, SI_ConstStringVal(B));
SIArray_Append(&_row, SI_ConstStringVal(C));

*row = _row;

return true;
return NULL;
}

// load CSV consume function in case this operation in not a tap
Expand All @@ -108,12 +139,6 @@ static Record LoadCSVConsumeFromChild

pull_from_child:

// first call, evaluate CSV path
if(op->path == NULL && !_compute_path(op)) {
// failed to evaluate CSV path, quickly return
return NULL;
}

// in case a record is missing ask child to provide one
if(op->child_record == NULL) {
op->child_record = OpBase_Consume(op->child);
Expand All @@ -122,6 +147,14 @@ static Record LoadCSVConsumeFromChild
if(op->child_record == NULL) {
return NULL;
}

// first call, evaluate CSV path
if(!_compute_path(op, op->child_record)) {
// failed to evaluate CSV path, quickly return
return NULL;
}

op->mock = true;
}

// must have a record at this point
Expand All @@ -136,8 +169,8 @@ static Record LoadCSVConsumeFromChild
op->child_record = NULL;

// free CSV path, just in case it relies on record data
rm_free(op->path);
op->path = NULL;
SIValue_Free(op->path);
op->path = SI_NullVal();

// try to get a new record from child
goto pull_from_child;
Expand All @@ -160,12 +193,6 @@ static Record LoadCSVConsume

OpLoadCSV *op = (OpLoadCSV*)opBase;

// first call, evaluate CSV path
if(op->path == NULL && !_compute_path(op)) {
// failed to evaluate CSV path, quickly return
return NULL;
}

SIValue row;
Record r = NULL;

Expand All @@ -185,18 +212,17 @@ static inline OpBase *LoadCSVClone
ASSERT(opBase->type == OPType_LOAD_CSV);

OpLoadCSV *op = (OpLoadCSV*)opBase;
return NewLoadCSVOp(plan, AR_EXP_Clone(op->exp), op->alias);
return NewLoadCSVOp(plan, AR_EXP_Clone(op->exp), op->alias,
op->with_headers);
}

static OpResult LoadCSVReset (
OpBase *opBase
) {
OpLoadCSV *op = (OpLoadCSV*)opBase;

if(op->path != NULL) {
rm_free(op->path);
op->path = NULL;
}
SIValue_Free(op->path);
op->path = SI_NullVal();

if(op->child_record != NULL) {
OpBase_DeleteRecord(op->child_record);
Expand All @@ -215,16 +241,13 @@ static void LoadCSVFree

OpLoadCSV *op = (OpLoadCSV*)opBase;

SIValue_Free(op->path);

if(op->exp != NULL) {
AR_EXP_Free(op->exp);
op->exp = NULL;
}

if(op->path != NULL) {
rm_free(op->path);
op->path = NULL;
}

if(op->alias != NULL) {
rm_free(op->alias);
op->alias = NULL;
Expand Down
6 changes: 4 additions & 2 deletions src/execution_plan/ops/op_load_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
typedef struct {
OpBase op; // op base must be the first field in this struct
AR_ExpNode *exp; // expression evaluated to CSV path
char *path; // CSV path
SIValue path; // CSV path
char *alias; // CSV row alias
int recIdx; // record index to populate with CSV row
bool mock; // mock CSV row
bool with_headers; // CSV contains header row
OpBase *child; // child operation
Record child_record; // child record
} OpLoadCSV;
Expand All @@ -24,7 +26,7 @@ OpBase *NewLoadCSVOp
(
const ExecutionPlan *plan, // execution plan
AR_ExpNode *exp, // CSV URI path expression
const char *alias // CSV row alias
const char *alias, // CSV row alias
bool with_headers // CSV contains header row
);

0 comments on commit 38ecd52

Please sign in to comment.