Commit

decompressionreader: implement i/o stream class and API for decompression

Like we just did for compression.

This is a precursor to #13.
indygreg committed Mar 16, 2017
1 parent 690d534 commit 6f3b613
Showing 11 changed files with 880 additions and 0 deletions.
2 changes: 2 additions & 0 deletions NEWS.rst
@@ -17,6 +17,8 @@ Backwards Compatibility Notes
Changes
-------

* New ``ZstdDecompressor.stream_reader()`` API to obtain a read-only i/o stream
  of decompressed data for a source.
* New ``ZstdCompressor.stream_reader()`` API to obtain a read-only i/o stream of
  compressed data for a source.
* Renamed ``ZstdDecompressor.read_from()`` to ``ZstdDecompressor.read_to_iter()``.
36 changes: 36 additions & 0 deletions README.rst
@@ -617,6 +617,42 @@ result in a lot of work for the memory allocator and may result in
If the exact size of decompressed data is unknown, it is **strongly**
recommended to use a streaming API.

Stream Reader API
^^^^^^^^^^^^^^^^^

``stream_reader(source)`` can be used to obtain an object conforming to the
``io.RawIOBase`` interface for reading decompressed output as a stream::

    with open(path, 'rb') as fh:
        dctx = zstd.ZstdDecompressor()
        with dctx.stream_reader(fh) as reader:
            while True:
                chunk = reader.read(16384)
                if not chunk:
                    break

                # Do something with decompressed chunk.

The stream can only be read within a context manager. When the context
manager exits, the stream is closed, the underlying resource is released,
and any further operation on the stream fails.

The ``source`` argument to ``stream_reader()`` can be any object with a
``read(size)`` method or any object implementing the *buffer protocol*.
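
One way to accept both kinds of source can be sketched with the standard library alone: check for a ``read`` method first, and otherwise fall back to ``memoryview``. The helper name ``iter_source_chunks`` is hypothetical and is not part of this library; it only illustrates the dispatch described above.

```python
import io


def iter_source_chunks(source, read_size=8192):
    """Yield bytes chunks from a file-like or buffer-protocol object.

    Hypothetical helper for illustration only; not the library's code.
    """
    if hasattr(source, "read"):
        # Stream source: issue read(read_size) calls until EOF.
        while True:
            chunk = source.read(read_size)
            if not chunk:
                break
            yield chunk
    else:
        # Buffer source: memoryview() raises TypeError if the object
        # does not implement the buffer protocol.
        view = memoryview(source).cast("B")
        for offset in range(0, len(view), read_size):
            yield bytes(view[offset:offset + read_size])


# Both source kinds produce the same chunks.
data = b"abcdef"
stream_chunks = list(iter_source_chunks(io.BytesIO(data), read_size=4))
buffer_chunks = list(iter_source_chunks(data, read_size=4))
```

In this sketch, ``read_size`` bounds the slice size for buffer sources as well. That is a choice made here for symmetry, not documented behaviour of ``stream_reader()``.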

If the ``source`` is a stream, you can specify how large ``read()`` requests
to that stream should be via the ``read_size`` argument. It defaults to
``zstandard.DECOMPRESSION_RECOMMENDED_INPUT_SIZE``::

    with open(path, 'rb') as fh:
        dctx = zstd.ZstdDecompressor()
        # Will perform fh.read(8192) when obtaining data for the decompressor.
        with dctx.stream_reader(fh, read_size=8192) as reader:
            ...

The stream returned by ``stream_reader()`` is neither writable nor seekable.
``tell()`` returns the number of decompressed bytes emitted so far.
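
To make the read-only behaviour concrete, here is a minimal sketch of a stream with the same shape, built on ``io.RawIOBase``. It uses ``zlib`` from the standard library as a stand-in for zstd, and the class name ``DecompressionReaderSketch`` is hypothetical; it is not the implementation added in this commit.

```python
import io
import zlib


class DecompressionReaderSketch(io.RawIOBase):
    """Read-only, non-seekable stream of decompressed data (zlib stand-in)."""

    def __init__(self, source, read_size=8192):
        self._source = source        # any object with a read(size) method
        self._read_size = read_size  # size of each read() issued to source
        self._dobj = zlib.decompressobj()
        self._pending = b""          # decompressed bytes not yet handed out
        self._emitted = 0            # decompressed bytes returned so far
        self._eof = False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def tell(self):
        # Position is counted in decompressed bytes, not source bytes.
        return self._emitted

    def readinto(self, b):
        if self.closed:
            raise ValueError("stream is closed")
        # Pull compressed input until some output is available or EOF.
        while not self._pending and not self._eof:
            chunk = self._source.read(self._read_size)
            if chunk:
                self._pending = self._dobj.decompress(chunk)
            else:
                self._pending = self._dobj.flush()
                self._eof = True
        n = min(len(b), len(self._pending))
        b[:n] = self._pending[:n]
        self._pending = self._pending[n:]
        self._emitted += n
        return n


payload = b"hello world" * 1000
source = io.BytesIO(zlib.compress(payload))
out = b""
with DecompressionReaderSketch(source, read_size=64) as reader:
    while True:
        chunk = reader.read(16384)
        if not chunk:
            break
        out += chunk
```

Once the ``with`` block exits, ``reader.read()`` raises ``ValueError`` in this sketch, mirroring the rule that operations on a closed stream fail.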

Streaming Input API
^^^^^^^^^^^^^^^^^^^

9 changes: 9 additions & 0 deletions bench.py
@@ -349,6 +349,15 @@ def decompress_multi_decompress_to_buffer_list(chunks, opts, threads):
    zctx.multi_decompress_to_buffer(chunks, threads=threads)


@bench('discrete', 'stream_reader()')
def decompress_stream_reader(chunks, opts):
    zctx = zstd.ZstdDecompressor(**opts)
    for chunk in chunks:
        with zctx.stream_reader(chunk) as reader:
            while reader.read(16384):
                pass


@bench('discrete', 'write_to()')
def decompress_write_to(chunks, opts):
    zctx = zstd.ZstdDecompressor(**opts)