# **Required Dependencies**
JDK Version 21.0.5: https://www.oracle.com/java/technologies/downloads/#jdk21-windows

A JDK is required for the PySpark reduce operations to work; without it, a Py4JJavaError is thrown.
https://docs.oracle.com/en/java/javase/21/install/overview-jdk-installation.html

Task 2

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from functools import reduce
from collections import Counter
import pandas as pd

def load_dataset(filename, columnNames):
    """Read a CSV file into a pandas DataFrame.

    filename is handed straight to pandas.read_csv; columnNames is the list
    of column labels applied to the data. The file is decoded as latin1,
    which the original author found necessary for the file to load.
    """
    return pd.read_csv(filename, names=columnNames, encoding='latin1')

def get_instances(data):
    """Split data on whitespace and count how often each token occurs.

    Returns a collections.Counter keyed by token.
    """
    return Counter(data.split())

def ReduceCounter(counter1, counter2):
    """Fold counter2's counts into counter1 and return counter1.

    counter1 is updated in place; the returned object is counter1 itself.
    """
    merged = counter1
    merged.update(counter2)  # adds counts key by key
    return merged

# Configure Spark with the 'local' master; getOrCreate reuses a context if one already exists.
conf = SparkConf().setAppName('MapReduce').setMaster('local')
sparkContext = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.appName("AllowConversion").getOrCreate() # session used to convert a pandas DataFrame into a PySpark DataFrame

# Column labels applied to the passenger CSV when it is read.
columns = ['passengerID', 'flightID', 'originAirport', 'destinationAirport', 'departureTime', 'flightTime']

# NOTE: despite its name, `rdd` is the pandas DataFrame returned by load_dataset, not a Spark RDD.
rdd = load_dataset("AComp_Passenger_data_no_error.csv", columns)

# Hand the origin-airport column to Spark, asking for 10 partitions.
Origins = rdd['originAirport'].tolist()
distributed_data_origins = sparkContext.parallelize(Origins, 10)

# Map: turn every value into a Counter of its whitespace-separated tokens.
# Reduce: merge those Counters into a single Counter of totals, then print it.
dist_data_flight_origins = distributed_data_origins.map(get_instances)
dist_data_flight_origins_count = dist_data_flight_origins.reduce(ReduceCounter)
print(dist_data_flight_origins_count)


25/01/07 16:17:18 WARN Utils: Your hostname, codespaces-91b6d6 resolves to a loopback address: 127.0.0.1; using 10.0.0.167 instead (on interface eth0)
25/01/07 16:17:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/07 16:17:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Counter({'DEN': 46, 'CAN': 37, 'IAH': 37, 'ATL': 36, 'ORD': 33, 'KUL': 33, 'CGK': 27, 'JFK': 25, 'LHR': 25, 'CDG': 21, 'CLT': 21, 'PVG': 20, 'LAS': 17, 'BKK': 17, 'AMS': 15, 'FCO': 15, 'MUC': 14, 'MAD': 13, 'PEK': 13, 'HND': 13, 'DFW': 11, 'MIA': 11})


                                                                                

In [3]:
#Flight ID, Count of Passengers, Departure code, Departure Time, Arrival Code, Arrival Time
from pytz import timezone
from datetime import datetime, timedelta

def GetFlightInfo(acc, record):
    """Reducer step that remembers the first record seen for each flightID.

    acc maps flightID -> record and is updated in place; records whose
    flightID is already present are ignored. Returns acc.
    """
    acc.setdefault(record['flightID'], record)  # no-op when the key already exists
    return acc

