
Commit

Merge pull request #30 from martindurant/auto_text_convert
Auto text convert
martindurant committed Nov 22, 2016
2 parents 5c796bb + 432c104 commit f8a8cea
Showing 8 changed files with 124 additions and 50 deletions.
8 changes: 5 additions & 3 deletions fastparquet/api.py
@@ -319,10 +319,12 @@ def statistics(obj):
for col in obj.row_groups[0].columns:
column = '.'.join(col.meta_data.path_in_schema)
se = helper.schema_element(column)
if se.converted_type:
if se.converted_type is not None:
for name in ['min', 'max']:
d[name][column] = [x if x is None else converted_types.convert(x, se)
for x in d[name][column]]
d[name][column] = (
[None] if d[name][column] is None
else converted_types.convert(pd.Series(d[name][column]), se).tolist()
)
return d


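The api.py change above routes raw min/max statistics through converted_types.convert as a pandas Series rather than converting element by element. A minimal sketch of what that amounts to for a UTF8 column, with example values assumed:

import pandas as pd

raw_max = [b'b', b'c']  # raw BYTE_ARRAY max statistics, one per row group
# for a UTF8 converted type, convert() effectively decodes the byte strings
converted = pd.Series(raw_max).str.decode('utf8')
print(converted.tolist())  # ['b', 'c']
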
2 changes: 1 addition & 1 deletion fastparquet/compression.py
@@ -19,7 +19,7 @@
try:
import lzo
compressions['LZO'] = lzo.compress
decompressions['LZO'] =lzo.decompress
decompressions['LZO'] = lzo.decompress
except ImportError:
pass
try:
11 changes: 6 additions & 5 deletions fastparquet/converted_types.py
@@ -91,15 +91,15 @@ def convert(data, se):
scale_factor = 10**-se.scale
return data * scale_factor
elif ctype == parquet_thrift.ConvertedType.DATE:
return pd.to_datetime(data.map(datetime.date.fromordinal), box=False)
return pd.to_datetime(data.map(datetime.date.fromordinal))
elif ctype == parquet_thrift.ConvertedType.TIME_MILLIS:
return pd.to_timedelta(data, unit='ms', box=False)
return pd.to_timedelta(data, unit='ms')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MILLIS:
return pd.to_datetime(data, unit='ms', box=False)
return pd.to_datetime(data, unit='ms')
elif ctype == parquet_thrift.ConvertedType.TIME_MICROS:
return pd.to_timedelta(data, unit='us', box=False)
return pd.to_timedelta(data, unit='us')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MICROS:
return pd.to_datetime(data, unit='us', box=False)
return pd.to_datetime(data, unit='us')
elif ctype == parquet_thrift.ConvertedType.UINT_8:
return data.astype(np.uint8)
elif ctype == parquet_thrift.ConvertedType.UINT_16:
@@ -115,6 +115,7 @@ def convert(data, se):
elif ctype == parquet_thrift.ConvertedType.INTERVAL:
# for those that understand, output is month, day, ms
# maybe should convert to timedelta
# TODO: seems like a np.view should do this much faster
return data.map(lambda x: np.fromstring(x, dtype='<u4'))
else:
logger.info("Converted type '%s'' not handled",
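
The converted_types.py change removes box=False from the time-like conversions, so they now return pandas-boxed datetime/timedelta values rather than raw numpy arrays. A small illustration with assumed input values:

import pandas as pd

millis = pd.Series([1479772800000, 1479859200000])    # example TIMESTAMP_MILLIS values
print(pd.to_datetime(millis, unit='ms'))              # datetime64[ns]: 2016-11-22, 2016-11-23
print(pd.to_timedelta(pd.Series([1500]), unit='ms'))  # timedelta64[ns]: 0 days 00:00:01.500000
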
4 changes: 3 additions & 1 deletion fastparquet/core.py
@@ -136,9 +136,11 @@ def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata):
values = [fobj.read(struct.unpack(b"<i", fobj.read(4))[0])
for _ in range(page_header.dictionary_page_header.num_values)]
else:
width = schema_helper.schema_element(
column_metadata.path_in_schema[-1]).type_length
values = encoding.read_plain(
raw_bytes, column_metadata.type,
page_header.dictionary_page_header.num_values)
page_header.dictionary_page_header.num_values, width)
return values


2 changes: 2 additions & 0 deletions fastparquet/encoding.py
@@ -35,6 +35,8 @@ def read_plain(raw_bytes, type_, count, width=0):
dtype = DECODE_TYPEMAP[type_]
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
if type_ == parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY:
if count == 1:
width = len(raw_bytes)
dtype = np.dtype('S%i' % width)
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
if type_ == parquet_thrift.Type.BOOLEAN:
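
read_plain now decodes FIXED_LEN_BYTE_ARRAY with an explicit width, falling back to the buffer length when only a single value is present. A hypothetical illustration of the width-based decode:

import numpy as np

raw = b'aabbcc'          # three fixed-width (2-byte) values back to back
width, count = 2, 3
values = np.frombuffer(memoryview(raw), dtype=np.dtype('S%i' % width), count=count)
print(values)            # [b'aa' b'bb' b'cc']
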
8 changes: 4 additions & 4 deletions fastparquet/test/test_api.py
@@ -25,8 +25,8 @@ def test_statistics(tempdir):
expected = {'distinct_count': {'x': [None, None],
'y': [None, None],
'z': [None, None]},
'max': {'x': [2, 3], 'y': [2.0, 1.0], 'z': [b'b', b'c']},
'min': {'x': [1, 3], 'y': [1.0, 1.0], 'z': [b'a', b'c']},
'max': {'x': [2, 3], 'y': [2.0, 1.0], 'z': ['b', 'c']},
'min': {'x': [1, 3], 'y': [1.0, 1.0], 'z': ['a', 'c']},
'null_count': {'x': [0, 0], 'y': [0, 0], 'z': [0, 0]}}

assert s == expected
@@ -42,7 +42,7 @@ def test_logical_types(tempdir):

s = statistics(p)

assert isinstance(s['min']['D'][0], np.datetime64)
assert isinstance(s['min']['D'][0], (np.datetime64, pd.tslib.Timestamp))


def test_empty_statistics(tempdir):
@@ -79,6 +79,6 @@ def test_sorted_row_group_columns(tempdir):

result = sorted_partitioned_columns(pf)
expected = {'x': {'min': [1, 3], 'max': [2, 4]},
'z': {'min': [b'a', b'c'], 'max': [b'b', b'd']}}
'z': {'min': ['a', 'c'], 'max': ['b', 'd']}}

assert result == expected
34 changes: 33 additions & 1 deletion fastparquet/test/test_writer.py
@@ -4,7 +4,7 @@
import pandas as pd
import pandas.util.testing as tm
from fastparquet import ParquetFile
from fastparquet import write
from fastparquet import write, parquet_thrift
from fastparquet import writer, encoding
import pytest
import shutil
@@ -400,3 +400,35 @@ def test_naive_index(tempdir):
r = ParquetFile(fn)

assert set(r.columns) == {'x', 'y', 'index'}


def test_text_convert(tempdir):
df = pd.DataFrame({'a': ['a'] * 100,
'b': [b'a'] * 100})
fn = os.path.join(tempdir, 'tmp.parq')

write(fn, df, fixed_text={'a': 1, 'b': 2})
pf = ParquetFile(fn)
assert pf.schema[1].type == parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY
assert pf.schema[1].type_length == 1
assert pf.schema[2].type == parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY
assert pf.schema[2].type_length == 2
assert pf.statistics['max']['a'] == ['a']
df2 = pf.to_pandas()
tm.assert_frame_equal(df, df2)

write(fn, df)
pf = ParquetFile(fn)
assert pf.schema[1].type == parquet_thrift.Type.BYTE_ARRAY
assert pf.schema[2].type == parquet_thrift.Type.BYTE_ARRAY
assert pf.statistics['max']['a'] == ['a']
df2 = pf.to_pandas()
tm.assert_frame_equal(df, df2)

write(fn, df, fixed_text={'a': 1})
pf = ParquetFile(fn)
assert pf.schema[1].type == parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY
assert pf.schema[2].type == parquet_thrift.Type.BYTE_ARRAY
assert pf.statistics['max']['a'] == ['a']
df2 = pf.to_pandas()
tm.assert_frame_equal(df, df2)
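
A minimal usage sketch of the fixed_text option exercised by the test above (file path and data assumed); each listed column is written as FIXED_LEN_BYTE_ARRAY of the given width:

import pandas as pd
from fastparquet import ParquetFile, write

df = pd.DataFrame({'a': ['xy'] * 10, 'b': [b'z'] * 10})
write('/tmp/fixed.parq', df, fixed_text={'a': 2, 'b': 1})  # assumed output path
print(ParquetFile('/tmp/fixed.parq').to_pandas().head())
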
105 changes: 70 additions & 35 deletions fastparquet/writer.py
@@ -42,7 +42,7 @@
parquet_thrift.Type.FLOAT: np.float32}


def find_type(data, convert=False):
def find_type(data, convert=False, fixed_text=None):
""" Get appropriate typecodes for column dtype
Data conversion may happen here, only at write time.
@@ -51,11 +51,8 @@ def find_type(data, convert=False):
before saving to parquet, we will not make any assumptions for them.
If the dtype is "object" the first ten items will be examined, and is str
or bytes, will be stored as variable length byte strings; if dict or list,
(nested data) will be stored with JSON encoding.
To be stored as fixed-length byte strings, the dtype must be "bytesXX"
(pandas notation) or "|SXX" (numpy notation)
or bytes, will be stored as variable or fixed length byte strings;
if dict or list, (nested data) will be stored with JSON encoding.
In the case of categoricals, the data type refers to the labels; the data
(codes) will be stored as int. The labels are usually variable length
@@ -69,12 +66,18 @@
Parameters
----------
A pandas series.
data: pd.Series
convert: bool (False)
Whether to actually perform the conversion, or just infer types
fixed_text: int or None
For str and bytes, the fixed-string length to use. If None, object
column will remain variable length.
Returns
-------
- a thrift schema element
- a thrift typecode to be passed to the column chunk writer
- converted data (None if convert is False)
"""
out = None
@@ -98,14 +101,29 @@ def find_type(data, convert=False):
elif dtype == "O":
head = data[:10] if isinstance(data, pd.Index) else data.valid()[:10]
if all(isinstance(i, str) for i in head):
type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY,
parquet_thrift.ConvertedType.UTF8, None)
if convert:
out = data.str.encode('utf8').values
if fixed_text:
width = fixed_text
if convert:
out = data.str.encode('utf8').values.astype('S%i' % width)
type, converted_type = (
parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY,
parquet_thrift.ConvertedType.UTF8)
else:
type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY,
parquet_thrift.ConvertedType.UTF8, None)
if convert:
out = data.str.encode('utf8').values
elif all(isinstance(i, bytes) for i in head):
type, converted_type, width = parquet_thrift.Type.BYTE_ARRAY, None, None
if convert:
out = data.values
if fixed_text:
width = fixed_text
if convert:
out = data.values.astype('S%i' % width)
type, converted_type = (
parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY, None)
else:
type, converted_type, width = parquet_thrift.Type.BYTE_ARRAY, None, None
if convert:
out = data.values
elif all(isinstance(i, (list, dict)) for i in head):
type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY,
parquet_thrift.ConvertedType.JSON, None)
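
In the new str branch above, a requested fixed width casts the utf8-encoded values to a fixed numpy bytes dtype. A hedged sketch of that cast with sample data assumed (values longer than the width would be truncated):

import pandas as pd

data = pd.Series(['ab', 'cd', 'e'])
width = 2
out = data.str.encode('utf8').values.astype('S%i' % width)
print(out, out.dtype)    # [b'ab' b'cd' b'e'] |S2
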
@@ -189,10 +207,10 @@ def write_thrift(fobj, thrift):
return fobj.tell() - t0


def encode_plain(data, se):
def encode_plain(data, se, fixed_text=None):
"""PLAIN encoding; returns byte representation"""
se, type, out = find_type(data, True)
if data.dtype == "O":
se, type, out = find_type(data, True, fixed_text=fixed_text)
if se.type == parquet_thrift.Type.BYTE_ARRAY:
return b''.join([struct.pack('<l', len(x)) + x for x in out])
else:
return out.tobytes()
@@ -287,7 +305,7 @@ def encode_rle_bp(data, width, o, withlength=False):
o.loc = end


def encode_dict(data, se):
def encode_dict(data, se, _):
""" The data part of dictionary encoding is always int32s, with RLE/bitpack
"""
width = encoding.width_from_max_int(data.max())
@@ -352,11 +370,11 @@ def write_column(f, data, selement, encoding='PLAIN', compression=None):
"""
has_nulls = selement.repetition_type == parquet_thrift.FieldRepetitionType.OPTIONAL
fixed_text = selement.type_length
tot_rows = len(data)

# no NULL handling (but NaNs, NaTs are allowed)
if has_nulls:
print('has nulls!')
definition_data, data = make_definitions(data)
else:
definition_data = b""
@@ -373,7 +391,8 @@ def write_column(f, data, selement, encoding='PLAIN', compression=None):
dph = parquet_thrift.DictionaryPageHeader(
num_values=len(data.cat.categories),
encoding=parquet_thrift.Encoding.PLAIN)
bdata = encode['PLAIN'](pd.Series(data.cat.categories), selement)
bdata = encode['PLAIN'](pd.Series(data.cat.categories), selement,
fixed_text=fixed_text)
l0 = len(bdata)
if compression:
bdata = compress_data(bdata, compression)
@@ -391,21 +410,26 @@ def write_column(f, data, selement, encoding='PLAIN', compression=None):
f.write(bdata)
try:
max, min = data.max(), data.min()
max = encode['PLAIN'](pd.Series([max]), selement)
min = encode['PLAIN'](pd.Series([min]), selement)
max = encode['PLAIN'](pd.Series([max]), selement,
fixed_text=fixed_text)
min = encode['PLAIN'](pd.Series([min]), selement,
fixed_text=fixed_text)
except TypeError:
max, min = None, None
data = data.cat.codes.astype(np.int32)
cats = True
encoding = "PLAIN_DICTIONARY"

start = f.tell()
bdata = definition_data + repetition_data + encode[encoding](data, selement)
bdata = definition_data + repetition_data + encode[encoding](data, selement,
fixed_text)
try:
if encoding != 'PLAIN_DICTIONARY':
max, min = data.max(), data.min()
max = encode['PLAIN'](pd.Series([max], dtype=data.dtype), selement)
min = encode['PLAIN'](pd.Series([min], dtype=data.dtype), selement)
max = encode['PLAIN'](pd.Series([max], dtype=data.dtype), selement,
fixed_text=fixed_text)
min = encode['PLAIN'](pd.Series([min], dtype=data.dtype), selement,
fixed_text=fixed_text)
except TypeError:
max, min = None, None

@@ -487,7 +511,8 @@ def make_row_group(f, data, schema, file_path=None, compression=None,
return rg


def make_part_file(f, data, schema, compression=None, encoding='PLAIN'):
def make_part_file(f, data, schema, compression=None, encoding='PLAIN',
fixed_text=None):
if len(data) == 0:
return
with f as f:
@@ -505,7 +530,7 @@ def make_part_file(f, data, schema, compression=None, encoding='PLAIN'):
return rg


def make_metadata(data, has_nulls=[], ignore_columns=[]):
def make_metadata(data, has_nulls=True, ignore_columns=[], fixed_text=None):
root = parquet_thrift.SchemaElement(name='schema',
num_children=0)

@@ -518,12 +543,16 @@ def make_metadata(data, has_nulls=[], ignore_columns=[]):
for column in data.columns:
if column in ignore_columns:
continue
fixed = None if fixed_text is None else fixed_text.get(column, None)
if str(data[column].dtype) == 'category':
se, type, _ = find_type(data[column].cat.categories)
se, type, _ = find_type(data[column].cat.categories,
fixed_text=fixed)
se.name = column
else:
se, type, _ = find_type(data[column])
if column in has_nulls and str(data[column].dtype) in ['category', 'object']:
se, type, _ = find_type(data[column], fixed_text=fixed)
has_nulls = (has_nulls if has_nulls in [True, False]
else column in has_nulls)
if has_nulls and str(data[column].dtype) in ['category', 'object']:
se.repetition_type = parquet_thrift.FieldRepetitionType.OPTIONAL
fmd.schema.append(se)
root.num_children += 1
Expand All @@ -532,8 +561,8 @@ def make_metadata(data, has_nulls=[], ignore_columns=[]):

def write(filename, data, row_group_offsets=50000000, encoding="PLAIN",
compression=None, file_scheme='simple', open_with=default_openw,
mkdirs=default_mkdirs, has_nulls=[], write_index=None,
partition_on=[]):
mkdirs=default_mkdirs, has_nulls=True, write_index=None,
partition_on=[], fixed_text=None):
""" Write Pandas DataFrame to filename as Parquet Format
Parameters
@@ -562,8 +591,8 @@ def write(filename, data, row_group_offsets=50000000, encoding="PLAIN",
When called with a path/URL, creates any necessary directories to
make that location writable, e.g., ``os.makedirs``. This is not
necessary if using the simple file scheme
has_nulls: list of strings
The named columns can have nulls. Only applies to Object and Category
has_nulls: bool or list of strings
Whether columns can have nulls. Only applies to Object and Category
columns, as pandas ints can't have NULLs, and NaN/NaT is equivalent
to NULL in float and time-like columns.
write_index: boolean
@@ -573,6 +602,11 @@ def write(filename, data, row_group_offsets=50000000, encoding="PLAIN",
Passed to groupby in order to split data within each row-group,
producing a structured directory tree. Note: as with pandas, null
values will be dropped. Ignored if file_scheme is simple.
fixed_text: {column: int length} or None
For bytes or str columns, values will be converted
to fixed-length strings of the given length for the given columns
before writing, potentially providing a large speed
boost.
Examples
--------
@@ -599,7 +633,8 @@ def write(filename, data, row_group_offsets=50000000, encoding="PLAIN",
data.index._step == 1 and data.index.name is None):
data = data.reset_index()
ignore = partition_on if file_scheme != 'simple' else []
fmd = make_metadata(data, has_nulls=has_nulls, ignore_columns=ignore)
fmd = make_metadata(data, has_nulls=has_nulls, ignore_columns=ignore,
fixed_text=fixed_text)
for i, start in enumerate(row_group_offsets):
end = row_group_offsets[i+1] if i < (len(row_group_offsets) - 1) else None
if file_scheme == 'simple':
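
make_metadata now accepts has_nulls as either a bool applying to all object/category columns or a list of column names. A tiny sketch of the per-column resolution used above, with column names assumed:

has_nulls = ['b']   # or True/False to apply to every object/category column
for column in ['a', 'b']:
    col_has_nulls = (has_nulls if has_nulls in [True, False]
                     else column in has_nulls)
    print(column, col_has_nulls)   # a False, b True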
