Skip to content

Commit

Permalink
Merge pull request #55489 from kitaisreal/external-aggregation-improve-performance
Browse files Browse the repository at this point in the history

Improve performance of external aggregation with a lot of temporary files
  • Loading branch information
alexey-milovidov committed Oct 12, 2023
2 parents e48fad0 + a6f894b commit b53a982
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 54 deletions.
4 changes: 4 additions & 0 deletions src/Common/HashTable/HashTable.h
Expand Up @@ -1274,6 +1274,10 @@ class HashTable : private boost::noncopyable,
return !buf[place_value].isZero(*this);
}

/// Returns true if the table contains an element with the given key.
/// Thin alias over has(), added to mirror the std-container `contains` lookup API
/// (e.g. so callers can use HashSet like std::unordered_set for membership checks).
bool ALWAYS_INLINE contains(const Key & x) const
{
return has(x);
}

void write(DB::WriteBuffer & wb) const
{
Expand Down
Expand Up @@ -20,37 +20,9 @@ GroupingAggregatedTransform::GroupingAggregatedTransform(
, num_inputs(num_inputs_)
, params(std::move(params_))
, last_bucket_number(num_inputs, -1)
, read_from_input(num_inputs, false)
{
}

void GroupingAggregatedTransform::readFromAllInputs()
{
auto in = inputs.begin();
read_from_all_inputs = true;

for (size_t i = 0; i < num_inputs; ++i, ++in)
{
if (in->isFinished())
continue;

if (read_from_input[i])
continue;

in->setNeeded();

if (!in->hasData())
{
read_from_all_inputs = false;
continue;
}

auto chunk = in->pull();
read_from_input[i] = true;
addChunk(std::move(chunk), i);
}
}

void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows)
{
auto & output = outputs.front();
Expand Down Expand Up @@ -119,7 +91,7 @@ bool GroupingAggregatedTransform::tryPushOverflowData()
return true;
}

IProcessor::Status GroupingAggregatedTransform::prepare()
IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & updated_input_ports, const PortNumbers &)
{
/// Check can output.
auto & output = outputs.front();
Expand All @@ -134,19 +106,51 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
return Status::Finished;
}

/// Read first time from each input to understand if we have two-level aggregation.
if (!read_from_all_inputs)
if (!initialized_index_to_input)
{
readFromAllInputs();
if (!read_from_all_inputs)
return Status::NeedData;
initialized_index_to_input = true;
auto in = inputs.begin();
index_to_input.resize(num_inputs);

for (size_t i = 0; i < num_inputs; ++i, ++in)
index_to_input[i] = in;
}

/// Convert single level to two levels if have two-level input.
if (has_two_level && !single_level_chunks.empty())
return Status::Ready;
auto need_input = [this](size_t input_num)
{
if (last_bucket_number[input_num] < current_bucket)
return true;

return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
};

if (!wait_input_ports_numbers.empty())
{
for (const auto & updated_input_port_number : updated_input_ports)
{
if (!wait_input_ports_numbers.contains(updated_input_port_number))
continue;

auto & input = index_to_input[updated_input_port_number];
if (!input->hasData())
{
wait_input_ports_numbers.erase(updated_input_port_number);
continue;
}

auto chunk = input->pull();
addChunk(std::move(chunk), updated_input_port_number);

if (!input->isFinished() && need_input(updated_input_port_number))
continue;

wait_input_ports_numbers.erase(updated_input_port_number);
}

if (!wait_input_ports_numbers.empty())
return Status::NeedData;
}

/// Check can push (to avoid data caching).
if (!output.canPush())
{
for (auto & input : inputs)
Expand All @@ -155,20 +159,16 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
return Status::PortFull;
}

/// Convert single level to two levels if have two-level input.
if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

bool pushed_to_output = false;

/// Output if has data.
if (has_two_level)
pushed_to_output = tryPushTwoLevelData();

auto need_input = [this](size_t input_num)
{
if (last_bucket_number[input_num] < current_bucket)
return true;

return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
};

/// Read next bucket if can.
for (; ; ++current_bucket)
{
Expand All @@ -190,20 +190,24 @@ IProcessor::Status GroupingAggregatedTransform::prepare()

if (!in->hasData())
{
wait_input_ports_numbers.insert(input_num);
need_data = true;
continue;
}

auto chunk = in->pull();
addChunk(std::move(chunk), input_num);

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (!in->isFinished() && need_input(input_num))
{
wait_input_ports_numbers.insert(input_num);
need_data = true;
}
}

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (finished)
{
all_inputs_finished = true;
Expand Down
@@ -1,5 +1,7 @@
#pragma once

#include <Core/SortDescription.h>
#include <Common/HashTable/HashSet.h>
#include <Interpreters/Aggregator.h>
#include <Processors/IProcessor.h>
#include <Processors/ISimpleTransform.h>
Expand Down Expand Up @@ -67,7 +69,7 @@ class GroupingAggregatedTransform : public IProcessor
void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; }

protected:
Status prepare() override;
Status prepare(const PortNumbers & updated_input_ports, const PortNumbers &) override;
void work() override;

private:
Expand All @@ -83,16 +85,15 @@ class GroupingAggregatedTransform : public IProcessor
bool has_two_level = false;

bool all_inputs_finished = false;
bool read_from_all_inputs = false;
std::vector<bool> read_from_input;
bool initialized_index_to_input = false;
std::vector<InputPorts::iterator> index_to_input;
HashSet<uint64_t> wait_input_ports_numbers;

/// If we aggregate partitioned data several chunks might be produced for the same bucket: one for each partition.
bool expect_several_chunks_for_single_bucket_per_source = true;

/// Add chunk read from input to chunks_map, overflow_chunks or single_level_chunks according to it's chunk info.
void addChunk(Chunk chunk, size_t input);
/// Read from all inputs first chunk. It is needed to detect if any source has two-level aggregation.
void readFromAllInputs();
/// Push chunks if all inputs has single level.
bool tryPushSingleLevelData();
/// Push chunks from ready bucket if has one.
Expand Down
10 changes: 10 additions & 0 deletions tests/performance/aggregation_external.xml
@@ -0,0 +1,10 @@
<test>
<!-- Cap in-memory aggregation at 10 MiB so GROUP BY spills to disk,
exercising the external-aggregation merge path with many temporary files. -->
<settings>
<max_threads>30</max_threads>
<max_bytes_before_external_group_by>10485760</max_bytes_before_external_group_by>
</settings>

<!-- Same query at increasing cardinality; FORMAT Null discards the result
so the benchmark measures aggregation/merge time, not output formatting. -->
<query>SELECT number, count() FROM numbers_mt(5000000) GROUP BY number FORMAT Null;</query>
<query>SELECT number, count() FROM numbers_mt(15000000) GROUP BY number FORMAT Null;</query>
<query>SELECT number, count() FROM numbers_mt(30000000) GROUP BY number FORMAT Null;</query>
</test>

0 comments on commit b53a982

Please sign in to comment.