ARROW-1019: [C++] Implement compressed streams #2777
09e6e3c to
0efd75f
Compare
Will review this today
wesm
left a comment
Some minor comments, but this looks great!
#include <memory>
#include <mutex>
#include <string>
#include <utility>
cpp/src/arrow/io/compressed.cc
class CompressedOutputStream::Impl {
 public:
  Impl(MemoryPool* pool, Codec* codec, std::shared_ptr<OutputStream> raw)
Could do const std::shared_ptr<T>& to possibly avoid extra copy
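To illustrate the suggestion, here is a minimal standalone sketch (not code from this PR; `Stream`, `ByValue` and `ByConstRef` are made-up names). It shows that a by-value `std::shared_ptr` parameter creates an extra owner for the duration of the call, while a `const&` parameter does not.

```cpp
#include <memory>

// Hypothetical stand-in for an output stream; not an Arrow type.
struct Stream {};

// Pass by value: the parameter is a new shared_ptr, so the reference count
// is incremented for the duration of the call.
long ByValue(std::shared_ptr<Stream> raw) { return raw.use_count(); }

// Pass by const reference: no new shared_ptr is created at the call site.
long ByConstRef(const std::shared_ptr<Stream>& raw) { return raw.use_count(); }
```

If the callee stores the pointer in a member anyway, one copy is unavoidable; taking the argument by value and moving it into the member is a common alternative.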
int64_t bytes_read, bytes_written;
int64_t input_len = nbytes;
int64_t output_len = compressed_->size() - compressed_pos_;
uint8_t* output = compressed_->mutable_data() + compressed_pos_;
I wonder if there might be a use for an intermediate abstraction "CompressionBuffer" that encapsulates some of this book-keeping that shows up in many places (unit tests and implementations). This could be passed into the stream compressor functions instead of raw pointers, allowing the compressor to request that the buffer be enlarged, etc.
I don't know what it would look like, though.
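One possible shape for such a helper, purely as a sketch: the class below is hypothetical (the name `CompressionBuffer` comes from the comment above, but every method name is an assumption and none of this exists in Arrow). It keeps the buffer and the write position together so that callers no longer compute `size() - pos` and `data() + pos` by hand, and it lets a compressor ask for more room.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper, not part of Arrow: owns a byte buffer plus a write
// position and exposes the derived quantities the stream code needs.
class CompressionBuffer {
 public:
  explicit CompressionBuffer(int64_t capacity)
      : data_(static_cast<size_t>(capacity)), pos_(0) {}

  // Bytes still available for writing.
  int64_t remaining() const { return static_cast<int64_t>(data_.size()) - pos_; }

  // Pointer to the first unused byte.
  uint8_t* write_ptr() { return data_.data() + pos_; }

  // Number of bytes already filled.
  int64_t position() const { return pos_; }

  // Record that `nbytes` were written at write_ptr().
  void Advance(int64_t nbytes) { pos_ += nbytes; }

  // Ensure at least `min_remaining` writable bytes, growing if needed.
  void Reserve(int64_t min_remaining) {
    if (remaining() < min_remaining) {
      data_.resize(static_cast<size_t>(pos_ + min_remaining));
    }
  }

  // Discard the contents, e.g. after flushing them to the raw stream.
  void Reset() { pos_ = 0; }

 private:
  std::vector<uint8_t> data_;
  int64_t pos_;
};
```

A compressor taking this object instead of raw pointers could call `Reserve` when it needs more output space and `Advance` after writing, which is the book-keeping currently repeated in the tests and implementations.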
if (is_open_) {
  is_open_ = false;
  RETURN_NOT_OK(FinalizeCompression());
  return raw_->Close();
Do we definitely want to close the passed output stream? I'm trying to think of scenarios where we would not want to.
I think by default, yes. We could add a constructor argument if we need to keep the underlying file alive in some cases.
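As a rough sketch of what such a constructor argument might look like (everything here is hypothetical: `RawStream`, `WrappingStream` and the `close_raw` flag are illustrative names, not the Arrow API or what this PR implements):

```cpp
#include <memory>
#include <utility>

// Hypothetical minimal stream; not the Arrow OutputStream interface.
struct RawStream {
  bool closed = false;
  void Close() { closed = true; }
};

// Hypothetical wrapper: closes the wrapped stream by default, but lets the
// caller opt out through a constructor argument.
class WrappingStream {
 public:
  explicit WrappingStream(std::shared_ptr<RawStream> raw, bool close_raw = true)
      : raw_(std::move(raw)), close_raw_(close_raw) {}

  void Close() {
    if (!is_open_) return;  // closing twice is a no-op
    is_open_ = false;
    // A real implementation would finalize the compressed data here.
    if (close_raw_) raw_->Close();
  }

 private:
  std::shared_ptr<RawStream> raw_;
  bool close_raw_;
  bool is_open_ = true;
};
```

Defaulting the flag to `true` keeps the behaviour discussed above while leaving room for callers that need the underlying file to stay open.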
bool need_more_output;
int64_t bytes_read, bytes_written;
int64_t input_len = compressed_->size() - compressed_pos_;
const uint8_t* input = compressed_->data() + compressed_pos_;
Same question here re: output buffer bookkeeping
cpp/src/arrow/io/compressed.cc
 private:
  // Write 64 KB compressed data at a time
  static const int64_t CHUNK_SIZE = 64 * 1024;
kChunkSize
                   std::shared_ptr<CompressedOutputStream>* out);
static Status Make(MemoryPool* pool, util::Codec* codec,
                   std::shared_ptr<OutputStream> raw,
                   std::shared_ptr<CompressedOutputStream>* out);
Maybe use const std::shared_ptr<T>& here and elsewhere in the public APIs for consistency
/// \brief Create a compressed output stream wrapping the given output stream.
static Status Make(util::Codec* codec, std::shared_ptr<OutputStream> raw,
                   std::shared_ptr<CompressedOutputStream>* out);
static Status Make(MemoryPool* pool, util::Codec* codec,
Can codec be const? Also, can it be const Codec&?
Currently, Codec::MakeCompressor and Codec::MakeDecompressor are non-const methods. IMHO it doesn't mean much to have a const Codec.
It's more about argument-passing consistency. We generally use const T& for immutable arguments, T* for mutable arguments, and const T* in the rarer case when the input may be null.
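For readers unfamiliar with the convention, a small illustrative function (the name `AppendLabel` and its parameters are made up for this sketch and unrelated to the PR) that uses all three forms:

```cpp
#include <string>
#include <vector>

// const T&  : immutable input that must be present.
// const T*  : immutable input that may be null.
// T*        : output that the function mutates.
void AppendLabel(const std::string& name, const std::string* suffix,
                 std::vector<std::string>* out) {
  std::string label = name;
  if (suffix != nullptr) {
    label += *suffix;  // the optional suffix is only used when provided
  }
  out->push_back(label);
}
```

At the call site, the `&` on mutable and optional arguments makes it visible which ones may be modified or omitted.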
                   std::shared_ptr<CompressedInputStream>* out);
static Status Make(MemoryPool* pool, util::Codec* codec,
                   std::shared_ptr<InputStream> raw,
                   std::shared_ptr<CompressedInputStream>* out);
Same here
  ASSERT_RAISES(IOError, stream->Read(1024, &out_buf));
}

// NOTE: Snappy doesn't support streaming decompression
We could (should?) define a framed format of our own devising
But is it useful? The main point here is to interact with existing files.
Probably not. I will say YAGNI for now ;)
a0b333b to
64461f3
Compare
+1. Let me have a look at the appveyor build
I'll wait a little while to make sure appveyor looks good then merge
Also works for other bundled compression types.
Based on PR #2777.
Author: Antoine Pitrou <antoine@python.org>
Closes #2786 from pitrou/ARROW-3380-gzipped-csv-read and squashes the following commits:
9a2244f <Antoine Pitrou> ARROW-3380: Support reading gzipped CSV files
Implement CompressedInputStream and CompressedOutputStream C++ classes. Tested with gzip, brotli and zstd codecs.

I initially intended to expose the functionality in Python, but NativeFile expects a RandomAccessFile in read mode (rather than a mere InputStream).