In [1]:
import findspark
findspark.init()
import pyspark as ps
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
import pyspark.pandas as pd
from pyspark.sql.functions import isnan, when, count, col



In [2]:
spark = SparkSession.builder.appName('Task_2_Big_Data').getOrCreate()

In [13]:
# 1.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
#https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html?highlight=rdd%20map#pyspark.RDD.map
#https://www.educba.com/pyspark-map/
# Load data into DataFrames
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("IATA", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True),
])

dfAir = spark.read.csv("Top30_airports_LatLong.csv", schema=schema, header=False)

passenger_schema = StructType([
    StructField("Passenger_id", StringType(), True),
    StructField("Flight_id", StringType(), True),
    StructField("From_airport", StringType(), True),
    StructField("Destination_airport", StringType(), True),
    StructField("Departure_time", DoubleType(), True),
    StructField("Total_flight_time", DoubleType(), True)
])

dfPass = spark.read.csv("AComp_Passenger_data_no_error.csv", schema=passenger_schema, header=False)

# Step 1: Extract airports from passenger data
from_airports_rdd = dfPass.select("From_airport").rdd.map(lambda x: (x[0], 1))

# Step 2: Count the number of flights from each airport     | add y to every element in the map to create key value pair
flight_counts_rdd = from_airports_rdd.reduceByKey(lambda x, y: x + y)

# Step 3: Extract all airports from dfAir
all_airports_rdd = dfAir.select("IATA").rdd.map(lambda x: (x[0], 0))

# Step 4: Identify airports that are not used
unused_airports_rdd = all_airports_rdd.subtractByKey(flight_counts_rdd)

# Step 5: Collect results
flight_counts = flight_counts_rdd.collect()
unused_airports = unused_airports_rdd.collect()

# Display the results
print("Number of Flights from Each Airport:")
for airport, count in flight_counts:
    print(f"{airport}: {count} flights")

print("\nAirports Not Used:")
for airport, _ in unused_airports:
    print(airport)

Number of Flights from Each Airport:
DEN: 46 flights
JFK: 25 flights
ORD: 33 flights
KUL: 33 flights
MAD: 13 flights
LHR: 25 flights
CGK: 27 flights
MUC: 14 flights
AMS: 15 flights
DFW: 11 flights
MIA: 11 flights
CDG: 21 flights
CAN: 37 flights
IAH: 37 flights
LAS: 17 flights
CLT: 21 flights
ATL: 36 flights
PVG: 20 flights
FCO: 15 flights
BKK: 17 flights
PEK: 13 flights
HND: 13 flights

Airports Not Used:
LAX
HKG
SIN
SFO
PHX
FRA
DXB
IST


In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
#not unqiue passengers wrong output and can't scale to more columns without error

# MapReduce
def mapper(row):
    # Extract Flight_id and create a tuple with Passenger_id for counting
    return (row.Flight_id, row.Passenger_id)

def reducer(count, _):
    # Count the number of passengers for each Flight_id
    return count + 1

# Step 1: Map Flight_id and Passenger_id
flight_passenger_rdd = dfPass.rdd.map(mapper)

# Step 2: Reduce to count the number of passengers for each Flight_id
flight_passenger_count_rdd = flight_passenger_rdd.aggregateByKey(0, reducer, lambda a, b: a + b)

# Step 3: Convert to DataFrame for better visualization
flight_passenger_count_df = flight_passenger_count_rdd.toDF(["Flight_id", "NumOfPassengers"])

# Display the results
flight_passenger_count_df.show()

+---------+---------------+
|Flight_id|NumOfPassengers|
+---------+---------------+
| SQU6245R|             21|
| XXQ4064B|             25|
| SOH3431A|             18|
| PME8178S|             18|
| MBA8071P|             16|
| MOO1786A|             13|
| HUR0974O|              7|
| GMO5938W|             25|
| DAU2617A|             12|
| RUM0422W|             14|
| ATT7791R|             15|
| WPW9201U|             11|
| DKZ3042O|             11|
| QHU1140O|             21|
| ULZ8130D|             27|
| VYU9214I|             15|
| HZT2506M|             14|
| EWH6301Y|             10|
| VYW5940P|             17|
| WSK1289Z|             21|
+---------+---------------+
only showing top 20 rows



