In [8]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from scipy import stats
from scipy.signal import savgol_filter
from scipy.stats import skew, entropy
from scipy.fft import rfft
import pywt

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, mean_absolute_error, mean_squared_error
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.mixture import GaussianMixture

import networkx as nx

from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX

# Optional: use fastdtw if installed, otherwise fallback to a simple approximate distance
try:
    from fastdtw import fastdtw
    from scipy.spatial.distance import euclidean
    has_fastdtw = True
except Exception:
    has_fastdtw = False

import warnings
warnings.filterwarnings("ignore")

In [9]:
DATA_PATH = "sensor_data.csv"
OUTPUT_DIR = "output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def load_data(path=DATA_PATH):
    df = pd.read_csv(path, parse_dates=["timestamp"])
    # unify column names if necessary
    expected = ["timestamp", "device_id", "temperature_c", "humidity_pct",
                "vibration_level", "pressure_kpa", "light_lux", "gas_ppm", "anomaly_label"]
    # if columns slightly differ, try to normalize (user's file earlier had these names)
    df = df.rename(columns={c: c.strip() for c in df.columns})
    # sort
    df = df.sort_values(["device_id", "timestamp"]).reset_index(drop=True)
    return df

Task 1: Time-Series Integrity & Cleaning
Objectives
Validate time frequency, identify missing intervals, and impute multivariate sensor values.
Requirements
1. Verify that each device reports data every 30 seconds.
2. Identify devices with gaps longer than 5 minutes and count missing intervals.
3. Reconstruct missing timestamps using reindexing.
4. Impute:
• Temperature: linear interpolation
• Humidity: rolling median
• Vibration: exponential weighted mean
• Pressure/Light/Gas: student chooses appropriate techniques
Deliverables
Table of missing timestamps per device and a cleaned DataFrame.

In [13]:
def task1_integrity_and_cleaning(df, freq_seconds=30, report_csv=True):
    """
    Returns:
      missing_df: per-device missing summary
      cleaned_df: reindexed and imputed DataFrame
    """
    freq = pd.Timedelta(seconds=freq_seconds)
    missing_info = []

    # ensure timestamp dtype
    df["timestamp"] = pd.to_datetime(df["timestamp"])

    for device, g in df.groupby("device_id"):
        diffs = g["timestamp"].diff()
        # gaps bigger than expected
        gaps = diffs[diffs > freq]
        total_missing = sum(int(gap / freq) - 1 for gap in gaps)
        # count gaps specifically over 5 minutes
        gaps_over_5 = (diffs > pd.Timedelta(minutes=5)).sum()
        missing_info.append([device, int(gaps_over_5), int(total_missing)])

    missing_df = pd.DataFrame(missing_info, columns=["device_id", "num_gaps_over_5min", "missing_intervals"])
    missing_df = missing_df.sort_values("missing_intervals", ascending=False).reset_index(drop=True)
    if report_csv:
        missing_df.to_csv(os.path.join(OUTPUT_DIR, "missing_summary_per_device.csv"), index=False)

    # Reconstruct and impute
    cleaned_devices = []
    for device, g in df.groupby("device_id"):
        g = g.set_index("timestamp").sort_index()
        full_index = pd.date_range(start=g.index.min(), end=g.index.max(), freq=f"{freq_seconds}s")
        g = g.reindex(full_index)
        g["device_id"] = device

        # Impute rules
        # Temperature: linear interpolation
        if "temperature_c" in g.columns:
            g["temperature_c"] = g["temperature_c"].interpolate(method="linear")

        # Humidity: rolling median (window = 5 samples ~ 2.5 minutes)
        if "humidity_pct" in g.columns:
            g["humidity_pct"] = g["humidity_pct"].fillna(
                g["humidity_pct"].rolling(window=5, center=True, min_periods=1).median()
            )

        # Vibration: exponential weighted mean (denoised baseline)
        if "vibration_level" in g.columns:
            g["vibration_level_raw"] = g["vibration_level"]  # keep a raw copy
            g["vibration_level"] = g["vibration_level"].ewm(span=10, min_periods=1).mean()

        # Pressure: time interpolation (linear in time)
        if "pressure_kpa" in g.columns:
            g["pressure_kpa"] = g["pressure_kpa"].interpolate(method="time")

        # Light: forward fill (blackouts expected -> 0), if still missing -> 0
        if "light_lux" in g.columns:
            g["light_lux"] = g["light_lux"].fillna(method="ffill").fillna(0)

        # Gas: linear interpolation
        if "gas_ppm" in g.columns:
            g["gas_ppm"] = g["gas_ppm"].interpolate(method="linear")

        # anomaly_label: keep original where present; optional fill
        if "anomaly_label" in g.columns:
            # leave as-is: interpolation doesn't make sense for labels
            pass

        cleaned_devices.append(g)

    cleaned_df = pd.concat(cleaned_devices).reset_index().rename(columns={"index": "timestamp"})
    # Save cleaned
    cleaned_df.to_csv(os.path.join(OUTPUT_DIR, "cleaned_sensor_data.csv"), index=False)
    return missing_df, cleaned_df

