Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ref count record #663

Merged
merged 16 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/arithmetic/list_funcs/list_funcs.c
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ SIValue AR_HEAD(SIValue *argv, int argc, void *private_data) {
uint arrayLen = SIArray_Length(value);
if(arrayLen == 0) return SI_NullVal();
SIValue retval = SIArray_Get(value, 0);
SIValue_Persist(&retval);
return retval;
}

Expand All @@ -413,7 +412,6 @@ SIValue AR_LAST(SIValue *argv, int argc, void *private_data) {
uint arrayLen = SIArray_Length(value);
if(arrayLen == 0) return SI_NullVal();
SIValue retval = SIArray_Get(value, arrayLen-1);
SIValue_Persist(&retval);
return retval;
}

Expand Down
34 changes: 28 additions & 6 deletions src/execution_plan/execution_plan.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,42 @@ inline rax *ExecutionPlan_GetMappings(const ExecutionPlan *plan) {
return plan->record_map;
}

Record ExecutionPlan_BorrowRecord(ExecutionPlan *plan) {
Record ExecutionPlan_BorrowRecord
(
ExecutionPlan *plan
) {
rax *mapping = ExecutionPlan_GetMappings(plan);
ASSERT(plan->record_pool);

// Get a Record from the pool and set its owner and mapping.
// get a Record from the pool and set its owner and mapping
Record r = ObjectPool_NewItem(plan->record_pool);
r->owner = plan;
r->mapping = mapping;

r->owner = plan;
r->mapping = mapping;
r->ref_count = 1;

return r;
}

void ExecutionPlan_ReturnRecord(const ExecutionPlan *plan, Record r) {
void ExecutionPlan_ReturnRecord
(
const ExecutionPlan *plan,
Record r
) {
ASSERT(plan && r);
ObjectPool_DeleteItem(plan->record_pool, r);
ASSERT(r->ref_count > 0);

// decrease record ref count
r->ref_count--;

// free record when ref count reached 0
if(r->ref_count == 0) {
// call recursively for parent
if(r->parent != NULL) {
ExecutionPlan_ReturnRecord(r->parent->owner, r->parent);
}
ObjectPool_DeleteItem(plan->record_pool, r);
}
}

//------------------------------------------------------------------------------
Expand Down
33 changes: 23 additions & 10 deletions src/execution_plan/ops/op.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,11 @@ Record OpBase_CloneRecord
) {
Record clone = ExecutionPlan_BorrowRecord((struct ExecutionPlan *)r->owner);
Record_Clone(r, clone);
return clone;
}

Record OpBase_DeepCloneRecord
(
Record r
) {
Record clone = ExecutionPlan_BorrowRecord((struct ExecutionPlan *)r->owner);
Record_DeepClone(r, clone);
// increase r's ref count and set r as clone's parent
r->ref_count++;
clone->parent = r;

return clone;
}

Expand Down Expand Up @@ -337,9 +333,26 @@ OpBase *OpBase_GetChild

inline void OpBase_DeleteRecord
(
Record r
Record *r
) {
ExecutionPlan_ReturnRecord((*r)->owner, *r);
// nullify record
*r = NULL;
}

// merge src into dest and deletes src
void OpBase_MergeRecords
(
Record dest, // entries are merged into this record
Record *src, // entries are merged from this record
bool override // override existing entries within dest
) {
ExecutionPlan_ReturnRecord(r->owner, r);
ASSERT(dest != NULL);
ASSERT(src != NULL && *src != NULL);
ASSERT(dest != *src);

Record_Merge(dest, *src, override);
OpBase_DeleteRecord(src);
}

OpBase *OpBase_Clone
Expand Down
15 changes: 9 additions & 6 deletions src/execution_plan/ops/op.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,17 @@ Record OpBase_CloneRecord
Record r
);

// deep clones given record
Record OpBase_DeepCloneRecord
// release record
void OpBase_DeleteRecord
(
Record r
Record *r
);

// release record
void OpBase_DeleteRecord
// merge src into dest and deletes src
void OpBase_MergeRecords
(
Record r
Record dest, // entries are merged into this record
Record *src, // entries are merged from this record
bool override // override existing entries within dest
);

21 changes: 16 additions & 5 deletions src/execution_plan/ops/op_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ static void _aggregateRecord
AR_EXP_Aggregate(exp, r);
}

