In [None]:
# Cell 1: Imports and environment setup
# -----------------------------------
# Purpose:
# - Centralize imports
# - Define project directory structure
# - Ensure repeatable execution
# - Keep paths generic and public-safe

import sys
import numpy as np
import pandas as pd
from pathlib import Path
from scipy.fft import rfft, rfftfreq
from sqlalchemy import create_engine

# Root folder of the project. The value is a placeholder: replace it with a
# real location before running. Every other path below is derived from it.
PROJECT_ROOT = Path(r"YOUR FOLDER PATH")

# Directory layout derived from the project root.
DATA_DIR = PROJECT_ROOT.joinpath("data")
RAW_DIR = DATA_DIR.joinpath("raw_telemetry_csv")
PROCESSED_DIR = DATA_DIR.joinpath("processed_telemetry")
OUTPUT_DIR = PROJECT_ROOT.joinpath("outputs")

# Create the working folders up front; existing folders are left untouched.
for folder in (RAW_DIR, PROCESSED_DIR, OUTPUT_DIR):
    folder.mkdir(parents=True, exist_ok=True)

print("Project root:", PROJECT_ROOT)
print("Python version:", sys.version)


In [None]:
# Cell 2: Ingest decoded telemetry files
# ------------------------------------
# Purpose:
# - Treat incoming CSVs as decoded telemetry outputs
# - Preserve raw data for traceability
# - Keep ingestion logic independent of analytics

# Folder where decoded telemetry CSV exports are dropped for ingestion.
INCOMING_DIR = DATA_DIR / "incoming_telemetry"

# Sorted so files are always ingested in the same (name) order; glob() on its
# own does not promise any particular ordering.
csv_files = sorted(INCOMING_DIR.glob("*.csv"))

if not csv_files:
    raise RuntimeError("No telemetry CSV files found in incoming_telemetry")

# Copy each file byte-for-byte into RAW_DIR. Reading with pandas and writing
# back out would re-infer dtypes and re-format values, so the stored "raw"
# file would no longer match what was delivered.
for f in csv_files:
    (RAW_DIR / f.name).write_bytes(f.read_bytes())

print(f"Ingested {len(csv_files)} telemetry files")


In [None]:
# Cell 3: Processing & analytics-ready signal retention
# ----------------------------------------------------
# Purpose:
# - Clean telemetry signals
# - Enforce physical plausibility
# - Retain only analytics-relevant channels
# - Produce normalized telemetry for downstream analytics

def process_trip(df, source_file):
    """Clean one trip's telemetry and keep only the analytics signal set.

    Steps, in order: parse and sort by ``timestamp``; replace out-of-range
    readings with NaN; forward-fill short gaps; add ``vehicle_id`` and
    ``scenario`` when absent; integrate speed into ``distance_m`` when absent;
    finally keep only the known analytics columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Telemetry for one trip. Must contain a ``timestamp`` column; every
        other signal column is optional.
    source_file : str
        Name of the file the trip came from. Only used to derive
        ``vehicle_id`` (the part of the name, extension removed, before the
        first underscore) when the data has no such column.

    Returns
    -------
    pandas.DataFrame
        A new frame (the input is not modified), sorted by timestamp, holding
        whichever analytics columns are available, in a fixed order.

    Raises
    ------
    KeyError
        If ``df`` has no ``timestamp`` column.
    """
    df = df.copy()

    # --------------------------------------------------
    # Timestamp handling
    # --------------------------------------------------
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df = df.sort_values("timestamp")

    # --------------------------------------------------
    # Basic plausibility filtering
    # --------------------------------------------------
    # Inclusive (low, high) bounds per signal; readings outside become NaN.
    PLAUSIBLE_RANGES = {
        "engine_rpm": (500, 6000),
        "torque_nm": (-180, 200),
        "speed_kmph": (0, 140),
    }

    for column, (low, high) in PLAUSIBLE_RANGES.items():
        if column in df:
            # where() keeps in-range values and writes NaN elsewhere, upcasting
            # integer columns to float itself. Assigning NaN through .loc into
            # an integer column relies on silent upcasting instead, which
            # pandas has deprecated.
            df[column] = df[column].where(df[column].between(low, high))

    # Short-gap fill: carry the last value forward over at most 5 rows.
    df = df.ffill(limit=5)

    # --------------------------------------------------
    # Mandatory identifiers
    # --------------------------------------------------
    if "vehicle_id" not in df:
        # Use the stem so a name without an underscore ("V001.csv") yields
        # "V001" rather than "V001.csv".
        df["vehicle_id"] = Path(source_file).stem.split("_")[0]

    if "scenario" not in df:
        df["scenario"] = "unknown"

    # --------------------------------------------------
    # Distance integration (meters)
    # --------------------------------------------------
    if "distance_m" not in df and "speed_kmph" in df:
        # Seconds since the previous sample; the first sample contributes 0.
        dt = df["timestamp"].diff().dt.total_seconds().fillna(0)
        # km/h -> m/s, times elapsed seconds, accumulated over the trip.
        df["distance_m"] = (df["speed_kmph"] * 1000 / 3600 * dt).cumsum()

    # --------------------------------------------------
    # Analytics-ready signal set (generic)
    # --------------------------------------------------
    REQUIRED_SIGNAL_SET = [
        "timestamp",
        "vehicle_id",
        "scenario",

        "engine_rpm",
        "torque_nm",
        "speed_kmph",
        "grade_pct",
        "distance_m",

        "oil_temp_c",
        "coolant_temp_c",

        "current_gear",
        "selected_gear",
        "clutch_state",

        "vibration_ax_g"
    ]

    # Keep the listed columns that actually exist, in the listed order.
    df = df[[c for c in REQUIRED_SIGNAL_SET if c in df.columns]]

    return df


