Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@

#include <chrono> // IWYU pragma: keep
#include <cstdint>

#include "common/status.h"
#include "core/column/column_const.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datetimev2_impl.hpp"
Expand Down
7 changes: 4 additions & 3 deletions be/src/core/data_type_serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
#include <fmt/core.h>

#include <cstdint>

#include "core/column/column_const.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datev2_impl.hpp"
Expand Down Expand Up @@ -125,8 +126,8 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow:
return Status::OK();
}

Status DataTypeDateV2SerDe::read_column_from_decoded_values(
IColumn& column, const DecodedColumnView& view) const {
Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT32) {
return Status::NotSupported("DATEV2 decoded reader expects INT32 source");
}
Expand Down
14 changes: 7 additions & 7 deletions be/src/core/data_type_serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ NativeType decode_big_endian_signed_integer(const uint8_t* data, int length) {
}

template <PrimitiveType T>
typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(
const DecodedColumnView& view, int64_t row) {
typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(const DecodedColumnView& view,
int64_t row) {
using FieldType = typename PrimitiveTypeTraits<T>::CppType;
if (view.value_kind == DecodedValueKind::INT32) {
const auto* values = reinterpret_cast<const int32_t*>(view.values);
Expand All @@ -76,9 +76,9 @@ typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(
const auto length = view.value_kind == DecodedValueKind::FIXED_BINARY
? view.fixed_length
: cast_set<int, size_t, false>(value.size);
return FieldType {static_cast<typename FieldType::NativeType>(
decode_big_endian_signed_integer<Int128>(reinterpret_cast<const uint8_t*>(value.data),
length))};
return FieldType {
static_cast<typename FieldType::NativeType>(decode_big_endian_signed_integer<Int128>(
reinterpret_cast<const uint8_t*>(value.data), length))};
}

template <PrimitiveType T>
Expand Down Expand Up @@ -441,8 +441,8 @@ Status DataTypeDecimalSerDe<T>::read_column_from_decoded_values(
return read_decimal_decoded_values<T>(column, view);
}
}
return Status::NotSupported("Unsupported decoded values for {} from source kind {}",
get_name(), static_cast<int>(view.value_kind));
return Status::NotSupported("Unsupported decoded values for {} from source kind {}", get_name(),
static_cast<int>(view.value_kind));
}

template <PrimitiveType T>
Expand Down
6 changes: 3 additions & 3 deletions be/src/core/data_type_serde/data_type_nullable_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
#include "core/column/column_const.h"
#include "core/column/column_nullable.h"
#include "core/column/column_vector.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type_serde/data_type_serde.h"
#include "core/data_type_serde/data_type_string_serde.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "exprs/function/cast/cast_base.h"
#include "format/transformer/vcsv_transformer.h"
#include "util/jsonb_document.h"
Expand Down Expand Up @@ -351,8 +351,8 @@ Status DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
ctz);
}

