In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
# Get the SparkSession for this notebook (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName('UDF1')
    .getOrCreate()
)

In [2]:
# Three (SeqNo, Name) rows with lower-case names for the UDFs below to transform.
cols = ['SeqNo', 'Name']
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
]
df = spark.createDataFrame(data, schema=cols)
df.show(truncate=False)

+-----+------------+
|SeqNo|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [6]:
def convert_case(input_str):
    """Capitalise the first character of every space-separated word.

    Only the first character of each word is upper-cased; the remaining
    characters are left unchanged. A single space is appended after every
    word, so the result always ends with a trailing space
    (``"john jones"`` becomes ``"John Jones "``).

    Args:
        input_str: String to convert. ``None`` is not handled here.

    Returns:
        The converted string.

    Raises:
        AttributeError: If ``input_str`` is ``None`` (it has no ``split``).
    """
    # No debug print: wrapped as a UDF, this function is called for every row.
    return "".join(
        word[:1].upper() + word[1:] + " "
        for word in input_str.split(" ")
    )

In [11]:
# Converting the function to a UDF.
# StringType() is the default return type, hence it is not required;
# the explicit equivalent would be:
#     convertUDF = udf(lambda x: convert_case(x), StringType())
convertUDF = udf(lambda name: convert_case(name))

# Using UDF with PySpark DataFrame select()

In [12]:
# Apply the UDF to Name and keep the original column name via the alias.
(
    df
    .select(col("SeqNo"), convertUDF(col("Name")).alias("Name"))
    .show(truncate=False)
)

+-----+-------------+
|SeqNo|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



# Using UDF with PySpark DataFrame withColumn()

In [13]:
def upperCase(str):
    """Return ``str`` converted to upper case, or ``None`` when ``str`` is ``None``.

    The ``None`` guard lets a UDF built from this function run on rows with a
    null value instead of raising ``AttributeError``.

    Args:
        str: The string to convert, or ``None``.

    Returns:
        The upper-cased string, or ``None`` for a ``None`` input.
    """
    # NOTE: the parameter shadows the built-in ``str``; the name is kept so
    # existing keyword callers keep working.
    if str is None:
        return None
    return str.upper()
# Wrap upperCase as a UDF with an explicit string return type.
upperCaseDF = udf(lambda value: upperCase(value), returnType=StringType())

In [15]:
# Add an upper-cased copy of Name as a new column.
(
    df
    .withColumn("Curated Name", upperCaseDF(col("Name")))
    .show(truncate=False)
)

+-----+------------+------------+
|SeqNo|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



# Registering PySpark UDF & use it on SQL

To use the convert_case() function in PySpark SQL, you need to register it with PySpark by using spark.udf.register().

In [18]:
# Make convert_case callable from SQL under the name "convertUDF".
spark.udf.register("convertUDF", convert_case, returnType=StringType())

# Expose df to SQL as NAME_TABLE, then call the registered function in a query.
df.createOrReplaceTempView('NAME_TABLE')
(
    spark
    .sql("select SeqNo, convertUDF(Name) as Name from NAME_TABLE")
    .show(truncate=False)
)

+-----+-------------+
|SeqNo|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



# Creating UDF using annotation

In the previous sections, you learned that creating a UDF is a two-step process: first you create a Python function, then you convert it to a UDF using the SQL udf() function. You can combine these two steps into a single step by using the udf annotation (decorator).

In [19]:
@udf(returnType=StringType())
def upper_Case(strng):
    """Spark UDF: upper-case ``strng``; a null (``None``) input yields null."""
    # Null check inside the UDF so a row with a null value does not raise
    # AttributeError when the function is called.
    if strng is None:
        return None
    return strng.upper()
# Use the decorated function directly as a column expression.
(
    df
    .withColumn("Curated Name", upper_Case(col("Name")))
    .show(truncate=False)
)

+-----+------------+------------+
|SeqNo|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



<h1>Execution order</h1>
<p>One thing to be aware of is that PySpark/Spark does not guarantee the order of evaluation of subexpressions, meaning expressions are not guaranteed to be evaluated left-to-right or in any other fixed order. PySpark reorders execution for query optimization and planning; hence, you cannot rely on the evaluation order (or on side effects) of AND, OR, WHERE and HAVING expressions.

So when you are designing and using a UDF, you have to be very careful, especially with null handling, as mistakes here result in runtime exceptions.</p>

In [22]:
# There is no guarantee that "Name is not null" will execute first.
# If convertUDF(Name) like '%John%' executes first, then
# you will get a runtime error.
(
    spark
    .sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE where Name is not null and convertUDF(Name) like '%John%'")
    .show(truncate=False)
)

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



UDFs are error-prone when not designed carefully, for example when a column contains null values in some records.

In [23]:
# Null check: sample rows including one record whose Name is None.
columns = ["Seqno", "Name"]
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4', None),
]

df2 = spark.createDataFrame(data, schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

# convertUDF is applied to every row here, including the one with a null Name.
(
    spark
    .sql("select convertUDF(Name) from NAME_TABLE2")
    .show(truncate=False)
)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



PythonException: 
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/home/boom/Documents/programming/big_data/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-d1f61c41c9e9>", line 3, in convert_case
AttributeError: 'NoneType' object has no attribute 'split'


<p>Points to remember:</p>

<ul>
<li>It's always best practice to check for null inside a UDF function rather than checking for null outside.</li>
<li>In any case, if you can’t do a null check in the UDF, at least use IF or CASE WHEN to check for null and call the UDF conditionally.</li> </ul>

In [26]:

# Register a wrapper that returns "" for a null input instead of calling convert_case.
spark.udf.register(
    "_nullsafeUDF",
    lambda name: "" if name is None else convert_case(name),
    StringType(),
)

(
    spark
    .sql("select _nullsafeUDF(Name) from NAME_TABLE2")
    .show(truncate=False)
)

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+



In [27]:
# Same filter as before, but through the null-safe UDF.
(
    spark
    .sql(
        "select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 "
        " where Name is not null and _nullsafeUDF(Name) like '%John%'"
    )
    .show(truncate=False)
)


+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



UDFs are a black box to PySpark, so it can’t optimize them, and you lose all the optimizations PySpark applies to DataFrames/Datasets. When possible, you should use Spark SQL built-in functions, as these functions benefit from optimization. Consider creating a UDF only when no existing built-in SQL function provides what you need.