Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions be/src/format/reader/column_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,33 @@

namespace doris::reader {

// Everything needed to rewrite one table-side slot reference into a file-side slot reference,
// plus the type information used to decide whether the rewritten slot must be wrapped in a cast.
struct FileSlotRewriteInfo {
// Position of the file column in the file-local block; taken from
// FileScanRequest::column_positions and used as the rewritten slot's column position.
size_t block_position = 0;
// Type of the column as stored in the file; becomes the data type of the rewritten slot.
DataTypePtr file_type;
// Type of the column as seen by the table; when it differs from file_type the rewritten
// slot is wrapped in a Cast to this type.
DataTypePtr table_type;
// Name of the column in the file; used as the name of the rewritten slot.
std::string file_column_name;
};

static VExprSPtr rewrite_table_expr_to_file_expr(
const VExprSPtr& expr, const std::map<int32_t, size_t>& table_column_to_file_position) {
const VExprSPtr& expr,
const std::map<int32_t, FileSlotRewriteInfo>& table_column_to_file_slot) {
if (expr == nullptr) {
return nullptr;
}
if (expr->is_slot_ref()) {
const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
const auto position_it = table_column_to_file_position.find(slot_ref->slot_id());
if (position_it != table_column_to_file_position.end()) {
return TableSlotRef::create_shared(slot_ref->slot_id(),
cast_set<int>(position_it->second), -1,
slot_ref->data_type(), slot_ref->expr_name());
const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id());
if (rewrite_it != table_column_to_file_slot.end()) {
const auto& rewrite_info = rewrite_it->second;
auto file_slot = TableSlotRef::create_shared(
slot_ref->slot_id(), cast_set<int>(rewrite_info.block_position), -1,
rewrite_info.file_type, rewrite_info.file_column_name);
if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
return file_slot;
}
auto cast_expr = Cast::create_shared(rewrite_info.table_type);
cast_expr->add_child(std::move(file_slot));
return cast_expr;
}
return expr;
}
Expand All @@ -59,7 +74,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
rewritten_children.reserve(expr->children().size());
for (const auto& child : expr->children()) {
rewritten_children.push_back(
rewrite_table_expr_to_file_expr(child, table_column_to_file_position));
rewrite_table_expr_to_file_expr(child, table_column_to_file_slot));
}
expr->set_children(std::move(rewritten_children));
return expr;
Expand Down Expand Up @@ -95,19 +110,25 @@ static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
mapping->projection = VExprContext::create_shared(expr);
}

