# Exploration et analyse des résidus au cours des vols

In [1]:
# IPython magics: load the autoreload extension so that edits to imported
# project modules are picked up without restarting the kernel.
# NOTE(review): mode 2 is understood to reload every module before each cell
# execution - confirm against the IPython autoreload documentation.
%load_ext autoreload
%autoreload 2

In [2]:
from pathlib import Path
from datetime import datetime

import pandas as pd
import torch
import torch.nn as nn
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from tqdm import tqdm

from reader import FromPandasDataset
from model import (
    PhysicalModel,
    FlightMecaNet3DEq,
    GenericFlightMecaDataset,
    GenericFlightMecaDatasetFast,
)
from reader.preprocessing import convert_units
from postprocessing.anomaly_score import add_residue_stats
from result_analysis.dashboard import create_dashboard

## Variables

In [3]:
# Label of this run; it names the checkpoint and log sub-directories.
training_name = "2_epochs_only"

# Dataset to work on. Each `*_var` below holds the name of the column that
# carries the corresponding quantity in the selected dataset, so the rest of
# the notebook never hard-codes a column name.
dataset_name = "Safire"
if dataset_name == "test":
    time_var = "Time, s"
    flight_name_var = "flight_name"
    mass_var = "MASS"
    jx_var = "JX"
    jy_var = "JY"
    jz_var = "JZ"
    alpha_var = "ALPHA"
    beta_var = "BETA"
    pressure_var = "PST"
    temp_var = "SAT"
    air_speed_var = "VTAS"
    mach_var = "MACH"
    gear_var = "MLG"
    flaps_bool_var = "DB"
    flaps_var = "DVOLIG"
    stab_var = "TRIM"
    elevator_var = "DM"
    rudder_var = "DN"
    aileron_var = "DL"
    spoiler_var = "DSPOIL"
    n1_var = "N1"
    altitude_var = "ZBPIL"
    roll_angle_var = "PHI"
    # No throttle column is declared for this dataset, so N1 is used as the
    # engine-control input of the thrust network.
    # NOTE(review): confirm N1 is the intended thrust input for "test".
    thrust_control_var = n1_var
elif dataset_name == "Safire":
    time_var = "time"
    flight_name_var = "flight_name"
    mass_var = "m"
    jx_var = "nx"
    jy_var = "ny"
    jz_var = "nz"
    alpha_var = "alphar"
    beta_var = "betar"
    pressure_var = "p"
    temp_var = "temp"
    air_speed_var = "tas"
    mach_var = "mach"
    gear_var = "ge"
    flaps_var = "fl"
    stab_var = "dSr"
    elevator_var = "dEr"
    rudder_var = "dRr"
    aileron_var = "dAr"
    throttle_var = "dT"
    altitude_var = "h"
    roll_angle_var = "phir"
    thrust_control_var = throttle_var
else:
    raise ValueError(f"Unknown dataset {dataset_name}")

# Input columns of the cx network.
cx_input_vars = [
    alpha_var,
    flaps_var,
    stab_var,
    elevator_var,
    mach_var,
    gear_var,
]

# Input columns of the cy network.
cy_input_vars = [
    beta_var,
    rudder_var,
    mach_var,
]

# Input columns of the cz network (same set as cx).
cz_input_vars = [
    alpha_var,
    flaps_var,
    stab_var,
    elevator_var,
    mach_var,
    gear_var,
]

# Input columns of the thrust network. The engine-control column depends on
# the dataset, which keeps this list valid for every supported dataset.
thrust_input_vars = [
    thrust_control_var,
    pressure_var,
    temp_var,
]

## Chargement des données

In [None]:
# Load the selected dataset into `df`, indexed by (flight name, timestamp).
if dataset_name == "test":
    datapath = Path.cwd() / "test_data" / "test_data.csv"
    df = pd.read_csv(datapath)
    # Turn the elapsed-seconds column into timestamps anchored on an
    # arbitrary reference date.
    df[time_var] = datetime(2025, 1, 1) + pd.to_timedelta(df[time_var], unit="s")
    df = df.set_index([flight_name_var, time_var])
    # Maps a source unit to the columns expressed in it; handed to
    # convert_units (the unit keys are whatever that helper expects).
    convertion_dict = {
        "lbs": [mass_var],
        "deg": [
            alpha_var,
            beta_var,
            elevator_var,
            rudder_var,
            aileron_var,
            spoiler_var,
            roll_angle_var,
        ],
        "celcius": [temp_var],
        "kts": [air_speed_var],
    }
    df = convert_units(df, convertion_dict)

elif dataset_name == "Safire":
    dirpath = (
        Path.home()
        / "Documents"
        / "data"
        / "Safire_meghatropique"
        / "simulations"
        / "oop_control_old"
        / "light"
    )
    # Sort the paths: glob order depends on the file system, and the flight
    # label "vol {i}" must be reproducible from one run to the next.
    csv_files = sorted(dirpath.glob("*.csv"))
    if not csv_files:
        raise FileNotFoundError(f"No CSV file found in {dirpath}")
    # One frame per flight, keeping every 5th row. Frames are concatenated
    # once at the end rather than growing `df` inside the loop.
    flight_frames = [
        pd.read_csv(file).iloc[::5].assign(**{flight_name_var: f"vol {i}"})
        for i, file in enumerate(tqdm(csv_files))
    ]
    df = pd.concat(flight_frames, axis=0)
    del flight_frames
    # Altitude is the opposite of the "ze" column.
    # NOTE(review): presumably "ze" is a down-positive vertical position - confirm.
    df[altitude_var] = -df["ze"]
    df[time_var] = datetime(2025, 1, 1) + pd.to_timedelta(df[time_var], unit="s")
    df = df.set_index([flight_name_var, time_var])
