In [1]:
# import necessary libraries
import pandas as pd
import json
import os
import concurrent.futures
import time
import glob


from actions.done.CreateZFWMessageAction import CreateZFWMessageAction
from actions.done.UpdateEstimatesAction import UpdateEstimatesAction
from actions.done.CalculateWeightAndTrimAction import CalculateWeightAndTrimAction
from actions.done.CreateLoadsheetAction import CreateLoadsheetAction
from actions.done.UpdateFuelDataAction import UpdateFuelDataAction
from actions.done.StorePaxDataAction import StorePaxDataAction

In [2]:
def load_config(path):
    """Load and return the JSON document stored at ``path``.

    The file is decoded as UTF-8 explicitly (JSON is UTF-8 by specification)
    rather than with the platform's locale encoding, which differs on Windows.
    """
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)

# Root of the data directory; parquet input and output paths are built from it.
FILE_PATH = "../data"

# Shared pandas settings live one level up; only the "parquet" section is used.
config_path = "../pandas_config.json"
config = load_config(config_path)
parquet_config = config["parquet"]

# Module-level constants reused by every parquet read/write in this notebook.
ENGINE = parquet_config["engine"]
COMPRESSION = parquet_config["compression"]

In [3]:
# Sort the file list: glob() returns files in filesystem-dependent order, and the
# concatenation order defines the positional index that a later cell turns into
# `row_id`, which process_flight uses to order each flight's actions.
# NOTE(review): assumes file names sort chronologically — confirm.
parquet_files = sorted(glob.glob(f"{FILE_PATH}/data_parquet/combined/*.parquet"))
if not parquet_files:
    raise FileNotFoundError(f"No parquet files found in {FILE_PATH}/data_parquet/combined")
df = pd.concat([pd.read_parquet(f, engine=ENGINE) for f in parquet_files], ignore_index=True)

In [4]:
# Materialise the positional row number as an "index" column so the original
# row order survives the filtering and re-indexing done in later cells.
df = df.reset_index()

In [5]:
# Only these columns are used downstream; "index" is the positional row number
# created by reset_index in the previous cell.
KEEP_COLUMNS = [
    "flight_id",
    "action_name",
    "entry_details",
    "index",
]
df = df.loc[:, KEEP_COLUMNS]

In [7]:
# Give the positional row number a descriptive name; process_flight sorts on it.
df = df.rename(columns={"index": "row_id"})

In [6]:
# Action handlers that process_flight knows how to replay (mirrors the
# actions.done imports in the first cell). Listed explicitly instead of scanning
# the directory, which would also pick up non-handler files such as __init__.py.
ALL_ACTIONS = [
    "CreateZFWMessageAction",
    "UpdateEstimatesAction",
    "CalculateWeightAndTrimAction",
    "CreateLoadsheetAction",
    "UpdateFuelDataAction",
    "StorePaxDataAction",
]
# NOTE(review): restricted to passenger data, so every ZFW / fuel / TOW column of
# `weights` comes out empty. Set ACTIONS = ALL_ACTIONS for a full run.
ACTIONS = ["StorePaxDataAction"]
df = df[df['action_name'].isin(ACTIONS)]

In [8]:
# Flight ids in order of first appearance; this drives the processing order.
flight_ids = df['flight_id'].unique()
# Index by flight_id and sort the index. With an unsorted non-unique index every
# df.loc[flight] lookup scans the whole index; a monotonic index lets pandas
# locate a flight's rows by binary search. Row order inside a flight is
# irrelevant here because process_flight re-sorts each flight's rows by row_id.
df = df.set_index('flight_id').sort_index()

In [11]:
def _append_if_changed(values, value):
    """Append ``value`` to ``values`` unless it is None or equals the last element.

    This keeps a history with consecutive repeats collapsed, while a value that
    reappears after a different one is recorded again.
    """
    if value is None:
        return
    if not values or value != values[-1]:
        values.append(value)


