Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLICKHOUSE-3893 - Add ROLLUP modifier. #2948

Merged
merged 25 commits into from Sep 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
df017f9
test
CurtizJ Aug 7, 2018
8f97453
Revert "test"
CurtizJ Aug 7, 2018
4534d44
working on rollup
CurtizJ Aug 21, 2018
dbfd513
Merge branch 'master' of https://github.com/yandex/ClickHouse
CurtizJ Aug 21, 2018
0704da5
Merge branch 'master' into CLICKHOUSE-3893
CurtizJ Aug 21, 2018
c18ad18
Add rollup modifier
CurtizJ Aug 24, 2018
e0b3283
Merge remote-tracking branch 'upstream/master' into CLICKHOUSE-3893
CurtizJ Aug 24, 2018
bdedf1d
Add test
CurtizJ Aug 24, 2018
cf3fbb8
minor changes
CurtizJ Aug 24, 2018
a04c372
remove redundant code
CurtizJ Aug 27, 2018
b48a144
add mysql like rollup syntax
CurtizJ Aug 27, 2018
0e28195
update syntax and test
CurtizJ Aug 27, 2018
a603595
remove redundant code
CurtizJ Aug 28, 2018
3bc743c
do not merge blocks into one
CurtizJ Sep 3, 2018
cdb2c8a
Revert "Merge branch 'master' into CLICKHOUSE-3893"
CurtizJ Sep 5, 2018
a7bbf83
Revert "Revert "Merge branch 'master' into CLICKHOUSE-3893""
CurtizJ Sep 5, 2018
18da41a
Merge remote-tracking branch 'upstream/master' into CLICKHOUSE-3893
CurtizJ Sep 5, 2018
35cbdcd
optimize memory consumption
CurtizJ Sep 5, 2018
ea2b569
fix submodules updates
CurtizJ Sep 5, 2018
d0b56da
fix submodules updates
CurtizJ Sep 5, 2018
0ad5793
fix submodules updates
CurtizJ Sep 5, 2018
472adf4
add rollup to formatted query
CurtizJ Sep 5, 2018
e6d49c4
Update InterpreterSelectQuery.cpp
CurtizJ Sep 5, 2018
7f71bc4
Update InterpreterSelectQuery.cpp
CurtizJ Sep 5, 2018
4c549f7
Update RollupBlockInputStream.cpp
alexey-milovidov Sep 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 73 additions & 0 deletions dbms/src/DataStreams/RollupBlockInputStream.cpp
@@ -0,0 +1,73 @@
#include <DataStreams/RollupBlockInputStream.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>

