From 79b072d27c9f3d44187b5341f36e23add73b7518 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Wed, 19 Nov 2025 13:56:40 +0800 Subject: [PATCH 01/12] refactor: create a new partial --- .../aggregator/new_aggregate/mod.rs | 1 + .../new_transform_aggregate_partial.rs | 266 ++++++++++++++++++ .../aggregator/transform_aggregate_partial.rs | 3 +- 3 files changed, 268 insertions(+), 2 deletions(-) create mode 100644 src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs index a8e330553850c..e2be00d7a7a5a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs @@ -15,6 +15,7 @@ mod datablock_splitter; mod new_aggregate_spiller; mod new_final_aggregate_state; +mod new_transform_aggregate_partial; mod new_transform_aggregate_spill_writer; mod new_transform_final_aggregate; mod transform_partition_bucket_scatter; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs new file mode 100644 index 0000000000000..b8f1ac507ecb2 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs @@ -0,0 +1,266 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Instant; +use std::vec; + +use bumpalo::Bump; +use databend_common_base::base::convert_byte_size; +use databend_common_base::base::convert_number_size; +use databend_common_catalog::plan::AggIndexMeta; +use databend_common_exception::Result; +use databend_common_expression::AggregateHashTable; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::HashTableConfig; +use databend_common_expression::PayloadFlushState; +use databend_common_expression::ProbeState; +use databend_common_expression::ProjectedBlock; +use databend_common_pipeline::core::InputPort; +use databend_common_pipeline::core::OutputPort; +use databend_common_pipeline::core::Processor; +use databend_common_pipeline_transforms::processors::AccumulatingTransform; +use databend_common_pipeline_transforms::processors::AccumulatingTransformer; +use databend_common_pipeline_transforms::MemorySettings; + +use crate::pipelines::memory_settings::MemorySettingsExt; +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::AggregatorParams; +use crate::sessions::QueryContext; +#[allow(clippy::enum_variant_names)] +enum HashTable { + MovedOut, + AggregateHashTable(AggregateHashTable), +} + +impl Default for HashTable { + fn default() -> Self { + Self::MovedOut + } +} +pub struct NewTransformPartialAggregate { + hash_table: HashTable, + probe_state: ProbeState, + params: Arc, + start: Instant, + first_block_start: Option, + processed_bytes: usize, + processed_rows: usize, + settings: MemorySettings, +} + +impl NewTransformPartialAggregate { + pub fn try_create( + ctx: Arc, + input: Arc, + output: Arc, + params: Arc, + config: HashTableConfig, + ) -> Result> { + let arena = Arc::new(Bump::new()); + let hash_table = HashTable::AggregateHashTable(AggregateHashTable::new( + params.group_data_types.clone(), + params.aggregate_functions.clone(), + config, + arena, + )); + + Ok(AccumulatingTransformer::create( + input, + output, + NewTransformPartialAggregate { + params, + hash_table, + probe_state: ProbeState::default(), + settings: MemorySettings::from_aggregate_settings(&ctx)?, + start: Instant::now(), + first_block_start: None, + processed_bytes: 0, + processed_rows: 0, + }, + )) + } + + #[inline(always)] + fn aggregate_arguments<'a>( + block: &'a DataBlock, + aggregate_functions_arguments: &'a [Vec], + ) -> Vec> { + aggregate_functions_arguments + .iter() + .map(|function_arguments| ProjectedBlock::project(function_arguments, block)) + .collect::>() + } + + #[inline(always)] + fn execute_one_block(&mut self, block: DataBlock) -> Result<()> { + let is_agg_index_block = block + .get_meta() + .and_then(AggIndexMeta::downcast_ref_from) + .map(|index| index.is_agg) + .unwrap_or_default(); + + let block = block.consume_convert_to_full(); + let group_columns = ProjectedBlock::project(&self.params.group_columns, &block); + let rows_num = block.num_rows(); + + self.processed_bytes += block.memory_size(); + self.processed_rows += rows_num; + if self.first_block_start.is_none() { + self.first_block_start = Some(Instant::now()); + } + + { + match &mut self.hash_table { + HashTable::MovedOut => { + unreachable!("[TRANSFORM-AGGREGATOR] Hash table already moved out") + } + HashTable::AggregateHashTable(hashtable) => { + let (params_columns, states_index) = if is_agg_index_block { + let num_columns = block.num_columns(); + let states_count = self + .params + .states_layout + .as_ref() + .map(|layout| layout.num_aggr_func()) + .unwrap_or(0); + ( + vec![], + (num_columns - states_count..num_columns).collect::>(), + ) + } else { + ( + Self::aggregate_arguments( + &block, + &self.params.aggregate_functions_arguments, + ), + vec![], + ) + }; + + let agg_states = if !states_index.is_empty() { + ProjectedBlock::project(&states_index, &block) + } else { + (&[]).into() + }; + + let _ = hashtable.add_groups( + &mut self.probe_state, + group_columns, + ¶ms_columns, + agg_states, + rows_num, + )?; + Ok(()) + } + } + } + } + + fn spill_out(&mut self) -> Result> { + if let HashTable::AggregateHashTable(v) = std::mem::take(&mut self.hash_table) { + let group_types = v.payload.group_types.clone(); + let aggrs = v.payload.aggrs.clone(); + v.config.update_current_max_radix_bits(); + let config = v + .config + .clone() + .with_initial_radix_bits(v.config.max_radix_bits); + + let mut state = PayloadFlushState::default(); + + // repartition to max for normalization + let partitioned_payload = v + .payload + .repartition(1 << config.max_radix_bits, &mut state); + + let blocks = vec![DataBlock::empty_with_meta( + AggregateMeta::create_agg_spilling(partitioned_payload), + )]; + + let arena = Arc::new(Bump::new()); + self.hash_table = HashTable::AggregateHashTable(AggregateHashTable::new( + group_types, + aggrs, + config, + arena, + )); + return Ok(blocks); + } + unreachable!("[TRANSFORM-AGGREGATOR] Invalid hash table state during spill check") + } +} + +impl AccumulatingTransform for NewTransformPartialAggregate { + const NAME: &'static str = "NewTransformPartialAggregate"; + + fn transform(&mut self, block: DataBlock) -> Result> { + self.execute_one_block(block)?; + + if self.settings.check_spill() { + self.spill_out()?; + } + + Ok(vec![]) + } + + fn on_finish(&mut self, output: bool) -> Result> { + Ok(match std::mem::take(&mut self.hash_table) { + HashTable::MovedOut => match !output && std::thread::panicking() { + true => vec![], + false => { + unreachable!("[TRANSFORM-AGGREGATOR] Hash table already moved out in finish") + } + }, + HashTable::AggregateHashTable(hashtable) => { + let partition_count = hashtable.payload.partition_count(); + let mut blocks = Vec::with_capacity(partition_count); + + log::info!( + "[TRANSFORM-AGGREGATOR] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}", + self.processed_rows, + hashtable.payload.len(), + self.start.elapsed().as_secs_f64(), + if let Some(t) = &self.first_block_start { + t.elapsed().as_secs_f64() + } else { + self.start.elapsed().as_secs_f64() + }, + convert_number_size( + self.processed_rows as f64 / self.start.elapsed().as_secs_f64() + ), + convert_byte_size( + self.processed_bytes as f64 / self.start.elapsed().as_secs_f64() + ), + convert_byte_size(self.processed_bytes as f64), + ); + + for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { + if payload.len() != 0 { + blocks.push(DataBlock::empty_with_meta( + AggregateMeta::create_agg_payload( + bucket as isize, + payload, + partition_count, + ), + )); + } + } + + blocks + } + }) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs index 43fdf2221cdb1..11bb79c0f44e1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs @@ -74,8 +74,7 @@ impl TransformPartialAggregate { let arena = Arc::new(Bump::new()); // when enable_experiment_aggregate, we will repartition again in the final stage // it will be too small if we use max radix bits here - let hash_table = if params.has_distinct_combinator() && !params.enable_experiment_aggregate - { + let hash_table = if params.has_distinct_combinator() { let max_radix_bits = config.max_radix_bits; HashTable::AggregateHashTable(AggregateHashTable::new( params.group_data_types.clone(), From db2ae2c8d3b8ab5be8c3061577066686ba4a8c35 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Thu, 20 Nov 2025 11:46:30 +0800 Subject: [PATCH 02/12] refactor: move aggregation spill logic into partial aggregate --- .../perf/flamegraph_main_template.html | 4 +- .../physical_aggregate_partial.rs | 54 +++- .../aggregator/aggregate_exchange_injector.rs | 2 +- .../processors/transforms/aggregator/mod.rs | 3 +- .../aggregator/new_aggregate/mod.rs | 1 + .../new_transform_aggregate_partial.rs | 280 +++++++++++++++--- .../v1/exchange/exchange_transform_scatter.rs | 9 + 7 files changed, 297 insertions(+), 56 deletions(-) diff --git a/src/common/base/src/runtime/perf/flamegraph_main_template.html b/src/common/base/src/runtime/perf/flamegraph_main_template.html index d487156a9ff21..8b9b3f76f57db 100644 --- a/src/common/base/src/runtime/perf/flamegraph_main_template.html +++ b/src/common/base/src/runtime/perf/flamegraph_main_template.html @@ -48,8 +48,6 @@

Query Performance Flamegraphs

