# ❓ What's the optimal way to read partitioned parquet files into pandas?
______________________________
#### **[Upgini](https://github.com/upgini)  "What's new" monthly digest, November 2022**
______________________________
## Options
1.   Baseline with Pandas - it can read parquet files with pyarrow under the hood, so why add extra dependencies?
2.   [Vaex.io](https://github.com/vaexio/vaex) - also based on pyarrow; reads in chunks (out-of-core execution)
3.   [Pola.rs](https://github.com/pola-rs/polars) - out-of-core execution was implemented only recently (end of Oct'22); it has two modes:
  - Pyarrow as the parquet format reader
  - Rust-based parquet reader (<- we'll test this)
4.   [Pyarrow](https://arrow.apache.org/docs/python/install.html) - the basic building block for the other libs

## Steps to read partitioned parquet files
1. Read partitions
2. Apply filters on values, if any (preferably during the read operation)
3. Sort (as order is not guaranteed for partitioned parquet files)
4. Convert filtered and sorted dataset to pandas dataframe

## Things to measure

1.   Execution time
2.   Memory consumption
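
Outside of a notebook, both quantities can be approximated with the standard library alone. The sketch below is an assumption-laden stand-in for `%%timeit` and `%%memit`, not a replacement: the `measure` helper is hypothetical, it runs the function only once, and `tracemalloc` sees only allocations made through Python's allocator, so buffers allocated natively (for example by Arrow or by a Rust reader) may not show up in the peak.

```python
import time
import tracemalloc


def measure(func, *args, **kwargs):
    """Run func once; return (result, elapsed_seconds, peak_python_bytes)."""
    tracemalloc.start()
    started = time.perf_counter()
    try:
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        # get_traced_memory() returns (current, peak) in bytes
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, elapsed, peak


# Usage on a cheap stand-in workload
result, seconds, peak_bytes = measure(lambda: [i * i for i in range(100_000)])
print(f"{seconds:.4f} s, peak {peak_bytes / 2**20:.2f} MiB")
```

This is why the notebook relies on `memory_profiler`, which reports the memory of the whole process rather than only Python-level allocations.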

Let's install all the libraries, plus a memory profiler to estimate memory utilization

In [None]:
%pip install -Uq pandas vaex-core polars memory_profiler



In [None]:
import numpy as np
import pandas as pd
import polars as pl
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import vaex
import gc
import os
import requests
%load_ext memory_profiler

## Benchmark dataset
For the benchmark we'll take a [Kaggle dataset](https://www.kaggle.com/datasets/wordsforthewise/lending-club) from LendingClub; it's listed under the CC0 license.

*   Loan request rejections covering 11 years of history, from 2007 to 2018; more than 24 million records
*   Dataset has been converted to partitioned parquet files with ZSTD compression and year as a partition variable (12 files)

Let's copy it into the local file system for Colab; otherwise, stable measurements are not guaranteed

In [None]:
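base_url = "https://github.com/upgini/upgini/raw/main/notebooks/digest202211/"
current_dir = os.getcwd()
# Create a local data directory and work from inside it
os.mkdir(current_dir + "/data")
os.chdir(current_dir + "/data")

# One parquet file per year, 2007 through 2018 (12 files)
for year in range(2007, 2019):
    file_name = f"bench_data_{year}.parquet"
    url = base_url + file_name
    response = requests.get(url)
    response.raise_for_status()  # fail loudly instead of saving an error page
    with open(file_name, "wb") as f:
        f.write(response.content)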

Thanks to Vaex for the directory-parsing code 🙏

In [None]:
import vaex.file
import glob
path = "/content/data/*.parquet"
filenames = []
path = vaex.file.stringyfy(path)
naked_path, options = vaex.file.split_options(path)
if glob.has_magic(naked_path):
    filenames.extend(sorted(vaex.file.glob(path)))
else:
    filenames.append(path)
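
For a plain local path, roughly the same list can be built without vaex. The sketch below uses only the standard `glob` module; the `list_parquet_files` helper is hypothetical, and it skips whatever extra path handling the vaex helpers perform, so treat it as an approximation for local files only.

```python
import glob
import os
import tempfile


def list_parquet_files(pattern):
    """Return sorted paths matching pattern, or [pattern] if it has no wildcard."""
    if glob.has_magic(pattern):
        return sorted(glob.glob(pattern))
    return [pattern]


# Demonstration on empty placeholder files in a temporary directory
demo_dir = tempfile.mkdtemp()
for year in (2009, 2007, 2008):
    open(os.path.join(demo_dir, f"bench_data_{year}.parquet"), "wb").close()

demo_files = list_parquet_files(os.path.join(demo_dir, "*.parquet"))
print([os.path.basename(p) for p in demo_files])
```

Sorting the matches keeps the file order stable between runs, which matters when the same list is passed to every reader in the benchmark.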

## 1. Pandas `read_parquet` via *pyarrow*

In [None]:
%%memit
df = pd.read_parquet(
    filenames,
    engine="pyarrow",
    filters=[('Risk_Score','>',0)]
    ).sort_values(by="Application Date")

peak memory: 1970.91 MiB, increment: 1606.54 MiB


In [None]:
%%memit
del df
_ = gc.collect()

peak memory: 1884.49 MiB, increment: 0.00 MiB


#### With `use_threads=True` pyarrow param


In [None]:
%%timeit
df = pd.read_parquet(
    filenames,
    engine="pyarrow",
    use_threads=True,
    filters=[('Risk_Score','>',0)]
    ).sort_values(by="Application Date")

15.8 s ± 726 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### With `use_threads=False` pyarrow param

In [None]:
%%timeit
df = pd.read_parquet(
    filenames,
    engine="pyarrow",
    use_threads=False,
    filters=[('Risk_Score','>',0)]
    ).sort_values(by="Application Date")

15.4 s ± 1.14 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


## 2. Vaex `open`

In [None]:
%%memit
df = vaex.open(filenames)
df_pd = df[df["Risk_Score"]>0].sort(by="Application Date").to_pandas_df()

peak memory: 6844.53 MiB, increment: 4954.29 MiB


In [None]:
%%memit
del df_pd, df
_ = gc.collect()

peak memory: 6014.50 MiB, increment: 0.00 MiB


In [None]:
%%timeit
df = vaex.open(filenames)
df_pd = df[df["Risk_Score"]>0].sort(by="Application Date").to_pandas_df()

24.3 s ± 2.94 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


## 3. Polars

#### Full read with `read_parquet`

In [None]:
%%memit
df = pl.read_parquet("/content/data/*.parquet").filter(pl.col("Risk_Score")>0)
df_pd = df.sort("Application Date").to_pandas()

peak memory: 8532.83 MiB, increment: 5633.97 MiB


In [None]:
%%memit
del df_pd, df
_ = gc.collect()

peak memory: 4325.67 MiB, increment: 0.00 MiB


In [None]:
%%timeit
df = pl.read_parquet(
    "/content/data/*.parquet"
    ).filter(pl.col("Risk_Score")>0).sort("Application Date").to_pandas()

14.3 s ± 1.82 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### Stream/out-of-core `scan_parquet`

In [None]:
%%memit
df = pl.scan_parquet("/content/data/*.parquet").filter(pl.col("Risk_Score")>0)
df_pd = df.sort("Application Date").collect().to_pandas()

peak memory: 5662.12 MiB, increment: 2201.96 MiB


In [None]:
%%memit
del df_pd, df
_ = gc.collect()

peak memory: 3825.09 MiB, increment: 0.00 MiB


In [None]:
%%timeit
df = pl.scan_parquet(
    "/content/data/*.parquet"
    ).filter(pl.col("Risk_Score")>0).sort("Application Date").collect().to_pandas()

11.3 s ± 828 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## 4. Pyarrow

#### With `ParquetDataset` class

In [None]:
%%memit
df_pd = pq.ParquetDataset(
    filenames,
    use_legacy_dataset=False,
    filters=[('Risk_Score','>',0)]
    ).read().sort_by("Application Date").to_pandas()

peak memory: 5398.18 MiB, increment: 1701.32 MiB


In [None]:
%%memit
del df_pd
_ = gc.collect()

peak memory: 3899.60 MiB, increment: -23.79 MiB


In [None]:
%%timeit
df_pd = pq.ParquetDataset(
    filenames,
    use_legacy_dataset=False,
    filters=[('Risk_Score','>',0)]
    ).read().sort_by("Application Date").to_pandas()

11 s ± 621 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### With `Dataset` class

In [None]:
%%memit
df_pd = ds.dataset(
    filenames
    ).scanner(filter=ds.field("Risk_Score") > 0).to_table().sort_by("Application Date").to_pandas()

peak memory: 5613.84 MiB, increment: 1720.72 MiB


In [None]:
%%memit
del df_pd
_ = gc.collect()

peak memory: 3919.64 MiB, increment: 0.00 MiB


In [None]:
%%timeit
df_pd = ds.dataset(
    filenames
    ).scanner(filter=ds.field("Risk_Score") > 0).to_table().sort_by("Application Date").to_pandas()

11.3 s ± 675 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Conclusion

1. **Pandas** - a solid option: not the fastest one, but the most memory-efficient in this test. Memory increment ~1.6 GB; execution ~15.5 s
2. **Vaex** - the slowest one. We weren't able to tune the chunk size to speed up reading with out-of-core execution; this may be fixed in future releases. Memory increment ~5.0 GB; execution ~24.3 s
3. **Polars** - with `scan_parquet`, it is one of the two fastest options. But it has issues with Decimal type support in parquet files (not tested here 😉), so we'll keep an eye on its improvements in out-of-core execution and data type support. Memory increment ~2.2 GB; execution ~11.3 s
4. **Pyarrow** - with the `ParquetDataset` class, it is one of the two fastest options, and takes second place on memory consumption with a small gap behind Pandas. The most balanced choice for this scenario. Memory increment ~1.7 GB; execution ~11.0 s
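
To make the ranking easy to recheck, the measurements can be tabulated with a few lines of Python. The numbers are copied from the cell outputs in this notebook (mean `%%timeit` time and `%%memit` increment); for Pandas the `use_threads=True` timing is used, on the assumption that it corresponds to the default settings of the memory run. The labels are just names chosen for this sketch.

```python
# (mean time in seconds, memory increment in MiB) from the runs in this notebook
results = {
    "Pandas read_parquet": (15.8, 1606.54),
    "Vaex open": (24.3, 4954.29),
    "Polars read_parquet": (14.3, 5633.97),
    "Polars scan_parquet": (11.3, 2201.96),
    "Pyarrow ParquetDataset": (11.0, 1701.32),
    "Pyarrow Dataset": (11.3, 1720.72),
}

# Rank the options by each metric separately
by_time = sorted(results, key=lambda name: results[name][0])
by_memory = sorted(results, key=lambda name: results[name][1])

for name in by_time:
    seconds, mib = results[name]
    print(f"{name:<24} {seconds:>5.1f} s {mib:>9.2f} MiB")

print("Fastest:", by_time[0])
print("Lowest memory increment:", by_memory[0])
```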
______________________________
Thanks for reading! If you found this useful or interesting, please share with a friend.
______________________________
## 🔗 Useful links
* Upgini Library [Documentation](https://github.com/upgini/upgini#readme)
* More [Notebooks and Guides](https://github.com/upgini/upgini#briefcase-use-cases)
* Kaggle public [Notebooks](https://www.kaggle.com/romaupgini/code)


<sup>😔 Found a typo or a bug in a code snippet? Our bad! <a href="https://github.com/upgini/upgini/issues/new?assignees=&title=readme%2Fbug">
Please report it here.</a></sup>