# User Defined Functions



## Prepare environment
First, we prepare the environment for running PySpark on a Google Colab machine.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!python /content/drive/MyDrive/UDL/install_pyspark.py

Install JAVA 8
Collecting wget
  Downloading wget-3.2.zip (10 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... done
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9675 sha256=84181c2033e5cb1144f1e4db485c7f78fef4af0004158908c05606a053b5b277
  Stored in directory: /root/.cache/pip/wheels/a1/b6/7c/0e63e34eb06634181c63adacca38b79ff8f35c37e3c13e3c02
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2
Obtaining last version of spark


  soup = BeautifulSoup(html_doc)
Getting version spark-3.2.1
Downloading https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
Installing PySpark
  Building wheel for pyspark (setup.py) ... done
Setting environment variables for JAVA_HOME and SPARK_HOME


## Start working with Spark
Now that we know and understand how Spark appeared in our lives and, more or less, how it works (and you know, it's amazing 🤭), we can start to work with it.
As you know, the SparkSession is the way programmers "talk" with Spark. So, we need to initialize it.

In [3]:
from pyspark.sql import SparkSession

spark = (SparkSession
 .builder
 .appName("example")
 .getOrCreate())

## Create a DF to program the example mentioned in slides

In [4]:
df = spark.createDataFrame([("juan fernando", 20), ("valentina laverde", 31), ("teresa sánchez", 30), ("julieta ponce", 35), ("antonio garcía", 25)], ["name", "age"])
df.show()

+-----------------+---+
|             name|age|
+-----------------+---+
|    juan fernando| 20|
|valentina laverde| 31|
|   teresa sánchez| 30|
|    julieta ponce| 35|
|   antonio garcía| 25|
+-----------------+---+



Remember, the goal is to capitalize the first letter of each word.
First, we need a Python function that takes a string and returns the same string with the first letter of each word in upper case.

In [20]:
def convertCase(lower_string):
    # Capitalize the first letter of each space-separated word.
    # x[:1] avoids an IndexError on empty tokens (e.g. repeated spaces).
    words = lower_string.split(" ")
    return " ".join(x[:1].upper() + x[1:] for x in words)
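
Since the UDF will simply wrap this plain Python function, it is worth checking its behaviour locally before handing it to Spark. The sketch below restates the same logic under a hypothetical name, `capitalize_words` (not part of the notebook), and assumes words are separated by spaces. Slicing with `w[:1]` is used so that empty tokens produced by repeated spaces do not raise an `IndexError`.

```python
def capitalize_words(text):
    """Upper-case the first letter of every space-separated word."""
    # w[:1] is an empty string for an empty token, so repeated spaces are safe.
    return " ".join(w[:1].upper() + w[1:] for w in text.split(" "))


# Quick local checks, no Spark session required.
print(capitalize_words("juan fernando"))
print(capitalize_words("teresa  sánchez"))  # two spaces between the words
```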

Now we convert the function to a UDF (the default return type of a UDF is StringType).

In [6]:
import pyspark.sql.functions as F

convertUDF = F.udf(lambda z: convertCase(z)) 
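
When the function returns something other than a string, the return type should be declared explicitly, since the default is `StringType`. A minimal sketch, assuming a hypothetical helper `name_length` (not in the original notebook) that counts the characters of a name. PySpark is imported inside `make_length_udf` only so that the plain function can be tried without a Spark installation.

```python
def name_length(name):
    """Return the number of characters in name, or None for a null input."""
    return None if name is None else len(name)


def make_length_udf():
    """Wrap name_length as a Spark UDF that returns an integer column."""
    # Imported here so name_length itself can be tested without PySpark.
    import pyspark.sql.functions as F
    import pyspark.sql.types as T

    return F.udf(name_length, returnType=T.IntegerType())


print(name_length("juan fernando"))  # plain Python call
```

In the notebook, with `df` and `F` defined as above, this could be applied as `df.withColumn("name_length", make_length_udf()(F.col("name")))`.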

Now we can use convertUDF like any other Spark SQL function, for example in a select() or a withColumn() call.

In [7]:
df.select(convertUDF(F.col("name")).alias("name"), F.col("age") ) \
   .show(truncate=False)

+-----------------+---+
|name             |age|
+-----------------+---+
|Juan Fernando    |20 |
|Valentina Laverde|31 |
|Teresa Sánchez   |30 |
|Julieta Ponce    |35 |
|Antonio García   |25 |
+-----------------+---+



In [8]:
df.withColumn("corrected name", convertUDF(F.col("name")))\
  .show(truncate=False)

+-----------------+---+-----------------+
|name             |age|corrected name   |
+-----------------+---+-----------------+
|juan fernando    |20 |Juan Fernando    |
|valentina laverde|31 |Valentina Laverde|
|teresa sánchez   |30 |Teresa Sánchez   |
|julieta ponce    |35 |Julieta Ponce    |
|antonio garcía   |25 |Antonio García   |
+-----------------+---+-----------------+



We can also use our UDF in SQL queries, once it has been registered by name.

In [9]:
import pyspark.sql.types as T
spark.udf.register("convertUDF", convertCase, T.StringType())
df.createOrReplaceTempView("NAMES")
spark.sql("select convertUDF(name) as name, age from NAMES") \
     .show(truncate=False)

+-----------------+---+
|name             |age|
+-----------------+---+
|Juan Fernando    |20 |
|Valentina Laverde|31 |
|Teresa Sánchez   |30 |
|Julieta Ponce    |35 |
|Antonio García   |25 |
+-----------------+---+



Another way to create a UDF is to put the decorator @udf(returnType=\<type\>) above the function definition.

In [10]:
@F.udf(returnType=T.StringType())
def upperCase(s):
    # Named s rather than str to avoid shadowing the built-in type
    return s.upper()


In [11]:
df.withColumn("Upper Name", upperCase(F.col("Name"))) \
.show(truncate=False)

+-----------------+---+-----------------+
|name             |age|Upper Name       |
+-----------------+---+-----------------+
|juan fernando    |20 |JUAN FERNANDO    |
|valentina laverde|31 |VALENTINA LAVERDE|
|teresa sánchez   |30 |TERESA SÁNCHEZ   |
|julieta ponce    |35 |JULIETA PONCE    |
|antonio garcía   |25 |ANTONIO GARCÍA   |
+-----------------+---+-----------------+



## Handling null check

In [12]:
df_nulls = spark.createDataFrame([("juan fernando", 20), (None, 31), ("teresa sánchez", 30), ("julieta ponce", 35), ("antonio garcía", 25)], ["name", "age"])
df_nulls.show()

+--------------+---+
|          name|age|
+--------------+---+
| juan fernando| 20|
|          null| 31|
|teresa sánchez| 30|
| julieta ponce| 35|
|antonio garcía| 25|
+--------------+---+



In [14]:
df_nulls.createOrReplaceTempView("NAMES_NULLS")
spark.sql("select convertUDF(name) as Name from NAMES_NULLS " + \
         "where name is not null and convertUDF(name) like '%Juan%'") \
     .show(truncate=False) 
# This could fail if the UDF is executed before the not-null check

+-------------+
|Name         |
+-------------+
|Juan Fernando|
+-------------+



To avoid this, we can handle nulls inside the function when registering the UDF.

In [21]:
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "", T.StringType())

<function __main__.<lambda>>

In [22]:
spark.sql("select _nullsafeUDF(name) as Name from NAMES_NULLS " + \
         "where _nullsafeUDF(name) like '%Juan%'") \
     .show(truncate=False)

+-------------+
|Name         |
+-------------+
|Juan Fernando|
+-------------+
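
The null check can also be factored out so that it is written once and reused for several UDFs. The sketch below uses a hypothetical decorator factory, `null_safe` (an illustration, not a PySpark API), which returns a default value when the input is `None` and otherwise calls the wrapped function. The default of `""` mirrors the registration above; returning `None` instead would keep the value null in the result.

```python
import functools


def null_safe(default=""):
    """Build a decorator that short-circuits None inputs to `default`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(value):
            # Skip the wrapped function entirely for null inputs.
            return default if value is None else func(value)
        return wrapper
    return decorator


@null_safe(default="")
def convert_case_safe(text):
    # Same capitalisation logic as convertCase, restated to keep the sketch self-contained.
    return " ".join(w[:1].upper() + w[1:] for w in text.split(" "))


print(convert_case_safe("juan fernando"))
print(repr(convert_case_safe(None)))
```

Assuming the `spark` session and the `T` import from the cells above, the wrapped function could then be registered under a name of your choice, for example `spark.udf.register("nullsafe_convert", convert_case_safe, T.StringType())`.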



# Exercise 1:



*   Get data from the CSV: https://raw.githubusercontent.com/tidyverse/ggplot2/main/data-raw/diamonds.csv and save it in a dataframe.
*   Generate a new column called "cut_color_id". This column contains the first letter of the *cut* column followed by the *color* column value. For example, if the *cut* is "Premium" and the *color* is "I", the new column holds "PI". Do it with a UDF.
*   Keep in mind that it is better to use Spark's built-in functions when possible, because they are more optimized than UDFs. Do you know how to do the same without a UDF? Do it.



# Caching and Persistence of Data


# DataFrame.cache()




In [25]:
df_to_cache = spark.range(1*10000000).toDF("id").withColumn("sqaure", F.col("id")*F.col("id"))
df_to_cache.show()

+---+------+
| id|sqaure|
+---+------+
|  0|     0|
|  1|     1|
|  2|     4|
|  3|     9|
|  4|    16|
|  5|    25|
|  6|    36|
|  7|    49|
|  8|    64|
|  9|    81|
| 10|   100|
| 11|   121|
| 12|   144|
| 13|   169|
| 14|   196|
| 15|   225|
| 16|   256|
| 17|   289|
| 18|   324|
| 19|   361|
+---+------+
only showing top 20 rows



In [26]:
#cache this data
df_to_cache.cache()

DataFrame[id: bigint, sqaure: bigint]

In [27]:
import time

startTimeQuery = time.process_time()
df_to_cache.count()
endTimeQuery = time.process_time()
endTimeQuery - startTimeQuery

0.05209608299999857

In [28]:
startTimeQuery = time.process_time()
df_to_cache.count()
endTimeQuery = time.process_time()
endTimeQuery - startTimeQuery

0.0006143160000000591
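
Two details help when reading these numbers. `cache()` is lazy, so the first `count()` both computes the data and fills the cache, and only the second one reads from it. Also, `time.process_time()` reports the CPU time of the Python process, while Spark does its work in the JVM, so wall-clock time may be a more representative measure. A minimal sketch with a hypothetical helper, `timed` (not part of the notebook), based on `time.perf_counter()`:

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in workload so the sketch runs without Spark.
total, seconds = timed(lambda: sum(range(1_000_000)))
print(total, f"{seconds:.4f}s")
```

In the notebook this could be called as `rows, seconds = timed(df_to_cache.count)`, once before and once after the cache has been filled.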

In [29]:
df_to_persist = spark.range(10001000).toDF("id2").withColumn("sqaure", F.col("id2")*F.col("id2"))

In [30]:
from pyspark.storagelevel import StorageLevel
#persist this data
df_to_persist.persist(StorageLevel.DISK_ONLY)

DataFrame[id2: bigint, sqaure: bigint]
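
To confirm which storage level a DataFrame was marked with, PySpark exposes the `is_cached` and `storageLevel` properties on the DataFrame. A small sketch with a hypothetical helper, `describe_storage` (not part of the notebook), that collects both into a dictionary. It only reads attributes, so it is shown here against a stand-in object rather than a live DataFrame.

```python
from types import SimpleNamespace


def describe_storage(df):
    """Summarise the caching state of a DataFrame-like object."""
    return {
        "is_cached": df.is_cached,
        "storage_level": str(df.storageLevel),
    }


# Stand-in object with the same two attributes (an assumption for the demo only).
fake_df = SimpleNamespace(is_cached=True, storageLevel="DISK_ONLY")
print(describe_storage(fake_df))
```

In the notebook, `describe_storage(df_to_persist)` could be called right after `persist()`; the exact text Spark prints for the storage level will differ from the stand-in string used here.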

In [31]:
startTimeQuery = time.process_time()
df_to_persist.count()
endTimeQuery = time.process_time()
endTimeQuery - startTimeQuery

0.047231854000001405

In [32]:
startTimeQuery = time.process_time()
df_to_persist.count()
endTimeQuery = time.process_time()
endTimeQuery - startTimeQuery

0.0037607709999996075

Since this data is now stored on disk, we remove it once we are done with it.

In [33]:
df_to_persist.unpersist()

DataFrame[id2: bigint, sqaure: bigint]

In [34]:
df_to_cache.unpersist()

DataFrame[id: bigint, sqaure: bigint]