# Run every raw CSV through process_trip and store the result under the same
# file name in PROCESSED_DIR, counting files as we go.
processed_count = 0
for raw_path in RAW_DIR.glob("*.csv"):
    cleaned = process_trip(pd.read_csv(raw_path), raw_path.name)
    target_path = PROCESSED_DIR / raw_path.name
    cleaned.to_csv(target_path, index=False)
    processed_count += 1

print(f"Processed telemetry files: {processed_count}")


In [None]:
# Cell 4: Helper function to load processed telemetry
# --------------------------------------------------
# Purpose:
# - Centralize CSV loading
# - Enforce timestamp ordering

def load_processed_csv(path):
    """Read a processed telemetry CSV and return it ordered by time.

    The ``timestamp`` column is parsed as datetimes while reading, and the
    rows come back sorted ascending on that column. Index labels are the ones
    assigned at read time, i.e. they are not renumbered after sorting.
    """
    telemetry = pd.read_csv(path, parse_dates=["timestamp"])
    telemetry = telemetry.sort_values(by="timestamp")
    return telemetry


In [None]:
# Cell 5: Operating profile analytics
# ------------------------------------------------
# Purpose:
# - Quantify time spent in observed operating regions
# - Avoid cartesian explosion
# - Produce BI-ready bin indices and labels

# Bin edges. pd.cut treats them as right-closed intervals, so a value equal to
# the first edge falls outside every bin and is dropped below.
RPM_BINS = np.arange(800, 6001, 250)
TORQUE_BINS = np.arange(-180, 201, 25)

# Column layout of the output table. Also used to write a header-only file
# when no trip contributes any rows.
OPERATING_PROFILE_COLUMNS = [
    "rpm_bin_idx", "torque_bin_idx", "delta_t",
    "rpm_bin_low", "rpm_bin_high", "rpm_bin_label",
    "torque_bin_low", "torque_bin_high", "torque_bin_label",
    "source_file",
]

rows = []

