In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import math
import os
from sklearn.preprocessing import LabelEncoder, StandardScaler
from imblearn.over_sampling import RandomOverSampler
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from pathlib import Path


In [None]:
import kagglehub

# Download latest version
#path = kagglehub.dataset_download("behrad3d/nasa-cmaps")
#print("Path to dataset files:", path)

DATA_PATH = Path("datasets/CMaps/")
images_dir = "images"

# FD00X dataset prep

In [None]:
indexes = ['unit_number', 'time_cycles']
settings = ['setting_1', 'setting_2', 'setting_3']
sensors = ['s_{}'.format(i+1) for i in range(0,21)]
COLS = indexes + settings + sensors

In [None]:
Sensor_dictionary = {}
dict_list = [
    "Fan intake temperature (°R)",
    "Low-Pressure Compressor outlet temperature (°R)",
    "High-Pressure Compressor outlet temperature (°R)",
    "Low-Pressure Turbine outlet temperature (°R)",
    "Fan intake pressure (psia)",
    "Bypass-duct pressure (psia)",
    "High-Pressure Compressor outlet pressure (psia)",
    "Physical fan RPM",
    "Physical core RPM",
    "Engine pressure ratio (P50/P2)",
    "High-Pressure Compressor outlet static pressure (psia)",
    "Fuel flow to Ps30 ratio (pps/psia)",
    "Corrected fan RPM",
    "Corrected core RPM",
    "Bypass ratio",
    "Burner fuel-air ratio",
    "Bleed enthalpy",
    "Required fan RPM",
    "Required fan conversion RPM",
    "High-pressure turbine cooling airflow",
    "Low-pressure turbine cooling airflow"
]

Sensor_dictionary = {f's_{i+1}': sensor for i, sensor in enumerate(dict_list)}
Sensor_dictionary


In [None]:
def load_fd_dataset(dataset_id):
    """
    Load train/test/RUL files for a single FD dataset (e.g., FD001, FD002, etc.)
    
    :param dataset_id: integer 1..4, e.g. for FD001 use dataset_id=1
    :return: df_train, df_test, df_rul (pandas DataFrames)
    """

    train_file = DATA_PATH / f"train_FD00{dataset_id}.txt"
    test_file  = DATA_PATH / f"test_FD00{dataset_id}.txt"
    rul_file   = DATA_PATH / f"RUL_FD00{dataset_id}.txt"

    df_train = pd.read_csv(
        train_file,
        sep=r"\s+",        
        header=None,
        names=COLS,
        index_col=False
    )

    df_test = pd.read_csv(
        test_file,
        sep=r"\s+",
        header=None,
        names=COLS,
        index_col=False
    )

    df_rul = pd.read_csv(
        rul_file,
        sep=r"\s+",
        header=None,
        names=["RUL"],
        index_col=False
    )
    
    return df_train, df_test, df_rul

def add_train_rul(df_train):
    """
    For the training set, calculate RUL for every row.
    NASA’s train data runs each engine to failure, so:
      RUL = (last cycle for that engine) - (current cycle).
    """
    # Group by unit and get the max cycle of each engine
    max_cycle = df_train.groupby("unit_number")["time_cycles"].transform("max")
    # RUL = distance to max cycle
    df_train["RUL"] = max_cycle - df_train["time_cycles"]
    return df_train

def add_test_rul(df_test, df_rul):
    """
    For the test set, each engine is truncated before failure. 
    NASA gives a single RUL for the *last* row of each engine in df_rul.
    
    Typically, we only need that final row to evaluate or predict RUL. 
    So we can 'merge' that RUL onto the final snapshot of each engine.
    
    If you want row-level RUL for the entire partial test run (less common),
    you need a different approach. Usually, we label only the last row.
    """
    # Identify the final row for each engine in the test set
    # i.e., the row with the maximum 'time_cycles' for that unit_number
    idx = df_test.groupby("unit_number")["time_cycles"].transform("max") == df_test["time_cycles"]
    final_test_rows = df_test[idx].copy().reset_index(drop=True)
    
    # Attach RUL from df_rul, which is one row per engine
    # RUL rows match by index => engine 1 => df_rul.loc[0], engine 2 => df_rul.loc[1], etc.
    # final_test_rows are also in ascending engine order, so we can do direct assignment
    final_test_rows["RUL"] = df_rul["RUL"]
    
    return final_test_rows


In [None]:
datasets = {}  

