
In [None]:
# Setting up the PySpark environment

# Install Java 8
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the Apache Spark binary. The URL changes with each release, so update it to a currently available version before running.
!wget -q https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz

# Extract the archive
!tar -xf spark-3.0.2-bin-hadoop2.7.tgz

# Install findspark: adds PySpark to sys.path at runtime
!pip install -q findspark

# Install pyspark
!pip install pyspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

# findspark will locate spark in the system
import findspark
findspark.init()

In [None]:
# Spark initialization
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("Hands-on PySpark on Google Colab") \
        .getOrCreate()

In [None]:
spark

In [None]:
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/00603/in-vehicle-coupon-recommendation.csv -P sample_data/

In [None]:
# header='true' treats the first row as column names; inferSchema='true' makes Spark infer each column's type from the data

filepath = "sample_data/in-vehicle-coupon-recommendation.csv"
spark_df = spark.read.format('csv').options(header='true', inferSchema='true').load(filepath)
spark_df.show(5, truncate=False)

+---------------+---------+-------+-----------+----+---------------------+----------+------+---+-----------------+------------+------------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+-------------+---+
|destination    |passanger|weather|temperature|time|coupon               |expiration|gender|age|maritalStatus    |has_children|education               |occupation|income         |car |Bar  |CoffeeHouse|CarryAway|RestaurantLessThan20|Restaurant20To50|toCoupon_GEQ5min|toCoupon_GEQ15min|toCoupon_GEQ25min|direction_same|direction_opp|Y  |
+---------------+---------+-------+-----------+----+---------------------+----------+------+---+-----------------+------------+------------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+--------

Introduction to UDF: User Defined Functions

*   A UDF extends Spark's capabilities by letting us define our own functions in Python.
*   PySpark's built-in functions don't cover every possible transformation. For example, no single built-in function selects the middle letter of a string column. In such cases, you can write a Python function, convert it to a UDF, and use it to transform a PySpark DataFrame.
*   A poorly written UDF can cause performance issues, because Spark has to pass each row to a Python worker and cannot optimize what happens inside the function. Keep the function as simple and efficient as possible.
*   NOTE: Prefer built-in Spark SQL functions, and use a UDF only when no built-in function can do the task.

Steps for using UDF:

*   Create a Python function
*   Convert the Python function to a UDF
*   Apply the UDF to a PySpark column to transform the data
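
The three steps can be sketched with the middle-letter example mentioned earlier. This is a minimal illustration and not part of the original notebook: `middle_letter` and `add_middle_letter` are hypothetical names, and for even-length strings the sketch arbitrarily picks the left of the two central characters. PySpark is imported inside the helper so that the plain Python function can be tried on its own.

```python
# STEP 1: a plain Python function (hypothetical example)
def middle_letter(text):
    """Return the middle character of a string, or None for null/empty input."""
    if not text:  # None or "" has no middle character
        return None
    # For even lengths this picks the left of the two central characters
    return text[(len(text) - 1) // 2]


def add_middle_letter(df, column):
    """Add a '<column>_middle' column holding the middle letter of `column`."""
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    # STEP 2: convert the Python function to a UDF
    middle_letter_udf = F.udf(middle_letter, T.StringType())

    # STEP 3: apply the UDF to a column
    return df.withColumn(column + "_middle", middle_letter_udf(F.col(column)))
```

With the DataFrame used in this notebook, `add_middle_letter(spark_df, "weather")` would be one way to call it.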

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [None]:
columns_to_use = ["destination", "passanger", "weather", "time", "coupon", "income"]
spark_df = spark_df.select(*columns_to_use)
spark_df.show(5, truncate=False)

+---------------+---------+-------+----+---------------------+---------------+
|destination    |passanger|weather|time|coupon               |income         |
+---------------+---------+-------+----+---------------------+---------------+
|No Urgent Place|Alone    |Sunny  |2PM |Restaurant(<20)      |$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |10AM|Coffee House         |$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |10AM|Carry out & Take away|$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |2PM |Coffee House         |$37500 - $49999|
|No Urgent Place|Friend(s)|Sunny  |2PM |Coffee House         |$37500 - $49999|
+---------------+---------+-------+----+---------------------+---------------+
only showing top 5 rows



In [None]:
# Goal: convert the income column to a numeric column by taking the midpoint of each income range

spark_df.select("income").distinct().show()

+----------------+
|          income|
+----------------+
| $75000 - $87499|
| $12500 - $24999|
|Less than $12500|
| $50000 - $62499|
| $25000 - $37499|
| $37500 - $49999|
| $62500 - $74999|
| $87500 - $99999|
| $100000 or More|
+----------------+



In [None]:
## STEP 1: Create a Python function

def transform_income(income_str):
    """Convert an income label such as "$37500 - $49999" to a single number."""

    # Spark passes nulls to a UDF as None; keep them null instead of parsing "None"
    if income_str is None:
        return None

    income_str = str(income_str)

    if income_str[0] == "L":
        # "Less than $12500": use the stated upper bound
        return float(income_str.split(" ")[-1][1:])

    elif income_str[-1] == "e":
        # "$100000 or More": use the stated lower bound
        return float(income_str.split(" ")[0][1:])

    else:
        # "$37500 - $49999": use the midpoint of the two bounds
        low, high = income_str.split(" - ")
        return (int(low[1:]) + int(high[1:])) / 2

In [None]:
transform_income("Less than $12500"), transform_income("$100000 or More"), transform_income("$37500 - $49999")

(12500.0, 100000.0, 43749.5)
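
As a cross-check, the same mapping can be written more compactly with a regular expression. This is only a sketch of an alternative, not the function used in the rest of the notebook, and `income_midpoint` is a hypothetical name. It averages every dollar amount found in the label, so the two open-ended categories simply map to their single stated bound, which is an approximation and not a true average.

```python
import re


def income_midpoint(label):
    """Average all dollar amounts found in an income label."""
    amounts = [int(a) for a in re.findall(r"\$(\d+)", label)]
    if not amounts:  # label contains no dollar amount
        return None
    return sum(amounts) / len(amounts)


# The nine distinct income labels shown above
labels = [
    "Less than $12500",
    "$12500 - $24999",
    "$25000 - $37499",
    "$37500 - $49999",
    "$50000 - $62499",
    "$62500 - $74999",
    "$75000 - $87499",
    "$87500 - $99999",
    "$100000 or More",
]

for label in labels:
    print(f"{label:<18}{income_midpoint(label)}")
```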

In [None]:
## STEP 2: Convert the Python function to a UDF

# The function can be passed directly; wrapping it in a lambda is unnecessary
transform_income_udf = F.udf(f=transform_income, returnType=T.FloatType())
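
A note on the return type: `T.FloatType()` stores 32-bit values, while Python floats are 64-bit. The sketch below, which needs no Spark, checks that the nine midpoints in this dataset survive that narrowing exactly. `as_float32` is a hypothetical helper that imitates 32-bit storage with the standard `struct` module. For values that do not fit, `T.DoubleType()` would be the safer choice.

```python
import struct


def as_float32(value):
    """Round-trip a Python float through 32-bit storage."""
    return struct.unpack("f", struct.pack("f", value))[0]


# Midpoints (or single bounds) of the nine income categories
midpoints = [12500.0, 18749.5, 31249.5, 43749.5, 56249.5,
             68749.5, 81249.5, 93749.5, 100000.0]

# True: every value here is exactly representable in 32 bits
print(all(as_float32(m) == m for m in midpoints))

# False: 0.1 is rounded differently in 32-bit and 64-bit floats
print(as_float32(0.1) == 0.1)
```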

In [None]:
## STEP 3: Apply the UDF

updated_spark_df = spark_df.withColumn("income_float", transform_income_udf(F.col("income")))
updated_spark_df.sample(0.2).show(10, truncate=False)

+---------------+---------+-------+----+---------------------+---------------+------------+
|destination    |passanger|weather|time|coupon               |income         |income_float|
+---------------+---------+-------+----+---------------------+---------------+------------+
|No Urgent Place|Friend(s)|Sunny  |10AM|Carry out & Take away|$37500 - $49999|43749.5     |
|No Urgent Place|Friend(s)|Sunny  |2PM |Coffee House         |$37500 - $49999|43749.5     |
|No Urgent Place|Kid(s)   |Sunny  |2PM |Restaurant(<20)      |$37500 - $49999|43749.5     |
|No Urgent Place|Kid(s)   |Sunny  |2PM |Restaurant(<20)      |$37500 - $49999|43749.5     |
|No Urgent Place|Alone    |Sunny  |2PM |Restaurant(<20)      |$62500 - $74999|68749.5     |
|No Urgent Place|Friend(s)|Sunny  |2PM |Coffee House         |$62500 - $74999|68749.5     |
|No Urgent Place|Friend(s)|Sunny  |2PM |Coffee House         |$62500 - $74999|68749.5     |
|No Urgent Place|Friend(s)|Sunny  |6PM |Restaurant(<20)      |$62500 - $74999|68

In [None]:
updated_spark_df.filter(F.col("income") == "Less than $12500").show(3)

+---------------+---------+-------+----+---------------+----------------+------------+
|    destination|passanger|weather|time|         coupon|          income|income_float|
+---------------+---------+-------+----+---------------+----------------+------------+
|No Urgent Place|    Alone|  Sunny| 2PM|Restaurant(<20)|Less than $12500|     12500.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|   Coffee House|Less than $12500|     12500.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|            Bar|Less than $12500|     12500.0|
+---------------+---------+-------+----+---------------+----------------+------------+
only showing top 3 rows



In [None]:
updated_spark_df.filter(F.col("income") == "$100000 or More").show(3)

+---------------+---------+-------+----+---------------+---------------+------------+
|    destination|passanger|weather|time|         coupon|         income|income_float|
+---------------+---------+-------+----+---------------+---------------+------------+
|No Urgent Place|    Alone|  Sunny| 2PM|Restaurant(<20)|$100000 or More|    100000.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|   Coffee House|$100000 or More|    100000.0|
|No Urgent Place|Friend(s)|  Sunny|10AM|            Bar|$100000 or More|    100000.0|
+---------------+---------+-------+----+---------------+---------------+------------+
only showing top 3 rows
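
In line with the earlier note about preferring built-in functions, this particular transformation can probably be done without a UDF. The sketch below is one possible approach under stated assumptions, not the notebook's method: `add_income_builtin` and the two pattern constants are hypothetical names, and the result is a double column instead of a float column. It relies on `regexp_extract` returning an empty string when the pattern does not match.

```python
# Patterns for the dollar amounts in labels such as "$37500 - $49999"
FIRST_AMOUNT = r"\$(\d+)"      # first amount in the label
SECOND_AMOUNT = r"- \$(\d+)"   # upper bound, present only in "low - high" ranges


def add_income_builtin(df, source="income", target="income_float"):
    """Derive the numeric income column using built-in functions only."""
    from pyspark.sql import functions as F

    first = F.regexp_extract(F.col(source), FIRST_AMOUNT, 1).cast("double")

    # regexp_extract gives "" when there is no match, so map that case
    # to null (when() without otherwise()) before casting
    second_raw = F.regexp_extract(F.col(source), SECOND_AMOUNT, 1)
    second = F.when(second_raw != "", second_raw.cast("double"))

    # Midpoint for ranges; fall back to the single stated bound otherwise
    return df.withColumn(target, F.coalesce((first + second) / 2, first))
```

Before relying on it, compare `add_income_builtin(spark_df)` against the UDF result on the distinct income values.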

