# Jupyter Notebook: [Understanding Polars UDF Capabilities](https://medium.com/@npotapov)

## Setup and Imports

In [1]:
from datetime import datetime, timedelta
from random import choice, gauss, randrange, seed
from functools import lru_cache
import pyarrow.compute as pc

import polars as pl
import numpy as np
# Seed Python's `random` module so the randomly generated records below are reproducible.
seed(42)

## Data Preparation and DataFrame Creation

In [2]:
# Build a synthetic log of user actions: one record per iteration, each stamped
# at a random whole-minute offset before `base_time`.
base_time: datetime = datetime(2024, 8, 9, 0, 0, 0, 0)
num_records: int = 1_000_000

# The order of the random calls inside the dict literal determines which values
# the seeded generator hands out, so it must not be rearranged.
user_actions_data: list[dict] = [
    {
        "online_store": choice(["Shop1", "Shop2", "Shop3"]),
        "product_id": choice(["0001", "0002", "0003"]),
        "quantity": choice([1.0, 2.0, 3.0]),
        # Explicit mu/sigma: the zero-argument form only works on Python 3.11+.
        "action_type": ("purchase" if gauss(0.0, 1.0) > 0.6 else "view"),
        "action_dt": base_time - timedelta(minutes=randrange(num_records)),
    }
    for _ in range(num_records)  # the index itself is never used
]
user_actions_df: pl.DataFrame = pl.DataFrame(user_actions_data)

## Using map_elements

In [3]:
%%time

# Square every quantity by calling a Python lambda through map_elements.
# This is the deliberately slow variant; the next cell expresses the same
# computation with a native expression.
user_actions_df.with_columns(  
    pl.col("quantity")
    .map_elements(lambda x: x ** 2)
    .alias("quantity_2")
).head()

CPU times: user 170 ms, sys: 26.8 ms, total: 197 ms
Wall time: 164 ms


Expr.map_elements is significantly slower than the native expressions API.
Only use if you absolutely CANNOT implement your logic otherwise.
Replace this expression...
  - pl.col("quantity").map_elements(lambda x: ...)
with this one instead:
  + pl.col("quantity") ** 2



online_store,product_id,quantity,action_type,action_dt,quantity_2
str,str,f64,str,datetime[μs],f64
"""Shop3""","""0001""",1.0,"""view""",2024-04-29 09:24:00,1.0
"""Shop3""","""0001""",3.0,"""view""",2023-02-16 15:54:00,9.0
"""Shop3""","""0001""",3.0,"""view""",2024-03-02 19:02:00,9.0
"""Shop1""","""0003""",3.0,"""view""",2024-07-20 16:16:00,9.0
"""Shop3""","""0001""",3.0,"""view""",2024-03-01 11:32:00,9.0


In [4]:
%%time

# Native-expression version of the squaring step: no Python callback involved.
user_actions_df.with_columns(
    pl.col("quantity").pow(2).alias("quantity_2")
).head()

CPU times: user 1.4 ms, sys: 6.53 ms, total: 7.93 ms
Wall time: 3.36 ms


online_store,product_id,quantity,action_type,action_dt,quantity_2
str,str,f64,str,datetime[μs],f64
"""Shop3""","""0001""",1.0,"""view""",2024-04-29 09:24:00,1.0
"""Shop3""","""0001""",3.0,"""view""",2023-02-16 15:54:00,9.0
"""Shop3""","""0001""",3.0,"""view""",2024-03-02 19:02:00,9.0
"""Shop1""","""0003""",3.0,"""view""",2024-07-20 16:16:00,9.0
"""Shop3""","""0001""",3.0,"""view""",2024-03-01 11:32:00,9.0


In [5]:
%%time

# Per-action-type total computed the slow way: gather each group's quantities
# into a single list value, then sum that list inside a Python callback.
(
    user_actions_df
    .group_by("action_type")
    .agg(
        pl.col("quantity")
        .implode()  # aggregate all column values into a list
        .map_elements(lambda group_values: group_values.sum())
    )
)

