[BugFix] Fix crash during decoding min/max when type mismatch in parquet file #8849

Merged
merged 3 commits on Jul 19, 2022

7 changes: 6 additions & 1 deletion be/src/connector/hive_connector.cpp
@@ -77,6 +77,7 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
RETURN_IF_ERROR(Expr::create_expr_trees(&_pool, hdfs_scan_node.partition_conjuncts, &_partition_conjunct_ctxs));
_has_partition_conjuncts = true;
}

RETURN_IF_ERROR(Expr::prepare(_min_max_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::prepare(_partition_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::open(_min_max_conjunct_ctxs, state));
@@ -261,7 +262,11 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
return Status::NotSupported(msg);
}
RETURN_IF_ERROR(scanner->init(state, scanner_params));
RETURN_IF_ERROR(scanner->open(state));
Status st = scanner->open(state);
if (!st.ok()) {
auto msg = fmt::format("file = {}", native_file_path);
return st.clone_and_append(msg);
}
_scanner = scanner;
return Status::OK();
}
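The `_init_scanner` change above keeps error context when `scanner->open()` fails: the returned status now carries the native file path, so a min/max decode failure can be traced to a concrete parquet file. A minimal sketch of the same pattern; it is not standalone-compilable and reuses only calls that already appear in the hunk (`Status::ok`, `clone_and_append`, `Status::OK`, `fmt::format`), while the wrapper name is hypothetical:

```cpp
// Sketch only: annotate an open() failure with the file being opened so the
// error message names the offending parquet file.
template <typename ScannerPtr>
Status open_scanner_or_annotate(ScannerPtr scanner, RuntimeState* state, const std::string& path) {
    Status st = scanner->open(state);
    if (!st.ok()) {
        return st.clone_and_append(fmt::format("file = {}", path));
    }
    return Status::OK();
}
```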
9 changes: 4 additions & 5 deletions be/src/exec/vectorized/hdfs_scanner.cpp
@@ -66,9 +66,8 @@ Status HdfsScanner::init(RuntimeState* runtime_state, const HdfsScannerParams& s
}
}

// min/max conjuncts.
RETURN_IF_ERROR(
Expr::clone_if_not_exists(_scanner_params.min_max_conjunct_ctxs, runtime_state, &_min_max_conjunct_ctxs));
// No need to clone. It's cloned from outside.
_min_max_conjunct_ctxs = _scanner_params.min_max_conjunct_ctxs;

Status status = do_init(runtime_state, scanner_params);
return status;
@@ -167,10 +166,10 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
if (!_closed.compare_exchange_strong(expect, true)) return;
update_counter();
Expr::close(_conjunct_ctxs, runtime_state);
Expr::close(_min_max_conjunct_ctxs, runtime_state);
for (auto& it : _conjunct_ctxs_by_slot) {
Expr::close(it.second, runtime_state);
}

do_close(runtime_state);
_file.reset(nullptr);
_raw_file.reset(nullptr);
@@ -179,7 +178,7 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
}
}

void HdfsScanner::fianlize() {
void HdfsScanner::finalize() {
if (_runtime_state != nullptr) {
close(_runtime_state);
}
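Together with the hive_connector.cpp hunk above, this moves ownership of the min/max conjunct contexts out of the scanner: `HiveDataSource` prepares and opens them once (and stays responsible for closing them), while each `HdfsScanner` only copies the pointer vector and no longer clones or closes it, which is why the `Expr::clone_if_not_exists` and `Expr::close` calls disappear here. A compressed sketch of that lifecycle with stand-in owner/borrower classes; the `Expr::prepare/open/close` calls mirror the diff, everything else (class names, members) is illustrative and assumes the codebase's `Status`/`RETURN_IF_ERROR` helpers:

```cpp
#include <vector>

class RuntimeState;
class ExprContext;
struct Expr {  // stand-in for the static helpers used in the diff
    static Status prepare(const std::vector<ExprContext*>&, RuntimeState*);
    static Status open(const std::vector<ExprContext*>&, RuntimeState*);
    static void close(const std::vector<ExprContext*>&, RuntimeState*);
};

struct DataSource {  // owner, cf. HiveDataSource
    std::vector<ExprContext*> min_max_conjunct_ctxs;
    Status open(RuntimeState* state) {
        RETURN_IF_ERROR(Expr::prepare(min_max_conjunct_ctxs, state));
        return Expr::open(min_max_conjunct_ctxs, state);
    }
    void close(RuntimeState* state) { Expr::close(min_max_conjunct_ctxs, state); }
};

struct Scanner {  // borrower, cf. HdfsScanner
    std::vector<ExprContext*> min_max_conjunct_ctxs;
    void init(const DataSource& source) {
        min_max_conjunct_ctxs = source.min_max_conjunct_ctxs;  // shared, not cloned
    }
    void close(RuntimeState* /*state*/) {
        // Intentionally does not touch min_max_conjunct_ctxs; the owner closes them.
    }
};
```

The same shift shows up in the test changes further down, where per-context `prepare/open/close` loops collapse into single `Expr::prepare`/`Expr::open` calls with no matching close inside the test body.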
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner.h
@@ -188,7 +188,7 @@ class HdfsScanner {
void close(RuntimeState* runtime_state) noexcept;
Status get_next(RuntimeState* runtime_state, ChunkPtr* chunk);
Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
void fianlize();
void finalize();

int64_t num_bytes_read() const { return _stats.bytes_read; }
int64_t raw_rows_read() const { return _stats.raw_rows_read; }
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_orc.h
@@ -14,7 +14,7 @@ class OrcRowReaderFilter;
class HdfsOrcScanner final : public HdfsScanner {
public:
HdfsOrcScanner() = default;
~HdfsOrcScanner() override { fianlize(); }
~HdfsOrcScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_parquet.h
@@ -9,7 +9,7 @@ namespace starrocks::vectorized {
class HdfsParquetScanner final : public HdfsScanner {
public:
HdfsParquetScanner() = default;
~HdfsParquetScanner() override { fianlize(); }
~HdfsParquetScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/hdfs_scanner_text.h
@@ -11,7 +11,7 @@ namespace starrocks::vectorized {
class HdfsTextScanner final : public HdfsScanner {
public:
HdfsTextScanner() = default;
~HdfsTextScanner() override { fianlize(); }
~HdfsTextScanner() override = default;

Status do_open(RuntimeState* runtime_state) override;
void do_close(RuntimeState* runtime_state) noexcept override;
11 changes: 6 additions & 5 deletions be/src/exprs/expr.cpp
@@ -218,11 +218,6 @@ Status Expr::create_tree_from_thrift(ObjectPool* pool, const std::vector<TExprNo
DCHECK(expr != nullptr);
if (parent != nullptr) {
parent->add_child(expr);
} else {
DCHECK(root_expr != nullptr);
DCHECK(ctx != nullptr);
*root_expr = expr;
*ctx = pool->add(new ExprContext(expr));
}
for (int i = 0; i < num_children; i++) {
*node_idx += 1;
@@ -233,6 +228,12 @@ Status Expr::create_tree_from_thrift(ObjectPool* pool, const std::vector<TExprNo
return Status::InternalError("Failed to reconstruct expression tree from thrift.");
}
}
if (parent == nullptr) {
DCHECK(root_expr != nullptr);
DCHECK(ctx != nullptr);
*root_expr = expr;
*ctx = pool->add(new ExprContext(expr));
}
return Status::OK();
}

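The reordering in `create_tree_from_thrift` matters because the function recurses into child nodes and can fail partway through: previously `*root_expr` and `*ctx` were published before the children were built, so a failed reconstruction could leave the caller holding an `ExprContext` over a half-built tree. Assigning them only after the child loop succeeds avoids that. A condensed sketch of the fixed control flow, with a simplified signature and node layout (the real Thrift structures and helper names differ):

```cpp
// Condensed sketch: publish the root only after every child is reconstructed.
Status create_tree(ObjectPool* pool, const std::vector<TExprNode>& nodes, Expr* parent,
                   int* node_idx, Expr** root_expr, ExprContext** ctx) {
    Expr* expr = nullptr;
    RETURN_IF_ERROR(create_expr(pool, nodes[*node_idx], &expr));  // hypothetical helper
    if (parent != nullptr) {
        parent->add_child(expr);
    }
    const int num_children = nodes[*node_idx].num_children;
    for (int i = 0; i < num_children; ++i) {
        *node_idx += 1;
        if (*node_idx >= static_cast<int>(nodes.size())) {
            return Status::InternalError("Failed to reconstruct expression tree from thrift.");
        }
        RETURN_IF_ERROR(create_tree(pool, nodes, expr, node_idx, root_expr, ctx));
    }
    if (parent == nullptr) {  // moved after the loop: only reached for a fully built tree
        *root_expr = expr;
        *ctx = pool->add(new ExprContext(expr));
    }
    return Status::OK();
}
```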
30 changes: 18 additions & 12 deletions be/src/formats/parquet/file_reader.cpp
@@ -168,9 +168,11 @@ Status FileReader::_read_min_max_chunk(const tparquet::RowGroup& row_group, vect
column_order = column_idx < column_orders.size() ? &column_orders[column_idx] : nullptr;
}

Status status = _decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i]);
if (!status.ok()) {
bool decode_ok = false;
RETURN_IF_ERROR(_decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
&(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i],
&decode_ok));
if (!decode_ok) {
*exist = false;
return Status::OK();
}
@@ -193,9 +195,12 @@ int FileReader::_get_partition_column_idx(const std::string& col_name) const {
Status FileReader::_decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column) {
vectorized::ColumnPtr* max_column, bool* decode_ok) {
*decode_ok = true;

if (!_can_use_min_max_stats(column_meta, column_order)) {
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
return Status::OK();
}

switch (column_meta.type) {
Expand All @@ -210,7 +215,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int32_t>::decode(column_meta.statistics.max, &max_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int32_t));
@@ -224,7 +229,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int32_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::INT64: {
int64_t min_value = 0;
@@ -237,7 +242,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<int64_t>::decode(column_meta.statistics.min, &min_value));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_numbers(&min_value, sizeof(int64_t));
@@ -251,7 +256,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_numbers(&max_value, sizeof(int64_t));
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
case tparquet::Type::type::BYTE_ARRAY: {
Slice min_slice;
@@ -264,7 +269,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
RETURN_IF_ERROR(PlainDecoder<Slice>::decode(column_meta.statistics.max, &max_slice));
}
std::unique_ptr<ColumnConverter> converter;
ColumnConverterFactory::create_converter(field, type, timezone, &converter);
RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));

if (!converter->need_convert) {
(*min_column)->append_strings(std::vector<Slice>{min_slice});
@@ -278,11 +283,12 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
max_scr_column->append_strings(std::vector<Slice>{max_slice});
converter->convert(max_scr_column, max_column->get());
}
return Status::OK();
break;
}
default:
return Status::NotSupported("min max statistics not supported");
*decode_ok = false;
}
return Status::OK();
}

bool FileReader::_can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
2 changes: 1 addition & 1 deletion be/src/formats/parquet/file_reader.h
@@ -75,7 +75,7 @@ class FileReader {
static Status _decode_min_max_column(const ParquetField& field, const std::string& timezone,
const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
vectorized::ColumnPtr* max_column);
vectorized::ColumnPtr* max_column, bool* decode_ok);
static bool _can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
const tparquet::ColumnOrder* column_order);
// statistics.min_value max_value
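The likely crash path this fixes is visible in the hunks above: `ColumnConverterFactory::create_converter` was called without checking its status, so a type mismatch between the parquet column and the table schema could leave `converter` unusable and the subsequent `converter->need_convert` dereferenced it. After the change, converter and decode failures propagate through `RETURN_IF_ERROR`, while "these statistics cannot be used at all" (the `_can_use_min_max_stats` check or an unsupported physical type) is reported through a `decode_ok` out-parameter instead of a NotSupported status; the caller then simply disables min/max filtering for the row group. A condensed sketch of that caller/callee contract (simplified arguments; the real signatures are in file_reader.h above):

```cpp
// Callee: *decode_ok says whether the stats are usable; Status is reserved for real errors.
Status decode_min_max_column(const tparquet::ColumnMetaData& column_meta, bool* decode_ok) {
    *decode_ok = true;
    if (!can_use_min_max_stats(column_meta)) {
        *decode_ok = false;      // stats present but not trustworthy for this column
        return Status::OK();     // skip the filter instead of failing the scan
    }
    switch (column_meta.type) {
    case tparquet::Type::type::INT32:
    case tparquet::Type::type::INT64:
    case tparquet::Type::type::BYTE_ARRAY:
        // RETURN_IF_ERROR(...) around PlainDecoder::decode and create_converter,
        // so a genuine decode/convert failure still surfaces as an error.
        break;
    default:
        *decode_ok = false;      // physical type we cannot decode: skip the filter
    }
    return Status::OK();
}

// Caller (cf. _read_min_max_chunk): a failed decode means "no min/max chunk", so the
// row group is read unfiltered instead of crashing or erroring out.
Status read_min_max_chunk(const tparquet::ColumnMetaData& column_meta, bool* exist) {
    bool decode_ok = false;
    RETURN_IF_ERROR(decode_min_max_column(column_meta, &decode_ok));
    if (!decode_ok) {
        *exist = false;
        return Status::OK();
    }
    *exist = true;
    return Status::OK();
}
```

The new TestParqueTypeMismatchDecodeMinMax test further down exercises this path: `is_cloud` is stored as INT32 in the file but read as VARCHAR, and `scanner->open()` is now expected to fail with a proper Status instead of crashing.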
Binary file not shown.
114 changes: 78 additions & 36 deletions be/test/exec/vectorized/hdfs_scanner_test.cpp
@@ -422,16 +422,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterNoRows) {
// id min/max = 2629/5212, PART_Y min/max=20/20
std::vector<int> thres = {20, 30, 20, 20};
extend_mtypes_orc_min_max_conjuncts(&_pool, param, thres);
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->prepare(_runtime_state);
ctx->open(_runtime_state);
}
Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
Expr::open(param->min_max_conjunct_ctxs, _runtime_state);

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}

status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
@@ -455,16 +450,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterRows1) {
// id min/max = 2629/5212, PART_Y min/max=20/20
std::vector<int> thres = {2000, 5000, 20, 20};
extend_mtypes_orc_min_max_conjuncts(&_pool, param, thres);
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->prepare(_runtime_state);
ctx->open(_runtime_state);
}
Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
Expr::open(param->min_max_conjunct_ctxs, _runtime_state);

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}

status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
@@ -488,16 +478,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterRows2) {
// id min/max = 2629/5212, PART_Y min/max=20/20
std::vector<int> thres = {3000, 10000, 20, 20};
extend_mtypes_orc_min_max_conjuncts(&_pool, param, thres);
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->prepare(_runtime_state);
ctx->open(_runtime_state);
}
Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
Expr::open(param->min_max_conjunct_ctxs, _runtime_state);

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}

status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
@@ -671,16 +656,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithDatetimeMinMaxFilter) {
param->min_max_conjunct_ctxs.push_back(ctx);
}

for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->prepare(_runtime_state);
ctx->open(_runtime_state);
}
Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
Expr::open(param->min_max_conjunct_ctxs, _runtime_state);

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}

scanner->disable_use_orc_sargs();
status = scanner->open(_runtime_state);
@@ -904,19 +884,13 @@ TEST_F(HdfsScannerTest, DecodeMinMaxDateTime) {
param->min_max_conjunct_ctxs.push_back(ctx);
}

for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->prepare(_runtime_state);
ctx->open(_runtime_state);
}
Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
Expr::open(param->min_max_conjunct_ctxs, _runtime_state);

auto scanner = std::make_shared<HdfsOrcScanner>();
Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());

for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
ctx->close(_runtime_state);
}

scanner->disable_use_orc_sargs();
status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok()) << status.to_string();
@@ -1074,4 +1048,72 @@ TEST_F(HdfsScannerTest, TestParquetCoalesceReadAcrossRowGroup) {

scanner->close(_runtime_state);
}

// =============================================================================

/*
file: file:/Users/dirlt/Downloads/part-00000-4a878ed5-fa12-4e43-a164-1650976be336-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.version = 2.4.7
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"vin","type":"string","nullable":true,"metadata":{}},{"name":"log_domain","type":"string","nullable":true,"metadata":{}},{"name":"file_name","type":"string","nullable":true,"metadata":{}},{"name":"is_collection","type":"integer","nullable":false,"metadata":{}},{"name":"is_center","type":"integer","nullable":false,"metadata":{}},{"name":"is_cloud","type":"integer","nullable":false,"metadata":{}},{"name":"collection_time","type":"string","nullable":false,"metadata":{}},{"name":"center_time","type":"string","nullable":false,"metadata":{}},{"name":"cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_center_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_time","type":"string","nullable":false,"metadata":{}},{"name":"error_center_time","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"original_time","type":"string","nullable":false,"metadata":{}},{"name":"is_original","type":"integer","nullable":false,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
vin: OPTIONAL BINARY O:UTF8 R:0 D:1
log_domain: OPTIONAL BINARY O:UTF8 R:0 D:1
file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
is_collection: REQUIRED INT32 R:0 D:0
is_center: REQUIRED INT32 R:0 D:0
is_cloud: REQUIRED INT32 R:0 D:0
collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
center_time: REQUIRED BINARY O:UTF8 R:0 D:0
cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_tips: REQUIRED BINARY O:UTF8 R:0 D:0
error_collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_center_time: REQUIRED BINARY O:UTF8 R:0 D:0
error_cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
original_time: REQUIRED BINARY O:UTF8 R:0 D:0
is_original: REQUIRED INT32 R:0 D:0
*/

TEST_F(HdfsScannerTest, TestParqueTypeMismatchDecodeMinMax) {
SlotDesc parquet_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)},
{"is_cloud", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR)},
{""}};

SlotDesc min_max_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)}, {""}};

const std::string parquet_file = "./be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet";

auto scanner = std::make_shared<HdfsParquetScanner>();
ObjectPool* pool = &_pool;
auto* range = _create_scan_range(parquet_file, 0, 0);
auto* tuple_desc = _create_tuple_desc(parquet_descs);
auto* param = _create_param(parquet_file, range, tuple_desc);

// select vin,is_cloud from table where is_cloud >= '0';
auto* min_max_tuple_desc = _create_tuple_desc(min_max_descs);
{
std::vector<TExprNode> nodes;
TExprNode lit_node = create_string_literal_node(TPrimitiveType::VARCHAR, "0");
push_binary_pred_texpr_node(nodes, TExprOpcode::GE, min_max_tuple_desc->slots()[0], TPrimitiveType::VARCHAR,
lit_node);
ExprContext* ctx = create_expr_context(pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
}

param->min_max_tuple_desc = min_max_tuple_desc;
Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
Expr::open(param->min_max_conjunct_ctxs, _runtime_state);

Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());

status = scanner->open(_runtime_state);
EXPECT_TRUE(!status.ok());
scanner->close(_runtime_state);
}

} // namespace starrocks::vectorized