In [1]:
#==========================================================================================
### FUNCTIONS ###
#==========================================================================================
def map_function(row):
    """Map one flight record to a ``(key, value)`` pair for the shuffle stage.

    Parameters
    ----------
    row : dict
        A single record that must provide the keys ``'Passenger_id'``,
        ``'Distance_km'`` and ``'Total_flight_time_mins'``.

    Returns
    -------
    tuple or None
        ``(passenger_id, (1, distance_km, total_flight_time_mins))``; the
        leading ``1`` is the per-record count that the reducer later sums.
        ``None`` is returned when the record lacks one of the required keys.
    """
    try:
        return (row['Passenger_id'], (1, row['Distance_km'], row['Total_flight_time_mins']))
    except (KeyError, ValueError):
        # A missing field on a dict lookup raises KeyError, not ValueError, so both
        # are handled here (ValueError is kept for backward compatibility).
        # shuffle() drops the resulting None through filter(None, ...).
        return None

def shuffle(mapper_out):
    """Group mapper output by key.

    Parameters
    ----------
    mapper_out : iterable
        ``(key, value)`` pairs produced by the map phase. Falsy entries
        (such as ``None``) are skipped.

    Returns
    -------
    dict
        Maps each key to the list of its values, in the order encountered.
    """
    grouped = {}
    for pair in mapper_out:
        if not pair:
            # Nothing to group for a falsy entry (e.g. None from the mapper)
            continue
        key, value = pair
        # Start a new list on first sight of the key, then append either way
        grouped.setdefault(key, []).append(value)
    return grouped

def reduce(kv):
    """Collapse every value grouped under one key into per-key totals.

    Parameters
    ----------
    kv : tuple
        ``(key, values)`` where each element of ``values`` is indexable at
        positions 0 (count), 1 (distance) and 2 (flight time).

    Returns
    -------
    tuple
        ``(key, total_count, total_distance, total_flight_time)``.
    """
    key, records = kv
    # Pull out the three fields column by column first, then total each column
    per_field = [[record[position] for record in records] for position in range(3)]
    flights, distance, minutes = (sum(values) for values in per_field)
    return key, flights, distance, minutes


    

In [2]:
#==========================================================================================
### IMPORT MODULES ###
#==========================================================================================
# The failure handler below calls traceback.print_exc() and sys.exit(), so these
# standard-library modules are imported before the guarded block instead of inside
# it; the handler then never depends on a name bound by the code it is guarding.
import sys
import traceback
import os

try:
    print("IMPORT MODULES - Start")
    # Third-party dependencies: these are the imports that can realistically be missing
    import multiprocess as mp
    import pandas as pd
    from geopy.distance import geodesic
    print("IMPORT MODULES - Success")
except Exception:
    # `Exception` rather than a bare except, so KeyboardInterrupt and SystemExit
    # propagate untouched instead of being reported as an import failure.
    print("IMPORT MODULES - Failed")
    traceback.print_exc()
    sys.exit(1)  # Exit with non-zero status code to indicate an error


#==========================================================================================
### OUTPUT FOLDER CREATION ###
#==========================================================================================
try:
    print("OUTPUT FOLDER CREATION - Start")
    if os.path.isdir("output_folder"):
        print("output_folder already exists")
    # exist_ok=True makes creation idempotent and removes the gap between checking
    # for the folder and creating it. If a regular file occupies the path this
    # raises FileExistsError, so the problem is reported here and not at save time.
    os.makedirs("output_folder", exist_ok=True)
    print("OUTPUT FOLDER CREATION- Success")
except Exception:
    # `Exception` rather than a bare except, so KeyboardInterrupt and SystemExit
    # propagate untouched.
    print("OUTPUT FOLDER CREATION - Failed")
    traceback.print_exc()
    sys.exit(1)  # Exit with non-zero status code to indicate an error


#==========================================================================================
### LOAD INPUT AND PREPROCESSING ###
#==========================================================================================
try:
    print("LOAD INPUT AND PREPROCESSING - Start")

    # Passenger records: read with header=None, so column names are supplied here
    header_names_1 = [
        "Passenger_id",
        "Flight_id",
        "From_airport_IATA",
        "Destination_airport_IATA",
        "Departure_time_GMT",
        "Total_flight_time_mins",
    ]
    df_1 = pd.read_csv(
        'AComp_Passenger_data_no_error.csv',
        encoding='utf-8',
        header=None,
        names=header_names_1,
    )

    # Airport table labelled with From_ names, left-joined on the departure IATA code
    header_names_2 = ["From_Airport_Name", "From_Airport_IATA", "From_Latitude", "From_Longitude"]
    df_2 = pd.read_csv(
        'Top30_airports_LatLong.csv',
        encoding='utf-8',
        header=None,
        names=header_names_2,
    )
    df = df_1.merge(df_2, how='left', left_on="From_airport_IATA", right_on="From_Airport_IATA")

    # Same airport table again, labelled with To_ names, left-joined on the destination IATA code
    header_names_2 = ["To_Airport_Name", "To_Airport_IATA", "To_Latitude", "To_Longitude"]
    df_2 = pd.read_csv(
        'Top30_airports_LatLong.csv',
        encoding='utf-8',
        header=None,
        names=header_names_2,
    )
    df = df.merge(df_2, how='left', left_on="Destination_airport_IATA", right_on="To_Airport_IATA")

    # Row-wise geodesic distance, in kilometres, between the From_ and To_ coordinates
    df['Distance_km'] = df.apply(
        lambda row: geodesic(
            (row['From_Latitude'], row['From_Longitude']),
            (row['To_Latitude'], row['To_Longitude']),
        ).kilometers,
        axis=1,
    )

    print("LOAD INPUT AND PREPROCESSING - Success")
