Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,19 @@ impl DataBlock {
})
.sum()
}

/// Consumes this block and returns an equivalent one in which every
/// column entry has been given a chance to reclaim unused memory via
/// `Column::maybe_gc`. Constant entries are passed through untouched.
/// Row count and entry order are preserved.
pub fn maybe_gc(self) -> DataBlock {
    let num_rows = self.num_rows;

    let entries = self
        .entries
        .into_iter()
        .map(|entry| match entry {
            // Materialized columns may hold over-allocated buffers; let them shrink.
            BlockEntry::Column(column) => BlockEntry::Column(column.maybe_gc()),
            // Constants carry no per-row buffers, so there is nothing to reclaim.
            constant @ BlockEntry::Const(..) => constant,
        })
        .collect();

    DataBlock::new(entries, num_rows)
}
}

impl Eq for Box<dyn BlockMetaInfo> {}
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::servers::http::v1::ClientSessionManager;
use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::QueriesQueueManager;
use crate::sessions::SessionManager;
use crate::spillers::SpillsBufferPool;
use crate::task::service::TaskService;

pub struct GlobalServices;
Expand Down Expand Up @@ -106,6 +107,7 @@ impl GlobalServices {
// 4. cluster discovery init.
ClusterDiscovery::init(config, version).await?;

SpillsBufferPool::init();
// TODO(xuanwo):
//
// This part is a bit complex because catalog are used widely in different
Expand Down
57 changes: 23 additions & 34 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
use crate::physical_plans::Exchange;
use crate::physical_plans::PhysicalPlanBuilder;
use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin;
use crate::pipelines::processors::transforms::BasicHashJoinState;
use crate::pipelines::processors::transforms::HashJoinFactory;
use crate::pipelines::processors::transforms::HashJoinProbeState;
use crate::pipelines::processors::transforms::InnerHashJoin;
use crate::pipelines::processors::transforms::RuntimeFiltersDesc;
use crate::pipelines::processors::transforms::TransformHashJoin;
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
Expand Down Expand Up @@ -395,16 +393,22 @@ impl HashJoin {
builder: &mut PipelineBuilder,
desc: Arc<HashJoinDesc>,
) -> Result<()> {
let state = Arc::new(BasicHashJoinState::create());
let factory = self.join_factory(builder, desc)?;

// We must build the runtime filter before constructing the child nodes,
// as we will inject some runtime filter information into the context for the child nodes to use.
let rf_desc = RuntimeFiltersDesc::create(&builder.ctx, self)?;

if let Some((build_cache_index, _)) = self.build_side_cache_info {
builder.hash_join_states.insert(
build_cache_index,
HashJoinStateRef::NewHashJoinState(state.clone()),
);
// After common subexpression elimination is completed, we can delete this type of code.
{
let state = factory.create_basic_state(0)?;

if let Some((build_cache_index, _)) = self.build_side_cache_info {
builder.hash_join_states.insert(
build_cache_index,
HashJoinStateRef::NewHashJoinState(state.clone()),
);
}
}

self.build.build_pipeline(builder)?;
Expand Down Expand Up @@ -441,7 +445,7 @@ impl HashJoin {
build_input.clone(),
probe_input.clone(),
joined_output.clone(),
self.create_join(&self.join_type, builder, desc.clone(), state.clone())?,
factory.create_hash_join(self.join_type.clone(), 0)?,
stage_sync_barrier.clone(),
self.projections.clone(),
rf_desc.clone(),
Expand All @@ -465,13 +469,11 @@ impl HashJoin {
.resize(builder.main_pipeline.output_len(), true)
}

fn create_join(
fn join_factory(
&self,
join_type: &JoinType,
builder: &mut PipelineBuilder,
ctx: &PipelineBuilder,
desc: Arc<HashJoinDesc>,
state: Arc<BasicHashJoinState>,
) -> Result<Box<dyn crate::pipelines::processors::transforms::Join>> {
) -> Result<Arc<HashJoinFactory>> {
let hash_key_types = self
.build_keys
.iter()
Expand All @@ -486,25 +488,12 @@ impl HashJoin {
})
.collect::<Vec<_>>();

let method = DataBlock::choose_hash_method_with_types(&hash_key_types)?;

Ok(match join_type {
JoinType::Inner => Box::new(InnerHashJoin::create(
&builder.ctx,
builder.func_ctx.clone(),
method,
desc,
state,
)?),
JoinType::Left => Box::new(OuterLeftHashJoin::create(
&builder.ctx,
builder.func_ctx.clone(),
method,
desc,
state,
)?),
_ => unreachable!(),
})
Ok(HashJoinFactory::create(
ctx.ctx.clone(),
ctx.func_ctx.clone(),
DataBlock::choose_hash_method_with_types(&hash_key_types)?,
desc,
))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ pub use hash_join_state::*;
pub use probe_state::ProbeState;
pub use probe_state::ProcessState;
pub use runtime_filter::*;
pub use spill_common::get_hashes;
pub use transform_hash_join_build::TransformHashJoinBuild;
pub use transform_hash_join_probe::TransformHashJoinProbe;
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ impl SquashBlocks {
let mut current_rows = 0;
let mut current_bytes = 0;

while let Some(mut block) = self.blocks.pop_front() {
while current_rows < self.squash_rows && current_bytes < self.squash_bytes {
let Some(mut block) = self.blocks.pop_front() else {
return DataBlock::concat(&blocks);
};

if block.block.is_empty() {
continue;
}
Expand All @@ -106,34 +110,29 @@ impl SquashBlocks {
self.current_bytes -= block.block.memory_size();

let mut slice_rows = block.block.num_rows();

slice_rows = std::cmp::min(slice_rows, self.squash_rows - current_rows);

let max_bytes_rows = match block.avg_bytes {
0 => block.block.num_rows(),
_ => (self.squash_bytes - current_bytes) / block.avg_bytes,
_ => self.squash_bytes.saturating_sub(current_bytes) / block.avg_bytes,
};

slice_rows = std::cmp::min(max_bytes_rows, slice_rows);

if slice_rows != block.block.num_rows() {
let compact_block = block.block.slice(0..slice_rows);
let remain_block = block.block.slice(slice_rows..block.block.num_rows());

let compact_block = block.block.slice(0..slice_rows).maybe_gc();
blocks.push(compact_block);

if !remain_block.is_empty() {
let mut columns = Vec::with_capacity(block.block.num_columns());
let remain_block = block
.block
.slice(slice_rows..block.block.num_rows())
.maybe_gc();

for block_entry in remain_block.take_columns() {
let column = block_entry.to_column();
drop(block_entry);
columns.push(column.maybe_gc());
}
self.current_rows += remain_block.num_rows();
self.current_bytes += remain_block.memory_size();

block.block = DataBlock::new_from_columns(columns);
self.current_rows += block.block.num_rows();
self.current_bytes += block.block.memory_size();
if !remain_block.is_empty() {
block.block = remain_block;
self.blocks.push_front(block);
}

Expand Down
Loading