CPU times: user 150 ms, sys: 38.8 ms, total: 189 ms
Wall time: 60.5 ms


action_type,quantity
str,f64
"""view""",1450707.0
"""purchase""",548727.0


In [6]:
%%time

# Same per-action-type total using the built-in aggregation.
(
    user_actions_df
    .group_by("action_type")
    .agg(pl.sum("quantity"))
)

CPU times: user 71.1 ms, sys: 16.5 ms, total: 87.6 ms
Wall time: 30.4 ms


action_type,quantity
str,f64
"""view""",1450707.0
"""purchase""",548727.0


In [7]:
def udf(action_type: str) -> str:
    """Return ``action_type`` converted to upper case."""
    uppercased = action_type.upper()
    return uppercased

# Apply `udf` to each action type through a thin lambda wrapper.
(
    user_actions_df
    .select(
        pl.col("action_type").map_elements(lambda action: udf(action))
    )
    .head()
)

action_type
str
"""VIEW"""
"""VIEW"""
"""VIEW"""
"""VIEW"""
"""VIEW"""


In [8]:
@lru_cache(maxsize=3)  # default maxsize=128
def udf2(action_type: str) -> str:
    """Upper-case ``action_type``; results are memoised (at most 3 entries kept)."""
    uppercased = action_type.upper()
    return uppercased

# Pass the memoised function straight to map_elements.
(
    user_actions_df
    .select(pl.col("action_type").map_elements(udf2))
    .head()
)

action_type
str
"""VIEW"""
"""VIEW"""
"""VIEW"""
"""VIEW"""
"""VIEW"""


In [9]:
# Show the hit/miss counters that lru_cache has collected for udf2.
udf2.cache_info()

CacheInfo(hits=999999, misses=3, maxsize=3, currsize=3)

In [10]:
# Apply NumPy's natural log to the quantity column via map_elements.
user_actions_df.select(pl.col("quantity").map_elements(np.log)).head()

Expr.map_elements is significantly slower than the native expressions API.
Only use if you absolutely CANNOT implement your logic otherwise.
Replace this expression...
  - pl.col("quantity").map_elements(np.log)
with this one instead:
  + pl.col("quantity").log()

  user_actions_df.select(pl.col("quantity").map_elements(np.log)).head()


quantity
f64
0.0
1.098612
1.098612
1.098612
1.098612


In [11]:
def udf3(action_type: str, quantity: float) -> float:
    """Scale ``quantity`` according to the kind of action.

    Args:
        action_type: Action label; ``'view'`` selects halving, any other
            value selects doubling.
        quantity: Amount to scale. Annotated as ``float`` because the
            ``quantity`` column this is applied to holds floats.

    Returns:
        ``quantity / 2`` for views, ``quantity * 2`` otherwise.
    """
    return quantity / 2 if action_type == 'view' else quantity * 2

In [12]:
# Bundle both input columns into a struct so a single callback receives them
# together, then unpack the two fields for udf3.
(
    user_actions_df
    .select(
        pl.struct("action_type", "quantity").map_elements(
            lambda row: udf3(row['action_type'], row['quantity']),
            return_dtype=pl.Float64,
        )
    )
    .head()
)

action_type
f64
0.5
1.5
1.5
1.5
1.5


## Using map_batches

In [13]:
# Replace the quantity column with its overall maximum: the callback gets the
# whole column at once and hands back one value (hence returns_scalar=True).
user_actions_df.with_columns(
    pl.col('quantity').map_batches(
        lambda quantities: quantities.to_numpy().max(),
        returns_scalar=True,
    )
).head()

online_store,product_id,quantity,action_type,action_dt
str,str,f64,str,datetime[μs]
"""Shop3""","""0001""",3.0,"""view""",2024-04-29 09:24:00
"""Shop3""","""0001""",3.0,"""view""",2023-02-16 15:54:00
"""Shop3""","""0001""",3.0,"""view""",2024-03-02 19:02:00
"""Shop1""","""0003""",3.0,"""view""",2024-07-20 16:16:00
"""Shop3""","""0001""",3.0,"""view""",2024-03-01 11:32:00


