# Imports

In [52]:
import findspark
findspark.init()

In [53]:
import pyspark
from pyspark.sql import SparkSession, Row

In [54]:
from pyspark.sql.functions import udf,col, desc, rank
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

In [55]:
from FlightRadar24.api import FlightRadar24API


In [56]:
import pandas as pd

In [57]:
from geopy import distance

In [58]:
spark = SparkSession.builder.getOrCreate()

# DataFrames

In [59]:
df_airports = spark.read.parquet("../data/Airports.parquet")

In [60]:
df_airports.show(5)

+--------------------+-------+---------+----+---------+---------+
|                name|country|Continent|iata|      lat|      lon|
+--------------------+-------+---------+----+---------+---------+
|\tMasai Mara Keek...|  Kenya|   africa| KEU|-1.586377|35.257462|
|    A Coruna Airport|  Spain|   europe| LCG|43.302059| -8.37725|
|Aachen Merzbruck ...|Germany|   europe| AAH|50.823051| 6.186111|
|     Aalborg Airport|Denmark|   europe| AAL|57.092781| 9.849164|
|      Aarhus Airport|Denmark|   europe| AAR|56.300011|   10.619|
+--------------------+-------+---------+----+---------+---------+
only showing top 5 rows



In [61]:
df_airline_countries = spark.read.parquet("../data/AirlineCountries.parquet")
df_airline_countries.show(5)

+----+-------------+
|ICAO|      Country|
+----+-------------+
| BOI|  Philippines|
| EVY|             |
| GNL|United States|
|    |       Global|
|    |         APAC|
+----+-------------+
only showing top 5 rows



In [62]:
spark.read.parquet("../data/Flights.parquet").show(5)

+--------+---------+--------+-----+------------+-----+-------+-----------------+
|      id|dest_iata|org_iata|model|registration|speed|company|__index_level_0__|
+--------+---------+--------+-----+------------+-----+-------+-----------------+
|2e3aca80|      CVG|     QRO| B762|      N999YV|  509|    CSB|                0|
|2e3ae4bc|      BOH|     PFO| B737|      G-NEWG|  412|    BRO|                1|
|2e3ad269|      HAK|     KWE| B738|      B-1552|  392|    JYH|                2|
|2e3a2d8c|      CVG|     CGN| B763|      N226CY|  436|    ABX|                3|
|2e3a8e65|      LAX|     CVG| B763|      N362CM|  442|    ABX|                4|
+--------+---------+--------+-----+------------+-----+-------+-----------------+
only showing top 5 rows



# Q1

In [63]:
def most_active_airline():
    df_airlines = spark.read.parquet("../data/Airlines.parquet")
    df_flights = spark.read.parquet("../data/Flights.parquet").select("company")

    count = df_flights.groupBy('company').count().orderBy("count",ascending=False)
    return(df_airlines.filter(df_airlines.ICAO == count.head()["company"]).head()["Name"] , count.head()["count"])

In [64]:
most_active_airline()

('American Airlines', 638)

# Q2

In [65]:
def active_by_continent():
    df= spark.read.parquet("../data/Flights.parquet").select("dest_iata","org_iata","company")

    df1 = df.join(df_airports.select("iata","Continent"), df.org_iata == df_airports.iata , "inner")\
        .select("dest_iata","company","Continent")
        
    df1 = df1.withColumnRenamed("Continent","org_continent")\
        .join(df_airports.select("iata","Continent"), df.dest_iata == df_airports.iata , "inner")\
        .select("company","org_Continent","Continent")

    return(df1.filter(df1.org_Continent == df1.Continent).select("company","Continent"))

In the app :


In [66]:
df1 = active_by_continent()

In [67]:
df2 = df1.groupBy("Continent","company").count()

In [68]:
fr_api = FlightRadar24API()
continents = fr_api.get_zones().keys()

In [69]:
for continent in continents :
    count = df1.filter(df1.Continent == continent)\
        .groupBy("Continent","company")\
        .count().orderBy("count",ascending=False)
    print(count.head())


