<a href="https://colab.research.google.com/github/Mateo755/UAV_ML_FDI/blob/main/FDI_UAV_W%26B_(Colab).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# UAV Propeller Fault Detection System (Parrot Bebop 2)

This project focuses on the development and validation of fault detection and isolation (FDI) methods for the propulsion system of the **Parrot Bebop 2** unmanned aerial vehicle (UAV). The analysis utilizes inertial sensor data (accelerometer and gyroscope) collected during real-world flight experiments.

### Research Problem
The primary objective is to classify the technical state of the propellers based on vibration signals. We analyze various fault scenarios across four rotors (A, B, C, D), distinguishing between nominal states and specific defects such as chipped edges or bent blades.

### Methodology
This notebook compares two signal processing approaches:
1.  **Time Domain Analysis**
2.  **Frequency Domain Analysis**

Experiment tracking and performance visualization are managed via **Weights & Biases (W&B)**.

# 1. Environment Setup & Global Configuration
Install the required libraries (Weights & Biases for experiment tracking) and import the standard data science modules.

In [None]:
!pip install wandb -q
import wandb
import os
import glob
import time
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, precision_score, \
                            recall_score, f1_score, roc_curve, auc, ConfusionMatrixDisplay
from sklearn.preprocessing import label_binarize
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import plot_model
from wandb.integration.keras import WandbMetricsLogger, WandbModelCheckpoint

# Login to W&B
wandb.login()

# 2. Exploratory Data Analysis
Initial inspection of the accelerometer and gyroscope data from the UAV. We examine the column structure.

## 2.1 Time domain

In [None]:
!unzip -q /content/Normalized_data.zip

In [None]:
df_time = pd.read_csv('/content/Normalized_data/Bebop2_16g_1kdps_normalized_0000.csv')
df_time

In [None]:
df_time.info()

## 2.2 Frequency Domain

In [None]:
!unzip -q /content/FFT_data.zip

In [None]:
df_freq = pd.read_csv('/content/FFT_data/128_Hann_20_52/Bebop2_16g_FFT_ACCEL_128_Hann_20_52_0000.csv', header=None)
df_freq

In [None]:
df_freq.info()

# 3. Fault Scenario Mapping

The Bebop 2 flight data is labeled using a 4-digit code (e.g., `1022`), defining the state of each propeller (A, B, C, D):
* **0**: Nominal (Functional propeller).
* **1**: Fault Type I (e.g., chipped edge).
* **2**: Fault Type II (e.g., bent tip/severe damage).

Below, we define the mapping of these physical scenarios to model class labels. Depending on the diagnostic granularity required, the problem can be framed as a **5-class problem** (aggregated by the number of faults) or a **20-class problem** (precise fault configuration).
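To make the labeling scheme concrete, the following minimal sketch decodes a 4-digit scenario code into per-rotor states and counts the faulty rotors, which is the quantity the aggregated 5-class labels are based on. The helper names `describe_scenario` and `count_faulty_rotors` are hypothetical and are not part of the notebook.

```python
ROTORS = ("A", "B", "C", "D")
STATE_NAMES = {"0": "nominal", "1": "fault type I", "2": "fault type II"}

def describe_scenario(code):
    """Map a 4-digit scenario code to a per-rotor state dictionary (hypothetical helper)."""
    if len(code) != len(ROTORS) or any(ch not in STATE_NAMES for ch in code):
        raise ValueError(f"Invalid scenario code: {code!r}")
    return {rotor: STATE_NAMES[ch] for rotor, ch in zip(ROTORS, code)}

def count_faulty_rotors(code):
    """Count the rotors whose digit is non-zero (the aggregated label)."""
    return sum(ch != "0" for ch in code)

print(describe_scenario("1022"))
print(count_faulty_rotors("1022"))
```

For `"1022"`, rotor A has a type I fault, rotor B is nominal, and rotors C and D have type II faults, so three rotors are faulty.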

In [None]:
# Select Experiment Mode
CLASS_MODE = "20class"   # Options: "5class" (aggregated) or "20class" (precise diagnosis)

In [None]:
# Precise mapping for 20 unique scenarios (Parrot Bebop 2)
scenario_to_class_20 = {
    "0000": 0,  # Nominal state
    "1000": 1, "0100": 2, "0010": 3, "0001": 4,     # Single faults (Type 1)
    "2000": 5, "0200": 6, "0020": 7, "0002": 8,     # Single faults (Type 2)
    "1100": 9, "1020": 10, "1002": 11, "0120": 12, "0102": 13, "0022": 14, # Dual faults
    "1120": 15, "1102": 16, "1022": 17, "0122": 18, # Triple faults
    "1122": 19, # All propellers faulty
}

# Simplified mapping (Number of faulty rotors)
scenario_to_class_5 = {
    "0000": 0,
    "1000": 1, "0100": 1, "0010": 1, "0001": 1,
    "2000": 1, "0200": 1, "0020": 1, "0002": 1,
    "1100": 2, "1020": 2, "1002": 2, "0120": 2, "0102": 2, "0022": 2,
    "1120": 3, "1102": 3, "1022": 3, "0122": 3,
    "1122": 4,
}

if CLASS_MODE == "5class":
    scenario_to_class = scenario_to_class_5
elif CLASS_MODE == "20class":
    scenario_to_class = scenario_to_class_20
else:
    raise ValueError("Invalid CLASS_MODE selected.")

NUM_CLASSES = len(set(scenario_to_class.values()))
print(f"Experiment Mode: {CLASS_MODE} | Total Classes: {NUM_CLASSES}")

# 3. Time Domain Analysis, data preparation

In this section, we process normalized time-series signals from the accelerometers and gyroscopes. Since the data represents a continuous flight stream, we segment the signal into fixed-length, non-overlapping windows; the window length is set by `SAMPLE_SIZE` (8 measurement points in the configuration below).
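The notebook's `make_windows_from_df` cuts the stream into non-overlapping windows. If overlapping (sliding) windows were wanted instead, one possible sketch uses NumPy's `sliding_window_view`; the function name `make_overlapping_windows` and the `step` parameter are assumptions introduced here for illustration.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def make_overlapping_windows(data, sample_size, step):
    """Segment (n_samples, n_features) data into windows that start every `step` rows."""
    data = np.asarray(data, dtype="float32")
    if len(data) < sample_size:
        return np.empty((0, sample_size, data.shape[1]), dtype="float32")
    # sliding_window_view yields (n_samples - sample_size + 1, n_features, sample_size)
    views = sliding_window_view(data, sample_size, axis=0)
    # keep every `step`-th window and reorder to (n_windows, sample_size, n_features)
    return views[::step].transpose(0, 2, 1).copy()

signal = np.arange(20, dtype="float32").reshape(10, 2)   # 10 samples, 2 channels
non_overlapping = make_overlapping_windows(signal, sample_size=4, step=4)
overlapping = make_overlapping_windows(signal, sample_size=4, step=2)
print(non_overlapping.shape, overlapping.shape)
```

With `step == sample_size` the result matches the non-overlapping case. Note that overlapping windows share samples, so a random train/test split over them may let near-duplicate data appear in both sets.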



## 3.1 Data Segmentation

In [None]:
# Path to normalized time-domain data
DATA_DIR_TIME = r"/content/Normalized_data"
SAMPLE_SIZE = 8             # Window length
N_FEATURES = 24               # Input channels (e.g., 3-axis accel + 3-axis gyro per sensor)
SENSOR_MODE = "both"          # "accel" + "gyro"

In [None]:
def make_windows_from_df(df: pd.DataFrame, sample_size: int):
    """
    Segments time-series data into non-overlapping windows.
    """
    data = df.values.astype("float32")
    n_total = len(data)
    n_windows = n_total // sample_size
    if n_windows == 0:
        return np.empty((0, sample_size, data.shape[1]), dtype="float32")
    data = data[:n_windows * sample_size]
    windows = data.reshape(n_windows, sample_size, data.shape[1])
    return windows

### Processing Pipeline
The following loop iterates through all normalized time-domain CSV files. For each file, it:
1.  Extracts the scenario code (e.g., `0000`) from the filename.
2.  Checks if the scenario exists in our defined class mapping.
3.  Loads the data and segments it into non-overlapping windows.
4.  Accumulates the processed windows (`X`) and corresponding labels (`y`) into a single dataset.

In [None]:
# Data pass through window function

X_list = []
y_list = []

norm_data_files_pattern = os.path.join(DATA_DIR_TIME, "Bebop2_16g_1kdps_normalized_*.csv")

for path in glob.glob(norm_data_files_pattern):
    fname = os.path.basename(path)
    # the last "_"-separated fragment is the scenario code, e.g. "0000"
    scenario = os.path.splitext(fname)[0].split("_")[-1]

    if scenario not in scenario_to_class:
        print(f"Skipping {fname}: scenario {scenario} is not in the mapping.")
        continue

    label = scenario_to_class[scenario]

    df = pd.read_csv(path)

    # if the file contains columns other than the 24 sensor channels, select only the needed ones here:
    # df = df[["A_aX","A_aY",...,"D_gZ"]]

    windows = make_windows_from_df(df, SAMPLE_SIZE)
    if windows.shape[0] == 0:
        print(f"Not enough data in {fname} for even one window, skipping.")
        continue

    X_list.append(windows)
    y_list.append(np.full((windows.shape[0],), label, dtype="int32"))

X = np.concatenate(X_list, axis=0)  # (N, SAMPLE_SIZE, 24)
y = np.concatenate(y_list, axis=0)  # (N,)

print("X shape:", X.shape)
print("y shape:", y.shape, "unique labels:", np.unique(y))

input_shape = (SAMPLE_SIZE, N_FEATURES)
print("model input_shape:", input_shape)

# 4. Frequency Domain Analysis (FFT)

Mechanical faults in rotating components (such as propellers) generate distinct vibration signatures that are often most discernible in the frequency spectrum.

In this experiment, we use data that has already been transformed with the **Fast Fourier Transform (FFT)**. This preprocessing, which applies a Hann window to mitigate spectral leakage, was done upstream in the repository. The input features are vectors of spectral coefficients for each sensor axis.
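For readers unfamiliar with this preprocessing, here is a minimal sketch of a Hann-windowed magnitude spectrum for a single frame, using a synthetic sine wave. It only illustrates the general technique; the repository's actual pipeline (scaling, detrending, bin selection) is not shown in this notebook and may differ. The function name `hann_fft_magnitude` is hypothetical.

```python
import numpy as np

def hann_fft_magnitude(frame):
    """Single-sided magnitude spectrum of one frame after applying a Hann window."""
    frame = np.asarray(frame, dtype="float64")
    window = np.hanning(len(frame))
    return np.abs(np.fft.rfft(frame * window))

SAMPLING_RATE = 500.0   # Hz, the value configured later in this notebook
WINDOW_LENGTH = 128     # samples per frame (assumed for this example)

t = np.arange(WINDOW_LENGTH) / SAMPLING_RATE
frame = np.sin(2 * np.pi * 125.0 * t)   # synthetic 125 Hz vibration

spectrum = hann_fft_magnitude(frame)
freqs = np.fft.rfftfreq(WINDOW_LENGTH, d=1.0 / SAMPLING_RATE)
peak_bin = int(np.argmax(spectrum))
print(peak_bin, freqs[peak_bin])
```

The spectrum peaks at the bin whose center frequency matches the sine, which is the kind of signature a classifier can pick up when a damaged propeller changes the vibration content.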

## 4.1 Spectral Data Structure
The FFT files contain metadata within their filenames (window length, window type, frequency range). The following code parses this information and loads the corresponding spectral coefficients.
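As a worked example of the filename metadata, the sketch below converts two configuration names that appear in this notebook into physical frequency ranges. It follows the same convention as `print_fft_info` (bins treated as 1-based, 500 Hz sampling rate); whether the repository numbers bins this way is an assumption carried over from the notebook. The helper name `bin_range_to_hz` is hypothetical.

```python
def bin_range_to_hz(config_name, sampling_rate=500.0):
    """Convert 'WindowLength_WindowType_StartBin_StopBin' to (resolution, f_start, f_stop) in Hz."""
    parts = config_name.split("_")
    window_length = int(parts[0])
    start_bin, stop_bin = int(parts[-2]), int(parts[-1])
    freq_res = sampling_rate / window_length
    # Notebook convention: bins are 1-based, so the first bin
    # starts at (start_bin - 1) * freq_res.
    return freq_res, (start_bin - 1) * freq_res, stop_bin * freq_res

for name in ("128_Hann_20_52", "256_Hann_40_104"):
    res, f_lo, f_hi = bin_range_to_hz(name)
    print(f"{name}: {res:.3f} Hz/bin, {f_lo:.1f}-{f_hi:.1f} Hz")
```

Under these assumptions both configurations cover roughly the same band (about 74 to 203 Hz); the 256-sample window simply resolves it with twice as many bins.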

In [None]:
# Configuration for FFT Data
FFT_ROOT      = "FFT_data"
FFT_CONFIG    = "256_Hann_40_104"    # Specific window/range configuration
SENSOR_MODE   = "both"               # "accel", "gyro", or "both"
SAMPLING_RATE = 500.0                # Sampling rate for Bebop 2 inertial sensors

# number of axes per sensor type
N_AXES_SINGLE = 12
N_AXES = 12 if SENSOR_MODE in ("accel", "gyro") else 24

fft_dir = os.path.join(FFT_ROOT, FFT_CONFIG)

In [None]:
def print_fft_info(fft_dir, sampling_rate=500.0):
    """
    Extracts FFT parameters encoded in the data folder name and converts them
    into physical frequency values.

    The function assumes the folder name follows the format:
    'WindowLength_WindowType_StartBin_StopBin' (e.g., '128_Hann_20_52').
    """

    folder_name = os.path.basename(os.path.normpath(fft_dir))
    parts = folder_name.split("_")          # e.g. ["128","Hann","20","52"]
    measuringWindowLength = int(parts[0])   # 128
    rangeStart = int(parts[-2])             # 20
    rangeStop  = int(parts[-1])             # 52

    freq_res = sampling_rate / measuringWindowLength
    f_start  = (rangeStart - 1) * freq_res
    f_stop   = rangeStop * freq_res

    print(f"FFT folder: {folder_name}")
    print(f"measuringWindowLength = {measuringWindowLength}")
    print(f"Bin range: {rangeStart}-{rangeStop}")
    print(f"Frequency resolution: {freq_res:.3f} Hz")
    print(f"Frequency range: {f_start:.1f} Hz - {f_stop:.1f} Hz")

    return measuringWindowLength, rangeStart, rangeStop, freq_res, f_start, f_stop

In [None]:
measuringWindowLength, rangeStart, rangeStop, freq_res, f_start, f_stop = print_fft_info(fft_dir, SAMPLING_RATE)

### File Discovery & Categorization
This block scans the directory for all CSV files and organizes them into dictionaries based on the sensor type (**ACCEL** vs. **GYRO**). It parses the filename to extract the specific fault scenario code (e.g., `0000`, `1022`), using it as a key for quick lookup during the data loading phase.

In [None]:
# Scan directory and map file paths to scenarios based on sensor type (ACCEL/GYRO)

all_files = glob.glob(os.path.join(fft_dir, "*.csv"))

accel_files = {}  # scenario -> path
gyro_files  = {}

for path in all_files:
    fname = os.path.basename(path)
    parts = fname.split("_")
    # example: Bebop2_16g_FFT_ACCEL_128_Hann_20_52_0000.csv
    # indices: 0      1   2   3     4   5    6  7  8
    sensor_type = parts[3]              # "ACCEL" or "GYRO"
    scenario    = os.path.splitext(parts[-1])[0]  # "0000" etc.

    if sensor_type == "ACCEL":
        accel_files[scenario] = path
    elif sensor_type == "GYRO":
        gyro_files[scenario] = path

print("Found ACCEL files for scenarios:", sorted(accel_files.keys()))
print("Found GYRO  files for scenarios:", sorted(gyro_files.keys()))


### Data Loading & Sensor Fusion
In this step, we aggregate the spectral data based on the selected `SENSOR_MODE`.
* **Accel/Gyro:** Loads only the specified sensor data.
* **Both:** Loads both accelerometer and gyroscope files for the same scenario, verifying consistency, and concatenates them along the feature axis to create a unified feature vector (e.g., 24 input channels).
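The reshape and fusion steps can be illustrated on synthetic data. One caveat: a plain `reshape(-1, n_freq_bins, n_axes)` is only correct if the CSV columns are ordered bin by bin (all axes for bin 0, then all axes for bin 1, and so on). The column order of the repository's files is not stated here, so the axis-major variant below is a hypothetical alternative to check against the data.

```python
import numpy as np

N_AXES_SINGLE = 12   # axes per sensor type, as in this notebook
n_windows, n_freq_bins = 2, 3

# Synthetic flat rows standing in for one CSV file: (n_windows, n_freq_bins * 12).
flat = np.arange(n_windows * n_freq_bins * N_AXES_SINGLE, dtype="float32")
flat = flat.reshape(n_windows, n_freq_bins * N_AXES_SINGLE)

# Bin-major layout (what a plain reshape assumes): columns run
# [bin0: axis0..axis11, bin1: axis0..axis11, ...].
bin_major = flat.reshape(-1, n_freq_bins, N_AXES_SINGLE)

# Axis-major layout (hypothetical alternative): columns run
# [axis0: bin0..binK, axis1: bin0..binK, ...], so a transpose is needed.
axis_major = flat.reshape(-1, N_AXES_SINGLE, n_freq_bins).transpose(0, 2, 1)

# Fusion: stack accelerometer and gyroscope channels on the last axis.
accel = bin_major
gyro = bin_major + 1000.0   # placeholder values for the second sensor
fused = np.concatenate([accel, gyro], axis=-1)
print(bin_major.shape, axis_major.shape, fused.shape)
```

Both layouts give the same tensor shape, so a wrong assumption would not raise an error; comparing one reshaped window against a known spectrum is a reasonable sanity check.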

In [None]:
def load_fft_file(path, n_axes=N_AXES_SINGLE):
    """
    Loads spectral data from a CSV file and reshapes it into a 3D tensor.

    The function reads a flat CSV (assuming no header), calculates the number
    of frequency bins based on the total columns and specified axes, and
    restructures the data.
    """

    # if the file turns out to have a header row, change this to header=0
    df = pd.read_csv(path, header=None)
    data = df.values.astype("float32")   # (n_windows, n_features)
    n_features = data.shape[1]

    if n_features % n_axes != 0:
        raise ValueError(f"{os.path.basename(path)}: {n_features} columns "
                         f"is not divisible by {n_axes} axes.")

    n_freq_bins = n_features // n_axes
    data_3d = data.reshape(-1, n_freq_bins, n_axes)  # (n_windows, n_freq_bins, n_axes)
    return data_3d, n_freq_bins

In [None]:
X_list = []
y_list = []
n_freq_bins_global = None

for scenario, label in scenario_to_class.items():
    cur_X = None

    if SENSOR_MODE == "accel":
        path = accel_files.get(scenario)
        if path is None:
            print(f"[ACCEL] no file for scenario {scenario}, skipping.")
            continue
        accel_data, n_freq_bins = load_fft_file(path)
        cur_X = accel_data   # (n_windows, n_freq_bins, 12)

    elif SENSOR_MODE == "gyro":
        path = gyro_files.get(scenario)
        if path is None:
            print(f"[GYRO] no file for scenario {scenario}, skipping.")
            continue
        gyro_data, n_freq_bins = load_fft_file(path)
        cur_X = gyro_data    # (n_windows, n_freq_bins, 12)

    elif SENSOR_MODE == "both":
        path_a = accel_files.get(scenario)
        path_g = gyro_files.get(scenario)
        if path_a is None or path_g is None:
            print(f"[BOTH] missing ACCEL or GYRO file for {scenario}, skipping.")
            continue

        accel_data, n_freq_bins_a = load_fft_file(path_a, n_axes=N_AXES_SINGLE)
        gyro_data,  n_freq_bins_g = load_fft_file(path_g, n_axes=N_AXES_SINGLE)

        if accel_data.shape[0] != gyro_data.shape[0] or n_freq_bins_a != n_freq_bins_g:
            raise ValueError(f"Mismatched ACCEL/GYRO shapes for scenario {scenario}")

        # concatenate along the channel axis: 12 (ACCEL) + 12 (GYRO) = 24
        cur_X = np.concatenate([accel_data, gyro_data], axis=-1)  # (..., n_freq_bins, 24)
        n_freq_bins = n_freq_bins_a

    else:
        raise ValueError("SENSOR_MODE must be 'accel', 'gyro', or 'both'")

    # set / verify the global number of bins
    if n_freq_bins_global is None:
        n_freq_bins_global = n_freq_bins
    elif n_freq_bins_global != n_freq_bins:
        raise ValueError("n_freq_bins differs between files; something is wrong.")

    X_list.append(cur_X)
    y_list.append(np.full((cur_X.shape[0],), label, dtype="int32"))

# Concatenate everything
X = np.concatenate(X_list, axis=0)   # (N, n_freq_bins, N_AXES)
y = np.concatenate(y_list, axis=0)   # (N,)

print("X shape:", X.shape)
print("y shape:", y.shape, "unique labels:", np.unique(y))

n_freq_bins = n_freq_bins_global
input_shape = (n_freq_bins, N_AXES)
print("model input_shape:", input_shape)


# 5. Models Creation

This setting controls the W&B logging mode.

In [None]:
# Choose one mode:
# "online"   -> Sends data to the cloud immediately
# "offline"  -> Saves to disk (requires wandb sync)
# "disabled" -> Does nothing (debugging)
LOG_MODE = "online"

SHOULD_LOG = (LOG_MODE != "disabled")

## 5.1 ANN - MLP

In [None]:
# Initialize W&B Experiment, CONFIG
wandb.init(
    #project="UAV-FDI-Bebop2",
    project="UAV-FDI",
    name="Time_MLP_v3",
    mode=LOG_MODE,
    config={
        "domain": "time",
        "sensor_mode": SENSOR_MODE,
        "input_shape": input_shape,
        "output_shape": NUM_CLASSES,
        "layer_1": 128,
        "dropout_rate": 0.2,
        #"l2_value": 0.01,
        "activation": "relu",
        "final_activation": "softmax",
        "optimizer": "sgd",
        "loss": "sparse_categorical_crossentropy",
        "metric": "accuracy",
        "learning_rate": 0.001,
        "epochs": 10,
        "batch_size": 512
    }
)


config = wandb.config

In [None]:
model = models.Sequential([
    layers.Input(shape=input_shape),
    layers.Flatten(),
    layers.Dense(config.layer_1, activation=config.activation),
    layers.Dropout(config.dropout_rate),
    layers.Dense(NUM_CLASSES, activation=config.final_activation),
])

In [None]:
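# Build the optimizer from its name, then apply the configured learning rate
# (passing only the string "sgd" would silently use the Keras default rate).
optimizer = tf.keras.optimizers.get(config.optimizer)
optimizer.learning_rate = config.learning_rate

model.compile(
    optimizer=optimizer,
    loss=config.loss,
    metrics=["accuracy"],
)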

model.summary()

In [None]:
# Generating model structure chart
plot_model(model, to_file='model_arch.png', show_shapes=True, show_layer_names=True)

wandb.log({"charts/model_architecture": wandb.Image('model_arch.png')})

if LOG_MODE == "online":
    print("Model diagram sent to the W&B cloud.")

elif LOG_MODE == "offline":
    print("Saved locally.")

else: # disabled
    print("Logging disabled - nothing was sent.")

# 6. Data splitting - train/val/test
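The data is first split into training (70%), validation (15%), and test (15%) sets with stratified sampling, so class proportions are preserved in each subset. For efficient training, we then convert the NumPy arrays into `tf.data.Dataset` objects and apply:
* **Shuffling:** Applied to the training set to prevent the model from learning order-dependent patterns.
* **Batching:** Grouping samples for gradient updates.
* **Prefetching:** Using `AUTOTUNE` to prepare the next batch while the current one is being processed, which helps keep the accelerator from waiting on input data.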

In [None]:
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)


train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)) \
    .shuffle(buffer_size=len(X_train)) \
    .batch(config.batch_size) \
    .prefetch(tf.data.AUTOTUNE)

val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val)) \
    .batch(config.batch_size) \
    .prefetch(tf.data.AUTOTUNE)

test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)) \
    .batch(config.batch_size)

# 6. Training, Evaluation, and Experiment Logging

This comprehensive block executes the core machine learning pipeline:

1.  **Training:** The model is trained using `model.fit()` with callbacks to:
    * Log live training metrics to W&B (`WandbMetricsLogger`).
    * Save the best model, selected by validation accuracy (`ModelCheckpoint`).
2.  **Inference & Metrics:** After training, the model generates predictions on the held-out test set. We calculate key performance indicators: **Accuracy**, **Precision**, **Recall**, and **F1-Score** (macro-averaged).
3.  **Visualization:** Standard diagnostic plots are generated using Matplotlib:
    * **Learning Curves:** Loss and Accuracy over epochs.
    * **Confusion Matrix:** To visualize misclassifications between classes.
    * **ROC Curves:** To analyze the trade-off between True Positive Rate and False Positive Rate for each class.
4.  **Final Logging:** All computed scalars, generated plots, and the saved model artifact are uploaded to the **Weights & Biases** dashboard to finalize the experiment run.
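To clarify what "macro-averaged" means in step 2, the sketch below computes macro precision, recall, and F1 directly from a small, made-up confusion matrix: each metric is computed per class and then averaged with equal weight per class. The notebook itself uses scikit-learn's `precision_score`, `recall_score`, and `f1_score`; the helper `macro_metrics_from_cm` is hypothetical and only illustrates the definition.

```python
import numpy as np

def macro_metrics_from_cm(cm):
    """Macro-averaged precision, recall, and F1 from a confusion matrix (rows = true classes)."""
    cm = np.asarray(cm, dtype="float64")
    tp = np.diag(cm)
    predicted = cm.sum(axis=0)   # column sums: samples predicted as each class
    actual = cm.sum(axis=1)      # row sums: samples that truly belong to each class
    # Classes with an empty denominator contribute 0 instead of raising a warning.
    precision = np.divide(tp, predicted, out=np.zeros_like(tp), where=predicted > 0)
    recall = np.divide(tp, actual, out=np.zeros_like(tp), where=actual > 0)
    denom = precision + recall
    f1 = np.divide(2 * precision * recall, denom, out=np.zeros_like(tp), where=denom > 0)
    return precision.mean(), recall.mean(), f1.mean()

# Made-up 3-class confusion matrix, for illustration only.
cm_example = np.array([[8, 2, 0],
                       [1, 9, 0],
                       [0, 1, 9]])
p, r, f = macro_metrics_from_cm(cm_example)
print(f"precision={p:.4f} recall={r:.4f} f1={f:.4f}")
```

Because every class counts equally, a poorly recognized rare fault scenario lowers the macro scores even when overall accuracy remains high.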

In [None]:
# ==============================================================================
# 3. TRAINING
# ==============================================================================

print("Starting training...")
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=config.epochs,
    callbacks=[
        # Logs training progress (live curves)
        WandbMetricsLogger(log_freq=5),
        # Saves the best model (the .keras extension is required by Keras 3)
        ModelCheckpoint(
            filepath="checkpoints/model_best.keras",
            save_best_only=True,
            monitor="val_accuracy",
            mode="max",
        )
    ]
)

# ==============================================================================
# 4. EVALUATION AND METRIC COMPUTATION
# ==============================================================================

print("\nGenerating predictions on the test set...")
y_true_list = []
y_proba_list = []

for x_batch, y_batch in test_ds:
    y_true_list.append(y_batch.numpy())
    y_proba_list.append(model.predict(x_batch, verbose=0))

y_true = np.concatenate(y_true_list, axis=0)   # (N,)
y_proba = np.concatenate(y_proba_list, axis=0) # (N, NUM_CLASSES)
y_pred = np.argmax(y_proba, axis=1)            # (N,)

# Make sure y_true holds integer class labels
if y_true.ndim > 1:
    y_true_int = np.argmax(y_true, axis=1)
else:
    y_true_int = y_true.astype(int)

# --- Scalar metrics ---
test_loss, test_acc = model.evaluate(test_ds, verbose=0)
prec = precision_score(y_true_int, y_pred, average="macro")
rec = recall_score(y_true_int, y_pred, average="macro")
f1 = f1_score(y_true_int, y_pred, average="macro")

print(f"Loss: {test_loss:.4f}, Acc: {test_acc:.4f}, F1: {f1:.4f}")

# ==============================================================================
# 5. PLOTTING (MATPLOTLIB)
# ==============================================================================

# A. Loss Plot
fig_loss = plt.figure(figsize=(8, 5))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.title('Training Loss')
plt.legend(); plt.grid(True)

# B. Accuracy Plot
fig_acc = plt.figure(figsize=(8, 5))
plt.plot(history.history['accuracy'], label='Train Acc')
plt.plot(history.history['val_accuracy'], label='Val Acc')
plt.title('Training Accuracy')
plt.legend(); plt.grid(True)

# C. Confusion Matrix
cm = confusion_matrix(y_true_int, y_pred)
fig_cm, ax = plt.subplots(figsize=(8, 8))
ConfusionMatrixDisplay(cm).plot(ax=ax, cmap='Blues', values_format='d')
plt.title('Confusion Matrix')

# D. ROC Curve
y_true_bin = label_binarize(y_true_int, classes=np.arange(NUM_CLASSES))
fpr, tpr, roc_auc = {}, {}, {}
for i in range(NUM_CLASSES):
    fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_proba[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

fig_roc = plt.figure(figsize=(10, 8))
for i in range(NUM_CLASSES):
    plt.plot(fpr[i], tpr[i], label=f'Cls {i} (AUC={roc_auc[i]:.2f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.title('ROC Curve')
# Place the legend outside the plot so it does not cover the curves when there are many classes
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()

# MANUAL MODEL UPLOAD AT THE END
# Done only once, just before wandb.finish()
wandb.save("checkpoints/model_best.keras")

# ==============================================================================
# 6. FINAL LOGGING (split into Summary and Media)
# ==============================================================================

# 1. SCALARS -> wandb.summary (table only, WITHOUT points on charts)
wandb.summary["test_loss"] = test_loss
wandb.summary["test_accuracy"] = test_acc
wandb.summary["test_precision"] = prec
wandb.summary["test_recall"] = rec
wandb.summary["test_f1_score"] = f1

# 2. IMAGES -> wandb.log (Media / Custom Charts section)
wandb.log({
    "charts/loss_history": wandb.Image(fig_loss),
    "charts/acc_history": wandb.Image(fig_acc),
    "charts/confusion_matrix": wandb.Image(fig_cm),
    "charts/roc_curve": wandb.Image(fig_roc)
})

# Free RAM (close the matplotlib figures)
plt.close('all')

if LOG_MODE == "online":
    print("Run complete. Results sent to the W&B cloud.")

elif LOG_MODE == "offline":
    print("Run complete. Results saved locally.")

else: # disabled
    print("Run complete.")
    print("Logging disabled - nothing was sent.")

## Local Result Visualization
This section serves as an immediate, on-screen verification of the training results. It prints the final classification metrics (Accuracy, Loss, Precision, Recall, F1) directly to the console and renders the learning curves and confusion matrix inline using Matplotlib.

In [None]:
# === 1. PRINT NUMERIC RESULTS ===
print("="*40)
print("FINAL MODEL REPORT")
print("="*40)
print(f"Test Accuracy:  {test_acc:.2%}")      # Percentage format
print(f"Test Loss:      {test_loss:.4f}")
print("-"*20)
print(f"Precision:      {prec:.4f}")
print(f"Recall:         {rec:.4f}")
print(f"F1 Score:       {f1:.4f}")
print("="*40)

# === 2. PLOT TRAINING HISTORY (LOSS & ACCURACY) ===
# Create one large figure with two plots side by side
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Loss plot
ax1.plot(history.history['loss'], label='Train Loss')
ax1.plot(history.history['val_loss'], label='Val Loss')
ax1.set_title('Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True)

# Accuracy plot
ax2.plot(history.history['accuracy'], label='Train Acc')
ax2.plot(history.history['val_accuracy'], label='Val Acc')
ax2.set_title('Accuracy')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Acc')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

# === 3. PLOT THE CONFUSION MATRIX ===
# Uses y_true_int and y_pred computed earlier
fig, ax = plt.subplots(figsize=(12, 12))
disp = ConfusionMatrixDisplay(confusion_matrix=cm) # cm is still in memory from the previous cell
disp.plot(ax=ax, cmap='Blues', values_format='d')
plt.title('Confusion Matrix (Test Set)')
plt.show()

# 7. Specialized Quality metrics
Standard accuracy can sometimes be misleading in multi-class problems. Here, we calculate additional metrics proposed for fault diagnosis:
* **$A$ (Average Accuracy):** Mean accuracy across all classes.
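* **$Q_{AC}$ (Accurate Classification Quality):** Raises the average accuracy to a power $\alpha > 1$, which penalizes low accuracy, and scales the result by $\sqrt{\ln C}$, so the score also reflects the number of classes $C$ being distinguished (based on information theory).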
* **Inference Time ($T$):** Measuring the computational cost of a single prediction to assess real-time feasibility.
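
For reference, the indices can be written out explicitly. This is a transcription of the formulas as they are implemented in this notebook's code, not an independent derivation; $C$ is the number of classes, $\alpha > 1$ the power index, $T$ the time of a single classification, and $T_0$ the reference period.

$$
A = \frac{1}{C} \sum_{c=1}^{C} \frac{TP_c + TN_c}{TP_c + FP_c + TN_c + FN_c}
$$

$$
Q_{AC} = A^{\alpha} \sqrt{\ln C}
$$

$$
Q_{ACT} = A^{\alpha} \sqrt{\frac{\ln C}{\ln\left(1 + \tau^{2}\right)}}, \qquad \tau = \frac{T}{T_0}
$$

As a worked example with $\alpha = 3$ and $C = 20$, a hypothetical model with $A = 0.95$ would score $Q_{AC} = 0.95^{3} \sqrt{\ln 20} \approx 1.48$.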

In [None]:
# === 1. Collect y_true and y_pred on the test set ===
y_true_list = []
y_pred_list = []

for x_batch, y_batch in test_ds:
    # y_batch: class indices 0..NUM_CLASSES-1 (one-hot labels are handled below)
    y_true_list.append(y_batch.numpy())
    y_pred_list.append(
        np.argmax(model.predict(x_batch, verbose=0), axis=1)
    )

y_true = np.concatenate(y_true_list, axis=0)
y_pred = np.concatenate(y_pred_list, axis=0)

# if y_true is one-hot, convert it to class indices
if y_true.ndim > 1:
    y_true_int = np.argmax(y_true, axis=1)
else:
    y_true_int = y_true.astype(int)

# === 2. Confusion matrix ===
cm = confusion_matrix(y_true_int, y_pred)
#print("Confusion matrix:\n", cm)

# === 3. Function computing A, Q_AC, Q_ACT following the formulas from the article ===
def compute_quality_indices(cm, alpha=3.0, T_ms=None, T0_ms=1.0):
    """
    cm    : confusion matrix (C x C)
    alpha : power index α (>1)
    T_ms  : classification time in ms (for Q_ACT); if None, Q_ACT is not computed
    T0_ms : reference period (1 ms for 1 kHz)
    """
    C = cm.shape[0]           # number of classes
    N = cm.sum()              # total number of samples

    TP = np.diag(cm)
    FN = cm.sum(axis=1) - TP
    FP = cm.sum(axis=0) - TP
    TN = N - TP - FN - FP

    # Eq. (10): averaged accuracy A
    per_class_acc = (TP + TN) / (TP + FP + TN + FN)
    A = per_class_acc.mean()

    # Eq. (9): Q_AC = A^α * sqrt(ln C)
    Q_AC = (A ** alpha) * np.sqrt(np.log(C))

    # Eq. (11): Q_ACT = A^α * sqrt( ln C / ln(1 + τ^2) ), τ = T/T0
    Q_ACT = None
    tau = None
    if T_ms is not None:
        tau = T_ms / T0_ms
        Q_ACT = (A ** alpha) * np.sqrt(np.log(C) / np.log(1.0 + tau**2))

    return {
        "A": A,
        "per_class_acc": per_class_acc,
        "Q_AC": Q_AC,
        "Q_ACT": Q_ACT,
        "tau": tau,
    }

# === 4. Measure the time of a SINGLE classification (T) ===
# take one sample from the test set
for xb, yb in test_ds.take(1):
    sample = xb[0:1]   # shape (1, n_freq_bins, n_axes)
    break

# short warm-up to exclude one-time startup delays
_ = model.predict(sample, verbose=0)

N_REPEAT = 100  # number of repetitions to average over
start = time.perf_counter()   # monotonic, high-resolution timer
for _ in range(N_REPEAT):
    _ = model.predict(sample, verbose=0)
end = time.perf_counter()

ANN_T_ms = (end - start) * 1000.0 / N_REPEAT   # average time of one classification in ms

# === 5. Compute the indices ===
metrics_q = compute_quality_indices(cm, alpha=3.0)

print("\n=== Specialized (multi-class) metrics ===\n")
print(f"A (average accuracy across classes): {metrics_q['A']:.4f}")
print(f"Q_AC (Accurate Classification Quality): {metrics_q['Q_AC']:.4f}")
print(f"Single classification time T: {ANN_T_ms:.3f} ms")
#print(f"Q_ACT (Accurate Classification-Time Quality): {metrics_q['Q_ACT']:.4f}")
print("Per-class accuracy:", metrics_q["per_class_acc"])


In [None]:
wandb.summary["Q_AC"] = metrics_q['Q_AC']

if LOG_MODE == "online":
    print("Accurate Classification Quality metric sent to the W&B cloud.")

elif LOG_MODE == "offline":
    print("Accurate Classification Quality metric saved locally.")

else: # disabled
    print("Logging disabled - nothing was sent.")

# W&B Experiment finish

In [None]:
# === END OF W&B EXPERIMENT ===
wandb.finish()

### W&B SYNC

Only necessary when W&B 'offline' mode is set.

In [None]:
# Uncomment to upload an offline run (the leading "!" runs it as a shell command):
# !wandb sync wandb/latest-run

# 8. Model Export

To facilitate deployment on the drone's embedded platform or for future inference tasks, we serialize the trained model along with a metadata file. The `metadata.json` file contains critical configuration details regarding sensor setup and class mapping, which are essential for correctly interpreting model predictions in a production environment.
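One detail worth keeping in mind when reading `metadata.json` back: JSON object keys are always strings, so the integer class IDs in the class mapping come back as `"0"`, `"1"`, and so on. The sketch below shows the round trip on a shortened mapping (three scenarios picked for illustration); it uses only the standard `json` module.

```python
import json

# Shortened mapping for illustration; the notebook's full mapping has 20 entries.
scenario_to_class = {"0000": 0, "1000": 1, "0100": 2}
class_to_scenario = {v: k for k, v in scenario_to_class.items()}

# Serializing converts the integer class IDs into string keys.
serialized = json.dumps({"classes_mapping": class_to_scenario})
restored = json.loads(serialized)["classes_mapping"]

# Convert the keys back to int before indexing with argmax outputs.
restored_int_keys = {int(k): v for k, v in restored.items()}
print(restored)
print(restored_int_keys[2])
```

In the aggregated 5-class mode several scenarios share one class ID, so the inverted mapping keeps only one scenario per class and cannot recover the exact fault configuration.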

In [None]:
# directory for saved models
SAVE_DIR = "saved_models"
os.makedirs(SAVE_DIR, exist_ok=True)

MODEL_PATH    = os.path.join(SAVE_DIR, "uav_fft_classifier.keras")  # Keras format
METADATA_PATH = os.path.join(SAVE_DIR, "metadata.json")

# Build the inverse dictionary: class ID -> scenario code (for reading predictions)
class_to_scenario = {v: k for k, v in scenario_to_class.items()}

# 1) save the model itself
model.save(MODEL_PATH)
print("Model saved to:", MODEL_PATH)

# 2) save basic metadata (useful when loading the model)
metadata = {
    "data_domain": config.domain,
    "num_classes": int(NUM_CLASSES),
    "classes_mapping": class_to_scenario,
    "input_shape": input_shape,
    "sensor_mode": SENSOR_MODE,
    "data_files_pattern": norm_data_files_pattern,   # norm_data_files_pattern or FFT_CONFIG
    "final_test_accuracy": float(test_acc),
    "final_f1_score": float(f1)
}

with open(METADATA_PATH, "w") as f:
    json.dump(metadata, f, indent=2)

print("Metadata saved to:", METADATA_PATH)


# Single Inference & Real-world Simulation

This cell simulates a production environment scenario to verify the model's performance on individual samples. It performs the following steps:
1.  **Model Loading:** Reloads the saved model (`.keras`) from the disk to ensure the artifact is valid and ready for deployment.
2.  **Random Sampling:** Selects a single, random sample from the test dataset to simulate an incoming data stream.
3.  **Inference:** Runs the classification and calculates the confidence score.
4.  **Decoding:** Maps the numerical class ID back to a human-readable fault scenario (e.g., identifying specifically which rotor is damaged) using `class_to_scenario`, the inverse of the `scenario_to_class_20` mapping.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model

# ==============================================================================
# 1. CONFIGURATION & MAPPING (DECODER RING)
# ==============================================================================

# Fault type definitions (see the Fault Scenario Mapping section)
FAULT_MEANING = {
    '0': "✅ no fault",
    '1': "⚠️ chipped edge",
    '2': "❌ bent tip"
}

# Create inverse dictionary: Class ID -> Scenario Code (e.g., 10 -> "1020")
current_map = scenario_to_class_20
class_to_scenario = {v: k for k, v in current_map.items()}

def decode_scenario_string(scenario_code):
    """
    Decodes the string (e.g., '1020') into human-readable rotor status.
    Assumption: The string digits correspond to Rotors [A, B, C, D].
    """
    description = r"""
    Rotors Layout:
       C   A
        \ /
        / \
       D   B
    """
    # Mapping indices 0,1,2,3 to the Rotors A,B,C,D and their positions
    rotors_mapping = [
        "Rotor A (Front-Right)", # Index 0
        "Rotor B (Rear-Right) ", # Index 1
        "Rotor C (Front-Left) ", # Index 2
        "Rotor D (Rear-Left)  "  # Index 3
    ]

    print(f"\n Detailed UAV State Analysis (Code: {scenario_code}):")
    print("-" * 60)

    # Iterate through the characters of the string
    for i, char in enumerate(scenario_code):
        if i < len(rotors_mapping):
            rotor_name = rotors_mapping[i]
            status = FAULT_MEANING.get(char, "Unknown State")
            print(f"  🚁 {rotor_name:<25}: {status}")
    print("-" * 60)

# ==============================================================================
# 2. LOAD MODEL & GET RANDOM SAMPLE
# ==============================================================================

# Path to your saved model (ensure this matches your SAVE_DIR)
MODEL_PATH = "saved_models/uav_fft_classifier.keras"

print(f"Loading model from: {MODEL_PATH} ...")
try:
    loaded_model = load_model(MODEL_PATH)
    print("Model loaded successfully.")
except Exception as e:
    print(f"Error loading model: {e}")
    # Fallback to the current 'model' variable if file not found (for testing in NB)
    loaded_model = model

# Get one batch from the test dataset
print("Sampling a random input from test_ds...")
for x_batch, y_batch in test_ds.take(1):
    # Pick a random index within this batch
    random_idx = np.random.randint(0, len(x_batch))

    # Extract single sample
    sample_x = x_batch[random_idx] # Shape: input_shape (no batch dimension)
    sample_y = y_batch[random_idx] # True label

    # The model expects a batch dimension, so we expand to (1, *input_shape)
    sample_x_expanded = np.expand_dims(sample_x, axis=0)

    # ==========================================================================
    # 3. PERFORM INFERENCE
    # ==========================================================================

    # Predict
    predictions = loaded_model(sample_x_expanded, training=False).numpy()
    pred_class_id = np.argmax(predictions)
    confidence = np.max(predictions) * 100

    # ==========================================================================
    # 4. DECODE & REPORT
    # ==========================================================================

    # Handle true label format (one-hot vs integer)
    if np.ndim(sample_y) > 0 and len(sample_y) > 1:
        true_class_id = np.argmax(sample_y)
    else:
        true_class_id = int(sample_y)

    # Convert ID -> Scenario Code (e.g., 12 -> "0120")
    pred_scenario_code = class_to_scenario.get(pred_class_id, "????")
    true_scenario_code = class_to_scenario.get(true_class_id, "????")

    print("\n" + "="*60)
    print(f"SINGLE INFERENCE RESULT")
    print("="*60)

    # Visual check
    is_correct = (pred_class_id == true_class_id)
    icon = "✅ SUCCESS" if is_correct else "❌ FAILURE"

    print(f"True Class:        {true_class_id} [{true_scenario_code}]")
    print(f"Predicted Class:   {pred_class_id} [{pred_scenario_code}]")
    print(f"Model Confidence:  {confidence:.2f}%")
    print(f"Inference Status:  {icon}")

    # Decode the predicted scenario to show rotor status
    decode_scenario_string(pred_scenario_code)

    if not is_correct:
        print("\n\nTrue class scenario description")
        decode_scenario_string(true_scenario_code)

    break # Stop after one sample