diff --git a/be/src/format/new_orc/orc_reader.cpp b/be/src/format/new_orc/orc_reader.cpp new file mode 100644 index 00000000000000..de02f816fee6a1 --- /dev/null +++ b/be/src/format/new_orc/orc_reader.cpp @@ -0,0 +1,536 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/new_orc/orc_reader.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/cast_set.h" +#include "common/consts.h" +#include "common/exception.h" +#include "core/block/block.h" +#include "core/column/column_decimal.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_date_or_datetime_v2.h" +#include "core/data_type/data_type_date_time.h" +#include "core/data_type/data_type_decimal.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "core/types.h" +#include "core/value/vdatetime_value.h" +#include "io/fs/file_reader.h" +#include "runtime/exec_env.h" +#include "util/slice.h" + +namespace doris::new_orc { +namespace { + +constexpr uint64_t DEFAULT_ORC_READ_BATCH_SIZE = 4096; +constexpr uint64_t DEFAULT_ORC_NATURAL_READ_SIZE = 128 * 1024; +constexpr int DECIMAL_PRECISION_FOR_HIVE11 = BeConsts::MAX_DECIMAL128_PRECISION; +constexpr int DECIMAL_SCALE_FOR_HIVE11 = 10; + +class DorisOrcInputStream final : public ::orc::InputStream { +public: + DorisOrcInputStream(std::string file_name, io::FileReaderSPtr file_reader, + io::IOContext* io_ctx) + : _file_name(std::move(file_name)), + _file_reader(std::move(file_reader)), + _io_ctx(io_ctx) {} + + uint64_t getLength() const override { return _file_reader->size(); } + + uint64_t getNaturalReadSize() const override { return DEFAULT_ORC_NATURAL_READ_SIZE; } + + void read(void* buf, uint64_t length, uint64_t offset) override { + uint64_t bytes_read = 0; + auto* out = static_cast(buf); + while (bytes_read < length) { + size_t loop_read = 0; + Status st = _file_reader->read_at( + static_cast(offset + bytes_read), + Slice(out + bytes_read, static_cast(length - bytes_read)), &loop_read, + _io_ctx); + if (!st.ok()) { + throw ::orc::ParseError("Failed to read " + _file_name + ": " + + st.to_string_no_stack()); + } + if (loop_read == 0) { + break; + } + bytes_read += loop_read; + } + if (bytes_read != length) { + throw ::orc::ParseError("Short read from " + _file_name); + } + } + + const std::string& getName() const override { return _file_name; } + +private: + std::string _file_name; + io::FileReaderSPtr _file_reader; + io::IOContext* _io_ctx = nullptr; +}; + +template +Status append_numeric_values(MutableColumnPtr& column, const ::orc::ColumnVectorBatch& batch, + size_t rows) { + const auto* orc_batch = dynamic_cast(&batch); + if (orc_batch == nullptr) { + return Status::InternalError("Unexpected ORC numeric batch type {}", batch.toString()); + } + auto& data = assert_cast(*column).get_data(); + const size_t old_size = data.size(); + data.resize(old_size + rows); + for (size_t row = 0; row < rows; ++row) { + data[old_size + row] = static_cast(orc_batch->data[row]); + } + return Status::OK(); +} + +template +Status append_floating_values(MutableColumnPtr& column, const ::orc::ColumnVectorBatch& batch, + size_t rows) { + const auto* orc_batch = dynamic_cast(&batch); + if (orc_batch == nullptr) { + return Status::InternalError("Unexpected ORC floating batch type {}", batch.toString()); + } + auto& data = assert_cast(*column).get_data(); + const size_t old_size = data.size(); + data.resize(old_size + rows); + for (size_t row = 0; row < rows; ++row) { + data[old_size + row] = static_cast(orc_batch->data[row]); + } + return Status::OK(); +} + +size_t trim_right_spaces(const char* value, size_t length) { + while (length > 0 && value[length - 1] == ' ') { + --length; + } + return length; +} + +Int128 to_int128(::orc::Int128 value) { + const auto high_bits = static_cast<__uint128_t>(static_cast(value.getHighBits())); + const auto low_bits = static_cast<__uint128_t>(value.getLowBits()); + return static_cast((high_bits << 64) | low_bits); +} + +} // namespace + +struct OrcReaderScanState { + std::unique_ptr<::orc::Reader> reader; + const ::orc::Type* root_type = nullptr; + ::orc::ReaderMetrics reader_metrics; + ::orc::RowReaderOptions row_reader_options; + std::unique_ptr<::orc::RowReader> row_reader; + const ::orc::Type* selected_type = nullptr; + std::unique_ptr<::orc::ColumnVectorBatch> batch; + std::vector read_columns; + std::map column_to_selected_batch_index; + bool row_reader_created = false; +}; + +OrcReader::OrcReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile) + : FileReader(system_properties, file_description, io_ctx, profile) {} + +OrcReader::~OrcReader() = default; + +Status OrcReader::init(RuntimeState* state) { + RETURN_IF_ERROR(reader::FileReader::init(state)); + _state = std::make_unique(); + + ::orc::ReaderOptions options; + options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool()); + options.setReaderMetrics(&_state->reader_metrics); + + auto input_stream = std::make_unique(_file_description->path, + _tracing_file_reader, _io_ctx.get()); + try { + _state->reader = ::orc::createReader(std::move(input_stream), options); + _state->root_type = &_state->reader->getType(); + } catch (const std::exception& e) { + return Status::InternalError("Failed to open ORC file {}: {}", _file_description->path, + e.what()); + } + return Status::OK(); +} + +DataTypePtr OrcReader::_convert_to_doris_type(const ::orc::Type& type) const { + DataTypePtr data_type; + switch (type.getKind()) { + case ::orc::TypeKind::BOOLEAN: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::BYTE: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::SHORT: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::INT: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::LONG: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::FLOAT: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::DOUBLE: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::STRING: + case ::orc::TypeKind::BINARY: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::VARCHAR: + data_type = std::make_shared(cast_set(type.getMaximumLength()), + PrimitiveType::TYPE_VARCHAR); + break; + case ::orc::TypeKind::CHAR: + data_type = std::make_shared(cast_set(type.getMaximumLength()), + PrimitiveType::TYPE_CHAR); + break; + case ::orc::TypeKind::DATE: + data_type = std::make_shared(); + break; + case ::orc::TypeKind::TIMESTAMP: + case ::orc::TypeKind::TIMESTAMP_INSTANT: + data_type = std::make_shared(6); + break; + case ::orc::TypeKind::DECIMAL: + data_type = std::make_shared>( + type.getPrecision() == 0 ? DECIMAL_PRECISION_FOR_HIVE11 + : cast_set(type.getPrecision()), + type.getPrecision() == 0 ? DECIMAL_SCALE_FOR_HIVE11 + : cast_set(type.getScale())); + break; + default: + throw doris::Exception( + Status::NotSupported("ORC type {} is not supported by new ORC reader", + static_cast(type.getKind()))); + } + return make_nullable(data_type); +} + +Status OrcReader::_fill_schema_field(const ::orc::Type& root_type, uint64_t child_idx, + reader::SchemaField* field) const { + const auto* child = root_type.getSubtype(child_idx); + DORIS_CHECK(child != nullptr); + field->id = static_cast(child_idx); + field->name = root_type.getFieldName(child_idx); + try { + field->type = _convert_to_doris_type(*child); + } catch (const doris::Exception& e) { + return e.to_status(); + } + field->children.clear(); + return Status::OK(); +} + +Status OrcReader::get_schema(std::vector* file_schema) const { + if (file_schema == nullptr) { + return Status::InvalidArgument("file_schema is null"); + } + if (_state == nullptr || _state->root_type == nullptr) { + return Status::Uninitialized("OrcReader is not open"); + } + if (_state->root_type->getKind() != ::orc::TypeKind::STRUCT) { + return Status::NotSupported("ORC reader only supports top-level struct schema"); + } + file_schema->clear(); + file_schema->reserve(_state->root_type->getSubtypeCount()); + for (uint64_t child_idx = 0; child_idx < _state->root_type->getSubtypeCount(); ++child_idx) { + reader::SchemaField field; + RETURN_IF_ERROR(_fill_schema_field(*_state->root_type, child_idx, &field)); + file_schema->push_back(std::move(field)); + } + return Status::OK(); +} + +Status OrcReader::open(std::unique_ptr& request) { + if (_state == nullptr || _state->reader == nullptr || _state->root_type == nullptr) { + return Status::Uninitialized("OrcReader is not open"); + } + RETURN_IF_ERROR(reader::FileReader::open(request)); + if (!_request->expression_filters.empty() || !_request->column_predicate_filters.empty() || + !_request->reader_expression_map.empty()) { + return Status::NotSupported( + "New ORC reader does not support file-local filters or reader expressions"); + } + if (!_request->complex_projections.empty()) { + return Status::NotSupported("New ORC reader does not support complex projections"); + } + + if (_request->column_positions.empty()) { + for (const auto file_column_id : _request->predicate_columns) { + _request->column_positions.emplace(file_column_id, file_column_id); + } + for (const auto file_column_id : _request->non_predicate_columns) { + _request->column_positions.emplace(file_column_id, file_column_id); + } + } + + _state->read_columns.clear(); + _state->read_columns.reserve(_request->predicate_columns.size() + + _request->non_predicate_columns.size()); + _state->read_columns.insert(_state->read_columns.end(), _request->predicate_columns.begin(), + _request->predicate_columns.end()); + _state->read_columns.insert(_state->read_columns.end(), _request->non_predicate_columns.begin(), + _request->non_predicate_columns.end()); + + std::sort(_state->read_columns.begin(), _state->read_columns.end()); + _state->read_columns.erase( + std::unique(_state->read_columns.begin(), _state->read_columns.end()), + _state->read_columns.end()); + + const auto num_fields = static_cast(_state->root_type->getSubtypeCount()); + std::list include_columns; + for (const auto file_column_id : _state->read_columns) { + DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields); + DORIS_CHECK(_request->column_positions.count(file_column_id) > 0); + include_columns.push_back(static_cast(file_column_id)); + } + _state->row_reader_options.include(include_columns); + _state->row_reader_options.setTimezoneName("UTC"); + _state->row_reader_options.setEnableLazyDecoding(false); + _state->row_reader_options.setUseTightNumericVector(false); + RETURN_IF_ERROR(_create_row_reader()); + _eof = _state->reader->getNumberOfRows() == 0; + return Status::OK(); +} + +Status OrcReader::_create_row_reader() { + try { + _state->row_reader = _state->reader->createRowReader(_state->row_reader_options); + _state->selected_type = &_state->row_reader->getSelectedType(); + DORIS_CHECK(_state->selected_type->getKind() == ::orc::TypeKind::STRUCT); + _state->batch = _state->row_reader->createRowBatch(DEFAULT_ORC_READ_BATCH_SIZE); + _state->column_to_selected_batch_index.clear(); + for (uint64_t selected_idx = 0; selected_idx < _state->selected_type->getSubtypeCount(); + ++selected_idx) { + const auto field_name = _state->selected_type->getFieldName(selected_idx); + for (const auto file_column_id : _state->read_columns) { + if (field_name == _state->root_type->getFieldName(file_column_id)) { + _state->column_to_selected_batch_index.emplace( + file_column_id, static_cast(selected_idx)); + break; + } + } + } + DORIS_CHECK(_state->column_to_selected_batch_index.size() == _state->read_columns.size()); + _state->row_reader_created = true; + } catch (const std::exception& e) { + return Status::InternalError("Failed to create ORC row reader: {}", e.what()); + } + return Status::OK(); +} + +Status OrcReader::_decode_column(const ::orc::Type& type, const ::orc::ColumnVectorBatch& batch, + MutableColumnPtr& column, size_t rows) const { + DORIS_CHECK(column->is_nullable()); + auto& nullable_column = assert_cast(*column); + auto nested_column = nullable_column.get_nested_column_ptr(); + auto& null_map = nullable_column.get_null_map_data(); + const size_t old_size = null_map.size(); + null_map.resize(old_size + rows); + if (batch.hasNulls) { + for (size_t row = 0; row < rows; ++row) { + null_map[old_size + row] = !batch.notNull[row]; + } + } else { + std::memset(null_map.data() + old_size, 0, rows); + } + + switch (type.getKind()) { + case ::orc::TypeKind::BOOLEAN: + return append_numeric_values(nested_column, + batch, rows); + case ::orc::TypeKind::BYTE: + return append_numeric_values(nested_column, batch, + rows); + case ::orc::TypeKind::SHORT: + return append_numeric_values(nested_column, + batch, rows); + case ::orc::TypeKind::INT: + return append_numeric_values(nested_column, + batch, rows); + case ::orc::TypeKind::LONG: + return append_numeric_values(nested_column, + batch, rows); + case ::orc::TypeKind::FLOAT: + return append_floating_values(nested_column, batch, rows); + case ::orc::TypeKind::DOUBLE: + return append_floating_values(nested_column, batch, rows); + case ::orc::TypeKind::STRING: + case ::orc::TypeKind::BINARY: + case ::orc::TypeKind::VARCHAR: + case ::orc::TypeKind::CHAR: { + const auto* orc_batch = dynamic_cast(&batch); + if (orc_batch == nullptr) { + return Status::InternalError("Unexpected ORC string batch type {}", batch.toString()); + } + auto& string_column = assert_cast(*nested_column); + static constexpr const char* EMPTY_STRING = ""; + for (size_t row = 0; row < rows; ++row) { + if (batch.hasNulls && !batch.notNull[row]) { + string_column.insert_data(EMPTY_STRING, 0); + continue; + } + auto length = static_cast(orc_batch->length[row]); + if (type.getKind() == ::orc::TypeKind::CHAR) { + length = trim_right_spaces(orc_batch->data[row], length); + } + string_column.insert_data(length == 0 ? EMPTY_STRING : orc_batch->data[row], length); + } + return Status::OK(); + } + case ::orc::TypeKind::DATE: { + const auto* orc_batch = dynamic_cast(&batch); + if (orc_batch == nullptr) { + return Status::InternalError("Unexpected ORC date batch type {}", batch.toString()); + } + auto& data = assert_cast(*nested_column).get_data(); + const size_t old_data_size = data.size(); + data.resize(old_data_size + rows); + auto& date_dict = date_day_offset_dict::get(); + for (size_t row = 0; row < rows; ++row) { + data[old_data_size + row] = date_dict[cast_set(orc_batch->data[row])]; + } + return Status::OK(); + } + case ::orc::TypeKind::TIMESTAMP: + case ::orc::TypeKind::TIMESTAMP_INSTANT: { + const auto* orc_batch = dynamic_cast(&batch); + if (orc_batch == nullptr) { + return Status::InternalError("Unexpected ORC timestamp batch type {}", + batch.toString()); + } + auto& data = assert_cast(*nested_column).get_data(); + const size_t old_data_size = data.size(); + data.resize(old_data_size + rows); + for (size_t row = 0; row < rows; ++row) { + auto& value = + reinterpret_cast&>(data[old_data_size + row]); + value.from_unixtime(orc_batch->data[row], cctz::utc_time_zone()); + value.set_microsecond(cast_set(orc_batch->nanoseconds[row] / 1000)); + } + return Status::OK(); + } + case ::orc::TypeKind::DECIMAL: { + auto& data = assert_cast(*nested_column).get_data(); + const size_t old_data_size = data.size(); + data.resize(old_data_size + rows); + if (const auto* decimal64_batch = dynamic_cast(&batch); + decimal64_batch != nullptr) { + for (size_t row = 0; row < rows; ++row) { + data[old_data_size + row] = Decimal128V3(decimal64_batch->values[row]); + } + return Status::OK(); + } + const auto* decimal128_batch = dynamic_cast(&batch); + if (decimal128_batch == nullptr) { + return Status::InternalError("Unexpected ORC decimal batch type {}", batch.toString()); + } + for (size_t row = 0; row < rows; ++row) { + data[old_data_size + row] = Decimal128V3(to_int128(decimal128_batch->values[row])); + } + return Status::OK(); + } + default: + return Status::NotSupported("ORC type {} is not supported by new ORC reader", + static_cast(type.getKind())); + } +} + +Status OrcReader::get_block(Block* file_block, size_t* rows, bool* eof) { + if (_state == nullptr || !_state->row_reader_created || _state->batch == nullptr) { + return Status::Uninitialized("OrcReader is not open"); + } + *rows = 0; + if (_eof) { + *eof = true; + return Status::OK(); + } + + bool has_next = false; + try { + has_next = _state->row_reader->next(*_state->batch); + } catch (const std::exception& e) { + return Status::InternalError("Failed to read ORC batch: {}", e.what()); + } + if (!has_next) { + _eof = true; + *eof = true; + return Status::OK(); + } + + const auto batch_rows = static_cast(_state->batch->numElements); + auto* struct_batch = dynamic_cast<::orc::StructVectorBatch*>(_state->batch.get()); + if (struct_batch == nullptr) { + return Status::InternalError("New ORC reader expects struct row batch"); + } + + for (size_t idx = 0; idx < _state->read_columns.size(); ++idx) { + const auto file_column_id = _state->read_columns[idx]; + const auto position_it = _request->column_positions.find(file_column_id); + DORIS_CHECK(position_it != _request->column_positions.end()); + const auto block_position = position_it->second; + DORIS_CHECK(block_position < file_block->columns()); + const auto* type = _state->root_type->getSubtype(static_cast(file_column_id)); + DORIS_CHECK(type != nullptr); + const auto batch_index_it = _state->column_to_selected_batch_index.find(file_column_id); + DORIS_CHECK(batch_index_it != _state->column_to_selected_batch_index.end()); + const size_t selected_batch_idx = batch_index_it->second; + DORIS_CHECK(selected_batch_idx < struct_batch->fields.size()); + auto column = file_block->get_by_position(block_position).column->assume_mutable(); + RETURN_IF_ERROR(_decode_column(*type, *struct_batch->fields[selected_batch_idx], column, + batch_rows)); + file_block->replace_by_position(block_position, std::move(column)); + } + + *rows = batch_rows; + *eof = false; + return Status::OK(); +} + +Status OrcReader::close() { + _state.reset(); + return FileReader::close(); +} + +} // namespace doris::new_orc diff --git a/be/src/format/new_orc/orc_reader.h b/be/src/format/new_orc/orc_reader.h new file mode 100644 index 00000000000000..7e43e3f05fd3f9 --- /dev/null +++ b/be/src/format/new_orc/orc_reader.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" +#include "format/reader/file_reader.h" + +namespace doris::new_orc { + +struct OrcReaderScanState; + +struct OrcScanRequest : public reader::FileScanRequest {}; + +class OrcReader final : public reader::FileReader { +public: + OrcReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile); + ~OrcReader() override; + + Status init(RuntimeState* state) override; + Status get_schema(std::vector* file_schema) const override; + Status open(std::unique_ptr& request) override; + Status get_block(Block* file_block, size_t* rows, bool* eof) override; + Status close() override; + +private: + DataTypePtr _convert_to_doris_type(const ::orc::Type& type) const; + Status _fill_schema_field(const ::orc::Type& root_type, uint64_t child_idx, + reader::SchemaField* field) const; + Status _create_row_reader(); + Status _decode_column(const ::orc::Type& type, const ::orc::ColumnVectorBatch& batch, + MutableColumnPtr& column, size_t rows) const; + + std::unique_ptr _state; +}; + +} // namespace doris::new_orc diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp index 8289d637d78b14..2209b13d794fad 100644 --- a/be/src/format/reader/table_reader.cpp +++ b/be/src/format/reader/table_reader.cpp @@ -30,6 +30,7 @@ #include "core/assert_cast.h" #include "exec/common/endian.h" #include "exprs/vslot_ref.h" +#include "format/new_orc/orc_reader.h" #include "format/new_parquet/parquet_reader.h" #include "format/reader/column_mapper.h" #include "format/table/deletion_vector_reader.h" @@ -203,7 +204,11 @@ Status TableReader::create_next_reader(bool* eos) { _system_properties, _current_task->data_file, _io_ctx, _scanner_profile); break; } - case FileFormat::ORC: + case FileFormat::ORC: { + _data_reader.reader = std::make_unique( + _system_properties, _current_task->data_file, _io_ctx, _scanner_profile); + break; + } case FileFormat::CSV: return Status::NotSupported("TableReader does not support file format {}", static_cast(_format)); diff --git a/be/src/format/transformer/vparquet_transformer.cpp b/be/src/format/transformer/vparquet_transformer.cpp index 8c90713ab298e8..57c2f8aaa6b8b6 100644 --- a/be/src/format/transformer/vparquet_transformer.cpp +++ b/be/src/format/transformer/vparquet_transformer.cpp @@ -208,7 +208,7 @@ Status VParquetTransformer::_parse_properties() { //build arrow writer properties ::parquet::ArrowWriterProperties::Builder arrow_builder; if (_parquet_options.enable_int96_timestamps) { - arrow_builder.enable_force_write_int96_timestamps(); + arrow_builder.enable_deprecated_int96_timestamps(); } arrow_builder.store_schema(); _arrow_properties = arrow_builder.build(); diff --git a/be/test/format/new_orc/orc_reader_test.cpp b/be/test/format/new_orc/orc_reader_test.cpp new file mode 100644 index 00000000000000..96d6ef2785d390 --- /dev/null +++ b/be/test/format/new_orc/orc_reader_test.cpp @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/new_orc/orc_reader.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/primitive_type.h" +#include "format/orc/orc_memory_stream_test.h" +#include "format/reader/file_reader.h" +#include "gen_cpp/Types_types.h" +#include "io/io_common.h" +#include "runtime/runtime_state.h" + +namespace doris { +namespace { + +constexpr int64_t ROW_COUNT = 5; + +void write_orc_file(const std::string& file_path) { + auto type = std::unique_ptr<::orc::Type>( + ::orc::Type::buildTypeFromString("struct")); + + MemoryOutputStream memory_stream(1024 * 1024); + ::orc::WriterOptions options; + options.setCompression(::orc::CompressionKind_NONE); + options.setMemoryPool(::orc::getDefaultPool()); + auto writer = ::orc::createWriter(*type, &memory_stream, options); + auto batch = writer->createRowBatch(ROW_COUNT); + auto& struct_batch = dynamic_cast<::orc::StructVectorBatch&>(*batch); + auto& id_batch = dynamic_cast<::orc::LongVectorBatch&>(*struct_batch.fields[0]); + auto& value_batch = dynamic_cast<::orc::StringVectorBatch&>(*struct_batch.fields[1]); + + std::vector values {"one", "two", "three", "four", "five"}; + for (int64_t row = 0; row < ROW_COUNT; ++row) { + id_batch.data[row] = row + 1; + value_batch.data[row] = values[row].data(); + value_batch.length[row] = static_cast(values[row].size()); + } + struct_batch.numElements = ROW_COUNT; + id_batch.numElements = ROW_COUNT; + value_batch.numElements = ROW_COUNT; + + writer->add(*batch); + writer->close(); + + std::ofstream out(file_path, std::ios::binary); + out.write(memory_stream.getData(), static_cast(memory_stream.getLength())); +} + +Block build_file_block(const std::vector& schema) { + Block block; + for (const auto& field : schema) { + block.insert({field.type->create_column(), field.type, field.name}); + } + return block; +} + +class NewOrcReaderTest : public testing::Test { +protected: + void SetUp() override { + _test_dir = std::filesystem::temp_directory_path() / "doris_new_orc_reader_test"; + std::filesystem::remove_all(_test_dir); + std::filesystem::create_directories(_test_dir); + _file_path = (_test_dir / "reader.orc").string(); + write_orc_file(_file_path); + } + + void TearDown() override { std::filesystem::remove_all(_test_dir); } + + std::unique_ptr create_reader() const { + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto file_description = std::make_unique(); + file_description->path = _file_path; + file_description->file_size = static_cast(std::filesystem::file_size(_file_path)); + return std::make_unique(system_properties, file_description, nullptr, + nullptr); + } + + std::filesystem::path _test_dir; + std::string _file_path; +}; + +TEST_F(NewOrcReaderTest, GetSchemaReturnsFileLocalColumns) { + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + ASSERT_EQ(schema.size(), 2); + EXPECT_EQ(schema[0].id, 0); + EXPECT_EQ(schema[0].name, "id"); + EXPECT_EQ(schema[0].type->get_primitive_type(), TYPE_INT); + EXPECT_TRUE(schema[0].type->is_nullable()); + EXPECT_EQ(schema[1].id, 1); + EXPECT_EQ(schema[1].name, "value"); + EXPECT_EQ(schema[1].type->get_primitive_type(), TYPE_STRING); + EXPECT_TRUE(schema[1].type->is_nullable()); +} + +TEST_F(NewOrcReaderTest, ReadFileLocalColumnsThenEof) { + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + Block block = build_file_block(schema); + + auto request = std::make_unique(); + request->non_predicate_columns = {0, 1}; + ASSERT_TRUE(reader->open(request).ok()); + + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_FALSE(eof); + ASSERT_EQ(rows, ROW_COUNT); + + const auto& ids_nullable = assert_cast(*block.get_by_position(0).column); + const auto& ids = assert_cast(ids_nullable.get_nested_column()); + const auto& values_nullable = + assert_cast(*block.get_by_position(1).column); + const auto& values = assert_cast(values_nullable.get_nested_column()); + ASSERT_EQ(ids.size(), ROW_COUNT); + ASSERT_EQ(values.size(), ROW_COUNT); + EXPECT_EQ(ids.get_element(0), 1); + EXPECT_EQ(ids.get_element(4), 5); + EXPECT_EQ(values.get_data_at(0).to_string(), "one"); + EXPECT_EQ(values.get_data_at(4).to_string(), "five"); + + rows = 0; + eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_TRUE(eof); + EXPECT_EQ(rows, 0); +} + +TEST_F(NewOrcReaderTest, ReadsProjectedColumnIntoRequestedBlockPosition) { + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + Block block; + block.insert({schema[1].type->create_column(), schema[1].type, schema[1].name}); + + auto request = std::make_unique(); + request->non_predicate_columns = {1}; + request->column_positions.emplace(1, 0); + ASSERT_TRUE(reader->open(request).ok()); + + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_FALSE(eof); + ASSERT_EQ(rows, ROW_COUNT); + + const auto& values_nullable = + assert_cast(*block.get_by_position(0).column); + const auto& values = assert_cast(values_nullable.get_nested_column()); + EXPECT_EQ(values.get_data_at(0).to_string(), "one"); + EXPECT_EQ(values.get_data_at(4).to_string(), "five"); +} + +} // namespace +} // namespace doris