diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp
index 954a7dc2dd0c0..75342078c4120 100644
--- a/be/src/connector/hive_connector.cpp
+++ b/be/src/connector/hive_connector.cpp
@@ -77,6 +77,7 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
         RETURN_IF_ERROR(Expr::create_expr_trees(&_pool, hdfs_scan_node.partition_conjuncts, &_partition_conjunct_ctxs));
         _has_partition_conjuncts = true;
     }
+    RETURN_IF_ERROR(Expr::prepare(_min_max_conjunct_ctxs, state));
     RETURN_IF_ERROR(Expr::prepare(_partition_conjunct_ctxs, state));
     RETURN_IF_ERROR(Expr::open(_min_max_conjunct_ctxs, state));
@@ -261,7 +262,11 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
         return Status::NotSupported(msg);
     }
     RETURN_IF_ERROR(scanner->init(state, scanner_params));
-    RETURN_IF_ERROR(scanner->open(state));
+    Status st = scanner->open(state);
+    if (!st.ok()) {
+        auto msg = fmt::format("file = {}", native_file_path);
+        return st.clone_and_append(msg);
+    }
     _scanner = scanner;
     return Status::OK();
 }
diff --git a/be/src/exec/vectorized/hdfs_scanner.cpp b/be/src/exec/vectorized/hdfs_scanner.cpp
index 8711a4c8ecc53..7b646989ff658 100644
--- a/be/src/exec/vectorized/hdfs_scanner.cpp
+++ b/be/src/exec/vectorized/hdfs_scanner.cpp
@@ -66,9 +66,8 @@ Status HdfsScanner::init(RuntimeState* runtime_state, const HdfsScannerParams& s
         }
     }
 
-    // min/max conjuncts.
-    RETURN_IF_ERROR(
-            Expr::clone_if_not_exists(_scanner_params.min_max_conjunct_ctxs, runtime_state, &_min_max_conjunct_ctxs));
+    // No need to clone. It's cloned from outside.
+    _min_max_conjunct_ctxs = _scanner_params.min_max_conjunct_ctxs;
 
     Status status = do_init(runtime_state, scanner_params);
     return status;
@@ -167,10 +166,10 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
     if (!_closed.compare_exchange_strong(expect, true)) return;
     update_counter();
     Expr::close(_conjunct_ctxs, runtime_state);
-    Expr::close(_min_max_conjunct_ctxs, runtime_state);
     for (auto& it : _conjunct_ctxs_by_slot) {
         Expr::close(it.second, runtime_state);
     }
+
     do_close(runtime_state);
     _file.reset(nullptr);
     _raw_file.reset(nullptr);
@@ -179,7 +178,7 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
     }
 }
 