# Lookup table from an airport's IATA code to the timezone name for that airport.
# CalculateTimes passes these names to pytz's timezone() for both the origin and
# the destination, so every airport that appears in the flight data needs an entry.
airportTimezones = {
    "ATL": "America/New_York",
    "PEK": "Asia/Shanghai",
    "LHR": "Europe/London",
    "ORD": "America/Chicago",
    "HND": "Asia/Tokyo",
    "LAX": "America/Los_Angeles",
    "CDG": "Europe/Paris",
    "DFW": "America/Chicago",
    "FRA": "Europe/Berlin",
    "HKG": "Asia/Hong_Kong",
    "DEN": "America/Denver",
    "DXB": "Asia/Dubai",
    "CGK": "Asia/Jakarta",
    "AMS": "Europe/Amsterdam",
    "MAD": "Europe/Madrid",
    "BKK": "Asia/Bangkok",
    "JFK": "America/New_York",
    "SIN": "Asia/Singapore",
    "CAN": "Asia/Shanghai",
    "LAS": "America/Los_Angeles",
    "PVG": "Asia/Shanghai",
    "SFO": "America/Los_Angeles",
    "PHX": "America/Phoenix",
    "IAH": "America/Chicago",
    "CLT": "America/New_York",
    "MIA": "America/New_York",
    "MUC": "Europe/Berlin",
    "KUL": "Asia/Kuala_Lumpur",
    "FCO": "Europe/Rome",
    "IST": "Europe/Istanbul"
}

def CalculateTimes(acc, record):
    """Reducer step: append one flight's local departure and arrival clock times to acc.

    record is unpacked as (flightID, originAirport, departureTime,
    destinationAirport, flightTime). departureTime is given to
    datetime.fromtimestamp and flightTime is used as a number of minutes.
    Both airports must be keys of airportTimezones.

    Returns acc (the same list), extended with a dict holding "flightID",
    "departureTime" and "arrivalTime"; both times are formatted as HH:MM.
    """
    flightID, originCode, departureStamp, destinationCode, minutesInAir = record

    # Departure expressed in the origin airport's timezone.
    originZone = timezone(airportTimezones[originCode])
    departure = datetime.fromtimestamp(departureStamp, originZone)

    # Add the flight duration, then re-express the result in the destination's timezone.
    landing = departure + timedelta(minutes=minutesInAir)
    destinationZone = timezone(airportTimezones[destinationCode])
    arrival = landing.astimezone(destinationZone)

    arrivalClock = arrival.strftime('%H:%M')
    departureClock = departure.strftime('%H:%M')

    acc.append({"flightID": flightID, "departureTime": departureClock, "arrivalTime": arrivalClock})
    return acc

# Count how many rows carry each flightID with the same map/reduce Counter pattern
# (10 partitions requested); the totals are used below as the passenger count.
Flights = rdd['flightID'].tolist()
distributed_data_flights = sparkContext.parallelize(Flights, 10)

dist_data_flight_id = distributed_data_flights.map(get_instances)
dist_data_flight_id_count = dist_data_flight_id.reduce(ReduceCounter)

# Distinct flight IDs. NOTE(review): not referenced again in the visible code.
dist_data_flight_id_list = list(dist_data_flight_id_count.keys())

# Getting data about each flight: keep the first row seen for every flightID,
# then pull each needed field out of the kept rows into its own list.
FirstFlightInfo = reduce(GetFlightInfo, [record for _, record in rdd.iterrows()], {})
flightIDs = [record['flightID'] for record in FirstFlightInfo.values()]
originAirports = [record['originAirport'] for record in FirstFlightInfo.values()]
departureTimes = [record['departureTime'] for record in FirstFlightInfo.values()]
destinationAirports = [record['destinationAirport'] for record in FirstFlightInfo.values()]
flightTimes = [record['flightTime'] for record in FirstFlightInfo.values()]
# One row per flight, built from the per-field lists above.
FlightInfo = pd.DataFrame({
    'flightID': flightIDs,
    'originAirport': originAirports,
    'departureTime': departureTimes,
    'destinationAirport': destinationAirports,
    'flightTime': flightTimes
})

# Flight ID alongside the number of rows counted for it (the passenger count).
FlightData = pd.DataFrame({
    'flightID': list(dist_data_flight_id_count.keys()),
    'passengerCount': list(dist_data_flight_id_count.values())
})

