Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,11 @@ void ColumnVariant::Subcolumn::insert_range_from(const Subcolumn& src, size_t st
if (pos < src.data.size() && processed_rows < end) {
size_t part_end = end - processed_rows;
insert_from_part(src.data[pos], src.data_types[pos], 0, part_end);
processed_rows = end;
}
size_t default_start = std::max(start, processed_rows);
if (default_start < end) {
data.back()->insert_many_defaults(end - default_start);
}
}

Expand Down
42 changes: 23 additions & 19 deletions be/src/core/data_type/data_type_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const {

int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const {
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
if (!column_variant.is_finalized()) {
// Icolumn originates from MutablePtr or block, and therefore can be modified.
// todo: We should reconsider the logic here, why are we using finalize() in this context?
const_cast<ColumnVariant&>(column_variant).finalize();
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
MutableColumnPtr finalized_column;
if (!column_variant->is_finalized()) {
// Local exchange can share the same block across downstream tasks. Serialize a private
// finalized copy so serialization never mutates shared variant columns.
finalized_column = column_variant->clone_finalized();
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
}

const auto& subcolumns = column_variant.get_subcolumns();
const auto& subcolumns = column_variant->get_subcolumns();
size_t size = 0;

size += sizeof(uint32_t);
Expand All @@ -95,26 +97,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column
// sparse column
// TODO make compability with sparse column
size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
*column_variant.get_sparse_column(), be_exec_version);
*column_variant->get_sparse_column(), be_exec_version);

size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
*column_variant.get_doc_value_column(), be_exec_version);
*column_variant->get_doc_value_column(), be_exec_version);
return size;
}

char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_version) const {
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
if (!column_variant.is_finalized()) {
// Icolumn originates from block, and therefore can be modified.
// todo: We should reconsider the logic here, why are we using finalize() in this context?
const_cast<ColumnVariant&>(column_variant).finalize();
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
MutableColumnPtr finalized_column;
if (!column_variant->is_finalized()) {
// Local exchange can share the same block across downstream tasks. Serialize a private
// finalized copy so serialization never mutates shared variant columns.
finalized_column = column_variant->clone_finalized();
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
}
#ifndef NDEBUG
// DCHECK size
column_variant.check_consistency();
column_variant->check_consistency();
#endif

const auto& subcolumns = column_variant.get_subcolumns();
const auto& subcolumns = column_variant->get_subcolumns();

char* size_pos = buf;
buf += sizeof(uint32_t);
Expand Down Expand Up @@ -147,15 +151,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v
// Safe case
unaligned_store<uint32_t>(size_pos, static_cast<UInt32>(num_of_columns));
// serialize num of rows, only take effect when subcolumns empty
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant.rows()));
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant->rows()));
buf += sizeof(uint32_t);

// serialize sparse column
// TODO make compability with sparse column
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(),
buf, be_exec_version);
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(),
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(),
buf, be_exec_version);
buf = ColumnVariant::get_binary_column_type()->serialize(
*column_variant->get_doc_value_column(), buf, be_exec_version);
return buf;
}