for f in PROCESSED_DIR.glob("*.csv"):
    df = load_processed_csv(f)

    # Skip trips that lack either channel instead of failing with KeyError
    # (same policy as the other analytics cells that guard optional signals).
    if not {"engine_rpm", "torque_nm"}.issubset(df.columns):
        continue

    df = df[["timestamp", "engine_rpm", "torque_nm"]].dropna()

    # Time credited to a sample = seconds since the previous retained sample;
    # the first sample of a trip is credited 0.
    df["delta_t"] = df["timestamp"].diff().dt.total_seconds().fillna(0)

    # labels=False returns the integer bin position (NaN when outside the edges).
    df["rpm_bin_idx"] = pd.cut(df["engine_rpm"], RPM_BINS, labels=False)
    df["torque_bin_idx"] = pd.cut(df["torque_nm"], TORQUE_BINS, labels=False)

    df = df.dropna(subset=["rpm_bin_idx", "torque_bin_idx"])
    df[["rpm_bin_idx", "torque_bin_idx"]] = df[["rpm_bin_idx", "torque_bin_idx"]].astype(int)

    # One row per (rpm bin, torque bin) pair that was actually observed.
    grp = (
        df.groupby(["rpm_bin_idx", "torque_bin_idx"], as_index=False)["delta_t"]
        .sum()
    )

    # Vectorized edge lookup: bin i spans edges[i] .. edges[i + 1].
    rpm_idx = grp["rpm_bin_idx"].to_numpy(dtype=int)
    grp["rpm_bin_low"] = RPM_BINS[rpm_idx]
    grp["rpm_bin_high"] = RPM_BINS[rpm_idx + 1]
    grp["rpm_bin_label"] = grp["rpm_bin_low"].astype(str) + "–" + grp["rpm_bin_high"].astype(str)

    torque_idx = grp["torque_bin_idx"].to_numpy(dtype=int)
    grp["torque_bin_low"] = TORQUE_BINS[torque_idx]
    grp["torque_bin_high"] = TORQUE_BINS[torque_idx + 1]
    grp["torque_bin_label"] = grp["torque_bin_low"].astype(str) + "–" + grp["torque_bin_high"].astype(str)

    grp["source_file"] = f.name
    rows.append(grp)

# pd.concat raises on an empty list, so fall back to an empty table that still
# carries the expected header.
if rows:
    operating_profile_df = pd.concat(rows, ignore_index=True)
else:
    operating_profile_df = pd.DataFrame(columns=OPERATING_PROFILE_COLUMNS)

operating_profile_df.to_csv(OUTPUT_DIR / "telemetry_operating_profile.csv", index=False)

print("Operating profile analytics generated")


In [None]:
# Cell 6: Thermal behavior analytics
# --------------------------------
# Purpose:
# - Monitor thermal signals
# - Quantify excursions beyond safe operating ranges

# Thresholds used to flag thermal excursions (same units as the temperature
# columns in the processed files).
OIL_TEMP_LIMIT = 110
COOLANT_TEMP_LOW = 60
COOLANT_TEMP_HIGH = 90

# Column layout of the output table. Also used to write a header-only file
# when no trip contributes any rows.
THERMAL_COLUMNS = [
    "timestamp", "oil_temp_c", "coolant_temp_c",
    "oil_over_limit", "coolant_out_of_range",
    "oil_overshoot_count", "coolant_overshoot_count",
    "source_file",
]

rows = []

for f in PROCESSED_DIR.glob("*.csv"):
    df = load_processed_csv(f)

    # Skip trips without both temperature channels instead of failing with
    # KeyError (same policy as the other analytics cells).
    if not {"oil_temp_c", "coolant_temp_c"}.issubset(df.columns):
        continue

    df = df[["timestamp", "oil_temp_c", "coolant_temp_c"]].dropna()

    # Per-sample flags.
    df["oil_over_limit"] = df["oil_temp_c"] > OIL_TEMP_LIMIT
    df["coolant_out_of_range"] = (
        (df["coolant_temp_c"] < COOLANT_TEMP_LOW) | (df["coolant_temp_c"] > COOLANT_TEMP_HIGH)
    )

    # 0/1 copies of the flags so they can be summed downstream.
    df["oil_overshoot_count"] = df["oil_over_limit"].astype(int)
    df["coolant_overshoot_count"] = df["coolant_out_of_range"].astype(int)

    df["source_file"] = f.name
    rows.append(df)

# pd.concat raises on an empty list, so fall back to an empty table that still
# carries the expected header.
if rows:
    thermal_df = pd.concat(rows, ignore_index=True)
else:
    thermal_df = pd.DataFrame(columns=THERMAL_COLUMNS)

thermal_df.to_csv(OUTPUT_DIR / "telemetry_thermal_behavior.csv", index=False)

print("Thermal behavior analytics generated")


In [None]:
# Cell 7: Environmental profile analytics
# -------------------------------------
# Purpose:
# - Analyze distribution of environmental signals (e.g., grade)

# Column layout of the output table. Also used to write a header-only file
# when no trip carries a grade signal.
ENVIRONMENT_COLUMNS = ["grade_pct", "source_file"]

rows = []

