Robust partition names (#336)
* Add tests to demonstrate partition name problems

* Fix parsing of datetimelike partition_on names

* Add exceptions/warnings for bad partition names

* Update documentation on reading hive partitioned files.

* Fix for broken tests

* Add 'now' as special case in `val_to_num`

* Handle literals more deliberately
Spacerat authored and martindurant committed May 17, 2018
1 parent 4b74130 commit fce1cfd
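
The failure mode this commit guards against can be sketched in two lines (illustrative only, not part of the commit): two distinct partition directory names that coerce to the same value under val_to_num.

    from fastparquet.util import val_to_num

    # '0.7' and '.7' are different directory names but parse to the same
    # float, so as hive partition values they would collapse into one
    # category. After this commit, ParquetFile raises ValueError for such
    # datasets instead of silently merging the partitions.
    assert val_to_num('0.7') == val_to_num('.7') == 0.7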
Showing 4 changed files with 160 additions and 13 deletions.
37 changes: 32 additions & 5 deletions fastparquet/api.py
@@ -9,6 +9,7 @@
 import os
 import six
 import struct
+import warnings
 
 import numpy as np
 from fastparquet.util import join_path
@@ -18,7 +19,7 @@
 from . import core, schema, converted_types, encoding, dataframe
 from .util import (default_open, ParquetException, val_to_num,
                    ensure_bytes, check_column_names, metadata_from_many,
-                   ex_from_sep, get_file_scheme, STR_TYPE)
+                   ex_from_sep, get_file_scheme, STR_TYPE, groupby_types)
 
 
 class ParquetFile(object):
@@ -27,6 +28,12 @@ class ParquetFile(object):
     Reads the metadata (row-groups and schema definition) and provides
     methods to extract the data from the files.
+
+    Note that when reading parquet files partitioned using directories
+    (i.e. using the hive/drill scheme), an attempt is made to coerce
+    the partition values to a number, datetime or timedelta. Fastparquet
+    cannot read a hive/drill parquet file with partition names which coerce
+    to the same value, such as "0.7" and ".7".
 
     Parameters
     ----------
     fn: path/URL string or list of paths
@@ -62,7 +69,7 @@ class ParquetFile(object):
     file_scheme: str
         'simple': all row groups are within the same file; 'hive': all row
         groups are in other files; 'mixed': row groups in this file and others
-        too; 'empty': no row grops at all.
+        too; 'empty': no row groups at all.
     info: dict
         Combination of some of the other attributes
     key_value_metadata: list
@@ -172,19 +179,39 @@ def _read_partitions(self):
             self.cats = {}
             return
         cats = OrderedDict()
+        raw_cats = OrderedDict()
         for rg in self.row_groups:
             for col in rg.columns:
                 s = ex_from_sep('/')
                 path = col.file_path or ""
                 if self.file_scheme == 'hive':
                     partitions = s.findall(path)
                     for key, val in partitions:
-                        cats.setdefault(key, set()).add(val)
+                        cats.setdefault(key, set()).add(val_to_num(val))
+                        raw_cats.setdefault(key, set()).add(val)
                 else:
                     for i, val in enumerate(col.file_path.split('/')[:-1]):
                         key = 'dir%i' % i
-                        cats.setdefault(key, set()).add(val)
-        self.cats = OrderedDict([(key, list([val_to_num(x) for x in v]))
+                        cats.setdefault(key, set()).add(val_to_num(val))
+                        raw_cats.setdefault(key, set()).add(val)
+
+        for key, v in cats.items():
+            # Check that no partition names map to the same value after transformation by val_to_num
+            raw = raw_cats[key]
+            if len(v) != len(raw):
+                conflicts_by_value = OrderedDict()
+                for raw_val in raw_cats[key]:
+                    conflicts_by_value.setdefault(val_to_num(raw_val), set()).add(raw_val)
+                conflicts = [c for k in conflicts_by_value.values() if len(k) > 1 for c in k]
+                raise ValueError("Partition names map to the same value: %s" % conflicts)
+            vals_by_type = groupby_types(v)
+
+            # Check that all partition names map to the same type after transformation by val_to_num
+            if len(vals_by_type) > 1:
+                examples = [x[0] for x in vals_by_type.values()]
+                warnings.warn("Partition names coerce to values of different types, e.g. %s" % examples)
+
+        self.cats = OrderedDict([(key, list(v))
                                 for key, v in cats.items()])
 
     def row_group_filename(self, rg):
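
The conflict check added above, extracted as a standalone sketch; the helper name find_conflicts is invented for illustration:

    from collections import OrderedDict
    from fastparquet.util import val_to_num

    def find_conflicts(raw_values):
        # Group raw partition strings by their coerced value; any group
        # with more than one member is a conflict.
        by_value = OrderedDict()
        for raw in raw_values:
            by_value.setdefault(val_to_num(raw), set()).add(raw)
        return [r for group in by_value.values() if len(group) > 1
                for r in group]

    find_conflicts(['0.7', '.7', '10'])  # ['0.7', '.7'] (order may vary)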
77 changes: 77 additions & 0 deletions fastparquet/test/test_api.py
@@ -247,6 +247,62 @@ def test_numerical_partition_name(tempdir):
     assert out[out.y1 == 'aa'].x.tolist() == [1, 5, 5]
     assert out[out.y1 == 'bb'].x.tolist() == [2]
 
+
+def test_floating_point_partition_name(tempdir):
+    df = pd.DataFrame({'x': [1e99, 5e-10, 2e+2, -0.1], 'y1': ['aa', 'aa', 'bb', 'aa']})
+    write(tempdir, df, file_scheme='hive', partition_on=['y1'])
+    pf = ParquetFile(tempdir)
+    out = pf.to_pandas()
+    assert out[out.y1 == 'aa'].x.tolist() == [1e99, 5e-10, -0.1]
+    assert out[out.y1 == 'bb'].x.tolist() == [200.0]
+
+
+def test_datetime_partition_names(tempdir):
+    date_strings = ['2015-05-09', '2018-10-15', '2020-10-17', '2015-05-09']
+    df = pd.DataFrame({
+        'date': date_strings,
+        'x': [1, 5, 2, 5]
+    })
+    write(tempdir, df, file_scheme='hive', partition_on=['date'])
+    pf = ParquetFile(tempdir)
+    out = pf.to_pandas()
+    assert set(out.date.tolist()) == set(pd.to_datetime(date_strings).tolist())
+    assert out[out.date == '2015-05-09'].x.tolist() == [1, 5]
+    assert out[out.date == '2020-10-17'].x.tolist() == [2]
+
+
+@pytest.mark.parametrize('partitions', [['2017-05-09', 'may 9 2017'], ['0.7', '.7']])
+def test_datetime_partition_no_duplicates(tempdir, partitions):
+    df = pd.DataFrame({
+        'partitions': partitions,
+        'x': [1, 2]
+    })
+    write(tempdir, df, file_scheme='hive', partition_on=['partitions'])
+    with pytest.raises(ValueError, match=r'Partition names map to the same value.*'):
+        ParquetFile(tempdir)
+
+
+@pytest.mark.parametrize('categories', [['2017-05-09', 'may 9 2017'], ['0.7', '.7']])
+def test_datetime_category_no_duplicates(tempdir, categories):
+    # The purpose of this test is to ensure that the changes made for the
+    # previous test haven't broken categories in general.
+    df = pd.DataFrame({
+        'categories': categories,
+        'x': [1, 2]
+    }).astype({'categories': 'category'})
+    fn = os.path.join(tempdir, 'foo.parquet')
+    write(fn, df)
+    assert ParquetFile(fn).to_pandas().categories.tolist() == categories
+
+
+@pytest.mark.parametrize('partitions', [['2017-01-05', '1421'], ['0.7', '10']])
+def test_mixed_partition_types_warning(tempdir, partitions):
+    df = pd.DataFrame({
+        'partitions': partitions,
+        'x': [1, 2]
+    })
+    write(tempdir, df, file_scheme='hive', partition_on=['partitions'])
+    with pytest.warns(UserWarning, match=r'Partition names coerce to values of different types.*'):
+        ParquetFile(tempdir)
+
 
 def test_filter_without_paths(tempdir):
     fn = os.path.join(tempdir, 'test.parq')
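
From the user side, the warning exercised by test_mixed_partition_types_warning above looks roughly like this (a sketch; the path is illustrative):

    import warnings
    import pandas as pd
    from fastparquet import write, ParquetFile

    df = pd.DataFrame({'partitions': ['2017-01-05', '1421'], 'x': [1, 2]})
    write('/tmp/mixed_demo', df, file_scheme='hive', partition_on=['partitions'])
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        ParquetFile('/tmp/mixed_demo')  # loads, but warns
    print(caught[-1].message)
    # Partition names coerce to values of different types, e.g. [...]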
@@ -275,6 +331,27 @@ def test_filter_special(tempdir):
     assert out.symbol.tolist() == ['NOW', 'NOW', 'NOW']
 
 
+def test_filter_dates(tempdir):
+    df = pd.DataFrame({
+        'x': [1, 2, 3, 4, 5, 6, 7],
+        'date': [
+            '2015-05-09', '2017-05-15', '2017-05-14',
+            '2017-05-13', '2015-05-10', '2015-05-11', '2017-05-12'
+        ]
+    })
+    write(tempdir, df, file_scheme='hive', partition_on=['date'])
+    pf = ParquetFile(tempdir)
+    out_1 = pf.to_pandas(filters=[('date', '>', '2017-01-01')])
+
+    assert set(out_1.x.tolist()) == {2, 3, 4, 7}
+    expected_dates = set(pd.to_datetime(['2017-05-15', '2017-05-14', '2017-05-13', '2017-05-12']))
+    assert set(out_1.date.tolist()) == expected_dates
+
+    out_2 = pf.to_pandas(filters=[('date', '==', pd.to_datetime('may 9 2015'))])
+    assert out_2.x.tolist() == [1]
+    assert out_2.date.tolist() == pd.to_datetime(['2015-05-09']).tolist()
+
+
 def test_in_filter(tempdir):
     symbols = ['a', 'a', 'b', 'c', 'c', 'd']
     values = [1, 2, 3, 4, 5, 6]
30 changes: 29 additions & 1 deletion fastparquet/test/test_util.py
@@ -1,7 +1,8 @@
 import os
+import pandas as pd
 import pytest
 
-from fastparquet.util import analyse_paths, get_file_scheme, val_to_num, join_path
+from fastparquet.util import analyse_paths, get_file_scheme, val_to_num, join_path, groupby_types
 
 
 def test_analyse_paths():
@@ -80,3 +81,30 @@ def test_val_to_num():
     assert val_to_num('07') == 7
     assert val_to_num('0') == 0
     assert val_to_num('00') == 0
+    assert val_to_num('-20') == -20
+    assert val_to_num(7) == 7
+    assert val_to_num(0.7) == 0.7
+    assert val_to_num(0) == 0
+    assert val_to_num('NOW') == 'NOW'
+    assert val_to_num('now') == 'now'
+    assert val_to_num('TODAY') == 'TODAY'
+    assert val_to_num('') == ''
+    assert val_to_num('2018-10-10') == pd.to_datetime('2018-10-10')
+    assert val_to_num('2018-10-09') == pd.to_datetime('2018-10-09')
+    assert val_to_num('2017-12') == pd.to_datetime('2017-12')
+    assert val_to_num('5e+6') == 5e6
+    assert val_to_num('5e-6') == 5e-6
+    assert val_to_num('0xabc') == '0xabc'
+    assert val_to_num('hello world') == 'hello world'
+    # The following tests document an idiosyncrasy of val_to_num which is
+    # difficult to avoid while timedeltas are supported.
+    assert val_to_num('50+20') == pd.to_timedelta('50+20')
+    assert val_to_num('50-20') == pd.to_timedelta('50-20')
+
+
+def test_groupby_types():
+    assert len(groupby_types([1, 2, 3])) == 1
+    assert len(groupby_types(["1", "2", "3.0"])) == 1
+    assert len(groupby_types([1, 2, 3.0])) == 2
+    assert len(groupby_types([1, "2", "3.0"])) == 2
+    assert len(groupby_types([pd.to_datetime("2000"), "2000"])) == 2
29 changes: 22 additions & 7 deletions fastparquet/util.py
@@ -5,6 +5,8 @@
 import pandas as pd
 import re
 import six
+import numbers
+from collections import defaultdict
 
 try:
     from pandas.api.types import is_categorical_dtype
@@ -17,7 +19,7 @@
 STR_TYPE = six.string_types[0]  # 'str' for Python3, 'basestring' for Python2
 created_by = "fastparquet-python version 1.0.0 (build 111)"
 
-
 class ParquetException(Exception):
     """Generic Exception related to unexpected data format when
     reading parquet file."""
@@ -38,25 +39,34 @@ def default_open(f, mode='rb'):
 
 
 def val_to_num(x):
-    if x in ['NOW', 'TODAY']:
+    """Parse a string as a number, date or timedelta if possible, otherwise return the string"""
+    if isinstance(x, numbers.Real):
+        return x
+    if x in ['now', 'NOW', 'TODAY', '']:
         return x
+    if set(x) == {'0'}:
+        # special case for values like "000"
+        return 0
+    if x == "True":
+        return True
+    if x == "False":
+        return False
     try:
-        return ast.literal_eval(x.lstrip('0'))
+        return int(x, base=10)
+    except:
+        pass
+    try:
+        return float(x)
     except:
         pass
     try:
         return pd.to_datetime(x)
     except:
         pass
     try:
+        # TODO: determine the valid usecases for this, then try to limit the set of
+        # strings which may get inadvertently converted to timedeltas
         return pd.to_timedelta(x)
     except:
         return x
 
 
 if PY2:
     def ensure_bytes(s):
         return s.encode('utf-8') if isinstance(s, unicode) else s
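
The parsing order the rewritten function establishes (int, then float, then datetime, then timedelta, then the raw string), in examples drawn from the tests above:

    from fastparquet.util import val_to_num

    val_to_num('07')          # 7: int() accepts leading zeros in base 10
    val_to_num('5e-6')        # 5e-06: parsed as float
    val_to_num('2018-10-10')  # Timestamp('2018-10-10'): parsed as datetime
    val_to_num('50-20')       # Timedelta: the documented idiosyncrasy
    val_to_num('0xabc')       # '0xabc': nothing parses, string returned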
@@ -198,6 +208,11 @@ def infer_dtype(column):
     except AttributeError:
         return pd.lib.infer_dtype(column)
 
+def groupby_types(iterable):
+    groups = defaultdict(list)
+    for x in iterable:
+        groups[type(x)].append(x)
+    return groups
 
 def get_column_metadata(column, name):
     """Produce pandas column metadata block"""
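
How groupby_types feeds the mixed-type warning in ParquetFile._read_partitions, as a short sketch:

    from fastparquet.util import groupby_types, val_to_num

    coerced = [val_to_num(s) for s in ['2017-01-05', '1421']]
    groups = groupby_types(coerced)  # {Timestamp: [...], int: [...]}
    len(groups)  # 2, which triggers the mixed-type warning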
