From e10f8fefece78c0c506b49bb4f70ef716dda003a Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Wed, 11 Oct 2023 21:27:15 +0800 Subject: [PATCH 1/2] add set_true_validity --- .../processors/transforms/hash_join/common.rs | 32 +++---- .../hash_join/hash_join_build_state.rs | 5 +- .../hash_join/hash_join_probe_state.rs | 31 ++----- .../hash_join/probe_join/inner_join.rs | 34 +++----- .../hash_join/probe_join/left_join.rs | 86 +++++-------------- .../hash_join/probe_join/right_join.rs | 12 +-- 6 files changed, 54 insertions(+), 146 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index 141a32061214..7a44d1cfcdd3 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -27,7 +27,6 @@ use common_expression::DataBlock; use common_expression::Evaluator; use common_expression::Expr; use common_expression::FunctionContext; -use common_expression::Scalar; use common_expression::Value; use common_functions::BUILTIN_FUNCTIONS; use common_sql::executor::cast_expr_to_non_null_boolean; @@ -187,32 +186,21 @@ impl HashJoinState { } } -pub(crate) fn set_validity(column: &BlockEntry, num_rows: usize, validity: &Bitmap) -> BlockEntry { +pub(crate) fn set_true_validity( + column: &BlockEntry, + num_rows: usize, + true_validity: &Bitmap, +) -> BlockEntry { let (value, data_type) = (&column.value, &column.data_type); let col = value.convert_to_full_column(data_type, num_rows); - - if matches!(col, Column::Null { .. }) { + if matches!(col, Column::Null { .. }) || col.as_nullable().is_some() { column.clone() - } else if let Some(col) = col.as_nullable() { - if col.len() == 0 { - return BlockEntry::new(data_type.clone(), Value::Scalar(Scalar::Null)); - } - // It's possible validity is longer than col. - let diff_len = validity.len() - col.validity.len(); - let mut new_validity = MutableBitmap::with_capacity(validity.len()); - for (b1, b2) in validity.iter().zip(col.validity.iter()) { - new_validity.push(b1 & b2); - } - new_validity.extend_constant(diff_len, false); - let col = Column::Nullable(Box::new(NullableColumn { - column: col.column.clone(), - validity: new_validity.into(), - })); - BlockEntry::new(data_type.clone(), Value::Column(col)) } else { + let mut validity = true_validity.clone(); + validity.slice(0, num_rows); let col = Column::Nullable(Box::new(NullableColumn { - column: col.clone(), - validity: validity.clone(), + column: col, + validity, })); BlockEntry::new(data_type.wrap_nullable(), Value::Column(col)) } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index 8e22215d5618..a6fc8d3d2431 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -54,7 +54,7 @@ use log::info; use parking_lot::Mutex; use parking_lot::RwLock; -use crate::pipelines::processors::transforms::hash_join::common::set_validity; +use crate::pipelines::processors::transforms::hash_join::common::set_true_validity; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE; use crate::pipelines::processors::transforms::hash_join::hash_join_state::FixedKeyHashJoinHashTable; use crate::pipelines::processors::transforms::hash_join::hash_join_state::HashJoinHashTable; @@ -171,11 +171,10 @@ impl HashJoinBuildState { let mut validity = MutableBitmap::new(); validity.extend_constant(data_block.num_rows(), true); let validity: Bitmap = validity.into(); - let nullable_columns = data_block .columns() .iter() - .map(|c| set_validity(c, validity.len(), &validity)) + .map(|c| set_true_validity(c, validity.len(), &validity)) .collect::>(); data_block = DataBlock::new(nullable_columns, data_block.num_rows()); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index 3ed563e25cac..973fe50b3372 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -18,7 +18,6 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::bitmap::MutableBitmap; use common_base::base::tokio::sync::Barrier; use common_catalog::table_context::TableContext; @@ -46,7 +45,7 @@ use parking_lot::Mutex; use parking_lot::RwLock; use super::ProbeState; -use crate::pipelines::processors::transforms::hash_join::common::set_validity; +use crate::pipelines::processors::transforms::hash_join::common::set_true_validity; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_NULL; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_TRUE; @@ -169,12 +168,7 @@ impl HashJoinProbeState { let nullable_columns = input .columns() .iter() - .map(|c| { - let mut validity = MutableBitmap::new(); - validity.extend_constant(input.num_rows(), true); - let validity: Bitmap = validity.into(); - set_validity(c, validity.len(), &validity) - }) + .map(|c| set_true_validity(c, input.num_rows(), &probe_state.true_validity)) .collect::>(); input = DataBlock::new(nullable_columns, input.num_rows()); } @@ -456,22 +450,11 @@ impl HashJoinProbeState { if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let num_rows = unmatched_build_block.num_rows(); - let nullable_unmatched_build_columns = if num_rows == max_block_size { - unmatched_build_block - .columns() - .iter() - .map(|c| set_validity(c, num_rows, true_validity)) - .collect::>() - } else { - let mut validity = MutableBitmap::new(); - validity.extend_constant(num_rows, true); - let validity: Bitmap = validity.into(); - unmatched_build_block - .columns() - .iter() - .map(|c| set_validity(c, num_rows, &validity)) - .collect::>() - }; + let nullable_unmatched_build_columns = unmatched_build_block + .columns() + .iter() + .map(|c| set_true_validity(c, num_rows, true_validity)) + .collect::>(); unmatched_build_block = DataBlock::new(nullable_unmatched_build_columns, num_rows); }; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs index 0e05b85cadf5..7b30fdb97542 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs @@ -15,8 +15,6 @@ use std::iter::TrustedLen; use std::sync::atomic::Ordering; -use common_arrow::arrow::bitmap::Bitmap; -use common_arrow::arrow::bitmap::MutableBitmap; use common_exception::ErrorCode; use common_exception::Result; use common_expression::types::BooleanType; @@ -27,7 +25,7 @@ use common_functions::BUILTIN_FUNCTIONS; use common_hashtable::HashJoinHashtableLike; use common_sql::executor::cast_expr_to_non_null_boolean; -use crate::pipelines::processors::transforms::hash_join::common::set_validity; +use crate::pipelines::processors::transforms::hash_join::common::set_true_validity; use crate::pipelines::processors::transforms::hash_join::HashJoinProbeState; use crate::pipelines::processors::transforms::hash_join::ProbeState; @@ -120,16 +118,11 @@ impl HashJoinProbeState { (true, false) => { result_block.get_by_offset(*index).clone().remove_nullable() } - (false, true) => { - let mut validity = MutableBitmap::new(); - validity.extend_constant(result_block.num_rows(), true); - let validity: Bitmap = validity.into(); - set_validity( - result_block.get_by_offset(*index), - validity.len(), - &validity, - ) - } + (false, true) => set_true_validity( + result_block.get_by_offset(*index), + result_block.num_rows(), + &probe_state.true_validity, + ), }; result_block.add_column(entry); } @@ -195,16 +188,11 @@ impl HashJoinProbeState { (true, false) => { result_block.get_by_offset(*index).clone().remove_nullable() } - (false, true) => { - let mut validity = MutableBitmap::new(); - validity.extend_constant(result_block.num_rows(), true); - let validity: Bitmap = validity.into(); - set_validity( - result_block.get_by_offset(*index), - validity.len(), - &validity, - ) - } + (false, true) => set_true_validity( + result_block.get_by_offset(*index), + result_block.num_rows(), + &probe_state.true_validity, + ), }; result_block.add_column(entry); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index 0ada98786288..56f6021e2ddd 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -16,7 +16,6 @@ use std::iter::TrustedLen; use std::sync::atomic::Ordering; use common_arrow::arrow::bitmap::Bitmap; -use common_arrow::arrow::bitmap::MutableBitmap; use common_exception::ErrorCode; use common_exception::Result; use common_expression::BlockEntry; @@ -25,7 +24,7 @@ use common_expression::Scalar; use common_expression::Value; use common_hashtable::HashJoinHashtableLike; -use crate::pipelines::processors::transforms::hash_join::common::set_validity; +use crate::pipelines::processors::transforms::hash_join::common::set_true_validity; use crate::pipelines::processors::transforms::hash_join::HashJoinProbeState; use crate::pipelines::processors::transforms::hash_join::ProbeState; use crate::sql::plans::JoinType; @@ -114,6 +113,7 @@ impl HashJoinProbeState { probe_unmatched_indexes_occupied, is_probe_projected, is_build_projected, + &probe_state.true_validity, string_items_buf, )?); probe_unmatched_indexes_occupied = 0; @@ -135,22 +135,11 @@ impl HashJoinProbeState { )?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { - let nullable_probe_columns = if matched_num == max_block_size { - probe_block - .columns() - .iter() - .map(|c| set_validity(c, max_block_size, true_validity)) - .collect::>() - } else { - let mut validity = MutableBitmap::new(); - validity.extend_constant(matched_num, true); - let validity: Bitmap = validity.into(); - probe_block - .columns() - .iter() - .map(|c| set_validity(c, matched_num, &validity)) - .collect::>() - }; + let nullable_probe_columns = probe_block + .columns() + .iter() + .map(|c| set_true_validity(c, matched_num, true_validity)) + .collect::>(); probe_block = DataBlock::new(nullable_probe_columns, matched_num); } Some(probe_block) @@ -178,24 +167,12 @@ impl HashJoinProbeState { .collect::>(), matched_num, ) - } else if matched_num == max_block_size { - ( - build_block - .columns() - .iter() - .map(|c| set_validity(c, max_block_size, true_validity)) - .collect::>(), - max_block_size, - ) } else { - let mut validity = MutableBitmap::new(); - validity.extend_constant(matched_num, true); - let validity: Bitmap = validity.into(); ( build_block .columns() .iter() - .map(|c| set_validity(c, matched_num, &validity)) + .map(|c| set_true_validity(c, matched_num, true_validity)) .collect::>(), matched_num, ) @@ -258,6 +235,7 @@ impl HashJoinProbeState { probe_unmatched_indexes_occupied, is_probe_projected, is_build_projected, + &probe_state.true_validity, string_items_buf, )?); Ok(result_blocks) @@ -358,22 +336,11 @@ impl HashJoinProbeState { )?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { - let nullable_probe_columns = if matched_num == max_block_size { - probe_block - .columns() - .iter() - .map(|c| set_validity(c, max_block_size, true_validity)) - .collect::>() - } else { - let mut validity = MutableBitmap::new(); - validity.extend_constant(matched_num, true); - let validity: Bitmap = validity.into(); - probe_block - .columns() - .iter() - .map(|c| set_validity(c, matched_num, &validity)) - .collect::>() - }; + let nullable_probe_columns = probe_block + .columns() + .iter() + .map(|c| set_true_validity(c, matched_num, true_validity)) + .collect::>(); probe_block = DataBlock::new(nullable_probe_columns, matched_num) } Some(probe_block) @@ -401,24 +368,12 @@ impl HashJoinProbeState { .collect::>(), matched_num, ) - } else if matched_num == max_block_size { - ( - build_block - .columns() - .iter() - .map(|c| set_validity(c, max_block_size, true_validity)) - .collect::>(), - max_block_size, - ) } else { - let mut validity = MutableBitmap::new(); - validity.extend_constant(matched_num, true); - let validity: Bitmap = validity.into(); ( build_block .columns() .iter() - .map(|c| set_validity(c, matched_num, &validity)) + .map(|c| set_true_validity(c, matched_num, true_validity)) .collect::>(), matched_num, ) @@ -535,6 +490,7 @@ impl HashJoinProbeState { matched_num, is_probe_projected, is_build_projected, + &probe_state.true_validity, string_items_buf, )?); matched_num = 0; @@ -553,11 +509,13 @@ impl HashJoinProbeState { matched_num, is_probe_projected, is_build_projected, + &probe_state.true_validity, string_items_buf, )?); Ok(result_blocks) } + #[allow(clippy::too_many_arguments)] fn create_left_join_null_block( &self, input: &DataBlock, @@ -565,6 +523,7 @@ impl HashJoinProbeState { matched_num: usize, is_probe_projected: bool, is_build_projected: bool, + true_validity: &Bitmap, string_items_buf: &mut Option>, ) -> Result { let probe_block = if is_probe_projected { @@ -575,12 +534,7 @@ impl HashJoinProbeState { let nullable_probe_columns = probe_block .columns() .iter() - .map(|c| { - let mut probe_validity = MutableBitmap::new(); - probe_validity.extend_constant(matched_num, true); - let probe_validity: Bitmap = probe_validity.into(); - set_validity(c, matched_num, &probe_validity) - }) + .map(|c| set_true_validity(c, matched_num, true_validity)) .collect::>(); probe_block = DataBlock::new(nullable_probe_columns, matched_num); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs index b0f0afbea9f6..a1daa8ef242a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs @@ -17,14 +17,13 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use common_arrow::arrow::bitmap::Bitmap; -use common_arrow::arrow::bitmap::MutableBitmap; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataBlock; use common_hashtable::HashJoinHashtableLike; use common_hashtable::RowPtr; -use crate::pipelines::processors::transforms::hash_join::common::set_validity; +use crate::pipelines::processors::transforms::hash_join::common::set_true_validity; use crate::pipelines::processors::transforms::hash_join::HashJoinProbeState; use crate::pipelines::processors::transforms::hash_join::ProbeState; use crate::sql::plans::JoinType; @@ -114,7 +113,7 @@ impl HashJoinProbeState { let nullable_columns = probe_block .columns() .iter() - .map(|c| set_validity(c, max_block_size, true_validity)) + .map(|c| set_true_validity(c, max_block_size, true_validity)) .collect::>(); Some(DataBlock::new(nullable_columns, max_block_size)) } else { @@ -253,15 +252,12 @@ impl HashJoinProbeState { )?; // The join type is right join, we need to wrap nullable for probe side. - let mut validity = MutableBitmap::new(); - validity.extend_constant(matched_num, true); - let validity: Bitmap = validity.into(); let nullable_columns = probe_block .columns() .iter() - .map(|c| set_validity(c, probe_block.num_rows(), &validity)) + .map(|c| set_true_validity(c, matched_num, true_validity)) .collect::>(); - Some(DataBlock::new(nullable_columns, validity.len())) + Some(DataBlock::new(nullable_columns, matched_num)) } else { None }; From 4ee09708fb2cf89666e768155ed1a88867919de3 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Wed, 11 Oct 2023 21:27:23 +0800 Subject: [PATCH 2/2] fast path for take --- src/query/expression/src/kernels/take.rs | 9 +++++++++ src/query/expression/src/kernels/take_chunks.rs | 13 +++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/query/expression/src/kernels/take.rs b/src/query/expression/src/kernels/take.rs index fa8d1d34f128..6e8f3d9dddf2 100644 --- a/src/query/expression/src/kernels/take.rs +++ b/src/query/expression/src/kernels/take.rs @@ -262,6 +262,15 @@ impl Column { pub fn take_boolean_types(col: &Bitmap, indices: &[I]) -> Bitmap where I: common_arrow::arrow::types::Index { let num_rows = indices.len(); + // Fast path: avoid iterating column to generate a new bitmap. + // If this [`Bitmap`] is all true or all false and `num_rows <= bitmap.len()``, + // we can just slice it. + if num_rows <= col.len() && (col.unset_bits() == 0 || col.unset_bits() == col.len()) { + let mut bitmap = col.clone(); + bitmap.slice(0, num_rows); + return bitmap; + } + let capacity = num_rows.saturating_add(7) / 8; let mut builder: Vec = Vec::with_capacity(capacity); let mut builder_len = 0; diff --git a/src/query/expression/src/kernels/take_chunks.rs b/src/query/expression/src/kernels/take_chunks.rs index 0c52ff886bd3..7d34875bccfa 100644 --- a/src/query/expression/src/kernels/take_chunks.rs +++ b/src/query/expression/src/kernels/take_chunks.rs @@ -771,6 +771,19 @@ impl Column { pub fn take_block_vec_boolean_types(col: &[Bitmap], indices: &[RowPtr]) -> Bitmap { let num_rows = indices.len(); + // Fast path: avoid iterating column to generate a new bitmap. + for bitmap in col.iter() { + // If this [`Bitmap`] is all true or all false and `num_rows <= bitmap.len()``, + // we can just slice it. + if num_rows <= bitmap.len() + && (bitmap.unset_bits() == 0 || bitmap.unset_bits() == bitmap.len()) + { + let mut bitmap = bitmap.clone(); + bitmap.slice(0, num_rows); + return bitmap; + } + } + let capacity = num_rows.saturating_add(7) / 8; let mut builder: Vec = Vec::with_capacity(capacity); let mut builder_len = 0;