Fix crash during decoding min/max when type mismatch in parquet file
dirtysalt committed Jul 18, 2022
1 parent c90cbd4 commit 17547d7
Showing 10 changed files with 114 additions and 19 deletions.
7 changes: 6 additions & 1 deletion be/src/connector/hive_connector.cpp
@@ -77,6 +77,7 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
RETURN_IF_ERROR(Expr::create_expr_trees(&_pool, hdfs_scan_node.partition_conjuncts, &_partition_conjunct_ctxs));
_has_partition_conjuncts = true;
}

RETURN_IF_ERROR(Expr::prepare(_min_max_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::prepare(_partition_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::open(_min_max_conjunct_ctxs, state));
@@ -261,7 +262,11 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
return Status::NotSupported(msg);
}
RETURN_IF_ERROR(scanner->init(state, scanner_params));
RETURN_IF_ERROR(scanner->open(state));
Status st = scanner->open(state);
if (!st.ok()) {
auto msg = fmt::format("file = {}", native_file_path);
return st.clone_and_append(msg);
}
_scanner = scanner;
return Status::OK();
}
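The _init_scanner change above stops propagating open() failures bare: the native file path is appended to the status first, so the error names the offending file. A minimal sketch of the same attach-context-then-propagate pattern with toy types (ToyStatus, open_file and open_scanner are hypothetical names for illustration, not the StarRocks Status API):

#include <iostream>
#include <string>

// Toy stand-in for a Status-like type, only to illustrate the pattern above.
struct ToyStatus {
    bool is_ok = true;
    std::string message;
    static ToyStatus OK() { return {}; }
    static ToyStatus IOError(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
    // Mirrors the idea of clone_and_append(): keep the original error, add context.
    ToyStatus clone_and_append(const std::string& extra) const {
        return {is_ok, message + ", " + extra};
    }
};

ToyStatus open_file(const std::string& /*path*/) {
    // Simulated failure, e.g. unsupported min/max statistics in the file.
    return ToyStatus::IOError("min/max statistics not supported");
}

ToyStatus open_scanner(const std::string& native_file_path) {
    ToyStatus st = open_file(native_file_path);
    if (!st.ok()) {
        // Attach the file path so the user can tell which file caused the error.
        return st.clone_and_append("file = " + native_file_path);
    }
    return ToyStatus::OK();
}

int main() {
    ToyStatus st = open_scanner("/warehouse/part-00000.snappy.parquet");
    if (!st.ok()) std::cout << st.message << std::endl;
}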
15 changes: 14 additions & 1 deletion be/src/exec/vectorized/hdfs_scanner.cpp
@@ -70,6 +70,18 @@ Status HdfsScanner::init(RuntimeState* runtime_state, const HdfsScannerParams& s
RETURN_IF_ERROR(
Expr::clone_if_not_exists(_scanner_params.min_max_conjunct_ctxs, runtime_state, &_min_max_conjunct_ctxs));

// Why do we need this class? In the code above we clone many conjuncts from runtime_state->obj_pool(),
// and if we quit execution by early abortion, ~RuntimeState() is called directly
// and those conjuncts are released before the `HdfsScanner`, so when we call `HdfsScanner::close`
// we would get invalid pointers. To resolve this problem, we add this instance at the end of obj_pool,
// so it must be released before the conjuncts and `HdfsScanner::close` will get valid pointers.
struct ReleaseFence {
HdfsScanner* ptr;
ReleaseFence(HdfsScanner* p) : ptr(p) {}
~ReleaseFence() { ptr->finalize(); }
};
_runtime_state->obj_pool()->add(new ReleaseFence(this));

Status status = do_init(runtime_state, scanner_params);
return status;
}
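A minimal, self-contained sketch of the lifetime problem the ReleaseFence above works around. The pool, scanner and conjunct types below are toys, not the StarRocks ObjectPool/ExprContext API; the toy pool deletes objects in reverse insertion order to model the ordering the comment relies on (the instance added last is released before the cloned conjuncts):

#include <functional>
#include <iostream>
#include <vector>

// Toy pool modelling the ordering the comment relies on: the object added last
// is destroyed first, so a fence registered after the cloned conjuncts runs its
// destructor while those conjuncts are still alive.
class ToyObjectPool {
public:
    template <typename T>
    T* add(T* obj) {
        _deleters.emplace_back([obj] { delete obj; });
        return obj;
    }
    ~ToyObjectPool() {
        for (auto it = _deleters.rbegin(); it != _deleters.rend(); ++it) (*it)();
    }

private:
    std::vector<std::function<void()>> _deleters;
};

struct ToyConjunctCtx {
    ~ToyConjunctCtx() { std::cout << "conjunct context released" << std::endl; }
};

struct ToyScanner {
    ToyConjunctCtx* conjuncts = nullptr; // borrowed from the pool, not owned
    void finalize() { std::cout << "scanner finalized while conjuncts are still valid" << std::endl; }
};

struct ReleaseFence {
    ToyScanner* ptr;
    explicit ReleaseFence(ToyScanner* p) : ptr(p) {}
    ~ReleaseFence() { ptr->finalize(); }
};

int main() {
    ToyScanner scanner;
    {
        ToyObjectPool pool;
        scanner.conjuncts = pool.add(new ToyConjunctCtx());
        pool.add(new ReleaseFence(&scanner)); // added last -> destroyed first
    } // prints "scanner finalized ..." before "conjunct context released"
}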
@@ -171,6 +183,7 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
for (auto& it : _conjunct_ctxs_by_slot) {
Expr::close(it.second, runtime_state);
}

do_close(runtime_state);
_file.reset(nullptr);
_raw_file.reset(nullptr);
@@ -179,7 +192,7 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
}
}

void HdfsScanner::fianlize() {
void HdfsScanner::finalize() {
if (_runtime_state != nullptr) {
close(_runtime_state);
}
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner.h
@@ -188,7 +188,7 @@ class HdfsScanner {
void close(RuntimeState* runtime_state) noexcept;
Status get_next(RuntimeState* runtime_state, ChunkPtr* chunk);
Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
void fianlize();
void finalize();

int64_t num_bytes_read() const { return _stats.bytes_read; }
int64_t raw_rows_read() const { return _stats.raw_rows_read; }
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_orc.h
@@ -14,7 +14,7 @@ class OrcRowReaderFilter;
class HdfsOrcScanner final : public HdfsScanner {
public:
HdfsOrcScanner() = default;
~HdfsOrcScanner() override { fianlize(); }
~HdfsOrcScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_parquet.h
@@ -9,7 +9,7 @@ namespace starrocks::vectorized {
class HdfsParquetScanner final : public HdfsScanner {
public:
HdfsParquetScanner() = default;
~HdfsParquetScanner() override { fianlize(); }
~HdfsParquetScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_text.h
@@ -11,7 +11,7 @@ namespace starrocks::vectorized {
class HdfsTextScanner final : public HdfsScanner {
public:
HdfsTextScanner() = default;
~HdfsTextScanner() override { fianlize(); }
~HdfsTextScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
30 changes: 18 additions & 12 deletions be/src/formats/parquet/file_reader.cpp
@@ -168,9 +168,11 @@ Status FileReader::_read_min_max_chunk(const tparquet::RowGroup& row_group, vect
column_order = column_idx < column_orders.size() ? &column_orders[column_idx] : nullptr;
}

Status status = _decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i]);
if (!status.ok()) {
bool decode_ok = false;
RETURN_IF_ERROR(_decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i],
&decode_ok));
if (!decode_ok) {
*exist = false;
return Status::OK();
}
@@ -193,9 +195,12 @@ int FileReader::_get_partition_column_idx(const std::string& col_name) const {
Status FileReader::_decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column) {
vectorized::ColumnPtr* max_column, bool* decode_ok) {
*decode_ok = true;

if (!_can_use_min_max_stats(column_meta, column_order)) {
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
return Status::OK();
}

switch (column_meta.type) {
@@ -210,7 +215,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int32_t>::decode(column_meta.statistics.max, &max_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int32_t));
@@ -224,7 +229,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int32_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::INT64: {
int64_t min_value = 0;
@@ -237,7 +242,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int64_t>::decode(column_meta.statistics.min, &min_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int64_t));
@@ -251,7 +256,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int64_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::BYTE_ARRAY: {
Slice min_slice;
@@ -264,7 +269,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<Slice>::decode(column_meta.statistics.max, &max_slice));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_strings(std::vector<Slice>{min_slice});
@@ -278,11 +283,12 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_strings(std::vector<Slice>{max_slice});
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
default:
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
}
return Status::OK();
}

bool FileReader::_can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
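Taken together, the file_reader.cpp hunks change two things: unusable statistics are now reported through the decode_ok out-parameter (the caller sets *exist = false and skips min/max filtering for that row group) while a non-OK Status is reserved for real decode errors, and the return value of ColumnConverterFactory::create_converter is checked instead of ignored. A compact sketch of that calling convention with toy types (ToyStatus, PhysicalType and decode_min_max are hypothetical names for illustration):

#include <iostream>
#include <string>

struct ToyStatus {
    bool is_ok = true;
    std::string message;
    static ToyStatus OK() { return {}; }
    static ToyStatus Corruption(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

enum class PhysicalType { INT32, INT64, BYTE_ARRAY, OTHER };

// A non-OK status is returned only for real errors; unusable or unsupported
// statistics are reported through *decode_ok so the caller can skip filtering.
ToyStatus decode_min_max(PhysicalType file_type, bool stats_usable, bool* decode_ok) {
    *decode_ok = true;
    if (!stats_usable) {
        *decode_ok = false; // not an error: there is simply no usable min/max
        return ToyStatus::OK();
    }
    switch (file_type) {
    case PhysicalType::INT32:
    case PhysicalType::INT64:
    case PhysicalType::BYTE_ARRAY:
        // decode and, if needed, convert the min/max values here
        break;
    default:
        *decode_ok = false; // unknown physical type: skip stats, do not fail the scan
    }
    return ToyStatus::OK();
}

int main() {
    bool decode_ok = false;
    ToyStatus st = decode_min_max(PhysicalType::OTHER, /*stats_usable=*/true, &decode_ok);
    if (!st.ok()) {
        std::cout << "abort scan: " << st.message << std::endl;
        return 1;
    }
    if (!decode_ok) std::cout << "min/max stats unusable, scan without row-group filtering" << std::endl;
}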
2 changes: 1 addition & 1 deletion be/src/formats/parquet/file_reader.h
@@ -75,7 +75,7 @@ class FileReader {
static Status _decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column);
vectorized::ColumnPtr* max_column, bool* decode_ok);
static bool _can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order);
// statistics.min_value max_value
be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet
Binary file not shown.
71 changes: 71 additions & 0 deletions be/test/exec/vectorized/hdfs_scanner_test.cpp
@@ -1074,4 +1074,75 @@ TEST_F(HdfsScannerTest, TestParquetCoalesceReadAcrossRowGroup) {

scanner->close(_runtime_state);
}

// =============================================================================

/*
file: file:/Users/dirlt/Downloads/part-00000-4a878ed5-fa12-4e43-a164-1650976be336-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.version = 2.4.7
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"vin","type":"string","nullable":true,"metadata":{}},{"name":"log_domain","type":"string","nullable":true,"metadata":{}},{"name":"file_name","type":"string","nullable":true,"metadata":{}},{"name":"is_collection","type":"integer","nullable":false,"metadata":{}},{"name":"is_center","type":"integer","nullable":false,"metadata":{}},{"name":"is_cloud","type":"integer","nullable":false,"metadata":{}},{"name":"collection_time","type":"string","nullable":false,"metadata":{}},{"name":"center_time","type":"string","nullable":false,"metadata":{}},{"name":"cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_center_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_time","type":"string","nullable":false,"metadata":{}},{"name":"error_center_time","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"original_time","type":"string","nullable":false,"metadata":{}},{"name":"is_original","type":"integer","nullable":false,"metadata":{}}]}
file schema: spark_schema
--------------------------------------------------------------------------------
vin: OPTIONAL BINARY O:UTF8 R:0 D:1
log_domain: OPTIONAL BINARY O:UTF8 R:0 D:1
file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
is_collection: REQUIRED INT32 R:0 D:0
is_center: REQUIRED INT32 R:0 D:0
is_cloud: REQUIRED INT32 R:0 D:0
collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
center_time: REQUIRED BINARY O:UTF8 R:0 D:0
cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
original_time: REQUIRED BINARY O:UTF8 R:0 D:0
is_original: REQUIRED INT32 R:0 D:0
*/

TEST_F(HdfsScannerTest, TestParqueTypeMismatchDecodeMinMax) {
SlotDesc parquet_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)},
{"is_cloud", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR)},
{""}};

SlotDesc min_max_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)}, {""}};

const std::string parquet_file = "./be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet";

auto scanner = std::make_shared<HdfsParquetScanner>();
ObjectPool* pool = &_pool;
auto* range = _create_scan_range(parquet_file, 0, 0);
auto* tuple_desc = _create_tuple_desc(parquet_descs);
auto* param = _create_param(parquet_file, range, tuple_desc);

// select vin,is_cloud from table where is_cloud >= '0';
auto* min_max_tuple_desc = _create_tuple_desc(min_max_descs);
{
std::vector<TExprNode> nodes;
TExprNode lit_node = create_string_literal_node(TPrimitiveType::VARCHAR, "0");
push_binary_pred_texpr_node(nodes, TExprOpcode::GE, min_max_tuple_desc->slots()[0], TPrimitiveType::VARCHAR,
lit_node);
ExprContext* ctx = create_expr_context(pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
}

param->min_max_tuple_desc = min_max_tuple_desc;
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->prepare(_runtime_state);
ctx->open(_runtime_state);
}

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}
status = scanner->open(_runtime_state);
EXPECT_TRUE(!status.ok());
}

} // namespace starrocks::vectorized
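The test above sets up the mismatch named in the commit title: the file stores is_cloud as a required INT32 while the scan declares it as VARCHAR, and with this fix scanner->open() is expected to return an error rather than crash. The file_reader.cpp hunks suggest one way such a crash can happen: the status returned by the converter factory used to be ignored, so its output could be dereferenced even when creation failed. A toy sketch of that failure mode and the guard (all names below are hypothetical, not the actual FileReader or ColumnConverterFactory API):

#include <iostream>
#include <memory>
#include <string>

struct ToyStatus {
    bool is_ok = true;
    std::string message;
    static ToyStatus OK() { return {}; }
    static ToyStatus NotSupported(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

struct ToyConverter {
    bool need_convert = false;
};

// Toy factory: fails and leaves *out empty when the file's physical type cannot
// be converted to the slot's logical type, e.g. INT32 statistics vs a VARCHAR slot.
ToyStatus create_converter(const std::string& file_type, const std::string& slot_type,
                           std::unique_ptr<ToyConverter>* out) {
    if (file_type == "INT32" && slot_type == "VARCHAR") {
        return ToyStatus::NotSupported("no converter from INT32 to VARCHAR");
    }
    *out = std::make_unique<ToyConverter>();
    return ToyStatus::OK();
}

ToyStatus decode_min_max(const std::string& file_type, const std::string& slot_type) {
    std::unique_ptr<ToyConverter> converter;
    ToyStatus st = create_converter(file_type, slot_type, &converter);
    // Without this check the next line dereferences an empty unique_ptr when the
    // factory fails, which is the kind of crash the commit title describes.
    if (!st.ok()) return st;
    if (converter->need_convert) { /* convert the decoded min/max values */ }
    return ToyStatus::OK();
}

int main() {
    ToyStatus st = decode_min_max("INT32", "VARCHAR");
    std::cout << (st.ok() ? std::string("ok") : "error: " + st.message) << std::endl;
}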
