# Pandas UDFs

"Normal" Python UDFs are pretty expensive (in terms of execution time), since for every record the following steps need to be performed:
* record is serialized inside JVM
* record is sent to an external Python process
* record is deserialized inside Python
* record is processed in Python
* result is serialized in Python
* result is sent back to JVM
* result is deserialized and stored inside result DataFrame

This does not only sound like a lot of work, it actually is. Therefore Python UDFs are typically an order of magnitude slower than native UDFs written in Scala or Java, which run directly inside the JVM.

Since Spark 2.3 an alternative approach is available for defining Python UDFs: so-called *Pandas UDFs*. Pandas is a commonly used Python library which also offers DataFrames (but Pandas DataFrames, not Spark DataFrames). Using a library called *Arrow*, Spark converts batches of a Spark DataFrame inside the JVM into columnar memory buffers and hands these buffers to the Python process. Python can then treat such a buffer as a Pandas DataFrame (or Series) with very little conversion work.

This approach has two major advantages:
* Almost no serialization and deserialization overhead, since data is exchanged between the JVM and Python in whole columnar batches instead of record by record
* Pandas has lots of very efficient implementations in C for many functions

Due to these two facts, Pandas UDFs are usually much faster and should be preferred over traditional Python UDFs whenever possible.
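
The cost difference between exchanging single records and exchanging whole batches can be illustrated without Spark. The following is only a rough simulation: it uses the standard `pickle` module as a stand-in for the JVM/Python boundary (this is an assumption for illustration, not Spark's actual wire protocol), and the helper names `per_record_roundtrip` and `batched_roundtrip` are hypothetical.

```python
import pickle


def add_one(x):
    return x + 1


def per_record_roundtrip(records, func):
    """Simulate a row-at-a-time UDF: every record crosses the boundary alone."""
    results = []
    messages = 0
    for record in records:
        payload = pickle.dumps(record)       # serialize on the sending side
        value = pickle.loads(payload)        # deserialize on the Python side
        reply = pickle.dumps(func(value))    # process and serialize the result
        results.append(pickle.loads(reply))  # deserialize on the receiving side
        messages += 2                        # one message in, one message out
    return results, messages


def batched_roundtrip(records, func):
    """Simulate a batch-wise exchange: the whole batch crosses the boundary once."""
    values = pickle.loads(pickle.dumps(records))
    reply = pickle.dumps([func(v) for v in values])
    return pickle.loads(reply), 2


records = list(range(1000))
row_results, row_messages = per_record_roundtrip(records, add_one)
batch_results, batch_messages = batched_roundtrip(records, add_one)
print(row_messages, batch_messages)
```

Both variants compute the same result, but the record-wise variant pays the per-message overhead once for every record instead of once per batch.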

In [None]:
import pyspark.sql
import pyspark.sql.functions as f

from pyspark.sql.types import *
from pyspark.sql import SparkSession

if 'spark' not in locals():
    spark = SparkSession.builder \
        .master("local[*]") \
        .config("spark.driver.memory","24G") \
        .getOrCreate()

spark

In [None]:
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Sales Data Example

In this notebook we will be using a data set called "Watson Sales Product Sample Data" which was downloaded from https://www.ibm.com/communities/analytics/watson-analytics-blog/sales-products-sample-data/

In [None]:
basedir = "s3://dimajix-training/data"

In [None]:
data = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(basedir + "/watson-sales-products/WA_Sales_Products_2012-14.csv")

data.printSchema()

In [None]:
data.limit(10).toPandas()

# 1. Classic UDF Approach

As an example, let's create a function which computes the previous quarter for a quarter string such as `"Q1 2012"`. First let us have a look using a traditional Python UDF:

### Python function

In [None]:
def prev_quarter(quarter):
    q = int(quarter[1:2])
    y = int(quarter[3:8])
    
    prev_q = q - 1
    if (prev_q <= 0):
        prev_y = y - 1
        prev_q = 4
    else:
        prev_y = y
    
    return "Q" + str(prev_q) + " " + str(prev_y)
    
print(prev_quarter("Q1 2012"))
print(prev_quarter("Q4 2012"))

### Spark UDF

In [None]:
from pyspark.sql.functions import udf

# Use udf to define a row-at-a-time udf
# Input/output are both a single string value
@udf('string')
def prev_quarter_udf(quarter):
    # YOUR CODE HERE

result = # YOUR CODE HERE
result.limit(10).toPandas()

# 2. Scalar Pandas UDF

Now let's compute the previous quarter using a Pandas UDF. The Pandas UDF receives a `pandas.Series` object and also has to return a `pandas.Series` object.

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# YOUR CODE HERE

result = data.withColumn('prev_quarter', prev_quarter_pudf(data["Quarter"]))
result.limit(10).toPandas()

## 2.1 Using Python Type Hints

When using Spark >= 3.0.0 and Python >= 3.6, the preferred way of passing type information is to use Python type hints.

In [None]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

# YOUR CODE HERE

result = data.withColumn('prev_quarter', prev_quarter_pudf(data["Quarter"]))
result.limit(10).toPandas()

## 2.2 Multi Arguments

Of course you can also create simple Pandas UDFs with more than one argument as follows:

In [None]:
from pyspark.sql.functions import pandas_udf

# YOUR CODE HERE

result = data.withColumn('product_shortcode', short_code(data["Product line"], data["Product type"]))
result.limit(10).toPandas()

## 2.3 Exercise

Write a small Pandas UDF called `hash_code` which calculates the hash value (using the Python function `hash`) from the concatenation of two columns. Use this function for the two columns `Product line` and `Product type`. Note that the Python function `hash` returns a 64-bit integer, which corresponds to a `LongType` in PySpark.

In [None]:
# YOUR CODE HERE

## 2.4 Benefits & Limitations

Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such as `select` and `withColumn`. The Python function should take `pandas.Series` or `pandas.DataFrame` (in case of nested columns) as inputs and return a `pandas.Series` or a `pandas.DataFrame` of the same length. Internally, Spark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.

One important conceptual limitation of the Pandas scalar UDF is that the resulting Series / DataFrame has to have the same number of rows as the incoming Series / DataFrame. We will soon see an alternative API which removes this limitation.
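
The batch-wise execution and the same-length requirement can be emulated with plain Pandas. This is only a sketch of the idea, not Spark's actual implementation; the helper `apply_in_batches`, the function `shout` and the sample values are made up for illustration.

```python
import pandas as pd


def shout(values: pd.Series) -> pd.Series:
    """Vectorized function with the Series -> Series shape of a scalar Pandas UDF."""
    return values.str.upper()


def apply_in_batches(column: pd.Series, func, batch_size: int) -> pd.Series:
    """Split the column into batches, call func per batch, concatenate the results."""
    results = []
    for start in range(0, len(column), batch_size):
        batch = column.iloc[start:start + batch_size]
        out = func(batch)
        # A scalar Pandas UDF has to return exactly one value per input row
        if len(out) != len(batch):
            raise ValueError("result length does not match batch length")
        results.append(out)
    return pd.concat(results)


column = pd.Series(["camping", "golf", "outdoor", "personal", "mountaineering"])
result = apply_in_batches(column, shout, batch_size=2)
print(result.tolist())
```

Since the function only ever sees one batch at a time, it must not rely on anything that requires the whole column, such as a global mean.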

# 3. Pandas Series Iterator UDFs

In addition to the simple Pandas Series UDF, Spark also supports a related Pandas Series Iterator UDF, which works on an iterator of Series. The main benefit of this variant is that it can perform some expensive initialization logic at the beginning, whose cost will be amortized over the different sub-series in the iterator.

In [None]:
from pyspark.sql.functions import pandas_udf
from typing import Iterator

# YOUR CODE HERE

result = data.withColumn('prev_quarter', prev_quarter_pudf(data["Quarter"]))
result.limit(10).toPandas()

In [None]:
result.explain()

## 3.1 Benefits & Limitations

The same remarks as for Pandas Series UDFs also apply to the iterator based variant of the API. The main benefit of this variant is the possibility to perform expensive initialization work once at the beginning instead of once per batch.
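
The amortization effect can be seen in a plain Pandas sketch of the iterator shape. The function `load_lookup_table` is a hypothetical stand-in for some expensive setup step, and the season mapping is invented purely for illustration. With Spark, a function of this shape would additionally be decorated with `@pandas_udf("string")`.

```python
from typing import Iterator

import pandas as pd

init_calls = 0


def load_lookup_table() -> dict:
    """Hypothetical stand-in for an expensive initialization step."""
    global init_calls
    init_calls += 1
    return {"Q1": "winter", "Q2": "spring", "Q3": "summer", "Q4": "autumn"}


def season_of_quarter(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Runs once per iterator, not once per batch
    lookup = load_lookup_table()
    for batch in batches:
        # Take the "Qn" prefix of every entry and translate it via the lookup table
        yield batch.str[:2].map(lookup)


# Two small Series standing in for the batches Spark would provide
batches = [pd.Series(["Q1 2012", "Q2 2012"]), pd.Series(["Q4 2013"])]
results = list(season_of_quarter(iter(batches)))
print(init_calls, [r.tolist() for r in results])
```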

# 4. Pandas Map UDFs

The method `DataFrame.mapInPandas` also provides a very efficient implementation for applying a Pandas function to a whole Spark DataFrame.

In [None]:
from typing import Iterator
from functools import reduce

# Input/output are both an iterator of pandas.DataFrame
def hash_columns(# YOUR CODE HERE):
    for pdf in iterator:
        # Convert all columns to string columns
        cols = [pdf[col].apply(str) for col in pdf.columns]
        # Concatenate all columns
        h = reduce(lambda x,y: x + y, cols)
        # Hash result
        h = h.apply(hash)
        # YOUR CODE HERE
        
# Define result schema
result_schema = # YOUR CODE HERE

result = # YOUR CODE HERE
result.limit(10).toPandas()

In [None]:
result.explain()

## 4.1 Exercise

Implement a Pandas Map UDF which calculates the "Revenue per Item" as the ratio of the columns `Revenue` and `Quantity`. Only return those records with a "Revenue per Item" of at least 1200.

In [None]:
# YOUR CODE HERE

## 4.2 Benefits & Limitations

Similar to Pandas scalar UDFs, a function used with `mapInPandas` does not see the full Spark DataFrame. Instead it receives smaller chunks. Therefore operations requiring the full DataFrame will not work, for example calculating global aggregates. One main advantage over simple scalar functions is that this method does not produce an individual column, but a full DataFrame. This implies that the number of records of the outgoing DataFrame can differ from the incoming one, which is conceptually not possible with scalar UDFs.
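
The following Pandas-only sketch shows a function with the shape expected by `mapInPandas` that adds a column and drops rows at the same time. The column names `revenue`, `cost` and `margin` as well as the sample chunks are assumptions for illustration and are not part of the sales data set.

```python
from typing import Iterator

import pandas as pd


def keep_positive_margin(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Iterator of DataFrames in, iterator of DataFrames out."""
    for pdf in batches:
        out = pdf.assign(margin=pdf["revenue"] - pdf["cost"])
        # Filtering is allowed: the output may contain fewer rows than the input
        yield out[out["margin"] > 0]


# Two chunks standing in for the batches Spark would pass to the function
chunks = [
    pd.DataFrame({"revenue": [10.0, 5.0], "cost": [4.0, 7.0]}),
    pd.DataFrame({"revenue": [3.0], "cost": [1.0]}),
]
result = pd.concat(list(keep_positive_margin(iter(chunks))), ignore_index=True)
print(result)

# With Spark, the wiring would look roughly like this (hypothetical DataFrame df):
# df.mapInPandas(keep_positive_margin, schema="revenue double, cost double, margin double")
```

Three rows go in and two rows come out, which a scalar Pandas UDF could not express.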

# 5. Grouped Pandas Map UDFs
While the examples above transform all records independently of each other, Spark also offers a so-called *grouped Pandas UDF* which operates on complete groups of records (as created by a `groupBy` method). This could be used to replace windowing functions with some Pandas implementation.

For example, let's subtract the mean of a group from all entries of that group. In Spark this could be achieved directly by using windowed aggregations. But let's first have a look at a Python implementation which does not use Pandas Grouped UDFs.

In [None]:
# YOUR CODE HERE


This example is even incomplete, as all other columns are now missing. We don't want to complete this example, since Pandas Grouped Map UDFs provide a much better approach.

## 5.1 Using Pandas Grouped Map UDFs

Now let's try to implement the same function using a Pandas grouped UDF. Grouped map Pandas UDFs are used with `groupBy().applyInPandas()`, which implements the “split-apply-combine” pattern. Split-apply-combine consists of three steps:
1. Split the data into groups by using `DataFrame.groupBy`.
2. Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The input data contains all the rows and columns for each group.
3. Combine the results into a new DataFrame.

To use `groupBy().applyInPandas()`, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output DataFrame.

The column labels of the returned `pandas.DataFrame` must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices.

In [None]:
from pyspark.sql.types import *

# Input/output are both a pandas.DataFrame
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # YOUR CODE HERE

# Define result schema
result_schema = StructType(data.schema.fields + [StructField("revenue_diff", DoubleType())])

result = # YOUR CODE HERE
result.limit(10).toPandas()

## 5.2 Exercise

Implement a Pandas UDF to be used as a grouped map which calculates the minimum and maximum quantity per group and stores the result in two new additional columns `Min Quantity` and `Max Quantity`. Moreover, the function should remove all records with a quantity smaller than or equal to `(min_quantity + max_quantity)/2`. Apply this function to calculate the min/max per quarter and per product.

In [None]:
# YOUR CODE HERE

## 5.3 Limitations of Grouped Map UDFs

Grouped Map UDFs are the most flexible Spark Pandas UDFs with regard to the return type. A Grouped Map UDF always returns a `pandas.DataFrame`, but with an arbitrary number of rows and columns (although the columns need to be defined in the schema passed to `applyInPandas`). This means specifically that the number of rows is not fixed, as opposed to scalar UDFs (where the number of output rows must match the number of input rows) and grouped aggregate UDFs (which can only produce a single scalar value per incoming group).
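
This flexibility can be sketched with plain Pandas by emulating the split-apply-combine steps by hand. The helper `apply_per_group`, the function `best_seller` and the sample data are assumptions made for illustration. The sketch is not how Spark executes the function internally.

```python
import pandas as pd


def best_seller(pdf: pd.DataFrame) -> pd.DataFrame:
    """Receives all rows of one group, returns a single row with an extra column."""
    total = pdf["revenue"].sum()
    top = pdf.nlargest(1, "revenue")
    return top.assign(share=top["revenue"] / total)


def apply_per_group(pdf: pd.DataFrame, key: str, func) -> pd.DataFrame:
    """Split by key, apply func to every group, combine the partial results."""
    parts = [func(group) for _, group in pdf.groupby(key)]
    return pd.concat(parts, ignore_index=True)


sales = pd.DataFrame({
    "quarter": ["Q1 2012", "Q1 2012", "Q2 2012", "Q2 2012", "Q2 2012"],
    "revenue": [30.0, 10.0, 5.0, 15.0, 20.0],
})
result = apply_per_group(sales, "quarter", best_seller)
print(result)

# With Spark, the wiring would look roughly like this (hypothetical DataFrame df):
# df.groupBy("quarter").applyInPandas(
#     best_seller, schema="quarter string, revenue double, share double")
```

Five input rows are reduced to one row per group, and the result carries an additional column which has to be declared in the output schema.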

# 6. Grouped Pandas Aggregate UDFs

Since version 2.4.0, Spark also supports Pandas aggregation functions. This is the only way to implement custom aggregation functions in Python. Note that this type of UDF does not support partial aggregation and all data for a group or window will be loaded into memory.

In [None]:
# YOUR CODE HERE

result = data.groupBy("Quarter").agg(mean_udf(data["Revenue"]).alias("mean_revenue"))
result.toPandas()

In [None]:
result.explain()

## 6.1 Full DataFrame

You can even apply a Pandas aggregate UDF to a full Spark DataFrame. But be aware that the whole data will be transferred to and processed by a single node. This means that this will not work well with huge data sets which do not fit into the memory of a single node.

In [None]:
result = # YOUR CODE HERE
result.toPandas()

In [None]:
result.explain()

## 6.2 Exercise

Write a Pandas Aggregate UDF called `sum_top_revenue` which first calculates the median value of a given Pandas Series. Then the UDF should sum up all records which are equal to or larger than the median value. The function should be applied to the revenue per Quarter and per Product line.

In [None]:
# YOUR CODE HERE

## 6.3 Benefits & Limitations

A Grouped Aggregate UDF defines an aggregation from one or more `pandas.Series` to a single scalar value, where each `pandas.Series` represents a column within the group or window.
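
The Series-to-scalar contract can be tried out with plain Pandas before wiring it into Spark. In this sketch the function `revenue_spread` and the sample data are made up for illustration. With Spark 3 type hints, a function of this shape would be decorated with `@pandas_udf("double")` and used inside `agg`.

```python
import pandas as pd


def revenue_spread(values: pd.Series) -> float:
    """Reduce all values of one group to a single scalar."""
    return float(values.max() - values.min())


sales = pd.DataFrame({
    "quarter": ["Q1 2012", "Q1 2012", "Q2 2012", "Q2 2012", "Q2 2012"],
    "revenue": [30.0, 10.0, 5.0, 15.0, 20.0],
})

# Pandas calls the function once per group with the complete group as a Series,
# which mirrors the fact that there is no partial aggregation
result = sales.groupby("quarter")["revenue"].agg(revenue_spread)
print(result.to_dict())
```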

# Summary

We saw a couple of different Pandas UDF types, and now the confusion starts about when to use which. Actually, most of the variants provide an interface that already implies their use case.

* **Scalar UDF** This is the simplest form of a Pandas UDF and is used to transform one or multiple columns into a new (possibly nested) column. Each invocation of the Python code itself will receive a small subset of the whole data and is required to return the same number of rows. The UDF can be called at all places where a Spark function can be called (i.e. in `select`, `filter`, `withColumn` etc.).
* **Map UDF** This form provides more flexibility than the scalar UDF, since the UDF will receive all columns from the Spark DataFrame. Each invocation will again receive a small subset of all rows, but with all columns. The UDF may return a Pandas DataFrame with a fixed set of columns but with a dynamic number of rows (i.e. it may return more or fewer rows than the incoming Pandas DataFrame). The Map UDF is used with the special PySpark method `mapInPandas`.
* **Grouped Map UDF** This UDF is very powerful and can be used as a wide aggregate function in a `GROUP BY` transformation. Each invocation of the Python function will receive the full set of columns and the full set of rows belonging to one specific group. The function may again return a DataFrame with an arbitrary number of rows and is used with the special PySpark function `applyInPandas`.
* **Aggregation UDF** Finally PySpark also provides a simpler way for aggregating data than the grouped map UDF. The aggregation UDF has to return a single value (as opposed to a DataFrame with potentially multiple rows) and can be used whenever a Spark aggregate function (like `sum`, `avg`, ...) can be used.