Row(Continent='europe', company='RYR', count=166)
Row(Continent='northamerica', company='AAL', count=597)
Row(Continent='southamerica', company='LAN', count=107)
Row(Continent='oceania', company='AXM', count=55)
Row(Continent='asia', company='IGO', count=128)
Row(Continent='africa', company='ETH', count=30)
None
None
None


# Q3

In [70]:
df_airports.show(5)

+--------------------+-------+---------+----+---------+---------+
|                name|country|Continent|iata|      lat|      lon|
+--------------------+-------+---------+----+---------+---------+
|\tMasai Mara Keek...|  Kenya|   africa| KEU|-1.586377|35.257462|
|    A Coruna Airport|  Spain|   europe| LCG|43.302059| -8.37725|
|Aachen Merzbruck ...|Germany|   europe| AAH|50.823051| 6.186111|
|     Aalborg Airport|Denmark|   europe| AAL|57.092781| 9.849164|
|      Aarhus Airport|Denmark|   europe| AAR|56.300011|   10.619|
+--------------------+-------+---------+----+---------+---------+
only showing top 5 rows



In [71]:
df3 = spark.read.parquet("../data/Flights.parquet", columns = ["id","dest_iata","org_iata"])
airport_pos = spark.read.parquet("../data/Airports.parquet").select("iata","lat","lon","name")

# Origin Data
df3 = df3.join(airport_pos, df3.org_iata == airport_pos.iata , "inner").\
    select("lat","lon","id","dest_iata","name")\
    .withColumnRenamed("lat","org_lat")\
    .withColumnRenamed("lon","org_lon")\
    .withColumnRenamed("name","org_name")

# Destination Data
df3 = df3.join(airport_pos, df3.dest_iata == airport_pos.iata , "inner").\
    select("lat","lon","id","org_lat","org_lon","org_name","name")\
    .withColumnRenamed("lat","dest_lat")\
    .withColumnRenamed("lon","dest_lon")\
    .withColumnRenamed("name","dest_name")
df3.show(5)

+---------+----------+--------+---------+----------+--------------------+--------------------+
| dest_lat|  dest_lon|      id|  org_lat|   org_lon|            org_name|           dest_name|
+---------+----------+--------+---------+----------+--------------------+--------------------+
|55.616959| 12.645637|2e3aea91|57.092781|  9.849164|     Aalborg Airport|  Copenhagen Airport|
|51.471626| -0.467081|2e3ae9fa|57.201939|  -2.19777|Aberdeen Internat...|London Heathrow A...|
| 44.88195|-93.221703|2e3ac6d9|45.439999|-98.419998|Aberdeen Regional...|Minneapolis Saint...|
| 21.67956| 39.156528|2e3aeffe| 18.24036|  42.65662|Abha Regional Air...|Jeddah King Abdul...|
| 24.95764| 46.698769|2e3ae3ce| 18.24036|  42.65662|Abha Regional Air...|Riyadh King Khali...|
+---------+----------+--------+---------+----------+--------------------+--------------------+
only showing top 5 rows



In [72]:
def rowwise_function(row):
    # convert row to python dictionary:
    row_dict = row.asDict()
    # Add a new key in the dictionary with the new column name and value.
    # This might be a big complex function.
    row_dict['dist'] =  distance.distance((row_dict["org_lat"],row_dict["org_lon"]),(row_dict["dest_lat"],row_dict["dest_lon"])).km
    # convert dict to row back again:
    newrow = Row(**row_dict)
    # return new row
    return newrow

In [73]:
df3_rdd = df3.rdd# apply our function to RDD
df3_rdd_new = df3_rdd.map(lambda row: rowwise_function(row))
df3NewDf = spark.createDataFrame(df3_rdd_new)# Convert RDD Back to DataFrame
df3NewDf.show(5)

