In [1]:
import pypolars as pl
from pypolars.lazy import *
import numpy as np
from string import ascii_letters
import pandas as pd
import os
from typing import Union

# Laziness

Py-polars has a lazy API that supports a subset of the eager API. Laziness means that operations aren't executed until you ask for their result. Let's start with a short example.

Below we'll create a DataFrame in an eager fashion (meaning that the creation of the DataFrame is executed at once).

In [2]:
df = pl.DataFrame({"a": np.arange(0, 10),
                   "b": np.random.rand(10),
                   "c": list(ascii_letters[:10])})
df

+-----+-------+-----+
| a   | b     | c   |
| --- | ---   | --- |
| i64 | f64   | str |
| 0   | 0.251 | "a" |
+-----+-------+-----+
| 1   | 0.765 | "b" |
+-----+-------+-----+
| 2   | 0.349 | "c" |
+-----+-------+-----+
| 3   | 0.441 | "d" |
+-----+-------+-----+
| 4   | 0.378 | "e" |
+-----+-------+-----+
| 5   | 0.55  | "f" |
+-----+-------+-----+
| 6   | 0.344 | "g" |
+-----+-------+-----+
| 7   | 0.43  | "h" |
+-----+-------+-----+
| 8   | 0.244 | "i" |
+-----+-------+-----+
| 9   | 0.956 | "j" |
+-----+-------+-----+

## Lazy DataFrame
To make this a lazy dataframe we call the `.lazy` method. As we can see, not much happens.

In [3]:
ldf = df.lazy()
ldf

<pypolars.lazy.LazyFrame at 0x7f7f7033b860>

We can add a filter to this `LazyFrame`, but again we'll see that nothing happens yet. 

*Note that `col` and `lit` (meaning **column** and **literal value**) are part of the lazy **DSL** (domain-specific language) and are needed to build a query plan.*

In [4]:
ldf = ldf.filter(col("a") == (lit(2)))
ldf

<pypolars.lazy.LazyFrame at 0x7f7f391684e0>

The query is only executed when we ask for it. This can be done with the `.collect` method. 
Below we execute the query and obtain our results.

In [5]:
ldf.collect()

+-----+-------+-----+
| a   | b     | c   |
| --- | ---   | --- |
| i64 | f64   | str |
| 2   | 0.349 | "c" |
+-----+-------+-----+
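
To make the idea of deferred execution concrete, here is a minimal pure-Python sketch. `ToyLazyFrame` is a hypothetical stand-in written for this illustration, not the py-polars implementation: it only records filters in a plan and runs them when `collect` is called.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]
Predicate = Callable[[Row], bool]


class ToyLazyFrame:
    """Hypothetical stand-in for a lazy frame: it only records operations."""

    def __init__(self, rows: List[Row], plan: Optional[List[Predicate]] = None):
        self._rows = rows
        self._plan = list(plan or [])

    def filter(self, predicate: Predicate) -> "ToyLazyFrame":
        # Nothing is computed here; the predicate is appended to the plan.
        return ToyLazyFrame(self._rows, self._plan + [predicate])

    def collect(self) -> List[Row]:
        # Only now is the recorded plan executed against the data.
        return [row for row in self._rows if all(p(row) for p in self._plan)]


rows = [{"a": i, "c": "abcdefghij"[i]} for i in range(10)]
lazy = ToyLazyFrame(rows).filter(lambda row: row["a"] == 2)  # no work done yet
result = lazy.collect()  # the filter runs here
print(result)
```

Because the plan is just data until `collect` runs, an optimizer has the chance to rewrite it first, which is what the rest of this notebook explores.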

# Why lazy?
This laziness opens up some nice possibilities from an optimization perspective. 
It allows polars to modify the query right before executing it and make suboptimal queries more performant. Let's show this using various operations, comparing lazy execution with eager execution in both Polars and Pandas.

Let's create two DataFrames with a fair number of columns and rows.

In [6]:
def rand_string(n: int, set_size: int, lower=True) -> str:
    s = "".join(np.random.choice(list(ascii_letters[:set_size]), n))
    if lower:
        return s.lower()
    return s

In [33]:
rows = 30_000
columns = 30
key_size = 5
key_set_size = 10

np.random.seed(1)

# np.int was only an alias for Python's int and has been removed from recent NumPy versions
dtypes = [np.float32, np.float64, np.int64, str]

df_a = pl.DataFrame({f"column_{i}": np.array(np.random.rand(rows) * 10, dtype=np.random.choice(dtypes)) for i in range(columns)})
s = pl.Series("key",  np.array([rand_string(key_size, key_set_size) for _ in range(rows)]))
df_a.insert_at_idx(0, s)

rows = 20_000
columns = 8
df_b = pl.DataFrame({f"column_{i}": np.array(np.random.rand(rows) * 10, dtype=np.random.choice(dtypes)) for i in range(columns)})
s = pl.Series("key",  np.array([rand_string(key_size, key_set_size) for _ in range(rows)]))
df_b.insert_at_idx(0, s)


print("Showing a subset of df_a:")
# only show a sub_slice
df_a[:3, :10]

Showing a subset of df_a:


+---------+----------+----------+----------+----------+----------+-----------------------+----------+----------+----------+
| key     | column_0 | column_1 | column_2 | column_3 | column_4 | column_5              | column_6 | column_7 | column_8 |
| ---     | ---      | ---      | ---      | ---      | ---      | ---                   | ---      | ---      | ---      |
| str     | f32      | i64      | f32      | f32      | f32      | str                   | i64      | f64      | f64      |
| "ejbid" | 4.17     | 9        | 6.627    | 4.492    | 2.78     | "2.3672007927500314"  | 3        | 4.789    | 9.194    |
+---------+----------+----------+----------+----------+----------+-----------------------+----------+----------+----------+
| "bdgbc" | 7.203    | 3        | 1.216    | 8.639    | 5.813    | "0.17036859867274767" | 9        | 8.049    | 0.638    |
+---------+----------+----------+----------+----------+----------+-----------------------+----------+----------+----------+
| "ijffe

In [34]:
df_a_pd = df_a.to_pandas()
df_b_pd = df_b.to_pandas()

# Where Polars loses (or wins?)
Let's start with an operation where polars is slower than pandas: filtering. A filter predicate creates a boolean array. Polars/Arrow stores these boolean values not as 1-byte booleans,
but as bits, meaning 1 byte stores 8 booleans. This reduces memory 8-fold, but adds some overhead on array creation. As we can see, pandas is roughly 5x faster here (93.7 µs vs 458 µs), though there is a huge spread.
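
The bit-packed boolean storage described above can be sketched in pure Python. This is only an illustration of the idea; the helper names `pack_bits` and `get_bit` are made up for this example, and the least-significant-bit-first order is my understanding of Arrow's bitmap layout rather than something this notebook verifies.

```python
from typing import List


def pack_bits(mask: List[bool]) -> bytes:
    """Pack booleans into bytes, 8 per byte, least-significant bit first."""
    packed = bytearray((len(mask) + 7) // 8)
    for i, value in enumerate(mask):
        if value:
            packed[i // 8] |= 1 << (i % 8)
    return bytes(packed)


def get_bit(packed: bytes, i: int) -> bool:
    """Read boolean number i back out of the packed buffer."""
    return bool(packed[i // 8] & (1 << (i % 8)))


values = [0.25, 0.77, 0.35, 0.44, 0.38, 0.55, 0.34, 0.43, 0.24, 0.96]
mask = [v < 0.4 for v in values]  # one Python bool per element
packed = pack_bits(mask)          # the same information, 8 booleans per byte

# Round trip: every boolean can be recovered from the packed buffer.
assert [get_bit(packed, i) for i in range(len(mask))] == mask
print(len(mask), "booleans ->", len(packed), "bytes")
```

The extra shifting and masking per element is the kind of creation overhead mentioned above, traded for the smaller memory footprint.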

Pandas has something called a blockmanager which hugely increases filtering performance (I believe due to cache optimality). However, this blockmanager incurs a performance hit when blocks are modified and block consolidation is triggered. Block consolidation is triggered when:

* the blockmanager has > 100 blocks
* a groupby operation is executed
* one of these operations is used: diff, take, xs, reindex, _is_mixed_type, _is_numeric_mixed_type, values, fillna, replace, resample, concat

Read more about the [blockmanager](https://uwekorn.com/2020/05/24/the-one-pandas-internal.html). 

In [35]:
%%timeit
df_a["column_2"] < 1

1000 loops, best of 5: 458 µs per loop


In [36]:
%%timeit
df_a_pd["column_2"] < 1

The slowest run took 5.25 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 5: 93.7 µs per loop


If we use this mask to select rows from the DataFrame, we see that polars gets slower linearly with the number of columns. If we apply this filter on a DataFrame with a single column, pandas is about **1.5x** faster in the run below (663 µs vs 1.02 ms), again with a huge spread in pandas.

In [37]:
%%timeit
df_a[:, :1][df_a["column_2"] < 1]

1000 loops, best of 5: 1.02 ms per loop


In [38]:
%%timeit
df_a_pd.iloc[:, :1][df_a_pd["column_2"] < 1]

1000 loops, best of 5: 663 µs per loop


This loss increases as we have more columns in the DataFrame. Here we observe that with 30 columns, polars is **~5x** slower.

In [39]:
%%timeit
df_a[df_a["column_2"] < 1]

100 loops, best of 5: 5.91 ms per loop


In [40]:
%%timeit
df_a_pd[df_a_pd["column_2"] < 1]

1000 loops, best of 5: 1.24 ms per loop


# Where polars definitely wins
However, polars wins in all the expensive operations. Joins and groupby operations take most of the running time of a query. Below we see that the join is more than **5x** faster in polars (5.15 ms vs 27.6 ms), and that the pandas join alone takes 27.6 / 5.91 = **>4x** the runtime of polars' slowest DataFrame filter above.

In [16]:
%%timeit
df_a.join(df_b, left_on="key", right_on="key", how="inner").shape

100 loops, best of 5: 5.15 ms per loop


In [17]:
%%timeit
df_a_pd.merge(df_b_pd, left_on="key", right_on="key", how="inner").shape

10 loops, best of 5: 27.6 ms per loop


In the groupby operation with an aggregation on all the columns, we see that polars is more than **300x** faster (43.4 ms vs 13.2 s). Again, the pandas groupby takes **>2000** times the runtime of the polars filter (13.2 s vs 5.91 ms). So this shows that it is better to be fast in the expensive operations.

In [18]:
%%timeit
df_a.groupby("key").max();

10 loops, best of 5: 43.4 ms per loop


In [19]:
%%timeit
df_a_pd.groupby("key").max();

1 loop, best of 5: 13.2 s per loop


# Query optimization
As we've seen, filtering a DataFrame with many columns is the slow operation in polars. Let's see if laziness can help optimize that. We'll start with a suboptimal query of a kind you see often:

In [22]:
def eager(df: Union[pl.DataFrame, pd.DataFrame]):
    df = df[df['column_2'] < 9]
    df = df[df['column_3'] > 1]
    df = df[df['column_6'] > 1]
    df = df[df['column_4'] > 1]
    return df
    
assert eager(df_a_pd).shape == eager(df_a).shape
eager(df_a_pd).shape

(17413, 31)

## Eager polars

In [23]:
%%timeit
eager(df_a)

10 loops, best of 5: 36.5 ms per loop


## Eager pandas

In [24]:
%%timeit
eager(df_a_pd)

100 loops, best of 5: 14.2 ms per loop


In [20]:
def lazy_query(df_a: pl.DataFrame):
    return (df_a.lazy().filter(col("column_2") < lit(9))
            .filter(col("column_3") > lit(1))
            .filter(col("column_6") > lit(1))
            .filter(col("column_4") > lit(1)))

## Lazy polars

In [21]:
%%timeit
lazy_query(df_a).collect()

100 loops, best of 5: 12.8 ms per loop


## Optimization: Combine predicates
Above, the query optimizer combined all the filters and executed them at once. This avoids the extra allocations that every separate filter operation would make. With this optimization, the lazy polars query is faster than the suboptimal pandas query. Compared to eager polars, the rewritten query is **37/13 = ~2.8x** faster.
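
As a rough illustration of why combining predicates helps, the sketch below contrasts filtering one predicate at a time with a single combined pass. It is a pure-Python toy, not how the polars optimizer is implemented; the function names and the small generated dataset are assumptions made for this example.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, float]
Predicate = Callable[[Row], bool]


def filter_one_by_one(rows: List[Row], predicates: List[Predicate]) -> Tuple[List[Row], int]:
    """Eager style: every filter materializes a new intermediate list."""
    allocations = 0
    for predicate in predicates:
        rows = [row for row in rows if predicate(row)]
        allocations += 1
    return rows, allocations


def filter_combined(rows: List[Row], predicates: List[Predicate]) -> Tuple[List[Row], int]:
    """Optimized style: AND all predicates together and scan the data once."""
    return [row for row in rows if all(p(row) for p in predicates)], 1


rows = [
    {"column_2": float(i % 10), "column_3": float(i % 7), "column_4": float(i % 5)}
    for i in range(1000)
]
predicates = [
    lambda r: r["column_2"] < 9,
    lambda r: r["column_3"] > 1,
    lambda r: r["column_4"] > 1,
]

slow, slow_allocs = filter_one_by_one(rows, predicates)
fast, fast_allocs = filter_combined(rows, predicates)

# Both strategies return the same rows; only the intermediate work differs.
assert slow == fast
print(slow_allocs, "intermediate results vs", fast_allocs)
```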

## Optimization: Projection pushdown (selecting only the needed columns)
Let's look at another optimization. Let's say we are only interested in the columns `"key"` and `"column_1"`. A suboptimal eager query could be written like below. This query would be more performant if the projection (selecting columns) was done before the selection (filtering rows). Below we see that the lazy query is optimized and selects the needed columns before doing the filter operation. This makes the lazy query **~2x** faster than eager pandas and **~5x** faster than eager polars.

In [41]:
def eager(df: Union[pl.DataFrame, pd.DataFrame]):
    df = df[df['column_2'] < 9]
    df = df[df['column_3'] > 1]
    df = df[df['column_6'] > 1]
    df = df[df['column_4'] > 1]
    return df[["key", "column_1"]]

def lazy_query(df_a: pl.DataFrame):
    return (df_a.lazy().filter(col("column_2") < lit(9))
            .filter(col("column_3") > lit(1))
            .filter(col("column_6") > lit(1))
            .filter(col("column_4") > lit(1))
            .select([col("key"), col("column_1")]))

## Eager polars

In [42]:
%%timeit
eager(df_a)

10 loops, best of 5: 36.9 ms per loop


## Eager pandas

In [43]:
%%timeit
eager(df_a_pd)

100 loops, best of 5: 15.1 ms per loop


## Lazy polars

In [44]:
%%timeit
lazy_query(df_a).collect()


100 loops, best of 5: 7.1 ms per loop
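
The effect of projection pushdown can be sketched with a toy columnar table in pure Python. The helpers `take` and `select` and the generated data are hypothetical and only serve this illustration; the sketch counts how many cells each ordering has to copy.

```python
from typing import Dict, List

Table = Dict[str, List[float]]


def take(table: Table, keep: List[bool]) -> Table:
    """Apply a boolean mask to every column of a columnar table."""
    return {name: [v for v, k in zip(col, keep) if k] for name, col in table.items()}


def select(table: Table, names: List[str]) -> Table:
    """Projection: keep only the named columns (no row data is copied)."""
    return {name: table[name] for name in names}


n_rows, n_cols = 1000, 30
table = {
    f"column_{i}": [float((r * (i + 1)) % 10) for r in range(n_rows)]
    for i in range(n_cols)
}
# The mask is computed from column_2 up front, so column_2 itself
# does not have to survive the projection in this toy.
mask = [v < 9 for v in table["column_2"]]

# Naive order: filter all 30 columns, then project down to two.
naive = select(take(table, mask), ["column_0", "column_1"])
cells_naive = sum(mask) * n_cols

# Pushed-down order: project first, so the filter only touches two columns.
pushed = take(select(table, ["column_0", "column_1"]), mask)
cells_pushed = sum(mask) * 2

assert naive == pushed
print("cells copied:", cells_naive, "vs", cells_pushed)
```

The result is identical either way; the pushed-down order simply never copies the 28 columns that are thrown away afterwards.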


# Optimization: Combine join with predicates
The previous queries were suboptimal and could have been written more performantly by hand. Now let's look at an optimization that we only get through laziness.
Let's say we join `df_a` with `df_b`. Because we already have both DataFrames in memory, it is hard to tell whether we need to do the filter before or after the join for an optimal query. Let's try both.

In [45]:
def eager(df_a: Union[pl.DataFrame, pd.DataFrame], df_b: Union[pl.DataFrame, pd.DataFrame]):
    df_a = df_a[df_a["column_1"] < 1]
    df_b = df_b[df_b["column_1"] < 1]
    # pandas
    if hasattr(df_a, "values"):
        return df_a.merge(df_b, left_on="key", right_on="key")
    return df_a.join(df_b, left_on="key", right_on="key")


## Eager polars; filter before join

In [46]:
%%timeit
eager(df_a, df_b)

100 loops, best of 5: 8.62 ms per loop


## Eager pandas; filter before join

In [47]:
%%timeit
eager(df_a_pd, df_b_pd)

100 loops, best of 5: 11.2 ms per loop


In [48]:
def eager(df_a: Union[pl.DataFrame, pd.DataFrame], df_b: Union[pl.DataFrame, pd.DataFrame]):
    # pandas
    if hasattr(df_a, "values"):
        df = df_a.merge(df_b, left_on="key", right_on="key")
        df = df[df["column_1_x"] < 1]
        return df
    df = df_a.join(df_b, left_on="key", right_on="key")
    df = df[df["column_1"] < 1]
    return df


## Eager polars; filter after join

In [49]:
%%timeit
eager(df_a, df_b)

100 loops, best of 5: 7.1 ms per loop


## Eager pandas; filter after join

In [50]:
%%timeit
eager(df_a_pd, df_b_pd)

10 loops, best of 5: 50.2 ms per loop


In [51]:
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
         .join(df_b.lazy(), left_on=col("key"), right_on=col("key"), how="inner")
         .filter(col("column_1") < lit(1))
    )


## Lazy polars; filter after join

In [52]:
%%timeit
lazy_query(df_a, df_b).collect().shape

100 loops, best of 5: 4.76 ms per loop


As we can see, in pandas choosing the wrong order of operations has a large effect, slowing down the query by more than **4x** (11.2 ms vs 50.2 ms). In polars the order of the filter and the join doesn't have much influence on the query time; both take roughly 7-9 ms. 

In the lazy variant, the optimizer combines the join and the filter in a single operation, thereby saving a redundant allocation. This is not always optimal. If the join algorithm increases the number of rows by a factor of 10 or 100, this can be more expensive than a filter up front. However, it remains a reasonable default that speeds up most queries. Through laziness the query is **~1.5x-2x** faster than eager polars, and **~2x-10x** faster than pandas, depending on the order of operations.
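
To see why the position of the filter relative to the join matters when the join multiplies rows, here is a small pure-Python sketch with a hand-written hash join. It does not reproduce what the polars optimizer does; `inner_join`, `keep`, and the generated rows are assumptions for this illustration only.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Dict[str, object]


def inner_join(left: List[Row], right: List[Row], key: str) -> List[Tuple[Row, Row]]:
    """Simple hash join: build a lookup on `right`, probe it with `left`."""
    lookup = defaultdict(list)
    for r in right:
        lookup[r[key]].append(r)
    return [(l, r) for l in left for r in lookup.get(l[key], [])]


def keep(row: Row) -> bool:
    """The filter predicate, which only looks at a column of the left table."""
    return row["column_1"] < 1


# Few distinct keys, so each left row matches many right rows.
left = [{"key": "abc"[i % 3], "column_1": i % 10} for i in range(300)]
right = [{"key": "abc"[i % 3], "other": i} for i in range(200)]

# Filter after the join: the join materializes every matching pair first.
joined = inner_join(left, right, "key")
after = [(l, r) for l, r in joined if keep(l)]

# Filter up front: far fewer rows enter the join.
before = inner_join([l for l in left if keep(l)], right, "key")

assert after == before
print(len(joined), "pairs materialized vs", len(before))
```

With only three distinct keys the join blows up the row count, which is exactly the situation in which filtering up front is the cheaper choice.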

# Some other queries

In [56]:
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
         .join(df_b.lazy(), left_on=col("key"), right_on=col("key"), how="inner")
         .filter(col("column_1") < lit(1))
         .groupby("key")
         .agg([col("column_0").agg_sum()])
         .select([col("key"), col("column_0_sum")])
    )

In [57]:
%%timeit
lazy_query(df_a, df_b).collect()

100 loops, best of 5: 5 ms per loop


## UDFs (user-defined functions <3 laziness)
The lazy API also has access to all the `eager` operations on `Series`, because UDFs can be called with almost no overhead (no serializing or pickling). Below we'll add a column `"udf"` with a `lambda` and the help of the eager API. It still needs some polishing, as we need to make sure that we don't modify the dtypes. I hope you can imagine that this can be very powerful! :)


In [58]:
%%time
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
         .join(df_b.lazy(), left_on=col("key"), right_on=col("key"), how="inner")
         .filter(col("column_1") < lit(1))
         .groupby("key")
         .agg([col("column_0").agg_sum(), col("column_2").agg_max().alias("foo")])
         .with_column(col("foo").apply(
             lambda series: pl.Series("", np.ones(series.len(), dtype=np.float32) * series.sum() )
                                               ).alias('udf'))
         .select([col("key"), col("column_0_sum"), col("udf"), col("foo")])
    )

lazy_query(df_a, df_b).collect()

CPU times: user 14.1 ms, sys: 3.87 ms, total: 17.9 ms
Wall time: 10.9 ms


In [248]:
# More coming up later.