Merge pull request #209 from martindurant/thorough_partition_reading

Refactor partition interpretation

martindurant committed Sep 7, 2017
2 parents a1f88f4 + 1c15849 commit 9ddfc62

Showing 6 changed files with 142 additions and 48 deletions.

27 changes: 13 additions & 14 deletions fastparquet/api.py
@@ -18,7 +18,7 @@
from . import core, schema, converted_types, encoding, dataframe
from .util import (default_open, ParquetException, val_to_num,
ensure_bytes, check_column_names, metadata_from_many,
ex_from_sep)
ex_from_sep, get_file_scheme)


class ParquetFile(object):
@@ -97,14 +97,6 @@ def __init__(self, fn, verify=False, open_with=default_open,
self.fn = fn
with open_with(fn, 'rb') as f:
self._parse_header(f, verify)
if not self.row_groups:
self.file_scheme = 'empty'
elif all(rg.columns[0].file_path is None for rg in self.row_groups):
self.file_scheme = 'simple'
elif all(rg.columns[0].file_path is not None for rg in self.row_groups):
self.file_scheme = 'hive'
else:
self.file_scheme = 'mixed'
self.open = open_with

def _parse_header(self, f, verify=True):
@@ -143,6 +135,8 @@ def _set_attrs(self):
self.group_files.setdefault(i, set()).add(chunk.file_path)
self.schema = schema.SchemaHelper(self._schema)
self.selfmade = self.created_by.split(' ', 1)[0] == "fastparquet-python"
self.file_scheme = get_file_scheme([rg.columns[0].file_path
for rg in self.row_groups], self.sep)
self._read_partitions()
self._dtypes()

@@ -163,15 +157,19 @@ def statistics(self):
return statistics(self)

def _read_partitions(self):
if self.file_scheme in ['simple', 'flat', 'other']:
self.cats = {}
return
cats = {}
for rg in self.row_groups:
for col in rg.columns:
s = ex_from_sep(self.sep)
partitions = s.findall(col.file_path or "")
if partitions:
path = col.file_path or ""
if self.file_scheme == 'hive':
partitions = s.findall(path)
for key, val in partitions:
cats.setdefault(key, set()).add(val)
elif self.sep in (col.file_path or ""):
else:
for i, val in enumerate(col.file_path.split(self.sep)[:-1]):
key = 'dir%i' % i
cats.setdefault(key, set()).add(val)
@@ -199,7 +197,7 @@ def read_row_group_file(self, rg, columns, categories, index=None,
core.read_row_group_file(
fn, rg, columns, categories, self.schema, self.cats,
open=self.open, selfmade=self.selfmade, index=index,
assign=assign)
assign=assign, scheme=self.file_scheme)
if ret:
return df

@@ -217,7 +215,8 @@ def read_row_group(self, rg, columns, categories, infile=None,
ret = True
core.read_row_group(
infile, rg, columns, categories, self.schema, self.cats,
self.selfmade, index=index, assign=assign, sep=self.sep)
self.selfmade, index=index, assign=assign, sep=self.sep,
scheme=self.file_scheme)
if ret:
return df

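The refactored `_read_partitions` above now branches on the detected file scheme instead of guessing from each path: hive layouts take both the column name and the value from `key=value` directories, any other partitioned layout falls back to positional `dir0`, `dir1`, ... labels, and simple/flat/other layouts produce no partition columns at all. A minimal standalone sketch of that logic (the function below is an illustration written for this note, not fastparquet API):

    def collect_partitions(file_paths, scheme, sep='/'):
        """Map partition-column name -> set of observed values."""
        if scheme in ('simple', 'flat', 'other'):
            return {}                        # no partition columns at all
        cats = {}
        for path in file_paths:              # e.g. one path per row group
            dirs = (path or "").split(sep)[:-1]
            if scheme == 'hive':
                pairs = (d.split('=', 1) for d in dirs if '=' in d)
            else:                            # 'drill': unnamed directory levels
                pairs = (('dir%i' % i, d) for i, d in enumerate(dirs))
            for key, val in pairs:
                cats.setdefault(key, set()).add(val)
        return cats

    collect_partitions(['year=2017/month=09/part.0.parquet'], 'hive')
    # {'year': {'2017'}, 'month': {'09'}}
    collect_partitions(['one/part.0.parquet', 'two/part.0.parquet'], 'drill')
    # {'dir0': {'one', 'two'}}
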
13 changes: 7 additions & 6 deletions fastparquet/core.py
@@ -277,11 +277,11 @@ def read_col(column, schema_helper, infile, use_cat=False,

def read_row_group_file(fn, rg, columns, categories, schema_helper, cats,
open=open, selfmade=False, index=None, assign=None,
sep=os.sep):
sep=os.sep, scheme='hive'):
with open(fn, mode='rb') as f:
return read_row_group(f, rg, columns, categories, schema_helper, cats,
selfmade=selfmade, index=index, assign=assign,
sep=sep)
sep=sep, scheme=scheme)


def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
@@ -324,7 +324,7 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,

def read_row_group(file, rg, columns, categories, schema_helper, cats,
selfmade=False, index=None, assign=None,
sep=os.sep):
sep=os.sep, scheme='hive'):
"""
Access row-group in a file and read some columns into a data-frame.
"""
@@ -334,9 +334,10 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats,
cats, selfmade, assign=assign)

for cat in cats:
s = ex_from_sep(sep)
partitions = s.findall(rg.columns[0].file_path)
if not partitions and sep in (rg.columns[0].file_path or ""):
if scheme == 'hive':
s = ex_from_sep(sep)
partitions = s.findall(rg.columns[0].file_path)
else:
partitions = [('dir%i' % i, v) for (i, v) in enumerate(
rg.columns[0].file_path.split(sep)[:-1])]
val = val_to_num([p[1] for p in partitions if p[0] == cat][0])
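In `read_row_group`, the same scheme flag now decides how a single row group's `file_path` is turned into a partition value for each category, with `val_to_num` coercing strings such as '2017' to numbers. A hedged sketch of that lookup, building on the same path-splitting idea as the sketch above (simplified, not the library's exact code; `val_to_num` does exist in `fastparquet.util`):

    from fastparquet.util import val_to_num

    def partition_value(file_path, cat, scheme, sep='/'):
        dirs = file_path.split(sep)[:-1]
        if scheme == 'hive':
            pairs = [tuple(d.split('=', 1)) for d in dirs if '=' in d]
        else:                                 # 'drill'
            pairs = [('dir%i' % i, d) for i, d in enumerate(dirs)]
        # pick the value belonging to the requested partition column
        return val_to_num([v for k, v in pairs if k == cat][0])

    partition_value('year=2017/part.0.parquet', 'year', 'hive')   # -> 2017
    partition_value('one/part.0.parquet', 'dir0', 'drill')        # -> 'one'
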
52 changes: 52 additions & 0 deletions fastparquet/test/test_api.py
@@ -307,3 +307,55 @@ def test_no_index_name(tempdir):
out = pf.to_pandas()
assert out.index.name is None
assert out.index.tolist() == [0, 1, 2]


def test_drill_list(tempdir):
df = pd.DataFrame({'a': ['x', 'y', 'z'], 'b': [4, 5, 6]})
dir1 = os.path.join(tempdir, 'x')
fn1 = os.path.join(dir1, 'part.0.parquet')
os.makedirs(dir1)
write(fn1, df)
dir2 = os.path.join(tempdir, 'y')
fn2 = os.path.join(dir2, 'part.0.parquet')
os.makedirs(dir2)
write(fn2, df)

pf = ParquetFile([fn1, fn2])
out = pf.to_pandas()
assert out.a.tolist() == ['x', 'y', 'z'] * 2
assert out.dir0.tolist() == ['x'] * 3 + ['y'] * 3


def test_hive_and_drill_list(tempdir):
df = pd.DataFrame({'a': ['x', 'y', 'z'], 'b': [4, 5, 6]})
dir1 = os.path.join(tempdir, 'x=0')
fn1 = os.path.join(dir1, 'part.0.parquet')
os.makedirs(dir1)
write(fn1, df)
dir2 = os.path.join(tempdir, 'y')
fn2 = os.path.join(dir2, 'part.0.parquet')
os.makedirs(dir2)
write(fn2, df)

pf = ParquetFile([fn1, fn2])
out = pf.to_pandas()
assert out.a.tolist() == ['x', 'y', 'z'] * 2
assert out.dir0.tolist() == ['x=0'] * 3 + ['y'] * 3


def test_bad_file_paths(tempdir):
df = pd.DataFrame({'a': ['x', 'y', 'z'], 'b': [4, 5, 6]})
dir1 = os.path.join(tempdir, 'x=0')
fn1 = os.path.join(dir1, 'part.=.parquet')
os.makedirs(dir1)
write(fn1, df)
dir2 = os.path.join(tempdir, 'y/z')
fn2 = os.path.join(dir2, 'part.0.parquet')
os.makedirs(dir2)
write(fn2, df)

pf = ParquetFile([fn1, fn2])
assert pf.file_scheme == 'other'
out = pf.to_pandas()
assert out.a.tolist() == ['x', 'y', 'z'] * 2
assert 'dir0' not in out
38 changes: 22 additions & 16 deletions fastparquet/test/test_util.py
@@ -1,6 +1,6 @@
import pytest

from fastparquet.util import thrift_copy, analyse_paths
from fastparquet.util import thrift_copy, analyse_paths, get_file_scheme
from fastparquet import parquet_thrift


@@ -25,21 +25,6 @@ def test_analyse_paths():
base, out = analyse_paths(file_list, '\\')
assert (base, out) == ('c', ['cat=1\\a', 'cat=2\\b', 'cat=1\\c'])

file_list = ['c/cat=2/b', 'c/cat/a', 'c/cat=1/c']
with pytest.raises(ValueError) as e:
analyse_paths(file_list, '/')
assert 'c/cat/a' in str(e)

file_list = ['c/cat=2/b', 'c/fred=2/a', 'c/cat=1/c']
with pytest.raises(ValueError) as e:
analyse_paths(file_list, '/')
assert 'directories' in str(e)

file_list = ['c/cat=2/b', 'c/a', 'c/cat=1/c']
with pytest.raises(ValueError) as e:
analyse_paths(file_list, '/')
assert 'nesting' in str(e)


def test_thrift_copy():
fmd = parquet_thrift.FileMetaData()
@@ -56,3 +41,24 @@ def test_thrift_copy():
assert fmd2.row_groups[0] is not rg0
rg0.num_rows = 25
assert fmd2.row_groups[0].num_rows == 5


def test_file_scheme():
paths = [None, None]
assert get_file_scheme(paths) == 'simple'
paths = []
assert get_file_scheme(paths) == 'empty' # this is pointless
paths = ['file']
assert get_file_scheme(paths) == 'flat'
paths = ['file', 'file']
assert get_file_scheme(paths) == 'flat'
paths = ['a=1/b=2/file', 'a=2/b=1/file']
assert get_file_scheme(paths) == 'hive'
paths = ['a=1/z=2/file', 'a=2/b=6/file'] # note key names do not match
assert get_file_scheme(paths) == 'drill'
paths = ['a=1/b=2/file', 'a=2/b/file']
assert get_file_scheme(paths) == 'drill'
paths = ['a/b/c/file', 'a/b/file']
assert get_file_scheme(paths) == 'other'


58 changes: 47 additions & 11 deletions fastparquet/util.py
@@ -173,16 +173,14 @@ def ex_from_sep(sep):


def analyse_paths(file_list, sep=os.sep, root=False):
"""Consolidate list of file-paths into acceptable parquet relative paths"""
"""Consolidate list of file-paths into parquet relative paths"""
path_parts_list = [fn.split(sep) for fn in file_list]
if len({len(path_parts) for path_parts in path_parts_list}) > 1:
raise ValueError('Mixed nesting in merge files')
s = ex_from_sep(sep)
if root is False:
basepath = path_parts_list[0][:-1]
for i, path_parts in enumerate(path_parts_list):
j = len(path_parts) - 1
for k, (base_part, path_part) in enumerate(zip(basepath, path_parts)):
for k, (base_part, path_part) in enumerate(
zip(basepath, path_parts)):
if base_part != path_part:
j = k
break
@@ -195,14 +193,8 @@ def analyse_paths(file_list, sep=os.sep, root=False):
assert all(p[:l] == basepath for p in path_parts_list
), "All paths must begin with the given root"
l = len(basepath)
if len({tuple([p.split('=')[0] for p in parts[l:-1]])
for parts in path_parts_list}) > 1:
raise ValueError('Partitioning directories do not agree')
out_list = []
for path_parts in path_parts_list:
for path_part in path_parts[l:-1]:
if s.match(path_part) is None:
raise ValueError('Malformed paths set at', sep.join(path_parts))
out_list.append(sep.join(path_parts[l:]))

return sep.join(basepath), out_list
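With the checks above removed, `analyse_paths` no longer raises for mixed nesting, disagreeing partition directories, or malformed `key=value` names; it simply consolidates the paths, and classification (hive, drill or no partitioning) happens later in `get_file_scheme`. A hedged usage sketch of the relaxed behaviour (the exact base-path handling is partly in code not shown in this diff):

    from fastparquet.util import analyse_paths

    # Previously this mixture raised ValueError; now it is expected to be
    # consolidated into a common base plus relative paths.
    base, rel = analyse_paths(['data/x=1/part.0.parquet',
                               'data/y/part.0.parquet'], '/')
    # base == 'data'
    # rel == ['x=1/part.0.parquet', 'y/part.0.parquet']
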
@@ -261,3 +253,47 @@ def get_numpy_type(dtype):
return 'category'
else:
return str(dtype)


def get_file_scheme(paths, sep='/'):
"""For the given row groups, figure out if the partitioning scheme
Parameters
----------
paths: list of str
normally from row_group.columns[0].file_path
sep: str
path separator such as '/'
Returns
-------
'empty': no rgs at all
'simple': all rgs in a single file
'flat': multiple files in one directory
'hive': directories are all `key=value`; all files are at the same
directory depth
'drill': assume directory names are labels, and field names are of the
form dir0, dir1; all files are at the same directory depth
'other': none of the above, assume no partitioning
"""
if not paths:
return 'empty'
if set(paths) == {None}:
return 'simple'
if None in paths:
return 'other'
parts = [p.split(sep) for p in paths]
lens = [len(p) for p in parts]
if len(set(lens)) > 1:
return 'other'
if set(lens) == {1}:
return 'flat'
s = ex_from_sep(sep)
dirs = [p.rsplit(sep, 1)[0] for p in paths]
matches = [s.findall(d) for d in dirs]
if all(len(m) == (l - 1) for (m, l) in
zip(matches, lens)):
keys = (tuple(m[0] for m in parts) for parts in matches)
if len(set(keys)) == 1:
return 'hive'
return 'drill'
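The new `test_file_scheme` above exercises the corner cases; for quick reference, typical layouts map to schemes as follows (a hedged usage sketch of the helper, which is imported from `fastparquet.util` elsewhere in this change):

    from fastparquet.util import get_file_scheme

    get_file_scheme([None, None])                                   # 'simple'
    get_file_scheme(['part.0.parquet', 'part.1.parquet'])           # 'flat'
    get_file_scheme(['a=1/b=2/part.0.parquet',
                     'a=2/b=3/part.0.parquet'])                     # 'hive'
    get_file_scheme(['one/part.0.parquet', 'two/part.0.parquet'])   # 'drill'
    get_file_scheme(['a/b/part.0.parquet', 'a/part.0.parquet'])     # 'other' (mixed depth)
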
2 changes: 1 addition & 1 deletion fastparquet/writer.py
@@ -803,7 +803,7 @@ def write(filename, data, row_group_offsets=50000000,
elif file_scheme in ['hive', 'drill']:
if append:
pf = api.ParquetFile(filename, open_with=open_with)
if pf.file_scheme not in ['hive', 'empty']:
if pf.file_scheme not in ['hive', 'empty', 'flat']:
raise ValueError('Requested file scheme is %s, but '
'existing file scheme is not.' % file_scheme)
fmd = pf.fmd
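The relaxed check means that appending with `file_scheme='hive'` is now also accepted when the existing dataset was detected as 'flat' (several data files in one directory, no partition directories), not only 'hive' or 'empty'. A hedged usage sketch with the existing `write` API:

    import pandas as pd
    from fastparquet import write

    df = pd.DataFrame({'a': ['x', 'y'], 'b': [1, 2]})
    write('out.parq', df, file_scheme='hive')                # directory-style dataset
    write('out.parq', df, file_scheme='hive', append=True)   # existing scheme may be
                                                             # 'hive', 'empty' or now 'flat'
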