+---------+----------+--------+---------+----------+--------------------+--------------------+------------------+
| dest_lat|  dest_lon|      id|  org_lat|   org_lon|            org_name|           dest_name|              dist|
+---------+----------+--------+---------+----------+--------------------+--------------------+------------------+
|55.616959| 12.645637|2e3aea91|57.092781|  9.849164|     Aalborg Airport|  Copenhagen Airport|238.48286606060464|
|51.471626| -0.467081|2e3ae9fa|57.201939|  -2.19777|Aberdeen Internat...|London Heathrow A...| 647.6482629706971|
| 44.88195|-93.221703|2e3ac6d9|45.439999|-98.419998|Aberdeen Regional...|Minneapolis Saint...|413.32219375228965|
| 21.67956| 39.156528|2e3aeffe| 18.24036|  42.65662|Abha Regional Air...|Jeddah King Abdul...|   528.31964824529|
| 24.95764| 46.698769|2e3ae3ce| 18.24036|  42.65662|Abha Regional Air...|Riyadh King Khali...| 853.2848249708582|
+---------+----------+--------+---------+----------+--------------------+---------------

In [74]:
df3NewDf.select("org_name","dest_name","dist").orderBy("dist", ascending=False).first()

Row(org_name='New York John F. Kennedy International Airport', dest_name='Singapore Changi Airport', dist=15348.64044117391)

# Q4

In [75]:
df4 = spark.read.parquet("../data/Flights.parquet").select(["org_iata","dest_iata"])
airport_pos_4 = spark.read.parquet("../data/Airports.parquet").select("iata","lat","lon","name","Continent")

# Origin Data
df4 = df4.join(airport_pos_4, df4.org_iata == airport_pos_4.iata , "inner").\
    select("lat","lon","dest_iata","Continent")\
    .withColumnRenamed("lat","org_lat")\
    .withColumnRenamed("lon","org_lon")\
    .withColumnRenamed("Continent","org_Continent")

# Destination Data
df4 = df4.join(airport_pos_4, df4.dest_iata == airport_pos_4.iata , "inner").\
    select("lat","lon","org_lat","org_lon","org_Continent","Continent")\
    .withColumnRenamed("lat","dest_lat")\
    .withColumnRenamed("lon","dest_lon")\
    .withColumnRenamed("Continent","dest_Continent")

df4.show(5)

+---------+----------+---------+----------+-------------+--------------+
| dest_lat|  dest_lon|  org_lat|   org_lon|org_Continent|dest_Continent|
+---------+----------+---------+----------+-------------+--------------+
|55.616959| 12.645637|57.092781|  9.849164|       europe|        europe|
|51.471626| -0.467081|57.201939|  -2.19777|       europe|        europe|
| 44.88195|-93.221703|45.439999|-98.419998| northamerica|  northamerica|
| 21.67956| 39.156528| 18.24036|  42.65662|         asia|        africa|
| 24.95764| 46.698769| 18.24036|  42.65662|         asia|          asia|
+---------+----------+---------+----------+-------------+--------------+
only showing top 5 rows



In [76]:
df4_rdd = df4.filter(df4.dest_Continent == df4.org_Continent).drop("dest_Continent").rdd
df4_rdd_new = df4_rdd.map(lambda row : rowwise_function(row))
df4NewDF = spark.createDataFrame(df4_rdd_new).withColumnRenamed("org_Continent","Continent").select("Continent","dist")
df4NewDF.groupBy("continent").agg({"dist":"mean"}).\
    withColumnRenamed("avg(dist)","average distance by flight").\
    sort(desc("average distance by flight")).\
    show()

+------------+--------------------------+
|   continent|average distance by flight|
+------------+--------------------------+
|     oceania|        1896.1725617106347|
|        asia|        1662.4231921254027|
|      africa|        1559.0177451418008|
|northamerica|        1524.5624281217354|
|      europe|        1263.5200825082873|
|southamerica|        1169.1745671471888|
+------------+--------------------------+



# Q5.1

In [77]:
df_airlines = spark.read.parquet("../data/Airlines.parquet")
print(df_airlines.count())
df_airlines.show(5)

1807
+----------------+----+
|            Name|ICAO|
+----------------+----+
|          21 Air| CSB|
| 25only Aviation| ONY|
| 2Excel Aviation| BRO|
|     40-Mile Air| MLA|
|748 Air Services| IHO|
+----------------+----+
only showing top 5 rows



In [78]:
df51 = spark.read.parquet("../data/Flights.parquet", columns=['company'])
df51 = df51.join(df_airlines, df_airlines.ICAO == df51.company, "inner").\
    select("Name","company")
