# Working with Spark SQL




**Originally prepared by Usman Alim ([ualim@ucalgary.ca](mailto:ualim@ucalgary.ca)) for DATA 608 W2019** 

Further Reading:

* **Spark SQL, DataFrames and Datasets Guide** [ver. 2.2.0](https://spark.apache.org/docs/2.2.0/sql-programming-guide.html)
* For a more in-depth treatment, please consult **[Mastering Apache Spark](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/)**, by Jacek Laskowski.

## Introduction

- Spark SQL DataFrames are conceptually similar to pandas DataFrames. Under the hood they are different, though: they are implemented in Scala, run on the JVM, and are distributed across the cluster.

- Evaluation is lazy: execution does not start until an action is triggered. Inspect the execution plan for details.

- A loaded DataFrame _does not_ reside on the driver node. It is distributed.

- Spark DataFrames can be converted to pandas DataFrames. However, pandas DataFrames _are not distributed_ and reside on the driver node. Be aware of memory limitations.

- DataFrames can be cached for efficiency.

- We can run SQL queries on a DataFrame, and also on files (that support them) directly to return DataFrames.

- Grouping, partitioning and bucketing operations are available.

- We can run built-in transformations on columns, or supply user defined functions (UDFs). 

## Outline

- [Basic DataFrame Operations](#basicOps)
- [Grouping](#grouping)
- [User Defined Functions](#UDFs)
- Exercise

## Installation process

1.  Install OpenJDK

- Spark is written in Scala and runs on the JVM (Java Virtual Machine), so we have to install OpenJDK.

- OpenJDK is a free and open-source implementation of the Java Platform.
- The JDK (Java Development Kit) is a software development kit for building Java applications.
- It bundles the Java class libraries with the components needed to run Java code. The JVM executes Java bytecode and provides the runtime environment for it. The JDK itself is platform dependent.
- Spark releases before 3.0 do not support Java 11, so this notebook installs Java 8 alongside Colab's default Java 11.

2.   Install findspark and pyspark python libraries
3.   Add environment variables
4.   Start PySpark session
5.   Load data into this notebook

In [1]:
# print working directory
!pwd

# List files and folders
!ls

# Check the open jdk version on colab
!ls /usr/lib/jvm/

/content
sample_data
default-java  java-1.11.0-openjdk-amd64  java-11-openjdk-amd64


In [2]:
# Run this cell if you are running PySpark on Colab
#Installing java

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
# Download the Spark 3.3.2 release
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz

#Untar it
!tar xf spark-3.3.2-bin-hadoop2.tgz

In [5]:
# Install findspark: adds PySpark to sys.path at runtime
!pip install -q findspark

# Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=c74a029d094190cbe2d4367d8f09b62129c3969b666df068bbdcd36a3ce08dba
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [6]:
# Set the locations where Java and Spark are installed so that Colab can find them
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

In [8]:
# Create a spark session: The entry point into all functionality in Spark is the SparkSession class. 

# findspark will locate spark in the system
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark

The following 'in-vehicle coupon recommendation' dataset consists of:

- **User features** such as gender, marital status, income, education level, and the user's general preference with respect to the venue.
- **Contextual features** such as weather, temperature, and whether the user is driving in the same direction as the coupon venue.
- **Some general (but very useful) features** such as the type of coupon, the time before the coupon expires, and the driving distance to the coupon venue.

And finally the labels/target (0 = Not Accepted, 1 = Accepted).

In [10]:
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/00603/in-vehicle-coupon-recommendation.csv -P sample_data/

filepath = "sample_data/in-vehicle-coupon-recommendation.csv"
df = spark.read.format('csv').options(header='true', inferSchema='true').load(filepath)
df.show(5, truncate=False)

df.printSchema()

+---------------+---------+-------+-----------+----+---------------------+----------+------+---+-----------------+------------+------------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+-------------+---+
|destination    |passanger|weather|temperature|time|coupon               |expiration|gender|age|maritalStatus    |has_children|education               |occupation|income         |car |Bar  |CoffeeHouse|CarryAway|RestaurantLessThan20|Restaurant20To50|toCoupon_GEQ5min|toCoupon_GEQ15min|toCoupon_GEQ25min|direction_same|direction_opp|Y  |
+---------------+---------+-------+-----------+----+---------------------+----------+------+---+-----------------+------------+------------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+--------

## <a name="basicOps"></a>Basic DataFrame Operations

- Head and Summary Views

- Interoperating with Pandas

- Caching

- Selecting Columns and Subsampling

- Filtering and running SQL Queries.

In [11]:
## Summaries

print("Number of Partitions = " + str(df.rdd.getNumPartitions()))
print("Number of Rows = " + str(df.count()))


# This will return a specified number of Rows as a list
head = df.head(10)

# Produces a summary DataFrame. This is an expensive operation!
summary = df.describe()

print(type(head))
print(type(summary))


Number of Partitions = 1
Number of Rows = 12684
<class 'list'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
# Use show() to print the contents of a DataFrame. For efficiency reasons,
# only the first 20 rows are printed by default and long cell values are
# truncated to 20 characters. Note that show() needs to send data to the driver.

summary.show()

# Can print the head as follows:

#for r in head:
#    print(r)

+-------+-----------+---------+-------+------------------+-----+---------------+----------+------+------------------+-------------+------------------+--------------------+--------------------+----------------+--------------------+-----+-----------+---------+--------------------+----------------+----------------+------------------+-------------------+-------------------+------------------+------------------+
|summary|destination|passanger|weather|       temperature| time|         coupon|expiration|gender|               age|maritalStatus|      has_children|           education|          occupation|          income|                 car|  Bar|CoffeeHouse|CarryAway|RestaurantLessThan20|Restaurant20To50|toCoupon_GEQ5min| toCoupon_GEQ15min|  toCoupon_GEQ25min|     direction_same|     direction_opp|                 Y|
+-------+-----------+---------+-------+------------------+-----+---------------+----------+------+------------------+-------------+------------------+--------------------+-------

### Interoperating with Pandas

In [None]:
# The output is not very pretty. For pretty printing of "small"
# DataFrames, we can convert the DataFrames to pandas.

# **Please be aware that this will send data over to the driver.**

import pandas as pd

# Convert the head to a pandas DataFrame and display
display( pd.DataFrame( head, columns=df.columns ) )


# The toPandas() function will convert a Spark SQL DataFrame to a
# pandas DataFrame
display(summary.toPandas())

Unnamed: 0,destination,passanger,weather,temperature,time,coupon,expiration,gender,age,maritalStatus,...,CoffeeHouse,CarryAway,RestaurantLessThan20,Restaurant20To50,toCoupon_GEQ5min,toCoupon_GEQ15min,toCoupon_GEQ25min,direction_same,direction_opp,Y
0,No Urgent Place,Alone,Sunny,55,2PM,Restaurant(<20),1d,Female,21,Unmarried partner,...,never,,4~8,1~3,1,0,0,0,1,1
1,No Urgent Place,Friend(s),Sunny,80,10AM,Coffee House,2h,Female,21,Unmarried partner,...,never,,4~8,1~3,1,0,0,0,1,0
2,No Urgent Place,Friend(s),Sunny,80,10AM,Carry out & Take away,2h,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,1
3,No Urgent Place,Friend(s),Sunny,80,2PM,Coffee House,2h,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,0
4,No Urgent Place,Friend(s),Sunny,80,2PM,Coffee House,1d,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,0
5,No Urgent Place,Friend(s),Sunny,80,6PM,Restaurant(<20),2h,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,1
6,No Urgent Place,Friend(s),Sunny,55,2PM,Carry out & Take away,1d,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,1
7,No Urgent Place,Kid(s),Sunny,80,10AM,Restaurant(<20),2h,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,1
8,No Urgent Place,Kid(s),Sunny,80,10AM,Carry out & Take away,2h,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,1
9,No Urgent Place,Kid(s),Sunny,80,10AM,Bar,1d,Female,21,Unmarried partner,...,never,,4~8,1~3,1,1,0,0,1,0


Unnamed: 0,summary,destination,passanger,weather,temperature,time,coupon,expiration,gender,age,...,CoffeeHouse,CarryAway,RestaurantLessThan20,Restaurant20To50,toCoupon_GEQ5min,toCoupon_GEQ15min,toCoupon_GEQ25min,direction_same,direction_opp,Y
0,count,12684,12684,12684,12684.0,12684,12684,12684,12684,12684,...,12467,12533,12554,12495,12684.0,12684.0,12684.0,12684.0,12684.0,12684.0
1,mean,,,,63.301797540208135,,,1.0,,29.887815247850035,...,,,,,1.0,0.5614947965941344,0.119126458530432,0.2147587511825922,0.7852412488174078,0.5684326710816777
2,stddev,,,,19.15448575684057,,,0.0,,7.697275065801651,...,,,,,0.0,0.4962235416149696,0.3239500256352576,0.410671068264036,0.410671068264036,0.495314356461186
3,min,Home,Alone,Rainy,30.0,10AM,Bar,1d,Female,21,...,1~3,1~3,1~3,1~3,1.0,0.0,0.0,0.0,0.0,0.0
4,max,Work,Partner,Sunny,80.0,7AM,Restaurant(<20),2h,Male,below21,...,never,never,never,never,1.0,1.0,1.0,1.0,1.0,1.0


### Caching 

In [12]:
## Caching

# Up until this point, nothing is cached. To make the DataFrame persist
# we need to cache it. Caching depends on the storage level.

df.cache()
df.show(5)

import time 



t1 = time.time()
# first count will trigger evaluation of count *and* cache
count1 = df.count()
dt1 = time.time() - t1
print("dt1: ", dt1)


t2 = time.time()
# second count operates on cached data only
count2 = df.count()
dt2 = time.time() - t2
print("dt2: ", dt2)



+---------------+---------+-------+-----------+----+--------------------+----------+------+---+-----------------+------------+--------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+-------------+---+
|    destination|passanger|weather|temperature|time|              coupon|expiration|gender|age|    maritalStatus|has_children|           education|occupation|         income| car|  Bar|CoffeeHouse|CarryAway|RestaurantLessThan20|Restaurant20To50|toCoupon_GEQ5min|toCoupon_GEQ15min|toCoupon_GEQ25min|direction_same|direction_opp|  Y|
+---------------+---------+-------+-----------+----+--------------------+----------+------+---+-----------------+------------+--------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+-------------+---+
|No 

In [14]:
# An operation that uses the entire DataFrame will result in full caching
display( df.describe().toPandas() )

# Again, inspect the web UI to verify that this is indeed the case.

Unnamed: 0,summary,destination,passanger,weather,temperature,time,coupon,expiration,gender,age,...,CoffeeHouse,CarryAway,RestaurantLessThan20,Restaurant20To50,toCoupon_GEQ5min,toCoupon_GEQ15min,toCoupon_GEQ25min,direction_same,direction_opp,Y
0,count,12684,12684,12684,12684.0,12684,12684,12684,12684,12684,...,12467,12533,12554,12495,12684.0,12684.0,12684.0,12684.0,12684.0,12684.0
1,mean,,,,63.301797540208135,,,1.0,,29.887815247850035,...,,,,,1.0,0.5614947965941344,0.119126458530432,0.2147587511825922,0.7852412488174078,0.5684326710816777
2,stddev,,,,19.15448575684057,,,0.0,,7.697275065801651,...,,,,,0.0,0.4962235416149696,0.3239500256352576,0.410671068264036,0.410671068264036,0.495314356461186
3,min,Home,Alone,Rainy,30.0,10AM,Bar,1d,Female,21,...,1~3,1~3,1~3,1~3,1.0,0.0,0.0,0.0,0.0,0.0
4,max,Work,Partner,Sunny,80.0,7AM,Restaurant(<20),2h,Male,below21,...,never,never,never,never,1.0,1.0,1.0,1.0,1.0,1.0


### Takeaway: Cache tables that you will be using often!

### Selecting Columns and Subsampling


In [None]:
# Column selection is fairly straightforward. 

# We can select one or more columns using the select() method to 
# return another dataframe with the selected columns.


df.select('passanger','age').show()


+---------+---+
|passanger|age|
+---------+---+
|    Alone| 21|
|Friend(s)| 21|
|Friend(s)| 21|
|Friend(s)| 21|
|Friend(s)| 21|
|Friend(s)| 21|
|Friend(s)| 21|
|   Kid(s)| 21|
|   Kid(s)| 21|
|   Kid(s)| 21|
|   Kid(s)| 21|
|   Kid(s)| 21|
|   Kid(s)| 21|
|    Alone| 21|
|    Alone| 21|
|    Alone| 21|
|    Alone| 21|
|    Alone| 21|
|    Alone| 21|
|    Alone| 21|
+---------+---+
only showing top 20 rows



In [None]:
# Use the sample() method to subsample a DataFrame. A subsampling
# fraction needs to be specified; the number of rows returned is only
# approximately fraction * count. Sampling with replacement is also
# supported.

# This is a convenient method as we can work with a small subset of the
# data on the driver.

localDF = df.sample(0.01).toPandas()
print( len(localDF) )
localDF.head()

135


Unnamed: 0,destination,passanger,weather,temperature,time,coupon,expiration,gender,age,maritalStatus,...,CoffeeHouse,CarryAway,RestaurantLessThan20,Restaurant20To50,toCoupon_GEQ5min,toCoupon_GEQ15min,toCoupon_GEQ25min,direction_same,direction_opp,Y
0,Home,Alone,Sunny,55,6PM,Restaurant(20-50),1d,Male,46,Single,...,4~8,1~3,1~3,never,1,1,0,0,1,0
1,No Urgent Place,Partner,Sunny,80,10AM,Coffee House,2h,Female,21,Unmarried partner,...,1~3,1~3,1~3,1~3,1,0,0,0,1,1
2,Home,Alone,Sunny,55,6PM,Bar,1d,Female,21,Single,...,never,1~3,1~3,4~8,1,0,0,1,0,1
3,No Urgent Place,Partner,Sunny,80,10AM,Coffee House,2h,Male,36,Unmarried partner,...,never,4~8,gt8,less1,1,1,0,0,1,0
4,Work,Alone,Sunny,55,7AM,Bar,1d,Female,26,Single,...,1~3,1~3,gt8,1~3,1,1,1,0,1,1


### Filtering and running SQL Queries

In [18]:
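# Use the filter() function with a boolean expression (as a SQL string)
# to filter.

# Note that we can also filter via column objects, but conditions must
# then be combined with &, | and ~ (each wrapped in parentheses) rather
# than with Python's and/or/not.

# Note: age is stored as a string here. Spark casts it to a number for the
# comparison, so non-numeric values such as "50plus" become null and are
# excluded from the result.

# filtering using a string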
display(df.filter("age > 40").toPandas())

Unnamed: 0,destination,passanger,weather,temperature,time,coupon,expiration,gender,age,maritalStatus,...,CoffeeHouse,CarryAway,RestaurantLessThan20,Restaurant20To50,toCoupon_GEQ5min,toCoupon_GEQ15min,toCoupon_GEQ25min,direction_same,direction_opp,Y
0,No Urgent Place,Alone,Sunny,55,2PM,Restaurant(<20),1d,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,1
1,No Urgent Place,Friend(s),Sunny,80,10AM,Coffee House,2h,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,1
2,No Urgent Place,Friend(s),Sunny,80,10AM,Bar,1d,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,0
3,No Urgent Place,Friend(s),Sunny,80,10AM,Carry out & Take away,2h,Male,46,Single,...,4~8,1~3,1~3,never,1,1,0,0,1,1
4,No Urgent Place,Friend(s),Sunny,80,2PM,Coffee House,1d,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1774,Work,Alone,Rainy,55,7AM,Carry out & Take away,1d,Male,41,Divorced,...,less1,less1,1~3,4~8,1,0,0,0,1,0
1775,Work,Alone,Snowy,30,7AM,Coffee House,1d,Male,41,Divorced,...,less1,less1,1~3,4~8,1,0,0,1,0,1
1776,Work,Alone,Snowy,30,7AM,Restaurant(<20),1d,Male,41,Divorced,...,less1,less1,1~3,4~8,1,1,0,0,1,0
1777,Work,Alone,Snowy,30,7AM,Bar,1d,Male,41,Divorced,...,less1,less1,1~3,4~8,1,1,1,0,1,0


In [32]:
# Alternatively, we can run an SQL query on a DataFrame that has been
# registered as a temporary view. Temporary views are session-scoped and
# disappear when the session that created them terminates.

df.createOrReplaceTempView("vehicle_coupon")
spark.sql("select * from vehicle_coupon where age > 40 and maritalStatus = 'Single'").toPandas()

Unnamed: 0,destination,passanger,weather,temperature,time,coupon,expiration,gender,age,maritalStatus,...,CoffeeHouse,CarryAway,RestaurantLessThan20,Restaurant20To50,toCoupon_GEQ5min,toCoupon_GEQ15min,toCoupon_GEQ25min,direction_same,direction_opp,Y
0,No Urgent Place,Alone,Sunny,55,2PM,Restaurant(<20),1d,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,1
1,No Urgent Place,Friend(s),Sunny,80,10AM,Coffee House,2h,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,1
2,No Urgent Place,Friend(s),Sunny,80,10AM,Bar,1d,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,0
3,No Urgent Place,Friend(s),Sunny,80,10AM,Carry out & Take away,2h,Male,46,Single,...,4~8,1~3,1~3,never,1,1,0,0,1,1
4,No Urgent Place,Friend(s),Sunny,80,2PM,Coffee House,1d,Male,46,Single,...,4~8,1~3,1~3,never,1,0,0,0,1,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
446,Work,Alone,Rainy,55,7AM,Carry out & Take away,1d,Male,46,Single,...,1~3,4~8,4~8,4~8,1,0,0,0,1,0
447,Work,Alone,Snowy,30,7AM,Coffee House,1d,Male,46,Single,...,1~3,4~8,4~8,4~8,1,0,0,1,0,1
448,Work,Alone,Snowy,30,7AM,Restaurant(<20),1d,Male,46,Single,...,1~3,4~8,4~8,4~8,1,1,0,0,1,1
449,Work,Alone,Snowy,30,7AM,Bar,1d,Male,46,Single,...,1~3,4~8,4~8,4~8,1,1,1,0,1,1


## <a name="grouping"></a>Grouping

- We can use the DataFrame API or an SQL query to group.

- Grouping is usually followed by an aggregation.

- Grouping and aggregations are implemented as MapReduce-style operations.
    
    - First, the rows in each partition are partially aggregated in parallel by the grouping column (Map). 
    - Then the partial results are combined across partitions, again in parallel (Reduce). 
    - Data is shuffled between the two stages.

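- By default, Spark uses 200 shuffle partitions (`spark.sql.shuffle.partitions`). This can impact performance, since 200 may be too many for small datasets and too few for very large ones. With adaptive query execution (enabled by default since Spark 3.2), small shuffle partitions may be coalesced into fewer partitions.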

- An exchange stage is introduced in the computation. You can see it in the webUI. 

- Use `spark.conf.set("spark.sql.shuffle.partitions", NUM)` to adjust the number of shuffle partitions.
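
The two-stage pattern described above can be imitated in plain Python. This is only a conceptual sketch, not Spark's actual implementation: the partition contents and the helper names `partial_aggregate` and `merge_partials` are invented for illustration. Each partition is first reduced to per-key (sum, count) pairs, and those small partial results are then merged to obtain the per-group averages.

```python
from collections import defaultdict

# Two hypothetical partitions of (destination, temperature) rows.
partitions = [
    [("Home", 55), ("Work", 80), ("Home", 80)],
    [("Work", 80), ("Home", 30)],
]

def partial_aggregate(rows):
    """Stage 1: within one partition, reduce rows to key -> [sum, count]."""
    acc = defaultdict(lambda: [0, 0])
    for key, value in rows:
        acc[key][0] += value
        acc[key][1] += 1
    return dict(acc)

def merge_partials(partials):
    """Stage 2: after the 'shuffle', combine partial results that share a key."""
    merged = defaultdict(lambda: [0, 0])
    for partial in partials:
        for key, (total, count) in partial.items():
            merged[key][0] += total
            merged[key][1] += count
    return {key: total / count for key, (total, count) in merged.items()}

# Stage 1 is independent per partition, so it could run in parallel.
partials = [partial_aggregate(p) for p in partitions]
averages = merge_partials(partials)
print(averages)
```

Carrying (sum, count) pairs rather than per-partition averages is what makes the merge step exact: averages of averages would be wrong when partitions hold different numbers of rows per key.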

In [33]:
# Let's group by one column and inspect the webUI
df.groupBy("destination").avg().toPandas()


Unnamed: 0,destination,avg(temperature),avg(has_children),avg(toCoupon_GEQ5min),avg(toCoupon_GEQ15min),avg(toCoupon_GEQ25min),avg(direction_same),avg(direction_opp),avg(Y)
0,Home,61.495212,0.407476,1.0,0.446401,0.145814,0.470806,0.529194,0.506333
1,Work,59.946271,0.397914,1.0,0.642541,0.328382,0.379267,0.620733,0.502212
2,No Urgent Place,65.92233,0.425752,1.0,0.579978,0.0,0.0,1.0,0.633774


In [34]:
# Can group by more than one column
groupDF = df.groupBy("passanger", "Y").avg()
print( groupDF.rdd.getNumPartitions() )
groupDF.toPandas()




1


Unnamed: 0,passanger,Y,avg(temperature),avg(has_children),avg(toCoupon_GEQ5min),avg(toCoupon_GEQ15min),avg(toCoupon_GEQ25min),avg(direction_same),avg(direction_opp),avg(Y)
0,Friend(s),0,67.999071,0.4039,1.0,0.650882,0.0,0.0,1.0,0.0
1,Alone,0,60.138568,0.404157,1.0,0.580543,0.237009,0.305716,0.694284,0.0
2,Partner,0,61.091954,0.183908,1.0,0.498851,0.091954,0.126437,0.873563,0.0
3,Alone,1,62.986202,0.379068,1.0,0.43296,0.157251,0.384535,0.615465,1.0
4,Kid(s),1,67.5,0.976378,1.0,0.732283,0.007874,0.051181,0.948819,1.0
5,Friend(s),1,65.986042,0.358847,1.0,0.674471,0.0,0.0,1.0,1.0
6,Kid(s),0,62.279116,0.98996,1.0,0.801205,0.004016,0.048193,0.951807,0.0
7,Partner,1,64.0625,0.15,1.0,0.407813,0.0625,0.129688,0.870313,1.0


In [36]:
# Can also run grouped aggregations through SQL
#Can you determine the number of times that each group of passengers accepted the vehicle coupon?

# Write your code here, Hint: Use Spark SQL and grouping


Unnamed: 0,passanger,max(age),count(Y)
0,Alone,below21,7305
1,Friend(s),below21,3298
2,Kid(s),50plus,1006
3,Partner,below21,1075


## <a name="UDFs"></a>User Defined Functions (UDFs)

- UDFs allow us to run custom functions on Spark SQL DataFrame columns.

- They are similar to pandas DataFrame transformations.

- Spark serializes Python UDFs and ships them to the executors so that they can run in parallel on the partitions of a DataFrame.

- Spark SQL also has many useful built-in column functions in `pyspark.sql.functions`. Please see the API [here](https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html).

- We can also write custom UDFs in Python.

- `groupBy` also supports UDFs.


### Python UDF Steps

- Define the return type (schema) of the output (optional). If it is not specified, `udf` defaults to `StringType`; Spark does not infer the type from the Python function.
- Define a Python function that takes column entries (one or more) as input and outputs entries (one or more) consistent with that return type.
- Register the function as a UDF.
- Use the UDF.



In [40]:
# Example: Let's compute the square of temperature as an Integer

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType


dist_udf = udf(lambda x: int(x*x), IntegerType())

df.select('temperature', dist_udf('temperature').alias('temperature_new')).show()


+-----------+---------------+
|temperature|temperature_new|
+-----------+---------------+
|         55|           3025|
|         80|           6400|
|         80|           6400|
|         80|           6400|
|         80|           6400|
|         80|           6400|
|         55|           3025|
|         80|           6400|
|         80|           6400|
|         80|           6400|
|         80|           6400|
|         55|           3025|
|         55|           3025|
|         55|           3025|
|         55|           3025|
|         80|           6400|
|         55|           3025|
|         55|           3025|
|         80|           6400|
|         80|           6400|
+-----------+---------------+
only showing top 20 rows



**Exercise:** Convert the income column to a numerical column by taking the average of each income range.

In [41]:
df.select("income").distinct().show()

+----------------+
|          income|
+----------------+
| $75000 - $87499|
| $12500 - $24999|
|Less than $12500|
| $50000 - $62499|
| $25000 - $37499|
| $37500 - $49999|
| $62500 - $74999|
| $87500 - $99999|
| $100000 or More|
+----------------+



In [48]:
## STEP 1: Create a python function

def transform_income(income_str):

    income_str = str(income_str)

    if income_str[0] == "L":
        income_str = income_str.split(" ")[2]
        avg_income = income_str[1:]
        avg_income = float(avg_income)
        return avg_income

    #Write the two other conditions
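
As one possible way to approach this step, here is a hedged, plain-Python sketch that handles all three label shapes seen in the `income` column. The function name `income_to_float` is hypothetical, and treating "Less than $12500" as 12500.0 and "$100000 or More" as 100000.0 simply mirrors the values shown in the outputs further down. A range is mapped to the midpoint of its two bounds.

```python
def income_to_float(income_str):
    """Map an income label to a single number (a sketch under the assumptions above)."""
    # Keep only tokens that look like dollar amounts, e.g. "$37500" -> 37500.0
    amounts = [float(tok[1:]) for tok in str(income_str).split() if tok.startswith("$")]
    if not amounts:
        return None  # unexpected label: treat as missing
    # One amount: "Less than $12500" or "$100000 or More" -> that bound.
    # Two amounts: "$37500 - $49999" -> midpoint of the range.
    return sum(amounts) / len(amounts)

for label in ["Less than $12500", "$37500 - $49999", "$100000 or More"]:
    print(label, "->", income_to_float(label))
```

Such a function can then be wrapped with `udf` and a `FloatType()` return type, following the pattern of the temperature example earlier.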

In [49]:
## STEP 2: Convert python function to UDF function

#Write your code here

In [57]:
## STEP 3: Apply the udf function

columns_to_use = ["destination", "passanger", "weather", "time", "coupon", "income"]
spark_df = df.select(*columns_to_use)
spark_df.show(5, truncate=False)


+---------------+---------+-------+----+---------------------+---------------+
|destination    |passanger|weather|time|coupon               |income         |
+---------------+---------+-------+----+---------------------+---------------+
|No Urgent Place|Alone    |Sunny  |2PM |Restaurant(<20)      |$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |10AM|Coffee House         |$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |10AM|Carry out & Take away|$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |2PM |Coffee House         |$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |2PM |Coffee House         |$37500 - $49999|
+---------------+---------+-------+----+---------------------+---------------+
only showing top 5 rows

+---------------+---------+-------+----+---------------------+---------------+------------+
|destination    |passanger|weather|time|coupon               |income         |income_float|
+---------------+---------+-------+----+---------------------+---------------+--

In [None]:
updated_spark_df.filter('income == "Less than $12500"').show(3)

+---------------+---------+-------+----+---------------+----------------+------------+
|    destination|passanger|weather|time|         coupon|          income|income_float|
+---------------+---------+-------+----+---------------+----------------+------------+
|No Urgent Place|    Alone|  Sunny| 2PM|Restaurant(<20)|Less than $12500|     12500.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|   Coffee House|Less than $12500|     12500.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|            Bar|Less than $12500|     12500.0|
+---------------+---------+-------+----+---------------+----------------+------------+
only showing top 3 rows



In [None]:
updated_spark_df.filter('income == "$100000 or More"').show(3)

+---------------+---------+-------+----+---------------+---------------+------------+
|    destination|passanger|weather|time|         coupon|         income|income_float|
+---------------+---------+-------+----+---------------+---------------+------------+
|No Urgent Place|    Alone|  Sunny| 2PM|Restaurant(<20)|$100000 or More|    100000.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|   Coffee House|$100000 or More|    100000.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|            Bar|$100000 or More|    100000.0|
+---------------+---------+-------+----+---------------+---------------+------------+
only showing top 3 rows

