In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('PySparkLearning').getOrCreate()


### 2. Create PySpark UDF

##### 2.1 Create a DataFrame

In [2]:
columns = ["Seqno","Name"]

data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")
       ]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



##### 2.2 Create a Python Function

The first step in creating a UDF is writing a Python function. The snippet below creates a function, `convertCase()`, which takes a string parameter and converts the first letter of every word to upper case. A UDF can take whatever parameters you choose and returns a value.



In [3]:
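def convertCase(s):
    resStr = ""
    arr = s.split(" ")  # split the input string into words on single spaces
    for x in arr:
        # upper-case the first character, keep the rest, then append a space
        # (so the result always ends with a trailing space)
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr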
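Because a UDF wraps an ordinary Python function, you can check the function's behavior locally, without a SparkSession, before converting it. The sketch below repeats `convertCase()` and compares it with a hypothetical variant, `convert_case_joined()`, that rebuilds the string with `str.join()` so no trailing space is appended. The variant is only an illustration; the rest of this notebook keeps using `convertCase()`, so its results keep the trailing space.

```python
def convertCase(s):
    # Same logic as the notebook cell above: capitalize the first letter of
    # each space-separated word and append a space after every word.
    resStr = ""
    for x in s.split(" "):
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr


def convert_case_joined(s):
    # Hypothetical variant: same capitalization, but the words are re-joined
    # with single spaces, so there is no trailing space.
    return " ".join(w[0:1].upper() + w[1:] for w in s.split(" "))


print(repr(convertCase("john jones")))          # 'John Jones '
print(repr(convert_case_joined("john jones")))  # 'John Jones'
```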

##### 2.3 Convert a Python function to PySpark UDF

Now convert `convertCase()` to a UDF by passing it to the PySpark SQL `udf()` function, which is available in the `pyspark.sql.functions` module. Make sure you import it before using it.

The PySpark `udf()` function returns a user-defined function (backed by the `pyspark.sql.udf.UserDefinedFunction` class) that you can call on DataFrame columns.

In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Convert the Python function to a UDF with an explicit return type
convertUDF = udf(lambda z: convertCase(z), StringType())

# Note: the default return type of udf() is StringType(), so you can also
# omit it; this statement is equivalent to the one above.
convertUDF = udf(lambda z: convertCase(z))

### 3. Using UDF with DataFrame

##### 3.1 Using UDF with PySpark DataFrame select()
Now you can use `convertUDF()` on a DataFrame column just like a regular built-in function.

In [9]:
from pyspark.sql.functions import col

df.select(col("Seqno"),convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



##### 3.2 Using UDF with PySpark DataFrame `withColumn()`
You can also use a UDF with the DataFrame `withColumn()` function. To illustrate this, I will create another function, `upperCase()`, which converts the input string to upper case.

In [10]:
def upperCase(s):
    return s.upper()


Let’s convert the `upperCase()` Python function to a UDF and then use it with DataFrame `withColumn()`.
The example below converts the values of the “Name” column to upper case and stores them in a new column, “Curated Name”.

In [12]:
upperCaseUDF = udf(lambda z: upperCase(z), StringType())

df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



##### 3.3 Registering PySpark UDF & use it on SQL

To use the `upperCase()` function in PySpark SQL, you need to register it with PySpark by using `spark.udf.register()`. The first argument is the name under which SQL queries can call the function.



In [15]:
spark.udf.register("upperCaseUDF", upperCase, StringType())

df.createOrReplaceTempView("NAME_TABLE")

spark.sql("select Seqno, upperCaseUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |JOHN JONES  |
|2    |TRACEY SMITH|
|3    |AMY SANDERS |
+-----+------------+



### 4. Creating UDF using annotation

In the previous sections, you learned that creating a UDF is a two-step process: first you create a Python function, then you convert it to a UDF with the SQL `udf()` function. However, you can combine these steps into one by using `udf()` as a decorator (annotation) on the function definition.



In [16]:
@udf(returnType=StringType())
def upperCase(s):
    return s.upper()

df.withColumn("Curated Name", upperCase(col("Name"))) \
  .show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+
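The decorator form is ordinary Python syntax. Called without a function, `udf(returnType=StringType())` returns a callable that expects the function, so writing `@udf(returnType=StringType())` above `def upperCase(...)` is equivalent to `upperCase = udf(returnType=StringType())(upperCase)`. The plain-Python sketch below shows the same pattern with a hypothetical decorator factory, `with_return_type` (not part of PySpark), that only records the declared return type on the function.

```python
import functools


def with_return_type(return_type):
    # Hypothetical decorator factory: calling it returns a decorator.
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        wrapper.return_type = return_type  # record the declared type
        return wrapper

    return decorator


# Decorator syntax ...
@with_return_type("string")
def upper_case(s):
    return s.upper()


# ... is equivalent to calling the factory and applying the result by hand.
def _upper_case(s):
    return s.upper()


upper_case_manual = with_return_type("string")(_upper_case)

print(upper_case("john jones"), upper_case.return_type)  # JOHN JONES string
print(upper_case_manual("amy sanders"))                   # AMY SANDERS
```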



### 5. Special Handling

##### 5.1 Execution order
One thing to be aware of is that PySpark/Spark does not guarantee the order of evaluation of subexpressions: expressions are not guaranteed to be evaluated left-to-right or in any other fixed order. Spark reorders execution during query optimization and planning, so logical AND and OR expressions do not have left-to-right short-circuiting semantics, and it is unsafe to rely on the order of evaluation of Boolean expressions or of WHERE and HAVING clauses.

So when you design and use UDFs, be very careful, especially with null handling, because a UDF that receives an unexpected null can raise a runtime exception.

In [18]:
# Register convertCase() so that SQL queries can call it as convertUDF
spark.udf.register("convertUDF", convertCase, StringType())

# There is no guarantee that the "Name is not null" condition runs first.
# If convertUDF(Name) like '%John%' is evaluated first on a null Name,
# you will get a runtime error.
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " +
          "where Name is not null and convertUDF(Name) like '%John%'") \
     .show(truncate=False)

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



##### 5.2 Handling null check

UDFs are error-prone when not designed carefully, for example when a column contains null values in some records.

In [20]:
# null check: a DataFrame whose Name column contains a null

columns = ["Seqno","Name"]

data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders"),
        ("4", None)
       ]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

# Fails for Seqno 4: convertCase() receives None and calls None.split()
spark.sql("select convertUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)


+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



PythonException: 
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-3-6cc992af9cb0>", line 3, in convertCase
AttributeError: 'NoneType' object has no attribute 'split'


In the snippet above, the record with Seqno 4 has the value None (null) in the “Name” column. Because `convertCase()` does not handle null, applying the UDF ends up calling `split()` on `None`, which raises the error above.
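One way to make the Python function itself null-aware is to wrap it before converting it to a UDF. In the plain-Python sketch below, the hypothetical helper `null_safe` (not a PySpark API) returns `None` whenever its input is `None`; a PySpark UDF turns a returned `None` into a SQL null instead of raising. The UDF name in the final comment is also hypothetical.

```python
import functools


def null_safe(f):
    # Hypothetical helper, not a PySpark API: return None (SQL null) when the
    # input is None instead of calling f.
    @functools.wraps(f)
    def wrapper(value):
        if value is None:
            return None
        return f(value)

    return wrapper


def convertCase(s):
    # Same logic as the convertCase() cell in section 2.2 (not null-safe).
    resStr = ""
    for x in s.split(" "):
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr


safeConvertCase = null_safe(convertCase)

print(repr(safeConvertCase("john jones")))  # 'John Jones '
print(safeConvertCase(None))                # None

# In PySpark you could then wrap it, e.g.:
# safeConvertUDF = udf(safeConvertCase, StringType())
```

Returning `None` keeps missing names as null in the result, whereas the `_nullsafeUDF` example in this section returns an empty string; which is appropriate depends on what downstream code expects.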

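**Points to remember**

- It’s always best practice to check for null inside a UDF rather than outside it.
- If you can’t do the null check inside the UDF, at least use IF or CASE WHEN to check for null and call the UDF conditionally. Be aware that Python UDFs may still be evaluated on every row before the conditional is applied, so a null check inside the function remains the safer option.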

In [23]:
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "", StringType())

spark.sql("select _nullsafeUDF(Name) as Name from NAME_TABLE2") \
     .show(truncate=False)

spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%John%'") \
     .show(truncate=False)  

+-------------+
|Name         |
+-------------+
|John Jones   |
|Tracey Smith |
|Amy Sanders  |
|             |
+-------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



##### 5.3 Performance concerns when using UDFs

UDFs are a black box to PySpark: the optimizer cannot look inside them, so you lose the optimizations PySpark applies to DataFrame operations. Python UDFs also add overhead, because input data has to be serialized and transferred between the JVM and Python worker processes. When possible, use [Spark SQL built-in functions](https://sparkbyexamples.com/spark/spark-sql-functions/) instead, since the optimizer understands them. Consider creating a UDF only when no existing built-in function does what you need.