Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions be/src/runtime/mem_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,17 @@ class MemPool {
// I refers to https://github.com/mcgov/asan_alignment_example.

ChunkInfo& info = chunks_[current_chunk_idx_];
int64_t aligned_allocated_bytes = BitUtil::RoundUpToMultiplyOfFactor(
info.allocated_bytes + DEFAULT_PADDING_SIZE, alignment);
if (aligned_allocated_bytes + size + DEFAULT_PADDING_SIZE <= info.chunk.size) {
int64_t aligned_allocated_bytes =
BitUtil::RoundUpToMultiplyOfFactor(info.allocated_bytes, alignment);
auto size_with_padding = size + DEFAULT_PADDING_SIZE;
if (aligned_allocated_bytes + size_with_padding <= info.chunk.size) {
// Ensure the requested alignment is respected.
int64_t padding = aligned_allocated_bytes - info.allocated_bytes;
uint8_t* result = info.chunk.data + aligned_allocated_bytes;
ASAN_UNPOISON_MEMORY_REGION(result, size);
DCHECK_LE(info.allocated_bytes + size, info.chunk.size);
info.allocated_bytes += padding + size;
total_allocated_bytes_ += padding + size;
ASAN_UNPOISON_MEMORY_REGION(result, size_with_padding);
DCHECK_LE(info.allocated_bytes + size_with_padding, info.chunk.size);
info.allocated_bytes += padding + size_with_padding;
total_allocated_bytes_ += padding + size_with_padding;
DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
return result;
}
Expand Down
26 changes: 22 additions & 4 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
if (probe_rows != 0) {
COUNTER_UPDATE(_probe_rows_counter, probe_rows);
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_probe_column_convert_to_null = _convert_block_to_null(_probe_block);
_probe_column_convert_to_null = _get_null_index_in_block(_probe_block);
}

int probe_expr_ctxs_sz = _probe_expr_ctxs.size();
Expand Down Expand Up @@ -1229,6 +1229,11 @@ Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map,
RETURN_IF_ERROR(_build_expr_ctxs[i]->execute(&block, &result_col_id));
}


if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
}

// TODO: opt the column is const
block.get_by_position(result_col_id).column =
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
Expand Down Expand Up @@ -1267,6 +1272,11 @@ Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map,
RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(&block, &result_col_id));
}


if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
}

// TODO: opt the column is const
block.get_by_position(result_col_id).column =
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
Expand Down Expand Up @@ -1304,9 +1314,6 @@ Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map,

Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uint8_t offset) {
SCOPED_TIMER(_build_table_timer);
if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
}
size_t rows = block.rows();
if (UNLIKELY(rows == 0)) {
return Status::OK();
Expand Down Expand Up @@ -1471,6 +1478,17 @@ std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) {
return results;
}

std::vector<uint16_t> HashJoinNode::_get_null_index_in_block(Block& block) {
std::vector<uint16_t> results;
for (int i = 0; i < block.columns(); ++i) {
if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) {
DCHECK(!column_type.column->is_nullable());
results.emplace_back(i);
}
}
return results;
}

Status HashJoinNode::_build_output_block(Block* origin_block, Block* output_block) {
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ class HashJoinNode : public ::doris::ExecNode {

static std::vector<uint16_t> _convert_block_to_null(Block& block);

static std::vector<uint16_t> _get_null_index_in_block(Block& block);

template <class HashTableContext>
friend struct ProcessHashTableBuild;

Expand Down