<a href="https://colab.research.google.com/github/Saransh1329/BlackBox-Agentic-AI-for-Predictive-Maintenance/blob/main/ai1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================================================
# FULL COLAB SCRIPT - PRESERVES ALL WORKSHEETS - FIXED VERSION
# ==============================================================

!pip install -q openpyxl
import pandas as pd
import numpy as np
import os
from google.colab import files
from IPython.display import display
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import classification_report, roc_auc_score, mean_absolute_error
import joblib
import random

RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)
random.seed(RANDOM_STATE)

# ==============================================================
# RULE-BASED ANALYZER + SYNTHETIC RUL GENERATION
# ==============================================================

def run_single_anomaly_analysis(df, assume_rul=True):
    """Grade every row with rule-based severities and derive training targets.

    The input frame is copied (the caller's object is not modified) and its
    index is reset. Missing ``timestamp``, ``vehicle_id`` and sensor columns
    are created so the output schema is always the same. Each graded sensor
    gets a ``_sev_*`` column holding 0 (normal), 1 (low), 2 (medium) or
    3 (high); only the single worst sensor of a row is reported, and ties are
    resolved in favour of the sensor listed first in the rule table.

    Readings are converted with ``pd.to_numeric(errors="coerce")`` for grading
    only, so text placeholders are graded as "no reading" (severity 0) instead
    of raising; the sensor columns themselves are left as uploaded.

    Args:
        df: DataFrame with one row per reading. May be empty and may lack any
            of the expected columns.
        assume_rul: When true, draw a synthetic ``rul_hours_for_training``
            value for every non-normal row from a generator seeded with
            ``RANDOM_STATE``; normal rows get NaN. When false the column is
            entirely NaN.

    Returns:
        A new DataFrame with the input columns plus the ``_sev_*`` columns,
        ``faults_detected``, ``highest_severity``, ``summary_text``,
        ``failure_label`` (1 for HIGH/MEDIUM, else 0) and
        ``rul_hours_for_training``.
    """
    df = df.copy().reset_index(drop=True)

    # ---------------- Ensure Timestamp + Vehicle ID ----------------
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    else:
        df["timestamp"] = pd.NaT

    if "vehicle_id" not in df.columns:
        df["vehicle_id"] = "__single_vehicle__"

    # ---------------- Ensure All Sensor Columns Exist ----------------
    required_cols = [
        "engine_temp_c", "coolant_temp_c", "oil_pressure_psi", "fuel_consumption_lph",
        "engine_load_percent", "air_flow_rate_gps", "exhaust_gas_temp_c", "vibration_level",
        "brake_fluid_level_psi", "brake_temp_c", "brake_pedal_pos_percent",
        "battery_voltage_v", "battery_temp_c", "alternator_output_v",
    ]
    for col in required_cols:
        if col not in df.columns:
            df[col] = np.nan

    # ---------------- Severity Graders ----------------
    def graded_above(high, medium, low):
        """Build a grader where larger readings are worse (strict ``>`` limits)."""
        def grade(value):
            if pd.isna(value):
                return 0
            if value > high:
                return 3
            if value > medium:
                return 2
            if value > low:
                return 1
            return 0
        return grade

    def graded_below(high, medium, low):
        """Build a grader where smaller readings are worse (strict ``<`` limits)."""
        def grade(value):
            if pd.isna(value):
                return 0
            if value < high:
                return 3
            if value < medium:
                return 2
            if value < low:
                return 1
            return 0
        return grade

    def grade_air_flow(value):
        """Grade air flow, which is abnormal at both ends of its range."""
        if pd.isna(value):
            return 0
        if value == 0 or value > 95:
            return 3
        if (80 < value <= 95) or (0 < value < 15):
            return 2
        if (70 < value <= 80) or (15 <= value <= 20):
            return 1
        return 0

    # (severity column, sensor column, display label, grader).
    # The order matters: it is the tie-break order for the reported fault.
    severity_rules = [
        ("_sev_engine_temp", "engine_temp_c", "Engine Temp", graded_above(118, 108, 100)),
        ("_sev_coolant", "coolant_temp_c", "Coolant Temp", graded_above(110, 100, 92)),
        ("_sev_oil", "oil_pressure_psi", "Oil Pressure", graded_below(15, 25, 35)),
        ("_sev_fuel", "fuel_consumption_lph", "Fuel Consumption", graded_above(20, 12, 8)),
        ("_sev_load", "engine_load_percent", "Engine Load", graded_above(98, 90, 75)),
        ("_sev_air", "air_flow_rate_gps", "Air Flow", grade_air_flow),
        ("_sev_exhaust", "exhaust_gas_temp_c", "Exhaust Temp", graded_above(850, 700, 550)),
        ("_sev_vibration", "vibration_level", "Vibration", graded_above(8, 5, 3)),
        ("_sev_brake_fluid", "brake_fluid_level_psi", "Brake Fluid", graded_below(10, 20, 30)),
        ("_sev_brake_temp", "brake_temp_c", "Brake Temp", graded_above(300, 200, 120)),
        ("_sev_battery", "battery_voltage_v", "Battery", graded_below(11, 11.5, 12)),
        ("_sev_alternator", "alternator_output_v", "Alternator", graded_below(13, 13.5, 13.9)),
    ]

    # ---------------- Compute Severity Columns ----------------
    for sev_col, sensor_col, _label, grader in severity_rules:
        # Coerce only the values being graded; non-numeric cells become NaN -> 0.
        readings = pd.to_numeric(df[sensor_col], errors="coerce")
        df[sev_col] = readings.apply(grader).astype("int64")

    # ---------------- Single Most Important Anomaly ----------------
    sev_map = {0: "NORMAL", 1: "LOW", 2: "MEDIUM", 3: "HIGH"}
    sev_cols = [rule[0] for rule in severity_rules]
    labels = [rule[2] for rule in severity_rules]

    faults = []
    highest = []
    # Building plain lists (instead of unpacking a row-wise apply) also works
    # when the frame has zero rows.
    for row_sev in zip(*(df[col].tolist() for col in sev_cols)):
        worst = max(row_sev)
        if worst == 0:
            faults.append("None")
            highest.append("NORMAL")
            continue
        level = sev_map[worst]
        # index() returns the first sensor at the worst level (tie-break).
        faults.append(f"{labels[row_sev.index(worst)]} ({level})")
        highest.append(level)

    df["faults_detected"] = pd.Series(faults, index=df.index, dtype=object)
    df["highest_severity"] = pd.Series(highest, index=df.index, dtype=object)

    # ---------------- Summary Text ----------------
    summaries = [
        "NORMAL — no anomalies detected"
        if fault == "None"
        else f"{level} — Critical fault: {fault}"
        for fault, level in zip(faults, highest)
    ]
    df["summary_text"] = pd.Series(summaries, index=df.index, dtype=object)

    # ---------------- Failure Label (ML Target) ----------------
    df["failure_label"] = df["highest_severity"].isin(["HIGH", "MEDIUM"]).astype("int64")

    # ---------------- Synthetic RUL Generation ----------------
    if assume_rul:
        rng = np.random.default_rng(RANDOM_STATE)
        rul_ranges = {"HIGH": (0.5, 6), "MEDIUM": (6, 48), "LOW": (48, 168)}
        # Values are drawn one row at a time, in row order, so a given seed
        # always yields the same value for the same row.
        rul_vals = [
            float(rng.uniform(*rul_ranges[level])) if level in rul_ranges else np.nan
            for level in highest
        ]
        df["rul_hours_for_training"] = pd.Series(rul_vals, index=df.index, dtype="float64")
    else:
        df["rul_hours_for_training"] = np.nan

    return df


