In [174]:
from datetime import datetime

import numpy as np
import polars as pl
from polars import col, lit
import pandas as pd
from pandas.io.common import get_handle

## Polars

* [Python Docs](https://pola-rs.github.io/polars-book/user-guide/introduction.html)
* [Github](https://github.com/pola-rs/polars)
* [PyPI](https://pypi.org/project/polars/)
* Features:
    * Leazy & Eager computation
    * Rust implementation
    * Arrow Memory Format
    * Easy and transparent parallelisation using multithreading
    * PySpark-like Syntax
    * Supports real NA values in contrast to Pandas
    * Easily deal with complex data types, e.g. list of strings/floats

In [2]:
pl.__version__

'0.7.15'

Download a huge csv as a test. Takes a while and only needed once...

In [135]:
with get_handle("http://sdm.lbl.gov/fastbit/data/star2002-full.csv.gz", compression="gzip", mode="r") as fh_in, open("big.csv", "bw") as fh_out:
    fh_out.write(fh_in.handle.buffer.read())

## Eager Execution

In [136]:
edf = pl.read_csv("./big.csv", has_headers=False)

In [137]:
edf.filter(col("column_1") == 1).select(["column_9"]).head()

column_9
i64
654
61
7
27
1


#### alternatively *Pandas* style (not recommended!)

In [138]:
edf[edf["column_1"] == 1][["column_9"]].head()

column_9
i64
654
61
7
27
1


Why shouldn't I use the Pandas style? Because ...

* it's much harder to read since it's not *operator chaining*,
* it's more verbose if you assign actual variable names to your dataframes and not just use `df` all the time. Check out this filtering example: `agg_metric_df[agg_metric_df["metric_1"] < 0.9]`. Using `col` to refer to the column of the current dataframe is much cleaner.,
* it's not possible to switch later from eager to lazy execution

## Lazy Execution

Just switching `read_csv` to `scan_csv` is all it needs to go from eager to lazy in this example. `collect` or `fetch` is then used to trigger the execution.

In [139]:
ldf = pl.scan_csv("./star2002-full.csv", has_headers=False)

In [140]:
ldf = ldf.filter(col("column_1") == 1)
ldf.select(["column_9"]).collect().head()

column_9
i64
654
61
7
27
1


#### Pandas style fails

In [141]:
ldf = pl.scan_csv("./star2002-full.csv", has_headers=False)
ldf[ldf["column_1"] == 1][["column_9"]].head()

TypeError: 'LazyFrame' object is not subscriptable

## Dealing with missing values

In [253]:
left_df = pl.DataFrame({"a": [1, 2, 3], "b": [None, "b", "c"]})
right_df = pl.DataFrame({"a": [1, 2], "c": [42., 69.]})

df = left_df.join(right_df, on="a", how="left")
df

a,b,c
i64,str,f64
1,,42.0
2,"""b""",69.0
3,"""c""",


In [254]:
df.filter(col("c").is_null())

a,b,c
i64,str,f64
3,"""c""",


# New columns

In [264]:
df.with_column((lit(3)*col("c")).alias("3*c"))

a,b,c,3*c
i64,str,f64,f64
1,,42.0,126.0
2,"""b""",69.0,207.0
3,"""c""",,


# Column Expressions

In [144]:
df = pl.DataFrame(
    {
        "nrs": [1, 2, 3, None, 5],
        "names": ["foo", "ham", "spam", "egg", None],
        "random": np.random.rand(5),
        "groups": ["A", "A", "B", "C", "B"],
    }
)
df

nrs,names,random,groups
i64,str,f64,str
1.0,"""foo""",0.921,"""A"""
2.0,"""ham""",0.109,"""A"""
3.0,"""spam""",0.754,"""B"""
,"""egg""",0.405,"""C"""
5.0,,0.709,"""B"""


In [145]:
df.select([pl.sum("nrs"), pl.col("names").sort()])

nrs,names
i64,str
11,
11,"""egg"""
11,"""foo"""
11,"""ham"""
11,"""spam"""


In [146]:
df.select(
    [
        col("names").n_unique().alias("unique_names_1"),
        col("names").unique().count().alias("unique_names_2")
    ]
)

unique_names_1,unique_names_2
u32,u32
5,5


In [147]:
df.select(col("names").filter(col("names").str_contains(r"am$")).count())

names
u32
2


Complex expressions are possible which are all *embarassingly parallel* by design and thus parallelized

In [148]:
df.select([pl.when(col("random") > 0.5).then(0).otherwise(col("random")) * pl.sum("nrs")])

literal
f64
0.0
1.201
0.0
4.451
0.0


Even window expressions are possible

In [149]:
df.select([
        col("*"),  # select all
        col("random").sum().over("groups").alias("sum[random]/groups"),
        col("random").list().over("names").alias("random/name"),
    ])

nrs,names,random,groups,sum[random]/groups,random/name
i64,str,f64,str,f64,list
1.0,"""foo""",0.921,"""A""",1.03,"""[0.9211602139428208]"""
2.0,"""ham""",0.109,"""A""",1.03,"""[0.10918347611397283]"""
3.0,"""spam""",0.754,"""B""",1.463,"""[0.7536068481694201]"""
,"""egg""",0.405,"""C""",0.405,"""[0.4046214089168467]"""
5.0,,0.709,"""B""",1.463,"""[0.7094905646270304]"""


# GroupBy

In [244]:
df = pl.read_csv("https://theunitedstates.io/congress-legislators/legislators-current.csv")

In [220]:
q = (
    df
    .lazy() # allows for working only on a subset using limit
    .groupby("first_name")
    .agg([pl.count("party"), col("gender").list(), pl.first("last_name")])
    .sort("party_count", reverse=True)
    .limit(5)
)
q.collect()

first_name,party_count,gender_agg_list,last_name_first
str,u32,list,str
"""John""",19,"""[M, M, ... M]""","""Barrasso"""
"""Mike""",12,"""[M, M, ... M]""","""Kelly"""
"""Michael""",11,"""[M, M, ... M]""","""Bennet"""
"""David""",11,"""[M, M, ... M]""","""Cicilline"""
"""James""",9,"""[M, M, ... M]""","""Inhofe"""


even conditionals work with aggregations

In [221]:
q = (
    df.lazy()
    .groupby("state")
    .agg(
        [
            (col("party") == "Democrat").sum().alias("anti"),
            (col("party") == "Republican").sum().alias("pro"),
        ]
    )
    .sort("pro", reverse=True)
    .limit(5)
)
q.collect()

state,anti,pro
str,u32,u32
"""TX""",13,24
"""FL""",10,18
"""OH""",4,13
"""CA""",44,11
"""NC""",5,10


expressions allow to easily compose more complex aggregations

In [222]:
def compute_age() -> pl.Expr:
    # Date64 is time in ms
    ms_to_year = 1e3 * 3600 * 24 * 365
    return (
        lit(datetime(2021, 1, 1)) - col("birthday")
    ) / (ms_to_year)


def avg_age(gender: str) -> pl.Expr:
    return (
        compute_age()
        .filter(col("gender") == gender)
        .mean()
        .alias(f"avg {gender} age")
    )


q = (
    df.lazy()
    .groupby(["state"])
    .agg(
        [
             avg_age("M"),
             avg_age("F"),
            (col("gender") == "M").sum().alias("# male"),
            (col("gender") == "F").sum().alias("# female"),
        ]
    )
    .limit(5)
)
q.collect()

state,avg M age,avg F age,# male,# female
str,f64,f64,u32,u32
"""AZ""",60.004,59.168,8,3
"""KY""",62.0,,8,0
"""MD""",67.334,,10,0
"""DC""",,83.611,0,1
"""VA""",63.239,46.494,10,3


# User-Defined (Aggregation) Functions

In [341]:
df = pl.DataFrame({"foo": np.arange(10), "bar": np.random.rand(10), "cls": np.random.randint(2, size=10)})

In [342]:
df

foo,bar,cls
i64,f64,i64
0,0.229,1
1,0.938,1
2,0.52,1
3,0.618,0
4,0.703,1
5,0.035,1
6,0.694,0
7,0.808,0
8,0.564,1
9,0.507,1


#### Vector Operations

`map` for vector operations on a whole column

In [302]:
def my_custom_func(s: pl.Series) -> pl.Series:
    return np.exp(s) / np.log(s)

my_udf = pl.udf(my_custom_func, output_type=pl.Float64)

In [321]:
df.filter(pl.col("bar").map(my_udf) > -1)

foo,bar
i64,f64
0,0.699
1,0.928
2,0.821
3,0.982
4,0.751
5,0.913
6,0.363
7,0.98
8,0.782


`apply` for scalar operations on a cell level

In [325]:
df.select(col("bar").apply(lambda x: 3*x))

bar
f64
2.098
2.785
2.462
2.945
2.254
2.739
1.089
2.939
2.345
0.197


#### Aggregation Operations

In [358]:
def my_custom_agg_func(s: pl.Series) -> pl.Series:
    groups = s.to_numpy()
    return pl.Series([(3*g).sum() for g in groups])

my_agg_udf = pl.udf(my_custom_agg_func, output_type=pl.Float64)

In [359]:
df.groupby(["cls"]).agg([col("bar").map(my_agg_udf)])

cls,bar
i64,f64
0,6.359
1,10.486
