### Imports

In [1]:
import pandas as pd
import numpy as np
import os

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import folium as fl

from math import pi, acos, cos, sin, tan

# Data Columns Dictionary
from flight_data_dict import flight_data_columns, ENERGY_LABEL, flight_data_columns_final

### Path

In [None]:
# Every entry of the raw data lake folder, in the order the OS reports it.
allFiles = os.listdir("flight_datalake")

# Keep only the CSV exports, as paths relative to the notebook.
dataPath = [
    f"flight_datalake/{name}"
    for name in allFiles
    if name.endswith(".csv")
]

dataPath

### Reading and Cleaning

In [None]:
class flight_data_analyser():
    """Load one recorded approach and derive its energy-state table.

    The constructor reads a ';'-separated CSV into ``self.flight_data`` and
    selects the touchdown-point reference of the destination airport.
    ``clean_flight_data`` then cleans the recording and appends every derived
    column (corrected position, distances, energies, reference corridor,
    flags and plot colour).

    Attributes:
        GLIDE_PATH_ANGLE: glide path angle in degrees used for the reference
            height profile.
        TOT_DIST: largest accumulated track distance, set by
            ``clean_flight_data``.
        tdp_lat, tdp_lon: touchdown point coordinates of the airport.
        alt_error: airport altitude added to the reference height (ft, per
            the constants' original comments).
        flight_data: the pandas DataFrame being processed.
    """

    # airport code -> (touchdown latitude, touchdown longitude, altitude ft)
    _AIRPORTS = {
        "VCP": (-22.998310, -47.147194, 2139.0),     # Viracopos (SBKP)
        "BSB": (-15.863563, -47.927720, 3451.0),     # Brasilia (SBBR)
        "MAO": (-3.038627778, -60.05777778, 252.0),  # Manaus (SBEG)
    }

    def __init__(self, data_path, airport_ref):
        """Read the recording and pick the airport reference.

        Args:
            data_path: path of the ';'-separated flight CSV.
            airport_ref: one of 'VCP', 'BSB' or 'MAO'.

        Raises:
            ValueError: if ``airport_ref`` is not a known airport code.
        """
        self.GLIDE_PATH_ANGLE = 3.1

        self.TOT_DIST = 0

        # Fail fast: without a reference the later steps would only break
        # with an obscure AttributeError.
        if airport_ref not in self._AIRPORTS:
            known = ", ".join(sorted(self._AIRPORTS))
            raise ValueError(
                f"Unknown airport_ref {airport_ref!r}; expected one of: {known}"
            )
        self.tdp_lat, self.tdp_lon, self.alt_error = self._AIRPORTS[airport_ref]

        self.flight_data = pd.read_csv(data_path, sep=";")

    @staticmethod
    def _parse_coordinate(column):
        """Return a coordinate column as numeric degrees.

        Text values carrying more than one '.' (e.g. "-22.998.310") are
        treated as integers with '.' group separators expressed in
        millionths of a degree: the dots are removed and the number is
        divided by 1,000,000. Any other text column is parsed as ordinary
        decimal numbers, and numeric columns are returned unchanged.
        """
        if pd.api.types.is_numeric_dtype(column):
            return column

        has_group_separators = column.str.count(r"\.").gt(1).any()
        if has_group_separators:
            digits = column.str.replace(".", "", regex=False)
            return pd.to_numeric(digits) / 1000000

        return pd.to_numeric(column)

    def clean_flight_data(self, flightdata_dict, flightdata_dict_final):
        """Clean the raw recording and append all derived columns in place.

        Args:
            flightdata_dict: mapping ``column name -> dtype`` of the raw
                columns to keep.
            flightdata_dict_final: mapping ``column name -> dtype`` applied
                to the finished table.

        The result replaces ``self.flight_data`` and ``self.TOT_DIST`` is
        updated. Nothing is returned.
        """
        # Copy so the column assignments below never write into a view of
        # the raw frame.
        data = self.flight_data[list(flightdata_dict)].copy()

        for coordinate in ("LATP", "LONP"):
            data[coordinate] = self._parse_coordinate(data[coordinate])

        # Incomplete rows are dropped first and the index is renumbered, so
        # the rows form a contiguous 0..n-1 range for the calculations.
        data = data.dropna().reset_index(drop=True)

        # astype returns a new frame; the result has to be kept.
        data = data.astype(flightdata_dict)

        # DATE FIELD TREATMENT: "XX/mm/yy" -> "mm/YYYY"
        data["DATE"] = data["DATE"].str.replace("XX/", "", regex=False)
        data["DATE"] = pd.to_datetime(data["DATE"], format="%m/%y").dt.strftime("%m/%Y")

        # CALCULATION COLS
        for step in (self.lonlatcorr_calc, self.delta_d_calc, self.accum_d_calc):
            data = step(data)

        # tdd_calc needs the total distance, so it is fixed before going on.
        self.TOT_DIST = data["ACCUM_D"].max()

        for step in (
            self.tdd_calc,
            self.e_p_calc,
            self.e_c_calc,
            self.e_tot_calc,
            self.H_GEOM_REF_m_calc,
            self.H_GEOM_REF_ft_calc,
            self.sat_calc,
            self.theta_calc,
            self.sigma_calc,
            self.tas_calc,
            self.wind_comp_calc,
            self.e_p_ref_calc,
            self.e_c_ref_calc,
            self.E_TOT_ref_calc,
            self.e_ref_inf_calc,
            self.e_ref_supp_calc,
            self.flag_inf_calc,
            self.flag_sup_calc,
            self.energy_state_calc,
            self.color_set,
        ):
            data = step(data)

        self.flight_data = data.astype(flightdata_dict_final)

    def delta_d_calc(self, df):
        """Add "Delta_D": great-circle distance from the previous row.

        Uses the spherical law of cosines on LATCORR/LONCORR (degrees); the
        central angle in degrees times 60 gives arc-minutes. The first row
        gets 0. Values are rounded to 2 decimals and, as before, every NaN
        left in the frame is replaced by 0.
        """
        lat = np.radians(df["LATCORR"].to_numpy(dtype=float))
        lon_deg = df["LONCORR"].to_numpy(dtype=float)

        cos_angle = (
            np.sin(lat[:-1]) * np.sin(lat[1:])
            + np.cos(lat[1:]) * np.cos(lat[:-1])
            * np.cos(np.radians(lon_deg[1:] - lon_deg[:-1]))
        )
        # Identical consecutive fixes can give a cosine a hair above 1 due to
        # rounding; clamp so arccos stays inside its domain.
        cos_angle = np.clip(cos_angle, -1.0, 1.0)

        legs = np.zeros(len(df))
        legs[1:] = np.degrees(np.arccos(cos_angle)) * 60

        df["Delta_D"] = np.round(legs, 2)

        df = df.fillna(value=0)

        return df

    def accum_d_calc(self, df):
        """Add "ACCUM_D": running sum of Delta_D, starting at 0 (1 decimal)."""
        legs = df["Delta_D"].to_numpy(dtype=float)

        travelled = np.zeros(len(df))
        travelled[1:] = np.cumsum(legs[1:])

        df["ACCUM_D"] = np.round(travelled, 1)

        return df

    def tdd_calc(self, df):
        """Add "TDD": distance still to go, TOT_DIST - ACCUM_D (1 decimal)."""
        df["TDD"] = (self.TOT_DIST - df["ACCUM_D"]).round(1)

        return df

    def e_p_calc(self, df):
        """Add "e_p": specific potential energy from HEIGHT (1 decimal).

        HEIGHT is scaled by 0.3048 (feet to metres), multiplied by the
        gravity constant and divided by 1000.
        """
        # NOTE(review): gravity is 9.811 here but 9.8611 in e_p_ref_calc;
        # one of the two is probably a typo - confirm the intended value.
        df["e_p"] = (9.811 * (df["HEIGHT"] * 0.3048) / 1000).round(1)

        return df

    def e_c_calc(self, df):
        """Add "e_c": specific kinetic energy from GS (1 decimal).

        GS is scaled by 0.5144 (knots to m/s) before squaring.
        """
        df["e_c"] = (0.5 * (df["GS"] * 0.5144).pow(2) / 1000).round(1)

        return df

    def e_tot_calc(self, df):
        """Add "e_tot" = e_p + e_c (1 decimal)."""
        df["e_tot"] = (df["e_p"] + df["e_c"]).round(1)

        return df

    def H_GEOM_REF_m_calc(self, df):
        """Add "H_GEOM_REF_m": glide-path height at each TDD (1 decimal).

        TDD * tan(GLIDE_PATH_ANGLE) * 1852, the last factor converting
        nautical miles to metres.
        """
        slope = tan(self.GLIDE_PATH_ANGLE * pi / 180)

        df["H_GEOM_REF_m"] = (df["TDD"] * slope * 1852).round(1)

        return df

    def H_GEOM_REF_ft_calc(self, df):
        """Add "H_GEOM_REF_ft": reference height in feet plus airport altitude."""
        df["H_GEOM_REF_ft"] = (df["H_GEOM_REF_m"] * 3.2808) + self.alt_error

        return df

    def sat_calc(self, df):
        """Add "SAT" = TAT / (1 + 0.2 * MACH^2) (1 decimal)."""
        # NOTE(review): theta_calc adds 273.15 to SAT, which suggests Celsius;
        # the temperature ratio normally applies to absolute temperatures, so
        # TAT may need the Kelvin offset here - confirm with the data owner.
        df["SAT"] = (df["TAT"] / (1 + 0.2 * df["MACH"].pow(2))).round(1)

        return df

    def theta_calc(self, df):
        """Add "theta" = (SAT + 273.15) / 288.15 (4 decimals)."""
        df["theta"] = ((df["SAT"] + 273.15) / 288.15).round(4)

        return df

    def sigma_calc(self, df):
        """Add "sigma" = theta ** 4.2559 (4 decimals)."""
        df["sigma"] = df["theta"].pow(4.2559).round(4)

        return df

    def tas_calc(self, df):
        """Add "TAS" = VLS / sqrt(sigma) (1 decimal)."""
        df["TAS"] = (df["VLS"] / df["sigma"].pow(1/2)).round(1)

        return df

    def wind_comp_calc(self, df):
        """Add "WIND_COMP" = cos(HEAD - WINDIR) * WINSPD (1 decimal).

        HEAD and WINDIR are taken in degrees.
        """
        relative_angle = (df["HEAD"] - df["WINDIR"]) * pi / 180

        df["WIND_COMP"] = (np.cos(relative_angle) * df["WINSPD"]).round(1)

        return df

    def e_p_ref_calc(self, df):
        """Add "e_p_ref": reference potential energy from H_GEOM_REF_m (1 decimal)."""
        # NOTE(review): 9.8611 differs from the 9.811 used in e_p_calc - see
        # the note there; kept as-is to preserve existing results.
        df["e_p_ref"] = (df["H_GEOM_REF_m"] * 9.8611 / 1000).round(1)

        return df

    def e_c_ref_calc(self, df):
        """Add "e_c_REF": reference kinetic energy (1 decimal).

        Speed used is TAS + 5 + WIND_COMP, scaled by 0.5144 before squaring.
        """
        ref_speed = (df["TAS"] + 5 + df["WIND_COMP"]) * 0.5144

        df["e_c_REF"] = (0.5 * ref_speed.pow(2) / 1000).round(1)

        return df

    def E_TOT_ref_calc(self, df):
        """Add "E_TOT_ref" = e_p_ref + e_c_REF (1 decimal)."""
        df["E_TOT_ref"] = (df["e_p_ref"] + df["e_c_REF"]).round(1)

        return df

    def e_ref_inf_calc(self, df):
        """Add "e_ref_inf": lower limit, 80 % of E_TOT_ref (1 decimal)."""
        df["e_ref_inf"] = (df["E_TOT_ref"] * 0.8).round(1)

        return df

    def e_ref_supp_calc(self, df):
        """Add "e_ref_supp": upper limit, 120 % of E_TOT_ref (1 decimal)."""
        df["e_ref_supp"] = (df["E_TOT_ref"] * 1.2).round(1)

        return df

    def flag_inf_calc(self, df):
        """Add "FLAG_INF": 1 where e_tot is below e_ref_inf, else 0."""
        df["FLAG_INF"] = (df["e_tot"] < df["e_ref_inf"]).astype("int64")

        return df

    def flag_sup_calc(self, df):
        """Add "FLAG_SUP": 1 where e_tot is above e_ref_supp, else 0."""
        df["FLAG_SUP"] = (df["e_tot"] > df["e_ref_supp"]).astype("int64")

        return df

    def energy_state_calc(self, df):
        """Add "energy_state": LOW ENERGY / HIGH ENERGY / ON PATH.

        FLAG_INF takes precedence over FLAG_SUP.
        """
        df["energy_state"] = np.where(
            df["FLAG_INF"] == 1,
            "LOW ENERGY",
            np.where(df["FLAG_SUP"] == 1, "HIGH ENERGY", "ON PATH"),
        )

        return df

    def color_set(self, df):
        """Add "COLOR": orange (low), blue (on path), red (anything else)."""
        df["COLOR"] = np.where(
            df["energy_state"] == "LOW ENERGY",
            "orange",
            np.where(df["energy_state"] == "ON PATH", "blue", "red"),
        )

        return df

    def lonlatcorr_calc(self, df):
        """Add "LONCORR"/"LATCORR": position shifted onto the touchdown point.

        The first row whose FPHASE contains "LANDING" while both LDGL and
        LDGR contain "GROUND" is taken as touchdown. Its offset from the
        airport touchdown point is subtracted from every row.

        Raises:
            ValueError: if no row satisfies the touchdown condition.
        """
        on_ground = (
            df["FPHASE"].str.contains("LANDING", regex=False, na=False)
            & df["LDGL"].str.contains("GROUND", regex=False, na=False)
            & df["LDGR"].str.contains("GROUND", regex=False, na=False)
        ).to_numpy(dtype=bool)

        if not on_ground.any():
            raise ValueError(
                "No touchdown row found (FPHASE containing 'LANDING' with "
                "LDGL and LDGR containing 'GROUND'); cannot correct positions."
            )

        # argmax on a boolean array gives the position of the first True.
        touchdown = int(on_ground.argmax())
        lat_error = df["LATP"].iloc[touchdown] - self.tdp_lat
        lon_error = df["LONP"].iloc[touchdown] - self.tdp_lon

        df["LONCORR"] = df["LONP"] - lon_error
        df["LATCORR"] = df["LATP"] - lat_error

        return df