for f in PROCESSED_DIR.glob("*.csv"):
    df = load_processed_csv(f)

    # The grade channel is optional; trips without it contribute nothing.
    if "grade_pct" not in df:
        continue

    g = df[["grade_pct"]].dropna()
    g["source_file"] = f.name
    rows.append(g)

# Every trip may have been skipped above, and pd.concat raises on an empty
# list, so fall back to an empty table that still carries the header.
if rows:
    env_df = pd.concat(rows, ignore_index=True)
else:
    env_df = pd.DataFrame(columns=ENVIRONMENT_COLUMNS)

env_df.to_csv(OUTPUT_DIR / "telemetry_environmental_profile.csv", index=False)

print("Environmental profile analytics generated")


In [None]:
# Cell 8: Actuation and state transition analytics
# -----------------------------------------------
# Purpose:
# - Quantify actuation events
# - Detect state mismatches
# - Normalize by distance

# Column layout of the output table. Passing it to the DataFrame constructor
# keeps the header in the CSV even when no trip qualifies; a file without a
# header cannot be read back with pd.read_csv.
ACTUATION_COLUMNS = [
    "source_file", "vehicle_id", "scenario",
    "actuation_engage_count", "actuation_disengage_count",
    "state_mismatch_count", "distance_km", "events_per_km",
]

rows = []

for f in PROCESSED_DIR.glob("*.csv"):
    df = load_processed_csv(f)

    required = {"current_gear", "selected_gear", "clutch_state", "distance_m"}
    # Skip trips missing any needed channel, and trips with no rows at all
    # (the .iloc[0] lookups below would raise IndexError on an empty frame).
    if not required.issubset(df.columns) or df.empty:
        continue

    # Sample-to-sample change of the clutch signal: a rise counts as an
    # engage event, a fall as a disengage event.
    clutch_diff = df["clutch_state"].diff().fillna(0)

    engage = int((clutch_diff > 0).sum())
    disengage = int((clutch_diff < 0).sum())

    # Only compare rows where both gear values are present: NaN != NaN is
    # True, so missing samples would otherwise be counted as mismatches.
    both_known = df["current_gear"].notna() & df["selected_gear"].notna()
    mismatch = int(((df["current_gear"] != df["selected_gear"]) & both_known).sum())

    # Largest recorded distance is taken as the trip length; a zero or missing
    # distance leaves the per-km rate undefined.
    max_distance_m = df["distance_m"].max()
    distance_km = max_distance_m / 1000 if max_distance_m > 0 else np.nan
    events_total = engage + disengage

    rows.append({
        "source_file": f.name,
        "vehicle_id": df["vehicle_id"].iloc[0],
        "scenario": df["scenario"].iloc[0],
        "actuation_engage_count": engage,
        "actuation_disengage_count": disengage,
        "state_mismatch_count": mismatch,
        "distance_km": distance_km,
        "events_per_km": events_total / distance_km if pd.notna(distance_km) else np.nan
    })

actuation_df = pd.DataFrame(rows, columns=ACTUATION_COLUMNS)
actuation_df.to_csv(OUTPUT_DIR / "telemetry_actuation_statistics.csv", index=False)

print("Actuation statistics generated")


In [None]:
# Cell 9: Frequency-domain analytics
# --------------------------------
# Purpose:
# - Extract vibration frequency features
# - Compute total spectral energy
# - Identify dominant frequencies

# Column layout of the output table. Passing it to the DataFrame constructor
# keeps the header in the CSV even when no trip qualifies; a file without a
# header cannot be read back with pd.read_csv.
FFT_COLUMNS = ["source_file", "fft_energy", "dominant_frequencies"]

rows = []

for f in PROCESSED_DIR.glob("*.csv"):
    df = load_processed_csv(f)

    # The vibration channel is optional; trips without it contribute nothing.
    if "vibration_ax_g" not in df:
        continue

    sig = df[["timestamp", "vibration_ax_g"]].dropna()

    # rfft() rejects an empty input, and a single sample gives no usable
    # sample spacing, so such trips are skipped.
    if len(sig) < 2:
        continue

    # Sample spacing: median gap between consecutive timestamps, in seconds.
    # Fall back to 1.0 when that is missing, zero or negative.
    dt = sig["timestamp"].diff().dt.total_seconds().median()
    if not np.isfinite(dt) or dt <= 0:
        dt = 1.0

    x = sig["vibration_ax_g"].values
    yf = rfft(x)
    xf = rfftfreq(len(x), dt)
    mag = np.abs(yf)

    # Indices of the (up to) three largest magnitudes, strongest first.
    # NOTE(review): bin 0 (the signal's constant offset) takes part in both the
    # ranking and the energy sum - confirm whether that is intended.
    peaks = np.argsort(mag)[-3:][::-1]

    rows.append({
        "source_file": f.name,
        "fft_energy": float((mag ** 2).sum()),
        "dominant_frequencies": str([(float(xf[i]), float(mag[i])) for i in peaks])
    })