def process_flight(flight, df):
    """Replay one flight's action log and collect the history of each tracked value.

    Parameters
    ----------
    flight : hashable
        Flight id; must be present in ``df``'s index.
    df : pandas.DataFrame
        Action log indexed by flight id with ``action_name``, ``entry_details`` and
        ``row_id`` columns. The flight's rows are replayed in ascending ``row_id``.

    Returns
    -------
    tuple
        ``(flight, estimated_zfws, actual_zfws, actual_fuels, actual_tows,
        min_take_off_fuels, total_pax, economy, business, jump_seats, males,
        females, children, infants, total_bags, total_bag_weights, standbys,
        j_capacities, y_capacities, j_distributions, y_distributions)``.
        Each history list holds the non-None values returned by the action
        handlers, with consecutive duplicates removed. The last four items are
        taken as-is from the most recent ``StorePaxDataAction`` row, or ``[]``
        when the flight has no such row.
    """
    subset_df = df.loc[flight].copy()
    # A flight with exactly one row comes back as a Series; turn it back into a
    # one-row DataFrame so the code below can treat both cases the same way.
    if isinstance(subset_df, pd.Series):
        subset_df = subset_df.to_frame().transpose()
    subset_df = subset_df.sort_values(by="row_id", ascending=True)

    temp_estimated_zfws = []
    temp_actual_zfws = []
    temp_actual_fuels = []
    temp_actual_tows = []
    temp_min_take_off_fuels = []
    temp_total_pax = []
    temp_males = []
    temp_females = []
    temp_children = []
    temp_infants = []
    temp_economy = []
    temp_business = []
    temp_total_bags = []
    temp_total_bag_weights = []
    temp_jump_seats = []
    temp_standbys = []
    # Bound up front so flights without any StorePaxDataAction row still return.
    j_capacities_list = []
    y_capacities_list = []
    j_distributions_list = []
    y_distributions_list = []

    for entry, action in zip(subset_df["entry_details"].values, subset_df["action_name"].values):
        if action == "CreateZFWMessageAction":
            estimated_zfw, actual_zfw = CreateZFWMessageAction(entry)
            _append_if_changed(temp_estimated_zfws, estimated_zfw)
            _append_if_changed(temp_actual_zfws, actual_zfw)
        elif action == "UpdateEstimatesAction":
            # This handler reports only an estimated ZFW. It must not look at
            # actual_zfw, which may not have been assigned yet for this flight.
            _append_if_changed(temp_estimated_zfws, UpdateEstimatesAction(entry))
        elif action == "CalculateWeightAndTrimAction":
            _append_if_changed(temp_actual_zfws, CalculateWeightAndTrimAction(entry))
        elif action == "CreateLoadsheetAction":
            actual_tow, actual_zfw, estimated_zfw, actual_fuel = CreateLoadsheetAction(entry)
            _append_if_changed(temp_actual_zfws, actual_zfw)
            _append_if_changed(temp_estimated_zfws, estimated_zfw)
            _append_if_changed(temp_actual_fuels, actual_fuel)
            _append_if_changed(temp_actual_tows, actual_tow)
        elif action == "UpdateFuelDataAction":
            actual_fuel, minimum_tof = UpdateFuelDataAction(entry)
            _append_if_changed(temp_actual_fuels, actual_fuel)
            _append_if_changed(temp_min_take_off_fuels, minimum_tof)
        elif action == "StorePaxDataAction":
            (total_pax, economy_class, business_class, jump_seat, standby,
             male, female, child, infant, total_bag, total_bag_weight,
             _baggage_weight_type,
             j_capacities_list, y_capacities_list,
             j_distributions_list, y_distributions_list) = StorePaxDataAction(entry)
            for history, value in (
                (temp_total_pax, total_pax),
                (temp_economy, economy_class),
                (temp_business, business_class),
                (temp_jump_seats, jump_seat),
                (temp_standbys, standby),
                (temp_males, male),
                (temp_females, female),
                (temp_children, child),
                (temp_infants, infant),
                (temp_total_bags, total_bag),
                (temp_total_bag_weights, total_bag_weight),
            ):
                _append_if_changed(history, value)

    return flight, temp_estimated_zfws, temp_actual_zfws, temp_actual_fuels, temp_actual_tows, temp_min_take_off_fuels, temp_total_pax, temp_economy, temp_business, temp_jump_seats, temp_males, temp_females, temp_children, temp_infants, temp_total_bags, temp_total_bag_weights, temp_standbys, j_capacities_list, y_capacities_list, j_distributions_list, y_distributions_list