In [14]:
import pandas as pd
cols = ['Passenger_id','Flight_id','From_IATA/FAA_Code','To_IATA/FAA_Code','Departure_time','Total_flight_time']
df = pd.read_csv("AComp_Passenger_data_no_error.csv", encoding='latin1', names=cols)

# Assuming 'Departure_time' column contains Unix epoch time
df['Departure_time'] = pd.to_datetime(df['Departure_time'], unit='s')

# Format the datetime as HH:MM
df['Departure_time'] = df['Departure_time'].dt.strftime('%H:%M')

dataPassID = df['Passenger_id'].tolist()
dataFlightID = df['Flight_id'].tolist()
dataFromIATA = df['From_IATA/FAA_Code'].tolist()
dataToIATA = df['To_IATA/FAA_Code'].tolist()
dataDeparture = df['Departure_time'].tolist()
dataFlightTime = df['Total_flight_time'].tolist()

First20Entries = dataFlightID[:20]

for i in dataDeparture:
    print(i)

17:14
17:05
17:00
17:00
17:13
17:04
16:56
17:15
17:11
17:05
17:23
16:58
17:13
17:21
17:05
17:15
17:05
16:58
17:04
17:05
17:14
17:23
17:11
17:18
17:14
17:12
16:58
17:22
17:00
17:05
17:26
17:14
17:18
16:59
17:05
17:26
17:25
17:14
17:14
17:11
17:18
17:26
16:59
17:00
17:16
17:14
17:23
17:11
17:16
17:18
17:04
17:26
17:05
17:04
17:13
17:23
17:05
17:16
17:13
16:56
17:14
17:11
17:00
17:28
17:13
17:14
17:28
16:59
17:05
17:14
17:28
17:14
17:23
17:16
16:59
17:00
17:05
16:59
17:26
17:13
17:00
17:23
17:04
17:23
17:05
16:59
17:23
17:26
17:26
17:26
17:04
17:23
17:23
16:58
17:07
17:18
17:14
17:23
17:14
17:25
17:18
17:11
17:18
17:26
17:05
17:05
17:14
17:26
17:23
17:18
17:16
17:11
17:05
17:12
17:18
17:11
16:56
17:07
17:18
17:05
16:59
17:26
17:18
17:26
17:14
17:11
17:12
17:21
17:00
17:14
17:07
17:23
17:25
17:07
17:13
17:15
17:15
17:15
16:56
17:11
17:25
17:23
17:23
17:26
17:04
17:05
17:13
17:23
17:11
17:11
17:18
17:28
17:11
16:59
17:14
17:23
17:25
17:05
17:14
17:05
16:56
17:05
17:13
17:00
17:13
17:13
17:1

In [5]:
from functools import reduce

def map_flight_passenger(x):
    return (dataFlightID[x[0]], dataPassID[x[0]])

# Map function to create a key-value pair for each flight_id and passenger_id
flight_passenger_mapping = list(map(map_flight_passenger, enumerate(dataPassID)))

# Map function to create a key-value pair for each flight_id and count each passenger occurrence
passenger_count_mapping = list(map(lambda x: (x[0], {x[1]: 1}), flight_passenger_mapping))

# Reduce function to merge the counts for each flight_id
def reduce_passenger_counts(acc, val):
    flight_id, count_dict = val
    if flight_id in acc:
        acc[flight_id].update(count_dict)
    else:
        acc[flight_id] = count_dict
    return acc

passenger_counts = reduce(reduce_passenger_counts, passenger_count_mapping, {})

# Add relevant IATA/FAA codes, departure times, and calculate arrival times to the final result
final_result = []

