
Merge pull request #43 from martindurant/append_merge
Append and merge
martindurant committed Dec 12, 2016
2 parents 03f723d + 6d68dff commit 9179be6
Showing 7 changed files with 481 additions and 87 deletions.
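
For orientation, a minimal sketch of the two features this commit adds, pieced together from the tests below (file names are illustrative):

    import pandas as pd
    from fastparquet import ParquetFile, write
    from fastparquet.writer import merge

    df = pd.DataFrame({'a': [1, 2, 3, 4]})

    # Append: add further row groups to an already-written file.
    write('out.parq', df)
    write('out.parq', df, append=True)

    # Merge: combine separately written files into one logical dataset.
    write('part0.parq', df)
    write('part1.parq', df)
    pf = merge(['part0.parq', 'part1.parq'])
    assert pf.to_pandas().a.tolist() == [1, 2, 3, 4, 1, 2, 3, 4]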
9 changes: 7 additions & 2 deletions fastparquet/api.py
@@ -15,8 +15,7 @@

from .core import read_thrift
from .thrift_structures import parquet_thrift
-from .writer import write
-from . import core, schema, converted_types, encoding
+from . import core, schema, converted_types, encoding, writer
from .util import (default_open, ParquetException, sep_from_open, val_to_num,
ensure_bytes)

@@ -52,6 +51,12 @@ def __init__(self, fn, verify=False, open_with=default_open,
self.fn = fn
with open_with(fn, 'rb') as f:
self._parse_header(f, verify)
+if all(rg.columns[0].file_path is None for rg in self.row_groups):
+    self.file_scheme = 'simple'
+elif all(rg.columns[0].file_path is not None for rg in self.row_groups):
+    self.file_scheme = 'hive'
+else:
+    self.file_scheme = 'mixed'
self.open = open_with
self.sep = sep
self._read_partitions()
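The new file_scheme attribute classifies how a dataset's row groups are laid out; a small sketch (the path is illustrative):

    from fastparquet import ParquetFile

    pf = ParquetFile('out_dir')
    # 'simple': all row groups live in the metadata file itself;
    # 'hive':   every row group points at a separate data file;
    # 'mixed':  some of each.
    print(pf.file_scheme)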
17 changes: 17 additions & 0 deletions fastparquet/test/test_api.py
@@ -101,3 +101,20 @@ def test_iter(tempdir):
pd.util.testing.assert_frame_equal(d2, df[2:])
with pytest.raises(StopIteration):
next(out)


def test_attributes(tempdir):
df = pd.DataFrame({'x': [1, 2, 3, 4],
'y': [1.0, 2.0, 1.0, 2.0],
'z': ['a', 'b', 'c', 'd']})

fn = os.path.join(tempdir, 'foo.parquet')
write(fn, df, row_group_offsets=[0, 2])
pf = ParquetFile(fn)
assert pf.columns == ['x', 'y', 'z']
assert len(pf.row_groups) == 2
assert pf.count == 4
assert fn == pf.info['name']
assert fn in str(pf)
for col in df:
assert pf.dtypes[col] == df.dtypes[col]
19 changes: 19 additions & 0 deletions fastparquet/test/test_util.py
@@ -0,0 +1,19 @@
from fastparquet.util import thrift_copy
from fastparquet import parquet_thrift


def test_thrift_copy():
fmd = parquet_thrift.FileMetaData()
rg0 = parquet_thrift.RowGroup()
rg0.num_rows = 5
rg1 = parquet_thrift.RowGroup()
rg1.num_rows = 15
fmd.row_groups = [rg0, rg1]

fmd2 = thrift_copy(fmd)

assert fmd is not fmd2
assert fmd == fmd2
assert fmd2.row_groups[0] is not rg0
rg0.num_rows = 25
assert fmd2.row_groups[0].num_rows == 5
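
In short, thrift_copy returns a fully independent deep copy, presumably so that the append/merge code can adjust row-group metadata without mutating the source objects; the semantics in one glance:

    from fastparquet import parquet_thrift
    from fastparquet.util import thrift_copy

    rg = parquet_thrift.RowGroup()
    rg.num_rows = 5
    fmd = parquet_thrift.FileMetaData()
    fmd.row_groups = [rg]

    fmd2 = thrift_copy(fmd)
    rg.num_rows = 99                          # mutate the original...
    assert fmd2.row_groups[0].num_rows == 5   # ...the copy is untouched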
166 changes: 165 additions & 1 deletion fastparquet/test/test_writer.py
@@ -118,7 +118,7 @@ def test_roundtrip_s3(s3):
data.loc[100, 'f'] = np.nan
data['cat'] = data.hello.astype('category')
noop = lambda x: True
-myopen = lambda f: s3.open(f, 'wb')
+myopen = s3.open
write(TEST_DATA+'/temp_parq', data, file_scheme='hive',
row_group_offsets=[0, 500], open_with=myopen, mkdirs=noop)
myopen = s3.open
@@ -506,3 +506,167 @@ def test_many_categories(tempdir, n):
out = pf.to_pandas(categories=['x'])

tm.assert_frame_equal(df, out)


@pytest.mark.parametrize('row_groups', ([0], [0, 2]))
@pytest.mark.parametrize('dirs', (['', ''], ['cat=1', 'cat=2']))
def test_merge(tempdir, dirs, row_groups):
fn = str(tempdir)

os.makedirs(os.path.join(fn, dirs[0]), exist_ok=True)
df0 = pd.DataFrame({'a': [1, 2, 3, 4]})
fn0 = os.sep.join([fn, dirs[0], 'out0.parq'])
write(fn0, df0, row_group_offsets=row_groups)

os.makedirs(os.path.join(fn, dirs[1]), exist_ok=True)
df1 = pd.DataFrame({'a': [5, 6, 7, 8]})
fn1 = os.sep.join([fn, dirs[1], 'out1.parq'])
write(fn1, df1, row_group_offsets=row_groups)

# with file-names
pf = writer.merge([fn0, fn1])
assert len(pf.row_groups) == 2 * len(row_groups)
out = pf.to_pandas().a.tolist()
assert out == [1, 2, 3, 4, 5, 6, 7, 8]
if "cat=1" in dirs:
assert 'cat' in pf.cats

# with instances
pf = writer.merge([ParquetFile(fn0), ParquetFile(fn1)])
assert len(pf.row_groups) == 2 * len(row_groups)
out = pf.to_pandas().a.tolist()
assert out == [1, 2, 3, 4, 5, 6, 7, 8]
if "cat=1" in dirs:
assert 'cat' in pf.cats


def test_merge_s3(tempdir, s3):
fn = str(tempdir)

df0 = pd.DataFrame({'a': [1, 2, 3, 4]})
fn0 = TEST_DATA + '/out0.parq'
write(fn0, df0, open_with=s3.open)

df1 = pd.DataFrame({'a': [5, 6, 7, 8]})
fn1 = TEST_DATA + '/out1.parq'
write(fn1, df1, open_with=s3.open)

# with file-names
pf = writer.merge([fn0, fn1], open_with=s3.open)
assert len(pf.row_groups) == 2
out = pf.to_pandas().a.tolist()
assert out == [1, 2, 3, 4, 5, 6, 7, 8]


def test_merge_fail(tempdir):
fn = str(tempdir)

df0 = pd.DataFrame({'a': [1, 2, 3, 4]})
fn0 = os.sep.join([fn, 'out0.parq'])
write(fn0, df0)

df1 = pd.DataFrame({'a': ['a', 'b', 'c']})
fn1 = os.sep.join([fn, 'out1.parq'])
write(fn1, df1)

with pytest.raises(ValueError) as e:
writer.merge([fn0, fn1])
assert 'schemas' in str(e)

os.remove(fn1)
write(fn1, df0, file_scheme='hive')
with pytest.raises(ValueError) as e:
writer.merge([fn0, fn1])
assert 'multi-file' in str(e)


def test_analyse_paths():
file_list = ['a', 'b']
base, out = writer.analyse_paths(file_list, '/')
assert (base, out) == ('', ['a', 'b'])

file_list = ['c/a', 'c/b']
base, out = writer.analyse_paths(file_list, '/')
assert (base, out) == ('c', ['a', 'b'])

file_list = ['c/d/a', 'c/d/b']
base, out = writer.analyse_paths(file_list, '/')
assert (base, out) == ('c/d', ['a', 'b'])

file_list = ['c/cat=1/a', 'c/cat=2/b', 'c/cat=1/c']
base, out = writer.analyse_paths(file_list, '/')
assert (base, out) == ('c', ['cat=1/a', 'cat=2/b', 'cat=1/c'])

file_list = ['c/cat=2/b', 'c/cat/a', 'c/cat=1/c']
with pytest.raises(ValueError) as e:
writer.analyse_paths(file_list, '/')
assert 'c/cat/a' in str(e)

file_list = ['c/cat=2/b', 'c/fred=2/a', 'c/cat=1/c']
with pytest.raises(ValueError) as e:
writer.analyse_paths(file_list, '/')
assert 'directories' in str(e)

file_list = ['c/cat=2/b', 'c/a', 'c/cat=1/c']
with pytest.raises(ValueError) as e:
writer.analyse_paths(file_list, '/')
assert 'nesting' in str(e)


def test_append_simple(tempdir):
fn = os.path.join(str(tempdir), 'test.parq')
df = pd.DataFrame({'a': [1, 2, 3, 0],
'b': ['a', 'a', 'b', 'b']})
write(fn, df, write_index=False)
write(fn, df, append=True, write_index=False)

pf = ParquetFile(fn)
expected = pd.concat([df, df], ignore_index=True)
pd.util.testing.assert_frame_equal(pf.to_pandas(), expected)


@pytest.mark.parametrize('row_groups', ([0], [0, 2]))
@pytest.mark.parametrize('partition', ([], ['b']))
def test_append(tempdir, row_groups, partition):
fn = str(tempdir)
df0 = pd.DataFrame({'a': [1, 2, 3, 0],
'b': ['a', 'b', 'a', 'b'],
'c': True})
df1 = pd.DataFrame({'a': [4, 5, 6, 7],
'b': ['a', 'b', 'a', 'b'],
'c': False})
write(fn, df0, partition_on=partition, file_scheme='hive',
row_group_offsets=row_groups)
write(fn, df1, partition_on=partition, file_scheme='hive',
row_group_offsets=row_groups, append=True)

pf = ParquetFile(fn)

expected = pd.concat([df0, df1], ignore_index=True)

assert len(pf.row_groups) == 2 * len(row_groups) * (len(partition) + 1)
items_out = {tuple(row[1])
for row in pf.to_pandas()[['a', 'b', 'c']].iterrows()}
items_in = {tuple(row[1])
for row in expected.iterrows()}
assert items_in == items_out


def test_append_fail(tempdir):
fn = str(tempdir)
df0 = pd.DataFrame({'a': [1, 2, 3, 0],
'b': ['a', 'b', 'a', 'b'],
'c': True})
df1 = pd.DataFrame({'a': [4, 5, 6, 7],
'b': ['a', 'b', 'a', 'b'],
'c': False})
write(fn, df0, file_scheme='hive')
with pytest.raises(ValueError) as e:
write(fn, df1, file_scheme='simple', append=True)
assert 'existing file scheme' in str(e)

fn2 = os.path.join(fn, 'temp.parq')
write(fn2, df0, file_scheme='simple')
with pytest.raises(ValueError) as e:
write(fn2, df1, file_scheme='hive', append=True)
assert 'existing file scheme' in str(e)
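
Taken together, these tests pin down the public behaviour; a compact sketch of appending to a partitioned dataset (the directory name is illustrative):

    import pandas as pd
    from fastparquet import ParquetFile, write

    df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': ['x', 'y', 'x', 'y']})
    write('dataset', df, file_scheme='hive', partition_on=['b'])
    write('dataset', df, file_scheme='hive', partition_on=['b'], append=True)

    pf = ParquetFile('dataset')
    print(pf.cats)             # partition values recovered from the file paths
    print(len(pf.row_groups))  # here: one row group per partition value per write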
59 changes: 52 additions & 7 deletions fastparquet/util.py
@@ -1,10 +1,10 @@
import ast
import os
import shutil
+import tempfile

import pandas as pd
-import pytest
-import tempfile
+import thriftpy


class ParquetException(Exception):
@@ -14,16 +14,12 @@ class ParquetException:


def sep_from_open(opener):
-if opener in [default_open, default_openw]:
+if opener is default_open:
return os.sep
else:
return '/'


-def default_openw(f):
-    return open(f, 'wb')


def default_mkdirs(f):
os.makedirs(f, exist_ok=True)

@@ -61,3 +57,52 @@ def ensure_bytes(s):
return s.encode()
else:
return s


def thrift_print(structure, offset=0):
"""
Handy recursive text output for thrift structures
"""
if not isinstance(structure, thriftpy.thrift.TPayload):
return str(structure)
s = str(structure.__class__) + '\n'
for key in dir(structure):
if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
'default_spec']:
continue
s = s + ' ' * offset + key + ': ' + thrift_print(
    getattr(structure, key), offset + 2) + '\n'
return s
thriftpy.thrift.TPayload.__str__ = thrift_print
thriftpy.thrift.TPayload.__repr__ = thrift_print


def thrift_copy(structure):
"""
Recursively copy a thriftpy structure
"""
base = structure.__class__()
for key in dir(structure):
if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
'default_spec']:
continue
val = getattr(structure, key)
if isinstance(val, list):
setattr(base, key, [thrift_copy(item)
if isinstance(item, thriftpy.thrift.TPayload)
else item for item in val])
elif isinstance(val, thriftpy.thrift.TPayload):
setattr(base, key, thrift_copy(val))
else:
setattr(base, key, val)
return base


def index_like(index):
"""
Does the index differ from a default range index (and so need writing)?
"""
return not (isinstance(index, pd.RangeIndex) and
index._start == 0 and
index._stop == len(index) and
index._step == 1 and index.name is None)
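
With the __str__/__repr__ patch above, thrift metadata objects become printable, which is handy when inspecting merged footers; a tiny sketch:

    from fastparquet import parquet_thrift

    rg = parquet_thrift.RowGroup()
    rg.num_rows = 5
    print(rg)   # class name, then each public field and its value, recursively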
