# PySpark Lab2

In this lab we explore and test more of Spark's functionality.

As in the previous lab, we start the notebook by installing pyspark.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=85280310a5f58f827623188afccbf45c6eafbcdbaa517a8810699dbff37e0caf
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## Get the dataset

To make the dataset for this lab quick to obtain, we shared it from another Google account (on Google Drive) and wrote down all the steps needed to download it into the current directory.

The archive contains three files, 2006.csv, 2007.csv and 2008.csv, with information about flights during each of those years.

Now, execute the following code cell.

In [None]:
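# Download the zip archive from Google Drive by its file id
!gdown --id "1QJ-wDWTc3oM_jbSlb5cB3HPJ7Jwy5iAH"
!unzip SparkTutorials2i3.zip
# Keep only the three CSV files in the current directory
!mv Spark_Tutorial2/2006.csv Spark_Tutorial2/2007.csv Spark_Tutorial2/2008.csv .
# Remove the extracted folders (including macOS metadata) and the archive
!rm -r __MACOSX/ Spark_Tutorial3/ Spark_Tutorial2 SparkTutorials2i3.zip
!ls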

Downloading...
From: https://drive.google.com/uc?id=1QJ-wDWTc3oM_jbSlb5cB3HPJ7Jwy5iAH
To: /content/SparkTutorials2i3.zip
100% 887M/887M [00:03<00:00, 245MB/s]
Archive:  SparkTutorials2i3.zip
   creating: Spark_Tutorial2/
  inflating: Spark_Tutorial2/.DS_Store  
   creating: __MACOSX/
   creating: __MACOSX/Spark_Tutorial2/
  inflating: __MACOSX/Spark_Tutorial2/._.DS_Store  
  inflating: Spark_Tutorial2/2008.csv  
  inflating: __MACOSX/Spark_Tutorial2/._2008.csv  
  inflating: Spark_Tutorial2/2007.csv  
  inflating: __MACOSX/Spark_Tutorial2/._2007.csv  
  inflating: Spark_Tutorial2/2006.csv  
  inflating: __MACOSX/Spark_Tutorial2/._2006.csv  
  inflating: Spark_Tutorial2/tutorial2.pdf  
  inflating: __MACOSX/Spark_Tutorial2/._tutorial2.pdf  
  inflating: __MACOSX/._Spark_Tutorial2  
   creating: Spark_Tutorial3/
  inflating: Spark_Tutorial3/Tutorial3.zip  
   creating: __MACOSX/Spark_Tutorial3/
  inflating: __MACOSX/Spark_Tutorial3/._Tutorial3.zip  
  inflating: Spark_Tutorial3/.DS_Store

## Map Reduce in Spark

For this lab we will be using three files. Let's load them!

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Start a local Spark session that uses a single worker thread
spark = SparkSession.builder.master("local[1]").appName('testSparkSession').getOrCreate()

# Read each CSV: first line as header, the string "NA" as null, column types inferred from the data
df2006 = spark.read.format("csv").option("header", "true").option("nullValue","NA").option("inferSchema", "true").load("2006.csv")
df2007 = spark.read.format("csv").option("header", "true").option("nullValue","NA").option("inferSchema", "true").load("2007.csv")
df2008 = spark.read.format("csv").option("header", "true").option("nullValue","NA").option("inferSchema", "true").load("2008.csv")

# Check how many partitions Spark created for each DataFrame
print("df2006 number of partitions", df2006.rdd.getNumPartitions())
print("df2007 number of partitions", df2007.rdd.getNumPartitions())
print("df2008 number of partitions", df2008.rdd.getNumPartitions())

df2006 number of partitions 6
df2007 number of partitions 6
df2008 number of partitions 6
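The three read options shape everything that follows. As a simplified, pure-Python model (not Spark's actual CSV parser): header=true takes the column names from the first line, nullValue="NA" turns the literal string NA into null, and inferSchema=true makes an extra pass over the data to choose a type per column. Spark's inference tries more types (for example long, double or timestamp); this sketch only distinguishes integers from strings, and the sample text and helper names are made up for illustration.

```python
import csv
import io


def _is_int(value):
    """Return True if the string parses as an integer."""
    try:
        int(value)
        return True
    except ValueError:
        return False


def parse_csv(text, null_value="NA"):
    """Simplified model of reading a CSV with header, nullValue and inferSchema."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)  # header=true: the first line holds the column names
    # nullValue: replace the marker string with None (SQL NULL)
    rows = [[None if v == null_value else v for v in r] for r in reader]
    # inferSchema=true: scan every non-null value of each column to pick a type
    types = []
    for j in range(len(header)):
        values = [r[j] for r in rows if r[j] is not None]
        types.append(int if all(_is_int(v) for v in values) else str)
    typed = [[None if v is None else t(v) for v, t in zip(r, types)] for r in rows]
    return header, types, typed


sample = "Year,Origin,ArrDelay\n2007,SFO,12\n2007,LAX,NA\n"
print(parse_csv(sample))
print(parse_csv(sample, null_value=None)[1])  # no null marker configured
```

In the model, leaving out the null marker makes the NA value unparseable as an integer, so the whole ArrDelay column falls back to strings. With Spark, omitting the nullValue option would likely have a similar effect on the delay columns, which is why the option is set here.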


We have now loaded the three files and checked how many partitions each one has.
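Why six partitions per file? As far as we understand Spark 3.x's file-source planning, a splittable file is cut into chunks of at most maxSplitBytes = min(spark.sql.files.maxPartitionBytes, max(spark.sql.files.openCostInBytes, bytesPerCore)), where the two settings default to 128 MB and 4 MB, and bytesPerCore is the total input size (each file padded by the open cost) divided by the default parallelism, which is 1 under local[1]. The sketch applies that formula to a single file only; real planning also packs several small files into one partition, which it ignores. The 700 MB size is hypothetical, not the measured size of 2007.csv.

```python
import math

MB = 1024 * 1024


def estimate_partitions(file_bytes, max_partition_bytes=128 * MB,
                        open_cost_bytes=4 * MB, parallelism=1):
    """Simplified estimate of how many partitions Spark creates when reading
    one splittable file (e.g. an uncompressed CSV)."""
    # Each file is charged an extra "open cost" when computing the total size
    total_bytes = file_bytes + open_cost_bytes
    bytes_per_core = total_bytes // parallelism
    # Split size is capped by maxPartitionBytes but never below the open cost
    max_split_bytes = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))
    # One partition per split; the last split may be smaller than the others
    return math.ceil(file_bytes / max_split_bytes)


print(estimate_partitions(700 * MB))                 # parallelism of local[1]
print(estimate_partitions(700 * MB, parallelism=8))  # e.g. a local[8] session
```

Under these assumptions a 700 MB file gives 6 partitions with a parallelism of 1, while a parallelism of 8 shrinks the split size to 88 MB and raises the estimate to 8. The parallelism of the current session can be checked with spark.sparkContext.defaultParallelism.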

Let's check the number of elements of each dataframe too.

In [None]:
print ("df2006 number of elements", df2006.count())
print ("df2007 number of elements", df2007.count())
print ("df2008 number of elements", df2008.count())

df2006 number of elements 7141922
df2007 number of elements 7453215
df2008 number of elements 7009728


Let's now combine the three DataFrames into one with union.

In [None]:
df1 = df2006.union(df2007).union(df2008)

How many elements?

In [None]:
df1.count()

21604865

How many partitions?

In [None]:
df1.rdd.getNumPartitions()

18
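Both numbers simply add up: 7141922 + 7453215 + 7009728 = 21604865 rows and 6 + 6 + 6 = 18 partitions. That is because union concatenates its inputs without shuffling or removing duplicates (it behaves like SQL UNION ALL), and it matches columns by position, not by name; unionByName matches them by name instead. The pure-Python model below represents a DataFrame as a list of partitions holding row tuples; the function names and sample rows are hypothetical.

```python
def union_by_position(left_parts, right_parts):
    """Model of DataFrame.union: keep every partition of both inputs, in order.
    Rows are matched purely by column position."""
    return left_parts + right_parts


def union_by_name(left_cols, left_parts, right_cols, right_parts):
    """Model of DataFrame.unionByName: reorder the right-hand rows so that each
    value lands in the left-hand column with the same name."""
    order = [right_cols.index(c) for c in left_cols]
    reordered = [[tuple(row[i] for i in order) for row in part] for part in right_parts]
    return left_parts + reordered


left_cols = ["Year", "Origin"]
left = [[(2006, "SFO")], [(2006, "LAX")]]  # two partitions
right_cols = ["Origin", "Year"]            # same columns, different order
right = [[("BGM", 2007)]]                  # one partition

print(len(union_by_position(left, right)))  # partition counts add up
print(union_by_position(left, right)[-1])   # positional match pairs Origin with Year
print(union_by_name(left_cols, left, right_cols, right)[-1])

# Row and partition totals from the lab output
print(sum([7141922, 7453215, 7009728]), 6 + 6 + 6)
```

Since the three yearly files are read with the same options from files of the same dataset, a positional union should be safe here as long as their headers list the columns in the same order.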

Let's now do some filtering.

First we select a few columns and drop the rows that contain null (NA) values.

In [None]:
df2 = df1.select("Year", "Month", "Origin", "Dest", "ArrDelay", "DepDelay")
df3 = df2.na.drop()
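Called without arguments, na.drop() uses how="any": a row is removed if any of its columns is null. Because select runs first, only nulls in the six selected columns count; calling na.drop() on df1 directly would also remove rows whose nulls sit in columns we never use. The NA strings from the CSV are real nulls at this point thanks to the nullValue option used when reading. Below is a pure-Python model of those semantics, not Spark code; the real method also accepts a thresh argument, which the model omits.

```python
def na_drop(rows, how="any", subset=None):
    """Model of DataFrame.na.drop on rows stored as dicts (None = null).
    how="any" drops a row if any checked column is null;
    how="all" drops it only if every checked column is null."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")

    def keep(row):
        cols = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in cols]
        return not any(nulls) if how == "any" else not all(nulls)

    return [row for row in rows if keep(row)]


rows = [
    {"Origin": "SFO", "ArrDelay": 5, "DepDelay": 2},
    {"Origin": "LAX", "ArrDelay": None, "DepDelay": 0},
    {"Origin": "BGM", "ArrDelay": None, "DepDelay": None},
]
print(len(na_drop(rows)))  # default how="any" keeps only the SFO row
print([r["Origin"] for r in na_drop(rows, how="all", subset=["ArrDelay", "DepDelay"])])
```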

Now, as in the previous lab, we compute the sum of the arrival and departure delays and store it in a new column, SumDelay.

In [None]:
from pyspark.sql.functions import expr
df4 = df3.withColumn("SumDelay", expr("ArrDelay + DepDelay"))
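The expression "ArrDelay + DepDelay" follows SQL semantics: if either operand is null, the sum is null, so the na.drop step above is what guarantees that SumDelay is never null here. withColumn also returns a new DataFrame and leaves df3 unchanged. The same column could be built with col("ArrDelay") + col("DepDelay") from pyspark.sql.functions instead of expr. The model below mimics both behaviors with plain dicts; sql_add and with_sum_delay are hypothetical helpers.

```python
def sql_add(a, b):
    """SQL-style addition: NULL (None) in either operand gives NULL."""
    if a is None or b is None:
        return None
    return a + b


def with_sum_delay(rows):
    """Return new rows with an extra SumDelay field; the input rows are not
    modified, mirroring how withColumn returns a new DataFrame."""
    return [{**row, "SumDelay": sql_add(row["ArrDelay"], row["DepDelay"])} for row in rows]


rows = [{"ArrDelay": 5, "DepDelay": -2}, {"ArrDelay": None, "DepDelay": 3}]
result = with_sum_delay(rows)
print([r["SumDelay"] for r in result])
print("SumDelay" in rows[0])  # the original rows are untouched
```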

Again, we cache the DataFrame so that the operations from this point on run faster.

In [None]:
df4.cache()

DataFrame[Year: int, Month: int, Origin: string, Dest: string, ArrDelay: int, DepDelay: int, SumDelay: int]
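cache() is lazy: it only marks df4 for storage (by default in memory, spilling to disk when it does not fit) and returns the same DataFrame, which is why the cell prints its schema. The data is actually computed and stored the first time an action such as show() or count() runs, later actions reuse it, and df4.unpersist() releases it. The class below is a plain-Python analogy of that compute-once-then-reuse behavior, not Spark's implementation; LazyCache and expensive_pipeline are hypothetical names.

```python
class LazyCache:
    """Compute a value on first use, then reuse the stored result."""

    def __init__(self, compute):
        self._compute = compute  # the (possibly expensive) computation
        self._value = None
        self._materialized = False

    def get(self):
        # The first "action" triggers the computation and stores the result
        if not self._materialized:
            self._value = self._compute()
            self._materialized = True
        return self._value


calls = []


def expensive_pipeline():
    calls.append(1)  # record every real computation
    return [x * 2 for x in range(5)]


cached = LazyCache(expensive_pipeline)  # like df4.cache(): nothing runs yet
print(len(calls))
print(sum(cached.get()), len(cached.get()))  # two "actions"
print(len(calls))  # the pipeline ran only once
```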

Let's use grouping operations to compute, for instance, the average SumDelay for each Origin.

In [None]:
from pyspark.sql.functions import avg
df5 = df4.groupBy("Origin").agg(avg("SumDelay"))
df5.show()
df5.count()

+------+-------------------+
|Origin|      avg(SumDelay)|
+------+-------------------+
|   BGM| 14.275419982316533|
|   DLG| 29.520242914979757|
|   PSE| 1.6995652173913043|
|   MSY| 16.297211527465407|
|   GEG|   8.89446637700977|
|   BUR| 12.278206798656793|
|   SNA| 10.670062286517982|
|   GRB|  20.25246703344686|
|   GTF|-1.4841212989493793|
|   IDA|   2.70422379478107|
|   GRR| 15.865221292490036|
|   EUG| 11.089682627446644|
|   PSG|  22.49418604651163|
|   MYR| 19.363025389374865|
|   PVD|  16.99863997911887|
|   GSO| 21.790378090751545|
|   ISO|  34.06303724928367|
|   OAK|  12.94959132626464|
|   COD| -6.474365750528541|
|   MSN| 20.945062622916883|
+------+-------------------+
only showing top 20 rows



315
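This aggregation is where the map-reduce pattern of the section title shows up. Spark computes avg in two phases: each partition first builds partial (sum, count) pairs per key, these partial results are shuffled so that all pairs for a key meet in one place, and they are then merged and divided. The final 315 is the number of groups, i.e. distinct Origin values. Below is a pure-Python model of the two phases over made-up rows; the function names are hypothetical and the shuffle is reduced to a dictionary merge.

```python
from collections import defaultdict


def map_phase(partition):
    """Per-partition partial aggregation: key -> (sum, count)."""
    partial = {}
    for origin, delay in partition:
        s, c = partial.get(origin, (0, 0))
        partial[origin] = (s + delay, c + 1)
    return partial


def reduce_phase(partials):
    """Merge partial (sum, count) pairs by key, then divide to get the average."""
    totals = defaultdict(lambda: (0, 0))
    for partial in partials:
        for origin, (s, c) in partial.items():
            ts, tc = totals[origin]
            totals[origin] = (ts + s, tc + c)
    return {origin: s / c for origin, (s, c) in totals.items()}


partitions = [
    [("SFO", 10), ("LAX", 4)],
    [("SFO", 20), ("SFO", 30), ("LAX", -2), ("BGM", 7)],
]
averages = reduce_phase(map_phase(p) for p in partitions)
print(averages)
print(len(averages))  # number of groups, like df5.count()
```

Carrying sums and counts, rather than per-partition averages, keeps the result exact: SFO averages 60 / 3 = 20.0 here, whereas averaging the two partition averages (10 and 25) would give 17.5.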

We can also rename a column, for example to give the aggregated column a more readable name.

In [None]:
df6 = df5.withColumnRenamed("avg(SumDelay)", "Average Delay")
df6.show()

+------+-------------------+
|Origin|      Average Delay|
+------+-------------------+
|   BGM| 14.275419982316533|
|   DLG| 29.520242914979757|
|   PSE| 1.6995652173913043|
|   MSY| 16.297211527465407|
|   GEG|   8.89446637700977|
|   BUR| 12.278206798656793|
|   SNA| 10.670062286517982|
|   GRB|  20.25246703344686|
|   GTF|-1.4841212989493793|
|   IDA|   2.70422379478107|
|   GRR| 15.865221292490036|
|   EUG| 11.089682627446644|
|   PSG|  22.49418604651163|
|   MYR| 19.363025389374865|
|   PVD|  16.99863997911887|
|   GSO| 21.790378090751545|
|   ISO|  34.06303724928367|
|   OAK|  12.94959132626464|
|   COD| -6.474365750528541|
|   MSN| 20.945062622916883|
+------+-------------------+
only showing top 20 rows
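withColumnRenamed returns a new DataFrame in which the column named by the first argument gets the second name. If no column has that name, Spark treats the call as a no-op instead of raising an error, so a typo in the old name goes unnoticed. The same result could be obtained directly in the aggregation with avg("SumDelay").alias("Average Delay"), and because the new name contains a space it needs backticks inside SQL expressions, e.g. expr("`Average Delay` > 10"). The sketch models the renaming on a plain list of column names; with_column_renamed is a hypothetical helper, and unlike Spark's default (case-insensitive) resolution it compares names exactly.

```python
def with_column_renamed(columns, existing, new):
    """Model of DataFrame.withColumnRenamed on a list of column names.
    Returns a new list; renaming a missing column leaves the names unchanged."""
    return [new if name == existing else name for name in columns]


cols = ["Origin", "avg(SumDelay)"]
print(with_column_renamed(cols, "avg(SumDelay)", "Average Delay"))
print(with_column_renamed(cols, "avg(SumDelays)", "Average Delay"))  # typo: no match
```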

