# https://changhsinlee.com/pyspark-udf/
# How to Turn Python Functions into PySpark Functions (UDF)

Here’s the problem: I have a Python function that iterates over my data, but going through each row in the dataframe takes several days. If I have a computing cluster with many nodes, how can I distribute this Python function in PySpark to speed up this process — maybe cut the total time down to less than a few hours — with the least amount of work?

In other words, how do I turn a Python function into a Spark user defined function, or _UDF_? I’ll explain my solution here.

# Registering a UDF

PySpark UDFs work much like the pandas ```.map()``` and ```.apply()``` methods for pandas series and dataframes. If I have a function that can use values from a row in the dataframe as input, then I can map it to the entire dataframe. The only difference is that with PySpark UDFs I have to specify the output data type.
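For comparison, here is the pandas side of that analogy. This is only a minimal sketch with a toy ```square``` function and made-up values; the lambda passed to ```.apply()``` is an illustration, not something from the Spark workflow.

```python
import pandas as pd


def square(value):
    return value ** 2


# .map() applies a function to each element of a Series
series = pd.Series([1, 2, 3])
squared_series = series.map(square)
print(squared_series.tolist())  # [1, 4, 9]

# .apply(..., axis=1) passes each row of a DataFrame to the function
frame = pd.DataFrame({"integers": [1, 2, 3], "floats": [-1.0, 0.5, 2.7]})
row_values = frame.apply(lambda row: square(row["integers"]) + row["floats"], axis=1)
print(row_values.tolist())
```

Note that pandas infers the output type on its own here, which is exactly the step that has to be spelled out for a PySpark UDF.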

As an example, I will create a PySpark dataframe from a pandas dataframe.

In [1]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example-from-chinese-dude").getOrCreate()

In [2]:
# Example Data:
df_pd = pd.DataFrame(data={'integers': [1, 2, 3],
                           'floats': [-1.0, 0.5, 2.7],
                           'integer_arrays': [[1, 2], [3, 4, 5], [6, 7, 8, 9]]})

df = spark.createDataFrame(df_pd)
df.printSchema()
print("\n")
df.show()

root
 |-- integers: long (nullable = true)
 |-- floats: double (nullable = true)
 |-- integer_arrays: array (nullable = true)
 |    |-- element: long (containsNull = true)



+--------+------+--------------+
|integers|floats|integer_arrays|
+--------+------+--------------+
|       1|  -1.0|        [1, 2]|
|       2|   0.5|     [3, 4, 5]|
|       3|   2.7|  [6, 7, 8, 9]|
+--------+------+--------------+



# Primitive type outputs

Let's say I have a Python function ```square()``` that squares a number, and I want to register this function as a Spark UDF.

In [3]:
def square(value):
    return value**2

As long as the Python function's output has a corresponding data type in Spark, I can turn it into a UDF. When registering UDFs, I have to specify the data type using the types from ```pyspark.sql.types```. All the types supported by PySpark can be found [here](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=types#module-pyspark.sql.types).

**Here's a small gotcha**: a Spark UDF does not convert integers to floats. The Python function works for both integers and floats, but the Spark UDF returns a column of NULLs whenever the type of the value the function returns does not match the return type declared for the UDF, as in the following example:

### Registering UDF with integer type output.

In [4]:
# Integer type output.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

In [5]:
square_udf_int = udf(f=lambda z: square(z), returnType=IntegerType())

In [6]:
df.select("integers",
          "floats",
          square_udf_int("integers").alias("int_squared"),
          square_udf_int("floats").alias("float_squared")).show()

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|          1|         null|
|       2|   0.5|          4|         null|
|       3|   2.7|          9|         null|
+--------+------+-----------+-------------+



### Registering UDF with float type output.

In [7]:
# Float type output.
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

In [8]:
square_udf_float = udf(f=lambda z: square(z), returnType=FloatType())

In [9]:
df.select("integers",
          "floats",
          square_udf_float("integers").alias("int_squared"),
          square_udf_float("floats").alias("float_squared")).show()

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|       null|          1.0|
|       2|   0.5|       null|         0.25|
|       3|   2.7|       null|         7.29|
+--------+------+-----------+-------------+
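The nulls in the two tables above follow from what the plain Python function returns: an ```int``` for an integer input and a ```float``` for a float input, so only one of the two columns matches the declared return type each time. A quick check outside Spark, using the same ```square()``` as above:

```python
def square(value):
    return value ** 2


# The return type follows the input type; nothing is cast along the way
for value in (2, 0.5):
    result = square(value)
    print(value, "->", result, type(result).__name__)
# 2 -> 4 int
# 0.5 -> 0.25 float
```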



### Specifying the float type output in the Python function

Specifying the data type in the Python function output is probably the safer way. Because I _(the Chinese guy)_ usually load data into Spark from Hive tables whose schemas were made by others, casting the return value inside the function means the UDF should still work as intended even if the Hive schema has changed.

In [10]:
# Forcing the output to be float
def square_float(x):
    return float(x**2)


square_udf_float2 = udf(f=lambda z: square_float(z), returnType=FloatType())

In [11]:
df.select("integers",
          "floats",
          square_udf_float2("integers").alias("int_squared"),
          square_udf_float2("floats").alias("float_squared")).show()

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|        1.0|          1.0|
|       2|   0.5|        4.0|         0.25|
|       3|   2.7|        9.0|         7.29|
+--------+------+-----------+-------------+
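The cast in ```square_float()``` can be factored into a small reusable wrapper. The sketch below is an assumption-laden illustration: ```returns_as``` and ```square_as_float``` are hypothetical names, and passing ```None``` through untouched is my guess at the behavior most people want for null cells (a null cell reaches a Python UDF as ```None```, and ```None**2``` would raise a ```TypeError```).

```python
from functools import wraps


def returns_as(py_type):
    """Hypothetical helper: cast a function's return value to py_type."""
    def decorator(func):
        @wraps(func)
        def wrapper(value):
            if value is None:  # leave nulls alone instead of raising
                return None
            return py_type(func(value))
        return wrapper
    return decorator


@returns_as(float)
def square_as_float(value):
    return value ** 2


print(square_as_float(3))     # 9.0
print(square_as_float(0.5))   # 0.25
print(square_as_float(None))  # None
```

The wrapped function is still an ordinary Python function, so it could be registered the same way as before, for example with ```udf(f=square_as_float, returnType=FloatType())```.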



# Composite type outputs

If the output of the Python function is a list, then the values in the list have to be of the same type, which is specified within ```ArrayType()``` when registering the UDF.

In [12]:
from pyspark.sql.types import ArrayType

In [13]:
def square_list(some_iterable):
    return [float(val)**2 for val in some_iterable]


square_list_udf = udf(f=lambda y: square_list(y), returnType=ArrayType(FloatType()))

df.select("integer_arrays", 
          square_list_udf("integer_arrays").alias("squares_of_integer_arrays")).show(truncate=False)

+--------------+-------------------------+
|integer_arrays|squares_of_integer_arrays|
+--------------+-------------------------+
|[1, 2]        |[1.0, 4.0]               |
|[3, 4, 5]     |[9.0, 16.0, 25.0]        |
|[6, 7, 8, 9]  |[36.0, 49.0, 64.0, 81.0] |
+--------------+-------------------------+



For a function that returns a tuple of mixed typed values, I can make a corresponding ```StructType()```, which is a composite type in Spark, and specify what is in the struct with ```StructField()```. For example, suppose I have a function that returns the position and the letter from ```ascii_letters```:

In [14]:
import string
from pyspark.sql.types import StructType, StructField, StringType

In [15]:
def convert_ascii(number):
    return [number, string.ascii_letters[number]]


convert_ascii(1)

[1, 'b']

In [16]:
array_schema = StructType(fields=[
    StructField(name="number", dataType=IntegerType(), nullable=False),
    StructField(name="letters", dataType=StringType(), nullable=False)
])

In [17]:
spark_convert_ascii = udf(f=lambda z: convert_ascii(z), returnType=array_schema)

df_ascii = df.select("integers", spark_convert_ascii("integers").alias("ascii_mapping"))
df_ascii.show()

+--------+-------------+
|integers|ascii_mapping|
+--------+-------------+
|       1|       [1, b]|
|       2|       [2, c]|
|       3|       [3, d]|
+--------+-------------+



Notice that the schema looks like a tree, with the nullable option set as specified in ```StructField()```.

In [18]:
df_ascii.printSchema()

root
 |-- integers: long (nullable = true)
 |-- ascii_mapping: struct (nullable = true)
 |    |-- number: integer (nullable = false)
 |    |-- letters: string (nullable = false)
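One thing ```convert_ascii()``` does not guard against is input outside the range of ```string.ascii_letters```, which has 52 characters: an index of 52 or more raises an ```IndexError```, and a negative index silently wraps around. A possible guard is sketched below. ```convert_ascii_safe``` is a hypothetical name, and returning ```None``` for the whole struct (rather than for a single field, which the ```nullable=False``` fields would not allow) is my assumption about a reasonable fallback, since the struct column itself is nullable.

```python
import string


def convert_ascii_safe(number):
    """Hypothetical variant of convert_ascii that returns None for unusable input."""
    if number is None or not 0 <= number < len(string.ascii_letters):
        return None  # the whole struct is null for this row
    return (number, string.ascii_letters[number])


print(convert_ascii_safe(1))   # (1, 'b')
print(convert_ascii_safe(52))  # None
print(convert_ascii_safe(-1))  # None
```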



# Some UDF problems I've _(the Chinese guy)_ seen:

### Py4JJavaError

Most of the ```Py4JJavaError``` exceptions I've seen came from mismatched data types between Python and Spark, especially when the function uses a data type from a Python module like ```numpy```. So that's the first thing I'd look into if there's an error.

For example, if the output is a ```numpy.ndarray```, then the UDF throws an exception.

In [19]:
import numpy as np

# Example data
pd_np = pd.DataFrame(data={"int_arrays": [[1, 2, 3], [4, 5]]})
df_np = spark.createDataFrame(pd_np)
df_np.show()

+----------+
|int_arrays|
+----------+
| [1, 2, 3]|
|    [4, 5]|
+----------+



In [20]:
# squares with a numpy function, which returns a np.ndarray
def square_array_wrong(x):
    return np.square(x)


my_result = square_array_wrong([1, 2, 3])
print(my_result)
print(type(my_result))

[1 4 9]
<class 'numpy.ndarray'>


The function returns a ```numpy.ndarray``` whose values are also numpy objects (here ```numpy.int32```) instead of Python primitives.

When combined with the Spark ```udf``` function, it throws a ```Py4JJavaError```.

In [21]:
# Uncomment the following code to see the Py4JJavaError that would be produced.
# spark_square_array_wrong = udf(f=square_array_wrong, returnType=ArrayType(FloatType()))

# df_np.withColumn("squared", spark_square_array_wrong("int_arrays")).show()

The solution is to convert it back to a list whose values are Python primitives.

In [22]:
def square_array_right(x):
    return np.square(x).tolist()

spark_square_array_right = udf(f=square_array_right, returnType=ArrayType(IntegerType()))

Now the UDF will work as intended:

In [23]:
zz = df_np.withColumn("squared", spark_square_array_right("int_arrays"))
zz.show()

+----------+---------+
|int_arrays|  squared|
+----------+---------+
| [1, 2, 3]|[1, 4, 9]|
|    [4, 5]| [16, 25]|
+----------+---------+
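A related detail worth checking is that wrapping the array in ```list()``` is not enough, because the elements stay numpy scalars. The sketch below, run outside Spark, compares the two conversions and shows ```.item()``` for a single numpy scalar. The variable names are only for illustration.

```python
import numpy as np

arr = np.square([1, 2, 3])

# list() keeps the elements as numpy scalars
via_list = list(arr)
print(isinstance(via_list[0], np.integer))  # True

# .tolist() converts the elements to Python ints
via_tolist = arr.tolist()
print(type(via_tolist[0]).__name__)  # int

# For a single numpy scalar, .item() does the same job
total = arr.sum()
print(type(total.item()).__name__)  # int
```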

