Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,6 @@ bool ColumnObject::is_finalized() const {
}

void ColumnObject::finalize() {
size_t old_size = size();
Subcolumns new_subcolumns;
for (auto&& entry : subcolumns) {
const auto& least_common_type = entry->data.get_least_common_type();
Expand All @@ -838,12 +837,12 @@ void ColumnObject::finalize() {
}
/// If all subcolumns were skipped add a dummy subcolumn,
/// because Tuple type must have at least one element.
if (new_subcolumns.empty()) {
new_subcolumns.add(
PathInData {COLUMN_NAME_DUMMY},
Subcolumn {static_cast<MutableColumnPtr&&>(ColumnUInt8::create(old_size, 0)),
is_nullable});
}
// if (new_subcolumns.empty()) {
// new_subcolumns.add(
// PathInData {COLUMN_NAME_DUMMY},
// Subcolumn {static_cast<MutableColumnPtr&&>(ColumnUInt8::create(old_size, 0)),
// is_nullable});
// }
std::swap(subcolumns, new_subcolumns);
}

Expand All @@ -860,7 +859,6 @@ ColumnPtr get_base_column_of_array(const ColumnPtr& column) {

void ColumnObject::strip_outer_array() {
assert(is_finalized());
size_t old_size = size();
Subcolumns new_subcolumns;
for (auto&& entry : subcolumns) {
auto base_column = get_base_column_of_array(entry->data.get_finalized_column_ptr());
Expand All @@ -869,20 +867,43 @@ void ColumnObject::strip_outer_array() {
}
/// If all subcolumns were skipped add a dummy subcolumn,
/// because Tuple type must have at least one element.
if (new_subcolumns.empty()) {
new_subcolumns.add(
PathInData {COLUMN_NAME_DUMMY},
Subcolumn {static_cast<MutableColumnPtr&&>(ColumnUInt8::create(old_size, 0)),
is_nullable});
}
// if (new_subcolumns.empty()) {
// new_subcolumns.add(
// PathInData {COLUMN_NAME_DUMMY},
// Subcolumn {static_cast<MutableColumnPtr&&>(ColumnUInt8::create(old_size, 0)),
// is_nullable});
// }
std::swap(subcolumns, new_subcolumns);
}

ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
DCHECK(is_finalized());
auto new_column = ColumnObject::create(true);
for (auto& entry : subcolumns) {
auto subcolumn = entry->data.get_finalized_column().filter(filter, count);
new_column->add_sub_column(entry->path, std::move(subcolumn));
}
return new_column;
}

size_t ColumnObject::filter(const Filter& filter) {
DCHECK(is_finalized());
for (auto& entry : subcolumns) {
num_rows = entry->data.get_finalized_column().filter(filter);
}
return num_rows;
}

template <typename ColumnInserterFn>
void align_variant_by_name_and_type(ColumnObject& dst, const ColumnObject& src, size_t row_cnt,
ColumnInserterFn inserter) {
CHECK(dst.is_finalized() && src.is_finalized());
size_t num_rows = dst.size();
// Use rows() here instead of size(), since size() will check_consistency
// but we could not check_consistency since num_rows will be upgraded even
// if src and dst is empty, we just increase the num_rows of dst and fill
// num_rows of default values when meet new data
size_t num_rows = dst.rows();
bool need_inc_row_num = true;
for (auto& entry : dst.get_subcolumns()) {
const auto* src_subcol = src.get_subcolumn(entry->path);
if (src_subcol == nullptr) {
Expand All @@ -903,12 +924,19 @@ void align_variant_by_name_and_type(ColumnObject& dst, const ColumnObject& src,
auto new_column = type->create_column();
new_column->insert_many_defaults(num_rows);
inserter(entry->data.get_finalized_column(), new_column.get());
if (dst.empty()) {
// add_sub_column updated num_rows of dst object
need_inc_row_num = false;
}
dst.add_sub_column(entry->path, std::move(new_column));
}
}
num_rows += row_cnt;
if (need_inc_row_num) {
dst.incr_num_rows(row_cnt);
}
#ifndef NDEBUG
// Check all columns rows matched
num_rows += row_cnt;
for (const auto& entry : dst.get_subcolumns()) {
DCHECK_EQ(entry->data.get_finalized_column().size(), num_rows);
}
Expand Down
12 changes: 4 additions & 8 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

void incr_num_rows(size_t n) { num_rows += n; }

size_t rows() const { return num_rows; }

/// Adds a subcolumn from existing IColumn.
bool add_sub_column(const PathInData& key, MutableColumnPtr&& subcolumn);

Expand Down Expand Up @@ -307,15 +309,9 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
LOG(FATAL) << "should not call the method in column object";
}

ColumnPtr filter(const Filter&, ssize_t) const override {
LOG(FATAL) << "should not call the method in column object";
return nullptr;
}
ColumnPtr filter(const Filter&, ssize_t) const override;

size_t filter(const Filter&) override {
LOG(FATAL) << "should not call the method in column object";
return 0;
}
size_t filter(const Filter&) override;

ColumnPtr permute(const Permutation&, size_t) const override {
LOG(FATAL) << "should not call the method in column object";
Expand Down
64 changes: 43 additions & 21 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@
#include <vec/json/parse2column.h>

#include <vec/data_types/data_type_factory.hpp>
#include <vector>

#include "common/compiler_util.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "olap/rowset/rowset_writer_context.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "util/thrift_rpc_helper.h"
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/functions/function.h"

namespace doris::vectorized::schema_util {

Expand Down Expand Up @@ -141,8 +150,14 @@ bool is_conversion_required_between_integers(FieldType lhs, FieldType rhs) {
}

Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, ColumnPtr* result) {
ColumnsWithTypeAndName arguments {arg,
{type->create_column_const_with_default_value(1), type, ""}};
ColumnsWithTypeAndName arguments;
if (WhichDataType(type->get_type_id()).is_string()) {
// Special handle ColumnString, since the original cast logic use ColumnString's first item
// as the name of the dest type
arguments = {arg, {type->create_column_const(1, type->get_name()), type, ""}};
} else {
arguments = {arg, {type->create_column_const_with_default_value(1), type, ""}};
}
auto function = SimpleFunctionFactory::instance().get_function("CAST", arguments, type);
Block tmp_block {arguments};
// the 0 position is input argument, the 1 position is to type argument, the 2 position is result argument
Expand Down Expand Up @@ -281,52 +296,59 @@ Status send_add_columns_rpc(ColumnsWithTypeAndName column_type_names,
return Status::OK();
}

void unfold_object(size_t dynamic_col_position, std::vector<MutableColumnPtr>& columns,
const HashMap<StringRef, size_t, StringRefHash>& column_offset_map,
const std::vector<SlotDescriptor*>& slot_descs, bool cast_to_original_type) {
auto* column_object_ptr = assert_cast<ColumnObject*>(columns[dynamic_col_position].get());
void unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_original_type) {
auto dynamic_col = block.get_by_position(dynamic_col_position).column->assume_mutable();
auto* column_object_ptr = assert_cast<ColumnObject*>(dynamic_col.get());
if (column_object_ptr->empty()) {
return;
}
size_t num_rows = column_object_ptr->size();
CHECK(columns[0]->size() <= num_rows);
CHECK(block.rows() <= num_rows);
CHECK(column_object_ptr->is_finalized());
Columns subcolumns;
DataTypes types;
Names names;
std::unordered_set<std::string> static_column_names;

// extract columns from dynamic column
for (auto& subcolumn : column_object_ptr->get_subcolumns()) {
subcolumns.push_back(subcolumn->data.get_finalized_column().get_ptr());
types.push_back(subcolumn->data.get_least_common_type());
names.push_back(subcolumn->path.get_path());
}
for (size_t i = 0; i < subcolumns.size(); ++i) {
// block may already contains this column, eg. key columns, we should ignore
// Block may already contains this column, eg. key columns, we should ignore
// or replcace the same column from object subcolumn
auto iter = column_offset_map.find(names[i]);
if (iter != column_offset_map.end()) {
ColumnWithTypeAndName* column_type_name = block.try_get_by_name(names[i]);
if (column_type_name) {
ColumnPtr column = subcolumns[i];
SlotDescriptor* slot_desc = slot_descs[iter->get_second()];
DataTypePtr dst_type = slot_desc->get_data_type_ptr();
DataTypePtr dst_type = column_type_name->type;
// Make it nullable when src is nullable but dst is not
// since we should filter some data when slot type is not null
// but column contains nulls
if (!dst_type->is_nullable() && column->is_nullable()) {
dst_type = make_nullable(dst_type);
}
if (cast_to_original_type && !dst_type->equals(*types[i])) {
// Cast static columns to original slot type
schema_util::cast_column({subcolumns[i], types[i], ""}, dst_type, &column);
}
// TODO swap to avoid memcpy
columns[iter->get_second()]->insert_range_from(*column, 0, column->size());
// replace original column
column_type_name->column = column;
column_type_name->type = dst_type;
static_column_names.emplace(names[i]);
continue;
}
}
// remove static ones remain extra dynamic columns

// Remove static ones remain extra dynamic columns
column_object_ptr->remove_subcolumns(static_column_names);
// fill default value
for (auto& column : columns) {
if (column->size() < num_rows) {
column->insert_many_defaults(num_rows - column->size());

// Fill default value
for (auto& entry : block) {
if (entry.column->size() < num_rows) {
entry.column->assume_mutable()->insert_many_defaults(num_rows - entry.column->size());
}
}
// column_object_ptr->clear();
}

void LocalSchemaChangeRecorder::add_extended_columns(const TabletColumn& new_column,
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, Co
// from object column and casted to the new type from slot_descs.
// Also if column in block is empty, it will be filled
// with num_rows of default values
void unfold_object(size_t dynamic_col_position, std::vector<MutableColumnPtr>& columns,
const HashMap<StringRef, size_t, StringRefHash>& column_offset_map,
const std::vector<SlotDescriptor*>& slot_descs, bool cast_to_original_type);
void unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_original_type);

/// If both of types are signed/unsigned integers and size of left field type
/// is less than right type, we don't need to convert field,
Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/data_types/data_type_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vec/data_types/data_type_object.h>
#include <vec/io/io_helper.h>

#include <cassert>
#include <vec/data_types/data_type_factory.hpp>

namespace doris::vectorized {
Expand Down Expand Up @@ -65,6 +66,10 @@ int64_t DataTypeObject::get_uncompressed_serialized_bytes(const IColumn& column,
char* DataTypeObject::serialize(const IColumn& column, char* buf, int be_exec_version) const {
const auto& column_object = assert_cast<const ColumnObject&>(column);
assert(column_object.is_finalized());
#ifndef NDEBUG
// DCHECK size
column_object.check_consistency();
#endif

const auto& subcolumns = column_object.get_subcolumns();

Expand Down Expand Up @@ -123,7 +128,10 @@ const char* DataTypeObject::deserialize(const char* buf, IColumn* column,
}

column_object->finalize();

#ifndef NDEBUG
// DCHECK size
column_object->check_consistency();
#endif
return buf;
}

Expand Down
Loading