# Parquet files
By the end of this lecture you will be able to:
- read from a Parquet file
- use query optimisation to read a subset of the data
- get the schema of a Parquet file
- set the parallelisation strategy for reading a Parquet file
- write a Parquet file with compression
- write a Parquet file that is larger than memory

## What is a Parquet file?
Parquet is a file format where:
- data is stored in columns rather than in rows as in a CSV
- each column has a name and a dtype that match its name and dtype in a `DataFrame`

The Apache Parquet and Apache Arrow projects evolved together as columnar formats: Apache Parquet is the format for data on disk and Apache Arrow is the format for data in memory.

Compared to a CSV file, a Parquet file:
- is typically much faster to read and write
- takes much less space on disk, especially once compression is applied
- allows Polars to select which columns to read from the file
- allows Polars to select which subsets to read from the file (in lazy mode with a predicate pushdown optimisation)
- preserves the dtypes of columns

*I use Parquet files whenever possible for my data engineering pipelines*, with the exception of small files that I want to open and read manually.

In [None]:
from pathlib import Path

import polars as pl

## Creating a Titanic Parquet file
We begin by creating a Parquet file from the Titanic CSV file

In [None]:
csv_file = "../data/titanic.csv"

We create the Parquet Titanic directory in the `data_files/parquet` sub-directory of the `io` sub-directory. We use Python's built-in `pathlib` library for this.

In [None]:
parquet_file_path = Path("data_files/parquet/titanic")
# Create the directory (and any missing parents) if it doesn't already exist
parquet_file_path.mkdir(parents=True, exist_ok=True)

Now we define the path that we will write the Parquet file to

In [None]:
parquet_file = "data_files/parquet/titanic/titanic.parquet"

Finally we read the CSV and write to the Parquet path

In [None]:
pl.read_csv(csv_file).write_parquet(parquet_file)

## Reading a Parquet file
We read the Parquet file to a `DataFrame`

In [None]:
df = pl.read_parquet(parquet_file)
df.head(3)

A Parquet file has a *footer* where metadata about the file is stored. Polars can quickly read this footer to get the schema of a Parquet file without having to read any data.

In Polars we can use the `read_parquet_schema` function for this

In [None]:
pl.read_parquet_schema(parquet_file)

We can select a subset of columns to read from a Parquet file with the `columns` argument

In [None]:
(
    pl.read_parquet(
        parquet_file,
        columns=["Pclass","Name"]
    )
    .head(3)
)

We can also specify the number of rows that we want to read with `n_rows`

In [None]:
(
    pl.read_parquet(
        parquet_file,
        n_rows=2
    )
)

If we are running out of memory when reading a Parquet file we can specify `low_memory = True`. This can help to reduce peak memory usage at the expense of a longer load time

In [None]:
(
    pl.read_parquet(
        parquet_file,
        low_memory=True
    )
    .head(2)
)

Polars reads the Parquet file in multiple threads into different chunks of memory. By default Polars then combines all the chunks into a single chunk in parallel. With the `low_memory=True` argument Polars reduces peak memory usage by not doing this recombination in parallel.

As with CSVs, `low_memory = True` only reduces memory usage marginally. If your query is too big to fit in memory use `streaming` in lazy mode.

## Writing a Parquet file
When we write a Parquet file we can specify different compression algorithms. I recommend using the default `zstd` in most cases for a good balance of compressed file size on disk and read time into memory. The `lz4` option is an alternative when faster reading and writing is preferred.

In [None]:
(
    df
    .write_parquet(
        parquet_file,
        compression="zstd"
    )
)

We can also adjust the degree of compression with `compression_level`. The range of values depends on the compression scheme chosen - see the docstrings for details: https://docs.pola.rs/api/python/stable/reference/api/polars.DataFrame.write_parquet.html#polars.DataFrame.write_parquet

## Query optimisations on Parquet files

### Projection pushdown for subsets of columns
When we work in lazy mode in Polars the query optimiser automatically detects when only a subset of columns must be read. This is the projection pushdown query optimisation.

In [None]:
print(
    pl.scan_parquet(parquet_file)
    .select("Pclass","Name")
    .explain()
)

