From f948ff5dd5771b713ac97f18acd2c27d3cc1e1dc Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 17 Nov 2025 17:15:31 +0800 Subject: [PATCH 1/5] chore: add more logs to cover aggregate spill --- .../aggregator/build_partition_bucket.rs | 1 + .../new_aggregate/new_aggregate_spiller.rs | 47 ++++++++++++--- .../new_transform_aggregate_spill_writer.rs | 2 +- .../serde/transform_aggregate_spill_writer.rs | 10 ++-- ...transform_exchange_aggregate_serializer.rs | 14 +++-- .../serde/transform_spill_reader.rs | 59 +++++++++++++++---- .../service/src/spillers/async_buffer.rs | 20 ++++++- 7 files changed, 122 insertions(+), 31 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs index fc6ef4c2191fe..0e649e8d37f3e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs @@ -84,6 +84,7 @@ fn build_partition_bucket_experimental( ctx.clone(), output_num, shared_partition_stream.clone(), + true, )?; let input_port = InputPort::create(); let output_port = OutputPort::create(); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 0a6f9dba1a1d7..5b5643f37e9c3 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -31,6 +31,7 @@ use databend_common_pipeline_transforms::MemorySettings; use databend_common_storage::DataOperator; use databend_common_storages_parquet::ReadSettings; use log::debug; +use log::info; use parking_lot::Mutex; use parquet::file::metadata::RowGroupMetaData; @@ -46,6 +47,8 @@ use crate::spillers::SpillsDataWriter; struct PayloadWriter { path: String, + // TODO: this may change to lazy init, for now it will create 128*thread_num files at most even + // if the writer not used to write. writer: SpillsDataWriter, } @@ -110,16 +113,23 @@ struct AggregatePayloadWriters { writers: Vec, write_stats: WriteStats, ctx: Arc, + is_local: bool, } impl AggregatePayloadWriters { - pub fn create(prefix: &str, partition_count: usize, ctx: Arc) -> Self { + pub fn create( + prefix: &str, + partition_count: usize, + ctx: Arc, + is_local: bool, + ) -> Self { AggregatePayloadWriters { spill_prefix: prefix.to_string(), partition_count, writers: vec![], write_stats: WriteStats::default(), ctx, + is_local, } } @@ -166,6 +176,18 @@ impl AggregatePayloadWriters { for (partition_id, writer) in writers.into_iter().enumerate() { let (path, written_size, row_groups) = writer.close()?; + if written_size != 0 { + info!( + "Write aggregate spill finished({}): (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})", + self.is_local.then(|| "local").unwrap_or("exchange"), + partition_id, + path, + written_size, + row_groups.iter().map(|rg| rg.num_rows()).sum::(), + row_groups.len() + ); + } + self.ctx.add_spill_file( Location::Remote(path.clone()), Layout::Aggregate, @@ -276,13 +298,15 @@ impl NewAggregateSpiller { ctx: Arc, partition_count: usize, partition_stream: SharedPartitionStream, + is_local: bool, ) -> Result { let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?; let table_ctx: Arc = ctx.clone(); let read_setting = ReadSettings::from_settings(&table_ctx.get_settings())?; let spill_prefix = ctx.query_id_spill_prefix(); - let payload_writers = AggregatePayloadWriters::create(&spill_prefix, partition_count, ctx); + let payload_writers = + AggregatePayloadWriters::create(&spill_prefix, partition_count, ctx, is_local); Ok(Self { memory_settings, @@ -320,12 +344,21 @@ impl NewAggregateSpiller { let operator = DataOperator::instance().spill_operator(); let buffer_pool = SpillsBufferPool::instance(); - let mut reader = buffer_pool.reader(operator.clone(), location, vec![row_group.clone()])?; - let read_bytes = row_group.total_byte_size() as usize; + let mut reader = + buffer_pool.reader(operator.clone(), location.clone(), vec![row_group.clone()])?; + let instant = Instant::now(); let data_block = reader.read(self.read_setting)?; - flush_read_profile(&instant, read_bytes); + let elapsed = instant.elapsed(); + + let read_size = reader.read_bytes(); + flush_read_profile(&elapsed, read_size); + + info!( + "Read aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, elapsed: {:?})", + bucket, location, read_size, row_group.num_rows(), elapsed + ); if let Some(block) = data_block { Ok(AggregateMeta::Serialized(SerializedPayload { @@ -339,12 +372,12 @@ impl NewAggregateSpiller { } } -fn flush_read_profile(instant: &Instant, read_bytes: usize) { +fn flush_read_profile(elapsed: &Duration, read_bytes: usize) { Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); Profile::record_usize_profile( ProfileStatisticsName::RemoteSpillReadTime, - instant.elapsed().as_millis() as usize, + elapsed.as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs index 089e5c06db87f..bcc57679065f2 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs @@ -43,7 +43,7 @@ impl NewTransformAggregateSpillWriter { ) -> Result> { let partition_count = MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize; let spiller = - NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream)?; + NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream, true)?; Ok(AccumulatingTransformer::create( input, diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs index c66dd8b4cb0b0..090ffbf54de97 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs @@ -192,6 +192,7 @@ pub fn agg_spilling_aggregate_payload( let mut spilled_buckets_payloads = Vec::with_capacity(partition_count); // Record how many rows are spilled. let mut rows = 0; + let mut buckets_count = 0; let location = spiller.create_unique_location(); for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() { if payload.len() == 0 { @@ -200,6 +201,7 @@ pub fn agg_spilling_aggregate_payload( let data_block = payload.aggregate_flush_all()?.consume_convert_to_full(); rows += data_block.num_rows(); + buckets_count += 1; let begin = write_size; let mut columns_data = Vec::with_capacity(data_block.num_columns()); @@ -227,6 +229,7 @@ pub fn agg_spilling_aggregate_payload( let (location, write_bytes) = spiller .spill_stream_aggregate_buffer(Some(location), write_data) .await?; + let elapsed = instant.elapsed(); // perf { Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); @@ -236,7 +239,7 @@ pub fn agg_spilling_aggregate_payload( ); Profile::record_usize_profile( ProfileStatisticsName::RemoteSpillWriteTime, - instant.elapsed().as_millis() as usize, + elapsed.as_millis() as usize, ); } @@ -249,9 +252,8 @@ pub fn agg_spilling_aggregate_payload( } info!( - "Write aggregate spill {} successfully, elapsed: {:?}", - location, - instant.elapsed() + "Write aggregate spill finished(local): (location: {}, bytes: {}, rows: {}, buckets_count: {}, elapsed: {:?})", + location, write_bytes, rows, buckets_count, elapsed ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 9009f49141c58..65850f43e2af0 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -107,11 +107,13 @@ impl TransformExchangeAggregateSerializer { let spiller = if params.enable_experiment_aggregate { let spillers = partition_streams .into_iter() - .map(|stream| { + .enumerate() + .map(|(pos, stream)| { NewAggregateSpiller::try_create( ctx.clone(), MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize, stream.clone(), + pos == local_pos, ) }) .collect::>>()?; @@ -356,6 +358,7 @@ fn exchange_agg_spilling_aggregate_payload( ) -> Result>> { let partition_count = partitioned_payload.partition_count(); let mut write_size = 0; + let mut buckets_count = 0; let mut write_data = Vec::with_capacity(partition_count); let mut buckets_column_data = Vec::with_capacity(partition_count); let mut data_range_start_column_data = Vec::with_capacity(partition_count); @@ -371,6 +374,7 @@ fn exchange_agg_spilling_aggregate_payload( let data_block = payload.aggregate_flush_all()?; rows += data_block.num_rows(); + buckets_count += 1; let old_write_size = write_size; let columns = data_block.columns().to_vec(); @@ -398,6 +402,7 @@ fn exchange_agg_spilling_aggregate_payload( let (location, write_bytes) = spiller .spill_stream_aggregate_buffer(None, write_data) .await?; + let elapsed = instant.elapsed(); // perf { Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); @@ -407,7 +412,7 @@ fn exchange_agg_spilling_aggregate_payload( ); Profile::record_usize_profile( ProfileStatisticsName::RemoteSpillWriteTime, - instant.elapsed().as_millis() as usize, + elapsed.as_millis() as usize, ); } @@ -422,9 +427,8 @@ fn exchange_agg_spilling_aggregate_payload( } info!( - "Write aggregate spill {} successfully, elapsed: {:?}", - location, - instant.elapsed() + "Write aggregate spill finished(exchange): (location: {}, bytes: {}, rows: {}, buckets_count: {}, elapsed: {:?})", + location, write_bytes, rows, buckets_count, elapsed ); let data_block = DataBlock::new_from_columns(vec![ diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs index b8c27283a5dad..fa26ada72e6e1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -14,6 +14,9 @@ use std::any::Any; use std::collections::VecDeque; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -190,19 +193,40 @@ impl Processor for TransformSpillReader { .await? .to_vec(); + let elapsed = instant.elapsed(); + info!( - "Read aggregate spill {} successfully, elapsed: {:?}", + "Read aggregate finished: (location: {}, bytes: {}, elapsed: {:?})", &payload.location, - instant.elapsed() + data.len(), + elapsed ); + // perf + { + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadCount, + 1, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadBytes, + data.len(), + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadTime, + elapsed.as_millis() as usize, + ); + } + self.deserializing_meta = Some((block_meta, VecDeque::from(vec![data]))); } AggregateMeta::Partitioned { data, .. } => { // For log progress. - let mut total_elapsed = Duration::default(); let log_interval = 100; - let mut processed_count = 0; + let total_buckets = data.len(); + let total_elapsed = Arc::new(AtomicU64::new(0)); + let processed_count = Arc::new(AtomicUsize::new(0)); + let processed_bytes = Arc::new(AtomicUsize::new(0)); let mut read_data = Vec::with_capacity(data.len()); for meta in data { @@ -211,6 +235,9 @@ impl Processor for TransformSpillReader { let operator = self.operator.clone(); let data_range = payload.data_range.clone(); let semaphore = self.semaphore.clone(); + let total_elapsed = total_elapsed.clone(); + let processed_count = processed_count.clone(); + let processed_bytes = processed_bytes.clone(); read_data.push(databend_common_base::runtime::spawn(async move { let _guard = semaphore.acquire().await; let instant = Instant::now(); @@ -236,16 +263,21 @@ impl Processor for TransformSpillReader { ); } - total_elapsed += instant.elapsed(); - processed_count += 1; + let elapsed = instant.elapsed(); + total_elapsed + .fetch_add(elapsed.as_millis() as u64, Ordering::Relaxed); + let finished = processed_count.fetch_add(1, Ordering::Relaxed) + 1; + processed_bytes.fetch_add(data.len(), Ordering::Relaxed); // log the progress - if processed_count % log_interval == 0 { + if finished % log_interval == 0 { info!( "Read aggregate {}/{} spilled buckets, elapsed: {:?}", - processed_count, - data.len(), - total_elapsed + finished, + total_buckets, + Duration::from_millis( + total_elapsed.load(Ordering::Relaxed) + ) ); } @@ -266,10 +298,13 @@ impl Processor for TransformSpillReader { } }; + let processed_count = processed_count.load(Ordering::Relaxed); if processed_count != 0 { info!( - "Read {} aggregate spills successfully, total elapsed: {:?}", - processed_count, total_elapsed + "Read aggregate finished: (total count: {}, total bytes: {}, total elapsed: {:?})", + processed_count, + processed_bytes.load(Ordering::Relaxed), + Duration::from_millis(total_elapsed.load(Ordering::Relaxed)) ); } } diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index d3511c160de01..7b40b45b9f9da 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::Cell; use std::collections::VecDeque; use std::io; use std::io::Write; @@ -532,6 +533,7 @@ pub struct SpillsDataReader { spills_buffer_pool: Arc, data_schema: DataSchemaRef, field_levels: FieldLevels, + read_bytes: usize, } impl SpillsDataReader { @@ -563,24 +565,38 @@ impl SpillsDataReader { data_schema, field_levels, row_groups: VecDeque::from(row_groups), + read_bytes: 0, }) } + pub fn read_bytes(&self) -> usize { + self.read_bytes + } + pub fn read(&mut self, settings: ReadSettings) -> Result> { let Some(row_group) = self.row_groups.pop_front() else { return Ok(None); }; let mut row_group = RowGroupCore::new(row_group, None); + + let read_bytes = Cell::new(0usize); + row_group.fetch(&ProjectionMask::all(), None, |fetch_ranges| { - self.spills_buffer_pool.fetch_ranges( + let chunk_data = self.spills_buffer_pool.fetch_ranges( self.operator.clone(), self.location.clone(), fetch_ranges, settings, - ) + )?; + let bytes_read = chunk_data.iter().map(|c| c.len()).sum::(); + read_bytes.set(read_bytes.get() + bytes_read); + + Ok(chunk_data) })?; + self.read_bytes += read_bytes.get(); + let num_rows = row_group.num_rows(); let mut reader = ParquetRecordBatchReader::try_new_with_row_groups( &self.field_levels, From 548e8bb3c1e064b4ef975648575e5dbd2bdf74da Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 17 Nov 2025 17:26:10 +0800 Subject: [PATCH 2/5] chore: add more logs to cover aggregate spill --- .../aggregator/new_aggregate/new_aggregate_spiller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 5b5643f37e9c3..2bf23f71fc910 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -179,7 +179,7 @@ impl AggregatePayloadWriters { if written_size != 0 { info!( "Write aggregate spill finished({}): (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})", - self.is_local.then(|| "local").unwrap_or("exchange"), + if self.is_local { "local" } else { "exchange" }, partition_id, path, written_size, From 0bd74a75a8f976564f3ea5a6a38169a67876ef00 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 17 Nov 2025 17:49:14 +0800 Subject: [PATCH 3/5] chore: eliminate warnings from sqllogictests for tpch --- tests/sqllogictests/suites/tpch/queries.test | 85 ++++++++++---------- 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/tests/sqllogictests/suites/tpch/queries.test b/tests/sqllogictests/suites/tpch/queries.test index 1f9a58ee91877..79728a0d16393 100644 --- a/tests/sqllogictests/suites/tpch/queries.test +++ b/tests/sqllogictests/suites/tpch/queries.test @@ -29,7 +29,7 @@ statement ok analyze table supplier # Q1 -query I +query TTRRRRRRRI select l_returnflag, l_linestatus, @@ -58,7 +58,7 @@ N O 74476040.00 111701729697.74 106118230307.6056 110367043872.497010 25.5022267 R F 37719753.00 56568041380.90 53741292684.6040 55889619119.831932 25.50579361 38250.85462610 0.05000941 1478870 # Q2 -query I +query RTTITTTT SELECT s_acctbal, s_name, @@ -106,7 +106,7 @@ LIMIT 100; 9938.53 Supplier#000005359 UNITED KINGDOM 185358 Manufacturer#4 bgxj2K0w1kJvxYl5mhCfou,W 33-429-790-6131 l, ironic instructions cajole 9937.84 Supplier#000005969 ROMANIA 108438 Manufacturer#1 rdnmd9c8EG1EIAYY3LPVa4yUNx6OwyVaQ 29-520-692-3537 es. furiously silent deposits among the deposits haggle furiously a 9936.22 Supplier#000005250 UNITED KINGDOM 249 Manufacturer#4 qX AB0vP8mJEWeBuY9jri 33-320-228-2957 ar, regular requests nag blithely special accounts. final deposits impress carefully. ironic, -9923.77 Supplier#000002324 GERMANY 29821 Manufacturer#4 uXcnR7tv87dG 17-779-299-1839 s sleep according to the quick requests. carefully +9923.77 Supplier#000002324 GERMANY 29821 Manufacturer#4 uXcnR7tv87dG 17-779-299-1839 s sleep according to the quick requests. carefully 9871.22 Supplier#000006373 GERMANY 43868 Manufacturer#5 iSLO35z7Ae 17-813-485-8637 against the slyly daring requests. unusual accounts wake atop the blithely spe 9870.78 Supplier#000001286 GERMANY 81285 Manufacturer#2 3gq0mZLHI5OTM6 tBYmLTHZaulCYnlECzQ7nj 17-516-924-4574 into beans haggle at the quickly final asymptotes. unusu 9870.78 Supplier#000001286 GERMANY 181285 Manufacturer#4 3gq0mZLHI5OTM6 tBYmLTHZaulCYnlECzQ7nj 17-516-924-4574 into beans haggle at the quickly final asymptotes. unusu @@ -118,22 +118,22 @@ LIMIT 100; 9817.10 Supplier#000002352 RUSSIA 124815 Manufacturer#2 XfLCj71HKHnPqgvs7KNgPKcOWoWxo2w 32-551-831-1437 al packages doze always according to the quickly f 9817.10 Supplier#000002352 RUSSIA 152351 Manufacturer#3 XfLCj71HKHnPqgvs7KNgPKcOWoWxo2w 32-551-831-1437 al packages doze always according to the quickly f 9739.86 Supplier#000003384 FRANCE 138357 Manufacturer#2 D01XwXbcILNwmrGS6ZPrVhZxO40i 16-494-913-5925 es. carefully regular ideas cajole. quickly ironic requests haggle. pending sentiment -9721.95 Supplier#000008757 UNITED KINGDOM 156241 Manufacturer#3 ryKUkEeWN7Z 33-821-407-2995 the instructions breach slyly +9721.95 Supplier#000008757 UNITED KINGDOM 156241 Manufacturer#3 ryKUkEeWN7Z 33-821-407-2995 the instructions breach slyly 9681.33 Supplier#000008406 RUSSIA 78405 Manufacturer#1 1A6x3PLy6F 32-139-873-8571 ons sleep express deposits. epitap 9643.55 Supplier#000005148 ROMANIA 107617 Manufacturer#1 H7WOI6lzFuSsWzTSBrhzTYV 29-252-617-4850 carefully platelets. packages sleep special ideas. quick -9624.82 Supplier#000001816 FRANCE 34306 Manufacturer#3 NTwQPSZwfhc4uu1EMvEDopBnEv2j P 16-392-237-6726 the express, regular accounts. regular decoys boost alongside of +9624.82 Supplier#000001816 FRANCE 34306 Manufacturer#3 NTwQPSZwfhc4uu1EMvEDopBnEv2j P 16-392-237-6726 the express, regular accounts. regular decoys boost alongside of 9624.78 Supplier#000009658 ROMANIA 189657 Manufacturer#1 DmRxpLmL88XCBiONB3tq3e0u 29-748-876-2014 inst the blithely brave frays. brav 9612.94 Supplier#000003228 ROMANIA 120715 Manufacturer#2 hnNBdhdXO4yT18 QNABTrL8fuv0A4p 29-325-784-8187 furiously foxes. express packages nag blithely express, pending ideas. fluffily ironi 9612.94 Supplier#000003228 ROMANIA 198189 Manufacturer#4 hnNBdhdXO4yT18 QNABTrL8fuv0A4p 29-325-784-8187 furiously foxes. express packages nag blithely express, pending ideas. fluffily ironi 9571.83 Supplier#000004305 ROMANIA 179270 Manufacturer#2 Bdj1T5EostLveb9ocRbz 29-973-481-1831 fully: fluffily special deposits use fur 9558.10 Supplier#000003532 UNITED KINGDOM 88515 Manufacturer#4 ncMxIJcDYZd5B7FlKxxLmnlzPeZB,FKBujB 33-152-301-2164 against the final pinto beans. carefully bold asymptotes use -9492.79 Supplier#000005975 GERMANY 25974 Manufacturer#5 9UEiIp7uSYtTF5 17-992-579-4839 fluffily ironic instructions haggle against the even, special accounts. quickly final +9492.79 Supplier#000005975 GERMANY 25974 Manufacturer#5 9UEiIp7uSYtTF5 17-992-579-4839 fluffily ironic instructions haggle against the even, special accounts. quickly final 9461.05 Supplier#000002536 UNITED KINGDOM 20033 Manufacturer#1 TEEkPusQ6rU18YvixE7IQtBDOyRBdGoOWl2r 33-556-973-5522 inal ideas cajole furiously. blithely special Tiresias against the b 9453.01 Supplier#000000802 ROMANIA 175767 Manufacturer#1 1Uj23QWxQjj7EyeqHWqGWTbN 29-342-882-6463 s according to the even deposits integrate express packages. express 9408.65 Supplier#000007772 UNITED KINGDOM 117771 Manufacturer#4 rIoV2rj0KNy,IT 33-152-491-1126 s nag quickly regular packages. carefully express pinto beans about th 9359.61 Supplier#000004856 ROMANIA 62349 Manufacturer#5 k2CKOmXhPruJZ 29-334-870-9731 es. final asymptotes wake carefully 9357.45 Supplier#000006188 UNITED KINGDOM 138648 Manufacturer#1 LS,Z0 zbSvC7GWjF 33-583-607-1633 somas cajole around the even, ironic deposits. pending theodolites according to the b -9352.04 Supplier#000003439 GERMANY 170921 Manufacturer#4 B2bnKDIpkJp2uHKp 17-128-996-4650 nusual frets cajole carefully beneath +9352.04 Supplier#000003439 GERMANY 170921 Manufacturer#4 B2bnKDIpkJp2uHKp 17-128-996-4650 nusual frets cajole carefully beneath 9312.97 Supplier#000007807 RUSSIA 90279 Manufacturer#5 Dk2ebpGR3jlpYbpMg9Djr 32-673-872-5854 . silent asymptotes boost. quickly ironic accounts for the 9312.97 Supplier#000007807 RUSSIA 100276 Manufacturer#5 Dk2ebpGR3jlpYbpMg9Djr 32-673-872-5854 . silent asymptotes boost. quickly ironic accounts for the 9280.27 Supplier#000007194 ROMANIA 47193 Manufacturer#3 tJ96aHp8 l3uiq38LiDHswtk9bHMg 29-318-454-2133 tes. carefully regular accounts are carefully since the waters. accounts cajole? carefully bold @@ -158,9 +158,9 @@ LIMIT 100; 8913.96 Supplier#000004603 UNITED KINGDOM 137063 Manufacturer#2 d6sFwf6 TD1xyfuFbdM2h8LX7ZWc3zHupV 33-789-255-7342 lithely whithout the furiously ironic sheaves. ironic reques 8877.82 Supplier#000007967 FRANCE 167966 Manufacturer#5 rXBIZqq9eWEuU90B vlCab6 16-442-147-9345 ckages-- evenly even requests boost blit 8862.24 Supplier#000003323 ROMANIA 73322 Manufacturer#3 5RrF2PzoRlwpAGXjyf 29-736-951-3710 regular ideas haggle blithely packages. regula -8841.59 Supplier#000005750 ROMANIA 100729 Manufacturer#5 n uXFrKx,KVYIQjmRuV,yejWmLMdRJnk 29-344-502-5481 leep finally furiously express packages. slyly unusual packages cajole unusual, +8841.59 Supplier#000005750 ROMANIA 100729 Manufacturer#5 n uXFrKx,KVYIQjmRuV,yejWmLMdRJnk 29-344-502-5481 leep finally furiously express packages. slyly unusual packages cajole unusual, 8781.71 Supplier#000003121 ROMANIA 13120 Manufacturer#5 wdA7CLuYXS22oQEmP0V,x0PHrXiPdl5Rpwv,ub 29-707-291-5144 ies. final foxes are furiou -8754.24 Supplier#000009407 UNITED KINGDOM 179406 Manufacturer#4 pj9oPHQ4OLWp 33-903-970-9604 ng asymptotes hang across the blithely special deposits. +8754.24 Supplier#000009407 UNITED KINGDOM 179406 Manufacturer#4 pj9oPHQ4OLWp 33-903-970-9604 ng asymptotes hang across the blithely special deposits. 8691.06 Supplier#000004429 UNITED KINGDOM 126892 Manufacturer#2 H0paE V6JCrlZpYrzI0LgIP 33-964-337-5038 sly requests might sleep. final dolphins sleep. furiousl 8655.99 Supplier#000006330 RUSSIA 193810 Manufacturer#2 7CsFQnd ,tzgMYvVoMim5l4DrJcX8SaQMTcy 32-561-198-3705 ideas wake across the regular, unusual instructions; furiously final deposits wake near the s 8638.36 Supplier#000002920 RUSSIA 75398 Manufacturer#1 iMYQSQzsLXg 32-122-621-7549 ickly dolphins. furiously careful asymptotes sublate @@ -171,13 +171,13 @@ LIMIT 100; 8553.82 Supplier#000003979 ROMANIA 143978 Manufacturer#4 qLE5JpqDoe3XHsBI6etWpd4zRsjsBNb9Tidi6 29-124-646-4897 counts are quickly carefully ironic instructions. platelets wake f 8517.23 Supplier#000009529 RUSSIA 37025 Manufacturer#5 NWW9SDThqi9RIeOA 32-565-297-8775 ial requests use stealthily along the carefully u 8517.23 Supplier#000009529 RUSSIA 59528 Manufacturer#2 NWW9SDThqi9RIeOA 32-565-297-8775 ial requests use stealthily along the carefully u -8503.70 Supplier#000006830 RUSSIA 44325 Manufacturer#4 qoW4lp2961uQiKOK6rW8 32-147-878-5069 atelets sleep furiously pending asymptotes. even requests for the blithely unusual packages +8503.70 Supplier#000006830 RUSSIA 44325 Manufacturer#4 qoW4lp2961uQiKOK6rW8 32-147-878-5069 atelets sleep furiously pending asymptotes. even requests for the blithely unusual packages 8457.09 Supplier#000009456 UNITED KINGDOM 19455 Manufacturer#1 U8pJ1 SKbZPhH7,bLWXX3pG 33-858-440-4349 ounts sleep about the bold, even ideas. slyly unusual accounts after the asymptotes 8441.40 Supplier#000003817 FRANCE 141302 Manufacturer#2 K6XLsYufTS 16-339-356-5115 sly fluffily regular pinto beans. slyly even deposits snooze fluffily along the fluff -8432.89 Supplier#000003990 RUSSIA 191470 Manufacturer#1 wMJppCZ9aPMuq2nr88TVfztvE gj95OG wdNUE 32-839-509-9301 . express pinto beans use slyly. regular platelets sleep quickly busy deposits. final -8431.40 Supplier#000002675 ROMANIA 5174 Manufacturer#1 khl8ydxR9VekbcMLgJKPtpNtwAkYtJTv 29-474-643-1443 regular, express platelets are. carefully ironic forges since the requests affix +8432.89 Supplier#000003990 RUSSIA 191470 Manufacturer#1 wMJppCZ9aPMuq2nr88TVfztvE gj95OG wdNUE 32-839-509-9301 . express pinto beans use slyly. regular platelets sleep quickly busy deposits. final +8431.40 Supplier#000002675 ROMANIA 5174 Manufacturer#1 khl8ydxR9VekbcMLgJKPtpNtwAkYtJTv 29-474-643-1443 regular, express platelets are. carefully ironic forges since the requests affix 8407.04 Supplier#000005406 RUSSIA 162889 Manufacturer#4 ITrK2mV94SooV6 Igo 32-626-152-4621 even theodolites. quickly bold deposits after the pen -8386.08 Supplier#000008518 FRANCE 36014 Manufacturer#3 ZHAsABq5MRP e5kc0DRD8za3xGdf763ChHmoOA45 16-618-780-7481 g alongside of the slyly unusual platelets! blithely regular asymptotes cajole. quickly regular +8386.08 Supplier#000008518 FRANCE 36014 Manufacturer#3 ZHAsABq5MRP e5kc0DRD8za3xGdf763ChHmoOA45 16-618-780-7481 g alongside of the slyly unusual platelets! blithely regular asymptotes cajole. quickly regular 8376.52 Supplier#000005306 UNITED KINGDOM 190267 Manufacturer#5 SyS2SsaA8i CqnbzUdfNH07bVtt9uW,Cp6FLCkOR 33-632-514-7931 pendencies affix furiously against the special, blithe packages. qui 8348.74 Supplier#000008851 FRANCE 66344 Manufacturer#4 E4uITlvmPHKvZ 16-796-240-2472 s packages haggle above the express pinto beans. stealthy, ironic theodolites sleep quickly 8338.58 Supplier#000007269 FRANCE 17268 Manufacturer#4 2vJh8wqp6CJp,W0Y 16-267-277-4365 lithely through the accounts. express, ironic asymptotes wou @@ -185,11 +185,11 @@ LIMIT 100; 8307.93 Supplier#000003142 GERMANY 18139 Manufacturer#1 OAPFw6SNodrC kFi 17-595-447-6026 usly express packages sleep finally regular ideas. carefu 8231.61 Supplier#000009558 RUSSIA 192000 Manufacturer#2 FONKME0t7ZJhnjn9VL5 32-762-137-5858 g to the carefully even brai 8152.61 Supplier#000002731 ROMANIA 15227 Manufacturer#4 sDFx3iox2Zzx 29-805-463-2030 ly above the packages. final accounts sleep furiously. fluffily iro -8109.09 Supplier#000009186 FRANCE 99185 Manufacturer#1 wKLCzA5bMuGRBm35tvQAGpen23L 16-668-570-1402 ts cajole daringly. pinto beans +8109.09 Supplier#000009186 FRANCE 99185 Manufacturer#1 wKLCzA5bMuGRBm35tvQAGpen23L 16-668-570-1402 ts cajole daringly. pinto beans 8102.62 Supplier#000003347 UNITED KINGDOM 18344 Manufacturer#5 Froy39Y8ZUJ 33-454-274-8532 y daring requests. unusual accounts wake atop the blithely special packages. sly 8046.07 Supplier#000008780 FRANCE 191222 Manufacturer#3 rOssxn,6gRDzHr0gu,hEK 16-473-215-6395 he regular foxes cajole ruthlessly among the sometimes final grouches. blithel -8042.09 Supplier#000003245 RUSSIA 135705 Manufacturer#4 oJSiGLXRCDAPcfWot7LkwSQRCh63XNS2 32-836-132-8872 use slyly. furiously regular deposits sleep according to the requests. -8042.09 Supplier#000003245 RUSSIA 150729 Manufacturer#1 oJSiGLXRCDAPcfWot7LkwSQRCh63XNS2 32-836-132-8872 use slyly. furiously regular deposits sleep according to the requests. +8042.09 Supplier#000003245 RUSSIA 135705 Manufacturer#4 oJSiGLXRCDAPcfWot7LkwSQRCh63XNS2 32-836-132-8872 use slyly. furiously regular deposits sleep according to the requests. +8042.09 Supplier#000003245 RUSSIA 150729 Manufacturer#1 oJSiGLXRCDAPcfWot7LkwSQRCh63XNS2 32-836-132-8872 use slyly. furiously regular deposits sleep according to the requests. 7992.40 Supplier#000006108 FRANCE 118574 Manufacturer#1 TyptNE7nv6BLpLl6WFX 16-974-998-8937 theodolites among the furiously unusual accounts must x 7980.65 Supplier#000001288 FRANCE 13784 Manufacturer#4 tm0TjL5b oE 16-646-464-8247 gular pains? fluffily bold warhorses affix? blithe instruction 7950.37 Supplier#000008101 GERMANY 33094 Manufacturer#5 HG2wfVixwCIhK7dlrigGR3an2LuSifDJH 17-627-663-8014 ly alongside of the furiously unusual requests! bold, express foxe @@ -199,13 +199,13 @@ LIMIT 100; 7912.91 Supplier#000004211 GERMANY 184210 Manufacturer#4 Zva95Dwj EY0w,XjgsL7O0Zb2 l3almck 17-266-947-7315 slyly silent requests; fluffily fi 7894.56 Supplier#000007981 GERMANY 85472 Manufacturer#4 e8hRUxe9cqQM3b 17-963-404-3760 ly final courts. unusual, quiet dolphi 7887.08 Supplier#000009792 GERMANY 164759 Manufacturer#3 3YSi76M2 I8XGikO5YgSM81r5Z6A7VkZcys 17-988-938-4296 the regular ideas. furiously bold deposits boost above the bli -7871.50 Supplier#000007206 RUSSIA 104695 Manufacturer#1 YvrLdpD 5ExhHmRWzK41tw4 32-432-452-7731 ording to the furious theodolites cajole carefully according to the busily express asymptotes. +7871.50 Supplier#000007206 RUSSIA 104695 Manufacturer#1 YvrLdpD 5ExhHmRWzK41tw4 32-432-452-7731 ording to the furious theodolites cajole carefully according to the busily express asymptotes. 7852.45 Supplier#000005864 RUSSIA 8363 Manufacturer#4 5odLpc1M83KXJ0O 32-454-883-3821 egular, regular ideas. requests are carefully. furiously final dolp 7850.66 Supplier#000001518 UNITED KINGDOM 86501 Manufacturer#1 ddNQX3hIjgico 33-730-383-3892 ccounts. special, final deposits 7843.52 Supplier#000006683 FRANCE 11680 Manufacturer#4 Z1,hkHIw,Z3,,Comv6kLxIiPJtoNt 16-464-517-8943 sits. blithely regular requests above the pending, regular ideas boo # Q3 -query I +query IRTI SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS revenue, @@ -242,7 +242,7 @@ LIMIT 10; 2300070 367371.1452 1995-03-13 0 # Q4 -query I +query TI select o_orderpriority, count(*) as order_count @@ -272,7 +272,7 @@ order by 5-LOW 10487 # Q5 -query I +query TR SELECT n_name, sum(l_extendedprice * (1 - l_discount)) AS revenue @@ -305,7 +305,7 @@ INDIA 52035512.0002 JAPAN 45410175.6954 # Q6 -query I +query R select truncate(sum(l_extendedprice * l_discount),3) as revenue from @@ -319,7 +319,7 @@ where 123141078.228 # Q7 -query I +query TTTR select supp_nation, cust_nation, @@ -366,7 +366,7 @@ GERMANY FRANCE 1995 52531746.669 GERMANY FRANCE 1996 52520549.022 # Q8 -query I +query TR select o_year, truncate(sum(case @@ -409,7 +409,7 @@ order by 1996 0.04148552 # Q9 -query I +query TTR SELECT nation, o_year, @@ -618,7 +618,7 @@ VIETNAM 1993 45352676.8672 VIETNAM 1992 47846355.6485 # Q10 -query I +query ITRRTTTT select c_custkey, c_name, @@ -652,16 +652,16 @@ order by revenue desc limit 20; ---- 57040 Customer#000057040 734235.245 632.87 JAPAN nICtsILWBB 22-895-641-3466 ep. blithely regular foxes promise slyly furiously ironic depend -143347 Customer#000143347 721002.694 2557.47 EGYPT ,Q9Ml3w0gvX 14-742-935-3718 endencies sleep. slyly express deposits nag carefully around the even tithes. slyly regular +143347 Customer#000143347 721002.694 2557.47 EGYPT ,Q9Ml3w0gvX 14-742-935-3718 endencies sleep. slyly express deposits nag carefully around the even tithes. slyly regular 60838 Customer#000060838 679127.307 2454.77 BRAZIL VWmQhWweqj5hFpcvhGFBeOY9hJ4m 12-913-494-9813 tes. final instructions nag quickly according to 101998 Customer#000101998 637029.566 3790.89 UNITED KINGDOM 0,ORojfDdyMca2E2H 33-593-865-6378 ost carefully. slyly regular packages cajole about the blithely final ideas. permanently daring deposit -125341 Customer#000125341 633508.086 4983.51 GERMANY 9YRcnoUPOM7Sa8xymhsDHdQg 17-582-695-5962 ly furiously brave packages. quickly regular dugouts kindle furiously carefully bold theodolites. -25501 Customer#000025501 620269.784 7725.04 ETHIOPIA sr4VVVe3xCJQ2oo2QEhi19D,pXqo6kOGaSn2 15-874-808-6793 y ironic foxes hinder according to the furiously permanent dolphins. pending ideas integrate blithely from +125341 Customer#000125341 633508.086 4983.51 GERMANY 9YRcnoUPOM7Sa8xymhsDHdQg 17-582-695-5962 ly furiously brave packages. quickly regular dugouts kindle furiously carefully bold theodolites. +25501 Customer#000025501 620269.784 7725.04 ETHIOPIA sr4VVVe3xCJQ2oo2QEhi19D,pXqo6kOGaSn2 15-874-808-6793 y ironic foxes hinder according to the furiously permanent dolphins. pending ideas integrate blithely from 115831 Customer#000115831 596423.867 5098.10 FRANCE AlMpPnmtGrOFrDMUs5VLo EIA,Cg,Rw5TBuBoKiO 16-715-386-3788 unts nag carefully final packages. express theodolites are regular ac 84223 Customer#000084223 594998.023 528.65 UNITED KINGDOM Eq51o UpQ4RBr fYTdrZApDsPV4pQyuPq 33-442-824-8191 longside of the slyly final deposits. blithely final platelets about the blithely i 54289 Customer#000054289 585603.391 5583.02 IRAN x3ouCpz6,pRNVhajr0CCQG1 20-834-292-4707 cajole furiously after the quickly unusual fo 39922 Customer#000039922 584878.113 7321.11 GERMANY 2KtWzW,FYkhdWBfobp6SFXWYKjvU9 17-147-757-8036 ironic deposits sublate furiously. carefully regular theodolites along the b -6226 Customer#000006226 576783.760 2230.09 UNITED KINGDOM TKbxS1dbkGMtaa,KOi26lbip4P0tPbWK0 33-657-701-3391 nal packages are alongside of the quickly bold deposits. carefully +6226 Customer#000006226 576783.760 2230.09 UNITED KINGDOM TKbxS1dbkGMtaa,KOi26lbip4P0tPbWK0 33-657-701-3391 nal packages are alongside of the quickly bold deposits. carefully 922 Customer#000000922 576767.533 3869.25 GERMANY rsR9lRxyTdHbDOVt8nYbwjK5vAWH9sB 17-945-916-9648 cuses cajole carefully regular idea 147946 Customer#000147946 576455.132 2030.13 ALGERIA Jqdt1kHAJtuTqHQK,B7 3tJh 10-886-956-3143 ly pending platelets. ironic requests haggle alongside of the furiou 115640 Customer#000115640 569341.193 6436.10 ARGENTINA 6yKLIRRAirUmBjKNO6Z3 11-411-543-4901 ffily ironic deposits. blithely specia @@ -673,7 +673,7 @@ order by 23431 Customer#000023431 554269.536 3381.86 ROMANIA kKI5,CJAJQjQRQtOdCiFQ 29-915-458-2654 the final sentiments. carefully ironic packages # Q11 -query I +query IR SELECT ps_partkey, sum(ps_supplycost * ps_availqty) AS value @@ -1752,7 +1752,7 @@ ORDER BY 5182 7874521.73 # Q12 -query I +query TII select l_shipmode, sum(case @@ -1786,7 +1786,7 @@ MAIL 6202 9324 SHIP 6200 9262 # Q13 -query I +query II SELECT c_count, count(*) AS custdist @@ -1851,7 +1851,7 @@ ORDER BY 39 1 # Q14 -query I +query R select TRUNCATE(100.00 * sum(case when p_type like 'PROMO%' @@ -1869,7 +1869,7 @@ where 16.38077 # Q15 -query I +query ITTTR with revenue as ( select l_suppkey as supplier_no, @@ -1904,7 +1904,7 @@ order by 8449 Supplier#000008449 5BXWsJERA2mP5OyO4 20-469-856-8873 1772627.20 # Q15 with materialized cte -query I +query ITTTR with revenue as materialized ( select l_suppkey as supplier_no, @@ -1940,7 +1940,7 @@ order by # Q16 -query I +query TTII select p_brand, p_type, @@ -20288,8 +20288,7 @@ Brand#55 PROMO PLATED BRASS 19 3 Brand#55 STANDARD PLATED TIN 49 3 # Q17 - -query I +query R select truncate(sum(l_extendedprice) / 7.0,8) as avg_yearly from @@ -20311,7 +20310,7 @@ where 348406.05428571 # Q18 -query I +query TIITRR select c_name, c_custkey, @@ -20404,7 +20403,7 @@ Customer#000082441 82441 857959 1994-02-07 382579.74 305.00 Customer#000088703 88703 2995076 1994-01-30 363812.12 302.00 # Q19 -query I +query R SELECT sum(l_extendedprice * (1 - l_discount)) AS revenue FROM @@ -20438,7 +20437,7 @@ WHERE (p_partkey = l_partkey 3083843.0578 # Q20 -query I +query TT SELECT s_name, s_address @@ -20662,7 +20661,7 @@ Supplier#000009899 U3NBqk s Zz06al2m Supplier#000009974 Uvh0hWngOu96WgB,OafBQOqwpWqzwg8 # Q21 -query I +query TI SELECT s_name, count(*) AS numwait @@ -20804,7 +20803,7 @@ Supplier#000002357 12 Supplier#000002483 12 # Q22 -query I +query TIR select cntrycode, count(*) as numcust, From 6c4b466d70fc8ee9549d92fe1202447ba20e7180 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Tue, 18 Nov 2025 10:32:09 +0800 Subject: [PATCH 4/5] chore: adjust log style --- .../serde/transform_spill_reader.rs | 79 ++++++++++--------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs index fa26ada72e6e1..38094460363ef 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -14,7 +14,6 @@ use std::any::Any; use std::collections::VecDeque; -use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -34,10 +33,11 @@ use databend_common_pipeline::core::InputPort; use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Processor; use databend_common_pipeline::core::ProcessorPtr; -use itertools::Itertools; +use futures::future::try_join_all; use log::info; use opendal::Operator; use tokio::sync::Semaphore; +use tokio::task::JoinHandle; use crate::pipelines::processors::transforms::aggregator::AggregateMeta; use crate::pipelines::processors::transforms::aggregator::BucketSpilledPayload; @@ -220,25 +220,24 @@ impl Processor for TransformSpillReader { self.deserializing_meta = Some((block_meta, VecDeque::from(vec![data]))); } - AggregateMeta::Partitioned { data, .. } => { - // For log progress. - let log_interval = 100; - let total_buckets = data.len(); - let total_elapsed = Arc::new(AtomicU64::new(0)); + AggregateMeta::Partitioned { data, bucket } => { + let bucket = *bucket; + let total_task = data + .iter() + .filter(|meta| matches!(meta, AggregateMeta::BucketSpilled(_))) + .count(); + let processed_count = Arc::new(AtomicUsize::new(0)); - let processed_bytes = Arc::new(AtomicUsize::new(0)); - let mut read_data = Vec::with_capacity(data.len()); - for meta in data { + let mut read_tasks: Vec, Duration)>>> = Vec::new(); + for meta in data.iter() { if let AggregateMeta::BucketSpilled(payload) = meta { let location = payload.location.clone(); let operator = self.operator.clone(); let data_range = payload.data_range.clone(); let semaphore = self.semaphore.clone(); - let total_elapsed = total_elapsed.clone(); let processed_count = processed_count.clone(); - let processed_bytes = processed_bytes.clone(); - read_data.push(databend_common_base::runtime::spawn(async move { + read_tasks.push(databend_common_base::runtime::spawn(async move { let _guard = semaphore.acquire().await; let instant = Instant::now(); let data = operator @@ -264,47 +263,49 @@ impl Processor for TransformSpillReader { } let elapsed = instant.elapsed(); - total_elapsed - .fetch_add(elapsed.as_millis() as u64, Ordering::Relaxed); - let finished = processed_count.fetch_add(1, Ordering::Relaxed) + 1; - processed_bytes.fetch_add(data.len(), Ordering::Relaxed); // log the progress - if finished % log_interval == 0 { + let finished = processed_count.fetch_add(1, Ordering::Relaxed) + 1; + if finished % 100 == 0 { info!( - "Read aggregate {}/{} spilled buckets, elapsed: {:?}", - finished, - total_buckets, - Duration::from_millis( - total_elapsed.load(Ordering::Relaxed) - ) + "Read aggregate {}/{} spilled task, elapsed: {:?}", + finished, total_task, elapsed ); } - Ok(data) + Ok((data, elapsed)) })); } } - match futures::future::try_join_all(read_data).await { - Err(_) => { - return Err(ErrorCode::TokioError("Cannot join tokio job")); - } - Ok(read_data) => { - let read_data: std::result::Result>, opendal::Error> = - read_data.into_iter().try_collect(); + let read_tasks = try_join_all(read_tasks) + .await + .map_err(|_| ErrorCode::TokioError("Cannot join tokio job"))?; - self.deserializing_meta = Some((block_meta, read_data?)); - } - }; + let mut processed_count = 0usize; + let mut processed_bytes = 0usize; + let mut total_elapsed = 0u64; + let mut read_data = Vec::with_capacity(read_tasks.len()); + for result in read_tasks { + let (data, elapsed) = result?; + + processed_count += 1; + processed_bytes += data.len(); + total_elapsed += elapsed.as_millis() as u64; + + read_data.push(data); + } + + let read_data: VecDeque> = VecDeque::from(read_data); + self.deserializing_meta = Some((block_meta, read_data)); - let processed_count = processed_count.load(Ordering::Relaxed); if processed_count != 0 { info!( - "Read aggregate finished: (total count: {}, total bytes: {}, total elapsed: {:?})", + "Read aggregate finished: (bucket: {}, total read count: {}, total bytes: {}, total elapsed: {:?})", + bucket, processed_count, - processed_bytes.load(Ordering::Relaxed), - Duration::from_millis(total_elapsed.load(Ordering::Relaxed)) + processed_bytes, + Duration::from_millis(total_elapsed) ); } } From 8d4b4aa7401e58cfd854a785e93693b59c6f4643 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Tue, 18 Nov 2025 11:10:39 +0800 Subject: [PATCH 5/5] chore: adjust log level in cluster mode --- scripts/ci/deploy/config/databend-query-node-2.toml | 2 +- scripts/ci/deploy/config/databend-query-node-3.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/ci/deploy/config/databend-query-node-2.toml b/scripts/ci/deploy/config/databend-query-node-2.toml index 185381f51e55b..6ee0adf5d14db 100644 --- a/scripts/ci/deploy/config/databend-query-node-2.toml +++ b/scripts/ci/deploy/config/databend-query-node-2.toml @@ -56,7 +56,7 @@ definition = "CREATE FUNCTION ping(STRING) RETURNS STRING LANGUAGE python HANDLE [log] [log.file] -level = "INFO" +level = "DEBUG" format = "text" dir = "./.databend/logs_2" limit = 12 # 12 files, 1 file per hour diff --git a/scripts/ci/deploy/config/databend-query-node-3.toml b/scripts/ci/deploy/config/databend-query-node-3.toml index 32dd045e0cb59..8f7cbf0c8b7cc 100644 --- a/scripts/ci/deploy/config/databend-query-node-3.toml +++ b/scripts/ci/deploy/config/databend-query-node-3.toml @@ -57,7 +57,7 @@ definition = "CREATE FUNCTION ping(STRING) RETURNS STRING LANGUAGE python HANDLE [log] [log.file] -level = "INFO" +level = "DEBUG" format = "text" limit = 12 # 12 files, 1 file per hour dir = "./.databend/logs_3"