[cherry-pick][branch-2.2][bugfix] Fix crash during decoding min/max when type mismatch in parquet file #11778

Merged 1 commit on Sep 29, 2022
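Summary: when the column type declared for the scan does not match the physical type stored in the parquet file, decoding row-group min/max statistics could crash the BE. This cherry-pick makes _decode_min_max_column report "statistics not usable" through a bool out-parameter (decode_ok) instead of returning Status::NotSupported, and propagates the status of ColumnConverterFactory::create_converter with RETURN_IF_ERROR instead of discarding it; _read_min_max_chunk then simply skips min/max filtering when decode_ok is false. A minimal sketch of the new error-handling contract, using simplified stand-in types rather than the StarRocks API:

#include <iostream>
#include <string>

// Simplified stand-ins, not the StarRocks types.
struct Status {
    bool ok_ = true;
    std::string msg_;
    static Status OK() { return {}; }
    static Status Corruption(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_; }
};

// New contract: a real decode error comes back as a bad Status, while
// "these stats cannot be used" is reported via *decode_ok and is not an error.
Status decode_min_max(bool stats_usable, bool* decode_ok) {
    *decode_ok = true;
    if (!stats_usable) {
        *decode_ok = false; // not an error, just "skip min/max filtering"
        return Status::OK();
    }
    // ... decode the statistics; genuine failures still return a bad Status.
    return Status::OK();
}

int main() {
    bool decode_ok = false;
    Status st = decode_min_max(/*stats_usable=*/false, &decode_ok);
    if (!st.ok()) {
        std::cout << "hard error: " << st.msg_ << "\n";
    } else if (!decode_ok) {
        std::cout << "min/max stats unusable, scan the row group instead\n";
    }
    return 0;
}

With this split, an unusable statistic on an otherwise readable file no longer surfaces as an error, while genuine decode failures still abort the scan with a proper status.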
30 changes: 18 additions & 12 deletions be/src/exec/parquet/file_reader.cpp
@@ -252,9 +252,11 @@ Status FileReader::_read_min_max_chunk(const tparquet::RowGroup& row_group, vect
column_order = column_idx < column_orders.size() ? &column_orders[column_idx] : nullptr;
}

Status status = _decode_min_max_column(*field, _param.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i]);
if (!status.ok()) {
bool decode_ok = false;
RETURN_IF_ERROR(_decode_min_max_column(*field, _param.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i],
&decode_ok));
if (!decode_ok) {
*exist = false;
return Status::OK();
}
@@ -277,9 +279,12 @@ int FileReader::_get_partition_column_idx(const std::string& col_name) const {
Status FileReader::_decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column) {
vectorized::ColumnPtr* max_column, bool* decode_ok) {
*decode_ok = true;

if (!_can_use_min_max_stats(column_meta, column_order)) {
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
return Status::OK();
}

switch (column_meta.type) {
@@ -294,7 +299,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int32_t>::decode(column_meta.statistics.max, &max_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int32_t));
@@ -308,7 +313,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int32_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::INT64: {
int64_t min_value = 0;
@@ -321,7 +326,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int64_t>::decode(column_meta.statistics.min, &min_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int64_t));
@@ -335,7 +340,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int64_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::BYTE_ARRAY: {
Slice min_slice;
@@ -348,7 +353,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<Slice>::decode(column_meta.statistics.max, &max_slice));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_strings(std::vector<Slice>{min_slice});
@@ -362,11 +367,12 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_strings(std::vector<Slice>{max_slice});
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
default:
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
}
return Status::OK();
}

bool FileReader::_can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
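The RETURN_IF_ERROR added around ColumnConverterFactory::create_converter is the part that most directly addresses the crash: previously the factory's status was discarded, so on a type mismatch the converter could be left unset and the following converter->need_convert dereferenced a null pointer. That is the presumed crash path; the sketch below illustrates the checked pattern with simplified stand-ins, not the StarRocks implementation.

#include <iostream>
#include <memory>
#include <string>

// Minimal stand-ins, not the StarRocks API.
struct Status {
    bool ok_ = true;
    std::string msg_;
    static Status OK() { return {}; }
    static Status NotSupported(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_; }
};

#define RETURN_IF_ERROR(stmt)     \
    do {                          \
        Status _s = (stmt);       \
        if (!_s.ok()) return _s;  \
    } while (0)

struct Converter {
    bool need_convert = false;
};

// Fails and leaves *out empty when the file type does not match the declared
// column type -- analogous to create_converter on a type mismatch.
Status create_converter(bool type_matches, std::unique_ptr<Converter>* out) {
    if (!type_matches) return Status::NotSupported("type mismatch");
    *out = std::make_unique<Converter>();
    return Status::OK();
}

Status decode(bool type_matches) {
    std::unique_ptr<Converter> converter;
    // Before the fix the status was dropped here, so on a mismatch `converter`
    // stayed null and the dereference below crashed.
    RETURN_IF_ERROR(create_converter(type_matches, &converter));
    if (!converter->need_convert) {
        // ... append the raw min/max values
    }
    return Status::OK();
}

int main() {
    std::cout << (decode(/*type_matches=*/false).ok() ? "ok" : "error propagated, no crash") << "\n";
    return 0;
}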
2 changes: 1 addition & 1 deletion be/src/exec/parquet/file_reader.h
@@ -97,7 +97,7 @@ class FileReader {
static Status _decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column);
vectorized::ColumnPtr* max_column, bool* decode_ok);
static bool _can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order);
// statistics.min_value max_value
11 changes: 6 additions & 5 deletions be/src/exprs/expr.cpp
@@ -219,11 +219,6 @@ Status Expr::create_tree_from_thrift(ObjectPool* pool, const std::vector<TExprNo
DCHECK(expr != nullptr);
if (parent != nullptr) {
parent->add_child(expr);
} else {
DCHECK(root_expr != nullptr);
DCHECK(ctx != nullptr);
*root_expr = expr;
*ctx = pool->add(new ExprContext(expr));
}
for (int i = 0; i < num_children; i++) {
*node_idx += 1;
@@ -234,6 +229,12 @@ Status Expr::create_tree_from_thrift(ObjectPool* pool, const std::vector<TExprNo
return Status::InternalError("Failed to reconstruct expression tree from thrift.");
}
}
if (parent == nullptr) {
DCHECK(root_expr != nullptr);
DCHECK(ctx != nullptr);
*root_expr = expr;
*ctx = pool->add(new ExprContext(expr));
}
return Status::OK();
}

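The expr.cpp change is a supporting fix: *root_expr and *ctx used to be assigned before the children were reconstructed, so a failure while deserializing a child could leave the caller holding a pointer to a partially built tree. Deferring the assignment until after the child loop means the root is only published once the whole subtree has been built. A toy model of the reordered recursion follows; the names are hypothetical and this is not the StarRocks Expr API.

#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

struct Node {
    int num_children = 0;
};

struct Expr {
    std::vector<std::unique_ptr<Expr>> children;
};

// Rebuilds a tree from a preorder, flattened node list. The root pointer
// handed back to the caller is assigned only after every child was built,
// mirroring the reordering in Expr::create_tree_from_thrift.
bool build_tree(const std::vector<Node>& nodes, std::size_t* idx, Expr* parent,
                std::unique_ptr<Expr>* root) {
    if (*idx >= nodes.size()) return false; // malformed input
    const Node& node = nodes[*idx];
    auto owned = std::make_unique<Expr>();
    Expr* expr = owned.get();
    if (parent != nullptr) {
        parent->children.push_back(std::move(owned));
    }
    for (int i = 0; i < node.num_children; ++i) {
        ++*idx;
        if (!build_tree(nodes, idx, expr, root)) {
            return false; // the caller's *root is still untouched
        }
    }
    if (parent == nullptr) {
        *root = std::move(owned); // publish the root only on full success
    }
    return true;
}

int main() {
    // A root that claims two children but only one follows: the build fails
    // and `root` stays null instead of pointing at a half-built tree.
    std::vector<Node> nodes = {{2}, {0}};
    std::size_t idx = 0;
    std::unique_ptr<Expr> root;
    bool ok = build_tree(nodes, &idx, nullptr, &root);
    std::cout << "ok=" << ok << " root_set=" << (root != nullptr) << "\n";
    return 0;
}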
Binary file not shown (presumably the new parquet fixture be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet referenced by the new test).
85 changes: 77 additions & 8 deletions be/test/exec/vectorized/hdfs_scanner_test.cpp
@@ -304,7 +304,7 @@ static void extend_partition_values(ObjectPool* pool, HdfsScannerParams* params,
params->partition_values = part_values;
}

#define READ_ORC_ROWS(scanner, exp) \
#define READ_SCANNER_ROWS(scanner, exp) \
do { \
auto chunk = vectorized::ChunkHelper::new_chunk(*tuple_desc, 0); \
uint64_t records = 0; \
@@ -341,7 +341,7 @@ TEST_F(HdfsScannerTest, TestOrcGetNext) {

status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_ORC_ROWS(scanner, 100);
READ_SCANNER_ROWS(scanner, 100);
EXPECT_EQ(scanner->raw_rows_read(), 100);
scanner->close(_runtime_state);
}
@@ -423,7 +423,7 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterNoRows) {

status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_ORC_ROWS(scanner, 0);
READ_SCANNER_ROWS(scanner, 0);
EXPECT_EQ(scanner->raw_rows_read(), 0);
scanner->close(_runtime_state);
}
@@ -456,7 +456,7 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterRows1) {

status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_ORC_ROWS(scanner, 100);
READ_SCANNER_ROWS(scanner, 100);
EXPECT_EQ(scanner->raw_rows_read(), 100);
scanner->close(_runtime_state);
}
@@ -489,7 +489,7 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterRows2) {

status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_ORC_ROWS(scanner, 100);
READ_SCANNER_ROWS(scanner, 100);
EXPECT_EQ(scanner->raw_rows_read(), 100);
scanner->close(_runtime_state);
}
@@ -571,7 +571,7 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithDictFilter) {
scanner->disable_use_orc_sargs();
status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_ORC_ROWS(scanner, 1000);
READ_SCANNER_ROWS(scanner, 1000);
// since we use dict filter eval cache, we can do filter on orc cvb
// so actually read rows is 1000.
EXPECT_EQ(scanner->raw_rows_read(), 1000);
@@ -675,7 +675,7 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithDatetimeMinMaxFilter) {
scanner->disable_use_orc_sargs();
status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_ORC_ROWS(scanner, 4640);
READ_SCANNER_ROWS(scanner, 4640);
EXPECT_EQ(scanner->raw_rows_read(), 4640);
scanner->close(_runtime_state);
}
@@ -782,11 +782,80 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithPaddingCharDictFilter) {
scanner->disable_use_orc_sargs();
status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_ORC_ROWS(scanner, 1000);
READ_SCANNER_ROWS(scanner, 1000);
// since we use dict filter eval cache, we can do filter on orc cvb
// so actually read rows is 1000.
EXPECT_EQ(scanner->raw_rows_read(), 1000);
scanner->close(_runtime_state);
}

// =============================================================================

/*
file: file:/Users/dirlt/Downloads/part-00000-4a878ed5-fa12-4e43-a164-1650976be336-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.version = 2.4.7
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"vin","type":"string","nullable":true,"metadata":{}},{"name":"log_domain","type":"string","nullable":true,"metadata":{}},{"name":"file_name","type":"string","nullable":true,"metadata":{}},{"name":"is_collection","type":"integer","nullable":false,"metadata":{}},{"name":"is_center","type":"integer","nullable":false,"metadata":{}},{"name":"is_cloud","type":"integer","nullable":false,"metadata":{}},{"name":"collection_time","type":"string","nullable":false,"metadata":{}},{"name":"center_time","type":"string","nullable":false,"metadata":{}},{"name":"cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_center_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_time","type":"string","nullable":false,"metadata":{}},{"name":"error_center_time","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"original_time","type":"string","nullable":false,"metadata":{}},{"name":"is_original","type":"integer","nullable":false,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
vin: OPTIONAL BINARY O:UTF8 R:0 D:1
log_domain: OPTIONAL BINARY O:UTF8 R:0 D:1
file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
is_collection: REQUIRED INT32 R:0 D:0
is_center: REQUIRED INT32 R:0 D:0
is_cloud: REQUIRED INT32 R:0 D:0
collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
center_time: REQUIRED BINARY O:UTF8 R:0 D:0
cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
original_time: REQUIRED BINARY O:UTF8 R:0 D:0
is_original: REQUIRED INT32 R:0 D:0
*/

TEST_F(HdfsScannerTest, TestParqueTypeMismatchDecodeMinMax) {
SlotDesc parquet_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)},
{"is_cloud", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR)},
{""}};

SlotDesc min_max_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)}, {""}};

const std::string parquet_file = "./be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet";

auto scanner = std::make_shared<HdfsParquetScanner>();
ObjectPool* pool = &_pool;
auto* range = _create_scan_range(parquet_file, 0, 0);
auto* tuple_desc = _create_tuple_desc(parquet_descs);
auto* param = _create_param(parquet_file, range, tuple_desc);

// select vin,is_cloud from table where is_cloud >= '0';
auto* min_max_tuple_desc = _create_tuple_desc(min_max_descs);
{
std::vector<TExprNode> nodes;
TExprNode lit_node = create_string_literal_node(TPrimitiveType::VARCHAR, "0");
push_binary_pred_texpr_node(nodes, TExprOpcode::GE, min_max_tuple_desc->slots()[0], TPrimitiveType::VARCHAR,
lit_node);
ExprContext* ctx = create_expr_context(pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
}

param->min_max_tuple_desc = min_max_tuple_desc;
Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
Expr::open(param->min_max_conjunct_ctxs, _runtime_state);

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}

status = scanner->open(_runtime_state);
EXPECT_TRUE(!status.ok());
scanner->close(_runtime_state);
}
} // namespace starrocks::vectorized
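The new TestParqueTypeMismatchDecodeMinMax case exercises the fix end to end: is_cloud is stored as a REQUIRED INT32 in the file but declared as VARCHAR in the scan, and with a min/max conjunct attached the scanner's open() is expected to return a non-OK status instead of crashing the process.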