Skip to content

Commit

Permalink
Merge pull request #155 from martindurant/fix_NOW
Browse files Browse the repository at this point in the history
Very special case for partition: NOW should be kept as string
  • Loading branch information
martindurant committed May 30, 2017
2 parents ee2fe1f + a8c8bd7 commit 24753fa
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 7 deletions.
12 changes: 8 additions & 4 deletions fastparquet/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,12 @@ def filter_out_stats(rg, filters, schema):
if s.max is not None:
b = ensure_bytes(s.max)
vmax = encoding.read_plain(b, column.meta_data.type, 1)
if se.converted_type:
if se.converted_type is not None:
vmax = converted_types.convert(vmax, se)
if s.min is not None:
b = ensure_bytes(s.min)
vmin = encoding.read_plain(b, column.meta_data.type, 1)
if se.converted_type:
if se.converted_type is not None:
vmin = converted_types.convert(vmin, se)
out = filter_val(op, val, vmin, vmax)
if out is True:
Expand Down Expand Up @@ -632,12 +632,16 @@ def filter_out_cats(rg, filters, sep='/'):
return False
s = ex_from_sep(sep)
partitions = s.findall(rg.columns[0].file_path)
pairs = [(p[0], val_to_num(p[1])) for p in partitions]
pairs = [(p[0], p[1]) for p in partitions]
for cat, v in pairs:

app_filters = [f[1:] for f in filters if f[0] == cat]
for op, val in app_filters:
out = filter_val(op, val, v, v)
if isinstance(val, str):
v0 = v
else:
v0 = val_to_num(v)
out = filter_val(op, val, v0, v0)
if out is True:
return True
return False
Expand Down
12 changes: 12 additions & 0 deletions fastparquet/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,15 @@ def test_filter_without_paths(tempdir):
pd.util.testing.assert_frame_equal(out, df)
out = pf.to_pandas(filters=[['x', '>', 30]])
assert len(out) == 0


def test_filter_special(tempdir):
df = pd.DataFrame({
'x': [1, 2, 3, 4, 5, 6, 7],
'symbol': ['NOW', 'OI', 'OI', 'OI', 'NOW', 'NOW', 'OI']
})
write(tempdir, df, file_scheme='hive', partition_on=['symbol'])
pf = ParquetFile(tempdir)
out = pf.to_pandas(filters=[('symbol', '==', 'NOW')])
assert out.x.tolist() == [1, 5, 6]
assert out.symbol.tolist() == ['NOW', 'NOW', 'NOW']
86 changes: 86 additions & 0 deletions fastparquet/test/test_partition_filters_specialstrings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@

import os
import shutil
import pytest
import numpy as np
import pandas as pd
from pandas.tslib import Timestamp
from fastparquet.util import tempdir
from fastparquet import write, ParquetFile
import datetime as dt
import string

def frame_symbol_dtTrade_type_strike(days=1 * 252,
start_date=dt.datetime(2005, 1, 1, hour=0, minute=0, second=0),
symbols=['SPY', 'FB', 'TLT'],
numbercolumns=1):
base = start_date
date_list = [base + dt.timedelta(days=x) for x in range(0, days)]
tuple_list = []
for x in symbols:
for y in date_list:
tuple_list.append((x, y.year, y))
index = pd.MultiIndex.from_tuples(tuple_list, names=('symbol', 'year', 'dtTrade'))
np.random.seed(seed=0)
df = pd.DataFrame(np.random.randn(index.size, numbercolumns),
index=index, columns=[x for x in string.ascii_uppercase[0:numbercolumns]])
return df

@pytest.mark.parametrize('tempdir,input_symbols,input_days,file_scheme,input_columns,partitions,filters',
[
(tempdir, ['NOW', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['now', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['TODAY', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['VIX*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['QQQ*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['QQQ!', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['Q%QQ', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'], [('symbol', '==', 'SPY')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'],
[('dtTrade','==','2005-01-02T00:00:00.000000000')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'],
[('dtTrade','==', Timestamp('2005-01-01 00:00:00'))]),
]
)
def test_frame_write_read_verify(tempdir, input_symbols, input_days, file_scheme,
input_columns, partitions, filters):
#Generate Temp Director for parquet Files
fdir = str(tempdir)
fname = os.path.join(fdir, 'test')

#Generate Test Input Frame
input_df = frame_symbol_dtTrade_type_strike(days=input_days,
symbols=input_symbols,
numbercolumns=input_columns)
input_df.reset_index(inplace=True)
write(fname, input_df, partition_on=partitions, file_scheme=file_scheme, compression='SNAPPY')

#Read Back Whole Parquet Structure
output_df = ParquetFile(fname).to_pandas()
for col in output_df.columns:
assert col in input_df.columns.values
assert len(input_df) == len(output_df)

#Read with filters
filtered_output_df = ParquetFile(fname).to_pandas(filters=filters)

#Filter Input Frame to Match What Should Be Expected from parquet read
# Handle either string or non-string inputs / works for timestamps
filterStrings = []
for name, operator, value in filters:
if isinstance(value, str):
value = "'{}'".format(value)
else:
value = value.__repr__()
filterStrings.append("{} {} {}".format(name, operator, value))
filters_expression = " and ".join(filterStrings)
filtered_input_df = input_df.query(filters_expression)

# Check to Ensure Columns Match
for col in filtered_output_df.columns:
assert col in filtered_input_df.columns.values
# Check to Ensure Number of Rows Match
assert len(filtered_input_df) == len(filtered_output_df)

# Clean Up
shutil.rmtree(fdir, ignore_errors=True)
7 changes: 4 additions & 3 deletions fastparquet/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ def default_open(f, mode='rb'):


def val_to_num(x):
# What about ast.literal_eval?
if x in ['NOW', 'TODAY']:
return x
try:
return ast.literal_eval(x)
except ValueError:
except:
pass
try:
return pd.to_datetime(x)
except ValueError:
except:
pass
try:
return pd.to_timedelta(x)
Expand Down

0 comments on commit 24753fa

Please sign in to comment.