# ==============================================================
# TRAIN MODELS + ADD PREDICTIONS
# ==============================================================

def train_and_predict(df):
    """Fit failure and RUL random forests on ``df`` and append their predictions.

    A classifier is trained on ``failure_label`` when both classes are
    present, and a regressor on ``rul_hours_for_training`` when at least ten
    rows carry a value. Each model is evaluated on a 20% hold-out split
    (metrics are printed) and then used to score every row. When a model
    cannot be trained, default columns are written instead.

    Feature values are coerced to numbers, missing values are replaced by the
    column median, and columns without any reading fall back to 0 so that no
    NaN reaches the estimators.

    Args:
        df: DataFrame holding the thirteen feature columns plus
            ``failure_label`` and ``rul_hours_for_training``.

    Returns:
        The same ``df`` object (modified in place) with ``pred_failure_prob``,
        ``pred_failure_prob_pct``, ``pred_risk_label`` and ``pred_rul_hours``
        added.
    """
    feature_cols = [
        "engine_temp_c", "coolant_temp_c", "oil_pressure_psi", "fuel_consumption_lph",
        "engine_load_percent", "air_flow_rate_gps", "exhaust_gas_temp_c", "vibration_level",
        "brake_fluid_level_psi", "brake_temp_c", "battery_voltage_v", "battery_temp_c",
        "alternator_output_v",
    ]

    def positive_class_proba(model, features):
        """Return P(label == 1), or zeros if the model never saw label 1."""
        classes = list(model.classes_)
        if 1 not in classes:
            return np.zeros(len(features))
        return model.predict_proba(features)[:, classes.index(1)]

    # Text placeholders would break median() and fit(), so coerce first.
    numeric = df[feature_cols].apply(pd.to_numeric, errors="coerce")
    # A column with no readings has a NaN median; the second fillna covers it.
    X = numeric.fillna(numeric.median()).fillna(0.0)
    y_cls = df["failure_label"]

    # --------------------- CLASSIFIER ---------------------
    if y_cls.nunique() > 1:
        X_train, X_test, y_train, y_test = train_test_split(
            X, y_cls, test_size=0.2, random_state=42
        )
        model_cls = RandomForestClassifier(
            n_estimators=200, random_state=42, class_weight='balanced'
        )
        model_cls.fit(X_train, y_train)

        print("\n--- CLASSIFICATION REPORT ---")
        preds = model_cls.predict(X_test)
        probs = positive_class_proba(model_cls, X_test)
        print(classification_report(y_test, preds))
        # The split is not stratified, so a small test fold can end up with a
        # single class, for which ROC-AUC is undefined.
        if y_test.nunique() > 1:
            print("ROC-AUC:", roc_auc_score(y_test, probs))
        else:
            print("ROC-AUC: not defined (test split contains a single class)")

        df["pred_failure_prob"] = positive_class_proba(model_cls, X)
        df["pred_failure_prob_pct"] = (df["pred_failure_prob"] * 100).round(1)
        prob = df["pred_failure_prob"].to_numpy()
        df["pred_risk_label"] = np.where(
            prob > 0.7, "HIGH", np.where(prob > 0.4, "MEDIUM", "LOW")
        )
    else:
        print("\n--- INSUFFICIENT DATA FOR CLASSIFICATION ---")
        print("Only one class found in data. Creating default prediction columns.")
        df["pred_failure_prob"] = 0.0
        df["pred_failure_prob_pct"] = 0.0
        df["pred_risk_label"] = "UNKNOWN"

    # --------------------- RUL REGRESSOR ---------------------
    # Positional mask, so the row selection does not depend on index labels.
    has_rul = df["rul_hours_for_training"].notna().to_numpy()
    if has_rul.sum() >= 10:
        # Training rows are imputed with the median of the training subset.
        Xr_raw = numeric.loc[has_rul]
        Xr = Xr_raw.fillna(Xr_raw.median()).fillna(0.0)
        yr = df.loc[has_rul, "rul_hours_for_training"]

        Xr_train, Xr_test, yr_train, yr_test = train_test_split(
            Xr, yr, test_size=0.2, random_state=42
        )
        model_rul = RandomForestRegressor(n_estimators=200, random_state=42)
        model_rul.fit(Xr_train, yr_train)
        pred_rul = model_rul.predict(Xr_test)
        print("\n--- RUL REGRESSION MAE ---")
        print(mean_absolute_error(yr_test, pred_rul))

        df["pred_rul_hours"] = model_rul.predict(X)
    else:
        print("\n--- INSUFFICIENT DATA FOR RUL REGRESSION ---")
        print("Using synthetic RUL values as predictions.")
        df["pred_rul_hours"] = df["rul_hours_for_training"]

    return df


