In [36]:
import os
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import random
from multiprocessing import Pool

# Define input parameters
tmpdir = '.'                    # Parent Directory to write/read data
size   = 1000000                # Length of global dataframe
npart  = 8                      # Number of dataframe partitions
nprocs = 8                      # Number of processes to use
path = tmpdir + '/test_dataset' # Parquet dataset directory

### Investigating parallel I/O with Pandas, Apache Arrow and Parquet

- Rick Zamora (5/14/2019)


This notebook includes a simple exploration of the pyarrow-parquet writer/reader interface. The general goal is to discover the requirements for efficient parallel processing of partitioned pandas datasets.


To begin this exploration, we generate a *large* global pandas dataframe and partition it into `npart` pieces.
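The slicing in the next cell gives every partition `size // npart` rows and lets the last partition absorb any remainder. Here is a minimal sketch of that boundary arithmetic on its own (`partition_bounds` is a hypothetical helper, not part of the notebook):

```python
def partition_bounds(size, npart):
    """Return (start, stop) row offsets for npart contiguous partitions.

    Every partition gets size // npart rows; the last one also absorbs
    the remainder, matching the iloc slicing used to build df_part.
    """
    lsize = size // npart
    bounds = []
    for x in range(npart):
        start = x * lsize
        stop = size if x == npart - 1 else start + lsize
        bounds.append((start, stop))
    return bounds

print(partition_bounds(10, 3))  # [(0, 3), (3, 6), (6, 10)]
```

`numpy.array_split` on the row positions is a common alternative, but it spreads the remainder over the first partitions instead of appending it to the last one.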

In [37]:
# Define the global dataframe
df = pd.DataFrame(
    {
        'a': [random.choice(['A', 'B', 'C', 'D']) for _ in range(size)],
        'b': np.arange(size),
        'c': [random.random() for _ in range(size)],
    }
)

# Break the global dataframe into partitions
# (each partition has `lsize` rows; the last one also takes any remainder)
lsize = size // npart
df_part = [
    df.iloc[x * lsize : (size if x == npart - 1 else (x + 1) * lsize)]
    for x in range(npart)
]
df_part[npart-1].head()

|        | a | b      | c        |
|--------|---|--------|----------|
| 875000 | C | 875000 | 0.313344 |
| 875001 | A | 875001 | 0.300352 |
| 875002 | D | 875002 | 0.214996 |
| 875003 | D | 875003 | 0.208321 |
| 875004 | A | 875004 | 0.44763  |


Above, we are showing the `head()` of the last partition. To write each of these partitions, we define the `write_partition` function, which returns a *metadata* object for each partition. Note that this metadata object is actually a dictionary with `'meta'` and `'schema'` keys: `'meta'` holds the `FileMetaData` collected by `pq.write_table`, and `'schema'` holds the pyarrow schema of the partition's table.

In [38]:
# Define the `write_partition` function
def write_partition(df, filename):
    t = pa.Table.from_pandas(df)
    metadata_list = []
    with open(filename, "wb") as fil:
        pq.write_table(
            t, 
            fil, 
            metadata_collector=metadata_list
        )
    # Return metadata & schema
    return {'meta': metadata_list[0], 'schema': t.schema}

In order to write the pandas dataframe in parallel, we can use a simple `multiprocessing.Pool` approach.

In [47]:
# Write the dataframe in parallel (to multiple parquet files)
import shutil

def fwrite(args):
    # Unpack (partition index, partition dataframe, dataset directory)
    i, part, ipath = args
    filename = ipath + "/part.%i.parquet" % i
    meta = write_partition(part, filename)
    return meta

fargs = [ [i, part, path] for i, part in enumerate(df_part) ]
# Start from an empty dataset directory
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)
p = Pool(nprocs)
%time meta = p.map(fwrite, fargs)
p.close()
p.join()

# Check that there are multiple files
files = os.listdir(path)
files

CPU times: user 40 ms, sys: 72 ms, total: 112 ms
Wall time: 194 ms


['part.2.parquet',
 'part.6.parquet',
 'part.7.parquet',
 'part.4.parquet',
 'part.5.parquet',
 'part.3.parquet',
 'part.0.parquet',
 'part.1.parquet']

Here, we can see that each of the `npart` partitions is written as a separate file called `'part.<part>.parquet'` (where `<part>` is just the integer partition index in this case). Since we collected the metadata/schema information for each partition as it was written, we can also write the schema information to a file called `_common_metadata` using the `write_metadata()` function available in `pyarrow.parquet`.
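Because `os.listdir` returns the files in arbitrary order (as in the listing above), any code that maps files back to partitions needs an explicit ordering. Here is a minimal sketch that assumes the `part.<i>.parquet` naming convention used here (`sort_partition_files` is a hypothetical helper):

```python
import re

def sort_partition_files(files):
    """Order 'part.<i>.parquet' names by their integer index <i>.

    A plain lexicographic sort would put 'part.10.parquet' before
    'part.2.parquet' once there are more than ten partitions.
    """
    pattern = re.compile(r"^part\.(\d+)\.parquet$")
    parts = [f for f in files if pattern.match(f)]  # skip e.g. _common_metadata
    return sorted(parts, key=lambda f: int(pattern.match(f).group(1)))

listing = ["part.2.parquet", "part.10.parquet", "_common_metadata", "part.0.parquet"]
print(sort_partition_files(listing))
# ['part.0.parquet', 'part.2.parquet', 'part.10.parquet']
```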

In [40]:
# Merge the pandas metadata of all partitions (uncommenting this requires `import json`)
#pd_meta = None
#for md in meta:
#    pd_meta_part = json.loads(md['schema'].metadata[b"pandas"].decode("utf8"))
#    if not pd_meta:
#        pd_meta = pd_meta_part
#    elif pd_meta['index_columns'] and pd_meta_part['index_columns']:
#        pd_meta['index_columns'][0]['stop'] = max( pd_meta_part['index_columns'][0]['stop'], pd_meta['index_columns'][0]['stop'])      
#print(pd_meta)

# Write a `_common_metadata` (schema) file using the 0th partition
# (Note that this is not actually needed...)
schema = meta[0]['schema']
metadata_path = path + '/_common_metadata' 
with open(metadata_path, "wb") as fil:
    pq.write_metadata(schema, fil)

Notice that the `pq.write_metadata()` call above uses the `schema` object for the 0th partition. This means that not **all** of the information in `_common_metadata` will be true for the entire dataset. More specifically, the `"index_columns"` item will look something like this:

`[{"kind": "range", "name": null, "start": 0, "stop": 125000, "step": 1}]`

Therefore, the `"start"` and `"stop"` values will correspond to the 0th partition, rather than the global dataset.
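One way to make the `"index_columns"` entry describe the global dataset is to merge the per-partition pandas metadata, which the commented-out code in the cell above starts to do. Here is a minimal sketch that assumes each partition stores a single step-1 `"range"` index. In the notebook, each dict would come from `json.loads(md['schema'].metadata[b"pandas"].decode("utf8"))`; `merge_range_index` and the trimmed example dicts are hypothetical:

```python
import json

def merge_range_index(pandas_metadata):
    """Combine per-partition pandas metadata into one global RangeIndex entry.

    Assumes every partition stores a single contiguous, step-1 RangeIndex
    as {"kind": "range", ...} in its "index_columns" list.
    """
    merged = json.loads(json.dumps(pandas_metadata[0]))  # deep copy of partition 0
    ranges = [md["index_columns"][0] for md in pandas_metadata]
    merged["index_columns"][0]["start"] = min(r["start"] for r in ranges)
    merged["index_columns"][0]["stop"] = max(r["stop"] for r in ranges)
    return merged

# Hypothetical metadata for two partitions, trimmed to "index_columns"
example_meta = [
    {"index_columns": [{"kind": "range", "name": None, "start": 0, "stop": 125000, "step": 1}]},
    {"index_columns": [{"kind": "range", "name": None, "start": 125000, "stop": 250000, "step": 1}]},
]
print(merge_range_index(example_meta)["index_columns"][0])
# {'kind': 'range', 'name': None, 'start': 0, 'stop': 250000, 'step': 1}
```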

Although we just took the time to write out a `_common_metadata` file, it does not currently make much sense to use that file to read back the dataset. I say this because the file only stores the schema (taken from the 0th partition); it does not seem to include the other metadata we really need for typical data-processing tasks (such as the number of partitions and the row statistics).

Assuming that a single-file metadata solution is currently missing, we can temporarily rely on the `ParquetDataset` class in `pyarrow.parquet`. When passed a list of files (or a directory in our case), `ParquetDataset` allows you to iterate through the various `pieces` (partitions) of the global dataset.

In [41]:
%time dataset = pq.ParquetDataset(path)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 6.49 ms


In [42]:
pieces = dataset.pieces
piece = pieces[0]
%time md = piece.get_metadata(lambda fn: pq.ParquetFile(open(fn, mode="rb")))
columns = md.schema.names
md

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 716 µs


<pyarrow._parquet.FileMetaData object at 0x7fcbc9868ae8>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 3
  num_rows: 125000
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 991

As shown above, rather than reading the metadata from a dedicated metadata file, we can use the `dataset` object to read the metadata from each piece (partition). If we are interested in the row statistics of each partition, we can iterate through each file and generate our own `stats` structure. For example:

In [43]:
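# Each file was written as a single row group (see `num_row_groups` above),
# so row group 0 covers the whole partition
row_groups = [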
    piece.get_metadata(
        lambda fn: pq.ParquetFile(open(fn, mode="rb"))
    ).row_group(0)
    for piece in pieces
]
stats = []
for row_group in row_groups:
    s = {"num-rows": row_group.num_rows, "columns": []}
    for i, name in enumerate(columns):
        column = row_group.column(i)
        d = {"name": name}
        if column.statistics:
            d.update(
                {
                    "min": column.statistics.min,
                    "max": column.statistics.max,
                    "null_count": column.statistics.null_count,
                }
            )
        s["columns"].append(d)
    stats.append(s)
stats[0]

{'num-rows': 125000,
 'columns': [{'name': 'a', 'min': b'A', 'max': b'D', 'null_count': 0},
  {'name': 'b', 'min': 0, 'max': 124999, 'null_count': 0},
  {'name': 'c', 'min': 6.62184e-06, 'max': 0.999997, 'null_count': 0}]}
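Row statistics like these make it possible to skip partitions without reading them. Here is a minimal sketch of that idea for a range filter on one column. `overlapping_partitions` is a hypothetical helper, this pruning step is not part of the notebook's workflow, and the example stats are made up but shaped like `stats[0]` above:

```python
def overlapping_partitions(stats, column, lo, hi):
    """Return indices of partitions whose [min, max] range for `column`
    may contain values in [lo, hi]; all other partitions can be skipped."""
    selected = []
    for i, s in enumerate(stats):
        for col in s["columns"]:
            if col["name"] != column:
                continue
            # Without statistics we cannot rule the partition out
            if "min" not in col or "max" not in col:
                selected.append(i)
            elif col["min"] <= hi and col["max"] >= lo:
                selected.append(i)
    return selected

# Hypothetical stats for two partitions
example_stats = [
    {"num-rows": 125000, "columns": [{"name": "b", "min": 0, "max": 124999, "null_count": 0}]},
    {"num-rows": 125000, "columns": [{"name": "b", "min": 125000, "max": 249999, "null_count": 0}]},
]
print(overlapping_partitions(example_stats, "b", 130000, 140000))  # [1]
```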

Using the `pieces` member of the `ParquetDataset` class, it is straightforward to read back each partition of the dataset in parallel. Since we know the names of the columns and the *pieces* to iterate through, we can use the same `multiprocessing.Pool` approach that we used to write the dataset:

In [44]:
def read_partition(piece, columns):
    with open(piece.path, mode="rb") as f:
        table = piece.read(
            columns=columns,
            use_pandas_metadata=True,
            file=f,
            use_threads=False
        )
    df = table.to_pandas(use_threads=False)
    return df[list(columns)]

In [45]:
def fread(args):
    piece = args[0]
    columns = args[1]
    dfrd = read_partition(piece, columns=columns)
    return dfrd

fargs = [ [piece, columns] for piece in pieces ]
p = Pool(nprocs)
%time df_part_rd = p.map(fread, fargs)
p.close()
p.join()

CPU times: user 24 ms, sys: 20 ms, total: 44 ms
Wall time: 344 ms


Here, we can also confirm that each partition of `df_part_rd` is the same as `df_part`:

In [46]:
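# Compare each partition (values, dtypes, and index); raises an AssertionError on mismatch
for i in range(npart):
    pd.testing.assert_frame_equal(df_part_rd[i], df_part[i])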
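Once every partition has been read back in order, you can reassemble the global dataframe by concatenating the partitions. Here is a minimal sketch on a small hypothetical frame. With the notebook's variables, the same step would be `pd.concat(df_part_rd)`:

```python
import numpy as np
import pandas as pd

# Small hypothetical stand-in for the global dataframe
df = pd.DataFrame({"b": np.arange(10), "c": np.linspace(0.0, 1.0, 10)})
parts = [df.iloc[0:3], df.iloc[3:6], df.iloc[6:10]]

# Each partition keeps its slice of the original RangeIndex,
# so concatenating in partition order recovers the global frame
df_global = pd.concat(parts)
pd.testing.assert_frame_equal(df_global, df)
```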

Warning: This document should be viewed as my own personal notes.  There may be serious mistakes/errors.