<a href="https://colab.research.google.com/github/Muntasir90629/PySpark-Note/blob/main/my_work_21_10_21_Abhishek_mamidi_PySpark_Transformations_and_action.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting up PySpark

In [None]:
# print working directory
!pwd
# List files and folders
!ls
# Check the open jdk version on colab
!ls /usr/lib/jvm/
# Download and install Java 8
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check if we have java 8 or not
!ls /usr/lib/jvm/
# Download Apache Spark binary: This link can change based on the version. Update this link with the latest version before using
!wget -q https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz
# Unzip file
!tar -xvzf spark-3.2.0-bin-hadoop2.7.tgz
# Install findspark: Adds Pyspark to sys.path at runtime
!pip install -q findspark
# Install pyspark
!pip install pyspark
# Add environmental variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"

# findspark will locate spark in the system
import findspark
findspark.init()

/content
sample_data
default-java  java-1.11.0-openjdk-amd64  java-11-openjdk-amd64
Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [69.5 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRel

In [None]:
from pyspark.sql import SparkSession

# Build a local-mode SparkSession for this notebook (getOrCreate returns an
# existing session instead of creating a second one).
spark = (
    SparkSession.builder
    .master("local")
    .appName("Hands-on PySpark on Google Colab")
    .getOrCreate()
)

In [None]:
# Display the SparkSession object to confirm the session above was created.
spark

In [None]:
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/00603/in-vehicle-coupon-recommendation.csv -P sample_data/

In [None]:
# Absolute path of the CSV downloaded into sample_data/ above.
filepath = "/content/sample_data/in-vehicle-coupon-recommendation.csv"

# header=True takes column names from the first line; inferSchema=True makes an
# extra pass over the data to detect column types (e.g. integer temperature).
spark_df = spark.read.csv(filepath, header=True, inferSchema=True)

In [None]:
# Preview the first 5 rows of the raw data; truncate=False prints full cell values.
spark_df.show(5,truncate=False)

+---------------+---------+-------+-----------+----+---------------------+----------+------+---+-----------------+------------+------------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+-------------+---+
|destination    |passanger|weather|temperature|time|coupon               |expiration|gender|age|maritalStatus    |has_children|education               |occupation|income         |car |Bar  |CoffeeHouse|CarryAway|RestaurantLessThan20|Restaurant20To50|toCoupon_GEQ5min|toCoupon_GEQ15min|toCoupon_GEQ25min|direction_same|direction_opp|Y  |
+---------------+---------+-------+-----------+----+---------------------+----------+------+---+-----------------+------------+------------------------+----------+---------------+----+-----+-----------+---------+--------------------+----------------+----------------+-----------------+-----------------+--------------+--------

# Transformations and Actions

In [None]:
from pyspark.sql import functions as f

# Actions

In [None]:
# List the column names read from the CSV header.
spark_df.columns

['destination',
 'passanger',
 'weather',
 'temperature',
 'time',
 'coupon',
 'expiration',
 'gender',
 'age',
 'maritalStatus',
 'has_children',
 'education',
 'occupation',
 'income',
 'car',
 'Bar',
 'CoffeeHouse',
 'CarryAway',
 'RestaurantLessThan20',
 'Restaurant20To50',
 'toCoupon_GEQ5min',
 'toCoupon_GEQ15min',
 'toCoupon_GEQ25min',
 'direction_same',
 'direction_opp',
 'Y']

**select and show: keep a subset of columns and display the first rows**

In [None]:
# Keep only the columns used in the rest of this notebook.
columns_to_use = [
    "destination", "passanger", "weather", "temperature", "time", "coupon",
    "gender", "age", "has_children", "income", "Y",
]
spark_df = spark_df.select(columns_to_use)

spark_df.show(n=5, truncate=False)


+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+
|destination    |passanger|weather|temperature|time|coupon               |gender|age|has_children|income         |Y  |
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+
|No Urgent Place|Alone    |Sunny  |55         |2PM |Restaurant(<20)      |Female|21 |1           |$37500 - $49999|1  |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Coffee House         |Female|21 |1           |$37500 - $49999|0  |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Carry out & Take away|Female|21 |1           |$37500 - $49999|1  |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |
+---------------+---------+-------+-----------+-

**count: count the number of rows in a DataFrame (`len(df.columns)` gives the number of columns)**

In [None]:
# (number of rows, number of columns): count() is an action that scans the data,
# while the column list comes from the schema.
spark_df.count(),len(spark_df.columns)

(12684, 11)

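**collect: an action that triggers computation and returns the result rows to the driver. Transformations such as `agg` are lazy: the first cell below only returns a DataFrame description, while `show()` and `collect()` actually compute the value.**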

In [None]:
# agg() is a transformation: Spark only builds the plan here, so the cell
# displays the resulting DataFrame's schema rather than the minimum itself.
spark_df.agg(f.min("temperature"))

DataFrame[min(temperature): int]

In [None]:
# show() is an action: it runs the aggregation and prints the result table.
spark_df.agg(f.min("temperature")).show()

+----------------+
|min(temperature)|
+----------------+
|              30|
+----------------+



In [None]:
# collect() returns a list of Row objects; [0][0] takes the first field of the
# first (only) row, i.e. the minimum temperature as a plain Python value.
spark_df.agg(f.min("temperature")).collect()[0][0]

30

In [None]:
# collect() transfers every row to the driver; limiting first means only the
# 5 rows being inspected are collected (spark_df.take(5) is equivalent).
spark_df.limit(5).collect()

[Row(destination='No Urgent Place', passanger='Alone', weather='Sunny', temperature=55, time='2PM', coupon='Restaurant(<20)', gender='Female', age='21', has_children=1, income='$37500 - $49999', Y=1),
 Row(destination='No Urgent Place', passanger='Friend(s)', weather='Sunny', temperature=80, time='10AM', coupon='Coffee House', gender='Female', age='21', has_children=1, income='$37500 - $49999', Y=0),
 Row(destination='No Urgent Place', passanger='Friend(s)', weather='Sunny', temperature=80, time='10AM', coupon='Carry out & Take away', gender='Female', age='21', has_children=1, income='$37500 - $49999', Y=1),
 Row(destination='No Urgent Place', passanger='Friend(s)', weather='Sunny', temperature=80, time='2PM', coupon='Coffee House', gender='Female', age='21', has_children=1, income='$37500 - $49999', Y=0),
 Row(destination='No Urgent Place', passanger='Friend(s)', weather='Sunny', temperature=80, time='2PM', coupon='Coffee House', gender='Female', age='21', has_children=1, income='$375

**Transformations**

In [None]:
# Re-display the first 5 rows of the working DataFrame before applying transformations.
spark_df.show(5,truncate=False)

+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+
|destination    |passanger|weather|temperature|time|coupon               |gender|age|has_children|income         |Y  |
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+
|No Urgent Place|Alone    |Sunny  |55         |2PM |Restaurant(<20)      |Female|21 |1           |$37500 - $49999|1  |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Coffee House         |Female|21 |1           |$37500 - $49999|0  |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Carry out & Take away|Female|21 |1           |$37500 - $49999|1  |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |
+---------------+---------+-------+-----------+-

**distinct: get the unique values in a column**

In [None]:
# Unique values of the gender column.
spark_df.select("gender").distinct().show()

+------+
|gender|
+------+
|Female|
|  Male|
+------+



In [None]:
# Unique values of the passanger column (column name spelled as in the source CSV).
spark_df.select("passanger").distinct().show()

+---------+
|passanger|
+---------+
|  Partner|
|    Alone|
|Friend(s)|
|   Kid(s)|
+---------+



**withColumn: create new columns; withColumnRenamed: rename existing columns**

In [None]:
# Add a constant column: f.lit wraps the Python value so every row gets "full_data".
updated_spark_df = spark_df.withColumn("cont", f.lit("full_data"))

In [None]:
# Show 3 rows to confirm the new "cont" column.
updated_spark_df.show(3, truncate=False)

+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+
|destination    |passanger|weather|temperature|time|coupon               |gender|age|has_children|income         |Y  |cont     |
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+
|No Urgent Place|Alone    |Sunny  |55         |2PM |Restaurant(<20)      |Female|21 |1           |$37500 - $49999|1  |full_data|
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Coffee House         |Female|21 |1           |$37500 - $49999|0  |full_data|
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Carry out & Take away|Female|21 |1           |$37500 - $49999|1  |full_data|
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+
only showing top 3 rows



In [None]:
# Encode gender as an integer: "Female" -> 0, "Male" -> 1, anything else -> 2
# (the when() conditions are tried in order, otherwise() is the fallback).
updated_spark_df = updated_spark_df.withColumn(
    "gender_mapped",
    f.when(f.col("gender") == "Female", 0)
    .when(f.col("gender") == "Male", 1)
    .otherwise(2),
)

In [None]:
# sample() is random: a fixed seed makes the displayed rows reproducible across runs.
# fraction=0.2 keeps each row with probability 0.2 (not exactly 20% of the rows).
updated_spark_df.sample(fraction=0.2, seed=42).show(5, truncate=False)

+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+-------------+
|destination    |passanger|weather|temperature|time|coupon               |gender|age|has_children|income         |Y  |cont     |gender_mapped|
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+-------------+
|No Urgent Place|Friend(s)|Sunny  |80         |6PM |Restaurant(<20)      |Female|21 |1           |$37500 - $49999|1  |full_data|0            |
|No Urgent Place|Kid(s)   |Sunny  |80         |10AM|Carry out & Take away|Female|21 |1           |$37500 - $49999|1  |full_data|0            |
|No Urgent Place|Friend(s)|Sunny  |80         |6PM |Restaurant(<20)      |Male  |21 |0           |$62500 - $74999|1  |full_data|1            |
|Work           |Alone    |Sunny  |55         |7AM |Bar                  |Male  |21 |0           |$62500 - $74999|1  |full_data|1            |

In [None]:
# Fix the misspelled column name "passanger" -> "passenger".
updated_spark_df = updated_spark_df.withColumnRenamed("passanger", "passenger")

In [None]:
# Confirm the rename in the displayed header.
updated_spark_df.show(5, truncate=False)

+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+-------------+
|destination    |passenger|weather|temperature|time|coupon               |gender|age|has_children|income         |Y  |cont     |gender_mapped|
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+-------------+
|No Urgent Place|Alone    |Sunny  |55         |2PM |Restaurant(<20)      |Female|21 |1           |$37500 - $49999|1  |full_data|0            |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Coffee House         |Female|21 |1           |$37500 - $49999|0  |full_data|0            |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Carry out & Take away|Female|21 |1           |$37500 - $49999|1  |full_data|0            |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |full_data|0            |

In [None]:
# Give the constant column a more descriptive name, then show the result.
updated_spark_df = updated_spark_df.withColumnRenamed(existing="cont", new="const_col")
updated_spark_df.show(n=5, truncate=False)

+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+-------------+
|destination    |passenger|weather|temperature|time|coupon               |gender|age|has_children|income         |Y  |const_col|gender_mapped|
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+---------+-------------+
|No Urgent Place|Alone    |Sunny  |55         |2PM |Restaurant(<20)      |Female|21 |1           |$37500 - $49999|1  |full_data|0            |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Coffee House         |Female|21 |1           |$37500 - $49999|0  |full_data|0            |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Carry out & Take away|Female|21 |1           |$37500 - $49999|1  |full_data|0            |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |full_data|0            |

**filter: Filter data**

In [None]:
# Check which temperature values occur before choosing filter thresholds.
updated_spark_df.select("temperature").distinct().show()

+-----------+
|temperature|
+-----------+
|         55|
|         80|
|         30|
+-----------+



In [None]:
# Keep rows with temperature strictly below 80 (the "80" in the name is the
# threshold; rows equal to 80 are excluded).
temp_80_df = updated_spark_df.filter(f.col("temperature")<80)
temp_80_df.show(3)

+---------------+---------+-------+-----------+----+--------------------+------+---+------------+---------------+---+---------+-------------+
|    destination|passenger|weather|temperature|time|              coupon|gender|age|has_children|         income|  Y|const_col|gender_mapped|
+---------------+---------+-------+-----------+----+--------------------+------+---+------------+---------------+---+---------+-------------+
|No Urgent Place|    Alone|  Sunny|         55| 2PM|     Restaurant(<20)|Female| 21|           1|$37500 - $49999|  1|full_data|            0|
|No Urgent Place|Friend(s)|  Sunny|         55| 2PM|Carry out & Take ...|Female| 21|           1|$37500 - $49999|  1|full_data|            0|
|No Urgent Place|   Kid(s)|  Sunny|         55| 2PM|     Restaurant(<20)|Female| 21|           1|$37500 - $49999|  1|full_data|            0|
+---------------+---------+-------+-----------+----+--------------------+------+---+------------+---------------+---+---------+-------------+
only s

In [None]:
# Compare row counts before and after the "< 80" filter.
updated_spark_df.count(), temp_80_df.count()

(12684, 6156)

In [None]:
# Keep rows with 45 < temperature < 90 (both bounds exclusive), as the variable
# name states. The parentheses are required: `&` binds tighter than `<`/`>`.
temp_45_90_df = updated_spark_df.filter((f.col("temperature") > 45) & (f.col("temperature") < 90))
temp_45_90_df.show(3)

+---------------+---------+-------+-----------+----+--------------------+------+---+------------+---------------+---+---------+-------------+
|    destination|passenger|weather|temperature|time|              coupon|gender|age|has_children|         income|  Y|const_col|gender_mapped|
+---------------+---------+-------+-----------+----+--------------------+------+---+------------+---------------+---+---------+-------------+
|No Urgent Place|Friend(s)|  Sunny|         80|10AM|        Coffee House|Female| 21|           1|$37500 - $49999|  0|full_data|            0|
|No Urgent Place|Friend(s)|  Sunny|         80|10AM|Carry out & Take ...|Female| 21|           1|$37500 - $49999|  1|full_data|            0|
|No Urgent Place|Friend(s)|  Sunny|         80| 2PM|        Coffee House|Female| 21|           1|$37500 - $49999|  0|full_data|            0|
+---------------+---------+-------+-----------+----+--------------------+------+---+------------+---------------+---+---------+-------------+
only s

In [None]:

# Compare row counts before and after the temperature-range filter.
updated_spark_df.count(), temp_45_90_df.count()

(12684, 6528)

**groupby and aggregate**

In [None]:
# The groupby examples use spark_df, the selected-column DataFrame
# (without the columns added to updated_spark_df).
spark_df.show(5, truncate=False)

+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+
|destination    |passanger|weather|temperature|time|coupon               |gender|age|has_children|income         |Y  |
+---------------+---------+-------+-----------+----+---------------------+------+---+------------+---------------+---+
|No Urgent Place|Alone    |Sunny  |55         |2PM |Restaurant(<20)      |Female|21 |1           |$37500 - $49999|1  |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Coffee House         |Female|21 |1           |$37500 - $49999|0  |
|No Urgent Place|Friend(s)|Sunny  |80         |10AM|Carry out & Take away|Female|21 |1           |$37500 - $49999|1  |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |
|No Urgent Place|Friend(s)|Sunny  |80         |2PM |Coffee House         |Female|21 |1           |$37500 - $49999|0  |
+---------------+---------+-------+-----------+-

In [None]:
# Number of rows per gender.
spark_df.groupby("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|Female| 6511|
|  Male| 6173|
+------+-----+



In [None]:
# Number of rows per temperature value.
spark_df.groupby("temperature").count().show()

+-----------+-----+
|temperature|count|
+-----------+-----+
|         55| 3840|
|         80| 6528|
|         30| 2316|
+-----------+-----+



In [None]:
# Per-gender min and max temperature; result columns get Spark's default
# names such as min(temperature).
spark_df.groupby("gender").agg(f.min("temperature"), f.max("temperature")).show()

+------+----------------+----------------+
|gender|min(temperature)|max(temperature)|
+------+----------------+----------------+
|Female|              30|              80|
|  Male|              30|              80|
+------+----------------+----------------+



In [None]:
# Same per-gender aggregation, with alias() giving the result columns readable names.
spark_df.groupby("gender").agg(
    f.min("temperature").alias("min_temperature"),
    f.max("temperature").alias("max_temperature"),
).show()

+------+---------------+---------------+
|gender|min_temperature|max_temperature|
+------+---------------+---------------+
|Female|             30|             80|
|  Male|             30|             80|
+------+---------------+---------------+