-void HdfsScanner::fianlize() {
+void HdfsScanner::finalize() {
     if (_runtime_state != nullptr) {
         close(_runtime_state);
     }
diff --git a/be/src/exec/vectorized/hdfs_scanner.h b/be/src/exec/vectorized/hdfs_scanner.h
index 1446d9faa6d51..cbe7f6b8b11b0 100644
--- a/be/src/exec/vectorized/hdfs_scanner.h
+++ b/be/src/exec/vectorized/hdfs_scanner.h
@@ -188,7 +188,7 @@ class HdfsScanner {
     void close(RuntimeState* runtime_state) noexcept;
     Status get_next(RuntimeState* runtime_state, ChunkPtr* chunk);
     Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
-    void fianlize();
+    void finalize();
 
     int64_t num_bytes_read() const { return _stats.bytes_read; }
     int64_t raw_rows_read() const { return _stats.raw_rows_read; }
diff --git a/be/src/exec/vectorized/hdfs_scanner_orc.h b/be/src/exec/vectorized/hdfs_scanner_orc.h
index 630b1bb92b720..6e1f1537a58ad 100644
--- a/be/src/exec/vectorized/hdfs_scanner_orc.h
+++ b/be/src/exec/vectorized/hdfs_scanner_orc.h
@@ -14,7 +14,7 @@ class OrcRowReaderFilter;
 class HdfsOrcScanner final : public HdfsScanner {
 public:
     HdfsOrcScanner() = default;
-    ~HdfsOrcScanner() override { fianlize(); }
+    ~HdfsOrcScanner() override = default;
 
     Status do_open(RuntimeState* runtime_state) override;
     void do_close(RuntimeState* runtime_state) noexcept override;
diff --git a/be/src/exec/vectorized/hdfs_scanner_parquet.h b/be/src/exec/vectorized/hdfs_scanner_parquet.h
index f9f7ddd12eed4..a1be58058fa44 100644
--- a/be/src/exec/vectorized/hdfs_scanner_parquet.h
+++ b/be/src/exec/vectorized/hdfs_scanner_parquet.h
@@ -9,7 +9,7 @@ namespace starrocks::vectorized {
 class HdfsParquetScanner final : public HdfsScanner {
 public:
     HdfsParquetScanner() = default;
-    ~HdfsParquetScanner() override { fianlize(); }
+    ~HdfsParquetScanner() override = default;
 
     Status do_open(RuntimeState* runtime_state) override;
     void do_close(RuntimeState* runtime_state) noexcept override;
diff --git a/be/src/exec/vectorized/hdfs_scanner_text.h b/be/src/exec/vectorized/hdfs_scanner_text.h
index ed5503e7a78fc..456f9795b8c26 100644
--- a/be/src/exec/vectorized/hdfs_scanner_text.h
+++ b/be/src/exec/vectorized/hdfs_scanner_text.h
@@ -11,7 +11,7 @@ namespace starrocks::vectorized {
 class HdfsTextScanner final : public HdfsScanner {
 public:
     HdfsTextScanner() = default;
-    ~HdfsTextScanner() override { fianlize(); }
+    ~HdfsTextScanner() override = default;
 
     Status do_open(RuntimeState* runtime_state) override;
     void do_close(RuntimeState* runtime_state) noexcept override;
diff --git a/be/src/exprs/expr.cpp b/be/src/exprs/expr.cpp
index 1ed1f4d154ad8..8dd3b85202db7 100644
--- a/be/src/exprs/expr.cpp
+++ b/be/src/exprs/expr.cpp
@@ -218,11 +218,6 @@ Status Expr::create_tree_from_thrift(ObjectPool* pool, const std::vector
     if (parent != nullptr) {
         parent->add_child(expr);
-    } else {
-        DCHECK(root_expr != nullptr);
-        DCHECK(ctx != nullptr);
-        *root_expr = expr;
-        *ctx = pool->add(new ExprContext(expr));
     }
     for (int i = 0; i < num_children; i++) {
         *node_idx += 1;
@@ -233,6 +228,12 @@ Status Expr::create_tree_from_thrift(ObjectPool* pool, const std::vector
+    if (parent == nullptr) {
+        DCHECK(root_expr != nullptr);
+        DCHECK(ctx != nullptr);
+        *root_expr = expr;
+        *ctx = pool->add(new ExprContext(expr));
+    }
     return Status::OK();
 }
diff --git a/be/src/formats/parquet/file_reader.cpp b/be/src/formats/parquet/file_reader.cpp
index 71283d4559729..c44eb32296a1b 100644
--- a/be/src/formats/parquet/file_reader.cpp
+++ b/be/src/formats/parquet/file_reader.cpp
@@ -168,9 +168,11 @@ Status FileReader::_read_min_max_chunk(const tparquet::RowGroup& row_group, vect
             column_order = column_idx < column_orders.size() ? &column_orders[column_idx] : nullptr;
         }
 
-        Status status = _decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
-                                               &(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i]);
-        if (!status.ok()) {
+        bool decode_ok = false;
+        RETURN_IF_ERROR(_decode_min_max_column(*field, ctx.timezone, slot->type(), *column_meta, column_order,
+                                               &(*min_chunk)->columns()[i], &(*max_chunk)->columns()[i],
+                                               &decode_ok));
+        if (!decode_ok) {
             *exist = false;
             return Status::OK();
         }
@@ -193,9 +195,12 @@ int FileReader::_get_partition_column_idx(const std::string& col_name) const {
 Status FileReader::_decode_min_max_column(const ParquetField& field, const std::string& timezone,
                                           const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
                                           const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
-                                          vectorized::ColumnPtr* max_column) {
+                                          vectorized::ColumnPtr* max_column, bool* decode_ok) {
+    *decode_ok = true;
+
     if (!_can_use_min_max_stats(column_meta, column_order)) {
-        return Status::NotSupported("min max statistics not supported");
+        *decode_ok = false;
+        return Status::OK();
     }
 
     switch (column_meta.type) {
@@ -210,7 +215,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
             RETURN_IF_ERROR(PlainDecoder<int32_t>::decode(column_meta.statistics.max, &max_value));
         }
         std::unique_ptr<ColumnConverter> converter;
-        ColumnConverterFactory::create_converter(field, type, timezone, &converter);
+        RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));
 
         if (!converter->need_convert) {
             (*min_column)->append_numbers(&min_value, sizeof(int32_t));
@@ -224,7 +229,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
             max_scr_column->append_numbers(&max_value, sizeof(int32_t));
             converter->convert(max_scr_column, max_column->get());
         }
-        return Status::OK();
+        break;
     }
     case tparquet::Type::type::INT64: {
         int64_t min_value = 0;
@@ -237,7 +242,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
             RETURN_IF_ERROR(PlainDecoder<int64_t>::decode(column_meta.statistics.min, &min_value));
         }
         std::unique_ptr<ColumnConverter> converter;
-        ColumnConverterFactory::create_converter(field, type, timezone, &converter);
+        RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));
 
         if (!converter->need_convert) {
             (*min_column)->append_numbers(&min_value, sizeof(int64_t));
@@ -251,7 +256,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
             max_scr_column->append_numbers(&max_value, sizeof(int64_t));
             converter->convert(max_scr_column, max_column->get());
         }
-        return Status::OK();
+        break;
     }
     case tparquet::Type::type::BYTE_ARRAY: {
         Slice min_slice;
@@ -264,7 +269,7 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
             RETURN_IF_ERROR(PlainDecoder<Slice>::decode(column_meta.statistics.max, &max_slice));
         }
         std::unique_ptr<ColumnConverter> converter;
-        ColumnConverterFactory::create_converter(field, type, timezone, &converter);
+        RETURN_IF_ERROR(ColumnConverterFactory::create_converter(field, type, timezone, &converter));
 
         if (!converter->need_convert) {
             (*min_column)->append_strings(std::vector<Slice>{min_slice});
@@ -278,11 +283,12 @@ Status FileReader::_decode_min_max_column(const ParquetField& field, const std::
             max_scr_column->append_strings(std::vector<Slice>{max_slice});
             converter->convert(max_scr_column, max_column->get());
         }
-        return Status::OK();
+        break;
     }
     default:
-        return Status::NotSupported("min max statistics not supported");
+        *decode_ok = false;
     }
+
+    return Status::OK();
 }
 
 bool FileReader::_can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
diff --git a/be/src/formats/parquet/file_reader.h b/be/src/formats/parquet/file_reader.h
index 27ba3b8394aa5..6004721f94d23 100644
--- a/be/src/formats/parquet/file_reader.h
+++ b/be/src/formats/parquet/file_reader.h
@@ -75,7 +75,7 @@ class FileReader {
     static Status _decode_min_max_column(const ParquetField& field, const std::string& timezone,
                                          const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
                                          const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
-                                         vectorized::ColumnPtr* max_column);
+                                         vectorized::ColumnPtr* max_column, bool* decode_ok);
     static bool _can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
                                        const tparquet::ColumnOrder* column_order);
     // statistics.min_value max_value
diff --git a/be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet b/be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet
new file mode 100644
index 0000000000000..8d9a8aba1dacd
Binary files /dev/null and b/be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet differ
diff --git a/be/test/exec/vectorized/hdfs_scanner_test.cpp b/be/test/exec/vectorized/hdfs_scanner_test.cpp
index d0310ae97ae07..0ae8788fa3e03 100644
--- a/be/test/exec/vectorized/hdfs_scanner_test.cpp
+++ b/be/test/exec/vectorized/hdfs_scanner_test.cpp
@@ -422,16 +422,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterNoRows) {
     // id min/max = 2629/5212, PART_Y min/max=20/20
     std::vector<int> thres = {20, 30, 20, 20};
     extend_mtypes_orc_min_max_conjuncts(&_pool, param, thres);
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->prepare(_runtime_state);
-        ctx->open(_runtime_state);
-    }
+    Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
+    Expr::open(param->min_max_conjunct_ctxs, _runtime_state);
 
     Status status = scanner->init(_runtime_state, *param);
     EXPECT_TRUE(status.ok());
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->close(_runtime_state);
-    }
 
     status = scanner->open(_runtime_state);
     EXPECT_TRUE(status.ok());
@@ -455,16 +450,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterRows1) {
     // id min/max = 2629/5212, PART_Y min/max=20/20
     std::vector<int> thres = {2000, 5000, 20, 20};
     extend_mtypes_orc_min_max_conjuncts(&_pool, param, thres);
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->prepare(_runtime_state);
-        ctx->open(_runtime_state);
-    }
+    Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
+    Expr::open(param->min_max_conjunct_ctxs, _runtime_state);
 
     Status status = scanner->init(_runtime_state, *param);
     EXPECT_TRUE(status.ok());
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->close(_runtime_state);
-    }
 
     status = scanner->open(_runtime_state);
     EXPECT_TRUE(status.ok());
