From 999c131908379f646fc18de66254151b4fbf447c Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 10 May 2017 13:43:05 -0700 Subject: [PATCH] ORC-176: Refactor common classes for writer and reader This is mainly a refactoring change for ORC-176, including: 1. Extracted common classes and functions into Common.hh and Common.cc; 2. Put InputStream interface and its implementations from Compression.hh to InputStream.hh and InputStream.cc. --- c++/include/orc/Common.hh | 167 ++++++++++++++++++++++++++++ c++/include/orc/Reader.hh | 137 +---------------------- c++/src/CMakeLists.txt | 2 + c++/src/Common.cc | 107 ++++++++++++++++++ c++/src/Compression.cc | 195 --------------------------------- c++/src/Compression.hh | 92 +--------------- c++/src/Reader.cc | 82 -------------- c++/src/io/InputStream.cc | 222 ++++++++++++++++++++++++++++++++++++++ c++/src/io/InputStream.hh | 116 ++++++++++++++++++++ 9 files changed, 616 insertions(+), 504 deletions(-) create mode 100644 c++/include/orc/Common.hh create mode 100644 c++/src/Common.cc create mode 100644 c++/src/io/InputStream.cc create mode 100644 c++/src/io/InputStream.hh diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh new file mode 100644 index 0000000000..f499b81f78 --- /dev/null +++ b/c++/include/orc/Common.hh @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ORC_COMMON_HH +#define ORC_COMMON_HH + +#include "orc/Vector.hh" +#include "orc/Type.hh" +#include "Exceptions.hh" +#include "wrap/orc-proto-wrapper.hh" + +#include + +namespace orc { + enum CompressionKind { + CompressionKind_NONE = 0, + CompressionKind_ZLIB = 1, + CompressionKind_SNAPPY = 2, + CompressionKind_LZO = 3, + CompressionKind_LZ4 = 4, + CompressionKind_ZSTD = 5, + CompressionKind_MAX = INT64_MAX + }; + + /** + * Get the name of the CompressionKind. + */ + std::string compressionKindToString(CompressionKind kind); + + enum WriterVersion { + WriterVersion_ORIGINAL = 0, + WriterVersion_HIVE_8732 = 1, + WriterVersion_HIVE_4243 = 2, + WriterVersion_HIVE_12055 = 3, + WriterVersion_HIVE_13083 = 4, + WriterVersion_ORC_101 = 5, + WriterVersion_ORC_135 = 6, + WriterVersion_MAX = INT64_MAX + }; + + /** + * Get the name of the WriterVersion. + */ + std::string writerVersionToString(WriterVersion kind); + + enum StreamKind { + StreamKind_PRESENT = 0, + StreamKind_DATA = 1, + StreamKind_LENGTH = 2, + StreamKind_DICTIONARY_DATA = 3, + StreamKind_DICTIONARY_COUNT = 4, + StreamKind_SECONDARY = 5, + StreamKind_ROW_INDEX = 6, + StreamKind_BLOOM_FILTER = 7 + }; + + /** + * Get the string representation of the StreamKind. + */ + std::string streamKindToString(StreamKind kind); + + class StreamInformation { + public: + virtual ~StreamInformation(); + + virtual StreamKind getKind() const = 0; + virtual uint64_t getColumnId() const = 0; + virtual uint64_t getOffset() const = 0; + virtual uint64_t getLength() const = 0; + }; + + enum ColumnEncodingKind { + ColumnEncodingKind_DIRECT = 0, + ColumnEncodingKind_DICTIONARY = 1, + ColumnEncodingKind_DIRECT_V2 = 2, + ColumnEncodingKind_DICTIONARY_V2 = 3 + }; + + std::string columnEncodingKindToString(ColumnEncodingKind kind); + + class StripeInformation { + public: + virtual ~StripeInformation(); + + /** + * Get the byte offset of the start of the stripe. + * @return the bytes from the start of the file + */ + virtual uint64_t getOffset() const = 0; + + /** + * Get the total length of the stripe in bytes. + * @return the number of bytes in the stripe + */ + virtual uint64_t getLength() const = 0; + + /** + * Get the length of the stripe's indexes. + * @return the number of bytes in the index + */ + virtual uint64_t getIndexLength() const = 0; + + /** + * Get the length of the stripe's data. + * @return the number of bytes in the stripe + */ + virtual uint64_t getDataLength()const = 0; + + /** + * Get the length of the stripe's tail section, which contains its index. + * @return the number of bytes in the tail + */ + virtual uint64_t getFooterLength() const = 0; + + /** + * Get the number of rows in the stripe. + * @return a count of the number of rows + */ + virtual uint64_t getNumberOfRows() const = 0; + + /** + * Get the number of streams in the stripe. + */ + virtual uint64_t getNumberOfStreams() const = 0; + + /** + * Get the StreamInformation for the given stream. + */ + virtual ORC_UNIQUE_PTR + getStreamInformation(uint64_t streamId) const = 0; + + /** + * Get the column encoding for the given column. + * @param colId the columnId + */ + virtual ColumnEncodingKind getColumnEncoding(uint64_t colId) const = 0; + + /** + * Get the dictionary size. + * @param colId the columnId + * @return the size of the dictionary or 0 if there isn't one + */ + virtual uint64_t getDictionarySize(uint64_t colId) const = 0; + + /** + * Get the writer timezone. + */ + virtual const std::string& getWriterTimezone() const = 0; + }; +} + +#endif diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 76d7853451..3912bd77a1 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -19,6 +19,7 @@ #ifndef ORC_READER_HH #define ORC_READER_HH +#include "orc/Common.hh" #include "orc/orc-config.hh" #include "orc/Statistics.hh" #include "orc/Type.hh" @@ -34,142 +35,6 @@ namespace orc { struct ReaderOptionsPrivate; struct RowReaderOptionsPrivate; - enum CompressionKind { - CompressionKind_NONE = 0, - CompressionKind_ZLIB = 1, - CompressionKind_SNAPPY = 2, - CompressionKind_LZO = 3, - CompressionKind_LZ4 = 4, - CompressionKind_ZSTD = 5, - CompressionKind_MAX = INT64_MAX - }; - - /** - * Get the name of the CompressionKind. - */ - std::string compressionKindToString(CompressionKind kind); - - enum WriterVersion { - WriterVersion_ORIGINAL = 0, - WriterVersion_HIVE_8732 = 1, - WriterVersion_HIVE_4243 = 2, - WriterVersion_HIVE_12055 = 3, - WriterVersion_HIVE_13083 = 4, - WriterVersion_ORC_101 = 5, - WriterVersion_ORC_135 = 6, - WriterVersion_MAX = INT64_MAX - }; - - /** - * Get the name of the WriterVersion. - */ - std::string writerVersionToString(WriterVersion kind); - - enum StreamKind { - StreamKind_PRESENT = 0, - StreamKind_DATA = 1, - StreamKind_LENGTH = 2, - StreamKind_DICTIONARY_DATA = 3, - StreamKind_DICTIONARY_COUNT = 4, - StreamKind_SECONDARY = 5, - StreamKind_ROW_INDEX = 6, - StreamKind_BLOOM_FILTER = 7 - }; - - /** - * Get the string representation of the StreamKind. - */ - std::string streamKindToString(StreamKind kind); - - class StreamInformation { - public: - virtual ~StreamInformation(); - - virtual StreamKind getKind() const = 0; - virtual uint64_t getColumnId() const = 0; - virtual uint64_t getOffset() const = 0; - virtual uint64_t getLength() const = 0; - }; - - enum ColumnEncodingKind { - ColumnEncodingKind_DIRECT = 0, - ColumnEncodingKind_DICTIONARY = 1, - ColumnEncodingKind_DIRECT_V2 = 2, - ColumnEncodingKind_DICTIONARY_V2 = 3 - }; - - std::string columnEncodingKindToString(ColumnEncodingKind kind); - - class StripeInformation { - public: - virtual ~StripeInformation(); - - /** - * Get the byte offset of the start of the stripe. - * @return the bytes from the start of the file - */ - virtual uint64_t getOffset() const = 0; - - /** - * Get the total length of the stripe in bytes. - * @return the number of bytes in the stripe - */ - virtual uint64_t getLength() const = 0; - - /** - * Get the length of the stripe's indexes. - * @return the number of bytes in the index - */ - virtual uint64_t getIndexLength() const = 0; - - /** - * Get the length of the stripe's data. - * @return the number of bytes in the stripe - */ - virtual uint64_t getDataLength()const = 0; - - /** - * Get the length of the stripe's tail section, which contains its index. - * @return the number of bytes in the tail - */ - virtual uint64_t getFooterLength() const = 0; - - /** - * Get the number of rows in the stripe. - * @return a count of the number of rows - */ - virtual uint64_t getNumberOfRows() const = 0; - - /** - * Get the number of streams in the stripe. - */ - virtual uint64_t getNumberOfStreams() const = 0; - - /** - * Get the StreamInformation for the given stream. - */ - virtual ORC_UNIQUE_PTR - getStreamInformation(uint64_t streamId) const = 0; - - /** - * Get the column encoding for the given column. - * @param colId the columnId - */ - virtual ColumnEncodingKind getColumnEncoding(uint64_t colId) const = 0; - - /** - * Get the dictionary size. - * @param colId the columnId - * @return the size of the dictionary or 0 if there isn't one - */ - virtual uint64_t getDictionarySize(uint64_t colId) const = 0; - - /** - * Get the writer timezone. - */ - virtual const std::string& getWriterTimezone() const = 0; - }; - /** * Options for creating a Reader. */ diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt index d717fb4d8e..c39437a5f2 100644 --- a/c++/src/CMakeLists.txt +++ b/c++/src/CMakeLists.txt @@ -135,10 +135,12 @@ add_custom_command(OUTPUT orc_proto.pb.h orc_proto.pb.cc add_library (orc STATIC "${CMAKE_CURRENT_BINARY_DIR}/Adaptor.hh" orc_proto.pb.h + io/InputStream.cc wrap/orc-proto-wrapper.cc ByteRLE.cc ColumnPrinter.cc ColumnReader.cc + Common.cc Compression.cc Exceptions.cc Int128.cc diff --git a/c++/src/Common.cc b/c++/src/Common.cc new file mode 100644 index 0000000000..7813612933 --- /dev/null +++ b/c++/src/Common.cc @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "orc/Common.hh" + +#include + +namespace orc { + + std::string compressionKindToString(CompressionKind kind) { + switch (static_cast(kind)) { + case CompressionKind_NONE: + return "none"; + case CompressionKind_ZLIB: + return "zlib"; + case CompressionKind_SNAPPY: + return "snappy"; + case CompressionKind_LZO: + return "lzo"; + case CompressionKind_LZ4: + return "lz4"; + case CompressionKind_ZSTD: + return "zstd"; + } + std::stringstream buffer; + buffer << "unknown - " << kind; + return buffer.str(); + } + + std::string writerVersionToString(WriterVersion version) { + switch (static_cast(version)) { + case WriterVersion_ORIGINAL: + return "original"; + case WriterVersion_HIVE_8732: + return "HIVE-8732"; + case WriterVersion_HIVE_4243: + return "HIVE-4243"; + case WriterVersion_HIVE_12055: + return "HIVE-12055"; + case WriterVersion_HIVE_13083: + return "HIVE-13083"; + case WriterVersion_ORC_101: + return "ORC-101"; + case WriterVersion_ORC_135: + return "ORC-135"; + } + std::stringstream buffer; + buffer << "future - " << version; + return buffer.str(); + } + + std::string streamKindToString(StreamKind kind) { + switch (static_cast(kind)) { + case StreamKind_PRESENT: + return "present"; + case StreamKind_DATA: + return "data"; + case StreamKind_LENGTH: + return "length"; + case StreamKind_DICTIONARY_DATA: + return "dictionary"; + case StreamKind_DICTIONARY_COUNT: + return "dictionary count"; + case StreamKind_SECONDARY: + return "secondary"; + case StreamKind_ROW_INDEX: + return "index"; + case StreamKind_BLOOM_FILTER: + return "bloom"; + } + std::stringstream buffer; + buffer << "unknown - " << kind; + return buffer.str(); + } + + std::string columnEncodingKindToString(ColumnEncodingKind kind) { + switch (static_cast(kind)) { + case ColumnEncodingKind_DIRECT: + return "direct"; + case ColumnEncodingKind_DICTIONARY: + return "dictionary"; + case ColumnEncodingKind_DIRECT_V2: + return "direct rle2"; + case ColumnEncodingKind_DICTIONARY_V2: + return "dictionary rle2"; + } + std::stringstream buffer; + buffer << "unknown - " << kind; + return buffer.str(); + } + +} diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc index 81cc578550..e2f9dbf4d5 100644 --- a/c++/src/Compression.cc +++ b/c++/src/Compression.cc @@ -33,201 +33,6 @@ namespace orc { - void printBuffer(std::ostream& out, - const char *buffer, - uint64_t length) { - const uint64_t width = 24; - out << std::hex; - for(uint64_t line = 0; line < (length + width - 1) / width; ++line) { - out << std::setfill('0') << std::setw(7) << (line * width); - for(uint64_t byte = 0; - byte < width && line * width + byte < length; ++byte) { - out << " " << std::setfill('0') << std::setw(2) - << static_cast(0xff & buffer[line * width + - byte]); - } - out << "\n"; - } - out << std::dec; - } - - PositionProvider::PositionProvider(const std::list& posns) { - position = posns.begin(); - } - - uint64_t PositionProvider::next() { - uint64_t result = *position; - ++position; - return result; - } - - SeekableInputStream::~SeekableInputStream() { - // PASS - } - - SeekableArrayInputStream::~SeekableArrayInputStream() { - // PASS - } - - SeekableArrayInputStream::SeekableArrayInputStream - (const unsigned char* values, - uint64_t size, - uint64_t blkSize - ): data(reinterpret_cast(values)) { - length = size; - position = 0; - blockSize = blkSize == 0 ? length : static_cast(blkSize); - } - - SeekableArrayInputStream::SeekableArrayInputStream(const char* values, - uint64_t size, - uint64_t blkSize - ): data(values) { - length = size; - position = 0; - blockSize = blkSize == 0 ? length : static_cast(blkSize); - } - - bool SeekableArrayInputStream::Next(const void** buffer, int*size) { - uint64_t currentSize = std::min(length - position, blockSize); - if (currentSize > 0) { - *buffer = data + position; - *size = static_cast(currentSize); - position += currentSize; - return true; - } - *size = 0; - return false; - } - - void SeekableArrayInputStream::BackUp(int count) { - if (count >= 0) { - uint64_t unsignedCount = static_cast(count); - if (unsignedCount <= blockSize && unsignedCount <= position) { - position -= unsignedCount; - } else { - throw std::logic_error("Can't backup that much!"); - } - } - } - - bool SeekableArrayInputStream::Skip(int count) { - if (count >= 0) { - uint64_t unsignedCount = static_cast(count); - if (unsignedCount + position <= length) { - position += unsignedCount; - return true; - } else { - position = length; - } - } - return false; - } - - google::protobuf::int64 SeekableArrayInputStream::ByteCount() const { - return static_cast(position); - } - - void SeekableArrayInputStream::seek(PositionProvider& seekPosition) { - position = seekPosition.next(); - } - - std::string SeekableArrayInputStream::getName() const { - std::ostringstream result; - result << "SeekableArrayInputStream " << position << " of " << length; - return result.str(); - } - - static uint64_t computeBlock(uint64_t request, uint64_t length) { - return std::min(length, request == 0 ? 256 * 1024 : request); - } - - SeekableFileInputStream::SeekableFileInputStream(InputStream* stream, - uint64_t offset, - uint64_t byteCount, - MemoryPool& _pool, - uint64_t _blockSize - ):pool(_pool), - input(stream), - start(offset), - length(byteCount), - blockSize(computeBlock - (_blockSize, - length)) { - - position = 0; - buffer.reset(new DataBuffer(pool)); - pushBack = 0; - } - - SeekableFileInputStream::~SeekableFileInputStream() { - // PASS - } - - bool SeekableFileInputStream::Next(const void** data, int*size) { - uint64_t bytesRead; - if (pushBack != 0) { - *data = buffer->data() + (buffer->size() - pushBack); - bytesRead = pushBack; - } else { - bytesRead = std::min(length - position, blockSize); - buffer->resize(bytesRead); - if (bytesRead > 0) { - input->read(buffer->data(), bytesRead, start+position); - *data = static_cast(buffer->data()); - } - } - position += bytesRead; - pushBack = 0; - *size = static_cast(bytesRead); - return bytesRead != 0; - } - - void SeekableFileInputStream::BackUp(int signedCount) { - if (signedCount < 0) { - throw std::logic_error("can't backup negative distances"); - } - uint64_t count = static_cast(signedCount); - if (pushBack > 0) { - throw std::logic_error("can't backup unless we just called Next"); - } - if (count > blockSize || count > position) { - throw std::logic_error("can't backup that far"); - } - pushBack = static_cast(count); - position -= pushBack; - } - - bool SeekableFileInputStream::Skip(int signedCount) { - if (signedCount < 0) { - return false; - } - uint64_t count = static_cast(signedCount); - position = std::min(position + count, length); - pushBack = 0; - return position < length; - } - - int64_t SeekableFileInputStream::ByteCount() const { - return static_cast(position); - } - - void SeekableFileInputStream::seek(PositionProvider& location) { - position = location.next(); - if (position > length) { - position = length; - throw std::logic_error("seek too far"); - } - pushBack = 0; - } - - std::string SeekableFileInputStream::getName() const { - std::ostringstream result; - result << input->getName() << " from " << start << " for " - << length; - return result.str(); - } - enum DecompressState { DECOMPRESS_HEADER, DECOMPRESS_START, DECOMPRESS_CONTINUE, diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh index efd374a6f5..8c3eda7782 100644 --- a/c++/src/Compression.hh +++ b/c++/src/Compression.hh @@ -19,100 +19,10 @@ #ifndef ORC_COMPRESSION_HH #define ORC_COMPRESSION_HH -#include "orc/OrcFile.hh" - -#include "Adaptor.hh" -#include "wrap/zero-copy-stream-wrapper.h" - -#include -#include -#include -#include -#include -#include +#include "io/InputStream.hh" namespace orc { - void printBuffer(std::ostream& out, - const char *buffer, - uint64_t length); - - class PositionProvider { - private: - std::list::const_iterator position; - public: - PositionProvider(const std::list& positions); - uint64_t next(); - }; - - /** - * A subclass of Google's ZeroCopyInputStream that supports seek. - * By extending Google's class, we get the ability to pass it directly - * to the protobuf readers. - */ - class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream { - public: - virtual ~SeekableInputStream(); - virtual void seek(PositionProvider& position) = 0; - virtual std::string getName() const = 0; - }; - - /** - * Create a seekable input stream based on a memory range. - */ - class SeekableArrayInputStream: public SeekableInputStream { - private: - const char* data; - uint64_t length; - uint64_t position; - uint64_t blockSize; - - public: - SeekableArrayInputStream(const unsigned char* list, - uint64_t length, - uint64_t block_size = 0); - SeekableArrayInputStream(const char* list, - uint64_t length, - uint64_t block_size = 0); - virtual ~SeekableArrayInputStream(); - virtual bool Next(const void** data, int*size) override; - virtual void BackUp(int count) override; - virtual bool Skip(int count) override; - virtual google::protobuf::int64 ByteCount() const override; - virtual void seek(PositionProvider& position) override; - virtual std::string getName() const override; - }; - - /** - * Create a seekable input stream based on an input stream. - */ - class SeekableFileInputStream: public SeekableInputStream { - private: - MemoryPool& pool; - InputStream* const input; - const uint64_t start; - const uint64_t length; - const uint64_t blockSize; - std::unique_ptr > buffer; - uint64_t position; - uint64_t pushBack; - - public: - SeekableFileInputStream(InputStream* input, - uint64_t offset, - uint64_t byteCount, - MemoryPool& pool, - uint64_t blockSize = 0); - virtual ~SeekableFileInputStream(); - - virtual bool Next(const void** data, int*size) override; - virtual void BackUp(int count) override; - virtual bool Skip(int count) override; - virtual int64_t ByteCount() const override; - virtual void seek(PositionProvider& position) override; - virtual std::string getName() const override; - }; - /** * Create a decompressor for the given compression kind. * @param kind the compression type to implement diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 9c423bdd0f..fe8608a93f 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -35,48 +35,6 @@ namespace orc { - std::string compressionKindToString(CompressionKind kind) { - switch (static_cast(kind)) { - case CompressionKind_NONE: - return "none"; - case CompressionKind_ZLIB: - return "zlib"; - case CompressionKind_SNAPPY: - return "snappy"; - case CompressionKind_LZO: - return "lzo"; - case CompressionKind_LZ4: - return "lz4"; - case CompressionKind_ZSTD: - return "zstd"; - } - std::stringstream buffer; - buffer << "unknown - " << kind; - return buffer.str(); - } - - std::string writerVersionToString(WriterVersion version) { - switch (static_cast(version)) { - case WriterVersion_ORIGINAL: - return "original"; - case WriterVersion_HIVE_8732: - return "HIVE-8732"; - case WriterVersion_HIVE_4243: - return "HIVE-4243"; - case WriterVersion_HIVE_12055: - return "HIVE-12055"; - case WriterVersion_HIVE_13083: - return "HIVE-13083"; - case WriterVersion_ORC_101: - return "ORC-101"; - case WriterVersion_ORC_135: - return "ORC-135"; - } - std::stringstream buffer; - buffer << "future - " << version; - return buffer.str(); - } - uint64_t getCompressionBlockSize(const proto::PostScript& ps) { if (ps.has_compressionblocksize()) { return ps.compressionblocksize(); @@ -961,46 +919,6 @@ namespace orc { postscriptLength)); } - std::string streamKindToString(StreamKind kind) { - switch (static_cast(kind)) { - case StreamKind_PRESENT: - return "present"; - case StreamKind_DATA: - return "data"; - case StreamKind_LENGTH: - return "length"; - case StreamKind_DICTIONARY_DATA: - return "dictionary"; - case StreamKind_DICTIONARY_COUNT: - return "dictionary count"; - case StreamKind_SECONDARY: - return "secondary"; - case StreamKind_ROW_INDEX: - return "index"; - case StreamKind_BLOOM_FILTER: - return "bloom"; - } - std::stringstream buffer; - buffer << "unknown - " << kind; - return buffer.str(); - } - - std::string columnEncodingKindToString(ColumnEncodingKind kind) { - switch (static_cast(kind)) { - case ColumnEncodingKind_DIRECT: - return "direct"; - case ColumnEncodingKind_DICTIONARY: - return "dictionary"; - case ColumnEncodingKind_DIRECT_V2: - return "direct rle2"; - case ColumnEncodingKind_DICTIONARY_V2: - return "dictionary rle2"; - } - std::stringstream buffer; - buffer << "unknown - " << kind; - return buffer.str(); - } - RowReader::~RowReader() { // PASS } diff --git a/c++/src/io/InputStream.cc b/c++/src/io/InputStream.cc new file mode 100644 index 0000000000..fd91b23e70 --- /dev/null +++ b/c++/src/io/InputStream.cc @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Exceptions.hh" +#include "InputStream.hh" + +#include +#include + +namespace orc { + + void printBuffer(std::ostream& out, + const char *buffer, + uint64_t length) { + const uint64_t width = 24; + out << std::hex; + for(uint64_t line = 0; line < (length + width - 1) / width; ++line) { + out << std::setfill('0') << std::setw(7) << (line * width); + for(uint64_t byte = 0; + byte < width && line * width + byte < length; ++byte) { + out << " " << std::setfill('0') << std::setw(2) + << static_cast(0xff & buffer[line * width + + byte]); + } + out << "\n"; + } + out << std::dec; + } + + PositionProvider::PositionProvider(const std::list& posns) { + position = posns.begin(); + } + + uint64_t PositionProvider::next() { + uint64_t result = *position; + ++position; + return result; + } + + SeekableInputStream::~SeekableInputStream() { + // PASS + } + + SeekableArrayInputStream::~SeekableArrayInputStream() { + // PASS + } + + SeekableArrayInputStream::SeekableArrayInputStream + (const unsigned char* values, + uint64_t size, + uint64_t blkSize + ): data(reinterpret_cast(values)) { + length = size; + position = 0; + blockSize = blkSize == 0 ? length : static_cast(blkSize); + } + + SeekableArrayInputStream::SeekableArrayInputStream(const char* values, + uint64_t size, + uint64_t blkSize + ): data(values) { + length = size; + position = 0; + blockSize = blkSize == 0 ? length : static_cast(blkSize); + } + + bool SeekableArrayInputStream::Next(const void** buffer, int*size) { + uint64_t currentSize = std::min(length - position, blockSize); + if (currentSize > 0) { + *buffer = data + position; + *size = static_cast(currentSize); + position += currentSize; + return true; + } + *size = 0; + return false; + } + + void SeekableArrayInputStream::BackUp(int count) { + if (count >= 0) { + uint64_t unsignedCount = static_cast(count); + if (unsignedCount <= blockSize && unsignedCount <= position) { + position -= unsignedCount; + } else { + throw std::logic_error("Can't backup that much!"); + } + } + } + + bool SeekableArrayInputStream::Skip(int count) { + if (count >= 0) { + uint64_t unsignedCount = static_cast(count); + if (unsignedCount + position <= length) { + position += unsignedCount; + return true; + } else { + position = length; + } + } + return false; + } + + google::protobuf::int64 SeekableArrayInputStream::ByteCount() const { + return static_cast(position); + } + + void SeekableArrayInputStream::seek(PositionProvider& seekPosition) { + position = seekPosition.next(); + } + + std::string SeekableArrayInputStream::getName() const { + std::ostringstream result; + result << "SeekableArrayInputStream " << position << " of " << length; + return result.str(); + } + + static uint64_t computeBlock(uint64_t request, uint64_t length) { + return std::min(length, request == 0 ? 256 * 1024 : request); + } + + SeekableFileInputStream::SeekableFileInputStream(InputStream* stream, + uint64_t offset, + uint64_t byteCount, + MemoryPool& _pool, + uint64_t _blockSize + ):pool(_pool), + input(stream), + start(offset), + length(byteCount), + blockSize(computeBlock + (_blockSize, + length)) { + + position = 0; + buffer.reset(new DataBuffer(pool)); + pushBack = 0; + } + + SeekableFileInputStream::~SeekableFileInputStream() { + // PASS + } + + bool SeekableFileInputStream::Next(const void** data, int*size) { + uint64_t bytesRead; + if (pushBack != 0) { + *data = buffer->data() + (buffer->size() - pushBack); + bytesRead = pushBack; + } else { + bytesRead = std::min(length - position, blockSize); + buffer->resize(bytesRead); + if (bytesRead > 0) { + input->read(buffer->data(), bytesRead, start+position); + *data = static_cast(buffer->data()); + } + } + position += bytesRead; + pushBack = 0; + *size = static_cast(bytesRead); + return bytesRead != 0; + } + + void SeekableFileInputStream::BackUp(int signedCount) { + if (signedCount < 0) { + throw std::logic_error("can't backup negative distances"); + } + uint64_t count = static_cast(signedCount); + if (pushBack > 0) { + throw std::logic_error("can't backup unless we just called Next"); + } + if (count > blockSize || count > position) { + throw std::logic_error("can't backup that far"); + } + pushBack = static_cast(count); + position -= pushBack; + } + + bool SeekableFileInputStream::Skip(int signedCount) { + if (signedCount < 0) { + return false; + } + uint64_t count = static_cast(signedCount); + position = std::min(position + count, length); + pushBack = 0; + return position < length; + } + + int64_t SeekableFileInputStream::ByteCount() const { + return static_cast(position); + } + + void SeekableFileInputStream::seek(PositionProvider& location) { + position = location.next(); + if (position > length) { + position = length; + throw std::logic_error("seek too far"); + } + pushBack = 0; + } + + std::string SeekableFileInputStream::getName() const { + std::ostringstream result; + result << input->getName() << " from " << start << " for " + << length; + return result.str(); + } + +} diff --git a/c++/src/io/InputStream.hh b/c++/src/io/InputStream.hh new file mode 100644 index 0000000000..5ea8c7ea60 --- /dev/null +++ b/c++/src/io/InputStream.hh @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ORC_INPUTSTREAM_HH +#define ORC_INPUTSTREAM_HH + +#include "Adaptor.hh" +#include "orc/OrcFile.hh" +#include "wrap/zero-copy-stream-wrapper.h" + +#include +#include +#include +#include +#include + +namespace orc { + + void printBuffer(std::ostream& out, + const char *buffer, + uint64_t length); + + class PositionProvider { + private: + std::list::const_iterator position; + public: + PositionProvider(const std::list& positions); + uint64_t next(); + }; + + /** + * A subclass of Google's ZeroCopyInputStream that supports seek. + * By extending Google's class, we get the ability to pass it directly + * to the protobuf readers. + */ + class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream { + public: + virtual ~SeekableInputStream(); + virtual void seek(PositionProvider& position) = 0; + virtual std::string getName() const = 0; + }; + + /** + * Create a seekable input stream based on a memory range. + */ + class SeekableArrayInputStream: public SeekableInputStream { + private: + const char* data; + uint64_t length; + uint64_t position; + uint64_t blockSize; + + public: + SeekableArrayInputStream(const unsigned char* list, + uint64_t length, + uint64_t block_size = 0); + SeekableArrayInputStream(const char* list, + uint64_t length, + uint64_t block_size = 0); + virtual ~SeekableArrayInputStream(); + virtual bool Next(const void** data, int*size) override; + virtual void BackUp(int count) override; + virtual bool Skip(int count) override; + virtual google::protobuf::int64 ByteCount() const override; + virtual void seek(PositionProvider& position) override; + virtual std::string getName() const override; + }; + + /** + * Create a seekable input stream based on an input stream. + */ + class SeekableFileInputStream: public SeekableInputStream { + private: + MemoryPool& pool; + InputStream* const input; + const uint64_t start; + const uint64_t length; + const uint64_t blockSize; + std::unique_ptr > buffer; + uint64_t position; + uint64_t pushBack; + + public: + SeekableFileInputStream(InputStream* input, + uint64_t offset, + uint64_t byteCount, + MemoryPool& pool, + uint64_t blockSize = 0); + virtual ~SeekableFileInputStream(); + + virtual bool Next(const void** data, int*size) override; + virtual void BackUp(int count) override; + virtual bool Skip(int count) override; + virtual int64_t ByteCount() const override; + virtual void seek(PositionProvider& position) override; + virtual std::string getName() const override; + }; + +} + +#endif //ORC_INPUTSTREAM_HH