Skip to content

Commit

Permalink
Implemented new AVG() pushdown optimizations for distributed queries …
Browse files Browse the repository at this point in the history
…in the ExecutionEngine. This is untested at the moment. Added the ability to get the catalog from RegressionSuite
  • Loading branch information
apavlo committed Mar 21, 2012
1 parent 19fea3b commit 89d06c8
Show file tree
Hide file tree
Showing 13 changed files with 708 additions and 511 deletions.
11 changes: 6 additions & 5 deletions src/ee/common/NValue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ class NValue {
static const uint16_t kMaxDecScale = 12;
static const int64_t kMaxScaleFactor = 1000000000000;


/**
 * Read-only access to the stored 32-bit integer.
 *
 * Asserts that this value's type tag is VALUE_TYPE_INTEGER and then
 * returns a reference to the start of m_data viewed as an int32_t.
 */
const int32_t& getInteger() const {
    assert(getValueType() == VALUE_TYPE_INTEGER);
    const int32_t* const stored = reinterpret_cast<const int32_t*>(m_data);
    return *stored;
}

private:
/*
* Private methods are private for a reason. Don't expose the raw
Expand Down Expand Up @@ -518,11 +524,6 @@ class NValue {
return *reinterpret_cast<int16_t*>(m_data);
}

const int32_t& getInteger() const {
assert(getValueType() == VALUE_TYPE_INTEGER);
return *reinterpret_cast<const int32_t*>(m_data);
}

int32_t& getInteger() {
assert(getValueType() == VALUE_TYPE_INTEGER);
return *reinterpret_cast<int32_t*>(m_data);
Expand Down
5 changes: 5 additions & 0 deletions src/ee/common/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ string expressionToString(ExpressionType type)
case EXPRESSION_TYPE_AGGREGATE_AVG: {
return "AGGREGATE_AVG";
}
case EXPRESSION_TYPE_AGGREGATE_DISTRIBUTED_AVG: {
return "AGGREGATE_DISTRIBUTED_AVG";
}
}
return "INVALID";
}
Expand Down Expand Up @@ -518,6 +521,8 @@ ExpressionType stringToExpression(string str )
return EXPRESSION_TYPE_AGGREGATE_MAX;
} else if (str == "AGGREGATE_AVG") {
return EXPRESSION_TYPE_AGGREGATE_AVG;
} else if (str == "AGGREGATE_DISTRIBUTED_AVG") {
return EXPRESSION_TYPE_AGGREGATE_DISTRIBUTED_AVG;
}

return EXPRESSION_TYPE_INVALID;
Expand Down
3 changes: 2 additions & 1 deletion src/ee/common/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ enum ExpressionType {
EXPRESSION_TYPE_AGGREGATE_SUM = 42,
EXPRESSION_TYPE_AGGREGATE_MIN = 43,
EXPRESSION_TYPE_AGGREGATE_MAX = 44,
EXPRESSION_TYPE_AGGREGATE_AVG = 45
EXPRESSION_TYPE_AGGREGATE_AVG = 45,
EXPRESSION_TYPE_AGGREGATE_DISTRIBUTED_AVG = 46

};

Expand Down
74 changes: 57 additions & 17 deletions src/ee/executors/aggregateexecutor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,39 @@ class SumAgg : public Agg

class AvgAgg : public Agg {
public:
AvgAgg() :
AvgAgg(bool weighted) :
m_weighted(weighted),
m_count(0)
{
// m_value initialized on first advance.
m_defaultDelta = ValueFactory::getIntegerValue(1);
}

/**
 * Single-argument form of advance(): forwards to the two-argument
 * overload, passing m_defaultDelta as the weight.
 */
void advance(const NValue val)
{
    advance(val, m_defaultDelta);
}

void advance(const NValue val, const NValue delta)
{
if (val.isNull()) {
return;
}
if (m_count == 0) {
m_value = val;
}
else {
m_value = m_value.op_add(val);
if (m_weighted) {
NValue weighted_val = val.op_multiply(delta);
if (m_count == 0) {
m_value = weighted_val;
} else {
m_value = m_value.op_add(weighted_val);
}
m_count += delta.getInteger();
} else {
if (m_count == 0) {
m_value = val;
} else {
m_value = m_value.op_add(val);
}
m_count += 1;
}
++m_count;
}

NValue finalize()
Expand All @@ -167,7 +182,10 @@ class AvgAgg : public Agg {
}

private:
// m_value initialized on first advance.
NValue m_value;
NValue m_defaultDelta;
bool m_weighted;
int64_t m_count;
};

Expand Down Expand Up @@ -313,7 +331,10 @@ inline Agg* getAggInstance(Pool* memoryPool, ExpressionType agg_type)
agg = new (memoryPool->allocate(sizeof(SumAgg))) SumAgg();
break;
case EXPRESSION_TYPE_AGGREGATE_AVG:
agg = new (memoryPool->allocate(sizeof(AvgAgg))) AvgAgg();
agg = new (memoryPool->allocate(sizeof(AvgAgg))) AvgAgg(false);
break;
case EXPRESSION_TYPE_AGGREGATE_DISTRIBUTED_AVG:
agg = new (memoryPool->allocate(sizeof(AvgAgg))) AvgAgg(true);
break;
case EXPRESSION_TYPE_AGGREGATE_MIN:
agg = new (memoryPool->allocate(sizeof(MinAgg))) MinAgg();
Expand Down Expand Up @@ -507,6 +528,7 @@ class Aggregator<PLAN_NODE_TYPE_HASHAGGREGATE>
moveNoHeader(static_cast<char*>
(memoryPool->
allocate(groupByKeySchema->tupleLength())));
m_numAggColumns = static_cast<int>(m_colTypes->size());
}