df51 = df51.groupBy("Name").agg({"Name": "count"}).withColumnRenamed("count(Name)","number active planes").sort(desc("number active planes"))
df51.show(5)

+------------------+--------------------+
|              Name|number active planes|
+------------------+--------------------+
| American Airlines|                 638|
|   Delta Air Lines|                 538|
|   United Airlines|                 488|
|Southwest Airlines|                 384|
|             FedEx|                 240|
+------------------+--------------------+
only showing top 5 rows



# Q5.2

In [79]:
df52 = spark.read.parquet("../data/Flights.parquet").select("org_iata","model")
df52 = df52.join(df_airports.select("iata","Continent"), df_airports.iata == df52.org_iata, "inner")\
    .drop("org_iata","iata")\
    .groupBy("Continent","model")\
    .agg({"model":"count"})\

df52.show(5)

+------------+-----+------------+
|   Continent|model|count(model)|
+------------+-----+------------+
|      europe| A359|          50|
|      europe| A343|          10|
|      europe| LJ55|           1|
|southamerica| A321|          28|
|     oceania| AT76|           2|
+------------+-----+------------+
only showing top 5 rows



In [80]:
window52 = Window.partitionBy(["Continent"]).orderBy(col("count(model)").desc(),col("model").desc())
df52 = df52.na.drop("any")
df52 = df52.withColumn("rank", F.dense_rank().over(window52)).filter(col("rank") == 1)
df52.groupBy("Continent").agg(F.collect_list(F.concat_ws(" : ", "model", "count(model)")).alias("Models")).show(truncate=False)

+-------------+------------+
|Continent    |Models      |
+-------------+------------+
|africa       |[B738 : 104]|
|asia         |[A20N : 209]|
|europe       |[B738 : 381]|
|northamerica |[B738 : 428]|
|northatlantic|[A20N : 3]  |
|oceania      |[A320 : 115]|
|southamerica |[A320 : 67] |
+-------------+------------+



# Q6

In [81]:
df6 = spark.read.parquet("../data/Flights.parquet").select("company","model")
df6 = df6.join(df_airline_countries, df_airline_countries.ICAO == df6.company, "inner").drop("ICAO","company")\
    .groupBy("Country","model").agg({"model": "count"})

window6 = Window.partitionBy(["Country"]).orderBy(F.col("count(model)").desc())

df6.withColumn("rank", F.dense_rank().over(window6)).filter(col("rank") == 1)\
    .groupBy("Country").agg(F.collect_list(F.concat_ws(" : ", "model", "count(model)")).alias("Models"))\
    .orderBy(F.col("Country").desc())\
    .show(n=5, truncate=False)

+-------------+------------------------------+
|Country      |Models                        |
+-------------+------------------------------+
|Yemen        |[A320 : 1]                    |
|Vietnam      |[A321 : 30]                   |
|Venezuela    |[MD83 : 1, B733 : 1, A346 : 1]|
|Uzbekistan   |[A320 : 5]                    |
|United States|[B738 : 369]                  |
+-------------+------------------------------+
only showing top 5 rows



# Q7.1

In [82]:
df71 = spark.read.parquet("../data/Flights.parquet").select("dest_iata")

window7 = Window.partitionBy("Continent").orderBy(F.col("count(name)").desc())

df71.join(df_airports.select("name","iata","Continent"), df_airports.iata == df71.dest_iata, "inner")\
    .drop("dest_iata","iata")\
    .dropna()\
    .groupBy("Continent","name").agg({"name":"count"})\
    .withColumn("rank", F.dense_rank().over(window7)).filter(F.col("rank") == 1)\
    .drop("rank")\
    .orderBy(F.col("count(name)").desc())\
    .show(truncate=False)


+-------------+-----------------------------------------+-----------+
|Continent    |name                                     |count(name)|
+-------------+-----------------------------------------+-----------+
|northamerica |Dallas Fort Worth International Airport  |178        |
|asia         |Dubai International Airport              |137        |
|europe       |London Heathrow Airport                  |105        |
|oceania      |Singapore Changi Airport                 |76         |
|africa       |Tel Aviv Ben Gurion International Airport|41         |
|southamerica |Sao Paulo Guarulhos International Airport|35         |
|northatlantic|Keflavik International Airport           |13         |
|atlantic     |Corvo Airport                            |1          |
+-------------+-----------------------------------------+-----------+



