#PySpark UDF (User Defined Function)


---


![pyspark_udf image](/files/pyspark_udf.jpg)


---


**A PySpark UDF (User Defined Function) is a Spark SQL and DataFrame feature that lets you extend PySpark's built-in capabilities with your own Python logic. In this article, I will explain what a UDF is, why you might need one, and how to create and use it with DataFrame select(), withColumn(), and SQL, using PySpark (Spark with Python) examples.**

**Note: UDFs are among the most expensive operations in PySpark, so use them only when no built-in function does the job. A later section of this article explains why UDFs are expensive.**


---


##Table of contents

- PySpark UDF Introduction
  - What is UDF?
  - Why do we need it?
- Create PySpark UDF (User Defined Function)
  - Create a DataFrame
  - Create a Python function
  - Convert python function to UDF
- Using UDF with DataFrame
  - Using UDF with DataFrame select()
  - Using UDF with DataFrame withColumn()
  - Registering UDF & using it in a SQL query
- Create UDF using annotation
- Special handling
  - Execution order
  - Null check
  - Performance concern

##1. PySpark UDF Introduction


--- 


###1.1 What is UDF?

**If you are coming from a SQL background, UDFs (User Defined Functions) are nothing new: most traditional RDBMS databases support them. These functions are registered in the database library and then used in SQL like regular functions.**

**PySpark UDFs are similar to UDFs in traditional databases. In PySpark, you write a function in Python syntax and either wrap it with PySpark SQL udf() to use it on a DataFrame, or register it with spark.udf.register() to use it in SQL.**


---

###1.2 Why do we need a UDF?

**UDFs extend the functions of the framework, and once created they can be reused across multiple DataFrames and SQL expressions. For example, suppose you want to capitalize the first letter of every word in a name string. This article implements that as a UDF to illustrate the mechanics, although Spark SQL already ships initcap() for this kind of conversion.**

**Before you create any UDF, check whether a similar function is already available among the Spark SQL functions. PySpark SQL provides many predefined functions, and more are added with every release, so it is best to check before reinventing the wheel.**
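
**As a quick illustration of that check, the sketch below uses the built-in initcap() from pyspark.sql.functions, which upper-cases the first letter of each word and lower-cases the rest. The helper name capitalize_with_builtin and its column parameter are hypothetical; the import sits inside the function only so the snippet can be pasted before a Spark session exists.**

```python
def capitalize_with_builtin(df, column="Name"):
    """Return df with `column` rewritten by Spark's built-in initcap()."""
    # Lazy import: only needed once a DataFrame is actually passed in.
    from pyspark.sql.functions import col, initcap

    # initcap() is a built-in column expression, so no Python UDF is involved.
    return df.withColumn(column, initcap(col(column)))


# Usage, assuming a DataFrame `df` with a string column "Name":
# capitalize_with_builtin(df).show(truncate=False)
```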

***When you create UDFs, design them carefully; otherwise you will run into optimization and performance issues.***

##2. Create PySpark UDF

###2.1 Create a DataFrame

**Before we jump into creating a UDF, let’s first create a PySpark DataFrame.**

In [0]:
columns = ["Seqno", "Name"]

data = [
    ("1", "john james"),
    ("2", "tracey smith"),
    ("3", "amy sanders")
]

df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

root
 |-- Seqno: string (nullable = true)
 |-- Name: string (nullable = true)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john james  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



###2.2 Create a Python Function


---

**The first step in creating a UDF is writing a plain Python function. The snippet below defines convertCase(), which takes a string and capitalizes the first letter of every word. Note that it appends a space after each word, so the result carries a trailing space. A UDF can take parameters of your choice and returns a value.**

In [0]:
def convertCase(name_str):
    resStr = ""
    arr = name_str.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

convertCase("ajinkya kulkarni")

Out[15]: 'Ajinkya Kulkarni '
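
**The output above ends with a space because convertCase() appends one after every word. If that matters for your data, a variant that joins the words instead could look like the sketch below; convert_case_trimmed is a hypothetical name, not part of the original example.**

```python
def convert_case_trimmed(name_str):
    """Capitalize the first letter of each space-separated word."""
    words = name_str.split(" ")
    # Joining with " " puts a space between words only, never after the last one.
    return " ".join(word[:1].upper() + word[1:] for word in words)


print(repr(convert_case_trimmed("john james")))  # 'John James'
```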


###2.3 Convert a Python function to PySpark UDF


**Now convert convertCase() to a UDF by passing it to the PySpark SQL udf() function, which lives in the pyspark.sql.functions module. Make sure you import it before use.**

**udf() returns a user-defined function object that you can call on DataFrame columns. (On the Scala side, the corresponding class is org.apache.spark.sql.expressions.UserDefinedFunction.)**

In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

#Converting function to UDF
convertUDF = udf(lambda z: convertCase(z), StringType())

**Note: The default return type of udf() is StringType, so you can also write the statement above without specifying a return type.**

In [0]:
#Converting function to UDF
#StringType() is the default return type, so it can be omitted

convertUDF = udf(lambda z : convertCase(z))
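
**The default only helps when the function really returns a string. For other result types, declare the return type explicitly. The sketch below is an illustration under that assumption: name_length and make_name_length_udf are hypothetical helpers, and the UDF wiring is kept in its own function so the plain Python logic can be checked without Spark.**

```python
def name_length(name_str):
    """Plain Python logic: number of characters in the name, or None for null."""
    return None if name_str is None else len(name_str)


def make_name_length_udf():
    """Wrap name_length() as a UDF that returns an integer column."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # The declared type should match what the Python function returns.
    return udf(name_length, IntegerType())


print(name_length("john james"))  # 10
```

**With a session available, the object returned by make_name_length_udf() can be applied to a column in the same way as convertUDF.**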

##3. Using UDF with DataFrame

---

###3.1 Using UDF with PySpark DataFrame select()


---


**Now you can use convertUDF() on a DataFrame column like a regular built-in function.**

In [0]:
df.select(col("Seqno"),\
convertUDF(col("Name")).alias("Name"))\
.show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John James   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



###3.2 Using UDF with PySpark DataFrame withColumn()


---


**You can also use a UDF with the DataFrame withColumn() function. To demonstrate, I will create another function, upperCase(), which converts the input string to upper case.**

In [0]:
def upperCase(name_str):
    return name_str.upper()

**Let’s convert the upperCase() Python function to a UDF and use it with DataFrame withColumn(). The example below converts the values of the “Name” column to upper case and stores them in a new column, “Curated Name”.**

In [0]:
upperCaseUDF = udf(lambda z:upperCase(z), StringType())

df.withColumn("Curated Name", upperCaseUDF(col("Name")))\
.show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john james  |JOHN JAMES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



###3.3 Registering PySpark UDF & using it in SQL


---


**To use the convertCase() function in PySpark SQL, you need to register it with spark.udf.register().**

In [0]:
""" Using UDF on SQL """

spark.udf.register("convertUDF", convertCase, StringType())

df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE")\
.show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John James   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+
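
**In recent Spark versions, spark.udf.register() also returns the registered function, so one registration can serve both SQL and the DataFrame API. The sketch below assumes an active session and the DataFrame from section 2.1; register_and_reuse is a hypothetical helper name.**

```python
def convertCase(name_str):
    """Same logic as convertCase() in section 2.2."""
    resStr = ""
    for x in name_str.split(" "):
        resStr = resStr + x[0:1].upper() + x[1:] + " "
    return resStr


def register_and_reuse(spark, df):
    """Register convertCase once, then call it from SQL and from the DataFrame API."""
    from pyspark.sql.functions import col
    from pyspark.sql.types import StringType

    # register() makes the name visible to SQL and hands back a callable UDF.
    convert_udf = spark.udf.register("convertUDF", convertCase, StringType())

    df.createOrReplaceTempView("NAME_TABLE")
    via_sql = spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE")
    via_api = df.select(col("Seqno"), convert_udf(col("Name")).alias("Name"))
    return via_sql, via_api
```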



##4. Creating UDF using annotation


---


**In the previous sections, you learned that creating a UDF is a two-step process: first you write a Python function, then you convert it to a UDF with the SQL udf() function. You can collapse these into a single step by applying udf() as an annotation (a Python decorator) on the function definition.**

In [0]:
@udf(returnType=StringType())
def upperCase(name_str):
    return name_str.upper()

df.withColumn("Curated Name", upperCase(col("Name")))\
.show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john james  |JOHN JAMES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



##5. Special Handling

---


###5.1 Execution order

**One thing to be aware of is that PySpark/Spark does not guarantee the order in which subexpressions are evaluated: expressions are not guaranteed to be evaluated left-to-right or in any other fixed order. PySpark may reorder execution during query optimization and planning, so conditions combined with AND or OR in WHERE and HAVING clauses are not guaranteed to short-circuit, and a UDF may run on rows you expected an earlier condition to filter out.**

**So when you design and use a UDF, be very careful, especially with null handling, because mistakes here surface as runtime exceptions.**

In [0]:
"""
There is no guarantee that "Name is not null" is evaluated first.
If convertUDF(Name) like '%John%' is evaluated first on a null Name,
you will get a runtime error.
"""

spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " +\
         "where Name is not null and convertUDF(Name) like '%John%'")\
.show(truncate=False)

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John James |
+-----+-----------+



###5.2 Handling null check

---


**UDFs are error-prone when not designed carefully, for example when a column contains null on some records.**

In [0]:
""" null check """

columns = ["Seqno", "Name"]

data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ("4", None)
]

df2 = spark.createDataFrame(data=data, schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

spark.sql("select convertUDF(Name) from NAME_TABLE2")\
.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
File <command-2656410894484160>:16
     13 df2.show(truncate=False)
     14 df2.createOrReplaceTempView("NAME_TABLE2")
---> 16 spark.sql("select convertUDF(Name) from NAME_TABLE2")\
     17 .show(truncate=False)

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = fu

**Note that in the snippet above, the record with “Seqno 4” has the value None for the “Name” column. Since the UDF does not handle null, using it on this DataFrame raises the error shown above. Note that Python’s None is treated as null in Spark.**
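
**The root cause can be reproduced in plain Python, without Spark: convertCase() calls split() on its argument, and None has no such method. The sketch below repeats the function from section 2.2 so that it runs on its own.**

```python
def convertCase(name_str):
    resStr = ""
    arr = name_str.split(" ")  # fails here when name_str is None
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr


try:
    convertCase(None)
except AttributeError as err:
    # Inside a UDF, Spark reports this kind of Python error as a PythonException.
    print("convertCase(None) failed:", err)
```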


---


####Points to remember

- It’s always best practice to check for null inside the UDF itself rather than outside it.
- If you can’t do the null check inside the UDF, at least use IF or CASE WHEN to check for null and call the UDF conditionally.

In [0]:
spark.udf.register("_nullsafeUDF", lambda name_str: convertCase(name_str) if name_str is not None else "", StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2")\
.show(truncate=False)

spark.sql(" select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2" + \
         " where Name is not null and _nullsafeUDF(Name) like '%John%' ")\
.show(truncate=False)

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



**This executes successfully because the registered UDF checks for null (None) before calling convertCase().**
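
**The second point, calling the UDF conditionally, can be sketched with CASE WHEN. This assumes convertUDF and NAME_TABLE2 are registered as in the cells above; NULL_GUARDED_QUERY and run_null_guarded are hypothetical names. Whether the UDF is actually skipped for null rows can depend on how Spark plans Python UDFs, so treat this as a complement to the in-function check and verify it on your version.**

```python
# Intended effect: convertUDF(Name) is only needed on the branch where Name is not null.
NULL_GUARDED_QUERY = """
select Seqno,
       case when Name is not null then convertUDF(Name) else '' end as Name
from NAME_TABLE2
"""


def run_null_guarded(spark):
    """Run the guarded query on an active SparkSession."""
    return spark.sql(NULL_GUARDED_QUERY)


# Usage, assuming the `spark` session from the notebook:
# run_null_guarded(spark).show(truncate=False)
```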

###5.3 Performance concern using UDF


---


**UDFs are a black box to PySpark: the optimizer cannot look inside them, so you lose the optimizations PySpark applies to DataFrame/Dataset operations. Whenever possible, use Spark SQL built-in functions, which the optimizer understands. Consider creating a UDF only when no existing built-in function covers your need.**

In [0]:
def myfun(nm_str):
    print(nm_str is not None)
    if nm_str is not None:
        return nm_str.upper()
    else:
        return ''

In [0]:
myfun(None)


False
Out[53]: ''

In [0]:
myfun('ak')

True
Out[55]: 'AK'
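
**For comparison, the null-safe upper-casing that myfun() performs can be expressed entirely with built-in functions, which keeps the work visible to Spark's optimizer. The sketch below is one way to do it, assuming a string column; upper_case_builtin is a hypothetical helper name.**

```python
def upper_case_builtin(df, column="Name", new_column="Curated Name"):
    """Add an upper-cased copy of `column` without using a Python UDF."""
    from pyspark.sql.functions import coalesce, col, lit, upper

    # upper() yields null for null input; coalesce() swaps that null for '',
    # mirroring the empty-string fallback in myfun().
    return df.withColumn(new_column, coalesce(upper(col(column)), lit("")))


# Usage, assuming the DataFrame `df2` from section 5.2:
# upper_case_builtin(df2).show(truncate=False)
```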