### RUN FLIGHT ENERGY ANALYSIS

In [None]:
# Process every raw recording and store the enriched table under
# Result_datalake/<AIRPORT>/<original file name>.
for data_path in dataPath:
    # Paths look like "flight_datalake/<flight>_<AIRPORT>.csv". The folder
    # name itself contains one "_", so the third "_"-separated token is
    # "<AIRPORT>.csv".
    airport_ref = data_path.split('_')[2].split('.')[0]
    file_name = data_path.split('/')[1]

    flight_obj = flight_data_analyser(data_path=data_path, airport_ref=airport_ref)
    flight_obj.clean_flight_data(flight_data_columns, flight_data_columns_final)
    print("FLIGHT: " + data_path)
    print("TOT_DIST: " + str(flight_obj.TOT_DIST))

    # to_csv does not create missing folders, so make sure the target exists.
    output_dir = "Result_datalake/" + airport_ref
    os.makedirs(output_dir, exist_ok=True)
    flight_obj.flight_data.to_csv(output_dir + "/" + file_name, index=False)
    print(data_path + " PROCESSED AND SAVED!")


### Result Visualization

In [None]:
# Result files are stored one folder per destination airport.
allResultFiles_BSB = os.listdir("Result_datalake/BSB")
allResultFiles_VCP = os.listdir("Result_datalake/VCP")
allResultFiles_MAO = os.listdir("Result_datalake/MAO")