### Predicate pushdown for subsets of rows

Internally, a Parquet file is split into groups of rows (called row groups). Parquet files can store simple min/max statistics of the data in each row group. In a lazy query Polars can use these statistics to determine if only some row groups of the file need to be read.

Polars adds these statistics by default when it writes a Parquet file. Note that we can write a file without statistics by setting `statistics=False` in `write_parquet`. Turning off statistics makes writing the file faster.

Here we demonstrate how Polars can use the statistics to speed up lazy queries.

We are going to create a new `DataFrame` and write it to Parquet. First we make a new directory for this

In [None]:
statistics_parquet_file_path = Path("data_files/parquet/statistics")
# Create the directory (and any missing parents) if it doesn't already exist
statistics_parquet_file_path.mkdir(parents=True, exist_ok=True)
statistics_parquet_file = "data_files/parquet/statistics/statistics.parquet"

We now make a one-column `DataFrame` with many rows and write it to Parquet

In [None]:
(
    pl.DataFrame(
        {
            "id":pl.int_range(0,10_000_000,eager=True)
        }
    )
    .write_parquet(
        statistics_parquet_file,
    )
)

If we apply a filter on a **lazy** scan of a Parquet file with statistics, Polars uses the row group statistics to reduce how much data must be read from the file based on the condition in the `SELECTION` of the query plan.

In this example we look for rows where the value of the `id` column is between one million and two million.

In [None]:
print(
    pl.scan_parquet(statistics_parquet_file)
    .filter(
        pl.col("id").is_between(1_000_000,2_000_000)
    )
    .explain()
)

We now compare two approaches. In eager mode we read the whole file and then apply the `filter`. In lazy mode we apply the same `filter` with query optimisations.

In [None]:
%%timeit -n1 -r3
(
    pl.read_parquet(statistics_parquet_file)
    .filter(
        pl.col("id") < 1000
    )
)

In [None]:
%%timeit -n1 -r3
(
    pl.scan_parquet(statistics_parquet_file)
    .filter(
        pl.col("id") < 1000
    )
    .collect()
)

So in this case the lazy query is much faster as Polars can limit how much data it must read out of the file.

### Changing the row group size
By default the number of rows in each row group is set to 512^2 (262,144). We can adjust this with the `row_group_size` argument. However:
- setting the `row_group_size` to a much smaller value (say 1000) makes the file much bigger
- parsing the row group statistics is slow, so even a query that only needs a small number of rows, like the one above, might be slower with smaller row groups
- I've omitted the example, but when I set a smaller row group size the query above is no faster than with the default `row_group_size`

Personally, I just keep the default row-group size.

### Taking advantage of row groups
To take advantage of row groups you need to:
- sort the data in your file so that similar values are clustered together
- do lazy queries with a `SELECTION` condition

Consider an example where we have a large time series dataset with a datetime column called `time` and a vendor ID column called `VendorID`.

If we mainly want to query this by time period then we should sort this by `time` before writing
```python
df.sort("time","VendorID").write_parquet("sorted_by_time.parquet")  # hypothetical output path
```

But if we mainly want to query this by vendor then we should sort this by `VendorID` before writing
```python
df.sort("VendorID","time").write_parquet("sorted_by_vendor.parquet")  # hypothetical output path
```

If the rows that meet the `SELECTION` condition are spread through the file then Polars will end up reading many row groups and the query may be slower than on a file without statistics as Polars must evaluate the statistics.

## Modifying the parallel strategy
Polars reads a Parquet file in parallel. It can do this by either reading columns in parallel or row groups in parallel. So how does Polars choose which to do?

The basic rule is that Polars counts how many columns and row groups there are and then parallelizes the reading of the larger of the two. So if there are 10 row groups and 20 columns, Polars will read the columns in parallel. This is a solid basic strategy but it isn't guaranteed to be the fastest for your data.

We can instead tell Polars to do parallelism by `columns` or `row_groups` by setting the `parallel` argument in the `scan_parquet` function. For example, to read columns in parallel

In [None]:
(
    pl.scan_parquet(statistics_parquet_file, parallel='columns')
    .collect()
    .head()
)