OpBase_DeleteRecord(r);
OpBase_DeleteRecord(&r);
swilly22 marked this conversation as resolved.
Show resolved Hide resolved
}

// returns a record populated with group data
Expand All @@ -206,7 +206,14 @@ static Record _handoff
for(uint i = 0; i < op->key_count; i++) {
int rec_idx = op->record_offsets[i];
// non-aggregated expression
SIValue key = SI_ShareValue(keys[i]);
SIValue key;
if(SI_ALLOCATION(keys+i) == M_SELF) {
key = SI_TransferOwnership(keys+i);
} else {
key = keys[i];
SIValue_Persist(&key);
}

Record_Add(r, rec_idx, key);
}

Expand All @@ -216,9 +223,14 @@ static Record _handoff
AR_ExpNode *exp = g->agg[i];

SIValue agg = AR_EXP_FinalizeAggregations(exp, r);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finalize aggregation one time value returning delete persist

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proved to be more difficult then expected, although for the most part e.g. count, max, min this change is meaningless, the only place where this can show benefit is with the collect function, which causes a redundant clone of the array.

SIValue_Persist(&agg);
Record_AddScalar(r, rec_idx, agg);
}

// free group
Group_Free(g);
HashTableSetVal(op->groups, entry, NULL);

return r;
}

Expand Down Expand Up @@ -314,7 +326,7 @@ static Record AggregateConsume
_GetGroup(op, r);

// free record
OpBase_DeleteRecord(r);
OpBase_DeleteRecord(&r);
swilly22 marked this conversation as resolved.
Show resolved Hide resolved
}

// create group iterator
Expand Down Expand Up @@ -438,7 +450,6 @@ static void AggregateFree
}

