Merge pull request #38 from martindurant/iter
Add iteration function
martindurant committed Dec 4, 2016
2 parents 446edef + 8519ca1 commit c4c300f
Showing 2 changed files with 75 additions and 13 deletions.
69 changes: 56 additions & 13 deletions fastparquet/api.py
@@ -164,8 +164,25 @@ def grab_cats(self, columns, row_group_index=0):
                                      grab_dict=True)
         return out
 
-    def to_pandas(self, columns=None, categories=None, filters=[],
-                  index=None):
+    def filter_row_groups(self, filters):
+        """
+        Select row groups using set of filters
+
+        Parameters
+        ----------
+        filters: list of tuples
+            See ``filter_out_cats`` and ``filter_out_stats``
+
+        Returns
+        -------
+        Filtered list of row groups
+        """
+        return [rg for rg in self.row_groups if
+                not(filter_out_stats(rg, filters, self.helper)) and
+                not(filter_out_cats(rg, filters))]
+
+    def iter_row_groups(self, columns=None, categories=None, filters=[],
+                        index=None):
         """
         Read data from parquet into a Pandas dataframe.
 
@@ -187,28 +204,54 @@ def to_pandas(self, columns=None, categories=None, filters=[],
 
         Returns
         -------
-        Pandas data-frame
+        Generator yielding one Pandas data-frame per row-group
         """
         columns = columns or self.columns
-        rgs = [rg for rg in self.row_groups if
-               not(filter_out_stats(rg, filters, self.helper)) and
-               not(filter_out_cats(rg, filters))]
+        rgs = self.filter_row_groups(filters)
         if all(column.file_path is None for rg in self.row_groups
                for column in rg.columns):
             with self.open(self.fn) as f:
-                tot = [self.read_row_group(rg, columns, categories, infile=f,
-                                           index=index)
-                       for rg in rgs]
+                for rg in rgs:
+                    yield self.read_row_group(rg, columns, categories, infile=f,
+                                              index=index)
         else:
-            tot = [self.read_row_group_file(rg, columns, categories, index)
-                   for rg in rgs]
+            for rg in rgs:
+                yield self.read_row_group_file(rg, columns, categories, index)
 
-        if len(tot) == 0:
-            return pd.DataFrame(columns=columns + list(self.cats))
+    def to_pandas(self, columns=None, categories=None, filters=[],
+                  index=None):
+        """
+        Read data from parquet into a Pandas dataframe.
+
+        Parameters
+        ----------
+        columns: list of names or `None`
+            Column to load (see `ParquetFile.columns`). Any columns in the
+            data not in this list will be ignored. If `None`, read all columns.
+        categories: list of names or `None`
+            If a column is encoded using dictionary encoding in every row-group
+            and its name is also in this list, it will generate a Pandas
+            Category-type column, potentially saving memory and time.
+        filters: list of tuples
+            Filter syntax: [(column, op, val), ...],
+            where op is [==, >, >=, <, <=, !=, in, not in]
+        index: string or None
+            Column to assign to the index. If None, index is simple sequential
+            integers.
+
+        Returns
+        -------
+        Pandas data-frame
+        """
+        tot = self.iter_row_groups(columns, categories, filters, index)
+        columns = columns or self.columns
 
         # TODO: if categories vary from one rg to next, need
         # pandas.types.concat.union_categoricals
-        return pd.concat(tot, ignore_index=index is None)
+        try:
+            return pd.concat(tot, ignore_index=index is None)
+        except ValueError:
+            return pd.DataFrame(columns=columns + list(self.cats))
 
     @property
     def count(self):
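To make the new API concrete, here is a minimal usage sketch (not part of the commit). The file path, the column name 'x', and the process() handler are placeholders; only ParquetFile.iter_row_groups, filter_row_groups, to_pandas and the filter syntax documented above come from this diff.

    from fastparquet import ParquetFile

    pf = ParquetFile('data.parquet')   # placeholder path

    # Stream one DataFrame per row-group instead of concatenating them all,
    # which keeps memory bounded for large files.
    for df in pf.iter_row_groups():
        process(df)                    # placeholder per-chunk handler

    # filters prune whole row-groups (via their statistics or dictionary
    # values), using the [(column, op, val), ...] syntax shown above.
    subset = pf.to_pandas(filters=[('x', '>=', 0)])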
19 changes: 19 additions & 0 deletions fastparquet/test/test_api.py
@@ -3,6 +3,7 @@
 
 import numpy as np
 import pandas as pd
+import pytest
 
 from fastparquet.util import tempdir
 from fastparquet import write, ParquetFile
@@ -82,3 +83,21 @@ def test_sorted_row_group_columns(tempdir):
                 'z': {'min': ['a', 'c'], 'max': ['b', 'd']}}
 
     assert result == expected
+
+
+def test_iter(tempdir):
+    df = pd.DataFrame({'x': [1, 2, 3, 4],
+                       'y': [1.0, 2.0, 1.0, 2.0],
+                       'z': ['a', 'b', 'c', 'd']})
+    df.index.name = 'index'
+
+    fn = os.path.join(tempdir, 'foo.parquet')
+    write(fn, df, row_group_offsets=[0, 2], write_index=True)
+    pf = ParquetFile(fn)
+    out = iter(pf.iter_row_groups(index='index'))
+    d1 = next(out)
+    pd.util.testing.assert_frame_equal(d1, df[:2])
+    d2 = next(out)
+    pd.util.testing.assert_frame_equal(d2, df[2:])
+    with pytest.raises(StopIteration):
+        next(out)
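The test above exercises iteration only. As a companion, this hedged sketch shows how the new filter_row_groups method could be applied to the same two-row-group file to see which groups a filter would keep before reading any data; it is illustrative and not part of the commit.

    pf = ParquetFile(fn)                      # fn from the test above
    # The file holds two row-groups (rows 0-1 and 2-3).  If min/max
    # statistics for column 'x' were recorded at write time, a filter
    # such as ('x', '<=', 2) lets filter_out_stats skip the second group.
    kept = pf.filter_row_groups([('x', '<=', 2)])
    assert len(kept) <= len(pf.row_groups)    # filtering never adds groups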
