# UDF

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

spark = (
    SparkSession.builder
    .appName("example-udf")
    .getOrCreate()
)

## Create a DataFrame

In [2]:
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]

columns = ["Seqno","Name"]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)


+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



## Create a Python Function

The first step in creating a UDF is to write a plain Python function. The snippet below defines convertCase(), which takes a string and capitalizes the first letter of every word. A UDF can take whatever parameters you choose and returns a single value.

In [50]:
def convertCase(text):
    resStr = ""
    arr = text.split(" ")
    for x in arr:
        # Capitalize the first character and keep the rest of the word unchanged
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
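
The function above appends a space after every word, so its result ends with a trailing space (visible as extra padding in the output tables further down). A minimal sketch of a variant that avoids this, assuming words are separated by single spaces; the name `convert_case_joined` is hypothetical and not part of the original notebook:

```python
def convert_case_joined(text):
    """Capitalize the first letter of each space-separated word."""
    words = text.split(" ")
    # w[:1] is an empty string for empty words, so repeated spaces do not raise.
    return " ".join(w[:1].upper() + w[1:] for w in words)


print(convert_case_joined("john jones"))  # John Jones
```

Such a function could be passed to udf() in the same way as convertCase().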


## Convert a Python function to PySpark UDF

Next, convert convertCase() to a UDF by passing it to udf(), which lives in the pyspark.sql.functions module. Make sure you import it before use.

udf() returns a callable; applying it to one or more columns produces a Column expression that can be used in select(), withColumn() and similar DataFrame methods.

In [51]:
# Convert the function to a UDF with an explicit return type
convertUDF = udf(lambda z: convertCase(z), StringType())


In [52]:
# StringType() is the default return type, so it can be omitted
convertUDF = udf(lambda z: convertCase(z))


## Using UDF with PySpark DataFrame select()

In [53]:
df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)


+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



## Using UDF with PySpark DataFrame withColumn()

You can also use a UDF with the DataFrame withColumn() method. To demonstrate, the next cell defines a second function, upperCase(), which converts the input string to upper case.

In [54]:
def upperCase(str):
    return str.upper()


In [55]:
upperCaseUDF = udf(lambda z: upperCase(z))

df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)


+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



## Registering a PySpark UDF and using it in SQL

To call convertCase() from PySpark SQL, register the function with spark.udf.register().



In [56]:
# Use the UDF in SQL
spark.udf.register("convertUDF", convertCase)

df.createOrReplaceTempView("NAME_TABLE")

spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE").show(truncate=False)


+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



## Creating a UDF using a decorator

The previous sections created a UDF in two steps: first write a Python function, then convert it with udf(). You can combine both steps by applying udf as a decorator (annotation) to the function definition.

In [57]:
@udf(returnType=StringType())
def upperCase(text):
    return text.upper()

df.withColumn("Curated Name", upperCase(col("Name"))).show(truncate=False)


+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



## Special Handling

### Execution order

Be aware that PySpark/Spark does not guarantee the order in which subexpressions are evaluated: expressions are not guaranteed to run left to right or in any other fixed order. Spark may reorder them during query optimization and planning, so conditions combined with AND or OR in WHERE and HAVING clauses cannot be relied on to short-circuit in the order written.

When designing and using UDFs, be especially careful with null handling, because a null that reaches an unguarded UDF results in a runtime exception.

In [58]:
# There is no guarantee that "Name is not null" is evaluated first.
# If convertUDF(Name) like '%John%' is evaluated first on a null Name, you get a runtime error.
spark.sql("""
             select Seqno, convertUDF(Name) as Name 
               from NAME_TABLE
              where Name is not null 
                and convertUDF(Name) like '%John%'
          """) \
     .show(truncate=False)  


+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



### Handling null check

UDFs are error-prone when not designed carefully. For example, calling convertCase() on a column that contains null in some records fails, because the Python function receives None and None has no split() method.

In [77]:
""" null check """

data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders"),
        ('4',None)]

columns = ["Seqno","Name"]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

spark.sql("select convertUDF(Name) from NAME_TABLE2").show(truncate=False)


+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/Users/rodrigolima82/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-50-6cc992af9cb0>", line 3, in convertCase
AttributeError: 'NoneType' object has no attribute 'split'


In [60]:
# Return an empty string for null input instead of calling convertCase(None)
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "", StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2").show(truncate=False)

spark.sql("""
             select Seqno, 
                    _nullsafeUDF(Name) as Name 
               from NAME_TABLE2
              where Name is not null 
                and _nullsafeUDF(Name) like '%John%'
          """) \
     .show(truncate=False)    


+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+
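
The null guard in the lambda above can also be factored into a reusable helper so that each UDF body does not repeat the check. The following is a minimal sketch in plain Python; the names `null_safe` and `upper_case` are hypothetical and not part of the original notebook. Unlike the cell above, it returns None (which Spark shows as null) rather than an empty string, which is a design choice and not a requirement.

```python
import functools


def null_safe(func):
    """Return None instead of calling func when any argument is None."""
    @functools.wraps(func)
    def wrapper(*args):
        if any(arg is None for arg in args):
            return None
        return func(*args)
    return wrapper


@null_safe
def upper_case(text):
    return text.upper()


print(upper_case("amy sanders"))  # AMY SANDERS
print(upper_case(None))           # None
```

The wrapped function could then be passed to udf() or spark.udf.register() like any other Python function.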



The code block displayed below contains an error. The code block should use the Python method find_most_freq_letter to find the letter that occurs most often in column itemName of DataFrame itemsDf and return it in a new column most_frequent_letter. Find the error.

Code block:

- `find_most_freq_letter_udf = udf(find_most_freq_letter)`
- `itemsDf.withColumn("most_frequent_letter", find_most_freq_letter("itemName"))`

In [61]:
data = [(1, 'Thick Coat for Walking in the Snow', 'Sports Company Inc.'),
        (2, 'Elegant Outdoors Summer Dress', 'YetiX'),
        (3, 'Outdoors Backpack', 'Sports Company Inc.')]

columns = ["itemId", "itemName", "supplier"]

itemsDf = spark.createDataFrame(data=data, schema = columns)

In [62]:
def find_most_freq_letter(text):
    # Placeholder body: it only upper-cases the input so the UDF call pattern can be run
    return text.upper()

find_most_freq_letter_udf = udf(find_most_freq_letter)   

itemsDf.withColumn('most_frequent_letter', find_most_freq_letter_udf('itemName')).show()

+------+--------------------+-------------------+--------------------+
|itemId|            itemName|           supplier|most_frequent_letter|
+------+--------------------+-------------------+--------------------+
|     1|Thick Coat for Wa...|Sports Company Inc.|THICK COAT FOR WA...|
|     2|Elegant Outdoors ...|              YetiX|ELEGANT OUTDOORS ...|
|     3|   Outdoors Backpack|Sports Company Inc.|   OUTDOORS BACKPACK|
+------+--------------------+-------------------+--------------------+



In [63]:
@udf(returnType = StringType()) 
def find_most_freq_letter(str):
    return str.upper()

itemsDf.withColumn("most_frequent_letter", find_most_freq_letter('itemName')).show()


+------+--------------------+-------------------+--------------------+
|itemId|            itemName|           supplier|most_frequent_letter|
+------+--------------------+-------------------+--------------------+
|     1|Thick Coat for Wa...|Sports Company Inc.|THICK COAT FOR WA...|
|     2|Elegant Outdoors ...|              YetiX|ELEGANT OUTDOORS ...|
|     3|   Outdoors Backpack|Sports Company Inc.|   OUTDOORS BACKPACK|
+------+--------------------+-------------------+--------------------+
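
The cells above use `str.upper()` as a stand-in body, so the new column holds the upper-cased name and not a letter. One possible way to actually find the most frequent letter is sketched below in plain Python. The name `most_frequent_letter` and the choices to ignore case, skip non-alphabetic characters, and return None for empty or null input are assumptions, not part of the original exercise.

```python
from collections import Counter


def most_frequent_letter(text):
    """Return the most common alphabetic character, ignoring case."""
    if text is None:
        return None
    letters = [ch for ch in text.lower() if ch.isalpha()]
    if not letters:
        return None
    # most_common(1) returns [(letter, count)]; ties go to the letter seen first.
    return Counter(letters).most_common(1)[0][0]


print(most_frequent_letter("Outdoors Backpack"))  # o
```

Wrapped with udf(), a function like this would be applied to the column through the UDF object, exactly as in the corrected cells above.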



The code block displayed below contains at least one error. The code block should return a DataFrame with only one column, result. That column should contain all values in column value of DataFrame transactionsDf raised to the power of 5, and a null value for rows in which column value has no value. Find the error(s).

Code block:

- `from pyspark.sql.functions import udf`
- `from pyspark.sql import types as T`
- `transactionsDf.createOrReplaceTempView('transactions')`
- `def pow_5(x):`
- `    return x**5`
- `spark.udf.register(pow_5, 'power_5_udf', T.LongType())`
- `spark.sql('SELECT power_5_udf(value) FROM transactions')`

In [64]:
data = [(1, 3, 4, 25, 1, None, 1587915332),
         (2, 6, 7, 2, 2, None, 1586815312),
         (3, 3, None, 25, 3, None, 1585824821),
         (4, None, None, 3, 2, None, 1583244275),
         (5, None, None, None, 2, None, 1575285427),
         (6, 3, 2, 25, 2, None, 1572733275)]

schema = StructType([StructField('transactionId', IntegerType(), True),
                     StructField('predError', IntegerType(), True),
                     StructField('value', IntegerType(), True),
                     StructField('storeId', IntegerType(), True),
                     StructField('productId', IntegerType(), True),
                     StructField('f', IntegerType(), True),
                     StructField('transactionDate', LongType(), True)])

transactionsDf = spark.createDataFrame(data=data, schema=schema)

In [81]:
from pyspark.sql.functions import udf
from pyspark.sql import types as T

transactionsDf.createOrReplaceTempView('transactions')

def pow_5(x):
    return x ** 5

spark.udf.register('power_5_udf', pow_5, T.LongType())

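# The WHERE clause keeps None away from pow_5, but it drops those rows
# instead of returning null for them as the task requires.
spark.sql("SELECT power_5_udf(value) FROM transactions WHERE value is not null").show(truncate=False)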


+------------------+
|power_5_udf(value)|
+------------------+
|1024              |
|16807             |
|32                |
+------------------+
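
The task asks for a null in rows where value is missing, whereas the query above filters those rows out. A minimal sketch of a function that passes None through instead is shown below in plain Python; the name `pow_5_null_safe` is hypothetical. Registered with spark.udf.register() as in the cell above and queried without the WHERE clause, it would be expected to produce null for those rows.

```python
def pow_5_null_safe(x):
    """Raise x to the fifth power, passing None through unchanged."""
    if x is None:
        return None
    return x ** 5


# The values of column `value` from the sample data above.
values = [4, 7, None, None, None, 2]
print([pow_5_null_safe(v) for v in values])
# [1024, 16807, None, None, None, 32]
```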



Which of the following code blocks applies the boolean-returning Python function evaluateTestSuccess to column storeId of DataFrame transactionsDf as a user-defined function?

In [2]:
data = [(1, 3, 4, 25, 1, None, 1587915332),
         (2, 6, 7, 2, 2, None, 1586815312),
         (3, 3, None, 25, 3, None, 1585824821),
         (4, None, None, 3, 2, None, 1583244275),
         (5, None, None, None, 2, None, 1575285427),
         (6, 3, 2, 25, 2, None, 1572733275)]

schema = StructType([StructField('transactionId', IntegerType(), True),
                     StructField('predError', IntegerType(), True),
                     StructField('value', IntegerType(), True),
                     StructField('storeId', IntegerType(), True),
                     StructField('productId', IntegerType(), True),
                     StructField('f', IntegerType(), True),
                     StructField('transactionDate', LongType(), True)])

transactionsDf = spark.createDataFrame(data=data, schema=schema)

In [15]:
from pyspark.sql import types as T 

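def evaluateTestSuccess(x):
    # Placeholder body: the int 1 does not match the declared BooleanType,
    # so Spark shows null in the result column below. Return True or False in real code.
    return 1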

evaluateTestSuccessUDF = udf(evaluateTestSuccess, T.BooleanType()) 
transactionsDf.withColumn("result", evaluateTestSuccessUDF(col("storeId"))).show()



+-------------+---------+-----+-------+---------+----+---------------+------+
|transactionId|predError|value|storeId|productId|   f|transactionDate|result|
+-------------+---------+-----+-------+---------+----+---------------+------+
|            1|        3|    4|     25|        1|null|     1587915332|  null|
|            2|        6|    7|      2|        2|null|     1586815312|  null|
|            3|        3| null|     25|        3|null|     1585824821|  null|
|            4|     null| null|      3|        2|null|     1583244275|  null|
|            5|     null| null|   null|        2|null|     1575285427|  null|
|            6|        3|    2|     25|        2|null|     1572733275|  null|
+-------------+---------+-----+-------+---------+----+---------------+------+



In [None]:
# NameError: name 'storeId' is not defined (assuming evaluateTestSuccess is defined as above);
# pass col("storeId") instead of a bare name
evaluateTestSuccessUDF = udf(evaluateTestSuccess)
transactionsDf.withColumn("result", evaluateTestSuccessUDF(storeId))

# Runs if evaluateTestSuccess is defined, but no return type is declared,
# so the result column defaults to StringType instead of BooleanType
evaluateTestSuccessUDF = udf(evaluateTestSuccess)
transactionsDf.withColumn("result", evaluateTestSuccessUDF(col("storeId")))

# AssertionError: col should be Column
from pyspark.sql import types as T 
evaluateTestSuccessUDF = udf(evaluateTestSuccess, T.IntegerType()) 
transactionsDf.withColumn("result", evaluateTestSuccess(col("storeId")))

# AssertionError: col should be Column
from pyspark.sql import types as T 
evaluateTestSuccessUDF = udf(evaluateTestSuccess, T.BooleanType()) 
transactionsDf.withColumn("result", evaluateTestSuccess(col("storeId")))


In [8]:
df = spark.range(5).toDF("num")

def power3(value):
    return value ** 3

power3_udf = udf(power3)
df.select("num", power3_udf(col("num"))).show()

+---+-----------+
|num|power3(num)|
+---+-----------+
|  0|          0|
|  1|          1|
|  2|          8|
|  3|         27|
|  4|         64|
+---+-----------+

