-
Notifications
You must be signed in to change notification settings - Fork 6.6k
/
RollupBlockInputStream.cpp
73 lines (59 loc) · 2.05 KB
/
RollupBlockInputStream.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <DataStreams/RollupBlockInputStream.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>
namespace DB
{
static void finalize(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.getByPosition(i);
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_type)
{
current.type = unfinalized_type->getReturnType();
if (current.column)
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
}
}
}
RollupBlockInputStream::RollupBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
keys(params_.keys)
{
children.push_back(input_);
Aggregator::CancellationHook hook = [this]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
}
Block RollupBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalize(res);
return res;
}
Block RollupBlockInputStream::readImpl()
{
/** After reading a block from input stream,
* we will subsequently roll it up on next iterations of 'readImpl'
* by zeroing out every column one-by-one and re-merging a block.
*/
if (current_key >= 0)
{
auto & current = rollup_block.getByPosition(keys[current_key]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
Block finalized = rollup_block;
finalize(finalized);
return finalized;
}
Block block = children[0]->read();
current_key = keys.size() - 1;
rollup_block = block;
finalize(block);
return block;
}
}