small fixes (#278)
* update requirements

note: will need follow-on updates to the feedstock

* 8 and 16 bit ints now written as int32 plain, for compatibility

Fixes #256 following apache/parquet-format#79

* Tentatively fix py2/str

Fixes #250

* Fixes #274

Should not include the fixture in pytest parametrize: it confuses py2/win32,
apparently (and is not recommended by pytest).

* Addition to previous commit: re-enable test
martindurant committed Jan 18, 2018
1 parent 90e31d6 commit 4730530
Showing 4 changed files with 49 additions and 32 deletions.
3 changes: 3 additions & 0 deletions fastparquet/test/test_output.py
@@ -9,6 +9,7 @@
 from fastparquet import ParquetFile
 from fastparquet import write, parquet_thrift
 from fastparquet import writer, encoding
+from pandas.testing import assert_frame_equal
 import pytest
 
 from fastparquet.util import default_mkdirs
@@ -120,6 +121,8 @@ def test_roundtrip(tempdir, scheme, row_groups, comp):
 
     for col in r.columns:
         assert (df[col] == data[col]).all()
+        # tests https://github.com/dask/fastparquet/issues/250
+        assert isinstance(data[col][0], type(df[col][0]))
 
 
 def test_bad_coltype(tempdir):
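The new assertion guards the fix for issue #250 by checking that values come back from a round trip with the same Python type they were written with. A minimal standalone sketch of that check, assuming only the fastparquet write and ParquetFile calls already used in this test (the scratch path and frame contents are illustrative):

import os
import tempfile

import pandas as pd
from fastparquet import ParquetFile, write

# Illustrative scratch location for the round trip.
fname = os.path.join(tempfile.mkdtemp(), 'roundtrip.parq')

df = pd.DataFrame({'x': ['a', 'b', 'c'], 'y': [1, 2, 3]})
write(fname, df)                       # default simple (single-file) scheme
data = ParquetFile(fname).to_pandas()  # read it back

for col in df.columns:
    assert (df[col] == data[col]).all()
    # str values should come back as str, not bytes (issue #250)
    assert isinstance(data[col][0], type(df[col][0]))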
69 changes: 41 additions & 28 deletions fastparquet/test/test_partition_filters_specialstrings.py
@@ -31,47 +31,60 @@ frame_symbol_dtTrade_type_strike(days=1 * 252,
         index=index, columns=[x for x in string.ascii_uppercase[0:numbercolumns]])
     return df
 
-@pytest.mark.parametrize('tempdir,input_symbols,input_days,file_scheme,input_columns,partitions,filters',
-    [
-        (tempdir, ['NOW', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['now', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['TODAY', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['VIX*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['QQQ*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['QQQ!', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['Q%QQ', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'], [('symbol', '==', 'SPY')]),
-        (tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'],
-            [('dtTrade','==','2005-01-02T00:00:00.000000000')]),
-        (tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'],
-            [('dtTrade','==', Timestamp('2005-01-01 00:00:00'))]),
-    ]
-)
-
-@pytest.mark.skipif(sys.platform=='win32' and PY2, reason='does not work on windows 32 py2.7')
-def test_frame_write_read_verify(tempdir, input_symbols, input_days, file_scheme,
-                                 input_columns, partitions, filters):
-    #Generate Temp Director for parquet Files
+@pytest.mark.parametrize('input_symbols,input_days,file_scheme,input_columns,'
+                         'partitions,filters',
+                         [
+                             (['NOW', 'SPY', 'VIX'], 2 * 252, 'hive', 2,
+                              ['symbol', 'year'], [('symbol', '==', 'SPY')]),
+                             (['now', 'SPY', 'VIX'], 2 * 252, 'hive', 2,
+                              ['symbol', 'year'], [('symbol', '==', 'SPY')]),
+                             (['TODAY', 'SPY', 'VIX'], 2 * 252, 'hive', 2,
+                              ['symbol', 'year'], [('symbol', '==', 'SPY')]),
+                             (['VIX*', 'SPY', 'VIX'], 2 * 252, 'hive', 2,
+                              ['symbol', 'year'], [('symbol', '==', 'SPY')]),
+                             (['QQQ*', 'SPY', 'VIX'], 2 * 252, 'hive', 2,
+                              ['symbol', 'year'], [('symbol', '==', 'SPY')]),
+                             (['QQQ!', 'SPY', 'VIX'], 2 * 252, 'hive', 2,
+                              ['symbol', 'year'], [('symbol', '==', 'SPY')]),
+                             (['Q%QQ', 'SPY', 'VIX'], 2 * 252, 'hive', 2,
+                              ['symbol', 'year'], [('symbol', '==', 'SPY')]),
+                             (['NOW', 'SPY', 'VIX'], 10, 'hive', 2,
+                              ['symbol', 'dtTrade'], [('symbol', '==', 'SPY')]),
+                             (['NOW', 'SPY', 'VIX'], 10, 'hive', 2,
+                              ['symbol', 'dtTrade'],
+                              [('dtTrade', '==',
+                                '2005-01-02T00:00:00.000000000')]),
+                             (['NOW', 'SPY', 'VIX'], 10, 'hive', 2,
+                              ['symbol', 'dtTrade'],
+                              [('dtTrade', '==',
+                                Timestamp('2005-01-01 00:00:00'))]),
+                         ]
+                         )
+def test_frame_write_read_verify(tempdir, input_symbols, input_days,
+                                 file_scheme,
+                                 input_columns, partitions, filters):
+    # Generate Temp Director for parquet Files
     fdir = str(tempdir)
     fname = os.path.join(fdir, 'test')
 
-    #Generate Test Input Frame
+    # Generate Test Input Frame
     input_df = frame_symbol_dtTrade_type_strike(days=input_days,
-                        symbols=input_symbols,
-                        numbercolumns=input_columns)
+                                                symbols=input_symbols,
+                                                numbercolumns=input_columns)
     input_df.reset_index(inplace=True)
-    write(fname, input_df, partition_on=partitions, file_scheme=file_scheme, compression='SNAPPY')
+    write(fname, input_df, partition_on=partitions, file_scheme=file_scheme,
+          compression='SNAPPY')
 
-    #Read Back Whole Parquet Structure
+    # Read Back Whole Parquet Structure
     output_df = ParquetFile(fname).to_pandas()
     for col in output_df.columns:
         assert col in input_df.columns.values
     assert len(input_df) == len(output_df)
 
-    #Read with filters
+    # Read with filters
     filtered_output_df = ParquetFile(fname).to_pandas(filters=filters)
 
-    #Filter Input Frame to Match What Should Be Expected from parquet read
+    # Filter Input Frame to Match What Should Be Expected from parquet read
     # Handle either string or non-string inputs / works for timestamps
     filterStrings = []
     for name, operator, value in filters:
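The rewritten decorator keeps the tempdir fixture out of the parametrize list and lets pytest inject it by name, which is the pattern pytest recommends. A minimal sketch of that pattern, with a hypothetical fixture and test standing in for the real ones:

import pytest

@pytest.fixture
def tempdir(tmp_path):
    # hypothetical fixture: each test gets its own scratch directory
    return str(tmp_path)

# Parametrize only plain data; the fixture is requested in the test signature
# and resolved by pytest rather than being passed through parametrize.
@pytest.mark.parametrize('value,expected', [(1, 2), (3, 6)])
def test_double(tempdir, value, expected):
    assert value * 2 == expected
    assert isinstance(tempdir, str)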
6 changes: 4 additions & 2 deletions fastparquet/writer.py
@@ -218,7 +218,7 @@ def infer_object_encoding(data):
         return "utf8"
     elif PY2 and all(isinstance(i, unicode) for i in head):
         return "utf8"
-    elif all(isinstance(i, STR_TYPE) for i in head) and PY2:
+    elif PY2 and all(isinstance(i, (str, bytes)) for i in head):
         return "bytes"
     elif all(isinstance(i, bytes) for i in head):
         return 'bytes'
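The rewritten branch treats plain str/bytes values as raw bytes only on Python 2, where str is a byte string. A simplified Python 3 sketch of the sampling idea behind this function (not fastparquet's exact code; the helper name and sample values are illustrative):

def infer_encoding_from_sample(head):
    # Guess how to store an object-dtype column from a few of its values.
    if all(isinstance(i, str) for i in head):
        return 'utf8'   # text values are written as UTF-8 byte arrays
    if all(isinstance(i, bytes) for i in head):
        return 'bytes'  # raw bytes are written as-is
    raise ValueError('Cannot infer object conversion type: %r' % (head,))

print(infer_encoding_from_sample(['a', 'b']))    # utf8
print(infer_encoding_from_sample([b'a', b'b']))  # bytes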
@@ -499,7 +499,9 @@ def write_column(f, data, selement, compression=None):
         cats = True
         encoding = "PLAIN_DICTIONARY"
     elif str(data.dtype) in ['int8', 'int16', 'uint8', 'uint16']:
-        encoding = "RLE"
+        # encoding = "RLE"
+        # disallow bitpacking for compatability
+        data = data.astype('int32')
 
     start = f.tell()
     bdata = definition_data + repetition_data + encode[encoding](
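Rather than bit-packing 8- and 16-bit integer columns with RLE, the writer now upcasts them to int32 and writes PLAIN values: Parquet has no 8/16-bit physical type, and apache/parquet-format#79 clarified that such columns should simply be stored as INT32 (fixes issue #256). A minimal sketch of the upcast itself, independent of fastparquet:

import numpy as np

data = np.array([1, 2, 127, -128], dtype='int8')

# Small integer dtypes are stored in Parquet as the INT32 physical type with a
# logical annotation for their width, so widen the values before encoding and
# write them PLAIN, which every reader handles.
if str(data.dtype) in ['int8', 'int16', 'uint8', 'uint16']:
    data = data.astype('int32')

print(data.dtype)  # int32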
3 changes: 1 addition & 2 deletions requirements.txt
@@ -1,6 +1,5 @@
-pandas
+pandas>=0.19
 numba>=0.28
 numpy>=1.11
 thrift>=0.10.0
 six
-cython