# Introduction

This project has been created using the "Flight Radar API" from "Rapid API" (https://rapidapi.com/apidojo/api/flight-radar1), who tracks the informations form FlightRadar24 (https://www.flightradar24.com/).

The aim of this project is to showcase how PySpark can be used to manage data.

# Importing libraries

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql.types import *
from pyspark.sql.functions import col, row_number, when, desc, from_unixtime, length, substring

import pandas
import csv
import requests
import json
import time

In [3]:
from google.colab import drive
drive.mount('/content/drive')

folder = "/content/drive/MyDrive/portfolio/spark_flight_data/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
with open(folder + "headers.json", "r") as f:
  headers = json.load(f)

# Creating Spark Session

In [5]:
spark = SparkSession\
        .builder\
        .appName("flight_data")\
        .getOrCreate()

# Creating function to fetch data stored as list

In [6]:
def getDataList(file_path, url, headers):
  """
  Get data from API for airlines/list, airports/list and aircrafts/list.

  Argument:
    file_path: str
      If file exist, it will be used as data source. If not, path file to store
      data is created.
    url: str
      url of the API
    headers: dict
      Identification data to use the API

  Return:
    List of dictionary containing the data requested
  """
  # Chek if data already available as a file. If not, fetch data from API.
  try:
    # Open file
    f = open(file_path)
    # Read file
    data = json.load(f)

  except:

    # Get response from API
    response = requests.get(url, headers=headers)

    # Save requested data in a new var
    data = response.json()['rows']

    # Save data in a new file
    with open(file_path, 'w') as f:
      json.dump(data, f)

  # Return data
  return data

# Airline information

## Importing JSON

In [7]:
# Name of file to use or create
airline_file = 'airline_data.json'

# Url of the API to call
airline_url = "https://flight-radar1.p.rapidapi.com/airlines/list"

# Fetching data from API
airline_data = getDataList(folder + airline_file, airline_url, headers=headers)

# Print data
airline_data[0:5]

[{'Name': '21 Air', 'Code': '2I', 'ICAO': 'CSB'},
 {'Name': '247 Aviation', 'Code': '', 'ICAO': 'EMC'},
 {'Name': '2Excel Aviation', 'Code': '', 'ICAO': 'BRO'},
 {'Name': '4 Airways', 'Code': '', 'ICAO': 'DAK'},
 {'Name': '40-Mile Air', 'Code': 'Q5', 'ICAO': 'MLA'}]

## Creating airline_dim

In [8]:
# Creating a RDD using airline_data
airline_rdd = spark.sparkContext.parallelize(airline_data)

# Convert RDD in DataFrame
airline_dim = spark.read.json(airline_rdd)

# Print
airline_dim.show()

+----+----+------------------+
|Code|ICAO|              Name|
+----+----+------------------+
|  2I| CSB|            21 Air|
|    | EMC|      247 Aviation|
|    | BRO|   2Excel Aviation|
|    | DAK|         4 Airways|
|  Q5| MLA|       40-Mile Air|
|  FE| IHO|  748 Air Services|
|  AQ| JYH|             9 Air|
|  S5| NKP|        Abakan Air|
|    | ABP|          ABS Jets|
|    | BAR|Abu Dhabi Aviation|
|  GB| ABX|           ABX Air|
|    | SON|     Acass Ireland|
|  AN| WSN|      Advanced Air|
|  A3| AEE|   Aegean Airlines|
|  EI| EIN|        Aer Lingus|
|  EG| EUK|     Aer Lingus UK|
|  JK| ACL|         AerCaribe|
|  5E| BLK|              Aero|
|  N2| NIG|              Aero|
|    | AKF|        Aero Asahi|
+----+----+------------------+
only showing top 20 rows



# Airports information

In [9]:
# Name of file to use or create
airports_file = 'airports_data.json'

# Url of the API to call
airports_url = "https://flight-radar1.p.rapidapi.com/airports/list"

# Fetching data from API
airports_data = getDataList(folder + airports_file, airports_url, headers=headers)

# Print data
airports_data[0:5]

[{'id': 1900,
  'name': 'A Coruna Airport',
  'iata': 'LCG',
  'icao': 'LECO',
  'city': 'A Coruna',
  'lat': 43.302059,
  'lon': -8.37725,
  'country': 'Spain',
  'alt': 326,
  'size': 4686,
  'timezone': {'name': 'Europe/Madrid',
   'offset': 7200,
   'offsetHours': '2:00',
   'abbr': 'CEST',
   'abbrName': 'Central European Summer Time',
   'isDst': True},
  'countryId': 209},
 {'id': 3,
  'name': 'Aachen Merzbruck Airport',
  'iata': 'AAH',
  'icao': 'EDKA',
  'city': 'Aachen',
  'lat': 50.821899,
  'lon': 6.184759,
  'country': 'Germany',
  'alt': 626,
  'size': 1491,
  'timezone': {'name': 'Europe/Berlin',
   'offset': 7200,
   'offsetHours': '2:00',
   'abbr': 'CEST',
   'abbrName': 'Central European Summer Time',
   'isDst': True},
  'countryId': 83},
 {'id': 4,
  'name': 'Aalborg Airport',
  'iata': 'AAL',
  'icao': 'EKYT',
  'city': 'Aalborg',
  'lat': 57.095112,
  'lon': 9.855172,
  'country': 'Denmark',
  'alt': 3,
  'size': 8856,
  'timezone': {'name': 'Europe/Copenhagen',

In [10]:
# Saving timezones info in a Dictionary
timezones = {}

# Iterate over each Dict in the list
for airport in airports_data:

  # Get timezone's information for each airport
  timezone = airport["timezone"]

  # Get timezone name
  name = timezone["name"]

  # If timezone information not saved in timezones dict, add new item to the dict
  if name not in timezones.keys():
    timezones[name] = timezone

# Print a value from timezones dict to check it worked
timezones[ list(timezones.keys())[0] ]

{'name': 'Europe/Madrid',
 'offset': 7200,
 'offsetHours': '2:00',
 'abbr': 'CEST',
 'abbrName': 'Central European Summer Time',
 'isDst': True}

## Timezone information

### Saving timezone info in a DataFrame

In [11]:
# Converting input in a list of Rows
timezones_rows = [Row(**timezones[name]) for name in timezones.keys()]

# Printing data
timezones_rows[:5]

[Row(name='Europe/Madrid', offset=7200, offsetHours='2:00', abbr='CEST', abbrName='Central European Summer Time', isDst=True),
 Row(name='Europe/Berlin', offset=7200, offsetHours='2:00', abbr='CEST', abbrName='Central European Summer Time', isDst=True),
 Row(name='Europe/Copenhagen', offset=7200, offsetHours='2:00', abbr='CEST', abbrName='Central European Summer Time', isDst=True),
 Row(name='America/Godthab', offset=-3600, offsetHours='-1:00', abbr='-01', abbrName=None, isDst=True),
 Row(name='Asia/Tehran', offset=12600, offsetHours='3:30', abbr='+0330', abbrName=None, isDst=False)]

In [12]:
# Defining DataFrame schema
timezones_schema = StructType([
    StructField('timezoneName', StringType()),
    StructField('offset', IntegerType()),
    StructField('offsetHours', StringType()),
    StructField('abbr', StringType()),
    StructField('abbrName', StringType()),
    StructField('isDst', BooleanType())
])

# Creating DataFrame
timezones_df = spark.createDataFrame(timezones_rows, schema = timezones_schema)

# Printing data
timezones_df.show()

+-------------------+------+-----------+-----+--------------------+-----+
|       timezoneName|offset|offsetHours| abbr|            abbrName|isDst|
+-------------------+------+-----------+-----+--------------------+-----+
|      Europe/Madrid|  7200|       2:00| CEST|Central European ...| true|
|      Europe/Berlin|  7200|       2:00| CEST|Central European ...| true|
|  Europe/Copenhagen|  7200|       2:00| CEST|Central European ...| true|
|    America/Godthab| -3600|      -1:00|  -01|                NULL| true|
|        Asia/Tehran| 12600|       3:30|+0330|                NULL|false|
|   Asia/Krasnoyarsk| 25200|       7:00|  +07|                NULL|false|
|  America/Vancouver|-25200|      -7:00|  PDT|Pacific Daylight ...| true|
|     Pacific/Tarawa| 43200|      12:00|  +12|                NULL|false|
|      Europe/London|  3600|       1:00|  BST| British Summer Time| true|
|    America/Chicago|-18000|      -5:00|  CDT|Central Daylight ...| true|
|        Asia/Riyadh| 10800|       3:0

### Checking if the same 'abbr' can correspond to different 'abbrName'

In [13]:
# Creting a DataFrame contarining unique abbr-abbrName pairs
abbr_check = timezones_df \
                              .select(["abbr", "abbrName"]) \
                              .distinct()

# Select abbr that appears more than one time
abbr_check \
          .groupBy('abbr') \
          .count() \
          .sort("count", ascending=False) \
          .filter( col('count') > 1) \
          .show()

+----+-----+
|abbr|count|
+----+-----+
|WEST|    2|
| IST|    2|
|CEST|    2|
| CST|    2|
+----+-----+



In [14]:
# Check information about abbr that appears more than one time
abbr_check \
          .filter( abbr_check.abbr.isin('IST', 'CST', 'WEST', 'CEST') ) \
          .show(truncate = False)

+----+----------------------------+
|abbr|abbrName                    |
+----+----------------------------+
|CST |Central Standard Time       |
|IST |India Standard Time         |
|WEST|Western European Summer Time|
|CEST|Central European Summer Time|
|CST |China Standard Time         |
|IST |Irish Standard Time         |
|CEST|NULL                        |
|WEST|NULL                        |
+----+----------------------------+



### Updating DataFrame

In [15]:
timezones_df = timezones_df \
                            .withColumn( # Replace conflitting 'abbr' with new value
                                      'abbr',
                                        when( col('abbrName') == 'China Standard Time', 'ChiST') \
                                        .when( col('abbrName') == 'Irish Standard Time', 'IriST') \
                                        .otherwise(timezones_df.abbr)
                                        ). \
                            withColumn( # Fill missing 'abbrName' based on 'abbr'
                                      'abbrName',
                                      when( col('abbr') == 'CEST', 'Central European Summer Time' ) \
                                      .when( col('abbr') == 'WEST', 'Western European Summer Time') \
                                      .otherwise(timezones_df.abbrName)
                                        )

# Printing data
timezones_df.show()

+-------------------+------+-----------+-----+--------------------+-----+
|       timezoneName|offset|offsetHours| abbr|            abbrName|isDst|
+-------------------+------+-----------+-----+--------------------+-----+
|      Europe/Madrid|  7200|       2:00| CEST|Central European ...| true|
|      Europe/Berlin|  7200|       2:00| CEST|Central European ...| true|
|  Europe/Copenhagen|  7200|       2:00| CEST|Central European ...| true|
|    America/Godthab| -3600|      -1:00|  -01|                NULL| true|
|        Asia/Tehran| 12600|       3:30|+0330|                NULL|false|
|   Asia/Krasnoyarsk| 25200|       7:00|  +07|                NULL|false|
|  America/Vancouver|-25200|      -7:00|  PDT|Pacific Daylight ...| true|
|     Pacific/Tarawa| 43200|      12:00|  +12|                NULL|false|
|      Europe/London|  3600|       1:00|  BST| British Summer Time| true|
|    America/Chicago|-18000|      -5:00|  CDT|Central Daylight ...| true|
|        Asia/Riyadh| 10800|       3:0

### Checking if the same 'abbr' can have 'isDst' both as True and False
DST = Daylight SAving Time

In [16]:
# Selecting unique 'abbr' and 'isDst' pairs
abbr_check = timezones_df \
                              .select(["abbr", "isDst"]) \
                              .distinct()

#  Show 'abbr' that can have 'isDst' both True and False
abbr_check \
          .groupBy('abbr') \
          .count() \
          .filter( col('count') == 2) \
          .show()

# Since the same 'abbr' can have 'isDst' both True and False, 'isDst' must be
# associated with each timezoneName

+----+-----+
|abbr|count|
+----+-----+
| -01|    2|
| -02|    2|
+----+-----+



### Splitting DataFrame in different tables

In [17]:
# Printing columns list
timezones_df.columns

['timezoneName', 'offset', 'offsetHours', 'abbr', 'abbrName', 'isDst']

#### timezones_dim

In [18]:
# Main timezone's information
timezones_dim = timezones_df \
                            .withColumn( # Create id for each timezone
                                'timezoneId', row_number().over( Window.orderBy('offset')) ) \
                            .select( # Select columns
                                ['timezoneId', 'timezoneName', 'offset', 'abbr', 'isDst'])

# Show DataFrame
timezones_dim.show()

+----------+------------------+------+----+-----+
|timezoneId|      timezoneName|offset|abbr|isDst|
+----------+------------------+------+----+-----+
|         1|      Africa/Dakar|  NULL| GMT|false|
|         2|    Africa/Abidjan|  NULL| GMT|false|
|         3|      Africa/Accra|  NULL| GMT|false|
|         4|   Atlantic/Azores|  NULL| +00| true|
|         5|Atlantic/Reykjavik|  NULL| GMT|false|
|         6|    Africa/Conakry|  NULL| GMT|false|
|         7|     Africa/Bamako|  NULL| GMT|false|
|         8|               UTC|  NULL| UTC|false|
|         9| Africa/Nouakchott|  NULL| GMT|false|
|        10|Africa/Ouagadougou|  NULL| GMT|false|
|        11|   Africa/Freetown|  NULL| GMT|false|
|        12|       Africa/Lome|  NULL| GMT|false|
|        13|Atlantic/St_Helena|  NULL| GMT|false|
|        14|     Africa/Banjul|  NULL| GMT|false|
|        15|     Africa/Bissau|  NULL| GMT|false|
|        16|   Africa/Monrovia|  NULL| GMT|false|
|        17|   Africa/Sao_Tome|  NULL| GMT|false|


#### timezones_abbr_dim

In [19]:
# 'abbr' and 'abbrName' pairs
timezones_abbr_dim = timezones_df \
                                    .select(['abbr', 'abbrName']) \
                                    .distinct( # Select distinct pairs
                                              ) \
                                    .withColumn( # Create unique id for each abbr
                                        'abbrId', row_number().over( Window.orderBy('abbr'))) \
                                    .select(['abbrId', 'abbr', 'abbrName'])

timezones_abbr_dim.show()

+------+-----+--------+
|abbrId| abbr|abbrName|
+------+-----+--------+
|     1|  +00|    NULL|
|     2|  +01|    NULL|
|     3|  +03|    NULL|
|     4|+0330|    NULL|
|     5|  +04|    NULL|
|     6|+0430|    NULL|
|     7|  +05|    NULL|
|     8|+0530|    NULL|
|     9|+0545|    NULL|
|    10|  +06|    NULL|
|    11|+0630|    NULL|
|    12|  +07|    NULL|
|    13|  +08|    NULL|
|    14|+0845|    NULL|
|    15|  +09|    NULL|
|    16|  +10|    NULL|
|    17|+1030|    NULL|
|    18|  +11|    NULL|
|    19|  +12|    NULL|
|    20|+1245|    NULL|
+------+-----+--------+
only showing top 20 rows



In [20]:
# Replacing 'abbr' with 'abbrId' in timezone_dim
timezones_dim = timezones_dim \
                                .join(
                                    timezones_abbr_dim,
                                    timezones_dim.abbr == timezones_abbr_dim.abbr,
                                    'left'
                                    ) \
                                .select(['timezoneId', 'timezoneName', 'offset', 'isDst', 'abbrId'])

timezones_dim.show()

+----------+------------------+------+-----+------+
|timezoneId|      timezoneName|offset|isDst|abbrId|
+----------+------------------+------+-----+------+
|         1|      Africa/Dakar|  NULL|false|    52|
|         2|    Africa/Abidjan|  NULL|false|    52|
|         3|      Africa/Accra|  NULL|false|    52|
|         4|   Atlantic/Azores|  NULL| true|     1|
|         5|Atlantic/Reykjavik|  NULL|false|    52|
|         6|    Africa/Conakry|  NULL|false|    52|
|         7|     Africa/Bamako|  NULL|false|    52|
|         8|               UTC|  NULL|false|    71|
|         9| Africa/Nouakchott|  NULL|false|    52|
|        10|Africa/Ouagadougou|  NULL|false|    52|
|        11|   Africa/Freetown|  NULL|false|    52|
|        12|       Africa/Lome|  NULL|false|    52|
|        13|Atlantic/St_Helena|  NULL|false|    52|
|        14|     Africa/Banjul|  NULL|false|    52|
|        15|     Africa/Bissau|  NULL|false|    52|
|        16|   Africa/Monrovia|  NULL|false|    52|
|        17|

## Geographic information

### Creating countries_dim table

In [21]:
# Getting set of unique countries name
countries_set = { airport['country'] for airport in airports_data}

In [22]:
countries_dim = spark \
                    .createDataFrame( # Convert set in DataFrame
                        countries_set, StringType()) \
                    .withColumnRenamed( # Rename col 'value'  in 'countryName'
                        'value', 'country') \
                    .withColumn( # Add unique id
                        'countryId', row_number().over( Window.orderBy('country')))

countries_dim.show()

+-------------------+---------+
|            country|countryId|
+-------------------+---------+
|        Afghanistan|        1|
|            Albania|        2|
|            Algeria|        3|
|     American Samoa|        4|
|             Angola|        5|
|           Anguilla|        6|
|         Antarctica|        7|
|Antigua And Barbuda|        8|
|          Argentina|        9|
|            Armenia|       10|
|              Aruba|       11|
|          Australia|       12|
|            Austria|       13|
|         Azerbaijan|       14|
|            Bahamas|       15|
|            Bahrain|       16|
|         Bangladesh|       17|
|           Barbados|       18|
|            Belarus|       19|
|            Belgium|       20|
+-------------------+---------+
only showing top 20 rows



### Creating cities_dim table

In [23]:
# Create set of unique city-country pairs
cities_set = { (airport['city'], airport['country']) for airport in airports_data}

In [24]:
# Converting set in DataFrame
cities_dim = spark \
              .createDataFrame(cities_set, schema = ["city", "country"]) \
              .withColumn( # Create unique id
                  'cityId', row_number().over( Window.orderBy('city')))

cities_dim.show()

+---------------+--------------------+------+
|           city|             country|cityId|
+---------------+--------------------+------+
|       A Coruna|               Spain|     1|
|         Aachen|             Germany|     2|
|        Aalborg|             Denmark|     3|
|         Aarhus|             Denmark|     4|
|        Aasiaat|           Greenland|     5|
|         Abadan|                Iran|     6|
|         Abakan|              Russia|     7|
|     Abbotsford|              Canada|     8|
|  Abemama Atoll|            Kiribati|     9|
|       Aberdeen|      United Kingdom|    10|
|       Aberdeen|       United States|    11|
|           Abha|        Saudi Arabia|    12|
|        Abidjan|         Ivory Coast|    13|
|        Abilene|       United States|    14|
|       Abingdon|       United States|    15|
|      Abu Dhabi|United Arab Emirates|    16|
|Abu Musa Island|                Iran|    17|
|     Abu Simbel|               Egypt|    18|
|          Abuja|             Nige

In [25]:
# Replacing 'country' with 'countryId'
cities_dim = cities_dim \
                      .join(
                          countries_dim,
                          'country',
                          'left'
                      ) \
                      .select( ['cityId', 'city', 'countryId'])

cities_dim.show()

+------+---------------+---------+
|cityId|           city|countryId|
+------+---------------+---------+
|     7|         Abakan|      170|
|     9|  Abemama Atoll|      109|
|     2|         Aachen|       76|
|    13|        Abidjan|      102|
|    21|          Accra|       77|
|    11|       Aberdeen|      216|
|    14|        Abilene|      216|
|    15|       Abingdon|      216|
|    19|          Abuja|      151|
|     1|       A Coruna|      192|
|     3|        Aalborg|       54|
|     4|         Aarhus|       54|
|     6|         Abadan|       96|
|    17|Abu Musa Island|       96|
|    20|       Acapulco|      133|
|    12|           Abha|      179|
|    16|      Abu Dhabi|      214|
|     8|     Abbotsford|       35|
|     5|        Aasiaat|       80|
|    18|     Abu Simbel|       59|
+------+---------------+---------+
only showing top 20 rows



### Creating airports_dim



In [26]:
def airport_typecaster(key, value):

  # Keep only the name of the timezone
  if key == 'timezone':
    return value['name']

  # Some 'lat' e 'lon' values are encoded as Float and some as Int,
  # so we have to convert all of them to Float
  if key in ('lat', 'lon'):
    return float(value)

  # Some values are encoded as String, so they have to be encoded as Int
  if key == 'alt':
    return int(value)

  return value

In [27]:
airports_col_to_remove = ["country"]

airports_data_typecasted = [
                            {key: airport_typecaster(key, value) for key, value in airport.items() if key not in airports_col_to_remove }
                             for airport in airports_data
                            ]

In [28]:
airport_rows = [Row(**airport) for airport in airports_data_typecasted]

airport_rows[:4]

[Row(id=1900, name='A Coruna Airport', iata='LCG', icao='LECO', city='A Coruna', lat=43.302059, lon=-8.37725, alt=326, size=4686, timezone='Europe/Madrid', countryId=209),
 Row(id=3, name='Aachen Merzbruck Airport', iata='AAH', icao='EDKA', city='Aachen', lat=50.821899, lon=6.184759, alt=626, size=1491, timezone='Europe/Berlin', countryId=83),
 Row(id=4, name='Aalborg Airport', iata='AAL', icao='EKYT', city='Aalborg', lat=57.095112, lon=9.855172, alt=3, size=8856, timezone='Europe/Copenhagen', countryId=61),
 Row(id=9, name='Aarhus Airport', iata='AAR', icao='EKAH', city='Aarhus', lat=56.303295, lon=10.619129, alt=71, size=3919, timezone='Europe/Copenhagen', countryId=61)]

In [29]:
airport_schema = StructType([
    StructField('id', IntegerType()),
    StructField('airportName', StringType()),
    StructField('iata', StringType()),
    StructField('icao', StringType()),
    StructField('city', StringType()),
    StructField('lat', DoubleType()),
    StructField('lon', DoubleType()),
    StructField('alt', IntegerType()),
    StructField('size', IntegerType()),
    StructField('timezoneName', StringType()),
    StructField('countryId', IntegerType()),
])

In [30]:
airport_df = spark.createDataFrame(airport_rows, airport_schema)

airport_df.show()

+----+--------------------+----+----+---------------+---------+-----------+----+-----+-----------------+---------+
|  id|         airportName|iata|icao|           city|      lat|        lon| alt| size|     timezoneName|countryId|
+----+--------------------+----+----+---------------+---------+-----------+----+-----+-----------------+---------+
|1900|    A Coruna Airport| LCG|LECO|       A Coruna|43.302059|   -8.37725| 326| 4686|    Europe/Madrid|      209|
|   3|Aachen Merzbruck ...| AAH|EDKA|         Aachen|50.821899|   6.184759| 626| 1491|    Europe/Berlin|       83|
|   4|     Aalborg Airport| AAL|EKYT|        Aalborg|57.095112|   9.855172|   3| 8856|Europe/Copenhagen|       61|
|   9|      Aarhus Airport| AAR|EKAH|         Aarhus|56.303295|  10.619129|  71| 3919|Europe/Copenhagen|       61|
|7569|  Aarhus Sea Airport| QEA|EKAC|         Aarhus|56.151993|  10.247725|   1|  139|Europe/Copenhagen|       61|
|1596|     Aasiaat Airport| JEG|BGAA|        Aasiaat| 68.72184| -52.784698|  74|

In [31]:
cities_dim.show()

+------+---------------+---------+
|cityId|           city|countryId|
+------+---------------+---------+
|     7|         Abakan|      170|
|     9|  Abemama Atoll|      109|
|     2|         Aachen|       76|
|    13|        Abidjan|      102|
|    21|          Accra|       77|
|    11|       Aberdeen|      216|
|    14|        Abilene|      216|
|    15|       Abingdon|      216|
|    19|          Abuja|      151|
|     1|       A Coruna|      192|
|     3|        Aalborg|       54|
|     4|         Aarhus|       54|
|     6|         Abadan|       96|
|    17|Abu Musa Island|       96|
|    20|       Acapulco|      133|
|    12|           Abha|      179|
|    16|      Abu Dhabi|      214|
|     8|     Abbotsford|       35|
|     5|        Aasiaat|       80|
|    18|     Abu Simbel|       59|
+------+---------------+---------+
only showing top 20 rows



### Creating airport_dim

In [32]:
airport_df.show()

+----+--------------------+----+----+---------------+---------+-----------+----+-----+-----------------+---------+
|  id|         airportName|iata|icao|           city|      lat|        lon| alt| size|     timezoneName|countryId|
+----+--------------------+----+----+---------------+---------+-----------+----+-----+-----------------+---------+
|1900|    A Coruna Airport| LCG|LECO|       A Coruna|43.302059|   -8.37725| 326| 4686|    Europe/Madrid|      209|
|   3|Aachen Merzbruck ...| AAH|EDKA|         Aachen|50.821899|   6.184759| 626| 1491|    Europe/Berlin|       83|
|   4|     Aalborg Airport| AAL|EKYT|        Aalborg|57.095112|   9.855172|   3| 8856|Europe/Copenhagen|       61|
|   9|      Aarhus Airport| AAR|EKAH|         Aarhus|56.303295|  10.619129|  71| 3919|Europe/Copenhagen|       61|
|7569|  Aarhus Sea Airport| QEA|EKAC|         Aarhus|56.151993|  10.247725|   1|  139|Europe/Copenhagen|       61|
|1596|     Aasiaat Airport| JEG|BGAA|        Aasiaat| 68.72184| -52.784698|  74|

In [33]:
airport_dim = airport_df \
                            .join( # Replace 'timezone' with 'timezoneId'
                                timezones_dim,
                                'timezoneName',
                                'left'
                              ) \
                            .join( # Replace 'city' with 'cityId'
                                cities_dim,
                                airport_df.city == cities_dim.city,
                                'left'
                              ) \
                            .select(
                                [ 'id', 'airportName', 'iata', 'icao', 'lat', 'lon', 'size', 'cityId', 'timezoneId' ]
                            )

airport_dim.show()

+----+--------------------+----+----+---------+-----------+-----+------+----------+
|  id|         airportName|iata|icao|      lat|        lon| size|cityId|timezoneId|
+----+--------------------+----+----+---------+-----------+-----+------+----------+
|1596|     Aasiaat Airport| JEG|BGAA| 68.72184| -52.784698|  574|     5|       158|
|  11|Abakan Internatio...| ABA|UNAA|53.740002|  91.385002| 2146|     7|       308|
|4146|Abbotsford Intern...| YXX|CYXX|49.025269|-122.360001| 7188|     8|        32|
|  20|Aberdeen Regional...| ABR|KABR|45.439999| -98.419998| 3549|    11|        65|
|  20|Aberdeen Regional...| ABR|KABR|45.439999| -98.419998| 3549|    10|        65|
|  16|Abidjan Port Boue...| ABJ|DIAP| 5.261386|   -3.92629|11907|    13|         1|
|  15|Abilene Regional ...| ABI|KABI|32.411301| -99.681801| 5984|    14|        65|
|7403|Abingdon Virginia...| VJI|KVJI|36.686111| -82.033333| 1778|    15|       104|
| 251|Abu Dhabi Al Bate...| AZI|OMAD|24.428329|   54.45808| 1165|    16|    

# Aircraft information

## Importing JSON

In [34]:
aircraft_file = 'aircraft_data.json'
aircraft_url = "https://flight-radar1.p.rapidapi.com/aircrafts/list"

aircraft_data = getDataList(folder + aircraft_file, aircraft_url, headers=headers)

## Saving aircarft info in a DataFrame

In [35]:
aircraft_rows = []

for family in aircraft_data:
  for model in family['models']:
      model['family'] = family['description']
      aircraft_rows.append(Row(**model))

In [36]:
aircraft_df = spark.createDataFrame(aircraft_rows, schema = ['aircraftName', 'code', 'family'])

aircraft_df.show()

+----------------+----+------------------+
|    aircraftName|code|            family|
+----------------+----+------------------+
| Airbus A220-100|BCS1|Airbus A220 family|
| Airbus A220-300|BCS3|Airbus A220 family|
|     Airbus A300|A30B|Airbus A300 family|
| Airbus A300-600|A306|Airbus A300 family|
|     Airbus A310|A310|Airbus A310 family|
|     Airbus A318|A318|Airbus A320 family|
|     Airbus A319|A319|Airbus A320 family|
|  Airbus A319neo|A19N|Airbus A320 family|
|     Airbus A320|A320|Airbus A320 family|
|  Airbus A320neo|A20N|Airbus A320 family|
|     Airbus A321|A321|Airbus A320 family|
|  Airbus A321neo|A21N|Airbus A320 family|
| Airbus A330-200|A332|Airbus A330 family|
| Airbus A330-300|A333|Airbus A330 family|
| Airbus A330-900|A339|Airbus A330 family|
| Airbus A340-200|A342|Airbus A340 family|
| Airbus A340-300|A343|Airbus A340 family|
| Airbus A340-500|A345|Airbus A340 family|
| Airbus A340-600|A346|Airbus A340 family|
|Airbus A350-1000|A35K|Airbus A350 family|
+----------

In [37]:
aircraft_df.groupBy("family").count().sort("count", ascending=False).show()

+--------------------+-----+
|              family|count|
+--------------------+-----+
|   Boeing 737 family|   11|
|  Airbus A320 family|    7|
|   Boeing 747 family|    5|
|Bombardier CRJ fa...|    5|
|McDonnell Douglas...|    5|
|Embraer E-Jet family|    5|
|  Airbus A340 family|    4|
|   Boeing 777 family|    4|
|Bombardier Dash 8...|    4|
|  Airbus A330 family|    3|
|      Avro RJ family|    3|
|       ATR 42 family|    3|
|       ATR 72 family|    3|
|       Fokker family|    3|
|   Boeing 787 family|    3|
|   Boeing 767 family|    3|
|  Airbus A220 family|    2|
|  Airbus A350 family|    2|
|  Airbus A300 family|    2|
|   Boeing 757 family|    2|
+--------------------+-----+
only showing top 20 rows



## Creating aircraft_family_dim

In [38]:
aircraft_family_dim = aircraft_df \
                                  .select('family') \
                                  .distinct() \
                                  .withColumn('familyId', row_number().over( Window.orderBy('family') ))

aircraft_family_dim.show()

+--------------------+--------+
|              family|familyId|
+--------------------+--------+
|       ATR 42 family|       1|
|       ATR 72 family|       2|
|  Airbus A220 family|       3|
|  Airbus A300 family|       4|
|  Airbus A310 family|       5|
|  Airbus A320 family|       6|
|  Airbus A330 family|       7|
|  Airbus A340 family|       8|
|  Airbus A350 family|       9|
|     Airbus A380-800|      10|
|      Avro RJ family|      11|
|   Boeing 737 family|      12|
|   Boeing 747 family|      13|
|   Boeing 757 family|      14|
|   Boeing 767 family|      15|
|   Boeing 777 family|      16|
|   Boeing 787 family|      17|
|Bombardier CRJ fa...|      18|
|Bombardier Dash 8...|      19|
|Embraer E-Jet family|      20|
+--------------------+--------+
only showing top 20 rows



## Creating aircraft_dim

In [39]:
aircraft_dim = aircraft_df \
                          .join(
                              aircraft_family_dim,
                              'family',
                              'left'
                          ) \
                          .select( [ 'aircraftName', 'code', 'familyId' ] ) \

aircraft_dim.show()

+----------------+----+--------+
|    aircraftName|code|familyId|
+----------------+----+--------+
| Airbus A220-100|BCS1|       3|
| Airbus A220-300|BCS3|       3|
| Airbus A330-200|A332|       7|
| Airbus A330-300|A333|       7|
| Airbus A330-900|A339|       7|
| Airbus A340-200|A342|       8|
| Airbus A340-300|A343|       8|
| Airbus A340-500|A345|       8|
| Airbus A340-600|A346|       8|
|     Airbus A310|A310|       5|
|Airbus A350-1000|A35K|       9|
| Airbus A350-900|A359|       9|
|     Airbus A300|A30B|       4|
| Airbus A300-600|A306|       4|
|     Airbus A318|A318|       6|
|     Airbus A319|A319|       6|
|  Airbus A319neo|A19N|       6|
|     Airbus A320|A320|       6|
|  Airbus A320neo|A20N|       6|
|     Airbus A321|A321|       6|
+----------------+----+--------+
only showing top 20 rows



# Flights

##  Most tracked

### Importing JSON

In [40]:
def get_most_tracked_list(folder, url, headers):

    response = requests.get(url, headers=headers)

    print(response.json())

    data = response.json()["data"]

    file_name = folder + "flight" + str(response.json()["update_time"]) + ".json"

    with open(file_name, 'w') as f:
      json.dump(data, f)

    return(data)

In [41]:
flight_url = "https://flight-radar1.p.rapidapi.com/flights/list-most-tracked"
flight_folder = "/content/drive/MyDrive/portfolio/spark_flight_data/flight/"

flight_data = get_most_tracked_list(flight_folder, flight_url, headers)

{'version': '0.3.9', 'update_time': 1725226273.70263, 'data': [{'flight_id': '36e9de21', 'flight': 'EK20', 'callsign': 'UAE20', 'squawk': None, 'clicks': 580, 'from_iata': 'MAN', 'from_city': 'Manchester', 'to_iata': 'DXB', 'to_city': 'Dubai', 'model': 'A388', 'type': 'Airbus A380-842'}, {'flight_id': '36e9ba2d', 'flight': 'U21264', 'callsign': 'EZS61ZK', 'squawk': None, 'clicks': 532, 'from_iata': 'CPH', 'from_city': 'Copenhagen', 'to_iata': 'BSL', 'to_city': 'Basel', 'model': 'A320', 'type': 'Airbus A320-214'}, {'flight_id': '36e985fe', 'flight': 'GM613', 'callsign': 'GSW613', 'squawk': None, 'clicks': 420, 'from_iata': 'PRN', 'from_city': 'Pristina', 'to_iata': 'BSL', 'to_city': 'Basel', 'model': 'A320', 'type': 'Airbus A320-214'}, {'flight_id': '36e9e579', 'flight': 'LX92', 'callsign': 'SWR92', 'squawk': None, 'clicks': 396, 'from_iata': 'ZRH', 'from_city': 'Zurich', 'to_iata': 'GRU', 'to_city': 'Sao Paulo', 'model': 'B77W', 'type': 'Boeing 777-3DE(ER)'}, {'flight_id': '36e9ce0a', 

### Saving info in a DataFrame

In [42]:
# Converting list of Dictionaries in a list of Rows
fligth_rows = [Row(**flight) for flight in flight_data]

In [43]:
# Defining schema
flight_schema = StructType([
    StructField('flightId', StringType()),
    StructField('flight', StringType()),
    StructField('callsign', StringType()),
    StructField('squawk', StringType()),
    StructField('clicks', IntegerType()),
    StructField('from_iata', StringType()),
    StructField('from_city', StringType()),
    StructField('to_iata', StringType()),
    StructField('to_city', StringType()),
    StructField('model', StringType()),
    StructField('type', StringType())
])

In [44]:
flight_data = spark.read.json(flight_folder + "flight*.json")

flight_data.show()

+--------+------+------+---------+----------+---------+-----+------+------------+-------+--------------------+
|callsign|clicks|flight|flight_id| from_city|from_iata|model|squawk|     to_city|to_iata|                type|
+--------+------+------+---------+----------+---------+-----+------+------------+-------+--------------------+
|   UAE20|   580|  EK20| 36e9de21|Manchester|      MAN| A388|  NULL|       Dubai|    DXB|     Airbus A380-842|
| EZS61ZK|   532|U21264| 36e9ba2d|Copenhagen|      CPH| A320|  NULL|       Basel|    BSL|     Airbus A320-214|
|  GSW613|   420| GM613| 36e985fe|  Pristina|      PRN| A320|  NULL|       Basel|    BSL|     Airbus A320-214|
|   SWR92|   396|  LX92| 36e9e579|    Zurich|      ZRH| B77W|  NULL|   Sao Paulo|    GRU|  Boeing 777-3DE(ER)|
| EZS59AU|   324|U21056| 36e9ce0a|      Nice|      NCE| A320|  NULL|       Basel|    BSL|     Airbus A320-214|
|  SWR282|   320| LX282| 36e9e639|    Zurich|      ZRH| A343|  NULL|Johannesburg|    JNB|     Airbus A340-313|
|

In [45]:
flight_data.count()

150

In [46]:
fligth_df = flight_data \
                .select( # Removing 'from_city' and 'to_city' since can be retreved using 'from_iata' e 'to_iata'.
                         # Removing aircraft 'type' since can be retreaved using 'model'
                    [ 'flight_id', 'flight', 'callsign', 'squawk', 'clicks', 'from_iata', 'to_iata', 'model']
                ) \
                .withColumn( # COunt how many times the same id appears
                    "isDuplicate", row_number().over( Window.orderBy( desc('clicks') ).partitionBy("flight_id") )
                ) \
                .filter( # Drop all the outdated rows
                    col("isDuplicate") == 1
                ) \
                .drop( # Drop 'isDuplicate', since no longer needed
                    "isDuplicate"
                ) \
                .sort('clicks', ascending = False)

fligth_df.show()

+---------+-------+--------+------+------+---------+-------+-----+
|flight_id| flight|callsign|squawk|clicks|from_iata|to_iata|model|
+---------+-------+--------+------+------+---------+-------+-----+
| 36560a96|   NULL|   N827V|  7700|  6948|     NULL|   NULL| BE36|
| 36556bf9|   NULL| RRR7226|  NULL|  5200|     NULL|   NULL| R135|
| 3655eb88|   NULL|NIGHTW12|  NULL|  3124|     NULL|   NULL| EUFI|
| 3655e8bd|   NULL|NIGHTW11|  NULL|  2964|     NULL|   NULL| EUFI|
| 3653737f| TK6404| THY6404|  NULL|  1812|      FRA|    IST| B77L|
| 36510f76|   MR21|    MR21|  NULL|  1672|     NULL|   NULL|  F15|
| 36e9d5bf|  AF274|  AFR274|  NULL|  1492|      CDG|    HND| B77W|
| 3651e35c|   NULL|   LEE92|  NULL|  1472|      QKG|   NULL| HUNT|
| 3650dcff|ROF9142| ROF9142|  NULL|  1460|      OTP|    BCM| DRON|
| 364fbb07|   NULL|  SNBD01|  NULL|  1276|      DLH|   NULL| CL41|
| 3651aa8f|  CX458|  CPA458|  NULL|  1232|      HKG|    KHH| B773|
| 36522d1e|   NULL|    A124|  NULL|  1208|     NULL|   NULL| A

In [47]:
fligth_df.count()

114

## In boundary

### Importing JSON

In [48]:
in_boundary_url = "https://flight-radar1.p.rapidapi.com/flights/list-in-boundary"
in_bounadary_folder = "/content/drive/MyDrive/portfolio/spark_flight_data/flight_in_boundary/"

querystring = {"bl_lat":"10","bl_lng":"80","tr_lat":"15","tr_lng":"120","limit":"300"}

In [49]:
def get_in_boundary_data(folder, url, headers, params):

    # Fetch response from API
    response = requests.get(url, headers=headers, params=params)

    # Get data from JSON
    data = response.json()["aircraft"]

    # Create spark DataFrame
    df = spark \
            .createDataFrame(data) \
            .withColumnRenamed('_1', 'flight_id') \
            .withColumnRenamed('_2', 'mode_s') \
            .withColumnRenamed('_3', 'lat') \
            .withColumnRenamed('_4', 'lon') \
            .withColumnRenamed('_10', 'aircraf_model') \
            .withColumnRenamed('_11', 'aircraft_registration') \
            .withColumnRenamed('_12', 'departure_time') \
            .withColumnRenamed('_13', 'from_iata') \
            .withColumnRenamed('_14', 'to_iata') \
            .withColumnRenamed('_15', 'airline_IATA') \
            .withColumnRenamed('_18', 'airline_ICAO')

    # Crete name of the file to create using the current time
    file_name = folder + "flight_in_boundary_" + str(int(time.time())) + ".csv"

    # Save the DataFrame as CSV
    df.write.format('csv').option('header', 'true').save(file_name)

### Saving data in DataFrame

In [50]:
# API url
in_boundary_url = "https://flight-radar1.p.rapidapi.com/flights/list-in-boundary"

# Where to save CSV data
in_boundary_folder = "/content/drive/MyDrive/portfolio/spark_flight_data/flight_in_boundary/"

# Fetch data from API

get_in_boundary_data(
    url = in_boundary_url,
    folder = in_boundary_folder,
    headers = headers,
    params = querystring
    )

In [51]:
flight_in_boundary_schema = StructType([
    StructField('flight_id', StringType()),
    StructField('mode_s', StringType()),
    StructField('lat', FloatType()),
    StructField('lon', FloatType()),
    StructField('_5', StringType()),
    StructField('_6', StringType()),
    StructField('_7', StringType()),
    StructField('_8', StringType()),
    StructField('_9', StringType()),
    StructField('aircraft_model', StringType()),
    StructField('aircraft_registration', StringType()),
    StructField('departure_time', StringType()),
    StructField('from_iata', StringType()),
    StructField('to_iata', StringType()),
    StructField('airline_IATA', StringType()),
    StructField('_16', StringType()),
    StructField('_17', StringType()),
    StructField('airline_ICAO', StringType()),
    StructField('_19', StringType()),
])

In [52]:
flight_in_boundary_df = spark \
                            .read.csv(in_boundary_folder + "flight_in_boundary_*.csv", header = True, schema = flight_in_boundary_schema) \
                            .dropDuplicates()

flight_in_boundary_df.show()

+---------+------+-----+------+---+-----+---+----+-------+--------------+---------------------+--------------+---------+-------+------------+---+-----+------------+---+
|flight_id|mode_s|  lat|   lon| _5|   _6| _7|  _8|     _9|aircraft_model|aircraft_registration|departure_time|from_iata|to_iata|airline_IATA|_16|  _17|airline_ICAO|_19|
+---------+------+-----+------+---+-----+---+----+-------+--------------+---------------------+--------------+---------+-------+------------+---+-----+------------+---+
| 36e984a5|896211|13.69|100.74| 28|    0|  2|NULL|F-VTBS5|          B77W|               A6-EGP|    1725222894|      BKK|    DXB|       EK371|  1|    0|      UAE371|  0|
| 3655f133|8881B9|13.27|108.24|185|32000|459|NULL|F-VVPK1|          A321|              VN-A288|    1721998391|      DAD|    SGN|       VU675|  0|    0|      VAG675|  0|
| 3656095e|8991DD|13.93|101.75| 67|17950|416|NULL|F-VTBU1|          A339|              B-58302|    1721998392|      BKK|    TPE|       JX746|  0| 1280|    

In [53]:
flight_in_boundary_df.groupBy('flight_id').count().filter( col('count') > 1).show()

+---------+-----+
|flight_id|count|
+---------+-----+
| 36e92de9|    6|
| 36e9a57b|    3|
| 36e94460|    3|
| 36e8141c|    4|
| 3653c610|    2|
| 36e9b2aa|    3|
| 365226c8|    2|
| 36e96bb2|    4|
| 3653b2ea|    2|
| 36e80b0f|    3|
| 3652df6a|    2|
| 36e9a094|    2|
| 365344b8|    2|
| 36e8016d|    4|
| 36e7dcee|    3|
| 36533cf0|    2|
| 3652deaf|    2|
| 36e9b77e|    3|
| 365351f5|    2|
| 36e96bef|    3|
+---------+-----+
only showing top 20 rows



In [54]:
flight_in_boundary_df = flight_in_boundary_df \
                        .withColumn('departure_time', from_unixtime( col('departure_time') )) \
                        .withColumn('airline_ICAO_1', substring('airline_ICAO', 0, 3)) \
                        .withColumn('airline_ICAO_2', substring('airline_ICAO', 3, 5)) \
                        .drop('airline_ICAO') \
                        .withColumn('airline_IATA_1', substring('airline_IATA', 0, 2)) \
                        .withColumn('airline_IATA_2', substring('airline_IATA', 2, 5)) \
                        .drop('airline_IATA')

flight_in_boundary_df.show()

+---------+------+-----+------+---+-----+---+----+-------+--------------+---------------------+-------------------+---------+-------+---+-----+---+--------------+--------------+--------------+--------------+
|flight_id|mode_s|  lat|   lon| _5|   _6| _7|  _8|     _9|aircraft_model|aircraft_registration|     departure_time|from_iata|to_iata|_16|  _17|_19|airline_ICAO_1|airline_ICAO_2|airline_IATA_1|airline_IATA_2|
+---------+------+-----+------+---+-----+---+----+-------+--------------+---------------------+-------------------+---------+-------+---+-----+---+--------------+--------------+--------------+--------------+
| 36e984a5|896211|13.69|100.74| 28|    0|  2|NULL|F-VTBS5|          B77W|               A6-EGP|2024-09-01 20:34:54|      BKK|    DXB|  1|    0|  0|           UAE|          E371|            EK|          K371|
| 3655f133|8881B9|13.27|108.24|185|32000|459|NULL|F-VVPK1|          A321|              VN-A288|2024-07-26 12:53:11|      DAD|    SGN|  0|    0|  0|           VAG|      

### Testing columns

In [55]:
flight_in_boundary_df \
                  .join(
                      aircraft_dim,
                      flight_in_boundary_df.aircraft_model == aircraft_dim.code,
                      'left'
                  ) \
                  .select(
                      ['flight_id', 'aircraft_model', 'aircraftName']
                  ) \
                  .show()

+---------+--------------+----------------+
|flight_id|aircraft_model|    aircraftName|
+---------+--------------+----------------+
| 365344b8|          A21N|  Airbus A321neo|
| 36e92de9|          A21N|  Airbus A321neo|
| 3655c4c6|          A21N|  Airbus A321neo|
| 36558a1a|          A332| Airbus A330-200|
| 36550077|          C130|            NULL|
| 3655fa18|          A319|     Airbus A319|
| 365614a8|          A319|     Airbus A319|
| 36560cdb|          A320|     Airbus A320|
| 36e96ecb|          A320|     Airbus A320|
| 36e9c0b5|          A320|     Airbus A320|
| 3655f133|          A321|     Airbus A321|
| 3653c610|          A321|     Airbus A321|
| 3655cfd1|          A321|     Airbus A321|
| 365281d1|          B789|    Boeing 787-9|
| 365392f4|          A333| Airbus A330-300|
| 3656095e|          A339| Airbus A330-900|
| 36e984a5|          B77W|Boeing 777-300ER|
| 3652fb8b|          B77W|Boeing 777-300ER|
| 3655cbed|          B77W|Boeing 777-300ER|
| 36537d87|          A359| Airbu

In [56]:
flight_in_boundary_df \
                    .join(
                        airline_dim,
                        flight_in_boundary_df.airline_ICAO_1 == airline_dim.ICAO,
                        'left'
                    ) \
                    .select(
                        ['flight_id', 'airline_ICAO_1', 'Name']
                    ) \
                    .show()

+---------+--------------+--------------------+
|flight_id|airline_ICAO_1|                Name|
+---------+--------------+--------------------+
| 3655cbed|           ETD|      Etihad Airways|
| 3655fa18|           BKP|     Bangkok Airways|
| 365614a8|           BKP|     Bangkok Airways|
| 3656095e|           SJX|             Starlux|
| 3655c4c6|           PAL| Philippine Airlines|
| 365281d1|           ANA|  All Nippon Airways|
| 365344b8|           CSN|China Southern Ai...|
| 36560cdb|           THA|        Thai Airways|
| 36537d87|           CPA|      Cathay Pacific|
| 365392f4|           CPA|      Cathay Pacific|
| 36e9c0b5|           CEB|        Cebu Pacific|
| 36550077|           CNV|                NULL|
| 36e984a5|           UAE|            Emirates|
| 3652fb8b|           UAE|            Emirates|
| 3655f133|           VAG|  Vietravel Airlines|
| 3653c610|           CES|China Eastern Air...|
| 36558a1a|           CES|China Eastern Air...|
| 36e92de9|           VJC|         VietJ