Skip to content

Commit

Permalink
Improve performance of external aggregation with a lot of temporary f…
Browse files Browse the repository at this point in the history
…iles
  • Loading branch information
kitaisreal committed Oct 11, 2023
1 parent 5ad055d commit 60163df
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 54 deletions.
Expand Up @@ -20,37 +20,9 @@ GroupingAggregatedTransform::GroupingAggregatedTransform(
, num_inputs(num_inputs_)
, params(std::move(params_))
, last_bucket_number(num_inputs, -1)
, read_from_input(num_inputs, false)
{
}

/// Pulls one chunk from every input that has not delivered a chunk yet.
///
/// Inputs that are already finished, or that were already read from
/// (tracked in read_from_input), are skipped. Every remaining input is marked
/// as needed; if it currently holds data, that chunk is pulled and handed to
/// addChunk() together with the input's index.
///
/// On return read_from_all_inputs is true unless at least one visited input
/// had no data available during this call.
void GroupingAggregatedTransform::readFromAllInputs()
{
    /// Assume success up front; cleared below when some input has nothing to give yet.
    read_from_all_inputs = true;

    size_t input_index = 0;
    for (auto port = inputs.begin(); input_index < num_inputs; ++port, ++input_index)
    {
        /// Nothing to do for exhausted inputs or inputs that were already served.
        if (port->isFinished() || read_from_input[input_index])
            continue;

        port->setNeeded();

        if (port->hasData())
        {
            auto first_chunk = port->pull();
            read_from_input[input_index] = true;
            addChunk(std::move(first_chunk), input_index);
        }
        else
        {
            /// This input must be revisited on a later call.
            read_from_all_inputs = false;
        }
    }
}

void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows)
{
auto & output = outputs.front();
Expand Down Expand Up @@ -119,7 +91,7 @@ bool GroupingAggregatedTransform::tryPushOverflowData()
return true;
}

IProcessor::Status GroupingAggregatedTransform::prepare()
IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & updated_input_ports, const PortNumbers &)
{
/// Check can output.
auto & output = outputs.front();
Expand All @@ -137,29 +109,35 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
/// Read first time from each input to understand if we have two-level aggregation.
if (!read_from_all_inputs)
{
readFromAllInputs();
if (!read_from_all_inputs)
return Status::NeedData;
}
read_from_all_inputs = true;
auto in = inputs.begin();
index_to_input.resize(num_inputs);

/// Convert single level to two levels if have two-level input.
if (has_two_level && !single_level_chunks.empty())
return Status::Ready;
for (size_t i = 0; i < num_inputs; ++i, ++in)
{
index_to_input[i] = in;

/// Check can push (to avoid data caching).
if (!output.canPush())
{
for (auto & input : inputs)
input.setNotNeeded();
if (in->isFinished())
continue;

return Status::PortFull;
}
in->setNeeded();

bool pushed_to_output = false;
if (!in->hasData())
{
wait_input_ports_numbers.insert(i);
continue;
}

/// Output if has data.
if (has_two_level)
pushed_to_output = tryPushTwoLevelData();
auto chunk = in->pull();
addChunk(std::move(chunk), i);
}

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (!wait_input_ports_numbers.empty())
return Status::NeedData;
}

auto need_input = [this](size_t input_num)
{
Expand All @@ -169,6 +147,51 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
};

if (!wait_input_ports_numbers.empty())
{
for (const auto & updated_input_port_number : updated_input_ports)
{
auto & input = index_to_input[updated_input_port_number];
// input->setNeeded();

if (!input->hasData())
{
wait_input_ports_numbers.erase(updated_input_port_number);
continue;
}

auto chunk = input->pull();
addChunk(std::move(chunk), updated_input_port_number);

if (!input->isFinished() && need_input(updated_input_port_number))
continue;

wait_input_ports_numbers.erase(updated_input_port_number);
}

if (!output.canPush())
return Status::PortFull;

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (!wait_input_ports_numbers.empty())
return Status::NeedData;
}

if (!output.canPush())
return Status::PortFull;

/// Convert single level to two levels if have two-level input.
if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

bool pushed_to_output = false;

/// Output if has data.
if (has_two_level)
pushed_to_output = tryPushTwoLevelData();

/// Read next bucket if can.
for (; ; ++current_bucket)
{
Expand All @@ -190,20 +213,24 @@ IProcessor::Status GroupingAggregatedTransform::prepare()

if (!in->hasData())
{
wait_input_ports_numbers.insert(input_num);
need_data = true;
continue;
}

auto chunk = in->pull();
addChunk(std::move(chunk), input_num);

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (!in->isFinished() && need_input(input_num))
{
wait_input_ports_numbers.insert(input_num);
need_data = true;
}
}

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (finished)
{
all_inputs_finished = true;
Expand Down
@@ -1,4 +1,5 @@
#pragma once

#include <Core/SortDescription.h>
#include <Interpreters/Aggregator.h>
#include <Processors/IProcessor.h>
Expand Down Expand Up @@ -67,7 +68,7 @@ class GroupingAggregatedTransform : public IProcessor
void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; }

protected:
Status prepare() override;
Status prepare(const PortNumbers & updated_input_ports, const PortNumbers &) override;
void work() override;

private:
Expand All @@ -84,15 +85,14 @@ class GroupingAggregatedTransform : public IProcessor

bool all_inputs_finished = false;
bool read_from_all_inputs = false;
std::vector<bool> read_from_input;
std::vector<InputPorts::iterator> index_to_input;
std::unordered_set<uint64_t> wait_input_ports_numbers;

/// If we aggregate partitioned data several chunks might be produced for the same bucket: one for each partition.
bool expect_several_chunks_for_single_bucket_per_source = true;

/// Add chunk read from input to chunks_map, overflow_chunks or single_level_chunks according to its chunk info.
void addChunk(Chunk chunk, size_t input);
/// Read from all inputs first chunk. It is needed to detect if any source has two-level aggregation.
void readFromAllInputs();
/// Push chunks if all inputs have single level.
bool tryPushSingleLevelData();
/// Push chunks from ready bucket if has one.
Expand Down

0 comments on commit 60163df

Please sign in to comment.