# ==============================================================
# MAIN: UPLOAD → PROCESS → OUTPUT WITH ALL SHEETS
# ==============================================================

# Driver: upload a workbook, analyse its failure-prediction sheet, and write a
# new workbook in which that sheet is replaced by the analysed version while
# every other sheet is copied through unchanged.
print("Upload your Excel file with multiple worksheets:")
uploaded = files.upload()
# NOTE(review): the upload mapping may be empty (e.g. the dialog was dismissed);
# taking the first key defensively avoids an IndexError in that case.
filename = next(iter(uploaded), None)

if filename is None:
    print("\nERROR: No file was uploaded. Re-run this cell and choose an Excel file.")
elif not filename.lower().endswith((".xlsx", ".xls")):
    print("\nERROR: Please upload an Excel file (.xlsx or .xls)")
else:
    # sheet_name=None makes read_excel return a dict of every worksheet.
    all_sheets = pd.read_excel(filename, sheet_name=None)

    sheet_names = list(all_sheets.keys())
    print(f"\nFound sheets: {sheet_names}")

    # Match the target sheet case-insensitively, ignoring surrounding
    # whitespace and accepting the common "Failiure" misspelling.
    accepted_sheet_names = ("failure prediction", "failiure prediction")
    failure_sheet_name = next(
        (
            name for name in sheet_names
            if str(name).strip().lower() in accepted_sheet_names
        ),
        None,
    )

    if failure_sheet_name is None:
        print("\nERROR: 'Failure prediction' or 'Failiure Prediction' sheet not found!")
        print("Available sheets:", sheet_names)
    else:
        # Only the failure-prediction sheet is processed.
        raw = all_sheets[failure_sheet_name]
        print(f"\nProcessing '{failure_sheet_name}' sheet...")

        # Rule-based severities and training targets, then model predictions.
        df_processed = run_single_anomaly_analysis(raw, assume_rul=True)
        df_final = train_and_predict(df_processed)

        # Swap the processed sheet into the workbook dictionary.
        all_sheets[failure_sheet_name] = df_final

        # Write every sheet, processed or not, to the output workbook.
        out_path = "/content/output_complete_file.xlsx"
        with pd.ExcelWriter(out_path, engine='openpyxl') as writer:
            for sheet_name, sheet_data in all_sheets.items():
                sheet_data.to_excel(writer, sheet_name=sheet_name, index=False)

        print(f"\n✓ Saved complete file with all {len(all_sheets)} sheets:", out_path)
        print(f"✓ Processed sheet: {failure_sheet_name}")
        print(f"✓ Preserved sheets: {[s for s in sheet_names if s != failure_sheet_name]}")

        display(df_final.head(10))
        files.download(out_path)