Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions be/src/core/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "core/column/column_const.h"
#include "core/column/column_nothing.h"
#include "core/column/column_nullable.h"
#include "core/column/column_variant.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_nullable.h"
Expand All @@ -59,6 +60,33 @@ namespace doris::segment_v2 {
enum CompressionTypePB : int;
} // namespace doris::segment_v2
namespace doris {
namespace {

ColumnPtr clone_finalized_variant_for_serialization(const ColumnWithTypeAndName& column) {
if (remove_nullable(column.type)->get_primitive_type() != PrimitiveType::TYPE_VARIANT) {
return column.column;
}

const auto& column_ptr = column.column;
const auto* nullable = check_and_get_column<ColumnNullable>(*column_ptr);
const IColumn* nested_column =
nullable != nullptr ? &nullable->get_nested_column() : column_ptr.get();
const auto* variant = check_and_get_column<ColumnVariant>(*nested_column);
if (variant == nullptr || variant->is_finalized()) {
return column_ptr;
}

auto finalized_variant = variant->clone_finalized();
if (nullable == nullptr) {
return finalized_variant;
}

auto null_map = nullable->get_null_map_column_ptr()->clone_resized(nullable->size());
return ColumnNullable::create(std::move(finalized_variant), std::move(null_map));
}

} // namespace

template <typename T>
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
RuntimeProfile::Counter* memory_used_counter = nullptr) {
Expand Down Expand Up @@ -1012,10 +1040,23 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
bool allow_transfer_large_data) const {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
pblock->set_be_exec_version(be_exec_version);
Block block_for_serialization;
const Block* serialized_block = this;
for (size_t i = 0; i < data.size(); ++i) {
auto serialized_column = clone_finalized_variant_for_serialization(data[i]);
if (serialized_column.get() == data[i].column.get()) {
continue;
}
if (serialized_block == this) {
block_for_serialization = *this;
serialized_block = &block_for_serialization;
}
block_for_serialization.replace_by_position(i, std::move(serialized_column));
}

// calc uncompressed size for allocation
size_t content_uncompressed_size = 0;
for (const auto& c : *this) {
for (const auto& c : *serialized_block) {
PColumnMeta* pcm = pblock->add_column_metas();
c.to_pb_column_meta(pcm);
DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type";
Expand All @@ -1038,7 +1079,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
}
char* buf = column_values.data();

for (const auto& c : *this) {
for (const auto& c : *serialized_block) {
buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
}
*uncompressed_bytes = content_uncompressed_size;
Expand Down
19 changes: 8 additions & 11 deletions be/src/core/column/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,14 @@ void ColumnNullable::update_crc32c_batch(uint32_t* __restrict hashes,
const uint8_t* __restrict /* null_map */) const {
const auto* __restrict real_null_data = get_null_map_column().get_data().data();
if (_nested_column->support_replace_column_null_data()) {
// nullmap process is slow, replace null data to default value to avoid nullmap process
// This is an intentional in-place mutation inside a logically-const hash computation:
// null positions are overwritten with defaults so the inner hash loop needs no null checks.
// The invariant is that a column instance is not hashed concurrently through the same
// owner while this per-block hash path runs. Shared aliases are detached by mutate()
// before this normalized nested column is written back.
auto nested_mut = std::move(*static_cast<const IColumn::Ptr&>(_nested_column)).mutate();
nested_mut->replace_column_null_data(real_null_data);
static_cast<IColumn::Ptr&>(const_cast<IColumn::WrappedPtr&>(_nested_column)) =
std::move(nested_mut);
_nested_column->update_crc32c_batch(hashes, nullptr);
if (!has_null()) {
_nested_column->update_crc32c_batch(hashes, nullptr);
return;
}
auto nested_column = is_exclusive() ? _nested_column->assert_mutable()
: _nested_column->clone_resized(_nested_column->size());
nested_column->replace_column_null_data(real_null_data);
nested_column->update_crc32c_batch(hashes, nullptr);
} else {
auto s = size();
for (int i = 0; i < s; ++i) {
Expand Down
37 changes: 37 additions & 0 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@
namespace doris {
namespace {

IColumn::WrappedPtr clone_column_deep(const IColumn::WrappedPtr& column) {
auto full_column = column->convert_to_full_column_if_const();
auto cloned = full_column->clone_resized(full_column->size());
cloned->for_each_subcolumn(
[](IColumn::WrappedPtr& subcolumn) { subcolumn = clone_column_deep(subcolumn); });
return cloned;
}

DataTypePtr create_array_of_type(PrimitiveType type, size_t num_dimensions, bool is_nullable,
int precision = -1, int scale = -1) {
DataTypePtr result = type == PrimitiveType::INVALID_TYPE
Expand Down Expand Up @@ -2841,4 +2849,33 @@ MutableColumnPtr ColumnVariant::clone() const {
return res;
}

MutableColumnPtr ColumnVariant::clone_finalized() const {
auto res = ColumnVariant::create(_max_subcolumns_count, _enable_doc_mode);
Subcolumns new_subcolumns;
for (const auto& subcolumn : subcolumns) {
auto new_subcolumn = subcolumn->data;
for (auto& part : new_subcolumn.data) {
part = clone_column_deep(part);
}
if (subcolumn->data.is_root) {
new_subcolumns.create_root(std::move(new_subcolumn));
} else if (!new_subcolumns.add(subcolumn->path, std::move(new_subcolumn))) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"add path {} is error in clone_finalized()",
subcolumn->path.get_path());
}
}
if (!new_subcolumns.get_root()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "root is nullptr in clone_finalized()");
}
res->subcolumns = std::move(new_subcolumns);
res->serialized_sparse_column = clone_column_deep(serialized_sparse_column);
res->serialized_doc_value_column = clone_column_deep(serialized_doc_value_column);
res->set_num_rows(num_rows);

ENABLE_CHECK_CONSISTENCY(res.get());
res->finalize(FinalizeMode::READ_MODE);
return res;
}

} // namespace doris
6 changes: 1 addition & 5 deletions be/src/core/column/column_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,7 @@ class ColumnVariant final : public COWHelper<IColumn, ColumnVariant> {

bool is_finalized() const;

MutableColumnPtr clone_finalized() const {
auto finalized = IColumn::mutate(get_ptr());
static_cast<ColumnVariant*>(finalized.get())->finalize(FinalizeMode::READ_MODE);
return finalized;
}
MutableColumnPtr clone_finalized() const;

MutableColumnPtr clone() const override;

Expand Down
42 changes: 23 additions & 19 deletions be/src/core/data_type/data_type_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const {

int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const {
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
if (!column_variant.is_finalized()) {
// Icolumn originates from MutablePtr or block, and therefore can be modified.
// todo: We should reconsider the logic here, why are we using finalize() in this context?
const_cast<ColumnVariant&>(column_variant).finalize();
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
MutableColumnPtr finalized_column;
if (!column_variant->is_finalized()) {
// Local exchange can share the same block across downstream tasks. Serialize a private
// finalized copy so serialization never mutates shared variant columns.
finalized_column = column_variant->clone_finalized();
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
}

const auto& subcolumns = column_variant.get_subcolumns();
const auto& subcolumns = column_variant->get_subcolumns();
size_t size = 0;

size += sizeof(uint32_t);
Expand All @@ -95,26 +97,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column
// sparse column
// TODO make compability with sparse column
size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
*column_variant.get_sparse_column(), be_exec_version);
*column_variant->get_sparse_column(), be_exec_version);

size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
*column_variant.get_doc_value_column(), be_exec_version);
*column_variant->get_doc_value_column(), be_exec_version);
return size;
}

char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_version) const {
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
if (!column_variant.is_finalized()) {
// Icolumn originates from block, and therefore can be modified.
// todo: We should reconsider the logic here, why are we using finalize() in this context?
const_cast<ColumnVariant&>(column_variant).finalize();
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
MutableColumnPtr finalized_column;
if (!column_variant->is_finalized()) {
// Local exchange can share the same block across downstream tasks. Serialize a private
// finalized copy so serialization never mutates shared variant columns.
finalized_column = column_variant->clone_finalized();
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
}
#ifndef NDEBUG
// DCHECK size
column_variant.check_consistency();
column_variant->check_consistency();
#endif

const auto& subcolumns = column_variant.get_subcolumns();
const auto& subcolumns = column_variant->get_subcolumns();

char* size_pos = buf;
buf += sizeof(uint32_t);
Expand Down Expand Up @@ -147,15 +151,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v
// Safe case
unaligned_store<uint32_t>(size_pos, static_cast<UInt32>(num_of_columns));
// serialize num of rows, only take effect when subcolumns empty
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant.rows()));
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant->rows()));
buf += sizeof(uint32_t);

// serialize sparse column
// TODO make compability with sparse column
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(),
buf, be_exec_version);
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(),
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(),
buf, be_exec_version);
buf = ColumnVariant::get_binary_column_type()->serialize(
*column_variant->get_doc_value_column(), buf, be_exec_version);
return buf;
}

