# Introduction


- **Pandas user-defined function (UDF):** also known as a vectorized UDF
- It is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data
- Pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs

# Type hints in pandas UDFs

- In Python, type hints statically indicate the type of a value. They can annotate variables, function parameters, and return values
- In the context of pandas UDFs, type hints specify the input and output types of the UDF; Spark 3.0 and later use them to infer which kind of pandas UDF is being defined
- They **improve the readability and maintainability of the code and let static type checkers such as mypy catch errors before the code runs** (Python has no separate compile step that checks them, and does not enforce them at runtime)
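
As a small illustration in plain pandas (no Spark session needed), the sketch below shows that type hints are ordinary metadata attached to a function. Python does not check them when the function runs, so catching a mismatch relies on a static checker or on a framework that reads the hints. The function name `multiply` simply mirrors the example in this section.

```python
import typing

import pandas as pd


def multiply(a: pd.Series, b: pd.Series) -> pd.Series:
    # Element-wise product of the two inputs
    return a * b


# Type hints are stored on the function object and can be read back
hints = typing.get_type_hints(multiply)
print(hints)

# Python does not enforce the hints: plain ints pass through unchecked
unchecked = multiply(2, 3)
print(unchecked)
```

Reading the hints back in this way is the kind of introspection a framework can use to decide how to call a function.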

In [0]:
import pandas as pd

In [0]:
def multiply(a: pd.Series, b: pd.Series) -> pd.Series:
    return a*b

In [0]:
x = pd.Series([2, 3, 4])
y = pd.Series([5, 6, 7])

In [0]:
print(multiply(x, y))

# 1. Series to Series UDF

- **Use:** To vectorize scalar operations; works with APIs such as `select` and `withColumn`
- Spark runs a pandas UDF by splitting columns into batches, calling the function for each batch as a subset of the data, then concatenating the results
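
The split, call, concatenate cycle described above can be imitated locally with plain pandas. This is only a rough sketch of the idea, not Spark's actual implementation: the `batch_size` value is a made-up number for illustration (in Spark the batch size is governed by the `spark.sql.execution.arrow.maxRecordsPerBatch` setting).

```python
import pandas as pd


def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b


x = pd.Series(range(10))
batch_size = 4  # hypothetical value chosen for this sketch

# 1. Split the column into consecutive batches
batches = [x.iloc[i:i + batch_size] for i in range(0, len(x), batch_size)]

# 2. Call the function once per batch
results = [multiply_func(batch, batch) for batch in batches]

# 3. Concatenate the per-batch results into one column
combined = pd.concat(results)

# The batched result matches applying the function to the whole column
assert combined.equals(multiply_func(x, x))
print(len(batches), "batches ->", len(combined), "rows")
```

Because the function may see any subset of the rows, it should not depend on how the data happens to be split.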

## Create a pandas UDF that computes the product of 2 columns

In [0]:
# Import necessary libraries
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

In [0]:
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

In [0]:
# Test the Function Locally
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

In [0]:
# Create a Spark DataFrame
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

In [0]:
df.show()

In [0]:
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()

# 2. Iterator of Series to Iterator of Series UDF

**An iterator UDF is the same as a scalar pandas UDF except:**
1) The Python function takes an iterator of batches instead of a single input batch, and returns an iterator of output batches instead of a single output batch
2) The total length of the output across the iterator should equal the total length of the input
3) The wrapped pandas UDF takes a single Spark column as input

- Useful when the UDF execution requires initializing some state
- **For example,** loading a machine learning model file once and then applying inference to every input batch
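
To make the benefit of one-time initialization concrete, here is a minimal local sketch in plain pandas. `load_offset` is a hypothetical stand-in for an expensive setup step such as loading a model file, and `add_offset` is an undecorated function with the same signature an iterator pandas UDF would have.

```python
from typing import Iterator

import pandas as pd

setup_calls = []  # records every time the "expensive" setup runs


def load_offset() -> int:
    # Hypothetical stand-in for expensive setup such as loading a model
    setup_calls.append(1)
    return 10


def add_offset(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    offset = load_offset()  # runs once, before the first batch is processed
    for batch in batch_iter:
        yield batch + offset


batches = [pd.Series([1, 2]), pd.Series([3]), pd.Series([4, 5, 6])]
outputs = list(add_offset(iter(batches)))

print([out.tolist() for out in outputs])
print("setup calls:", len(setup_calls))
```

With a Series to Series function, the same setup would have to run on every call, that is, once per batch.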

In [0]:
# Import necessary libraries
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf

In [0]:
# Create a Pandas DataFrame
pdf = pd.DataFrame([1, 2, 3], columns=["x"])

# Convert the Pandas DataFrame to a Spark DataFrame
df = spark.createDataFrame(pdf)

In [0]:
df.show()

In [0]:
# Define a Pandas UDF 'plus_one'

# When the UDF is called with the column, the input to the underlying function is an iterator of pd.Series
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

In [0]:
# Apply the 'plus_one' UDF to the "x" column of the Spark DataFrame
df.select(plus_one(col("x"))).show()

In [0]:
# Define another Pandas UDF named 'plus_y'

y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

In [0]:
# Apply the 'plus_y' UDF to the "x" column of the Spark DataFrame
df.select(plus_y(col("x"))).show()

# 3. Iterator of multiple Series to Iterator of Series UDF

- Has the same characteristics and restrictions as the Iterator of Series to Iterator of Series UDF
- The specified function takes an iterator of batches and outputs an iterator of batches
- It is also useful when the UDF execution requires initializing some state

**The differences are:**
- The underlying Python function takes an iterator of a tuple of pandas Series
- The wrapped pandas UDF takes multiple Spark columns as an input
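
The shape of the input can be tried out locally before involving Spark. In this sketch, `multiply_batches` is an undecorated function written for illustration, and `zip` is used to pair up hand-made batches of two columns so that each element of the iterator is a tuple of pandas Series.

```python
from typing import Iterator, Tuple

import pandas as pd


def multiply_batches(
        batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # Each element is one batch: a tuple holding one Series per input column
    for a, b in batch_iter:
        yield a * b


# Two columns, each already split into the same two batches
col_a = [pd.Series([1, 2]), pd.Series([3])]
col_b = [pd.Series([10, 20]), pd.Series([30])]

outputs = list(multiply_batches(zip(col_a, col_b)))
print([out.tolist() for out in outputs])
```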

In [0]:
# Importing Libraries
import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql.functions import col, pandas_udf

In [0]:
# Create a Pandas DataFrame
pdf = pd.DataFrame([1, 2, 3], columns=["x"])

# Convert the Pandas DataFrame to a Spark DataFrame
df = spark.createDataFrame(pdf)

In [0]:
df.show()

In [0]:
# Define a Pandas UDF 'multiply_two_cols'
@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

In [0]:
# Apply the 'multiply_two_cols' UDF, passing the "x" column as both inputs
df.select(multiply_two_cols("x", "x")).show()

# 4. Series to scalar UDF

- Similar to Spark aggregate functions
- Defines an aggregation from one or more pandas Series to a scalar value, where each pandas Series represents a Spark column
- **Use:** With APIs such as `select`, `withColumn`, `groupBy.agg`, and `pyspark.sql.Window`

- The return type should be a primitive data type, and the returned scalar can be either a Python primitive type (for example, `int` or `float`) or a NumPy data type (for example, `numpy.int64` or `numpy.float64`)
- Partial aggregation is not supported: all data for each group is loaded into memory
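
One way to see why the whole group has to reach the function at once: an arbitrary Python aggregation comes with no rule for merging partial results. The plain-pandas sketch below uses made-up values for a single group and shows that naively averaging per-chunk means differs from the mean over the full group.

```python
import pandas as pd

v = pd.Series([3.0, 5.0, 10.0])  # illustrative values for one group

# Aggregating the whole group in one call
full_mean = v.mean()

# Aggregating two chunks separately, then naively averaging the results
parts = [v.iloc[:2], v.iloc[2:]]
mean_of_means = pd.Series([part.mean() for part in parts]).mean()

print("full mean:", full_mean)
print("mean of chunk means:", mean_of_means)
```

The two numbers differ because the chunks have different sizes, so large groups should be sized with available memory in mind.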

## Compute mean with select, groupBy, and window operations

In [0]:
# Importing Libraries
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

In [0]:
# Creating DataFrame
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

In [0]:
df.show()

In [0]:
# Define a Pandas UDF 'mean_udf'
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

In [0]:
# Apply the 'mean_udf' UDF
df.select(mean_udf(df['v'])).show()

In [0]:
# Use UDF with GroupBy
df.groupby("id").agg(mean_udf(df['v'])).show()

In [0]:
# Using Window Function
w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()