Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move updateInputStream to ITransformingStep #37393

Merged
merged 25 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a38c1df
unify signatures of updateInputStream across steps
nickitat May 20, 2022
91a5318
move updateInputStream to intf
nickitat May 20, 2022
fa2bb58
Merge branch 'master' into move_update_input_stream_to_intf
mergify[bot] May 23, 2022
a3659f3
remove res_header from ArrayJoinStep
nickitat May 28, 2022
0edda4c
aggregates arguments by names
nickitat May 28, 2022
33eb2e6
aggregation keys by names
nickitat May 29, 2022
e726a99
remove ColumnsMask from steps
nickitat May 30, 2022
9f45d6f
more safe comparison for grouping sets
nickitat May 30, 2022
c7d28e2
Merge branch 'master' into move_update_input_stream_to_intf
nickitat May 30, 2022
5a238b6
fix tests
nickitat May 31, 2022
62d815c
fix
nickitat Jun 2, 2022
b34c9b1
move construction of AggregatingTransformParams to transforms level
nickitat Jun 3, 2022
d3972fc
Aggregator::Params only one header
nickitat Jun 3, 2022
f7af302
move header out from Aggregator::Params
nickitat Jun 7, 2022
7862aa4
fix tests
nickitat Jun 8, 2022
ed3db52
fix join push down
nickitat Jun 16, 2022
11f0d8d
cleanup
nickitat Jun 11, 2022
d40477c
Merge commit 'e604d31febf6744d50b83c31a9aa72c6f207d3bb' into move_upd…
nickitat Jun 16, 2022
47bbd4b
Merge branch 'master' into move_update_input_stream_to_intf
mergify[bot] Jun 17, 2022
b0e4bd7
Merge branch 'master' into move_update_input_stream_to_intf
mergify[bot] Jun 17, 2022
e216708
Merge branch 'master' into move_update_input_stream_to_intf
mergify[bot] Jun 19, 2022
574321d
Merge remote-tracking branch 'upstream/master' into move_update_input…
nickitat Jun 22, 2022
fded7d7
fix
nickitat Jun 22, 2022
80ac85e
Merge branch 'master' into move_update_input_stream_to_intf
mergify[bot] Jun 23, 2022
83527e2
Merge branch 'master' into move_update_input_stream_to_intf
mergify[bot] Jun 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/Interpreters/ActionsDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,9 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
actions->inputs.push_back(input);
}

actions->index.push_back(input);
/// We should not add result_predicate into the index for the second time.
if (input->result_name != result_predicate->result_name)
actions->index.push_back(input);
}

return actions;
Expand Down Expand Up @@ -1840,13 +1842,14 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
if (can_remove_filter)
{
/// If filter column is not needed, remove it from index.
for (auto i = index.begin(); i != index.end(); ++i)
std::erase_if(index, [&](const Node * node) { return node == predicate; });

/// At the very end of this method we'll call removeUnusedActions() with allow_remove_inputs=false,
/// so we need to manually remove predicate if it is an input node.
if (predicate->type == ActionType::INPUT)
{
if (*i == predicate)
{
index.erase(i);
break;
}
std::erase_if(inputs, [&](const Node * node) { return node == predicate; });
nodes.remove_if([&](const Node & node) { return &node == predicate; });
}
}
else
Expand Down
27 changes: 0 additions & 27 deletions src/Interpreters/AggregateDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,6 @@ void AggregateDescription::explain(WriteBuffer & out, size_t indent) const
}
out << "\n";
}

out << prefix << " Argument positions: ";

if (arguments.empty())
out << "none\n";
else
{
bool first = true;
for (auto arg : arguments)
{
if (!first)
out << ", ";
first = false;

out << arg;
}
out << '\n';
}
}

void AggregateDescription::explain(JSONBuilder::JSONMap & map) const
Expand Down Expand Up @@ -137,15 +119,6 @@ void AggregateDescription::explain(JSONBuilder::JSONMap & map) const
args_array->add(name);

map.add("Arguments", std::move(args_array));

if (!arguments.empty())
{
auto args_pos_array = std::make_unique<JSONBuilder::JSONArray>();
for (auto pos : arguments)
args_pos_array->add(pos);

map.add("Argument Positions", std::move(args_pos_array));
}
}

}
4 changes: 1 addition & 3 deletions src/Interpreters/AggregateDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ struct AggregateDescription
{
AggregateFunctionPtr function;
Array parameters; /// Parameters of the (parametric) aggregate function.
ColumnNumbers arguments;
Names argument_names; /// used if no `arguments` are specified.
Names argument_names;
String column_name; /// What name to use for a column with aggregate function values

void explain(WriteBuffer & out, size_t indent) const; /// Get description for EXPLAIN query.
void explain(JSONBuilder::JSONMap & map) const;
};

using AggregateDescriptions = std::vector<AggregateDescription>;

}
80 changes: 38 additions & 42 deletions src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,14 @@ auto constructWithReserveIfPossible(size_t size_hint)
else
return std::make_unique<Method>();
}

DB::ColumnNumbers calculateKeysPositions(const DB::Block & header, const DB::Aggregator::Params & params)
{
DB::ColumnNumbers keys_positions(params.keys_size);
for (size_t i = 0; i < params.keys_size; ++i)
keys_positions[i] = header.getPositionByName(params.keys[i]);
return keys_positions;
}
}