Expand Down
70 changes: 67 additions & 3 deletions be/src/exec/operator/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,47 @@ ColumnPtr align_eval_column_nullable(const ColumnWithTypeAndName& target, const
return column;
}

IColumn::WrappedPtr clone_column_deep(const IColumn::WrappedPtr& column) {
auto full_column = column->convert_to_full_column_if_const();
auto cloned = full_column->clone_resized(full_column->size());
cloned->for_each_subcolumn(
[](IColumn::WrappedPtr& subcolumn) { subcolumn = clone_column_deep(subcolumn); });
return cloned;
}

Status copy_lazy_probe_block_rows(const Block& src, Block* dst,
const std::set<int>& lazy_eval_column_ids,
const std::set<int>& materialize_column_ids) {
RETURN_IF_CATCH_EXCEPTION({
ColumnsWithTypeAndName copied_columns;
copied_columns.reserve(src.columns());
const auto rows = src.rows();
for (size_t column_idx = 0; column_idx < src.columns(); ++column_idx) {
const auto& src_column = src.get_by_position(column_idx);
const auto column_id = cast_set<int>(column_idx);
const bool should_copy_column =
lazy_eval_column_ids.find(column_id) != lazy_eval_column_ids.end() ||
materialize_column_ids.find(column_id) != materialize_column_ids.end();
ColumnPtr column;
if (should_copy_column) {
column = clone_column_deep(src_column.column);
} else {
column = src_column.type->create_column_const_with_default_value(rows);
}
copied_columns.emplace_back(std::move(column), src_column.type, src_column.name);
}
*dst = Block(std::move(copied_columns));
});
return Status::OK();
}