namespace DB
{

/** Converts every column of `block` whose type is DataTypeAggregateFunction
  * into its final form: the column type is replaced by the aggregate
  * function's return type and, if column data is attached, the stored
  * states are converted to values. Columns of other types are not touched.
  */
static void finalize(Block & block)
{
    const size_t num_columns = block.columns();

    for (size_t pos = 0; pos < num_columns; ++pos)
    {
        ColumnWithTypeAndName & elem = block.getByPosition(pos);

        const auto * state_type = typeid_cast<const DataTypeAggregateFunction *>(elem.type.get());
        if (!state_type)
            continue;

        /// `state_type` points into the object held by `elem.type`,
        /// so it must not be used after this assignment.
        elem.type = state_type->getReturnType();

        /// A header block carries types only; there is no data to convert.
        if (!elem.column)
            continue;

        const auto & states = typeid_cast<const ColumnAggregateFunction &>(*elem.column);
        elem.column = states.convertToValues();
    }
}

/** Creates a rollup stream reading from `input_`.
  * `params_` configures the internal aggregator; its `keys` are remembered
  * as the positions of the grouping-key columns to roll up.
  */
RollupBlockInputStream::RollupBlockInputStream(
    const BlockInputStreamPtr & input_, const Aggregator::Params & params_)
    : aggregator(params_)
    , keys(params_.keys)
{
    children.push_back(input_);

    /// Let the aggregator ask this stream whether the query has been cancelled.
    aggregator.setCancellationHook(Aggregator::CancellationHook([this]() { return isCancelled(); }));
}


/// Returns the header of the source stream with aggregate-function columns
/// replaced by their finalized (return-value) types.
Block RollupBlockInputStream::getHeader() const
{
    auto header = children.at(0)->getHeader();
    finalize(header);
    return header;
}


/** Returns the next output block.
  *
  * For each block read from the source stream this yields, on consecutive
  * calls, first the block itself and then one rolled-up block per grouping
  * key, going from the last key to the first. Every returned block is
  * finalized. Each source block is rolled up on its own.
  *
  * An empty block from the source is passed through as-is and does not start
  * a new roll-up sequence.
  */
Block RollupBlockInputStream::readImpl()
{
    /** After reading a block from input stream,
      * we will subsequently roll it up on next iterations of 'readImpl'
      * by zeroing out every column one-by-one and re-merging a block.
      */

    if (current_key >= 0)
    {
        /// Zero out the next key column (keys are processed from last to first) ...
        auto & current = rollup_block.getByPosition(keys[current_key]);
        current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
        --current_key;

        /// ... and merge the block with itself so that rows which became equal are combined.
        /// NOTE(review): the `false` argument presumably keeps aggregate states non-finalized,
        /// which the next roll-up step needs - confirm against Aggregator::mergeBlocks.
        BlocksList rollup_blocks = { rollup_block };
        rollup_block = aggregator.mergeBlocks(rollup_blocks, false);

        /// Keep the non-finalized block for the next step; hand out a finalized copy.
        Block finalized = rollup_block;
        finalize(finalized);
        return finalized;
    }

    Block block = children[0]->read();
    rollup_block = block;

    /// End of input: do not re-arm `current_key`, otherwise a further call
    /// would try to roll up an empty block and index a column that does not exist.
    if (!block)
        return block;

    /// Explicit signed conversion: with no keys this yields -1, i.e. nothing to roll up.
    current_key = static_cast<ssize_t>(keys.size()) - 1;

    finalize(block);
    return block;
}
}
41 changes: 41 additions & 0 deletions dbms/src/DataStreams/RollupBlockInputStream.h
@@ -0,0 +1,41 @@
#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/Arena.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>


namespace DB
{

class ExpressionActions;


/** Takes blocks after grouping, with non-finalized aggregate functions.
  * Calculates subtotal and grand total values for a set of key columns:
  * every source block is returned first, followed by one rolled-up block
  * per key column. Returned blocks are finalized.
  */
class RollupBlockInputStream : public IProfilingBlockInputStream
{
public:
    /// `input_` is the source stream; `params_` configures the aggregator
    /// used for re-merging and supplies the positions of the key columns.
    RollupBlockInputStream(
        const BlockInputStreamPtr & input_, const Aggregator::Params & params_);

    String getName() const override { return "Rollup"; }

    /// Header of the source stream with aggregate-function columns finalized.
    Block getHeader() const override;

protected:
    Block readImpl() override;

private:
    /// Used to re-merge a block after one of its key columns has been zeroed out.
    Aggregator aggregator;

    /// Positions of the grouping-key columns within a block.
    ColumnNumbers keys;

    /// Index into `keys` of the next key column to roll up;
    /// negative when no roll-up is pending.
    ssize_t current_key = -1;

    /// The block currently being rolled up, kept in non-finalized form.
    Block rollup_block;
};

}
10 changes: 6 additions & 4 deletions dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp
Expand Up @@ -14,10 +14,10 @@ namespace DB
TotalsHavingBlockInputStream::TotalsHavingBlockInputStream(
const BlockInputStreamPtr & input_,
bool overflow_row_, const ExpressionActionsPtr & expression_,
const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_)
const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_, bool final_)
: overflow_row(overflow_row_),
expression(expression_), filter_column_name(filter_column_), totals_mode(totals_mode_),
auto_include_threshold(auto_include_threshold_)
auto_include_threshold(auto_include_threshold_), final(final_)
{
children.push_back(input_);

Expand Down Expand Up @@ -100,7 +100,8 @@ Block TotalsHavingBlockInputStream::getTotals()
Block TotalsHavingBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalize(res);
if (final)
finalize(res);
if (expression)
expression->execute(res);
return res;
Expand All @@ -127,7 +128,8 @@ Block TotalsHavingBlockInputStream::readImpl()
return finalized;

finalized = block;
finalize(finalized);
if (final)
finalize(finalized);

total_keys += finalized.rows();

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/TotalsHavingBlockInputStream.h
Expand Up @@ -26,7 +26,7 @@ class TotalsHavingBlockInputStream : public IProfilingBlockInputStream
TotalsHavingBlockInputStream(
const BlockInputStreamPtr & input_,
bool overflow_row_, const ExpressionActionsPtr & expression_,
const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_);
const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_, bool final_);