# Attach the passenger count to the per-flight details.
df = pd.merge(FlightInfo, FlightData, on='flightID', how='inner')

# itertuples returns an iterator of plain tuples in the field order CalculateTimes unpacks; it can be consumed only once.
FlightInfoTuples = FlightInfo[['flightID', 'originAirport', 'departureTime', 'destinationAirport', 'flightTime']].itertuples(index=False)

# Fold every flight tuple through CalculateTimes to get a list of dicts with formatted times,
# then split that list into one list per field.
CalcTime = reduce(CalculateTimes, FlightInfoTuples, [])
CalcTimeFI = [flight['flightID'] for flight in CalcTime]
CalcTimeDT = [flight['departureTime'] for flight in CalcTime]
CalcTimeAT = [flight['arrivalTime'] for flight in CalcTime]

CalcArrTime = pd.DataFrame({
    'flightID': CalcTimeFI,
    'departureTime': CalcTimeDT,
    'arrivalTime': CalcTimeAT
})

# Both frames have a departureTime column, so the merge suffixes them (_orig / _new).
# The formatted value (_new) overwrites the raw one (_orig), the now-redundant _new
# column is dropped, and the surviving column gets its plain name back.
df = pd.merge(df, CalcArrTime, on='flightID', how='inner', suffixes=('_orig', '_new'))
df['departureTime_orig'] = df['departureTime_new']
df = df.drop(columns=['departureTime_new'])
df = df.rename(columns={"departureTime_orig": "departureTime"})

print(df)



    flightID originAirport departureTime destinationAirport  flightTime  \
0   SQU6245R           DEN         10:14                FRA        1049   
1   XXQ4064B           JFK         12:05                FRA         802   
2   SOH3431A           ORD         11:00                MIA         250   
3   PME8178S           DEN         10:13                PEK        1322   
4   MBA8071P           KUL         01:04                PEK         572   
5   MOO1786A           MAD         17:56                FRA         184   
6   HUR0974O           DEN         10:15                PVG        1398   
7   GMO5938W           LHR         17:11                PEK        1057   
8   DAU2617A           CGK         00:23                SFO        1811   
9   RUM0422W           MUC         17:58                MAD         194   
10  ATT7791R           AMS         18:13                DEN        1001   
11  WPW9201U           DFW         11:21                PEK        1452   
12  DKZ3042O           MI

                                                                                

In [4]:
from math import radians, sin, cos, sqrt, atan2

# Airport lookup table; the four names are applied as the column labels when the CSV is read.
AirportData = load_dataset("Top30_airports_LatLong.csv", ["airportName", "IATA", "Latitude", "Longitude"])

def search_airport(data, IATACode):
    """Look up an airport's coordinates by its IATA code.

    data is a DataFrame with "IATA", "Latitude" and "Longitude" columns.
    Returns [latitude, longitude] as plain floats, taken from the first row
    whose "IATA" value equals IATACode.

    Raises KeyError if no row matches, so a missing airport is reported here
    instead of surfacing later as an unrelated conversion error.
    """
    matches = data.loc[data["IATA"] == IATACode, ["Latitude", "Longitude"]]
    if matches.empty:
        raise KeyError(f"No airport with IATA code {IATACode!r}")

    # Return scalars rather than one-element Series: callers feed these values
    # to math functions, which would otherwise convert each Series implicitly.
    latitude, longitude = matches.iloc[0]
    return [float(latitude), float(longitude)]

def HaversineEquation(Coordinate1, Coordinate2, radius=3440):
    """Great-circle distance between two points using the haversine formula.

    Coordinate1 and Coordinate2 are (latitude, longitude) pairs in degrees.
    radius is the sphere's radius; the result is in the same unit. The
    default, 3440, is the Earth's radius in nautical miles.
    """
    lat1, long1 = Coordinate1
    lat2, long2 = Coordinate2

    lat1, long1, lat2, long2 = map(radians, [lat1, long1, lat2, long2])

    dlat = lat2 - lat1
    dlon = long2 - long1

    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Rounding can push a marginally outside [0, 1] for (near-)antipodal points,
    # which would make sqrt(1 - a) raise a math domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return radius * c