allResultFiles = allResultFiles_BSB + allResultFiles_VCP + allResultFiles_MAO

# FLIGHTS SELECTION
# Leave empty to use every result file; otherwise only files whose name
# contains one of these fragments are kept.
flights_to_select = []
#######

resultDataPath = []

for airport, airport_files in (
    ("BSB", allResultFiles_BSB),
    ("VCP", allResultFiles_VCP),
    ("MAO", allResultFiles_MAO),
):
    for file in airport_files:
        if not file.endswith(".csv"):
            continue
        # any() keeps a file from being listed twice when it matches more
        # than one selection fragment.
        if flights_to_select and not any(flight in file for flight in flights_to_select):
            continue
        # Always include the airport folder: the plotting cells read the
        # airport from the second path component and the file from the third.
        resultDataPath.append(f"Result_datalake/{airport}/{file}")

resultDataPath

### LATERAL PROFILE

In [None]:
# Draw every selected flight as a chain of short segments coloured by the
# energy state stored in the "COLOR" column of its first point.
airport_map = fl.Map(zoom_start=11)

# [min_lat, min_lon, max_lat, max_lon] over all plotted tracks
track_bounds = None

for data_path in resultDataPath:
    flight_obj = pd.read_csv(data_path)

    print(f"\n\nLATERAL PROFILE: {str(data_path)}")

    # The flight is identified by the file name; the second path component
    # is only the airport folder.
    flight_name = os.path.splitext(os.path.basename(data_path))[0]

    if not flight_obj.empty:
        lat_min, lat_max = float(flight_obj["LATCORR"].min()), float(flight_obj["LATCORR"].max())
        lon_min, lon_max = float(flight_obj["LONCORR"].min()), float(flight_obj["LONCORR"].max())
        if track_bounds is None:
            track_bounds = [lat_min, lon_min, lat_max, lon_max]
        else:
            track_bounds = [
                min(track_bounds[0], lat_min),
                min(track_bounds[1], lon_min),
                max(track_bounds[2], lat_max),
                max(track_bounds[3], lon_max),
            ]

    for i in range(1, len(flight_obj)):
        start = (flight_obj.loc[i-1, "LATCORR"], flight_obj.loc[i-1, "LONCORR"])
        end = (flight_obj.loc[i, "LATCORR"], flight_obj.loc[i, "LONCORR"])

        msg = f"FLIGHT: {flight_name} | LAT:{start[0]} | LON:{start[1]} | ALT:{flight_obj.loc[i-1, 'ALTSTDC']} | CAS:{flight_obj.loc[i-1, 'CAS']}"

        fl.PolyLine(
            (start, end),
            color=flight_obj.loc[i-1, "COLOR"],
            weight=1,
            opacity=1,
            tooltip=msg,
        ).add_to(airport_map)

