diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 5736c557bd0c7..45210d68a334a 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -507,7 +507,11 @@ if(ARROW_JSON)
 endif()
 
 if(ARROW_ORC)
-  list(APPEND ARROW_SRCS adapters/orc/adapter.cc adapters/orc/adapter_util.cc)
+  list(APPEND
+       ARROW_SRCS
+       adapters/orc/adapter.cc
+       adapters/orc/adapter_options.cc
+       adapters/orc/adapter_util.cc)
 endif()
 
 if(CXX_LINKER_SUPPORTS_VERSION_SCRIPT)
diff --git a/cpp/src/arrow/adapters/orc/CMakeLists.txt b/cpp/src/arrow/adapters/orc/CMakeLists.txt
index ca901b07dfd69..d68a8afb4c1f2 100644
--- a/cpp/src/arrow/adapters/orc/CMakeLists.txt
+++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt
@@ -20,7 +20,8 @@
 #
 
 # Headers: top level
-install(FILES adapter.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/orc")
+install(FILES adapter.h adapter_options.h
+        DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/orc")
 
 # pkg-config support
 arrow_add_pkg_config("arrow-orc")
diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc
index 9bb4abfd0f050..41ea28bdee748 100644
--- a/cpp/src/arrow/adapters/orc/adapter.cc
+++ b/cpp/src/arrow/adapters/orc/adapter.cc
@@ -92,8 +92,7 @@ namespace orc {
 
 namespace {
 
-// The following are required by ORC to be uint64_t
-constexpr uint64_t kOrcWriterBatchSize = 128 * 1024;
+// The following is required by ORC to be uint64_t
 constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024;
 
 using internal::checked_cast;
@@ -632,37 +631,37 @@ class ArrowOutputStream : public liborc::OutputStream {
 
 class ORCFileWriter::Impl {
  public:
-  Status Open(arrow::io::OutputStream* output_stream) {
+  Status Open(arrow::io::OutputStream* output_stream, const WriteOptions& write_options) {
     out_stream_ = std::unique_ptr<liborc::OutputStream>(
         checked_cast<liborc::OutputStream*>(new ArrowOutputStream(*output_stream)));
+    batch_size_ = write_options.batch_size();
+    orc_options_ = write_options.get_orc_writer_options();
     return Status::OK();
   }
 
   Status Write(const Table& table) {
-    std::unique_ptr<liborc::WriterOptions> orc_options =
-        std::unique_ptr<liborc::WriterOptions>(new liborc::WriterOptions());
     ARROW_ASSIGN_OR_RAISE(auto orc_schema, GetOrcType(*(table.schema())));
     ORC_CATCH_NOT_OK(
-        writer_ = liborc::createWriter(*orc_schema, out_stream_.get(), *orc_options))
+        writer_ = liborc::createWriter(*orc_schema, out_stream_.get(), *orc_options_))
 
     int64_t num_rows = table.num_rows();
     const int num_cols_ = table.num_columns();
     std::vector<int64_t> arrow_index_offset(num_cols_, 0);
     std::vector<int> arrow_chunk_offset(num_cols_, 0);
     std::unique_ptr<liborc::ColumnVectorBatch> batch =
-        writer_->createRowBatch(kOrcWriterBatchSize);
+        writer_->createRowBatch(batch_size_);
     liborc::StructVectorBatch* root =
         internal::checked_cast<liborc::StructVectorBatch*>(batch.get());
     while (num_rows > 0) {
       for (int i = 0; i < num_cols_; i++) {
         RETURN_NOT_OK(adapters::orc::WriteBatch(
-            *(table.column(i)), kOrcWriterBatchSize, &(arrow_chunk_offset[i]),
+            *(table.column(i)), batch_size_, &(arrow_chunk_offset[i]),
             &(arrow_index_offset[i]), (root->fields)[i]));
       }
       root->numElements = (root->fields)[0]->numElements;
       writer_->add(*batch);
       batch->clear();
-      num_rows -= kOrcWriterBatchSize;
+      num_rows -= batch_size_;
     }
     return Status::OK();
   }
@@ -675,6 +674,8 @@ class ORCFileWriter::Impl {
  private:
   std::unique_ptr<liborc::Writer> writer_;
   std::unique_ptr<liborc::OutputStream> out_stream_;
+  std::shared_ptr<liborc::WriterOptions> orc_options_;
+  uint64_t batch_size_;
 };
 
 ORCFileWriter::~ORCFileWriter() {}
@@ -682,10 +683,10 @@ ORCFileWriter::~ORCFileWriter() {}
 ORCFileWriter::ORCFileWriter() { impl_.reset(new ORCFileWriter::Impl()); }
 
 Result<std::unique_ptr<ORCFileWriter>> ORCFileWriter::Open(
-    io::OutputStream* output_stream) {
+    io::OutputStream* output_stream, const WriteOptions& writer_options) {
   std::unique_ptr<ORCFileWriter> result =
       std::unique_ptr<ORCFileWriter>(new ORCFileWriter());
-  Status status = result->impl_->Open(output_stream);
+  Status status = result->impl_->Open(output_stream, writer_options);
   RETURN_NOT_OK(status);
   return std::move(result);
 }
diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h
index e053eab435b22..5513cabf1da67 100644
--- a/cpp/src/arrow/adapters/orc/adapter.h
+++ b/cpp/src/arrow/adapters/orc/adapter.h
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <memory>
 
+#include "arrow/adapters/orc/adapter_options.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/memory_pool.h"
 #include "arrow/record_batch.h"
@@ -264,8 +265,11 @@ class ARROW_EXPORT ORCFileWriter {
   /// \brief Creates a new ORC writer.
   ///
   /// \param[in] output_stream a pointer to the io::OutputStream to write into
+  /// \param[in] write_options the ORC writer options for Arrow
   /// \return the returned writer object
-  static Result<std::unique_ptr<ORCFileWriter>> Open(io::OutputStream* output_stream);
+  static Result<std::unique_ptr<ORCFileWriter>> Open(
+      io::OutputStream* output_stream,
+      const WriteOptions& write_options = WriteOptions());
 
   /// \brief Write a table
   ///
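Since the new `Open` overload defaults to `WriteOptions()`, existing callers keep compiling unchanged. A minimal usage sketch of the new signature (not part of the diff; assumes an ORC-enabled Arrow build, a populated `arrow::Table`, and a hypothetical output path):

    #include "arrow/adapters/orc/adapter.h"
    #include "arrow/io/file.h"
    #include "arrow/table.h"

    arrow::Status WriteTableToOrc(const std::shared_ptr<arrow::Table>& table) {
      // Open a file-backed output stream (path is illustrative).
      ARROW_ASSIGN_OR_RAISE(auto out,
                            arrow::io::FileOutputStream::Open("/tmp/example.orc"));
      arrow::adapters::orc::WriteOptions options;
      options.set_compression(arrow::adapters::orc::CompressionKind_ZSTD);
      options.set_batch_size(2048);  // rows converted per ORC vector batch
      // Pass the options to the new Open overload.
      ARROW_ASSIGN_OR_RAISE(auto writer, arrow::adapters::orc::ORCFileWriter::Open(
                                             out.get(), options));
      ARROW_RETURN_NOT_OK(writer->Write(*table));
      return writer->Close();
    }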
diff --git a/cpp/src/arrow/adapters/orc/adapter_options.cc b/cpp/src/arrow/adapters/orc/adapter_options.cc
new file mode 100644
index 0000000000000..85b6815990df5
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter_options.cc
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/adapters/orc/adapter_options.h"
+
+#include "arrow/adapters/orc/adapter.h"
+#include "orc/Common.hh"
+#include "orc/Int128.hh"
+#include "orc/Writer.hh"
+
+namespace liborc = orc;
+
+namespace arrow {
+
+namespace adapters {
+
+namespace orc {
+
+std::string FileVersion::ToString() const {
+  std::stringstream ss;
+  ss << major() << '.' << minor();
+  return ss.str();
+}
+
+const FileVersion& FileVersion::v_0_11() {
+  static FileVersion version(0, 11);
+  return version;
+}
+
+const FileVersion& FileVersion::v_0_12() {
+  static FileVersion version(0, 12);
+  return version;
+}
+
+WriteOptions::WriteOptions()
+    : orc_writer_options_(std::make_shared<liborc::WriterOptions>()),
+      batch_size_(1024) {
+  // PASS
+}
+
+WriteOptions::WriteOptions(const WriteOptions& rhs)
+    : orc_writer_options_(
+          std::make_shared<liborc::WriterOptions>(*(rhs.orc_writer_options_.get()))),
+      batch_size_(rhs.batch_size_) {
+  // PASS
+}
+
+WriteOptions::WriteOptions(WriteOptions& rhs) {
+  // swap orc_writer_options_ with rhs
+  orc_writer_options_.swap(rhs.orc_writer_options_);
+  batch_size_ = rhs.batch_size_;
+}
+
+WriteOptions& WriteOptions::operator=(const WriteOptions& rhs) {
+  if (this != &rhs) {
+    orc_writer_options_.reset(
+        new liborc::WriterOptions(*(rhs.orc_writer_options_.get())));
+    batch_size_ = rhs.batch_size_;
+  }
+  return *this;
+}
+
+WriteOptions::~WriteOptions() {
+  // PASS
+}
+
+WriteOptions& WriteOptions::set_batch_size(uint64_t size) {
+  batch_size_ = size;
+  return *this;
+}
+
+uint64_t WriteOptions::batch_size() const { return batch_size_; }
+
+RleVersion WriteOptions::rle_version() const {
+  return static_cast<RleVersion>(orc_writer_options_->getRleVersion());
+}
+
+WriteOptions& WriteOptions::set_stripe_size(uint64_t size) {
+  orc_writer_options_->setStripeSize(size);
+  return *this;
+}
+
+uint64_t WriteOptions::stripe_size() const {
+  return orc_writer_options_->getStripeSize();
+}
+
+WriteOptions& WriteOptions::set_compression_block_size(uint64_t size) {
+  orc_writer_options_->setCompressionBlockSize(size);
+  return *this;
+}
+
+uint64_t WriteOptions::compression_block_size() const {
+  return orc_writer_options_->getCompressionBlockSize();
+}
+
+WriteOptions& WriteOptions::set_row_index_stride(uint64_t stride) {
+  orc_writer_options_->setRowIndexStride(stride);
+  return *this;
+}
+
+uint64_t WriteOptions::row_index_stride() const {
+  return orc_writer_options_->getRowIndexStride();
+}
+
+WriteOptions& WriteOptions::set_dictionary_key_size_threshold(double val) {
+  orc_writer_options_->setDictionaryKeySizeThreshold(val);
+  return *this;
+}
+
+double WriteOptions::dictionary_key_size_threshold() const {
+  return orc_writer_options_->getDictionaryKeySizeThreshold();
+}
+
+WriteOptions& WriteOptions::set_file_version(const FileVersion& version) {
+  // Only the Hive_0_11 and Hive_0_12 versions are currently supported
+  uint32_t major = version.major(), minor = version.minor();
+  if (major == 0 && (minor == 11 || minor == 12)) {
+    orc_writer_options_->setFileVersion(liborc::FileVersion(major, minor));
+    return *this;
+  }
+  throw std::logic_error("Unsupported file version specified.");
+}
+
+FileVersion WriteOptions::file_version() const {
+  liborc::FileVersion orc_file_version_ = orc_writer_options_->getFileVersion();
+  return FileVersion(orc_file_version_.getMajor(), orc_file_version_.getMinor());
+}
+
+WriteOptions& WriteOptions::set_compression(CompressionKind comp) {
+  orc_writer_options_->setCompression(static_cast<liborc::CompressionKind>(comp));
+  return *this;
+}
+
+CompressionKind WriteOptions::compression() const {
+  return static_cast<CompressionKind>(orc_writer_options_->getCompression());
+}
+
+WriteOptions& WriteOptions::set_compression_strategy(CompressionStrategy strategy) {
+  orc_writer_options_->setCompressionStrategy(
+      static_cast<liborc::CompressionStrategy>(strategy));
+  return *this;
+}
+
+CompressionStrategy WriteOptions::compression_strategy() const {
+  return static_cast<CompressionStrategy>(
+      orc_writer_options_->getCompressionStrategy());
+}
+
+bool WriteOptions::aligned_bitpacking() const {
+  return orc_writer_options_->getAlignedBitpacking();
+}
+
+WriteOptions& WriteOptions::set_padding_tolerance(double tolerance) {
+  orc_writer_options_->setPaddingTolerance(tolerance);
+  return *this;
+}
+
+double WriteOptions::padding_tolerance() const {
+  return orc_writer_options_->getPaddingTolerance();
+}
+
+WriteOptions& WriteOptions::set_error_stream(std::ostream& err_stream) {
+  orc_writer_options_->setErrorStream(err_stream);
+  return *this;
+}
+
+std::ostream* WriteOptions::error_stream() const {
+  return orc_writer_options_->getErrorStream();
+}
+
+bool WriteOptions::enable_index() const { return orc_writer_options_->getEnableIndex(); }
+
+bool WriteOptions::enable_dictionary() const {
+  return orc_writer_options_->getEnableDictionary();
+}
+
+WriteOptions& WriteOptions::set_columns_use_bloom_filter(
+    const std::set<uint64_t>& columns) {
+  orc_writer_options_->setColumnsUseBloomFilter(columns);
+  return *this;
+}
+
+bool WriteOptions::is_column_use_bloom_filter(uint64_t column) const {
+  return orc_writer_options_->isColumnUseBloomFilter(column);
+}
+
+WriteOptions& WriteOptions::set_bloom_filter_fpp(double fpp) {
+  orc_writer_options_->setBloomFilterFPP(fpp);
+  return *this;
+}
+
+double WriteOptions::bloom_filter_fpp() const {
+  return orc_writer_options_->getBloomFilterFPP();
+}
+
+// We deliberately do not provide a setter for the bloom filter version
+// because only UTF8 is supported for now.
+BloomFilterVersion WriteOptions::bloom_filter_version() const {
+  return static_cast<BloomFilterVersion>(orc_writer_options_->getBloomFilterVersion());
+}
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
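Note that the copy constructor and `operator=` above clone the wrapped `liborc::WriterOptions`, so copies never alias the same underlying ORC options object. A small illustration (not part of the diff):

    // Sketch: copies are independent because the underlying
    // liborc::WriterOptions is deep-copied.
    arrow::adapters::orc::WriteOptions a;
    a.set_stripe_size(1024);
    arrow::adapters::orc::WriteOptions b = a;  // deep copy
    b.set_stripe_size(4096);
    // a.stripe_size() == 1024, b.stripe_size() == 4096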
diff --git a/cpp/src/arrow/adapters/orc/adapter_options.h b/cpp/src/arrow/adapters/orc/adapter_options.h
new file mode 100644
index 0000000000000..3737db96cc5a8
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter_options.h
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+#include "orc/OrcFile.hh"
+
+namespace liborc = orc;
+
+namespace arrow {
+
+namespace adapters {
+
+namespace orc {
+// Components of ORC Writer Options
+
+enum CompressionKind {
+  CompressionKind_NONE = 0,
+  CompressionKind_ZLIB = 1,
+  CompressionKind_SNAPPY = 2,
+  CompressionKind_LZO = 3,
+  CompressionKind_LZ4 = 4,
+  CompressionKind_ZSTD = 5,
+  CompressionKind_MAX = INT32_MAX
+};
+
+enum CompressionStrategy {
+  CompressionStrategy_SPEED = 0,
+  CompressionStrategy_COMPRESSION
+};
+
+enum RleVersion { RleVersion_1 = 0, RleVersion_2 = 1 };
+
+enum BloomFilterVersion {
+  // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
+  // both old and new readers.
+  ORIGINAL = 0,
+  // Only include the BLOOM_FILTER_UTF8 streams that consistently use UTF8.
+  // See ORC-101
+  UTF8 = 1,
+  FUTURE = INT32_MAX
+};
+
+class ARROW_EXPORT FileVersion {
+ private:
+  uint32_t major_version;
+  uint32_t minor_version;
+
+ public:
+  static const FileVersion& v_0_11();
+  static const FileVersion& v_0_12();
+
+  FileVersion(uint32_t major, uint32_t minor)
+      : major_version(major), minor_version(minor) {}
+
+  /**
+   * Get major version
+   */
+  uint32_t major() const { return this->major_version; }
+
+  /**
+   * Get minor version
+   */
+  uint32_t minor() const { return this->minor_version; }
+
+  bool operator==(const FileVersion& right) const {
+    return this->major_version == right.major() && this->minor_version == right.minor();
+  }
+
+  bool operator!=(const FileVersion& right) const { return !(*this == right); }
+
+  std::string ToString() const;
+};
+
+/**
+ * Options for creating a Reader.
+ */
+class ARROW_EXPORT ReadOptions {
+ public:
+  ReadOptions();
+  ReadOptions(const ReadOptions&);
+  ReadOptions(ReadOptions&);
+  ReadOptions& operator=(const ReadOptions&);
+  virtual ~ReadOptions();
+
+  /**
+   * Set the stream to use for printing warning or error messages.
+   */
+  ReadOptions& set_error_stream(std::ostream& stream);
+
+  /**
+   * Set a serialized copy of the file tail to be used when opening the file.
+   *
+   * When one process opens the file and other processes need to read
+   * the rows, we want to enable clients to just read the tail once.
+   * By passing the string returned by Reader.serialized_file_tail() to
+   * this function, the second reader will not need to read the file tail
+   * from disk.
+   *
+   * @param serialization the bytes of the serialized tail to use
+   */
+  ReadOptions& set_serialized_file_tail(const std::string& serialization);
+
+  /**
+   * Set the location of the tail as defined by the logical length of the
+   * file.
+   */
+  ReadOptions& set_tail_location(uint64_t offset);
+
+  /**
+   * Get the stream to write warnings or errors to.
+   */
+  std::ostream* error_stream() const;
+
+  /**
+   * Get the serialized file tail that the user passed in.
+   */
+  std::string serialized_file_tail() const;
+
+  /**
+   * Get the desired tail location.
+   * @return if not set, return the maximum long.
+   */
+  uint64_t tail_location() const;
+};
+
+/**
+ * Options for creating a RowReader.
+ */
+class RowReaderOptions {
+ public:
+  RowReaderOptions();
+  RowReaderOptions(const RowReaderOptions&);
+  RowReaderOptions(RowReaderOptions&);
+  RowReaderOptions& operator=(const RowReaderOptions&);
+  virtual ~RowReaderOptions();
+
+  /**
+   * For files that have structs as the top-level object, select the fields
+   * to read. The first field is 0, the second 1, and so on. By default,
+   * all columns are read. This option clears any previous setting of
+   * the selected columns.
+   * @param include a list of fields to read
+   * @return this
+   */
+  RowReaderOptions& include(const std::list<uint64_t>& include);
+
+  /**
+   * For files that have structs as the top-level object, select the fields
+   * to read by name. By default, all columns are read. This option clears
+   * any previous setting of the selected columns.
+   * @param include a list of fields to read
+   * @return this
+   */
+  RowReaderOptions& include(const std::list<std::string>& include);
+
+  /**
+   * Selects which type ids to read. The root type is always 0 and the
+   * rest of the types are labeled in a preorder traversal of the tree.
+   * The parent types are automatically selected, but the children are not.
+   *
+   * This option clears any previous setting of the selected columns or
+   * types.
+   * @param types a list of the type ids to read
+   * @return this
+   */
+  RowReaderOptions& include_types(const std::list<uint64_t>& types);
+
+  /**
+   * Set the section of the file to process.
+   * @param offset the starting byte offset
+   * @param length the number of bytes to read
+   * @return this
+   */
+  RowReaderOptions& range(uint64_t offset, uint64_t length);
+
+  /**
+   * For Hive 0.11 (and 0.12) decimals, the precision was unlimited
+   * and thus may overflow the 38 digits that are supported. If one
+   * of the Hive 0.11 decimals is too large, the reader may either convert
+   * the value to NULL or throw an exception. That choice is controlled
+   * by this setting.
+   *
+   * Defaults to true.
+   *
+   * @param should_throw should the reader throw a ParseError?
+   * @return returns *this
+   */
+  RowReaderOptions& throw_on_hive11_decimal_overflow(bool should_throw);
+
+  /**
+   * For Hive 0.11 (and 0.12) written decimals, which have unlimited
+   * scale and precision, the reader forces the scale to a consistent
+   * number that is configured. This setting changes the scale that is
+   * forced upon these old decimals. See also throwOnHive11DecimalOverflow.
+   *
+   * Defaults to 6.
+   *
+   * @param forced_scale the scale that will be forced on Hive 0.11 decimals
+   * @return returns *this
+   */
+  RowReaderOptions& forced_scale_on_hive11_decimal(int32_t forced_scale);
+
+  /**
+   * Set enable encoding block mode.
+   * When encoding block mode is enabled, the row reader will not decode
+   * dictionary-encoded string vectors, but instead return an index array
+   * with references into the corresponding dictionary.
+   */
+  RowReaderOptions& set_enable_lazy_decoding(bool enable);
+
+  /**
+   * Set search argument for predicate push down
+   */
+  RowReaderOptions& searchArgument(std::unique_ptr<liborc::SearchArgument> sargs);
+
+  /**
+   * Should enable encoding block mode
+   */
+  bool get_enable_lazy_decoding() const;
+
+  /**
+   * Were the field ids set?
+   */
+  bool get_indexes_set() const;
+
+  /**
+   * Were the type ids set?
+   */
+  bool get_type_ids_set() const;
+
+  /**
+   * Get the list of selected field or type ids to read.
+   */
+  const std::list<uint64_t>& getInclude() const;
+
+  /**
+   * Were the include names set?
+   */
+  bool getNamesSet() const;
+
+  /**
+   * Get the list of selected columns to read. All children of the selected
+   * columns are also selected.
+   */
+  const std::list<std::string>& getIncludeNames() const;
+
+  /**
+   * Get the start of the range for the data being processed.
+   * @return if not set, return 0
+   */
+  uint64_t getOffset() const;
+
+  /**
+   * Get the end of the range for the data being processed.
+   * @return if not set, return the maximum long
+   */
+  uint64_t getLength() const;
+
+  /**
+   * Should the reader throw a ParseError when a Hive 0.11 decimal is
+   * larger than the supported 38 digits of precision? Otherwise, the
+   * data item is replaced by a NULL.
+   */
+  bool getThrowOnHive11DecimalOverflow() const;
+
+  /**
+   * What scale should all Hive 0.11 decimals be normalized to?
+   */
+  int32_t getForcedScaleOnHive11Decimal() const;
+
+  /**
+   * Get search argument for predicate push down
+   */
+  std::shared_ptr<liborc::SearchArgument> getSearchArgument() const;
+
+  /**
+   * Set desired timezone to return data of timestamp type
+   */
+  RowReaderOptions& setTimezoneName(const std::string& zoneName);
+
+  /**
+   * Get desired timezone to return data of timestamp type
+   */
+  const std::string& getTimezoneName() const;
+};
+
+/**
+ * Options for creating a Writer.
+ */
+class ARROW_EXPORT WriteOptions {
+ public:
+  WriteOptions();
+  WriteOptions(const WriteOptions&);
+  WriteOptions(WriteOptions&);
+  WriteOptions& operator=(const WriteOptions&);
+  virtual ~WriteOptions();
+
+  /**
+   * Get the ORC writer options
+   * @return The ORC writer options this WriteOption encapsulates
+   */
+  std::shared_ptr<liborc::WriterOptions> get_orc_writer_options() const {
+    return orc_writer_options_;
+  }
+
+  /**
+   * Set the batch size.
+   */
+  WriteOptions& set_batch_size(uint64_t size);
+
+  /**
+   * Get the batch size.
+   * @return if not set, return default value.
+   */
+  uint64_t batch_size() const;
+
+  /**
+   * Set the stripe size.
+   */
+  WriteOptions& set_stripe_size(uint64_t size);
+
+  /**
+   * Get the stripe size.
+   * @return if not set, return default value.
+   */
+  uint64_t stripe_size() const;
+
+  /**
+   * Set the data compression block size.
+   */
+  WriteOptions& set_compression_block_size(uint64_t size);
+
+  /**
+   * Get the data compression block size.
+   * @return if not set, return default value.
+   */
+  uint64_t compression_block_size() const;
+
+  /**
+   * Set the row index stride (the number of rows per entry in the row index).
+   * Use value 0 to disable the row index.
+   */
+  WriteOptions& set_row_index_stride(uint64_t stride);
+
+  /**
+   * Get the row index stride (the number of rows per entry in the row index).
+   * @return if not set, return default value.
+   */
+  uint64_t row_index_stride() const;
+
+  /**
+   * Set the dictionary key size threshold.
+   * 0 to disable dictionary encoding.
+   * 1 to always enable dictionary encoding.
+   */
+  WriteOptions& set_dictionary_key_size_threshold(double val);
+
+  /**
+   * Get the dictionary key size threshold.
+   */
+  double dictionary_key_size_threshold() const;
+
+  /**
+   * Set ORC file version
+   */
+  WriteOptions& set_file_version(const FileVersion& version);
+
+  /**
+   * Get ORC file version
+   */
+  FileVersion file_version() const;
+
+  /**
+   * Set compression kind.
+   */
+  WriteOptions& set_compression(CompressionKind comp);
+
+  /**
+   * Get the compression kind.
+   * @return if not set, return default value which is ZLIB.
+   */
+  CompressionKind compression() const;
+
+  /**
+   * Set the compression strategy.
+   */
+  WriteOptions& set_compression_strategy(CompressionStrategy strategy);
+
+  /**
+   * Get the compression strategy.
+   * @return if not set, return default value which is speed.
+   */
+  CompressionStrategy compression_strategy() const;
+
+  /**
+   * Get whether the bitpacking should be aligned.
+   * @return true if it should be aligned, false otherwise
+   */
+  bool aligned_bitpacking() const;
+
+  /**
+   * Set the padding tolerance.
+   */
+  WriteOptions& set_padding_tolerance(double tolerance);
+
+  /**
+   * Get the padding tolerance.
+   * @return if not set, return default value which is zero.
+   */
+  double padding_tolerance() const;
+
+  /**
+   * Set the error stream.
+   */
+  WriteOptions& set_error_stream(std::ostream& err_stream);
+
+  /**
+   * Get the error stream.
+   * @return if not set, return std::cerr.
+   */
+  std::ostream* error_stream() const;
+
+  /**
+   * Get the RLE version.
+   */
+  RleVersion rle_version() const;
+
+  /**
+   * Get whether or not to write the row group index
+   * @return if not set, the default is false
+   */
+  bool enable_index() const;
+
+  /**
+   * Get whether or not to enable dictionary encoding
+   * @return if not set, the default is false
+   */
+  bool enable_dictionary() const;
+
+  /**
+   * Set columns that use BloomFilter
+   */
+  WriteOptions& set_columns_use_bloom_filter(const std::set<uint64_t>& columns);
+
+  /**
+   * Get whether this column uses BloomFilter
+   */
+  bool is_column_use_bloom_filter(uint64_t column) const;
+
+  /**
+   * Get columns that use BloomFilter
+   * @return The set of columns that use BloomFilter
+   */
+  std::set<uint64_t> columns_use_bloom_filter() const;
+
+  /**
+   * Set false positive probability of BloomFilter
+   */
+  WriteOptions& set_bloom_filter_fpp(double fpp);
+
+  /**
+   * Get false positive probability of BloomFilter
+   */
+  double bloom_filter_fpp() const;
+
+  /**
+   * Get version of BloomFilter
+   */
+  BloomFilterVersion bloom_filter_version() const;
+
+ private:
+  std::shared_ptr<liborc::WriterOptions> orc_writer_options_;
+  uint64_t batch_size_;
+};
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc
index 39c66b90f6dc3..a89c831da3166 100644
--- a/cpp/src/arrow/adapters/orc/adapter_test.cc
+++ b/cpp/src/arrow/adapters/orc/adapter_test.cc
@@ -226,13 +226,48 @@ std::shared_ptr<Table> GenerateRandomTable(const std::shared_ptr<Schema>& schema
   return Table::Make(schema, cv);
 }
 
+arrow::adapters::orc::WriteOptions GenerateRandomWriteOptions(uint64_t num_cols) {
+  auto arrow_write_options = arrow::adapters::orc::WriteOptions();
+  arrow_write_options.set_batch_size(
+      arrow::random_single_int<uint64_t>(4ull, 8ull));
+  arrow_write_options.set_file_version(arrow::adapters::orc::FileVersion(
+      0, arrow::random_single_int<uint32_t>(11, 12)));
+  arrow_write_options.set_stripe_size(
+      arrow::random_single_int<uint64_t>(4ull, 128ull));
+  arrow_write_options.set_compression_block_size(
+      arrow::random_single_int<uint64_t>(4ull, 128ull));
+  arrow_write_options.set_row_index_stride(
+      arrow::random_single_int<uint64_t>(0, 128ull));
+  arrow_write_options.set_compression(
+      static_cast<arrow::adapters::orc::CompressionKind>(
+          arrow::random_single_int<int>(0, 5)));
+  arrow_write_options.set_compression_strategy(
+      static_cast<arrow::adapters::orc::CompressionStrategy>(
+          arrow::random_single_int<int>(0, 1)));
+  arrow_write_options.set_padding_tolerance(
+      arrow::random_single_real<double>(0, 1));
+  arrow_write_options.set_dictionary_key_size_threshold(
+      arrow::random_single_real<double>(0, 1));
+  arrow_write_options.set_bloom_filter_fpp(
+      arrow::random_single_real<double>(0, 1));
+  std::set<uint64_t> bloom_filter_cols;
+  for (uint64_t i = 0; i < num_cols; i++) {
+    if (arrow::random_single_int<int>(0, 1) == 1) {
+      bloom_filter_cols.insert(i);
+    }
+  }
+  arrow_write_options.set_columns_use_bloom_filter(bloom_filter_cols);
+  return arrow_write_options;
+}
+
 void AssertTableWriteReadEqual(const std::shared_ptr<Table>& input_table,
                                const std::shared_ptr<Table>& expected_output_table,
                                const int64_t max_size = kDefaultSmallMemStreamSize) {
   EXPECT_OK_AND_ASSIGN(auto buffer_output_stream,
                        io::BufferOutputStream::Create(max_size));
-  EXPECT_OK_AND_ASSIGN(auto writer,
-                       adapters::orc::ORCFileWriter::Open(buffer_output_stream.get()));
+  arrow::adapters::orc::WriteOptions write_options =
+      GenerateRandomWriteOptions(input_table->num_columns());
+  EXPECT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open(
+                                        buffer_output_stream.get(), write_options));
   ARROW_EXPECT_OK(writer->Write(*input_table));
   ARROW_EXPECT_OK(writer->Close());
   EXPECT_OK_AND_ASSIGN(auto buffer, buffer_output_stream->Finish());
@@ -372,6 +407,71 @@ TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) {
 }
 
 // WriteORC tests
+class TestORCWriteOptions : public ::testing::Test {
+ public:
+  TestORCWriteOptions() { arrow_write_options = arrow::adapters::orc::WriteOptions(); }
+  void SetWriteOptions() {
+    arrow_write_options.set_batch_size(2048);
+    arrow_write_options.set_file_version(arrow::adapters::orc::FileVersion(0, 11));
+    arrow_write_options.set_stripe_size(1024);
+    arrow_write_options.set_compression_block_size(1024 * 1024);
+    arrow_write_options.set_row_index_stride(0);
+    arrow_write_options.set_compression(
+        arrow::adapters::orc::CompressionKind::CompressionKind_LZO);
+    arrow_write_options.set_compression_strategy(
+        arrow::adapters::orc::CompressionStrategy::CompressionStrategy_COMPRESSION);
+    arrow_write_options.set_padding_tolerance(0.05);
+    arrow_write_options.set_dictionary_key_size_threshold(0.1);
+    arrow_write_options.set_bloom_filter_fpp(0.1);
+    arrow_write_options.set_columns_use_bloom_filter({0, 2});
+  }
+
+ protected:
+  arrow::adapters::orc::WriteOptions arrow_write_options;
+};
+
+TEST_F(TestORCWriteOptions, DefaultOptions) {
+  ASSERT_EQ(arrow_write_options.batch_size(), 1024);
+  std::shared_ptr<liborc::WriterOptions> orc_writer_options =
+      arrow_write_options.get_orc_writer_options();
+  ASSERT_EQ(orc_writer_options->getFileVersion(), liborc::FileVersion(0, 12));
+  ASSERT_EQ(orc_writer_options->getStripeSize(), 64 * 1024 * 1024);
+  ASSERT_EQ(orc_writer_options->getCompressionBlockSize(), 64 * 1024);
+  ASSERT_EQ(orc_writer_options->getRowIndexStride(), 10000);
+  ASSERT_EQ(orc_writer_options->getCompression(),
+            liborc::CompressionKind::CompressionKind_ZLIB);
+  ASSERT_EQ(orc_writer_options->getCompressionStrategy(),
+            liborc::CompressionStrategy::CompressionStrategy_SPEED);
+  ASSERT_DOUBLE_EQ(orc_writer_options->getPaddingTolerance(), 0.0);
+  ASSERT_DOUBLE_EQ(orc_writer_options->getDictionaryKeySizeThreshold(), 0.0);
+  ASSERT_DOUBLE_EQ(orc_writer_options->getBloomFilterFPP(), 0.05);
+  for (uint64_t i = 0; i < 4; i++) {
+    ASSERT_FALSE(orc_writer_options->isColumnUseBloomFilter(i));
+  }
+}
+
+TEST_F(TestORCWriteOptions, ModifiedOptions) {
+  SetWriteOptions();
+  ASSERT_EQ(arrow_write_options.batch_size(), 2048);
+  std::shared_ptr<liborc::WriterOptions> orc_writer_options =
+      arrow_write_options.get_orc_writer_options();
+  ASSERT_EQ(orc_writer_options->getFileVersion(), liborc::FileVersion(0, 11));
+  ASSERT_EQ(orc_writer_options->getStripeSize(), 1024);
+  ASSERT_EQ(orc_writer_options->getCompressionBlockSize(), 1024 * 1024);
+  ASSERT_EQ(orc_writer_options->getRowIndexStride(), 0);
+  ASSERT_EQ(orc_writer_options->getCompression(),
+            liborc::CompressionKind::CompressionKind_LZO);
+  ASSERT_EQ(orc_writer_options->getCompressionStrategy(),
+            liborc::CompressionStrategy::CompressionStrategy_COMPRESSION);
+  ASSERT_DOUBLE_EQ(orc_writer_options->getPaddingTolerance(), 0.05);
+  ASSERT_DOUBLE_EQ(orc_writer_options->getDictionaryKeySizeThreshold(), 0.1);
+  ASSERT_DOUBLE_EQ(orc_writer_options->getBloomFilterFPP(), 0.1);
+  for (uint64_t i = 0; i < 2; i++) {
+    ASSERT_TRUE(orc_writer_options->isColumnUseBloomFilter(2 * i));
+    ASSERT_FALSE(orc_writer_options->isColumnUseBloomFilter(2 * i + 1));
+  }
+}
+
 // Trivial
 
 class TestORCWriterTrivialNoConversion : public ::testing::Test {
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index c77ae2525c44f..ba154d1891ff5 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -322,6 +322,16 @@ class ARROW_TESTING_EXPORT RandomArrayGenerator {
   std::shared_ptr<Array> FixedSizeBinary(int64_t size, int32_t byte_width,
                                          double null_probability = 0);
 
+  // /// \brief Generate a random StructArray
+  // ///
+  // /// \param[in] children Vector of Arrays containing the data for each child
+  // /// \param[in] size The size of the generated list array
+  // /// \param[in] null_probability the probability of a list value being null
+  // ///
+  // /// \return a generated Array
+  // std::shared_ptr<Array> Struct(const ArrayVector& children, int64_t size,
+  //                               double null_probability);
+
   /// \brief Generate a random ListArray
   ///
   /// \param[in] values The underlying values array
@@ -460,6 +470,22 @@ ARROW_TESTING_EXPORT
 void rand_month_day_nanos(int64_t N,
                           std::vector<MonthDayNanoIntervalType::c_type>* out);
 
+template <typename T, typename U = T>
+U random_single_int(T lower, T upper) {
+  // Fixed seed: each call is deterministic by design.
+  const int random_seed = 0;
+  std::default_random_engine gen(random_seed);
+  std::uniform_int_distribution<U> d(lower, upper);
+  return static_cast<U>(d(gen));
+}
+
+template <typename T, typename U = T>
+U random_single_real(T min_value, T max_value) {
+  const int random_seed = 0;
+  std::default_random_engine gen(random_seed);
+  ::arrow::random::uniform_real_distribution<U> d(min_value, max_value);
+  return static_cast<U>(d(gen));
+}
+
 template <typename T, typename U>
 void randint(int64_t N, T lower, T upper, std::vector<U>* out) {
   const int random_seed = 0;
diff --git a/docs/source/cpp/orc.rst b/docs/source/cpp/orc.rst
new file mode 100644
index 0000000000000..8ac7a0751d449
--- /dev/null
+++ b/docs/source/cpp/orc.rst
@@ -0,0 +1,55 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. default-domain:: cpp
+.. highlight:: cpp
+
+.. cpp:namespace:: arrow::adapters::orc
+
+=============================
+Reading and Writing ORC files
+=============================
+
+Arrow provides a fast ORC reader and a fast ORC writer, allowing ingestion of
+external data as Arrow tables.
+
+.. seealso::
+   :ref:`ORC reader/writer API reference `.
+
+Basic usage
+===========
+
+An ORC file is written to a :class:`~arrow::io::OutputStream`.
+
+.. code-block:: cpp
+
+   #include <arrow/adapters/orc/adapter.h>
+
+   {
+      // Oneshot write
+      // ...
+      std::shared_ptr<arrow::io::OutputStream> output = ...;
+      auto writer_options = WriteOptions();
+      ARROW_ASSIGN_OR_RAISE(auto writer,
+                            ORCFileWriter::Open(output.get(), writer_options));
+      if (!(writer->Write(*input_table)).ok()) {
+         // Handle write error...
+      }
+      if (!(writer->Close()).ok()) {
+         // Handle close error...
+      }
+   }
+
+.. note:: The writer does not yet support all Arrow types.
\ No newline at end of file
diff --git a/docs/source/python/orc.rst b/docs/source/python/orc.rst
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/python/pyarrow/_orc.pxd b/python/pyarrow/_orc.pxd
index 736622591b430..89c5ec8f1caf9 100644
--- a/python/pyarrow/_orc.pxd
+++ b/python/pyarrow/_orc.pxd
@@ -18,8 +18,10 @@
 # distutils: language = c++
 # cython: language_level = 3
 
+from libcpp cimport bool as c_bool
 from libc.string cimport const_char
 from libcpp.vector cimport vector as std_vector
+from libcpp.set cimport set as std_set
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
                                         CResult, CTable, CMemoryPool,
@@ -29,6 +31,66 @@ from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
                                         CRandomAccessFile, COutputStream,
                                         TimeUnit)
 
+cdef extern from "arrow/adapters/orc/adapter_options.h" \
+        namespace "arrow::adapters::orc" nogil:
+    enum CompressionKind" arrow::adapters::orc::CompressionKind":
+        _CompressionKind_NONE" arrow::adapters::orc::CompressionKind::CompressionKind_NONE"
+        _CompressionKind_ZLIB" arrow::adapters::orc::CompressionKind::CompressionKind_ZLIB"
+        _CompressionKind_SNAPPY" arrow::adapters::orc::CompressionKind::CompressionKind_SNAPPY"
+        _CompressionKind_LZO" arrow::adapters::orc::CompressionKind::CompressionKind_LZO"
+        _CompressionKind_LZ4" arrow::adapters::orc::CompressionKind::CompressionKind_LZ4"
+        _CompressionKind_ZSTD" arrow::adapters::orc::CompressionKind::CompressionKind_ZSTD"
+        _CompressionKind_MAX" arrow::adapters::orc::CompressionKind::CompressionKind_MAX"
+
+    enum CompressionStrategy" arrow::adapters::orc::CompressionStrategy":
+        _CompressionStrategy_SPEED" arrow::adapters::orc::CompressionStrategy::CompressionStrategy_SPEED"
+        _CompressionStrategy_COMPRESSION" arrow::adapters::orc::CompressionStrategy::CompressionStrategy_COMPRESSION"
+
+    enum RleVersion" arrow::adapters::orc::RleVersion":
+        _RleVersion_1" arrow::adapters::orc::RleVersion::RleVersion_1"
+        _RleVersion_2" arrow::adapters::orc::RleVersion::RleVersion_2"
+
+    enum BloomFilterVersion" arrow::adapters::orc::BloomFilterVersion":
+        _BloomFilterVersion_ORIGINAL" arrow::adapters::orc::BloomFilterVersion::ORIGINAL"
+        _BloomFilterVersion_UTF8" arrow::adapters::orc::BloomFilterVersion::UTF8"
+        _BloomFilterVersion_FUTURE" arrow::adapters::orc::BloomFilterVersion::FUTURE"
+
+    cdef cppclass FileVersion" arrow::adapters::orc::FileVersion":
+        FileVersion(uint32_t major, uint32_t minor)
+        uint32_t major()
+        uint32_t minor()
+        c_string ToString()
+
+    cdef cppclass WriteOptions" arrow::adapters::orc::WriteOptions":
+        WriteOptions()
+        WriteOptions& set_batch_size(uint64_t size)
+        uint64_t batch_size()
+        WriteOptions& set_stripe_size(uint64_t size)
+        uint64_t stripe_size()
+        WriteOptions& set_compression_block_size(uint64_t size)
+        uint64_t compression_block_size()
+        WriteOptions& set_row_index_stride(uint64_t stride)
+        uint64_t row_index_stride()
+        WriteOptions& set_dictionary_key_size_threshold(double val)
+        double dictionary_key_size_threshold()
+        WriteOptions& set_file_version(const FileVersion& version)
+        FileVersion file_version()
        WriteOptions& set_compression(CompressionKind comp)
        CompressionKind compression()
        WriteOptions& set_compression_strategy(CompressionStrategy strategy)
        CompressionStrategy compression_strategy()
        c_bool aligned_bitpacking()
        WriteOptions& set_padding_tolerance(double tolerance)
        double padding_tolerance()
        RleVersion rle_version()
        c_bool enable_index()
        c_bool enable_dictionary()
        WriteOptions& set_columns_use_bloom_filter(const std_set[uint64_t]& columns)
        c_bool is_column_use_bloom_filter(uint64_t column)
        WriteOptions& set_bloom_filter_fpp(double fpp)
        double bloom_filter_fpp()
        BloomFilterVersion bloom_filter_version()
 
 cdef extern from "arrow/adapters/orc/adapter.h" \
         namespace "arrow::adapters::orc" nogil:
 
@@ -56,7 +118,8 @@ cdef extern from "arrow/adapters/orc/adapter.h" \
 
     cdef cppclass ORCFileWriter:
         @staticmethod
-        CResult[unique_ptr[ORCFileWriter]] Open(COutputStream* output_stream)
+        CResult[unique_ptr[ORCFileWriter]] Open(
+            COutputStream* output_stream, const WriteOptions& writer_options)
 
         CStatus Write(const CTable& table)
 
diff --git a/python/pyarrow/_orc.pyx b/python/pyarrow/_orc.pyx
index 18ca28682a46a..9919380f1a883 100644
--- a/python/pyarrow/_orc.pyx
+++ b/python/pyarrow/_orc.pyx
@@ -39,6 +39,206 @@ from pyarrow.lib cimport (check_status, _Weakrefable,
 
 from pyarrow.lib import tobytes
 
+
+cdef compression_kind_from_enum(CompressionKind compression_kind_):
+    return {
+        _CompressionKind_NONE: 'UNCOMPRESSED',
+        _CompressionKind_ZLIB: 'ZLIB',
+        _CompressionKind_SNAPPY: 'SNAPPY',
+        _CompressionKind_LZO: 'LZO',
+        _CompressionKind_LZ4: 'LZ4',
+        _CompressionKind_ZSTD: 'ZSTD',
+    }.get(compression_kind_, 'UNKNOWN')
+
+
+cdef CompressionKind compression_kind_from_name(name) except *:
+    name = name.upper()
+    if name == 'ZLIB':
+        return _CompressionKind_ZLIB
+    elif name == 'SNAPPY':
+        return _CompressionKind_SNAPPY
+    elif name == 'LZO':
+        return _CompressionKind_LZO
+    elif name == 'LZ4':
+        return _CompressionKind_LZ4
+    elif name == 'ZSTD':
+        return _CompressionKind_ZSTD
+    elif name == 'UNCOMPRESSED':
+        return _CompressionKind_NONE
+    raise ValueError('Unknown CompressionKind: {0}'.format(name))
+
+
+cdef compression_strategy_from_enum(CompressionStrategy compression_strategy_):
+    return {
+        _CompressionStrategy_SPEED: 'SPEED',
+        _CompressionStrategy_COMPRESSION: 'COMPRESSION',
+    }.get(compression_strategy_, 'UNKNOWN')
+
+
+cdef CompressionStrategy compression_strategy_from_name(name) except *:
+    name = name.upper()
+    # SPEED is the default value in the ORC C++ implementation
+    if name == 'COMPRESSION':
+        return _CompressionStrategy_COMPRESSION
+    elif name == 'SPEED':
+        return _CompressionStrategy_SPEED
+    raise ValueError('Unknown CompressionStrategy: {0}'.format(name))
+
+
+cdef rle_version_from_enum(RleVersion rle_version_):
+    return {
+        _RleVersion_1: '1',
+        _RleVersion_2: '2',
+    }.get(rle_version_, 'UNKNOWN')
+
+
+cdef bloom_filter_version_from_enum(BloomFilterVersion bloom_filter_version_):
+    return {
+        _BloomFilterVersion_ORIGINAL: 'ORIGINAL',
+        _BloomFilterVersion_UTF8: 'UTF8',
+        _BloomFilterVersion_FUTURE: 'FUTURE',
+    }.get(bloom_filter_version_, 'UNKNOWN')
+
+
+cdef file_version_from_class(FileVersion file_version_):
+    cdef object file_version = file_version_.ToString()
+    return file_version
+
+
+cdef shared_ptr[WriteOptions] _create_write_options(
+    file_version=None,
+    batch_size=None,
+    stripe_size=None,
+    compression=None,
+    compression_block_size=None,
+    compression_strategy=None,
+    row_index_stride=None,
+    padding_tolerance=None,
+    dictionary_key_size_threshold=None,
+    bloom_filter_columns=None,
+    bloom_filter_fpp=None
+) except *:
+    """General writer options"""
+    cdef:
+        shared_ptr[WriteOptions] options
+
+    options = make_shared[WriteOptions]()
+
+    # batch_size
+
+    if batch_size is not None:
+        if isinstance(batch_size, int) and batch_size > 0:
+            deref(options).set_batch_size(batch_size)
+        else:
+            raise ValueError("Invalid ORC writer batch size: {0}"
+                             .format(batch_size))
+
+    # file_version
+
+    if file_version is not None:
+        if str(file_version) == "0.12":
+            deref(options).set_file_version(FileVersion(0, 12))
+        elif str(file_version) == "0.11":
+            deref(options).set_file_version(FileVersion(0, 11))
+        else:
+            raise ValueError("Unsupported ORC file version: {0}"
+                             .format(file_version))
+
+    # stripe_size
+
+    if stripe_size is not None:
+        if isinstance(stripe_size, int) and stripe_size > 0:
+            deref(options).set_stripe_size(stripe_size)
+        else:
+            raise ValueError("Invalid ORC stripe size: {0}"
+                             .format(stripe_size))
+
+    # compression
+
+    if compression is not None:
+        if isinstance(compression, basestring):
+            deref(options).set_compression(
+                compression_kind_from_name(compression))
+        else:
+            raise ValueError("Unsupported ORC compression kind: {0}"
+                             .format(compression))
+
+    # compression_block_size
+
+    if compression_block_size is not None:
+        if isinstance(compression_block_size, int) and compression_block_size > 0:
+            deref(options).set_compression_block_size(compression_block_size)
+        else:
+            raise ValueError("Invalid ORC compression block size: {0}"
+                             .format(compression_block_size))
+
+    # compression_strategy
+
+    if compression_strategy is not None:
+        if isinstance(compression_strategy, basestring):
+            deref(options).set_compression_strategy(
+                compression_strategy_from_name(compression_strategy))
+        else:
+            raise ValueError("Unsupported ORC compression strategy: {0}"
+                             .format(compression_strategy))
+
+    # row_index_stride
+
+    if row_index_stride is not None:
+        if isinstance(row_index_stride, int) and row_index_stride >= 0:
+            deref(options).set_row_index_stride(row_index_stride)
+        else:
+            raise ValueError("Invalid ORC row index stride: {0}"
+                             .format(row_index_stride))
+
+    # padding_tolerance
+
+    if padding_tolerance is not None:
+        try:
+            padding_tolerance = float(padding_tolerance)
+            deref(options).set_padding_tolerance(padding_tolerance)
+        except Exception:
+            raise ValueError("Invalid ORC padding tolerance: {0}"
+                             .format(padding_tolerance))
+
+    # dictionary_key_size_threshold
+
+    if dictionary_key_size_threshold is not None:
+        try:
+            dictionary_key_size_threshold = float(
+                dictionary_key_size_threshold)
+            deref(options).set_dictionary_key_size_threshold(
+                dictionary_key_size_threshold)
+        except Exception:
+            raise ValueError("Invalid ORC dictionary key size threshold: {0}"
+                             .format(dictionary_key_size_threshold))
+
+    # bloom_filter_columns
+
+    if bloom_filter_columns is not None:
+        try:
+            bloom_filter_columns = set(bloom_filter_columns)
+            for col in bloom_filter_columns:
+                assert isinstance(col, int) and col >= 0
+            deref(options).set_columns_use_bloom_filter(
+                bloom_filter_columns)
+        except Exception:
+            raise ValueError("Invalid ORC BloomFilter columns: {0}"
+                             .format(bloom_filter_columns))
+
+    # False positive rate of the Bloom Filter
+
+    if bloom_filter_fpp is not None:
+        try:
+            bloom_filter_fpp = float(bloom_filter_fpp)
+            assert 0 <= bloom_filter_fpp <= 1
+            deref(options).set_bloom_filter_fpp(
+                bloom_filter_fpp)
+        except Exception:
+            raise ValueError("Invalid ORC BloomFilter false positive rate: {0}"
+                             .format(bloom_filter_fpp))
+
+    return options
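The validation above rejects out-of-range values before they ever reach the C++ layer. A brief illustration of the resulting behavior (hypothetical calls, not part of the diff):

    import pyarrow as pa
    import pyarrow.orc as orc

    buf = pa.BufferOutputStream()
    # Raises ValueError: the batch size must be a positive integer.
    orc.ORCWriter(buf, batch_size=0)
    # Raises ValueError: only file versions '0.11' and '0.12' are accepted.
    orc.ORCWriter(buf, file_version='0.13')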
 cdef class ORCReader(_Weakrefable):
     cdef:
         object source
@@ -140,16 +340,43 @@ cdef class ORCReader(_Weakrefable):
 
 cdef class ORCWriter(_Weakrefable):
     cdef:
-        object source
+        object sink
         unique_ptr[ORCFileWriter] writer
         shared_ptr[COutputStream] rd_handle
 
-    def open(self, object source):
-        self.source = source
-        get_writer(source, &self.rd_handle)
+    def open(self, object sink, file_version=None,
+             batch_size=None,
+             stripe_size=None,
+             compression=None,
+             compression_block_size=None,
+             compression_strategy=None,
+             row_index_stride=None,
+             padding_tolerance=None,
+             dictionary_key_size_threshold=None,
+             bloom_filter_columns=None,
+             bloom_filter_fpp=None):
+        cdef:
+            shared_ptr[WriteOptions] write_options
+        self.sink = sink
+        get_writer(sink, &self.rd_handle)
+
+        write_options = _create_write_options(
+            file_version=file_version,
+            batch_size=batch_size,
+            stripe_size=stripe_size,
+            compression=compression,
+            compression_block_size=compression_block_size,
+            compression_strategy=compression_strategy,
+            row_index_stride=row_index_stride,
+            padding_tolerance=padding_tolerance,
+            dictionary_key_size_threshold=dictionary_key_size_threshold,
+            bloom_filter_columns=bloom_filter_columns,
+            bloom_filter_fpp=bloom_filter_fpp
+        )
+
         with nogil:
             self.writer = move(GetResultValue[unique_ptr[ORCFileWriter]](
-                ORCFileWriter.Open(self.rd_handle.get())))
+                ORCFileWriter.Open(self.rd_handle.get(), deref(write_options))))
 
     def write(self, Table table):
         cdef:
diff --git a/python/pyarrow/orc.py b/python/pyarrow/orc.py
index 87f80588621a2..4ebcd9fc08792 100644
--- a/python/pyarrow/orc.py
+++ b/python/pyarrow/orc.py
@@ -117,8 +117,40 @@ def read(self, columns=None):
         return self.reader.read(columns=columns)
 
 
+_orc_writer_args_docs = """file_version : {"0.11", "0.12"}, default "0.12"
+    Determine which ORC file version to use. Hive 0.11 / ORC v0 is the older
+    version as defined `here `
+    while Hive 0.12 / ORC v1 is the newer one as defined
+    `here `.
+batch_size : int, default 1024
+    Number of rows the ORC writer writes at a time.
+stripe_size : int, default 64 * 1024 * 1024
+    Size of each ORC stripe.
+compression : string, default 'zlib'
+    Specify the compression codec.
+    Valid values: {'UNCOMPRESSED', 'SNAPPY', 'ZLIB', 'LZO', 'LZ4', 'ZSTD'}
+compression_block_size : int, default 64 * 1024
+    Specify the size of each compression block.
+compression_strategy : string, default 'speed'
+    Specify the compression strategy i.e. speed vs size reduction.
+    Valid values: {'SPEED', 'COMPRESSION'}
+row_index_stride : int, default 10000
+    Specify the row index stride i.e. the number of rows per
+    entry in the row index.
+padding_tolerance : double, default 0.0
+    Set the padding tolerance.
+dictionary_key_size_threshold : double, default 0.0
+    Set the dictionary key size threshold. 0 to disable dictionary encoding.
+    1 to always enable dictionary encoding.
+bloom_filter_columns : None, set-like or list-like, default None
+    Set columns that use the bloom filter.
+bloom_filter_fpp : double, default 0.05
+    Set false positive probability of the bloom filter.
+"""
+
+
 class ORCWriter:
-    """
+    __doc__ = """
     Writer interface for a single ORC file
 
     Parameters
@@ -127,11 +159,50 @@ class ORCWriter:
         Writable target. For passing Python file objects or byte buffers,
         see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
         or pyarrow.io.FixedSizeBufferWriter.
-    """
+    {}
+    """.format(_orc_writer_args_docs)
+
+    def __init__(self, where, file_version='0.12',
+                 batch_size=1024,
+                 stripe_size=67108864,
+                 compression='zlib',
+                 compression_block_size=65536,
+                 compression_strategy='speed',
+                 row_index_stride=10000,
+                 padding_tolerance=0.0,
+                 dictionary_key_size_threshold=0.0,
+                 bloom_filter_columns=None,
+                 bloom_filter_fpp=0.05,
+                 ):
 
-    def __init__(self, where):
         self.writer = _orc.ORCWriter()
-        self.writer.open(where)
+        self.writer.open(
+            where,
+            file_version=file_version,
+            batch_size=batch_size,
+            stripe_size=stripe_size,
+            compression=compression,
+            compression_block_size=compression_block_size,
+            compression_strategy=compression_strategy,
+            row_index_stride=row_index_stride,
+            padding_tolerance=padding_tolerance,
+            dictionary_key_size_threshold=dictionary_key_size_threshold,
+            bloom_filter_columns=bloom_filter_columns,
+            bloom_filter_fpp=bloom_filter_fpp
+        )
+        self.is_open = True
+
+    def __del__(self):
+        if getattr(self, 'is_open', False):
+            self.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+        # return false since we want to propagate exceptions
+        return False
 
     def write(self, table):
         """
@@ -143,17 +214,54 @@ def write(self, table):
         table : pyarrow.lib.Table
             The table to be written into the ORC file
         """
+        assert self.is_open
         self.writer.write(table)
 
     def close(self):
         """
         Close the ORC file
         """
-        self.writer.close()
+        if self.is_open:
+            self.writer.close()
+            self.is_open = False
 
 
-def write_table(table, where):
-    """
+def write_table(table, where, file_version='0.12',
+                batch_size=1024,
+                stripe_size=67108864,
+                compression='zlib',
+                compression_block_size=65536,
+                compression_strategy='speed',
+                row_index_stride=10000,
+                padding_tolerance=0.0,
+                dictionary_key_size_threshold=0.0,
+                bloom_filter_columns=None,
+                bloom_filter_fpp=0.05):
+    if isinstance(where, Table):
+        warnings.warn(
+            "The order of the arguments has changed. Pass as "
+            "'write_table(table, where)' instead. The old order will raise "
+            "an error in the future.", FutureWarning, stacklevel=2
+        )
+        table, where = where, table
+    with ORCWriter(
+        where,
+        file_version=file_version,
+        batch_size=batch_size,
+        stripe_size=stripe_size,
+        compression=compression,
+        compression_block_size=compression_block_size,
+        compression_strategy=compression_strategy,
+        row_index_stride=row_index_stride,
+        padding_tolerance=padding_tolerance,
+        dictionary_key_size_threshold=dictionary_key_size_threshold,
+        bloom_filter_columns=bloom_filter_columns,
+        bloom_filter_fpp=bloom_filter_fpp
+    ) as writer:
+        writer.write(table)
+
+
+write_table.__doc__ = """
     Write a table into an ORC file
 
     Parameters
@@ -164,14 +272,5 @@ def write_table(table, where):
         Writable target. For passing Python file objects or byte buffers,
         see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
         or pyarrow.io.FixedSizeBufferWriter.
-    """
-    if isinstance(where, Table):
-        warnings.warn(
-            "The order of the arguments has changed. Pass as "
-            "'write_table(table, where)' instead. The old order will raise "
-            "an error in the future.", FutureWarning, stacklevel=2
-        )
-        table, where = where, table
-    writer = ORCWriter(where)
-    writer.write(table)
-    writer.close()
+    {}
+    """.format(_orc_writer_args_docs)