@@ -488,16 +478,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithMinMaxFilterRows2) {
     // id min/max = 2629/5212, PART_Y min/max=20/20
     std::vector<int> thres = {3000, 10000, 20, 20};
     extend_mtypes_orc_min_max_conjuncts(&_pool, param, thres);
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->prepare(_runtime_state);
-        ctx->open(_runtime_state);
-    }
+    Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
+    Expr::open(param->min_max_conjunct_ctxs, _runtime_state);
 
     Status status = scanner->init(_runtime_state, *param);
     EXPECT_TRUE(status.ok());
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->close(_runtime_state);
-    }
 
     status = scanner->open(_runtime_state);
     EXPECT_TRUE(status.ok());
@@ -671,16 +656,11 @@ TEST_F(HdfsScannerTest, TestOrcGetNextWithDatetimeMinMaxFilter) {
         param->min_max_conjunct_ctxs.push_back(ctx);
     }
 
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->prepare(_runtime_state);
-        ctx->open(_runtime_state);
-    }
+    Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
+    Expr::open(param->min_max_conjunct_ctxs, _runtime_state);
 
     Status status = scanner->init(_runtime_state, *param);
     EXPECT_TRUE(status.ok());
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->close(_runtime_state);
-    }
 
     scanner->disable_use_orc_sargs();
     status = scanner->open(_runtime_state);
