Skip to content

Commit

Permalink
Allow reading from any file-like (#330)
Browse files Browse the repository at this point in the history
* Allow reading from any file-like

* new zstd

* fix zstandard

* reinstate option
  • Loading branch information
martindurant committed Apr 27, 2018
1 parent caccd51 commit e2962a9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 5 deletions.
12 changes: 11 additions & 1 deletion fastparquet/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class ParquetFile(object):
fn: path/URL string or list of paths
Location of the data. If a directory, will attempt to read a file
"_metadata" within that directory. If a list of paths, will assume
that they make up a single parquet data set.
that they make up a single parquet data set. This parameter can also
be any file-like object, in which case this must be a single-file
dataset.
verify: bool [False]
test file start/end byte markers
open_with: function
Expand Down Expand Up @@ -87,6 +89,14 @@ def __init__(self, fn, verify=False, open_with=default_open,
self.fn = '_metadata'
self.fmd = fmd
self._set_attrs()
elif hasattr(fn, 'read'):
# file-like
self._parse_header(fn, verify)
if self.file_scheme not in ['simple', 'empty']:
raise ValueError('Cannot use file-like input '
'with multi-file data')
open_with = lambda *args, **kwargs: fn
self.fn = None
else:
try:
fn2 = join_path(fn, '_metadata')
Expand Down
22 changes: 18 additions & 4 deletions fastparquet/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,32 @@ def lz4_decompress(data, uncompressed_size):
except ImportError:
pass
# Register ZSTD (de)compression helpers into the module-level ``compressions``
# and ``decompressions`` registries.  Prefer the maintained ``zstandard``
# package; fall back to the legacy ``zstd`` bindings only if it is absent.
# (The source shown here was a diff with old/new lines interleaved; this is
# the reconstructed post-commit code.)
try:
    import zstandard

    def zstd_compress(data, **kwargs):
        """Compress ``data`` with zstandard.

        Extra keyword arguments are passed to ``ZstdCompressor``.
        """
        cctx = zstandard.ZstdCompressor(**kwargs)
        return cctx.compress(data)

    def zstd_decompress(data, uncompressed_size):
        """Decompress ``data`` whose decoded length is ``uncompressed_size``.

        The size must be given explicitly because frames are written
        without an embedded content size.
        """
        dctx = zstandard.ZstdDecompressor()
        return dctx.decompress(data, max_output_size=uncompressed_size)

    compressions['ZSTD'] = zstd_compress
    decompressions['ZSTD'] = zstd_decompress
except ImportError:
    pass
if 'ZSTD' not in compressions:
    # Fallback: the older ``zstd`` bindings, which use a slightly
    # different API (explicit write_content_size / allow_empty flags).
    try:
        import zstd

        def zstd_compress(data, **kwargs):
            """Compress ``data`` with the legacy zstd bindings."""
            # Do not embed the content size; readers supply it explicitly.
            kwargs['write_content_size'] = False
            cctx = zstd.ZstdCompressor(**kwargs)
            return cctx.compress(data, allow_empty=True)

        def zstd_decompress(data, uncompressed_size):
            """Decompress ``data`` whose decoded length is ``uncompressed_size``."""
            dctx = zstd.ZstdDecompressor()
            return dctx.decompress(data, max_output_size=uncompressed_size)

        compressions['ZSTD'] = zstd_compress
        decompressions['ZSTD'] = zstd_decompress
    except ImportError:
        pass

compressions = {k.upper(): v for k, v in compressions.items()}
decompressions = {k.upper(): v for k, v in decompressions.items()}
Expand Down
18 changes: 18 additions & 0 deletions fastparquet/test/test_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import unicode_literals

import io
import os

import numpy as np
Expand Down Expand Up @@ -142,6 +143,23 @@ def test_open_standard(tempdir):
pd.util.testing.assert_frame_equal(d2, df)


def test_filelike(tempdir):
    """ParquetFile must accept any file-like object, not only a path.

    Exercises two flavours of file-like input against a two-row-group
    file: a real open file handle and an in-memory io.BytesIO.
    """
    df = pd.DataFrame({'x': [1, 2, 3, 4],
                       'y': [1.0, 2.0, 1.0, 2.0],
                       'z': ['a', 'b', 'c', 'd']})
    fn = os.path.join(tempdir, 'foo.parquet')
    write(fn, df, row_group_offsets=[0, 2])

    # Real file handle: read while the file is still open.
    with open(fn, 'rb') as f:
        pf = ParquetFile(f, open_with=open)
        d2 = pf.to_pandas()
        pd.util.testing.assert_frame_equal(d2, df)

    # In-memory buffer: close the source file promptly instead of leaking
    # the handle (the original used open(fn, 'rb').read() with no close).
    with open(fn, 'rb') as f:
        b = io.BytesIO(f.read())
    pf = ParquetFile(b, open_with=open)
    d2 = pf.to_pandas()
    pd.util.testing.assert_frame_equal(d2, df)


def test_cast_index(tempdir):
df = pd.DataFrame({'i8': np.array([1, 2, 3, 4], dtype='uint8'),
'i16': np.array([1, 2, 3, 4], dtype='int16'),
Expand Down

0 comments on commit e2962a9

Please sign in to comment.