# Algebraic and AI-Assisted Anomaly Detection for Robotic Sensor Data

## Refinements

- Residual-based -> substitute OLS with **Ridge-Regularized regression** to take into account the multicollinearity between data (sensor data, typically correlated)
- Covariance-based -> substitute empirical cov with **Minimum Covariance Determinant estimator**, to improve normal-data fitting and not allow anomalies to distort the shape of normal data

In [None]:
# --- 1. Imports ---
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path

from sklearn.linear_model import RidgeCV
from sklearn.covariance import MinCovDet, EmpiricalCovariance
from scipy.stats import chi2

SEED = 42
np.random.seed(SEED)

plt.style.use("seaborn-v0_8-darkgrid")
pd.set_option("display.max_columns", None)

# --- 2. Paths ---
data_path = Path("../data/normalized")
files = sorted(data_path.glob("*.csv"))

print("[INFO] Normalized datasets found:")
for f in files:
    print(f" - {f.name}")

# --- 3. Load datasets ---
datasets = {f.stem.replace("_norm", ""): pd.read_csv(f) for f in files}
print(f"[INFO] Datasets loaded")

# --- 4. Quick overview ---
for name, df in datasets.items():
    print(f"\n{name}: {df.shape[0]} samples x {df.shape[1]} features")
    print(f"{df.head(1)}")



"""Part 1 -> RESIDUAL-BASED ANOMALY DETECTION"""

residual_scores = {}
RIDGE_ALPHAS = np.logspace(-4, 2, 9) # safe alpha grid for sensor data

for name, df in datasets.items():
    num_df = df.select_dtypes(include=np.number).dropna()
    X = num_df.values
    n, d = X.shape
    print(f"\n--> Residual-based (RidgeCV) on {name} ({n}x{d})")

    residual_matrix = np.zeros_like(X)

    for i in range(d):
        X_i = np.delete(X, i, axis=1)
        y_i = X[:, i]

        model = RidgeCV(alphas=RIDGE_ALPHAS, fit_intercept=True, store_cv_values=False)
        model.fit(X_i, y_i)
        y_pred = model.predict(X_i)
        residual_matrix[:, i] = y_i - y_pred
    
    residual_score = np.mean(np.abs(residual_matrix), axis=1)
    residual_scores[name] = residual_score

    # Plot distribution
    plt.figure(figsize=(6,3))
    sns.histplot(residual_score, bins=30, kde=True)
    plt.title(f"{name} - Residual (RidgeCV) anomaly score distribution")
    plt.xlabel("Mean absolute residual")
    plt.ylabel("Count")
    plt.show()


"""Part 2 -> Covariance-based Anomaly Detection"""

covariance_scores = {}

for name, df in datasets.items():
    num_df = df.select_dtypes(include=np.number).dropna()
    X = num_df.values
    n, d = X.shape

    try:
        mcd = MinCovDet(random_state=SEED, support_function=None).fit(X)
        mahalanobis_sq = mcd.mahalanobis(X)
        inlier_ratio = mcd.support_.mean()
        print(f"[INFO] {name}: MCD inlier ratio = {inlier_ratio:.1%}")
    except Exception as e:
        print(f"[WARNING] MCD failed on {name} ({e}), fallback to EMPIRICAL COV")
        emp_cov = EmpiricalCovariance().fit(X)
        mahalanobis_sq = emp_cov.mahalanobis(X)
    
    covariance_scores[name] = mahalanobis_sq

    # Chi-square thresh
    thresh = chi2.ppf(0.99, df=d)
    anomaly_mask = mahalanobis_sq > thresh
    anomaly_ratio = anomaly_mask.mean()

    print(f"\n--> {name}: Covariance-based anomalies above threshold: {anomaly_ratio:.2%}")

    # Plot
    plt.figure(figsize=(7,3))
    sns.histplot(mahalanobis_sq, bins=30, kde=True)
    plt.axvline(thresh, color='r', linestyle='--', label=f"Threshold (χ², 0.99)")
    plt.title(f"{name} - Mahalanobis Distance² Distribution (Robust MCD)")
    plt.xlabel("d² value")
    plt.legend()
    plt.show()


"""Part 3.1 -> Compare and Visualize"""

results_summary = []

for name in datasets.keys():
    residual_mean = np.mean(residual_scores[name])
    residual_std = np.std(residual_scores[name])
    cov_mean = np.mean(covariance_scores[name])
    cov_std = np.std(covariance_scores[name])
    results_summary.append({
        "Dataset": name,
        "Residual_mean": residual_mean,
        "Residual_std": residual_std,
        "Covariance_mean": cov_mean,
        "Covariance_std": cov_std,
    })

results_df = pd.DataFrame(results_summary)
display(results_df)

plt.figure(figsize=(8,4))
sns.barplot(x="Dataset", y="Residual_mean", data=results_df, color="skyblue", label="Residual")
sns.barplot(x="Dataset", y="Covariance_mean", data=results_df, color="salmon", alpha=0.6, label="Covariance")
plt.title("Average anomaly scores per method")
plt.legend()
plt.show()

"""Part 3.2 -> Cross-Sensor Correlation"""

# Align lengths by truncating to min length
min_len = min(len(v) for v in residual_scores.values())

residual_df = pd.DataFrame({name: v[:min_len] for name, v in residual_scores.items()})
covariance_df = pd.DataFrame({name: v[:min_len] for name, v in covariance_scores.items()})

# Correlation matrices
res_corr = residual_df.corr()
cov_corr = covariance_df.corr()

print("\n[INFO] Cross-sensor correlation (Residual-based)")
display(res_corr.round(3))
print("\n[INFO] Cross-sensor correlation (Covariance-based)")
display(cov_corr.round(3))

# Heatmaps
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
sns.heatmap(res_corr, annot=True, cmap="Blues", ax=axes[0])
axes[0].set_title("Residual-based Score Correlation")
sns.heatmap(cov_corr, annot=True, cmap="Reds", ax=axes[1])
axes[1].set_title("Covariance-based Score Correlation")
plt.tight_layout()
plt.show()



"""Part 4 -> Save"""

output_path = Path("../results")
output_path.mkdir(exist_ok=True)

for name in datasets.keys():
    df_out = pd.DataFrame({
        "residual_score": residual_scores[name],
        "mahalanobis_d2": covariance_scores[name]
    })
    df_out.to_csv(output_path / f"{name}_residual_cov.csv", index=False)
    print(f"[INFO] Saved: {output_path}/{name}_residual_cov.csv")

print("\n[INFO] Residual-based and covariance-based anomaly detection completed.")

: 