## Steven Miller
### DSC 650 Winter 2019
### 2019-12-18

#### Exercise 4.2: Joining Datasets and Performing Aggregations on Grouped Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Exercise').getOrCreate()

In [2]:
df_flights = spark.read.parquet('domestic-flights/flights.parquet')
df_airport_codes = spark.read.load('airport-codes/airport-codes.csv', format='csv', sep=',', inferSchema=True, header=True)
df_flights.printSchema()
df_airport_codes.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- l

**a. Join the Data**

Join the flight data to the airport codes data by matching the origin airport's IATA code in the flight data to the IATA code in the airport codes file. Note that the airport codes file may not contain IATA codes for every origin and destination airport in the flight data. We still want information on those flights even if we cannot match them to a value in the airport codes file, so use a left join instead of the default inner join.

Print the schema of the joined dataframe.
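The difference between the two join types can be sketched in plain Python, outside Spark. This is only an illustration of the semantics, not the Spark API: the two tiny record lists, the `ZZZ` code, and the `join_flights` helper are hypothetical.

```python
# Hypothetical miniature versions of the two datasets.
flights = [
    {"origin_airport_code": "ATL", "passengers": 120},
    {"origin_airport_code": "ZZZ", "passengers": 45},  # made-up code with no match
]
airport_codes = [
    {"iata_code": "ATL", "type": "large_airport"},
]


def join_flights(flights, airport_codes, how="inner"):
    """Join each flight to the airport rows whose iata_code equals its origin code."""
    airport_columns = sorted({key for airport in airport_codes for key in airport})
    joined = []
    for flight in flights:
        matches = [a for a in airport_codes
                   if a["iata_code"] == flight["origin_airport_code"]]
        if matches:
            joined.extend({**flight, **airport} for airport in matches)
        elif how == "left":
            # Keep the unmatched flight and fill the airport columns with None.
            joined.append({**flight, **dict.fromkeys(airport_columns)})
    return joined


inner_rows = join_flights(flights, airport_codes, how="inner")
left_rows = join_flights(flights, airport_codes, how="left")
print(len(inner_rows), len(left_rows))
```

The inner join silently drops the unmatched flight, while the left join keeps it with empty airport columns. Those empty columns are the nulls that later steps must handle.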

In [3]:
# Left join keeps flights whose origin code has no match in the airport codes file.
joined_df = df_flights.join(df_airport_codes, df_flights.origin_airport_code == df_airport_codes.iata_code, 'left')
joined_df.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_c

**b. Rename and Remove Columns**

Next, we want to rename some of the joined columns and remove unneeded columns. Remove the following columns from the joined dataframe.

* \_\_index_level_0\_\_
* ident
* local_code
* continent
* iso_country
* iata_code

Rename the following columns.

* type: origin_airport_type
* name: origin_airport_name
* elevation_ft: origin_airport_elevation_ft
* iso_region: origin_airport_region
* municipality: origin_airport_municipality
* gps_code: origin_airport_gps_code
* coordinates: origin_airport_coordinates

In [4]:
joined_df = joined_df.drop(*['__index_level_0__', 'ident', 'local_code', 'continent', 'iso_country', 'iata_code'])

In [5]:
joined_df = (joined_df
             .withColumnRenamed('type','origin_airport_type')
             .withColumnRenamed('name', 'origin_airport_name')
             .withColumnRenamed('elevation_ft', 'origin_airport_elevation_ft')
             .withColumnRenamed('iso_region', 'origin_airport_region')
             .withColumnRenamed('municipality', 'origin_airport_municipality')
             .withColumnRenamed('gps_code', 'origin_airport_gps_code')
             .withColumnRenamed('coordinates', 'origin_airport_coordinates'))

In [6]:
joined_df.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)



**c. Join to Destination Airport**

Repeat parts a and b, joining the airport codes file to the destination airport instead of the origin airport. Drop the same columns and rename the same columns using the prefix destination_airport_ instead of origin_airport_. Print the schema of the resulting dataframe. The final schema and dataframe should contain the added information (name, region, coordinates, …) for both the destination and origin airports.
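Because the origin and destination renames differ only in their prefix, the mapping can be generated rather than typed twice. The following is a minimal sketch. `AIRPORT_COLUMNS`, `build_renames`, and `rename_columns` are hypothetical helper names, and `rename_columns` assumes only that its argument has a `withColumnRenamed(old, new)` method, as a PySpark dataframe does.

```python
from functools import reduce

# Airport columns kept after the drop, mapped to the suffix used in the new name.
AIRPORT_COLUMNS = {
    "type": "type",
    "name": "name",
    "elevation_ft": "elevation_ft",
    "iso_region": "region",
    "municipality": "municipality",
    "gps_code": "gps_code",
    "coordinates": "coordinates",
}


def build_renames(prefix):
    """Return {old_name: prefixed_name} for the given prefix."""
    return {old: f"{prefix}{suffix}" for old, suffix in AIRPORT_COLUMNS.items()}


def rename_columns(df, renames):
    """Apply withColumnRenamed once per mapping entry and return the result."""
    return reduce(
        lambda acc, pair: acc.withColumnRenamed(pair[0], pair[1]),
        renames.items(),
        df,
    )


origin_renames = build_renames("origin_airport_")
destination_renames = build_renames("destination_airport_")
print(destination_renames["iso_region"])
```

With a real dataframe, `rename_columns(joined_df, destination_renames)` would stand in for the chain of seven `withColumnRenamed` calls.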

In [7]:
# Left join again so flights with an unmatched destination code are kept.
joined_df = joined_df.join(df_airport_codes, joined_df.destination_airport_code == df_airport_codes.iata_code, 'left')
joined_df = joined_df.drop(*['__index_level_0__', 'ident', 'local_code', 'continent', 'iso_country', 'iata_code'])
joined_df = (joined_df
             .withColumnRenamed('type','destination_airport_type')
             .withColumnRenamed('name', 'destination_airport_name')
             .withColumnRenamed('elevation_ft', 'destination_airport_elevation_ft')
             .withColumnRenamed('iso_region', 'destination_airport_region')
             .withColumnRenamed('municipality', 'destination_airport_municipality')
             .withColumnRenamed('gps_code', 'destination_airport_gps_code')
             .withColumnRenamed('coordinates', 'destination_airport_coordinates'))
joined_df.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- destination_airport_type: string (nullable = true)
 |-- destination_airp

**d. Top Ten Airports**

Create a dataframe using only data from 2008. This dataframe will be a report of the top ten airports by the number of inbound passengers. This dataframe should contain the following fields:

* Rank (1-10)
* Name
* IATA code
* Total Inbound Passengers
* Total Inbound Flights
* Average Daily Passengers
* Average Inbound Flights

Show the results of this dataframe using the show method.
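The rank and daily-average fields can be checked by hand with a small plain-Python sketch. The totals below are the first three rows of the report output shown in this notebook. The `inbound_report` helper and its field names are hypothetical. Dividing by the number of days in the year is one interpretation of "average daily".

```python
import calendar

# Totals for 2008 copied from the first three rows of the report output.
totals_2008 = [
    {"iata_code": "ORD", "passengers": 26398793, "flights": 356570},
    {"iata_code": "ATL", "passengers": 35452700, "flights": 392691},
    {"iata_code": "DFW", "passengers": 22883558, "flights": 270243},
]


def inbound_report(totals, year, top_n=10):
    """Rank airports by inbound passengers and add per-day averages."""
    days = 366 if calendar.isleap(year) else 365  # 2008 is a leap year
    ordered = sorted(totals, key=lambda row: row["passengers"], reverse=True)
    return [
        {
            "rank": rank,
            "iata_code": row["iata_code"],
            "avg_daily_passengers": round(row["passengers"] / days, 2),
            "avg_daily_flights": round(row["flights"] / days, 2),
        }
        for rank, row in enumerate(ordered[:top_n], start=1)
    ]


report = inbound_report(totals_2008, 2008)
for line in report:
    print(line)
```

In Spark, a rank column could be added in a similar spirit with `row_number()` over a `Window` ordered by the passenger total, though that is not what the cell in this notebook does.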

In [8]:
# 2008 was a leap year, so the daily averages divide by 366 days.
top_10 = (joined_df
          .where('flight_year = 2008')
          .groupBy('destination_airport_name', 'destination_airport_code')
          .agg(sum('passengers').alias('Total Inbound Passengers'),
               sum('flights').alias('Total Inbound Flights'),
               expr('round(sum(passengers)/366, 2)').alias('Average Daily Passengers'),
               expr('round(sum(flights)/366, 2)').alias('Average Inbound Flights'))
          .orderBy(desc('Total Inbound Passengers'))
          .limit(10))
# show() returns None, so it is kept out of the assignment.
top_10.show()

+------------------------+------------------------+------------------------+---------------------+------------------------+-----------------------+
|destination_airport_name|destination_airport_code|Total Inbound Passengers|Total Inbound Flights|Average Daily Passengers|Average Inbound Flights|
+------------------------+------------------------+------------------------+---------------------+------------------------+-----------------------+
|    Hartsfield Jackso...|                     ATL|                35452700|               392691|                 96865.3|                1072.93|
|    Chicago O'Hare In...|                     ORD|                26398793|               356570|                72127.85|                 974.23|
|    Dallas Fort Worth...|                     DFW|                22883558|               270243|                62523.38|                 738.37|
|    Los Angeles Inter...|                     LAX|                19741782|               215000|              

**e. User Defined Functions**

The latitude and longitude coordinates for the destination and origin airports are string values and not numeric. You will create a user-defined function in Python that will convert the string coordinates into numeric coordinates.

Add new columns for destination_airport_longitude, destination_airport_latitude, origin_airport_longitude, and origin_airport_latitude.

In [9]:
from pyspark.sql.functions import udf

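@udf('double')
def get_latitude(coordinates):
    # Rows without a matching airport have null coordinates.
    if coordinates is None:
        return None
    split_coords = coordinates.split(',')
    if len(split_coords) != 2:
        return None

    return float(split_coords[0].strip())


@udf('double')
def get_longitude(coordinates):
    # Rows without a matching airport have null coordinates.
    if coordinates is None:
        return None
    split_coords = coordinates.split(',')
    if len(split_coords) != 2:
        return None

    return float(split_coords[1].strip())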

joined_df = joined_df.withColumn(
  'destination_airport_longitude',
  get_longitude(joined_df['destination_airport_coordinates'])
).withColumn(
  'destination_airport_latitude',
  get_latitude(joined_df['destination_airport_coordinates'])
).withColumn(
  'origin_airport_longitude',
  get_longitude(joined_df['origin_airport_coordinates'])
).withColumn(
  'origin_airport_latitude',
  get_latitude(joined_df['origin_airport_coordinates'])
)
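The parsing logic inside the UDFs can be tested without a Spark session by writing it as an ordinary function. This is a sketch under assumptions: `parse_coordinates` is a hypothetical helper and the sample strings are made up. It returns the two values in file order without naming them. The UDFs above treat the first value as latitude, which is worth checking against a known airport in the file.

```python
def parse_coordinates(coordinates):
    """Split a 'value, value' string into two floats, or (None, None) if unusable."""
    if coordinates is None:
        return (None, None)
    parts = coordinates.split(",")
    if len(parts) != 2:
        return (None, None)
    try:
        return (float(parts[0].strip()), float(parts[1].strip()))
    except ValueError:
        # Non-numeric text is treated the same as a missing value.
        return (None, None)


# Made-up inputs covering the normal case and the failure cases.
print(parse_coordinates("33.5, -84.25"))
print(parse_coordinates(None))
print(parse_coordinates("33.5"))
print(parse_coordinates("north, west"))
```

Returning `None` instead of raising matters in Spark, because an exception inside a UDF fails the whole job rather than a single row.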

In [10]:
joined_df.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- destination_airport_type: string (nullable = true)
 |-- destination_airp