There is also a newer alternative strategy called `prefiltered`, which does a bit of both and can be useful when we are applying a predicate filter. The `prefiltered` strategy first evaluates the pushed-down predicates in parallel and determines which rows need to be read. Then, this strategy parallelizes over both the columns and the row groups while filtering out rows that do not need to be read.

In some cases with large files and significant filtering the `prefiltered` strategy can provide a significant speedup. In other cases it may be slower. You can see for yourself by setting the `parallel` argument in the `scan_parquet` function.

In [None]:
(
    pl.scan_parquet(statistics_parquet_file, parallel='prefiltered')
    .filter(
        pl.col("id") < 1000
    )
    .collect()
    .head()
)

Overall, if querying a large Parquet file is a bottleneck in your pipeline it may be worth experimenting with this argument to see if you can get a speed-up.

## Writing a larger-than-memory Parquet file
We can use the streaming engine to process a larger-than-memory query in batches and write the output to a Parquet file in batches. We use the `sink_parquet` method to write to the Parquet file in this way.

In this example we create a lazy query by scanning the Parquet file created above and then sink the output to another Parquet file.

In [None]:
sink_parquet_file = "data_files/parquet/titanic/titanic_sink.parquet"
(
    pl.scan_parquet(parquet_file)
    .group_by("Pclass")
    .agg(
        pl.col("PassengerId").count().alias("counts")
    )
    .sink_parquet(sink_parquet_file)
)

The `sink_parquet` approach only requires a `LazyFrame`; the query does not have to begin with a `scan_parquet`. This means we can use it to convert a larger-than-memory CSV file to a Parquet file.

In [None]:
sink_parquet_file = "data_files/parquet/titanic/titanic_sink.parquet"
(
    pl.scan_csv(csv_file)
    .sink_parquet(sink_parquet_file)
)

There is also a `sink_csv` method available: https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.LazyFrame.sink_csv.html

For the `sink_parquet` method to work the full query must work in streaming mode. Recall that we confirm this by checking that the entire query is within the `--- STREAMING` block in the query plan.

In [None]:
print(
    pl.scan_csv(csv_file)
    .group_by("Pclass")
    .agg(
        pl.col("PassengerId").count().alias("counts")
    )
    .explain(streaming=True)
)

## Exercises
In the exercises you will develop your understanding of:
- reading and writing Parquet files
- categorical dtypes in Parquet files
- reading the schema of Parquet files
- reading a subset of Parquet files

### Exercise 1
We will write a new Parquet file for the exercises to this path

In [None]:
exercise_parquet_file = "data_files/parquet/titanic/titanic_exercise.parquet"

Before we write to this file read the Parquet file created at the start of the notebook to a `DataFrame`. 

Convert the `Sex` column to `pl.Categorical`

In [None]:
df = (
    pl.read_parquet(parquet_file)
    .with_columns(<blank>)
)
df.head(3)

Write the `DataFrame` with a categorical column to `exercise_parquet_file`

Read the schema of `exercise_parquet_file` to confirm whether Parquet can preserve categorical encodings

Create a lazy query that only reads these columns
```python
["Survived","Pclass","Age","Sex"]
```

## Solutions

### Solution to exercise 1
We will write a new Parquet file for the exercises to this path

In [None]:
exercise_parquet_file = "data_files/parquet/titanic/titanic_exercise.parquet"

Before we write to this file read the Parquet file created at the start of the notebook to a `DataFrame`. 

Convert the `Sex` column to `pl.Categorical`

In [None]:
df = (
    pl.read_parquet(parquet_file)
    .with_columns(pl.col("Sex").cast(pl.Categorical))
)
df.head(3)

Write the `DataFrame` with a categorical column to `exercise_parquet_file`

In [None]:
df.write_parquet(exercise_parquet_file)

Read the schema of `exercise_parquet_file` to confirm whether Parquet can preserve categorical encodings

In [None]:
pl.read_parquet_schema(exercise_parquet_file)

Create a lazy query that only reads these columns
```python
["Survived","Pclass","Age","Sex"]
```

In [None]:
(
    pl.scan_parquet(exercise_parquet_file)
    .select(["Survived","Pclass","Age","Sex"])
)