In [1]:
import pyarrow.parquet as pq
import pyarrow as pa
from pyarrow import csv
import pandas as pd
import numpy as np
import os
import psutil

In [2]:
# Load the raw Palmer Station penguin table straight from GitHub.
# NOTE(review): the long hex path segment looks like a pinned commit hash — confirm.
penguins_raw_url = ("https://raw.githubusercontent.com/allisonhorst/"
                    "palmerpenguins/47a3476d2147080e7ceccef4cf70105c808f2cbf/"
                    "data-raw/penguins_raw.csv")
df = pd.read_csv(penguins_raw_url)

In [3]:
# Fixed seed so the synthetic dataset (and therefore every file written from it)
# is identical on each Restart & Run All.
SEED = 42
# The four measurement columns that receive noise and, in df_nonan, zero-filling.
NUMERIC_COLS = ["Culmen Length (mm)", "Culmen Depth (mm)",
                "Flipper Length (mm)", "Body Mass (g)"]

# Increase dataset to 1m rows (sampling with replacement) and reset index.
# NOTE: this rebinds `df`; re-running only this cell resamples the already
# expanded frame instead of the raw download.
df = df.sample(1_000_000, replace=True, random_state=SEED).reset_index(drop=True)

# Update sample number (0 to 999'999)
df["Sample Number"] = df.index

# Add some random variation (uniform in [0, 1)) to numeric columns
rng = np.random.default_rng(SEED)
df[NUMERIC_COLS] = df[NUMERIC_COLS] + rng.random((df.shape[0], len(NUMERIC_COLS)))

# Create dataframe where missing numeric values are filled with zero
df_nonan = df.copy()
df_nonan[NUMERIC_COLS] = df[NUMERIC_COLS].fillna(0)

In [4]:
%%timeit
# Timed: serialise the full frame as CSV (all other to_csv options left at their defaults)
df.to_csv(path_or_buf="penguin-dataset.csv")

6.76 s ± 87.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
# Untimed run of the same CSV export as the timed cell
df.to_csv(path_or_buf="penguin-dataset.csv")

In [5]:
%%timeit
# Timed: serialise the full frame as Parquet (engine and compression left at pandas defaults)
parquet_path = "penguin-dataset.parquet"
df.to_parquet(parquet_path)

735 ms ± 12.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
# Untimed run of the same Parquet export as the timed cell
parquet_path = "penguin-dataset.parquet"
df.to_parquet(parquet_path)

In [6]:
%%timeit
# Timed: Arrow IPC file format — pandas frame -> Arrow table -> file on disk
arrow_table = pa.Table.from_pandas(df)
# One `with` for both the output file and the IPC writer; the writer is closed first,
# then the file, exactly as with two nested `with` statements.
with pa.OSFile('penguin-dataset.arrow', 'wb') as sink, \
        pa.RecordBatchFileWriter(sink, arrow_table.schema) as writer:
    writer.write_table(arrow_table)

456 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
# Untimed Arrow IPC export; unlike the %%timeit cell, this leaves `table` bound in the kernel
table = pa.Table.from_pandas(df)
# One `with` for both the output file and the IPC writer; the writer is closed first,
# then the file, exactly as with two nested `with` statements.
with pa.OSFile('penguin-dataset.arrow', 'wb') as sink, \
        pa.RecordBatchFileWriter(sink, table.schema) as writer:
    writer.write_table(table)

In [7]:
%%timeit
# Timed: same Arrow IPC export, but from the frame whose numeric NaNs were replaced by 0
arrow_table_nonan = pa.Table.from_pandas(df_nonan)
# Output file and IPC writer share one `with`; close order matches the nested form
with pa.OSFile('penguin-dataset-nonan.arrow', 'wb') as sink, \
        pa.RecordBatchFileWriter(sink, arrow_table_nonan.schema) as writer:
    writer.write_table(arrow_table_nonan)

424 ms ± 14.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
# Untimed export of the zero-filled frame; leaves `table_nonan` bound in the kernel
table_nonan = pa.Table.from_pandas(df_nonan)
# Output file and IPC writer share one `with`; close order matches the nested form
with pa.OSFile('penguin-dataset-nonan.arrow', 'wb') as sink, \
        pa.RecordBatchFileWriter(sink, table_nonan.schema) as writer:
    writer.write_table(table_nonan)