static std::map<int32_t, size_t> build_file_position_map(const std::vector<ColumnMapping>& mappings,
const FileScanRequest& file_request) {
std::map<int32_t, size_t> table_column_to_file_position;
// Build a map from table column id to file-slot rewrite info. Only mappings that have a file
// column id and whose file column appears in the file request's column_positions are included.
static std::map<int32_t, FileSlotRewriteInfo> build_file_slot_rewrite_map(
const std::vector<ColumnMapping>& mappings, const FileScanRequest& file_request) {
std::map<int32_t, FileSlotRewriteInfo> table_column_to_file_slot;
for (const auto& mapping : mappings) {
if (!mapping.file_column_id.has_value()) {
continue;
}
const auto position_it = file_request.column_positions.find(*mapping.file_column_id);
if (position_it != file_request.column_positions.end()) {
table_column_to_file_position.emplace(mapping.table_column_id, position_it->second);
table_column_to_file_slot.emplace(
mapping.table_column_id,
FileSlotRewriteInfo {.block_position = position_it->second,
.file_type = mapping.file_type,
.table_type = mapping.table_type,
.file_column_name = mapping.file_column_name});
}
}
return table_column_to_file_position;
return table_column_to_file_slot;
}

static bool is_complex_type(const DataTypePtr& type) {
Expand Down Expand Up @@ -348,9 +369,9 @@ Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table
add_scan_column(file_request, *mapping->file_column_id, &file_request->predicate_columns);
}

// Build the complete table-slot to file-block position map after all predicate columns have
// been assigned. This keeps expression localization independent from filter iteration order.
const auto table_column_to_file_position = build_file_position_map(_mappings, *file_request);
// Build the complete table-slot rewrite map after all predicate columns have been assigned.
// This keeps expression localization independent from filter iteration order.
const auto table_column_to_file_slot = build_file_slot_rewrite_map(_mappings, *file_request);
for (const auto& table_filter : table_filters) {
if (!table_filter.can_be_localized()) {
continue;
Expand All @@ -359,7 +380,7 @@ Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table
FileExpressionFilter expression_filter;
expression_filter.conjunct =
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
table_filter.conjunct->root(), table_column_to_file_position));
table_filter.conjunct->root(), table_column_to_file_slot));
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
for (const auto table_column_id : table_filter.slot_ids) {
const auto* mapping = _find_mapping(table_column_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/format/reader/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct FileScanRequest {

std::vector<ColumnId> predicate_columns;
std::vector<ColumnId> non_predicate_columns;
std::map<ColumnId, size_t> column_positions;
std::map<ColumnId, size_t> column_positions; // file_column_id -> file-local block position
std::map<ColumnId, FieldProjection> complex_projections;
std::vector<FileExpressionFilter> expression_filters;
std::vector<FileColumnPredicateFilter> column_predicate_filters;
Expand Down
89 changes: 89 additions & 0 deletions be/test/format/reader/expr/cast_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,33 @@ class CastTest : public testing::Test {
MockRuntimeState state;
};

// Test-only predicate expression. It evaluates its single child, which must yield a ColumnInt64,
// and produces a UInt8 column holding 1 for every row whose value is strictly greater than the
// threshold supplied at construction and 0 otherwise.
class Int64ChildGreaterThanExpr final : public VExpr {
public:
    explicit Int64ChildGreaterThanExpr(int64_t value)
            : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {}

    // Evaluates child 0 over `block` and writes the per-row comparison result into
    // `result_column`. Any failure from evaluating the child is returned to the caller.
    Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector,
                               size_t count, ColumnPtr& result_column) const override {
        ColumnPtr evaluated_child;
        RETURN_IF_ERROR(
                get_child(0)->execute_column(context, block, selector, count, evaluated_child));
        // The child is required to be a plain Int64 column; anything else is a test setup error.
        const auto& int64_values = assert_cast<const ColumnInt64&>(*evaluated_child);

        auto flags = ColumnUInt8::create();
        auto& flag_data = flags->get_data();
        flag_data.resize(count);
        for (size_t i = 0; i != count; ++i) {
            const bool exceeds_threshold = int64_values.get_element(i) > _value;
            flag_data[i] = exceeds_threshold;
        }

        result_column = std::move(flags);
        return Status::OK();
    }

    const std::string& expr_name() const override { return _expr_name; }

private:
    // Threshold every child value is compared against.
    const int64_t _value;
    const std::string _expr_name = "Int64ChildGreaterThanExpr";
};

TEST_F(CastTest, CastIntSlotToBigInt) {
auto source_type = std::make_shared<DataTypeInt32>();
auto return_type = std::make_shared<DataTypeInt64>();
Expand Down Expand Up @@ -189,6 +216,9 @@ TEST_F(CastTest, ColumnMapperBuildsCastProjectionForTypeMismatch) {
auto status = mapper.create_mapping(projected_columns, {}, file_schema);
ASSERT_TRUE(status.ok()) << status;
ASSERT_EQ(mapper.mappings().size(), 1);
reader::FileScanRequest file_request;
status = mapper.create_scan_request({}, {}, projected_columns, &file_request);
ASSERT_TRUE(status.ok()) << status;
const auto& mapping = mapper.mappings()[0];
EXPECT_FALSE(mapping.is_trivial);
ASSERT_NE(mapping.projection, nullptr);
Expand All @@ -207,4 +237,63 @@ TEST_F(CastTest, ColumnMapperBuildsCastProjectionForTypeMismatch) {
mapping.projection->close();
}

// Checks filter localization when the table column type (Int64) differs from the file column
// type (Int32): the table slot must be rewritten to a file-typed slot at the file-local block
// position and wrapped in a Cast back to the table type, and the localized conjunct must
// evaluate correctly against a block that carries the file-typed column.
TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) {
    reader::TableColumnMapper mapper;
    reader::TableColumn table_column;
    table_column.id = 7;
    table_column.name = "value";
    table_column.type = std::make_shared<DataTypeInt64>();
    std::vector<reader::TableColumn> projected_columns {table_column};

    reader::SchemaField file_field;
    file_field.id = 0;
    file_field.name = "value";
    file_field.type = std::make_shared<DataTypeInt32>();
    std::vector<reader::SchemaField> file_schema {file_field};

    auto status = mapper.create_mapping(projected_columns, {}, file_schema);
    ASSERT_TRUE(status.ok()) << status;

    // Table-side predicate `value > 15`. The slot refers to table column 7 at position 7; after
    // localization it is expected to point at file position 0 instead.
    auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
    predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
    reader::TableFilter table_filter;
    table_filter.conjunct = VExprContext::create_shared(predicate);
    table_filter.slot_ids = {7};

    reader::FileScanRequest file_request;
    // Stream the Status on failure, as the other status checks in this file do, so a failing
    // run reports why the scan request could not be created.
    status = mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request);
    ASSERT_TRUE(status.ok()) << status;
    ASSERT_EQ(file_request.expression_filters.size(), 1);
    ASSERT_EQ(file_request.predicate_columns, std::vector<reader::ColumnId>({0}));

    // Structural check: predicate -> Cast(table type) -> slot(file type, file position).
    const auto& localized_expr = file_request.expression_filters[0].conjunct->root();
    ASSERT_EQ(localized_expr->get_num_children(), 1);
    const auto& localized_child = localized_expr->children()[0];
    ASSERT_NE(dynamic_cast<const Cast*>(localized_child.get()), nullptr);
    ASSERT_EQ(localized_child->get_num_children(), 1);
    // Use a checked downcast, like the Cast check above, so an unexpected node type is reported
    // as a regular assertion failure before the pointer is dereferenced.
    const auto* localized_slot =
            dynamic_cast<const TableSlotRef*>(localized_child->children()[0].get());
    ASSERT_NE(localized_slot, nullptr);
    EXPECT_EQ(localized_slot->column_id(), 0);
    EXPECT_TRUE(localized_slot->data_type()->equals(*file_field.type));
    EXPECT_TRUE(localized_child->data_type()->equals(*table_column.type));

    // Behavioral check: run the localized conjunct on a block holding the Int32 file column.
    Block block;
    block.insert(ColumnHelper::create_column_with_name<DataTypeInt32>({11, 22}));
    auto* conjunct = file_request.expression_filters[0].conjunct.get();
    status = conjunct->prepare(&state, RowDescriptor());
    ASSERT_TRUE(status.ok()) << status;
    status = conjunct->open(&state);
    ASSERT_TRUE(status.ok()) << status;
    IColumn::Filter filter(block.rows(), 1);
    bool can_filter_all = false;
    status = conjunct->execute_filter(&block, filter.data(), block.rows(), false,
                                      &can_filter_all);
    ASSERT_TRUE(status.ok()) << status;
    EXPECT_FALSE(can_filter_all);
    // 11 > 15 is false and 22 > 15 is true, so only the second row survives.
    ASSERT_EQ(filter.size(), 2);
    EXPECT_EQ(filter[0], 0);
    EXPECT_EQ(filter[1], 1);

    conjunct->close();
}

} // namespace doris
Loading