# How to group DataFrame columns into buckets and map the buckets to different values 🚀

Have you ever heard of pyspark's [`Bucketizer`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Bucketizer.html)? It can be really useful! Although you perhaps won't need it for some simple transformation, it can be really useful for certain usecases. 

In this blogpost, we will:

1. Explore the `Bucketizer` class
2. Combine it with `create_map`
3. Use a module so we don't have to write the logic ourselves 🗝🥳

Let's get started!

## The problem

First, let's boot up a local spark session:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/21 11:50:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Say we have this dataset containing some persons:

In [2]:
from pyspark.sql import Row

people = spark.createDataFrame(
    [
        Row(age=12, name="Damian"),
        Row(age=15, name="Jake"),
        Row(age=18, name="Dominic"),
        Row(age=20, name="John"),
        Row(age=27, name="Jerry"),
        Row(age=101, name="Jerry's Grandpa"),
    ]
)
people

DataFrame[age: bigint, name: string]

Now, what we would like to do, is map each person's age to an age category.

|age range|life phase|
|-|-|
|0 to 12|Child|
|12 to 18|Teenager|
|18 to 25|Young adulthood|
|25 to 70|Adult|
|70 and beyond|Elderly|

How best to go about this?

## Using pyspark UDF's
A first option is to use a PySpark UDF:

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql import DataFrame

def age_to_life_phase(age: int):
    if age < 0:
        return "Not yet born"
    elif age < 12:
        return "Child"
    elif age < 18:
        return "Teenager"
    elif age < 70:
        return "Adult"
    else:  # age >= 70
        return "Elderly"

bucketed_udf = udf(age_to_life_phase, "string")
people_with_phase_udf: DataFrame = people.withColumn("life phase", bucketed_udf("age"))
people_with_phase_udf.show()

                                                                                

+---+---------------+----------+
|age|           name|life phase|
+---+---------------+----------+
| 12|         Damian|  Teenager|
| 15|           Jake|  Teenager|
| 18|        Dominic|     Adult|
| 20|           John|     Adult|
| 27|          Jerry|     Adult|
|101|Jerry's Grandpa|   Elderly|
+---+---------------+----------+



But UDF's are sloooow 🐌. To keep our edge, we'll need something faster 😲.


## Using `Bucketizer` + `create_map`
Luckily, there's pyspark's `Bucketizer`. It works like so:

In [4]:
from pyspark.ml.feature import Bucketizer

bucketizer = Bucketizer(
    inputCol="age",
    outputCol="life phase",
    splits=[
        -float("inf"), 0, 12, 18, 25, 70, float("inf")
    ]
)
bucketed: DataFrame = bucketizer.transform(people)
bucketed.show()

+---+---------------+----------+
|age|           name|life phase|
+---+---------------+----------+
| 12|         Damian|       2.0|
| 15|           Jake|       2.0|
| 18|        Dominic|       3.0|
| 20|           John|       3.0|
| 27|          Jerry|       4.0|
|101|Jerry's Grandpa|       5.0|
+---+---------------+----------+



Cool! We just put our ages in buckets, represented by numbers. Let's now map each bucket to a life phase.

In [5]:
from pyspark.sql.functions import lit, create_map
from typing import Dict
from pyspark.sql.column import Column

range_mapper = create_map(
    [lit(0.0), lit("Not yet born")]
    + [lit(1.0), lit("Child")]
    + [lit(2.0), lit("Teenager")]
    + [lit(3.0), lit("Young adulthood")]
    + [lit(4.0), lit("Adult")]
    + [lit(5.0), lit("Elderly")]
)
people_phase_column: Column = bucketed["life phase"]
people_with_phase: DataFrame = bucketed.withColumn(
    "life phase", range_mapper[people_phase_column]
)
people_with_phase.show()


+---+---------------+---------------+
|age|           name|     life phase|
+---+---------------+---------------+
| 12|         Damian|       Teenager|
| 15|           Jake|       Teenager|
| 18|        Dominic|Young adulthood|
| 20|           John|Young adulthood|
| 27|          Jerry|          Adult|
|101|Jerry's Grandpa|        Elderly|
+---+---------------+---------------+



🎉 Success!

Using a combination of `Bucketizer` and `create_map`, we managed to map people's age to their life phases.

## `pyspark-bucketmap`

🎁 As a bonus, I put all of the above in a neat little module, which you can install simply using `pip`.