inline bool nextTuple(TableTuple nextTuple, TableTuple)
Expand Down Expand Up @@ -553,8 +575,20 @@ class Aggregator<PLAN_NODE_TYPE_HASHAGGREGATE>
// update the aggregation calculation.
// m_numAggColumns caches m_colTypes->size() as an int (set in the
// constructor), which avoids a signed/unsigned comparison here.
for (int i = 0; i < m_numAggColumns; i++)
{
    NValue targetColumn = nextTuple.getNValue(m_node->getAggregateColumns()[i]);

    // 2012-03-20 - PAVLO
    // EXPRESSION_TYPE_AGGREGATE_DISTRIBUTED_AVG computes a weighted
    // average for a distributed query. Unlike the other aggregates it
    // needs a second input: the count that goes with each value.
    // That count is handed to AvgAgg (constructed in weighted mode) as
    // the delta argument.
    if ((*m_aggTypes)[i] == EXPRESSION_TYPE_AGGREGATE_DISTRIBUTED_AVG) {
        // The count is the last column of the input. Assuming zero-based
        // column indexes (as with the other getNValue calls), the last
        // valid index is columnCount() - 1; columnCount() itself is one
        // past the end of the tuple.
        NValue weightColumn = nextTuple.getNValue(m_inputTable->columnCount() - 1);
        static_cast<AvgAgg*>(aggregateList->m_aggregates[i])->advance(targetColumn, weightColumn);
    } else {
        aggregateList->m_aggregates[i]->advance(targetColumn);
    }
}

return true;
Expand Down Expand Up @@ -612,6 +646,7 @@ class Aggregator<PLAN_NODE_TYPE_HASHAGGREGATE>
std::vector<int>* m_groupByCols;
std::vector<ValueType>* m_colTypes;
HashAggregateMapType m_aggregates;
int m_numAggColumns;
TableTuple groupByKeyTuple;
};

Expand Down Expand Up @@ -645,6 +680,7 @@ class Aggregator<PLAN_NODE_TYPE_AGGREGATE>
agg_types->size())))
{
::memset(m_aggs, 0, sizeof(void*) * agg_types->size());
m_numAggColumns = static_cast<int>(m_colTypes->size());
}