In [12]:
%%timeit
# pandas CSV reader: parse the whole file, then average one column
penguins_from_csv = pd.read_csv("penguin-dataset.csv")
penguins_from_csv["Flipper Length (mm)"].mean()

1.28 s ± 11 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [13]:
%%timeit
# pyarrow CSV reader: parse the file into an Arrow table, convert only the chosen column to pandas
arrow_csv_table = csv.read_csv("penguin-dataset.csv")
arrow_csv_table["Flipper Length (mm)"].to_pandas().mean()

202 ms ± 16.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
%%timeit
# pandas Parquet reader: request a single column, then average it
flipper_frame = pd.read_parquet("penguin-dataset.parquet", columns=["Flipper Length (mm)"])
flipper_frame.mean()

24.4 ms ± 1.05 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [15]:
%%timeit
# pyarrow Parquet reader: request a single column, convert the result to pandas, then average
flipper_arrow = pq.read_pandas("penguin-dataset.parquet", columns=["Flipper Length (mm)"])
flipper_arrow.to_pandas().mean()

23.8 ms ± 782 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [16]:
%%timeit
# Arrow IPC file read through a regular OS file handle
with pa.OSFile('penguin-dataset.arrow', 'rb') as arrow_file:
    ipc_reader = pa.ipc.open_file(arrow_file)
    flipper_col = ipc_reader.read_all().column("Flipper Length (mm)")
# The column is converted and averaged after the file handle has been closed
mean_flipper = flipper_col.to_pandas().mean()

40.9 ms ± 2.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [17]:
%%timeit
# Arrow IPC file read through a read-only memory map instead of an OS file handle
mapped_file = pa.memory_map('penguin-dataset.arrow', 'r')
ipc_reader = pa.ipc.RecordBatchFileReader(mapped_file)
flipper_col = ipc_reader.read_all().column("Flipper Length (mm)")
mean_flipper = flipper_col.to_pandas().mean()

4.43 ms ± 111 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [18]:
%%timeit
# Same memory-mapped read, against the file written from the zero-filled (no-NaN) frame
mapped_file = pa.memory_map('penguin-dataset-nonan.arrow', 'r')
ipc_reader = pa.ipc.RecordBatchFileReader(mapped_file)
flipper_col = ipc_reader.read_all().column("Flipper Length (mm)")
mean_flipper = flipper_col.to_pandas().mean()

2.81 ms ± 67.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [19]:
# Baseline: resident set size of this kernel process before any of the files are loaded,
# reduced from bytes to whole MiB (integer division by 2**20)
rss_bytes = psutil.Process(os.getpid()).memory_info().rss
memory_init = rss_bytes // (1 << 20)

In [20]:
# CSV case: pyarrow's CSV reader parses the file and one column is selected from the result.
# Unlike the Parquet/Arrow cells that follow, nothing here is converted to pandas.
col_csv = csv.read_csv("penguin-dataset.csv")["Flipper Length (mm)"]
# RSS snapshot after the read, right-shifted by 20 bits (i.e. divided by 2**20)
memory_post_csv = psutil.Process(os.getpid()).memory_info().rss >> 20

In [21]:
# Parquet case: pandas reads only the requested column from the Parquet file
col_parquet = pd.read_parquet("penguin-dataset.parquet", columns=["Flipper Length (mm)"])
# RSS snapshot after the read, right-shifted by 20 bits (i.e. divided by 2**20)
memory_post_parquet = psutil.Process(os.getpid()).memory_info().rss >> 20

In [22]:
# Arrow file API case: open the IPC file through an OS file handle, read every batch,
# pick one column and convert it to pandas before the handle is closed
with pa.OSFile('penguin-dataset.arrow', 'rb') as source:
    col_arrow_file = pa.ipc.open_file(source).read_all().column("Flipper Length (mm)").to_pandas()
# RSS snapshot after the read, right-shifted by 20 bits (i.e. divided by 2**20)
memory_post_arrowos = psutil.Process(os.getpid()).memory_info().rss >> 20

In [23]:
# Memory-mapped case (file that still contains NaNs): the map is opened read-only and is
# not closed in this cell, so `source` stays referenced by the kernel namespace
source = pa.memory_map('penguin-dataset.arrow', 'r')
# Read all record batches through the map and select one column
table_mmap = pa.ipc.RecordBatchFileReader(source).read_all().column("Flipper Length (mm)")
# Conversion to pandas is part of what the following RSS snapshot captures
col_arrow_mapped = table_mmap.to_pandas()
# RSS snapshot after the read, right-shifted by 20 bits (i.e. divided by 2**20)
memory_post_arrowmmap = psutil.Process(os.getpid()).memory_info().rss >> 20

