Skip to content

Commit

Permalink
Merge pull request #231 from martindurant/append_and_partition
Browse files Browse the repository at this point in the history
More care over append when partitioning multiple columns
  • Loading branch information
martindurant committed Oct 6, 2017
2 parents a60c2dc + bef5623 commit 9b94fcb
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
4 changes: 2 additions & 2 deletions fastparquet/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def _read_partitions(self):
if self.file_scheme in ['simple', 'flat', 'other']:
self.cats = {}
return
cats = {}
cats = OrderedDict()
for rg in self.row_groups:
for col in rg.columns:
s = ex_from_sep(self.sep)
Expand All @@ -177,7 +177,7 @@ def _read_partitions(self):
key = 'dir%i' % i
cats.setdefault(key, set()).add(val)
self.cats = OrderedDict([(key, list([val_to_num(x) for x in v]))
for key, v in cats.items()])
for key, v in cats.items()])

def row_group_filename(self, rg):
if rg.columns[0].file_path:
Expand Down
19 changes: 19 additions & 0 deletions fastparquet/test/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,25 @@ def test_append_fail(tempdir):
assert 'existing file scheme' in str(e)


def test_append_w_partitioning(tempdir):
    """Appending to a hive-partitioned dataset accumulates rows, and an
    append with a different partition-column list raises ValueError."""
    path = str(tempdir)
    df = pd.DataFrame({'a': np.random.choice([1, 2, 3], size=50),
                       'b': np.random.choice(['hello', 'world'], size=50),
                       'c': np.random.randint(50, size=50)})
    # initial write, then three appends with identical partitioning
    write(path, df, file_scheme='hive', partition_on=['a', 'b'])
    for _ in range(3):
        write(path, df, file_scheme='hive', partition_on=['a', 'b'],
              append=True)
    result = ParquetFile(path).to_pandas()
    assert len(result) == 200
    # every row was written four times, so each 4th sorted value of the
    # combined output reproduces the sorted original column
    assert sorted(result.a)[::4] == sorted(df.a)
    # appending with mismatched partition columns (fewer, or reordered)
    # must be rejected
    with pytest.raises(ValueError):
        write(path, df, file_scheme='hive', partition_on=['a'], append=True)
    with pytest.raises(ValueError):
        write(path, df, file_scheme='hive', partition_on=['b', 'a'],
              append=True)


def test_bad_object_encoding(tempdir):
df = pd.DataFrame({'x': ['a', 'ab']})
with pytest.raises(ValueError) as e:
Expand Down
4 changes: 3 additions & 1 deletion fastparquet/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,9 @@ def write(filename, data, row_group_offsets=50000000,
'existing file scheme is not.' % file_scheme)
fmd = pf.fmd
i_offset = find_max_part(fmd.row_groups)
partition_on = list(pf.cats)
if tuple(partition_on) != tuple(pf.cats):
raise ValueError('When appending, partitioning columns must'
' match existing data')
else:
i_offset = 0
fn = sep.join([filename, '_metadata'])
Expand Down

0 comments on commit 9b94fcb

Please sign in to comment.