for flight_id, counts in passenger_counts.items():
    num_passengers = len(counts)
    iata_faa_code = dataFromIATA[dataFlightID.index(flight_id)]
    departure_time = dataDeparture[dataFlightID.index(flight_id)]
    flight_duration = dataFlightTime[dataFlightID.index(flight_id)]
    
    # Calculate arrival time by adding flight duration to departure time
    departure_hour, departure_minute = map(int, departure_time.split(":"))
    arrival_hour = (departure_hour + (departure_minute + flight_duration) // 60) % 24
    arrival_minute = (departure_minute + flight_duration) % 60
    
    arrival_time = f"{arrival_hour:02d}:{arrival_minute:02d}"
    
    final_result.append({
        'Flight ID': flight_id,
        'Num of Passengers': num_passengers,
        'IATA/FAA Code': iata_faa_code,
        'Departure Time': departure_time,
        'Arrival Time': arrival_time
    })

# Print the result
for entry in final_result:
    print(entry)

{'Flight ID': 'SQU6245R', 'Num of Passengers': 17, 'IATA/FAA Code': 'DEN', 'Departure Time': '17:14', 'Arrival Time': '10:43'}
{'Flight ID': 'XXQ4064B', 'Num of Passengers': 19, 'IATA/FAA Code': 'JFK', 'Departure Time': '17:05', 'Arrival Time': '06:27'}
{'Flight ID': 'SOH3431A', 'Num of Passengers': 15, 'IATA/FAA Code': 'ORD', 'Departure Time': '17:00', 'Arrival Time': '21:10'}
{'Flight ID': 'PME8178S', 'Num of Passengers': 15, 'IATA/FAA Code': 'DEN', 'Departure Time': '17:13', 'Arrival Time': '15:15'}
{'Flight ID': 'MBA8071P', 'Num of Passengers': 13, 'IATA/FAA Code': 'KUL', 'Departure Time': '17:04', 'Arrival Time': '02:36'}
{'Flight ID': 'MOO1786A', 'Num of Passengers': 11, 'IATA/FAA Code': 'MAD', 'Departure Time': '16:56', 'Arrival Time': '20:00'}
{'Flight ID': 'HUR0974O', 'Num of Passengers': 6, 'IATA/FAA Code': 'DEN', 'Departure Time': '17:15', 'Arrival Time': '16:33'}
{'Flight ID': 'GMO5938W', 'Num of Passengers': 17, 'IATA/FAA Code': 'LHR', 'Departure Time': '17:11', 'Arrival T

In [7]:
cols = ['Airport_Name','IATA/FAA_code','Latitude','Longitude']
df = pd.read_csv("Top30_airports_LatLong.csv", encoding='latin1', names=cols)

dataName = df['Airport_Name'].tolist()
dataIATA = df['IATA/FAA_code'].tolist()
dataLat = df['Latitude'].tolist()
dataLong = df['Longitude'].tolist()

First20Entries = dataLat[:20]

for i in dataLong:
    print(i)

-84.428067
116.584556
-0.461389
-87.904842
139.779694
-118.408075
2.55
-97.037997
8.543125
113.914603
-104.673178
55.364444
106.655897
4.763889
-3.566764
100.747283
-73.778925
103.994433
113.298786
-115.15225
121.805214
-122.374889
-112.011583
-95.341442
-80.943139
-80.290556
11.786086
101.709917
12.250797
28.814606


In [9]:
from math import radians, sin, cos, sqrt, atan2

def haversine(lat1, lon1, lat2, lon2):
    # Calculate the Haversine distance between two points (latitude, longitude)
    R = 6371.0  # Earth radius in kilometers
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    return distance

def map_flight_passenger(x):
    return (dataFlightID[x[0]], dataPassID[x[0]], dataFromIATA[x[0]], dataToIATA[x[0]])

# Map function to create a key-value pair for each flight_id and passenger_id
flight_passenger_mapping = list(map(map_flight_passenger, enumerate(dataPassID)))

# Map function to create a key-value pair for each flight_id, passenger_id, and route
route_mapping = list(map(lambda x: (x[0], {'Passenger': x[1], 'Route': (x[2], x[3])}), flight_passenger_mapping))

# Reduce function to merge the routes for each flight_id
def reduce_routes(acc, val):
    flight_id, route_dict = val
    if flight_id in acc:
        acc[flight_id].append(route_dict)
    else:
        acc[flight_id] = [route_dict]
    return acc

flight_routes = reduce(reduce_routes, route_mapping, {})

# Calculate the line-of-sight (nautical) miles for each flight and the total traveled by each passenger using The Haversine formula
# https://en.wikipedia.org/wiki/Haversine_formula
# https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
# https://stackoverflow.com/a/29958276/23133052
passenger_total_miles = {}

for flight_id, routes in flight_routes.items():
    flight_distance = 0.0
    for route in routes:
        passenger_id = route['Passenger']
        route_start = route['Route'][0]
        route_end = route['Route'][1]
        
        # Find the coordinates for the start and end points
        start_index = dataIATA.index(route_start)
        end_index = dataIATA.index(route_end)
        start_lat, start_lon = dataLat[start_index], dataLong[start_index]
        end_lat, end_lon = dataLat[end_index], dataLong[end_index]

        # Calculate the line-of-sight distance in nautical miles
        distance = haversine(start_lat, start_lon, end_lat, end_lon)
        flight_distance += distance

        # Update the total line-of-sight miles for each passenger
        if passenger_id in passenger_total_miles:
            passenger_total_miles[passenger_id] += distance
        else:
            passenger_total_miles[passenger_id] = distance

    # Print the line-of-sight (nautical) miles for each flight
    print(f"Flight ID: {flight_id}, Line-of-Sight Miles: {flight_distance}")

# Find the passenger with the highest total line-of-sight miles
max_passenger = max(passenger_total_miles, key=passenger_total_miles.get)
max_miles = passenger_total_miles[max_passenger]

# Output the result
print("\nTotal traveled by each passenger:")
for passenger_id, total_miles in passenger_total_miles.items():
    print(f"Passenger ID: {passenger_id}, Total Miles: {total_miles}")

print(f"\nPassenger with the highest air miles: {max_passenger}")
print(f"Total line-of-sight miles: {max_miles}")

Flight ID: SQU6245R, Line-of-Sight Miles: 169844.315896926
Flight ID: XXQ4064B, Line-of-Sight Miles: 154699.3861003547
Flight ID: SOH3431A, Line-of-Sight Miles: 34740.68969019944
Flight ID: PME8178S, Line-of-Sight Miles: 183444.36430246444
Flight ID: MBA8071P, Line-of-Sight Miles: 70615.40629548024
Flight ID: MOO1786A, Line-of-Sight Miles: 18439.341726921484
Flight ID: HUR0974O, Line-of-Sight Miles: 75458.90532375847
Flight ID: GMO5938W, Line-of-Sight Miles: 203817.6704249488
Flight ID: DAU2617A, Line-of-Sight Miles: 167534.9001166846
Flight ID: RUM0422W, Line-of-Sight Miles: 20939.20386172111
Flight ID: ATT7791R, Line-of-Sight Miles: 115838.95010673656
Flight ID: WPW9201U, Line-of-Sight Miles: 123142.77622605299
Flight ID: DKZ3042O, Line-of-Sight Miles: 45690.64203505539
Flight ID: QHU1140O, Line-of-Sight Miles: 183475.11463577006
Flight ID: ULZ8130D, Line-of-Sight Miles: 350394.7439754354
Flight ID: VYU9214I, Line-of-Sight Miles: 174607.9915952478
Flight ID: HZT2506M, Line-of-Sight M

In [9]:
spark.stop()

In [11]:
print(flight_routes)

{'SQU6245R': [{'Passenger': 'UES9151GS5', 'Route': ('DEN', 'FRA')}, {'Passenger': 'UES9151GS5', 'Route': ('DEN', 'FRA')}, {'Passenger': 'JBE2302VO4', 'Route': ('DEN', 'FRA')}, {'Passenger': 'SJD8775RZ4', 'Route': ('DEN', 'FRA')}, {'Passenger': 'HCA3158QA6', 'Route': ('DEN', 'FRA')}, {'Passenger': 'XFG5747ZT9', 'Route': ('DEN', 'FRA')}, {'Passenger': 'PIT2755XC1', 'Route': ('DEN', 'FRA')}, {'Passenger': 'CYJ0225CH1', 'Route': ('DEN', 'FRA')}, {'Passenger': 'MXU9187YC7', 'Route': ('DEN', 'FRA')}, {'Passenger': 'HGO4350KK1', 'Route': ('DEN', 'FRA')}, {'Passenger': 'HCA3158QA6', 'Route': ('DEN', 'FRA')}, {'Passenger': 'HCA3158QA6', 'Route': ('DEN', 'FRA')}, {'Passenger': 'PUD8209OG3', 'Route': ('DEN', 'FRA')}, {'Passenger': 'XFG5747ZT9', 'Route': ('DEN', 'FRA')}, {'Passenger': 'WBE6935NU3', 'Route': ('DEN', 'FRA')}, {'Passenger': 'IEG9308EA5', 'Route': ('DEN', 'FRA')}, {'Passenger': 'LLZ3798PE3', 'Route': ('DEN', 'FRA')}, {'Passenger': 'YMH6360YP0', 'Route': ('DEN', 'FRA')}, {'Passenger': 