# San Francisco Air Traffic Passenger Statistics


### Download and install Spark

In [1]:
!apt-get update 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null 
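# Spark 3.1.2 is no longer on downloads.apache.org; older releases are kept in the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz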
!tar xf spark-3.1.2-bin-hadoop3.2.tgz 
!pip install -q findspark


### Setup environment

In [2]:
import os

# Point Spark at the JDK and at the unpacked Spark distribution
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

import findspark
findspark.init()  # make the pyspark package under SPARK_HOME importable

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # the underlying SparkContext
spark

### Import San Francisco Air Traffic Passenger Statistics Data from DataSF




In [3]:
!wget -O SFO.csv "https://data.sfgov.org/api/views/rkru-6vcg/rows.csv?accessType=DOWNLOAD"

--2021-07-05 20:50:55--  https://data.sfgov.org/api/views/rkru-6vcg/rows.csv?accessType=DOWNLOAD
Resolving data.sfgov.org (data.sfgov.org)... 52.206.140.199, 52.206.68.26, 52.206.140.205
Connecting to data.sfgov.org (data.sfgov.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘SFO.csv’

SFO.csv                 [  <=>               ]   2.30M  7.33MB/s    in 0.3s    

2021-07-05 20:50:55 (7.33 MB/s) - ‘SFO.csv’ saved [2417417]



In [4]:
ls

[0m[01;34msample_data[0m/  [01;34mspark-3.1.2-bin-hadoop3.2[0m/
SFO.csv       spark-3.1.2-bin-hadoop3.2.tgz


In [5]:
SFO = spark.read.csv('SFO.csv',header=True)
SFO.show(5)

+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+----------+------------------+-------------------+----------+-------------+---------------+
|Activity Period|Operating Airline|Operating Airline IATA Code|Published Airline|Published Airline IATA Code|  GEO Summary|GEO Region|Activity Type Code|Price Category Code|  Terminal|Boarding Area|Passenger Count|
+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+----------+------------------+-------------------+----------+-------------+---------------+
|         200507|     ATA Airlines|                         TZ|     ATA Airlines|                         TZ|     Domestic|        US|          Deplaned|           Low Fare|Terminal 1|            B|          27271|
|         200507|     ATA Airlines|                         TZ|     ATA Airlines|                         TZ|     Domestic|        US|      

In [6]:
# Replace the spaces in column names with underscores (e.g. "Activity Period"
# becomes "Activity_Period") so they can be used in Spark SQL without backticks
SFO = SFO.toDF(*[c.replace(" ", "_") for c in SFO.columns])

In [7]:
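from pyspark.sql.functions import col

# spark.read.csv without inferSchema reads every column as a string,
# so cast the two numeric columns explicitly
SFO = SFO.withColumn("Activity_Period", col("Activity_Period").cast("int"))
SFO = SFO.withColumn("Passenger_Count", col("Passenger_Count").cast("int"))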

In [8]:
SFO.describe().show()

+-------+------------------+--------------------+---------------------------+--------------------+---------------------------+-------------+----------+------------------+-------------------+-------------+-------------+------------------+
|summary|   Activity_Period|   Operating_Airline|Operating_Airline_IATA_Code|   Published_Airline|Published_Airline_IATA_Code|  GEO_Summary|GEO_Region|Activity_Type_Code|Price_Category_Code|     Terminal|Boarding_Area|   Passenger_Count|
+-------+------------------+--------------------+---------------------------+--------------------+---------------------------+-------------+----------+------------------+-------------------+-------------+-------------+------------------+
|  count|             23178|               23178|                      23104|               23178|                      23104|        23178|     23178|             23178|              23178|        23178|        23178|             23178|
|   mean|201317.78233669858|                null
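`describe()` reports, per column, the number of non-null values (`count`), the mean, the sample standard deviation (`stddev`), and the minimum and maximum; for string columns the mean and stddev come back as `null`, as in the `mean` row above. The `count` row is therefore a quick null check: both IATA code columns show 23104 against 23178 for the others, i.e. 74 rows without an IATA code. A minimal plain-Python sketch of the numeric statistics, assuming missing values are represented as `None` (the helper name `describe_numeric` is hypothetical):

```python
import statistics


def describe_numeric(values):
    """Mimic the numeric statistics of describe(): nulls (None) are skipped."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),                    # non-null values only
        "mean": sum(present) / len(present),
        "stddev": statistics.stdev(present),      # sample std dev, needs >= 2 values
        "min": min(present),
        "max": max(present),
    }


print(describe_numeric([1, 2, None, 3]))
```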

In [9]:
SFO.stat.freqItems(('Operating_Airline','Price_Category_Code','Terminal')).show()

+---------------------------+-----------------------------+--------------------+
|Operating_Airline_freqItems|Price_Category_Code_freqItems|  Terminal_freqItems|
+---------------------------+-----------------------------+--------------------+
|       [Horizon Air, Mes...|            [Low Fare, Other]|[International, T...|
+---------------------------+-----------------------------+--------------------+
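`freqItems` is approximate: the PySpark documentation describes it as finding frequent items "possibly with false positives", using the frequent-element counting algorithm of Karp, Schenker and Papadimitriou, with a default `support` of 1%. The sketch below shows the counter-based idea in Misra-Gries form (not Spark's actual code): with `1/support` counters, any value occurring in more than a `support` fraction of rows is guaranteed to survive, while some surviving values may be infrequent. The function name and toy input are hypothetical.

```python
def frequent_items(values, support=0.01):
    """Counter-based frequent-elements sketch (Misra-Gries style).

    Every value with frequency > support * len(values) is kept;
    other values may also appear (false positives).
    """
    max_counters = int(1 / support)
    counters = {}
    for v in values:
        if v in counters:
            counters[v] += 1
        elif len(counters) < max_counters:
            counters[v] = 1
        else:
            # No free counter: decrement all counters and drop those reaching zero
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return set(counters)


values = ["a"] * 50 + ["b"] * 30 + [f"x{i}" for i in range(20)]
print(frequent_items(values, support=0.25))
```

Because `show()` cuts long cells to 20 characters, the lists above end in `...`; `show(truncate=False)` prints them in full.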



In [10]:
SFO.schema

StructType(List(StructField(Activity_Period,IntegerType,true),StructField(Operating_Airline,StringType,true),StructField(Operating_Airline_IATA_Code,StringType,true),StructField(Published_Airline,StringType,true),StructField(Published_Airline_IATA_Code,StringType,true),StructField(GEO_Summary,StringType,true),StructField(GEO_Region,StringType,true),StructField(Activity_Type_Code,StringType,true),StructField(Price_Category_Code,StringType,true),StructField(Terminal,StringType,true),StructField(Boarding_Area,StringType,true),StructField(Passenger_Count,IntegerType,true)))

In [11]:
SFO.filter(SFO["GEO_Summary"]=="Domestic").count()

8262

In [12]:
SFO.filter(SFO["GEO_Summary"]=="International").count()

14916

In [13]:
SFO.groupBy("GEO_Region").count().orderBy("count").show()

+-------------------+-----+
|         GEO_Region|count|
+-------------------+-----+
|      South America|   91|
|        Middle East|  487|
|    Central America|  501|
|Australia / Oceania| 1246|
|             Mexico| 1768|
|             Canada| 2139|
|             Europe| 3721|
|               Asia| 4963|
|                 US| 8262|
+-------------------+-----+
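The per-region counts line up with the earlier filters: the eight non-US regions add up to the `International` count from In [12], the `US` row equals the `Domestic` count from In [11], and together they give the 23178 rows reported by `describe()`:

$$
91 + 487 + 501 + 1246 + 1768 + 2139 + 3721 + 4963 = 14916, \qquad 14916 + 8262 = 23178
$$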



In [14]:
SFO.groupby('GEO_Region').agg({'Passenger_Count': 'mean'}).orderBy("avg(Passenger_Count)").show()

+-------------------+--------------------+
|         GEO_Region|avg(Passenger_Count)|
+-------------------+--------------------+
|      South America|   2756.285714285714|
|    Central America|   5399.469061876248|
|Australia / Oceania|   6259.567415730337|
|             Mexico|    7588.08371040724|
|        Middle East|   7986.381930184805|
|             Canada|   9751.680224403926|
|             Europe|  11369.328943832303|
|               Asia|  13266.305460407011|
|                 US|    62151.4109174534|
+-------------------+--------------------+



In [15]:
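from pyspark.sql import SparkSession

# getOrCreate() returns the session created during setup; because a
# SparkContext is already running, the appName below may not take effect
spark = SparkSession \
    .builder \
    .appName("Spark SQL Query Dataframes") \
    .getOrCreate()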

In [16]:
SFO.createOrReplaceTempView("AirTraffic")

In [17]:
SFO_sql = spark.sql("SELECT * FROM AirTraffic LIMIT 5")
SFO_sql.show()

+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+----------+------------------+-------------------+----------+-------------+---------------+
|Activity_Period|Operating_Airline|Operating_Airline_IATA_Code|Published_Airline|Published_Airline_IATA_Code|  GEO_Summary|GEO_Region|Activity_Type_Code|Price_Category_Code|  Terminal|Boarding_Area|Passenger_Count|
+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+----------+------------------+-------------------+----------+-------------+---------------+
|         200507|     ATA Airlines|                         TZ|     ATA Airlines|                         TZ|     Domestic|        US|          Deplaned|           Low Fare|Terminal 1|            B|          27271|
|         200507|     ATA Airlines|                         TZ|     ATA Airlines|                         TZ|     Domestic|        US|      

In [18]:
SFO_sql = spark.sql("SELECT GEO_Region, Terminal AS T, Passenger_Count AS PC FROM AirTraffic LIMIT 10")
SFO_sql.show()

+-------------------+-------------+-----+
|         GEO_Region|            T|   PC|
+-------------------+-------------+-----+
|                 US|   Terminal 1|27271|
|                 US|   Terminal 1|29131|
|                 US|   Terminal 1| 5415|
|             Canada|   Terminal 1|35156|
|             Canada|   Terminal 1|34090|
|               Asia|International| 6263|
|               Asia|International| 5500|
|             Europe|International|12050|
|             Europe|International|11638|
|Australia / Oceania|International| 4998|
+-------------------+-------------+-----+



In [19]:
df_sql = spark.sql("SELECT * FROM AirTraffic WHERE Activity_Period = 202003")
df_sql.show(5)
df_sql.count()

+---------------+--------------------+---------------------------+--------------------+---------------------------+-------------+----------+------------------+-------------------+-------------+-------------+---------------+
|Activity_Period|   Operating_Airline|Operating_Airline_IATA_Code|   Published_Airline|Published_Airline_IATA_Code|  GEO_Summary|GEO_Region|Activity_Type_Code|Price_Category_Code|     Terminal|Boarding_Area|Passenger_Count|
+---------------+--------------------+---------------------------+--------------------+---------------------------+-------------+----------+------------------+-------------------+-------------+-------------+---------------+
|         202003|ABC Aerolineas S....|                         4O|ABC Aerolineas S....|                         4O|International|    Mexico|          Deplaned|              Other|International|            A|           2158|
|         202003|ABC Aerolineas S....|                         4O|ABC Aerolineas S....|                 

140

In [20]:
df_sql = spark.sql("SELECT * FROM AirTraffic WHERE Passenger_Count > 5000 AND Activity_Period < 202003 ")
df_sql.count()

16814

In [21]:
df_sql = spark.sql("""
    SELECT *
    FROM AirTraffic
    WHERE Passenger_Count > 5000 AND Activity_Period > 202002
    ORDER BY Activity_Period ASC
""")
df_sql.show(5)
df_sql.count()

+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+----------+------------------+-------------------+-------------+-------------+---------------+
|Activity_Period|Operating_Airline|Operating_Airline_IATA_Code|Published_Airline|Published_Airline_IATA_Code|  GEO_Summary|GEO_Region|Activity_Type_Code|Price_Category_Code|     Terminal|Boarding_Area|Passenger_Count|
+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+----------+------------------+-------------------+-------------+-------------+---------------+
|         202003|       Aeromexico|                         AM|       Aeromexico|                         AM|International|    Mexico|          Deplaned|              Other|International|            A|           5734|
|         202003|       Aeromexico|                         AM|       Aeromexico|                         AM|International|    M

378

In [22]:
df_count = spark.sql("SELECT count(*) FROM AirTraffic")
df_count.show()

+--------+
|count(1)|
+--------+
|   23178|
+--------+



In [23]:
df_sql = spark.sql("""
    SELECT GEO_Region, min(Passenger_Count), round(avg(Passenger_Count),0), max(Passenger_Count)
    FROM AirTraffic
    WHERE Activity_Period > 202002
    GROUP BY GEO_Region
    ORDER BY count(*) DESC
""")
df_sql.show()

+-------------------+--------------------+----------------------------------------------+--------------------+
|         GEO_Region|min(Passenger_Count)|round(avg(CAST(Passenger_Count AS BIGINT)), 0)|max(Passenger_Count)|
+-------------------+--------------------+----------------------------------------------+--------------------+
|                 US|                   3|                                       20675.0|              177513|
|               Asia|                   1|                                        2235.0|               20815|
|             Europe|                  56|                                        1691.0|               17391|
|             Mexico|                  18|                                        3453.0|               14801|
|             Canada|                  28|                                        1716.0|               20478|
|Australia / Oceania|                  94|                                        2525.0|               19811|
|

In [24]:
df_sql = spark.sql("""
    SELECT GEO_Region, min(Passenger_Count), round(avg(Passenger_Count),0), max(Passenger_Count)
    FROM AirTraffic
    WHERE Activity_Period < 202001
    GROUP BY GEO_Region
    ORDER BY count(*) DESC
""")
df_sql.show()

+-------------------+--------------------+----------------------------------------------+--------------------+
|         GEO_Region|min(Passenger_Count)|round(avg(CAST(Passenger_Count AS BIGINT)), 0)|max(Passenger_Count)|
+-------------------+--------------------+----------------------------------------------+--------------------+
|                 US|                   1|                                       64492.0|              659837|
|               Asia|                   2|                                       13882.0|              102939|
|             Europe|                   1|                                       11968.0|               66961|
|             Canada|                   1|                                       10064.0|               61749|
|             Mexico|                   1|                                        7807.0|               35205|
|Australia / Oceania|                  10|                                        6382.0|               24404|
|
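The two queries above split the data into rows before 2020 (`Activity_Period < 202001`) and rows from March 2020 on (`Activity_Period > 202002`). Comparing the rounded per-row averages from the two outputs gives a rough sense of the drop. Note that these are averages per row (one row per airline, region, activity type, fare category, terminal and month), not total passengers, and the later period only runs to the most recent month in the download. The sketch copies the six regions visible in both (truncated) outputs; the dictionary names are hypothetical.

```python
# Rounded avg(Passenger_Count) per GEO_Region, copied from the two outputs above
pre_2020 = {
    "US": 64492.0, "Asia": 13882.0, "Europe": 11968.0,
    "Canada": 10064.0, "Mexico": 7807.0, "Australia / Oceania": 6382.0,
}
from_mar_2020 = {
    "US": 20675.0, "Asia": 2235.0, "Europe": 1691.0,
    "Mexico": 3453.0, "Canada": 1716.0, "Australia / Oceania": 2525.0,
}


def pct_change(before, after):
    """Percentage change from `before` to `after`."""
    return 100.0 * (after - before) / before


changes = {
    region: round(pct_change(pre_2020[region], from_mar_2020[region]), 1)
    for region in pre_2020
}
for region, change in sorted(changes.items(), key=lambda kv: kv[1]):
    print(f"{region:20s} {change:6.1f}%")
```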

In [25]:
sql_window = """
    SELECT Activity_Period, Activity_Type_Code,
           avg(Passenger_Count) OVER (PARTITION BY GEO_Region)
    FROM AirTraffic
"""

In [26]:
spark.sql(sql_window).show()

+---------------+------------------+----------------------------------------------------------------------------------------------------------------------------+
|Activity_Period|Activity_Type_Code|avg(CAST(Passenger_Count AS BIGINT)) OVER (PARTITION BY GEO_Region ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)|
+---------------+------------------+----------------------------------------------------------------------------------------------------------------------------+
|         200507|          Deplaned|                                                                                                          11369.328943832303|
|         200507|          Enplaned|                                                                                                          11369.328943832303|
|         200507|          Deplaned|                                                                                                          11369.328943832303|
|         200507|          E
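Unlike `GROUP BY`, the window function keeps every row and attaches its partition's average to it. Without an `ORDER BY` in the `OVER` clause the frame is the whole partition (the header shows `ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`), so the value repeated above, 11369.328943832303, is the same Europe average computed with `groupby` in In [14]. A plain-Python sketch of that behaviour on hypothetical toy rows (the function and the `partition_avg` key are illustrative names):

```python
from collections import defaultdict


def window_avg(rows, partition_key, value_key):
    """Attach the average of value_key over each row's partition to every row."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for r in rows:
        totals[r[partition_key]] += r[value_key]
        counts[r[partition_key]] += 1
    # One output row per input row, unlike a GROUP BY
    return [
        dict(r, partition_avg=totals[r[partition_key]] / counts[r[partition_key]])
        for r in rows
    ]


rows = [
    {"GEO_Region": "Europe", "Passenger_Count": 10},
    {"GEO_Region": "Europe", "Passenger_Count": 20},
    {"GEO_Region": "Asia", "Passenger_Count": 5},
]
out = window_avg(rows, "GEO_Region", "Passenger_Count")
for r in out:
    print(r)
```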

### Machine Learning - Clustering

In [27]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [28]:
vectorAssembler = VectorAssembler(inputCols=["Activity_Period", "Passenger_Count"], outputCol="features")

In [29]:
vcluster_df = vectorAssembler.transform(SFO)

In [30]:
vcluster_df.show()

+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+-------------------+------------------+-------------------+-------------+-------------+---------------+------------------+
|Activity_Period|Operating_Airline|Operating_Airline_IATA_Code|Published_Airline|Published_Airline_IATA_Code|  GEO_Summary|         GEO_Region|Activity_Type_Code|Price_Category_Code|     Terminal|Boarding_Area|Passenger_Count|          features|
+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+-------------------+------------------+-------------------+-------------+-------------+---------------+------------------+
|         200507|     ATA Airlines|                         TZ|     ATA Airlines|                         TZ|     Domestic|                 US|          Deplaned|           Low Fare|   Terminal 1|            B|          27271|[200507.0,27271.0]|
|         200507

In [31]:
kmeans = KMeans().setK(3)
kmeans = kmeans.setSeed(1)

In [32]:
kmodel = kmeans.fit(vcluster_df)

In [33]:
kmodel.clusterCenters()

[array([201317.51953409,  11808.76709537]),
 array([201386.23643411, 349898.41472868]),
 array([201303.24307244, 119928.5352455 ])]
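Spark's `KMeans` chooses initial centers with k-means|| by default (`initMode`) and then refines them with Lloyd-style iterations (`maxIter` defaults to 20) using Euclidean distance. Because the two features are not scaled, `Passenger_Count` (spanning hundreds of thousands) dominates the distance, which is consistent with the three centers above differing almost only in their second coordinate. The sketch below shows the Lloyd iteration in plain Python, assuming the initial centers are given rather than chosen by k-means||; the names and toy points are hypothetical.

```python
import math


def kmeans_lloyd(points, centers, max_iter=20):
    """Lloyd's algorithm: assign points to the nearest center, then recompute means."""
    centers = [tuple(c) for c in centers]
    for _ in range(max_iter):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: math.dist(p, centers[i]))
            clusters[nearest].append(p)
        new_centers = []
        for center, members in zip(centers, clusters):
            if members:
                # Coordinate-wise mean of the assigned points
                new_centers.append(tuple(sum(xs) / len(members) for xs in zip(*members)))
            else:
                new_centers.append(center)  # keep an empty cluster's center unchanged
        if new_centers == centers:  # converged: assignments no longer move centers
            break
        centers = new_centers
    return centers


points = [(0, 0), (0, 1), (10, 10), (10, 11)]
print(kmeans_lloyd(points, centers=[(0, 0), (10, 10)]))
```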

### Machine Learning - Linear Regression

In [34]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [35]:
vectorAssembler = VectorAssembler(inputCols=["Activity_Period"], outputCol="features")
SFO = vectorAssembler.transform(SFO)
SFO.show()

+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+-------------------+------------------+-------------------+-------------+-------------+---------------+----------+
|Activity_Period|Operating_Airline|Operating_Airline_IATA_Code|Published_Airline|Published_Airline_IATA_Code|  GEO_Summary|         GEO_Region|Activity_Type_Code|Price_Category_Code|     Terminal|Boarding_Area|Passenger_Count|  features|
+---------------+-----------------+---------------------------+-----------------+---------------------------+-------------+-------------------+------------------+-------------------+-------------+-------------+---------------+----------+
|         200507|     ATA Airlines|                         TZ|     ATA Airlines|                         TZ|     Domestic|                 US|          Deplaned|           Low Fare|   Terminal 1|            B|          27271|[200507.0]|
|         200507|     ATA Airlines|             

In [36]:
lr = LinearRegression(featuresCol="features",labelCol="Passenger_Count")

In [37]:
lrModel = lr.fit(SFO)

In [38]:
lrModel.coefficients

DenseVector([1.2683])

In [39]:
lrModel.intercept

-226403.4714729307
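With the default `regParam=0.0`, `LinearRegression` on a single feature is ordinary least squares, so the coefficient and intercept above are the usual closed-form slope and intercept. The sketch computes them in plain Python, together with an RMSE that divides the squared residuals by the number of rows; we assume this matches how Spark's `summary.rootMeanSquaredError` is defined. The function name `simple_ols` and the toy data are hypothetical.

```python
import math


def simple_ols(xs, ys):
    """Closed-form least-squares fit y = intercept + slope * x, plus RMSE."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = y_mean - slope * x_mean  # the fitted line passes through the means
    residuals = [y - (intercept + slope * x) for x, y in zip(xs, ys)]
    rmse = math.sqrt(sum(r * r for r in residuals) / n)
    return slope, intercept, rmse


print(simple_ols([0, 1, 2, 3], [1, 3, 5, 7]))  # (2.0, 1.0, 0.0)
```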

In [40]:
lrModel.summary.rootMeanSquaredError

60882.80032036311
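The coefficient of 1.2683 is a change per unit of `Activity_Period`, but that column is a `yyyymm` integer, so it is not evenly spaced in time: within a year consecutive months differ by 1, while each December to January step jumps by 89. (Integer order still matches calendar order, which is why filters such as `Activity_Period > 202002` work.) A month index gives a feature whose unit is one month. The sketch below is a hypothetical helper; the default base of 200507 is an assumption taken from the first rows shown above, and the same arithmetic could be expressed as a Spark column expression before running `VectorAssembler`.

```python
def month_index(yyyymm, base=200507):
    """Months elapsed since `base`; both arguments are yyyymm integers."""
    year, month = divmod(yyyymm, 100)
    base_year, base_month = divmod(base, 100)
    return (year - base_year) * 12 + (month - base_month)


print(202001 - 201912)                            # 89: gap in raw Activity_Period
print(month_index(202001) - month_index(201912))  # 1: gap in months
```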