Skip to content

Commit

Permalink
Read multi indexes (#331)
Browse files Browse the repository at this point in the history
* Read multi indexes
to_pandas() can also accept a list of columns

* fix categories
  • Loading branch information
0x0L authored and martindurant committed May 2, 2018
1 parent e2962a9 commit 1ff6bfe
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 70 deletions.
55 changes: 24 additions & 31 deletions fastparquet/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from collections import OrderedDict
import json
import os
import re
import six
import struct

Expand All @@ -19,7 +18,7 @@
from . import core, schema, converted_types, encoding, dataframe
from .util import (default_open, ParquetException, val_to_num,
ensure_bytes, check_column_names, metadata_from_many,
ex_from_sep, get_file_scheme)
ex_from_sep, get_file_scheme, STR_TYPE)


class ParquetFile(object):
Expand Down Expand Up @@ -307,10 +306,11 @@ def iter_row_groups(self, columns=None, categories=None, filters=[],
(This is not row-level filtering)
Filter syntax: [(column, op, val), ...],
where op is [==, >, >=, <, <=, !=, in, not in]
index: string or None
Column to assign to the index. If None, index is inferred from the
metadata (if this was originally pandas data); if the metadata does
not exist or index is False, index is simple sequential integers.
index: string or list of strings or False or None
Column(s) to assign to the (multi-)index. If None, the index is
inferred from the pandas metadata stored in the file (present if the
data was originally written from pandas). If that metadata does not
exist, or index is False, the index is simple sequential integers.
assign: dict {cols: array}
Pre-allocated memory to write to. If None, will allocate memory
here.
Expand All @@ -319,11 +319,10 @@ def iter_row_groups(self, columns=None, categories=None, filters=[],
-------
Generator yielding one Pandas data-frame per row-group
"""
if index is None:
index = self._get_index(index)
index = self._get_index(index)
columns = columns or self.columns
if index and index not in columns:
columns.append(index)
if index:
columns += [i for i in index if i not in columns]
check_column_names(self.columns, columns, categories)
rgs = self.filter_row_groups(filters)
if all(column.file_path is None for rg in self.row_groups
Expand All @@ -347,13 +346,8 @@ def _get_index(self, index=None):
if index is None:
index = json.loads(self.key_value_metadata.get('pandas', '{}')).get(
'index_columns', [])
if len(index) > 1:
raise NotImplementedError('multi-index not yet supported, '
'use index=False')
if index:
return index[0]
else:
return None
if isinstance(index, STR_TYPE):
index = [index]
return index

def to_pandas(self, columns=None, categories=None, filters=[],
Expand All @@ -378,22 +372,22 @@ def to_pandas(self, columns=None, categories=None, filters=[],
(This is not row-level filtering)
Filter syntax: [(column, op, val), ...],
where op is [==, >, >=, <, <=, !=, in, not in]
index: string or None
Column to assign to the index. If None, index is inferred from the
metadata (if this was originally pandas data); if the metadata does
not exist or index is False, index is simple sequential integers.
index: string or list of strings or False or None
Column(s) to assign to the (multi-)index. If None, the index is
inferred from the pandas metadata stored in the file (present if the
data was originally written from pandas). If that metadata does not
exist, or index is False, the index is simple sequential integers.
Returns
-------
Pandas data-frame
"""
rgs = self.filter_row_groups(filters)
size = sum(rg.num_rows for rg in rgs)
if index is None:
index = self._get_index(index)
index = self._get_index(index)
columns = columns or self.columns
if index and index not in columns:
columns.append(index)
if index:
columns += [i for i in index if i not in columns]
check_column_names(self.columns + list(self.cats), columns, categories)
df, views = self.pre_allocate(size, columns, categories, index)
start = 0
Expand Down Expand Up @@ -500,7 +494,8 @@ def __str__(self):


def _pre_allocate(size, columns, categories, index, cs, dt, tz=None):
cols = [c for c in columns if index != c]
index = index or []
cols = [c for c in columns if c not in index]
categories = categories or {}
cats = cs.copy()
if isinstance(categories, dict):
Expand All @@ -512,13 +507,11 @@ def get_type(name):
return dt.get(name, None)

dtypes = [get_type(c) for c in cols]
index_type = get_type(index)
index_types = [get_type(i) for i in index]
cols.extend(cs)
dtypes.extend(['category'] * len(cs))
df, views = dataframe.empty(dtypes, size, cols=cols, index_name=index,
index_type=index_type, cats=cats, timezones=tz)
if index and re.match(r'__index_level_\d+__', index):
df.index.name = None
df, views = dataframe.empty(dtypes, size, cols=cols, index_names=index,
index_types=index_types, cats=cats, timezones=tz)
return df, views


Expand Down
99 changes: 60 additions & 39 deletions fastparquet/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import re
from collections import OrderedDict
import numpy as np
from pandas.core.index import CategoricalIndex, RangeIndex, Index
from pandas.core.index import CategoricalIndex, RangeIndex, Index, MultiIndex
from pandas.core.internals import BlockManager
from pandas import Categorical, DataFrame, Series
from pandas.api.types import is_categorical_dtype
from .util import STR_TYPE


def empty(types, size, cats=None, cols=None, index_type=None, index_name=None,
def empty(types, size, cats=None, cols=None, index_types=None, index_names=None,
timezones=None):
"""
Create empty DataFrame to assign into
Expand Down Expand Up @@ -43,53 +44,72 @@ def empty(types, size, cats=None, cols=None, index_type=None, index_name=None,
if isinstance(types, STR_TYPE):
types = types.split(',')
cols = cols if cols is not None else range(len(types))

def cat(col):
    """Return the categories object to use for column ``col``.

    Reads ``cats`` from the enclosing scope. The result is one of:

    - ``RangeIndex(0, 2**14)`` when ``cats`` is None or has no entry
      for ``col``;
    - ``RangeIndex(0, n)`` when the entry is an integer ``n``;
    - the entry itself otherwise (an explicit labels list).
    """
    if cats is None or col not in cats:
        return RangeIndex(0, 2**14)
    spec = cats[col]
    if isinstance(spec, int):
        return RangeIndex(0, spec)
    # explicit labels list
    return spec

df = OrderedDict()
for t, col in zip(types, cols):
if str(t) == 'category':
if cats is None or col not in cats:
df[str(col)] = Categorical(
[], categories=RangeIndex(0, 2**14),
fastpath=True)
elif isinstance(cats[col], int):
df[str(col)] = Categorical(
[], categories=RangeIndex(0, cats[col]),
fastpath=True)
else: # explicit labels list
df[str(col)] = Categorical([], categories=cats[col],
fastpath=True)
df[str(col)] = Categorical([], categories=cat(col), fastpath=True)
else:
d = np.empty(0, dtype=t)
if d.dtype.kind == "M" and str(col) in timezones:
d = Series(d).dt.tz_localize(timezones[str(col)])
df[str(col)] = d
df = DataFrame(df)

if index_type is not None and index_type is not False:
if index_name is None:
df = DataFrame(df)
if not index_types:
index = RangeIndex(size)
elif len(index_types) == 1:
t, col = index_types[0], index_names[0]
if col is None:
raise ValueError('If using an index, must give an index name')
if str(index_type) == 'category':
if cats is None or index_name not in cats:
c = Categorical(
[], categories=RangeIndex(0, 2**14),
fastpath=True)
elif isinstance(cats[index_name], int):
c = Categorical(
[], categories=RangeIndex(0, cats[index_name]),
fastpath=True)
else: # explicit labels list
c = Categorical([], categories=cats[index_name],
fastpath=True)
vals = np.empty(size, dtype=c.codes.dtype)
if str(t) == 'category':
c = Categorical([], categories=cat(col), fastpath=True)
vals = np.zeros(size, dtype=c.codes.dtype)
index = CategoricalIndex(c)
index._data._codes = vals
views[index_name] = vals
views[col] = vals
views[col+'-catdef'] = index._data
else:
index = Index(np.empty(size, dtype=index_type))
views[index_name] = index.values

axes = [df._data.axes[0], index]
d = np.empty(size, dtype=t)
# if d.dtype.kind == "M" and str(col) in timezones:
# d = Series(d).dt.tz_localize(timezones[str(col)])
index = Index(d)
views[col] = index.values
else:
axes = [df._data.axes[0], RangeIndex(size)]
index = MultiIndex([[]], [[]])
# index = MultiIndex.from_arrays(indexes)
index._levels = list()
index._labels = list()
for i, col in enumerate(index_names):
if str(index_types[i]) == 'category':
c = Categorical([], categories=cat(col), fastpath=True)
z = CategoricalIndex(c)
z._data._codes = c.categories._data
z._set_categories = c._set_categories
index._levels.append(z)

vals = np.zeros(size, dtype=c.codes.dtype)
index._labels.append(vals)

views[col] = index._labels[i]
views[col+'-catdef'] = index._levels[i]
else:
d = np.empty(size, dtype=index_types[i])
# if d.dtype.kind == "M" and str(col) in timezones:
# d = Series(d).dt.tz_localize(timezones[str(col)])
index._levels.append(Index(d))
index._labels.append(np.arange(size, dtype=int))
views[col] = index._levels[i]._data

axes = [df._data.axes[0], index]

# allocate and create blocks
blocks = []
Expand Down Expand Up @@ -131,8 +151,9 @@ def empty(types, size, cats=None, cols=None, index_type=None, index_name=None,
else:
views[col] = block.values[i]

if index_name is not None and index_name is not False:
df.index.name = index_name
if str(index_type) == 'category':
views[index_name+'-catdef'] = df._data.axes[1].values
if index_names:
df.index.names = [
None if re.match(r'__index_level_\d+__', n) else n
for n in index_names
]
return df, views
29 changes: 29 additions & 0 deletions fastparquet/test/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,32 @@ def test_null_sizes(tempdir):
pf = fastparquet.ParquetFile(tempdir)
assert pf.dtypes['a'] == 'float16'
assert pf.dtypes['b'] == 'float64'


def test_multi_index(tempdir):
    """Round-trip a frame with a two-level (datetime, int) index.

    The frame is written with ``file_scheme='hive'`` and read back with
    ``to_pandas()`` without an explicit ``index`` argument; the result
    must carry both index levels with their names and dtypes.
    """
    data = {
        'a': pd.date_range('2000', '2000-01-03'),
        'b': [1, 3, 3],
        'c': [1.0, np.nan, 3],
    }
    frame = pd.DataFrame(data).set_index(['a', 'b'])
    fastparquet.write(tempdir, frame, has_nulls=True, file_scheme='hive')

    loaded = fastparquet.ParquetFile(tempdir).to_pandas()

    # Only 'c' remains as a data column; 'a' and 'b' live in the index.
    assert loaded.shape == (3, 1)
    assert len(loaded.index.levels) == 2

    first_level = loaded.index.levels[0]
    assert first_level.name == 'a'
    assert first_level.dtype == '<M8[ns]'

    second_level = loaded.index.levels[1]
    assert second_level.name == 'b'
    assert second_level.dtype == int


def test_multi_index_category(tempdir):
    """Round-trip a two-level index whose second level is categorical.

    Same shape of check as the plain multi-index case, except that
    column 'b' is converted to a category dtype before it becomes an
    index level, and the loaded level must report as categorical.
    """
    data = {
        'a': pd.date_range('2000', '2000-01-03'),
        'b': ['X', 'X', 'L'],
        'c': [1.0, np.nan, 3],
    }
    frame = pd.DataFrame(data)
    frame['b'] = frame['b'].astype('category')
    frame = frame.set_index(['a', 'b'])
    fastparquet.write(tempdir, frame, has_nulls=True, file_scheme='hive')

    loaded = fastparquet.ParquetFile(tempdir).to_pandas()

    # Only 'c' remains as a data column; 'a' and 'b' live in the index.
    assert loaded.shape == (3, 1)
    assert len(loaded.index.levels) == 2

    first_level = loaded.index.levels[0]
    assert first_level.name == 'a'
    assert first_level.dtype == '<M8[ns]'

    second_level = loaded.index.levels[1]
    assert second_level.name == 'b'
    assert second_level.is_categorical()

0 comments on commit 1ff6bfe

Please sign in to comment.