In [6]:
%pip install pyspark-bucketmap

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.1.2[0m[39;49m -> [0m[32;49m22.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


Define the splits and mappings like before. Each dictionary key is a mapping to the n-th bucket (for example, bucket 1 refers to the range `0` to `12`).

In [7]:
from typing import List

splits: List[float] = [-float("inf"), 0, 12, 18, 25, 70, float("inf")]
mapping: Dict[int, Column] = {
    0: lit("Not yet born"),
    1: lit("Child"),
    2: lit("Teenager"),
    3: lit("Young adulthood"),
    4: lit("Adult"),
    5: lit("Elderly"),
}

Then, simply import `pyspark_bucketmap.BucketMap` and call `transform()`.

In [8]:
from pyspark_bucketmap import BucketMap
from typing import List, Dict

bucket_mapper = BucketMap(
    splits=splits, mapping=mapping, inputCol="age", outputCol="phase"
)
phases_actual: DataFrame = bucket_mapper.transform(people).select("name", "phase")
phases_actual.show()

+---------------+---------------+
|           name|          phase|
+---------------+---------------+
|         Damian|       Teenager|
|           Jake|       Teenager|
|        Dominic|Young adulthood|
|           John|Young adulthood|
|          Jerry|          Adult|
|Jerry's Grandpa|        Elderly|
+---------------+---------------+



You can find the module here:

[https://github.com/dunnkers/pyspark-bucketmap](https://github.com/dunnkers/pyspark-bucketmap)


## Benchmark

Using `pyspark-bucketmap` results in ~~faster~~ code compared to using pyspark UDF's:

In [145]:
def bucketmap_approach():
    bucket_mapper = BucketMap(
        splits=splits, mapping=mapping, inputCol="age", outputCol="phase"
    )
    return bucket_mapper.transform(df).select("name", "phase")

df: DataFrame = people
bucketmap_approach()

DataFrame[name: string, phase: string]

In [146]:
def udf_approach():
    bucketed_udf = udf(age_to_life_phase, "string")
    return df.withColumn("life phase", bucketed_udf("age"))

df: DataFrame = people
udf_approach()

DataFrame[age: bigint, name: string, life phase: string]

In [147]:
from itertools import repeat, product
from timeit import repeat as timeit
from functools import reduce
from typing import Callable
from copy import deepcopy

def benchmark(
    approach: Callable[[DataFrame], DataFrame],
    n_times: int = 1,
) -> dict:
    setup: Callable = lambda: reduce(lambda a, b: a.union(b), repeat(people, n_times))
    times: List[float] = timeit(deepcopy(approach), setup=setup, number=10, repeat=5)
    return dict(
        time=times, approach=approach.__name__, n_times=n_times
    )

benchmark(udf_approach, n_times=1)

{'time': [0.3006687560118735,
  0.31598263198975474,
  0.2988526640692726,
  0.34373067307751626,
  0.2657787390053272],
 'approach': 'udf_approach',
 'n_times': 1}

In [148]:
from typing import Tuple

benchmarks: List[Tuple[int, Callable]] = list(product(
    [1, 10, 100, 200, 300, 400, 500], [udf_approach, bucketmap_approach]
))
benchmarks

[(1, <function __main__.udf_approach()>),
 (1, <function __main__.bucketmap_approach()>),
 (10, <function __main__.udf_approach()>),
 (10, <function __main__.bucketmap_approach()>),
 (100, <function __main__.udf_approach()>),
 (100, <function __main__.bucketmap_approach()>),
 (200, <function __main__.udf_approach()>),
 (200, <function __main__.bucketmap_approach()>),
 (300, <function __main__.udf_approach()>),
 (300, <function __main__.bucketmap_approach()>),
 (400, <function __main__.udf_approach()>),
 (400, <function __main__.bucketmap_approach()>),
 (500, <function __main__.udf_approach()>),
 (500, <function __main__.bucketmap_approach()>)]

In [149]:
from tqdm import tqdm

benchmark_results = [
    benchmark(approach, n_times=n_times) for n_times, approach in tqdm(benchmarks)
]

100%|██████████| 14/14 [08:20<00:00, 35.73s/it]


In [152]:
import pandas as pd

df = pd.DataFrame(benchmark_results)
df["n"] = df["n_times"] * people.count()
df = df.explode("time")
df.head(2)

Unnamed: 0,time,approach,n_times,n
0,0.382576,udf_approach,1,6
0,0.338392,udf_approach,1,6


In [188]:
table = df.groupby(["approach", "n_times"]).agg({"time": "mean"}).unstack()
print(table.to_markdown())
table

| approach           |   ('time', 1) |   ('time', 10) |   ('time', 100) |   ('time', 200) |   ('time', 300) |   ('time', 400) |   ('time', 500) |
|:-------------------|--------------:|---------------:|----------------:|----------------:|----------------:|----------------:|----------------:|
| bucketmap_approach |      0.423795 |       0.44539  |        0.439529 |        0.328591 |        0.300757 |        0.485822 |        0.346207 |
| udf_approach       |      0.308301 |       0.297046 |        0.301018 |        0.286319 |        0.213509 |        0.166491 |        0.212083 |


Unnamed: 0_level_0,time,time,time,time,time,time,time
n_times,1,10,100,200,300,400,500
approach,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2
bucketmap_approach,0.423795,0.44539,0.439529,0.328591,0.300757,0.485822,0.346207
udf_approach,0.308301,0.297046,0.301018,0.286319,0.213509,0.166491,0.212083


| approach           |   ('time', 1) |   ('time', 10) |   ('time', 100) |   ('time', 200) |   ('time', 300) |   ('time', 400) |   ('time', 500) |
|:-------------------|--------------:|---------------:|----------------:|----------------:|----------------:|----------------:|----------------:|
| bucketmap_approach |      0.423795 |       0.44539  |        0.439529 |        0.328591 |        0.300757 |        0.485822 |        0.346207 |
| udf_approach       |      0.308301 |       0.297046 |        0.301018 |        0.286319 |        0.213509 |        0.166491 |        0.212083 |


In [156]:
%pip install nbformat

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.1.2[0m[39;49m -> [0m[32;49m22.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [157]:
import plotly.express as px

px.scatter(df, x="n", y="time", color="approach")

ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed

|`pyspark-bucketmap`|pyspark UDF|
|-|-|
|17 ms ± 813 µs per loop|12.3 ms ± 281 µs per loop|

mean ± std. dev. of 7 runs, 1,000 loops each

## Conclusion
It's a wrap! Hope you learned something useful.

---

Written by [Jeroen Overschie](https://jeroenoverschie.nl/), working at [GoDataDriven](https://godatadriven.com/)