In [14]:
def udf4(input):
    """Print the type of ``input`` and return ``input.slice(1, 2)``."""
    received_type = type(input)
    print(received_type)
    return input.slice(1, 2)

In [15]:
# Run udf4 over the frame inside a lazy query.
# The `streamable=True` flag is deliberately not set: it promises the function
# gives the same result whether it sees the full dataset or separate batches,
# and a positional slice such as udf4's cannot keep that promise.
(
    user_actions_df
    .lazy()
    .map_batches(udf4)
    .collect()
)

<class 'polars.dataframe.frame.DataFrame'>


online_store,product_id,quantity,action_type,action_dt
str,str,f64,str,datetime[μs]
"""Shop3""","""0001""",3.0,"""view""",2023-02-16 15:54:00
"""Shop3""","""0001""",3.0,"""view""",2024-03-02 19:02:00


In [16]:
def udf5(series):
    """Centre ``series`` on zero by subtracting its mean.

    Args:
        series: The whole column, as passed in by ``map_batches``.

    Returns:
        A series of the same length holding ``value - mean`` for each value.
    """
    # One vectorised subtraction instead of a Python-level loop that builds an
    # intermediate list element by element.
    return series - series.mean()

# Centre the quantity column with the batch UDF.
(
    user_actions_df
    .select(pl.col("quantity").map_batches(udf5, return_dtype=pl.Float64))
    .head()
)

quantity
f64
-0.999434
1.000566
1.000566
1.000566
1.000566


In [17]:
# Native-expression equivalent of the mean-centring step.
user_actions_df.select(
    (pl.col("quantity") - pl.mean("quantity")).alias("quantity")
).head()

quantity
f64
-0.999434
1.000566
1.000566
1.000566
1.000566


In [18]:
# Mask every Unicode letter in the store name with "*" by round-tripping the
# column through Arrow and using PyArrow's regex replace function.
(
    user_actions_df
    .with_columns(
        pl.col('online_store').map_batches(
            lambda store_names: pl.from_arrow(
                pc.replace_substring_regex(
                    store_names.to_arrow(),
                    pattern=r"[\p{L}]",
                    replacement="*",
                )
            ),
            return_dtype=pl.Utf8,
        )
    )
    .head()
)

online_store,product_id,quantity,action_type,action_dt
str,str,f64,str,datetime[μs]
"""****3""","""0001""",1.0,"""view""",2024-04-29 09:24:00
"""****3""","""0001""",3.0,"""view""",2023-02-16 15:54:00
"""****3""","""0001""",3.0,"""view""",2024-03-02 19:02:00
"""****1""","""0003""",3.0,"""view""",2024-07-20 16:16:00
"""****3""","""0001""",3.0,"""view""",2024-03-01 11:32:00


## Using map_columns

In [19]:
# Column dtypes before any shrinking, for comparison with the next cell.
user_actions_df.schema

Schema([('online_store', String),
        ('product_id', String),
        ('quantity', Float64),
        ('action_type', String),
        ('action_dt', Datetime(time_unit='us', time_zone=None))])

In [20]:
# shrink_dtype reduces a numeric column to the minimal required datatype;
# map_columns applies it to `quantity` only and leaves the other columns as-is.
user_actions_df.map_columns(
    'quantity',
    lambda column: column.shrink_dtype(),
).schema

Schema([('online_store', String),
        ('product_id', String),
        ('quantity', Float32),
        ('action_type', String),
        ('action_dt', Datetime(time_unit='us', time_zone=None))])

## Using map_rows

In [21]:
# Row-wise UDF; columns are addressed by position
# (0 = online_store, 1 = product_id, 2 = quantity).
user_actions_df.map_rows(
    lambda record: (record[0] + record[1], record[2])
).head()

column_0,column_1
str,f64
"""Shop30001""",1.0
"""Shop30001""",3.0
"""Shop30001""",3.0
"""Shop10003""",3.0
"""Shop30001""",3.0


