<a href="https://colab.research.google.com/github/Lucasrichards19/Data-science/blob/main/LucasRichards_M5_Setting_Up_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Setting up Spark**:

Before you can connect to a Spark cluster, Spark needs to be installed. The code below is boilerplate that can be used to set up Spark. Note that this code is repeated in every notebook, since each notebook is a separate entity with its own environment.

In [1]:
# Update environment (if needed)
# !sudo apt update

# Download and install Java
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Apache Spark 3.2.1 prebuilt for Hadoop 3.2
!wget -nc -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

# Extract the archive
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

# Install findspark library that will locate Spark on the system
!pip install -q findspark

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 2.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 


In [2]:
# Setting the environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
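If findspark.init() fails in the next cell, a common cause is that one of these paths does not exist, for example because a different Java or Spark version was installed. The check below is a hypothetical helper (missing_homes is not part of findspark or PySpark) that lists which of the two variables is unset or points to a missing directory.

```python
import os


def missing_homes(names=("JAVA_HOME", "SPARK_HOME")):
    """Return the environment variables that are unset or point to a non-existent directory.

    Hypothetical helper for illustration; it is not part of findspark or PySpark.
    """
    missing = []
    for name in names:
        path = os.environ.get(name)
        # An unset variable is treated the same as a path that is not a directory
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


# Lists any variable that still needs fixing before findspark.init() is called
print(missing_homes())
```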

In [3]:
# Initialize Spark: findspark uses SPARK_HOME to make pyspark importable

import findspark
findspark.init()

**PySpark** provides a Python wrapper on top of Spark. Import SparkSession, which provides a single point of entry for interacting with the underlying Spark functionality.

In [4]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create spark session
spark = SparkSession.builder.getOrCreate()

spark.version

'3.2.1'
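SparkSession.builder.getOrCreate() returns the session that is already running in the notebook if there is one, and only builds a new session otherwise, which is why re-running the cell above is harmless. The plain-Python sketch below illustrates that get-or-create pattern with a hypothetical Session class; it is an analogy, not how PySpark implements it internally.

```python
class Session:
    """Hypothetical stand-in for a heavyweight session object (not PySpark)."""

    _active = None  # the single shared instance, once one has been created

    def __init__(self, app_name):
        self.app_name = app_name

    @classmethod
    def get_or_create(cls, app_name="default"):
        # Build an instance only on the first call; afterwards reuse it
        if cls._active is None:
            cls._active = cls(app_name)
        return cls._active


first = Session.get_or_create("airlines")
second = Session.get_or_create("something-else")
print(first is second)  # True: the second call returned the existing session
```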

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


List the contents of Google Drive

In [7]:
# After executing the cell above, Drive
# files will be present in "/content/drive/My Drive".
!ls "/content/drive/My Drive/"

 casamiento.docx	    Sustentate
'Colab Notebooks'	   'TESIS   Wake park'
'Party Invite.gform'	   'Untitled document.gdoc'
 RSVP.gform		   'Untitled presentation.gslides'
'sectors_perf (3).gsheet'


**Using DataFrames**

Spark's core data structure is the Resilient Distributed Dataset (RDD), a low-level object that allows Spark to process large quantities of data by spreading it across multiple nodes in the cluster. However, RDDs are hard to work with directly, so this notebook uses the Spark DataFrame, an abstraction built on top of RDDs.

The Spark DataFrame behaves a lot like a SQL table: the columns hold attributes and the rows hold observations. DataFrames are not only easier to work with; their operations are also planned by Spark's Catalyst optimizer, which generally makes complex operations more efficient than equivalent hand-written RDD code.
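To make "spreading data across nodes" concrete, the sketch below splits a list of values into partitions, summarises each partition independently (as a task on one executor would), and then merges the partial results. It is plain Python with hypothetical helper names (partition, partial_sum_count, combine); Spark performs these steps across machines and handles scheduling and fault tolerance itself.

```python
from functools import reduce


def partition(values, num_partitions):
    """Split values into num_partitions chunks, assigning items round-robin."""
    return [values[i::num_partitions] for i in range(num_partitions)]


def partial_sum_count(chunk):
    # Each partition is summarised on its own, like an independent task
    return (sum(chunk), len(chunk))


def combine(a, b):
    # Partial results are merged pairwise, like a reduce step after the tasks finish
    return (a[0] + b[0], a[1] + b[1])


# DepDelay values from the first ten rows of the airline data shown in this notebook
dep_delays = [108, 95, -3, -1, 0, 0, 115, 22, -4, 89]
chunks = partition(dep_delays, 3)
total, count = reduce(combine, map(partial_sum_count, chunks))
print(chunks)
print("mean:", total / count)
```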

Read the airline.csv file from your Google Drive into a Spark DataFrame (the header option tells Spark that the first line holds the column names)

In [10]:
df = spark.read.option("header", "true").csv("/content/drive/My Drive/airline.csv")
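Read this way, every column is a string and placeholders such as "NA" are kept as ordinary text. The CSV reader options inferSchema=True and nullValue="NA" change that, for example spark.read.csv(path, header=True, inferSchema=True, nullValue="NA"); inferSchema costs an extra pass over the file. The plain-Python sketch below mimics what those two options do on a few rows; the sample text (a few columns copied from the LGA rows in this notebook) and the infer helper are illustrative assumptions, not Spark code.

```python
import csv
import io

# A few columns from rows shown later in this notebook; "NA" marks missing values
sample = """ArrDelay,DepDelay,Dest,Origin
-14,3,ORD,LGA
NA,NA,ORD,LGA
27,-3,ORD,LGA
"""


def infer(value, null_value="NA"):
    """Hypothetical helper: mimic nullValue plus a simple integer-only type inference."""
    if value == null_value:
        return None  # the placeholder becomes a real null
    try:
        return int(value)
    except ValueError:
        return value  # anything non-numeric stays a string


# The header row supplies the column names, like option("header", "true")
rows = list(csv.DictReader(io.StringIO(sample)))
typed = [{k: infer(v) for k, v in row.items()} for row in rows]
print(rows[1]["ArrDelay"], typed[1]["ArrDelay"])
```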

Display the first 20 rows of the DataFrame

In [11]:
df.limit(20).show()

+--------+----------+---------+--------+----+------+-------------+----+
|ArrDelay|CRSDepTime|DayOfWeek|DepDelay|Dest|Origin|UniqueCarrier|Year|
+--------+----------+---------+--------+----+------+-------------+----+
|      91|      2045|        2|     108| BOS|   ORD|           UA|2005|
|      81|      1905|        2|      95| PDX|   LAX|           UA|2005|
|      -4|      1042|        5|      -3| ORD|   DAY|           UA|2005|
|     -11|      1734|        5|      -1| DEN|   PIT|           UA|2005|
|     -12|      1010|        5|       0| IAH|   ORD|           UA|2005|
|       1|       712|        1|       0| DEN|   BUR|           UA|2005|
|     144|      1955|        7|     115| SLC|   ORD|           UA|2005|
|      10|      1930|        2|      22| MSP|   DEN|           UA|2005|
|       0|       925|        5|      -4| DEN|   MCI|           UA|2005|
|      97|      1711|        4|      89| SEA|   DEN|           UA|2005|
|      -9|      1300|        3|      -5| PDX|   SFO|           U

Output the column names and the row and column counts

In [12]:
# Output the column names
df.columns


['ArrDelay',
 'CRSDepTime',
 'DayOfWeek',
 'DepDelay',
 'Dest',
 'Origin',
 'UniqueCarrier',
 'Year']

In [13]:
# Output the row count
df.count()

1234729

In [14]:
# Output the column count
len(df.columns)

8

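To examine the summary of a particular column of a DataFrame, use the describe method, which returns summary statistics (count, mean, stddev, min and max) for the given column.

Because the CSV was read without a schema, every column is a string: count includes the "NA" placeholders for missing values, and min and max are compared as text, which is why the maximum below is NA rather than a number.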

In [15]:
#Examine the summary of the "DepDelay" column
df.describe('DepDelay').show()

+-------+-----------------+
|summary|         DepDelay|
+-------+-----------------+
|  count|          1234729|
|   mean|8.197566746135685|
| stddev|28.78716040501349|
|    min|               -1|
|    max|               NA|
+-------+-----------------+
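The plain-Python sketch below reproduces the text-comparison effect on a handful of DepDelay values from this notebook (plus one "NA" placeholder, an assumption standing in for the missing values), then computes the same statistics on the numeric values with the placeholder skipped. statistics.stdev is the sample standard deviation, which is also what Spark's stddev reports.

```python
import statistics

# DepDelay values kept as strings, the way the schema-less read stores them
raw = ["108", "95", "-3", "-1", "0", "-4", "-5", "NA"]

# Text comparison: "-" sorts before every digit and "N" after every digit
print("as text:", min(raw), max(raw))

# Treat "NA" as missing and convert the rest to integers before summarising
nums = [int(v) for v in raw if v != "NA"]
summary = {
    "count": len(nums),
    "mean": statistics.mean(nums),
    "stddev": statistics.stdev(nums),  # sample standard deviation
    "min": min(nums),
    "max": max(nums),
}
print(summary)
```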



To select particular columns from the DataFrame, use the select method

In [16]:
df.select('ArrDelay','DepDelay').show()

+--------+--------+
|ArrDelay|DepDelay|
+--------+--------+
|      91|     108|
|      81|      95|
|      -4|      -3|
|     -11|      -1|
|     -12|       0|
|       1|       0|
|     144|     115|
|      10|      22|
|       0|      -4|
|      97|      89|
|      -9|      -5|
|      69|      63|
|      -3|      12|
|       1|      26|
|     -14|     -10|
|       4|      11|
|      23|      -4|
|      -5|      14|
|     -11|      11|
|     -13|      -4|
+--------+--------+
only showing top 20 rows



Selecting Distinct Combinations of Multiple Columns

In [17]:
df.select('ArrDelay','DepDelay').distinct().show()

+--------+--------+
|ArrDelay|DepDelay|
+--------+--------+
|     -20|      -6|
|       5|      10|
|     -23|      -5|
|      37|      45|
|     101|     100|
|       6|      -9|
|      26|       0|
|     197|     211|
|      50|      46|
|      69|      77|
|     116|      80|
|       2|      22|
|      74|      -6|
|      71|      77|
|      39|       4|
|      85|      95|
|     -11|      17|
|      31|       3|
|     110|      95|
|      49|      17|
+--------+--------+
only showing top 20 rows
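Conceptually, select keeps only the named columns of every row, and distinct then drops repeated combinations of those values, so a route such as (ORD, LGA) appears once however many flights share it. The plain-Python sketch below applies both steps to a few rows copied from the LGA output in this notebook; the list-of-dicts representation and the select/distinct helpers are assumptions for illustration, and unlike this sketch Spark does not guarantee any row order after distinct.

```python
# A few rows copied from the LGA filter output in this notebook
rows = [
    {"Dest": "ORD", "Origin": "LGA", "UniqueCarrier": "UA"},
    {"Dest": "ORD", "Origin": "LGA", "UniqueCarrier": "UA"},
    {"Dest": "DEN", "Origin": "LGA", "UniqueCarrier": "UA"},
    {"Dest": "DEN", "Origin": "LGA", "UniqueCarrier": "UA"},
    {"Dest": "PIT", "Origin": "LGA", "UniqueCarrier": "US"},
]


def select(rows, *cols):
    """Keep only the named columns, as one tuple per row (like df.select)."""
    return [tuple(row[c] for c in cols) for row in rows]


def distinct(tuples):
    """Drop repeated tuples, keeping first-seen order for readability."""
    return list(dict.fromkeys(tuples))


print(distinct(select(rows, "Dest", "Origin")))
```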



Filtering Data

In [18]:
df.filter(df.Origin=='LGA').show()

+--------+----------+---------+--------+----+------+-------------+----+
|ArrDelay|CRSDepTime|DayOfWeek|DepDelay|Dest|Origin|UniqueCarrier|Year|
+--------+----------+---------+--------+----+------+-------------+----+
|     -14|       800|        1|       3| ORD|   LGA|           UA|2005|
|      -6|       700|        5|      -1| ORD|   LGA|           UA|2005|
|      NA|      1400|        4|      NA| ORD|   LGA|           UA|2005|
|      27|      1400|        3|      -3| ORD|   LGA|           UA|2005|
|      -6|       900|        3|      -2| ORD|   LGA|           UA|2005|
|       4|      1403|        1|      -7| DEN|   LGA|           UA|2005|
|      12|       745|        5|       4| DEN|   LGA|           UA|2005|
|     -16|      1530|        7|      -7| PIT|   LGA|           US|2005|
|     -14|       715|        3|      -5| CLT|   LGA|           US|2005|
|     -12|       700|        1|      -6| BOS|   LGA|           US|2005|
|       1|       800|        2|      -2| BOS|   LGA|           U

Filtering Data (Multiple Conditions)

In [19]:
df.filter((df.Dest=='GSO') & (df.Origin=='LGA')).show()

+--------+----------+---------+--------+----+------+-------------+----+
|ArrDelay|CRSDepTime|DayOfWeek|DepDelay|Dest|Origin|UniqueCarrier|Year|
+--------+----------+---------+--------+----+------+-------------+----+
|      -2|      1000|        3|      -6| GSO|   LGA|           OH|2005|
|       8|      1000|        3|       0| GSO|   LGA|           OH|2005|
|     -19|      1815|        3|      -1| GSO|   LGA|           OH|2005|
|      -4|      1000|        3|      -5| GSO|   LGA|           OH|2005|
|      22|      1805|        6|      50| GSO|   LGA|           OH|2005|
|      -7|      1627|        1|       3| GSO|   LGA|           OH|2005|
|     -18|      1000|        2|       0| GSO|   LGA|           OH|2005|
|       2|      1805|        3|       0| GSO|   LGA|           OH|2005|
|      35|      1805|        2|      40| GSO|   LGA|           OH|2005|
|      -8|      1000|        6|     -10| GSO|   LGA|           OH|2005|
|      -7|      1627|        7|      -4| GSO|   LGA|           O
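The parentheses around each condition in the filter above are required. In Python, & binds more tightly than ==, so without them df.Dest == 'GSO' & df.Origin == 'LGA' is parsed as the chained comparison df.Dest == ('GSO' & df.Origin) == 'LGA', which PySpark rejects with a ValueError because a chained comparison needs the truth value of a Column. The sketch below uses the standard ast module to compare the two parse trees; the expressions are only parsed, never evaluated, so no Spark session is needed.

```python
import ast

unparenthesized = "df.Dest == 'GSO' & df.Origin == 'LGA'"
parenthesized = "(df.Dest == 'GSO') & (df.Origin == 'LGA')"

bad = ast.parse(unparenthesized, mode="eval").body
good = ast.parse(parenthesized, mode="eval").body

# Without parentheses: one chained comparison whose middle operand is 'GSO' & df.Origin
print(type(bad).__name__, [type(op).__name__ for op in bad.ops])
print(ast.dump(bad.comparators[0].op))

# With parentheses: a single & whose two sides are separate comparisons
print(type(good).__name__, type(good.op).__name__)
```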

**Performing SQL Queries** - SQL queries can be run against any DataFrame once it has been registered as a temporary view. createOrReplaceTempView does this; registerTempTable is the older name for the same operation and has been deprecated since Spark 2.0.

In [20]:
df.createOrReplaceTempView('airlines')



In Spark 2.0 and later, the entry point into all SQL functionality is the SparkSession created above; it replaces the older SQLContext class, which had to be built from a SparkContext. Queries are run with spark.sql, which returns a DataFrame.

In [21]:
spark.sql('select * from airlines').show()

+--------+----------+---------+--------+----+------+-------------+----+
|ArrDelay|CRSDepTime|DayOfWeek|DepDelay|Dest|Origin|UniqueCarrier|Year|
+--------+----------+---------+--------+----+------+-------------+----+
|      91|      2045|        2|     108| BOS|   ORD|           UA|2005|
|      81|      1905|        2|      95| PDX|   LAX|           UA|2005|
|      -4|      1042|        5|      -3| ORD|   DAY|           UA|2005|
|     -11|      1734|        5|      -1| DEN|   PIT|           UA|2005|
|     -12|      1010|        5|       0| IAH|   ORD|           UA|2005|
|       1|       712|        1|       0| DEN|   BUR|           UA|2005|
|     144|      1955|        7|     115| SLC|   ORD|           UA|2005|
|      10|      1930|        2|      22| MSP|   DEN|           UA|2005|
|       0|       925|        5|      -4| DEN|   MCI|           UA|2005|
|      97|      1711|        4|      89| SEA|   DEN|           UA|2005|
|      -9|      1300|        3|      -5| PDX|   SFO|           U

In [24]:
spark.sql('select distinct(Dest) from airlines').show(10)

+----+
|Dest|
+----+
| BGM|
| DLG|
| PSE|
| INL|
| MSY|
| GEG|
| SNA|
| BUR|
| GRB|
| GTF|
+----+
only showing top 10 rows
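Registering a temporary view simply gives the DataFrame a table name that SQL can refer to for the rest of the session. As an analogy only, the same two queries can be tried on a few rows with Python's built-in sqlite3 module, a single-process database rather than Spark; the sample rows are copied from the output above. Note that the parentheses in distinct(Dest) only group the column: DISTINCT applies to the whole selected row, so SELECT DISTINCT Dest is equivalent.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute(
    "CREATE TABLE airlines (ArrDelay INTEGER, DepDelay INTEGER, Dest TEXT, Origin TEXT)"
)
conn.executemany(
    "INSERT INTO airlines VALUES (?, ?, ?, ?)",
    [(91, 108, "BOS", "ORD"), (81, 95, "PDX", "LAX"), (-4, -3, "ORD", "DAY"),
     (-11, -1, "DEN", "PIT"), (1, 0, "DEN", "BUR")],
)

# Counterpart of spark.sql('select * from airlines')
all_rows = conn.execute("SELECT * FROM airlines").fetchall()

# Counterpart of the distinct query; ORDER BY makes the result order deterministic
dests = [row[0] for row in conn.execute("SELECT DISTINCT Dest FROM airlines ORDER BY Dest")]
print(len(all_rows), dests)
conn.close()
```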



In [25]:
import datetime
import pytz

datetime.datetime.now(pytz.timezone('US/Central')).strftime("%a, %d %B %Y %H:%M:%S")

'Thu, 03 November 2022 20:12:35'