namespace DB
Expand Down Expand Up @@ -356,21 +364,17 @@ Aggregator::Params::StatsCollectingParams::StatsCollectingParams(

Block Aggregator::getHeader(bool final) const
{
return params.getHeader(final);
return params.getHeader(header, final);
}

Block Aggregator::Params::getHeader(
const Block & src_header,
const Block & intermediate_header,
const ColumnNumbers & keys,
const AggregateDescriptions & aggregates,
bool final)
const Block & header, bool only_merge, const Names & keys, const AggregateDescriptions & aggregates, bool final)
{
Block res;

if (intermediate_header)
if (only_merge)
{
res = intermediate_header.cloneEmpty();
res = header.cloneEmpty();

if (final)
{
Expand All @@ -386,14 +390,14 @@ Block Aggregator::Params::getHeader(
else
{
for (const auto & key : keys)
res.insert(src_header.safeGetByPosition(key).cloneEmpty());
res.insert(header.getByName(key).cloneEmpty());

for (const auto & aggregate : aggregates)
{
size_t arguments_size = aggregate.arguments.size();
size_t arguments_size = aggregate.argument_names.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = src_header.safeGetByPosition(aggregate.arguments[j]).type;
argument_types[j] = header.getByName(aggregate.argument_names[j]).type;

DataTypePtr type;
if (final)
Expand Down Expand Up @@ -434,26 +438,20 @@ Aggregator::AggregateColumnsConstData Aggregator::Params::makeAggregateColumnsDa
void Aggregator::Params::explain(WriteBuffer & out, size_t indent) const
{
Strings res;
const auto & header = src_header ? src_header
: intermediate_header;

String prefix(indent, ' ');

{
/// Dump keys.
out << prefix << "Keys: ";

bool first = true;
for (auto key : keys)
for (const auto & key : keys)
{
if (!first)
out << ", ";
first = false;

if (key >= header.columns())
out << "unknown position " << key;
else
out << header.getByPosition(key).name;
out << key;
}

out << '\n';
Expand All @@ -470,18 +468,10 @@ void Aggregator::Params::explain(WriteBuffer & out, size_t indent) const

void Aggregator::Params::explain(JSONBuilder::JSONMap & map) const
{
const auto & header = src_header ? src_header
: intermediate_header;

auto keys_array = std::make_unique<JSONBuilder::JSONArray>();

for (auto key : keys)
{
if (key >= header.columns())
keys_array->add("");
else
keys_array->add(header.getByPosition(key).name);
}
for (const auto & key : keys)
keys_array->add(key);

map.add("Keys", std::move(keys_array));

Expand Down Expand Up @@ -526,7 +516,8 @@ class CompiledAggregateFunctionsHolder final : public CompiledExpressionCacheEnt

#endif

Aggregator::Aggregator(const Params & params_) : params(params_)
Aggregator::Aggregator(const Block & header_, const Params & params_)
: header(header_), keys_positions(calculateKeysPositions(header, params_)), params(params_)
{
/// Use query-level memory tracker
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
Expand Down Expand Up @@ -672,9 +663,9 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
bool has_nullable_key = false;
bool has_low_cardinality = false;

for (const auto & pos : params.keys)
for (const auto & key : params.keys)
{
DataTypePtr type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
DataTypePtr type = header.getByName(key).type;

if (type->lowCardinality())
{
Expand Down Expand Up @@ -1277,11 +1268,15 @@ void NO_INLINE Aggregator::mergeOnIntervalWithoutKeyImpl(
}


void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder) const
void Aggregator::prepareAggregateInstructions(
Columns columns,
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions,
NestedColumnsHolder & nested_columns_holder) const
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
aggregate_columns[i].resize(params.aggregates[i].argument_names.size());

aggregate_functions_instructions.resize(params.aggregates_size + 1);
aggregate_functions_instructions[params.aggregates_size].that = nullptr;
Expand All @@ -1293,7 +1288,8 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns

for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
{
materialized_columns.push_back(columns.at(params.aggregates[i].arguments[j])->convertToFullColumnIfConst());
const auto pos = header.getPositionByName(params.aggregates[i].argument_names[j]);
materialized_columns.push_back(columns.at(pos)->convertToFullColumnIfConst());
aggregate_columns[i][j] = materialized_columns.back().get();

auto full_column = allow_sparse_arguments
Expand Down Expand Up @@ -1382,7 +1378,7 @@ bool Aggregator::executeOnBlock(Columns columns,
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
materialized_columns.push_back(recursiveRemoveSparse(columns.at(params.keys[i]))->convertToFullColumnIfConst());
materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();

if (!result.isLowCardinality())
Expand Down Expand Up @@ -1954,11 +1950,11 @@ Block Aggregator::prepareBlockAndFill(
MutableColumns final_aggregate_columns(params.aggregates_size);
AggregateColumnsData aggregate_columns_data(params.aggregates_size);

Block header = getHeader(final);
Block res_header = getHeader(final);

for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = header.safeGetByPosition(i).type->createColumn();
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
key_columns[i]->reserve(rows);
}

Expand All @@ -1967,7 +1963,7 @@ Block Aggregator::prepareBlockAndFill(
if (!final)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = header.getByName(aggregate_column_name).type->createColumn();
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();

/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
Expand Down Expand Up @@ -2003,7 +1999,7 @@ Block Aggregator::prepareBlockAndFill(

filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);

Block res = header.cloneEmpty();
Block res = res_header.cloneEmpty();

for (size_t i = 0; i < params.keys_size; ++i)
res.getByPosition(i).column = std::move(key_columns[i]);
Expand All @@ -2018,7 +2014,7 @@ Block Aggregator::prepareBlockAndFill(
}

/// Change the size of the columns-constants in the block.
size_t columns = header.columns();
size_t columns = res_header.columns();
for (size_t i = 0; i < columns; ++i)
if (isColumnConst(*res.getByPosition(i).column))
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
Expand Down