# Without a location folium opens on a world view; zoom to the tracks instead.
if track_bounds is not None:
    airport_map.fit_bounds([track_bounds[:2], track_bounds[2:]])

airport_map

In [None]:
# For every selected flight, export three PNG charts plotted against TDD:
# altitude vs CAS, the vertical profile and the energy profile.
ENERGY_STATE_COLORS = {"LOW ENERGY" : "orange", "ON PATH" : "blue", "HIGH ENERGY" : "red"}

for data_path in resultDataPath:
    print(data_path)
    flight_obj = pd.read_csv(data_path)
    #display(flight_obj)

    # Result paths are "Result_datalake/<AIRPORT>/<file>.csv".
    airport_dir = data_path.split('/')[1]
    image_name = data_path.split('/')[2].replace('.csv', '.png')

    # write_image does not create missing folders.
    altxcas_dir = "Result_datalake/ALTXCAS/" + airport_dir
    vertical_dir = "Result_datalake/Vertical_Profile/" + airport_dir
    energy_dir = "Result_datalake/Energy_Profile/" + airport_dir
    for image_dir in (altxcas_dir, vertical_dir, energy_dir):
        os.makedirs(image_dir, exist_ok=True)

    print(f"\n\nALT X CAS PROFILE: {str(data_path)}")
    # Create figure with secondary y-axis
    fig_height = make_subplots(specs=[[{"secondary_y": True}]])
    # Add traces
    fig_height.add_trace(
        go.Scatter(x=flight_obj["TDD"], y=flight_obj["ALTQNH"], name="ALTQNH"),
        secondary_y=False,
    )
    fig_height.add_trace(
        go.Scatter(x=flight_obj["TDD"], y=flight_obj["CAS"], name="CAS"),
        secondary_y=True,
    )
    # Add figure title
    fig_height.update_layout(
        title_text="ALT X CAS PROFILE"
    )
    # Set x-axis title
    fig_height.update_xaxes(title_text="TDD")
    # Set y-axes titles
    fig_height.update_yaxes(title_text="<b>ALTQNH</b>", secondary_y=False)
    fig_height.update_yaxes(title_text="<b>CAS</b>", secondary_y=True)
    fig_height.write_image(altxcas_dir + "/" + image_name)

    print(f"\n\nVERTICAL PROFILE: {str(data_path)}")
    fig_height = px.scatter(flight_obj, x="TDD", y="ALTSTDC", hover_name=flight_obj["energy_state"], color="energy_state", color_discrete_map=ENERGY_STATE_COLORS)
    fig_height.update_traces(marker=dict(size=3))
    fig_height.update_layout(title=f"VERTICAL PROFILE: {str(data_path)}", title_x=1)
    # Only the traces of the helper line figure are reused, drawn as a dashed
    # black reference line.
    fig_height.add_traces(px.line(flight_obj, x="TDD", y="H_GEOM_REF_ft", title="Reference Line", color_discrete_sequence=["black"], line_dash_sequence=["dash"]).data)
    fig_height.write_image(vertical_dir + "/" + image_name)
    #fig_height.show()

    print(f"\n\nENERGY PROFILE: {str(data_path)}")
    fig_height = px.scatter(flight_obj, x="TDD", y="e_tot", hover_name=flight_obj["energy_state"], color="energy_state", color_discrete_map=ENERGY_STATE_COLORS)
    fig_height.update_traces(marker=dict(size=3))
    # Lower limit, upper limit and reference energy lines
    fig_height.add_traces(px.line(flight_obj, x="TDD", y="e_ref_inf", title="Inferior Limit", color_discrete_sequence=['orange']).data)
    fig_height.add_traces(px.line(flight_obj, x="TDD", y="e_ref_supp", title="Superior Limit", color_discrete_sequence=['red']).data)
    fig_height.add_traces(px.line(flight_obj, x="TDD", y="E_TOT_ref", title="Reference Line", color_discrete_sequence=["black"], line_dash_sequence=["dash"]).data)
    fig_height.update_layout(title=f"ENERGY PROFILE: {str(data_path)}", title_x=1)
    fig_height.write_image(energy_dir + "/" + image_name)
    #fig_height.show()