Skip to content

Commit

Permalink
Agg func fixes (RedisGraph#946)
Browse files Browse the repository at this point in the history
* Fix memory leaks in Collect function

* Fix leak in aggregate func's SIValue result

* Fix leak in children of aggregate function call

* Simplify variable-length path free logic

* Improve ownership logic in Collect, add explanatory comments

* PR fix

Co-authored-by: Roi Lipman <swilly22@users.noreply.github.com>
  • Loading branch information
jeffreylovitz and swilly22 committed Feb 27, 2020
1 parent ced050d commit b5feee0
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 7 deletions.
10 changes: 9 additions & 1 deletion src/arithmetic/agg_funcs.c
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ int __agg_collectStep(AggCtx *ctx, SIValue *argv, int argc) {

SIValue value = argv[0];
if(value.type == T_NULL) return AGG_OK;
/* SIArray_Append will clone the added value, ensuring it can be
* safely accessed for the lifetime of the Collect context. */
SIArray_Append(&ac->list, value);
return AGG_OK;
}
Expand All @@ -596,7 +598,11 @@ int __agg_collectDistinctStep(AggCtx *ctx, SIValue *argv, int argc) {

int __agg_collectReduceNext(AggCtx *ctx) {
__agg_collectCtx *ac = Agg_FuncCtx(ctx);
Agg_SetResult(ctx, ac->list);
/* Share the Collect context's internal list with the caller,
* as the Collect context owns this allocation. The caller is responsible for
* persisting the value if it will be accessed after the Collect context is freed. */
SIValue result = SI_ShareValue(ac->list);
Agg_SetResult(ctx, result);
return AGG_OK;
}

Expand All @@ -611,6 +617,7 @@ void *__agg_collectCtxNew(AggCtx *ctx) {
void __agg_collectCtxFree(AggCtx *ctx) {
__agg_collectCtx *ac = Agg_FuncCtx(ctx);
if(ac->hashSet) Set_Free(ac->hashSet);
SIValue_Free(ac->list);
rm_free(ac);
}

Expand Down Expand Up @@ -638,3 +645,4 @@ void Agg_RegisterFuncs() {
Agg_RegisterFunc("stDevP", Agg_StdevPFunc);
Agg_RegisterFunc("collect", Agg_CollectFunc);
}

1 change: 1 addition & 0 deletions src/arithmetic/aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ inline void *Agg_FuncCtx(AggCtx *ctx) {
inline void Agg_SetResult(struct AggCtx *ctx, SIValue v) {
ctx->result = v;
}

1 change: 1 addition & 0 deletions src/arithmetic/arithmetic_expression.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ void AR_EXP_Aggregate(const AR_ExpNode *root, const Record r) {
/* Aggregate. */
AggCtx *agg = root->op.agg_func;
agg->Step(agg, sub_trees, root->op.child_count);
_AR_EXP_FreeResultsArray(sub_trees, root->op.child_count);
} else {
/* Keep searching for aggregation nodes. */
for(int i = 0; i < root->op.child_count; i++) {
Expand Down
10 changes: 4 additions & 6 deletions src/execution_plan/ops/op_cond_var_len_traverse.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,10 @@ static Record CondVarLenTraverseConsume(OpBase *opBase) {

if(!op->expandInto) Record_AddNode(op->r, op->destNodeIdx, n);
if(op->edgesIdx >= 0) {
if(reused_record) {
// If we're returning a new path from a previously-used Record,
// free the previous path to avoid a memory leak.
SIValue old_path = Record_GetScalar(op->r, op->edgesIdx);
SIValue_Free(old_path);
}
// If we're returning a new path from a previously-used Record,
// free the previous path to avoid a memory leak.
if(reused_record) SIValue_Free(Record_GetScalar(op->r, op->edgesIdx));
// Add new path to Record.
Record_AddScalar(op->r, op->edgesIdx, SI_Path(p));
}

Expand Down

0 comments on commit b5feee0

Please sign in to comment.