def calculate_distance(pair, data):
    """Distance between the two airports named in pair.

    pair is an (origin, destination) pair of IATA codes and data is the
    airport table passed on to search_airport. Returns whatever
    HaversineEquation computes for the two coordinate lookups.
    """
    origin, destination = pair
    return HaversineEquation(
        search_airport(data, origin),
        search_airport(data, destination),
    )

def get_nautical_miles(flightID, data):
    """Return the "NauticalMilesDistance" of the first row in data for flightID.

    data needs "flightID" and "NauticalMilesDistance" columns. Returns None
    when no row has the given flightID.
    """
    matches = data[data["flightID"] == flightID]
    if matches.empty:
        return None
    return matches.iloc[0]["NauticalMilesDistance"]


# Add each flight's distance to the per-flight table. The explicit column check
# keeps the cell safe to re-run without recomputing, and - unlike a bare
# try/except - does not hide unrelated errors or interrupts.
if "NauticalMilesDistance" not in df.columns:
    flightPairs = df[["originAirport", "destinationAirport"]].apply(tuple, axis=1)
    df["NauticalMilesDistance"] = flightPairs.map(lambda pair: calculate_distance(pair, AirportData))

# Copy the flight's distance onto every row of the passenger table (`rdd` is a pandas DataFrame).
if "NauticalMilesDistance" not in rdd.columns:
    rdd["NauticalMilesDistance"] = rdd["flightID"].map(lambda flightID: get_nautical_miles(flightID, df))


# Convert the pandas table to a Spark DataFrame and take its underlying RDD of rows.
PySdf = spark.createDataFrame(rdd)
rdd_data = PySdf.rdd

# Map: (passengerID, distance) for every row.
PassengerData = rdd_data.map(lambda row: (row['passengerID'], row['NauticalMilesDistance']))

# Reduce: sum the distances per passengerID, then bring the results back to the driver.
PassengerMilesRDD = PassengerData.reduceByKey(lambda a, b: a+b)
PassengerMiles = PassengerMilesRDD.collect()

print(PassengerMiles)

  lat1, long1, lat2, long2 = map(radians, [lat1, long1, lat2, long2])
[Stage 2:>                                                          (0 + 1) / 1]

[('UES9151GS5', 131876.15681874048), ('EZC9678QI6', 89317.87776669356), ('ONL0812DH1', 54225.17817167289), ('CYJ0225CH1', 54191.15665514621), ('POP2875LH3', 81030.07032011656), ('WTC9125IE5', 59609.36777422628), ('EDV2089LK5', 70429.4403096723), ('HCA3158QA6', 96991.76744551698), ('YMH6360YP0', 76256.3973170801), ('PUD8209OG3', 115810.46489350776), ('PAJ3974RK1', 34228.59581918167), ('WYU2010YH8', 96734.03355935254), ('JJM4724RF7', 93040.90133887033), ('MXU9187YC7', 61039.091952056115), ('HGO4350KK1', 81794.4554696843), ('BWI0520BG6', 124734.89630077957), ('JBE2302VO4', 69001.14576768805), ('DAZ3029XA0', 123080.59781081433), ('PIT2755XC1', 36075.8225107675), ('CKZ3132BR4', 92726.83904712513), ('CXN7304ER2', 78742.65394686518), ('WBE6935NU3', 99157.38607622252), ('IEG9308EA5', 42014.84457478233), ('SJD8775RZ4', 67449.5753585286), ('CDC0302NN5', 63111.6867363678), ('KKP5277HZ7', 58554.810132513354), ('SPR4484HA6', 122255.78873353885), ('VZY2993ME1', 73688.80433818845), ('LLZ3798PE3', 840

                                                                                

25/01/07 16:17:31 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