inline bool nextTuple(TableTuple nextTuple, TableTuple prevTuple)
Expand Down Expand Up @@ -677,7 +713,7 @@ class Aggregator<PLAN_NODE_TYPE_AGGREGATE>
{
return false;
}
for (int i = 0; i < m_colTypes->size(); i++)
for (int i = 0; i < m_numAggColumns; i++)
{
//is_ints and ret_types are all referring to all
//output columns some of which are not aggregates. It
Expand All @@ -692,7 +728,7 @@ class Aggregator<PLAN_NODE_TYPE_AGGREGATE>
getAggInstance(m_memoryPool, (*m_aggTypes)[i]);
}
}
for (int i = 0; i < m_colTypes->size(); i++)
for (int i = 0; i < m_numAggColumns; i++)
{
const int column = m_node->getAggregateColumns()[i];
NValue value = nextTuple.getNValue(column);
Expand Down Expand Up @@ -747,6 +783,7 @@ class Aggregator<PLAN_NODE_TYPE_AGGREGATE>
std::vector<ExpressionType>* m_aggTypes;
std::vector<int>* m_groupByCols;
std::vector<ValueType>* m_colTypes;
int m_numAggColumns;
Agg** m_aggs;
};

Expand Down Expand Up @@ -794,8 +831,9 @@ AggregateExecutor<aggregateType>::p_init(AbstractPlanNode *abstract_node,
{
VOLT_ERROR("Failed to find index of AGGREGATE PlanColumn %s [guid=%d]",
node->getAggregateColumnNames()[ctr].c_str(), node->getAggregateColumnGuids()[ctr]);
VOLT_ERROR("[%02d] GUID = %d\n", ctr, node->getAggregateColumnGuids()[ctr]);
VOLT_ERROR("CHILD:\n%s\n----------------\nNODE:\n%s\n", child_node->debugInfo("").c_str(), node->debugInfo("").c_str());
VOLT_ERROR("Invalid Query Plan CHILD:\n%s", child_node->debugInfo("").c_str());
VOLT_ERROR("----------------------------------");
VOLT_ERROR("Invalid Query Plan PARENT:\n%s", node->debugInfo("").c_str());
return false;
}
aggregateColumns.push_back(index);
Expand Down Expand Up @@ -852,8 +890,10 @@ AggregateExecutor<aggregateType>::p_init(AbstractPlanNode *abstract_node,
// Planner must provide the GUID of the input column for the mapping.
int inputColumnIndex = child_node->getColumnIndexFromGuid(outputColumnInputColumnGuid, catalog_db);
if (inputColumnIndex == -1) {
VOLT_ERROR("Can not find index for passthrough col guid %d at offset %d", outputColumnInputColumnGuid, ii);
VOLT_ERROR("CHILD:\n%s\n----------------\nNODE:\n%s\n", child_node->debugInfo("").c_str(), node->debugInfo("").c_str());
VOLT_ERROR("Failed to find index for passthrough col guid %d at offset %d", outputColumnInputColumnGuid, ii);
VOLT_ERROR("Invalid Query Plan CHILD:\n%s", child_node->debugInfo("").c_str());
VOLT_ERROR("----------------------------------");
VOLT_ERROR("Invalid Query Plan PARENT:\n%s", node->debugInfo("").c_str());
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ee/executors/seqscanexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ bool SeqScanExecutor::p_execute(const NValueArray &params) {
{
predicate->substitute(params);
assert(predicate != NULL);
VOLT_DEBUG("SCAN PREDICATE B:\n%s\n",
VOLT_TRACE("SCAN PREDICATE B:\n%s\n",
predicate->debug(true).c_str());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,13 @@ protected HashAggregatePlanNode cloneAggregatePlanNode(final HashAggregatePlanNo

// Update the cloned AggregateNode to handle distributed averages
List<ExpressionType> clone_types = clone_agg.getAggregateTypes();
boolean has_count = (clone_types.contains(ExpressionType.AGGREGATE_COUNT) ||
clone_types.contains(ExpressionType.AGGREGATE_COUNT_STAR));

// For now we'll always put a COUNT at the end of the AggregatePlanNode
// This makes it easier for us to find it in the EE
boolean has_count = false;
// boolean has_count = (clone_types.contains(ExpressionType.AGGREGATE_COUNT) ||
// clone_types.contains(ExpressionType.AGGREGATE_COUNT_STAR));

int orig_cnt = clone_types.size();
for (int i = 0; i < orig_cnt; i++) {
ExpressionType cloneType = clone_types.get(i);
Expand Down
Loading

0 comments on commit 89d06c8

Please sign in to comment.