diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index d0196c8f82e79..8af901f49cbbc 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -907,6 +907,19 @@ impl DataBlock { }) .sum() } + + pub fn maybe_gc(self) -> DataBlock { + let mut columns = Vec::with_capacity(self.entries.len()); + + for entry in self.entries { + columns.push(match entry { + BlockEntry::Column(column) => BlockEntry::Column(column.maybe_gc()), + BlockEntry::Const(s, d, n) => BlockEntry::Const(s, d, n), + }); + } + + DataBlock::new(columns, self.num_rows) + } } impl Eq for Box {} diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 6081cdd830a05..b5ed3b8a0fcbc 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -58,6 +58,7 @@ use crate::servers::http::v1::ClientSessionManager; use crate::servers::http::v1::HttpQueryManager; use crate::sessions::QueriesQueueManager; use crate::sessions::SessionManager; +use crate::spillers::SpillsBufferPool; use crate::task::service::TaskService; pub struct GlobalServices; @@ -106,6 +107,7 @@ impl GlobalServices { // 4. cluster discovery init. ClusterDiscovery::init(config, version).await?; + SpillsBufferPool::init(); // TODO(xuanwo): // // This part is a bit complex because catalog are used widely in different diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index aeab64fddd5b0..64d15be1f7743 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -55,10 +55,8 @@ use crate::physical_plans::physical_plan::PhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlanMeta; use crate::physical_plans::Exchange; use crate::physical_plans::PhysicalPlanBuilder; -use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; -use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinFactory; use crate::pipelines::processors::transforms::HashJoinProbeState; -use crate::pipelines::processors::transforms::InnerHashJoin; use crate::pipelines::processors::transforms::RuntimeFiltersDesc; use crate::pipelines::processors::transforms::TransformHashJoin; use crate::pipelines::processors::transforms::TransformHashJoinBuild; @@ -395,16 +393,22 @@ impl HashJoin { builder: &mut PipelineBuilder, desc: Arc, ) -> Result<()> { - let state = Arc::new(BasicHashJoinState::create()); + let factory = self.join_factory(builder, desc)?; + // We must build the runtime filter before constructing the child nodes, // as we will inject some runtime filter information into the context for the child nodes to use. let rf_desc = RuntimeFiltersDesc::create(&builder.ctx, self)?; - if let Some((build_cache_index, _)) = self.build_side_cache_info { - builder.hash_join_states.insert( - build_cache_index, - HashJoinStateRef::NewHashJoinState(state.clone()), - ); + // After common subexpression elimination is completed, we can delete this type of code. + { + let state = factory.create_basic_state(0)?; + + if let Some((build_cache_index, _)) = self.build_side_cache_info { + builder.hash_join_states.insert( + build_cache_index, + HashJoinStateRef::NewHashJoinState(state.clone()), + ); + } } self.build.build_pipeline(builder)?; @@ -441,7 +445,7 @@ impl HashJoin { build_input.clone(), probe_input.clone(), joined_output.clone(), - self.create_join(&self.join_type, builder, desc.clone(), state.clone())?, + factory.create_hash_join(self.join_type.clone(), 0)?, stage_sync_barrier.clone(), self.projections.clone(), rf_desc.clone(), @@ -465,13 +469,11 @@ impl HashJoin { .resize(builder.main_pipeline.output_len(), true) } - fn create_join( + fn join_factory( &self, - join_type: &JoinType, - builder: &mut PipelineBuilder, + ctx: &PipelineBuilder, desc: Arc, - state: Arc, - ) -> Result> { + ) -> Result> { let hash_key_types = self .build_keys .iter() @@ -486,25 +488,12 @@ impl HashJoin { }) .collect::>(); - let method = DataBlock::choose_hash_method_with_types(&hash_key_types)?; - - Ok(match join_type { - JoinType::Inner => Box::new(InnerHashJoin::create( - &builder.ctx, - builder.func_ctx.clone(), - method, - desc, - state, - )?), - JoinType::Left => Box::new(OuterLeftHashJoin::create( - &builder.ctx, - builder.func_ctx.clone(), - method, - desc, - state, - )?), - _ => unreachable!(), - }) + Ok(HashJoinFactory::create( + ctx.ctx.clone(), + ctx.func_ctx.clone(), + DataBlock::choose_hash_method_with_types(&hash_key_types)?, + desc, + )) } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs index 64adce85f0d60..32b46295ccb1f 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs @@ -40,5 +40,6 @@ pub use hash_join_state::*; pub use probe_state::ProbeState; pub use probe_state::ProcessState; pub use runtime_filter::*; +pub use spill_common::get_hashes; pub use transform_hash_join_build::TransformHashJoinBuild; pub use transform_hash_join_probe::TransformHashJoinProbe; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/common/squash_blocks.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/common/squash_blocks.rs index 5f2a32ff9c6db..e26a042ae1882 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/common/squash_blocks.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/common/squash_blocks.rs @@ -97,7 +97,11 @@ impl SquashBlocks { let mut current_rows = 0; let mut current_bytes = 0; - while let Some(mut block) = self.blocks.pop_front() { + while current_rows < self.squash_rows && current_bytes < self.squash_bytes { + let Some(mut block) = self.blocks.pop_front() else { + return DataBlock::concat(&blocks); + }; + if block.block.is_empty() { continue; } @@ -106,34 +110,29 @@ impl SquashBlocks { self.current_bytes -= block.block.memory_size(); let mut slice_rows = block.block.num_rows(); - slice_rows = std::cmp::min(slice_rows, self.squash_rows - current_rows); let max_bytes_rows = match block.avg_bytes { 0 => block.block.num_rows(), - _ => (self.squash_bytes - current_bytes) / block.avg_bytes, + _ => self.squash_bytes.saturating_sub(current_bytes) / block.avg_bytes, }; slice_rows = std::cmp::min(max_bytes_rows, slice_rows); if slice_rows != block.block.num_rows() { - let compact_block = block.block.slice(0..slice_rows); - let remain_block = block.block.slice(slice_rows..block.block.num_rows()); - + let compact_block = block.block.slice(0..slice_rows).maybe_gc(); blocks.push(compact_block); - if !remain_block.is_empty() { - let mut columns = Vec::with_capacity(block.block.num_columns()); + let remain_block = block + .block + .slice(slice_rows..block.block.num_rows()) + .maybe_gc(); - for block_entry in remain_block.take_columns() { - let column = block_entry.to_column(); - drop(block_entry); - columns.push(column.maybe_gc()); - } + self.current_rows += remain_block.num_rows(); + self.current_bytes += remain_block.memory_size(); - block.block = DataBlock::new_from_columns(columns); - self.current_rows += block.block.num_rows(); - self.current_bytes += block.block.memory_size(); + if !remain_block.is_empty() { + block.block = remain_block; self.blocks.push_front(block); } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs new file mode 100644 index 0000000000000..97f5fdaf6cf43 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs @@ -0,0 +1,480 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::btree_map::Entry; +use std::collections::VecDeque; +use std::sync::Arc; +use std::sync::PoisonError; + +use databend_common_base::base::GlobalUniqName; +use databend_common_base::base::ProgressValues; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::BlockPartitionStream; +use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; +use databend_common_pipeline_transforms::traits::Location; +use databend_common_storage::DataOperator; +use databend_common_storages_parquet::ReadSettings; + +use crate::pipelines::processors::transforms::get_hashes; +use crate::pipelines::processors::transforms::new_hash_join::grace::grace_memory::GraceMemoryJoin; +use crate::pipelines::processors::transforms::new_hash_join::grace::grace_state::GraceHashJoinState; +use crate::pipelines::processors::transforms::new_hash_join::grace::grace_state::SpillMetadata; +use crate::pipelines::processors::transforms::new_hash_join::join::EmptyJoinStream; +use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::HashJoinDesc; +use crate::sessions::QueryContext; +use crate::spillers::Layout; +use crate::spillers::SpillAdapter; +use crate::spillers::SpillsBufferPool; +use crate::spillers::SpillsDataReader; +use crate::spillers::SpillsDataWriter; + +pub struct GraceHashJoin { + pub(crate) desc: Arc, + pub(crate) hash_method_kind: HashMethodKind, + pub(crate) function_context: FunctionContext, + + pub(crate) location_prefix: String, + pub(crate) shift_bits: usize, + + pub(crate) stage: RestoreStage, + pub(crate) state: Arc, + + pub(crate) partitions: Vec, + pub(crate) build_partition_stream: BlockPartitionStream, + pub(crate) probe_partition_stream: BlockPartitionStream, + + pub(crate) memory_hash_join: T, + pub(crate) read_settings: ReadSettings, +} + +unsafe impl Send for GraceHashJoin {} +unsafe impl Sync for GraceHashJoin {} + +impl Join for GraceHashJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + let ready_partitions = match data { + None => self.finalize_build_data(), + Some(data) => self.partition_build_data(data)?, + }; + + for (id, data_block) in ready_partitions { + self.partitions[id].writer.write(data_block)?; + self.partitions[id].writer.flush()?; + } + + Ok(()) + } + + fn final_build(&mut self) -> Result> { + let mut partitions_meta = Vec::with_capacity(self.partitions.len()); + + let mut ready_partitions = Vec::with_capacity(self.partitions.len()); + for id in 0..self.partitions.len() { + let mut partition = GraceJoinPartition::create(&self.location_prefix)?; + + std::mem::swap(&mut self.partitions[id], &mut partition); + ready_partitions.push(partition); + } + + for (id, partition) in ready_partitions.into_iter().enumerate() { + let path = partition.path; + let (written, row_groups) = partition.writer.close()?; + + self.state + .ctx + .add_spill_file(Location::Remote(path.clone()), Layout::Parquet, written); + + if !row_groups.is_empty() { + partitions_meta.push((id, SpillMetadata { path, row_groups })); + } + } + + if !partitions_meta.is_empty() { + let locked = self.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + for (id, metadata) in partitions_meta { + match self.state.build_row_groups.as_mut().entry(id) { + Entry::Occupied(mut v) => { + v.get_mut().push(metadata); + } + Entry::Vacant(v) => { + v.insert(vec![metadata]); + } + } + } + } + + Ok(None) + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + let ready_partitions = self.partition_probe_data(data)?; + + for (id, data_block) in ready_partitions { + self.partitions[id].writer.write(data_block)?; + self.partitions[id].writer.flush()?; + } + + Ok(Box::new(EmptyJoinStream)) + } + + fn final_probe(&mut self) -> Result>> { + match self.stage { + RestoreStage::FlushMemory => { + self.finalize_probe_data()?; + self.stage = RestoreStage::RestoreBuild; + Ok(Some(Box::new(EmptyJoinStream))) + } + RestoreStage::RestoreBuild => { + if !self.advance_restore_partition() { + return Ok(None); + } + + self.restore_build_data()?; + self.stage = RestoreStage::RestoreBuildFinal; + Ok(Some(Box::new(EmptyJoinStream))) + } + RestoreStage::RestoreBuildFinal => { + self.stage = RestoreStage::RestoreProbe; + while let Some(_x) = self.memory_hash_join.final_build()? {} + Ok(Some(Box::new(EmptyJoinStream))) + } + RestoreStage::RestoreProbe => { + self.stage = RestoreStage::RestoreProbeFinal; + Ok(Some(RestoreProbeStream::create(self))) + } + RestoreStage::RestoreProbeFinal => unsafe { + // Note that this is safe: we are not violating the uniqueness of mutable references, + // as the reuse of join always waits for the stream to be fully consumed. + // However, it seems impossible to express this without using unsafe code. + let join: &mut T = &mut *(&mut self.memory_hash_join as *mut _); + if let Some(stream) = join.final_probe()? { + return Ok(Some(stream)); + } + + let locked = self.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + self.stage = RestoreStage::RestoreBuild; + *self.state.restore_partition.as_mut() = None; + self.memory_hash_join.reset_memory(); + Ok(Some(Box::new(EmptyJoinStream))) + }, + } + } +} + +impl GraceHashJoin { + pub fn create( + ctx: Arc, + function_ctx: FunctionContext, + hash_method_kind: HashMethodKind, + desc: Arc, + state: Arc, + memory_hash_join: T, + shift_bits: usize, + ) -> Result> { + let settings = ctx.get_settings(); + let rows = settings.get_max_block_size()? as usize; + let bytes = settings.get_max_block_bytes()? as usize; + let location_prefix = ctx.query_id_spill_prefix(); + + let mut partitions = Vec::with_capacity(16); + + for _ in 0..16 { + partitions.push(GraceJoinPartition::create(&location_prefix)?); + } + + let ctx: Arc = ctx.clone(); + Ok(GraceHashJoin { + desc, + state, + shift_bits, + location_prefix, + hash_method_kind, + memory_hash_join, + function_context: function_ctx, + stage: RestoreStage::FlushMemory, + partitions, + read_settings: ReadSettings::from_ctx(&ctx)?, + build_partition_stream: BlockPartitionStream::create(rows, bytes, 16), + probe_partition_stream: BlockPartitionStream::create(rows, bytes, 16), + }) + } + + fn advance_restore_partition(&mut self) -> bool { + let locked = self.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + if self.state.restore_partition.is_none() { + let Some((id, data)) = self.state.build_row_groups.as_mut().pop_first() else { + let Some((id, data)) = self.state.probe_row_groups.as_mut().pop_first() else { + *self.state.finished.as_mut() = true; + return false; + }; + + // Only left join? + *self.state.restore_partition.as_mut() = Some(id); + self.state.restore_build_queue.as_mut().clear(); + *self.state.restore_probe_queue.as_mut() = VecDeque::from(data); + return true; + }; + + *self.state.restore_partition.as_mut() = Some(id); + *self.state.restore_build_queue.as_mut() = VecDeque::from(data); + + self.state.restore_probe_queue.as_mut().clear(); + if let Some(probe_spills_data) = self.state.probe_row_groups.as_mut().remove(&id) { + *self.state.restore_probe_queue.as_mut() = VecDeque::from(probe_spills_data); + } + } + + !*self.state.finished + } + + fn steal_restore_build_task(&mut self) -> Option { + let locked = self.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + self.state.restore_build_queue.as_mut().pop_front() + } + + fn restore_build_data(&mut self) -> Result<()> { + let operator = DataOperator::instance().spill_operator(); + + while let Some(data) = self.steal_restore_build_task() { + let buffer_pool = SpillsBufferPool::instance(); + let mut reader = buffer_pool.reader(operator.clone(), data.path, data.row_groups)?; + + while let Some(data_block) = reader.read(self.read_settings)? { + self.memory_hash_join.add_block(Some(data_block))?; + } + } + + self.memory_hash_join.add_block(None) + } + + fn partition_build_data(&mut self, data: DataBlock) -> Result> { + let mut hashes = Vec::with_capacity(data.num_rows()); + + get_hashes( + &self.function_context, + &data, + &self.desc.build_keys, + &self.hash_method_kind, + &self.desc.join_type, + true, + &self.desc.is_null_equal, + &mut hashes, + )?; + + for hash in hashes.iter_mut() { + *hash = ((*hash << self.shift_bits) >> 60) & 0b1111; + } + + Ok(self.build_partition_stream.partition(hashes, data, true)) + } + + fn partition_probe_data(&mut self, data: DataBlock) -> Result> { + let mut hashes = Vec::with_capacity(data.num_rows()); + + get_hashes( + &self.function_context, + &data, + &self.desc.probe_keys, + &self.hash_method_kind, + &self.desc.join_type, + false, + &self.desc.is_null_equal, + &mut hashes, + )?; + + for hash in hashes.iter_mut() { + *hash = ((*hash << self.shift_bits) >> 60) & 0b1111; + } + + Ok(self.probe_partition_stream.partition(hashes, data, true)) + } + + fn finalize_build_data(&mut self) -> Vec<(usize, DataBlock)> { + let ready_partitions_id = self.build_partition_stream.partition_ids(); + let mut ready_partitions = Vec::with_capacity(ready_partitions_id.len()); + for id in ready_partitions_id { + if let Some(data) = self.build_partition_stream.finalize_partition(id) { + ready_partitions.push((id, data)); + } + } + ready_partitions + } + + fn finalize_probe_data(&mut self) -> Result<()> { + let ready_partitions_id = self.probe_partition_stream.partition_ids(); + + for id in ready_partitions_id { + if let Some(data_block) = self.probe_partition_stream.finalize_partition(id) { + self.partitions[id].writer.write(data_block)?; + self.partitions[id].writer.flush()?; + } + } + + let ready_partitions = std::mem::take(&mut self.partitions); + let mut partitions_meta = Vec::with_capacity(self.partitions.len()); + + for (id, partition) in ready_partitions.into_iter().enumerate() { + let path = partition.path; + let (written, row_groups) = partition.writer.close()?; + + self.state + .ctx + .add_spill_file(Location::Remote(path.clone()), Layout::Parquet, written); + + if !row_groups.is_empty() { + partitions_meta.push((id, SpillMetadata { path, row_groups })); + } + } + + if !partitions_meta.is_empty() { + let locked = self.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + for (id, metadata) in partitions_meta { + match self.state.probe_row_groups.as_mut().entry(id) { + Entry::Occupied(mut v) => { + v.get_mut().push(metadata); + } + Entry::Vacant(v) => { + v.insert(vec![metadata]); + } + } + } + } + + Ok(()) + } +} + +pub enum RestoreStage { + FlushMemory, + RestoreBuild, + RestoreBuildFinal, + RestoreProbe, + RestoreProbeFinal, +} + +pub struct GraceJoinPartition { + path: String, + writer: SpillsDataWriter, +} + +impl GraceJoinPartition { + pub fn create(prefix: &str) -> Result { + let data_operator = DataOperator::instance(); + + let operator = data_operator.spill_operator(); + let buffer_pool = SpillsBufferPool::instance(); + let file_path = format!("{}/{}", prefix, GlobalUniqName::unique()); + let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?; + + Ok(GraceJoinPartition { + path: file_path, + writer: spills_data_writer, + }) + } +} + +pub struct RestoreProbeStream<'a, T: GraceMemoryJoin> { + join: &'a mut GraceHashJoin, + spills_reader: Option, + joined_stream: Option>, +} + +impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> { + pub fn create(join: &'a mut GraceHashJoin) -> Box { + Box::new(RestoreProbeStream::<'a, T> { + join, + spills_reader: None, + joined_stream: None, + }) + } + + fn steal_restore_probe_task(&mut self) -> Option { + let locked = self.join.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + self.join.state.restore_probe_queue.as_mut().pop_front() + } +} + +impl<'a, T: GraceMemoryJoin> JoinStream for RestoreProbeStream<'a, T> { + #[allow(clippy::missing_transmute_annotations)] + fn next(&mut self) -> Result> { + loop { + if let Some(mut joined_stream) = self.joined_stream.take() { + if let Some(joined_data) = joined_stream.next()? { + self.joined_stream = Some(joined_stream); + return Ok(Some(joined_data)); + } + } + + let Some(block) = self.next_probe_block()? else { + return Ok(None); + }; + + // Note that this is safe: we are not violating the uniqueness of mutable references, + // as the reuse of join always waits for the stream to be fully consumed. + // However, it seems impossible to express this without using unsafe code. + unsafe { + let join: &mut GraceHashJoin = &mut *(self.join as *mut _); + self.joined_stream = Some(std::mem::transmute( + join.memory_hash_join.probe_block(block)?, + )); + } + } + } +} + +impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> { + fn next_probe_block(&mut self) -> Result> { + loop { + if self.spills_reader.is_none() { + while let Some(data) = self.steal_restore_probe_task() { + if data.row_groups.is_empty() { + continue; + } + + let operator = DataOperator::instance().spill_operator(); + let buffer_pool = SpillsBufferPool::instance(); + let reader = buffer_pool.reader(operator, data.path, data.row_groups)?; + self.spills_reader = Some(reader); + break; + } + + if self.spills_reader.is_none() { + return Ok(None); + } + } + + if let Some(mut spills_reader) = self.spills_reader.take() { + if let Some(v) = spills_reader.read(self.join.read_settings)? { + self.spills_reader = Some(spills_reader); + return Ok(Some(v)); + } + } + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs new file mode 100644 index 0000000000000..a0ad98b17f19e --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs @@ -0,0 +1,70 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::PoisonError; + +use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinHashTable; +use crate::pipelines::processors::transforms::InnerHashJoin; +use crate::pipelines::processors::transforms::Join; + +pub trait GraceMemoryJoin: Join { + fn reset_memory(&mut self); +} + +fn reset_basic_state(state: &BasicHashJoinState) { + let locked = state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + if !state.columns.is_empty() { + state.columns.as_mut().clear(); + } + + if !state.chunks.is_empty() { + state.chunks.as_mut().clear(); + } + + if *state.build_rows != 0 { + *state.build_rows.as_mut() = 0; + } + + if !state.column_types.is_empty() { + state.column_types.as_mut().clear(); + } + + if !state.arenas.is_empty() { + state.arenas.as_mut().clear(); + } + + if !state.build_queue.is_empty() { + state.build_queue.as_mut().clear(); + } + + *state.hash_table.as_mut() = HashJoinHashTable::Null; +} + +impl GraceMemoryJoin for InnerHashJoin { + fn reset_memory(&mut self) { + self.performance_context.clear(); + reset_basic_state(&self.basic_state); + } +} + +impl GraceMemoryJoin for OuterLeftHashJoin { + fn reset_memory(&mut self) { + self.performance_context.clear(); + reset_basic_state(&self.basic_state); + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_state.rs new file mode 100644 index 0000000000000..f6a41102ec54e --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_state.rs @@ -0,0 +1,71 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::collections::VecDeque; +use std::sync::Arc; +use std::sync::Mutex; + +use parquet::file::metadata::RowGroupMetaData; + +use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell; +use crate::pipelines::processors::transforms::HashJoinFactory; +use crate::sessions::QueryContext; + +#[derive(Debug)] +pub struct SpillMetadata { + pub path: String, + pub row_groups: Vec, +} + +pub struct GraceHashJoinState { + pub mutex: Mutex<()>, + pub ctx: Arc, + pub finished: CStyleCell, + pub restore_partition: CStyleCell>, + pub restore_build_queue: CStyleCell>, + pub restore_probe_queue: CStyleCell>, + pub build_row_groups: CStyleCell>>, + pub probe_row_groups: CStyleCell>>, + + level: usize, + factory: Arc, +} + +impl GraceHashJoinState { + pub fn create( + ctx: Arc, + level: usize, + factory: Arc, + ) -> Arc { + Arc::from(GraceHashJoinState { + ctx, + level, + factory, + mutex: Mutex::new(()), + finished: CStyleCell::new(false), + build_row_groups: CStyleCell::new(BTreeMap::new()), + probe_row_groups: CStyleCell::new(BTreeMap::new()), + restore_build_queue: CStyleCell::new(VecDeque::new()), + restore_probe_queue: CStyleCell::new(VecDeque::new()), + restore_partition: CStyleCell::new(None), + }) + } +} + +impl Drop for GraceHashJoinState { + fn drop(&mut self) { + self.factory.remove_grace_state(self.level); + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs new file mode 100644 index 0000000000000..0ee237fcf9890 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs @@ -0,0 +1,20 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod grace_join; +mod grace_memory; +mod grace_state; + +pub use grace_join::GraceHashJoin; +pub use grace_state::*; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs new file mode 100644 index 0000000000000..bc7aad3e4a36f --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs @@ -0,0 +1,190 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::PoisonError; +use std::sync::Weak; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; +use databend_common_sql::plans::JoinType; + +use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; +use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell; +use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::GraceHashJoin; +use crate::pipelines::processors::transforms::InnerHashJoin; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::HashJoinDesc; +use crate::sessions::QueryContext; + +pub struct HashJoinFactory { + mutex: Mutex<()>, + ctx: Arc, + desc: Arc, + hash_method: HashMethodKind, + function_ctx: FunctionContext, + grace_state: CStyleCell>>, + basic_state: CStyleCell>>, +} + +impl HashJoinFactory { + pub fn create( + ctx: Arc, + function_ctx: FunctionContext, + method: HashMethodKind, + desc: Arc, + ) -> Arc { + Arc::new(HashJoinFactory { + ctx, + desc, + function_ctx, + hash_method: method, + mutex: Mutex::new(()), + grace_state: CStyleCell::new(HashMap::new()), + basic_state: CStyleCell::new(HashMap::new()), + }) + } + + pub fn create_grace_state(self: &Arc, id: usize) -> Result> { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + let ctx = self.ctx.clone(); + match self.grace_state.as_mut().entry(id) { + Entry::Occupied(v) => match v.get().upgrade() { + Some(v) => Ok(v), + None => Err(ErrorCode::Internal(format!( + "Error state: The level {} grace hash state has been destroyed.", + id + ))), + }, + Entry::Vacant(v) => { + let grace_state = GraceHashJoinState::create(ctx, id, self.clone()); + v.insert(Arc::downgrade(&grace_state)); + Ok(grace_state) + } + } + } + + pub fn create_basic_state(self: &Arc, id: usize) -> Result> { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + match self.basic_state.as_mut().entry(id) { + Entry::Occupied(v) => match v.get().upgrade() { + Some(v) => Ok(v), + None => Err(ErrorCode::Internal(format!( + "Error state: The level {} basic hash state has been destroyed.", + id + ))), + }, + Entry::Vacant(v) => { + let basic_hash_state = Arc::new(BasicHashJoinState::create(id, self.clone())); + v.insert(Arc::downgrade(&basic_hash_state)); + Ok(basic_hash_state) + } + } + } + + pub fn remove_basic_state(&self, id: usize) { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + self.basic_state.as_mut().remove(&id); + } + + pub fn remove_grace_state(&self, id: usize) { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + self.grace_state.as_mut().remove(&id); + } + + pub fn create_hash_join(self: &Arc, typ: JoinType, id: usize) -> Result> { + let settings = self.ctx.get_settings(); + + if settings.get_force_join_data_spill()? { + return self.create_grace_join(typ, id); + } + + match typ { + JoinType::Inner => Ok(Box::new(InnerHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?)), + JoinType::Left => Ok(Box::new(OuterLeftHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?)), + _ => unreachable!(), + } + } + + pub fn create_grace_join(self: &Arc, typ: JoinType, id: usize) -> Result> { + match typ { + JoinType::Inner => { + let inner_hash_join = InnerHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(GraceHashJoin::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_grace_state(id + 1)?, + inner_hash_join, + 0, + )?)) + } + JoinType::Left => { + let left_hash_join = OuterLeftHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(GraceHashJoin::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_grace_state(id + 1)?, + left_hash_join, + 0, + )?)) + } + _ => unreachable!(), + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs new file mode 100644 index 0000000000000..64b8eaef12663 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs @@ -0,0 +1,66 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_pipeline_transforms::MemorySettings; + +use crate::pipelines::processors::transforms::Join; + +#[allow(dead_code)] +struct MemoryHashJoin { + inner: Box, + memory_setting: MemorySettings, +} +// pub struct HybridHashJoin { +// inner: Box, +// memory_settings: MemorySettings, +// +// is_memory: bool, +// state_factory: Arc, +// } +// +// impl Join for HybridHashJoin { +// fn add_block(&mut self, data: Option) -> Result<()> { +// self.inner.add_block(data)?; +// +// // if self.is_memory +// if let HybridHashJoin::Memory(memory) = self { +// if memory.memory_setting.check_spill() { +// // memory.inner.reset_memory()?; +// } +// } +// +// Ok(()) +// } +// +// fn final_build(&mut self) -> Result> { +// match self { +// HybridHashJoin::Memory(memory) => memory.inner.final_build(), +// HybridHashJoin::GraceHashJoin(grace_hash_join) => grace_hash_join.final_build(), +// } +// } +// +// fn probe_block(&mut self, data: DataBlock) -> Result> { +// match self { +// HybridHashJoin::Memory(memory) => memory.inner.probe_block(data), +// HybridHashJoin::GraceHashJoin(grace_hash_join) => grace_hash_join.probe_block(data), +// } +// } +// +// fn final_probe(&mut self) -> Result>> { +// match self { +// HybridHashJoin::Memory(memory) => memory.inner.final_probe(), +// HybridHashJoin::GraceHashJoin(grace_hash_join) => grace_hash_join.final_probe(), +// } +// } +// } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs new file mode 100644 index 0000000000000..159e8ef8674c9 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs @@ -0,0 +1,20 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState; + +#[allow(dead_code)] +pub struct HybridState { + grace_state: GraceHashJoinState, +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/mod.rs new file mode 100644 index 0000000000000..4af53c0b88c44 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod hybrid_join; +mod hybrid_state; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs index f76eb09878bc7..d1a44c0992b28 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs @@ -34,8 +34,8 @@ pub trait Join: Send + Sync + 'static { fn probe_block(&mut self, data: DataBlock) -> Result>; - fn final_probe(&mut self) -> Result> { - Ok(Box::new(EmptyJoinStream)) + fn final_probe(&mut self) -> Result>> { + Ok(None) } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs index 51f343728bc54..8168fbc081173 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs @@ -43,12 +43,12 @@ use crate::pipelines::processors::HashJoinDesc; use crate::sessions::QueryContext; pub struct BasicHashJoin { - desc: Arc, - squash_block: SquashBlocks, + pub(crate) desc: Arc, + pub(crate) squash_block: SquashBlocks, - method: HashMethodKind, - function_ctx: FunctionContext, - state: Arc, + pub(crate) method: HashMethodKind, + pub(crate) function_ctx: FunctionContext, + pub(crate) state: Arc, } impl BasicHashJoin { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs index e575e6f508dd1..897ed42c27eee 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::VecDeque; +use std::sync::Arc; use std::sync::Mutex; use databend_common_expression::types::DataType; @@ -20,6 +21,7 @@ use databend_common_expression::ColumnVec; use databend_common_expression::DataBlock; use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell; +use crate::pipelines::processors::transforms::HashJoinFactory; use crate::pipelines::processors::transforms::HashJoinHashTable; pub struct BasicHashJoinState { @@ -32,11 +34,16 @@ pub struct BasicHashJoinState { pub arenas: CStyleCell>>, pub hash_table: CStyleCell, + + level: usize, + factory: Arc, } impl BasicHashJoinState { - pub fn create() -> Self { + pub fn create(level: usize, factory: Arc) -> Self { BasicHashJoinState { + level, + factory, mutex: Mutex::new(()), build_rows: CStyleCell::new(0), chunks: CStyleCell::new(Vec::new()), @@ -48,3 +55,9 @@ impl BasicHashJoinState { } } } + +impl Drop for BasicHashJoinState { + fn drop(&mut self) { + self.factory.remove_basic_state(self.level) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs index 0cb4f1a3a4e4a..0988f95be0696 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs @@ -45,12 +45,12 @@ use crate::pipelines::processors::HashJoinDesc; use crate::sessions::QueryContext; pub struct InnerHashJoin { - basic_hash_join: BasicHashJoin, + pub(crate) basic_hash_join: BasicHashJoin, - desc: Arc, - function_ctx: FunctionContext, - basic_state: Arc, - performance_context: PerformanceContext, + pub(crate) desc: Arc, + pub(crate) function_ctx: FunctionContext, + pub(crate) basic_state: Arc, + pub(crate) performance_context: PerformanceContext, } impl InnerHashJoin { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs index d4a59d2e7fd0f..8210b495bfca3 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs @@ -46,12 +46,12 @@ use crate::pipelines::processors::HashJoinDesc; use crate::sessions::QueryContext; pub struct OuterLeftHashJoin { - basic_hash_join: BasicHashJoin, + pub(crate) basic_hash_join: BasicHashJoin, - desc: Arc, - function_ctx: FunctionContext, - basic_state: Arc, - performance_context: PerformanceContext, + pub(crate) desc: Arc, + pub(crate) function_ctx: FunctionContext, + pub(crate) basic_state: Arc, + pub(crate) performance_context: PerformanceContext, } impl OuterLeftHashJoin { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs index b949c36f331a4..002e21b578204 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs @@ -13,13 +13,18 @@ // limitations under the License. mod common; +mod grace; +mod hash_join_factory; mod hashtable; +mod hybrid; mod join; pub mod memory; mod performance; mod runtime_filter; mod transform_hash_join; +pub use grace::GraceHashJoin; +pub use hash_join_factory::HashJoinFactory; pub use join::Join; pub use memory::BasicHashJoinState; pub use memory::InnerHashJoin; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/performance.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/performance.rs index 4a75913e3d1de..5f3b71794e286 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/performance.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/performance.rs @@ -55,4 +55,9 @@ impl PerformanceContext { probe_hash_statistics: ProbeHashStatistics::new(max_block_size), } } + + pub fn clear(&mut self) { + self.probe_result.clear(); + self.probe_hash_statistics.clear(0); + } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs index e2dce50cb30ca..1f329b4993abe 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs @@ -157,17 +157,22 @@ impl Processor for TransformHashJoin { Ok(()) } Stage::ProbeFinal(state) => { - if !state.initialized { - state.initialized = true; - let final_stream = self.join.final_probe()?; - // This is safe because both join and stream are properties of the struct. - state.stream = Some(unsafe { std::mem::transmute(final_stream) }); + if state.stream.is_none() { + if let Some(final_stream) = self.join.final_probe()? { + state.initialize = true; + // This is safe because both join and stream are properties of the struct. + state.stream = Some(unsafe { std::mem::transmute(final_stream) }); + } else { + state.finished = true; + } } if let Some(mut stream) = state.stream.take() { if let Some(joined_data) = stream.next()? { self.joined_data = Some(joined_data); state.stream = Some(stream); + } else { + state.initialize = false; } } @@ -179,7 +184,7 @@ impl Processor for TransformHashJoin { async fn async_process(&mut self) -> Result<()> { let wait_res = self.stage_sync_barrier.wait().await; - self.stage = match self.stage { + self.stage = match &mut self.stage { Stage::Build(_) => { if wait_res.is_leader() { let packet = self.join.build_runtime_filter(&self.rf_desc)?; @@ -193,7 +198,14 @@ impl Processor for TransformHashJoin { } Stage::BuildFinal(_) => Stage::Probe(ProbeState::new()), Stage::Probe(_) => Stage::ProbeFinal(ProbeFinalState::new()), - Stage::ProbeFinal(_) => Stage::Finished, + Stage::ProbeFinal(state) => match state.finished { + true => Stage::Finished, + false => Stage::ProbeFinal(ProbeFinalState { + initialize: true, + finished: state.finished, + stream: state.stream.take(), + }), + }, Stage::Finished => Stage::Finished, }; @@ -296,14 +308,15 @@ impl ProbeState { } struct ProbeFinalState { - initialized: bool, + finished: bool, + initialize: bool, stream: Option>, } impl Debug for ProbeFinalState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ProbeFinalState") - .field("initialized", &self.initialized) + .field("initialized", &self.finished) .finish() } } @@ -311,8 +324,9 @@ impl Debug for ProbeFinalState { impl ProbeFinalState { pub fn new() -> ProbeFinalState { ProbeFinalState { - initialized: false, stream: None, + finished: false, + initialize: false, } } @@ -321,11 +335,14 @@ impl ProbeFinalState { return Ok(Event::Sync); } - if self.initialized { + if self.finished { output_port.finish(); return Ok(Event::Async); } - Ok(Event::Sync) + match self.initialize { + true => Ok(Event::Sync), + false => Ok(Event::Async), + } } } diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index bc498363849fc..5ef2668c44073 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -39,11 +39,11 @@ use super::WindowPartitionMeta; use crate::pipelines::processors::transforms::DataProcessorStrategy; use crate::sessions::QueryContext; use crate::spillers::BackpressureSpiller; -use crate::spillers::BufferPool; use crate::spillers::Spiller; use crate::spillers::SpillerConfig; use crate::spillers::SpillerDiskConfig; use crate::spillers::SpillerType; +use crate::spillers::SpillsBufferPool; enum WindowBuffer { V1(WindowPartitionBuffer), @@ -191,7 +191,7 @@ impl TransformWindowPartitionCollect { Either::Left(Spiller::create(ctx, operator, spill_config)?) } else { let runtime = GlobalIORuntime::instance(); - let buffer_pool = BufferPool::create(runtime, 128 * 1024 * 1024, 3); + let buffer_pool = SpillsBufferPool::create(runtime, 128 * 1024 * 1024, 3); Either::Right(BackpressureSpiller::create( ctx, operator, diff --git a/src/query/service/src/spillers/adapter.rs b/src/query/service/src/spillers/adapter.rs index b47ec6a8bbdc0..2696671742721 100644 --- a/src/query/service/src/spillers/adapter.rs +++ b/src/query/service/src/spillers/adapter.rs @@ -35,13 +35,13 @@ use opendal::Buffer; use opendal::Operator; use parquet::file::metadata::RowGroupMetaDataPtr; -use super::async_buffer::BufferPool; use super::block_reader::BlocksReader; use super::block_writer::BlocksWriter; use super::inner::*; use super::row_group_encoder::*; use super::serialize::*; use super::Location; +use super::SpillsBufferPool; use crate::sessions::QueryContext; #[derive(Clone)] @@ -357,7 +357,7 @@ impl Spiller { #[derive(Clone)] pub struct BackpressureAdapter { ctx: Arc, - buffer_pool: Arc, + buffer_pool: Arc, chunk_size: usize, } @@ -382,7 +382,7 @@ impl BackpressureSpiller { ctx: Arc, operator: Operator, config: SpillerConfig, - buffer_pool: Arc, + buffer_pool: Arc, chunk_size: usize, ) -> Result { Self::new( diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index b38946c85ac1e..1b2e06ff6a952 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -20,14 +20,39 @@ use std::sync::Condvar; use std::sync::Mutex; use std::sync::PoisonError; +use arrow_schema::Schema; use bytes::Bytes; use bytes::BytesMut; +use databend_common_base::base::GlobalInstance; +use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::infer_table_schema; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchema; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::TableSchemaRef; +use databend_common_storages_parquet::read_all; +use databend_common_storages_parquet::InMemoryRowGroup; +use databend_common_storages_parquet::ReadSettings; use fastrace::future::FutureExt; use fastrace::Span; use opendal::Metadata; +use opendal::Operator; use opendal::Writer; +use parquet::arrow::parquet_to_arrow_field_levels; +use parquet::arrow::parquet_to_arrow_schema; +use parquet::arrow::ArrowWriter; +use parquet::arrow::FieldLevels; +use parquet::arrow::ProjectionMask; +use parquet::basic::Compression; +use parquet::file::metadata::RowGroupMetaData; +use parquet::file::properties::EnabledStatistics; +use parquet::file::properties::WriterProperties; + +use crate::spillers::serialize::fake_data_schema; const CHUNK_SIZE: usize = 4 * 1024 * 1024; @@ -82,14 +107,27 @@ const CHUNK_SIZE: usize = 4 * 1024 * 1024; /// - Buffer reuse minimizes GC pressure during intensive spill operations /// - Automatic flow control matches spill rate to storage bandwidth /// - Works with any OpenDAL-supported storage (local disk, S3, etc.) -pub struct BufferPool { +pub struct SpillsBufferPool { working_queue: async_channel::Sender, available_write_buffers: async_channel::Receiver, available_write_buffers_tx: async_channel::Sender, } -impl BufferPool { - pub fn create(executor: Arc, memory: usize, workers: usize) -> Arc { +impl SpillsBufferPool { + pub fn init() { + // TODO: config + GlobalInstance::set(SpillsBufferPool::create( + GlobalIORuntime::instance(), + 200 * 1024 * 1024, + 2, + )) + } + + pub fn instance() -> Arc { + GlobalInstance::get() + } + + pub fn create(executor: Arc, memory: usize, workers: usize) -> Arc { let (working_tx, working_rx) = async_channel::unbounded(); let (buffers_tx, buffers_rx) = async_channel::unbounded(); @@ -104,27 +142,29 @@ impl BufferPool { for _ in 0..workers { let working_queue: async_channel::Receiver = working_rx.clone(); let available_write_buffers = buffers_tx.clone(); - executor.spawn(async move { - let mut background = Background::create(available_write_buffers); - while let Ok(op) = working_queue.recv().await { - let span = Span::enter_with_parent("Background::recv", op.span()); - background.recv(op).in_span(span).await; - } - }); + executor.spawn( + async_backtrace::location!(String::from("async_buffer")).frame(async move { + let mut background = Background::create(available_write_buffers); + while let Ok(op) = working_queue.recv().await { + let span = Span::enter_with_parent("Background::recv", op.span()); + background.recv(op).in_span(span).await; + } + }), + ); } - Arc::new(BufferPool { + Arc::new(SpillsBufferPool { working_queue: working_tx, available_write_buffers: buffers_rx, available_write_buffers_tx: buffers_tx, }) } - pub fn try_alloc_buffer(&self) -> Option { + pub(crate) fn try_alloc_buffer(&self) -> Option { self.available_write_buffers.try_recv().ok() } - pub fn alloc_buffer(&self) -> std::io::Result { + pub(crate) fn alloc_buffer(&self) -> std::io::Result { match self.available_write_buffers.recv_blocking() { Ok(buf) => Ok(buf), Err(_) => Err(std::io::Error::new( @@ -134,26 +174,59 @@ impl BufferPool { } } - pub fn write(&self, op: BufferWriteOperator) { + pub(crate) fn operator(&self, op: BufferOperator) { self.working_queue - .try_send(BufferOperator::Write(op)) - .expect("Buffer pool working queue need unbounded.") + .try_send(op) + .expect("Buffer pool working queue need unbounded."); } - pub fn close(&self, op: BufferCloseOperator) { - self.working_queue - .try_send(BufferOperator::Close(op)) - .expect("Buffer pool working queue need unbounded.") + pub fn buffer_write(self: &Arc, writer: Writer) -> BufferWriter { + BufferWriter::new(writer, self.clone()) } - pub fn buffer_write(self: &Arc, writer: Writer) -> BufferWriter { - BufferWriter::new(writer, self.clone()) + pub fn writer(self: &Arc, op: Operator, path: String) -> Result { + let pending_response = Arc::new(BufferOperatorResp { + mutex: Mutex::new(None), + condvar: Default::default(), + }); + + let operator = BufferOperator::CreateWriter(CreateWriterOperator { + span: Span::enter_with_local_parent("CreateWriterOperator"), + op, + path, + response: pending_response.clone(), + }); + + if self.working_queue.try_send(operator).is_err() { + unreachable!("Buffer pool working queue need unbounded."); + } + + let locked = pending_response.mutex.lock(); + let mut locked = locked.unwrap_or_else(PoisonError::into_inner); + + if locked.is_none() { + let waited = pending_response.condvar.wait(locked); + locked = waited.unwrap_or_else(PoisonError::into_inner); + } + + Ok(SpillsDataWriter::Uninitialize(Some( + self.buffer_write(locked.take().unwrap()?), + ))) } - pub fn release_buffer(&self, buffer: BytesMut) { - self.available_write_buffers_tx - .try_send(buffer) - .expect("Buffer pool available_write_buffers need unbounded.") + pub fn reader( + self: &Arc, + op: Operator, + path: String, + row_groups: Vec, + ) -> Result { + SpillsDataReader::create(path, op, row_groups, self.clone()) + } + + pub(crate) fn release_buffer(&self, buffer: BytesMut) { + if self.available_write_buffers_tx.try_send(buffer).is_err() { + unreachable!("Buffer pool available_write_buffers need unbounded."); + } } } @@ -162,13 +235,13 @@ pub struct BufferWriter { current_bytes: Option, - buffer_pool: Arc, + buffer_pool: Arc, pending_buffers: VecDeque, pending_response: Option>>, } impl BufferWriter { - pub fn new(writer: Writer, buffer_pool: Arc) -> BufferWriter { + pub fn new(writer: Writer, buffer_pool: Arc) -> BufferWriter { BufferWriter { buffer_pool, writer: Some(writer), @@ -213,12 +286,14 @@ impl BufferWriter { self.pending_response = Some(pending_response.clone()); - self.buffer_pool.write(BufferWriteOperator { + let operator = BufferOperator::Write(BufferWriteOperator { span: Span::enter_with_local_parent("BufferWriteOperator"), writer, response: pending_response, buffers: std::mem::take(&mut self.pending_buffers), }); + + self.buffer_pool.operator(operator); } Ok(()) @@ -233,12 +308,14 @@ impl BufferWriter { condvar: Default::default(), }); - self.buffer_pool.close(BufferCloseOperator { + let close_operator = BufferOperator::Close(BufferCloseOperator { span: Span::enter_with_local_parent("BufferCloseOperator"), writer, response: pending_response.clone(), }); + self.buffer_pool.operator(close_operator); + let locked = pending_response.mutex.lock(); let mut locked = locked.unwrap_or_else(PoisonError::into_inner); @@ -374,6 +451,175 @@ impl Drop for BufferWriter { } } +pub struct InitializedBlocksStreamWriter { + table_schema: TableSchemaRef, + writer: ArrowWriter, +} + +pub enum SpillsDataWriter { + Uninitialize(Option), + Initialized(InitializedBlocksStreamWriter), +} + +impl SpillsDataWriter { + pub fn create(writer: BufferWriter) -> Self { + Self::Uninitialize(Some(writer)) + } + + pub fn write(&mut self, block: DataBlock) -> Result<()> { + match self { + SpillsDataWriter::Uninitialize(writer) => { + let data_schema = fake_data_schema(&block); + let table_schema = infer_table_schema(&data_schema)?; + + let props = WriterProperties::builder() + .set_compression(Compression::LZ4_RAW) + .set_statistics_enabled(EnabledStatistics::None) + .set_bloom_filter_enabled(false) + .build(); + + let arrow_schema = Arc::new(Schema::from(table_schema.as_ref())); + let buffer_writer = writer.take().unwrap(); + let mut writer = ArrowWriter::try_new(buffer_writer, arrow_schema, Some(props))?; + let record_batch = block.to_record_batch(&table_schema)?; + writer.write(&record_batch)?; + *self = SpillsDataWriter::Initialized(InitializedBlocksStreamWriter { + writer, + table_schema, + }); + + Ok(()) + } + SpillsDataWriter::Initialized(writer) => { + let record_batch = block.to_record_batch(&writer.table_schema)?; + Ok(writer.writer.write(&record_batch)?) + } + } + } + + pub fn flush(&mut self) -> Result<()> { + match self { + SpillsDataWriter::Uninitialize(_) => Err(ErrorCode::Internal( + "Bad state, BlockStreamWriter is uninitialized", + )), + SpillsDataWriter::Initialized(writer) => { + writer.writer.flush()?; + Ok(writer.writer.inner_mut().flush()?) + } + } + } + + pub fn close(self) -> Result<(usize, Vec)> { + match self { + SpillsDataWriter::Uninitialize(mut writer) => { + if let Some(writer) = writer.take() { + writer.close()?; + } + + Ok((0, vec![])) + } + SpillsDataWriter::Initialized(mut writer) => { + writer.writer.flush()?; + let row_groups = writer.writer.flushed_row_groups().to_vec(); + let bytes_written = writer.writer.bytes_written(); + writer.writer.into_inner()?.close()?; + Ok((bytes_written, row_groups)) + } + } + } +} + +pub struct SpillsDataReader { + location: String, + operator: Operator, + row_groups: VecDeque, + spills_buffer_pool: Arc, + data_schema: DataSchemaRef, + field_levels: FieldLevels, +} + +impl SpillsDataReader { + pub fn create( + location: String, + operator: Operator, + row_groups: Vec, + spills_buffer_pool: Arc, + ) -> Result { + if row_groups.is_empty() { + return Err(ErrorCode::Internal( + "Parquet reader cannot read empty row groups.", + )); + } + + let arrow_schema = parquet_to_arrow_schema(row_groups[0].schema_descr(), None)?; + let data_schema = DataSchemaRef::new(DataSchema::try_from(&arrow_schema)?); + + let field_levels = parquet_to_arrow_field_levels( + row_groups[0].schema_descr(), + ProjectionMask::all(), + None, + )?; + + Ok(SpillsDataReader { + location, + operator, + spills_buffer_pool, + data_schema, + field_levels, + row_groups: VecDeque::from(row_groups), + }) + } + + #[allow(clippy::missing_transmute_annotations)] + pub fn read(&mut self, settings: ReadSettings) -> Result> { + let Some(row_group) = self.row_groups.pop_front() else { + return Ok(None); + }; + + let row_group = InMemoryRowGroup::new( + self.location.as_str(), + self.operator.clone(), + &row_group, + None, + settings, + ); + + let pending_response = Arc::new(BufferOperatorResp { + condvar: Default::default(), + mutex: Mutex::new(None), + }); + + let operator = BufferOperator::ReadRowGroup(ReadRowGroupOperator { + span: Span::enter_with_local_parent("ReadRowGroupOperator"), + in_memory_row_group: unsafe { std::mem::transmute(row_group) }, + response: pending_response.clone(), + }); + + self.spills_buffer_pool.operator(operator); + + let locked = pending_response.mutex.lock(); + let mut locked = locked.unwrap_or_else(PoisonError::into_inner); + + if locked.is_none() { + let waited = pending_response.condvar.wait(locked); + locked = waited.unwrap_or_else(PoisonError::into_inner); + } + + let fetched_row_group = locked.take().unwrap()?; + + let num_rows = fetched_row_group.row_count(); + + Ok(Some(read_all( + &self.data_schema, + &fetched_row_group, + &self.field_levels, + None, + &None, + num_rows, + )?)) + } +} + pub struct BufferWriteResp { writer: Writer, error: Option, @@ -397,6 +643,20 @@ pub struct BufferCloseOperator { response: Arc>, } +pub struct CreateWriterOperator { + span: Span, + op: Operator, + path: String, + response: Arc>>, +} + +pub struct ReadRowGroupOperator { + span: Span, + in_memory_row_group: InMemoryRowGroup<'static>, + response: Arc>>>, +} + +#[derive(Default)] pub struct BufferOperatorResp { condvar: Condvar, mutex: Mutex>, @@ -405,6 +665,8 @@ pub struct BufferOperatorResp { pub enum BufferOperator { Write(BufferWriteOperator), Close(BufferCloseOperator), + CreateWriter(CreateWriterOperator), + ReadRowGroup(ReadRowGroupOperator), } impl BufferOperator { @@ -412,6 +674,8 @@ impl BufferOperator { match self { BufferOperator::Write(op) => &op.span, BufferOperator::Close(op) => &op.span, + BufferOperator::CreateWriter(op) => &op.span, + BufferOperator::ReadRowGroup(op) => &op.span, } } } @@ -474,6 +738,24 @@ impl Background { res: res.map_err(std::io::Error::from), }); + op.response.condvar.notify_one(); + } + BufferOperator::CreateWriter(op) => { + let writer = op.op.writer(&op.path).await; + let locked = op.response.mutex.lock(); + let mut locked = locked.unwrap_or_else(PoisonError::into_inner); + *locked = Some(writer); + + op.response.condvar.notify_one(); + } + BufferOperator::ReadRowGroup(mut op) => { + let projection_mask = ProjectionMask::all(); + let res = op.in_memory_row_group.fetch(&projection_mask, None).await; + + let locked = op.response.mutex.lock(); + let mut locked = locked.unwrap_or_else(PoisonError::into_inner); + *locked = Some(res.map(|_| op.in_memory_row_group)); + op.response.condvar.notify_one(); } } @@ -504,7 +786,7 @@ mod tests { let memory = 16 * 1024 * 1024; // 16MB let workers = 2; - let pool = BufferPool::create(runtime.clone(), memory, workers); + let pool = SpillsBufferPool::create(runtime.clone(), memory, workers); // Should be able to allocate buffers let buffer1 = pool.try_alloc_buffer(); @@ -519,7 +801,7 @@ mod tests { #[tokio::test] async fn test_buffer_writer_basic_write() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); let operator = create_test_operator().unwrap(); let writer = operator.writer("test_file").await.unwrap(); @@ -537,7 +819,7 @@ mod tests { #[tokio::test] async fn test_buffer_writer_large_write() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 16 * 1024 * 1024, 2); + let pool = SpillsBufferPool::create(runtime.clone(), 16 * 1024 * 1024, 2); let operator = create_test_operator().unwrap(); let writer = operator.writer("large_file").await.unwrap(); @@ -556,7 +838,7 @@ mod tests { #[tokio::test] async fn test_buffer_writer_multiple_writes() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); let operator = create_test_operator().unwrap(); let writer = operator.writer("multi_write_file").await.unwrap(); @@ -578,7 +860,7 @@ mod tests { async fn test_buffer_pool_exhaustion_and_backpressure() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); // Create pool with only 1 buffer to test backpressure - let pool = BufferPool::create(runtime.clone(), CHUNK_SIZE, 1); + let pool = SpillsBufferPool::create(runtime.clone(), CHUNK_SIZE, 1); let operator = create_test_operator().unwrap(); let writer = operator.writer("backpressure_test").await.unwrap(); @@ -600,7 +882,7 @@ mod tests { #[tokio::test] async fn test_buffer_reuse() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); // Allocate all buffers let mut buffers = Vec::new(); @@ -631,7 +913,7 @@ mod tests { #[tokio::test] async fn test_empty_write() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); let operator = create_test_operator().unwrap(); let writer = operator.writer("empty_test").await.unwrap(); @@ -648,7 +930,7 @@ mod tests { #[tokio::test] async fn test_close_without_writes() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); let operator = create_test_operator().unwrap(); let writer = operator.writer("no_write_test").await.unwrap(); @@ -662,7 +944,7 @@ mod tests { #[tokio::test] async fn test_concurrent_writers() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 16 * 1024 * 1024, 4); + let pool = SpillsBufferPool::create(runtime.clone(), 16 * 1024 * 1024, 4); let operator = create_test_operator().unwrap(); let write_count = Arc::new(AtomicUsize::new(0)); @@ -704,7 +986,7 @@ mod tests { #[tokio::test] async fn test_writer_close_error_handling() { let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = BufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); let operator = create_test_operator().unwrap(); let writer = operator.writer("error_test").await.unwrap(); diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index 94e773afe6ae5..0aec89075e6eb 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -24,7 +24,10 @@ mod serialize; mod test_memory; pub use adapter::*; -pub use async_buffer::BufferPool; +pub use async_buffer::BufferWriter; +pub use async_buffer::SpillsBufferPool; +pub use async_buffer::SpillsDataReader; +pub use async_buffer::SpillsDataWriter; pub use block_writer::*; pub use databend_common_pipeline_transforms::traits::Location; pub use inner::*; diff --git a/src/query/service/src/spillers/row_group_encoder.rs b/src/query/service/src/spillers/row_group_encoder.rs index ebc19f9ce708d..6abb93be5712d 100644 --- a/src/query/service/src/spillers/row_group_encoder.rs +++ b/src/query/service/src/spillers/row_group_encoder.rs @@ -51,10 +51,10 @@ use parquet::file::writer::SerializedFileWriter; use parquet::file::writer::SerializedRowGroupWriter; use parquet::schema::types::SchemaDescriptor; -use super::async_buffer::BufferPool; use super::async_buffer::BufferWriter; use super::Location; use super::SpillerInner; +use super::SpillsBufferPool; pub struct Properties { schema: Arc, @@ -338,7 +338,7 @@ impl SpillerInner { pub(super) async fn new_file_writer( &self, props: &Properties, - pool: &Arc, + pool: &Arc, chunk: usize, local_file_size: Option, ) -> Result { diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs index 7316a914eb00e..e8e846831ed19 100644 --- a/src/query/service/src/spillers/serialize.rs +++ b/src/query/service/src/spillers/serialize.rs @@ -112,7 +112,7 @@ pub(super) fn deserialize_block(columns_layout: &Layout, data: Buffer) -> Result } } -fn fake_data_schema(block: &DataBlock) -> DataSchema { +pub fn fake_data_schema(block: &DataBlock) -> DataSchema { let fields = block .columns() .iter() diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 302d178413834..4adbef031093d 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1501,7 +1501,6 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::String(vec![S3StorageClass::Standard.to_string(), S3StorageClass::IntelligentTiering.to_string()])), }), - ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/storages/parquet/src/lib.rs b/src/query/storages/parquet/src/lib.rs index 1702f66d07a9d..7b3db58260a8c 100644 --- a/src/query/storages/parquet/src/lib.rs +++ b/src/query/storages/parquet/src/lib.rs @@ -49,6 +49,8 @@ pub use parquet_part::DeleteTask; pub use parquet_part::DeleteType; pub use parquet_part::ParquetFilePart; pub use parquet_part::ParquetPart; +pub use parquet_reader::read_all; +pub use parquet_reader::InMemoryRowGroup; pub use parquet_reader::InmMemoryFile; pub use parquet_reader::ParquetFileReader; pub use parquet_reader::ParquetReaderBuilder; @@ -57,5 +59,6 @@ pub use parquet_table::ParquetTable; pub use parquet_variant_table::ParquetVariantTable; // for it test pub use pruning::ParquetPruner; +pub use read_settings::ReadSettings; pub use source::ParquetSource; pub use source::ParquetSourceType; diff --git a/src/query/storages/parquet/src/parquet_reader/mod.rs b/src/query/storages/parquet/src/parquet_reader/mod.rs index 0bd798e9068d0..352df8b4364a2 100644 --- a/src/query/storages/parquet/src/parquet_reader/mod.rs +++ b/src/query/storages/parquet/src/parquet_reader/mod.rs @@ -19,6 +19,7 @@ mod row_group; mod topk; mod utils; +pub use read_policy::read_all; pub use read_policy::*; pub use reader::InmMemoryFile; pub use reader::ParquetFileReader; diff --git a/src/query/storages/parquet/src/parquet_reader/read_policy/mod.rs b/src/query/storages/parquet/src/parquet_reader/read_policy/mod.rs index 16ef42d0ed8b1..c3d038415c878 100644 --- a/src/query/storages/parquet/src/parquet_reader/read_policy/mod.rs +++ b/src/query/storages/parquet/src/parquet_reader/read_policy/mod.rs @@ -21,3 +21,4 @@ pub mod policy; pub use no_prefetch::NoPretchPolicyBuilder; pub use predicate_and_topk::PredicateAndTopkPolicyBuilder; pub use topk_only::TopkOnlyPolicyBuilder; +pub use utils::read_all;