In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=3269bcb38bbc3e3db914f4509373c37a592920fda83552b88feae75ca90ae3c2
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


## What is a UDF?

UDFs, or User Defined Functions, will be familiar if you come from a SQL background: most traditional RDBMS databases support them. You register the function in the database and then call it from SQL like any regular function.

PySpark UDFs work in a similar way. You write an ordinary Python function, then either wrap it with the PySpark SQL udf() function to use it on a DataFrame, or register it with spark.udf.register() to use it in SQL.

## Why do we need a UDF?

UDFs extend the functions the framework provides. Once created, a UDF can be reused on several DataFrames and in SQL expressions.

For example, suppose you want to capitalize the first letter of every word in a name string while leaving the other letters untouched. PySpark's built-in initcap() comes close, but it also lowercases the remaining letters of each word. When no built-in function does exactly what you need, you can write the logic as a UDF and reuse it on as many DataFrames as needed.

Note: Design UDFs carefully. Spark cannot optimize the Python code inside a UDF, so a poorly designed one leads to optimization and performance issues.

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [8]:
spark = SparkSession.builder.appName('SparkUDF').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ("4", "Madhav Madhav"),
    ("5", "Yash Yash")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |john jones   |
|2    |tracey smith |
|3    |amy sanders  |
|4    |Madhav Madhav|
|5    |Yash Yash    |
+-----+-------------+



The first step in creating a UDF is to write a Python function.

The snippet below defines a function convertCase(), which takes a string parameter and capitalizes the first letter of every word.

A UDF takes the parameters of your choice and returns a value.

In [9]:
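def convertCase(name):
    # Capitalize the first letter of each space-separated word.
    # A space is appended after every word, so the result ends with a trailing space.
    resStr = ""
    for word in name.split(" "):
        resStr = resStr + word[0:1].upper() + word[1:] + " "
    return resStr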
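
Because convertCase() appends a space after every word, its result always ends with a trailing space, which is why the Name column in the converted output further down is one character wider than in the input. If that is unwanted, one possible variant joins the words instead. This is only a minimal sketch; the name convert_case_trimmed is hypothetical and not part of the original notebook.

```python
def convert_case_trimmed(name: str) -> str:
    """Capitalize the first letter of each space-separated word.

    Hypothetical variant of convertCase(): joining the words with a
    single space avoids the trailing space. Other letters are unchanged.
    """
    words = name.split(" ")
    return " ".join(word[:1].upper() + word[1:] for word in words)


print(repr(convert_case_trimmed("john jones")))     # 'John Jones'
print(repr(convert_case_trimmed("Madhav Madhav")))  # 'Madhav Madhav'
```

It could be wrapped in the same way as the original function, for example udf(convert_case_trimmed, StringType()).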

Convert a Python function to a PySpark UDF

In [12]:
""" Converting function to UDF """
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the Python function as a UDF with an explicit return type
convertUDF = udf(convertCase, StringType())

In [13]:
""" Converting function to UDF 
StringType() is the default return type, so it can be omitted """

convertUDF = udf(convertCase)


Using UDF with DataFrame

Using UDF with PySpark DataFrame select()

In [17]:
# Import only what is needed; a wildcard import shadows Python built-ins such as sum and max
from pyspark.sql.functions import col

df.select(
    col("Seqno"),
    convertUDF(col("Name")).alias("Name")
).show(truncate=False)

+-----+--------------+
|Seqno|Name          |
+-----+--------------+
|1    |John Jones    |
|2    |Tracey Smith  |
|3    |Amy Sanders   |
|4    |Madhav Madhav |
|5    |Yash Yash     |
+-----+--------------+



Using UDF with PySpark DataFrame withColumn()

In [18]:
def upperCase(name):
    # Return the whole string in upper case
    return name.upper()

Let’s convert the upperCase() Python function to a UDF and then use it with DataFrame withColumn().

The example below converts the values of the “Name” column to upper case and stores them in a new column, “Curated Name”.

In [20]:
upperCaseUDF = udf(lambda z: upperCase(z), StringType())

df.withColumn("Curated Name", upperCaseUDF(col("Name"))).show(truncate=False)


+-----+-------------+-------------+
|Seqno|Name         |Curated Name |
+-----+-------------+-------------+
|1    |john jones   |JOHN JONES   |
|2    |tracey smith |TRACEY SMITH |
|3    |amy sanders  |AMY SANDERS  |
|4    |Madhav Madhav|MADHAV MADHAV|
|5    |Yash Yash    |YASH YASH    |
+-----+-------------+-------------+



In [22]:
""" Using UDF on SQL """

spark.udf.register("convertUDF", convertCase,StringType())

df.createOrReplaceTempView("NAME_TABLE")

spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE").show(truncate=False)     

+-----+--------------+
|Seqno|Name          |
+-----+--------------+
|1    |John Jones    |
|2    |Tracey Smith  |
|3    |Amy Sanders   |
|4    |Madhav Madhav |
|5    |Yash Yash     |
+-----+--------------+



In [23]:
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
          "where Name is not null and convertUDF(Name) like '%John%'").show(truncate=False)

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



In [24]:
""" null check """

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ("6",None)]

In [25]:
df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|6    |null        |
+-----+------------+



In [26]:
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "", StringType())

<function __main__.<lambda>(s)>

In [27]:
spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2").show(truncate=False)

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+



In [32]:
spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%Tra%'").show(truncate=False) 

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|2    |Tracey Smith |
+-----+-------------+
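
The lambda registered as _nullsafeUDF above handles a null Name by returning an empty string. The "Name is not null" filter is not a reliable guard by itself, since Spark SQL does not guarantee the order in which the conditions of a WHERE clause are evaluated, so the null check belongs inside the function. As a sketch of an alternative, the decorator below passes None through unchanged, which assumes you would rather keep a null in the result than replace it with an empty string. The names null_safe, convert_case, and safe_convert_case are hypothetical and not part of the original notebook.

```python
from functools import wraps
from typing import Callable, Optional


def null_safe(func: Callable[[str], str]) -> Callable[[Optional[str]], Optional[str]]:
    """Wrap func so that a None input is returned as None instead of raising."""
    @wraps(func)
    def wrapper(value: Optional[str]) -> Optional[str]:
        if value is None:
            return None
        return func(value)
    return wrapper


def convert_case(name: str) -> str:
    # Illustrative stand-in for convertCase(): capitalize each word's first letter
    return " ".join(word[:1].upper() + word[1:] for word in name.split(" "))


safe_convert_case = null_safe(convert_case)

print(safe_convert_case("tracey smith"))
print(safe_convert_case(None))
```

The wrapped function could then be registered in place of the lambda, for example spark.udf.register("_nullsafeUDF", safe_convert_case, StringType()); the row with the null Name would then come back as null rather than as an empty string.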

