Merge pull request #45 from martindurant/pre_allocate
Pre allocate
martindurant committed Dec 15, 2016
2 parents ea1680d + 90d313e · commit ac7fd39
Showing 10 changed files with 323 additions and 106 deletions.
132 changes: 103 additions & 29 deletions fastparquet/api.py
@@ -15,9 +15,9 @@
 
 from .core import read_thrift
 from .thrift_structures import parquet_thrift
-from . import core, schema, converted_types, encoding, writer
+from . import core, schema, converted_types, encoding, writer, dataframe
 from .util import (default_open, ParquetException, sep_from_open, val_to_num,
-                   ensure_bytes)
+                   ensure_bytes)
 
 
 class ParquetFile(object):
@@ -110,7 +110,8 @@ def _read_partitions(self):
                                         col.file_path or "")
                 for key, val in partitions:
                     cats.setdefault(key, set()).add(val)
-        self.cats = {key: list(v) for key, v in cats.items()}
+        self.cats = {key: list([val_to_num(x) for x in v])
+                     for key, v in cats.items()}
 
     def row_group_filename(self, rg):
         if rg.columns[0].file_path:
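A note on the `_read_partitions` hunk just above: `val_to_num` (imported from `.util` in the first hunk) coerces the partition-directory strings to numbers where possible, so category labels come out typed. Illustrative behavior, assuming the usual try-int, then-float, else-string fallback:

    from fastparquet.util import val_to_num

    assert val_to_num('2016') == 2016    # hive-style path "year=2016/"
    assert val_to_num('xyz') == 'xyz'    # non-numeric values stay strings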
@@ -120,21 +121,37 @@ def row_group_filename(self, rg):
             return self.fn
 
-    def read_row_group_file(self, rg, columns, categories, index=None):
+    def read_row_group_file(self, rg, columns, categories, index=None,
+                            assign=None):
         """ Open file for reading, and process it as a row-group """
         fn = self.row_group_filename(rg)
-        return core.read_row_group_file(
+        ret = False
+        if assign is None:
+            df, assign = self.pre_allocate(rg.num_rows, columns,
+                                           categories, index)
+            ret = True
+        core.read_row_group_file(
                 fn, rg, columns, categories, self.helper, self.cats,
-                open=self.open, selfmade=self.selfmade, index=index)
+                open=self.open, selfmade=self.selfmade, index=index,
+                assign=assign)
+        if ret:
+            return df
 
     def read_row_group(self, rg, columns, categories, infile=None,
-                       index=None):
+                       index=None, assign=None):
         """
         Access row-group in a file and read some columns into a data-frame.
         """
-        return core.read_row_group(
+        ret = False
+        if assign is None:
+            df, assign = self.pre_allocate(rg.num_rows, columns,
+                                           categories, index)
+            ret = True
+        core.read_row_group(
                 infile, rg, columns, categories, self.helper, self.cats,
-                self.selfmade, index=index)
+                self.selfmade, index=index, assign=assign)
+        if ret:
+            return df
 
     def grab_cats(self, columns, row_group_index=0):
         """ Read dictionaries of first row_group
@@ -187,7 +204,7 @@ def filter_row_groups(self, filters):
                 not(filter_out_cats(rg, filters))]
 
     def iter_row_groups(self, columns=None, categories=None, filters=[],
-                        index=None):
+                        index=None, assign=None):
         """
         Read data from parquet into a Pandas dataframe.
 
@@ -196,16 +213,21 @@ def iter_row_groups(self, columns=None, categories=None, filters=[],
         columns: list of names or `None`
             Column to load (see `ParquetFile.columns`). Any columns in the
             data not in this list will be ignored. If `None`, read all columns.
-        categories: list of names or `None`
+        categories: list, dict or `None`
             If a column is encoded using dictionary encoding in every row-group
             and its name is also in this list, it will generate a Pandas
-            Category-type column, potentially saving memory and time.
+            Category-type column, potentially saving memory and time. If a
+            dict {col: int}, the value indicates the number of categories,
+            so that the optimal data-dtype can be allocated.
         filters: list of tuples
             Filter syntax: [(column, op, val), ...],
             where op is [==, >, >=, <, <=, !=, in, not in]
         index: string or None
             Column to assign to the index. If None, index is simple sequential
             integers.
+        assign: dict {cols: array}
+            Pre-allocated memory to write to. If None, will allocate memory
+            here.
 
         Returns
         -------
@@ -217,11 +239,18 @@ def iter_row_groups(self, columns=None, categories=None, filters=[],
                    for column in rg.columns):
             with self.open(self.fn) as f:
                 for rg in rgs:
-                    yield self.read_row_group(rg, columns, categories, infile=f,
-                                              index=index)
+                    df, views = self.pre_allocate(rg.num_rows, columns,
+                                                  categories, index)
+                    self.read_row_group(rg, columns, categories, infile=f,
+                                        index=index, assign=views)
+                    yield df
         else:
             for rg in rgs:
-                yield self.read_row_group_file(rg, columns, categories, index)
+                df, views = self.pre_allocate(rg.num_rows, columns,
+                                              categories, index)
+                self.read_row_group_file(rg, columns, categories, index,
+                                         assign=views)
+                yield df
 
     def to_pandas(self, columns=None, categories=None, filters=[],
                   index=None):
@@ -233,10 +262,12 @@ def to_pandas(self, columns=None, categories=None, filters=[],
         columns: list of names or `None`
             Column to load (see `ParquetFile.columns`). Any columns in the
             data not in this list will be ignored. If `None`, read all columns.
-        categories: list of names or `None`
+        categories: list, dict or `None`
             If a column is encoded using dictionary encoding in every row-group
             and its name is also in this list, it will generate a Pandas
-            Category-type column, potentially saving memory and time.
+            Category-type column, potentially saving memory and time. If a
+            dict {col: int}, the value indicates the number of categories,
+            so that the optimal data-dtype can be allocated.
         filters: list of tuples
             Filter syntax: [(column, op, val), ...],
             where op is [==, >, >=, <, <=, !=, in, not in]
@@ -248,19 +279,33 @@ def to_pandas(self, columns=None, categories=None, filters=[],
         Returns
         -------
         Pandas data-frame
         """
-        tot = self.iter_row_groups(columns, categories, filters, index)
-        num_row_groups = len(self.filter_row_groups(filters))
+        rgs = self.filter_row_groups(filters)
+        size = sum(rg.num_rows for rg in rgs)
         columns = columns or self.columns
 
         # TODO: if categories vary from one rg to next, need
         # pandas.types.concat.union_categoricals
-        try:
-            if num_row_groups > 1:
-                return pd.concat(tot, ignore_index=index is None, copy=False)
-            else:
-                return next(iter(tot))
-        except (ValueError, StopIteration):
-            return pd.DataFrame(columns=columns + list(self.cats))
+        df, views = self.pre_allocate(size, columns, categories, index)
+        start = 0
+        if self.file_scheme == 'simple':
+            with self.open(self.fn) as f:
+                for rg in rgs:
+                    parts = {name: (v if name.endswith('-catdef')
+                                    else v[start:start + rg.num_rows])
+                             for (name, v) in views.items()}
+                    self.read_row_group(rg, columns, categories, infile=f,
+                                        index=index, assign=parts)
+                    start += rg.num_rows
+        else:
+            for rg in rgs:
+                parts = {name: (v if name.endswith('-catdef')
+                                else v[start:start + rg.num_rows])
+                         for (name, v) in views.items()}
+                self.read_row_group_file(rg, columns, categories, index,
+                                         assign=parts)
+                start += rg.num_rows
+        return df
+
+    def pre_allocate(self, size, columns, categories, index):
+        return _pre_allocate(size, columns, categories, index, self.cats,
+                             self.dtypes)
 
     @property
     def count(self):
@@ -278,6 +323,18 @@ def dtypes(self):
         """ Implied types of the columns in the schema """
         dtype = {f.name: converted_types.typemap(f)
                  for f in self.schema if f.num_children is None}
+        for col, dt in dtype.copy().items():
+            if dt.kind == 'i':
+                num_nulls = 0
+                for rg in self.row_groups:
+                    chunks = [c for c in rg.columns
+                              if c.meta_data.path_in_schema[-1] == col]
+                    for chunk in chunks:
+                        if chunk.meta_data.statistics is not None:
+                            num_nulls += (
+                                chunk.meta_data.statistics.null_count or 0)
+                if num_nulls:
+                    dtype[col] = np.dtype('f')
         for cat in self.cats:
             dtype[cat] = "category"
             # pd.Series(self.cats[cat]).map(val_to_num).dtype
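The effect of the new loop in `dtypes`: one recorded null in any chunk's statistics is enough to promote an integer column, because the pre-allocated arrays must be able to hold NaN and pandas has no integer NA. Note that the promotion target is single-precision float:

    import numpy as np

    assert np.dtype('f') == np.dtype('float32')  # 'f' means float32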
@@ -289,6 +346,23 @@ def __str__(self):
     __repr__ = __str__
 
 
+def _pre_allocate(size, columns, categories, index, cs, dt):
+    cols = [c for c in columns if index != c]
+    categories = categories or {}
+    cats = cs.copy()
+    if isinstance(categories, dict):
+        cats.update(categories)
+    dtypes = ['category' if c in categories else dt[c]
+              for c in cols]
+    index_type = ('category' if index in categories
+                  else dt.get(index, None))
+    cols.extend(cs)
+    dtypes.extend(['category'] * len(cs))
+    df, views = dataframe.empty(dtypes, size, cols=cols, index_name=index,
+                                index_type=index_type, cats=cats)
+    return df, views
+
+
 def filter_out_stats(rg, filters, helper):
     """
     According to the filters, should this row-group be excluded
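Taken together, the api.py changes replace the old build-then-concat path in `to_pandas` with a single allocation: `pre_allocate` creates the full-size frame up front, and each row-group is decoded directly into a slice of it (entries ending in '-catdef', the category definitions, are passed whole rather than sliced). A rough usage sketch of the new surface; the file name and category count below are invented for illustration:

    from fastparquet import ParquetFile

    pf = ParquetFile('data.parq')  # hypothetical file with a column 'w'

    # categories may now be a dict {col: number_of_categories}, letting the
    # narrowest sufficient dtype for the category codes be chosen up front
    df = pf.to_pandas(categories={'w': 3})

    # the same machinery one row-group at a time: pre_allocate returns an
    # empty frame plus writable views, and assign= fills them in place
    rg = pf.row_groups[0]
    frame, views = pf.pre_allocate(rg.num_rows, pf.columns, {'w': 3}, None)
    pf.read_row_group_file(rg, pf.columns, {'w': 3}, None, assign=views)

Because writes land in pre-allocated memory, no per-row-group intermediate frames or `pd.concat` copies are made.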
16 changes: 8 additions & 8 deletions fastparquet/benchmarks/columns.py
@@ -36,19 +36,19 @@ def time_column():
         write(fn, df, has_nulls=False)
 
         pf = ParquetFile(fn)
-        pf.to_pandas(categories=['w'])  # warm-up
+        pf.to_pandas(categories={'w': 3})  # warm-up
 
         with measure('%s: read, no nulls' % d.dtypes[col], result):
-            pf.to_pandas(categories=['w'])
+            pf.to_pandas(categories={'w': 3})
 
         with measure('%s: write, no nulls, has_null=True' % d.dtypes[col], result):
             write(fn, df, has_nulls=True)
 
         pf = ParquetFile(fn)
-        pf.to_pandas(categories=['w'])  # warm-up
+        pf.to_pandas(categories={'w': 3})  # warm-up
 
         with measure('%s: read, no nulls, has_null=True' % d.dtypes[col], result):
-            pf.to_pandas(categories=['w'])
+            pf.to_pandas(categories={'w': 3})
 
         if d.dtypes[col].kind == 'm':
             d.loc[n//2, col] = pd.to_datetime('NaT')
@@ -62,19 +62,19 @@ def time_column():
         write(fn, df, has_nulls=True)
 
         pf = ParquetFile(fn)
-        pf.to_pandas(categories=['w'])  # warm-up
+        pf.to_pandas(categories={'w': 3})  # warm-up
 
         with measure('%s: read, with null, has_null=True' % d.dtypes[col], result):
-            pf.to_pandas(categories=['w'])
+            pf.to_pandas(categories={'w': 3})
 
         with measure('%s: write, with null, has_null=False' % d.dtypes[col], result):
             write(fn, df, has_nulls=False)
 
         pf = ParquetFile(fn)
-        pf.to_pandas(categories=['w'])  # warm-up
+        pf.to_pandas(categories={'w': 3})  # warm-up
 
         with measure('%s: read, with null, has_null=False' % d.dtypes[col], result):
-            pf.to_pandas(categories=['w'])
+            pf.to_pandas(categories={'w': 3})
 
     return result

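The only change in the benchmark is `categories=['w']` becoming `categories={'w': 3}`, exercising the new dict form in which the value is the known number of categories. A sketch of why the count matters, illustrative only and not fastparquet's actual allocation code:

    import numpy as np

    def code_dtype(num_categories):
        # category codes need only be wide enough to index the labels
        # (signed, since pandas reserves code -1 for missing values)
        if num_categories < 2**7:
            return np.dtype('int8')
        elif num_categories < 2**15:
            return np.dtype('int16')
        return np.dtype('int64')

    assert code_dtype(3) == np.dtype('int8')         # 'w' has 3 labels
    assert code_dtype(100000) == np.dtype('int64')

Without the count, the codes array would have to be allocated pessimistically wide and narrowed later.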
5 changes: 2 additions & 3 deletions fastparquet/converted_types.py
@@ -91,7 +91,6 @@ def convert(data, se):
     data: pandas series of primitive type
     se: a schema element.
     """
-    #data = np.asarray(data, dtype=simple[se.type])
     ctype = se.converted_type
     if ctype is None:
         return data
@@ -155,8 +154,8 @@ def convert(data, se):
     return data
 
 
-@numba.njit()
-def time_shift(indata, outdata, factor=1000):
+@numba.njit(nogil=True)
+def time_shift(indata, outdata, factor=1000):  # pragma: no cover
     for i in range(len(indata)):
         if indata[i] == nat:
             outdata[i] = nat
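Two deliberate tweaks here: `nogil=True` makes the compiled `time_shift` release the GIL while it runs, so conversions for several row-groups can proceed in parallel from plain Python threads, and `# pragma: no cover` excludes the function from coverage accounting, which cannot trace jitted code. A minimal sketch of the threading benefit, assuming int64 timestamp arrays:

    import numpy as np
    from concurrent.futures import ThreadPoolExecutor

    from fastparquet.converted_types import time_shift

    ins = [np.arange(1000000, dtype=np.int64) for _ in range(4)]
    outs = [np.empty_like(a) for a in ins]

    # with nogil=True the jitted loop drops the GIL, so the four calls
    # can genuinely overlap on a multi-core machine
    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(time_shift, ins, outs))  # factor defaults to 1000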
