Skip to content

Commit

Permalink
Merge pull request #34 from martindurant/for_dask
Browse files: browse the repository at this point in the history
To allow dask to read parallel chunks from one file
  • Loading branch information
martindurant committed Nov 29, 2016
2 parents bcdb683 + 66c8c44 commit 18a9c38
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions fastparquet/api.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -44,15 +44,16 @@ def __init__(self, fn, verify=False, open_with=default_open,
sep=os.sep):
try:
fn2 = sep.join([fn, '_metadata'])
f = open_with(fn2, 'rb')
self.fn = fn2
with open_with(fn2, 'rb') as f:
self._parse_header(f, verify)
fn = fn2
except (IOError, OSError):
f = open_with(fn, 'rb')
self.fn = fn
with open_with(fn, 'rb') as f:
self._parse_header(f, verify)
self.open = open_with
self.fn = fn
self.sep = sep
with f as f:
self._parse_header(f, verify)
self._read_partitions()

def _parse_header(self, f, verify=True):
Expand Down Expand Up @@ -107,8 +108,12 @@ def _read_partitions(self):
self.cats = {key: list(v) for key, v in cats.items()}

def row_group_filename(self, rg):
return self.sep.join([os.path.dirname(self.fn),
rg.columns[0].file_path])
if rg.columns[0].file_path:
return self.sep.join([os.path.dirname(self.fn),
rg.columns[0].file_path])
else:
return self.fn


def read_row_group_file(self, rg, columns, categories, index=None):
""" Open file for reading, and process it as a row-group """
Expand Down Expand Up @@ -331,7 +336,7 @@ def statistics(obj):
if se.converted_type is not None:
for name in ['min', 'max']:
d[name][column] = (
[None] if d[name][column] is None
[None] if d[name][column] is None or None in d[name][column]
else list(converted_types.convert(d[name][column], se))
)
return d
Expand Down

0 comments on commit 18a9c38

Please sign in to comment.