# CSE488
## Spark Tutorial in pySpark

In this tutorial you will learn how to use [Apache Spark](https://spark.apache.org) in local mode in a Colab environment.

Credits to [Tiziano Piccardi](http://piccardi.me/) for his Spark Tutorial used in the Applied Data Analysis class at EPFL.

### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [1]:
!apt autoremove
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
0 upgraded, 0 newly installed, 0 to remove and 15 not upgraded.
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 3.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=986d83a6eeaaefe383d1dacef148bba1f01357726325f42293b844a12d7e8536
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openj
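PySpark starts a Java virtual machine under the hood, so `JAVA_HOME` must point at an installed JDK; if it does not, creating the `SparkContext` later fails. Below is a minimal sanity check, assuming the OpenJDK 8 path set in the cell above. `find_java` is a hypothetical helper, not part of PySpark.

```python
import os
from pathlib import Path

def find_java(java_home=None):
    """Return the path of the java executable under JAVA_HOME, or raise if it is missing."""
    # Fall back to the environment variable set in the setup cell
    home = java_home or os.environ.get("JAVA_HOME")
    if not home:
        raise RuntimeError("JAVA_HOME is not set")
    java = Path(home) / "bin" / "java"
    if not java.is_file():
        raise FileNotFoundError(f"no java executable at {java}")
    return java

# In the notebook, after the setup cell:
# print(find_java())  # should be under /usr/lib/jvm/java-8-openjdk-amd64
```

If this raises, fix the JDK installation or the `JAVA_HOME` path before initializing Spark.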

Now we authenticate a Google Drive client to download the files we will process in our Spark job.

**Make sure to follow the interactive instructions.**

In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [3]:
# Google Drive file IDs of the two datasets, mapped to local file names
files = {
    '1L6pCQkldvdBoaEhRFzL0VnrggEFvqON4': 'Bombing_Operations.json.gz',
    '14dyBmcTBA32uXPxDbqr0bFDIzGxMTWwl': 'Aircraft_Glossary.json.gz',
}
for file_id, filename in files.items():
    downloaded = drive.CreateFile({'id': file_id})
    downloaded.GetContentFile(filename)

If you executed the cells above, you should be able to see the files *Bombing_Operations.json.gz* and *Aircraft_Glossary.json.gz* under the "Files" tab on the left panel.
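By default, `spark.read.json` expects JSON Lines (one JSON object per line), and it decompresses `.gz` files transparently. Before starting Spark you can confirm that the downloads succeeded and peek at a record using only the standard library. This sketch assumes each file is gzip-compressed JSON Lines; `peek_json_lines` is a hypothetical helper.

```python
import gzip
import json
import os
from itertools import islice

def peek_json_lines(path, n=1):
    """Return the first n records of a gzip-compressed JSON Lines file."""
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    with gzip.open(path, "rt", encoding="utf-8") as f:
        # Stop after n lines instead of parsing the whole file
        return [json.loads(line) for line in islice(f, n)]

for name in ["Bombing_Operations.json.gz", "Aircraft_Glossary.json.gz"]:
    if os.path.exists(name):
        print(name, os.path.getsize(name), "bytes")
        print(peek_json_lines(name))
```

If `json.loads` fails on the first line, the file is probably a single multi-line JSON document rather than JSON Lines, and Spark would need a different reader option.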

In [4]:
# Let's import the libraries we will need
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
# note: this wildcard import shadows Python built-ins such as sum, min and max
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context and the Spark session.


In [5]:
# configure Spark to serve its web UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
# create the session (reuses the context created above)
spark = SparkSession.builder.getOrCreate()

You can easily check the current Spark version and get the link to the web interface. In the Spark UI you can monitor the progress of your jobs and debug performance bottlenecks (if your Colab is running on a **local runtime**).

In [6]:
spark

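If you are running this Colab on the Google-hosted runtime, the cell below creates an *ngrok* tunnel so that you can still access the Spark UI. If ngrok is not ready or cannot open the tunnel, the last command fails with a JSON decoding error, as in the output below.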

In [7]:
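!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
# expose the Spark UI (port 4050) through ngrok, in the background
get_ipython().system_raw('./ngrok http 4050 &')
# give ngrok a few seconds to start before querying its local API on port 4040
!sleep 5
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"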

--2023-12-02 09:51:46--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 52.202.168.65, 54.237.133.81, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13921656 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2023-12-02 09:51:47 (31.8 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13921656/13921656]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.10/json/__init__.py", line 293, in load
    return loads(fp.read(),
  File "/usr/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.10/json/de
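The traceback above comes from `json.load` receiving a response it could not parse, most likely an empty one because `curl` reached ngrok's local API (port 4040) before the agent was ready, or because the tunnel never started. Here is a hedged sketch that retries the same `/api/tunnels` endpoint used in the cell above with the standard library. `wait_for_public_url` and `parse_public_url` are hypothetical helpers, and the retry count and delay are arbitrary choices.

```python
import json
import time
import urllib.request

def parse_public_url(payload):
    """Extract the first public_url from an ngrok /api/tunnels JSON payload, or None."""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return None  # empty or non-JSON response
    tunnels = data.get("tunnels", []) if isinstance(data, dict) else []
    return tunnels[0].get("public_url") if tunnels else None

def wait_for_public_url(api="http://localhost:4040/api/tunnels", attempts=10, delay=1.0):
    """Poll ngrok's local API until a tunnel is listed; return its URL, or None on timeout."""
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(api, timeout=2) as resp:
                url = parse_public_url(resp.read().decode("utf-8"))
            if url:
                return url
        except OSError:
            pass  # URLError, connection refused or timeout: agent not up yet
        time.sleep(delay)
    return None

# In the notebook, after starting ngrok:
# print(wait_for_public_url())
```

If this keeps returning `None`, check ngrok's own log; recent ngrok releases may refuse to open a tunnel without an account authtoken.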

# Vietnam War

**Pres. Johnson**: _What do you think about this Vietnam thing? I’d like to hear you talk a little bit._

**Sen. Russell**: _Well, frankly, Mr. President, it’s the damn worse mess that I ever saw, and I don’t like to brag and I never have been right many times in my life, but I knew that we were going to get into this sort of mess when we went in there._

May 27, 1964

![banner](https://raw.githubusercontent.com/epfl-ada/2019/c17af0d3c73f11cb083717b7408fedd86245dc4d/Tutorials/04%20-%20Scaling%20Up/img/banner.jpg)

----

The Vietnam War, also known as the Second Indochina War, and in Vietnam as the Resistance War Against America or simply the American War, was a conflict that occurred in Vietnam, Laos, and Cambodia from 1 November 1955 to the fall of Saigon on 30 April 1975. It was the second of the Indochina Wars and was officially fought between North Vietnam and the government of South Vietnam.

**The dataset describes all the air force operations during the Vietnam War.**

**Bombing_Operations** [Get the dataset here](https://drive.google.com/a/epfl.ch/file/d/1L6pCQkldvdBoaEhRFzL0VnrggEFvqON4/view?usp=sharing)

- AirCraft: _Aircraft model (example: EC-47)_
- ContryFlyingMission: _Country flying the mission_
- MissionDate: _Date of the mission_
- OperationSupported: _Supported war operation_ (example: [Operation Rolling Thunder](https://en.wikipedia.org/wiki/Operation_Rolling_Thunder))
- PeriodOfDay: _Day (D) or night (N)_
- TakeoffLocation: _Takeoff airport_
- TargetCountry: _Country targeted by the mission_
- TimeOnTarget
- WeaponType
- WeaponsLoadedWeight

**Aircraft_Glossary** [Get the dataset here](https://drive.google.com/a/epfl.ch/file/d/14dyBmcTBA32uXPxDbqr0bFDIzGxMTWwl/view?usp=sharing)

- AirCraft: _Aircraft model (example: EC-47)_
- AirCraftName
- AirCraftType

**Dataset Information:**

THOR is a painstakingly cultivated database of historic aerial bombings from World War I through Vietnam. THOR has already proven useful in finding unexploded ordnance in Southeast Asia and improving Air Force combat tactics:
https://www.kaggle.com/usaf/vietnam-war-bombing-operations

### Loading the datasets

In [8]:
Bombing_operations = spark.read.json("Bombing_Operations.json.gz")

In [9]:
Aircraft_glossary = spark.read.json("Aircraft_Glossary.json.gz")

### Showing the dataset

In [10]:
Bombing_operations.show()

+--------+--------------------+-----------+------------------+-----------+---------------+-------------+------------+--------------------+-------------------+
|AirCraft| ContryFlyingMission|MissionDate|OperationSupported|PeriodOfDay|TakeoffLocation|TargetCountry|TimeOnTarget|          WeaponType|WeaponsLoadedWeight|
+--------+--------------------+-----------+------------------+-----------+---------------+-------------+------------+--------------------+-------------------+
|   EC-47|UNITED STATES OF ...| 1971-06-05|              NULL|          D|   TAN SON NHUT|     CAMBODIA|      1005.0|                NULL|                  0|
|   EC-47|UNITED STATES OF ...| 1972-12-26|              NULL|          D|  NAKHON PHANOM|SOUTH VIETNAM|       530.0|                NULL|                  0|
|    RF-4|UNITED STATES OF ...| 1973-07-28|              NULL|          D|       UDORN AB|         LAOS|       730.0|                NULL|                  0|
|     A-1|UNITED STATES OF ...| 1970-02-02|   

In [11]:
Bombing_operations.take(5)

[Row(AirCraft='EC-47', ContryFlyingMission='UNITED STATES OF AMERICA', MissionDate='1971-06-05', OperationSupported=None, PeriodOfDay='D', TakeoffLocation='TAN SON NHUT', TargetCountry='CAMBODIA', TimeOnTarget=1005.0, WeaponType=None, WeaponsLoadedWeight=0),
 Row(AirCraft='EC-47', ContryFlyingMission='UNITED STATES OF AMERICA', MissionDate='1972-12-26', OperationSupported=None, PeriodOfDay='D', TakeoffLocation='NAKHON PHANOM', TargetCountry='SOUTH VIETNAM', TimeOnTarget=530.0, WeaponType=None, WeaponsLoadedWeight=0),
 Row(AirCraft='RF-4', ContryFlyingMission='UNITED STATES OF AMERICA', MissionDate='1973-07-28', OperationSupported=None, PeriodOfDay='D', TakeoffLocation='UDORN AB', TargetCountry='LAOS', TimeOnTarget=730.0, WeaponType=None, WeaponsLoadedWeight=0),
 Row(AirCraft='A-1', ContryFlyingMission='UNITED STATES OF AMERICA', MissionDate='1970-02-02', OperationSupported=None, PeriodOfDay='N', TakeoffLocation='NAKHON PHANOM', TargetCountry='LAOS', TimeOnTarget=1415.0, WeaponType='BLU

### Showing the schema of the JSON file

In [12]:
Bombing_operations.printSchema()

root
 |-- AirCraft: string (nullable = true)
 |-- ContryFlyingMission: string (nullable = true)
 |-- MissionDate: string (nullable = true)
 |-- OperationSupported: string (nullable = true)
 |-- PeriodOfDay: string (nullable = true)
 |-- TakeoffLocation: string (nullable = true)
 |-- TargetCountry: string (nullable = true)
 |-- TimeOnTarget: double (nullable = true)
 |-- WeaponType: string (nullable = true)
 |-- WeaponsLoadedWeight: long (nullable = true)
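`spark.read.json` infers this schema by scanning the input (with the default `samplingRatio` of 1.0, the whole file). Passing the schema explicitly skips that extra pass, and `DataFrameReader.json` accepts a DDL-formatted string for its `schema` argument. Below is a sketch that builds such a string from the types printed above (`long` is `BIGINT` in DDL); the names `BOMBING_SCHEMA` and `to_ddl` are our own.

```python
# Column names and Spark SQL types as reported by printSchema() above
# (string -> STRING, double -> DOUBLE, long -> BIGINT)
BOMBING_SCHEMA = {
    "AirCraft": "STRING",
    "ContryFlyingMission": "STRING",  # sic: the column is spelled this way in the data
    "MissionDate": "STRING",
    "OperationSupported": "STRING",
    "PeriodOfDay": "STRING",
    "TakeoffLocation": "STRING",
    "TargetCountry": "STRING",
    "TimeOnTarget": "DOUBLE",
    "WeaponType": "STRING",
    "WeaponsLoadedWeight": "BIGINT",
}

def to_ddl(schema):
    """Render a {column: type} mapping as a DDL schema string."""
    # Backticks quote each column name as an identifier
    return ", ".join(f"`{name}` {dtype}" for name, dtype in schema.items())

ddl = to_ddl(BOMBING_SCHEMA)
print(ddl)

# In the notebook (requires the SparkSession created earlier):
# Bombing_operations = spark.read.json("Bombing_Operations.json.gz", schema=ddl)
```

With an explicit schema, fields missing from a record come back as `NULL`, and values that cannot be parsed as the declared type are handled according to the reader's `mode` option.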



### Executing queries: SELECT

In [13]:
b1 = Bombing_operations.select("AirCraft","ContryFlyingMission","MissionDate")

In [14]:
b1.show()

+--------+--------------------+-----------+
|AirCraft| ContryFlyingMission|MissionDate|
+--------+--------------------+-----------+
|   EC-47|UNITED STATES OF ...| 1971-06-05|
|   EC-47|UNITED STATES OF ...| 1972-12-26|
|    RF-4|UNITED STATES OF ...| 1973-07-28|
|     A-1|UNITED STATES OF ...| 1970-02-02|
|    A-37|     VIETNAM (SOUTH)| 1970-10-08|
|     F-4|UNITED STATES OF ...| 1970-11-25|
|     A-4|UNITED STATES OF ...| 1972-03-08|
|     F-4|UNITED STATES OF ...| 1971-12-27|
|     A-7|UNITED STATES OF ...| 1972-05-24|
|   EC-47|UNITED STATES OF ...| 1972-09-12|
|   CH-53|UNITED STATES OF ...| 1974-06-13|
|   CH-53|UNITED STATES OF ...| 1974-12-19|
|     O-1|     VIETNAM (SOUTH)| 1973-10-24|
|    UH-1|     VIETNAM (SOUTH)| 1974-03-19|
|     C-7|UNITED STATES OF ...| 1970-05-08|
|     A-6|UNITED STATES OF ...| 1971-05-12|
|   EB-66|UNITED STATES OF ...| 1971-12-03|
|    T-28|                LAOS| 1971-12-19|
|     A-6|UNITED STATES OF ...| 1972-08-18|
|     A-7|UNITED STATES OF ...| 

### Executing queries: WHERE

In [15]:
b1.where("ContryFlyingMission=='VIETNAM (SOUTH)'").show()

+--------+-------------------+-----------+
|AirCraft|ContryFlyingMission|MissionDate|
+--------+-------------------+-----------+
|    A-37|    VIETNAM (SOUTH)| 1970-10-08|
|     O-1|    VIETNAM (SOUTH)| 1973-10-24|
|    UH-1|    VIETNAM (SOUTH)| 1974-03-19|
|   CH-47|    VIETNAM (SOUTH)| 1971-03-29|
|   AC-47|    VIETNAM (SOUTH)| 1971-09-20|
|    UH-1|    VIETNAM (SOUTH)| 1972-08-22|
|    UH-1|    VIETNAM (SOUTH)| 1973-05-13|
|   CH-47|    VIETNAM (SOUTH)| 1975-01-04|
|    A-37|    VIETNAM (SOUTH)| 1971-02-08|
|   C-119|    VIETNAM (SOUTH)| 1970-09-17|
|    A-37|    VIETNAM (SOUTH)| 1971-04-02|
|     A-1|    VIETNAM (SOUTH)| 1972-04-29|
|     A-1|    VIETNAM (SOUTH)| 1972-08-02|
|    UH-1|    VIETNAM (SOUTH)| 1973-10-24|
|    UH-1|    VIETNAM (SOUTH)| 1974-12-16|
|     A-1|    VIETNAM (SOUTH)| 1970-04-27|
|    UH-1|    VIETNAM (SOUTH)| 1975-02-10|
|    A-37|    VIETNAM (SOUTH)| 1971-03-03|
|     O-1|    VIETNAM (SOUTH)| 1970-08-29|
|     A-1|    VIETNAM (SOUTH)| 1971-07-29|
+--------+-

In [16]:
b1.where("ContryFlyingMission=='VIETNAM (SOUTH)' OR ContryFlyingMission=='LAOS'").show()

+--------+-------------------+-----------+
|AirCraft|ContryFlyingMission|MissionDate|
+--------+-------------------+-----------+
|    A-37|    VIETNAM (SOUTH)| 1970-10-08|
|     O-1|    VIETNAM (SOUTH)| 1973-10-24|
|    UH-1|    VIETNAM (SOUTH)| 1974-03-19|
|    T-28|               LAOS| 1971-12-19|
|   CH-47|    VIETNAM (SOUTH)| 1971-03-29|
|   AC-47|    VIETNAM (SOUTH)| 1971-09-20|
|    UH-1|    VIETNAM (SOUTH)| 1972-08-22|
|    UH-1|    VIETNAM (SOUTH)| 1973-05-13|
|   CH-47|    VIETNAM (SOUTH)| 1975-01-04|
|    A-37|    VIETNAM (SOUTH)| 1971-02-08|
|    T-28|               LAOS| 1972-05-07|
|   C-119|    VIETNAM (SOUTH)| 1970-09-17|
|    A-37|    VIETNAM (SOUTH)| 1971-04-02|
|     A-1|    VIETNAM (SOUTH)| 1972-04-29|
|     A-1|    VIETNAM (SOUTH)| 1972-08-02|
|    UH-1|    VIETNAM (SOUTH)| 1973-10-24|
|    UH-1|    VIETNAM (SOUTH)| 1974-12-16|
|     A-1|    VIETNAM (SOUTH)| 1970-04-27|
|    UH-1|    VIETNAM (SOUTH)| 1975-02-10|
|    A-37|    VIETNAM (SOUTH)| 1971-03-03|
+--------+-
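Chaining `OR` conditions on the same column is equivalent to a single `IN` predicate, which stays readable as the list of values grows. Here is a small sketch that builds such a predicate string for `where`. `in_predicate` is a hypothetical helper that, to keep quoting simple, rejects values that would need escaping.

```python
def in_predicate(column, values):
    """Build a SQL 'column IN (...)' predicate from a non-empty list of string values."""
    if not values:
        raise ValueError("IN needs at least one value")
    for v in values:
        # Keep quoting trivial: refuse values containing quotes or backslashes
        if "'" in v or "\\" in v:
            raise ValueError(f"value needs escaping: {v!r}")
    quoted = ", ".join(f"'{v}'" for v in values)
    return f"{column} IN ({quoted})"

predicate = in_predicate("ContryFlyingMission", ["VIETNAM (SOUTH)", "LAOS"])
print(predicate)

# In the notebook, this selects the same rows as the OR query above:
# b1.where(predicate).show()
# Column API equivalent:
# b1.where(col("ContryFlyingMission").isin("VIETNAM (SOUTH)", "LAOS")).show()
```

The `isin` form avoids string building altogether and is usually the safer choice when the values come from user input.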

### Executing queries in SQL form

In [17]:
Bombing_operations.createOrReplaceTempView("Bombing_operations")

query = """
        SELECT AirCraft,ContryFlyingMission,MissionDate
        FROM Bombing_operations
        WHERE ContryFlyingMission=='VIETNAM (SOUTH)' OR ContryFlyingMission=='LAOS'
        """

result = spark.sql(query)
result.show()

+--------+-------------------+-----------+
|AirCraft|ContryFlyingMission|MissionDate|
+--------+-------------------+-----------+
|    A-37|    VIETNAM (SOUTH)| 1970-10-08|
|     O-1|    VIETNAM (SOUTH)| 1973-10-24|
|    UH-1|    VIETNAM (SOUTH)| 1974-03-19|
|    T-28|               LAOS| 1971-12-19|
|   CH-47|    VIETNAM (SOUTH)| 1971-03-29|
|   AC-47|    VIETNAM (SOUTH)| 1971-09-20|
|    UH-1|    VIETNAM (SOUTH)| 1972-08-22|
|    UH-1|    VIETNAM (SOUTH)| 1973-05-13|
|   CH-47|    VIETNAM (SOUTH)| 1975-01-04|
|    A-37|    VIETNAM (SOUTH)| 1971-02-08|
|    T-28|               LAOS| 1972-05-07|
|   C-119|    VIETNAM (SOUTH)| 1970-09-17|
|    A-37|    VIETNAM (SOUTH)| 1971-04-02|
|     A-1|    VIETNAM (SOUTH)| 1972-04-29|
|     A-1|    VIETNAM (SOUTH)| 1972-08-02|
|    UH-1|    VIETNAM (SOUTH)| 1973-10-24|
|    UH-1|    VIETNAM (SOUTH)| 1974-12-16|
|     A-1|    VIETNAM (SOUTH)| 1970-04-27|
|    UH-1|    VIETNAM (SOUTH)| 1975-02-10|
|    A-37|    VIETNAM (SOUTH)| 1971-03-03|
+--------+-
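`createOrReplaceTempView` registers the DataFrame under a name that `spark.sql` can query, much like a table in a relational database; the view lives only as long as the SparkSession that created it. To experiment with the query shape without Spark, the sketch below runs the same `SELECT ... WHERE ... OR ...` against Python's built-in `sqlite3`, using four rows copied from the outputs above. SQLite is only a stand-in here, not how Spark executes the query.

```python
import sqlite3

# Four rows copied from the outputs above
rows = [
    ("EC-47", "UNITED STATES OF AMERICA", "1971-06-05"),
    ("A-37", "VIETNAM (SOUTH)", "1970-10-08"),
    ("O-1", "VIETNAM (SOUTH)", "1973-10-24"),
    ("T-28", "LAOS", "1971-12-19"),
]

con = sqlite3.connect(":memory:")
# The in-memory table plays the role of the temporary view
con.execute(
    "CREATE TABLE Bombing_operations "
    "(AirCraft TEXT, ContryFlyingMission TEXT, MissionDate TEXT)"
)
con.executemany("INSERT INTO Bombing_operations VALUES (?, ?, ?)", rows)

query = """
    SELECT AirCraft, ContryFlyingMission, MissionDate
    FROM Bombing_operations
    WHERE ContryFlyingMission == 'VIETNAM (SOUTH)' OR ContryFlyingMission == 'LAOS'
"""
result = con.execute(query).fetchall()
for r in result:
    print(r)
```

As in Spark, no row order is guaranteed without an `ORDER BY` clause.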

In [18]:
query = """SELECT COUNT(*) AS NUM_OF_ROWS FROM Bombing_operations"""
result1 = spark.sql(query)
result1.show()

+-----------+
|NUM_OF_ROWS|
+-----------+
|    4400775|
+-----------+



In [19]:
Bombing_operations.show()

+--------+--------------------+-----------+------------------+-----------+---------------+-------------+------------+--------------------+-------------------+
|AirCraft| ContryFlyingMission|MissionDate|OperationSupported|PeriodOfDay|TakeoffLocation|TargetCountry|TimeOnTarget|          WeaponType|WeaponsLoadedWeight|
+--------+--------------------+-----------+------------------+-----------+---------------+-------------+------------+--------------------+-------------------+
|   EC-47|UNITED STATES OF ...| 1971-06-05|              NULL|          D|   TAN SON NHUT|     CAMBODIA|      1005.0|                NULL|                  0|
|   EC-47|UNITED STATES OF ...| 1972-12-26|              NULL|          D|  NAKHON PHANOM|SOUTH VIETNAM|       530.0|                NULL|                  0|
|    RF-4|UNITED STATES OF ...| 1973-07-28|              NULL|          D|       UDORN AB|         LAOS|       730.0|                NULL|                  0|
|     A-1|UNITED STATES OF ...| 1970-02-02|   