Skip to content

Commit

Permalink
Updated implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kitaisreal committed Oct 11, 2023
1 parent 0d6fd01 commit ba43d63
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 35 deletions.
Expand Up @@ -106,37 +106,14 @@ IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & upda
return Status::Finished;
}

/// Read first time from each input to understand if we have two-level aggregation.
if (!read_from_all_inputs)
if (!initialized_index_to_input)
{
read_from_all_inputs = true;
initialized_index_to_input = true;
auto in = inputs.begin();
index_to_input.resize(num_inputs);

for (size_t i = 0; i < num_inputs; ++i, ++in)
{
index_to_input[i] = in;

if (in->isFinished())
continue;

in->setNeeded();

if (!in->hasData())
{
wait_input_ports_numbers.insert(i);
continue;
}

auto chunk = in->pull();
addChunk(std::move(chunk), i);
}

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (!wait_input_ports_numbers.empty())
return Status::NeedData;
}

auto need_input = [this](size_t input_num)
Expand All @@ -151,9 +128,10 @@ IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & upda
{
for (const auto & updated_input_port_number : updated_input_ports)
{
auto & input = index_to_input[updated_input_port_number];
// input->setNeeded();
if (!wait_input_ports_numbers.contains(updated_input_port_number))
continue;

auto & input = index_to_input[updated_input_port_number];
if (!input->hasData())
{
wait_input_ports_numbers.erase(updated_input_port_number);
Expand All @@ -169,18 +147,17 @@ IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & upda
wait_input_ports_numbers.erase(updated_input_port_number);
}

if (!output.canPush())
return Status::PortFull;

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (!wait_input_ports_numbers.empty())
return Status::NeedData;
}

if (!output.canPush())
{
for (auto & input : inputs)
input.setNotNeeded();

return Status::PortFull;
}

/// Convert single level to two levels if have two-level input.
if (has_two_level && !single_level_chunks.empty())
Expand Down
@@ -1,6 +1,7 @@
#pragma once

#include <Core/SortDescription.h>
#include <Common/HashTable/HashSet.h>
#include <Interpreters/Aggregator.h>
#include <Processors/IProcessor.h>
#include <Processors/ISimpleTransform.h>
Expand Down Expand Up @@ -84,9 +85,9 @@ class GroupingAggregatedTransform : public IProcessor
bool has_two_level = false;

bool all_inputs_finished = false;
bool read_from_all_inputs = false;
bool initialized_index_to_input = false;
std::vector<InputPorts::iterator> index_to_input;
std::unordered_set<uint64_t> wait_input_ports_numbers;
HashSet<uint64_t> wait_input_ports_numbers;

/// If we aggregate partitioned data several chunks might be produced for the same bucket: one for each partition.
bool expect_several_chunks_for_single_bucket_per_source = true;
Expand Down

0 comments on commit ba43d63

Please sign in to comment.