Merge pull request #169 from martindurant/master
Enable append to an empty dataset
martindurant committed Jun 9, 2017
2 parents 2478345 + 3fd9b40 commit af5ed19
Showing 5 changed files with 61 additions and 8 deletions.
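
The net effect of this change: a dataset written with zero rows now gets the
new file scheme 'empty', and such a dataset can be the target of
write(..., append=True). A minimal usage sketch, mirroring the new
test_append_empty below (the path and data are illustrative):

    import pandas as pd
    from fastparquet import ParquetFile, write

    fn = '/tmp/test.parq'  # illustrative path
    df = pd.DataFrame({'a': [1, 2, 3, 0],
                       'b': ['a', 'a', 'b', 'b']})

    # Write the schema only: zero rows means zero row groups.
    write(fn, df.head(0), write_index=False, file_scheme='simple')
    pf = ParquetFile(fn)
    assert pf.count == 0
    assert pf.file_scheme == 'empty'   # new scheme introduced by this change

    # Appending onto the empty dataset now succeeds instead of raising.
    write(fn, df, append=True, write_index=False, file_scheme='simple')
    pd.util.testing.assert_frame_equal(ParquetFile(fn).to_pandas(), df,
                                       check_categorical=False)
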
38 changes: 35 additions & 3 deletions fastparquet/api.py
@@ -43,6 +43,38 @@ class ParquetFile(object):
evaluated to a file open for reading. Defaults to the built-in `open`.
sep: string [`os.sep`]
Path separator to use, if data is in multiple files.
Attributes
----------
cats: dict
Columns derived from hive/drill directory information, with known
values for each column.
categories: list
Columns marked as categorical in the extra metadata (meaning the
data must have come from pandas).
columns: list of str
The data columns available
count: int
Total number of rows
dtypes: dict
Expected output types for each column
file_scheme: str
'simple': all row groups are within the same file; 'hive': all row
groups are in other files; 'mixed': row groups in this file and others
too; 'empty': no row groups at all.
info: dict
Combination of some of the other attributes
key_value_metadata: list
Additional information about this data's origin, e.g., pandas
description.
row_groups: list
Thrift objects for each row group
schema: schema.SchemaHelper
print this for a representation of the column structure
self_made: bool
If this file was created by fastparquet
statistics: dict
Max/min/count of each column chunk
"""
def __init__(self, fn, verify=False, open_with=default_open,
sep=os.sep):
@@ -64,7 +96,9 @@ def __init__(self, fn, verify=False, open_with=default_open,
self.fn = fn
with open_with(fn, 'rb') as f:
self._parse_header(f, verify)
if all(rg.columns[0].file_path is None for rg in self.row_groups):
if not self.row_groups:
self.file_scheme = 'empty'
elif all(rg.columns[0].file_path is None for rg in self.row_groups):
self.file_scheme = 'simple'
elif all(rg.columns[0].file_path is not None for rg in self.row_groups):
self.file_scheme = 'hive'
@@ -113,8 +147,6 @@ def _set_attrs(self):

@ property
def helper(self):
warnings.warn('The "helper" attribute of ParquetFile has been'
'renamed to "schema".')
return self.schema

@property
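
Most of the api.py diff is the new attribute documentation above; a hedged
sketch of inspecting those documented attributes on an existing dataset
(the path is illustrative):

    from fastparquet import ParquetFile

    pf = ParquetFile('mydata.parq')  # illustrative path

    pf.columns      # list of data column names
    pf.count        # total number of rows across all row groups
    pf.dtypes       # expected output dtype per column
    pf.file_scheme  # 'simple', 'hive', 'mixed' or, after this change, 'empty'
    pf.statistics   # max/min/count of each column chunk
    pf.info         # dict combining several of the above
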
1 change: 1 addition & 0 deletions fastparquet/test/test_api.py
@@ -177,6 +177,7 @@ def test_read_multiple_no_metadata(tempdir):
df = pd.DataFrame({'x': [1, 5, 2, 5]})
write(tempdir, df, file_scheme='hive', row_group_offsets=[0, 2])
os.unlink(os.path.join(tempdir, '_metadata'))
os.unlink(os.path.join(tempdir, '_common_metadata'))
import glob
flist = list(sorted(glob.glob(os.path.join(tempdir, '*'))))
pf = ParquetFile(flist)
18 changes: 18 additions & 0 deletions fastparquet/test/test_output.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import datetime
import numpy as np
@@ -717,6 +718,23 @@ def test_append_simple(tempdir):
check_categorical=False)


@pytest.mark.parametrize('scheme', ('hive', 'simple'))
def test_append_empty(tempdir, scheme):
fn = os.path.join(str(tempdir), 'test.parq')
df = pd.DataFrame({'a': [1, 2, 3, 0],
'b': ['a', 'a', 'b', 'b']})
write(fn, df.head(0), write_index=False, file_scheme=scheme)
pf = ParquetFile(fn)
assert pf.count == 0
assert pf.file_scheme == 'empty'
write(fn, df, append=True, write_index=False, file_scheme=scheme)

pf = ParquetFile(fn)
pd.util.testing.assert_frame_equal(pf.to_pandas(), df,
check_categorical=False)



@pytest.mark.parametrize('row_groups', ([0], [0, 2]))
@pytest.mark.parametrize('partition', ([], ['b']))
def test_append(tempdir, row_groups, partition):
4 changes: 3 additions & 1 deletion fastparquet/util.py
@@ -174,7 +174,9 @@ def metadata_from_many(file_list, verify_schema=False, open_with=default_open):
fmd.row_groups = []

for pf, fn in zip(pfs, file_list):
if pf.file_scheme != 'simple':
if pf.file_scheme not in ['simple', 'empty']:
# should remove 'empty' datasets up front? Get ignored on load
# anyway.
raise ValueError('Cannot merge multi-file input', fn)
for rg in pf.row_groups:
rg = thrift_copy(rg)
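
metadata_from_many merges the footers of several single-file parquet files
into one metadata object; it is what runs when ParquetFile is given a list of
files, as in test_read_multiple_no_metadata above. With this change, member
files whose scheme is 'empty' are tolerated in the merge rather than raising.
A sketch under that assumption (the directory is illustrative):

    import glob
    import os
    from fastparquet import ParquetFile

    # Individual data files of a hive-style dataset whose _metadata file
    # is missing (as in test_read_multiple_no_metadata).
    flist = sorted(glob.glob(os.path.join('/path/to/dataset', '*')))

    # The footers are merged; empty member files contribute no row groups
    # but no longer abort the merge.
    pf = ParquetFile(flist)
    df = pf.to_pandas()
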
8 changes: 4 additions & 4 deletions fastparquet/writer.py
@@ -685,7 +685,7 @@ def write_simple(fn, data, fmd, row_group_offsets, compression,
"""
if append:
pf = api.ParquetFile(fn, open_with=open_with)
if pf.file_scheme != 'simple':
if pf.file_scheme not in ['simple', 'empty']:
raise ValueError('File scheme requested is simple, but '
'existing file scheme is not')
fmd = pf.fmd
@@ -811,9 +811,9 @@ def write(filename, data, row_group_offsets=50000000,
elif file_scheme in ['hive', 'drill']:
if append:
pf = api.ParquetFile(filename, open_with=open_with)
if pf.file_scheme != 'hive':
raise ValueError('Requested file scheme is hive, '
'but existing file scheme is not.')
if pf.file_scheme not in ['hive', 'empty']:
raise ValueError('Requested file scheme is %s, but '
'existing file scheme is not.' % file_scheme)
fmd = pf.fmd
i_offset = find_max_part(fmd.row_groups)
partition_on = list(pf.cats)
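
For hive/drill datasets, the append path in write() re-reads the existing
footer, checks that its scheme is compatible (now also accepting 'empty'),
continues part-file numbering from find_max_part, and reuses the existing
partition columns (partition_on = list(pf.cats)). A hedged sketch of that
flow (file names and data are illustrative):

    import pandas as pd
    from fastparquet import write

    df1 = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
    df2 = pd.DataFrame({'a': [3, 4], 'b': ['x', 'x']})

    # Initial hive-style dataset, partitioned on column 'b'.
    write('mydata.parq', df1, file_scheme='hive', partition_on=['b'])

    # On append the existing metadata supplies the partition columns and
    # the next part-file number; the scheme check now also allows 'empty'.
    write('mydata.parq', df2, file_scheme='hive', append=True)
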