def collect_results(result):
    """Record one ``process_flight`` result in the module-level per-flight dicts.

    ``result`` must be the 21-item tuple returned by ``process_flight``. Each
    history in it is stored under the flight id in the matching global dict.
    """
    (flight,
     est_zfw_hist, act_zfw_hist, act_fuel_hist, act_tow_hist, min_tof_hist,
     pax_hist, economy_hist, business_hist, jump_seat_hist,
     male_hist, female_hist, child_hist, infant_hist,
     bag_hist, bag_weight_hist, standby_hist,
     j_caps, y_caps, j_dists, y_dists) = result

    destinations = (
        (estimated_zfws, est_zfw_hist),
        (actual_zfws, act_zfw_hist),
        (actual_take_off_fuels, act_fuel_hist),
        (actual_tows, act_tow_hist),
        (minimum_take_off_fuels, min_tof_hist),
        (total_pax, pax_hist),
        (economy, economy_hist),
        (business, business_hist),
        (jump_seats, jump_seat_hist),
        (standbys, standby_hist),
        (children, child_hist),
        (infants, infant_hist),
        (total_bags, bag_hist),
        (total_bag_weights, bag_weight_hist),
        (males, male_hist),
        (females, female_hist),
        (j_capacities_dicts, j_caps),
        (y_capacities_dicts, y_caps),
        (j_distributions_dicts, j_dists),
        (y_distributions_dicts, y_dists),
    )
    for per_flight, history in destinations:
        per_flight[flight] = history

# Upper bound on the number of flights processed (a debug speed-up). The saved
# weights table only covers these flights; set to None to process every flight.
MAX_FLIGHTS = 1000

# perf_counter is monotonic, so the elapsed time cannot be skewed by clock changes.
start = time.perf_counter()

# Per-flight results, keyed by flight id and filled in by collect_results.
estimated_zfws = {}
actual_zfws = {}
actual_take_off_fuels = {}
actual_tows = {}
minimum_take_off_fuels = {}

total_pax = {}
economy = {}
business = {}
jump_seats = {}
standbys = {}
children = {}
infants = {}
males = {}
females = {}
total_bags = {}
total_bag_weights = {}

j_capacities_dicts = {}
y_capacities_dicts = {}
j_distributions_dicts = {}
y_distributions_dicts = {}

# NOTE(review): process_flight is presumably dominated by Python-level work that
# holds the GIL, so a thread pool may give little speed-up over a plain loop.
# Profile before relying on it; a ProcessPoolExecutor is the usual alternative.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_flight, flight, df) for flight in flight_ids[:MAX_FLIGHTS]]
    for future in concurrent.futures.as_completed(futures):
        collect_results(future.result())

end = time.perf_counter()
print(end - start)

17.844433784484863


In [12]:
# Per-flight dicts to combine, in output-column order. Each dict becomes a
# two-column frame (flight_id, <column>); the frames are outer-joined on
# flight_id, so a flight missing from one dict still gets a row (NaN there).
weight_sources = {
    "estimated_zfws": estimated_zfws,
    "actual_zfws": actual_zfws,
    "actual_take_off_fuels": actual_take_off_fuels,
    "minimum_take_off_fuels": minimum_take_off_fuels,
    "actual_tows": actual_tows,
    "total_pax": total_pax,
    "economy": economy,
    "business": business,
    "jump_seats": jump_seats,
    "standbys": standbys,
    "children": children,
    "infants": infants,
    "total_bags": total_bags,
    "total_bag_weights": total_bag_weights,
    "males": males,
    "females": females,
    "j_distributions": j_distributions_dicts,
    "y_distributions": y_distributions_dicts,
    "j_capacities": j_capacities_dicts,
    "y_capacities": y_capacities_dicts,
}

column_frames = [
    pd.DataFrame(list(per_flight.items()), columns=["flight_id", column])
    for column, per_flight in weight_sources.items()
]

weights = column_frames[0]
for column_frame in column_frames[1:]:
    weights = pd.merge(weights, column_frame, on="flight_id", how="outer")

# Each source dict has one entry per flight, so the join must yield one row per flight.
assert weights["flight_id"].is_unique


In [13]:
# Display the combined table: one row per processed flight, list-valued history columns.
weights