except:
    print("LOAD INPUT AND PREPROCESSING - Failed")
    traceback.print_exc()
    sys.exit(1)  # Exit with non-zero status code to indicate an error


#==========================================================================================
### MAPREDUCE BEFORE REMOVING DUPLICATE VALUES ###
#==========================================================================================
try:
    print("MAPREDUCE BEFORE REMOVING DUPLICATE VALUES - Start")
    # One 'spawn'-context pool, sized to mp.cpu_count(), serves both the map and reduce phases
    with mp.get_context('spawn').Pool(processes=mp.cpu_count()) as pool:
        # Map: one (passenger, (count, distance, time)) pair per record
        map_out = pool.map(map_function, df.to_dict('records'))
        # Shuffle: gather the mapped values under their passenger key
        reduce_in = shuffle(map_out)
        # Reduce: total the values of each passenger
        reduce_out = pool.map(reduce, reduce_in.items())

        # Tabulate the totals and rank them, largest first, by flights, then distance, then time
        reduce_out_df_1 = pd.DataFrame(
            reduce_out,
            columns=['Passenger_id', 'total flights', 'Distance_km', 'Total_flight_time_mins'],
        ).sort_values(
            by=['total flights', 'Distance_km', 'Total_flight_time_mins'],
            ascending=False,
        )

        print("Print top six passengers")
        print(reduce_out_df_1.head(6))
        print("________________________________")
        print(f'Saving file as {os.path.join("output_folder", "reduce_out_BEFORE_duplicate_values.csv")}')
        reduce_out_df_1.to_csv(
            os.path.join("output_folder", "reduce_out_BEFORE_duplicate_values.csv"),
            index=False,
        )
    print("MAPREDUCE BEFORE REMOVING DUPLICATE VALUES - Success")
except:
    print("MAPREDUCE BEFORE REMOVING DUPLICATE VALUES- Failed")
    traceback.print_exc()
    sys.exit(1)  # Exit with non-zero status code to indicate an error

#==========================================================================================
### MAPREDUCE AFTER REMOVING DUPLICATE VALUES ###
#==========================================================================================
try:
    print("MAPREDUCE AFTER REMOVING DUPLICATE VALUES - Start")
    # Remove rows that are exact duplicates across all columns. Done in place, so
    # `df` itself is the de-duplicated frame from this point on.
    df.drop_duplicates(inplace=True) #drop duplicates

    # Perform map-reduce operations using multiprocessing
    with mp.get_context('spawn').Pool(processes=mp.cpu_count()) as pool:
        # The mapper must be map_function. Passing the builtin `map` makes every
        # worker call map(record), which raises TypeError and fails the stage.
        map_out = pool.map(map_function, df.to_dict('records'))
        reduce_in = shuffle(map_out)
        reduce_out = pool.map(reduce, reduce_in.items())
        reduce_out_df_2 = pd.DataFrame(reduce_out, columns=['Passenger_id', 'total flights', 'Distance_km', 'Total_flight_time_mins'])
        # Rank passengers, largest first, by flights, then distance, then flight time
        reduce_out_df_2.sort_values(by=['total flights', 'Distance_km', 'Total_flight_time_mins'], ascending=False, inplace =True)
        print("Print top six passengers")
        print(reduce_out_df_2.head(6))
        print("-----------------")
        print(f'Saving file as {os.path.join("output_folder", "reduce_out_AFTER_duplicate_values.csv")}')
        reduce_out_df_2.to_csv(os.path.join("output_folder", "reduce_out_AFTER_duplicate_values.csv"), index=False)
    # Reported after the pool has been released, matching the BEFORE stage
    print("MAPREDUCE AFTER REMOVING DUPLICATE VALUES - Success")
except Exception:
    # `Exception` rather than a bare except, so KeyboardInterrupt and SystemExit
    # propagate untouched.
    print("MAPREDUCE AFTER REMOVING DUPLICATE VALUES- Failed")
    traceback.print_exc()
    sys.exit(1)  # Exit with non-zero status code to indicate an error





IMPORT MODULES - Start
IMPORT MODULES - Failed


Traceback (most recent call last):
  File "/var/folders/h3/lldh0c4j02962dvm357kt4080000gn/T/ipykernel_45086/3660960729.py", line 9, in <module>
    import multiprocess as mp
ModuleNotFoundError: No module named 'multiprocess'


AttributeError: 'tuple' object has no attribute 'tb_frame'