for i in range(1, 5):
    
    df_train_raw, df_test_raw, df_rul = load_fd_dataset(i)
    df_train = add_train_rul(df_train_raw)
    df_test_final = add_test_rul(df_test_raw, df_rul)
    key = f"FD00{i}"
    datasets[key] = {
        "train":       df_train,   
        "test":        df_test_raw,
        "rul":         df_rul,
        "test_final":  df_test_final,
    }

In [None]:
for ds_name, ds_dict in datasets.items():
    print(ds_name)
    print("  train shape:", ds_dict["train"].shape, "(includes computed RUL)")
    print("  test shape: ", ds_dict["test"].shape)
    print("  rul shape:  ", ds_dict["rul"].shape, "(one row per engine in test)")
    print("  final test shape (with RUL):", ds_dict["test_final"].shape)
    print()

In [None]:
datasets["FD001"]["train"]

In [None]:
datasets["FD001"]["train"].describe().transpose()

create distribution plot of max cycles

In [None]:
for ds_name, ds_dict in datasets.items():
    df_train = ds_dict["train"]
    fig_name = os.path.join(images_dir, f"{ds_name}_displot.png")
    
    if os.path.exists(fig_name):
        continue

    max_time_cycles = df_train.groupby("unit_number")["time_cycles"].max()
    sns.displot(
        data=max_time_cycles, 
        kde=True,
        bins=20,
        height=6,
        aspect=2,
    )
    plt.xlabel("Max Time Cycle")
    plt.title(f"Max Time Cycles Distribution – {ds_name}", fontsize=14)
    plt.savefig(fig_name)
    plt.close()


Create correlation_matrixes 

In [None]:
for ds_name, ds_dict in datasets.items():
    df_train = ds_dict["train"]
    fig_name = os.path.join(images_dir, f"{ds_name}_corr_heatmap.png")

    if os.path.exists(fig_name):
        continue

    corr = df_train.corr()
    mask = np.triu(np.ones_like(corr, dtype=bool))

    plt.figure(figsize=(20, 20))
    cmap = sns.diverging_palette(230, 10, as_cmap=True)

    sns.heatmap(
        corr,
        mask=mask,
        cmap=cmap,
        vmax=.3,
        center=0,
        square=True,
        linewidths=.5,
        cbar_kws={"shrink": .5},
        annot=True,
        fmt=".2f"
    )
    plt.title(f"Correlation Matrix Heatmap – {ds_name}", fontsize=14)
    plt.savefig(fig_name)
    plt.close()


In [None]:
signal_dir = os.path.join(images_dir, "signal_plots")
os.makedirs(signal_dir, exist_ok=True)
def plot_signal(df, Sensor_dic, signal_name, ds_name, images_dir):
    fig_name = os.path.join(signal_dir, f"{ds_name}_{signal_name}.png")
    if os.path.exists(fig_name):
        return

    plt.figure(figsize=(13,5))
    for i in df['unit_number'].unique():
        if (i % 10 == 0):
            plt.plot('RUL', signal_name, data=df[df['unit_number']==i].rolling(10).mean())

    plt.xlim(250, 0)
    plt.xticks(np.arange(0, 300, 25))
    plt.ylabel(Sensor_dic[signal_name])
    plt.xlabel('Remaining Useful Life')
    plt.title(f'{signal_name} - {ds_name}')
    plt.savefig(fig_name)
    plt.close()

for ds_name, ds_dict in datasets.items():
    df_train = ds_dict["train"]
    for i in range(1, 22):
        signal_name = f's_{i}'
        try:
            plot_signal(df_train, Sensor_dictionary, signal_name, ds_name, images_dir)
        except Exception as e:
            pass

In [None]:
boxplot_dir = os.path.join(images_dir, "box_plots")
os.makedirs(boxplot_dir, exist_ok=True)

for ds_name, ds_dict in datasets.items():
    df_train = ds_dict["train"]
    
    for i in range(1, 22):
        signal_name = f's_{i}'
        fig_name = os.path.join(boxplot_dir, f"{ds_name}_{signal_name}_boxplot.png")
        
        if os.path.exists(fig_name):
            print(f"Skipping {fig_name}, already exists.")
            continue

        plt.figure(figsize=(14, 6))
        sns.boxplot(x="unit_number", y=signal_name, data=df_train)
        plt.xticks(rotation=90)
        plt.title(f"Box Plot of {signal_name} by Unit – {ds_name}")
        plt.xlabel("Unit Number")
        plt.ylabel(Sensor_dictionary.get(signal_name, signal_name))  # fallback to raw name
        plt.tight_layout()
        plt.savefig(fig_name)
        plt.close()