Unnamed: 0,flight_id,estimated_zfws,actual_zfws,actual_take_off_fuels,minimum_take_off_fuels,actual_tows,total_pax,economy,business,jump_seats,...,children,infants,total_bags,total_bag_weights,males,females,j_distributions,y_distributions,j_capacities,y_capacities
0,MN-1030-2024-1-4-BGI,[],[],[],[],[],[236],[212],[24],[0],...,[4],[0],[0],[0],[108],[124],[],[],[],[]
1,MN-1030-2024-1-5-BGI,[],[],[],[],[],"[236, 240, 236, 240]","[212, 214, 212, 214]","[24, 26, 24, 26]",[0],...,"[4, 0, 4, 0]",[0],"[234, 223, 238, 223, 238, 218, 220, 226, 227, ...","[3510, 3345, 3570, 3345, 3570, 3270, 310300, 3...","[108, 123, 108, 123]","[124, 117, 124, 117]","[26, 0, 0]","[0, 124, 90]","[1030, 0, 0]","[0, 160, 127]"
2,MN-1030-2024-3-5-BGI,[],[],[],[],[],"[189, 188, 189, 188, 189, 188]","[168, 167, 168, 167, 168, 167]",[21],[0],...,"[2, 1, 2, 1]",[3],"[0, 186, 155, 202, 155, 186, 190, 199, 202]","[0, 2790, 2325, 10301030, 2325, 2790, 2850, 29...","[93, 102, 93, 102, 101, 102, 101, 102, 101]","[94, 86, 94, 86]","[21, 0, 0]","[0, 95, 72]","[1030, 0, 0]","[0, 160, 127]"
3,MN-1030-2024-5-5-BGI,[],[],[],[],[],[226],"[202, 201, 202, 201]","[24, 25, 24, 25]",[0],...,"[4, 1, 4, 1]",[1],"[0, 224, 233, 235, 237, 223, 224]","[0, 3360, 3495, 3525, 3555, 3345, 3360]","[112, 119, 112, 119]","[110, 106, 110, 106]","[25, 0, 0]","[0, 117, 84]","[1030, 0, 0]","[0, 160, 127]"
4,MN-1034-2024-2-5-MCO,[],[],[],[],[],"[181, 188, 187, 188, 187]","[163, 170, 169, 170, 169]",[18],[0],...,"[11, 8, 11, 8]",[3],"[0, 176, 216, 195, 169, 171, 168]","[0, 2640, 3240, 2925, 2535, 2565, 2520]","[82, 86, 96, 86, 96]","[88, 91, 83, 91, 83]","[18, 0, 0]","[0, 81, 88]","[30, 0, 0]","[0, 160, 127]"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,MN-4250-2024-4-5-DUB,[],[],[],[],[],"[160, 164, 151, 164, 151]","[160, 164, 151, 164, 151]",[],[0],...,[1],[0],"[0, 160, 105, 70, 67, 62, 66]","[0, 2080, 1365, 910, 871, 806, 858]","[78, 77, 80, 82, 80, 82]","[81, 82, 83, 68, 83, 68]","[0, 0, 0]","[51, 47, 53]","[0, 0, 0]","[60, 60, 54]"
996,MN-4250-2024-5-5-DUB,[],[],[],[],[],"[85, 87, 88, 83, 88, 83, 88, 83, 79, 83, 81, 8...","[85, 87, 88, 83, 88, 83, 88, 83, 79, 83, 81, 8...",[],[0],...,[0],[0],"[0, 85, 70, 54, 70, 61, 70, 54]","[0, 1105, 910, 702, 910, 793, 910, 702]","[47, 46, 48, 49, 45, 49, 45, 49, 45, 42, 45, 4...","[38, 39, 38, 39, 38, 39, 38, 37, 38, 37, 38, 37]","[0, 0, 0]","[14, 39, 26]","[0, 0, 0]","[60, 60, 54]"
997,MN-4250-2024-7-5-DUB,[],[],[],[],[],"[78, 77, 76, 74, 76, 74, 70, 74, 70]","[78, 77, 76, 74, 76, 74, 70, 74, 70]",[],[0],...,[1],[0],"[0, 78, 70, 53, 27]","[0, 1014, 910, 689, 351]","[44, 43, 44, 50, 44, 50, 48, 50, 48]","[33, 34, 33, 31, 23, 31, 23, 21, 23, 21]","[0, 0, 0]","[12, 35, 23]","[0, 0, 0]","[60, 60, 54]"
998,MN-4250-2024-8-5-DUB,[],[],[],[],[],[84],[84],[],[0],...,[0],[0],"[0, 84, 71]","[0, 1092, 923]",[42],[42],"[0, 0, 0]","[7, 18, 18]","[0, 0, 0]","[60, 60, 54]"


In [15]:
# flight_id values look like "MN-1030-2024-1-4-BGI" (presumably
# airline-number-date-station). Split on the dashes rather than using fixed
# character offsets, so flight numbers with other than four digits (and
# airline codes of other lengths) are extracted correctly.
flight_id_parts = weights["flight_id"].str.split("-", n=2)
weights["airline_code"] = flight_id_parts.str[0]
weights["flight_number"] = flight_id_parts.str[1]

In [16]:
# Build the output path from FILE_PATH, so changing the data root in the config
# cell also moves this output instead of silently writing to the old location.
weights.to_parquet(f"{FILE_PATH}/data_parquet/weights.parquet", engine=ENGINE, compression=COMPRESSION)