<a href="https://colab.research.google.com/github/Benjamindavid03/MachineLearningLab/blob/main/Spark_Example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prepare and Import the Data

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!unzip "/content/drive/MyDrive/Colab Datasets/DelayedFlights.csv.zip"

Archive:  /content/drive/MyDrive/Colab Datasets/DelayedFlights.csv.zip
  inflating: DelayedFlights.csv      


# Setting up Pyspark in python

In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [5]:
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

In [6]:
!tar xf spark-3.0.3-bin-hadoop2.7.tgz

In [7]:
!pip install -q findspark

In [8]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
!export SPARK_HOME=/content/spark-3.0.3-bin-hadoop2.7
! echo $SPARK_HOME

/content/spark-3.0.3-bin-hadoop2.7


# Initialize Spark

In [9]:
import findspark
findspark.init()

# create a SparkSession, which is the entry point to Spark.



In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [11]:
spark

# View Spark UI

In [12]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!ngrok config add-authtoken 2AyXjKIeBsO3IMCwnIGArgk0wPq_5JvTpU4uDDtEFwPh2EbAy
!curl -s http://localhost:4040/api/tunnels

--2022-06-26 09:18:17--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 52.202.168.65, 18.205.222.128, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2022-06-26 09:18:17 (38.2 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
/bin/bash: ngrok: command not found
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://b8a4-104-197-125-38.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name"

In [13]:
df = spark.read.csv("DelayedFlights.csv", header=True, inferSchema=True) #Load the CSV using spark

In [14]:
df.printSchema() #Show column details

root
 |-- _c0: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted

In [15]:
df.show(5) # Show only 5 rows in DF

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4| 2003.0|      1955| 2211.0|      2225|       

In [16]:
df.count() #Number of rows in DF

1936758

In [17]:
df.select("Year","Month","DayOfMonth","FlightNum").show(5) #Display specific columns

+----+-----+----------+---------+
|Year|Month|DayOfMonth|FlightNum|
+----+-----+----------+---------+
|2008|    1|         3|      335|
|2008|    1|         3|     3231|
|2008|    1|         3|      448|
|2008|    1|         3|     3920|
|2008|    1|         3|      378|
+----+-----+----------+---------+
only showing top 5 rows



In [18]:
df.describe().show() # Describing the columns

+-------+------------------+-------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-------------+------------------+-------+------------------+-----------------+------------------+-----------------+------------------+-------+-------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|               _c0|   Year|             Month|        DayofMonth|         DayOfWeek|           DepTime|        CRSDepTime|          ArrTime|       CRSArrTime|UniqueCarrier|         FlightNum|TailNum| ActualElapsedTime|   CRSElapsedTime|           AirTime|         ArrDelay|          DepDelay| Origin|   Dest|         Distance|           TaxiIn|           TaxiOut|           Cancelled|CancellationCode|            Diverted|      CarrierDelay|      WeatherDelay|   

# Distinct values for Categorical columns

In [19]:
df.select("AirTime").distinct().show()

+-------+
|AirTime|
+-------+
|  305.0|
|  299.0|
|  496.0|
|  596.0|
|  558.0|
|  147.0|
|  170.0|
|  184.0|
|  576.0|
|  169.0|
|  160.0|
|  608.0|
|   70.0|
|   67.0|
|  311.0|
|  379.0|
|  486.0|
|    8.0|
|  571.0|
|  168.0|
+-------+
only showing top 20 rows



# Aggregate with Groupby

In [20]:
from pyspark.sql import functions as F
df.groupBy("Year").agg(F.sum("FlightNum")).show()

+----+--------------+
|Year|sum(FlightNum)|
+----+--------------+
|2008|    4230389297|
+----+--------------+



In [21]:
from pyspark.sql import functions as F
df.groupBy("Month").agg(F.sum("FlightNum")).show()

+-----+--------------+
|Month|sum(FlightNum)|
+-----+--------------+
|   12|     462827129|
|    1|     420956179|
|    6|     436885627|
|    3|     425587779|
|    5|     323301264|
|    9|     209158942|
|    4|     331227729|
|    8|     338770016|
|    7|     398696159|
|   10|     216036590|
|   11|     239755667|
|    2|     427186216|
+-----+--------------+



# Counting and Removing Null values

In [22]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|   0|    0|         0|        0|      0|         0|   7110|         0|       

In [23]:
df = df.fillna({'ActualElapsedTime':0, 'CRSElapsedTime':0})

In [24]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|   0|    0|         0|        0|      0|         0|   7110|         0|       

In [27]:
df.write.csv("/content/drive/My Drive/PySpark on Colab/preprocessed_data")

In [28]:
df.rdd.getNumPartitions()

2

In [67]:
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "/content/drive/MyDrive/Colab Datasets/people.json"
peopleDF = spark.read.json(path)
peopleDF.count()

88

In [68]:
peopleDF.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- country: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- modified: string (nullable = true)
 |-- vip: boolean (nullable = true)



In [69]:
# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
peopleDF.show()

root
 |-- _corrupt_record: string (nullable = true)
 |-- country: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- modified: string (nullable = true)
 |-- vip: boolean (nullable = true)

+--------------------+-----------+--------------------+----------+----+----------+----------+-----+
|     _corrupt_record|    country|               email|first_name|  id| last_name|  modified|  vip|
+--------------------+-----------+--------------------+----------+----+----------+----------+-----+
|[{"id":1,"first_n...|       null|                null|      null|null|      null|      null| null|
|                null|      China|awilliamson1@naro...|    Albert|   2|Williamson|2015-03-11| true|
|                null|       Peru|    mfuller2@npr.org|   Mildred|   3|    Fuller|2015-02-15| true|
|                null|    Belarus|rrobinson3@google.pl|   Russell|   4|  Robinso

In [74]:
# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql('SELECT id,email,first_name FROM people WHERE id BETWEEN 10 AND 25')
teenagerNamesDF.show()

+---+--------------------+----------+
| id|               email|first_name|
+---+--------------------+----------+
| 10|ralvarez9@nsw.gov.au|     Roger|
| 11|mcarpentera@so-ne...|     Maria|
| 12|ledwardsb@storify...|      Lori|
| 13|pmitchellc@ebay.c...|   Phillip|
| 14|   clopezd@hexun.com|     Craig|
| 15|mgeorgee@squaresp...|     Marie|
| 16|    jduncanf@pbs.org|      Jean|
| 17|    kbutlerg@wix.com|  Kimberly|
| 18|hramirezh@instagr...|   Heather|
| 19|jsandersi@earthli...|     Jason|
| 20|   jevansj@google.de|      Juan|
| 21|btuckerk@business...|     Billy|
| 22|fduncanl@smugmug.com|      Fred|
| 23|dpetersonm@delici...|    Daniel|
| 24|kgilbertn@guardia...|     Kelly|
| 25|   aharto@oakley.com|     Aaron|
+---+--------------------+----------+

