# PySpark UDF examples

## Setup

In [1]:
import pandas as pd
import numpy as np
import os
from pyspark import SparkConf #, SparkContext 
from pyspark.sql import SparkSession #, SQLContext https://spark.apache.org/docs/1.6.1/sql-programming-guide.html
from pyspark.sql import functions as F # access to the sql functions https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
from pyspark.sql import types as T
from IPython.display import HTML

In [4]:
spark = SparkSession.builder.appName('SparkByExamples.com') \
    .config("spark.driver.maxResultSize", 0) \
    .getOrCreate()

In [5]:
# spark.stop()

## Example data creation

In [6]:
data2 = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns2 = ["firstname","middlename","lastname","dob","gender","salary"]
df2 = spark.createDataFrame(data=data2, schema = columns2)
df2.show(truncate=False)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+



In [7]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



## Function creation

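You will notice that I have created each function more than once: `convertCase` and `upperCase` are plain Python functions, and each of them also exists as two Spark UDFs that do identical work but have different names. The `...UDF2` versions wrap the Python function with an explicit call to `F.udf()`. For the `...UDF` versions I have used the [decorator pattern](https://datanoon.com/blog/pyspark_udf/) to create the Spark user-defined functions without an additional call of `F.udf()`.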

In [8]:
# Python functions defined
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

def upperCase(str):
    return str.upper()

# Wrap the plain Python functions as Spark UDFs with an explicit F.udf() call
convertCaseUDF2 = F.udf(lambda z: convertCase(z),T.StringType())
upperCaseUDF2 = F.udf(lambda z:upperCase(z),T.StringType())


# Define Spark UDFs directly with the decorator, without separate plain Python functions
@F.udf(returnType=T.StringType())
def convertCaseUDF(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

@F.udf(returnType=T.StringType()) 
def upperCaseUDF(str):
    return str.upper()
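
The two styles above are related in the usual Python way: a decorator line `@d` placed over `def f` is shorthand for `f = d(f)`. The sketch below shows this in plain Python, with no Spark needed. `tagged` is a hypothetical stand-in for `F.udf(returnType=...)`, not a PySpark API; it only records the declared type on the wrapped function.

```python
import functools


def tagged(return_type):
    """Hypothetical decorator factory: takes an argument and returns a decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        # Remember the declared type, loosely like a UDF remembers its returnType.
        wrapper.return_type = return_type
        return wrapper
    return decorator


# Two-step form: define the function, then wrap it explicitly.
def upper_case(text):
    return text.upper()

upper_case_two_step = tagged("string")(upper_case)


# Decorator form: the same wrapping happens at definition time.
@tagged("string")
def upper_case_decorated(text):
    return text.upper()


print(upper_case_two_step("abc"), upper_case_decorated("abc"))
```

Both wrapped functions behave the same; the decorator form simply avoids keeping a second, unwrapped name around.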


Notice that the functions each look a bit different when printed. The first two functions are Spark UDFs, so calling them returns a `Column` expression rather than a result. The last function is plain Python and returns a string, as we would expect.

In [9]:
print(convertCaseUDF("wow we are good"))
print(convertCaseUDF2("wow we are good"))
print(convertCase("wow we are good"))

Column<b'convertCaseUDF(wow we are good)'>
Column<b'<lambda>(wow we are good)'>
Wow We Are Good 
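
The plain Python result above ends with a trailing space, because `convertCase` appends `" "` after every word, including the last one. A possible variant, sketched below under the assumption that the trailing space is not wanted, joins the words instead. It also returns `None` for `None` input, which is how Spark hands SQL nulls to a Python UDF. The name `convert_case_clean` is made up for this illustration.

```python
def convert_case_clean(text):
    """Capitalise the first letter of each space-separated word."""
    if text is None:
        # Spark passes SQL NULL to a Python UDF as None; pass it through.
        return None
    # join() puts a space between words only, so nothing trails the last word.
    return " ".join(word[:1].upper() + word[1:] for word in text.split(" "))


print(repr(convert_case_clean("wow we are good")))
print(convert_case_clean(None))
```

The function could be wrapped with `F.udf()` or the decorator in the same way as the versions above.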


Now if we try each of the functions on a Spark `DataFrame`, we can see the results. As expected, the first two UDFs return results, and the final non-UDF function fails because it receives a `Column` object instead of a Python string.

In [10]:
df.select(F.col('Seqno'),
        convertCaseUDF(F.col('Name')).alias('Name')) \
    .limit(2) \
    .show(truncate=False) 

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
+-----+-------------+



In [11]:
df.select(F.col('Seqno'),
        convertCaseUDF2(F.col('Name')).alias('Name')) \
    .limit(2) \
    .show(truncate=False) 



+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
+-----+-------------+



In [12]:
df.select(F.col('Seqno'),
    convertCase(F.col('Name')).alias('Name')) \
    .show(truncate=False)

TypeError: 'Column' object is not callable
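
The error message makes sense once we remember that `convertCase` calls `.split(" ")` on its argument. A Spark `Column` treats an unknown attribute such as `.split` as a field lookup and returns another `Column`, and that object cannot be called. The toy class below imitates only that behaviour in plain Python; `ToyColumn` is a hypothetical stand-in written for this note, not part of PySpark.

```python
class ToyColumn:
    """Hypothetical stand-in that mimics how unknown attributes resolve."""

    def __init__(self, expr):
        self.expr = expr

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        if item.startswith("__"):
            raise AttributeError(item)
        # Unknown attribute: return another column-like object, not a method.
        return ToyColumn(f"{self.expr}.{item}")


def convert_case(text):
    # Same logic as convertCase above; parameter renamed to avoid shadowing str.
    result = ""
    for word in text.split(" "):
        result = result + word[0:1].upper() + word[1:] + " "
    return result


try:
    convert_case(ToyColumn("Name"))
    message = "no error"
except TypeError as err:
    message = str(err)

print(message)
```

The fix is the one already shown: wrap the function as a UDF so that Spark calls it with the string values of each row.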

## Using `.withColumn()`

The `.withColumn()` method acts much like the `assign()` method in pandas or the `mutate()` function in dplyr. We can change the current columns or add additional columns, as the next few examples highlight.

In [13]:
# printSchema() prints the schema itself and returns None, so no print() is needed
df2.printSchema()
df2.withColumn("salary",F.col("salary").cast("Integer")).printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [14]:
df2.withColumn("salary",F.col("salary")*100).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+



In [15]:
df2.withColumn("CopiedColumn",F.col("salary")* -1).show()

+---------+----------+--------+----------+------+------+------------+
|firstname|middlename|lastname|       dob|gender|salary|CopiedColumn|
+---------+----------+--------+----------+------+------+------------+
|    James|          |   Smith|1991-04-01|     M|  3000|       -3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|       -4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|       -4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|       -4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|           1|
+---------+----------+--------+----------+------+------+------------+



In [16]:
df2.withColumn("Country", F.lit("USA")) \
   .withColumn("anotherColumn",F.lit("anotherValue")).show()

+---------+----------+--------+----------+------+------+-------+-------------+
|firstname|middlename|lastname|       dob|gender|salary|Country|anotherColumn|
+---------+----------+--------+----------+------+------+-------+-------------+
|    James|          |   Smith|1991-04-01|     M|  3000|    USA| anotherValue|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    USA| anotherValue|
|   Robert|          |Williams|1978-09-05|     M|  4000|    USA| anotherValue|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    USA| anotherValue|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    USA| anotherValue|
+---------+----------+--------+----------+------+------+-------+-------------+
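
For readers coming from pandas, the `.withColumn()` calls above map fairly directly onto `assign()`. The sketch below uses a two-row pandas frame built just for this comparison (it is not the Spark `df2`). Like `.withColumn()`, each call returns a new frame and leaves the original untouched.

```python
import pandas as pd

# Small illustrative frame; the values mirror the first two rows of the example data.
pdf = pd.DataFrame({"firstname": ["James", "Michael"], "salary": [3000, 4000]})

# Change the type of an existing column (compare .cast("Integer")).
cast = pdf.assign(salary=pdf["salary"].astype("int32"))

# Overwrite an existing column.
scaled = pdf.assign(salary=pdf["salary"] * 100)

# Add a new column derived from an existing one.
negated = pdf.assign(CopiedColumn=pdf["salary"] * -1)

# Add constant columns (compare F.lit()).
with_country = pdf.assign(Country="USA", anotherColumn="anotherValue")

print(with_country)
```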



To rename columns we can use `.withColumnRenamed()`. If we want to drop the column the method is `.drop()`.

In [17]:
df2.withColumnRenamed("gender","sex") \
  .show(truncate=False) 

+---------+----------+--------+----------+---+------+
|firstname|middlename|lastname|dob       |sex|salary|
+---------+----------+--------+----------+---+------+
|James    |          |Smith   |1991-04-01|M  |3000  |
|Michael  |Rose      |        |2000-05-19|M  |4000  |
|Robert   |          |Williams|1978-09-05|M  |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F  |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F  |-1    |
+---------+----------+--------+----------+---+------+



In [18]:
df2.drop("middlename") \
.show(truncate=False) 

+---------+--------+----------+------+------+
|firstname|lastname|dob       |gender|salary|
+---------+--------+----------+------+------+
|James    |Smith   |1991-04-01|M     |3000  |
|Michael  |        |2000-05-19|M     |4000  |
|Robert   |Williams|1978-09-05|M     |4000  |
|Maria    |Jones   |1967-12-01|F     |4000  |
|Jen      |Brown   |1980-02-17|F     |-1    |
+---------+--------+----------+------+------+



## UDF with `withColumn()`

Now that we understand the `.withColumn()` method, let's see how we can leverage our UDFs.

In [19]:
df.withColumn("Curated Name", upperCaseUDF(F.col("Name"))) \
  .show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



When we want to leverage Spark SQL, we need to include an additional step to register the UDF with Spark.

- Notice that when we register a plain Python function, we pass the output type again, just as in the two-step `F.udf()` process shown above.
- However, when we register a UDF created with the decorator method, we don't have to specify the output type again, because it was already declared in the decorator.

In [20]:
spark.udf.register("convertUDF2", convertCase, T.StringType()) # specify again
spark.udf.register("convertUDF", convertCaseUDF) # no need to specify

<function __main__.convertCaseUDF(str)>

In [21]:
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



In [22]:
spark.sql("select Seqno, convertUDF2(Name) as Name from NAME_TABLE") \
     .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



## Pandas UDFs

To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python. __Pandas UDFs built on top of Apache Arrow bring you the best of both worlds—the ability to define low-overhead, high-performance UDFs entirely in Python.__
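
As a minimal sketch of the difference, the function below works on a whole pandas `Series` at a time instead of one value per call. The names `title_case` and `as_pandas_udf` are made up for this example, and `Series.str.title()` is swapped in for the `convertCase` loop (it also lowercases the remaining letters, which makes no difference for the sample names). The Spark wrapper assumes `pyspark` and `pyarrow` are installed, so it is kept inside a function and the pandas part can run on its own.

```python
import pandas as pd


def title_case(names: pd.Series) -> pd.Series:
    """Vectorised: receives a batch of values as one pandas Series."""
    return names.str.title()


def as_pandas_udf():
    """Wrap title_case as a scalar Pandas UDF (needs pyspark and pyarrow)."""
    # Imported here so the pandas-only check below runs without Spark.
    from pyspark.sql import functions as F, types as T
    return F.pandas_udf(title_case, returnType=T.StringType())


# Plain pandas check of the vectorised logic, no Spark session required.
print(title_case(pd.Series(["john jones", "tracey smith"])).tolist())
```

With a running session, something like `df.withColumn("Name", as_pandas_udf()(F.col("Name")))` would apply it to the `Name` column in batches rather than row by row.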

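Spark has implemented `pandas_udf` using [Arrow](https://arrow.apache.org/). Some may have heard of the Feather format, which is part of Arrow. Arrow is fast! You can find an overview [here](https://arrow.apache.org/overview/). Enabling the settings below also speeds up the `.toPandas()` method. In Spark 3.0 and later the first two keys are named `spark.sql.execution.arrow.pyspark.enabled` and `spark.sql.execution.arrow.pyspark.fallback.enabled`, and the names shown here are deprecated.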

- `spark.conf.set("spark.sql.execution.arrow.enabled", "true")`
- `spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")`

_What about the “Feather” file format?_

The Feather v1 format was a simplified custom container for writing a subset of the Arrow format to disk prior to the development of the Arrow IPC file format. “Feather version 2” is now exactly the Arrow IPC file format and we have retained the “Feather” name and APIs for backwards compatibility.

__We will leverage these settings when we move into Databricks.__

In [27]:
# Baseline: time the conversion before enabling Arrow
# datrand = spark.range(1 << 22).toDF("id").withColumn("x", F.rand())
# %time pdf = datrand.toPandas()

# Enable Arrow, then time the same conversion again to compare
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "50000")
# %time pdf = datrand.toPandas()

### References

- https://machinelearninggeeks.com/pyspark-udf-user-defined-function/
- https://sparkbyexamples.com/pyspark/pyspark-withcolumn/
- https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
- https://datanoon.com/blog/pyspark_udf/
- https://docs.databricks.com/spark/latest/spark-sql/udf-python.html
- https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
- https://arrow.apache.org/blog/2017/07/26/spark-arrow/