Expand Down
91 changes: 61 additions & 30 deletions be/src/exprs/function/cast/cast_to_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,45 +32,71 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
auto& col_with_type_and_name = block.get_by_position(arguments[0]);
auto& col_from = col_with_type_and_name.column;
const IColumn* variant_column = col_from.get();
if (const auto* nullable = check_and_get_column<ColumnNullable>(*variant_column)) {
const auto* nullable = check_and_get_column<ColumnNullable>(*variant_column);
if (nullable != nullptr) {
variant_column = &nullable->get_nested_column();
}
const auto* variant = assert_cast<const ColumnVariant*>(variant_column);
ColumnPtr col_to = data_type_to->create_column();

if (!assert_cast<const ColumnVariant&>(*variant_column).is_finalized()) {
// ColumnVariant should be finalized before parsing, finalize maybe modify original column structure
auto mutable_column = IColumn::mutate(std::move(col_with_type_and_name.column));
if (auto* nullable = check_and_get_column<ColumnNullable>(*mutable_column)) {
const auto& const_nullable = *nullable;
auto nested_column = IColumn::mutate(const_nullable.get_nested_column_ptr());
assert_cast<ColumnVariant&>(*nested_column).finalize();
ColumnPtr nested_column_ptr = std::move(nested_column);
nullable->change_nested_column(nested_column_ptr);
} else {
assert_cast<ColumnVariant&>(*mutable_column).finalize();
}
col_with_type_and_name.column = std::move(mutable_column);
if (input_rows_count == 0) {
block.replace_by_position(result, std::move(col_to));
return Status::OK();
}

variant_column = col_with_type_and_name.column.get();
if (const auto* nullable = check_and_get_column<ColumnNullable>(*variant_column)) {
variant_column = &nullable->get_nested_column();
ColumnPtr finalized_input_column;
if (!variant->is_finalized()) {
// Local exchange can share the same input block across multiple downstream tasks.
// Finalize a private copy so variant casts never mutate shared input columns.
auto finalized_variant = variant->clone_finalized();
variant = assert_cast<const ColumnVariant*>(finalized_variant.get());
if (finalized_variant->size() > 0 && input_rows_count < finalized_variant->size()) {
finalized_variant = finalized_variant->clone_resized(input_rows_count);
variant = assert_cast<const ColumnVariant*>(finalized_variant.get());
}
if (nullable != nullptr) {
auto cloned_null_map =
nullable->get_null_map_column_ptr()->clone_resized(input_rows_count);
finalized_input_column = ColumnNullable::create(std::move(finalized_variant),
std::move(cloned_null_map));
} else {
finalized_input_column = std::move(finalized_variant);
}
}
const auto& variant = assert_cast<const ColumnVariant&>(*variant_column);
ColumnPtr col_to = data_type_to->create_column();
auto execute_on_finalized_input = [&](auto&& executor) -> Status {
if (!finalized_input_column && input_rows_count == variant->size()) {
return executor(block);
}
Block finalized_block = block;
finalized_block.replace_by_position(
arguments[0], finalized_input_column ? finalized_input_column
: col_from->cut(0, input_rows_count));
RETURN_IF_ERROR(executor(finalized_block));
block.replace_by_position(result, finalized_block.get_by_position(result).column);
return Status::OK();
};

// It's important to convert as many elements as possible in this context. For instance,
// if the root of this variant column is a number column, converting it to a number column
// is acceptable. However, if the destination type is a string and root is none scalar root, then
// we should convert the entire tree to a string.
bool is_root_valuable = variant.is_scalar_variant() ||
(!variant.is_null_root() &&
variant.get_root_type()->get_primitive_type() != INVALID_TYPE &&
!is_string_type(data_type_to->get_primitive_type()) &&
data_type_to->get_primitive_type() != TYPE_JSONB);
bool is_scalar_variant = false;
if (variant->size() > 0) {
is_scalar_variant = variant->is_scalar_variant();
} else if (!variant->is_null_root()) {
const auto* root = variant->get_subcolumns().get_root();
is_scalar_variant = root != nullptr && variant->get_subcolumns().get_leaves().size() == 1 &&
root->is_scalar();
}
bool is_root_valuable =
is_scalar_variant || (!variant->is_null_root() &&
variant->get_root_type()->get_primitive_type() != INVALID_TYPE &&
!is_string_type(data_type_to->get_primitive_type()) &&
data_type_to->get_primitive_type() != TYPE_JSONB);

if (is_root_valuable) {
ColumnPtr nested = variant.get_root();
auto nested_from_type = variant.get_root_type();
ColumnPtr nested = variant->get_root();
auto nested_from_type = variant->get_root_type();
// DCHECK(nested_from_type->is_nullable());
DCHECK(!data_type_to->is_nullable());
auto new_context = context == nullptr ? nullptr : context->clone();
Expand Down Expand Up @@ -105,16 +131,21 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
{0, 1}, input_rows_count);
}
} else {
if (variant.only_have_default_values()) {
if (variant->size() > 0 && variant->only_have_default_values()) {
col_to->assert_mutable()->insert_many_defaults(input_rows_count);
col_to = make_nullable(col_to, true);
} else if (is_string_type(data_type_to->get_primitive_type())) {
// serialize to string
return CastToStringFunction::execute_impl(context, block, arguments, result,
input_rows_count);
return execute_on_finalized_input([&](Block& finalized_block) {
return CastToStringFunction::execute_impl(context, finalized_block, arguments,
result, input_rows_count);
});
} else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
// serialize to json by parsing
return cast_from_generic_to_jsonb(context, block, arguments, result, input_rows_count);
return execute_on_finalized_input([&](Block& finalized_block) {
return cast_from_generic_to_jsonb(context, finalized_block, arguments, result,
input_rows_count);
});
} else if (!data_type_to->is_nullable() &&
!is_string_type(data_type_to->get_primitive_type())) {
// other types
Expand Down
105 changes: 105 additions & 0 deletions be/test/core/column/column_variant_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,28 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>

#include "agent/be_exec_version_manager.h"
#include "common/cast_set.h"
#include "core/block/block.h"
#include "core/column/column_variant.cpp"
#include "core/column/common_column_test.h"
#include "core/column/subcolumn_tree.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_number.h"
#include "core/data_type/define_primitive_type.h"
#include "core/field.h"
#include "core/string_ref.h"
#include "core/types.h"
#include "core/value/jsonb_value.h"
#include "exec/common/variant_util.h"
#include "gen_cpp/data.pb.h"
#include "storage/olap_common.h"
#include "testutil/test_util.h"
#include "testutil/variant_util.h"
#include "util/block_compression.h"

using namespace doris;
namespace doris {
Expand Down Expand Up @@ -1061,6 +1067,53 @@ TEST_F(ColumnVariantTest, test_insert_indices_from) {
}
}

TEST_F(ColumnVariantTest, insert_range_from_materializes_pending_default_suffix) {
auto make_source = [](size_t pending_defaults) {
auto nested = ColumnInt64::create();
nested->insert_value(7);
auto null_map = ColumnUInt8::create();
null_map->insert_value(0);

auto root_type = make_nullable(std::make_shared<DataTypeInt64>());
auto root_column = ColumnNullable::create(std::move(nested), std::move(null_map));
ColumnVariant::Subcolumn root(std::move(root_column), root_type, true, true);
for (size_t i = 0; i < pending_defaults; ++i) {
root.increment_default_counter();
}

ColumnVariant::Subcolumns subcolumns;
subcolumns.create_root(std::move(root));
return ColumnVariant::create(0, false, std::move(subcolumns));
};

auto src = make_source(1);
EXPECT_EQ(src->size(), 2);

auto dst = ColumnVariant::create(0, false);
dst->insert_range_from(*src, 0, 2);
dst->finalize();

const auto& copied_root =
assert_cast<const ColumnNullable&>(*static_cast<const ColumnVariant&>(*dst).get_root());
EXPECT_EQ(copied_root.size(), 2);
EXPECT_EQ(copied_root.get_null_map_data()[0], 0);
EXPECT_EQ(copied_root.get_null_map_data()[1], 1);

auto suffix_src = make_source(6);
EXPECT_EQ(suffix_src->size(), 7);

auto suffix_dst = ColumnVariant::create(0, false);
suffix_dst->insert_range_from(*suffix_src, 4, 2);
suffix_dst->finalize();

const auto& suffix_root = assert_cast<const ColumnNullable&>(
*static_cast<const ColumnVariant&>(*suffix_dst).get_root());
EXPECT_EQ(suffix_dst->size(), 2);
EXPECT_EQ(suffix_root.size(), 2);
EXPECT_EQ(suffix_root.get_null_map_data()[0], 1);
EXPECT_EQ(suffix_root.get_null_map_data()[1], 1);
}

TEST_F(ColumnVariantTest, is_variable_length) {
EXPECT_TRUE(column_variant->is_variable_length());
}
Expand Down Expand Up @@ -2025,6 +2078,58 @@ TEST_F(ColumnVariantTest, clone_finalized) {
test_func(std::move(cloned_object));
}

TEST_F(ColumnVariantTest, clone_finalized_deep_copies_columns) {
auto source_column = VariantUtil::construct_advanced_varint_column();
source_column->finalize(ColumnVariant::FinalizeMode::READ_MODE);

auto cloned = source_column->clone_finalized();
auto* cloned_variant = assert_cast<ColumnVariant*>(cloned.get());
EXPECT_TRUE(cloned_variant->is_finalized());

for (const auto& source_subcolumn : source_column->get_subcolumns()) {
const auto* cloned_subcolumn =
cloned_variant->get_subcolumns().find_exact(source_subcolumn->path);
ASSERT_NE(cloned_subcolumn, nullptr);
EXPECT_NE(source_subcolumn->data.get_finalized_column_ptr().get(),
cloned_subcolumn->data.get_finalized_column_ptr().get())
<< source_subcolumn->path.get_path();
}
EXPECT_NE(source_column->get_sparse_column().get(), cloned_variant->get_sparse_column().get());
EXPECT_NE(source_column->get_doc_value_column().get(),
cloned_variant->get_doc_value_column().get());
}

TEST_F(ColumnVariantTest, serialize_does_not_finalize_source_column) {
auto source_column = VariantUtil::construct_advanced_varint_column();
ASSERT_FALSE(source_column->is_finalized());

const int be_exec_version = BeExecVersionManager::get_newest_version();
const auto size =
dt_variant->get_uncompressed_serialized_bytes(*source_column, be_exec_version);
EXPECT_FALSE(source_column->is_finalized());

auto buffer = std::make_unique<char[]>(size);
dt_variant->serialize(*source_column, buffer.get(), be_exec_version);
EXPECT_FALSE(source_column->is_finalized());
}

TEST_F(ColumnVariantTest, block_serialize_does_not_finalize_source_column) {
auto source_column = VariantUtil::construct_advanced_varint_column();
ASSERT_FALSE(source_column->is_finalized());

Block block({{source_column->get_ptr(), dt_variant, "variant_col"}});
PBlock pblock;
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
int64_t compress_time = 0;
auto status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
&uncompressed_bytes, &compressed_bytes, &compress_time,
segment_v2::NO_COMPRESSION);
ASSERT_TRUE(status.ok()) << status;
EXPECT_FALSE(source_column->is_finalized());
EXPECT_GT(pblock.column_values().size(), 0);
}

TEST_F(ColumnVariantTest, sanitize) {
auto test_func = [](const auto& source_column) {
auto src_size = source_column->size();
Expand Down
Loading
Loading