[BugFix] Fix crash during decoding min/max when type mismatch in parquet file #8849

Merged: 3 commits, Jul 19, 2022
Changes from 1 commit
7 changes: 6 additions & 1 deletion be/src/connector/hive_connector.cpp
@@ -77,6 +77,7 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
RETURN_IF_ERROR(Expr::create_expr_trees(&_pool, hdfs_scan_node.partition_conjuncts, &_partition_conjunct_ctxs));
_has_partition_conjuncts = true;
}

RETURN_IF_ERROR(Expr::prepare(_min_max_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::prepare(_partition_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::open(_min_max_conjunct_ctxs, state));
@@ -261,7 +262,11 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
return Status::NotSupported(msg);
}
RETURN_IF_ERROR(scanner->init(state, scanner_params));
RETURN_IF_ERROR(scanner->open(state));
Status st = scanner->open(state);
if (!st.ok()) {
auto msg = fmt::format("file = {}", native_file_path);
return st.clone_and_append(msg);
}
_scanner = scanner;
return Status::OK();
}
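Reviewer note: the open() change above only attaches the file path to the failing status before returning it. A minimal, self-contained sketch of that pattern follows; the `Status` class and the `open_scanner`/`init_scanner` helpers are hypothetical stand-ins rather than the real StarRocks types, whose `clone_and_append` this mirrors.

```cpp
// Sketch: wrap a failing status with file context before propagating it,
// so the final error message identifies the file that could not be opened.
#include <iostream>
#include <optional>
#include <string>

class Status {
public:
    static Status OK() { return Status(); }
    static Status NotSupported(std::string msg) { return Status(std::move(msg)); }
    bool ok() const { return !_msg.has_value(); }
    // Mirrors the idea of clone_and_append(): same error, extra context.
    Status clone_and_append(const std::string& ctx) const {
        return ok() ? *this : Status(*_msg + ", " + ctx);
    }
    std::string to_string() const { return _msg.value_or("OK"); }

private:
    Status() = default;
    explicit Status(std::string msg) : _msg(std::move(msg)) {}
    std::optional<std::string> _msg;
};

Status open_scanner(const std::string& /*path*/) {
    // Pretend the underlying reader rejected the file.
    return Status::NotSupported("min/max statistics not supported");
}

Status init_scanner(const std::string& native_file_path) {
    Status st = open_scanner(native_file_path);
    if (!st.ok()) {
        // Same shape as the change in _init_scanner(): keep the original
        // status and append "file = <path>" so the failing file is visible.
        return st.clone_and_append("file = " + native_file_path);
    }
    return Status::OK();
}

int main() {
    std::cout << init_scanner("/warehouse/part-00000.snappy.parquet").to_string() << "\n";
    return 0;
}
```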
15 changes: 14 additions & 1 deletion be/src/exec/vectorized/hdfs_scanner.cpp
@@ -70,6 +70,18 @@ Status HdfsScanner::init(RuntimeState* runtime_state, const HdfsScannerParams& s
RETURN_IF_ERROR(
Expr::clone_if_not_exists(_scanner_params.min_max_conjunct_ctxs, runtime_state, &_min_max_conjunct_ctxs));

// Why do we need this class? The conjuncts cloned above live in runtime_state->obj_pool().
// If execution aborts early, ~RuntimeState() is called directly and those conjuncts are released
// before this `HdfsScanner`, so `HdfsScanner::close` would see invalid pointers.
// To fix this we add this instance at the end of obj_pool: it is released before the conjuncts,
// so `HdfsScanner::close` still sees valid pointers.
struct ReleaseFence {
HdfsScanner* ptr;
ReleaseFence(HdfsScanner* p) : ptr(p) {}
~ReleaseFence() { ptr->finalize(); }
};
_runtime_state->obj_pool()->add(new ReleaseFence(this));

Status status = do_init(runtime_state, scanner_params);
return status;
}
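For readers who have not seen this fence trick before, here is a standalone sketch of why adding the fence last works, assuming the pool destroys its objects in reverse order of addition, which is the property the comment above relies on. `ToyPool`, `ToyConjunct`, `ToyScanner`, and this `ReleaseFence` are simplified stand-ins, not the real StarRocks classes.

```cpp
// Sketch: a LIFO object pool, a conjunct owned by the pool, and a fence
// registered last so its destructor runs first and can close the scanner
// while the conjunct is still alive.
#include <functional>
#include <iostream>
#include <vector>

class ToyPool {
public:
    template <typename T>
    T* add(T* obj) {
        _deleters.emplace_back([obj] { delete obj; });
        return obj;
    }
    ~ToyPool() {
        // Destroy objects in reverse order of addition.
        for (auto it = _deleters.rbegin(); it != _deleters.rend(); ++it) (*it)();
    }

private:
    std::vector<std::function<void()>> _deleters;
};

struct ToyConjunct {
    ~ToyConjunct() { std::cout << "conjunct destroyed\n"; }
};

struct ToyScanner {
    ToyConjunct* conjunct = nullptr;
    void close() { std::cout << "scanner close() runs, conjunct still alive\n"; }
};

struct ReleaseFence {
    ToyScanner* scanner;
    explicit ReleaseFence(ToyScanner* s) : scanner(s) {}
    // Added to the pool last, so this runs before the conjunct's destructor.
    ~ReleaseFence() { scanner->close(); }
};

int main() {
    ToyScanner scanner;
    ToyPool pool;
    scanner.conjunct = pool.add(new ToyConjunct()); // cloned conjunct lives in the pool
    pool.add(new ReleaseFence(&scanner));           // fence added last, destroyed first
    // When pool is destroyed it prints "scanner close() runs, ..." and then
    // "conjunct destroyed", i.e. close() sees a still-valid conjunct.
    return 0;
}
```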
@@ -171,6 +183,7 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
for (auto& it : _conjunct_ctxs_by_slot) {
Expr::close(it.second, runtime_state);
}

do_close(runtime_state);
_file.reset(nullptr);
_raw_file.reset(nullptr);
@@ -179,7 +192,7 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
}
}

void HdfsScanner::fianlize() {
void HdfsScanner::finalize() {
if (_runtime_state != nullptr) {
close(_runtime_state);
}
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner.h
@@ -188,7 +188,7 @@ class HdfsScanner {
void close(RuntimeState* runtime_state) noexcept;
Status get_next(RuntimeState* runtime_state, ChunkPtr* chunk);
Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
void fianlize();
void finalize();

int64_t num_bytes_read() const { return _stats.bytes_read; }
int64_t raw_rows_read() const { return _stats.raw_rows_read; }
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_orc.h
@@ -14,7 +14,7 @@ class OrcRowReaderFilter;
class HdfsOrcScanner final : public HdfsScanner {
public:
HdfsOrcScanner() = default;
~HdfsOrcScanner() override { fianlize(); }
~HdfsOrcScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_parquet.h
@@ -9,7 +9,7 @@ namespace starrocks::vectorized {
class HdfsParquetScanner final : public HdfsScanner {
public:
HdfsParquetScanner() = default;
~HdfsParquetScanner() override { fianlize(); }
~HdfsParquetScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_text.h
@@ -11,7 +11,7 @@ namespace starrocks::vectorized {
class HdfsTextScanner final : public HdfsScanner {
public:
HdfsTextScanner() = default;
~HdfsTextScanner() override { fianlize(); }
~HdfsTextScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
30 changes: 18 additions & 12 deletions be/src/formats/parquet/file_reader.cpp
@@ -168,9 +168,11 @@ Status FileReader::_read_min_max_chunk(const tparquet::RowGroup& row_group, vect
column_order = column_idx < column_orders.size() ? &column_orders[column_idx] : nullptr;
}

Status status = _decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i]);
if (!status.ok()) {
bool decode_ok = false;
RETURN_IF_ERROR(_decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i],
&decode_ok));
if (!decode_ok) {
*exist = false;
return Status::OK();
}
@@ -193,9 +195,12 @@ int FileReader::_get_partition_column_idx(const std::string& col_name) const {
Status FileReader::_decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column) {
vectorized::ColumnPtr* max_column, bool* decode_ok) {
*decode_ok = true;

if (!_can_use_min_max_stats(column_meta, column_order)) {
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
return Status::OK();
}

switch (column_meta.type) {
@@ -210,7 +215,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int32_t>::decode(column_meta.statistics.max, &max_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int32_t));
@@ -224,7 +229,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int32_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::INT64: {
int64_t min_value = 0;
@@ -237,7 +242,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int64_t>::decode(column_meta.statistics.min, &min_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int64_t));
@@ -251,7 +256,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int64_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::BYTE_ARRAY: {
Slice min_slice;
@@ -264,7 +269,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<Slice>::decode(column_meta.statistics.max, &max_slice));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_strings(std::vector<Slice>{min_slice});
@@ -278,11 +283,12 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_strings(std::vector<Slice>{max_slice});
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
default:
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
}
return Status::OK();
}

bool FileReader::_can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
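The switch above, from returning NotSupported to reporting through a `decode_ok` out-parameter, is what lets an unusable statistics column simply disable min/max filtering instead of failing the scan. A condensed sketch of that control flow follows; `decode_min_max` and the tiny `Status` struct are illustrative stand-ins for the real `FileReader::_decode_min_max_column` signature.

```cpp
// Sketch: "statistics not usable" is reported via a boolean out-parameter so
// the caller can skip min/max filtering, while real decode failures still
// surface as a non-OK status.
#include <iostream>
#include <string>

enum class Code { kOk, kCorruption };

struct Status {
    Code code = Code::kOk;
    std::string msg;
    bool ok() const { return code == Code::kOk; }
};

Status decode_min_max(bool stats_usable, bool stats_corrupt, bool* decode_ok) {
    *decode_ok = true;
    if (!stats_usable) {
        *decode_ok = false; // not an error: just tell the caller to skip the filter
        return Status{};
    }
    if (stats_corrupt) {
        // a genuine error: propagate it so the query fails loudly
        return Status{Code::kCorruption, "failed to decode min/max statistics"};
    }
    return Status{};
}

int main() {
    bool decode_ok = false;
    Status st = decode_min_max(/*stats_usable=*/false, /*stats_corrupt=*/false, &decode_ok);
    if (st.ok() && !decode_ok) {
        std::cout << "skip min/max filtering and scan the row group anyway\n";
    }
    return 0;
}
```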
2 changes: 1 addition & 1 deletion be/src/formats/parquet/file_reader.h
@@ -75,7 +75,7 @@ class FileReader {
static Status _decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column);
vectorized::ColumnPtr* max_column, bool* decode_ok);
static bool _can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order);
// statistics.min_value max_value
Binary file not shown.
71 changes: 71 additions & 0 deletions be/test/exec/vectorized/hdfs_scanner_test.cpp
@@ -1074,4 +1074,75 @@ TEST_F(HdfsScannerTest, TestParquetCoalesceReadAcrossRowGroup) {

scanner->close(_runtime_state);
}

// =============================================================================

/*
file: file:/Users/dirlt/Downloads/part-00000-4a878ed5-fa12-4e43-a164-1650976be336-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.version = 2.4.7
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"vin","type":"string","nullable":true,"metadata":{}},{"name":"log_domain","type":"string","nullable":true,"metadata":{}},{"name":"file_name","type":"string","nullable":true,"metadata":{}},{"name":"is_collection","type":"integer","nullable":false,"metadata":{}},{"name":"is_center","type":"integer","nullable":false,"metadata":{}},{"name":"is_cloud","type":"integer","nullable":false,"metadata":{}},{"name":"collection_time","type":"string","nullable":false,"metadata":{}},{"name":"center_time","type":"string","nullable":false,"metadata":{}},{"name":"cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_center_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_time","type":"string","nullable":false,"metadata":{}},{"name":"error_center_time","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"original_time","type":"string","nullable":false,"metadata":{}},{"name":"is_original","type":"integer","nullable":false,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
vin: OPTIONAL BINARY O:UTF8 R:0 D:1
log_domain: OPTIONAL BINARY O:UTF8 R:0 D:1
file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
is_collection: REQUIRED INT32 R:0 D:0
is_center: REQUIRED INT32 R:0 D:0
is_cloud: REQUIRED INT32 R:0 D:0
collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
center_time: REQUIRED BINARY O:UTF8 R:0 D:0
cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
original_time: REQUIRED BINARY O:UTF8 R:0 D:0
is_original: REQUIRED INT32 R:0 D:0
*/
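// Note: in the file metadata above, is_cloud is stored as REQUIRED INT32, while the slot
// descriptors below declare it as TYPE_VARCHAR; that column-type mismatch is what this
// regression test is named for.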

TEST_F(HdfsScannerTest, TestParqueTypeMismatchDecodeMinMax) {
SlotDesc parquet_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)},
{"is_cloud", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR)},
{""}};

SlotDesc min_max_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)}, {""}};

const std::string parquet_file = "./be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet";

auto scanner = std::make_shared<HdfsParquetScanner>();
ObjectPool* pool = &_pool;
auto* range = _create_scan_range(parquet_file, 0, 0);
auto* tuple_desc = _create_tuple_desc(parquet_descs);
auto* param = _create_param(parquet_file, range, tuple_desc);

// select vin,is_cloud from table where is_cloud >= '0';
auto* min_max_tuple_desc = _create_tuple_desc(min_max_descs);
{
std::vector<TExprNode> nodes;
TExprNode lit_node = create_string_literal_node(TPrimitiveType::VARCHAR, "0");
push_binary_pred_texpr_node(nodes, TExprOpcode::GE, min_max_tuple_desc->slots()[0], TPrimitiveType::VARCHAR,
lit_node);
ExprContext* ctx = create_expr_context(pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
}

param->min_max_tuple_desc = min_max_tuple_desc;
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->prepare(_runtime_state);
ctx->open(_runtime_state);
}

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}
status = scanner->open(_runtime_state);
EXPECT_TRUE(!status.ok());
}

} // namespace starrocks::vectorized