diff --git a/Cargo.lock b/Cargo.lock
index 9ee7384f8d406..a99f0b51e3a8f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2110,7 +2110,6 @@ dependencies = [
  "common-storage",
  "futures",
  "futures-util",
- "itertools",
  "jsonb",
  "metrics",
  "opendal",
diff --git a/scripts/benchmark/query/load/tpch10.sh b/scripts/benchmark/query/load/tpch10.sh
index a677e5ad79793..427409d7ad3c3 100755
--- a/scripts/benchmark/query/load/tpch10.sh
+++ b/scripts/benchmark/query/load/tpch10.sh
@@ -7,7 +7,7 @@ select version();
 SQL

 for t in customer lineitem nation orders partsupp part region supplier; do
-  echo "DROP TABLE IF EXISTS $t;" | bendsql query
+  echo "DROP TABLE IF EXISTS $t;" | bendsql query
 done

 cat <<SQL
diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs
--- a/src/query/expression/src/schema.rs
+++ b/src/query/expression/src/schema.rs
-    pub fn to_column_id_set(&self) -> HashSet<ColumnId> {
-        HashSet::from_iter(self.to_column_ids().iter().cloned())
+    pub fn to_leaf_column_id_set(&self) -> HashSet<ColumnId> {
+        HashSet::from_iter(self.to_leaf_column_ids().iter().cloned())
     }

     pub fn to_column_ids(&self) -> Vec<ColumnId> {
diff --git a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs
index f3ed67f7bb8a3..0db2e59a4e518 100644
--- a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs
@@ -75,7 +75,7 @@ async fn check_segment_column_ids(
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };

     let snapshot = snapshot_reader.read(&params).await?;
@@ -100,7 +100,7 @@ async fn check_segment_column_ids(
         location: seg_loc.clone(),
         len_hint: None,
         ver: SegmentInfo::VERSION,
-        put_cache: true,
+        put_cache: false,
     };
     let segment_info = segment_reader.read(&params).await?;
diff --git a/src/query/service/tests/it/storages/fuse/operations/clustering.rs b/src/query/service/tests/it/storages/fuse/operations/clustering.rs
index 6dade2126b871..f9c8257e11632 100644
--- a/src/query/service/tests/it/storages/fuse/operations/clustering.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/clustering.rs
@@ -91,7 +91,7 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };
     let snapshot = reader.read(&load_params).await?;
@@ -127,7 +127,7 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };
     let snapshot = reader.read(&params).await?;
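For context on the schema.rs change above: a table schema's leaf column ids enumerate the leaves of nested columns (a tuple with two fields occupies two ids), so a set built from them is what block-level code can compare against a block's per-column metadata. Below is a minimal, self-contained sketch of the renamed helper, using a simplified stand-in for databend's TableSchema rather than the real definition:

    use std::collections::HashSet;

    type ColumnId = u32;

    // Stand-in for the real TableSchema; only the piece this sketch needs.
    struct TableSchema {
        leaf_ids: Vec<ColumnId>,
    }

    impl TableSchema {
        fn to_leaf_column_ids(&self) -> Vec<ColumnId> {
            self.leaf_ids.clone()
        }

        // Mirrors the renamed helper: collect the leaf ids into a set so
        // callers (e.g. the compactor's check_column_ids later in this diff)
        // can compare it with the column ids present in a block's col_metas.
        fn to_leaf_column_id_set(&self) -> HashSet<ColumnId> {
            self.to_leaf_column_ids().iter().cloned().collect()
        }
    }

    fn main() {
        let schema = TableSchema { leaf_ids: vec![0, 1, 2] };
        assert!(schema.to_leaf_column_id_set().contains(&2));
    }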
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
index 49e68727851b5..c9c82e40c4d10 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
@@ -31,6 +31,7 @@ use common_storage::DataOperator;
 use common_storages_fuse::io::MetaReaders;
 use common_storages_fuse::io::SegmentInfoReader;
 use common_storages_fuse::io::SegmentWriter;
+use common_storages_fuse::io::SegmentsIO;
 use common_storages_fuse::io::TableMetaLocationGenerator;
 use common_storages_fuse::operations::CompactOptions;
 use common_storages_fuse::operations::SegmentCompactMutator;
@@ -617,10 +618,9 @@ async fn test_segment_compactor() -> Result<()> {

 struct CompactSegmentTestFixture {
     threshold: u64,
+    ctx: Arc<dyn TableContext>,
     data_accessor: DataOperator,
     location_gen: TableMetaLocationGenerator,
-    input_segments: Vec<Arc<SegmentInfo>>,
-    input_segment_locations: Vec<Location>,
     // blocks of input_segments, order by segment
     input_blocks: Vec<DataBlock>,
 }
@@ -630,11 +630,10 @@ impl CompactSegmentTestFixture {
         let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
         let data_accessor = ctx.get_data_operator()?;
         Ok(Self {
+            ctx: ctx.clone(),
             threshold: block_per_seg,
             data_accessor,
             location_gen,
-            input_segments: vec![],
-            input_segment_locations: vec![],
             input_blocks: vec![],
         })
     }
@@ -649,17 +648,26 @@ impl CompactSegmentTestFixture {
         let location_gen = &self.location_gen;
         let block_writer = BlockWriter::new(data_accessor, location_gen);

+        let schema = TestFixture::default_table_schema();
+        let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
+        let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
+
         let segment_writer = SegmentWriter::new(data_accessor, location_gen);
-        let seg_acc = SegmentCompactor::new(block_per_seg, segment_writer.clone());
+        let seg_acc = SegmentCompactor::new(
+            block_per_seg,
+            max_io_requests,
+            &fuse_segment_io,
+            segment_writer.clone(),
+        );

-        let (segments, locations, blocks) =
+        let (locations, blocks) =
             Self::gen_segments(&block_writer, &segment_writer, num_block_of_segments).await?;
-        self.input_segments = segments;
-        self.input_segment_locations = locations;
         self.input_blocks = blocks;

         let limit = limit.unwrap_or(usize::MAX);
         seg_acc
-            .compact(&self.input_segments, &self.input_segment_locations, limit)
+            .compact(locations, limit, |status| {
+                self.ctx.set_status_info(&status);
+            })
             .await
     }

@@ -667,11 +675,10 @@ impl CompactSegmentTestFixture {
         block_writer: &BlockWriter<'_>,
         segment_writer: &SegmentWriter<'_>,
         block_num_of_segments: &[usize],
-    ) -> Result<(Vec<Arc<SegmentInfo>>, Vec<Location>, Vec<DataBlock>)> {
-        let mut segments = vec![];
+    ) -> Result<(Vec<Location>, Vec<DataBlock>)> {
         let mut locations = vec![];
         let mut collected_blocks = vec![];
-        for num_blocks in block_num_of_segments {
+        for num_blocks in block_num_of_segments.iter().rev() {
             let (schema, blocks) = TestFixture::gen_sample_blocks_ex(*num_blocks, 1, 1);
             let mut stats_acc = StatisticsAccumulator::default();
             for block in blocks {
@@ -696,11 +703,10 @@ impl CompactSegmentTestFixture {
                 col_stats,
             });
             let location = segment_writer.write_segment_no_cache(&segment_info).await?;
-            segments.push(Arc::new(segment_info));
             locations.push(location);
         }

-        Ok((segments, locations, collected_blocks))
+        Ok((locations, collected_blocks))
     }

     // verify that newly generated segments contain the proper number of blocks
@@ -716,7 +722,7 @@ impl CompactSegmentTestFixture {
             location: x.to_string(),
             len_hint: None,
             ver: SegmentInfo::VERSION,
-            put_cache: true,
+            put_cache: false,
         };

         let seg = segment_reader.read(&load_params).await?;
@@ -791,13 +797,12 @@ impl CompactCase {
         let mut block_num_of_output_segments = vec![];

         // 4. input blocks should be there and in the original order
-        // for location in r.segments_locations.iter().rev() {
-        for location in r.segments_locations.iter() {
+        for location in r.segments_locations.iter().rev() {
             let load_params = LoadParams {
                 location: location.0.clone(),
                 len_hint: None,
                 ver: location.1,
-                put_cache: true,
+                put_cache: false,
             };

             let segment = segment_reader.read(&load_params).await?;
@@ -815,6 +820,7 @@ impl CompactCase {
                 idx += 1;
             }
         }
+        block_num_of_output_segments.reverse();

        // 5. statistics should be the same
        assert_eq!(
diff --git a/src/query/service/tests/it/storages/fuse/pruning.rs b/src/query/service/tests/it/storages/fuse/pruning.rs
index b85af2844e8d1..dd5bca634630a 100644
--- a/src/query/service/tests/it/storages/fuse/pruning.rs
+++ b/src/query/service/tests/it/storages/fuse/pruning.rs
@@ -163,7 +163,7 @@ async fn test_block_pruner() -> Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };

     let snapshot = reader.read(&load_params).await?;
diff --git a/src/query/service/tests/it/storages/fuse/statistics.rs b/src/query/service/tests/it/storages/fuse/statistics.rs
index 9e4fe112178ee..005af45ded376 100644
--- a/src/query/service/tests/it/storages/fuse/statistics.rs
+++ b/src/query/service/tests/it/storages/fuse/statistics.rs
@@ -155,7 +155,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
         .iter()
         .map(|b| gen_columns_statistics(&b.clone().unwrap(), None, &schema))
         .collect::<common_exception::Result<Vec<_>>>()?;
-    let r = reducers::reduce_block_statistics(&col_stats, None);
+    let r = reducers::reduce_block_statistics(&col_stats);
     assert!(r.is_ok());
     let r = r.unwrap();
     assert_eq!(3, r.len());
@@ -210,7 +210,7 @@ fn test_reduce_block_statistics_in_memory_size() -> common_exception::Result<()> {
     // combine two statistics
     let col_stats_left = HashMap::from_iter(iter(0).take(num_of_cols));
     let col_stats_right = HashMap::from_iter(iter(0).take(num_of_cols));
-    let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right], None)?;
+    let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right])?;
     assert_eq!(num_of_cols, r.len()); // there should be 100 columns in the result
     for idx in 1..=100 {
diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml
index d84be0c402584..610ba2ab56907 100644
--- a/src/query/storages/fuse/Cargo.toml
+++ b/src/query/storages/fuse/Cargo.toml
@@ -44,7 +44,6 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 chrono = { workspace = true }
 futures = "0.3.24"
 futures-util = "0.3.24"
-itertools = "0.10.5"
 metrics = "0.20.1"
 opendal = { workspace = true }
 serde = { workspace = true }
diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs
index 3b1006c49b925..64a41ad8a5011 100644
--- a/src/query/storages/fuse/src/io/write/block_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_writer.rs
@@ -33,7 +33,7 @@ use crate::io::write::WriteSettings;
 use crate::io::TableMetaLocationGenerator;
 use crate::operations::util;
 use crate::operations::BloomIndexState;
-use crate::statistics::BlockStatistics;
+use crate::statistics::gen_columns_statistics;

 // TODO rename this, it is serialization, or pass in a writer(if not rename)
 pub fn serialize_block(
@@ -122,14 +122,10 @@ impl BlockBuilder {
         // TODO, generate the cluster stats
         let cluster_stats = None;
-        // need to use BlockStatistics any more?
- let block_statistics = BlockStatistics::from( - &data_block, - block_location.0.clone(), - cluster_stats, - column_distinct_count, - &self.source_schema, - )?; + let row_count = data_block.num_rows() as u64; + let block_size = data_block.memory_size() as u64; + let col_stats = + gen_columns_statistics(&data_block, column_distinct_count, &self.source_schema)?; let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); let (file_size, col_metas) = serialize_block( @@ -140,12 +136,12 @@ impl BlockBuilder { )?; let block_meta = BlockMeta { - row_count: block_statistics.block_rows_size, - block_size: block_statistics.block_bytes_size, + row_count, + block_size, file_size, - col_stats: block_statistics.block_column_statistics, + col_stats, col_metas, - cluster_stats: block_statistics.block_cluster_statistics, + cluster_stats, location: block_location, bloom_filter_index_location: bloom_index_state.as_ref().map(|v| v.location.clone()), bloom_filter_index_size: bloom_index_state diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 1e3d01d177030..0162b2fedc5fa 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -462,7 +462,7 @@ impl FuseTable { acc.col_stats = if acc.col_stats.is_empty() { stats.col_stats.clone() } else { - statistics::reduce_block_statistics(&[&acc.col_stats, &stats.col_stats], None)? + statistics::reduce_block_statistics(&[&acc.col_stats, &stats.col_stats])? }; seg_acc.push(location.clone()); Ok::<_, ErrorCode>((acc, seg_acc)) @@ -521,6 +521,13 @@ impl FuseTable { // potentially concurrently appended segments, init it to empty let mut concurrently_appended_segment_locations: &[Location] = &[]; + // Status + { + let status = "mutation: begin try to commit"; + ctx.set_status_info(status); + info!(status); + } + while retries < MAX_RETRIES { let mut snapshot_tobe_committed = TableSnapshot::from_previous(latest_snapshot.as_ref()); diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index 3a4127c603738..ef2344df678d6 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -16,20 +16,17 @@ use std::sync::Arc; use common_catalog::plan::Projection; use common_catalog::table::CompactTarget; -use common_exception::ErrorCode; use common_exception::Result; -use common_pipeline_core::pipe::Pipe; -use common_pipeline_core::pipe::PipeItem; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; use storages_common_table_meta::meta::TableSnapshot; +use tracing::info; use crate::operations::mutation::BlockCompactMutator; +use crate::operations::mutation::CompactAggregator; use crate::operations::mutation::CompactSource; -use crate::operations::mutation::CompactTransform; -use crate::operations::mutation::MergeSegmentsTransform; use crate::operations::mutation::MutationSink; use crate::operations::mutation::SegmentCompactMutator; -use crate::pipelines::processors::port::InputPort; -use crate::pipelines::processors::port::OutputPort; use crate::pipelines::Pipeline; use crate::FuseTable; use crate::Table; @@ -105,13 +102,13 @@ impl FuseTable { } /// The flow of Pipeline is as follows: - /// +--------------+ +-----------------+ - /// |CompactSource1| ---> |CompactTransform1| ------ - /// +--------------+ +-----------------+ | 
+----------------------+ +------------+ - /// | ... | ... | ... | ... | ---> |MergeSegmentsTransform| ---> |MutationSink| - /// +--------------+ +-----------------+ | +----------------------+ +------------+ - /// |CompactSourceN| ---> |CompactTransformN| ------ - /// +--------------+ +-----------------+ + /// +--------------+ + /// |CompactSource1| ------ + /// +--------------+ | +-----------------+ +------------+ + /// | ... | ... | ---> |CompactAggregator| ---> |MutationSink| + /// +--------------+ | +-----------------+ +------------+ + /// |CompactSourceN| ------ + /// +--------------+ async fn compact_blocks( &self, ctx: Arc, @@ -124,42 +121,59 @@ impl FuseTable { } let thresholds = self.get_block_compact_thresholds(); + let schema = self.schema(); + let write_settings = self.get_write_settings(); - let mut mutator = BlockCompactMutator::new(ctx.clone(), options, self.operator.clone()); - let need_compact = mutator.target_select().await?; - if !need_compact { + let mut mutator = + BlockCompactMutator::new(ctx.clone(), thresholds, options, self.operator.clone()); + mutator.target_select().await?; + if mutator.compact_tasks.is_empty() { return Ok(false); } + // Status. + { + let status = "compact: begin to run compact tasks"; + ctx.set_status_info(status); + info!(status); + } ctx.set_partitions(mutator.compact_tasks.clone())?; + let all_column_indices = self.all_column_indices(); + let projection = Projection::Columns(all_column_indices); + let block_reader = self.create_block_reader(projection, false, ctx.clone())?; let max_threads = ctx.get_settings().get_max_threads()? as usize; // Add source pipe. pipeline.add_source( - |output| CompactSource::try_create(ctx.clone(), output, thresholds), + |output| { + CompactSource::try_create( + ctx.clone(), + self.operator.clone(), + write_settings.clone(), + self.meta_location_generator().clone(), + schema.clone(), + block_reader.clone(), + output, + ) + }, max_threads, )?; - let all_column_indices = self.all_column_indices(); - let projection = Projection::Columns(all_column_indices); - let block_reader = self.create_block_reader(projection, false, ctx.clone())?; + pipeline.resize(1)?; pipeline.add_transform(|input, output| { - CompactTransform::try_create( - ctx.clone(), + let compact_aggregator = CompactAggregator::new( + self.operator.clone(), + self.meta_location_generator().clone(), + mutator.clone(), + ); + Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create( input, output, - ctx.get_scan_progress(), - block_reader.clone(), - self.meta_location_generator().clone(), - self.operator.clone(), - self.schema(), - thresholds, - self.get_write_settings(), - ) + compact_aggregator, + ))) })?; - self.try_add_merge_segments_transform(mutator.clone(), pipeline)?; pipeline.add_sink(|input| { MutationSink::try_create( self, @@ -171,39 +185,4 @@ impl FuseTable { Ok(true) } - - fn try_add_merge_segments_transform( - &self, - mutator: BlockCompactMutator, - pipeline: &mut Pipeline, - ) -> Result<()> { - if pipeline.is_empty() { - return Err(ErrorCode::Internal("The pipeline is empty.")); - } - - match pipeline.output_len() { - 0 => Err(ErrorCode::Internal("The output of the last pipe is 0.")), - last_pipe_size => { - let mut inputs_port = Vec::with_capacity(last_pipe_size); - for _ in 0..last_pipe_size { - inputs_port.push(InputPort::create()); - } - - let output_port = OutputPort::create(); - let processor = MergeSegmentsTransform::try_create( - mutator, - inputs_port.clone(), - output_port.clone(), - )?; - - 
pipeline.add_pipe(Pipe::create(inputs_port.len(), 1, vec![PipeItem::create( - processor, - inputs_port, - vec![output_port], - )])); - - Ok(()) - } - } - } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs index 4b465ad7d6996..573bfbee90773 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs @@ -1,48 +1,58 @@ -// Copyright 2022 Datafuse Labs. +// Copyright 2023 Datafuse Labs. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::HashSet; +use std::collections::VecDeque; use std::sync::Arc; +use std::time::Instant; use std::vec; use common_catalog::plan::Partitions; +use common_catalog::plan::PartitionsShuffleKind; use common_exception::Result; +use common_expression::BlockThresholds; +use common_expression::ColumnId; use opendal::Operator; +use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::SegmentInfo; use storages_common_table_meta::meta::Statistics; +use tracing::info; -use super::compact_part::CompactPartInfo; use crate::io::SegmentsIO; +use crate::operations::merge_into::mutation_meta::mutation_log::BlockMetaIndex; +use crate::operations::mutation::CompactPartInfo; use crate::operations::CompactOptions; use crate::statistics::reducers::merge_statistics_mut; use crate::TableContext; -// BlockCompactMutator iterates through the segments and selects the segments -// that need to be block compacted, set block_per_seg as the threshold, -// select the segments whose block_count >= block_per_seg and block_count < 2*block_per_seg. #[derive(Clone)] pub struct BlockCompactMutator { pub ctx: Arc, + pub operator: Operator, + pub thresholds: BlockThresholds, pub compact_params: CompactOptions, - pub operator: Operator, + pub column_ids: HashSet, + // A set of Parts. pub compact_tasks: Partitions, - // The order of the unchanged segments in snapshot. - pub unchanged_segment_indices: Vec, + pub unchanged_blocks_map: HashMap>>, // locations all the unchanged segments. 
- pub unchanged_segment_locations: Vec, + pub unchanged_segments_map: BTreeMap, // summarised statistics of all the unchanged segments pub unchanged_segment_statistics: Statistics, } @@ -50,116 +60,222 @@ pub struct BlockCompactMutator { impl BlockCompactMutator { pub fn new( ctx: Arc, + thresholds: BlockThresholds, compact_params: CompactOptions, operator: Operator, ) -> Self { + let column_ids = compact_params.base_snapshot.schema.to_leaf_column_id_set(); Self { ctx, - compact_params, operator, - compact_tasks: Partitions::default(), - unchanged_segment_indices: Vec::new(), - unchanged_segment_locations: Vec::new(), + thresholds, + compact_params, + column_ids, + unchanged_blocks_map: HashMap::new(), + compact_tasks: Partitions::create_nolazy(PartitionsShuffleKind::Mod, vec![]), + unchanged_segments_map: BTreeMap::new(), unchanged_segment_statistics: Statistics::default(), } } - pub async fn target_select(&mut self) -> Result { + pub async fn target_select(&mut self) -> Result<()> { + let start = Instant::now(); let snapshot = self.compact_params.base_snapshot.clone(); let segment_locations = &snapshot.segments; - - let schema = Arc::new(self.compact_params.base_snapshot.schema.clone()); - // Read all segments information in parallel. - let segments_io = SegmentsIO::create(self.ctx.clone(), self.operator.clone(), schema); - let segments = segments_io - .read_segments(segment_locations) - .await? - .into_iter() - .collect::>>()?; - - let number_segments = segments.len(); + let number_segments = segment_locations.len(); let limit = self.compact_params.limit.unwrap_or(number_segments); - let blocks_per_seg = self.compact_params.block_per_seg as u64; - let mut order = 0; - let mut end = 0; + let mut segment_idx = 0; let mut compacted_segment_cnt = 0; + let mut checked_end_at = 0; + + // Status. + { + let status = "compact: begin to build compact tasks"; + self.ctx.set_status_info(status); + info!(status); + } - let mut builder = CompactPartBuilder::new(blocks_per_seg); - - for (idx, segment) in segments.iter().enumerate() { - let tasks = builder.add(segment.clone()); - for t in tasks { - if CompactPartBuilder::check_for_compact(&t) { - compacted_segment_cnt += t.len(); - self.compact_tasks - .partitions - .push(CompactPartInfo::create(t, order)); - } else { - self.unchanged_segment_locations - .push(segment_locations[idx].clone()); - self.unchanged_segment_indices.push(order); - merge_statistics_mut( - &mut self.unchanged_segment_statistics, - &segments[idx].summary, - )?; + let segments_io = SegmentsIO::create( + self.ctx.clone(), + self.operator.clone(), + Arc::new(self.compact_params.base_snapshot.schema.clone()), + ); + let mut checker = SegmentCompactChecker::new(self.compact_params.block_per_seg as u64); + let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + for chunk in segment_locations.chunks(max_io_requests) { + // Read the segments information in parallel. + let segment_infos = segments_io + .read_segments(chunk) + .await? + .into_iter() + .collect::>>()?; + + // Check the segment to be compacted. + // Size of compacted segment should be in range R == [threshold, 2 * threshold) + for (idx, segment) in segment_infos.iter().enumerate() { + let segments_vec = checker.add(segment.clone()); + for segments in segments_vec { + if SegmentCompactChecker::check_for_compact(&segments) { + compacted_segment_cnt += segments.len(); + // build the compact tasks. 
+ self.build_compact_tasks(segments, segment_idx); + } else { + self.unchanged_segments_map + .insert(segment_idx, chunk[idx].clone()); + merge_statistics_mut( + &mut self.unchanged_segment_statistics, + &segment.summary, + )?; + } + segment_idx += 1; + } + checked_end_at += 1; + if compacted_segment_cnt + checker.segments.len() >= limit { + break; } - order += 1; } - end = idx + 1; - if compacted_segment_cnt + builder.segments.len() >= limit { - break; + + // Status. + { + let status = format!( + "compact: read segment files:{}/{}, cost:{} sec", + checked_end_at, + number_segments, + start.elapsed().as_secs() + ); + self.ctx.set_status_info(&status); + info!(status); } } - if !builder.segments.is_empty() { - let t = std::mem::take(&mut builder.segments); - if CompactPartBuilder::check_for_compact(&t) { - self.compact_tasks - .partitions - .push(CompactPartInfo::create(t, order)); + // finalize the compaction. + if !checker.segments.is_empty() { + let segments = std::mem::take(&mut checker.segments); + if SegmentCompactChecker::check_for_compact(&segments) { + compacted_segment_cnt += segments.len(); + self.build_compact_tasks(segments, segment_idx); } else { - self.unchanged_segment_locations - .push(segment_locations[end - 1].clone()); - self.unchanged_segment_indices.push(order); - merge_statistics_mut( - &mut self.unchanged_segment_statistics, - &segments[end - 1].summary, - )?; + self.unchanged_segments_map + .insert(segment_idx, segment_locations[checked_end_at - 1].clone()); + merge_statistics_mut(&mut self.unchanged_segment_statistics, &segments[0].summary)?; } - order += 1; + segment_idx += 1; } - if self.compact_tasks.partitions.is_empty() { - return Ok(false); + // combine with the unprocessed segments (which are outside of the limit). + if checked_end_at < number_segments { + for chunk in segment_locations[checked_end_at..].chunks(max_io_requests) { + let segment_infos = segments_io + .read_segments(chunk) + .await? + .into_iter() + .collect::>>()?; + + for (segment, location) in segment_infos.into_iter().zip(chunk.iter()) { + self.unchanged_segments_map + .insert(segment_idx, location.clone()); + merge_statistics_mut(&mut self.unchanged_segment_statistics, &segment.summary)?; + segment_idx += 1; + } + + checked_end_at += chunk.len(); + // Status. + { + let status = format!( + "compact: read segment files:{}/{}, cost:{} sec", + checked_end_at, + number_segments, + start.elapsed().as_secs() + ); + self.ctx.set_status_info(&status); + info!(status); + } + } + } + + // Status. + { + let status = format!( + "compact: end to build compact tasks:{}, segments to be compacted:{}, cost:{} sec", + self.compact_tasks.len(), + compacted_segment_cnt, + start.elapsed().as_secs() + ); + self.ctx.set_status_info(&status); + info!(status); + } + Ok(()) + } + + // Select the row_count >= min_rows_per_block or block_size >= max_bytes_per_block + // as the perfect_block condition(N for short). Gets a set of segments, iterates + // through the blocks, and finds the blocks >= N and blocks < 2N as a task. + fn build_compact_tasks(&mut self, segments: Vec>, segment_idx: usize) { + let mut builder = CompactTaskBuilder::new(self.column_ids.clone()); + let mut tasks = VecDeque::new(); + let mut block_idx = 0; + let mut unchanged_blocks = BTreeMap::new(); + // The order of the compact is from old to new. 
+        for segment in segments.iter().rev() {
+            for block in segment.blocks.iter() {
+                let (unchanged, need_take) = builder.add(block, self.thresholds);
+                if need_take {
+                    let blocks = builder.take_blocks();
+                    if blocks.len() == 1 && builder.check_column_ids(&blocks[0]) {
+                        unchanged_blocks.insert(block_idx, blocks[0].clone());
+                    } else {
+                        tasks.push_back((block_idx, blocks));
+                    }
+                    block_idx += 1;
+                }
+                if unchanged {
+                    unchanged_blocks.insert(block_idx, block.clone());
+                    block_idx += 1;
+                }
+            }
+        }

-        if end < number_segments {
-            for i in end..number_segments {
-                self.unchanged_segment_locations
-                    .push(segment_locations[i].clone());
-                self.unchanged_segment_indices.push(order);
-                merge_statistics_mut(&mut self.unchanged_segment_statistics, &segments[i].summary)?;
-                order += 1;
+        if !builder.is_empty() {
+            let (index, mut blocks) = tasks.pop_back().unwrap_or(
+                unchanged_blocks
+                    .pop_last()
+                    .map_or((0, vec![]), |(k, v)| (k, vec![v])),
+            );
+            blocks.extend(builder.take_blocks());
+            if blocks.len() > 1 || builder.check_column_ids(&blocks[0]) {
+                tasks.push_back((index, blocks));
+            }
         }

-        Ok(true)
+        let mut partitions = tasks
+            .into_iter()
+            .map(|(block_idx, blocks)| {
+                CompactPartInfo::create(blocks, BlockMetaIndex {
+                    segment_idx,
+                    block_idx,
+                    range: None,
+                })
+            })
+            .collect();
+        self.compact_tasks.partitions.append(&mut partitions);
+        self.unchanged_blocks_map
+            .insert(segment_idx, unchanged_blocks);
     }
 }

-#[derive(Default)]
-struct CompactPartBuilder {
+struct SegmentCompactChecker {
     segments: Vec<Arc<SegmentInfo>>,
-    block_count: u64,
+    total_block_count: u64,
     threshold: u64,
 }

-impl CompactPartBuilder {
+impl SegmentCompactChecker {
     fn new(threshold: u64) -> Self {
         Self {
             threshold,
-            ..Default::default()
+            total_block_count: 0,
+            segments: vec![],
         }
     }

@@ -170,14 +286,14 @@ impl CompactPartBuilder {
     }

     fn add(&mut self, segment: Arc<SegmentInfo>) -> Vec<Vec<Arc<SegmentInfo>>> {
-        self.block_count += segment.summary.block_count;
-        if self.block_count < self.threshold {
+        self.total_block_count += segment.summary.block_count;
+        if self.total_block_count < self.threshold {
             self.segments.push(segment);
             return vec![];
         }

-        if self.block_count > 2 * self.threshold {
-            self.block_count = 0;
+        if self.total_block_count > 2 * self.threshold {
+            self.total_block_count = 0;
             let trivial = vec![segment];
             if self.segments.is_empty() {
                 return vec![trivial];
@@ -186,8 +302,72 @@ impl CompactPartBuilder {
             }
         }

-        self.block_count = 0;
+        self.total_block_count = 0;
         self.segments.push(segment);
         vec![std::mem::take(&mut self.segments)]
     }
 }
+
+struct CompactTaskBuilder {
+    column_ids: HashSet<ColumnId>,
+    blocks: Vec<Arc<BlockMeta>>,
+    total_rows: usize,
+    total_size: usize,
+}
+
+impl CompactTaskBuilder {
+    fn new(column_ids: HashSet<ColumnId>) -> Self {
+        Self {
+            column_ids,
+            blocks: vec![],
+            total_rows: 0,
+            total_size: 0,
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.blocks.is_empty()
+    }
+
+    fn take_blocks(&mut self) -> Vec<Arc<BlockMeta>> {
+        self.total_rows = 0;
+        self.total_size = 0;
+        std::mem::take(&mut self.blocks)
+    }
+
+    fn check_column_ids(&self, block: &Arc<BlockMeta>) -> bool {
+        let column_ids: HashSet<ColumnId> = block.col_metas.keys().cloned().collect();
+        self.column_ids == column_ids
+    }
+
+    fn add(&mut self, block: &Arc<BlockMeta>, thresholds: BlockThresholds) -> (bool, bool) {
+        self.total_rows += block.row_count as usize;
+        self.total_size += block.block_size as usize;
+
+        if !thresholds.check_large_enough(self.total_rows, self.total_size) {
+            // blocks < N
+            self.blocks.push(block.clone());
+            return (false, false);
+        }
+
+        if self.blocks.is_empty() {
+            if self.check_column_ids(block) {
+                self.total_rows = 0;
self.total_size = 0; + return (true, false); + } else { + self.blocks.push(block.clone()); + return (false, true); + } + } + + if thresholds.check_for_compact(self.total_rows, self.total_size) { + // N <= blocks < 2N + self.blocks.push(block.clone()); + (false, true) + } else { + // blocks > 2N + (true, true) + } + } +} diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_meta.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_meta.rs index 5f7355e17dec2..f4ae918fd3f99 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/compact_meta.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/compact_meta.rs @@ -13,23 +13,19 @@ // limitations under the License. use std::any::Any; -use std::collections::VecDeque; use std::sync::Arc; -use common_exception::ErrorCode; -use common_exception::Result; use common_expression::BlockMetaInfo; use common_expression::BlockMetaInfoDowncast; use common_expression::BlockMetaInfoPtr; -use storages_common_table_meta::meta::SegmentInfo; +use storages_common_table_meta::meta::BlockMeta; -use super::compact_part::CompactTask; -use crate::operations::mutation::AbortOperation; +use crate::operations::merge_into::mutation_meta::mutation_log::BlockMetaIndex; #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] pub struct CompactSourceMeta { - pub order: usize, - pub tasks: VecDeque, + pub block: Arc, + pub index: BlockMetaIndex, } #[typetag::serde(name = "compact_source_meta")] @@ -51,67 +47,7 @@ impl BlockMetaInfo for CompactSourceMeta { } impl CompactSourceMeta { - pub fn create(order: usize, tasks: VecDeque) -> BlockMetaInfoPtr { - Box::new(CompactSourceMeta { order, tasks }) - } - - pub fn from_meta(info: &BlockMetaInfoPtr) -> Result<&CompactSourceMeta> { - match CompactSourceMeta::downcast_ref_from(info) { - Some(part_ref) => Ok(part_ref), - None => Err(ErrorCode::Internal( - "Cannot downcast from BlockMetaInfo to CompactSourceMeta.", - )), - } - } -} - -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] -pub struct CompactSinkMeta { - pub order: usize, - pub segment_location: String, - pub segment_info: Arc, - pub abort_operation: AbortOperation, -} - -#[typetag::serde(name = "compact_sink_meta")] -impl BlockMetaInfo for CompactSinkMeta { - fn as_any(&self) -> &dyn Any { - self - } - - fn equals(&self, info: &Box) -> bool { - match CompactSinkMeta::downcast_ref_from(info) { - None => false, - Some(other) => self == other, - } - } - - fn clone_self(&self) -> Box { - Box::new(self.clone()) - } -} - -impl CompactSinkMeta { - pub fn create( - order: usize, - segment_location: String, - segment_info: Arc, - abort_operation: AbortOperation, - ) -> BlockMetaInfoPtr { - Box::new(CompactSinkMeta { - order, - segment_location, - segment_info, - abort_operation, - }) - } - - pub fn from_meta(info: &BlockMetaInfoPtr) -> Result<&CompactSinkMeta> { - match CompactSinkMeta::downcast_ref_from(info) { - Some(part_ref) => Ok(part_ref), - None => Err(ErrorCode::Internal( - "Cannot downcast from BlockMetaInfo to CompactSinkMeta.", - )), - } + pub fn create(index: BlockMetaIndex, block: Arc) -> BlockMetaInfoPtr { + Box::new(CompactSourceMeta { index, block }) } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs index 8346459e29ebc..032ae28f8b9a8 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs +++ 
b/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs @@ -20,29 +20,13 @@ use common_catalog::plan::PartInfoPtr; use common_exception::ErrorCode; use common_exception::Result; use storages_common_table_meta::meta::BlockMeta; -use storages_common_table_meta::meta::SegmentInfo; -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] -pub enum CompactTask { - // Only one block, no need to do a compact. - Trivial(Arc), - // Multiple blocks, need to do compact. - Normal(Vec>), -} - -impl CompactTask { - pub fn get_block_metas(&self) -> Vec> { - match self { - CompactTask::Trivial(block_meta) => vec![block_meta.clone()], - CompactTask::Normal(block_metas) => block_metas.clone(), - } - } -} +use crate::operations::merge_into::mutation_meta::mutation_log::BlockMetaIndex; -#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] +#[derive(serde::Serialize, serde::Deserialize, PartialEq)] pub struct CompactPartInfo { - pub segments: Vec>, - pub order: usize, + pub blocks: Vec>, + pub index: BlockMetaIndex, } #[typetag::serde(name = "compact")] @@ -64,8 +48,8 @@ impl PartInfo for CompactPartInfo { } impl CompactPartInfo { - pub fn create(segments: Vec>, order: usize) -> PartInfoPtr { - Arc::new(Box::new(CompactPartInfo { segments, order })) + pub fn create(blocks: Vec>, index: BlockMetaIndex) -> PartInfoPtr { + Arc::new(Box::new(CompactPartInfo { blocks, index })) } pub fn from_part(info: &PartInfoPtr) -> Result<&CompactPartInfo> { diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs index 2fc94e414686c..d28c01ca1d860 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs @@ -13,56 +13,70 @@ // limitations under the License. use std::any::Any; -use std::collections::VecDeque; use std::sync::Arc; +use std::time::Instant; -use common_catalog::plan::PartInfoPtr; +use common_base::base::Progress; +use common_base::base::ProgressValues; use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; use common_exception::Result; -use common_expression::BlockThresholds; use common_expression::DataBlock; -use storages_common_table_meta::meta::BlockMeta; - -use super::compact_meta::CompactSourceMeta; -use super::compact_part::CompactPartInfo; -use super::compact_part::CompactTask; +use common_expression::TableSchema; +use common_pipeline_core::processors::processor::ProcessorPtr; +use opendal::Operator; + +use crate::io::write_data; +use crate::io::BlockBuilder; +use crate::io::BlockReader; +use crate::io::ReadSettings; +use crate::io::TableMetaLocationGenerator; +use crate::io::WriteSettings; +use crate::metrics::*; +use crate::operations::mutation::compact::CompactSourceMeta; +use crate::operations::mutation::CompactPartInfo; use crate::pipelines::processors::port::OutputPort; use crate::pipelines::processors::processor::Event; -use crate::pipelines::processors::processor::ProcessorPtr; use crate::pipelines::processors::Processor; -enum State { - ReadData(Option), - Generate { - order: usize, - tasks: VecDeque, - }, - Output(Option, DataBlock), - Finish, -} - -// Select the row_count >= min_rows_per_block or block_size >= max_bytes_per_block -// as the perfect_block condition(N for short). CompactSource gets a set of segments, -// iterates through the blocks, and finds the blocks >= N and blocks < 2N as a CompactTask. 
pub struct CompactSource { - state: State, ctx: Arc, + dal: Operator, + scan_progress: Arc, + + block_reader: Arc, + block_builder: BlockBuilder, + output: Arc, - thresholds: BlockThresholds, + output_data: Option, + finished: bool, } impl CompactSource { pub fn try_create( ctx: Arc, + dal: Operator, + write_settings: WriteSettings, + meta_locations: TableMetaLocationGenerator, + source_schema: Arc, + block_reader: Arc, output: Arc, - thresholds: BlockThresholds, ) -> Result { + let scan_progress = ctx.get_scan_progress(); + let block_builder = BlockBuilder { + ctx: ctx.clone(), + meta_locations, + source_schema, + write_settings, + }; Ok(ProcessorPtr::create(Box::new(CompactSource { - state: State::ReadData(None), ctx, + dal, + scan_progress, + block_reader, + block_builder, output, - thresholds, + output_data: None, + finished: false, }))) } } @@ -78,19 +92,15 @@ impl Processor for CompactSource { } fn event(&mut self) -> Result { - if matches!(self.state, State::ReadData(None)) { - self.state = match self.ctx.get_partition() { - None => State::Finish, - Some(part) => State::ReadData(Some(part)), - } - } - - if matches!(self.state, State::Finish) { + if self.finished { self.output.finish(); return Ok(Event::Finished); } if self.output.is_finished() { + if !self.finished { + return Ok(Event::Async); + } return Ok(Event::Finished); } @@ -98,108 +108,95 @@ impl Processor for CompactSource { return Ok(Event::NeedConsume); } - if matches!(self.state, State::Output(_, _)) { - if let State::Output(part, data_block) = - std::mem::replace(&mut self.state, State::Finish) - { - self.state = match part { - None => State::Finish, - Some(part) => State::ReadData(Some(part)), - }; - - self.output.push_data(Ok(data_block)); - return Ok(Event::NeedConsume); - } + if let Some(block) = self.output_data.take() { + self.output.push_data(Ok(block)); } - Ok(Event::Sync) + + Ok(Event::Async) } - fn process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Finish) { - State::ReadData(Some(part)) => { + async fn async_process(&mut self) -> Result<()> { + match self.ctx.get_partition() { + Some(part) => { + let block_reader = self.block_reader.as_ref(); + let block_builder = self.block_builder.clone(); + + // block read tasks. + let mut task_futures = Vec::new(); let part = CompactPartInfo::from_part(&part)?; - let mut builder = CompactTaskBuilder::default(); - let mut tasks = VecDeque::new(); - // The order of the compact is from old to new. - for segment in part.segments.iter().rev() { - for block in segment.blocks.iter() { - let res = builder.add(block, self.thresholds); - tasks.extend(res); - } - } - if !builder.is_empty() { - let task = tasks.pop_back(); - tasks.push_back(builder.finalize(task)); + let mut stats = Vec::with_capacity(part.blocks.len()); + for block in &part.blocks { + let progress_values = ProgressValues { + rows: block.row_count as usize, + bytes: block.block_size as usize, + }; + self.scan_progress.incr(&progress_values); + + stats.push(block.col_stats.clone()); + + let settings = ReadSettings::from_ctx(&self.ctx)?; + let storage_format = block_builder.write_settings.storage_format; + // read block in parallel. 
+ task_futures.push(async move { + // Perf + { + metrics_inc_compact_block_read_nums(1); + metrics_inc_compact_block_read_bytes(block.block_size); + } + + block_reader + .read_by_meta(&settings, block.as_ref(), &storage_format) + .await + }); } - self.state = State::Generate { - order: part.order, - tasks, + + let start = Instant::now(); + + let blocks = futures::future::try_join_all(task_futures).await?; + // Perf. + { + metrics_inc_compact_block_read_milliseconds(start.elapsed().as_millis() as u64); } - } - State::Generate { order, tasks } => { - let meta = CompactSourceMeta::create(order, tasks); - let new_part = self.ctx.get_partition(); - self.state = State::Output(new_part, DataBlock::empty_with_meta(meta)); - } - _ => return Err(ErrorCode::Internal("It's a bug.")), - } - Ok(()) - } -} -#[derive(Default)] -struct CompactTaskBuilder { - blocks: Vec>, - total_rows: usize, - total_size: usize, -} + // concat blocks. + let new_block = DataBlock::concat(&blocks)?; + // build block serialization. + let serialized = tokio_rayon::spawn(move || block_builder.build(new_block)).await?; -impl CompactTaskBuilder { - fn is_empty(&self) -> bool { - self.blocks.is_empty() - } + let start = Instant::now(); - fn add(&mut self, block: &Arc, thresholds: BlockThresholds) -> Vec { - self.total_rows += block.row_count as usize; - self.total_size += block.block_size as usize; + // Perf. + { + metrics_inc_compact_block_write_nums(1); + metrics_inc_compact_block_write_bytes(serialized.block_raw_data.len() as u64); + } - if !thresholds.check_large_enough(self.total_rows, self.total_size) { - // blocks < N - self.blocks.push(block.clone()); - return vec![]; - } + // write block data. + write_data( + serialized.block_raw_data, + &self.dal, + &serialized.block_meta.location.0, + ) + .await?; + + // write index data. 
+ if let Some(index_state) = serialized.bloom_index_state { + write_data(index_state.data, &self.dal, &index_state.location.0).await?; + } - let tasks = if !thresholds.check_for_compact(self.total_rows, self.total_size) { - // blocks > 2N - let trivial_task = CompactTask::Trivial(block.clone()); - if !self.blocks.is_empty() { - let compact_task = Self::create_task(std::mem::take(&mut self.blocks)); - vec![compact_task, trivial_task] - } else { - vec![trivial_task] + // Perf + { + metrics_inc_compact_block_write_milliseconds(start.elapsed().as_millis() as u64); + } + + self.output_data = Some(DataBlock::empty_with_meta(CompactSourceMeta::create( + part.index.clone(), + serialized.block_meta.into(), + ))); } - } else { - // N <= blocks < 2N - self.blocks.push(block.clone()); - vec![Self::create_task(std::mem::take(&mut self.blocks))] + None => self.finished = true, }; - self.total_rows = 0; - self.total_size = 0; - tasks - } - - fn finalize(&mut self, task: Option) -> CompactTask { - let mut blocks = task.map_or(vec![], |t| t.get_block_metas()); - blocks.extend(std::mem::take(&mut self.blocks)); - Self::create_task(blocks) - } - - fn create_task(blocks: Vec>) -> CompactTask { - match blocks.len() { - 0 => panic!("the blocks is empty"), - 1 => CompactTask::Trivial(blocks[0].clone()), - _ => CompactTask::Normal(blocks), - } + Ok(()) } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs deleted file mode 100644 index fb6aa25119516..0000000000000 --- a/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs +++ /dev/null @@ -1,456 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::any::Any; -use std::collections::VecDeque; -use std::sync::Arc; -use std::time::Instant; - -use common_base::base::Progress; -use common_base::base::ProgressValues; -use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; -use common_exception::Result; -use common_expression::BlockThresholds; -use common_expression::DataBlock; -use common_expression::TableSchemaRef; -use common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; -use opendal::Operator; -use storages_common_blocks::blocks_to_parquet; -use storages_common_cache::CacheAccessor; -use storages_common_cache_manager::CacheManager; -use storages_common_index::BloomIndex; -use storages_common_table_meta::meta::BlockMeta; -use storages_common_table_meta::meta::SegmentInfo; -use storages_common_table_meta::meta::StatisticsOfColumns; -use storages_common_table_meta::table::TableCompression; - -use super::compact_meta::CompactSourceMeta; -use super::compact_part::CompactTask; -use super::CompactSinkMeta; -use crate::io; -use crate::io::write_data; -use crate::io::BlockReader; -use crate::io::ReadSettings; -use crate::io::TableMetaLocationGenerator; -use crate::io::WriteSettings; -use crate::metrics::*; -use crate::operations::mutation::AbortOperation; -use crate::operations::mutation::SerializeState; -use crate::pipelines::processors::port::InputPort; -use crate::pipelines::processors::port::OutputPort; -use crate::pipelines::processors::processor::Event; -use crate::pipelines::processors::processor::ProcessorPtr; -use crate::pipelines::processors::Processor; -use crate::statistics::gen_col_stats_lite; -use crate::statistics::reduce_block_statistics; -use crate::statistics::reducers::reduce_block_metas; - -enum State { - Consume, - ReadBlocks, - CompactBlocks { - blocks: Vec, - stats_of_columns: Vec>, - trivals: VecDeque>, - }, - SerializedBlocks(Vec), - GenerateSegment, - SerializedSegment { - data: Vec, - location: String, - segment: Arc, - }, - Output { - location: String, - segment: Arc, - }, -} - -// Gets a set of CompactTask, only merge but not split, generate a new segment. -pub struct CompactTransform { - ctx: Arc, - state: State, - input: Arc, - output: Arc, - scan_progress: Arc, - output_data: Option, - - block_reader: Arc, - location_gen: TableMetaLocationGenerator, - dal: Operator, - schema: TableSchemaRef, - - // Limit the memory size of the block read. - max_memory: u64, - max_io_requests: usize, - compact_tasks: VecDeque, - block_metas: Vec>, - order: usize, - thresholds: BlockThresholds, - write_settings: WriteSettings, - abort_operation: AbortOperation, -} - -impl CompactTransform { - #[allow(clippy::too_many_arguments)] - pub fn try_create( - ctx: Arc, - input: Arc, - output: Arc, - scan_progress: Arc, - block_reader: Arc, - location_gen: TableMetaLocationGenerator, - dal: Operator, - schema: TableSchemaRef, - thresholds: BlockThresholds, - write_settings: WriteSettings, - ) -> Result { - let settings = ctx.get_settings(); - let max_memory_usage = (settings.get_max_memory_usage()? as f64 * 0.8) as u64; - let max_threads = settings.get_max_threads()?; - let max_memory = max_memory_usage / max_threads; - let max_io_requests = settings.get_max_storage_io_requests()? 
as usize; - Ok(ProcessorPtr::create(Box::new(CompactTransform { - ctx, - state: State::Consume, - input, - output, - scan_progress, - output_data: None, - block_reader, - location_gen, - dal, - schema, - max_memory, - max_io_requests, - compact_tasks: VecDeque::new(), - block_metas: Vec::new(), - order: 0, - thresholds, - write_settings, - abort_operation: AbortOperation::default(), - }))) - } -} - -#[async_trait::async_trait] -impl Processor for CompactTransform { - fn name(&self) -> String { - "CompactTransform".to_string() - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if matches!( - &self.state, - State::CompactBlocks { .. } | State::GenerateSegment { .. } | State::Output { .. } - ) { - return Ok(Event::Sync); - } - - if matches!( - &self.state, - State::ReadBlocks | State::SerializedBlocks(_) | State::SerializedSegment { .. } - ) { - return Ok(Event::Async); - } - - if self.output.is_finished() { - self.input.finish(); - return Ok(Event::Finished); - } - - if !self.output.can_push() { - self.input.set_not_need_data(); - return Ok(Event::NeedConsume); - } - - if let Some(data_block) = self.output_data.take() { - self.output.push_data(Ok(data_block)); - return Ok(Event::NeedConsume); - } - - if self.input.is_finished() { - self.output.finish(); - return Ok(Event::Finished); - } - - if !self.input.has_data() { - self.input.set_need_data(); - return Ok(Event::NeedData); - } - - let input_data = self.input.pull_data().unwrap()?; - let meta = input_data - .get_meta() - .ok_or_else(|| ErrorCode::Internal("No block meta. It's a bug"))?; - let task_meta = CompactSourceMeta::from_meta(meta)?; - self.order = task_meta.order; - self.compact_tasks = task_meta.tasks.clone(); - - self.state = State::ReadBlocks; - Ok(Event::Async) - } - - fn process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Consume) { - State::CompactBlocks { - mut blocks, - stats_of_columns, - mut trivals, - } => { - let mut serialize_states = Vec::new(); - for stats in stats_of_columns { - let block_num = stats.len(); - if block_num == 0 { - self.block_metas.push(trivals.pop_front().unwrap()); - continue; - } - - // concat blocks. - let compact_blocks: Vec<_> = blocks.drain(0..block_num).collect(); - let new_block = DataBlock::concat(&compact_blocks)?; - - let col_stats_lites = gen_col_stats_lite( - &new_block, - self.block_reader.schema().fields(), - &self.block_reader.default_vals, - )?; - // generate block statistics. - let col_stats = reduce_block_statistics(&stats, Some(&col_stats_lites))?; - let row_count = new_block.num_rows() as u64; - let block_size = new_block.memory_size() as u64; - let (block_location, block_id) = self.location_gen.gen_block_location(); - - // build block index. - let func_ctx = self.ctx.get_function_context()?; - let maybe_bloom_index = BloomIndex::try_create( - func_ctx, - self.schema.clone(), - block_location.1, - &[&new_block], - )?; - - let (index_data, index_size, index_location) = match maybe_bloom_index { - Some(bloom_index) => { - // write index - let index_block = bloom_index.serialize_to_data_block()?; - let index_block_schema = &bloom_index.filter_schema; - let location = self.location_gen.block_bloom_index_location(&block_id); - let mut data = Vec::with_capacity(100 * 1024); - let (size, _) = blocks_to_parquet( - index_block_schema, - vec![index_block], - &mut data, - TableCompression::None, - )?; - (Some(data), size, Some(location)) - } - None => (None, 0u64, None), - }; - - // serialize data block. 
- let mut block_data = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); - let (file_size, col_metas) = io::serialize_block( - &self.write_settings, - &self.schema, - new_block, - &mut block_data, - )?; - - // new block meta. - let new_meta = BlockMeta::new( - row_count, - block_size, - file_size, - col_stats, - col_metas, - None, - block_location.clone(), - index_location.clone(), - index_size, - self.write_settings.table_compression.into(), - ); - self.abort_operation.add_block(&new_meta); - self.block_metas.push(Arc::new(new_meta)); - - serialize_states.push(SerializeState { - block_data, - block_location: block_location.0, - index_data, - index_location: index_location.map(|l| l.0), - }); - } - self.state = State::SerializedBlocks(serialize_states); - } - State::GenerateSegment => { - let metas = std::mem::take(&mut self.block_metas); - let stats = reduce_block_metas(&metas, self.thresholds)?; - let segment_info = SegmentInfo::new(metas, stats); - let location = self.location_gen.gen_segment_info_location(); - self.abort_operation.add_segment(location.clone()); - self.state = State::SerializedSegment { - data: serde_json::to_vec(&segment_info)?, - location, - segment: Arc::new(segment_info), - }; - } - State::Output { location, segment } => { - if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() { - segment_cache.put(location.clone(), segment.clone()) - } - - let meta = CompactSinkMeta::create( - self.order, - location, - segment, - std::mem::take(&mut self.abort_operation), - ); - self.output_data = Some(DataBlock::empty_with_meta(meta)); - } - _ => return Err(ErrorCode::Internal("It's a bug.")), - } - - Ok(()) - } - - async fn async_process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Consume) { - State::ReadBlocks => { - // block read tasks. - let mut task_futures = Vec::new(); - // The no need compact blockmetas. - let mut trivals = VecDeque::new(); - let mut memory_usage = 0; - let mut stats_of_columns = Vec::new(); - - let block_reader = self.block_reader.as_ref(); - while let Some(task) = self.compact_tasks.pop_front() { - let metas = task.get_block_metas(); - // Only one block, no need to do a compact. - if metas.len() == 1 { - stats_of_columns.push(vec![]); - trivals.push_back(metas[0].clone()); - continue; - } - - memory_usage += metas.iter().fold(0, |acc, meta| { - let memory = meta.bloom_filter_index_size + meta.block_size; - acc + memory - }); - - if (memory_usage > self.max_memory - || task_futures.len() + metas.len() > self.max_io_requests) - && !task_futures.is_empty() - { - self.compact_tasks.push_front(task); - break; - } - - let mut meta_stats = Vec::with_capacity(metas.len()); - for meta in metas { - let progress_values = ProgressValues { - rows: meta.row_count as usize, - bytes: meta.block_size as usize, - }; - self.scan_progress.incr(&progress_values); - - meta_stats.push(meta.col_stats.clone()); - let settings = ReadSettings::from_ctx(&self.ctx)?; - let storage_format = self.write_settings.storage_format; - // read block in parallel. - task_futures.push(async move { - // Perf - { - metrics_inc_compact_block_read_nums(1); - metrics_inc_compact_block_read_bytes(meta.block_size); - } - - block_reader - .read_by_meta(&settings, meta.as_ref(), &storage_format) - .await - }); - } - stats_of_columns.push(meta_stats); - } - - let start = Instant::now(); - - let blocks = futures::future::try_join_all(task_futures).await?; - - // Perf. 
- { - metrics_inc_compact_block_read_milliseconds(start.elapsed().as_millis() as u64); - } - - self.state = State::CompactBlocks { - blocks, - stats_of_columns, - trivals, - } - } - State::SerializedBlocks(mut serialize_states) => { - let mut handles = Vec::with_capacity(serialize_states.len()); - let dal = &self.dal; - while let Some(state) = serialize_states.pop() { - handles.push(async move { - // Perf. - { - metrics_inc_compact_block_write_nums(1); - metrics_inc_compact_block_write_bytes(state.block_data.len() as u64); - } - - // write index data. - if let (Some(index_data), Some(index_location)) = - (state.index_data, state.index_location) - { - write_data(index_data, dal, &index_location).await?; - } - // write block data. - write_data(state.block_data, dal, &state.block_location).await - }); - } - - let start = Instant::now(); - - futures::future::try_join_all(handles).await?; - - // Perf - { - metrics_inc_compact_block_write_milliseconds(start.elapsed().as_millis() as u64); - } - - if self.compact_tasks.is_empty() { - self.state = State::GenerateSegment; - } else { - self.state = State::ReadBlocks; - } - } - State::SerializedSegment { - data, - location, - segment, - } => { - self.dal.write(&location, data).await?; - self.state = State::Output { location, segment }; - } - _ => return Err(ErrorCode::Internal("It's a bug.")), - } - Ok(()) - } -} diff --git a/src/query/storages/fuse/src/operations/mutation/compact/merge_segments_transform.rs b/src/query/storages/fuse/src/operations/mutation/compact/merge_segments_transform.rs deleted file mode 100644 index 71090b708d04f..0000000000000 --- a/src/query/storages/fuse/src/operations/mutation/compact/merge_segments_transform.rs +++ /dev/null @@ -1,186 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use common_exception::ErrorCode; -use common_exception::Result; -use common_expression::DataBlock; -use itertools::Itertools; -use storages_common_table_meta::meta::Location; -use storages_common_table_meta::meta::SegmentInfo; -use storages_common_table_meta::meta::Statistics; -use storages_common_table_meta::meta::Versioned; - -use crate::operations::mutation::compact::CompactSinkMeta; -use crate::operations::mutation::AbortOperation; -use crate::operations::mutation::BlockCompactMutator; -use crate::operations::mutation::MutationSinkMeta; -use crate::pipelines::processors::port::InputPort; -use crate::pipelines::processors::port::OutputPort; -use crate::pipelines::processors::processor::Event; -use crate::pipelines::processors::processor::ProcessorPtr; -use crate::pipelines::processors::Processor; -use crate::statistics::reducers::merge_statistics_mut; - -enum State { - Consume, - Merge(DataBlock), - Output, -} - -pub struct MergeSegmentsTransform { - state: State, - inputs: Vec>, - output: Arc, - cur_input_index: usize, - output_data: Option, - - // The order of the merged segments in snapshot. 
- pub merged_indices: Vec, - // locations all the merged segments. - pub merged_segments: Vec, - // summarised statistics of all the merged segments - pub merged_statistics: Statistics, - abort_operation: AbortOperation, -} - -impl MergeSegmentsTransform { - pub fn try_create( - mutator: BlockCompactMutator, - inputs: Vec>, - output: Arc, - ) -> Result { - Ok(ProcessorPtr::create(Box::new(MergeSegmentsTransform { - state: State::Consume, - inputs, - output, - cur_input_index: 0, - output_data: None, - merged_indices: mutator.unchanged_segment_indices, - merged_segments: mutator.unchanged_segment_locations, - merged_statistics: mutator.unchanged_segment_statistics, - abort_operation: AbortOperation::default(), - }))) - } - - fn get_current_input(&mut self) -> Option> { - let mut finished = true; - let mut index = self.cur_input_index; - - loop { - let input = &self.inputs[index]; - - if !input.is_finished() { - finished = false; - input.set_need_data(); - - if input.has_data() { - self.cur_input_index = index; - return Some(input.clone()); - } - } - - index += 1; - if index == self.inputs.len() { - index = 0; - } - - if index == self.cur_input_index { - return match finished { - true => Some(input.clone()), - false => None, - }; - } - } - } -} - -#[async_trait::async_trait] -impl Processor for MergeSegmentsTransform { - fn name(&self) -> String { - "MergeSegmentsTransform".to_string() - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.output.is_finished() { - for input in &self.inputs { - input.finish(); - } - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return Ok(Event::NeedConsume); - } - - if let Some(data_block) = self.output_data.take() { - self.output.push_data(Ok(data_block)); - return Ok(Event::NeedConsume); - } - - let current_input = self.get_current_input(); - if let Some(cur_input) = current_input { - if cur_input.is_finished() { - self.state = State::Output; - } else { - self.state = State::Merge(cur_input.pull_data().unwrap()?); - cur_input.set_need_data(); - } - - return Ok(Event::Sync); - } - Ok(Event::NeedData) - } - - fn process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Consume) { - State::Merge(input) => { - let meta = CompactSinkMeta::from_meta( - input - .get_meta() - .ok_or_else(|| ErrorCode::Internal("No block meta. 
It's a bug"))?, - )?; - self.abort_operation.merge(&meta.abort_operation); - self.merged_segments - .push((meta.segment_location.clone(), SegmentInfo::VERSION)); - self.merged_indices.push(meta.order); - merge_statistics_mut(&mut self.merged_statistics, &meta.segment_info.summary)?; - } - State::Output => { - let mut merged_segments = std::mem::take(&mut self.merged_segments); - let merged_indices = std::mem::take(&mut self.merged_indices); - merged_segments = merged_segments - .into_iter() - .zip(merged_indices.iter()) - .sorted_by_key(|&(_, r)| *r) - .map(|(l, _)| l) - .collect(); - let meta = MutationSinkMeta::create( - merged_segments, - std::mem::take(&mut self.merged_statistics), - std::mem::take(&mut self.abort_operation), - ); - self.output_data = Some(DataBlock::empty_with_meta(meta)); - } - _ => return Err(ErrorCode::Internal("It's a bug.")), - } - Ok(()) - } -} diff --git a/src/query/storages/fuse/src/operations/mutation/compact/mod.rs b/src/query/storages/fuse/src/operations/mutation/compact/mod.rs index 092ca2e8f1f41..3ef4753cf6457 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/mod.rs @@ -16,15 +16,14 @@ mod block_compact_mutator; mod compact_meta; mod compact_part; mod compact_source; -mod compact_transform; -mod merge_segments_transform; mod segment_compact_mutator; +mod transform_compact_aggregator; pub use block_compact_mutator::BlockCompactMutator; -pub use compact_meta::CompactSinkMeta; +pub use compact_meta::CompactSourceMeta; +pub use compact_part::CompactPartInfo; pub use compact_source::CompactSource; -pub use compact_transform::CompactTransform; -pub use merge_segments_transform::MergeSegmentsTransform; pub use segment_compact_mutator::SegmentCompactMutator; pub use segment_compact_mutator::SegmentCompactionState; pub use segment_compact_mutator::SegmentCompactor; +pub use transform_compact_aggregator::CompactAggregator; diff --git a/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs index b3712695ffbc2..380838e109af9 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs @@ -22,6 +22,7 @@ use opendal::Operator; use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::SegmentInfo; use storages_common_table_meta::meta::Statistics; +use tracing::info; use crate::io::SegmentWriter; use crate::io::SegmentsIO; @@ -79,34 +80,37 @@ impl TableMutator for SegmentCompactMutator { async fn target_select(&mut self) -> Result { let select_begin = Instant::now(); - let base_segment_locations = &self.compact_params.base_snapshot.segments; + let mut base_segment_locations = self.compact_params.base_snapshot.segments.clone(); if base_segment_locations.len() <= 1 { // no need to compact return Ok(false); } - - let schema = Arc::new(self.compact_params.base_snapshot.schema.clone()); - // 1. read all the segments - let fuse_segment_io = - SegmentsIO::create(self.ctx.clone(), self.data_accessor.clone(), schema); - let base_segments = fuse_segment_io - .read_segments(base_segment_locations) - .await? 
-            .into_iter()
-            .collect::<Result<Vec<_>>>()?;
+        // traverse the segments in reverse order, so that a newly created unmergeable fragmented segment
+        // will be left at the "top", and is likely to be merged in the next compaction; instead of leaving
+        // an unmergeable fragmented segment in the middle.
+        base_segment_locations.reverse();

         // need at least 2 segments to make sense
-        let num_segments = base_segments.len();
+        let num_segments = base_segment_locations.len();
         let limit = std::cmp::max(2, self.compact_params.limit.unwrap_or(num_segments));

-        // 2. prepare compactor
+        // prepare compactor
+        let schema = Arc::new(self.compact_params.base_snapshot.schema.clone());
+        let fuse_segment_io =
+            SegmentsIO::create(self.ctx.clone(), self.data_accessor.clone(), schema);
         let segment_writer = SegmentWriter::new(&self.data_accessor, &self.location_generator);
-
-        let compactor =
-            SegmentCompactor::new(self.compact_params.block_per_seg as u64, segment_writer);
+        let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
+        let compactor = SegmentCompactor::new(
+            self.compact_params.block_per_seg as u64,
+            max_io_requests,
+            &fuse_segment_io,
+            segment_writer,
+        );

         self.compaction = compactor
-            .compact(&base_segments, base_segment_locations, limit)
+            .compact(base_segment_locations, limit, |status| {
+                self.ctx.set_status_info(&status);
+            })
             .await?;

         gauge!(
@@ -156,53 +160,79 @@ pub struct SegmentCompactor<'a> {
     // within R, smaller one is preferred
     threshold: u64,
     // fragmented segments collected so far, it will be reset to empty if compaction occurs
-    fragmented_segments: Vec<(&'a SegmentInfo, Location)>,
+    fragmented_segments: Vec<(Arc<SegmentInfo>, Location)>,
    // state which keeps the number of blocks of all the fragmented segments collected so far,
    // it will be reset to 0 if compaction occurs
     accumulated_num_blocks: u64,
+    chunk_size: usize,
+    segment_reader: &'a SegmentsIO,
     segment_writer: SegmentWriter<'a>,
     // accumulated compaction state
     compacted_state: SegmentCompactionState,
 }

 impl<'a> SegmentCompactor<'a> {
-    pub fn new(threshold: u64, segment_writer: SegmentWriter<'a>) -> Self {
+    pub fn new(
+        threshold: u64,
+        chunk_size: usize,
+        segment_reader: &'a SegmentsIO,
+        segment_writer: SegmentWriter<'a>,
+    ) -> Self {
         Self {
             threshold,
             accumulated_num_blocks: 0,
             fragmented_segments: vec![],
+            chunk_size,
+            segment_reader,
             segment_writer,
             compacted_state: Default::default(),
         }
     }

-    pub async fn compact(
+    pub async fn compact<T>(
         mut self,
-        segments: &'a [Arc<SegmentInfo>],
-        locations: &'a [Location],
+        reverse_locations: Vec<Location>,
         limit: usize,
-    ) -> Result<SegmentCompactionState> {
+        status_callback: T,
+    ) -> Result<SegmentCompactionState>
+    where
+        T: Fn(String),
+    {
+        let start = Instant::now();
+        let number_segments = reverse_locations.len();
         // 1. feed segments into accumulator, taking limit into account
-        //
-        // traverse the segment in reversed order, so that newly created unmergeable fragmented segment
-        // will be left at the "top", and likely to be merged in the next compaction; instead of leaving
-        // an unmergeable fragmented segment in the middle.
-        let num_segments = segments.len();
-        let mut compact_end_at = 0;
-        for (idx, (segment, location)) in segments
-            .iter()
-            .rev()
-            .zip(locations.iter().rev())
-            .enumerate()
-        {
-            self.add(segment, location.clone()).await?;
-            let compacted = self.num_fragments_compacted();
-            compact_end_at = idx;
-            if compacted >= limit {
-                // break if number of compacted segments reach the limit
-                // note that during the finalization of compaction, there might be some extra
-                // fragmented segments also need to be compacted, we just let it go
-                break;
+        let segments_io = self.segment_reader;
+        let chunk_size = self.chunk_size;
+        let mut checked_end_at = 0;
+        for chunk in reverse_locations.chunks(chunk_size) {
+            let segment_infos = segments_io
+                .read_segments(chunk)
+                .await?
+                .into_iter()
+                .collect::<Result<Vec<_>>>()?;
+
+            for (segment, location) in segment_infos.into_iter().zip(chunk.iter()) {
+                self.add(segment, location.clone()).await?;
+                let compacted = self.num_fragments_compacted();
+                checked_end_at += 1;
+                if compacted >= limit {
+                    // break if the number of compacted segments reaches the limit
+                    // note that during the finalization of compaction, there might be some extra
+                    // fragmented segments that also need to be compacted; we just let them go
+                    break;
+                }
+            }
+
+            // Status.
+            {
+                let status = format!(
+                    "compact segment: read segment files:{}/{}, cost:{} sec",
+                    checked_end_at,
+                    number_segments,
+                    start.elapsed().as_secs()
+                );
+                info!(status);
+                (status_callback)(status);
             }
         }
         let mut compaction = self.finalize().await?;
@@ -212,12 +242,30 @@ impl<'a> SegmentCompactor<'a> {
         if fragments_compacted {
             // if some compaction occurred, the remainders
             // which are outside of the limit should also be collected
-            let range = 0..num_segments - compact_end_at - 1;
-            let segment_slice = segments[range.clone()].iter().rev();
-            let location_slice = locations[range].iter().rev();
-            for (segment, location) in segment_slice.zip(location_slice) {
-                compaction.segments_locations.push(location.clone());
-                merge_statistics_mut(&mut compaction.statistics, &segment.summary)?;
+            for chunk in reverse_locations[checked_end_at..].chunks(chunk_size) {
+                let segment_infos = segments_io
+                    .read_segments(chunk)
+                    .await?
+                    .into_iter()
+                    .collect::<Result<Vec<_>>>()?;
+
+                for (segment, location) in segment_infos.into_iter().zip(chunk.iter()) {
+                    compaction.segments_locations.push(location.clone());
+                    merge_statistics_mut(&mut compaction.statistics, &segment.summary)?;
+                }
+
+                checked_end_at += chunk.len();
+                // Status.
+                {
+                    let status = format!(
+                        "compact segment: read segment files:{}/{}, cost:{} sec",
+                        checked_end_at,
+                        number_segments,
+                        start.elapsed().as_secs()
+                    );
+                    info!(status);
+                    (status_callback)(status);
+                }
             }
         }
         // reverse the segments back
@@ -227,7 +275,7 @@ impl<'a> SegmentCompactor<'a> {
     }

     // accumulate one segment
-    pub async fn add(&mut self, segment_info: &'a SegmentInfo, location: Location) -> Result<()> {
+    pub async fn add(&mut self, segment_info: Arc<SegmentInfo>, location: Location) -> Result<()> {
         let num_blocks_current_segment = segment_info.blocks.len() as u64;

         if num_blocks_current_segment == 0 {
@@ -264,7 +312,7 @@ impl<'a> SegmentCompactor<'a> {
         }

         // 1. take the fragments and reset
-        let mut fragments = std::mem::take(&mut self.fragmented_segments);
+        let fragments = std::mem::take(&mut self.fragmented_segments);
         self.accumulated_num_blocks = 0;

         // check if only one fragment left
@@ -283,10 +331,6 @@ impl<'a> SegmentCompactor<'a> {
         let mut blocks = Vec::with_capacity(self.threshold as usize);
         let mut new_statistics = Statistics::default();

-        // since the fragmented segments are traversed in reversed order, before merge the blocks into new
-        // segments, reverse them back.
-        fragments.reverse();
-
         self.compacted_state.num_fragments_compacted += fragments.len();
         for (segment, _location) in fragments {
             merge_statistics_mut(&mut new_statistics, &segment.summary)?;
diff --git a/src/query/storages/fuse/src/operations/mutation/compact/transform_compact_aggregator.rs b/src/query/storages/fuse/src/operations/mutation/compact/transform_compact_aggregator.rs
new file mode 100644
index 0000000000000..5e07411f1e00e
--- /dev/null
+++ b/src/query/storages/fuse/src/operations/mutation/compact/transform_compact_aggregator.rs
@@ -0,0 +1,209 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Instant;
+
+use common_base::runtime::execute_futures_in_parallel;
+use common_catalog::table_context::TableContext;
+use common_exception::Result;
+use common_expression::BlockMetaInfoDowncast;
+use common_expression::BlockThresholds;
+use common_expression::DataBlock;
+use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
+use opendal::Operator;
+use storages_common_cache::CacheAccessor;
+use storages_common_cache_manager::CacheManager;
+use storages_common_table_meta::meta::BlockMeta;
+use storages_common_table_meta::meta::Location;
+use storages_common_table_meta::meta::SegmentInfo;
+use storages_common_table_meta::meta::Statistics;
+use tracing::info;
+use tracing::Instrument;
+
+use crate::io::TableMetaLocationGenerator;
+use crate::operations::mutation::compact::CompactSourceMeta;
+use crate::operations::mutation::AbortOperation;
+use crate::operations::mutation::BlockCompactMutator;
+use crate::operations::mutation::MutationSinkMeta;
+use crate::statistics::reducers::merge_statistics_mut;
+use crate::statistics::reducers::reduce_block_metas;
+
+#[derive(Clone)]
+struct SerializedSegment {
+    location: String,
+    segment: Arc<SegmentInfo>,
+}
+
+pub struct CompactAggregator {
+    ctx: Arc<dyn TableContext>,
+    dal: Operator,
+    location_gen: TableMetaLocationGenerator,
+
+    // locations of all the merged segments.
+    merged_segments: BTreeMap<usize, Location>,
+    // summarised statistics of all the merged segments
+    merged_statistics: Statistics,
+    // locations of all the merged blocks.
+    merge_blocks: HashMap<usize, BTreeMap<usize, Arc<BlockMeta>>>,
+    thresholds: BlockThresholds,
+    abort_operation: AbortOperation,
+
+    start_time: Instant,
+    total_tasks: usize,
+}
+
+impl CompactAggregator {
+    pub fn new(
+        dal: Operator,
+        location_gen: TableMetaLocationGenerator,
+        mutator: BlockCompactMutator,
+    ) -> Self {
+        Self {
+            ctx: mutator.ctx.clone(),
+            dal,
+            location_gen,
+            merged_segments: mutator.unchanged_segments_map,
+            merged_statistics: mutator.unchanged_segment_statistics,
+            merge_blocks: mutator.unchanged_blocks_map,
+            thresholds: mutator.thresholds,
+            abort_operation: AbortOperation::default(),
+            start_time: Instant::now(),
+            total_tasks: mutator.compact_tasks.len(),
+        }
+    }
+
+    async fn write_segment(dal: Operator, segment: SerializedSegment) -> Result<()> {
+        dal.write(&segment.location, serde_json::to_vec(&segment.segment)?)
+            .await?;
+        if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() {
+            segment_cache.put(segment.location.clone(), segment.segment.clone());
+        }
+        Ok(())
+    }
+
+    async fn write_segments(&self, segments: Vec<SerializedSegment>) -> Result<()> {
+        let mut iter = segments.iter();
+        let tasks = std::iter::from_fn(move || {
+            iter.next().map(|segment| {
+                Self::write_segment(self.dal.clone(), segment.clone())
+                    .instrument(tracing::debug_span!("write_segment"))
+            })
+        });
+
+        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
+        let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
+        execute_futures_in_parallel(
+            tasks,
+            threads_nums,
+            permit_nums,
+            "compact-write-segments-worker".to_owned(),
+        )
+        .await?
+        .into_iter()
+        .collect::<Result<Vec<_>>>()?;
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncAccumulatingTransform for CompactAggregator {
+    const NAME: &'static str = "CompactAggregator";
+
+    async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
+        // gather the input data.
+        if let Some(meta) = data
+            .get_meta()
+            .and_then(CompactSourceMeta::downcast_ref_from)
+        {
+            self.abort_operation.add_block(&meta.block);
+            self.merge_blocks
+                .entry(meta.index.segment_idx)
+                .and_modify(|v| {
+                    v.insert(meta.index.block_idx, meta.block.clone());
+                })
+                .or_insert(BTreeMap::from([(meta.index.block_idx, meta.block.clone())]));
+
+            // Refresh status
+            {
+                let status = format!(
+                    "compact: run compact tasks:{}/{}, cost:{} sec",
+                    self.abort_operation.blocks.len(),
+                    self.total_tasks,
+                    self.start_time.elapsed().as_secs()
+                );
+                self.ctx.set_status_info(&status);
+                info!(status);
+            }
+        }
+        // no partial output
+        Ok(None)
+    }
+
+    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        let mut serialized_segments = Vec::with_capacity(self.merge_blocks.len());
+        for (segment_idx, block_map) in std::mem::take(&mut self.merge_blocks) {
+            // generate the new segment.
+            let blocks: Vec<_> = block_map.into_values().collect();
+            let new_summary = reduce_block_metas(&blocks, self.thresholds)?;
+            merge_statistics_mut(&mut self.merged_statistics, &new_summary)?;
+            let new_segment = SegmentInfo::new(blocks, new_summary);
+            let location = self.location_gen.gen_segment_info_location();
+            self.abort_operation.add_segment(location.clone());
+            self.merged_segments.insert(
+                segment_idx,
+                (location.clone(), new_segment.format_version()),
+            );
+            serialized_segments.push(SerializedSegment {
+                location,
+                segment: Arc::new(new_segment),
+            });
+        }
+
+        let start = Instant::now();
+        // Refresh status
+        {
+            let status = format!(
+                "compact: begin to write new segments:{}",
+                serialized_segments.len()
+            );
+            self.ctx.set_status_info(&status);
+            info!(status);
+        }
+        // write segments.
+        self.write_segments(serialized_segments).await?;
+
+        // Refresh status
+        {
+            let status = format!(
+                "compact: end to write new segments, cost:{} sec",
+                start.elapsed().as_secs()
+            );
+            self.ctx.set_status_info(&status);
+            info!(status);
+        }
+        // gather all the segments.
+        let merged_segments = std::mem::take(&mut self.merged_segments)
+            .into_values()
+            .collect();
+        let meta = MutationSinkMeta::create(
+            merged_segments,
+            std::mem::take(&mut self.merged_statistics),
+            std::mem::take(&mut self.abort_operation),
+        );
+        Ok(Some(DataBlock::empty_with_meta(meta)))
+    }
+}
diff --git a/src/query/storages/fuse/src/operations/mutation/mod.rs b/src/query/storages/fuse/src/operations/mutation/mod.rs
index 93ca20d50be5e..78d44e1ddab8b 100644
--- a/src/query/storages/fuse/src/operations/mutation/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mod.rs
@@ -27,9 +27,9 @@ mod serialize_data_transform;
 pub use abort_operation::AbortOperation;
 pub use base_mutator::BaseMutator;
 pub use compact::BlockCompactMutator;
+pub use compact::CompactAggregator;
+pub use compact::CompactPartInfo;
 pub use compact::CompactSource;
-pub use compact::CompactTransform;
-pub use compact::MergeSegmentsTransform;
 pub use compact::SegmentCompactMutator;
 pub use compact::SegmentCompactionState;
 pub use compact::SegmentCompactor;
diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs b/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs
index 4db1da5195322..e28075c646127 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs
@@ -25,6 +25,7 @@ use opendal::Operator;
 use storages_common_table_meta::meta::Location;
 use storages_common_table_meta::meta::Statistics;
 use storages_common_table_meta::meta::TableSnapshot;
+use tracing::info;

 use crate::io::SegmentsIO;
 use crate::io::TableMetaLocationGenerator;
@@ -153,6 +154,13 @@ impl Processor for MutationSink {
     fn process(&mut self) -> Result<()> {
         match std::mem::replace(&mut self.state, State::None) {
             State::ReadMeta(input_meta) => {
+                // Status
+                {
+                    let status = "mutation: begin try to commit";
+                    self.ctx.set_status_info(status);
+                    info!(status);
+                }
+
                 let meta = MutationSinkMeta::from_meta(&input_meta)?;

                 self.merged_segments = meta.segments.clone();
diff --git a/src/query/storages/fuse/src/statistics/accumulator.rs b/src/query/storages/fuse/src/statistics/accumulator.rs
index 9c05626a865da..c9adfc399f6a3 100644
--- a/src/query/storages/fuse/src/statistics/accumulator.rs
+++ b/src/query/storages/fuse/src/statistics/accumulator.rs
@@ -115,6 +115,6 @@ impl StatisticsAccumulator {
     }

     pub fn summary(&self) -> Result<StatisticsOfColumns> {
-
super::reduce_block_statistics(&self.blocks_statistics, None) + super::reduce_block_statistics(&self.blocks_statistics) } } diff --git a/src/query/storages/fuse/src/statistics/column_statistic.rs b/src/query/storages/fuse/src/statistics/column_statistic.rs index 5ecd4a40eee67..6d1f79922555e 100644 --- a/src/query/storages/fuse/src/statistics/column_statistic.rs +++ b/src/query/storages/fuse/src/statistics/column_statistic.rs @@ -15,15 +15,12 @@ use std::collections::HashMap; use common_exception::Result; -use common_expression::types::DataType; use common_expression::types::NumberType; use common_expression::types::ValueType; use common_expression::Column; -use common_expression::ColumnId; use common_expression::DataBlock; use common_expression::FieldIndex; use common_expression::Scalar; -use common_expression::TableField; use common_expression::TableSchemaRef; use common_functions::aggregates::eval_aggr; use storages_common_index::Index; @@ -132,96 +129,6 @@ pub fn gen_columns_statistics( Ok(statistics) } -pub struct ColumnStatisticsLite { - pub default_val: Scalar, - pub null_count: u64, - pub in_memory_size: u64, - pub distinct_of_values: u64, -} - -pub fn gen_col_stats_lite( - data_block: &DataBlock, - fields: &[TableField], - default_vals: &[Scalar], -) -> Result> { - fn collect_col_stats( - col_scalar: Option<(&Column, &Scalar)>, - data_type: &DataType, - column_id: &mut ColumnId, - stats: &mut HashMap, - rows: usize, - ) -> Result<()> { - match data_type { - DataType::Tuple(inner_types) => { - if let Some((col, val)) = col_scalar { - let inner_columns = col.as_tuple().unwrap(); - let inner_scalars = val.as_tuple().unwrap(); - for ((inner_column, inner_type), inner_scalar) in - inner_columns.iter().zip(inner_types).zip(inner_scalars) - { - collect_col_stats( - Some((inner_column, inner_scalar)), - inner_type, - column_id, - stats, - rows, - )?; - } - } else { - for inner_type in inner_types.iter() { - collect_col_stats(None, inner_type, column_id, stats, rows)?; - } - } - } - DataType::Array(inner_type) => { - collect_col_stats(None, inner_type, column_id, stats, rows)? - } - DataType::Map(inner_type) => { - collect_col_stats(None, inner_type, column_id, stats, rows)? 
- } - _ => { - if let Some((col, val)) = col_scalar { - if RangeIndex::supported_type(data_type) { - let (is_all_null, bitmap) = col.validity(); - let unset_bits = match (is_all_null, bitmap) { - (true, _) => rows, - (false, Some(bitmap)) => bitmap.unset_bits(), - (false, None) => 0, - }; - let in_memory_size = col.memory_size() as u64; - let distinct_of_values = calc_column_distinct_of_values(col, rows)?; - stats.insert(*column_id, ColumnStatisticsLite { - default_val: val.clone(), - null_count: unset_bits as u64, - in_memory_size, - distinct_of_values, - }); - } - } - - *column_id += 1; - } - } - Ok(()) - } - - let columns = data_block.columns(); - let mut stats = HashMap::new(); - for (idx, entry) in columns.iter().enumerate() { - let data_type = &entry.data_type; - let column = entry.value.as_column().unwrap(); - let mut next_column_id = fields[idx].column_id(); - collect_col_stats( - Some((column, &default_vals[idx])), - data_type, - &mut next_column_id, - &mut stats, - data_block.num_rows(), - )?; - } - Ok(stats) -} - pub mod traverse { use common_expression::types::DataType; use common_expression::BlockEntry; diff --git a/src/query/storages/fuse/src/statistics/mod.rs b/src/query/storages/fuse/src/statistics/mod.rs index 8136c756f4f14..1f642ed034647 100644 --- a/src/query/storages/fuse/src/statistics/mod.rs +++ b/src/query/storages/fuse/src/statistics/mod.rs @@ -22,11 +22,9 @@ pub use accumulator::StatisticsAccumulator; pub use block_statistics::BlockStatistics; pub use cluster_statistics::ClusterStatsGenerator; pub use column_statistic::calc_column_distinct_of_values; -pub use column_statistic::gen_col_stats_lite; pub use column_statistic::gen_columns_statistics; pub use column_statistic::get_traverse_columns_dfs; pub use column_statistic::traverse; -pub use column_statistic::ColumnStatisticsLite; pub use column_statistic::Trim; pub use column_statistic::STATS_REPLACEMENT_CHAR; pub use column_statistic::STATS_STRING_PREFIX_LEN; diff --git a/src/query/storages/fuse/src/statistics/reducers.rs b/src/query/storages/fuse/src/statistics/reducers.rs index bb1e1963e4261..91694fce0d65a 100644 --- a/src/query/storages/fuse/src/statistics/reducers.rs +++ b/src/query/storages/fuse/src/statistics/reducers.rs @@ -24,36 +24,26 @@ use storages_common_table_meta::meta::ColumnStatistics; use storages_common_table_meta::meta::Statistics; use storages_common_table_meta::meta::StatisticsOfColumns; -use crate::statistics::ColumnStatisticsLite; - pub fn reduce_block_statistics>( stats_of_columns: &[T], - col_stats_lites: Option<&HashMap>, ) -> Result { // Combine statistics of a column into `Vec`, that is: // from : `&[HashMap]` // to : `HashMap)>` let col_to_stats_lit = stats_of_columns.iter().fold(HashMap::new(), |acc, item| { - item.borrow() - .iter() - // ignore dropped column statistics by column id - .filter(|(col_id, _)| { - col_stats_lites.map_or(true, |col_stats_lite| col_stats_lite.contains_key(col_id)) - }) - .fold( - acc, - |mut acc: HashMap>, (col_id, col_stats)| { - acc.entry(*col_id).or_default().push(col_stats); - acc - }, - ) + item.borrow().iter().fold( + acc, + |mut acc: HashMap>, (col_id, col_stats)| { + acc.entry(*col_id).or_default().push(col_stats); + acc + }, + ) }); // Reduce the `Vec<&ColumnStatistics` into ColumnStatistics`, i.e.: // from : `HashMap)>` // to : `type BlockStatistics = HashMap` let len = col_to_stats_lit.len(); - let stats_len = stats_of_columns.len(); col_to_stats_lit .iter() .try_fold(HashMap::with_capacity(len), |mut acc, (id, stats)| { @@ -61,7 +51,6 @@ pub fn 
reduce_block_statistics>( let mut max_stats = Vec::with_capacity(stats.len()); let mut null_count = 0; let mut in_memory_size = 0; - let mut distinct_of_values = None; for col_stats in stats { min_stats.push(col_stats.min.clone()); @@ -71,19 +60,6 @@ pub fn reduce_block_statistics>( in_memory_size += col_stats.in_memory_size; } - if let Some(col_stats_lite) = col_stats_lites { - if let Some(stat) = col_stats_lite.get(id) { - // fill the default value for min max. - if stats.len() < stats_len { - min_stats.push(stat.default_val.clone()); - max_stats.push(stat.default_val.clone()); - } - null_count = stat.null_count; - in_memory_size = stat.in_memory_size; - distinct_of_values = Some(stat.distinct_of_values); - }; - } - // TODO: // In accumulator.rs, we use aggregation functions to get the min/max of `DataValue`s, @@ -109,7 +85,7 @@ pub fn reduce_block_statistics>( max, null_count, in_memory_size, - distinct_of_values, + distinct_of_values: None, }); Ok(acc) }) @@ -123,7 +99,7 @@ pub fn merge_statistics(l: &Statistics, r: &Statistics) -> Result { uncompressed_byte_size: l.uncompressed_byte_size + r.uncompressed_byte_size, compressed_byte_size: l.compressed_byte_size + r.compressed_byte_size, index_size: l.index_size + r.index_size, - col_stats: reduce_block_statistics(&[&l.col_stats, &r.col_stats], None)?, + col_stats: reduce_block_statistics(&[&l.col_stats, &r.col_stats])?, }; Ok(s) } @@ -135,7 +111,7 @@ pub fn merge_statistics_mut(l: &mut Statistics, r: &Statistics) -> Result<()> { l.uncompressed_byte_size += r.uncompressed_byte_size; l.compressed_byte_size += r.compressed_byte_size; l.index_size += r.index_size; - l.col_stats = reduce_block_statistics(&[&l.col_stats, &r.col_stats], None)?; + l.col_stats = reduce_block_statistics(&[&l.col_stats, &r.col_stats])?; Ok(()) } @@ -174,7 +150,7 @@ pub fn reduce_block_metas>( .iter() .map(|v| &v.borrow().col_stats) .collect::>(); - let merged_col_stats = reduce_block_statistics(&stats, None)?; + let merged_col_stats = reduce_block_statistics(&stats)?; Ok(Statistics { row_count, diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table index 9f6691e075848..3cad41d547758 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table @@ -231,26 +231,20 @@ select segment_count, block_count from fuse_snapshot('db_09_0008', 't2') limit 1 -statement ok -set max_threads=1 - -statement ok -set max_memory_usage=600 - statement ok create table t3(c int) block_per_segment=4 row_per_block=3 statement ok -insert into t3 values(1) +insert into t3 values (1) statement ok -insert into t3 values(2) +insert into t3 values (2) statement ok -insert into t3 values(3) +insert into t3 values (3) statement ok -insert into t3 values(4) +insert into t3 values (4) statement ok optimize table t3 compact @@ -270,6 +264,39 @@ select segment_count, block_count from fuse_snapshot('db_09_0008', 't3') limit 1 +statement ok +create table t4(a int) row_per_block=2 + +statement ok +insert into t4 values (1),(2) + +statement ok +insert into t4 values (3) + +statement ok +alter table t4 add column b int + +statement ok +insert into t4 values (4, 4) + +statement ok +optimize table t4 compact + +query II +select * from t4 order by a +---- +1 0 +2 0 +3 0 +4 4 + +query II +select segment_count, block_count from fuse_snapshot('db_09_0008', 't4') limit 1 +---- +1 3 + + + statement ok 
DROP TABLE m @@ -285,6 +312,9 @@ DROP TABLE t2 statement ok DROP TABLE t3 +statement ok +DROP TABLE t4 + statement ok DROP DATABASE db_09_0008
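
For reference, the chunked traversal introduced in SegmentCompactor::compact can be reduced to the standalone sketch below. It is illustrative only: Location is simplified to a (path, version) pair and read_batch is a hypothetical synchronous stand-in for the async SegmentsIO::read_segments call; the names compact_in_chunks, read_batch, and the early-exit shape are assumptions, not the fuse engine's real API. What it demonstrates is the pattern the patch adopts: read a bounded chunk of segment locations per I/O batch, feed each segment to the accumulator until the limit is reached, and report progress once per chunk instead of once at the end.

use std::time::Instant;

// Simplified stand-in for storages_common_table_meta::meta::Location.
type Location = (String, u64);

// Hypothetical batched reader playing the role of SegmentsIO::read_segments:
// one "segment" per location, materialized a chunk at a time.
fn read_batch(chunk: &[Location]) -> Vec<String> {
    chunk.iter().map(|(path, _)| format!("segment@{path}")).collect()
}

fn compact_in_chunks<F: Fn(String)>(
    locations: Vec<Location>,
    chunk_size: usize,
    limit: usize,
    status_callback: F,
) {
    let start = Instant::now();
    let total = locations.len();
    let mut checked_end_at = 0;
    let mut compacted = 0;

    'outer: for chunk in locations.chunks(chunk_size) {
        for _segment in read_batch(chunk) {
            // the real compactor feeds each segment into its accumulator here
            checked_end_at += 1;
            compacted += 1;
            if compacted >= limit {
                // stop early once enough fragments have been compacted
                break 'outer;
            }
        }
        // one status line per I/O batch, mirroring the status_callback in the patch
        status_callback(format!(
            "compact segment: read segment files:{}/{}, cost:{} sec",
            checked_end_at,
            total,
            start.elapsed().as_secs()
        ));
    }
}

fn main() {
    let locations: Vec<Location> = (0..10).map(|i| (format!("seg/{i}"), 3)).collect();
    compact_in_chunks(locations, 4, usize::MAX, |s| println!("{s}"));
}

Bounding each batch by chunk_size (derived from max_storage_io_requests in the patch) keeps memory proportional to the chunk rather than to the whole segment list, which is why the patch drops the up-front read of all base segments.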