<a href="https://colab.research.google.com/github/AmanManiTiwari/dpf-soot-load-prediction/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [50]:
# =========================
# Predictive Maintenance Pipeline: DPF Soot Load Prediction
# =========================

In [51]:
# -------------------------
# IMPORT NECESSARY PACKAGES
# -------------------------
# numpy, pandas: data manipulation
# datetime: timestamp handling
# sklearn: model building and evaluation
# fastapi: API for serving predictions
# joblib: save/load model artifacts

In [52]:
import warnings
from datetime import timedelta

import joblib
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# Ignore all warnings: this installs a global "ignore" filter with no category
# or module restriction, so every warning raised after this cell runs is hidden
# (deprecation notices from the imported libraries included).
# NOTE(review): consider narrowing this to the specific categories that were
# noisy — confirm which warnings it was originally added to silence.
warnings.filterwarnings("ignore")


In [53]:
# -------------------------
# CONFIGURATION
# -------------------------
# Define constants for synthetic data generation and modeling
# N_VEHICLES: number of vehicles in dataset
# DAYS: total duration for data simulation
# FREQ_MIN: frequency of telemetry readings in minutes
# SOOT_REGEN_THRESHOLD: threshold for soot to trigger regeneration
# MODEL_PATH: location to save trained model

In [54]:
N_VEHICLES = 20  # number of simulated vehicles (ids run from 1 to N_VEHICLES)
DAYS = 30  # length of the simulated telemetry window, in days
FREQ_MIN = 5  # minutes between consecutive readings of one vehicle
SOOT_REGEN_THRESHOLD = 75  # soot load (capped at 100) above which regeneration is flagged

MODEL_PATH = "soot_model.pkl"  # file the model is written to by joblib.dump and read from by joblib.load

In [55]:
# -------------------------
# PART 1: DATA GENERATION
# -------------------------
# Function to simulate vehicle sensor telemetry
# - Generates features like engine load, speed, rpm, exhaust temperatures, differential pressure, and soot load
# - Adds realistic randomness/noise to mimic real-world sensors
# - Gradually accumulates soot over time for each vehicle


In [56]:
def generate_sensor_data(n_vehicles=None, days=None, freq_min=None, seed=None):
    """Simulate DPF telemetry for a fleet of vehicles.

    Every vehicle gets one reading per ``freq_min`` minutes, starting at
    2025-11-01, for ``days`` days. Soot starts at a random level in [5, 15),
    grows by ``engine_load * 0.0005`` per reading and is capped at 100.
    ``diff_pressure`` is derived from the soot level *before* the reading's
    increment; ``soot_load`` stores the level *after* it.

    Parameters
    ----------
    n_vehicles, days, freq_min : int, optional
        Override the module-level ``N_VEHICLES`` / ``DAYS`` / ``FREQ_MIN``.
        ``None`` (default) uses the module constant.
    seed : int, optional
        When given, a private ``np.random.RandomState(seed)`` is used so the
        output is reproducible. ``None`` (default) draws from NumPy's global
        random state, so ``np.random.seed`` still controls the result.

    Returns
    -------
    pandas.DataFrame
        One row per (vehicle, timestamp) with columns ``vehicle_id``,
        ``timestamp``, ``engine_load``, ``exhaust_temp_pre``,
        ``exhaust_temp_post``, ``diff_pressure``, ``exhaust_flow``,
        ``vehicle_speed``, ``rpm``, ``ambient_temp`` and ``soot_load``.
    """
    n_vehicles = N_VEHICLES if n_vehicles is None else n_vehicles
    days = DAYS if days is None else days
    freq_min = FREQ_MIN if freq_min is None else freq_min

    # The np.random module and RandomState expose the same uniform/normal API.
    rng = np.random if seed is None else np.random.RandomState(seed)

    timestamps = pd.date_range(
        start="2025-11-01",
        periods=int((24 * 60 / freq_min) * days),
        freq=f"{freq_min}min",
    )
    n_readings = len(timestamps)

    columns = [
        "vehicle_id", "timestamp", "engine_load",
        "exhaust_temp_pre", "exhaust_temp_post",
        "diff_pressure", "exhaust_flow",
        "vehicle_speed", "rpm",
        "ambient_temp", "soot_load"
    ]

    frames = []
    for vid in range(1, n_vehicles + 1):
        start_soot = rng.uniform(5, 15)  # starting soot level

        # Draw a whole vehicle's series at once instead of one reading at a time.
        engine_load = rng.uniform(20, 90, n_readings)
        speed = rng.uniform(0, 90, n_readings)
        rpm = rng.uniform(800, 2200, n_readings)

        exhaust_temp_pre = 200 + engine_load * 3 + rng.normal(0, 10, n_readings)
        exhaust_temp_post = exhaust_temp_pre - rng.uniform(10, 40, n_readings)

        # soot_path[k] is the level after k increments. Increments are strictly
        # positive, so capping the running sum equals capping at every step.
        soot_path = np.minimum(
            100,
            np.cumsum(np.concatenate(([start_soot], engine_load * 0.0005))),
        )
        soot_before = soot_path[:-1]
        soot_after = soot_path[1:]

        diff_pressure = 5 + soot_before * 0.8 + rng.normal(0, 1, n_readings)
        exhaust_flow = speed * 0.4 + engine_load * 0.3
        ambient_temp = rng.uniform(10, 40, n_readings)

        frames.append(pd.DataFrame({
            "vehicle_id": np.full(n_readings, vid),
            "timestamp": timestamps,
            "engine_load": engine_load,
            "exhaust_temp_pre": exhaust_temp_pre,
            "exhaust_temp_post": exhaust_temp_post,
            "diff_pressure": diff_pressure,
            "exhaust_flow": exhaust_flow,
            "vehicle_speed": speed,
            "rpm": rpm,
            "ambient_temp": ambient_temp,
            "soot_load": soot_after,
        }, columns=columns))

    if not frames:
        return pd.DataFrame(columns=columns)

    return pd.concat(frames, ignore_index=True)

In [57]:
# -------------------------
# Generate Maintenance Events
# -------------------------
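# Function to simulate maintenance / regeneration events
# - For each vehicle, finds the readings where soot exceeds SOOT_REGEN_THRESHOLD
# - Randomly samples up to 3 of those readings as "active_regeneration" events
# - Returns an event-based dataframe (vehicle_id, event_time, event_type)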

In [58]:
def generate_maintenance_events(sensor_df, threshold=None, max_events=3, random_state=None):
    """Sample regeneration events from readings with high soot load.

    For each vehicle, up to ``max_events`` readings whose ``soot_load`` is
    strictly above ``threshold`` are picked at random and reported as
    ``"active_regeneration"`` events.

    Parameters
    ----------
    sensor_df : pandas.DataFrame
        Must contain ``vehicle_id``, ``timestamp`` and ``soot_load``.
    threshold : float, optional
        Soot level above which a reading is eligible. ``None`` (default)
        uses the module-level ``SOOT_REGEN_THRESHOLD``.
    max_events : int, default 3
        Maximum number of events sampled per vehicle.
    random_state : optional
        Passed to ``DataFrame.sample``. ``None`` (default) keeps using
        NumPy's global random state.

    Returns
    -------
    pandas.DataFrame
        Columns ``vehicle_id``, ``event_time``, ``event_type``; empty when no
        reading exceeds the threshold.
    """
    if threshold is None:
        threshold = SOOT_REGEN_THRESHOLD

    # Filter once, then split by vehicle; sort=False keeps first-appearance order.
    over_threshold = sensor_df[sensor_df["soot_load"] > threshold]

    events = []
    for vid, rows in over_threshold.groupby("vehicle_id", sort=False):
        picked = rows.sample(min(max_events, len(rows)), random_state=random_state)
        events.extend([vid, ts, "active_regeneration"] for ts in picked["timestamp"])

    return pd.DataFrame(events, columns=["vehicle_id", "event_time", "event_type"])

In [59]:
# -------------------------
# FEATURE ENGINEERING
# -------------------------
# Add rolling averages and delta features
# - temp_rolling: smooth exhaust temp over last ~1 hour (12 readings at 5-min freq)
# - pressure_rolling: smooth diff pressure
# - temp_delta: difference pre vs post DPF (regeneration efficiency indicator)


In [60]:
def add_features(df, window=12):
    """Return a sorted copy of ``df`` with rolling and delta features added.

    Rows are ordered by ``vehicle_id`` then ``timestamp``; the caller's frame
    is left untouched. Three columns are added:

    - ``temp_rolling``: per-vehicle trailing mean of ``exhaust_temp_pre``
    - ``pressure_rolling``: per-vehicle trailing mean of ``diff_pressure``
    - ``temp_delta``: ``exhaust_temp_pre - exhaust_temp_post``

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``vehicle_id``, ``timestamp``, ``exhaust_temp_pre``,
        ``exhaust_temp_post`` and ``diff_pressure``.
    window : int, default 12
        Number of readings in the trailing window. The first readings of each
        vehicle use however many readings are available (``min_periods=1``).

    Returns
    -------
    pandas.DataFrame
    """
    df = df.sort_values(["vehicle_id", "timestamp"])

    def _rolling_mean(column):
        # transform() returns values in the row order of ``df``, so they can be
        # assigned positionally. This avoids index-label alignment, which
        # breaks when the frame's index has duplicate labels.
        return (
            df.groupby("vehicle_id")[column]
            .transform(lambda s: s.rolling(window, min_periods=1).mean())
            .to_numpy()
        )

    df["temp_rolling"] = _rolling_mean("exhaust_temp_pre")
    df["pressure_rolling"] = _rolling_mean("diff_pressure")
    df["temp_delta"] = df["exhaust_temp_pre"] - df["exhaust_temp_post"]

    return df

In [61]:
# -------------------------
# DATA QUALITY CHECKS
# -------------------------
# Validate missing values and detect possible sensor drift

In [62]:
def validate_data(df):
    """Run basic quality checks on the sensor frame.

    Raises
    ------
    ValueError
        If any column has more than 10% missing values.

    Notes
    -----
    For ``exhaust_temp_pre`` and ``diff_pressure`` a warning line is printed
    when the column's standard deviation exceeds three times its mean. This
    only reports; it never raises.
    """
    if df.isnull().mean().max() > 0.1:
        raise ValueError("Too many missing sensor values")

    for col in ["exhaust_temp_pre", "diff_pressure"]:
        if df[col].std() > 3 * df[col].mean():
            # \u26a0 is the warning sign; the escape keeps the source ASCII-only
            # so the glyph cannot be corrupted by a wrong file encoding again.
            print(f"\u26a0 Potential sensor drift detected in {col}")

In [63]:
# -------------------------
# MODELING
# -------------------------
# Random Forest Regressor to predict soot_load
# - Features include raw sensors + rolling averages + delta
# - Model trained on entire synthetic dataset

In [64]:
# Model input columns, in the order they are passed to the estimator.
FEATURES = [
    # operating point
    "engine_load",
    "vehicle_speed",
    "rpm",
    # raw exhaust / filter sensors
    "exhaust_temp_pre",
    "exhaust_temp_post",
    "diff_pressure",
    # engineered columns produced by add_features
    "temp_rolling",
    "pressure_rolling",
    "temp_delta",
]

def train_model(df, n_jobs=-1):
    """Fit the soot-load regressor, persist it, and return it.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain every column in ``FEATURES`` plus the ``soot_load`` target.
    n_jobs : int or None, default -1
        Parallelism used for fitting and for the in-sample scoring pass
        (-1 = all cores). Trees are seeded from ``random_state``, so the
        fitted forest does not depend on this value.

    Returns
    -------
    RandomForestRegressor
        The fitted estimator, also written to ``MODEL_PATH`` with joblib.

    Notes
    -----
    The printed MAE is computed on the same rows the model was fit on, so it
    is an in-sample figure and not an estimate of generalisation error.
    """
    X = df[FEATURES]
    y = df["soot_load"]

    model = RandomForestRegressor(
        n_estimators=200,
        max_depth=10,
        random_state=42,
        n_jobs=n_jobs,
    )
    model.fit(X, y)

    preds = model.predict(X)
    mae = mean_absolute_error(y, preds)

    # Restore the library default before persisting, so the saved and returned
    # estimator is configured exactly as before (no worker pool is spun up for
    # single-row predictions at serving time).
    model.set_params(n_jobs=None)

    joblib.dump(model, MODEL_PATH)
    print(f"Training MAE: {mae:.2f}")

    return model

In [65]:
# -------------------------
# API SERVING
# -------------------------
# FastAPI endpoints:
# /health -> API status
# /model/info -> model metadata
# /predict/soot-load -> single prediction + regen recommendation


In [66]:
# FastAPI application object. It must be named `app`: every route/startup
# decorator in this cell is written as `@app...`, so any other name raises
# NameError on a fresh kernel.
app = FastAPI(title="DPF Soot Load Prediction API")
# Fitted estimator served by the API; stays None until the startup hook loads it.
model = None

@app.on_event("startup")
def load_model():
    """Startup hook: read the persisted estimator into the module-level ``model``."""
    global model
    # joblib.load unpickles the file, so MODEL_PATH must point at a trusted artifact.
    loaded_estimator = joblib.load(MODEL_PATH)
    model = loaded_estimator

@app.get("/health")
def health():
    """Liveness probe; also reports whether the module-level ``model`` is set."""
    is_loaded = model is not None
    return {"status": "ok", "model_loaded": is_loaded}

@app.get("/model/info")
def model_info():
    """Return static metadata: estimator name, input features, regen threshold."""
    info = {"model": "RandomForestRegressor"}
    info["features"] = FEATURES
    info["regen_threshold"] = SOOT_REGEN_THRESHOLD
    return info

@app.post("/predict/soot-load")
def predict(payload: dict):
    """Predict soot load for one reading and recommend whether to regenerate.

    Parameters
    ----------
    payload : dict
        JSON object that must contain every name in ``FEATURES``; extra keys
        are ignored.

    Returns
    -------
    dict
        ``predicted_soot_load`` (rounded to 2 decimals) and ``recommendation``
        (``"REGEN_REQUIRED"`` when the prediction exceeds
        ``SOOT_REGEN_THRESHOLD``, otherwise ``"NORMAL"``).

    Raises
    ------
    HTTPException
        422 when required fields are missing, 503 when no model is loaded.
    """
    # Sorted so the error message is deterministic (set order is not).
    missing = sorted(set(FEATURES) - set(payload))
    if missing:
        # Report a client error status instead of a 200 with an error body.
        raise HTTPException(status_code=422, detail=f"Missing fields: {missing}")

    if model is None:
        # Without this guard, model.predict fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Model is not loaded")

    row = pd.DataFrame([payload])[FEATURES]
    soot_pred = model.predict(row)[0]
    recommendation = "REGEN_REQUIRED" if soot_pred > SOOT_REGEN_THRESHOLD else "NORMAL"

    return {
        "predicted_soot_load": round(float(soot_pred), 2),
        "recommendation": recommendation
    }

In [67]:
# -------------------------
# MAIN PIPELINE EXECUTION
# -------------------------
# Step 1: Generate synthetic sensor data
# Step 2: Generate corresponding maintenance events
# Step 3: Feature engineering
# Step 4: Data validation
# Step 5: Train model
# Step 6: Save model artifact (done inside train_model via joblib.dump)

In [68]:
if __name__ == "__main__":
    # Seed NumPy's global RNG so a fresh Restart & Run All reproduces the same
    # synthetic data and sampled events: the generator's np.random draws and
    # DataFrame.sample (when given no random_state) both read this state.
    np.random.seed(42)

    sensor_df = generate_sensor_data()
    # Kept as a notebook-level variable for inspection; not used for training.
    maintenance_df = generate_maintenance_events(sensor_df)

    sensor_df = add_features(sensor_df)
    validate_data(sensor_df)

    # train_model also writes the fitted estimator to MODEL_PATH.
    trained_model = train_model(sensor_df)

    print("Pipeline completed successfully.")

Training MAE: 0.10
Pipeline completed successfully.
