<a target="_blank" href="https://colab.research.google.com/github/bettercodepaul/data2day_2023_polars/blob/main/data2day_2023_Polars_Teil_3.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# Polars: Der Turbo Boost für Dataframes - Teil 3

Wichtige Links zur Erinnerung:

- Homepage von Polars: https://www.pola.rs/
- User-Guide: https://pola-rs.github.io/polars/user-guide/
- API-Referenz: https://pola-rs.github.io/polars/py-polars/html/reference/

## Installation + Vorbereitung

In [None]:
import urllib.request
import os.path

In [None]:
REQUIREMENTS_URL = "https://github.com/bettercodepaul/data2day_2023_polars/raw/main/requirements.txt"
urllib.request.urlretrieve(REQUIREMENTS_URL, os.path.basename(REQUIREMENTS_URL))

In [None]:
# nicht vergessen, dass die Laufzeitumgebung ggf. neu gestartet werden muss
!pip install -qr requirements.txt

In [None]:
import polars as pl

In [None]:
# bis zu 60 Zeichen pro Spalte ausgeben und Fließkommazahlen nicht abkürzen
pl.Config(fmt_str_lengths=60, fmt_float="full")

In [None]:
# CSV-Daten herunterladen
CSV_DATA_URL = "https://github.com/bettercodepaul/data2day_2023_polars/raw/main/spotify-charts-2017-2021-global-top200.csv.gz"
LOCAL_CSV_DATA_FILE_NAME = os.path.basename(CSV_DATA_URL)
urllib.request.urlretrieve(CSV_DATA_URL, LOCAL_CSV_DATA_FILE_NAME)
REGION_DATA_URL = "https://github.com/bettercodepaul/data2day_2023_polars/raw/main/region-info.csv"
LOCAL_REGION_DATA_FILE_NAME = os.path.basename(REGION_DATA_URL)
urllib.request.urlretrieve(REGION_DATA_URL, LOCAL_REGION_DATA_FILE_NAME)

In [None]:
# Parquet-Daten herunterladen
BIG_DATA_URL = "https://github.com/bettercodepaul/data2day_2023_polars/releases/download/data-parquet/spotify-charts-2017-2021.parquet"
LOCAL_BIG_DATA_FILE_NAME = os.path.basename(BIG_DATA_URL)
urllib.request.urlretrieve(BIG_DATA_URL, LOCAL_BIG_DATA_FILE_NAME)

In [None]:
# Übungen und Hilfsfunktionen herunterladen
EXERCISES_URL = "https://github.com/bettercodepaul/data2day_2023_polars/raw/main/data2day_exercises.py"
urllib.request.urlretrieve(EXERCISES_URL, os.path.basename(EXERCISES_URL))

In [None]:
from data2day_exercises import *

## When.Then.Otherwise

Manchmal wollt ihr einen Ausdruck in bestimmten Fällen so und in anderen Fällen so berechnen.

Dafür gibt es die Methoden `when.then.otherwise`, die einem `if.then.else` entsprechen.

In [None]:
df = pl.read_csv("spotify-charts-2017-2021-global-top200.csv.gz", try_parse_dates=True)

Das funktioniert dann so:

In [None]:
(df
    .with_columns(
        pl.when(pl.col("date").dt.day().eq(14) & pl.col("date").dt.month().eq(2))
        .then(pl.col("streams"))
        .otherwise(pl.lit(0))
        .alias("valentinesStreams")
    )
    .filter(pl.col("title").eq("Starboy") & pl.col("date").dt.week().eq(7))
    .select("date", "streams", "valentinesStreams")
    .head(5)
)

Wir könnten auch die Trend-Spalte selber nachstellen:

In [None]:
(df
    .join(
        # Position vom Vortag ermitteln
        df.select("url", pl.col("date").dt.offset_by("1d"), pl.col("rank").alias("previous_rank")),
        how="left",
        on=["url", "date"]
    )
    .with_columns(
        pl.when(pl.col("rank").lt(pl.col("previous_rank")))
        .then(pl.lit("MOVE_UP"))
        .otherwise(
            pl.when(pl.col("rank").gt(pl.col("previous_rank")))
            .then(pl.lit("MOVE_DOWN"))
            .otherwise(
                pl.when(pl.col("rank").eq(pl.col("previous_rank")))
                .then(pl.lit("SAME_POSITION"))
                .otherwise(pl.lit("NEW_ENTRY"))
            )
        ).alias("myTrend")
    )
    .select("title", "artist", "date", "trend", "myTrend")
    .sample(10)
)

Wenn es nur darum geht bestimmte, einzelne Werte mit anderen zu ersetzen, kann auch `map_dict` sehr praktisch sein.

In [None]:
mapping = {
    "SAME_POSITION": "➡️",
    "NEW_ENTRY": "🆕",
    "MOVE_UP": "⬆️",
    "MOVE_DOWN": "⬇️"
}
(df
  .with_columns(pl.col("trend")
  .map_dict(mapping).alias("trendSymbol"))
  .group_by("trendSymbol")
  .count()
)

## Custom Expressions

### Bitte nicht: `map_*`

Die denkbar schlechstes Möglichkeit eigene Funktionen in eine Polars-Abfrage einzuschmuggeln sind die verschiedenen map-Methoden `map_rows`, `map_batches`, `map_elements` und `map_groups`, die eine UDF (User Defined Function) ausführen.

Das sollte vermieden werden, weil die Performance darunter sehr stark leidet.

### Expression Factories




Eine elegantere Art & Weise zu modularem Code zu kommen, sind eigene Methoden, die neue Expressions erstellen und z.B. mit der `pipe` Methode aufgerufen werden können.

Hier ein Beispiel für die Methode `sum` für die es in Polars keinen `min_count` Parameter gibt (den es aber in Pandas gibt). Der `min_count` Parameter bestimmt, wie viele Werte mindestens vorhanden sein müssen, damit die Summe gebildet wird.

In [None]:
def sum(expr: pl.Expr, min_count=0) -> pl.Expr:
    if min_count > 0:
        return pl.when(
            expr.is_not_null().sum().ge(pl.lit(min_count))
        ).then(expr.sum())
    else:
        return expr.sum()

In [None]:
pl.DataFrame({
    "value": [42, 43, None], 
}).select(
    pl.col("value").pipe(sum, min_count=2).alias("min_count=2"),
    pl.col("value").pipe(sum, min_count=3).alias("min_count=3")
)

Wir können solche Methoden auch in einem eigenen Namespace registrieren.

In [None]:
@pl.api.register_expr_namespace("special")
class Special:
    def __init__(self, expr: pl.Expr):
        self._expr = expr

    def sum(self, min_count=0) -> pl.Expr:
        if min_count > 0:
            return pl.when(
                self._expr.is_not_null().sum().ge(pl.lit(min_count))
            ).then(self._expr.sum())
        else:
            return self._expr.sum()

Jetzt können wir die Methode innerhalb des eigenen Namespace "special" aufrufen.

In [None]:
pl.DataFrame({
    "value": [42, 43, None], 
}).select(
    pl.col("value").special.sum(min_count=2).alias("min_count=2"),
    pl.col("value").special.sum(min_count=3).alias("min_count=3")
)

Weiterführende Informationen gibt es hier:

- [Expr.pipe](https://pola-rs.github.io/polars/py-polars/html/reference/expressions/api/polars.Expr.pipe.html)
- [DataFrame.pipe](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.pipe.html)
- [Extending the API](https://pola-rs.github.io/polars/py-polars/html/reference/api.html)

## Lazy vs. Eager

Bis jetzt haben wir Polars immer im "eager mode" benutzt. Jeder Funktionsaufruf hatte direkt eine Operation auf den Daten zur Folge.

Das hat Vorteile beim Debugging von Abfragen, verhindert aber viele Optimierungen, die Polars nur im "lazy mode" nutzen kann.

Für den "lazy mode" gibt es zwei Optionen.

### Eager Load + Lazy Query

Wenn ein Datensatz nicht zu groß ist, können wir ihn vollständig in den Speicher laden, wie wir es schon kennen.

In [None]:
df = pl.read_csv("spotify-charts-2017-2021-global-top200.csv.gz")
type(df)

Durch den Aufruf der `lazy` Methode schalten wir dann in den "lazy mode". Die Ausführung der Abfrage ist jetzt angehalten und es wird mit jedem weiteren Aufruf nur die Abfrage "formuliert".

In [None]:
lazy_df = df.lazy()
type(lazy_df)

In [None]:
# für einen lazy Dataframe wird der unoptimierte Abfragebaum ausgegeben
lazy_df.select("artist", "title").filter(pl.col("artist").eq("Dua Lipa"))

Der Plan wird von unten nach oben gelesen. Die griechischen Buchstaben sind aus der relationalen Algebra. Der Buchstabe π steht für die Operation Projektion (`select`), σ für die Operation Selektion (`filter`).

- Table π */9; σ -; bedeutet, dass alle neun Spalten gelesen werden und keine Selektion vorgenommen wird
- π 2/9 bedeutet, dass auf zwei von neun Spalten projiziert wird 
- FILTER BY ist die Selektion aus unserer Abfrage

In [None]:
# mit der Methode show_graph() können wir die optimierte Abfrage ausgeben
lazy_df.select("artist", "title").filter(pl.col("artist").eq("Dua Lipa")).show_graph()

Sowohl die Projektion als auch die Selektion passieren im optimierten Abfrageplan früher.

Die Abfrage wird letztendlich ausgeführt, wenn wir die Methode `collect` aufrufen. Das Ergebnis ist dann wieder ein normaler Dataframe.

In [None]:
result = lazy_df.select("artist", "title").filter(pl.col("artist").eq("Dua Lipa")).collect()
result.sample(2)

In [None]:
type(result)

Durch dieses Vorgehen, kann Polars Optimierungen vor der Ausführung der Abfrage vornehmen.

Eine Auswahl an Optimierungen findet ihr hier: https://pola-rs.github.io/polars/user-guide/lazy/optimizations/

### Lazy Load + Query

Wenn es sich nicht lohnt einen Datensatz vollständig in den Speicher zu laden, können wir auch das Laden der Daten verzögern, in dem wir die IO-Methoden mit dem Namen `scan_*` statt `write_*` nutzen.

Das funktioniert z.B. für Dateien in den Formaten CSV (`scan_csv`) und Parquet (`scan_parquet`), aber nicht für komprimierte CSVs.

Bisher haben wir immer mit einem kleinen Datensatz gearbeitet, der nur die globalen Top-200 Charts beinhaltet (362k Zeilen, 64 MB)

Wir können jetzt auf den richtigen Datensatz wechseln, der die Top-200 und die Viral-50 Charts für 70 verschiedene Regionen enthält (26m Zeilen, 4 GB).

In [None]:
df = pl.scan_parquet("spotify-charts-2017-2021.parquet")

Durch die optimierten Abfragen, werden nur die Daten aus der Datei geladen, die auch wirklich gebraucht werden.

In [None]:
(df
    .select("artist", "title")
    .filter(pl.col("artist").eq("Dua Lipa"))
).show_graph()

Je nach Abfrage können bestimmte Optimierungen nicht durchgeführt werden, weil sie das  Ergebnis verändern würden...

In [None]:
(df
    .head(2)
    .select("artist", "title")
    .filter(pl.col("artist").eq("Dua Lipa"))
).show_graph()

Manchmal lässt der Abfrage-Optimierer auch Potenzial liegen...

In [None]:
naive_query = (df
    .group_by("artist")
    .agg(pl.col("title").n_unique())
    .filter(pl.col("artist").eq("Dua Lipa"))
)
naive_query.show_graph()

In [None]:
%%timeit
naive_query.collect()

Wir optimieren händisch, dass zuerst gefiltert werden sollte, was die Abfrage deutlich beschleunigt.

In [None]:
optimized_query = (df
    .filter(pl.col("artist").eq("Dua Lipa"))
    .group_by("artist")
    .agg(pl.col("title").n_unique())
)
optimized_query.show_graph()

In [None]:
%%timeit
optimized_query.collect()

Der Optimierer wird allerdings auch ständig weiterentwickelt. Siehe für diesen konkreten Fall z.B. https://github.com/pola-rs/polars/issues/11678

### Streaming

Wenn das Endergebnis oder auch Zwischenergebnisse einer Abfrage nicht mehr in den RAM passen, hat Polars einen "streaming mode", der den benötigten RAM deutlich senken kann.

Wenn nur die Zwischen-Ergebnisse das Problem sind, kann der "streaming mode" mit `collect(streaming=True)` aktiviert werden. Das Endergebnis muss dann aber in den RAM passen.

Um auch ein End-Ergebnis, das nicht mehr in den RAM passt, auf die Festplatte zu schreiben, können die Methoden `sink_parquet`, `sink_csv` und `sink_ipc` genutzt werden. 

In [None]:
# Falls der Jupyter-Kernel abgestürzt ist, neu starten und diese Zeile ausführen
import polars as pl
df = pl.scan_parquet("spotify-charts-2017-2021.parquet")

In [None]:
# fraction ist der Anteil an Zeilen und beeinflusst den Speicherbedarf
# 0.003 ~ 4 GB (sollte mit 8 GB RAM laufen)
# 0.005 ~ 10 GB (sollte mit 16 GB RAM laufen)
# 0.008 ~ 26 GB (sollte mit 32 GB RAM laufen)
# 0.010 ~ 41 GB (sollte mit 64 GB RAM laufen)
# 0.015 ~ 92 GB (sollte mit 128 GB RAM laufen)
fraction = 0.008
row_count = round(26173514*fraction)
high_mem_query = (
    df.head(row_count).join(df.head(row_count), on="artist")
    .filter(
        pl.col("url").ne(pl.col("url_right")) &
        pl.col("date").gt(pl.col("date_right")) &
        pl.col("trend").eq("NEW_ENTRY") &
        pl.col("trend_right").eq("NEW_ENTRY")
    )
    .group_by("url").agg((pl.col("date") - pl.col("date_right")).min().alias("durationBetweenNewEntries"))
    .select(pl.col("durationBetweenNewEntries").mean())
)
print(f"Cross-product of {row_count:_} rows would contain {row_count**2:_} rows.")
print(f"Estimated size for the intermediate join result is {6e-10*row_count**2:.2f} GB.")

In [None]:
# probiere unterschiedliche Werte für "fraction" mit streaming=False und streaming=True
high_mem_query.collect(streaming=False)

Wenn eine Abfrage im Streaming-Modus ausgeführt werden kann, befindet sie sich innerhalb eines "Pipeline"-Knotens. Wenn einige Knoten nicht gestreamt werden können, werden sie außerhalb des "Pipeline"-Knotens angezeigt.

In [None]:
high_mem_query.show_graph(streaming=True)

## Übungen

In [None]:
df = (
    pl.scan_parquet("spotify-charts-2017-2021.parquet")
    .with_columns(pl.col("streams").cast(pl.Int64))
)

### Frage 21

In [None]:
q21.question()

In [None]:
q21_df = ...

In [None]:
q21.check(q21_df)

### Frage 22

In [None]:
q22.question()

In [None]:
q22_df = ...

In [None]:
q22.check(q22_df)

### Frage 23

In [None]:
q23.question()

In [None]:
q23_df = ...

In [None]:
q23.check(q23_df)

### Frage 24

In [None]:
q24.question()

In [None]:
q24_df = ...

In [None]:
q24.check(q24_df)

### Frage 25

In [None]:
q25.question()

In [None]:
region_df = pl.scan_csv("region-info.csv")
xmasYears_per_continent = ...


In [None]:
q25_df = ...

In [None]:
q25.check(q25_df)