Skip to content

Commit

Permalink
Simplify OpAggregate logic, remove unnecessary struct members (RedisGraph#947)
Browse files Browse the repository at this point in the history

* Simplify OpAggregate logic, remove unnecessary struct members

* Update op_aggregate.c

* Update op_aggregate.c

* Update op_aggregate.c

* Update op_aggregate.c

Co-authored-by: Roi Lipman <swilly22@users.noreply.github.com>
  • Loading branch information
jeffreylovitz and swilly22 committed Feb 23, 2020
1 parent 0cfe1bb commit f31a74c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 133 deletions.
192 changes: 80 additions & 112 deletions src/execution_plan/ops/op_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,35 @@
#include "../../arithmetic/aggregate.h"

/* Forward declarations. */
static OpResult AggregateInit(OpBase *opBase);
static Record AggregateConsume(OpBase *opBase);
static OpResult AggregateReset(OpBase *opBase);
static void AggregateFree(OpBase *opBase);

/* Initialize expression_classification, which denotes whether each
* expression in the RETURN or ORDER segment is an aggregate function.
* In addition keeps track of non-aggregated expressions in
* a separate array.*/
static void _classify_expressions(OpAggregate *op) {
op->non_aggregated_expressions = array_new(AR_ExpNode *, 0);
/* Migrate each expression projected by this operation to either
* the array of keys or the array of aggregate functions as appropriate. */
static void _migrate_expressions(OpAggregate *op, AR_ExpNode **exps) {
uint exp_count = array_len(exps);
op->key_exps = array_new(AR_ExpNode *, 0);
op->aggregate_exps = array_new(AR_ExpNode *, 1);

op->expression_classification = rm_malloc(sizeof(ExpClassification) * op->exp_count);

for(uint i = 0; i < op->exp_count; i++) {
AR_ExpNode *exp = op->exps[i];
for(uint i = 0; i < exp_count; i++) {
AR_ExpNode *exp = exps[i];
if(!AR_EXP_ContainsAggregation(exp)) {
op->expression_classification[i] = NON_AGGREGATED;
op->non_aggregated_expressions = array_append(op->non_aggregated_expressions, exp);
op->key_exps = array_append(op->key_exps, exp);
} else {
op->expression_classification[i] = AGGREGATED;
op->aggregate_exps = array_append(op->aggregate_exps, exp);
}
}

op->aggregate_count = array_len(op->aggregate_exps);
op->key_count = array_len(op->key_exps);
}

static AR_ExpNode **_build_aggregated_expressions(OpAggregate *op) {
AR_ExpNode **agg_exps = array_new(AR_ExpNode *, 1);
static AR_ExpNode **_build_aggregate_exps(OpAggregate *op) {
AR_ExpNode **agg_exps = array_new(AR_ExpNode *, op->aggregate_count);

for(uint i = 0; i < op->exp_count; i++) {
if(op->expression_classification[i] == NON_AGGREGATED) continue;
AR_ExpNode *exp = AR_EXP_Clone(op->exps[i]);
for(uint i = 0; i < op->aggregate_count; i++) {
AR_ExpNode *exp = AR_EXP_Clone(op->aggregate_exps[i]);
agg_exps = array_append(agg_exps, exp);
}

Expand All @@ -53,46 +51,42 @@ static AR_ExpNode **_build_aggregated_expressions(OpAggregate *op) {
static Group *_CreateGroup(OpAggregate *op, Record r) {
/* Create a new group
* Get a fresh copy of aggregation functions. */
AR_ExpNode **agg_exps = _build_aggregated_expressions(op);
AR_ExpNode **agg_exps = _build_aggregate_exps(op);

/* Clone group keys. */
uint key_count = array_len(op->non_aggregated_expressions);
SIValue *group_keys = rm_malloc(sizeof(SIValue) * key_count);
SIValue *group_keys = rm_malloc(sizeof(SIValue) * op->key_count);

for(uint i = 0; i < key_count; i++) {
for(uint i = 0; i < op->key_count; i++) {
SIValue key = op->group_keys[i];
SIValue_Persist(&key);
group_keys[i] = key;
}

/* There's no need to keep a reference to record if we're not sorting groups. */
Record cache_record = (op->should_cache_records) ? r : NULL;
op->group = NewGroup(key_count, group_keys, agg_exps, cache_record);
op->group = NewGroup(op->key_count, group_keys, agg_exps, cache_record);

return op->group;
}

static void _ComputeGroupKey(OpAggregate *op, Record r) {
uint exp_count = array_len(op->non_aggregated_expressions);

for(uint i = 0; i < exp_count; i++) {
AR_ExpNode *exp = op->non_aggregated_expressions[i];
for(uint i = 0; i < op->key_count; i++) {
AR_ExpNode *exp = op->key_exps[i];
op->group_keys[i] = AR_EXP_Evaluate(exp, r);
}
}

static void _ComputeGroupKeyStr(OpAggregate *op, char **key) {
uint non_agg_exp_count = array_len(op->non_aggregated_expressions);
if(non_agg_exp_count == 0) {
if(op->key_count == 0) {
*key = rm_strdup("SINGLE_GROUP");
return;
}

// Determine required size for group key string representation.
size_t key_len = SIValue_StringJoinLen(op->group_keys, non_agg_exp_count, ",");
size_t key_len = SIValue_StringJoinLen(op->group_keys, op->key_count, ",");
*key = rm_malloc(sizeof(char) * key_len);
size_t bytesWritten = 0;
SIValue_StringJoin(op->group_keys, non_agg_exp_count, ",", key, &key_len, &bytesWritten);
SIValue_StringJoin(op->group_keys, op->key_count, ",", key, &key_len, &bytesWritten);
}

/* Retrieves the group to which the given record belongs,
Expand All @@ -107,21 +101,14 @@ static Group *_GetGroup(OpAggregate *op, Record r) {
op->group = _CreateGroup(op, r);
Group_KeyStr(op->group, &group_key_str);
CacheGroupAdd(op->groups, group_key_str, op->group);
rm_free(group_key_str);
return op->group;
goto cleanup;
}

// Evaluate non-aggregated fields, see if they match
// the last accessed group.
bool reuseLastAccessedGroup = true;
uint exp_count = array_len(op->non_aggregated_expressions);
for(uint i = 0; i < exp_count; i++) {
if(reuseLastAccessedGroup &&
SIValue_Compare(op->group->keys[i], op->group_keys[i], NULL) == 0) {
reuseLastAccessedGroup = true;
} else {
reuseLastAccessedGroup = false;
}
for(uint i = 0; reuseLastAccessedGroup && i < op->key_count; i++) {
reuseLastAccessedGroup = (SIValue_Compare(op->group->keys[i], op->group_keys[i], NULL) == 0);
}

// See if we can reuse last accessed group.
Expand All @@ -130,14 +117,12 @@ static Group *_GetGroup(OpAggregate *op, Record r) {
// Can't reuse last accessed group, lookup group by identifier key.
_ComputeGroupKeyStr(op, &group_key_str);
op->group = CacheGroupGet(op->groups, group_key_str);
if(op->group) {
rm_free(group_key_str);
return op->group;
if(!op->group) {
// Group does not exist, create it.
op->group = _CreateGroup(op, r);
CacheGroupAdd(op->groups, group_key_str, op->group);
}

// Group does not exist, create it.
op->group = _CreateGroup(op, r);
CacheGroupAdd(op->groups, group_key_str, op->group);
cleanup:
rm_free(group_key_str);
return op->group;
}
Expand All @@ -148,14 +133,12 @@ static void _aggregateRecord(OpAggregate *op, Record r) {
assert(group);

// Aggregate group exps.
uint aggFuncCount = array_len(group->aggregationFunctions);
for(uint i = 0; i < aggFuncCount; i++) {
for(uint i = 0; i < op->aggregate_count; i++) {
AR_ExpNode *exp = group->aggregationFunctions[i];
AR_EXP_Aggregate(exp, r);
}

/* Free record, in case it is not the group representative.
 * The group representative will be freed once the group is freed. */

// Free record.
OpBase_DeleteRecord(r);
}

Expand All @@ -166,75 +149,64 @@ static Record _handoff(OpAggregate *op) {
if(!CacheGroupIterNext(op->group_iter, &key, &group)) return NULL;

Record r = OpBase_CreateRecord((OpBase *)op);
/* In this function, we're only evaluating aggregate function calls, which do not
* currently raise exceptions. As such, we don't need to register volatile Records,
* though this may not be true in the future. */
// OpBase_AddVolatileRecord((OpBase *)op, r);

// Populate record.
uint aggIdx = 0; // Index into group aggregated exps.
uint keyIdx = 0; // Index into group keys.
SIValue res;

for(uint i = 0; i < op->exp_count; i++) {
// Add all projected keys to the Record.
for(uint i = 0; i < op->key_count; i++) {
int rec_idx = op->record_offsets[i];
if(op->expression_classification[i] == AGGREGATED) {
// Aggregated expression, get aggregated value.
AR_ExpNode *exp = group->aggregationFunctions[aggIdx++];
AR_EXP_Reduce(exp);
res = AR_EXP_Evaluate(exp, NULL);
Record_AddScalar(r, rec_idx, res);
} else {
// Non-aggregated expression.
res = group->keys[keyIdx++];
// Key values are shared with the Record, as they'll be freed with the group cache.
res = SI_ShareValue(res);
Record_Add(r, rec_idx, res);
}
// Non-aggregated expression.
SIValue res = group->keys[i];
// Key values are shared with the Record, as they'll be freed with the group cache.
res = SI_ShareValue(res);
Record_Add(r, rec_idx, res);
}

// Compute the final value of all aggregating expressions and add to the Record.
for(uint i = 0; i < op->aggregate_count; i++) {
int rec_idx = op->record_offsets[i + op->key_count];
AR_ExpNode *exp = group->aggregationFunctions[i];
AR_EXP_Reduce(exp);
SIValue res = AR_EXP_Evaluate(exp, NULL);
Record_AddScalar(r, rec_idx, res);
}

// OpBase_RemoveVolatileRecords((OpBase *)op); // Not currently necessary, as described above.
return r;
}

OpBase *NewAggregateOp(const ExecutionPlan *plan, AR_ExpNode **exps, bool should_cache_records) {
OpAggregate *op = rm_malloc(sizeof(OpAggregate));
op->exps = exps;
op->group = NULL;
op->group_iter = NULL;
op->group_keys = NULL;
op->expression_classification = NULL;
op->non_aggregated_expressions = NULL;
op->groups = CacheGroupNew();
op->exp_count = array_len(exps);
op->record_offsets = array_new(uint, op->exp_count);
op->should_cache_records = should_cache_records;

OpBase_Init((OpBase *)op, OPType_AGGREGATE, "Aggregate", AggregateInit, AggregateConsume,
// Migrate each expression to the keys array or the aggregations array as appropriate.
_migrate_expressions(op, exps);
array_free(exps);

// Allocate memory for group keys if we have any non-aggregate expressions.
if(op->key_count) op->group_keys = rm_malloc(op->key_count * sizeof(SIValue));

OpBase_Init((OpBase *)op, OPType_AGGREGATE, "Aggregate", NULL, AggregateConsume,
AggregateReset, NULL, NULL, AggregateFree, false, plan);

for(uint i = 0; i < op->exp_count; i ++) {
// The projected record will associate values with their resolved name
// to ensure that space is allocated for each entry.
int record_idx = OpBase_Modifies((OpBase *)op, op->exps[i]->resolved_name);
// The projected record will associate values with their resolved name
// to ensure that space is allocated for each entry.
op->record_offsets = array_new(uint, op->aggregate_count + op->key_count);
for(uint i = 0; i < op->key_count; i ++) {
// Store the index of each key expression.
int record_idx = OpBase_Modifies((OpBase *)op, op->key_exps[i]->resolved_name);
op->record_offsets = array_append(op->record_offsets, record_idx);
}
for(uint i = 0; i < op->aggregate_count; i ++) {
// Store the index of each aggregating expression.
int record_idx = OpBase_Modifies((OpBase *)op, op->aggregate_exps[i]->resolved_name);
op->record_offsets = array_append(op->record_offsets, record_idx);
}

return (OpBase *)op;
}

/* Init callback of the Aggregate operation.
 * Classifies the operation's expressions as aggregated / non-aggregated and
 * allocates the scratch array that holds the evaluated group-key values.
 * opBase is cast to OpAggregate without any check, so it must point to an
 * OpAggregate. Always returns OP_OK. */
static OpResult AggregateInit(OpBase *opBase) {
OpAggregate *op = (OpAggregate *)opBase;

// Determine whether each expression is an aggregate function or not.
// This also fills op->non_aggregated_expressions, which is measured below.
_classify_expressions(op);

/* Allocate memory for group keys: one SIValue per non-aggregated expression.
 * When there are no such expressions, op->group_keys is left as it was. */
uint nonAggExpCount = array_len(op->non_aggregated_expressions);
if(nonAggExpCount) op->group_keys = rm_malloc(sizeof(SIValue) * nonAggExpCount);
return OP_OK;
}

static Record AggregateConsume(OpBase *opBase) {
OpAggregate *op = (OpAggregate *)opBase;
if(op->group_iter) return _handoff(op);
Expand Down Expand Up @@ -274,12 +246,6 @@ static void AggregateFree(OpBase *opBase) {
OpAggregate *op = (OpAggregate *)opBase;
if(!op) return;

if(op->exps) {
for(uint i = 0; i < op->exp_count; i ++) AR_EXP_Free(op->exps[i]);
array_free(op->exps);
op->exps = NULL;
}

if(op->group_keys) {
rm_free(op->group_keys);
op->group_keys = NULL;
Expand All @@ -290,14 +256,16 @@ static void AggregateFree(OpBase *opBase) {
op->group_iter = NULL;
}

if(op->expression_classification) {
rm_free(op->expression_classification);
op->expression_classification = NULL;
if(op->key_exps) {
for(uint i = 0; i < op->key_count; i ++) AR_EXP_Free(op->key_exps[i]);
array_free(op->key_exps);
op->key_exps = NULL;
}

if(op->non_aggregated_expressions) {
array_free(op->non_aggregated_expressions);
op->non_aggregated_expressions = NULL;
if(op->aggregate_exps) {
for(uint i = 0; i < op->aggregate_count; i ++) AR_EXP_Free(op->aggregate_exps[i]);
array_free(op->aggregate_exps);
op->aggregate_exps = NULL;
}

if(op->groups) {
Expand Down
26 changes: 10 additions & 16 deletions src/execution_plan/ops/op_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,18 @@
#include "../../grouping/group_cache.h"
#include "../../arithmetic/arithmetic_expression.h"

/* Classification of a single projected expression, stored per expression
 * in OpAggregate.expression_classification. */
typedef enum {
AGGREGATED,     /* Expression contains an aggregation (AR_EXP_ContainsAggregation is true). */
NON_AGGREGATED, /* Expression contains no aggregation; it is used as part of the group key. */
} ExpClassification;

/* State of the Aggregate execution-plan operation.
 * NOTE(review): this listing shows two member lists back to back (an earlier one
 * ending at the first should_cache_records, and a later one starting at the second
 * record_offsets), so several members appear twice. */
typedef struct {
OpBase op;
AR_ExpNode **exps; /* Projected expressions (including order exps). */
uint *record_offsets; /* Record IDs for exps and order_exps. */
AR_ExpNode **non_aggregated_expressions; /* Array of arithmetic expression. */
ExpClassification *expression_classification; /* Classifies each expression as aggregated/not. */
rax *groups; /* Groups created so far, looked up by their group key string. */
Group *group; /* Last accessed group. */
SIValue *group_keys; /* Array of values composing an aggregated group. */
CacheGroupIterator *group_iter; /* Iterator over groups; once set, consume hands off one group per call. */
Record last_record; /* NOTE(review): not referenced by any code visible here - confirm whether it is still used. */
uint exp_count; /* Number of projected expressions. */
bool should_cache_records; /* Records should be cached if we're sorting after aggregation. */
uint *record_offsets; /* Record IDs for key and aggregate exps. */
AR_ExpNode **key_exps; /* Array of expressions used to calculate the group key. */
AR_ExpNode **aggregate_exps; /* Array of expressions that aggregate data for each key. */
rax *groups; /* Map of all groups built by this operation. */
Group *group; /* Last accessed group. */
SIValue *group_keys; /* Array of values that represent a key associated with a Group of aggregations. */
CacheGroupIterator *group_iter; /* Iterator for walking all groups. */
uint key_count; /* Number of key expressions. */
uint aggregate_count; /* Number of aggregating expressions. */
bool should_cache_records; /* Records should be cached if we're sorting after aggregation. */
} OpAggregate;

OpBase *NewAggregateOp(const ExecutionPlan *plan, AR_ExpNode **exps, bool should_cache_records);
Expand Down
10 changes: 5 additions & 5 deletions src/execution_plan/optimizations/reduce_count.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ static int _identifyResultAndAggregateOps(OpBase *root, OpResult **opResult,

// Expecting a single aggregation, without ordering.
*opAggregate = (OpAggregate *)op;
uint exp_count = array_len((*opAggregate)->exps);
if(exp_count != 1) return 0;
if((*opAggregate)->aggregate_count != 1 || (*opAggregate)->key_count != 0) return 0;

AR_ExpNode *exp = (*opAggregate)->exps[0];
AR_ExpNode *exp = (*opAggregate)->aggregate_exps[0];

// Make sure aggregation performs counting.
if(exp->type != AR_EXP_OP ||
Expand Down Expand Up @@ -106,7 +105,7 @@ bool _reduceNodeCount(ExecutionPlan *plan) {
* projection operation. */
AR_ExpNode *exp = AR_EXP_NewConstOperandNode(nodeCount);
// The new expression must be aliased to populate the Record.
exp->resolved_name = opAggregate->exps[0]->resolved_name;
exp->resolved_name = opAggregate->aggregate_exps[0]->resolved_name;
AR_ExpNode **exps = array_new(AR_ExpNode *, 1);
exps = array_append(exps, exp);

Expand Down Expand Up @@ -240,7 +239,7 @@ void _reduceEdgeCount(ExecutionPlan *plan) {
* projection operation. */
AR_ExpNode *exp = AR_EXP_NewConstOperandNode(edgeCount);
// The new expression must be aliased to populate the Record.
exp->resolved_name = opAggregate->exps[0]->resolved_name;
exp->resolved_name = opAggregate->aggregate_exps[0]->resolved_name;
AR_ExpNode **exps = array_new(AR_ExpNode *, 1);
exps = array_append(exps, exp);

Expand All @@ -267,3 +266,4 @@ void reduceCount(ExecutionPlan *plan) {
* then edge count will be tried to be executed upon the same execution plan */
if(!_reduceNodeCount(plan)) _reduceEdgeCount(plan);
}

0 comments on commit f31a74c

Please sign in to comment.