Speedup speedup #259

Closed
wants to merge 6 commits

50 changes: 30 additions & 20 deletions fastparquet/api.py
@@ -540,12 +540,12 @@ def filter_out_stats(rg, filters, schema):
s = column.meta_data.statistics
if s.max is not None:
b = ensure_bytes(s.max)
vmax = encoding.read_plain(b, column.meta_data.type, 1)
vmax = encoding.read_plain(b, column.meta_data.type, 1, se)
if se.converted_type is not None:
vmax = converted_types.convert(vmax, se)
if s.min is not None:
b = ensure_bytes(s.min)
vmin = encoding.read_plain(b, column.meta_data.type, 1)
vmin = encoding.read_plain(b, column.meta_data.type, 1, se)
if se.converted_type is not None:
vmin = converted_types.convert(vmin, se)
out = filter_val(op, val, vmin, vmax)
@@ -554,13 +554,14 @@ def filter_out_stats(rg, filters, schema):
return False


def statistics(obj):
def statistics(obj, s=None):
"""
Return per-column statistics for a ParquetFile

Parameters
----------
obj: ParquetFile
s: schema or element, passed down

Returns
-------
@@ -577,46 +578,55 @@ def statistics(obj):
"""
if isinstance(obj, parquet_thrift.ColumnChunk):
md = obj.meta_data
s = obj.meta_data.statistics
st = obj.meta_data.statistics
rv = {}
if not s:
if not st:
return rv
if s.max is not None:
if st.max is not None:
try:
if md.type == parquet_thrift.Type.BYTE_ARRAY:
rv['max'] = ensure_bytes(s.max)
if md.type in [parquet_thrift.Type.BYTE_ARRAY,
parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY]:
rv['max'] = ensure_bytes(st.max)
if s.converted_type in [parquet_thrift.ConvertedType.UTF8,
parquet_thrift.ConvertedType.JSON]:
rv['max'] = rv['max'].decode('utf8')
else:
rv['max'] = encoding.read_plain(ensure_bytes(s.max),
rv['max'] = encoding.read_plain(ensure_bytes(st.max),
md.type, 1)[0]
except:
rv['max'] = None
if s.min is not None:
if st.min is not None:
try:
if md.type == parquet_thrift.Type.BYTE_ARRAY:
rv['min'] = ensure_bytes(s.min)
if md.type in [parquet_thrift.Type.BYTE_ARRAY,
parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY]:
rv['min'] = ensure_bytes(st.min)
if s.converted_type in [parquet_thrift.ConvertedType.UTF8,
parquet_thrift.ConvertedType.JSON]:
rv['min'] = rv['min'].decode('utf8')
else:
rv['min'] = encoding.read_plain(ensure_bytes(s.min),
rv['min'] = encoding.read_plain(ensure_bytes(st.min),
md.type, 1)[0]
except:
rv['min'] = None
if s.null_count is not None:
rv['null_count'] = s.null_count
if s.distinct_count is not None:
rv['distinct_count'] = s.distinct_count
if st.null_count is not None:
rv['null_count'] = st.null_count
if st.distinct_count is not None:
rv['distinct_count'] = st.distinct_count
return rv

if isinstance(obj, parquet_thrift.RowGroup):
return {'.'.join(c.meta_data.path_in_schema): statistics(c)
return {'.'.join(c.meta_data.path_in_schema):
statistics(c, s=s.schema_element(c.meta_data.path_in_schema))
for c in obj.columns}

if isinstance(obj, ParquetFile):
L = list(map(statistics, obj.row_groups))
schema = obj.schema
L = [statistics(rg, s=schema) for rg in obj.row_groups]
d = {n: {col: [item.get(col, {}).get(n, None) for item in L]
for col in obj.columns}
for n in ['min', 'max', 'null_count', 'distinct_count']}
if not L:
return d
schema = obj.schema
for col in obj.row_groups[0].columns:
column = '.'.join(col.meta_data.path_in_schema)
se = schema.schema_element(col.meta_data.path_in_schema)
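
Taken together, the api.py changes thread each column's schema element into the statistics path, so byte-array min/max values can be decoded per their converted type. A minimal usage sketch of the result; the file name and toy frame are hypothetical fixtures, not part of the PR:

```python
# Hedged sketch: behaviour of the schema-aware statistics() after this
# change. 'tmp.parq' and the tiny frame are hypothetical test fixtures.
import pandas as pd
from fastparquet import ParquetFile, write
from fastparquet.api import statistics

write('tmp.parq', pd.DataFrame({'x': ['aa', 'bb']}))
pf = ParquetFile('tmp.parq')

# For a ParquetFile, statistics() now fetches the schema itself and
# passes each column's schema element down to the per-row-group calls,
# so UTF8 min/max values come back as str rather than raw bytes.
print(statistics(pf)['min']['x'])  # e.g. ['aa'] instead of [b'aa']
```
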
4 changes: 4 additions & 0 deletions fastparquet/benchmarks/columns.py
@@ -102,6 +102,10 @@ def time_text():
with measure('%s: write, fixed: %s' % (t, fixed), result):
write(fn, df, has_nulls=False, write_index=False,
fixed_text={col: fixed}, object_encoding=t)
with measure('%s: write, fixed: %s, nostat' % (t, fixed), result):
write(fn, df, has_nulls=False, write_index=False,
fixed_text={col: fixed}, object_encoding=t,
stats=False)

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
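
The added benchmark times a write with statistics disabled. A short usage sketch of the keyword it exercises, assuming `stats` lands on `write()` as the diff implies:

```python
# Sketch of the stats=False path timed above (the keyword is assumed
# from this diff, not from a released fastparquet API).
import pandas as pd
from fastparquet import write

df = pd.DataFrame({'s': ['some text'] * 10000})
# Skipping min/max statistics avoids per-chunk value scans on write;
# readers then simply find no stats to filter row groups against.
write('nostat.parq', df, has_nulls=False, stats=False)
```
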
7 changes: 2 additions & 5 deletions fastparquet/converted_types.py
@@ -22,7 +22,6 @@

from .thrift_structures import parquet_thrift
from .util import PY2
from .speedups import array_decode_utf8

logger = logging.getLogger('parquet') # pylint: disable=invalid-name

@@ -100,9 +99,7 @@ def convert(data, se, timestamp96=True):
if ctype is None:
return data
if ctype == parquet_thrift.ConvertedType.UTF8:
if isinstance(data, list) or data.dtype != "O":
data = np.asarray(data, dtype="O")
return array_decode_utf8(data)
return data
if ctype == parquet_thrift.ConvertedType.DECIMAL:
scale_factor = 10**-se.scale
if data.dtype.kind in ['i', 'f']:
@@ -159,7 +156,7 @@ def from_bytes(d):
out = np.empty(len(data), dtype="O")
else:
out = data
out[:] = [json.loads(d.decode('utf8')) for d in data]
out[:] = [json.loads(d) for d in data]
return out
elif ctype == parquet_thrift.ConvertedType.BSON:
if isinstance(data, list) or data.dtype != "O":
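
Context for the removals above: after this PR, UTF8/JSON byte arrays are decoded to str while the page is read (see the core.py and encoding.py hunks below), so `convert()` no longer re-decodes UTF8 data and `json.loads` receives str directly. Illustrative comparison only:

```python
import json

# Old path: items reached convert() as bytes and needed an explicit
# decode before parsing.
json.loads(b'{"a": 1}'.decode('utf8'))

# New path: items are already str by the time convert() sees them.
json.loads('{"a": 1}')
```
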
22 changes: 15 additions & 7 deletions fastparquet/core.py
@@ -14,7 +14,7 @@
from .compression import decompress_data
from .converted_types import convert, typemap
from .schema import _is_list_like, _is_map_like
from .speedups import unpack_byte_array
from .speedups import decode
from .thrift_structures import parquet_thrift, read_thrift
from .util import val_to_num, byte_buffer, ex_from_sep

@@ -101,6 +101,7 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False,
dtype=np.uint8))

repetition_levels = read_rep(io_obj, daph, helper, metadata)
se = helper.schema_element(metadata.path_in_schema)

if skip_nulls and not helper.is_required(metadata.path_in_schema):
num_nulls = 0
@@ -115,7 +116,7 @@
values = encoding.read_plain(raw_bytes[io_obj.loc:],
metadata.type,
int(daph.num_values - num_nulls),
width=width)
width=width, se=se)
elif daph.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE]:
# bit_width is stored as single byte.
@@ -155,15 +156,22 @@ def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata):
Consumes data using the plain encoding and returns an array of values.
"""
raw_bytes = _read_page(file_obj, page_header, column_metadata)
se = schema_helper.schema_element(column_metadata.path_in_schema)
if column_metadata.type == parquet_thrift.Type.BYTE_ARRAY:
values = unpack_byte_array(raw_bytes,
page_header.dictionary_page_header.num_values)
if se.converted_type in [parquet_thrift.ConvertedType.UTF8,
parquet_thrift.ConvertedType.JSON]:
values = decode(raw_bytes,
page_header.dictionary_page_header.num_values,
utf8=True)
else:
values = decode(raw_bytes,
page_header.dictionary_page_header.num_values,
utf8=False)
else:
width = schema_helper.schema_element(
column_metadata.path_in_schema).type_length
width = se.type_length
values = encoding.read_plain(
raw_bytes, column_metadata.type,
page_header.dictionary_page_header.num_values, width)
page_header.dictionary_page_header.num_values, width, se=se)
return values


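For reference, a pure-Python sketch of what the calls above assume the new `speedups.decode` does; only the signature `decode(raw_bytes, num_values, utf8=...)` is taken from this diff, the real function being a compiled speedup:

```python
import struct
import numpy as np

def decode_reference(raw_bytes, count, utf8=False):
    """Pure-Python stand-in for speedups.decode (illustrative only)."""
    out = np.empty(count, dtype='O')
    pos = 0
    for i in range(count):
        # Parquet PLAIN byte arrays carry a 4-byte little-endian length
        # prefix before each value.
        (n,) = struct.unpack_from('<i', raw_bytes, pos)
        pos += 4
        item = bytes(raw_bytes[pos:pos + n])
        pos += n
        # utf8=True folds the str conversion into the unpack loop, which
        # is what lets this PR drop the later convert() re-decode.
        out[i] = item.decode('utf8') if utf8 else item
    return out
```
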
25 changes: 18 additions & 7 deletions fastparquet/encoding.py
@@ -8,7 +8,7 @@
import numba
import numpy as np

from .speedups import unpack_byte_array
from .speedups import decode, array_decode
from .thrift_structures import parquet_thrift
from .util import byte_buffer

@@ -27,24 +27,35 @@ def read_plain_boolean(raw_bytes, count):
}


def read_plain(raw_bytes, type_, count, width=0):
def read_plain(raw_bytes, type_, count, width=0, se=None):
if type_ in DECODE_TYPEMAP:
dtype = DECODE_TYPEMAP[type_]
return np.frombuffer(byte_buffer(raw_bytes), dtype=dtype, count=count)
if type_ == parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY:
if count == 1:
width = len(raw_bytes)
dtype = np.dtype('S%i' % width)
return np.frombuffer(byte_buffer(raw_bytes), dtype=dtype, count=count)
out = np.frombuffer(byte_buffer(raw_bytes), dtype=dtype, count=count)
if se.converted_type in [parquet_thrift.ConvertedType.UTF8,
parquet_thrift.ConvertedType.JSON]:
return array_decode(out)
else:
return out
if type_ == parquet_thrift.Type.BOOLEAN:
return read_plain_boolean(raw_bytes, count)
# variable byte arrays (rare)
try:
return np.array(unpack_byte_array(raw_bytes, count), dtype='O')
if se.converted_type in [parquet_thrift.ConvertedType.UTF8,
parquet_thrift.ConvertedType.JSON]:
return decode(raw_bytes, count, utf8=True)
else:
return decode(raw_bytes, count, utf8=False)
except RuntimeError:
if count == 1:
# e.g., for statistics
return np.array([raw_bytes], dtype='O')
if se.converted_type in [parquet_thrift.ConvertedType.UTF8,
parquet_thrift.ConvertedType.JSON]:
return np.array([raw_bytes.decode('utf8')], dtype='O')
else:
return np.array([raw_bytes], dtype='O')
else:
raise

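And a matching sketch of `array_decode` as used on the fixed-length 'S' array above: an element-wise UTF8 decode into an object array (the shipped version is a compiled speedup; this is illustrative):

```python
import numpy as np

def array_decode_reference(arr):
    """Element-wise UTF8 decode of a bytes array into dtype='O' str."""
    out = np.empty(len(arr), dtype='O')
    for i, b in enumerate(arr):
        out[i] = b.decode('utf8')
    return out

# array_decode_reference(np.frombuffer(b'abcd', dtype='S2'))
# -> array(['ab', 'cd'], dtype=object)
```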