In [24]:
# Memory-mapped case against the zero-filled file; rebinding `source` drops the kernel's
# reference to the previous map object
# NOTE(review): presumably the no-NaN file is used because a column without nulls can be
# converted to pandas without a copy — confirm against the pyarrow documentation
source = pa.memory_map('penguin-dataset-nonan.arrow', 'r')
# Read all record batches through the map and select one column
table_mmap_zc = pa.ipc.RecordBatchFileReader(source).read_all().column("Flipper Length (mm)")
# Conversion to pandas is part of what the following RSS snapshot captures
col_arrow_mapped_zc = table_mmap_zc.to_pandas()
# RSS snapshot after the read, right-shifted by 20 bits (i.e. divided by 2**20)
memory_post_arrowmmap_zc = psutil.Process(os.getpid()).memory_info().rss >> 20

In [25]:
# Print memory consumption.
# Every figure is the difference between two consecutive RSS snapshots (each stored as
# rss >> 20), i.e. the growth observed across that one read rather than an absolute
# footprint. The objects loaded by earlier cells are still bound when later snapshots
# are taken.
print(f"csv: {memory_post_csv - memory_init}\n"
      f"Parquet: {memory_post_parquet - memory_post_csv}\n"
      f"Arrow file API: {memory_post_arrowos - memory_post_parquet}\n"
      f"Arrow memory-mapped API with NaNs: {memory_post_arrowmmap - memory_post_arrowos}\n"
      f"Arrow memory-mapped API (zero-copy): {memory_post_arrowmmap_zc - memory_post_arrowmmap}\n")

csv: 494
Parquet: 21
Arrow file API: 196
Arrow memory-mapped API with NaNs: 5
Arrow memory-mapped API (zero-copy): 0



In [26]:
def convert_hdf5_to_parquet(h5_file, parquet_file, chunksize=100000):
    """Stream an HDF5 file that pandas can read into one snappy-compressed Parquet file.

    The input is read with ``pd.read_hdf(..., chunksize=chunksize)`` and each chunk
    is appended to the same Parquet file, so the whole dataset never has to be held
    in memory at once.

    Parameters
    ----------
    h5_file : path or object accepted by ``pd.read_hdf``
        Source HDF5 file.
        NOTE(review): chunked reads presumably require the dataset to be stored
        in pandas' "table" format — confirm against the pandas documentation.
    parquet_file : path or file-like accepted by ``pq.ParquetWriter``
        Destination Parquet file.
    chunksize : int, default 100000
        Number of rows requested per chunk from ``pd.read_hdf``.

    Returns
    -------
    None

    Notes
    -----
    * The Parquet schema is inferred from the first chunk and imposed on every
      later chunk.
    * If the stream yields no chunks, no writer is opened and no output is produced
      (previously this path failed on an unbound ``parquet_writer``).
    * The writer is closed even when reading or writing a chunk raises; the
      exception itself is propagated unchanged.
    """
    stream = pd.read_hdf(h5_file, chunksize=chunksize)

    parquet_writer = None
    parquet_schema = None
    try:
        for i, chunk in enumerate(stream):
            print("Chunk {}".format(i))

            if parquet_writer is None:
                # First chunk: convert once, take its schema, and open the output file.
                table = pa.Table.from_pandas(chunk)
                parquet_schema = table.schema
                parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
            else:
                # Later chunks are converted against the first chunk's schema.
                table = pa.Table.from_pandas(chunk, schema=parquet_schema)

            parquet_writer.write_table(table)
    finally:
        # Close the writer (if one was opened) even on failure.
        if parquet_writer is not None:
            parquet_writer.close()

In [27]:
# NOTE(review): the recorded run of this cell ends in "ValueError: No dataset in HDF5 file."
# Presumably the .h5ad file is not laid out as a pandas-readable HDF5 store — confirm
# before relying on this conversion.
convert_hdf5_to_parquet(h5_file='5k_pbmcs_10X.sparse.h5ad',
                        parquet_file='5k_pbmcs_10X.parquet')

ValueError: No dataset in HDF5 file.