In [3]:
from pyspark.sql.types import *
import pyspark.sql
import pyspark.sql.functions as sf

# Pandas UDFs

"Normal" Python UDFs are pretty expensive (in terms of execution time), since for every record the following steps need to be performed:
* record is serialized inside JVM
* record is sent to an external Python process
* record is deserialized inside Python
* record is processed in Python
* result is serialized in Python
* result is sent back to JVM
* result is deserialized and stored inside result DataFrame

This not only sounds like a lot of work, it actually is. Therefore Python UDFs are often an order of magnitude slower than native functions written in Scala or Java, which run directly inside the JVM.

Since Spark 2.3, however, an alternative approach for defining Python UDFs is available: so-called *Pandas UDFs*. Pandas is a widely used Python library which also offers DataFrames (Pandas DataFrames, not Spark DataFrames). For Pandas UDFs, Spark converts the data inside the JVM into the columnar format of *Apache Arrow* and sends it to the Python process in batches of many records. On the Python side, these Arrow batches can be converted into Pandas objects (`pandas.Series` or `pandas.DataFrame`) with very little overhead.

This approach has two major advantages:
* Data is serialized and deserialized once per batch in an efficient columnar format, instead of once per record
* Pandas (built on NumPy) provides efficient vectorized implementations, written in C, for many operations
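To get a feeling for why the per-record round trip matters, here is a toy model in plain Python that counts serialization round trips for row-at-a-time versus batch-wise processing. It is an assumption-laden sketch, not Spark's implementation: `pickle` stands in for the real JVM/Python serializers, nothing crosses a process boundary, and the helpers `row_at_a_time` and `batch_at_a_time` are hypothetical. The batch size of 10,000 mirrors the default of `spark.sql.execution.arrow.maxRecordsPerBatch`.

```python
import pickle

def row_at_a_time(records, fn):
    """Classic UDF model: one serialize/deserialize round trip per record."""
    results, round_trips = [], 0
    for record in records:
        value = pickle.loads(pickle.dumps(record))       # "JVM" -> Python
        result = pickle.loads(pickle.dumps(fn(value)))   # Python -> "JVM"
        results.append(result)
        round_trips += 1
    return results, round_trips

def batch_at_a_time(records, fn, batch_size=10_000):
    """Pandas UDF model: one serialize/deserialize round trip per batch."""
    results, round_trips = [], 0
    for start in range(0, len(records), batch_size):
        batch = pickle.loads(pickle.dumps(records[start:start + batch_size]))
        out = pickle.loads(pickle.dumps([fn(v) for v in batch]))
        results.extend(out)
        round_trips += 1
    return results, round_trips

quarters = ["Q1 2012", "Q4 2012"] * 12_500   # 25,000 toy records
rows, row_trips = row_at_a_time(quarters, str.lower)
batches, batch_trips = batch_at_a_time(quarters, str.lower)
print(row_trips, batch_trips)   # 25000 3
```

Both variants produce identical results; only the number of round trips differs, and each of those round trips carries fixed costs in a real system.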

Due to these two facts, Pandas UDFs are much faster and should be preferred over traditional Python UDFs whenever possible.

# Sales Data Example

In this notebook we use a data set called "Watson Sales Product Sample Data", which was downloaded from https://www.ibm.com/communities/analytics/watson-analytics-blog/sales-products-sample-data/

In [None]:
basedir = "s3://dimajix-training/data"

In [12]:
data = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(basedir + "/watson-sales-products/WA_Sales_Products_2012-14.csv")

data.printSchema()

root
 |-- Retailer country: string (nullable = true)
 |-- Order method type: string (nullable = true)
 |-- Retailer type: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- Product type: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Gross margin: double (nullable = true)



# 1. Classic UDF Approach

As an example, let's create a function which computes the previous quarter for the `Quarter` column (e.g. "Q1 2012" becomes "Q4 2011"). First let us have a look at a traditional Python UDF:

### Python function

In [18]:
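def prev_quarter(quarter):
    # Spark passes None for NULL values
    if quarter is None:
        return None

    # Parse strings like "Q1 2012" into quarter number and year
    q_str, y_str = quarter.split(" ")
    q = int(q_str[1:])
    y = int(y_str)

    # Step back one quarter, wrapping from Q1 to Q4 of the previous year
    if q == 1:
        return "Q4 " + str(y - 1)
    return "Q" + str(q - 1) + " " + str(y)

print(prev_quarter("Q1 2012"))
print(prev_quarter("Q4 2012"))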

Q4 2011
Q3 2012


### Spark UDF

In [20]:
from pyspark.sql.functions import udf

# Use udf to define a row-at-a-time UDF
@udf('string')
# Input and output are both a single string value
def prev_quarter_udf(quarter):
    return prev_quarter(quarter)

result = data.withColumn('prev_quarter', prev_quarter_udf(data["Quarter"]))
result.limit(10).toPandas()

| | Retailer country | Order method type | Retailer type | Product line | Product type | Product | Year | Quarter | Revenue | Quantity | Gross margin | prev_quarter |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | United States | Fax | Outdoors Shop | Camping Equipment | Cooking Gear | TrailChef Deluxe Cook Set | 2012 | Q1 2012 | 59628.66 | 489 | 0.347548 | Q4 2011 |
| 1 | United States | Fax | Outdoors Shop | Camping Equipment | Cooking Gear | TrailChef Double Flame | 2012 | Q1 2012 | 35950.32 | 252 | 0.474274 | Q4 2011 |
| 2 | United States | Fax | Outdoors Shop | Camping Equipment | Tents | Star Dome | 2012 | Q1 2012 | 89940.48 | 147 | 0.352772 | Q4 2011 |
| 3 | United States | Fax | Outdoors Shop | Camping Equipment | Tents | Star Gazer 2 | 2012 | Q1 2012 | 165883.41 | 303 | 0.282938 | Q4 2011 |
| 4 | United States | Fax | Outdoors Shop | Camping Equipment | Sleeping Bags | Hibernator Lite | 2012 | Q1 2012 | 119822.2 | 1415 | 0.29145 | Q4 2011 |
| 5 | United States | Fax | Outdoors Shop | Camping Equipment | Sleeping Bags | Hibernator Extreme | 2012 | Q1 2012 | 87728.96 | 352 | 0.398146 | Q4 2011 |
| 6 | United States | Fax | Outdoors Shop | Camping Equipment | Sleeping Bags | Hibernator Camp Cot | 2012 | Q1 2012 | 41837.46 | 426 | 0.335607 | Q4 2011 |
| 7 | United States | Fax | Outdoors Shop | Camping Equipment | Lanterns | Firefly Lite | 2012 | Q1 2012 | 8268.41 | 577 | 0.52896 | Q4 2011 |
| 8 | United States | Fax | Outdoors Shop | Camping Equipment | Lanterns | Firefly Extreme | 2012 | Q1 2012 | 9393.3 | 189 | 0.434205 | Q4 2011 |
| 9 | United States | Fax | Outdoors Shop | Camping Equipment | Lanterns | EverGlow Single | 2012 | Q1 2012 | 19396.5 | 579 | 0.461493 | Q4 2011 |


# 2. Scalar Pandas UDF

Now let's compute the previous quarter with a Pandas UDF instead. A scalar Pandas UDF receives a `pandas.Series` object and also has to return a `pandas.Series` object.

In [21]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

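# Use pandas_udf to define a scalar Pandas UDF
@pandas_udf('string', PandasUDFType.SCALAR)
# Input and output are both a pandas.Series of strings
def prev_quarter_pudf(v):
    # Series.apply still calls prev_quarter once per value, but the data
    # is transferred between the JVM and Python in Arrow batches
    return v.apply(prev_quarter)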

result = data.withColumn('prev_quarter', prev_quarter_pudf(data["Quarter"]))
result.limit(10).toPandas()

| | Retailer country | Order method type | Retailer type | Product line | Product type | Product | Year | Quarter | Revenue | Quantity | Gross margin | prev_quarter |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | United States | Fax | Outdoors Shop | Camping Equipment | Cooking Gear | TrailChef Deluxe Cook Set | 2012 | Q1 2012 | 59628.66 | 489 | 0.347548 | Q4 2011 |
| 1 | United States | Fax | Outdoors Shop | Camping Equipment | Cooking Gear | TrailChef Double Flame | 2012 | Q1 2012 | 35950.32 | 252 | 0.474274 | Q4 2011 |
| 2 | United States | Fax | Outdoors Shop | Camping Equipment | Tents | Star Dome | 2012 | Q1 2012 | 89940.48 | 147 | 0.352772 | Q4 2011 |
| 3 | United States | Fax | Outdoors Shop | Camping Equipment | Tents | Star Gazer 2 | 2012 | Q1 2012 | 165883.41 | 303 | 0.282938 | Q4 2011 |
| 4 | United States | Fax | Outdoors Shop | Camping Equipment | Sleeping Bags | Hibernator Lite | 2012 | Q1 2012 | 119822.2 | 1415 | 0.29145 | Q4 2011 |
| 5 | United States | Fax | Outdoors Shop | Camping Equipment | Sleeping Bags | Hibernator Extreme | 2012 | Q1 2012 | 87728.96 | 352 | 0.398146 | Q4 2011 |
| 6 | United States | Fax | Outdoors Shop | Camping Equipment | Sleeping Bags | Hibernator Camp Cot | 2012 | Q1 2012 | 41837.46 | 426 | 0.335607 | Q4 2011 |
| 7 | United States | Fax | Outdoors Shop | Camping Equipment | Lanterns | Firefly Lite | 2012 | Q1 2012 | 8268.41 | 577 | 0.52896 | Q4 2011 |
| 8 | United States | Fax | Outdoors Shop | Camping Equipment | Lanterns | Firefly Extreme | 2012 | Q1 2012 | 9393.3 | 189 | 0.434205 | Q4 2011 |
| 9 | United States | Fax | Outdoors Shop | Camping Equipment | Lanterns | EverGlow Single | 2012 | Q1 2012 | 19396.5 | 579 | 0.461493 | Q4 2011 |
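`prev_quarter_pudf` still calls the plain Python function `prev_quarter` for every single value via `Series.apply`, so it benefits from Arrow batching but not from Pandas' vectorized implementations. Below is a sketch of a fully vectorized variant using Pandas string and arithmetic operations on the whole Series. It assumes every value matches the pattern `Q<digit> <year>` and that there are no nulls; `prev_quarter_vectorized` is a hypothetical name and the code has only been reasoned about in plain Pandas, not inside Spark.

```python
import pandas as pd

def prev_quarter_vectorized(v: pd.Series) -> pd.Series:
    # Split "Q1 2012" into quarter number and year for all values at once
    parts = v.str.extract(r"Q(\d) (\d{4})")
    q = parts[0].astype(int)
    y = parts[1].astype(int)
    # Q1 wraps around to Q4 of the previous year, all other quarters step back by one
    prev_q = (q - 2) % 4 + 1
    prev_y = y - (q == 1).astype(int)
    return "Q" + prev_q.astype(str) + " " + prev_y.astype(str)

print(prev_quarter_vectorized(pd.Series(["Q1 2012", "Q4 2012"])).tolist())   # ['Q4 2011', 'Q3 2012']
```

It could presumably serve as the body of a `@pandas_udf('string', PandasUDFType.SCALAR)` function in place of `v.apply(prev_quarter)`.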


## 2.1 Limitations of Scalar UDFs

Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such as `select` and `withColumn`. The Python function has to take one or more `pandas.Series` as inputs and must return a `pandas.Series` of the same length. Internally, Spark executes a Pandas UDF by splitting columns into batches, calling the function for each batch as a subset of the data, and then concatenating the results. Consequently, the function can neither add nor drop rows, and it only ever sees one batch at a time, never the complete column.
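The batch-wise execution can be mimicked locally in plain Pandas. The helper `run_scalar_pandas_udf` below is hypothetical, a simplified model of Spark's behavior rather than Spark code; the tiny batch sizes are chosen for illustration only (in Spark the batch size is controlled by `spark.sql.execution.arrow.maxRecordsPerBatch`).

```python
import pandas as pd

def run_scalar_pandas_udf(column, fn, batch_size):
    """Hypothetical model of how Spark evaluates a scalar Pandas UDF on one column."""
    outputs = []
    for start in range(0, len(column), batch_size):
        batch = column.iloc[start:start + batch_size]
        result = fn(batch)
        # The result must have exactly as many rows as the input batch
        if len(result) != len(batch):
            raise ValueError("Result length does not match input batch length")
        outputs.append(result)
    # Concatenate the per-batch results into one output column
    return pd.concat(outputs, ignore_index=True)

values = pd.Series([0.0, 1.0, 2.0, 3.0, 4.0, 5.0])

# Element-wise logic gives the same result regardless of batching
doubled = run_scalar_pandas_udf(values, lambda s: s * 2, batch_size=4)

# Logic that looks at the whole column silently becomes per-batch logic
centered = run_scalar_pandas_udf(values, lambda s: s - s.mean(), batch_size=3)
print(centered.tolist())   # [-1.0, 0.0, 1.0, -1.0, 0.0, 1.0]
```

The second call shows why a scalar Pandas UDF is not a substitute for a group or window computation: subtracting `s.mean()` uses the mean of each batch (1.0 and 4.0) instead of the column mean (2.5).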

# 3. Grouped Pandas Aggregate UDFs

Since version 2.4.0, Spark also supports Pandas aggregation functions (grouped aggregate Pandas UDFs). At the time of writing, this is the only way to implement custom aggregation functions in Python. Note that this type of UDF does not support partial aggregation, and all data for a group or window will be loaded into memory.
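To illustrate what *partial aggregation* means, here is a plain Python sketch on hypothetical toy data split across two "partitions". A built-in aggregate such as `avg` can pre-aggregate each partition into a small `(sum, count)` pair per key and merge these pairs later, while a grouped aggregate Pandas UDF needs all values of a group collected in one place first. Both helpers are illustrations of the idea, not Spark code.

```python
from collections import defaultdict

# Hypothetical toy data: (Quarter, Revenue) pairs spread over two partitions
partitions = [
    [("Q1 2012", 10.0), ("Q2 2012", 4.0)],
    [("Q1 2012", 20.0), ("Q1 2012", 30.0)],
]

def partial_mean(partitions):
    """Built-in style: pre-aggregate (sum, count) per partition, then merge."""
    merged = defaultdict(lambda: [0.0, 0])
    for part in partitions:
        local = defaultdict(lambda: [0.0, 0])
        for key, value in part:
            local[key][0] += value
            local[key][1] += 1
        # Only one small (sum, count) pair per key leaves each partition
        for key, (total, count) in local.items():
            merged[key][0] += total
            merged[key][1] += count
    return {key: total / count for key, (total, count) in merged.items()}

def full_group_mean(partitions):
    """Grouped aggregate Pandas UDF style: collect every value of a group first."""
    groups = defaultdict(list)
    for part in partitions:
        for key, value in part:
            groups[key].append(value)
    return {key: sum(values) / len(values) for key, values in groups.items()}

print(partial_mean(partitions))     # {'Q1 2012': 20.0, 'Q2 2012': 4.0}
print(full_group_mean(partitions))  # {'Q1 2012': 20.0, 'Q2 2012': 4.0}
```

Both return the same means, but `partial_mean` only keeps two numbers per key while merging, whereas `full_group_mean` holds every value of every group in memory.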

In [25]:
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
# Input is a pandas.Series with all Revenue values of one group, output is a single double
def mean_udf(v):
    return v.mean()

result = data.groupBy("Quarter").agg(mean_udf(data["Revenue"]).alias("mean_revenue"))
result.toPandas()

| | Quarter | mean_revenue |
|---|---|---|
| 0 | Q1 2014 | 56259.616807 |
| 1 | Q4 2012 | 37582.000088 |
| 2 | Q2 2012 | 31604.267207 |
| 3 | Q3 2013 | 44663.124562 |
| 4 | Q3 2012 | 32882.506662 |
| 5 | Q1 2013 | 40744.052459 |
| 6 | Q2 2014 | 58878.36902 |
| 7 | Q1 2012 | 34029.065862 |
| 8 | Q2 2013 | 47540.27205 |
| 9 | Q4 2013 | 48522.41469 |


## 3.1 Limitations of Aggregate UDFs

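A grouped aggregate UDF defines an aggregation from one or more `pandas.Series` to a single scalar value, where each `pandas.Series` represents a column within the group or window. It can therefore only return exactly one value per group; it cannot produce multiple rows or multiple columns.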
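A grouped aggregate UDF can also take several columns as input. The sketch below runs in plain Pandas on hypothetical toy data; `weighted_margin` is a hypothetical revenue-weighted gross margin. In Spark it would presumably be declared with `@pandas_udf("double", PandasUDFType.GROUPED_AGG)` and called as `agg(weighted_margin(data["Gross margin"], data["Revenue"]))`, but that Spark wiring is an untested assumption here.

```python
import pandas as pd

def weighted_margin(margin: pd.Series, revenue: pd.Series) -> float:
    # Two columns of one group in, one scalar out
    return float((margin * revenue).sum() / revenue.sum())

# Hypothetical toy data using column names from the sales data set
toy = pd.DataFrame({
    "Quarter": ["Q1 2012", "Q1 2012", "Q2 2012"],
    "Gross margin": [0.5, 0.3, 0.4],
    "Revenue": [100.0, 300.0, 50.0],
})

# Local stand-in for data.groupBy("Quarter").agg(...): exactly one value per group
result = toy.groupby("Quarter")[["Gross margin", "Revenue"]].apply(
    lambda g: weighted_margin(g["Gross margin"], g["Revenue"])
)
print(result)
```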

# 4. Grouped Pandas Map UDFs
While scalar Pandas UDFs transform all records independently, one column at a time, and grouped aggregate UDFs reduce each group to a single value, Spark also offers so-called *grouped map Pandas UDFs*, which operate on complete groups of records (as created by the `groupBy` method). These can be used to replace some window functions with a Pandas implementation.

For example, let's subtract the mean of a group from all entries of that group. In Spark this could be achieved directly with windowed aggregations. But let's first have a look at a Python implementation which does not use grouped map Pandas UDFs:
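For reference, the intended result can be expressed in plain Pandas with `groupby().transform()`, which returns each group's mean aligned to every original row. The toy data is hypothetical. In Spark itself, the windowed variant would presumably be `data["Revenue"] - sf.avg("Revenue").over(Window.partitionBy("Quarter"))` with `Window` imported from `pyspark.sql`; treat that line as an untested sketch.

```python
import pandas as pd

# Hypothetical toy data
toy = pd.DataFrame({
    "Quarter": ["Q1 2012", "Q1 2012", "Q2 2012"],
    "Product": ["Star Dome", "Star Gazer 2", "Firefly Lite"],
    "Revenue": [10.0, 30.0, 5.0],
})

# transform("mean") broadcasts each group's mean back to all rows of that group,
# so every other column (like Product) is kept
toy["revenue_diff"] = toy["Revenue"] - toy.groupby("Quarter")["Revenue"].transform("mean")
print(toy)
```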

In [26]:
import pandas as pd

@udf(ArrayType(DoubleType()))
def subtract_mean(values):
    # values is a plain Python list with all revenues of one group
    series = pd.Series(values)
    center = series - series.mean()
    return center.tolist()

groups = data.groupBy('Quarter').agg(sf.collect_list(data["Revenue"]).alias('values'))
result = groups.withColumn('center', sf.explode(subtract_mean(groups["values"]))).drop('values')
result.limit(10).toPandas()

| | Quarter | center |
|---|---|---|
| 0 | Q1 2014 | -14763.616807 |
| 1 | Q1 2014 | -25935.616807 |
| 2 | Q1 2014 | 37599.433193 |
| 3 | Q1 2014 | 9834.213193 |
| 4 | Q1 2014 | 7572.783193 |
| 5 | Q1 2014 | -16801.366807 |
| 6 | Q1 2014 | 22324.383193 |
| 7 | Q1 2014 | -45369.616807 |
| 8 | Q1 2014 | -24083.216807 |
| 9 | Q1 2014 | -32714.816807 |


This example is also incomplete: all other columns are lost. We won't complete it, since grouped map Pandas UDFs provide a much better approach.

## 4.1 Using Pandas Grouped Map UDFs

Now let's implement the same function using a grouped map Pandas UDF. Grouped map Pandas UDFs are used with `groupBy().apply()`, which implements the “split-apply-combine” pattern. Split-apply-combine consists of three steps:
1. Split the data into groups by using `DataFrame.groupBy`.
2. Apply a function on each group. The input and output of the function are both a `pandas.DataFrame`. The input data contains all the rows and columns for each group.
3. Combine the results into a new DataFrame.
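The three steps can be mimicked locally in plain Pandas, without Spark. The helper `split_apply_combine` and the per-group function `revenue_share` are hypothetical; Spark performs the split and combine steps itself and distributes the groups across executors, so this only models the data flow.

```python
import pandas as pd

def split_apply_combine(df, key, fn):
    # 1. Split: one sub-DataFrame per distinct key value
    # 2. Apply: call fn on every sub-DataFrame
    pieces = [fn(group) for _, group in df.groupby(key)]
    # 3. Combine: concatenate the per-group results
    return pd.concat(pieces, ignore_index=True)

def revenue_share(pdf):
    # Share of each row in the total revenue of its group
    return pdf.assign(revenue_share=pdf["Revenue"] / pdf["Revenue"].sum())

# Hypothetical toy data
toy = pd.DataFrame({"Quarter": ["Q1 2012", "Q2 2012", "Q1 2012"], "Revenue": [10.0, 5.0, 30.0]})
result = split_apply_combine(toy, "Quarter", revenue_share)
print(result)
```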

To use `groupBy().apply()`, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output DataFrame.

The column labels of the returned `pandas.DataFrame` must either match the field names of the output schema if the labels are strings, or match the field data types by position if they are not strings (e.g. integer indices).
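The two matching rules can be illustrated with a small hypothetical helper, `align_to_schema`, that only models the documented behavior in plain Pandas; it is not how Spark implements it internally.

```python
import pandas as pd

def align_to_schema(pdf, field_names):
    """Hypothetical model of matching returned columns to the output schema."""
    if all(isinstance(c, str) for c in pdf.columns):
        # String labels: look up columns by name, their order in pdf does not matter
        return pdf[field_names]
    # Non-string labels such as integer indices: match columns by position
    aligned = pdf.copy()
    aligned.columns = field_names
    return aligned

schema_fields = ["Quarter", "revenue_diff"]

by_name = align_to_schema(pd.DataFrame({"revenue_diff": [1.5], "Quarter": ["Q1 2012"]}), schema_fields)
by_position = align_to_schema(pd.DataFrame([["Q1 2012", 1.5]]), schema_fields)
print(by_name)
print(by_position)
```

With string labels, a returned DataFrame whose columns are in a different order still lines up with the schema; with integer labels, the column order alone decides which schema field a column ends up in.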

In [32]:
from pyspark.sql.types import *

# Define result schema
result_schema = StructType(data.schema.fields + [StructField("revenue_diff", DoubleType())])

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def subtract_mean(pdf):
    revenue = pdf["Revenue"]
    return pdf.assign(revenue_diff=revenue - revenue.mean())

result = data.groupby('Quarter').apply(subtract_mean)
result.limit(10).toPandas()

| | Retailer country | Order method type | Retailer type | Product line | Product type | Product | Year | Quarter | Revenue | Quantity | Gross margin | revenue_diff |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Rope | Husky Rope 50 | 2014 | Q1 2014 | 41496.0 | 273 | 0.336118 | -14763.616807 |
| 1 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Rope | Husky Rope 60 | 2014 | Q1 2014 | 30324.0 | 168 | 0.299114 | -25935.616807 |
| 2 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Rope | Husky Rope 100 | 2014 | Q1 2014 | 93859.05 | 285 | 0.308627 | 37599.433193 |
| 3 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Rope | Husky Rope 200 | 2014 | Q1 2014 | 66093.83 | 121 | 0.321989 | 9834.213193 |
| 4 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Safety | Granite Climbing Helmet | 2014 | Q1 2014 | 63832.4 | 908 | 0.252632 | 7572.783193 |
| 5 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Safety | Husky Harness | 2014 | Q1 2014 | 39458.25 | 639 | 0.291174 | -16801.366807 |
| 6 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Safety | Husky Harness Extreme | 2014 | Q1 2014 | 78584.0 | 752 | 0.483923 | 22324.383193 |
| 7 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Safety | Granite Signal Mirror | 2014 | Q1 2014 | 10890.0 | 330 | 0.523939 | -45369.616807 |
| 8 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Climbing Accessories | Firefly Charger | 2014 | Q1 2014 | 32176.4 | 626 | 0.564981 | -24083.216807 |
| 9 | United States | Fax | Outdoors Shop | Mountaineering Equipment | Climbing Accessories | Firefly Rechargeable Battery | 2014 | Q1 2014 | 23544.8 | 3098 | 0.585526 | -32714.816807 |


## 4.2 Limitations of Grouped Map UDFs

Grouped map UDFs are the most flexible Pandas UDFs in Spark with regard to the return type. A grouped map UDF always returns a `pandas.DataFrame`, but with an arbitrary number of rows and columns (although the columns need to be defined in the schema passed to the `@pandas_udf` decorator). Specifically, the number of rows is not fixed, as opposed to scalar UDFs (where the number of output rows must match the number of input rows) and grouped aggregate UDFs (which can only produce a single scalar value per incoming group). The main limitation is that all data of a group is loaded into memory at once, so very large or skewed groups can lead to out-of-memory errors.
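As an illustration of a variable number of output rows, the sketch below keeps only the two highest-revenue rows of each group, so a group can shrink. It runs in plain Pandas on hypothetical toy data; in Spark, `top_two` would presumably be decorated with `@pandas_udf(data.schema, PandasUDFType.GROUPED_MAP)`, since it returns the input columns unchanged, and applied with `data.groupby("Quarter").apply(top_two)`. That Spark wiring is an untested assumption.

```python
import pandas as pd

def top_two(pdf):
    # Returns at most two rows per group: the ones with the highest revenue
    return pdf.nlargest(2, "Revenue")

# Hypothetical toy data
toy = pd.DataFrame({
    "Quarter": ["Q1 2012", "Q1 2012", "Q1 2012", "Q2 2012"],
    "Revenue": [10.0, 30.0, 20.0, 5.0],
})

# Local stand-in for data.groupby("Quarter").apply(top_two)
result = pd.concat([top_two(g) for _, g in toy.groupby("Quarter")], ignore_index=True)
print(result)
```

The input has four rows, the output only three: the `Q1 2012` group lost a row, which neither a scalar nor a grouped aggregate UDF could express.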