if(op->r) {
OpBase_DeleteRecord(op->r);
op->r = NULL;
OpBase_DeleteRecord(&op->r);
swilly22 marked this conversation as resolved.
Show resolved Hide resolved
}
}
7 changes: 3 additions & 4 deletions src/execution_plan/ops/op_all_node_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static Record AllNodeScanConsumeFromChild(OpBase *opBase) {
Node n = GE_NEW_NODE();
n.attributes = DataBlockIterator_Next(op->iter, &n.id);
if(n.attributes == NULL) {
OpBase_DeleteRecord(op->child_record); // Free old record.
OpBase_DeleteRecord(&op->child_record); // Free old record.
// Pull a new record from child.
op->child_record = OpBase_Consume(op->op.children[0]);
if(op->child_record == NULL) return NULL; // Child depleted.
Expand All @@ -68,7 +68,7 @@ static Record AllNodeScanConsumeFromChild(OpBase *opBase) {
}

// Clone the held Record, as it will be freed upstream.
Record r = OpBase_DeepCloneRecord(op->child_record);
Record r = OpBase_CloneRecord(op->child_record);

// Populate the Record with the graph entity data.
Record_AddNode(r, op->nodeRecIdx, n);
Expand Down Expand Up @@ -108,8 +108,7 @@ static void AllNodeScanFree(OpBase *ctx) {
}

if(op->child_record) {
OpBase_DeleteRecord(op->child_record);
op->child_record = NULL;
OpBase_DeleteRecord(&op->child_record);
}
}

17 changes: 9 additions & 8 deletions src/execution_plan/ops/op_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,24 @@ static OpResult ApplyInit(OpBase *opBase) {
return OP_OK;
}

static Record ApplyConsume(OpBase *opBase) {
static Record ApplyConsume
(
OpBase *opBase
) {
Apply *op = (Apply *)opBase;

while(true) {
if(op->r == NULL) {
// retrieve a Record from the bound branch
op->r = OpBase_Consume(op->bound_branch);
if(op->r == NULL) {
return NULL; // Bound branch and this op are depleted.
return NULL; // bound branch and this op are depleted
}

// collect record for future freeing
array_append(op->records, op->r);

// Successfully pulled a new Record, propagate to the top of the RHS branch.
// successfully pulled a new Record, propagate to the top of the RHS branch
if(op->op_arg) {
Argument_AddRecord(op->op_arg, OpBase_CloneRecord(op->r));
}
Expand All @@ -81,9 +84,7 @@ static Record ApplyConsume(OpBase *opBase) {

// clone the bound Record and merge the RHS Record into it
Record r = OpBase_CloneRecord(op->r);
Record_Merge(r, rhs_record);
// delete the RHS record, as it has been merged into r
OpBase_DeleteRecord(rhs_record);
OpBase_MergeRecords(r, &rhs_record, false);

return r;
}
Expand All @@ -98,7 +99,7 @@ static OpResult ApplyReset(OpBase *opBase) {
// free collected records
uint32_t n = array_len(op->records);
for(uint32_t i = 0; i < n; i++) {
OpBase_DeleteRecord(op->records[i]);
OpBase_DeleteRecord(op->records+i);
}
array_clear(op->records);

Expand All @@ -116,7 +117,7 @@ static void ApplyFree(OpBase *opBase) {
if(op->records != NULL) {
uint32_t n = array_len(op->records);
for(uint32_t i = 0; i < n; i++) {
OpBase_DeleteRecord(op->records[i]);
OpBase_DeleteRecord(op->records+i);
}

array_free(op->records);
Expand Down
20 changes: 8 additions & 12 deletions src/execution_plan/ops/op_apply_multiplexer.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,14 @@ static Record OrMultiplexer_Consume(OpBase *opBase) {
Record branch_record = _pullFromBranchStream(op, i);
if(branch_record) {
// Don't care about the branch record.
OpBase_DeleteRecord(branch_record);
OpBase_DeleteRecord(&branch_record);
Record r = op->r;
op->r = NULL; // Null to avoid double free.
op->r = NULL; // null to avoid double free
return r;
}
}
// Did not managed to get a record from any branch, loop back and restart.
OpBase_DeleteRecord(op->r);
op->r = NULL;
// did not managed to get a record from any branch, loop back and restart
OpBase_DeleteRecord(&op->r);
}
}

Expand All @@ -125,11 +124,10 @@ static Record AndMultiplexer_Consume(OpBase *opBase) {
for(int i = 1; i < op->op.childCount; i++) {
Record branch_record = _pullFromBranchStream(op, i);
// Don't care about the branch record.
if(branch_record) OpBase_DeleteRecord(branch_record);
if(branch_record) OpBase_DeleteRecord(&branch_record);
else {
// Did not managed to get a record from some branch, loop back and restart.
OpBase_DeleteRecord(op->r);
op->r = NULL;
OpBase_DeleteRecord(&op->r);
break;
}
}
Expand All @@ -143,8 +141,7 @@ static Record AndMultiplexer_Consume(OpBase *opBase) {
static OpResult OpApplyMultiplexerReset(OpBase *opBase) {
OpApplyMultiplexer *op = (OpApplyMultiplexer *)opBase;
if(op->r) {
OpBase_DeleteRecord(op->r);
op->r = NULL;
OpBase_DeleteRecord(&op->r);
}
return OP_OK;
}
Expand All @@ -164,8 +161,7 @@ static void OpApplyMultiplexerFree(OpBase *opBase) {
}

if(op->r) {
OpBase_DeleteRecord(op->r);
op->r = NULL;
OpBase_DeleteRecord(&op->r);
}
}

6 changes: 2 additions & 4 deletions src/execution_plan/ops/op_argument.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ static OpResult ArgumentReset(OpBase *opBase) {
Argument *arg = (Argument *)opBase;

if(arg->r) {
OpBase_DeleteRecord(arg->r);
arg->r = NULL;
OpBase_DeleteRecord(&arg->r);
}

return OP_OK;
Expand All @@ -64,8 +63,7 @@ static inline OpBase *ArgumentClone(const ExecutionPlan *plan, const OpBase *opB
static void ArgumentFree(OpBase *opBase) {
Argument *arg = (Argument *)opBase;
if(arg->r) {
OpBase_DeleteRecord(arg->r);
arg->r = NULL;
OpBase_DeleteRecord(&arg->r);
}
}

4 changes: 2 additions & 2 deletions src/execution_plan/ops/op_argument_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static OpResult ArgumentListReset
// free remaining records
if(op->records != NULL) {
for(uint i = 0; i < op->rec_len; i++) {
OpBase_DeleteRecord(op->records[i]);
OpBase_DeleteRecord(op->records+i);
}

array_free(op->records);
Expand Down Expand Up @@ -94,7 +94,7 @@ static void ArgumentListFree
// free remaining records
if(op->records != NULL) {
for(uint i = 0; i < op->rec_len; i++) {
OpBase_DeleteRecord(op->records[i]);
OpBase_DeleteRecord(op->records+i);
}

array_free(op->records);
Expand Down
Loading
Loading