Task 2: Noise Reduction & Signal Decomposition
Objectives
Denoise signals and analyze underlying trends/seasonality.

2

Requirements
1. Apply Savitzky–Golay smoothing to temperature.
2. Apply exponential moving average to humidity.
3. Perform wavelet denoising on vibration.
4. Use seasonal decompose() to extract trend, seasonal, and residual components
for each sensor variable.

Deliverables
Before–after plots and a summary of which smoothing technique performed best.

In [14]:
def task2_denoise_and_decompose(cleaned_df, device=None, plot=True):
    """
    Apply smoothing/denoising and seasonal decompose for sensors.
    If device is None, runs an example on the first device.
    Returns a dict of denoised series and decomposition results per variable.
    """
    sensors = ["temperature_c", "humidity_pct", "vibration_level", "pressure_kpa", "light_lux", "gas_ppm"]
    result = {}

    # choose a device sample
    if device is None:
        device = cleaned_df["device_id"].unique()[0]

    df_dev = cleaned_df[cleaned_df["device_id"] == device].set_index("timestamp").sort_index()

    # 1) Savitzky–Golay on temperature
    temp = df_dev["temperature_c"].copy().dropna()
    window_length = 11 if len(temp) >= 11 else (len(temp) // 2) * 2 + 1
    if window_length < 3:
        temp_sg = temp
    else:
        temp_sg = pd.Series(savgol_filter(temp.values, window_length=window_length, polyorder=2), index=temp.index)

    # 2) Exponential moving average on humidity
    hum = df_dev["humidity_pct"].copy()
    hum_ema = hum.ewm(span=10, adjust=False).mean()

    # 3) Wavelet denoising on vibration
    vib = df_dev["vibration_level"].dropna()
    if len(vib) > 10:
        # pywt denoising: use 'db4' and universal thresholding
        coeffs = pywt.wavedec(vib.values, "db4", level=3)
        sigma = np.median(np.abs(coeffs[-1])) / 0.6745
        uthresh = sigma * np.sqrt(2 * np.log(len(vib)))
        denoised_coeffs = [pywt.threshold(c, value=uthresh, mode="soft") for c in coeffs]
        vib_denoised_vals = pywt.waverec(denoised_coeffs, "db4")
        vib_denoised = pd.Series(vib_denoised_vals[:len(vib)], index=vib.index)
    else:
        vib_denoised = vib

    # 4) Seasonal decomposition for each sensor variable (additive)
    decompositions = {}
    for s in sensors:
        series = df_dev[s].dropna()
        if len(series) > 24:  # enough data for decomposition
            # infer freq in samples per day: if 30s samples then 2880 per day -> that's big.
            # seasonal_decompose requires a period; choose a periodCandidate like daily or hourly based on length.
            # We'll try a day-period if length allows, otherwise fallback to 24-hourly period converted to samples.
            try:
                period = None
                # infer: if sample rate is 30s -> samples per day = 24*60*60/30 = 2880
                # but seasonal_decompose may choke on large periods; use smaller period if data shorter.
                samples_per_second = 1 / np.median(np.diff(series.index.astype(np.int64) // 10**9))
                # not strictly necessary — choose period=2880 if enough length else 1440 or 288
                if len(series) > 2880 * 2:
                    period = 2880
                elif len(series) > 1440:
                    period = 1440
                elif len(series) > 288:
                    period = 288
                else:
                    period = None
                if period:
                    dec = seasonal_decompose(series, period=period, model="additive", extrapolate_trend="freq")
                    decompositions[s] = dec
                else:
                    decompositions[s] = None
            except Exception:
                decompositions[s] = None
        else:
            decompositions[s] = None

    result["temperature_sg"] = temp_sg
    result["humidity_ema"] = hum_ema
    result["vibration_wavelet"] = vib_denoised
    result["decompositions"] = decompositions

    if plot:
        plt.figure(figsize=(14, 6))
        plt.plot(df_dev["temperature_c"], alpha=0.4, label="raw temp")
        plt.plot(temp_sg, label="Savitzky-Golay")
        plt.title(f"Temperature smoothing for {device}")
        plt.legend()
        plt.savefig(os.path.join(OUTPUT_DIR, f"{device}_temperature_smoothing.png"))
        plt.close()

        plt.figure(figsize=(14, 6))
        plt.plot(df_dev["humidity_pct"], alpha=0.4, label="raw humidity")
        plt.plot(hum_ema, label="EMA humidity")
        plt.title(f"Humidity smoothing for {device}")
        plt.legend()
        plt.savefig(os.path.join(OUTPUT_DIR, f"{device}_humidity_ema.png"))
        plt.close()

        plt.figure(figsize=(14, 6))
        plt.plot(df_dev["vibration_level"], alpha=0.4, label="raw vib")
        plt.plot(vib_denoised, label="wavelet denoised")
        plt.title(f"Vibration denoising for {device}")
        plt.legend()
        plt.savefig(os.path.join(OUTPUT_DIR, f"{device}_vibration_wavelet.png"))
        plt.close()

    # return results for downstream
    return result

Task 3: Device Behavior Profiling & Clustering
Objectives
Create behavioral fingerprints for each device and cluster them.
Requirements
1. Compute per-device summary statistics: mean, variance, quantiles, slopes, and
threshold exceedance proportions.
2. Construct a feature matrix for all 100 devices.
3. Determine optimal clusters using K-means and silhouette score.
4. Visualize clusters via PCA (2D).
Deliverables
Cluster assignments and a PCA scatterplot of device behavior profiles.

In [15]:
def task3_profile_and_cluster(cleaned_df, n_clusters_range=(2, 8), random_state=42, save=True):
    devices = cleaned_df["device_id"].unique()
    feature_rows = []

    def slope(series):
        if series.isnull().all() or len(series.dropna()) < 2:
            return 0.0
        x = np.arange(len(series))
        y = series.fillna(method="ffill").fillna(method="bfill").values
        m, b = np.polyfit(x, y, 1)
        return m

    for device in devices:
        d = cleaned_df[cleaned_df["device_id"] == device].set_index("timestamp")
        row = {"device_id": device}
        for col in ["temperature_c", "humidity_pct", "vibration_level", "pressure_kpa", "light_lux", "gas_ppm"]:
            s = d[col].dropna()
            row[f"{col}_mean"] = s.mean() if len(s) else np.nan
            row[f"{col}_var"] = s.var() if len(s) else np.nan
            row[f"{col}_q25"] = s.quantile(0.25) if len(s) else np.nan
            row[f"{col}_q50"] = s.quantile(0.50) if len(s) else np.nan
            row[f"{col}_q75"] = s.quantile(0.75) if len(s) else np.nan
            row[f"{col}_slope"] = slope(s)
            # threshold exceedance proportion: > mean + 2*std
            if len(s) > 0:
                th = s.mean() + 2 * s.std()
                row[f"{col}_th_exceed_prop"] = (s > th).mean()
            else:
                row[f"{col}_th_exceed_prop"] = np.nan
        feature_rows.append(row)

    features_df = pd.DataFrame(feature_rows).set_index("device_id")

    # Fill NAs with column medians
    features_df = features_df.fillna(features_df.median())

    # Scale
    scaler = StandardScaler()
    X = scaler.fit_transform(features_df)

    # Choose k by silhouette score
    best_k = None
    best_score = -1
    best_labels = None
    for k in range(n_clusters_range[0], n_clusters_range[1] + 1):
        km = KMeans(n_clusters=k, random_state=random_state, n_init=10).fit(X)
        labels = km.labels_
        sc = silhouette_score(X, labels) if len(np.unique(labels)) > 1 else -1
        if sc > best_score:
            best_score = sc
            best_k = k
            best_labels = labels
            best_km = km

    features_df["cluster"] = best_labels

    # PCA for visualization
    pca = PCA(n_components=2, random_state=random_state)
    pcs = pca.fit_transform(X)
    features_df["pc1"] = pcs[:, 0]
    features_df["pc2"] = pcs[:, 1]

    if save:
        features_df.to_csv(os.path.join(OUTPUT_DIR, "device_profiles_clusters.csv"))
        plt.figure(figsize=(10, 7))
        sns.scatterplot(data=features_df, x="pc1", y="pc2", hue="cluster", palette="tab10", s=100)
        for i, d in enumerate(features_df.index):
            plt.text(features_df.loc[d, "pc1"] + 0.01, features_df.loc[d, "pc2"] + 0.01, d, fontsize=8)
        plt.title(f"Device clustering (k={best_k}, silhouette={best_score:.3f})")
        plt.savefig(os.path.join(OUTPUT_DIR, "device_cluster_pca.png"))
        plt.close()

    return features_df, best_k, best_score

Task 4: Rolling Windows & Advanced Feature Ex-
traction

Objectives
Capture local behavior changes in sensors.

3

Requirements
1. Compute rolling features for 1, 5, and 10-minute windows:
• Mean
• Standard deviation
• Skewness
• Entropy
• FFT energy
2. Detect local spikes or drops in sensor behavior.
Deliverables
A feature table and visualizations of rolling entropy for a selected device.

In [16]:
def compute_entropy(arr, base=2):
    # normalize histogram
    arr = np.array(arr)
    if len(arr) == 0:
        return 0.0
    hist, _ = np.histogram(arr, bins=20, density=True)
    hist = hist[hist > 0]
    return entropy(hist, base=base)

def task4_rolling_features(cleaned_df, device, windows_seconds=(60, 300, 600), freq_seconds=30):
    """
    windows_seconds: windows in seconds e.g., 60,300,600 -> 1,5,10 minutes
    returns feature_df with hierarchical column names: sensor_window_feature
    """
    df_dev = cleaned_df[cleaned_df["device_id"] == device].set_index("timestamp").sort_index()
    sensors = ["temperature_c", "humidity_pct", "vibration_level", "pressure_kpa", "light_lux", "gas_ppm"]
    feature_frames = []

    for win_s in windows_seconds:
        win_samples = int(win_s / freq_seconds)
        # rolling window
        roll = df_dev[sensors].rolling(window=win_samples, min_periods=1, center=False)
        feat = pd.DataFrame(index=df_dev.index)
        for s in sensors:
            feat[f"{s}_mean_{win_s}s"] = roll[s].mean()
            feat[f"{s}_std_{win_s}s"] = roll[s].std().fillna(0)
            feat[f"{s}_skew_{win_s}s"] = roll[s].apply(lambda x: skew(x.dropna()) if len(x.dropna()) > 2 else 0.0)
            feat[f"{s}_entropy_{win_s}s"] = roll[s].apply(lambda x: compute_entropy(x.dropna()) if len(x.dropna()) > 2 else 0.0)
            # FFT energy: compute energy of rfft magnitudes in the window
            def fft_energy(x):
                x = np.array(x.dropna())
                if len(x) == 0:
                    return 0.0
                spec = np.abs(rfft(x))
                return np.sum(spec ** 2) / len(spec)
            feat[f"{s}_fft_energy_{win_s}s"] = roll[s].apply(fft_energy)

        feature_frames.append(feat)

    features_concat = pd.concat(feature_frames, axis=1)
    features_concat.to_csv(os.path.join(OUTPUT_DIR, f"{device}_rolling_features.csv"))
    return features_concat

Task 5: Multivariate Correlation & Cross-Device Syn-
chronization

Objectives
Study relationships between sensors and alignments across devices.
Requirements
1. Compute Pearson and Spearman correlations for all sensor pairs.
2. Compute cross-correlation between at least three device pairs.
3. Identify synchronized or environmentally coupled devices.
Deliverables
Correlation heatmap and analysis of synchronized device groups.

In [17]:
def task5_correlations(cleaned_df, sample_fraction=0.1, save=True):
    sensors = ["temperature_c", "humidity_pct", "vibration_level", "pressure_kpa", "light_lux", "gas_ppm"]
    # compute correlations by pooling all devices (or aggregated)
    pivot = cleaned_df.set_index("timestamp").groupby("device_id")[sensors].mean().T
    # Pearson & Spearman across sensors pooled by device-means
    pear = pivot.corr(method="pearson")
    spear = pivot.corr(method="spearman")

    if save:
        plt.figure(figsize=(10, 8))
        sns.heatmap(pear, annot=True, fmt=".2f", cmap="coolwarm")
        plt.title("Pearson correlation between sensors (device-mean aggregated)")
        plt.savefig(os.path.join(OUTPUT_DIR, "sensor_correlation_pearson.png"))
        plt.close()

        plt.figure(figsize=(10, 8))
        sns.heatmap(spear, annot=True, fmt=".2f", cmap="coolwarm")
        plt.title("Spearman correlation between sensors (device-mean aggregated)")
        plt.savefig(os.path.join(OUTPUT_DIR, "sensor_correlation_spearman.png"))
        plt.close()

    # Cross-correlation between device pairs: pick three pairs (first 3 devices)
    devices = cleaned_df["device_id"].unique()[:6]
    xc_results = {}
    for i in range(0, min(3, len(devices)-1)):
        d1 = devices[i]
        d2 = devices[i+1]
        # take temperature series, align timestamps by reindexing to common index
        s1 = cleaned_df[cleaned_df["device_id"] == d1].set_index("timestamp")["temperature_c"]
        s2 = cleaned_df[cleaned_df["device_id"] == d2].set_index("timestamp")["temperature_c"]
        common_index = s1.dropna().index.intersection(s2.dropna().index)
        s1a = s1.reindex(common_index).fillna(method="ffill").fillna(method="bfill")
        s2a = s2.reindex(common_index).fillna(method="ffill").fillna(method="bfill")
        # compute cross-correlation (normalized)
        corr = np.correlate((s1a - s1a.mean())/s1a.std(), (s2a - s2a.mean())/s2a.std(), mode="full")
        lags = np.arange(-len(s1a)+1, len(s1a))
        max_idx = np.argmax(corr)
        lag_at_max = lags[max_idx]
        xc_results[f"{d1}_vs_{d2}"] = {"lag": int(lag_at_max), "max_corr": float(corr[max_idx])}
    return pear, spear, xc_results

Task 6: Multi-Method Anomaly Detection
Objectives
Detect anomalies using at least three distinct approaches.

4

Methods Allowed
- Statistical: Z-score, IQR
-  Machine Learning: Isolation Forest, Local Outlier Factor
- Probabilistic: Gaussian Mixture anomaly probabilities
- Sequence-Based (optional advanced): Autoencoder or LSTM prediction error

Deliverables
Anomaly labels, precision/recall comparison, and annotated time-series plots.

In [18]:
def task6_anomaly_detection(cleaned_df, device, contamination=0.01, save=True):
    d = cleaned_df[cleaned_df["device_id"] == device].set_index("timestamp").sort_index()
    sensors = ["temperature_c", "humidity_pct", "vibration_level", "pressure_kpa", "light_lux", "gas_ppm"]
    X = d[sensors].fillna(method="ffill").fillna(method="bfill").values

    # 1) Statistical: Z-score and IQR per sensor -> binary label if any sensor flagged
    z_scores = np.abs(stats.zscore(X, nan_policy="omit"))
    z_anom = (z_scores > 3).any(axis=1)

    # IQR
    q1 = np.nanpercentile(X, 25, axis=0)
    q3 = np.nanpercentile(X, 75, axis=0)
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr
    iqr_anom = ((X < lower) | (X > upper)).any(axis=1)

    # 2) Isolation Forest
    iso = IsolationForest(contamination=contamination, random_state=42)
    iso_labels = iso.fit_predict(X)
    iso_anom = (iso_labels == -1)

    # 3) LOF
    lof = LocalOutlierFactor(n_neighbors=20, contamination=contamination)
    lof_labels = lof.fit_predict(X)
    lof_anom = (lof_labels == -1)

    # 4) GMM probability: low-likelihood points flagged
    gmm = GaussianMixture(n_components=2, random_state=42)
    try:
        gmm.fit(X)
        scores = gmm.score_samples(X)  # log-likelihood
        threshold = np.percentile(scores, 100 * contamination)
        gmm_anom = scores < threshold
    except Exception:
        gmm_anom = np.zeros(len(X), dtype=bool)
        scores = np.zeros(len(X))

    # Consolidate: create DataFrame of anomaly flags
    anom_df = pd.DataFrame(index=d.index)
    anom_df["z_anom"] = z_anom
    anom_df["iqr_anom"] = iqr_anom
    anom_df["iso_anom"] = iso_anom
    anom_df["lof_anom"] = lof_anom
    anom_df["gmm_anom"] = gmm_anom
    # voting: anomaly if 2+ methods flag
    anom_df["anomaly_vote"] = anom_df.sum(axis=1) >= 2

    if save:
        anom_df.astype(int).to_csv(os.path.join(OUTPUT_DIR, f"{device}_anomalies_methods.csv"))
        # quick plot
        plt.figure(figsize=(14, 5))
        plt.plot(d.index, d["temperature_c"], label="temperature")
        plt.scatter(anom_df[anom_df["anomaly_vote"]].index, d.loc[anom_df[anom_df["anomaly_vote"]].index, "temperature_c"],
                    color="red", s=20, label="anomaly_vote")
        plt.title(f"Anomaly votes for {device}")
        plt.legend()
        plt.savefig(os.path.join(OUTPUT_DIR, f"{device}_anomaly_plot.png"))
        plt.close()

    # compute simple precision/recall if original labels exist
    if "anomaly_label" in d.columns:
        true = d["anomaly_label"].fillna(0).astype(int).values
        pred = anom_df["anomaly_vote"].astype(int).values
        # avoid zeros length
        if len(true) == len(pred):
            tp = ((true == 1) & (pred == 1)).sum()
            fp = ((true == 0) & (pred == 1)).sum()
            fn = ((true == 1) & (pred == 0)).sum()
            precision = tp / (tp + fp) if (tp + fp) > 0 else np.nan
            recall = tp / (tp + fn) if (tp + fn) > 0 else np.nan
        else:
            precision, recall = np.nan, np.nan
    else:
        precision, recall = np.nan, np.nan

    return anom_df, {"precision": precision, "recall": recall}

Task 7: Sensor Health Scoring
Objectives
Evaluate each sensor’s reliability.
Requirements
1. Compute:
- Stability score (variance)
- Missing data score
- Noise score (frequency-domain)
- Anomaly rate
2. Construct a composite index (0–100).
3. Rank devices from most to least reliable.

In [19]:
def task7_sensor_health(cleaned_df):
    devices = cleaned_df["device_id"].unique()
    rows = []
    sensors = ["temperature_c", "humidity_pct", "vibration_level", "pressure_kpa", "light_lux", "gas_ppm"]

    for device in devices:
        d = cleaned_df[cleaned_df["device_id"] == device]
        total_count = len(d)
        row = {"device_id": device}

        # stability: inverse of variance aggregated across sensors
        variances = {}
        for s in sensors:
            variances[s] = d[s].var(skipna=True)
        stability_score = 100 - np.nanmean([min(variances[s], np.nanmedian(list(variances.values()))*10) for s in sensors])  # rough
        # missing data score
        missing_prop = d[sensors].isna().mean().mean()  # average missing proportion across sensors
        missing_score = 100 * (1 - missing_prop)
        # noise score (FFT energy variance as proxy)
        noise_scores = []
        for s in sensors:
            series = d[s].dropna()
            if len(series) > 10:
                spec = np.abs(rfft(series - series.mean()))
                noise_scores.append(np.var(spec))
            else:
                noise_scores.append(0.0)
        noise_score = 100 - np.nanmean(noise_scores)/ (np.nanmax(noise_scores)+1e-9) * 100  # normalize-ish
        # anomaly rate
        anomaly_rate = d["anomaly_label"].fillna(0).astype(int).mean() if "anomaly_label" in d.columns else 0
        anomaly_score = 100 * (1 - anomaly_rate)

        # composite index: weighted sum
        composite = 0.35 * stability_score + 0.25 * missing_score + 0.2 * noise_score + 0.2 * anomaly_score
        row.update({
            "stability_score": float(np.clip(stability_score, 0, 100)),
            "missing_score": float(np.clip(missing_score, 0, 100)),
            "noise_score": float(np.clip(noise_score, 0, 100)),
            "anomaly_score": float(np.clip(anomaly_score, 0, 100)),
            "composite_score": float(np.clip(composite, 0, 100))
        })
        rows.append(row)

    health_df = pd.DataFrame(rows).set_index("device_id").sort_values("composite_score", ascending=False)
    health_df.to_csv(os.path.join(OUTPUT_DIR, "device_health_scores.csv"))
    return health_df

Task 8: Forecasting
Objectives
Forecast the next 24 hours of temperature for any device.
Methods Allowed
- ARIMA / SARIMA
- Prophet
- LSTM or GRU
- Temporal CNN

5

Deliverables
Forecast plot, MAE/RMSE metrics, and comparison of two models.

In [20]:
def task8_forecast_temperature(cleaned_df, device, horizon_hours=24, freq_seconds=30, model_choice="ARIMA"):
    """
    Forecast next horizon_hours for temperature using ARIMA/SARIMAX.
    Returns forecast series and error metrics (if holdout present).
    """
    d = cleaned_df[cleaned_df["device_id"] == device].set_index("timestamp").sort_index()
    ts = d["temperature_c"].dropna()
    if len(ts) < 50:
        raise ValueError("Not enough data for forecasting")

    # resample to 30s if not strict
    ts = ts.asfreq(f"{freq_seconds}s").fillna(method="ffill")

    # train/test split: last 24h as test if exists
    samples_per_hour = int(3600 / freq_seconds)
    horizon = horizon_hours * samples_per_hour
    if len(ts) > horizon * 2:
        train, test = ts[:-horizon], ts[-horizon:]
    else:
        train, test = ts[:-horizon], ts[-horizon:]  # may be small; still try

    if model_choice == "ARIMA":
        # simple ARIMA on differenced series with auto-order selection is not done here due to time.
        model = ARIMA(train, order=(2,1,2))
        fit = model.fit()
        pred = fit.get_forecast(steps=horizon)
        forecast = pred.predicted_mean
    elif model_choice == "SARIMAX":
        # seasonal example with weekly seasonality - example params; tune in practice
        model = SARIMAX(train, order=(1,1,1), seasonal_order=(0,1,1, samples_per_hour*24//2 if samples_per_hour*24//2 < len(train) else 0))
        fit = model.fit(disp=False)
        forecast = fit.get_forecast(steps=horizon).predicted_mean
    else:
        raise ValueError("Only ARIMA/SARIMAX implemented in this example")

    # metrics
    if len(test) == len(forecast):
        mae = mean_absolute_error(test, forecast)
        rmse = np.sqrt(mean_squared_error(test, forecast))
    else:
        mae = np.nan; rmse = np.nan

    # plot
    plt.figure(figsize=(12, 5))
    plt.plot(train.index[-(samples_per_hour*24):], train[-(samples_per_hour*24):], label="train (last 24h)")
    if len(test)>0:
        plt.plot(test.index, test, label="test (actual)")
    plt.plot(pd.date_range(start=ts.index[-1] + pd.Timedelta(seconds=freq_seconds), periods=horizon, freq=f"{freq_seconds}s"), forecast, label="forecast")
    plt.title(f"Temperature forecast for {device} — {model_choice}")
    plt.legend()
    plt.savefig(os.path.join(OUTPUT_DIR, f"{device}_temperature_forecast_{model_choice}.png"))
    plt.close()

    return forecast, {"mae": mae, "rmse": rmse}

Task 9: Cross-Device Similarity Search
Objectives
Identify similar devices for maintenance and monitoring.
Requirements
1. Compute DTW similarity between devices.
2. Use Euclidean distance on window-based features as an alternate metric.
3. Visualize similarity network using NetworkX.
Deliverables
Top-5 nearest neighbors per device and a similarity graph.

In [21]:
def task9_similarity_search(cleaned_df, metric="dtw", device_sample=None, top_k=5):
    devices = cleaned_df["device_id"].unique()
    if device_sample is None:
        device_sample = devices[0]
    # create short normalized time series per device for temperature (first N points)
    ts_by_device = {}
    N = 2000  # cap for distance computation
    for device in devices:
        s = cleaned_df[cleaned_df["device_id"] == device].set_index("timestamp")["temperature_c"].dropna()
        s = s.iloc[:N]
        if len(s) < 10:
            continue
        s = (s - s.mean()) / (s.std() + 1e-9)
        ts_by_device[device] = s.values

    target = ts_by_device.get(device_sample)
    if target is None:
        raise ValueError("device_sample has no valid series")

    distances = []
    for d, series in ts_by_device.items():
        if d == device_sample:
            continue
        if metric == "dtw" and has_fastdtw:
            dist, _ = fastdtw(target, series, dist=euclidean)
        else:
            # fallback: Euclidean distance on truncated or padded series
            L = min(len(target), len(series))
            dist = np.linalg.norm(target[:L] - series[:L])
        distances.append((d, dist))
    distances = sorted(distances, key=lambda x: x[1])
    topk = distances[:top_k]

    # build network graph
    G = nx.Graph()
    G.add_node(device_sample)
    for d, dist in topk:
        G.add_node(d)
        G.add_edge(device_sample, d, weight=float(dist))

    # save graph in GML and plot
    nx.write_gml(G, os.path.join(OUTPUT_DIR, f"{device_sample}_similarity_graph.gml"))
    plt.figure(figsize=(6, 6))
    pos = nx.spring_layout(G, seed=42)
    nx.draw(G, pos, with_labels=True, node_size=500, font_size=8)
    edge_labels = nx.get_edge_attributes(G, "weight")
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=7)
    plt.title(f"Top-{top_k} similar devices to {device_sample} using {metric}")
    plt.savefig(os.path.join(OUTPUT_DIR, f"{device_sample}_similarity_network_{metric}.png"))
    plt.close()

    return topk, G

Task 10: Final Report
A professional report including:
- Cleaning and preprocessing summary
- Methods used
- Key insights (clusters, anomalies, reliability, forecasting)
- Business/operational interpretations
- Appendices with plots and tables

In [22]:
def generate_report():
    # This is a placeholder function that collects artifacts and writes a simple index
    artifacts = [f for f in os.listdir(OUTPUT_DIR) if not f.startswith(".")]
    with open(os.path.join(OUTPUT_DIR, "report_index.txt"), "w") as f:
        f.write("Artifacts produced:\n\n")
        for a in artifacts:
            f.write(a + "\n")
    print("Report index generated at:", os.path.join(OUTPUT_DIR, "report_index.txt"))

# -------------------------
# Example orchestration / quick-run
# -------------------------
if __name__ == "__main__":
    df = load_data(DATA_PATH)
    print("Loaded rows:", len(df))

    # Task 1
    missing_df, cleaned_df = task1_integrity_and_cleaning(df)
    print("Missing summary saved. Cleaned saved.")

    # Task 2 (example on first device)
    sample_device = cleaned_df["device_id"].unique()[0]
    t2 = task2_denoise_and_decompose(cleaned_df, device=sample_device)
    print("Denoising & decomposition done for", sample_device)

    # Task 3
    profiles_df, best_k, best_score = task3_profile_and_cluster(cleaned_df)
    print("Clustering done: k=", best_k, "silhouette=", best_score)

    # Task 4 (rolling features for sample device)
    rolling_feats = task4_rolling_features(cleaned_df, sample_device)
    print("Rolling features saved for", sample_device)

    # Task 5
    pearson, spearman, xc = task5_correlations(cleaned_df)
    print("Correlation images saved. Cross-corr sample:", xc)

    # Task 6 (anomalies sample device)
    anom_df, metrics = task6_anomaly_detection(cleaned_df, sample_device)
    print("Anomaly detection done. precision/recall:", metrics)

    # Task 7
    health = task7_sensor_health(cleaned_df)
    print("Health scoring done. Top devices:\n", health.head())

    # Task 8 (forecast for sample device)
    try:
        forecast, fmetrics = task8_forecast_temperature(cleaned_df, sample_device, model_choice="ARIMA")
        print("Forecast done. MAE/RMSE:", fmetrics)
    except Exception as e:
        print("Forecast error:", e)

    # Task 9
    topk, G = task9_similarity_search(cleaned_df, metric="dtw" if has_fastdtw else "euclidean", device_sample=sample_device)
    print("Top similar devices:", topk)

    # Task 10
    generate_report()
    print("Pipeline finished. Check the 'output' folder for artifacts.")

Loaded rows: 20000
Missing summary saved. Cleaned saved.
Denoising & decomposition done for DEV_001
Clustering done: k= 2 silhouette= 0.10620563116123012
Rolling features saved for DEV_001
Correlation images saved. Cross-corr sample: {'DEV_001_vs_DEV_002': {'lag': 5075, 'max_corr': 4665.065089337741}, 'DEV_002_vs_DEV_003': {'lag': 8511, 'max_corr': 2975.467108382855}, 'DEV_003_vs_DEV_004': {'lag': -8984, 'max_corr': 3766.1811014270597}}
Anomaly detection done. precision/recall: {'precision': np.float64(0.005097706032285472), 'recall': np.float64(1.0)}
Health scoring done. Top devices:
            stability_score  missing_score  noise_score  anomaly_score  \
device_id                                                               
DEV_001                0.0      84.120438    82.609459      99.969823   
DEV_002                0.0      84.140178    82.291499      99.959700   
DEV_003                0.0      84.169803    82.354740      99.969887   
DEV_004                0.0      84.126266 