# Q7.2

In [83]:
df72 = spark.read.parquet("../data/Flights.parquet").select("dest_iata","org_iata")

df72_in = df72.join(df_airports.select("name","iata","Continent"), df_airports.iata == df72.dest_iata, "inner")\
    .drop("dest_iata","iata")\
    .withColumnRenamed("Continent","dest_Continent")\
    .withColumnRenamed("name","dest_name")\
    .dropna()\
    .groupBy("dest_Continent","dest_name").agg({"dest_name":"count"})


df72_out = df72.join(df_airports.select("name","iata","Continent"), df_airports.iata == df72.org_iata, "inner")\
    .drop("dest_iata","iata")\
    .withColumnRenamed("Continent","org_Continent")\
    .withColumnRenamed("name","org_name")\
    .dropna()\
    .groupBy("org_Continent","org_name").agg({"org_name":"count"})


In [84]:
df72 = df72_out.join(df72_in, df72_in.dest_name == df72_out.org_name, "inner")\
    .drop("dest_Continent","dest_name")\
    .withColumnRenamed("org_Continent", "continent")\
    .withColumnRenamed("org_name","Name")\


df72.show(5)

+------------+--------------------+---------------+----------------+
|   continent|                Name|count(org_name)|count(dest_name)|
+------------+--------------------+---------------+----------------+
|northamerica|       Boise Airport|             10|               4|
|        asia|Changsha Huanghua...|              6|               2|
|northamerica|White Plains West...|             13|               8|
|northamerica|Del Rio Internati...|              1|               1|
|        asia|Dushanbe Internat...|              5|               4|
+------------+--------------------+---------------+----------------+
only showing top 5 rows



In [121]:
def rowdiff(row):
    row_dict = row.asDict()
    diff = row_dict["count(org_name)"] - row_dict["count(dest_name)"]
    row_dict['abs(bound)'] =  abs(diff)
    row_dict['bound'] = diff
    newrow = Row(**row_dict)
    return newrow

In [123]:
window72 = Window.partitionBy("Continent").orderBy(F.col("abs(bound)").desc())

df72_rdd = df72.rdd
df72_rdd_new = df72_rdd.map(lambda row : rowdiff(row))
df72NewDF = spark.createDataFrame(df72_rdd_new).select("Continent","name","bound","abs(bound)")\
    .withColumn("rank", F.dense_rank().over(window72)).filter(F.col("rank") == 1)\
    .drop("rank","abs(bound)")\
    .show()

+-------------+--------------------+-----+
|    Continent|                name|bound|
+-------------+--------------------+-----+
|       africa|Casablanca Mohamm...|   11|
|         asia|Seoul Incheon Int...|   81|
|       europe|Paris Charles de ...|   31|
|       europe|Amsterdam Schipho...|   31|
| northamerica|Dallas Fort Worth...| -116|
|northatlantic|Keflavik Internat...|   -5|
|      oceania|Singapore Changi ...|  -12|
| southamerica|Brasilia Internat...|   24|
+-------------+--------------------+-----+



# Q8

In [98]:
df8 = spark.read.parquet("../data/Flights.parquet").select("speed","org_iata")

df8.join(df_airports.select("iata","Continent"),df8.org_iata == df_airports.iata, "inner")\
    .drop("iata","org_iata")\
    .dropna()\
    .groupBy("continent")\
    .agg({"speed": "mean"})\
    .orderBy(F.col("avg(speed)").desc())\
    .show()


+-------------+------------------+
|    continent|        avg(speed)|
+-------------+------------------+
|      oceania| 386.4922178988327|
|         asia| 386.3145743145743|
|northatlantic|             376.5|
|       africa|370.23126338329763|
|       europe| 352.9645685702376|
| southamerica| 346.4194373401535|
| northamerica| 292.2076986984215|
+-------------+------------------+

