# Forecasting electricity demand with Polars
In this notebook we learn about:
- Loading data
- Configuring Polars
- Inspecting a `DataFrame`
- Selecting rows and columns
- Fast-track algorithms on sorted data
- Query optimisation
- Larger than memory datasets
- Visualisation
- Building a simple ML forecasting pipeline

## About me
- PhD in climate physics from University of Oxford
- Founded my data science consultancy at [Rho-Signal](https://www.rhosignal.com/)
- Deployed ML pipelines with Polars for [The Electric Storage Company](https://theelectricstoragecompany.com/)
- Polars contributor focused on communication and accessibility
- Creator of the [Data Analysis with Polars course on Udemy](https://www.udemy.com/course/data-analysis-with-polars/?referralCode=A29DCDA40D369080C05A)
- Connect with me on [LinkedIn](https://www.linkedin.com/in/liam-brannigan-9080b214a/)

In [None]:
# %pip install -U polars numpy pandas pyarrow catboost altair vegafusion[embed]

In [None]:
from datetime import datetime,date,timedelta

import polars as pl
import polars.selectors as cs

import numpy as np
import catboost

import altair as alt
import vegafusion as vf

vf.enable()

Course materials are available from my [Data Analysis with Polars course on Udemy](https://www.udemy.com/course/data-analysis-with-polars/?referralCode=A29DCDA40D369080C05A)

## Polars at a glance
- Dataframe library for tabular data
- APIs in Rust, Python, SQL, R, Node
- Built-in parallelisation
- Scales to larger-than-memory datasets
- Apache Arrow in-memory format
- Type safety

## Loading data
Polars supports a wide range of data sources including:
- CSV
- Parquet
- Arrow/Feather/IPC
- JSON
- Excel
- Databases

Recommended: use Parquet to take advantage of columnar selection and dtype preservation

> Course material: See the I/O section for lectures on each of these data sources

In [None]:
df = pl.read_csv("load_features.csv",try_parse_dates=True)
df

We can configure how Polars looks and behaves with the `pl.Config` namespace. 

For example, to set the number of rows that are printed we use `pl.Config.set_tbl_rows`

In [None]:
pl.Config.set_tbl_rows(6)

Explore other configuration options in the `pl.Config` namespace

In [None]:
pl.Config

Now when we print the `DataFrame` again, only 6 rows are printed

In [None]:
df

## Overview of a DataFrame
Polars prints the dtype of each column under the name.

We can also get descriptive statistics with `df.describe`

In [None]:
df.describe()

Polars stores the null count of each column as metadata, so `null_count` (also a row in the `describe` output) is cheap to retrieve.

For wide `DataFrames` we can use `glimpse` to see the first rows printed vertically

In [None]:
df.glimpse()

## Selecting and transforming data

### Using `[]`

We can select rows and columns using `[]` indexing...

In [None]:
df[:3,["time","load"]]

...but using this square bracket approach means that you don't get the benefits of **parallelisation**, **query optimisation** and **streaming** large datasets!

Square brackets are best used for:
- quickly inspecting data in interactive mode
- extracting values at the end of a calculation

To really take advantage of Polars we use the **Expression API**.

### Selecting and transforming `DataFrames` with the Expression API
To filter rows we use the `filter` method. 

Here we keep all rows before 1 January 2020

In [None]:
(
    df
    .filter(
        pl.col("time") < datetime(2020,1,1)
    )
)

> For more on filtering rows see the Filtering rows section of the course material. 

To select a **subset** of columns we use `select`

In [None]:
(
    df
    .select(
        pl.col("time"),pl.col("load")
    )
)

To **add or transform** a column we use `with_columns`.

In this example we add feature columns with:
- day-of-the-week
- one day lagged values

In [None]:
df2 = (
    df
    .with_columns(
        day_of_week = pl.col("time").dt.weekday(),
        lag_one_day = pl.col("load").shift(1)
    )
)
df2.head(2)

Polars runs the expressions inside a single `with_columns` (or `select`) call in parallel.

> For more on selecting, adding and transforming columns see the Selecting & Transformations section of the course.

#### Visualisation
We visualise our features using Altair

In [None]:
alt.Chart(
    df2.select("load","day_of_week"),
    title="Box plot of load by day-of-week",
    width=450
).mark_boxplot(extent='min-max').encode(
    x="day_of_week:N",
    y="load:Q",
    color="day_of_week:N"
)

In [None]:
alt.Chart(
    df2.select("lag_one_day","load","day_of_week"),
    title="Load vs previous day's load by day-of-week"
).mark_circle().encode(
    x=alt.X("lag_one_day:Q",scale=alt.Scale(zero=False)),
    y=alt.Y("load:Q",scale=alt.Scale(zero=False)),
    color="day_of_week:N"
)

> For more on visualisation with Matplotlib, Seaborn and Plotly see the Visualisation section of the course or [my blog posts on visualisation](https://www.rhosignal.com/tags/visualisation/).

### Working with multiple columns
We use `pl.col` to reference a column or columns in the Expression API.

This can become verbose when we want to apply the same expression to many columns!

Polars has lots of ways to reference multiple columns at once.

For example, we can apply the same expression to all the floating-point columns by passing their dtypes to `pl.col`

In [None]:
(
    df2
    .with_columns(
        pl.col(pl.Float32, pl.Float64).round()
    )
)

Polars **selectors** also provide ways to do multi-column selections. 

In this example we use a selector to select the time column and all the temperature columns

In [None]:
(
    df
    .select(
        "time",
        cs.contains("_temp")
    )
)

> For more on selectors see the Selecting and Transformations section of the course and [the official docs](https://pola-rs.github.io/polars/py-polars/html/reference/selectors.html)


### Why use expressions?

- expressions can be run in **parallel**
- expressions can be **optimised** in lazy mode by the query optimiser
- `[]` indexing can only be used in eager mode, but expressions can also **be used in lazy mode**

Get in the habit of using expressions as your default

## Fast-track algorithms on sorted data
Polars has fast-track algorithms when it knows a column is sorted. These allow optimised operations for:
- statistics (min/max/median/quantile)
- filter
- groupby
- join

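We tell Polars a column is sorted using the `set_sorted` expression. Polars does not check this, so only use it when you know the column really is sorted; otherwise results can be wrong.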

In [None]:
df = (
    df
    .with_columns(
        pl.col("time").set_sorted()
    )
)
df["time"].flags

> For more on fast-track algorithms in the course see the lecture on Sorting in the Selecting & Transformations section, the introductory lecture on groupby in the Statistics & Grouping section and the introductory lecture on joins in the Combining DataFrames section. See also this blog post from my site [introducing sorted algorithms](https://www.rhosignal.com/posts/polars-loves-sorted-data-1-statistics/) and this blog post on [groupby with sorted algorithms](https://www.rhosignal.com/posts/polars-sorted-data-2/)

## Lazy mode and query optimisation
So far we have used eager mode where code is executed line-by-line.

Polars also has a lazy mode where it builds a query plan step-by-step before executing it.

In fact expressions in eager mode are really just line-by-line lazy mode under the hood.

In this example we start our lazy query from the CSV. We replace `pl.read_csv` with `pl.scan_csv` to show we are starting a lazy query

In [None]:
df_lazy = pl.scan_csv("load_features.csv",try_parse_dates=True)
df_lazy

When we run `pl.scan_csv`:
- Polars reads the first rows of the CSV (100 by default, set by `infer_schema_length`) to get the column names and infer the dtypes
- Polars creates a `LazyFrame` where the first step in the query plan is to read the CSV

Any additional commands we apply to the `LazyFrame` will update the query plan until we evaluate the query. 

Here we have a query to create our training `LazyFrame`

In [None]:
df_train = (
    pl.scan_csv("load_features.csv",try_parse_dates=True)
    # Add features
    .with_columns(
        day_of_week = pl.col("time").dt.weekday(),
        lag_one_day = pl.col("load").shift(1)
    )
    # Filter for records before 2020
    .filter(
        pl.col("time") < datetime(2020,1,1)
    )
    # Remove a redundant feature
    .select(pl.exclude("sunshine"))
)
df_train

To get the **optimised plan** use `.explain`

In [None]:
print(
    df_train.explain()
)

`df_train` is a `LazyFrame` rather than a `DataFrame`

- `DataFrame` -> expressions operate on the data
- `LazyFrame` -> expressions update the query plan

Polars optimises the query plan when we call `explain` or evaluate the query.

### Types of optimisations
- Projection pushdown (identifying relevant columns)
- Predicate pushdown (applying filters as early as possible)
- Combining predicates (combines multiple filter conditions)
- Slice pushdown (limit rows processed as early as possible when limited rows are required)
- Common subplan elimination (run duplicated transformations on the same data once and then re-use)
- Caching aggregations...

We evaluate a lazy query with the `collect` method

In [None]:
(
    df_train
    .collect()
)

> To get started with lazy mode see the Introduction section of the course material. The topic of lazy mode is then returned to within the other sections of the course.

### Streaming large datasets
Polars can process a lazy query for larger-than-memory datasets in batches. This is called **streaming** mode.

To evaluate a query in streaming mode pass the `streaming=True` argument to collect

In [None]:
(
    df_train
    .collect(streaming=True)
)

> For more on streaming mode see [this high-level video from my youtube channel](https://www.youtube.com/watch?v=3-C0Afs5TXQ) or blogs I have written such as this one on [setting parameters for streaming mode](https://www.rhosignal.com/posts/streaming-chunk-sizes/). Streaming mode is covered in the course primarily in the I/O section on CSV and Parquet files.

Not all operations are supported in streaming mode. You can check if a query will run in streaming with `explain`

In [None]:
print(
    df_train.explain(streaming=True)
)

The part of the query plan between `--- PIPELINE` and `--- END PIPELINE` will be run in streaming mode.

See my blog post on [controlling streaming](https://www.rhosignal.com/posts/streaming-chunk-sizes/) for more info.

In this case the `shift` expression prevents streaming because it requires data from other batches. Removing it lets the whole pipeline run in streaming mode.

In [None]:
print(
    pl.scan_csv("load_features.csv",try_parse_dates=True)
    # The shift expression is commented out so the whole query can stream
    .with_columns(
        day_of_week = pl.col("time").dt.weekday(),
        # lag_one_day = pl.col("load").shift(1)
    )
    # Filter for records before 2020
    .filter(
        pl.col("time") < datetime(2020,1,1)
    )
    # Remove a redundant feature
    .select(pl.exclude("sunshine"))
    .explain(streaming=True)
)

## Machine learning
At present you normally convert to `numpy` (or numpy-backed `pandas`) for ML libraries.

This situation is changing fast - see [this Sklearn issue thread](https://github.com/scikit-learn/scikit-learn/issues/25896) for some recent developments!

Conversion to numpy is typically a relatively cheap operation in the context of an ML pipeline

In [None]:
import catboost

# Evaluate the LazyFrame if needed
if isinstance(df_train,pl.LazyFrame):
    df_train = df_train.collect()

model = catboost.CatBoostRegressor()
X = df_train.drop("time","load").to_pandas()
y = df_train["load"].to_pandas()

model.fit(X,y,verbose=False)

Test the model out-of-sample

In [None]:
df_test = (
    pl.scan_csv("load_features.csv",try_parse_dates=True)
    .with_columns(
        day_of_week = pl.col("time").dt.weekday(),
        lag_one_day = pl.col("load").shift(1)
    )
    # Filter for records from 2020 onwards
    .filter(
        pl.col("time") >= datetime(2020,1,1)
    )
    # Remove a redundant feature
    .select(pl.exclude("sunshine"))
    .collect()
)
X_test = (
    df_test
    .drop("time","load")
    .to_pandas()
)


Create a `DataFrame` with out-of-sample values and predictions

In [None]:
pred_df = (
    df_test
    .with_columns(
        pl.Series("pred", model.predict(X_test))
    )
)
pred_df.head()

Visualise the outputs

In [None]:
(
    alt.Chart(
        (
            pred_df
            .select("time","load","pred")
            .melt(id_vars="time")
            .with_columns(
                pl.col("time").cast(pl.Datetime)
            )
        ),
        title="Out-of-sample test",
        width=700
    ).mark_line().encode(
        x="time:T",
        y="value:Q",
        color="variable:N"
    )
)

## Next steps
- Try Polars on your own data
- Check out the course materials
- Post questions on StackOverflow if you get stuck
- Join [the Polars discord](https://discord.com/invite/4UfP5cfBE7)

In [None]:
# Round-trip to pandas with PyArrow-backed columns and back to Polars
(
    pl.from_pandas(
        df
        .to_pandas(use_pyarrow_extension_array=True)
    )
)