## Using map_groups

In [22]:
def udf6(group_df: pl.DataFrame) -> pl.DataFrame:
    """Print a one-item-per-column glimpse of the group, then return ``group_df.max()``."""
    group_df.glimpse(max_items_per_column=1)
    column_maxima = group_df.max()
    return column_maxima

In [23]:
# maintain_order=True makes the groups reach udf6 in the order they appear in
# the pre-sorted frame; without it Polars does not guarantee any group order,
# so the sequence of printed glimpses could change between runs.
(
    user_actions_df
    .sort("online_store")
    .group_by("online_store", maintain_order=True)
    .map_groups(udf6)
    .sort("online_store")
)

Rows: 332835
Columns: 5
$ online_store          <str> 'Shop1'
$ product_id            <str> '0003'
$ quantity              <f64> 3.0
$ action_type           <str> 'view'
$ action_dt    <datetime[μs]> 2024-07-20 16:16:00

Rows: 333569
Columns: 5
$ online_store          <str> 'Shop2'
$ product_id            <str> '0003'
$ quantity              <f64> 2.0
$ action_type           <str> 'view'
$ action_dt    <datetime[μs]> 2022-12-28 14:11:00

Rows: 333596
Columns: 5
$ online_store          <str> 'Shop3'
$ product_id            <str> '0001'
$ quantity              <f64> 1.0
$ action_type           <str> 'view'
$ action_dt    <datetime[μs]> 2024-04-29 09:24:00



online_store,product_id,quantity,action_type,action_dt
str,str,f64,str,datetime[μs]
"""Shop1""","""0003""",3.0,"""view""",2024-08-08 23:57:00
"""Shop2""","""0003""",3.0,"""view""",2024-08-08 23:55:00
"""Shop3""","""0003""",3.0,"""view""",2024-08-08 23:56:00


In [24]:
# Native equivalent of the udf6 result: per-store maximum of every column.
# The sort runs after the aggregation because group_by does not promise any
# output order by default, so sorting beforehand cannot fix the row order.
# Sorting the three result rows is also far cheaper than sorting the whole frame.
(
    user_actions_df
    .group_by("online_store")
    .agg(pl.all().max())
    .sort("online_store")
)

online_store,product_id,quantity,action_type,action_dt
str,str,f64,str,datetime[μs]
"""Shop1""","""0003""",3.0,"""view""",2024-08-08 23:57:00
"""Shop2""","""0003""",3.0,"""view""",2024-08-08 23:55:00
"""Shop3""","""0003""",3.0,"""view""",2024-08-08 23:56:00


In [25]:
def udf7(group_df: pl.DataFrame) -> pl.DataFrame:
    """Append ``quantity_norm``: the group's quantities min-max scaled.

    The scaling is ``(quantity - min) / (max - min)`` using the minimum and
    maximum of this group's own ``quantity`` column.
    NOTE(review): if every quantity in a group is equal the denominator is
    zero — confirm how that case should be handled.
    """
    quantities = group_df["quantity"]
    lowest, highest = quantities.min(), quantities.max()
    scaled = (pl.col("quantity") - lowest) / (highest - lowest)
    return group_df.with_columns(scaled.alias("quantity_norm"))

# Normalise quantities separately within each store and preview the result.
(
    user_actions_df
    .group_by("online_store")
    .map_groups(udf7)
    .head()
)

online_store,product_id,quantity,action_type,action_dt,quantity_norm
str,str,f64,str,datetime[μs],f64
"""Shop2""","""0003""",2.0,"""view""",2022-12-28 14:11:00,0.5
"""Shop2""","""0001""",1.0,"""view""",2023-11-06 08:18:00,0.0
"""Shop2""","""0003""",1.0,"""view""",2022-10-07 09:44:00,0.0
"""Shop2""","""0001""",3.0,"""view""",2022-11-03 05:37:00,1.0
"""Shop2""","""0003""",1.0,"""purchase""",2023-03-14 22:43:00,0.0