String getName() const override { return "TotalsHaving"; }

Expand All @@ -43,6 +43,7 @@ class TotalsHavingBlockInputStream : public IProfilingBlockInputStream
String filter_column_name;
TotalsMode totals_mode;
double auto_include_threshold;
bool final;
size_t passed_keys = 0;
size_t total_keys = 0;

Expand Down
47 changes: 42 additions & 5 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Expand Up @@ -18,6 +18,7 @@
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/RollupBlockInputStream.h>
#include <DataStreams/ConvertColumnWithDictionaryToFullBlockInputStream.h>

#include <Parsers/ASTSelectQuery.h>
Expand Down Expand Up @@ -479,7 +480,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
bool aggregate_final =
expressions.need_aggregate &&
to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals;
!query.group_by_with_totals && !query.group_by_with_rollup;

if (expressions.first_stage)
{
Expand Down Expand Up @@ -535,7 +536,13 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final);

if (!aggregate_final)
executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row);
{
if (query.group_by_with_totals)
executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row, !query.group_by_with_rollup);

if (query.group_by_with_rollup)
executeRollup(pipeline);
}
else if (expressions.has_having)
executeHaving(pipeline, expressions.before_having);

Expand All @@ -549,7 +556,10 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
need_second_distinct_pass = query.distinct && pipeline.hasMoreThanOneStream();

if (query.group_by_with_totals && !aggregate_final)
executeTotalsAndHaving(pipeline, false, nullptr, aggregate_overflow_row);
executeTotalsAndHaving(pipeline, false, nullptr, aggregate_overflow_row, !query.group_by_with_rollup);

if (query.group_by_with_rollup && !aggregate_final)
executeRollup(pipeline);
}

if (expressions.has_order_by)
Expand Down Expand Up @@ -959,15 +969,42 @@ void InterpreterSelectQuery::executeHaving(Pipeline & pipeline, const Expression
}


void InterpreterSelectQuery::executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row)
void InterpreterSelectQuery::executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
{
executeUnion(pipeline);

const Settings & settings = context.getSettingsRef();

pipeline.firstStream() = std::make_shared<TotalsHavingBlockInputStream>(
pipeline.firstStream(), overflow_row, expression,
has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, settings.totals_auto_threshold);
has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, settings.totals_auto_threshold, final);
}

/** Wraps the first stream of the pipeline into a RollupBlockInputStream.
  * The aggregator parameters are built from the query's aggregation keys and
  * aggregate descriptions, resolved against the header of that stream.
  */
void InterpreterSelectQuery::executeRollup(Pipeline & pipeline)
{
    /// Only `pipeline.firstStream()` is wrapped below.
    /// NOTE(review): executeUnion presumably collapses the pipeline into a single stream - confirm.
    executeUnion(pipeline);

    Names key_names;
    AggregateDescriptions aggregates;
    query_analyzer->getAggregateInfo(key_names, aggregates);

    Block header = pipeline.firstStream()->getHeader();

    /// Translate key column names into positions within the stream header.
    ColumnNumbers keys;
    keys.reserve(key_names.size());
    for (const auto & key_name : key_names)
        keys.push_back(header.getPositionByName(key_name));

    const Settings & settings = context.getSettingsRef();

    /// NOTE(review): the literal `false` and the two zero settings are positional
    /// arguments of Aggregator::Params whose meaning is not visible here - confirm
    /// against its declaration before changing them.
    Aggregator::Params params(
        header,
        keys,
        aggregates,
        false,
        settings.max_rows_to_group_by,
        settings.group_by_overflow_mode,
        settings.compile ? &context.getCompiler() : nullptr,
        settings.min_count_to_compile,
        SettingUInt64(0),
        SettingUInt64(0),
        settings.max_bytes_before_external_group_by,
        settings.empty_result_for_aggregation_by_empty_set,
        context.getTemporaryPath());

    pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
}


Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.h
Expand Up @@ -177,7 +177,7 @@ class InterpreterSelectQuery : public IInterpreter
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline);
Expand All @@ -190,6 +190,7 @@ class InterpreterSelectQuery : public IInterpreter
void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeRollup(Pipeline & pipeline);

