Skip to content

Commit

Permalink
Fixes 370 (#371)
Browse files Browse the repository at this point in the history
* Fixed an error raised when a partition is empty while loading a partitioned parquet file

* Fixes an issue where, if a column is DateTime, GroupBy.indices returns keys in a different form than GroupBy.__iter__.

* Added a test for reading and filtering a partitioned parquet file, then writing it again with some empty partitions

* sorted_partitioned_columns() now takes into account filters
  • Loading branch information
andrethrill authored and martindurant committed Sep 4, 2018
1 parent 845494f commit 4129ac2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
9 changes: 8 additions & 1 deletion fastparquet/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ def statistics(obj):
return d


def sorted_partitioned_columns(pf):
def sorted_partitioned_columns(pf, filters=None):
"""
The columns that are known to be sorted partition-by-partition
Expand All @@ -703,6 +703,13 @@ def sorted_partitioned_columns(pf):
statistics
"""
s = statistics(pf)
if (filters is not None) & (filters != []):
idx_list = [i for i, rg in enumerate(pf.row_groups) if
not(filter_out_stats(rg, filters, pf.schema)) and
not(filter_out_cats(rg, filters))]
for stat in s.keys():
for col in s[stat].keys():
s[stat][col] = [s[stat][col][i] for i in idx_list]
columns = pf.columns
out = dict()
for c in columns:
Expand Down
30 changes: 30 additions & 0 deletions fastparquet/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,36 @@ def test_sorted_row_group_columns(tempdir):
assert result == expected


def test_sorted_row_group_columns_with_filters(tempdir):
    """Check that sorted_partitioned_columns honours row-group filters.

    Writes a frame partitioned on 'id' so the two id values interleave
    across row groups; only after filtering down to a single id do the
    per-row-group statistics become monotonic.
    """
    dd = pytest.importorskip('dask.dataframe')

    # 'unique' and the index increase monotonically *within* each id value.
    frame = pd.DataFrame(
        {'unique': [0, 0, 1, 1, 2, 2, 3, 3],
         'id': ['id1', 'id2'] * 4},
        index=[0, 0, 1, 1, 2, 2, 3, 3])
    ddf = dd.from_pandas(frame, npartitions=2)

    path = os.path.join(tempdir, 'foo.parquet')
    ddf.to_parquet(path, engine='fastparquet', partition_on=['id'])

    pf = ParquetFile(path)

    # Without filters, row groups from both ids interleave, so no column
    # is sorted partition-by-partition.
    assert sorted_partitioned_columns(pf) == {}

    # Restricting to a single id leaves strictly increasing statistics
    # for both the index and the 'unique' column.
    expected = {'index': {'min': [0, 2], 'max': [1, 3]},
                'unique': {'min': [0, 2], 'max': [1, 3]}}
    result = sorted_partitioned_columns(pf, filters=[('id', '==', 'id1')])
    assert result == expected


def test_iter(tempdir):
df = pd.DataFrame({'x': [1, 2, 3, 4],
'y': [1.0, 2.0, 1.0, 2.0],
Expand Down

0 comments on commit 4129ac2

Please sign in to comment.