Status DataTypeNullableSerDe::read_column_from_decoded_values(
IColumn& column, const DecodedColumnView& view) const {
Status DataTypeNullableSerDe::read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const {
auto& nullable_column = assert_cast<ColumnNullable&>(column);
auto& null_map = nullable_column.get_null_map_data();
const auto old_size = null_map.size();
Expand Down
10 changes: 5 additions & 5 deletions be/src/core/data_type_serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
#include "core/column/column_nullable.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type/primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type_serde/data_type_serde.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/packed_int128.h"
#include "core/types.h"
#include "core/value/timestamptz_value.h"
Expand Down Expand Up @@ -55,8 +55,8 @@ Status read_number_decoded_values(IColumn& column, const DecodedColumnView& view
if (view.values == nullptr && view.row_count > 0) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<typename PrimitiveTypeTraits<DorisType>::ColumnType&>(column)
.get_data();
auto& data =
assert_cast<typename PrimitiveTypeTraits<DorisType>::ColumnType&>(column).get_data();
const auto* values = decoded_values_as<SourceType>(view);
for (int64_t row = 0; row < view.row_count; ++row) {
using DorisCppType = typename PrimitiveTypeTraits<DorisType>::CppType;
Expand Down Expand Up @@ -204,8 +204,8 @@ Status DataTypeNumberSerDe<T>::read_column_from_decoded_values(
return read_number_decoded_values<TYPE_DOUBLE, double>(column, view);
}
}
return Status::NotSupported("Unsupported decoded values for {} from source kind {}",
get_name(), static_cast<int>(view.value_kind));
return Status::NotSupported("Unsupported decoded values for {} from source kind {}", get_name(),
static_cast<int>(view.value_kind));
}

template <PrimitiveType T>
Expand Down
2 changes: 1 addition & 1 deletion be/src/core/data_type_serde/data_type_string_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#include "core/data_type_serde/data_type_string_serde.h"

#include "core/column/column_string.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "util/jsonb_document_cast.h"
#include "util/jsonb_utils.h"
#include "util/jsonb_writer.h"
Expand Down
20 changes: 9 additions & 11 deletions be/src/core/data_type_serde/data_type_time_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

#include "core/data_type_serde/data_type_time_serde.h"

#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/value/time_value.h"
#include "exprs/function/cast/cast_base.h"
#include "exprs/function/cast/cast_to_time_impl.hpp"
Expand All @@ -44,12 +44,11 @@ TimeValue::TimeType read_time_decoded_value(const DecodedColumnView& view, int64
}
const bool negative = micros < 0;
const int64_t abs_micros = std::abs(micros);
return TimeValue::make_time(abs_micros / TimeValue::ONE_HOUR_MICROSECONDS,
(abs_micros % TimeValue::ONE_HOUR_MICROSECONDS) /
TimeValue::ONE_MINUTE_MICROSECONDS,
(abs_micros % TimeValue::ONE_MINUTE_MICROSECONDS) /
TimeValue::ONE_SECOND_MICROSECONDS,
abs_micros % TimeValue::ONE_SECOND_MICROSECONDS, negative);
return TimeValue::make_time(
abs_micros / TimeValue::ONE_HOUR_MICROSECONDS,
(abs_micros % TimeValue::ONE_HOUR_MICROSECONDS) / TimeValue::ONE_MINUTE_MICROSECONDS,
(abs_micros % TimeValue::ONE_MINUTE_MICROSECONDS) / TimeValue::ONE_SECOND_MICROSECONDS,
abs_micros % TimeValue::ONE_SECOND_MICROSECONDS, negative);
}

} // namespace
Expand Down Expand Up @@ -173,10 +172,9 @@ Status DataTypeTimeV2SerDe::from_string_strict_mode(StringRef& str, IColumn& col
return Status::OK();
}