else:
    raise ValueError("Unknown dataset")

## Création du dataset

In [6]:
# Columns to normalize: every input of the four networks, de-duplicated.
# dict.fromkeys keeps first-seen order, so the list is identical from one run
# to the next (a set would order strings according to the per-process hash
# seed).
to_normalize = list(
    dict.fromkeys(
        cx_input_vars + cy_input_vars + cz_input_vars + thrust_input_vars
    )
)

data = FromPandasDataset(
    df,
    train_flight_names=None,  # random split driven by a seed
    data_reduction=None,
    to_normalize=to_normalize,
    filter_train_phases=True,
    # Column names used for flight-phase detection.
    flight_phase_parameters={
        "alt_name": altitude_var,
        "flaps_name": flaps_var,
        "roll_angle_name": roll_angle_var,
    },
)

## Définition des hyperparamètres et initialisation du modèle

In [None]:
# Input columns of each sub-network, in the order cx, cy, cz, thrust.
dataset_var_names_list = [
    cx_input_vars,
    cy_input_vars,
    cz_input_vars,
    thrust_input_vars,
]
# Columns handed to the dataset class under "var_names".
dataset_var_names = [
    mass_var,
    jx_var,
    jy_var,
    jz_var,
    alpha_var,
    beta_var,
    pressure_var,
    temp_var,
    air_speed_var,
]
dataset_params = {
    "var_names": dataset_var_names,
    "var_names_list": dataset_var_names_list,
}


# Per-network coefficients forwarded to the network through net_params.
# NOTE(review): the last key is spelled "trust_net_coef" while the sibling
# setting is "thrust_param_dim"; confirm which spelling the network class
# reads before renaming it.
net_coef_dict = {
    "cx_net_coef": 1e-2,
    "cy_net_coef": 1e-3,
    "cz_net_coef": 1e-1,
    "trust_net_coef": 1e3,
}

net_params = {
    # Each network's input size follows the length of its column list.
    "cx_param_dim": len(cx_input_vars),
    "cy_param_dim": len(cy_input_vars),
    "cz_param_dim": len(cz_input_vars),
    "thrust_param_dim": len(thrust_input_vars),
    "regressor_layers": 3,
    "regressor_layer_dim": 64,
    "lr": 1e-5,
    "net_coef_dict": net_coef_dict,
    # Physical constants (presumably SI units - TODO confirm).
    "equation_params": {
        "air_molar_mass": 29e-3,
        "gas_constant": 8.314,
        "wing_surface": 21.5,  # wing area, in m^2
    },
}

# Checkpoints and logs each go to a sub-directory named after the run.
saving_dir = Path.cwd() / "checkpoints" / training_name
log_dir = Path.cwd() / "logs" / training_name

# NOTE(review): Lightning appends its own ".ckpt" extension to `filename`, so
# this may be written as "checkpoint.ckpt.ckpt" - confirm it matches the
# `saving_name` PhysicalModel looks for.
callbacks = [
    ModelCheckpoint(
        dirpath=saving_dir,
        filename="checkpoint.ckpt",
        monitor="val loss",
    )
]

# Keep "gpu" wherever a CUDA or MPS device is present, and fall back to the
# CPU otherwise so the notebook still runs on machines without a GPU.
mps_backend = getattr(torch.backends, "mps", None)
gpu_available = torch.cuda.is_available() or (
    mps_backend is not None and mps_backend.is_available()
)

# NOTE(review): max_epochs is 200 while the run is named "2_epochs_only" -
# confirm which one is intended.
trainer_params = {
    "max_epochs": 200,
    "callbacks": callbacks,
    "logger": TensorBoardLogger(save_dir=log_dir),
    "accelerator": "gpu" if gpu_available else "cpu",
}

model = PhysicalModel(
    log_dir=log_dir,
    saving_dir=saving_dir,
    saving_name="checkpoint.ckpt",
    NetClass=FlightMecaNet3DEq,
    net_params=net_params,
    DatasetClass=GenericFlightMecaDatasetFast,
    dataset_params=dataset_params,
    trainer_params=trainer_params,
    batch_size=1024,
    num_loader_workers=8,  # adjust to the number of available CPU cores
)

## Entrainement du modèle

In [None]:
# Set to True to retrain even when the model reports itself as fitted.
force_training = False
if model._is_fitted and not force_training:
    # Already trained and no retraining requested: skip the fit.
    print("Modèle déjà entrainé !")
else:
    model.fit(data)

## Prédiction sur les données de test

In [None]:
# Run the model on the test split.
# NOTE(review): concat_predict_and_data=True presumably returns the
# predictions joined with the input columns - confirm in PhysicalModel.predict.
res = model.predict(data.test, concat_predict_and_data=True)

In [None]:
# Display the columns available in the prediction result.
res.columns

In [11]:
# Residue columns (one per axis) for which statistics are added to `res`.
residue_columns = ["x_residue", "y_residue", "z_residue"]
# NOTE(review): the meaning of the three positional settings (10, 10, 0.02)
# is defined by add_residue_stats - check its signature before changing them.
res = add_residue_stats(res, residue_columns, 10, 10, 0.02)

## Observation des données de vol et des résidus

In [None]:
# Build the dashboard from the results, passing the airspeed and altitude
# column names. As the last expression of the cell, its return value is
# what the notebook displays.
create_dashboard(res, air_speed_var, altitude_var)