@@ -904,19 +884,13 @@ TEST_F(HdfsScannerTest, DecodeMinMaxDateTime) {
         param->min_max_conjunct_ctxs.push_back(ctx);
     }
 
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->prepare(_runtime_state);
-        ctx->open(_runtime_state);
-    }
+    Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
+    Expr::open(param->min_max_conjunct_ctxs, _runtime_state);
 
     auto scanner = std::make_shared<HdfsOrcScanner>();
     Status status = scanner->init(_runtime_state, *param);
     EXPECT_TRUE(status.ok());
-    for (ExprContext* ctx : param->min_max_conjunct_ctxs) {
-        ctx->close(_runtime_state);
-    }
-
     scanner->disable_use_orc_sargs();
     status = scanner->open(_runtime_state);
     EXPECT_TRUE(status.ok()) << status.to_string();
@@ -1074,4 +1048,72 @@ TEST_F(HdfsScannerTest, TestParquetCoalesceReadAcrossRowGroup) {
     scanner->close(_runtime_state);
 }
+
+// =============================================================================
+
+/*
+file: file:/Users/dirlt/Downloads/part-00000-4a878ed5-fa12-4e43-a164-1650976be336-c000.snappy.parquet
+creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
+extra: org.apache.spark.version = 2.4.7
+extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"vin","type":"string","nullable":true,"metadata":{}},{"name":"log_domain","type":"string","nullable":true,"metadata":{}},{"name":"file_name","type":"string","nullable":true,"metadata":{}},{"name":"is_collection","type":"integer","nullable":false,"metadata":{}},{"name":"is_center","type":"integer","nullable":false,"metadata":{}},{"name":"is_cloud","type":"integer","nullable":false,"metadata":{}},{"name":"collection_time","type":"string","nullable":false,"metadata":{}},{"name":"center_time","type":"string","nullable":false,"metadata":{}},{"name":"cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_center_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_tips","type":"string","nullable":false,"metadata":{}},{"name":"error_collection_time","type":"string","nullable":false,"metadata":{}},{"name":"error_center_time","type":"string","nullable":false,"metadata":{}},{"name":"error_cloud_time","type":"string","nullable":false,"metadata":{}},{"name":"original_time","type":"string","nullable":false,"metadata":{}},{"name":"is_original","type":"integer","nullable":false,"metadata":{}}]}
+
+file schema: spark_schema
+--------------------------------------------------------------------------------
+vin: OPTIONAL BINARY O:UTF8 R:0 D:1
+log_domain: OPTIONAL BINARY O:UTF8 R:0 D:1
+file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
+is_collection: REQUIRED INT32 R:0 D:0
+is_center: REQUIRED INT32 R:0 D:0
+is_cloud: REQUIRED INT32 R:0 D:0
+collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
+center_time: REQUIRED BINARY O:UTF8 R:0 D:0
+cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
+error_collection_tips: REQUIRED BINARY O:UTF8 R:0 D:0
+error_center_tips: REQUIRED BINARY O:UTF8 R:0 D:0
+error_cloud_tips: REQUIRED BINARY O:UTF8 R:0 D:0
+error_collection_time: REQUIRED BINARY O:UTF8 R:0 D:0
+error_center_time: REQUIRED BINARY O:UTF8 R:0 D:0
+error_cloud_time: REQUIRED BINARY O:UTF8 R:0 D:0
+original_time: REQUIRED BINARY O:UTF8 R:0 D:0
+is_original: REQUIRED INT32 R:0 D:0
+*/
+
+TEST_F(HdfsScannerTest, TestParqueTypeMismatchDecodeMinMax) {
+    SlotDesc parquet_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)},
+                                {"is_cloud", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR)},
+                                {""}};
+
+    SlotDesc min_max_descs[] = {{"vin", TypeDescriptor::from_primtive_type(PrimitiveType::TYPE_VARCHAR, 22)}, {""}};
+
+    const std::string parquet_file = "./be/test/exec/test_data/parquet_scanner/type_mismatch_decode_min_max.parquet";
+
+    auto scanner = std::make_shared<HdfsParquetScanner>();
+    ObjectPool* pool = &_pool;
+    auto* range = _create_scan_range(parquet_file, 0, 0);
+    auto* tuple_desc = _create_tuple_desc(parquet_descs);
+    auto* param = _create_param(parquet_file, range, tuple_desc);
+
+    // select vin,is_cloud from table where is_cloud >= '0';
+    auto* min_max_tuple_desc = _create_tuple_desc(min_max_descs);
+    {
+        std::vector<TExprNode> nodes;
+        TExprNode lit_node = create_string_literal_node(TPrimitiveType::VARCHAR, "0");
+        push_binary_pred_texpr_node(nodes, TExprOpcode::GE, min_max_tuple_desc->slots()[0], TPrimitiveType::VARCHAR,
+                                    lit_node);
+        ExprContext* ctx = create_expr_context(pool, nodes);
+        param->min_max_conjunct_ctxs.push_back(ctx);
+    }
+
+    param->min_max_tuple_desc = min_max_tuple_desc;
+    Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state);
+    Expr::open(param->min_max_conjunct_ctxs, _runtime_state);
+
+    Status status = scanner->init(_runtime_state, *param);
+    EXPECT_TRUE(status.ok());
+
+    status = scanner->open(_runtime_state);
+    EXPECT_TRUE(!status.ok());
+    scanner->close(_runtime_state);
+}
+
 } // namespace starrocks::vectorized