Status DataTypeTimeV2SerDe::read_column_from_decoded_values(
IColumn& column, const DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT32 &&
view.value_kind != DecodedValueKind::INT64) {
Status DataTypeTimeV2SerDe::read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT32 && view.value_kind != DecodedValueKind::INT64) {
return Status::NotSupported("TIMEV2 decoded reader expects INT32 or INT64 source");
}
if (view.values == nullptr && view.row_count > 0) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/format/new_parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_
}
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
file_block, filter.data(), static_cast<size_t>(batch_rows), false,
&can_filter_all));
RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(file_block, filter.data(),
static_cast<size_t>(batch_rows),
false, &can_filter_all));
*selected_rows =
can_filter_all ? 0 : _apply_filter_to_selection(filter, selection, *selected_rows);
}
Expand Down
14 changes: 8 additions & 6 deletions be/src/format/reader/column_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,20 @@ Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& project
ColumnMapping mapping;
mapping.table_column_id = table_column.id;
mapping.table_type = table_column.type;
if (const auto* file_field = _find_file_field(table_column, file_schema)) {
RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, &mapping));
} else if (table_column.is_partition_key && partition_values.count(table_column.name) > 0) {
// 3. Partition column, use partition value as a constant mapping. Note that partition column may also have default expression, but partition value should take precedence if it exists.
if (table_column.is_partition_key && partition_values.count(table_column.name) > 0) {
// 1. Partition column, use partition value as a constant mapping. Note that partition column may also have default expression, but partition value should take precedence if it exists.
mapping.is_constant = true;
mapping.default_expr = VExprContext::create_shared(TableLiteral::create_shared(
mapping.table_type, partition_values.at(table_column.name)));
} else if (const auto* file_field = _find_file_field(table_column, file_schema)) {
// 2. Table column has a matching file column, use it as a direct mapping.
RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, &mapping));
} else if (table_column.default_expr != nullptr) {
// 4. Table column does not exist in file (column adding by schema evolution), which has a default expression, use it as a constant mapping.
// 3. Table column does not exist in file (column adding by schema evolution), which has a default expression, use it as a constant mapping.
mapping.is_constant = true;
mapping.default_expr = table_column.default_expr;
} else if (table_column.name == ROW_LINEAGE_ROW_ID) {
// 5. Virtual column, use special mapping to indicate it should be materialized by table reader instead of read from file or evaluated from expression.
// 4. Virtual column, use special mapping to indicate it should be materialized by table reader instead of read from file or evaluated from expression.
mapping.virtual_column_type = TableVirtualColumnType::ROW_ID;
} else if (table_column.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
mapping.virtual_column_type = TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER;
Expand Down
3 changes: 1 addition & 2 deletions be/src/format/reader/column_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ struct SchemaField;
struct FileScanRequest;
struct FieldProjection;

using TableColumnPredicates =
std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;
using TableColumnPredicates = std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;

enum class TableColumnMappingMode {
BY_FIELD_ID,
Expand Down
17 changes: 14 additions & 3 deletions be/src/format/reader/table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,20 @@ class TableReader {
return Status::OK();
}
if (mapping.default_expr != nullptr) {
int res_id;
RETURN_IF_ERROR(mapping.default_expr->execute(current_block, &res_id));
*column = current_block->get_columns()[res_id];
if (current_block->rows() == current_rows) {
int res_id;
RETURN_IF_ERROR(mapping.default_expr->execute(current_block, &res_id));
*column = current_block->get_columns()[res_id];
} else {
DORIS_CHECK(mapping.is_constant);
Block eval_block;
eval_block.insert(
{mapping.table_type->create_column_const_with_default_value(current_rows),
mapping.table_type, "__table_reader_const_rows"});
int res_id;
RETURN_IF_ERROR(mapping.default_expr->execute(&eval_block, &res_id));
*column = eval_block.get_columns()[res_id];
}
return Status::OK();
}
*column = mapping.table_type->create_column_const_with_default_value(current_rows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ TEST(DataTypeSerDeDecodedValuesTest, ReadNullableInt32Values) {
ASSERT_TRUE(st.ok()) << st;

const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
const auto& nested_column = assert_cast<const ColumnInt32&>(nullable_column.get_nested_column());
const auto& nested_column =
assert_cast<const ColumnInt32&>(nullable_column.get_nested_column());
ASSERT_EQ(nullable_column.size(), 4);
EXPECT_FALSE(nullable_column.is_null_at(0));
EXPECT_TRUE(nullable_column.is_null_at(1));
Expand Down
4 changes: 2 additions & 2 deletions be/test/format/new_parquet/parquet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ void write_int_pair_parquet_file(const std::string& file_path, int64_t row_group
builder.version(::parquet::ParquetVersion::PARQUET_2_6);
builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
builder.compression(::parquet::Compression::UNCOMPRESSED);
PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
*table, arrow::default_memory_pool(), out, row_group_size, builder.build()));
PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out,
row_group_size, builder.build()));
}

Block build_file_block(const std::vector<reader::SchemaField>& schema) {
Expand Down
10 changes: 5 additions & 5 deletions be/test/format/reader/expr/cast_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class Int64ChildGreaterThanExpr final : public VExpr {
Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector,
size_t count, ColumnPtr& result_column) const override {
ColumnPtr child_column;
RETURN_IF_ERROR(get_child(0)->execute_column(context, block, selector, count, child_column));
RETURN_IF_ERROR(
get_child(0)->execute_column(context, block, selector, count, child_column));
const auto& input = assert_cast<const ColumnInt64&>(*child_column);
auto result = ColumnUInt8::create();
auto& result_data = result->get_data();
Expand Down Expand Up @@ -261,8 +262,8 @@ TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) {
table_filter.slot_ids = {7};

reader::FileScanRequest file_request;
ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request)
.ok());
ASSERT_TRUE(
mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok());
ASSERT_EQ(file_request.expression_filters.size(), 1);
ASSERT_EQ(file_request.predicate_columns, std::vector<reader::ColumnId>({0}));
const auto& localized_expr = file_request.expression_filters[0].conjunct->root();
Expand All @@ -285,8 +286,7 @@ TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) {
ASSERT_TRUE(status.ok()) << status;
IColumn::Filter filter(block.rows(), 1);
bool can_filter_all = false;
status = conjunct->execute_filter(&block, filter.data(), block.rows(), false,
&can_filter_all);
status = conjunct->execute_filter(&block, filter.data(), block.rows(), false, &can_filter_all);
ASSERT_TRUE(status.ok()) << status;
EXPECT_FALSE(can_filter_all);
ASSERT_EQ(filter.size(), 2);
Expand Down
Loading
Loading