fft_df = pd.DataFrame(rows, columns=FFT_COLUMNS)
fft_df.to_csv(OUTPUT_DIR / "telemetry_frequency_features.csv", index=False)

print("Frequency-domain analytics generated")


In [None]:
# Cell 10: MySQL database bootstrap
# --------------------------------
# Purpose:
# - Ensure database exists before uploading tables
# - Prevent runtime failures in SQLAlchemy

import os

from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

# Connection settings. Each one can be overridden by an environment variable
# of the same name; the literals are placeholder defaults for a local setup.
# Do not commit a real password here - export MYSQL_PASSWORD instead.
MYSQL_USER = os.environ.get("MYSQL_USER", "root")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "mypassword!")
MYSQL_HOST = os.environ.get("MYSQL_HOST", "localhost")
MYSQL_PORT = os.environ.get("MYSQL_PORT", "3306")
MYSQL_DB = os.environ.get("MYSQL_DB", "telemetry_analytics_db")

# Build the URL from its parts rather than by string formatting, so that
# characters such as "@", ":" or "/" in the user name or password are escaped
# correctly instead of corrupting the URL.
server_url = URL.create(
    "mysql+pymysql",
    username=MYSQL_USER,
    password=MYSQL_PASSWORD,
    host=MYSQL_HOST,
    port=int(MYSQL_PORT),
)

# Connect WITHOUT database first
engine_bootstrap = create_engine(server_url)

# Backtick-quote the identifier (doubling any embedded backtick) so database
# names containing special characters are accepted.
quoted_db = "`" + MYSQL_DB.replace("`", "``") + "`"

with engine_bootstrap.connect() as conn:
    conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {quoted_db}"))
    print(f"Database '{MYSQL_DB}' ensured")

# The bootstrap engine is only needed for the statement above; release its
# pooled connections.
engine_bootstrap.dispose()

# Create engine WITH database
engine = create_engine(server_url.set(database=MYSQL_DB))

print("Connected to MySQL database:", MYSQL_DB)


In [None]:
# Final Cell: Upload analytics outputs to MySQL
# ---------------------------------------------
# Purpose:
# - Upload analytics tables
# - Enable Power BI live connection & refresh

import pandas as pd

def upload_table(csv_file, table_name):
    """Load one analytics CSV from OUTPUT_DIR and write it to the database.

    Parameters
    ----------
    csv_file : str
        File name inside ``OUTPUT_DIR``.
    table_name : str
        Destination table; an existing table of that name is replaced.

    A CSV with no header row (which is what an empty DataFrame produces when
    written out) cannot be parsed. In that case the upload is skipped with a
    message instead of raising, so the remaining tables are still uploaded.
    A missing file still raises ``FileNotFoundError``.
    """
    try:
        df = pd.read_csv(OUTPUT_DIR / csv_file)
    except pd.errors.EmptyDataError:
        print(f"Skipped {table_name}: {csv_file} contains no columns")
        return

    df.to_sql(table_name, engine, if_exists="replace", index=False)
    print(f"Uploaded {table_name}: {len(df)} rows")

# (CSV file in OUTPUT_DIR, destination table) pairs, uploaded in this order.
ANALYTICS_UPLOADS = (
    ("telemetry_operating_profile.csv", "telemetry_operating_profile"),
    ("telemetry_thermal_behavior.csv", "telemetry_thermal_behavior"),
    ("telemetry_environmental_profile.csv", "telemetry_environmental_profile"),
    ("telemetry_actuation_statistics.csv", "telemetry_actuation_statistics"),
    ("telemetry_frequency_features.csv", "telemetry_frequency_features"),
)

for output_csv, destination_table in ANALYTICS_UPLOADS:
    upload_table(output_csv, destination_table)

print("All analytics tables uploaded successfully")
