Merge pull request #33 from martindurant/read_as_arrays
Convert on numpy arrays, convert to series later
martindurant committed Nov 29, 2016
2 parents f8a8cea + d4117f3 commit bcdb683
Showing 8 changed files with 430 additions and 139 deletions.
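
The core of this change is that column decoding and type conversion now operate on plain numpy arrays, with the pandas Series/DataFrame built only at the end. A minimal sketch of the idea; the function and variable names below are illustrative assumptions, not the actual fastparquet internals:

import numpy as np
import pandas as pd

def decode_timestamp_column(raw_millis):
    # Hypothetical illustration: convert on the raw numpy array
    # (cheap, vectorised) rather than on a pandas Series.
    values = np.asarray(raw_millis, dtype='int64') * 1000000
    return values.view('datetime64[ns]')

raw = np.array([0, 86400000, 172800000])   # milliseconds since the epoch
arr = decode_timestamp_column(raw)         # still a numpy array
series = pd.Series(arr, name='ts')         # pandas object built last
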
31 changes: 20 additions & 11 deletions fastparquet/api.py
@@ -85,6 +85,7 @@ def _parse_header(self, f, verify=True):
for chunk in rg.columns:
self.group_files.setdefault(i, set()).add(chunk.file_path)
self.helper = schema.SchemaHelper(self.schema)
self.selfmade = self.created_by == "fastparquet-python"

@property
def columns(self):
@@ -109,18 +110,21 @@ def row_group_filename(self, rg):
return self.sep.join([os.path.dirname(self.fn),
rg.columns[0].file_path])

def read_row_group_file(self, rg, columns, categories):
def read_row_group_file(self, rg, columns, categories, index=None):
""" Open file for reading, and process it as a row-group """
fn = self.row_group_filename(rg)
return core.read_row_group_file(fn, rg, columns, categories,
self.helper, self.cats, open=self.open)
return core.read_row_group_file(
fn, rg, columns, categories, self.helper, self.cats,
open=self.open, selfmade=self.selfmade, index=index)

def read_row_group(self, rg, columns, categories, infile=None):
def read_row_group(self, rg, columns, categories, infile=None,
index=None):
"""
Access row-group in a file and read some columns into a data-frame.
"""
return core.read_row_group(infile, rg, columns, categories,
self.helper, self.cats)
return core.read_row_group(
infile, rg, columns, categories, self.helper, self.cats,
self.selfmade, index=index)

def grab_cats(self, columns, row_group_index=0):
""" Read dictionaries of first row_group
@@ -154,7 +158,8 @@ def grab_cats(self, columns, row_group_index=0):
grab_dict=True)
return out

def to_pandas(self, columns=None, categories=None, filters=[]):
def to_pandas(self, columns=None, categories=None, filters=[],
index=None):
"""
Read data from parquet into a Pandas dataframe.
@@ -170,6 +175,9 @@ def to_pandas(self, columns=None, categories=None, filters=[]):
filters: list of tuples
Filter syntax: [(column, op, val), ...],
where op is [==, >, >=, <, <=, !=, in, not in]
index: string or None
Column to assign to the index. If None, index is simple sequential
integers.
Returns
-------
@@ -182,18 +190,19 @@ def to_pandas(self, columns=None, categories=None, filters=[]):
if all(column.file_path is None for rg in self.row_groups
for column in rg.columns):
with self.open(self.fn) as f:
tot = [self.read_row_group(rg, columns, categories, infile=f)
tot = [self.read_row_group(rg, columns, categories, infile=f,
index=index)
for rg in rgs]
else:
tot = [self.read_row_group_file(rg, columns, categories)
tot = [self.read_row_group_file(rg, columns, categories, index)
for rg in rgs]

if len(tot) == 0:
return pd.DataFrame(columns=columns + list(self.cats))

# TODO: if categories vary from one rg to next, need
# pandas.types.concat.union_categoricals
return pd.concat(tot, ignore_index=True)
return pd.concat(tot, ignore_index=index is None)

@property
def count(self):
@@ -323,7 +332,7 @@ def statistics(obj):
for name in ['min', 'max']:
d[name][column] = (
[None] if d[name][column] is None
else converted_types.convert(pd.Series(d[name][column]), se).tolist()
else list(converted_types.convert(d[name][column], se))
)
return d

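With the new `index` keyword on `to_pandas`, a named column can be assigned to the dataframe index while reading. A minimal usage sketch; the file name and column name are illustrative assumptions, not part of this commit:

from fastparquet import ParquetFile

pf = ParquetFile('mydata.parq')            # hypothetical file

df_default = pf.to_pandas()                # plain sequential integer index
df_indexed = pf.to_pandas(index='ts')      # the 'ts' column becomes the index
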
123 changes: 123 additions & 0 deletions fastparquet/benchmarks/columns.py
@@ -0,0 +1,123 @@
from contextlib import contextmanager
import datetime
import os
import numpy as np
import pandas as pd
import time
from fastparquet import write, ParquetFile
from dask.utils import tmpdir


@contextmanager
def measure(name, result):
t0 = time.time()
yield
t1 = time.time()
result[name] = round((t1 - t0) * 1000, 3)


def time_column():
with tmpdir() as tempdir:
result = {}
fn = os.path.join(tempdir, 'temp.parq')
n = 10000000
r = np.random.randint(-1e10, 1e10, n).view('timedelta64[ns]')
df = pd.DataFrame({'x': r.copy()})

write(fn, df)
with measure('write random times, no nulls', result):
write(fn, df, has_nulls=False)

pf = ParquetFile(fn)
out = pf.to_pandas() # warm-up

with measure('read random times, no nulls', result):
out = pf.to_pandas()

with measure('write random times, no nulls but has_null=True', result):
write(fn, df)

pf = ParquetFile(fn)
out = pf.to_pandas() # warm-up

with measure('read random times, no nulls but has_null=True', result):
out = pf.to_pandas()

df.loc[n//2, 'x'] = pd.to_datetime('NaT')
with measure('write random times, with null', result):
write(fn, df)

pf = ParquetFile(fn)
out = pf.to_pandas() # warm-up

with measure('read random times, with null', result):
out = pf.to_pandas()

df.loc[n//2, 'x'] = pd.to_datetime('NaT')
with measure('write random times, with null but has_null=False', result):
write(fn, df, has_nulls=False)

pf = ParquetFile(fn)
out = pf.to_pandas() # warm-up

with measure('read random times, with null but has_null=False', result):
out = pf.to_pandas()

return result


if __name__ == '__main__':
result = {}
for f in [time_column]:
result.update(f())
for k in sorted(result):
print(k, result[k])


def time_find_nulls(N=10000000):
x = np.random.random(N)
df = pd.DataFrame({'x': x})
result = {}
run_find_nulls(df, result)
df.loc[N//2, 'x'] = np.nan
run_find_nulls(df, result)
df.loc[:, 'x'] = np.nan
df.loc[N//2, 'x'] = np.random.random()
run_find_nulls(df, result)
df.loc[N//2, 'x'] = np.nan
run_find_nulls(df, result)

x = np.random.randint(0, 2**30, N)
df = pd.DataFrame({'x': x})
run_find_nulls(df, result)

df = pd.DataFrame({'x': x.view('datetime64[s]')})
run_find_nulls(df, result)
v = df.loc[N//2, 'x']
df.loc[N//2, 'x'] = pd.to_datetime('NaT')
run_find_nulls(df, result)
df.loc[:, 'x'] = pd.to_datetime('NaT')
df.loc[N//2, 'x'] = v
run_find_nulls(df, result)
df.loc[:, 'x'] = pd.to_datetime('NaT')
run_find_nulls(df, result)

out = [(k + (v, )) for k, v in result.items()]
df = pd.DataFrame(out, columns=('type', 'nvalid', 'op', 'time'))
df.groupby(('type', 'nvalid', 'op')).sum()
return df


def run_find_nulls(df, res):
nvalid = (df.x == df.x).sum()
with measure((df.x.dtype.kind, nvalid, 'notnull'), res):
df.x.notnull()
with measure((df.x.dtype.kind, nvalid, 'notnull,sum'), res):
df.x.notnull().sum()
with measure((df.x.dtype.kind, nvalid, 'notnull,any'), res):
df.x.notnull().any()
with measure((df.x.dtype.kind, nvalid, 'notnull,all'), res):
df.x.notnull().all()
with measure((df.x.dtype.kind, nvalid, 'count'), res):
df.x.count()

95 changes: 66 additions & 29 deletions fastparquet/converted_types.py
@@ -15,6 +15,7 @@
import datetime
import json
import logging
import numba
import numpy as np
import os
import pandas as pd
@@ -38,33 +39,36 @@ def BSON(x):

DAYS_TO_MILLIS = 86400000000000
"""Number of nanoseconds in a day. Used to convert a parquet DATE (days since the epoch) to datetime64[ns]."""
nat = np.datetime64('NaT').view('int64')

simple = {parquet_thrift.Type.INT32: np.int32,
parquet_thrift.Type.INT64: np.int64,
parquet_thrift.Type.FLOAT: np.float32,
parquet_thrift.Type.DOUBLE: np.float64,
parquet_thrift.Type.BOOLEAN: np.bool_,
simple = {parquet_thrift.Type.INT32: np.dtype('int32'),
parquet_thrift.Type.INT64: np.dtype('int64'),
parquet_thrift.Type.FLOAT: np.dtype('float32'),
parquet_thrift.Type.DOUBLE: np.dtype('float64'),
parquet_thrift.Type.BOOLEAN: np.dtype('bool'),
parquet_thrift.Type.INT96: np.dtype('S12'),
parquet_thrift.Type.BYTE_ARRAY: np.dtype("O")}
parquet_thrift.Type.BYTE_ARRAY: np.dtype("O"),
parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY: np.dtype("O")}
complex = {parquet_thrift.ConvertedType.UTF8: np.dtype("O"),
parquet_thrift.ConvertedType.DECIMAL: np.float64,
parquet_thrift.ConvertedType.UINT_8: np.uint8,
parquet_thrift.ConvertedType.UINT_16: np.uint16,
parquet_thrift.ConvertedType.UINT_32: np.uint32,
parquet_thrift.ConvertedType.UINT_64: np.uint64,
parquet_thrift.ConvertedType.INT_8: np.int8,
parquet_thrift.ConvertedType.INT_16: np.int16,
parquet_thrift.ConvertedType.INT_32: np.int32,
parquet_thrift.ConvertedType.INT_64: np.int64,
parquet_thrift.ConvertedType.DECIMAL: np.dtype('float64'),
parquet_thrift.ConvertedType.UINT_8: np.dtype('uint8'),
parquet_thrift.ConvertedType.UINT_16: np.dtype('uint16'),
parquet_thrift.ConvertedType.UINT_32: np.dtype('uint32'),
parquet_thrift.ConvertedType.UINT_64: np.dtype('uint64'),
parquet_thrift.ConvertedType.INT_8: np.dtype('int8'),
parquet_thrift.ConvertedType.INT_16: np.dtype('int16'),
parquet_thrift.ConvertedType.INT_32: np.dtype('int32'),
parquet_thrift.ConvertedType.INT_64: np.dtype('int64'),
parquet_thrift.ConvertedType.TIME_MILLIS: np.dtype('<m8[ns]'),
parquet_thrift.ConvertedType.DATE: np.dtype('<M8[ns]'),
parquet_thrift.ConvertedType.TIMESTAMP_MILLIS: np.dtype(
'<M8[ns]')
parquet_thrift.ConvertedType.TIMESTAMP_MILLIS: np.dtype('<M8[ns]'),
parquet_thrift.ConvertedType.TIME_MICROS: np.dtype('<m8[ns]'),
parquet_thrift.ConvertedType.TIMESTAMP_MICROS: np.dtype('<M8[ns]')
}


def typemap(se):
"""Get the final (pandas) dtype - no actual conversion"""
"""Get the final dtype - no actual conversion"""
if se.converted_type is None:
if se.type in simple:
return simple[se.type]
@@ -83,23 +87,38 @@ def convert(data, se):
data: pandas series of primitive type
se: a schema element.
"""
# TODO: if input is categorical, only map on categories
data = np.asarray(data, dtype=simple[se.type])
ctype = se.converted_type
if ctype is None:
return data
if ctype == parquet_thrift.ConvertedType.UTF8:
return data.astype("O").str.decode('utf8')
if isinstance(data, list) or data.dtype != "O":
out = np.empty(len(data), dtype="O")
else:
out = data
out[:] = [s.decode('utf8') for s in data]
return out
if ctype == parquet_thrift.ConvertedType.DECIMAL:
scale_factor = 10**-se.scale
return data * scale_factor
elif ctype == parquet_thrift.ConvertedType.DATE:
return pd.to_datetime(data.map(datetime.date.fromordinal))
return (data * DAYS_TO_MILLIS).view('datetime64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIME_MILLIS:
return pd.to_timedelta(data, unit='ms')
out = np.empty(len(data), dtype='int64')
time_shift(data, out, 1000000)
return out.view('timedelta64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MILLIS:
return pd.to_datetime(data, unit='ms')
out = np.empty_like(data)
time_shift(data, out, 1000000)
return out.view('datetime64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIME_MICROS:
return pd.to_timedelta(data, unit='us')
out = np.empty_like(data)
time_shift(data, out)
return out.view('timedelta64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MICROS:
return pd.to_datetime(data, unit='us')
out = np.empty_like(data)
time_shift(data, out)
return out.view('datetime64[ns]')
elif ctype == parquet_thrift.ConvertedType.UINT_8:
return data.astype(np.uint8)
elif ctype == parquet_thrift.ConvertedType.UINT_16:
@@ -109,15 +128,33 @@ elif ctype == parquet_thrift.ConvertedType.UINT_16:
elif ctype == parquet_thrift.ConvertedType.UINT_64:
return data.astype(np.uint64)
elif ctype == parquet_thrift.ConvertedType.JSON:
return data.astype('O').str.decode('utf8').map(json.loads)
if isinstance(data, list) or data.dtype != "O":
out = np.empty(len(data), dtype="O")
else:
out = data
out[:] = [json.loads(d.decode('utf8')) for d in data]
return out
elif ctype == parquet_thrift.ConvertedType.BSON:
return data.map(unbson)
if isinstance(data, list) or data.dtype != "O":
out = np.empty(len(data), dtype="O")
else:
out = data
out[:] = [unbson(d) for d in data]
return out
elif ctype == parquet_thrift.ConvertedType.INTERVAL:
# for those that understand, output is month, day, ms
# maybe should convert to timedelta
# TODO: seems like a np.view should do this much faster
return data.map(lambda x: np.fromstring(x, dtype='<u4'))
return data.view('<u4').reshape((len(data), -1))
else:
logger.info("Converted type '%s'' not handled",
parquet_thrift.ConvertedType._VALUES_TO_NAMES[ctype]) # pylint:disable=protected-access
return data


@numba.njit()
def time_shift(indata, outdata, factor=1000):
for i in range(len(indata)):
if indata[i] == nat:
outdata[i] = nat
else:
outdata[i] = indata[i] * factor
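
`time_shift` rescales integer timestamps (for example milliseconds or microseconds to nanoseconds) into a preallocated output array, leaving the int64 NaT sentinel untouched so that nulls survive the conversion. A rough numpy-only sketch of the same computation, for illustration only:

import numpy as np

nat = np.datetime64('NaT').view('int64')    # int64 sentinel for NaT

# raw TIMESTAMP_MILLIS values: the epoch, epoch + 1 day, and a null
millis = np.array([0, 86400000, nat], dtype='int64')

out = millis.copy()
valid = out != nat
out[valid] *= 1000000                       # ms -> ns, nulls left as NaT
result = out.view('datetime64[ns]')         # 1970-01-01, 1970-01-02, NaT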
