In [59]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

In [60]:
import logging

# Set the logging level to ERROR
logging.getLogger("py4j").setLevel(logging.ERROR)

In [61]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Preprocess") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.pyspark.python", "/usr/local/bin/python3.9") \
    .config("spark.pyspark.driver.python", "/usr/local/bin/python3.9") \
    .getOrCreate()

# Read CSV into DataFrame
# prices = spark.read.csv("itineraries.csv", header=True, inferSchema=True)

In [62]:
!where python3.9

/Library/Frameworks/Python.framework/Versions/3.9/bin/python3.9
/usr/local/bin/python3.9


In [63]:
ls

Dig Bata - flightPrices 2.ipynb  [34mdelays_preprocessed.csv[m[m/
Dig Bata - flightPrices.ipynb    [34mdelays_preprocessed_updated[m[m/
Dig Bata 2.ipynb                 distanceMapping.json
Dig Bata 3.ipynb                 [34mflight_prices_processed_2[m[m/
Dig Bata Project.ipynb           itineraries.csv
Final Project.ipynb              [34mlinear_regression_model[m[m/
Weights  Calculation             nasDelay.json
airportCodeToNameMapping.json    [34mpipeline_model[m[m/
archive (1).zip                  [34mpipeline_model2[m[m/
archive.zip                      s3-access_accessKeys.csv
big-data-project.html            securityDelay.json
[34mdelays[m[m/                          test_prices.ipynb
[34mdelays_lr_model[m[m/                 timeDuration.json
[34mdelays_pipeline[m[m/                 weatherDelay.json
[34mdelays_preprocessed[m[m/


In [64]:
delays_preprocessed = spark.read.csv("delays_preprocessed_updated/*.csv", header=True, inferSchema=True)

                                                                                

In [65]:
delays_preprocessed.count()

                                                                                

28416515

In [66]:
delays_preprocessed.columns

['Origin',
 'Operating_Airline',
 'OriginCityName',
 'Dest',
 'DestCityName',
 'DepDelay',
 'DepDelayMinutes',
 'TaxiOut',
 'TaxiIn',
 'ArrDelay',
 'Distance',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'FlightWeek',
 'Year',
 'depDaypart',
 'arrDaypart']

In [67]:
# Create a new column by concatenating 'Origin' and 'Dest'
from pyspark.sql.functions import concat_ws

df_with_concatenated = delays_preprocessed.withColumn('Origin_Dest', concat_ws('_', 'OriginCityName', 'DestCityName'))

# Convert the DataFrame to an RDD and collect it as a dictionary
distance_dict = df_with_concatenated.select('Origin_Dest', 'Distance').distinct().rdd.collectAsMap()

# If you want to use this dictionary in Python code, convert the keys to strings
distance_dict = {str(k): v for k, v in distance_dict.items()}
distance_dict

                                                                                

{'Nashville, TN_Seattle, WA': 1978.0,
 'Reno, NV_Santa Ana, CA': 415.0,
 'St. Petersburg, FL_Pittsburgh, PA': 878.0,
 'Columbus, OH_Valparaiso, FL': 675.0,
 'Los Angeles, CA_Springfield, MO': 1423.0,
 'Sanford, FL_Shreveport, LA': 791.0,
 'Norfolk, VA_Cincinnati, OH': 485.0,
 'Dubuque, IA_Chicago, IL': 147.0,
 'Traverse City, MI_Chicago, IL': 224.0,
 'Fort Wayne, IN_Chicago, IL': 157.0,
 'Charlotte, NC_Tallahassee, FL': 386.0,
 'Savannah, GA_New York, NY': 722.0,
 'Wilmington, NC_New York, NY': 500.0,
 'Tampa, FL_Charlotte, NC': 507.0,
 'Phoenix, AZ_Nashville, TN': 1449.0,
 'Albany, NY_Philadelphia, PA': 212.0,
 'Kona, HI_San Jose, CA': 2384.0,
 'Cleveland, OH_Boston, MA': 563.0,
 'Burbank, CA_Denver, CO': 850.0,
 'Baltimore, MD_Rochester, NY': 277.0,
 'Oakland, CA_Santa Barbara, CA': 263.0,
 'Phoenix, AZ_Cincinnati, OH': 1553.0,
 'Phoenix, AZ_Billings, MT': 878.0,
 'Denver, CO_Scottsbluff, NE': 150.0,
 'Los Angeles, CA_San Diego, CA': 109.0,
 'Detroit, MI_Indianapolis, IN': 231.0,
 'K

In [68]:
delays_preprocessed.limit(10).toPandas().head(10)

Unnamed: 0,Origin,Operating_Airline,OriginCityName,Dest,DestCityName,DepDelay,DepDelayMinutes,TaxiOut,TaxiIn,ArrDelay,Distance,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,FlightWeek,Year,depDaypart,arrDaypart
0,SEA,DL,"Seattle, WA",TUS,"Tucson, AZ",-7.0,0.0,17.0,22.0,-13.0,1216.0,0.0,0.0,0.0,0.0,0.0,17,2021,morning,afternoon
1,MCO,DL,"Orlando, FL",ATL,"Atlanta, GA",141.0,141.0,14.0,14.0,135.0,404.0,135.0,0.0,0.0,0.0,0.0,17,2021,afternoon,evening
2,LAS,DL,"Las Vegas, NV",SLC,"Salt Lake City, UT",-4.0,0.0,17.0,16.0,2.0,368.0,0.0,0.0,0.0,0.0,0.0,17,2021,night,night
3,ATL,DL,"Atlanta, GA",IND,"Indianapolis, IN",-1.0,0.0,8.0,3.0,-15.0,432.0,0.0,0.0,0.0,0.0,0.0,17,2021,afternoon,afternoon
4,IND,DL,"Indianapolis, IN",ATL,"Atlanta, GA",-6.0,0.0,10.0,11.0,-12.0,432.0,0.0,0.0,0.0,0.0,0.0,17,2021,evening,evening
5,SAT,DL,"San Antonio, TX",ATL,"Atlanta, GA",6.0,6.0,9.0,15.0,-1.0,874.0,0.0,0.0,0.0,0.0,0.0,17,2021,morning,morning
6,LAS,DL,"Las Vegas, NV",MSP,"Minneapolis, MN",6.0,6.0,14.0,5.0,-8.0,1299.0,0.0,0.0,0.0,0.0,0.0,17,2021,afternoon,night
7,DEN,DL,"Denver, CO",SLC,"Salt Lake City, UT",18.0,18.0,21.0,8.0,20.0,391.0,0.0,18.0,2.0,0.0,0.0,17,2021,afternoon,afternoon
8,SLC,DL,"Salt Lake City, UT",DEN,"Denver, CO",-4.0,0.0,15.0,8.0,-5.0,391.0,0.0,0.0,0.0,0.0,0.0,17,2021,morning,morning
9,SLC,DL,"Salt Lake City, UT",DEN,"Denver, CO",-8.0,0.0,14.0,5.0,-12.0,391.0,0.0,0.0,0.0,0.0,0.0,17,2021,night,night


In [69]:
ls

Dig Bata - flightPrices 2.ipynb  [34mdelays_preprocessed.csv[m[m/
Dig Bata - flightPrices.ipynb    [34mdelays_preprocessed_updated[m[m/
Dig Bata 2.ipynb                 distanceMapping.json
Dig Bata 3.ipynb                 [34mflight_prices_processed_2[m[m/
Dig Bata Project.ipynb           itineraries.csv
Final Project.ipynb              [34mlinear_regression_model[m[m/
Weights  Calculation             nasDelay.json
airportCodeToNameMapping.json    [34mpipeline_model[m[m/
archive (1).zip                  [34mpipeline_model2[m[m/
archive.zip                      s3-access_accessKeys.csv
big-data-project.html            securityDelay.json
[34mdelays[m[m/                          test_prices.ipynb
[34mdelays_lr_model[m[m/                 timeDuration.json
[34mdelays_pipeline[m[m/                 weatherDelay.json
[34mdelays_preprocessed[m[m/


In [70]:
# Create a new column by concatenating 'Origin' and 'Dest'
from pyspark.sql.functions import concat_ws

df_with_concatenated = delays_preprocessed.withColumn('Origin_Dest', concat_ws('_', 'OriginCityName', 'DestCityName'))

# Convert the DataFrame to an RDD and collect it as a dictionary
distance_dict = df_with_concatenated.select('Origin_Dest', 'Distance').distinct().rdd.collectAsMap()

# If you want to use this dictionary in Python code, convert the keys to strings
distance_dict = {str(k): v for k, v in distance_dict.items()}
distance_dict

                                                                                

{'Nashville, TN_Seattle, WA': 1978.0,
 'Reno, NV_Santa Ana, CA': 415.0,
 'St. Petersburg, FL_Pittsburgh, PA': 878.0,
 'Columbus, OH_Valparaiso, FL': 675.0,
 'Los Angeles, CA_Springfield, MO': 1423.0,
 'Sanford, FL_Shreveport, LA': 791.0,
 'Norfolk, VA_Cincinnati, OH': 485.0,
 'Dubuque, IA_Chicago, IL': 147.0,
 'Traverse City, MI_Chicago, IL': 224.0,
 'Fort Wayne, IN_Chicago, IL': 157.0,
 'Charlotte, NC_Tallahassee, FL': 386.0,
 'Savannah, GA_New York, NY': 722.0,
 'Wilmington, NC_New York, NY': 500.0,
 'Tampa, FL_Charlotte, NC': 507.0,
 'Phoenix, AZ_Nashville, TN': 1449.0,
 'Albany, NY_Philadelphia, PA': 212.0,
 'Kona, HI_San Jose, CA': 2384.0,
 'Cleveland, OH_Boston, MA': 563.0,
 'Burbank, CA_Denver, CO': 850.0,
 'Baltimore, MD_Rochester, NY': 277.0,
 'Oakland, CA_Santa Barbara, CA': 263.0,
 'Phoenix, AZ_Cincinnati, OH': 1553.0,
 'Phoenix, AZ_Billings, MT': 878.0,
 'Denver, CO_Scottsbluff, NE': 150.0,
 'Los Angeles, CA_San Diego, CA': 109.0,
 'Detroit, MI_Indianapolis, IN': 231.0,
 'K

In [71]:
import json

# Convert the dictionary to JSON format
distance_json = json.dumps(distance_dict)

# Specify the file path where you want to save the JSON file
file_path = "distanceMapping.json"  # Replace "/path/to/your/file.json" with your desired file path

# Write the JSON string to the file
with open(file_path, 'w') as f:
    json.dump(distance_dict, f)

In [72]:
delays_preprocessed.limit(10).toPandas().head()

Unnamed: 0,Origin,Operating_Airline,OriginCityName,Dest,DestCityName,DepDelay,DepDelayMinutes,TaxiOut,TaxiIn,ArrDelay,Distance,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,FlightWeek,Year,depDaypart,arrDaypart
0,SEA,DL,"Seattle, WA",TUS,"Tucson, AZ",-7.0,0.0,17.0,22.0,-13.0,1216.0,0.0,0.0,0.0,0.0,0.0,17,2021,morning,afternoon
1,MCO,DL,"Orlando, FL",ATL,"Atlanta, GA",141.0,141.0,14.0,14.0,135.0,404.0,135.0,0.0,0.0,0.0,0.0,17,2021,afternoon,evening
2,LAS,DL,"Las Vegas, NV",SLC,"Salt Lake City, UT",-4.0,0.0,17.0,16.0,2.0,368.0,0.0,0.0,0.0,0.0,0.0,17,2021,night,night
3,ATL,DL,"Atlanta, GA",IND,"Indianapolis, IN",-1.0,0.0,8.0,3.0,-15.0,432.0,0.0,0.0,0.0,0.0,0.0,17,2021,afternoon,afternoon
4,IND,DL,"Indianapolis, IN",ATL,"Atlanta, GA",-6.0,0.0,10.0,11.0,-12.0,432.0,0.0,0.0,0.0,0.0,0.0,17,2021,evening,evening


In [73]:
from pyspark.sql.functions import mean
mean_carrier_delay = delays_preprocessed.groupBy('Operating_Airline') \
                       .agg(mean('CarrierDelay').alias('Mean_CarrierDelay'))

# Convert the DataFrame to a dictionary
carrier_delay_dict = dict(mean_carrier_delay.rdd.map(lambda row: (row['Operating_Airline'], row['Mean_CarrierDelay'])).collect())
carrier_delay_dict

                                                                                

{'UA': 3.423222419985118,
 'NK': 3.2118444457475714,
 'AA': 4.7917388244190064,
 'EV': 5.342242695384107,
 'B6': 7.299943174643263,
 'PT': 2.539269115342735,
 'DL': 3.7599815035051902,
 'OO': 6.209415580208714,
 'F9': 5.000508332704653,
 'YV': 5.67237268244944,
 'MQ': 2.6363881908638844,
 'C5': 6.434559493742045,
 'OH': 3.49215333495421,
 'EM': 2.1525951715607676,
 'HA': 3.0608543863527027,
 'G4': 5.794402889991456,
 'ZW': 4.268656961022644,
 'YX': 3.003557364412194,
 'AS': 2.5637682170002094,
 'QX': 2.2097902553837927,
 'G7': 4.747592106149391,
 'WN': 3.1851960736430476,
 '9E': 3.1854992326419103,
 'CP': 4.858521951123864,
 'AX': 6.546503974809003,
 'KS': 8.625425170068027,
 '9K': 2.388185654008439,
 'VX': 3.0799443058536866}

In [74]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import concat_ws


mean_weather_delay = delays_preprocessed.groupBy('Origin', 'Dest') \
                      .agg(mean('WeatherDelay').alias('Mean_WeatherDelay'))

# Concatenate 'Origin' and 'Dest' columns to create the key for the dictionary
mean_weather_delay = mean_weather_delay.withColumn("origin_dest", concat_ws("_", "Origin", "Dest"))

# Convert the DataFrame to a dictionary
weather_delay_dict = dict(mean_weather_delay.rdd.map(lambda row: (row['origin_dest'], row['Mean_WeatherDelay'])).collect())

# Show the dictionary
print(weather_delay_dict)



{'ATL_GSP': 0.4232451423245142, 'FSD_ATL': 2.6272640610104863, 'PHL_MCO': 0.23165900086524888, 'LAS_LIT': 0.14367269267364416, 'SNA_GEG': 0.02843601895734597, 'PIT_VPS': 2.1451612903225805, 'STS_PHX': 0.2509677419354839, 'BQN_MCO': 0.1848127600554785, 'TPA_ACY': 0.5124481327800829, 'ORD_PDX': 0.8498078938176737, 'SNA_PHX': 0.15776667947240364, 'PBI_DCA': 0.4250174378051616, 'SJC_LIH': 0.0, 'IAD_ILM': 1.1649565903709551, 'SPI_ORD': 0.7884138785625775, 'DAL_LGB': 0.1194331983805668, 'MDW_MEM': 0.48843827819281393, 'SMF_BUR': 0.05024325519681557, 'MCI_IAH': 1.432156982050502, 'SHR_DEN': 1.6733812949640288, 'LBB_DEN': 0.5839534131349078, 'EWR_STT': 0.21149425287356322, 'MYR_PSM': 0.19101123595505617, 'PNS_PHL': 0.9384184744576627, 'TPA_CVG': 0.513678905687545, 'RDU_SLC': 0.5516102394715111, 'CAE_ATL': 0.38136620856911885, 'MYR_TTN': 0.0, 'BTV_MCO': 1.495, 'RDU_PGD': 1.28992628992629, 'AZA_SAN': 0.0, 'TYS_HOU': 0.8476190476190476, 'BNA_SYR': 0.20249221183800623, 'AZA_MOT': 0.286168521462639

                                                                                

In [75]:
import json

# Convert the dictionary to JSON format
weather_delay_json = json.dumps(weather_delay_dict)

# Specify the file path where you want to save the JSON file
file_path = "weatherDelay.json"  # Replace "/path/to/your/file.json" with your desired file path

# Write the JSON string to the file
with open(file_path, 'w') as f:
    json.dump(weather_delay_dict, f)

In [76]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import concat_ws


mean_nas_delay = delays_preprocessed.groupBy('Origin', 'Dest') \
                      .agg(mean('NASDelay').alias('Mean_NASDelay'))

# Concatenate 'Origin' and 'Dest' columns to create the key for the dictionary
mean_nas_delay = mean_nas_delay.withColumn("origin_dest", concat_ws("_", "Origin", "Dest"))

# Convert the DataFrame to a dictionary
mean_nas_dict = dict(mean_nas_delay.rdd.map(lambda row: (row['origin_dest'], row['Mean_NASDelay'])).collect())

# Show the dictionary
print(mean_nas_dict)



{'ATL_GSP': 0.5679357567935757, 'FSD_ATL': 2.3040991420400383, 'PHL_MCO': 4.164716061751446, 'LAS_LIT': 1.6888677450047573, 'SNA_GEG': 1.5308056872037914, 'PIT_VPS': 1.4623655913978495, 'STS_PHX': 0.7303225806451613, 'BQN_MCO': 2.6449375866851597, 'TPA_ACY': 2.7579529737206085, 'ORD_PDX': 2.25823728024217, 'SNA_PHX': 1.149442950441798, 'PBI_DCA': 2.5656823994419904, 'SJC_LIH': 2.438449848024316, 'IAD_ILM': 2.25611681136543, 'SPI_ORD': 3.700743494423792, 'DAL_LGB': 1.1295546558704452, 'MDW_MEM': 1.1828530771967272, 'SMF_BUR': 0.5194161875276426, 'MCI_IAH': 2.6010039549741406, 'SHR_DEN': 3.2338129496402876, 'LBB_DEN': 1.6182465221611129, 'EWR_STT': 3.3233716475095787, 'MYR_PSM': 2.9101123595505616, 'PNS_PHL': 3.3428971308607416, 'TPA_CVG': 2.299496040316775, 'RDU_SLC': 1.2089182493806772, 'CAE_ATL': 1.2363581244947452, 'MYR_TTN': 4.101214574898785, 'BTV_MCO': 7.71, 'RDU_PGD': 3.71007371007371, 'AZA_SAN': 2.4705882352941178, 'TYS_HOU': 2.980952380952381, 'BNA_SYR': 3.9501557632398754, 'AZ

                                                                                

In [77]:
import json

# Convert the dictionary to JSON format
mean_nas_json = json.dumps(mean_nas_dict)

# Specify the file path where you want to save the JSON file
file_path = "nasDelay.json"  # Replace "/path/to/your/file.json" with your desired file path

# Write the JSON string to the file
with open(file_path, 'w') as f:
    json.dump(mean_nas_dict, f)

In [78]:
mean_sec_delay = delays_preprocessed.groupBy('Origin') \
                       .agg(mean('SecurityDelay').alias('Mean_SecurityDelay'))

# Convert the DataFrame to a dictionary
mean_sec_dict = dict(mean_sec_delay.rdd.map(lambda row: (row['Origin'], row['Mean_SecurityDelay'])).collect())
mean_sec_dict

                                                                                

{'BGM': 0.0004175365344467641,
 'DLG': 1.7715736040609138,
 'PSE': 0.3469135802469136,
 'INL': 0.0,
 'MSY': 0.024985783155985677,
 'GEG': 0.016993649554759287,
 'DRT': 0.0,
 'SNA': 0.016745891947976738,
 'BUR': 0.014180047236309709,
 'GRB': 0.0048291233283803865,
 'GTF': 0.001578728707935189,
 'IDA': 0.00891291642314436,
 'GRR': 0.011740955214365405,
 'LWB': 0.0003661662394727206,
 'PVU': 0.007997630331753554,
 'JLN': 0.005786718104160926,
 'EUG': 0.00811772130113017,
 'PSG': 0.011363636363636364,
 'ATY': 0.0,
 'MYR': 0.008709101287032093,
 'PVD': 0.01272971487880823,
 'GSO': 0.007419484849595134,
 'OAK': 0.03932584269662921,
 'EAR': 0.009688581314878892,
 'FAR': 0.020723450417087494,
 'MSN': 0.005555231824097275,
 'FSM': 0.032324187953327026,
 'MQT': 0.0352071581404396,
 'COD': 0.011385199240986717,
 'BTM': 0.01678478179783663,
 'SCC': 0.014566395663956639,
 'ESC': 0.05382215288611544,
 'DCA': 0.016708104538665745,
 'RFD': 0.042566129522651264,
 'CID': 0.003367003367003367,
 'MLU': 0.

In [79]:
import json

# Convert the dictionary to JSON format
# mean_nas_json = json.dumps(mean_nas_dict)

# Specify the file path where you want to save the JSON file
file_path = "securityDelay.json"  # Replace "/path/to/your/file.json" with your desired file path

# Write the JSON string to the file
with open(file_path, 'w') as f:
    json.dump(mean_sec_dict, f)

In [80]:
# 'CarrierDelay', # Flight
#  'WeatherDelay', # org dest
 # 'NASDelay',# org dest
 # 'SecurityDelay', # org
 # 'LateAircraftDelay', # Flight
 

In [81]:
mean_late_aircraft_delay = delays_preprocessed.groupBy('Operating_Airline') \
                       .agg(mean('LateAircraftDelay').alias('Mean_LateAircraftDelay'))

# Convert the DataFrame to a dictionary
mean_late_aircraft_delay_dict = dict(mean_late_aircraft_delay.rdd.map(lambda row: (row['Operating_Airline'], row['Mean_LateAircraftDelay'])).collect())
mean_late_aircraft_delay_dict

                                                                                

{'UA': 5.1743180972118035,
 'NK': 3.694704255360738,
 'AA': 4.874551629303676,
 'EV': 5.129514361667639,
 'B6': 7.2672119548665535,
 'PT': 4.982086420736728,
 'DL': 2.304132494997973,
 'OO': 4.003186489827333,
 'F9': 7.564506162860458,
 'YV': 6.064572802154132,
 'MQ': 4.095667314583926,
 'C5': 10.02854928666937,
 'OH': 5.852045276494583,
 'EM': 6.494479820422374,
 'HA': 1.3495462169577033,
 'G4': 7.319008289238979,
 'ZW': 7.082420583977493,
 'YX': 4.000902014364639,
 'AS': 2.7092320461748516,
 'QX': 2.5150772280068066,
 'G7': 7.782649693540782,
 'WN': 4.315058973990045,
 '9E': 3.5869364059012443,
 'CP': 6.185060033072111,
 'AX': 10.59635040264299,
 'KS': 6.386479591836735,
 '9K': 1.5123568414707655,
 'VX': 2.859430295295005}

In [82]:
origin_city_pairs = delays_preprocessed.select('Origin', 'OriginCityName').distinct().collect()

# Create a dictionary from the list of tuples
origin_city_dict = {row['Origin']: row['OriginCityName'] for row in origin_city_pairs}

# Show the dictionary
print(origin_city_dict)



{'COS': 'Colorado Springs, CO', 'SDF': 'Louisville, KY', 'CLL': 'College Station/Bryan, TX', 'PIR': 'Pierre, SD', 'MSN': 'Madison, WI', 'EVV': 'Evansville, IN', 'CMI': 'Champaign/Urbana, IL', 'GTR': 'Columbus, MS', 'STX': 'Christiansted, VI', 'RIC': 'Richmond, VA', 'FLO': 'Florence, SC', 'DCA': 'Washington, DC', 'CKB': 'Clarksburg/Fairmont, WV', 'LNY': 'Lanai, HI', 'GUM': 'Guam, TT', 'PGV': 'Greenville, NC', 'GRI': 'Grand Island, NE', 'ITO': 'Hilo, HI', 'IAG': 'Niagara Falls, NY', 'MSP': 'Minneapolis, MN', 'HHH': 'Hilton Head, SC', 'CHS': 'Charleston, SC', 'AVL': 'Asheville, NC', 'PIT': 'Pittsburgh, PA', 'MYR': 'Myrtle Beach, SC', 'GST': 'Gustavus, AK', 'SLC': 'Salt Lake City, UT', 'DSM': 'Des Moines, IA', 'SJT': 'San Angelo, TX', 'DVL': 'Devils Lake, ND', 'DEN': 'Denver, CO', 'CYS': 'Cheyenne, WY', 'SGF': 'Springfield, MO', 'OGD': 'Ogden, UT', 'COD': 'Cody, WY', 'INL': 'International Falls, MN', 'PVU': 'Provo, UT', 'MDW': 'Chicago, IL', 'DAY': 'Dayton, OH', 'CHO': 'Charlottesville, VA

                                                                                

In [83]:
import json

# Convert the dictionary to JSON format
# mean_nas_json = json.dumps(mean_nas_dict)

# Specify the file path where you want to save the JSON file
file_path = "airportCodeToNameMapping.json"  # Replace "/path/to/your/file.json" with your desired file path

# Write the JSON string to the file
with open(file_path, 'w') as f:
    json.dump(origin_city_dict, f)

In [84]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

cols = ["Operating_Airline", 
           "Origin", 
           "Dest", 
           "FlightWeek", 
           "arrDaypart", 
           "depDaypart"]

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in cols]

# Perform OneHotEncoding on the indexed columns
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_onehot") for column in cols]

# Create a pipeline to execute the indexers and encoders sequentially
pipeline = Pipeline(stages=indexers + encoders)

# Fit the pipeline to the data and transform the DataFrame
pipeline_model = pipeline.fit(delays_preprocessed)
encoded_df = pipeline_model.transform(delays_preprocessed)

# Show the encoded DataFrame
encoded_df.show()


                                                                                

+------+-----------------+------------------+----+------------------+--------+---------------+-------+------+--------+--------+------------+------------+--------+-------------+-----------------+----------+----+----------+----------+-----------------------+------------+----------+----------------+----------------+----------------+------------------------+----------------+----------------+-----------------+-----------------+-----------------+
|Origin|Operating_Airline|    OriginCityName|Dest|      DestCityName|DepDelay|DepDelayMinutes|TaxiOut|TaxiIn|ArrDelay|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|FlightWeek|Year|depDaypart|arrDaypart|Operating_Airline_index|Origin_index|Dest_index|FlightWeek_index|arrDaypart_index|depDaypart_index|Operating_Airline_onehot|   Origin_onehot|     Dest_onehot|FlightWeek_onehot|arrDaypart_onehot|depDaypart_onehot|
+------+-----------------+------------------+----+------------------+--------+---------------+-------+------+-

In [85]:
encoded_df.columns

['Origin',
 'Operating_Airline',
 'OriginCityName',
 'Dest',
 'DestCityName',
 'DepDelay',
 'DepDelayMinutes',
 'TaxiOut',
 'TaxiIn',
 'ArrDelay',
 'Distance',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'FlightWeek',
 'Year',
 'depDaypart',
 'arrDaypart',
 'Operating_Airline_index',
 'Origin_index',
 'Dest_index',
 'FlightWeek_index',
 'arrDaypart_index',
 'depDaypart_index',
 'Operating_Airline_onehot',
 'Origin_onehot',
 'Dest_onehot',
 'FlightWeek_onehot',
 'arrDaypart_onehot',
 'depDaypart_onehot']

In [86]:
encoded_df.limit(100).show(100)

+------+-----------------+--------------------+----+--------------------+--------+---------------+-------+------+--------+--------+------------+------------+--------+-------------+-----------------+----------+----+----------+----------+-----------------------+------------+----------+----------------+----------------+----------------+------------------------+----------------+-----------------+-----------------+-----------------+-----------------+
|Origin|Operating_Airline|      OriginCityName|Dest|        DestCityName|DepDelay|DepDelayMinutes|TaxiOut|TaxiIn|ArrDelay|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|FlightWeek|Year|depDaypart|arrDaypart|Operating_Airline_index|Origin_index|Dest_index|FlightWeek_index|arrDaypart_index|depDaypart_index|Operating_Airline_onehot|   Origin_onehot|      Dest_onehot|FlightWeek_onehot|arrDaypart_onehot|depDaypart_onehot|
+------+-----------------+--------------------+----+--------------------+--------+---------------+--

In [87]:
req_cols = [
     'Year',
     'Operating_Airline',
     'Origin_onehot',
     'Operating_Airline_onehot',
     'Dest_onehot',
     'FlightWeek_onehot',
     'arrDaypart_onehot',
     'depDaypart_onehot',
     'DepDelay',
     'DepDelayMinutes',
     'TaxiOut',
     'TaxiIn',
     'Distance',
     'CarrierDelay',
     'WeatherDelay',
     'NASDelay',
     'SecurityDelay',
     'LateAircraftDelay',
     'Origin_index',
     'Operating_Airline_index',
     'Dest_index',
     'FlightWeek_index',
     'arrDaypart_index',
     'depDaypart_index',    
     'ArrDelay'
]

encoded_df_2 = encoded_df[req_cols]


In [88]:
encoded_df_2.columns

['Year',
 'Operating_Airline',
 'Origin_onehot',
 'Operating_Airline_onehot',
 'Dest_onehot',
 'FlightWeek_onehot',
 'arrDaypart_onehot',
 'depDaypart_onehot',
 'DepDelay',
 'DepDelayMinutes',
 'TaxiOut',
 'TaxiIn',
 'Distance',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'Origin_index',
 'Operating_Airline_index',
 'Dest_index',
 'FlightWeek_index',
 'arrDaypart_index',
 'depDaypart_index',
 'ArrDelay']

In [89]:
from pyspark.sql.functions import when, col

train_data_df = encoded_df_2.filter(col("Year") != 2022)
test_data_df = encoded_df_2.filter(col("Year") == 2022)


In [90]:
train_data_df.limit(100).show(100)

+----+-----------------+----------------+------------------------+-----------------+-----------------+-----------------+-----------------+--------+---------------+-------+------+--------+------------+------------+--------+-------------+-----------------+------------+-----------------------+----------+----------------+----------------+----------------+--------+
|Year|Operating_Airline|   Origin_onehot|Operating_Airline_onehot|      Dest_onehot|FlightWeek_onehot|arrDaypart_onehot|depDaypart_onehot|DepDelay|DepDelayMinutes|TaxiOut|TaxiIn|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|Origin_index|Operating_Airline_index|Dest_index|FlightWeek_index|arrDaypart_index|depDaypart_index|ArrDelay|
+----+-----------------+----------------+------------------------+-----------------+-----------------+-----------------+-----------------+--------+---------------+-------+------+--------+------------+------------+--------+-------------+-----------------+------------+-------

In [91]:
test_data_df.limit(100).show(100)



+----+-----------------+-----------------+------------------------+-----------------+-----------------+-----------------+-----------------+--------+---------------+-------+------+--------+------------+------------+--------+-------------+-----------------+------------+-----------------------+----------+----------------+----------------+----------------+--------+
|Year|Operating_Airline|    Origin_onehot|Operating_Airline_onehot|      Dest_onehot|FlightWeek_onehot|arrDaypart_onehot|depDaypart_onehot|DepDelay|DepDelayMinutes|TaxiOut|TaxiIn|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|Origin_index|Operating_Airline_index|Dest_index|FlightWeek_index|arrDaypart_index|depDaypart_index|ArrDelay|
+----+-----------------+-----------------+------------------------+-----------------+-----------------+-----------------+-----------------+--------+---------------+-------+------+--------+------------+------------+--------+-------------+-----------------+------------+----

                                                                                

In [92]:
train_data_df.columns

['Year',
 'Operating_Airline',
 'Origin_onehot',
 'Operating_Airline_onehot',
 'Dest_onehot',
 'FlightWeek_onehot',
 'arrDaypart_onehot',
 'depDaypart_onehot',
 'DepDelay',
 'DepDelayMinutes',
 'TaxiOut',
 'TaxiIn',
 'Distance',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'Origin_index',
 'Operating_Airline_index',
 'Dest_index',
 'FlightWeek_index',
 'arrDaypart_index',
 'depDaypart_index',
 'ArrDelay']

In [93]:
train_data_df_2 = train_data_df[[
 'FlightWeek_onehot',
 "Operating_Airline_onehot",
 "Operating_Airline_index",
 'Origin_onehot',
 'Dest_onehot',
 'arrDaypart_onehot',
 'depDaypart_onehot',
 'FlightWeek_index',
 'Origin_index',
 'Dest_index',
 'arrDaypart_index',
 'depDaypart_index',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'Distance',
 'ArrDelay']]

test_data_df_2 = test_data_df[[
 'FlightWeek_onehot',
 "Operating_Airline_onehot",
 "Operating_Airline_index",
 'Origin_onehot',
 'Dest_onehot',
 'arrDaypart_onehot',
 'depDaypart_onehot',
 'FlightWeek_index',
 'Origin_index',
 'Dest_index',
 'arrDaypart_index',
 'depDaypart_index',
 'CarrierDelay', # Flight
 'WeatherDelay', # org dest
 'NASDelay',# org dest
 'SecurityDelay', # org
 'LateAircraftDelay', # Flight
 'Distance',    
 'ArrDelay']]

In [94]:
train_data_df_2.limit(10).toPandas().head(10)

Unnamed: 0,FlightWeek_onehot,Operating_Airline_onehot,Operating_Airline_index,Origin_onehot,Dest_onehot,arrDaypart_onehot,depDaypart_onehot,FlightWeek_index,Origin_index,Dest_index,arrDaypart_index,depDaypart_index,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,Distance,ArrDelay
0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0)","(1.0, 0.0, 0.0, 0.0)",24.0,6.0,71.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,1216.0,-13.0
1,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0)","(0.0, 1.0, 0.0, 0.0)",24.0,12.0,0.0,3.0,1.0,135.0,0.0,0.0,0.0,0.0,404.0,135.0
2,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0)",24.0,9.0,18.0,0.0,3.0,0.0,0.0,0.0,0.0,0.0,368.0,2.0
3,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0)","(0.0, 1.0, 0.0, 0.0)",24.0,0.0,41.0,2.0,1.0,0.0,0.0,0.0,0.0,0.0,432.0,-15.0
4,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 1.0, 0.0)",24.0,41.0,0.0,3.0,2.0,0.0,0.0,0.0,0.0,0.0,432.0,-12.0
5,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)",24.0,48.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,874.0,-1.0
6,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)",24.0,9.0,14.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1299.0,-8.0
7,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0)","(0.0, 1.0, 0.0, 0.0)",24.0,2.0,18.0,2.0,1.0,0.0,18.0,2.0,0.0,0.0,391.0,20.0
8,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)",24.0,18.0,2.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,391.0,-5.0
9,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0)",24.0,18.0,2.0,0.0,3.0,0.0,0.0,0.0,0.0,0.0,391.0,-12.0


In [95]:
train_data_df_2.columns

['FlightWeek_onehot',
 'Operating_Airline_onehot',
 'Operating_Airline_index',
 'Origin_onehot',
 'Dest_onehot',
 'arrDaypart_onehot',
 'depDaypart_onehot',
 'FlightWeek_index',
 'Origin_index',
 'Dest_index',
 'arrDaypart_index',
 'depDaypart_index',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'Distance',
 'ArrDelay']

In [96]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Combine features into a single vector column"]
trainAssembler = VectorAssembler(inputCols=train_data_df_2.columns[:-1],
                                 outputCol="features")
trainOutput = trainAssembler.transform(train_data_df_2)

testAssembler = VectorAssembler(inputCols=test_data_df_2.columns[:-1],
                                outputCol="features")
testOutput = testAssembler.transform(test_data_df_2)

# Create a LinearRegression model
lr = LinearRegression(featuresCol="features", labelCol="ArrDelay")

# Train the model
model = lr.fit(trainOutput)

# Make predictions
predictions = model.transform(testOutput)


24/05/09 19:12:36 WARN Instrumentation: [84840884] regParam is zero, which might cause numerical instability and overfitting.
24/05/09 19:13:35 WARN Instrumentation: [84840884] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [55]:
from pyspark.sql.functions import abs, mean

mae = predictions.select(abs(predictions["ArrDelay"] - predictions["prediction"]).alias("error")).agg({"error": "mean"}).collect()[0][0]
print("Mean Absolute Error (MAE) on test data = %g" % mae)




Mean Absolute Error (MAE) on test data = 8.90166


                                                                                

In [78]:
from pyspark.sql.functions import abs, mean

rmse = evaluator.evaluate(predictions)
mae = predictions.select(abs(predictions["ArrDelay"] - predictions["prediction"]).alias("error")).agg({"error": "mean"}).collect()[0][0]


print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("Mean Absolute Error (MAE) on test data = %g" % mae)




Root Mean Squared Error (RMSE) on test data = 20.3077
Mean Absolute Error (MAE) on test data = 8.90166


                                                                                

In [79]:
predictions.columns

['FlightWeek_onehot',
 'Operating_Airline_onehot',
 'Operating_Airline_index',
 'Origin_onehot',
 'Dest_onehot',
 'arrDaypart_onehot',
 'depDaypart_onehot',
 'FlightWeek_index',
 'Origin_index',
 'Dest_index',
 'arrDaypart_index',
 'depDaypart_index',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'Distance',
 'ArrDelay',
 'features',
 'prediction']

In [None]:
predictions.select("ArrDelay").show()

In [80]:
# Save the pipeline
pipeline_path = "delays_pipeline"
pipeline_model.save(pipeline_path)

# Save the trained model
model_path = "delays_lr_model"
model.save(model_path)


In [None]:
# from pyspark.sql.functions import abs

# # Calculate Mean Absolute Error (MAE)
# mae = predictions.selectaa(abs(predictions["ArrDelay"] - predictions["prediction"]).alias("error")).agg({"error": "mean"}).collect()[0][0]

# print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
# print("Mean Absolute Error (MAE) on test data = %g" % mae)


In [None]:
# from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.regression import RandomForestRegressor
# from pyspark.ml.evaluation import RegressionEvaluator


# # Combine features into a single vector column
# trainAssembler = VectorAssembler(inputCols=train_data_df_2.columns[:-1],
#                             outputCol="features")
# trainOutput = trainAssembler.transform(train_data_df_2)


# testAssembler = VectorAssembler(inputCols=test_data_df_2.columns[:-1],
#                             outputCol="features")
# testOutput = testAssembler.transform(test_data_df_2)

In [None]:
# trainOutput.limit(10000).show(10000)

In [None]:
# testOutput.limit(100).show(100)

In [None]:
# # Create a RandomForestRegressor
# rf = RandomForestRegressor(featuresCol="features", labelCol="ArrDelay", numTrees=10,maxBins=500)

# # Train the model
# model = rf.fit(trainOutput)

# # Make predictions
# predictions = model.transform(testOutput)



In [None]:
# from pyspark.sql.functions import abs

# # Calculate Mean Absolute Error (MAE)
# # Evaluate the model
# evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")
# rmse = evaluator.evaluate(predictions)

# mae = predictions.select(abs(predictions["ArrDelay"] - predictions["prediction"]).alias("error")).agg({"error": "mean"}).collect()[0][0]

# print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
# print("Mean Absolute Error (MAE) on test data = %g" % mae)