- \ No newline at end of file + diff --git a/src/query/service/src/physical_plans/physical_aggregate_partial.rs b/src/query/service/src/physical_plans/physical_aggregate_partial.rs index 82d3d73f6f981..f180a2e5c04c8 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_partial.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_partial.rs @@ -38,6 +38,7 @@ use databend_common_sql::IndexType; use databend_common_storage::DataOperator; use itertools::Itertools; +use crate::clusters::ClusterHelper; use crate::physical_plans::explain::PlanStatsInfo; use crate::physical_plans::format::AggregatePartialFormatter; use crate::physical_plans::format::PhysicalFormat; @@ -46,6 +47,7 @@ use crate::physical_plans::physical_plan::PhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlanMeta; use crate::pipelines::processors::transforms::aggregator::AggregateInjector; use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter; +use crate::pipelines::processors::transforms::aggregator::NewTransformPartialAggregate; use crate::pipelines::processors::transforms::aggregator::PartialSingleStateAggregator; use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream; use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter; @@ -232,15 +234,49 @@ impl IPhysicalPlan for AggregatePartial { }); } - builder.main_pipeline.add_transform(|input, output| { - Ok(ProcessorPtr::create(TransformPartialAggregate::try_create( - builder.ctx.clone(), - input, - output, - params.clone(), - partial_agg_config.clone(), - )?)) - })?; + if params.enable_experiment_aggregate { + let cluster = &builder.ctx.get_cluster(); + let streams_num = if !builder.is_exchange_parent() { + 1 + } else { + cluster.nodes.len() + }; + let local_pos = cluster.ordered_index(); + let shared_partition_streams = (0..streams_num) + .map(|_| { + SharedPartitionStream::new( + builder.main_pipeline.output_len(), + max_block_rows, + max_block_bytes, + MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, + ) + }) + .collect::>(); + + builder.main_pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create( + NewTransformPartialAggregate::try_create( + builder.ctx.clone(), + input, + output, + params.clone(), + partial_agg_config.clone(), + shared_partition_streams.clone(), + local_pos, + )?, + )) + })?; + } else { + builder.main_pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create(TransformPartialAggregate::try_create( + builder.ctx.clone(), + input, + output, + params.clone(), + partial_agg_config.clone(), + )?)) + })?; + } // If cluster mode, spill write will be completed in exchange serialize, because we need scatter the block data first if !builder.is_exchange_parent() { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs index 3c734d5352b6d..82e5dcc9772a1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs @@ -117,7 +117,7 @@ fn scatter_payload(mut payload: Payload, buckets: usize) -> Result> Ok(buckets) } -fn scatter_partitioned_payload( +pub fn scatter_partitioned_payload( partitioned_payload: PartitionedPayload, buckets: usize, ) -> Result> { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs index eb2e69a74e47c..880668c2a932b 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs @@ -29,8 +29,7 @@ pub use aggregate_exchange_injector::AggregateInjector; pub use aggregate_meta::*; pub use aggregator_params::AggregatorParams; pub use build_partition_bucket::build_partition_bucket; -pub use new_aggregate::NewTransformAggregateSpillWriter; -pub use new_aggregate::SharedPartitionStream; +pub use new_aggregate::*; pub use transform_aggregate_expand::TransformExpandGroupingSets; pub use transform_aggregate_final::TransformFinalAggregate; pub use transform_aggregate_partial::TransformPartialAggregate; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs index e2be00d7a7a5a..589d240694098 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs @@ -24,6 +24,7 @@ pub use datablock_splitter::split_partitioned_meta_into_datablocks; pub use new_aggregate_spiller::NewAggregateSpiller; pub use new_aggregate_spiller::SharedPartitionStream; pub use new_final_aggregate_state::FinalAggregateSharedState; +pub use new_transform_aggregate_partial::NewTransformPartialAggregate; pub use new_transform_aggregate_spill_writer::NewTransformAggregateSpillWriter; pub use new_transform_final_aggregate::NewFinalAggregateTransform; pub use transform_partition_bucket_scatter::TransformPartitionBucketScatter; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs index b8f1ac507ecb2..ff28c1ee3c836 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs @@ -21,24 +21,40 @@ use databend_common_base::base::convert_byte_size; use databend_common_base::base::convert_number_size; use databend_common_catalog::plan::AggIndexMeta; use databend_common_exception::Result; +use databend_common_expression::types::BinaryType; +use databend_common_expression::types::Int64Type; +use databend_common_expression::types::StringType; use databend_common_expression::AggregateHashTable; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; +use databend_common_expression::FromData; use databend_common_expression::HashTableConfig; +use databend_common_expression::PartitionedPayload; use databend_common_expression::PayloadFlushState; use databend_common_expression::ProbeState; use databend_common_expression::ProjectedBlock; +use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM; use databend_common_pipeline::core::InputPort; use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Processor; use databend_common_pipeline_transforms::processors::AccumulatingTransform; use databend_common_pipeline_transforms::processors::AccumulatingTransformer; use databend_common_pipeline_transforms::MemorySettings; +use databend_common_storages_parquet::serialize_row_group_meta_to_bytes; use crate::pipelines::memory_settings::MemorySettingsExt; +use crate::pipelines::processors::transforms::aggregator::aggregate_exchange_injector::scatter_partitioned_payload; use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::exchange_defines; +use crate::pipelines::processors::transforms::aggregator::AggregateSerdeMeta; use crate::pipelines::processors::transforms::aggregator::AggregatorParams; +use crate::pipelines::processors::transforms::aggregator::FlightSerialized; +use crate::pipelines::processors::transforms::aggregator::FlightSerializedMeta; +use crate::pipelines::processors::transforms::aggregator::NewAggregateSpiller; +use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream; +use crate::servers::flight::v1::exchange::serde::serialize_block; use crate::sessions::QueryContext; + #[allow(clippy::enum_variant_names)] enum HashTable { MovedOut, @@ -50,15 +66,209 @@ impl Default for HashTable { Self::MovedOut } } -pub struct NewTransformPartialAggregate { - hash_table: HashTable, - probe_state: ProbeState, - params: Arc, + +struct PartialAggregationStatistics { start: Instant, first_block_start: Option, processed_bytes: usize, processed_rows: usize, +} + +impl PartialAggregationStatistics { + fn new() -> Self { + Self { + start: Instant::now(), + first_block_start: None, + processed_bytes: 0, + processed_rows: 0, + } + } + + fn record_block(&mut self, rows: usize, bytes: usize) { + self.processed_rows += rows; + self.processed_bytes += bytes; + if self.first_block_start.is_none() { + self.first_block_start = Some(Instant::now()); + } + } + + fn log_finish_statistics(&self, hashtable: &AggregateHashTable) { + let elapsed = self.start.elapsed().as_secs_f64(); + let real_elapsed = self + .first_block_start + .as_ref() + .map(|t| t.elapsed().as_secs_f64()) + .unwrap_or(elapsed); + + log::info!( + "[TRANSFORM-AGGREGATOR] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}", + self.processed_rows, + hashtable.payload.len(), + elapsed, + real_elapsed, + convert_number_size(self.processed_rows as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64), + ); + } +} + +enum Spiller { + Standalone(NewAggregateSpiller), + // (local_pos, spillers for all) + Clusters(usize, Vec), +} + +impl Spiller { + pub fn create( + ctx: Arc, + partition_streams: Vec, + local_pos: usize, + ) -> Result { + match partition_streams.len() { + 1 => { + let spiller = NewAggregateSpiller::try_create( + ctx.clone(), + MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, + partition_streams[0].clone(), + )?; + Ok(Spiller::Standalone(spiller)) + } + _ => { + let spillers = partition_streams + .into_iter() + .map(|stream| { + NewAggregateSpiller::try_create( + ctx.clone(), + MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, + stream, + ) + }) + .collect::>>()?; + Ok(Spiller::Clusters(local_pos, spillers)) + } + } + } + + fn spill_partition( + spiller: &mut NewAggregateSpiller, + partition: PartitionedPayload, + ) -> Result<()> { + for (bucket, payload) in partition.payloads.into_iter().enumerate() { + if payload.len() == 0 { + continue; + } + + let data_block = payload.aggregate_flush_all()?.consume_convert_to_full(); + spiller.spill(bucket, data_block)?; + } + + Ok(()) + } + + pub fn spill(&mut self, partition: PartitionedPayload) -> Result<()> { + match self { + Spiller::Standalone(spiller) => Self::spill_partition(spiller, partition), + Spiller::Clusters(_, spillers) => { + let nodes_num = spillers.len(); + for (idx, partition) in scatter_partitioned_payload(partition, nodes_num)? + .into_iter() + .enumerate() + { + Self::spill_partition(&mut spillers[idx], partition)?; + } + + Ok(()) + } + } + } + + fn finish_standalone(spiller: &mut NewAggregateSpiller) -> Result> { + let payloads = spiller.spill_finish()?; + if payloads.is_empty() { + return Ok(vec![]); + } + + Ok(vec![DataBlock::empty_with_meta( + AggregateMeta::create_new_spilled(payloads), + )]) + } + + fn finish_clusters( + local_pos: usize, + spillers: &mut [NewAggregateSpiller], + ) -> Result> { + let mut serialized_blocks = Vec::with_capacity(spillers.len()); + let write_options = exchange_defines::spilled_write_options(); + + for (index, spiller) in spillers.iter_mut().enumerate() { + let spilled_payloads = spiller.spill_finish()?; + + if index == local_pos { + let block = if spilled_payloads.is_empty() { + DataBlock::empty() + } else { + DataBlock::empty_with_meta(AggregateMeta::create_new_spilled(spilled_payloads)) + }; + serialized_blocks.push(FlightSerialized::DataBlock(block)); + continue; + } + + if spilled_payloads.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(serialize_block( + -1, + DataBlock::empty(), + &write_options, + )?)); + continue; + } + + let mut bucket_column = Vec::with_capacity(spilled_payloads.len()); + let mut row_group_column = Vec::with_capacity(spilled_payloads.len()); + let mut location_column = Vec::with_capacity(spilled_payloads.len()); + for payload in spilled_payloads { + bucket_column.push(payload.bucket as i64); + location_column.push(payload.location); + row_group_column.push(serialize_row_group_meta_to_bytes(&payload.row_group)?); + } + + let data_block = DataBlock::new_from_columns(vec![ + Int64Type::from_data(bucket_column), + StringType::from_data(location_column), + BinaryType::from_data(row_group_column), + ]); + let meta = AggregateSerdeMeta::create_new_spilled(); + let data_block = data_block.add_meta(Some(meta))?; + serialized_blocks.push(FlightSerialized::DataBlock(serialize_block( + -1, + data_block, + &write_options, + )?)); + } + + Ok(vec![DataBlock::empty_with_meta( + FlightSerializedMeta::create(serialized_blocks), + )]) + } + + pub fn finish(&mut self) -> Result> { + match self { + Spiller::Standalone(spiller) => Self::finish_standalone(spiller), + Spiller::Clusters(local_pos, spillers) => Self::finish_clusters(*local_pos, spillers), + } + } +} + +/// NewTransformPartialAggregate combine partial aggregation and spilling logic +/// When memory exceeds threshold, it will spill out current hash table into a buffer +/// and real spill out will happen when the buffer is full. +pub struct NewTransformPartialAggregate { + hash_table: HashTable, + probe_state: ProbeState, + params: Arc, + statistics: PartialAggregationStatistics, settings: MemorySettings, + spillers: Spiller, } impl NewTransformPartialAggregate { @@ -68,7 +278,11 @@ impl NewTransformPartialAggregate { output: Arc, params: Arc, config: HashTableConfig, + partition_streams: Vec, + local_pos: usize, ) -> Result> { + let spillers = Spiller::create(ctx.clone(), partition_streams, local_pos)?; + let arena = Arc::new(Bump::new()); let hash_table = HashTable::AggregateHashTable(AggregateHashTable::new( params.group_data_types.clone(), @@ -85,10 +299,8 @@ impl NewTransformPartialAggregate { hash_table, probe_state: ProbeState::default(), settings: MemorySettings::from_aggregate_settings(&ctx)?, - start: Instant::now(), - first_block_start: None, - processed_bytes: 0, - processed_rows: 0, + statistics: PartialAggregationStatistics::new(), + spillers, }, )) } @@ -115,12 +327,9 @@ impl NewTransformPartialAggregate { let block = block.consume_convert_to_full(); let group_columns = ProjectedBlock::project(&self.params.group_columns, &block); let rows_num = block.num_rows(); + let block_bytes = block.memory_size(); - self.processed_bytes += block.memory_size(); - self.processed_rows += rows_num; - if self.first_block_start.is_none() { - self.first_block_start = Some(Instant::now()); - } + self.statistics.record_block(rows_num, block_bytes); { match &mut self.hash_table { @@ -169,7 +378,7 @@ impl NewTransformPartialAggregate { } } - fn spill_out(&mut self) -> Result> { + fn spill_out(&mut self) -> Result<()> { if let HashTable::AggregateHashTable(v) = std::mem::take(&mut self.hash_table) { let group_types = v.payload.group_types.clone(); let aggrs = v.payload.aggrs.clone(); @@ -182,13 +391,11 @@ impl NewTransformPartialAggregate { let mut state = PayloadFlushState::default(); // repartition to max for normalization - let partitioned_payload = v + let partition = v .payload .repartition(1 << config.max_radix_bits, &mut state); - let blocks = vec![DataBlock::empty_with_meta( - AggregateMeta::create_agg_spilling(partitioned_payload), - )]; + self.spillers.spill(partition)?; let arena = Arc::new(Bump::new()); self.hash_table = HashTable::AggregateHashTable(AggregateHashTable::new( @@ -197,9 +404,14 @@ impl NewTransformPartialAggregate { config, arena, )); - return Ok(blocks); + } else { + unreachable!("[TRANSFORM-AGGREGATOR] Invalid hash table state during spill check") } - unreachable!("[TRANSFORM-AGGREGATOR] Invalid hash table state during spill check") + Ok(()) + } + + fn spill_finish(&mut self) -> Result> { + self.spillers.finish() } } @@ -225,31 +437,16 @@ impl AccumulatingTransform for NewTransformPartialAggregate { } }, HashTable::AggregateHashTable(hashtable) => { + let mut blocks = self.spill_finish()?; + let partition_count = hashtable.payload.partition_count(); - let mut blocks = Vec::with_capacity(partition_count); - - log::info!( - "[TRANSFORM-AGGREGATOR] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}", - self.processed_rows, - hashtable.payload.len(), - self.start.elapsed().as_secs_f64(), - if let Some(t) = &self.first_block_start { - t.elapsed().as_secs_f64() - } else { - self.start.elapsed().as_secs_f64() - }, - convert_number_size( - self.processed_rows as f64 / self.start.elapsed().as_secs_f64() - ), - convert_byte_size( - self.processed_bytes as f64 / self.start.elapsed().as_secs_f64() - ), - convert_byte_size(self.processed_bytes as f64), - ); + let mut memory_blocks = Vec::with_capacity(partition_count); + + self.statistics.log_finish_statistics(&hashtable); for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { if payload.len() != 0 { - blocks.push(DataBlock::empty_with_meta( + memory_blocks.push(DataBlock::empty_with_meta( AggregateMeta::create_agg_payload( bucket as isize, payload, @@ -259,6 +456,7 @@ impl AccumulatingTransform for NewTransformPartialAggregate { } } + blocks.extend(memory_blocks); blocks } }) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_transform_scatter.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_transform_scatter.rs index 9344b898cd573..3bc4243d3c2aa 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_transform_scatter.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_transform_scatter.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; use databend_common_pipeline::core::InputPort; use databend_common_pipeline::core::OutputPort; @@ -22,6 +23,7 @@ use databend_common_pipeline_transforms::processors::Transform; use databend_common_pipeline_transforms::processors::Transformer; use super::exchange_transform_shuffle::ExchangeShuffleMeta; +use crate::pipelines::processors::transforms::aggregator::FlightSerializedMeta; use crate::servers::flight::v1::scatter::FlightScatter; pub struct ScatterTransform { @@ -48,6 +50,13 @@ impl Transform for ScatterTransform { } fn transform(&mut self, data: DataBlock) -> databend_common_exception::Result { + if let Some(meta) = data.get_meta() { + if FlightSerializedMeta::downcast_ref_from(meta).is_some() { + // Already scattered and serialized, just pass through + return Ok(data); + } + } + let blocks = self.scatter.execute(data)?; Ok(DataBlock::empty_with_meta(ExchangeShuffleMeta::create( From 46293b10de797fa1303362ea47a6cfa5010fbc1f Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Thu, 20 Nov 2025 15:23:03 +0800 Subject: [PATCH 03/12] refactor: move aggregation spill logic into partial aggregate --- .../physical_aggregate_partial.rs | 46 +-- .../aggregator/aggregate_exchange_injector.rs | 36 +-- .../aggregator/new_aggregate/mod.rs | 2 - .../new_aggregate/new_aggregate_spiller.rs | 2 +- .../new_transform_aggregate_partial.rs | 8 +- .../new_transform_aggregate_spill_writer.rs | 122 ------- ...transform_exchange_aggregate_serializer.rs | 299 ++++-------------- 7 files changed, 81 insertions(+), 434 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_aggregate_partial.rs b/src/query/service/src/physical_plans/physical_aggregate_partial.rs index f180a2e5c04c8..cf1f52ef825ee 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_partial.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_partial.rs @@ -46,7 +46,6 @@ use crate::physical_plans::physical_plan::IPhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlanMeta; use crate::pipelines::processors::transforms::aggregator::AggregateInjector; -use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter; use crate::pipelines::processors::transforms::aggregator::NewTransformPartialAggregate; use crate::pipelines::processors::transforms::aggregator::PartialSingleStateAggregator; use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream; @@ -279,40 +278,21 @@ impl IPhysicalPlan for AggregatePartial { } // If cluster mode, spill write will be completed in exchange serialize, because we need scatter the block data first - if !builder.is_exchange_parent() { + if !builder.is_exchange_parent() && !params.enable_experiment_aggregate { let operator = DataOperator::instance().spill_operator(); let location_prefix = builder.ctx.query_id_spill_prefix(); - if params.enable_experiment_aggregate { - let shared_partition_stream = SharedPartitionStream::new( - builder.main_pipeline.output_len(), - max_block_rows, - max_block_bytes, - MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, - ); - builder.main_pipeline.add_transform(|input, output| { - Ok(ProcessorPtr::create( - NewTransformAggregateSpillWriter::try_create( - input, - output, - builder.ctx.clone(), - shared_partition_stream.clone(), - )?, - )) - })?; - } else { - builder.main_pipeline.add_transform(|input, output| { - Ok(ProcessorPtr::create( - TransformAggregateSpillWriter::try_create( - builder.ctx.clone(), - input, - output, - operator.clone(), - params.clone(), - location_prefix.clone(), - )?, - )) - })?; - } + builder.main_pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create( + TransformAggregateSpillWriter::try_create( + builder.ctx.clone(), + input, + output, + operator.clone(), + params.clone(), + location_prefix.clone(), + )?, + )) + })?; } builder.exchange_injector = AggregateInjector::create(builder.ctx.clone(), params.clone()); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs index 82e5dcc9772a1..bdb27d44eb425 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs @@ -22,7 +22,6 @@ use databend_common_expression::DataBlock; use databend_common_expression::PartitionedPayload; use databend_common_expression::Payload; use databend_common_expression::PayloadFlushState; -use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM; use databend_common_pipeline::core::Pipeline; use databend_common_pipeline::core::ProcessorPtr; use databend_common_settings::FlightCompression; @@ -32,8 +31,6 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_meta::Aggreg use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAggregateSerializer; use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAsyncBarrier; use crate::pipelines::processors::transforms::aggregator::AggregatorParams; -use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter; -use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream; use crate::pipelines::processors::transforms::aggregator::TransformAggregateDeserializer; use crate::pipelines::processors::transforms::aggregator::TransformAggregateSerializer; use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter; @@ -264,25 +261,7 @@ impl ExchangeInjector for AggregateInjector { ) -> Result<()> { let params = self.aggregator_params.clone(); - if self.aggregator_params.enable_experiment_aggregate { - let shared_partition_stream = SharedPartitionStream::new( - pipeline.output_len(), - self.aggregator_params.max_block_rows, - self.aggregator_params.max_block_bytes, - MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, - ); - - pipeline.add_transform(|input, output| { - Ok(ProcessorPtr::create( - NewTransformAggregateSpillWriter::try_create( - input, - output, - self.ctx.clone(), - shared_partition_stream.clone(), - )?, - )) - })?; - } else { + if !self.aggregator_params.enable_experiment_aggregate { let operator = DataOperator::instance().spill_operator(); let location_prefix = self.ctx.query_id_spill_prefix(); @@ -322,18 +301,6 @@ impl ExchangeInjector for AggregateInjector { .position(|x| x == local_id) .unwrap(); - let mut partition_streams = vec![]; - if self.aggregator_params.enable_experiment_aggregate { - for _i in 0..shuffle_params.destination_ids.len() { - partition_streams.push(SharedPartitionStream::new( - pipeline.output_len(), - self.aggregator_params.max_block_rows, - self.aggregator_params.max_block_bytes, - MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, - )); - } - } - pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create( TransformExchangeAggregateSerializer::try_create( @@ -345,7 +312,6 @@ impl ExchangeInjector for AggregateInjector { params.clone(), compression, local_pos, - partition_streams.clone(), )?, )) })?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs index 589d240694098..b380ccbd7facb 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs @@ -16,7 +16,6 @@ mod datablock_splitter; mod new_aggregate_spiller; mod new_final_aggregate_state; mod new_transform_aggregate_partial; -mod new_transform_aggregate_spill_writer; mod new_transform_final_aggregate; mod transform_partition_bucket_scatter; @@ -25,6 +24,5 @@ pub use new_aggregate_spiller::NewAggregateSpiller; pub use new_aggregate_spiller::SharedPartitionStream; pub use new_final_aggregate_state::FinalAggregateSharedState; pub use new_transform_aggregate_partial::NewTransformPartialAggregate; -pub use new_transform_aggregate_spill_writer::NewTransformAggregateSpillWriter; pub use new_transform_final_aggregate::NewFinalAggregateTransform; pub use transform_partition_bucket_scatter::TransformPartitionBucketScatter; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 2bf23f71fc910..3fa2df90e9702 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -328,7 +328,7 @@ impl NewAggregateSpiller { self.payload_writers.write_ready_blocks(pending_blocks)?; let payloads = self.payload_writers.finalize()?; - debug!( + info!( "[NewAggregateSpiller] spill finish with {} payloads", payloads.len() ); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs index ff28c1ee3c836..440c632750d8c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; use std::sync::Arc; use std::time::Instant; use std::vec; @@ -41,6 +42,7 @@ use databend_common_pipeline_transforms::processors::AccumulatingTransform; use databend_common_pipeline_transforms::processors::AccumulatingTransformer; use databend_common_pipeline_transforms::MemorySettings; use databend_common_storages_parquet::serialize_row_group_meta_to_bytes; +use log::info; use crate::pipelines::memory_settings::MemorySettingsExt; use crate::pipelines::processors::transforms::aggregator::aggregate_exchange_injector::scatter_partitioned_payload; @@ -409,10 +411,6 @@ impl NewTransformPartialAggregate { } Ok(()) } - - fn spill_finish(&mut self) -> Result> { - self.spillers.finish() - } } impl AccumulatingTransform for NewTransformPartialAggregate { @@ -437,7 +435,7 @@ impl AccumulatingTransform for NewTransformPartialAggregate { } }, HashTable::AggregateHashTable(hashtable) => { - let mut blocks = self.spill_finish()?; + let mut blocks = self.spillers.finish()?; let partition_count = hashtable.payload.partition_count(); let mut memory_blocks = Vec::with_capacity(partition_count); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs index bcc57679065f2..e69de29bb2d1d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs @@ -1,122 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use databend_common_exception::Result; -use databend_common_expression::BlockMetaInfoDowncast; -use databend_common_expression::DataBlock; -use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM; -use databend_common_pipeline::core::InputPort; -use databend_common_pipeline::core::OutputPort; -use databend_common_pipeline::core::Processor; -use databend_common_pipeline_transforms::AccumulatingTransform; -use databend_common_pipeline_transforms::AccumulatingTransformer; - -use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewAggregateSpiller; -use crate::pipelines::processors::transforms::aggregator::new_aggregate::SharedPartitionStream; -use crate::pipelines::processors::transforms::aggregator::AggregateMeta; -use crate::sessions::QueryContext; - -pub struct NewTransformAggregateSpillWriter { - finished: bool, - pub spiller: NewAggregateSpiller, -} - -impl NewTransformAggregateSpillWriter { - pub fn try_create( - input: Arc, - output: Arc, - ctx: Arc, - shared_partition_stream: SharedPartitionStream, - ) -> Result> { - let partition_count = MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize; - let spiller = - NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream, true)?; - - Ok(AccumulatingTransformer::create( - input, - output, - NewTransformAggregateSpillWriter { - finished: false, - spiller, - }, - )) - } - - fn finish(&mut self) -> Result> { - if self.finished { - return Ok(vec![]); - } - self.finished = true; - let spilled_payloads = self.spiller.spill_finish()?; - - let mut spilled_blocks = Vec::with_capacity(spilled_payloads.len()); - for payload in spilled_payloads { - spilled_blocks.push(DataBlock::empty_with_meta( - AggregateMeta::create_new_bucket_spilled(payload), - )); - } - - Ok(spilled_blocks) - } -} - -impl AccumulatingTransform for NewTransformAggregateSpillWriter { - const NAME: &'static str = "NewTransformAggregateSpillWriter"; - - fn transform(&mut self, mut data: DataBlock) -> Result> { - if let Some(block_meta) = data.get_meta().and_then(AggregateMeta::downcast_ref_from) { - if matches!(block_meta, AggregateMeta::AggregatePayload(_)) { - // As soon as a non-spilled AggregatePayload shows up we must flush any pending - // spill files. AggregatePayload shows that partial stage is over, no more spilling - // will happen. - let mut blocks = self.finish()?; - blocks.push(data); - return Ok(blocks); - } - - if matches!(block_meta, AggregateMeta::AggregateSpilling(_)) { - let meta = data.take_meta().unwrap(); - let aggregate_meta = AggregateMeta::downcast_from(meta).unwrap(); - if let AggregateMeta::AggregateSpilling(partition) = aggregate_meta { - // we use fixed size partitioning, the different bucket number will caused spilled data can't be merged correctly - debug_assert_eq!( - partition.payloads.len() as u64, - MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM, - "the number of payloads should be equal to MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM for spilling partition" - ); - - for (bucket, payload) in partition.payloads.into_iter().enumerate() { - if payload.len() == 0 { - continue; - } - - let data_block = payload.aggregate_flush_all()?.consume_convert_to_full(); - self.spiller.spill(bucket, data_block)?; - } - } - return Ok(vec![]); - } - } - - Ok(vec![data]) - } - - fn on_finish(&mut self, _output: bool) -> Result> { - // if partial stage spilled all data, no one AggregatePayload shows up, - // we need to finish spiller here. - self.finish() - } -} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 65850f43e2af0..480d075562008 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -21,28 +21,24 @@ use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::arrow::serialize_column; use databend_common_expression::types::ArgType; use databend_common_expression::types::ArrayType; -use databend_common_expression::types::BinaryType; use databend_common_expression::types::Int64Type; use databend_common_expression::types::ReturnType; -use databend_common_expression::types::StringType; use databend_common_expression::types::UInt64Type; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_expression::PartitionedPayload; -use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM; use databend_common_pipeline::core::InputPort; use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Processor; -use databend_common_pipeline_transforms::AccumulatingTransform; -use databend_common_pipeline_transforms::AccumulatingTransformer; +use databend_common_pipeline_transforms::processors::BlockMetaTransform; +use databend_common_pipeline_transforms::processors::BlockMetaTransformer; +use databend_common_pipeline_transforms::UnknownMode; use databend_common_settings::FlightCompression; -use databend_common_storages_parquet::serialize_row_group_meta_to_bytes; use futures_util::future::BoxFuture; use log::info; use opendal::Operator; @@ -52,13 +48,11 @@ use crate::pipelines::processors::transforms::aggregator::agg_spilling_aggregate use crate::pipelines::processors::transforms::aggregator::aggregate_exchange_injector::compute_block_number; use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; use crate::pipelines::processors::transforms::aggregator::exchange_defines; -use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewAggregateSpiller; use crate::pipelines::processors::transforms::aggregator::AggregateSerdeMeta; use crate::pipelines::processors::transforms::aggregator::AggregatorParams; use crate::pipelines::processors::transforms::aggregator::FlightSerialized; use crate::pipelines::processors::transforms::aggregator::FlightSerializedMeta; use crate::pipelines::processors::transforms::aggregator::SerializeAggregateStream; -use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream; use crate::servers::flight::v1::exchange::serde::serialize_block; use crate::servers::flight::v1::exchange::ExchangeShuffleMeta; use crate::sessions::QueryContext; @@ -66,20 +60,13 @@ use crate::spillers::Spiller; use crate::spillers::SpillerConfig; use crate::spillers::SpillerType; -enum SpillerVer { - Old(Arc), - New(Vec), -} - pub struct TransformExchangeAggregateSerializer { ctx: Arc, local_pos: usize, options: IpcWriteOptions, params: Arc, - spiller: SpillerVer, - - finished: bool, + spiller: Arc, } impl TransformExchangeAggregateSerializer { @@ -94,7 +81,6 @@ impl TransformExchangeAggregateSerializer { params: Arc, compression: Option, local_pos: usize, - partition_streams: Vec, ) -> Result> { let compression = match compression { None => None, @@ -103,251 +89,92 @@ impl TransformExchangeAggregateSerializer { FlightCompression::Zstd => Some(CompressionType::ZSTD), }, }; - - let spiller = if params.enable_experiment_aggregate { - let spillers = partition_streams - .into_iter() - .enumerate() - .map(|(pos, stream)| { - NewAggregateSpiller::try_create( - ctx.clone(), - MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, - stream.clone(), - pos == local_pos, - ) - }) - .collect::>>()?; - SpillerVer::New(spillers) - } else { - let config = SpillerConfig { - spiller_type: SpillerType::Aggregation, - location_prefix, - disk_spill: None, - use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), - }; - - let spiller = Spiller::create(ctx.clone(), operator, config.clone())?; - SpillerVer::Old(Arc::new(spiller)) + let config = SpillerConfig { + spiller_type: SpillerType::Aggregation, + location_prefix, + disk_spill: None, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; - Ok(AccumulatingTransformer::create( + let spiller = Spiller::create(ctx.clone(), operator, config.clone())?; + Ok(BlockMetaTransformer::create( input, output, TransformExchangeAggregateSerializer { ctx, params, local_pos, - spiller, + spiller: spiller.into(), options: IpcWriteOptions::default() .try_with_compression(compression) .unwrap(), - finished: false, }, )) } +} - fn finish(&mut self) -> Result> { - if self.finished { - return Ok(vec![]); - } - self.finished = true; - - if let SpillerVer::New(spillers) = &mut self.spiller { - let mut serialized_blocks = vec![]; - let write_options = exchange_defines::spilled_write_options(); +impl BlockMetaTransform for TransformExchangeAggregateSerializer { + const UNKNOWN_MODE: UnknownMode = UnknownMode::Pass; + const NAME: &'static str = "TransformExchangeAggregateSerializer"; - for (index, spiller) in spillers.iter_mut().enumerate() { - if index == self.local_pos { - serialized_blocks.push(Self::finish_local_new_spiller(spiller)?); - } else { - serialized_blocks - .push(Self::finish_exchange_new_spiller(spiller, &write_options)?); - } + fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result> { + let mut serialized_blocks = Vec::with_capacity(meta.blocks.len()); + for (index, mut block) in meta.blocks.into_iter().enumerate() { + if block.is_empty() && block.get_meta().is_none() { + serialized_blocks.push(FlightSerialized::DataBlock(block)); + continue; } - return Ok(serialized_blocks); - } - - Err(ErrorCode::Internal( - "Spiller version is not NewAggregateSpiller", - )) - } - - fn finish_local_new_spiller(spiller: &mut NewAggregateSpiller) -> Result { - let spilled_payloads = spiller.spill_finish()?; - let block = if spilled_payloads.is_empty() { - DataBlock::empty() - } else { - DataBlock::empty_with_meta(AggregateMeta::create_new_spilled(spilled_payloads)) - }; - Ok(FlightSerialized::DataBlock(block)) - } - - fn finish_exchange_new_spiller( - spiller: &mut NewAggregateSpiller, - write_options: &IpcWriteOptions, - ) -> Result { - let spilled_payloads = spiller.spill_finish()?; - if spilled_payloads.is_empty() { - return Ok(FlightSerialized::DataBlock(serialize_block( - -1, - DataBlock::empty(), - write_options, - )?)); - } - - let mut bucket_column = Vec::with_capacity(spilled_payloads.len()); - let mut row_group_column = Vec::with_capacity(spilled_payloads.len()); - let mut location_column = Vec::with_capacity(spilled_payloads.len()); - for payload in spilled_payloads { - bucket_column.push(payload.bucket as i64); - location_column.push(payload.location); - row_group_column.push(serialize_row_group_meta_to_bytes(&payload.row_group)?); - } - - let data_block = DataBlock::new_from_columns(vec![ - Int64Type::from_data(bucket_column), - StringType::from_data(location_column), - BinaryType::from_data(row_group_column), - ]); - let meta = AggregateSerdeMeta::create_new_spilled(); - let data_block = data_block.add_meta(Some(meta))?; - Ok(FlightSerialized::DataBlock(serialize_block( - -1, - data_block, - write_options, - )?)) - } -} + match block.take_meta().and_then(AggregateMeta::downcast_from) { + Some(AggregateMeta::AggregateSpilling(payload)) => { + serialized_blocks.push(FlightSerialized::Future( + match index == self.local_pos { + true => local_agg_spilling_aggregate_payload( + self.ctx.clone(), + self.spiller.clone(), + payload, + )?, + false => exchange_agg_spilling_aggregate_payload( + self.ctx.clone(), + self.spiller.clone(), + payload, + )?, + }, + )); + } -impl AccumulatingTransform for TransformExchangeAggregateSerializer { - const NAME: &'static str = "TransformExchangeAggregateSerializer"; + Some(AggregateMeta::AggregatePayload(p)) => { + let (bucket, max_partition_count) = (p.bucket, p.max_partition_count); - fn transform(&mut self, mut data: DataBlock) -> Result> { - if let Some(block_meta) = data.take_meta() { - if ExchangeShuffleMeta::downcast_ref_from(&block_meta).is_some() { - let meta = ExchangeShuffleMeta::downcast_from(block_meta).unwrap(); - let mut serialized_blocks = Vec::with_capacity(meta.blocks.len()); - let mut spilled_blocks = None; - for (index, mut block) in meta.blocks.into_iter().enumerate() { - if block.is_empty() && block.get_meta().is_none() { - serialized_blocks.push(FlightSerialized::DataBlock(block)); + if index == self.local_pos { + serialized_blocks.push(FlightSerialized::DataBlock( + block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?, + )); continue; } - match block.take_meta().and_then(AggregateMeta::downcast_from) { - Some(AggregateMeta::AggregateSpilling(partitioned_payload)) => { - match &mut self.spiller { - SpillerVer::Old(spiller) => { - serialized_blocks.push(FlightSerialized::Future( - match index == self.local_pos { - true => local_agg_spilling_aggregate_payload( - self.ctx.clone(), - spiller.clone(), - partitioned_payload, - )?, - false => exchange_agg_spilling_aggregate_payload( - self.ctx.clone(), - spiller.clone(), - partitioned_payload, - )?, - }, - )); - } - SpillerVer::New(spillers) => { - for (bucket, payload) in - partitioned_payload.payloads.into_iter().enumerate() - { - if payload.len() == 0 { - continue; - } - - let data_block = payload - .aggregate_flush_all()? - .consume_convert_to_full(); - spillers[index].spill(bucket, data_block)?; - } - let block = if index == self.local_pos { - DataBlock::empty() - } else { - serialize_block(-1, DataBlock::empty(), &self.options)? - }; - serialized_blocks.push(FlightSerialized::DataBlock(block)); - } - } - } - - Some(AggregateMeta::AggregatePayload(p)) => { - // As soon as a non-spilled AggregatePayload shows up we must flush any pending - // spill files. AggregatePayload shows that partial stage is over, no more spilling - // will happen. - if matches!(&self.spiller, SpillerVer::New(_)) { - let spilled = self.finish()?; - if !spilled.is_empty() { - spilled_blocks = Some(spilled); - } - } - - let (bucket, max_partition_count) = (p.bucket, p.max_partition_count); - - if index == self.local_pos { - serialized_blocks.push(FlightSerialized::DataBlock( - block.add_meta(Some(Box::new( - AggregateMeta::AggregatePayload(p), - )))?, - )); - continue; - } - - let block_number = compute_block_number(bucket, max_partition_count)?; - let stream = SerializeAggregateStream::create( - &self.params, - SerializePayload::AggregatePayload(p), - ); - let mut stream_blocks = - stream.into_iter().collect::>>()?; - debug_assert!(!stream_blocks.is_empty()); - let mut c = DataBlock::concat(&stream_blocks)?; - if let Some(meta) = stream_blocks[0].take_meta() { - c.replace_meta(meta); - } - let c = serialize_block(block_number, c, &self.options)?; - serialized_blocks.push(FlightSerialized::DataBlock(c)); - } - - _ => unreachable!(), - }; + let block_number = compute_block_number(bucket, max_partition_count)?; + let stream = SerializeAggregateStream::create( + &self.params, + SerializePayload::AggregatePayload(p), + ); + let mut stream_blocks = stream.into_iter().collect::>>()?; + debug_assert!(!stream_blocks.is_empty()); + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); + } + let c = serialize_block(block_number, c, &self.options)?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); } - return if let Some(spilled) = spilled_blocks { - Ok(vec![ - DataBlock::empty_with_meta(FlightSerializedMeta::create(spilled)), - DataBlock::empty_with_meta(FlightSerializedMeta::create(serialized_blocks)), - ]) - } else { - Ok(vec![DataBlock::empty_with_meta( - FlightSerializedMeta::create(serialized_blocks), - )]) - }; - } - data = data.add_meta(Some(block_meta))?; - } - Ok(vec![data]) - } - - fn on_finish(&mut self, _output: bool) -> Result> { - // if partial stage spilled all data, no one AggregatePayload shows up, - // we need to finish spiller here. - if let SpillerVer::New(_) = &self.spiller { - let serialized_blocks = self.finish()?; - if !serialized_blocks.is_empty() { - return Ok(vec![DataBlock::empty_with_meta( - FlightSerializedMeta::create(serialized_blocks), - )]); - } + _ => unreachable!(), + }; } - Ok(vec![]) + Ok(vec![DataBlock::empty_with_meta( + FlightSerializedMeta::create(serialized_blocks), + )]) } } From cc31022a36f90a5f9b5ae7d8f3ce5ef0dbe7f12c Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 20 Nov 2025 21:03:42 +0800 Subject: [PATCH 04/12] chore: catch up main --- .../aggregator/new_aggregate/new_aggregate_spiller.rs | 1 - .../new_aggregate/new_transform_aggregate_partial.rs | 7 ++++--- .../new_aggregate/new_transform_aggregate_spill_writer.rs | 0 3 files changed, 4 insertions(+), 4 deletions(-) delete mode 100644 src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 3fa2df90e9702..aa792f6a3d77a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -30,7 +30,6 @@ use databend_common_pipeline_transforms::traits::Location; use databend_common_pipeline_transforms::MemorySettings; use databend_common_storage::DataOperator; use databend_common_storages_parquet::ReadSettings; -use log::debug; use log::info; use parking_lot::Mutex; use parquet::file::metadata::RowGroupMetaData; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs index 440c632750d8c..01b02a085323f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Debug; use std::sync::Arc; use std::time::Instant; use std::vec; @@ -42,7 +41,6 @@ use databend_common_pipeline_transforms::processors::AccumulatingTransform; use databend_common_pipeline_transforms::processors::AccumulatingTransformer; use databend_common_pipeline_transforms::MemorySettings; use databend_common_storages_parquet::serialize_row_group_meta_to_bytes; -use log::info; use crate::pipelines::memory_settings::MemorySettingsExt; use crate::pipelines::processors::transforms::aggregator::aggregate_exchange_injector::scatter_partitioned_payload; @@ -133,17 +131,20 @@ impl Spiller { ctx.clone(), MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, partition_streams[0].clone(), + true, )?; Ok(Spiller::Standalone(spiller)) } _ => { let spillers = partition_streams .into_iter() - .map(|stream| { + .enumerate() + .map(|(pos, stream)| { NewAggregateSpiller::try_create( ctx.clone(), MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, stream, + if pos == local_pos { true } else { false }, ) }) .collect::>>()?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs deleted file mode 100644 index e69de29bb2d1d..0000000000000 From 7e69d049125a02f7ddef95b116f57a750e95fd6e Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 20 Nov 2025 21:39:11 +0800 Subject: [PATCH 05/12] chore: clean up --- .../aggregator/new_aggregate/new_transform_aggregate_partial.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs index 01b02a085323f..13c23f38ae23d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs @@ -144,7 +144,7 @@ impl Spiller { ctx.clone(), MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, stream, - if pos == local_pos { true } else { false }, + pos == local_pos, ) }) .collect::>>()?; From ccfc8d0ef8f96f0d94c74b37e82100c59f1f06d4 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 21 Nov 2025 09:12:34 +0800 Subject: [PATCH 06/12] enable enable_experiment_aggregate for test --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 4a341ba6e2597..435e62e40e0ea 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1508,7 +1508,7 @@ impl DefaultSettings { range: Some(SettingRange::String(vec![S3StorageClass::Standard.to_string(), S3StorageClass::IntelligentTiering.to_string()])), }), ("enable_experiment_aggregate", DefaultSettingValue { - value: UserSettingValue::UInt64(0), + value: UserSettingValue::UInt64(1), desc: "Enable experiment aggregate, default is 0, 1 for enable", mode: SettingMode::Both, scope: SettingScope::Both, From 9310a9cda91abc4ee14b5288cac7d9bb620d2a5e Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 21 Nov 2025 11:28:19 +0800 Subject: [PATCH 07/12] chore: make writers lazy init --- .../new_aggregate/new_aggregate_spiller.rs | 83 +++++++++++++++---- 1 file changed, 66 insertions(+), 17 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index aa792f6a3d77a..c5e80f3f3408f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -46,8 +46,6 @@ use crate::spillers::SpillsDataWriter; struct PayloadWriter { path: String, - // TODO: this may change to lazy init, for now it will create 128*thread_num files at most even - // if the writer not used to write. writer: SpillsDataWriter, } @@ -109,7 +107,7 @@ impl WriteStats { struct AggregatePayloadWriters { spill_prefix: String, partition_count: usize, - writers: Vec, + writers: Vec>, write_stats: WriteStats, ctx: Arc, is_local: bool, @@ -125,22 +123,25 @@ impl AggregatePayloadWriters { AggregatePayloadWriters { spill_prefix: prefix.to_string(), partition_count, - writers: vec![], + writers: Self::empty_writers(partition_count), write_stats: WriteStats::default(), ctx, is_local, } } - fn ensure_writers(&mut self) -> Result<()> { - if self.writers.is_empty() { - let mut writers = Vec::with_capacity(self.partition_count); - for _ in 0..self.partition_count { - writers.push(PayloadWriter::try_create(&self.spill_prefix)?); - } - self.writers = writers; + fn empty_writers(partition_count: usize) -> Vec> { + std::iter::repeat_with(|| None) + .take(partition_count) + .collect::>() + } + + fn ensure_writer(&mut self, bucket: usize) -> Result<&mut PayloadWriter> { + if self.writers[bucket].is_none() { + self.writers[bucket] = Some(PayloadWriter::try_create(&self.spill_prefix)?); } - Ok(()) + + Ok(self.writers[bucket].as_mut().unwrap()) } pub fn write_ready_blocks(&mut self, ready_blocks: Vec<(usize, DataBlock)>) -> Result<()> { @@ -148,15 +149,14 @@ impl AggregatePayloadWriters { return Ok(()); } - self.ensure_writers()?; - for (bucket, block) in ready_blocks { if block.is_empty() { continue; } let start = Instant::now(); - self.writers[bucket].write_block(block)?; + let writer = self.ensure_writer(bucket)?; + writer.write_block(block)?; let elapsed = start.elapsed(); self.write_stats.accumulate(elapsed); @@ -166,13 +166,18 @@ impl AggregatePayloadWriters { } pub fn finalize(&mut self) -> Result> { - let writers = mem::take(&mut self.writers); - if writers.is_empty() { + let writers = mem::replace(&mut self.writers, Self::empty_writers(self.partition_count)); + + if writers.iter().all(|writer| writer.is_none()) { return Ok(Vec::new()); } let mut spilled_payloads = Vec::new(); for (partition_id, writer) in writers.into_iter().enumerate() { + let Some(writer) = writer else { + continue; + }; + let (path, written_size, row_groups) = writer.close()?; if written_size != 0 { @@ -404,3 +409,47 @@ fn flush_write_profile(ctx: &Arc, stats: WriteStats) { ctx.get_aggregate_spill_progress().incr(&progress_val); } } + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use databend_common_base::base::tokio; + use databend_common_exception::Result; + use databend_common_expression::types::Int32Type; + use databend_common_expression::DataBlock; + use databend_common_expression::FromData; + + use crate::pipelines::processors::transforms::aggregator::new_aggregate::SharedPartitionStream; + use crate::pipelines::processors::transforms::aggregator::NewAggregateSpiller; + use crate::test_kits::TestFixture; + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_aggregate_payload_writers_lazy_init() -> Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + let partition_count = 4; + let partition_stream = SharedPartitionStream::new(1, 1024, 1024 * 1024, partition_count); + let mut spiller = + NewAggregateSpiller::try_create(ctx.clone(), partition_count, partition_stream, true)?; + + let block = DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1i32, 2, 3])]); + + spiller.spill(0, block.clone())?; + spiller.spill(2, block)?; + + let payloads = spiller.spill_finish()?; + + assert_eq!(payloads.len(), 2); + + let spilled_files = ctx.get_spilled_files(); + assert_eq!(spilled_files.len(), 2); + + let buckets: HashSet<_> = payloads.iter().map(|p| p.bucket).collect(); + assert!(buckets.contains(&0)); + assert!(buckets.contains(&2)); + + Ok(()) + } +} From 3a03dcdc12aa5077de76cc6d426e38fd450ecc5e Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 21 Nov 2025 16:07:13 +0800 Subject: [PATCH 08/12] make max_aggregate_spill_level to 1 --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 435e62e40e0ea..8be2bcc4efc47 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1515,7 +1515,7 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=1)), }), ("max_aggregate_spill_level", DefaultSettingValue { - value: UserSettingValue::UInt64(0), + value: UserSettingValue::UInt64(1), desc: "Maximum recursion depth for the aggregate spill. Each recursion level repartition data into `num_cpu` smaller parts to ensure it fits in memory.", mode: SettingMode::Both, scope: SettingScope::Both, From 807a9dabd90b4f314a6279a9d02fc9776510212b Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Sun, 23 Nov 2025 21:11:54 +0800 Subject: [PATCH 09/12] fix: final aggregate must call spill finish in every round --- .../new_aggregate/new_aggregate_spiller.rs | 3 +- .../new_final_aggregate_state.rs | 7 ++ .../new_transform_final_aggregate.rs | 76 +++++++++---------- 3 files changed, 45 insertions(+), 41 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index c5e80f3f3408f..468273a8d8bbf 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -30,6 +30,7 @@ use databend_common_pipeline_transforms::traits::Location; use databend_common_pipeline_transforms::MemorySettings; use databend_common_storage::DataOperator; use databend_common_storages_parquet::ReadSettings; +use log::debug; use log::info; use parking_lot::Mutex; use parquet::file::metadata::RowGroupMetaData; @@ -332,7 +333,7 @@ impl NewAggregateSpiller { self.payload_writers.write_ready_blocks(pending_blocks)?; let payloads = self.payload_writers.finalize()?; - info!( + debug!( "[NewAggregateSpiller] spill finish with {} payloads", payloads.len() ); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs index b481f91ce7b3d..6c666672cf82e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs @@ -53,6 +53,7 @@ impl RepartitionedQueues { pub enum RoundPhase { Idle, + NoTask, NewTask(AggregateMeta), OutputReady(DataBlock), Aggregate, @@ -63,6 +64,7 @@ impl Display for RoundPhase { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RoundPhase::Idle => write!(f, "Idle"), + RoundPhase::NoTask => write!(f, "NoTask"), RoundPhase::NewTask(_) => write!(f, "NewTask"), RoundPhase::OutputReady(_) => write!(f, "OutputReady"), RoundPhase::AsyncWait => write!(f, "AsyncWait"), @@ -118,6 +120,11 @@ impl LocalRoundState { Event::Async } + pub fn schedule_not_get_task(&mut self) -> Event { + self.phase = RoundPhase::NoTask; + Event::Sync + } + pub fn enqueue_partitioned_meta(&mut self, datablock: &mut DataBlock) -> Result<()> { if let Some(block_meta) = datablock.take_meta().and_then(AggregateMeta::downcast_from) { match block_meta { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index 556c799eaf72f..c474be66df3b4 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -161,55 +161,41 @@ impl NewFinalAggregateTransform { new_produced.push_to_queue(partition_id, meta); } - // if spill already triggered, local repartition queue is all spilled out - // we only need to spill the new produced repartitioned queues out if self.round_state.is_spilled { - // when no more task, we need to finalize the partition stream - if self.round_state.working_queue.is_empty() { - self.spill(new_produced, true)?; - } else { - self.spill(new_produced, false)?; - } + self.spill(new_produced)?; return Ok(()); } - // merge new produced repartitioned queues into local repartitioned queues self.repartitioned_queues.merge_queues(new_produced); - // if the queue is triggered spill and repartition too many times, considering performance affect, we may not - // continue to trigger spill - let can_trigger_spill = - self.round_state.current_queue_spill_round < self.round_state.max_aggregate_spill_level; - let need_spill = self.spiller.memory_settings.check_spill(); - - if !can_trigger_spill { - if need_spill { - debug!( - "NewFinalAggregateTransform[{}] skip spill after {} rounds", - self.id, self.round_state.current_queue_spill_round - ); - } + // other processor has spilled in this round, we need to spill too + if self.shared_state.lock().is_spilled { + let queues = self.repartitioned_queues.take_queues(); + self.spill(queues)?; + self.round_state.is_spilled = true; return Ok(()); } - if need_spill { + // we issue spill based on memory usage + let need_spill = self.spiller.memory_settings.check_spill(); + let can_trigger_spill = + self.round_state.current_queue_spill_round < self.round_state.max_aggregate_spill_level; + + if need_spill && can_trigger_spill { debug!( - "NewFinalAggregateTransform[{}] trigger spill due to memory limit, spilled round {}", - self.id, self.round_state.current_queue_spill_round + "[FinalAggregateTransform-{}] detected memory pressure", + self.id ); - self.shared_state.lock().is_spilled = true; - } - - // if other processor or itself trigger spill, this processor will need spill its local repartitioned queue out - if self.shared_state.lock().is_spilled - && !self.round_state.is_spilled - && !self.round_state.working_queue.is_empty() - { self.round_state.is_spilled = true; + self.shared_state.lock().is_spilled = true; let queues = self.repartitioned_queues.take_queues(); - self.spill(queues, false)?; + self.spill(queues)?; + + return Ok(()); } + self.try_finish_spill_round()?; + Ok(()) } @@ -308,7 +294,7 @@ impl NewFinalAggregateTransform { Ok(()) } - pub fn spill(&mut self, mut queues: RepartitionedQueues, finalize: bool) -> Result<()> { + pub fn spill(&mut self, mut queues: RepartitionedQueues) -> Result<()> { for (id, queue) in queues.0.iter_mut().enumerate() { while let Some(meta) = queue.pop() { match meta { @@ -329,8 +315,14 @@ impl NewFinalAggregateTransform { } } } + self.try_finish_spill_round()?; + + Ok(()) + } - if finalize { + /// this need to be called because the shared partition stream depends on it + pub fn try_finish_spill_round(&mut self) -> Result<()> { + if self.round_state.working_queue.is_empty() { let spilled_payloads = self.spiller.spill_finish()?; for payload in spilled_payloads { self.repartitioned_queues.push_to_queue( @@ -393,11 +385,11 @@ impl Processor for NewFinalAggregateTransform { round_state.enqueue_partitioned_meta(&mut datablock)?; - // schedule next task from working queue, if empty, begin to wait other processors + // schedule next task from working queue if let Some(event) = round_state.schedule_next_task() { return Ok(event); } else { - return Ok(round_state.schedule_async_wait()); + return Ok(round_state.schedule_not_get_task()); } } @@ -410,11 +402,11 @@ impl Processor for NewFinalAggregateTransform { let mut data_block = self.input.pull_data().unwrap()?; round_state.enqueue_partitioned_meta(&mut data_block)?; - // schedule next task from working queue, if empty, begin to wait other processors + // schedule next task from working queue if let Some(event) = round_state.schedule_next_task() { return Ok(event); } else { - return Ok(round_state.schedule_async_wait()); + return Ok(round_state.schedule_not_get_task()); } } @@ -441,6 +433,10 @@ impl Processor for NewFinalAggregateTransform { Ok(()) } + RoundPhase::NoTask => { + self.try_finish_spill_round()?; + Ok(()) + } RoundPhase::Aggregate => { let queue = self .shared_state From 9d3c3996da03cdd19ee67cb921b23541909d2762 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 24 Nov 2025 15:35:44 +0800 Subject: [PATCH 10/12] fix: final aggregate must call spill finish in every round --- .../transforms/aggregator/aggregate_meta.rs | 18 ++++++++-- .../new_aggregate/datablock_splitter.rs | 14 ++++++-- .../new_aggregate/new_aggregate_spiller.rs | 33 ++++++++++++++++--- .../new_final_aggregate_state.rs | 23 ++++++------- .../new_transform_final_aggregate.rs | 23 +++++++------ .../serde/transform_spill_reader.rs | 6 ++-- .../aggregator/transform_aggregate_final.rs | 2 +- .../aggregator/transform_partition_bucket.rs | 2 +- 8 files changed, 85 insertions(+), 36 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs index d872b2a259089..bd6f265c0a5da 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs @@ -137,7 +137,11 @@ pub enum AggregateMeta { BucketSpilled(BucketSpilledPayload), Spilled(Vec), - Partitioned { bucket: isize, data: Vec }, + Partitioned { + bucket: isize, + data: Vec, + activate_worker: Option, + }, NewBucketSpilled(NewSpilledPayload), NewSpilled(Vec), @@ -180,8 +184,16 @@ impl AggregateMeta { Box::new(AggregateMeta::BucketSpilled(payload)) } - pub fn create_partitioned(bucket: isize, data: Vec) -> BlockMetaInfoPtr { - Box::new(AggregateMeta::Partitioned { data, bucket }) + pub fn create_partitioned( + bucket: isize, + data: Vec, + activate_worker: Option, + ) -> BlockMetaInfoPtr { + Box::new(AggregateMeta::Partitioned { + data, + bucket, + activate_worker, + }) } pub fn create_new_bucket_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/datablock_splitter.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/datablock_splitter.rs index 6603ad1ea8c02..d3ce0a36c5e46 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/datablock_splitter.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/datablock_splitter.rs @@ -30,8 +30,9 @@ pub fn split_partitioned_meta_into_datablocks( let base_chunk_size = total_len / outputs_len; let remainder = total_len % outputs_len; - let mut result = Vec::with_capacity(outputs_len); let mut data_iter = data.into_iter(); + let mut chunks = Vec::with_capacity(outputs_len); + let mut activated_workers = 0; for index in 0..outputs_len { let chunk_size = if index < remainder { @@ -41,8 +42,17 @@ pub fn split_partitioned_meta_into_datablocks( }; let chunk: Vec = data_iter.by_ref().take(chunk_size).collect(); + if !chunk.is_empty() { + activated_workers += 1; + } + chunks.push(chunk); + } + + let activate_worker = Some(activated_workers); + let mut result = Vec::with_capacity(outputs_len); + for chunk in chunks { result.push(DataBlock::empty_with_meta( - AggregateMeta::create_partitioned(bucket, chunk), + AggregateMeta::create_partitioned(bucket, chunk, activate_worker), )); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 468273a8d8bbf..6b7d63e7d90da 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -227,15 +227,15 @@ impl AggregatePayloadWriters { struct SharedPartitionStreamInner { partition_stream: BlockPartitionStream, worker_count: usize, - working_count: usize, + finish_count: usize, } impl SharedPartitionStreamInner { pub fn finish(&mut self) -> Vec<(usize, DataBlock)> { - self.working_count -= 1; + self.finish_count += 1; - if self.working_count == 0 { - self.working_count = self.worker_count; + if self.finish_count == self.worker_count { + self.finish_count = 0; let ids = self.partition_stream.partition_ids(); @@ -255,6 +255,10 @@ impl SharedPartitionStreamInner { let indices = vec![partition_id; block.num_rows()]; self.partition_stream.partition(indices, block, true) } + + pub fn update_worker_count(&mut self, worker_count: usize) { + self.worker_count = worker_count; + } } #[derive(Clone)] @@ -274,7 +278,7 @@ impl SharedPartitionStream { inner: Arc::new(Mutex::new(SharedPartitionStreamInner { partition_stream, worker_count, - working_count: worker_count, + finish_count: 0, })), } } @@ -288,6 +292,11 @@ impl SharedPartitionStream { let mut inner = self.inner.lock(); inner.partition(partition_id as u64, block) } + + pub fn update_worker_count(&self, worker_count: usize) { + let mut inner = self.inner.lock(); + inner.update_worker_count(worker_count); + } } pub struct NewAggregateSpiller { @@ -375,6 +384,20 @@ impl NewAggregateSpiller { Err(ErrorCode::Internal("read empty block from final aggregate")) } } + + pub fn update_activate_worker(&self, activate_worker: usize) { + self.partition_stream.update_worker_count(activate_worker); + } + + #[cfg(debug_assertions)] + pub fn is_stream_partition_clean(&self) -> bool { + self.partition_stream + .inner + .lock() + .partition_stream + .partition_ids() + .is_empty() + } } fn flush_read_profile(elapsed: &Duration, read_bytes: usize) { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs index 6c666672cf82e..955b2516ac463 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs @@ -53,7 +53,6 @@ impl RepartitionedQueues { pub enum RoundPhase { Idle, - NoTask, NewTask(AggregateMeta), OutputReady(DataBlock), Aggregate, @@ -64,7 +63,6 @@ impl Display for RoundPhase { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RoundPhase::Idle => write!(f, "Idle"), - RoundPhase::NoTask => write!(f, "NoTask"), RoundPhase::NewTask(_) => write!(f, "NewTask"), RoundPhase::OutputReady(_) => write!(f, "OutputReady"), RoundPhase::AsyncWait => write!(f, "AsyncWait"), @@ -120,16 +118,16 @@ impl LocalRoundState { Event::Async } - pub fn schedule_not_get_task(&mut self) -> Event { - self.phase = RoundPhase::NoTask; - Event::Sync - } - - pub fn enqueue_partitioned_meta(&mut self, datablock: &mut DataBlock) -> Result<()> { + pub fn enqueue_partitioned_meta(&mut self, datablock: &mut DataBlock) -> Result> { if let Some(block_meta) = datablock.take_meta().and_then(AggregateMeta::downcast_from) { match block_meta { - AggregateMeta::Partitioned { data, .. } => { + AggregateMeta::Partitioned { + data, + activate_worker, + .. + } => { self.working_queue.extend(data); + return Ok(activate_worker); } _ => { return Err(ErrorCode::Internal( @@ -138,7 +136,7 @@ impl LocalRoundState { } } } - Ok(()) + Ok(None) } } @@ -183,7 +181,7 @@ impl FinalAggregateSharedState { } } - pub fn add_repartitioned_queue(&mut self, queues: RepartitionedQueues) { + pub fn add_repartitioned_queue(&mut self, queues: RepartitionedQueues) -> bool { self.repartitioned_queues.merge_queues(queues); self.finished_count += 1; @@ -219,7 +217,10 @@ impl FinalAggregateSharedState { split_partitioned_meta_into_datablocks(0, queue.data, self.partition_count); } } + // if it is the last one + return true; } + false } pub fn get_next_datablock(&mut self) -> Option<(DataBlock, usize)> { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index c474be66df3b4..9bcc29697d110 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -383,13 +383,15 @@ impl Processor for NewFinalAggregateTransform { // begin a new round, reset spilled flag and reported flag round_state.reset_for_new_round(spill_round); - round_state.enqueue_partitioned_meta(&mut datablock)?; + if let Some(activate_worker) = round_state.enqueue_partitioned_meta(&mut datablock)? { + self.spiller.update_activate_worker(activate_worker); + } // schedule next task from working queue if let Some(event) = round_state.schedule_next_task() { return Ok(event); } else { - return Ok(round_state.schedule_not_get_task()); + return Ok(round_state.schedule_async_wait()); } } @@ -400,13 +402,14 @@ impl Processor for NewFinalAggregateTransform { round_state.first_data_ready = true; let mut data_block = self.input.pull_data().unwrap()?; - round_state.enqueue_partitioned_meta(&mut data_block)?; - + if let Some(activate_worker) = round_state.enqueue_partitioned_meta(&mut data_block)? { + self.spiller.update_activate_worker(activate_worker); + } // schedule next task from working queue if let Some(event) = round_state.schedule_next_task() { return Ok(event); } else { - return Ok(round_state.schedule_not_get_task()); + return Ok(round_state.schedule_async_wait()); } } @@ -433,10 +436,6 @@ impl Processor for NewFinalAggregateTransform { Ok(()) } - RoundPhase::NoTask => { - self.try_finish_spill_round()?; - Ok(()) - } RoundPhase::Aggregate => { let queue = self .shared_state @@ -458,7 +457,11 @@ impl Processor for NewFinalAggregateTransform { RoundPhase::AsyncWait => { // report local repartitioned queues to shared state let queues = self.repartitioned_queues.take_queues(); - self.shared_state.lock().add_repartitioned_queue(queues); + if self.shared_state.lock().add_repartitioned_queue(queues) { + // if it is the last one called, we add a checkpoint to ensure + // the spiller finished in every round + debug_assert!(self.spiller.is_stream_partition_clean()) + } self.barrier.wait().await; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs index 38094460363ef..55b97ce052cea 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -146,7 +146,7 @@ impl Processor for TransformSpillReader { self.deserialized_meta = Some(Box::new(Self::deserialize(payload, data))); } - AggregateMeta::Partitioned { bucket, data } => { + AggregateMeta::Partitioned { bucket, data, .. } => { let mut new_data = Vec::with_capacity(data.len()); for meta in data { @@ -163,7 +163,7 @@ impl Processor for TransformSpillReader { } self.deserialized_meta = - Some(AggregateMeta::create_partitioned(bucket, new_data)); + Some(AggregateMeta::create_partitioned(bucket, new_data, None)); } AggregateMeta::NewBucketSpilled(_) => unreachable!(), AggregateMeta::NewSpilled(_) => unreachable!(), @@ -220,7 +220,7 @@ impl Processor for TransformSpillReader { self.deserializing_meta = Some((block_meta, VecDeque::from(vec![data]))); } - AggregateMeta::Partitioned { data, bucket } => { + AggregateMeta::Partitioned { data, bucket, .. } => { let bucket = *bucket; let total_task = data .iter() diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index a6f44ca009636..0f259f9de6953 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -53,7 +53,7 @@ impl TransformFinalAggregate { fn transform_agg_hashtable(&mut self, meta: AggregateMeta) -> Result { let mut agg_hashtable: Option = None; - if let AggregateMeta::Partitioned { bucket, data } = meta { + if let AggregateMeta::Partitioned { bucket, data, .. } = meta { for bucket_data in data { match bucket_data { AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs index ed949eb4026e9..86bc657657a48 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs @@ -436,7 +436,7 @@ impl TransformPartitionBucket { } } } - DataBlock::empty_with_meta(AggregateMeta::create_partitioned(bucket, data)) + DataBlock::empty_with_meta(AggregateMeta::create_partitioned(bucket, data, None)) } } From d16a69b1b98df261f691ee81641d6ec6feef3d98 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 24 Nov 2025 16:49:36 +0800 Subject: [PATCH 11/12] fix: final aggregate must call spill finish in every round --- .../aggregator/new_aggregate/new_transform_final_aggregate.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index 9bcc29697d110..08ef6bed62b8d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -460,7 +460,8 @@ impl Processor for NewFinalAggregateTransform { if self.shared_state.lock().add_repartitioned_queue(queues) { // if it is the last one called, we add a checkpoint to ensure // the spiller finished in every round - debug_assert!(self.spiller.is_stream_partition_clean()) + #[cfg(debug_assertions)] + debug_assert!(self.spiller.is_stream_partition_clean()); } self.barrier.wait().await; From d03f6f2e66a82c9d3d660082117668646fb049e0 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Tue, 25 Nov 2025 09:03:14 +0800 Subject: [PATCH 12/12] set enable_experiment_aggregate = 0 --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 8be2bcc4efc47..795c82339870d 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1508,7 +1508,7 @@ impl DefaultSettings { range: Some(SettingRange::String(vec![S3StorageClass::Standard.to_string(), S3StorageClass::IntelligentTiering.to_string()])), }), ("enable_experiment_aggregate", DefaultSettingValue { - value: UserSettingValue::UInt64(1), + value: UserSettingValue::UInt64(0), desc: "Enable experiment aggregate, default is 0, 1 for enable", mode: SettingMode::Both, scope: SettingScope::Both,