From 2f0361b23ebf379b00f6d5dabc7fc1e35f848e98 Mon Sep 17 00:00:00 2001 From: "wangxiangyu@360shuke.com" Date: Fri, 23 Sep 2022 11:52:16 +0800 Subject: [PATCH] [enhancement](load) enhance load from orc file --- be/src/exec/orc_scanner.cpp | 76 ++++++++++++++++++++++++++++++------- be/src/exec/orc_scanner.h | 7 ++++ 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 6b3384f042b38c..b8d2e91c4d8521 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -139,18 +139,6 @@ ORCScanner::~ORCScanner() { Status ORCScanner::open() { RETURN_IF_ERROR(BaseScanner::open()); - if (!_ranges.empty()) { - std::list include_cols; - TBrokerRangeDesc range = _ranges[0]; - _num_of_columns_from_file = range.__isset.num_of_columns_from_file - ? range.num_of_columns_from_file - : _src_slot_descs.size(); - for (int i = 0; i < _num_of_columns_from_file; i++) { - auto slot_desc = _src_slot_descs.at(i); - include_cols.push_back(slot_desc->col_name()); - } - _row_reader_options.include(include_cols); - } return Status::OK(); } @@ -186,8 +174,13 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* ((orc::StructVectorBatch*)_batch.get())->fields; for (int column_ipos = 0; column_ipos < _num_of_columns_from_file; ++column_ipos) { auto slot_desc = _src_slot_descs[column_ipos]; - orc::ColumnVectorBatch* cvb = batch_vec[_position_in_orc_original[column_ipos]]; + if (_map_column_to_id.find(slot_desc->col_name()) == _map_column_to_id.end()) { + // if slot not exist in file, set to null + _src_tuple->set_null(slot_desc->null_indicator_offset()); + continue; + } + orc::ColumnVectorBatch* cvb = batch_vec[_position_in_orc_original[column_ipos]]; if (cvb->hasNulls && !cvb->notNull[_current_line_of_group]) { if (!slot_desc->is_nullable()) { std::stringstream str_error; @@ -446,6 +439,24 @@ Status ORCScanner::open_next_reader() { if (_reader->getNumberOfRows() == 0) { continue; } + // build map from column name to type id + build_name_id_map(); + // set include names into read options + std::map _include_cols_in_src_slots; + std::list cols; + _num_of_columns_from_file = range.__isset.num_of_columns_from_file + ? range.num_of_columns_from_file + : _src_slot_descs.size(); + for (int i = 0; i < _num_of_columns_from_file; i++) { + auto slot_desc = _src_slot_descs.at(i); + + // get only columns exist orc file + if (_map_column_to_id.find(slot_desc->col_name()) != _map_column_to_id.end()) { + _include_cols_in_src_slots[cols.size()] = i; + cols.push_back(slot_desc->col_name()); + } + } + _row_reader_options.include(cols); _total_groups = _reader->getNumberOfStripes(); _current_group = 0; @@ -462,7 +473,9 @@ Status ORCScanner::open_next_reader() { //include columns must in reader field, otherwise createRowReader will throw exception auto pos = std::find(include_cols.begin(), include_cols.end(), _row_reader->getSelectedType().getFieldName(i)); - _position_in_orc_original.at(std::distance(include_cols.begin(), pos)) = orc_index++; + _position_in_orc_original.at( + _include_cols_in_src_slots[std::distance(include_cols.begin(), pos)]) = + orc_index++; } return Status::OK(); } @@ -475,4 +488,39 @@ void ORCScanner::close() { _row_reader.reset(nullptr); } +void ORCScanner::build_name_id_map() { + _map_column_to_id.clear(); + std::vector columns; + const orc::Type& type = _reader->getType(); + build_name_id_map_impl(columns, &type); +} + +void ORCScanner::build_name_id_map_impl(std::vector& columns, const orc::Type* type) { + if (orc::STRUCT == type->getKind()) { + for (size_t i = 0; i < type->getSubtypeCount(); ++i) { + const std::string& fieldName = type->getFieldName(i); + columns.push_back(fieldName); + _map_column_to_id[dot_column_path(columns)] = type->getSubtype(i)->getColumnId(); + build_name_id_map_impl(columns, type->getSubtype(i)); + columns.pop_back(); + } + } else { + // other non-primitive type + for (size_t j = 0; j < type->getSubtypeCount(); ++j) { + build_name_id_map_impl(columns, type->getSubtype(j)); + } + } +} + +std::string ORCScanner::dot_column_path(const std::vector& columns) { + if (columns.empty()) { + return std::string(); + } + std::ostringstream columnStream; + std::copy(columns.begin(), columns.end(), + std::ostream_iterator(columnStream, ".")); + std::string columnPath = columnStream.str(); + return columnPath.substr(0, columnPath.length() - 1); +} + } // namespace doris diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h index 86b73b6b99377e..ee2cd3b36d11ca 100644 --- a/be/src/exec/orc_scanner.h +++ b/be/src/exec/orc_scanner.h @@ -19,6 +19,7 @@ #define ORC_SCANNER_H #include +#include #include "exec/base_scanner.h" @@ -47,6 +48,11 @@ class ORCScanner : public BaseScanner { private: // Read next buffer from reader Status open_next_reader(); + // Generate column path + std::string dot_column_path(const std::vector& columns); + // Build map from column name to type id + void build_name_id_map(); + void build_name_id_map_impl(std::vector& columns, const orc::Type* type); private: const std::vector& _ranges; @@ -62,6 +68,7 @@ class ORCScanner : public BaseScanner { std::shared_ptr _batch; std::unique_ptr _reader; std::unique_ptr _row_reader; + std::map _map_column_to_id; // The batch after reading from orc data is arranged in the original order, // so we need to record the index in the original order to correspond the column names to the order std::vector _position_in_orc_original;