Merge pull request #183 from martindurant/single_partition_fix
Single partition fix
martindurant committed Jul 21, 2017
2 parents ff73af0 + 66ca59e commit 65769ad
Showing 7 changed files with 75 additions and 33 deletions.
8 changes: 6 additions & 2 deletions fastparquet/api.py
@@ -40,6 +40,10 @@ class ParquetFile(object):
evaluated to a file open for reading. Defaults to the built-in `open`.
sep: string [`os.sep`]
Path separator to use, if data is in multiple files.
root: str
If passing a list of files, the top directory of the data-set may
be ambiguous for partitioning where the topmost field has only one
value. Use this to specify the data-set's root directory, if required.
Attributes
----------
@@ -74,11 +78,11 @@ class ParquetFile(object):
Max/min/count of each column chunk
"""
def __init__(self, fn, verify=False, open_with=default_open,
sep=os.sep):
sep=os.sep, root=False):
self.sep = sep
if isinstance(fn, (tuple, list)):
basepath, fmd = metadata_from_many(fn, verify_schema=verify,
open_with=open_with)
open_with=open_with, root=root)
self.fn = sep.join([basepath, '_metadata']) # effective file
self.fmd = fmd
self._set_attrs()
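For context, a minimal sketch of the new `root=` keyword (paths here are hypothetical, mirroring the test added below): when the only partition field has a single value, the common prefix of the file paths swallows the partition directory, so the root must be given explicitly.

    from fastparquet import ParquetFile

    # Hypothetical layout: one partition field 'y' with the single value 'aa'.
    flist = ['/data/y=aa/part.0.parquet', '/data/y=aa/part.1.parquet']

    # Without root=, '/data/y=aa' would be inferred as the base path and
    # the 'y' partition column would be lost.
    pf = ParquetFile(flist, root='/data')
    df = pf.to_pandas()  # includes the partition column 'y' == 'aa'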
4 changes: 2 additions & 2 deletions fastparquet/core.py
@@ -321,8 +321,8 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
key, value = out[name], maps[name]
else:
value, key = out[name], maps[name]
for i, (k, v) in enumerate(zip(key, value)):
out[name][i] = dict(zip(k, v)) if k is not None else None
out[name][:] = [dict(zip(k, v)) if k is not None else None
for k, v in zip(key, value)]


def read_row_group(file, rg, columns, categories, schema_helper, cats,
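For context, the change above replaces a per-element loop with one slice assignment over the same object-dtype array. A minimal standalone sketch of the new expression, with hypothetical decoded MAP data:

    import numpy as np

    # Parallel object arrays of map keys and values, as produced for a
    # MAP-typed column; None marks a null entry.
    key = np.empty(2, dtype=object)
    value = np.empty(2, dtype=object)
    key[:] = [['a', 'b'], None]
    value[:] = [[1, 2], None]
    out = np.empty(2, dtype=object)

    # One bulk in-place update instead of assigning element by element.
    out[:] = [dict(zip(k, v)) if k is not None else None
              for k, v in zip(key, value)]
    # out -> array([{'a': 1, 'b': 2}, None], dtype=object)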
17 changes: 17 additions & 0 deletions fastparquet/test/test_api.py
@@ -191,6 +191,23 @@ def test_read_multiple_no_metadata(tempdir):
pd.util.testing.assert_frame_equal(out, df)


def test_single_upper_directory(tempdir):
df = pd.DataFrame({'x': [1, 5, 2, 5], 'y': ['aa'] * 4})
write(tempdir, df, file_scheme='hive', partition_on='y')
pf = ParquetFile(tempdir)
out = pf.to_pandas()
assert (out.y == 'aa').all()

os.unlink(os.path.join(tempdir, '_metadata'))
os.unlink(os.path.join(tempdir, '_common_metadata'))
import glob
flist = list(sorted(glob.glob(os.path.join(tempdir, '*/*'))))
pf = ParquetFile(flist, root=tempdir)
assert pf.fn == os.path.join(tempdir, '_metadata')
out = pf.to_pandas()
assert (out.y == 'aa').all()


def test_filter_without_paths(tempdir):
fn = os.path.join(tempdir, 'test.parq')
df = pd.DataFrame({
32 changes: 17 additions & 15 deletions fastparquet/test/test_output.py
@@ -176,7 +176,7 @@ def test_roundtrip_complex(tempdir, scheme,):
def test_datetime_roundtrip(tempdir, df, capsys):
fname = os.path.join(tempdir, 'test.parquet')
w = False
if 'x' in df and str(df.x.dtype.tz) == 'Europe/London':
if 'x' in df and 'Europe/' in str(df.x.dtype.tz):
with pytest.warns(UserWarning) as w:
write(fname, df)
else:
@@ -617,20 +617,22 @@ def test_autocat(tempdir):
assert out.dtypes['o'] == 'category'

# regression test
pf.fmd.key_value_metadata = [parquet_thrift.KeyValue(
key='fastparquet.cats', value='{"o": 2}')]
pf._set_attrs()
assert 'o' in pf.categories
assert pf.categories['o'] == 2
assert pf.dtypes['o'] == 'category'
out = pf.to_pandas()
assert out.dtypes['o'] == 'category'
out = pf.to_pandas(categories={})
assert str(out.dtypes['o']) != 'category'
out = pf.to_pandas(categories=['o'])
assert out.dtypes['o'] == 'category'
out = pf.to_pandas(categories={'o': 2})
assert out.dtypes['o'] == 'category'
with pytest.warns(UserWarning) as w:
pf.fmd.key_value_metadata = [parquet_thrift.KeyValue(
key='fastparquet.cats', value='{"o": 2}')]
pf._set_attrs()
assert 'o' in pf.categories
assert pf.categories['o'] == 2
assert pf.dtypes['o'] == 'category'
out = pf.to_pandas()
assert out.dtypes['o'] == 'category'
out = pf.to_pandas(categories={})
assert str(out.dtypes['o']) != 'category'
out = pf.to_pandas(categories=['o'])
assert out.dtypes['o'] == 'category'
out = pf.to_pandas(categories={'o': 2})
assert out.dtypes['o'] == 'category'
assert 'Regression warning' in str(w.list[0].message)


@pytest.mark.parametrize('row_groups', ([0], [0, 2]))
1 change: 1 addition & 0 deletions fastparquet/test/test_with_n.py
@@ -7,6 +7,7 @@
here = os.path.dirname(__file__)
count = 1000


def test_read_bitpacked():
results = np.empty(1000000, dtype=np.int32)
with open(os.path.join(here, 'bitpack')) as f:
36 changes: 24 additions & 12 deletions fastparquet/util.py
@@ -137,7 +137,8 @@ def byte_buffer(raw_bytes):
return buffer(raw_bytes) if PY2 else memoryview(raw_bytes)


def metadata_from_many(file_list, verify_schema=False, open_with=default_open):
def metadata_from_many(file_list, verify_schema=False, open_with=default_open,
root=False):
"""
Given list of parquet files, make a FileMetaData that points to them
@@ -148,6 +149,9 @@ def metadata_from_many(file_list, verify_schema=False, open_with=default_open):
Whether to assert that the schemas in each file are identical
open_with: function
Use this to open each path.
root: str
Top of the dataset's directory tree, for cases where it can't be
automatically inferred.
Returns
-------
@@ -163,7 +167,7 @@ def metadata_from_many(file_list, verify_schema=False, open_with=default_open):
pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
else:
raise ValueError("Merge requires all PaquetFile instances or none")
basepath, file_list = analyse_paths(file_list, sep)
basepath, file_list = analyse_paths(file_list, sep, root=root)

if verify_schema:
for pf in pfs[1:]:
@@ -202,25 +206,33 @@ def ex_from_sep(sep):
return seps[sep]


def analyse_paths(file_list, sep=os.sep):
def analyse_paths(file_list, sep=os.sep, root=False):
"""Consolidate list of file-paths into acceptable parquet relative paths"""
path_parts_list = [fn.split(sep) for fn in file_list]
if len({len(path_parts) for path_parts in path_parts_list}) > 1:
raise ValueError('Mixed nesting in merge files')
basepath = path_parts_list[0][:-1]
s = ex_from_sep(sep)
out_list = []
for i, path_parts in enumerate(path_parts_list):
j = len(path_parts) - 1
for k, (base_part, path_part) in enumerate(zip(basepath, path_parts)):
if base_part != path_part:
j = k
break
basepath = basepath[:j]
if root is False:
basepath = path_parts_list[0][:-1]
for i, path_parts in enumerate(path_parts_list):
j = len(path_parts) - 1
for k, (base_part, path_part) in enumerate(zip(basepath, path_parts)):
if base_part != path_part:
j = k
break
basepath = basepath[:j]
l = len(basepath)

else:
basepath = root.split(sep)
l = len(basepath)
assert all(p[:l] == basepath for p in path_parts_list
), "All paths must begin with the given root"
l = len(basepath)
if len({tuple([p.split('=')[0] for p in parts[l:-1]])
for parts in path_parts_list}) > 1:
raise ValueError('Partitioning directories do not agree')
out_list = []
for path_parts in path_parts_list:
for path_part in path_parts[l:-1]:
if s.match(path_part) is None:
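For context, a standalone sketch of the two branches above (plain Python, POSIX paths assumed, hypothetical file names), showing why common-prefix inference fails on a single-valued partition and how an explicit root repairs it:

    import os

    files = ['/data/y=aa/part.0.parquet', '/data/y=aa/part.1.parquet']
    path_parts_list = [fn.split(os.sep) for fn in files]

    # root is False: infer the base path as the longest common prefix.
    basepath = path_parts_list[0][:-1]
    for path_parts in path_parts_list:
        j = len(path_parts) - 1
        for k, (base_part, path_part) in enumerate(zip(basepath, path_parts)):
            if base_part != path_part:
                j = k
                break
        basepath = basepath[:j]
    print(os.sep.join(basepath))  # '/data/y=aa': the partition directory is absorbed

    # root given: 'y=aa' stays a relative component, so 'y' remains a
    # recognisable partition field.
    basepath = '/data'.split(os.sep)
    l = len(basepath)
    print([p[l:] for p in path_parts_list])
    # [['y=aa', 'part.0.parquet'], ['y=aa', 'part.1.parquet']]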
10 changes: 8 additions & 2 deletions fastparquet/writer.py
@@ -940,7 +940,8 @@ def write_common_metadata(fn, fmd, open_with=default_open,
f.write(MARKER)


def merge(file_list, verify_schema=True, open_with=default_open):
def merge(file_list, verify_schema=True, open_with=default_open,
root=False):
"""
Create a logical data-set out of multiple parquet files.
@@ -958,13 +959,18 @@ def merge(file_list, verify_schema=True, open_with=default_open):
open_with: func
Used for opening a file for writing as f(path, mode). If input list
is ParquetFile instances, will be inferred from the first one of these.
root: str
If passing a list of files, the top directory of the data-set may
be ambiguous for partitioning where the topmost field has only one
value. Use this to specify the data-set's root directory, if required.
Returns
-------
ParquetFile instance corresponding to the merged data.
"""
sep = sep_from_open(open_with)
basepath, fmd = metadata_from_many(file_list, verify_schema, open_with)
basepath, fmd = metadata_from_many(file_list, verify_schema, open_with,
root=root)

out_file = sep.join([basepath, '_metadata'])
write_common_metadata(out_file, fmd, open_with, no_row_groups=False)
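For context, a usage sketch of `merge` with the new `root=` keyword (paths hypothetical): with a single-valued partition, `root=` keeps the consolidated `_metadata` file at the true top of the data-set.

    from fastparquet.writer import merge

    # Consolidate footers of files sharing the single partition
    # directory 'y=aa'. Without root=, '/data/y=aa' would be taken as
    # the base path and 'y' would no longer be a partition column.
    pf = merge(['/data/y=aa/part.0.parquet', '/data/y=aa/part.1.parquet'],
               root='/data')
    # Writes '/data/_metadata'; pf is a ParquetFile over the merged data-set.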