Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,18 @@ Status DataTypeDateTimeV2SerDe::read_column_from_decoded_values(
if (view.value_kind != DecodedValueKind::INT64) {
return Status::NotSupported("DATETIMEV2 decoded reader expects INT64 source");
}
if (view.values == nullptr && view.row_count > 0) {
if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<ColumnDateTimeV2&>(column).get_data();
const auto* values = reinterpret_cast<const int64_t*>(view.values);
static const cctz::time_zone utc_time_zone = cctz::utc_time_zone();
const int64_t second_mask = view.time_unit == DecodedTimeUnit::MILLIS ? 1000 : 1000000;
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(DateV2Value<DateTimeV2ValueType>());
continue;
}
int64_t epoch_seconds = values[row] / second_mask;
int64_t sub_second = values[row] % second_mask;
if (sub_second < 0) {
Expand Down
6 changes: 5 additions & 1 deletion be/src/core/data_type_serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,16 @@ Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column,
if (view.value_kind != DecodedValueKind::INT32) {
return Status::NotSupported("DATEV2 decoded reader expects INT32 source");
}
if (view.values == nullptr && view.row_count > 0) {
if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<ColumnDateV2&>(column).get_data();
const auto* values = reinterpret_cast<const int32_t*>(view.values);
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(DateV2Value<DateV2ValueType>());
continue;
}
DateV2Value<DateV2ValueType> date_v2;
date_v2.get_date_from_daynr(values[row] + date_threshold);
data.push_back(date_v2);
Expand Down
22 changes: 22 additions & 0 deletions be/src/core/data_type_serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,30 @@ typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(const Decode

template <PrimitiveType T>
Status read_decimal_decoded_values(IColumn& column, const DecodedColumnView& view) {
if (view.value_kind == DecodedValueKind::INT32 || view.value_kind == DecodedValueKind::INT64) {
if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
} else if (view.binary_values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded binary values are null for {}", column.get_name());
}
auto& data = assert_cast<ColumnDecimal<T>&>(column).get_data();
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(typename PrimitiveTypeTraits<T>::CppType());
continue;
}
if (view.value_kind == DecodedValueKind::BINARY ||
view.value_kind == DecodedValueKind::FIXED_BINARY) {
const auto& value = (*view.binary_values)[row];
const auto length = view.value_kind == DecodedValueKind::FIXED_BINARY
? view.fixed_length
: cast_set<int, size_t, false>(value.size);
if (value.data == nullptr && length > 0) {
return Status::Corruption("Decoded decimal binary value is null for {} at row {}",
column.get_name(), row);
}
}
data.push_back(read_decimal_decoded_value<T>(view, row));
}
return Status::OK();
Expand Down
1 change: 0 additions & 1 deletion be/src/core/data_type_serde/data_type_nullable_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ Status DataTypeNullableSerDe::read_column_from_decoded_values(IColumn& column,
memcpy(dst, view.null_map, view.row_count);
}
DecodedColumnView nested_view = view;
nested_view.null_map = nullptr;
return nested_serde->read_column_from_decoded_values(nullable_column.get_nested_column(),
nested_view);
}
Expand Down
15 changes: 10 additions & 5 deletions be/src/core/data_type_serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,18 @@ const NativeType* decoded_values_as(const DecodedColumnView& view) {

template <PrimitiveType DorisType, typename SourceType>
Status read_number_decoded_values(IColumn& column, const DecodedColumnView& view) {
if (view.values == nullptr && view.row_count > 0) {
if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data =
assert_cast<typename PrimitiveTypeTraits<DorisType>::ColumnType&>(column).get_data();
const auto* values = decoded_values_as<SourceType>(view);
for (int64_t row = 0; row < view.row_count; ++row) {
using DorisCppType = typename PrimitiveTypeTraits<DorisType>::CppType;
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(DorisCppType());
continue;
}
data.push_back(static_cast<DorisCppType>(values[row]));
}
return Status::OK();
Expand Down Expand Up @@ -188,13 +192,14 @@ Status DataTypeNumberSerDe<T>::read_column_from_decoded_values(
if (view.value_kind == DecodedValueKind::BOOL) {
return read_number_decoded_values<TYPE_BOOLEAN, bool>(column, view);
}
} else if constexpr (T == TYPE_INT) {
} else if constexpr (T == TYPE_TINYINT || T == TYPE_SMALLINT || T == TYPE_INT) {
if (view.value_kind == DecodedValueKind::INT32) {
return read_number_decoded_values<TYPE_INT, int32_t>(column, view);
return read_number_decoded_values<T, int32_t>(column, view);
}
} else if constexpr (T == TYPE_BIGINT) {
} else if constexpr (T == TYPE_TINYINT || T == TYPE_SMALLINT || T == TYPE_INT ||
T == TYPE_BIGINT) {
if (view.value_kind == DecodedValueKind::INT64) {
return read_number_decoded_values<TYPE_BIGINT, int64_t>(column, view);
return read_number_decoded_values<T, int64_t>(column, view);
}
} else if constexpr (T == TYPE_FLOAT) {
if (view.value_kind == DecodedValueKind::FLOAT) {
Expand Down
6 changes: 5 additions & 1 deletion be/src/core/data_type_serde/data_type_string_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ namespace {

template <typename ColumnType>
Status read_string_decoded_values(IColumn& column, const DecodedColumnView& view) {
if (view.binary_values == nullptr && view.row_count > 0) {
if (view.binary_values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded binary values are null for {}", column.get_name());
}
auto& string_column = assert_cast<ColumnType&>(column);
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
string_column.insert_default();
continue;
}
const auto& value = (*view.binary_values)[row];
string_column.insert_data(value.data, value.size);
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/core/data_type_serde/data_type_time_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,15 @@ Status DataTypeTimeV2SerDe::read_column_from_decoded_values(IColumn& column,
if (view.value_kind != DecodedValueKind::INT32 && view.value_kind != DecodedValueKind::INT64) {
return Status::NotSupported("TIMEV2 decoded reader expects INT32 or INT64 source");
}
if (view.values == nullptr && view.row_count > 0) {
if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<ColumnTimeV2&>(column).get_data();
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(TimeValue::TimeType());
continue;
}
data.push_back(read_time_decoded_value(view, row));
}
return Status::OK();
Expand Down
18 changes: 18 additions & 0 deletions be/src/core/data_type_serde/decoded_column_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,22 @@ struct DecodedColumnView {
const std::vector<StringRef>* binary_values = nullptr;
};

inline bool decoded_column_view_row_is_null(const DecodedColumnView& view, int64_t row) {
return view.null_map != nullptr && view.null_map[row] != 0;
}

inline bool decoded_column_view_has_non_null_value(const DecodedColumnView& view) {
if (view.null_map == nullptr) {
return view.row_count > 0;
}

// TODO(gabriel): optimize null map check with SIMD or bitset if needed.
for (int64_t row = 0; row < view.row_count; ++row) {
if (view.null_map[row] == 0) {
return true;
}
}
return false;
}

} // namespace doris
11 changes: 6 additions & 5 deletions be/src/exec/scan/file_scanner_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "exprs/vexpr_context.h"
#include "exprs/vslot_ref.h"
#include "format/format_common.h"
#include "format/reader/column_mapper.h"
#include "format/reader/expr/slot_ref.h"
#include "format/reader/table/hive_reader.h"
#include "format/reader/table/paimon_reader.h"
Expand Down Expand Up @@ -544,12 +545,14 @@ Status rewrite_slot_refs_to_global_index(
*expr = TableSlotRef::create_shared(cast_set<int>(global_index.value()),
cast_set<int>(global_index.value()), -1,
slot_ref->data_type(), slot_ref->column_name());
RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
return Status::OK();
}
const auto global_index = global_index_it->second;
*expr = TableSlotRef::create_shared(cast_set<int>(global_index.value()),
cast_set<int>(global_index.value()), -1,
slot_ref->data_type(), slot_ref->column_name());
RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
return Status::OK();
}
auto children = (*expr)->children();
Expand Down Expand Up @@ -876,13 +879,11 @@ Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const
conjuncts->clear();
conjuncts->reserve(_conjuncts.size());
for (const auto& conjunct : _conjuncts) {
VExprContextSPtr table_conjunct;
RETURN_IF_ERROR(conjunct->clone(_state, table_conjunct));
auto root = table_conjunct->root();
VExprSPtr root;
RETURN_IF_ERROR(reader::clone_table_expr_tree(conjunct->root(), &root));
RETURN_IF_ERROR(
rewrite_slot_refs_to_global_index(&root, _column_unique_id_to_global_index));
table_conjunct->set_root(root);
conjuncts->push_back(std::move(table_conjunct));
conjuncts->push_back(VExprContext::create_shared(std::move(root)));
}
return Status::OK();
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,15 @@ Status VExpr::open(RuntimeState* state, VExprContext* context,
return Status::OK();
}

void VExpr::reset_prepare_state() {
_prepared = false;
_prepare_finished = false;
_open_finished = false;
for (auto& child : _children) {
child->reset_prepare_state();
}
}

void VExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) {
for (auto& i : _children) {
i->close(context, scope);
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class VExpr {
virtual const VExprSPtrs& children() const { return _children; }
void set_children(const VExprSPtrs& children) { _children = children; }
void set_children(VExprSPtrs&& children) { _children = std::move(children); }
void reset_prepare_state();
virtual std::string debug_string() const;
static std::string debug_string(const VExprSPtrs& exprs);
static std::string debug_string(const VExprContextSPtrs& ctxs);
Expand Down
Loading
Loading