From ba43d6331087db261f427adbfc4ca4f74cc83cf7 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Wed, 11 Oct 2023 17:51:59 +0300 Subject: [PATCH] Updated implementation --- ...gingAggregatedMemoryEfficientTransform.cpp | 43 +++++-------------- ...ergingAggregatedMemoryEfficientTransform.h | 5 ++- 2 files changed, 13 insertions(+), 35 deletions(-) diff --git a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp index d8305d71adbf..a92e2253314a 100644 --- a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp @@ -106,37 +106,14 @@ IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & upda return Status::Finished; } - /// Read first time from each input to understand if we have two-level aggregation. - if (!read_from_all_inputs) + if (!initialized_index_to_input) { - read_from_all_inputs = true; + initialized_index_to_input = true; auto in = inputs.begin(); index_to_input.resize(num_inputs); for (size_t i = 0; i < num_inputs; ++i, ++in) - { index_to_input[i] = in; - - if (in->isFinished()) - continue; - - in->setNeeded(); - - if (!in->hasData()) - { - wait_input_ports_numbers.insert(i); - continue; - } - - auto chunk = in->pull(); - addChunk(std::move(chunk), i); - } - - if (has_two_level && !single_level_chunks.empty()) - return Status::Ready; - - if (!wait_input_ports_numbers.empty()) - return Status::NeedData; } auto need_input = [this](size_t input_num) @@ -151,9 +128,10 @@ IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & upda { for (const auto & updated_input_port_number : updated_input_ports) { - auto & input = index_to_input[updated_input_port_number]; - // input->setNeeded(); + if (!wait_input_ports_numbers.contains(updated_input_port_number)) + continue; + auto & input = index_to_input[updated_input_port_number]; if (!input->hasData()) { wait_input_ports_numbers.erase(updated_input_port_number); @@ -169,18 +147,17 @@ IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & upda wait_input_ports_numbers.erase(updated_input_port_number); } - if (!output.canPush()) - return Status::PortFull; - - if (has_two_level && !single_level_chunks.empty()) - return Status::Ready; - if (!wait_input_ports_numbers.empty()) return Status::NeedData; } if (!output.canPush()) + { + for (auto & input : inputs) + input.setNotNeeded(); + return Status::PortFull; + } /// Convert single level to two levels if have two-level input. if (has_two_level && !single_level_chunks.empty()) diff --git a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h index 7fa9947495af..77ee3034ffcf 100644 --- a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h +++ b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -84,9 +85,9 @@ class GroupingAggregatedTransform : public IProcessor bool has_two_level = false; bool all_inputs_finished = false; - bool read_from_all_inputs = false; + bool initialized_index_to_input = false; std::vector index_to_input; - std::unordered_set wait_input_ports_numbers; + HashSet wait_input_ports_numbers; /// If we aggregate partitioned data several chunks might be produced for the same bucket: one for each partition. bool expect_several_chunks_for_single_bucket_per_source = true;