/** If there is a SETTINGS section in the SELECT query, then apply settings from it.
*
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Parsers/ASTSelectQuery.cpp
Expand Up @@ -103,6 +103,9 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
: typeid_cast<const ASTExpressionList &>(*group_expression_list).formatImplMultiline(s, state, frame);
}

if (group_by_with_rollup)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH ROLLUP" << (s.hilite ? hilite_none : "");

if (group_by_with_totals)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH TOTALS" << (s.hilite ? hilite_none : "");

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Parsers/ASTSelectQuery.h
Expand Up @@ -28,6 +28,7 @@ class ASTSelectQuery : public IAST
ASTPtr where_expression;
ASTPtr group_expression_list;
bool group_by_with_totals = false;
bool group_by_with_rollup = false;
ASTPtr having_expression;
ASTPtr order_expression_list;
ASTPtr limit_by_value;
Expand Down
29 changes: 26 additions & 3 deletions dbms/src/Parsers/ParserSelectQuery.cpp
Expand Up @@ -39,6 +39,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_limit("LIMIT");
ParserKeyword s_settings("SETTINGS");
ParserKeyword s_by("BY");
ParserKeyword s_rollup("ROLLUP");
ParserKeyword s_top("TOP");
ParserKeyword s_offset("OFFSET");

Expand All @@ -48,6 +49,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserExpressionWithOptionalAlias exp_elem(false);
ParserOrderByExpressionList order_list;

ParserToken open_bracket(TokenType::OpeningRoundBracket);
ParserToken close_bracket(TokenType::ClosingRoundBracket);

/// WITH expr list
{
if (s_with.ignore(pos, expected))
Expand All @@ -67,8 +71,6 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)

if (s_top.ignore(pos, expected))
{
ParserToken open_bracket(TokenType::OpeningRoundBracket);
ParserToken close_bracket(TokenType::ClosingRoundBracket);
ParserNumber num;

if (open_bracket.ignore(pos, expected))
Expand Down Expand Up @@ -113,14 +115,35 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
/// GROUP BY expr list
if (s_group_by.ignore(pos, expected))
{
if (s_rollup.ignore(pos, expected))
{
select_query->group_by_with_rollup = true;
if (!open_bracket.ignore(pos, expected))
return false;
}

if (!exp_list.parse(pos, select_query->group_expression_list, expected))
return false;

if (select_query->group_by_with_rollup && !close_bracket.ignore(pos, expected))
return false;
}

/// WITH ROLLUP
if (s_with.ignore(pos, expected))
{
if (s_rollup.ignore(pos, expected))
select_query->group_by_with_rollup = true;
else if (s_totals.ignore(pos, expected))
select_query->group_by_with_totals = true;
else
return false;
}

/// WITH TOTALS
if (s_with.ignore(pos, expected))
{
if (!s_totals.ignore(pos, expected))
if (select_query->group_by_with_totals || !s_totals.ignore(pos, expected))
return false;

select_query->group_by_with_totals = true;
Expand Down
27 changes: 27 additions & 0 deletions dbms/tests/queries/0_stateless/00701_rollup.reference
@@ -0,0 +1,27 @@
0 120 8
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2
0 120 8
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2

0 120 8
120 8
a 70 4
b 50 4
120 8
a 70 4
b 50 4
120 8
a 70 4
b 50 4

120 8