Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions be/src/format/new_parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/exception.h"
#include "core/block/block.h"
#include "core/data_type/data_type_nullable.h"
#include "exprs/vexpr_context.h"
#include "format/new_parquet/column_reader.h"
#include "format/new_parquet/parquet_column_schema.h"
#include "format/new_parquet/parquet_statistics.h"
Expand Down Expand Up @@ -193,13 +194,8 @@ void ParquetReader::_fill_schema_field(const ParquetColumnSchema& column_schema,
}
}

bool ParquetReader::_has_structured_filter(const reader::FileLocalFilter& local_filter) {
for (const auto& predicate : local_filter.predicates) {
if (predicate != nullptr) {
return true;
}
}
return false;
bool ParquetReader::_has_expression_filter(const reader::FileLocalFilter& local_filter) {
return local_filter.conjunct != nullptr;
}

Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block,
Expand All @@ -220,43 +216,34 @@ Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block
return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows",
column_reader->name(), column_rows, batch_rows);
}
file_block->replace_by_position(block_position, std::move(column));

for (const auto& local_filter : _request->local_filters) {
if (local_filter.file_column_id != file_field_id ||
!_has_structured_filter(local_filter)) {
!_has_expression_filter(local_filter)) {
continue;
}
if (*selected_rows == 0) {
break;
}
for (const auto& predicate : local_filter.predicates) {
*selected_rows = predicate->evaluate(*column, selection->data(), *selected_rows);
if (*selected_rows == 0) {
break;
}
}
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
RETURN_IF_ERROR(local_filter.conjunct->execute_filter(
file_block, filter.data(), static_cast<size_t>(batch_rows), false,
&can_filter_all));
*selected_rows =
can_filter_all
? 0
: _apply_filter_to_selection(filter, selection, *selected_rows);
break;
}
file_block->replace_by_position(block_position, std::move(column));
if (*selected_rows == 0) {
break;
}
}
return Status::OK();
}

Status ParquetReader::_validate_supported_local_filters(
const std::vector<reader::FileLocalFilter>& local_filters) {
for (const auto& local_filter : local_filters) {
if (local_filter.conjunct != nullptr) {
return Status::NotSupported(
"Parquet expression filter fallback is not implemented for field {}",
local_filter.file_column_id);
}
}
return Status::OK();
}

IColumn::Filter ParquetReader::_selection_to_filter(const SelectionVector& selection,
uint16_t selected_rows, int64_t batch_rows) {
IColumn::Filter filter(static_cast<size_t>(batch_rows), 0);
Expand All @@ -266,6 +253,19 @@ IColumn::Filter ParquetReader::_selection_to_filter(const SelectionVector& selec
return filter;
}

uint16_t ParquetReader::_apply_filter_to_selection(const IColumn::Filter& filter,
SelectionVector* selection,
uint16_t selected_rows) {
uint16_t new_selected_rows = 0;
for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) {
const auto row_idx = selection->get_index(selection_idx);
if (filter[row_idx] != 0) {
selection->set_index(new_selected_rows++, static_cast<SelectionVector::Index>(row_idx));
}
}
return new_selected_rows;
}

Status ParquetReader::_open_next_row_group(bool* has_row_group) {
*has_row_group = false;
while (_state->next_row_group_idx < _state->selected_row_groups.size()) {
Expand Down Expand Up @@ -456,8 +456,6 @@ Status ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
local_filter.file_column_id);
}
}
RETURN_IF_ERROR(_validate_supported_local_filters(_request->local_filters));

RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, _state->file_schema,
*_request, &_state->selected_row_groups));
RETURN_IF_ERROR(_reset_reader_position());
Expand Down
6 changes: 3 additions & 3 deletions be/src/format/new_parquet/parquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ class ParquetReader : public reader::FileReader {
void _reset_current_row_group();
void _fill_schema_field(const ParquetColumnSchema& column_schema,
reader::SchemaField* field) const;
bool _has_structured_filter(const reader::FileLocalFilter& local_filter);
bool _has_expression_filter(const reader::FileLocalFilter& local_filter);
Status _read_filter_columns(int64_t batch_rows, Block* file_block, SelectionVector* selection,
uint16_t* selected_rows);
Status _validate_supported_local_filters(
const std::vector<reader::FileLocalFilter>& local_filters);
IColumn::Filter _selection_to_filter(const SelectionVector& selection, uint16_t selected_rows,
int64_t batch_rows);
uint16_t _apply_filter_to_selection(const IColumn::Filter& filter, SelectionVector* selection,
uint16_t selected_rows);
Status _open_next_row_group(bool* has_row_group);
Status _read_current_row_group_batch(int64_t batch_rows, Block* file_block, size_t* rows);

Expand Down
44 changes: 43 additions & 1 deletion be/test/format/new_parquet/parquet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
#include "core/field.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "format/new_parquet/parquet_reader.h"
#include "format/reader/file_reader.h"
#include "gen_cpp/Types_types.h"
Expand All @@ -45,6 +47,43 @@ namespace {

constexpr int64_t ROW_COUNT = 5;

class Int32GreaterThanExpr final : public VExpr {
public:
Int32GreaterThanExpr(int column_id, int32_t value)
: VExpr(std::make_shared<DataTypeUInt8>(), false),
_column_id(column_id),
_value(value) {}

Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector,
size_t count, ColumnPtr& result_column) const override {
const auto& input =
assert_cast<const ColumnInt32&>(*block->get_by_position(_column_id).column);
auto result = ColumnUInt8::create();
auto& result_data = result->get_data();
result_data.resize(count);
for (size_t row = 0; row < count; ++row) {
const size_t input_row = selector == nullptr ? row : (*selector)[row];
result_data[row] = input.get_element(input_row) > _value;
}
result_column = std::move(result);
return Status::OK();
}

const std::string& expr_name() const override { return _expr_name; }

private:
const int _column_id;
const int32_t _value;
const std::string _expr_name = "Int32GreaterThanExpr";
};

VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t value) {
auto ctx = VExprContext::create_shared(std::make_shared<Int32GreaterThanExpr>(column_id, value));
ctx->_prepared = true;
ctx->_opened = true;
return ctx;
}

std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
std::shared_ptr<arrow::Array> array;
EXPECT_TRUE(builder->Finish(&array).ok());
Expand Down Expand Up @@ -265,6 +304,7 @@ TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) {
request->non_predicate_columns = {1};
reader::FileLocalFilter filter;
filter.file_column_id = 0;
filter.conjunct = create_int32_greater_than_conjunct(0, 2);
filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
request->local_filters.push_back(std::move(filter));
Expand Down Expand Up @@ -306,9 +346,11 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) {
std::vector<reader::SchemaField> schema;
ASSERT_TRUE(reader->get_schema(&schema).ok());
auto request = std::make_unique<reader::FileScanRequest>();
request->non_predicate_columns = {0, 1};
request->predicate_columns = {0};
request->non_predicate_columns = {1};
reader::FileLocalFilter filter;
filter.file_column_id = 0;
filter.conjunct = create_int32_greater_than_conjunct(0, 2);
filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
request->local_filters.push_back(std::move(filter));
Expand Down
Loading
Loading