# Parquet files

## What is a Parquet file?

- A file format where data is stored by column rather than by row, as it is in CSV
- Each column has a name and a dtype that match its name and dtype in a `DataFrame`

The Apache Parquet and Apache Arrow projects evolved together as columnar formats: Apache Parquet is the format for data on disk and Apache Arrow is the format for data in memory.

Compared to CSV, a Parquet file:
- is much faster to read and write
- takes up much less space on disk, especially once compression is applied
- allows Polars to select which columns to read from the file
- allows Polars to select which subsets to read from the file (in lazy mode with a predicate push-down optimization)
- preserves the dtypes of columns

In [1]:
from pathlib import Path

import polars as pl

## Creating a Parquet file

In [2]:
csv_file = "data/titanic.csv"

In [3]:
parquet_file_path = Path("data/parquet/titanic")

# exist_ok=True makes this a no-op when the directory already exists
parquet_file_path.mkdir(parents=True, exist_ok=True)

In [4]:
parquet_file = "data/parquet/titanic/titanic.parquet"

In [5]:
pl.read_csv(csv_file).write_parquet(parquet_file)

## Reading a Parquet file

In [6]:
df = pl.read_parquet(parquet_file)

df.head(3)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
i64,i64,i64,str,str,f64,i64,i64,str,f64,str,str
1,0,3,"""Braund, Mr. Owen Harris""","""male""",22.0,1,0,"""A/5 21171""",7.25,,"""S"""
2,1,1,"""Cumings, Mrs. John Bradley (Fl…","""female""",38.0,1,0,"""PC 17599""",71.2833,"""C85""","""C"""
3,1,3,"""Heikkinen, Miss. Laina""","""female""",26.0,0,0,"""STON/O2. 3101282""",7.925,,"""S"""


A Parquet file has a `footer` that stores the metadata.

Polars can read the footer quickly to get the schema without needing to read any of the data.

In [7]:
pl.read_parquet_schema(parquet_file)

Schema([('PassengerId', Int64),
        ('Survived', Int64),
        ('Pclass', Int64),
        ('Name', String),
        ('Sex', String),
        ('Age', Float64),
        ('SibSp', Int64),
        ('Parch', Int64),
        ('Ticket', String),
        ('Fare', Float64),
        ('Cabin', String),
        ('Embarked', String)])

In [9]:
pl.read_parquet(
    parquet_file,
    columns=["Pclass", "Name"]
).head(3)

Pclass,Name
i64,str
3,"""Braund, Mr. Owen Harris"""
1,"""Cumings, Mrs. John Bradley (Fl…"
3,"""Heikkinen, Miss. Laina"""


In [10]:
pl.read_parquet(
    parquet_file,
    n_rows=2
).head(3)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
i64,i64,i64,str,str,f64,i64,i64,str,f64,str,str
1,0,3,"""Braund, Mr. Owen Harris""","""male""",22.0,1,0,"""A/5 21171""",7.25,,"""S"""
2,1,1,"""Cumings, Mrs. John Bradley (Fl…","""female""",38.0,1,0,"""PC 17599""",71.2833,"""C85""","""C"""


Low memory mode reduces memory pressure at the expense of performance:

In [11]:
pl.read_parquet(
    parquet_file,
    low_memory=True
).head(3)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
i64,i64,i64,str,str,f64,i64,i64,str,f64,str,str
1,0,3,"""Braund, Mr. Owen Harris""","""male""",22.0,1,0,"""A/5 21171""",7.25,,"""S"""
2,1,1,"""Cumings, Mrs. John Bradley (Fl…","""female""",38.0,1,0,"""PC 17599""",71.2833,"""C85""","""C"""
3,1,3,"""Heikkinen, Miss. Laina""","""female""",26.0,0,0,"""STON/O2. 3101282""",7.925,,"""S"""


## Writing a Parquet file
When writing a Parquet file we can specify different compression algorithms.

The default, `zstd`, is a good choice in most cases because it balances compressed file size on disk against read time into memory.

The `lz4` option is an alternative when faster reading and writing is preferred.

In [13]:
df.write_parquet(
    parquet_file,
    compression="zstd"
)

## Query optimizations on Parquet files

### Projection push-down for subsets of columns

In lazy mode, the query optimizer automatically detects when only a subset of columns must be read. This is the projection push-down query optimization.

In [None]:
print(pl.scan_parquet(parquet_file).select("Pclass", "Name").explain())

Parquet SCAN [data/parquet/titanic/titanic.parquet]
PROJECT 2/12 COLUMNS
ESTIMATED ROWS: 891


### Predicate push-down for subsets of rows

Internally, a Parquet file is broken into groups of rows called `row groups`.

Parquet files can store simple min/max statistics of the data in each row group. 

In a lazy query Polars can use these statistics to determine if only some row groups of the file need to be read.
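
The idea can be sketched in plain Python. This is a conceptual model only, not how Polars implements it: the `RowGroupStats` class and the `row_groups_to_read` function are hypothetical names, and the predicate is fixed to the form `value < upper_bound`.

```python
from dataclasses import dataclass


@dataclass
class RowGroupStats:
    """Min/max statistics for one column in one row group (hypothetical structure)."""

    index: int
    min_value: int
    max_value: int


def row_groups_to_read(stats: list[RowGroupStats], upper_bound: int) -> list[int]:
    """Return the row groups that may contain rows where value < upper_bound."""
    # A row group can be skipped when even its smallest value fails the predicate
    return [s.index for s in stats if s.min_value < upper_bound]


# 10 row groups of 1,000 sorted ids each: 0-999, 1000-1999, ...
stats = [RowGroupStats(i, i * 1_000, i * 1_000 + 999) for i in range(10)]

print(row_groups_to_read(stats, upper_bound=1_000))  # [0]
print(row_groups_to_read(stats, upper_bound=2_500))  # [0, 1, 2]
```

Only the statistics are inspected to make this decision; the values inside the skipped row groups are never read.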

In [15]:
statistics_parquet_file_path = Path("data/parquet/statistics")

# exist_ok=True makes this a no-op when the directory already exists
statistics_parquet_file_path.mkdir(parents=True, exist_ok=True)
statistics_parquet_file = "data/parquet/statistics/statistics.parquet"

In [16]:
pl.DataFrame({
    "id": pl.int_range(0, 10_000_000, eager=True)
}).write_parquet(statistics_parquet_file)

In [17]:
print(
    pl.scan_parquet(
        statistics_parquet_file
    ).filter(
        pl.col("id").is_in([1_000_000, 2_000_000])
    ).explain()
)

Parquet SCAN [data/parquet/statistics/statistics.parquet]
PROJECT */1 COLUMNS
SELECTION: col("id").is_in([[1000000, 2000000]])
ESTIMATED ROWS: 10000000


Eager mode and lazy mode comparison

In [18]:
%%timeit -n1 -r3

pl.read_parquet(statistics_parquet_file).filter(
        pl.col("id") < 1000
    )

151 ms ± 52.6 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [19]:
%%timeit -n1 -r3

pl.scan_parquet(statistics_parquet_file).filter(
        pl.col("id") < 1000
    ).collect()

63.9 ms ± 9.42 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


### Changing the row group size

By default the number of rows in each row group is set to 512^2 (262144). 

- Setting the `row_group_size` argument of `write_parquet` to a much smaller value (say 1000) makes the file much bigger
- Parsing the row group statistics is slow, so even a query that only needs a small number of rows, like the one above, might be slower with smaller row groups

### Taking advantage of row groups

We need to:
- sort the data so that similar values are clustered together
- do lazy queries with a `SELECTION` condition

If the rows that meet the `SELECTION` condition are spread throughout the file, Polars will end up reading many row groups. The query may then be slower than on a file without statistics, because Polars must still evaluate the statistics.

## Modifying the parallel strategy

Polars reads a Parquet file in parallel. 

It can do this by either reading columns in parallel or row groups in parallel.

Polars counts how many columns and row groups there are and then parallelizes the reading of the larger one. 

In [20]:
pl.scan_parquet(
    statistics_parquet_file,
    parallel="columns"
).collect().head()

id
i64
0
1
2
3
4


There is also a newer alternative strategy called `prefiltered`.

It first evaluates the pushed-down predicates in parallel and determines which rows need to be read.

Then, this strategy parallelizes over both the columns and the row groups while filtering out rows that do not need to be read.

In some cases, with large files and significant filtering, the `prefiltered` strategy can provide a significant speedup.

In [22]:
pl.scan_parquet(
    statistics_parquet_file,
    parallel="prefiltered"
).filter(
    pl.col("id") < 1000
).collect().head()

id
i64
0
1
2
3
4


### Writing a larger-than-memory Parquet file
We can use the streaming engine to process a larger-than-memory query in batches and write the output to a Parquet file in batches.

We use the `sink_parquet` method to write to the Parquet file in this way.

In [24]:
sink_parquet_file = "data/parquet/titanic/titanic_sink.parquet"

pl.scan_parquet(
    parquet_file
).group_by(
    "Pclass"
).agg(
    pl.col("PassengerId").count().alias("counts")
).sink_parquet(sink_parquet_file)

The `sink_parquet` method only requires a `LazyFrame`, so the query does not have to begin with `scan_parquet`.

In [25]:
pl.scan_csv(csv_file).sink_parquet(sink_parquet_file)

## Exercises

### Exercise 1

In [28]:
exercise_parquet_file = "data/parquet/titanic/titanic_exercise.parquet"

Before we write to this file, read the Parquet file created at the start of the notebook into a `DataFrame`.

Convert the `Sex` column to `pl.Categorical`

In [30]:
df = pl.read_parquet(parquet_file).with_columns(pl.col("Sex").cast(pl.Categorical()))

df.head(3)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
i64,i64,i64,str,cat,f64,i64,i64,str,f64,str,str
1,0,3,"""Braund, Mr. Owen Harris""","""male""",22.0,1,0,"""A/5 21171""",7.25,,"""S"""
2,1,1,"""Cumings, Mrs. John Bradley (Fl…","""female""",38.0,1,0,"""PC 17599""",71.2833,"""C85""","""C"""
3,1,3,"""Heikkinen, Miss. Laina""","""female""",26.0,0,0,"""STON/O2. 3101282""",7.925,,"""S"""


Write the `DataFrame` with a categorical column to `exercise_parquet_file`

In [31]:
df.write_parquet(exercise_parquet_file)

Read the schema of `exercise_parquet_file` to confirm whether Parquet can preserve categorical encodings

In [32]:
pl.read_parquet_schema(exercise_parquet_file)

Schema([('PassengerId', Int64),
        ('Survived', Int64),
        ('Pclass', Int64),
        ('Name', String),
        ('Sex', Categorical),
        ('Age', Float64),
        ('SibSp', Int64),
        ('Parch', Int64),
        ('Ticket', String),
        ('Fare', Float64),
        ('Cabin', String),
        ('Embarked', String)])

Create a lazy query that only reads these columns
```python
["Survived","Pclass","Age","Sex"]
```

In [33]:
pl.scan_parquet(
    exercise_parquet_file,
).select(
    ["Survived","Pclass","Age","Sex"]
)