void append_many_from_source(MutableColumnPtr& dst_column, const ColumnWithTypeAndName& src_column,
size_t row, size_t rows) {
if (src_column.column->is_nullable() && src_column.column->is_null_at(row)) {
DCHECK(dst_column->is_nullable());
dst_column->insert_many_defaults(rows);
return;
}
if (!src_column.column->is_nullable() && dst_column->is_nullable()) {
const auto origin_size = dst_column->size();
auto* nullable_column = assert_cast<ColumnNullable*>(dst_column.get());
Expand All @@ -67,6 +106,23 @@ void append_filtered_from_source(MutableColumnPtr& dst_column,
if (selected_rows == 0) {
return;
}
if (src_column.column->is_nullable()) {
DCHECK(dst_column->is_nullable());
size_t appended_rows = 0;
for (size_t row = 0; row < filter.size() && appended_rows < selected_rows; ++row) {
if (!filter[row]) {
continue;
}
if (src_column.column->is_null_at(row)) {
dst_column->insert_default();
} else {
dst_column->insert_from(*src_column.column, row);
}
++appended_rows;
}
DCHECK_EQ(appended_rows, selected_rows);
return;
}
auto filtered_column = src_column.column->filter(filter, selected_rows);
if (!src_column.column->is_nullable() && dst_column->is_nullable()) {
const auto origin_size = dst_column->size();
Expand Down Expand Up @@ -131,6 +187,7 @@ Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) {
return Status::OK();
}
_child_block->clear();
_lazy_probe_block.clear();

return JoinProbeLocalState<NestedLoopJoinSharedState, NestedLoopJoinProbeLocalState>::close(
state);
Expand Down Expand Up @@ -889,7 +946,7 @@ Status NestedLoopJoinProbeLocalState::generate_inner_join_block_data(RuntimeStat
_probe_side_process_count = 0;
DCHECK(!_need_more_input_data || !_matched_rows_done);
auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
auto* probe_block = _child_block.get();
auto* probe_block = p._enable_lazy_materialize ? &_lazy_probe_block : _child_block.get();

if (p._enable_lazy_materialize) {
if (!_matched_rows_done && !_need_more_input_data) {
Expand Down Expand Up @@ -931,7 +988,7 @@ Status NestedLoopJoinProbeLocalState::generate_other_join_block_data(RuntimeStat
DCHECK(!_need_more_input_data || !_matched_rows_done);

auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
auto* probe_block = _child_block.get();
auto* probe_block = p._enable_lazy_materialize ? &_lazy_probe_block : _child_block.get();

if (p._enable_lazy_materialize) {
if (!_matched_rows_done && !_need_more_input_data) {
Expand Down Expand Up @@ -1230,7 +1287,6 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, Block* blo
bool eos) const {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
COUNTER_SET(local_state._memory_used_counter, block->allocated_bytes());
SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
local_state._cur_probe_row_visited_flags.resize(block->rows());
std::fill(local_state._cur_probe_row_visited_flags.begin(),
Expand All @@ -1243,6 +1299,14 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, Block* blo
local_state._probe_block_pos = 0;
local_state._need_more_input_data = false;
local_state._shared_state->probe_side_eos = eos;
if (_enable_lazy_materialize) {
RETURN_IF_ERROR(copy_lazy_probe_block_rows(*block, &local_state._lazy_probe_block,
_lazy_eval_column_ids, _materialize_column_ids));
}
COUNTER_SET(local_state._memory_used_counter,
block->allocated_bytes() +
(_enable_lazy_materialize ? local_state._lazy_probe_block.allocated_bytes()
: 0));

if (!_is_output_probe_side_only) {
auto func = [&](auto&& join_op_variants, auto set_build_side_flag,
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/operator/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class NestedLoopJoinProbeLocalState final
}

bool _matched_rows_done;
Block _lazy_probe_block;
int _probe_block_start_pos = 0;
int _probe_block_pos; // current scan pos in _probe_block
int _probe_side_process_count = 0;
Expand Down
Loading
Loading