Merge pull request #177 from martindurant/flatten
Flatten schema to enable loading of non-repeated columns
martindurant committed Jun 29, 2017
2 parents 5d598ac + 086e191 commit 38f9084
Showing 10 changed files with 163 additions and 72 deletions.
45 changes: 45 additions & 0 deletions docs/source/details.rst
@@ -148,6 +148,51 @@ A couple of caveats should be noted:
- complex numbers must have their real and imaginary parts stored as two
separate float columns.

Spark Timestamps
----------------

Fastparquet can read and write int96-style timestamps, as typically found in Apache
Spark and Map-Reduce output.

Currently, int96-style timestamps are the only known use of the int96 type without
an explicit schema-level converted type assignment. They will be automatically converted to
times upon loading.

Similarly, on writing, the ``times`` keyword controls the encoding of timestamp
columns: "int64" is the default and faster option, producing Parquet
standard-compliant data, but "int96" is required to write data that is
compatible with Spark.

Reading Nested Schema
---------------------

Fastparquet can read nested schemas. The principal mechanism is *flattening*, whereby
parquet schema struct columns become top-level columns. For instance, if a schema looks
like

.. code-block::

    root
    | - visitor: OPTIONAL
    |   - ip: BYTE_ARRAY, UTF8, OPTIONAL
    |   - network_id: BYTE_ARRAY, UTF8, OPTIONAL

then the ``ParquetFile`` will include entries "visitor.ip" and "visitor.network_id" in its
``columns``, and these will become ordinary Pandas columns.
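
As a minimal sketch (``nested.parquet`` is a hypothetical file with the schema
above), the flattened columns can be requested like any others:

.. code-block:: python

    from fastparquet import ParquetFile

    pf = ParquetFile("nested.parquet")   # hypothetical file, see schema above
    print(pf.columns)                    # ['visitor.ip', 'visitor.network_id']

    # the flattened struct fields load as ordinary Pandas columns
    df = pf.to_pandas(["visitor.ip", "visitor.network_id"])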

Fastparquet also handles some parquet LIST and MAP types. For instance, the schema may include

.. code-block::

    | - tags: LIST, OPTIONAL
    |   - list: REPEATED
    |     - element: BYTE_ARRAY, UTF8, OPTIONAL

In this case, ``columns`` would include an entry "tags", which evaluates to an object column
containing lists of strings. Reading such columns will be relatively slow.
If the 'element' type is anything other than a primitive type, i.e., a struct,
map or list, then fastparquet will not be able to read it, and the resulting
column will either not be contained in the output or will contain only
``None`` values.
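
A short sketch of reading such a column (``tagged.parquet`` is again a
hypothetical file containing the ``tags`` schema above):

.. code-block:: python

    from fastparquet import ParquetFile

    pf = ParquetFile("tagged.parquet")   # hypothetical file, see schema above
    df = pf.to_pandas(["tags"])

    # each cell holds a Python list of strings (or None) in an object-dtype
    # column, which is why reading and processing it is relatively slow
    first_tags = df["tags"].iloc[0]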

Partitions and row-groups
-------------------------

6 changes: 2 additions & 4 deletions docs/source/index.rst
@@ -37,7 +37,7 @@ Briefly, some features of interest:
- choice of compression per-column and various optimized encoding schemes; ability to choose row divisions and partitioning on write.
- acceleration of both reading and writing using `numba <http://numba.pydata.org/>`_
- ability to read and write to arbitrary file-like objects, allowing interoperability with `s3fs <http://s3fs.readthedocs.io/>`_, `hdfs3 <http://hdfs3.readthedocs.io/>`_, `adlfs <https://github.com/Azure/azure-data-lake-store-python>`_ and possibly others.
- (in development) can be called from `dask <http://dask.pydata.org>`_, to enable parallel reading and writing with Parquet files, possibly distributed across a cluster.
- can be called from `dask <http://dask.pydata.org>`_, to enable parallel reading and writing with Parquet files, possibly distributed across a cluster.

Caveats, Known Issues
---------------------
@@ -48,9 +48,7 @@ fastparquet is, however, capable of reading all the data files from the
project. Some encoding mechanisms in Parquet are rare, and may be implemented
on request - please post an issue.

Nested data types do not fit well with the pandas tabular model, and are not
currently supported. We do aim to support 1-level nesting (lists and key-value
maps) in the future.
Some deeply-nested columns will not be readable, e.g., lists of lists.

Not all output options will be compatible with every other Parquet
framework, which each implement only a subset of the standard, see
59 changes: 31 additions & 28 deletions fastparquet/api.py
@@ -149,7 +149,10 @@ def helper(self):
@property
def columns(self):
""" Column names """
return list(self._schema[0].children)
return [c for c, i in self._schema[0].children.items()
if len(getattr(i, 'children', [])) == 0
or i.converted_type in [parquet_thrift.ConvertedType.LIST,
parquet_thrift.ConvertedType.MAP]]

@property
def statistics(self):
@@ -179,26 +182,25 @@ def row_group_filename(self, rg):
return self.fn

def read_row_group_file(self, rg, columns, categories, index=None,
assign=None, timestamp96=[]):
assign=None):
""" Open file for reading, and process it as a row-group """
if categories is None:
categories = self.categories
fn = self.row_group_filename(rg)
ret = False
if assign is None:
df, assign = self.pre_allocate(
rg.num_rows, columns, categories, index,
timestamp96=timestamp96)
rg.num_rows, columns, categories, index)
ret = True
core.read_row_group_file(
fn, rg, columns, categories, self.schema, self.cats,
open=self.open, selfmade=self.selfmade, index=index,
assign=assign, timestamp96=timestamp96)
assign=assign)
if ret:
return df

def read_row_group(self, rg, columns, categories, infile=None,
index=None, assign=None, timestamp96=[]):
index=None, assign=None):
"""
Access row-group in a file and read some columns into a data-frame.
"""
@@ -207,13 +209,11 @@ def read_row_group(self, rg, columns, categories, infile=None,
ret = False
if assign is None:
df, assign = self.pre_allocate(rg.num_rows, columns,
categories, index,
timestamp96=timestamp96)
categories, index)
ret = True
core.read_row_group(
infile, rg, columns, categories, self.schema, self.cats,
self.selfmade, index=index, assign=assign,
timestamp96=timestamp96, sep=self.sep)
self.selfmade, index=index, assign=assign, sep=self.sep)
if ret:
return df

@@ -339,7 +339,7 @@ def _get_index(self, index=None):
return index

def to_pandas(self, columns=None, categories=None, filters=[],
index=None, timestamp96=[]):
index=None):
"""
Read data from parquet into a Pandas dataframe.
@@ -377,8 +377,7 @@ def to_pandas(self, columns=None, categories=None, filters=[],
if index and index not in columns:
columns.append(index)
check_column_names(self.columns, columns, categories)
df, views = self.pre_allocate(size, columns, categories, index,
timestamp96=timestamp96)
df, views = self.pre_allocate(size, columns, categories, index)
start = 0
if self.file_scheme == 'simple':
with self.open(self.fn) as f:
@@ -387,24 +386,23 @@
else v[start:start + rg.num_rows])
for (name, v) in views.items()}
self.read_row_group(rg, columns, categories, infile=f,
index=index, assign=parts,
timestamp96=timestamp96)
index=index, assign=parts)
start += rg.num_rows
else:
for rg in rgs:
parts = {name: (v if name.endswith('-catdef')
else v[start:start + rg.num_rows])
for (name, v) in views.items()}
self.read_row_group_file(rg, columns, categories, index,
assign=parts, timestamp96=timestamp96)
assign=parts)
start += rg.num_rows
return df

def pre_allocate(self, size, columns, categories, index, timestamp96=[]):
def pre_allocate(self, size, columns, categories, index):
if categories is None:
categories = self.categories
return _pre_allocate(size, columns, categories, index, self.cats,
self._dtypes(categories), timestamp96=timestamp96)
self._dtypes(categories))

@property
def count(self):
@@ -440,16 +438,16 @@ def _dtypes(self, categories=None):
""" Implied types of the columns in the schema """
if categories is None:
categories = self.categories
dtype = {f.name: (converted_types.typemap(f)
dtype = {name: (converted_types.typemap(f)
if f.num_children in [None, 0] else np.dtype("O"))
for f in self.schema.root.children.values()}
for name, f in self.schema.root.children.items()}
for col, dt in dtype.copy().items():
if dt.kind in ['i', 'b']:
# int/bool columns that may have nulls become float columns
num_nulls = 0
for rg in self.row_groups:
chunks = [c for c in rg.columns
if c.meta_data.path_in_schema[-1] == col]
if '.'.join(c.meta_data.path_in_schema) == col]
for chunk in chunks:
if chunk.meta_data.statistics is None:
num_nulls = True
@@ -465,6 +463,8 @@ def _dtypes(self, categories=None):
dtype[col] = np.dtype('f4')
else:
dtype[col] = np.dtype('f8')
elif dt == 'S12':
dtype[col] = 'M8[ns]'
for field in categories:
dtype[field] = 'category'
for cat in self.cats:
@@ -478,7 +478,7 @@ def __str__(self):
__repr__ = __str__


def _pre_allocate(size, columns, categories, index, cs, dt, timestamp96=[]):
def _pre_allocate(size, columns, categories, index, cs, dt):
cols = [c for c in columns if index != c]
categories = categories or {}
cats = cs.copy()
@@ -488,8 +488,6 @@ def _pre_allocate(size, columns, categories, index, cs, dt, timestamp96=[]):
def get_type(name):
if name in categories:
return 'category'
if name in timestamp96:
return 'M8[ns]'
return dt.get(name, None)

dtypes = [get_type(c) for c in cols]
@@ -599,7 +597,7 @@ def statistics(obj):

if isinstance(obj, ParquetFile):
L = list(map(statistics, obj.row_groups))
d = {n: {col: [item[col].get(n, None) for item in L]
d = {n: {col: [item.get(col, {}).get(n, None) for item in L]
for col in obj.columns}
for n in ['min', 'max', 'null_count', 'distinct_count']}
if not L:
@@ -610,10 +608,15 @@ def statistics(obj):
se = schema.schema_element(col.meta_data.path_in_schema)
if se.converted_type is not None:
for name in ['min', 'max']:
d[name][column] = (
[None] if d[name][column] is None or None in d[name][column]
else list(converted_types.convert(np.array(d[name][column]), se))
try:
d[name][column] = (
[None] if d[name][column] is None
or None in d[name][column]
else list(converted_types.convert(
np.array(d[name][column]), se))
)
except KeyError:
d[name][column] = None
return d


2 changes: 1 addition & 1 deletion fastparquet/converted_types.py
@@ -86,7 +86,7 @@ def typemap(se):
return np.dtype("O")


def convert(data, se, timestamp96=False):
def convert(data, se, timestamp96=True):
"""Convert known types from primitive to rich.
Parameters
