In [None]:
import pandas as pd
# Read the four CSV files and tag every row with a binary "label" column:
# 0 for the fault-free files, 1 for the faulty files.
df_ff_train = pd.read_csv("tep_faultfree_training.csv").assign(label=0)
df_ff_test = pd.read_csv("tep_faultfree_testing.csv").assign(label=0)
df_f_train = pd.read_csv("tep_faulty_training.csv").assign(label=1)
df_f_test = pd.read_csv("tep_faulty_testing.csv").assign(label=1)

# Stack fault-free rows on top of faulty rows; ignore_index gives each combined
# frame a fresh 0..n-1 index instead of repeating the per-file indices.
df_train_combined = pd.concat([df_ff_train, df_f_train], ignore_index=True)
df_test_combined = pd.concat([df_ff_test, df_f_test], ignore_index=True)

from sklearn.preprocessing import StandardScaler
import numpy as np

def scale_df(df, scaler, n_features=55):
    """Scale the leading feature columns of ``df`` with an already-fitted scaler.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose first ``n_features`` columns are the features to scale.
        Every remaining column (here: the trailing ``label`` column) is
        carried over without being transformed.
    scaler : object
        Any object exposing ``transform(X)``; it is only applied, never fitted.
    n_features : int, default 55
        Number of leading columns handed to ``scaler.transform``.

    Returns
    -------
    pandas.DataFrame
        New frame with the same columns and index as ``df``. The values are
        assembled into a single array first, so pass-through columns take the
        array's common dtype (an integer label comes back as float).
    """
    features_scaled = scaler.transform(df.iloc[:, :n_features])
    # Everything after the feature block is passed through untouched.
    passthrough = df.iloc[:, n_features:].to_numpy()
    stacked = np.hstack([features_scaled, passthrough])

    return pd.DataFrame(stacked, columns=df.columns, index=df.index)

# The scaler is fitted on the first 55 columns of the fault-free training rows
# only; the same fitted scaler is then applied to every frame below.
scaler = StandardScaler().fit(df_ff_train.iloc[:, :55])

# Replace each frame by its scaled counterpart (feature columns transformed,
# label column carried along by scale_df).
df_ff_train, df_test_combined, df_train_combined = (
    scale_df(frame, scaler)
    for frame in (df_ff_train, df_test_combined, df_train_combined)
)


## PCA setup - same as task 1

In [None]:
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# Feature matrices (first 55 columns) and label vectors. Training data is the
# scaled fault-free frame; test data is the scaled combined test frame.
X_train = df_ff_train.iloc[:, :55].values
y_train = df_ff_train["label"].values

X_test = df_test_combined.iloc[:, :55].values
y_test = df_test_combined["label"].values

# Full 55-component PCA, used only to inspect the explained-variance curve.
pca3 = PCA(n_components=55)
pca3.fit(X_train)

cumulative_variance = np.cumsum(pca3.explained_variance_ratio_)
# Component counts 1..n, one per entry of the cumulative curve.
x = np.arange(1, len(cumulative_variance) + 1)

plt.plot(x, cumulative_variance, marker='o')
plt.title('Variance explained by PCA components')
plt.xlabel('Number of components')
plt.ylabel('Cumulative explained variance')
plt.xticks(range(1, 56, 5))
plt.yticks(np.arange(0, 1.1, 0.1))
plt.grid(True)
plt.show()

print("Explained variance by the first 10 components:")
for i in range(10):
    print(f"PC{i+1}: {pca3.explained_variance_ratio_[i]*100:.2f}%")


# Refit with 21 components and project both sets into that space.
# NOTE(review): 21 is presumably read off the curve above; confirm the criterion.
pca3 = PCA(n_components=21)
pca3.fit(X_train)

X_test_pca = pca3.transform(X_test)
X_train_pca = pca3.transform(X_train)

## KMeans clustering

In [None]:
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

# Two-cluster K-means fitted directly on the PCA-projected test set; the
# fixed random_state keeps the cluster assignment reproducible.
k = 2
kmeans = KMeans(n_clusters=k, random_state=69, n_init='auto').fit(X_test_pca)
cluster_test = kmeans.predict(X_test_pca)

# Scatter of the first two principal components, coloured by cluster id.
fig, ax = plt.subplots(figsize=(7, 6))
points = ax.scatter(
    X_test_pca[:, 0],
    X_test_pca[:, 1],
    c=cluster_test,
    cmap='viridis',
    s=5,
    alpha=0.5,
)
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.set_title("K-means clusters in PCA space (PC1 vs PC2)")
fig.colorbar(points, ax=ax, label="Cluster")
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
import seaborn as sns
# confusion_matrix is imported here because this cell is the first to call it;
# relying on a later cell's import fails on a fresh Restart & Run All.
from sklearn.metrics import classification_report, confusion_matrix

# Cross-tabulate the true labels against the K-means cluster ids.
# NOTE(review): K-means assigns cluster ids 0/1 without looking at the labels,
# so check which cluster lines up with label 1 before reading the report.
cf = confusion_matrix(y_test, cluster_test)
# Plot
sns.heatmap(cf, annot=True, fmt='d', cmap='Blues')
plt.title("Confusion Matrix K-means")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

print(classification_report(y_test, cluster_test))

## DBSCAN

### KNN elbow

In [None]:
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt

# 20-neighbour index built on the PCA-projected test points.
neighbors = NearestNeighbors(n_neighbors=20)
neighbors_fit = neighbors.fit(X_test_pca)

# Query the fitted points against themselves: one row per point, one column
# per neighbour, columns ordered from nearest to farthest.
distances, indices = neighbors_fit.kneighbors(X_test_pca)

# Keep only the last (farthest) neighbour distance of every point and sort
# ascending, which is the curve used to look for an eps "elbow".
distances = distances[:, -1].copy()
distances.sort()

In [None]:
# Sorted k-distance curve; the axis limits zoom in on a fixed window
# (x: 0..115000 points, y: distances 3..25).
fig, ax = plt.subplots(figsize=(7, 5))
ax.plot(distances)
ax.set_title("k-distance graph")
ax.set_xlabel("Points sorted by distance")
ax.set_ylabel("20th nearest neighbor distance")
ax.set_xlim(0, 115000)
ax.set_ylim(3, 25)
ax.grid(True)
plt.show()

In [None]:
from sklearn.cluster import DBSCAN
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support

# Candidate neighbourhood radii; min_samples stays fixed at 20 for this sweep.
eps_values = [3, 3.5, 4, 7, 7.5, 8]

results = []  # one metrics dict per eps value

# Number of truly faulty rows; it does not change between iterations.
n_true_faults = np.sum(y_test == 1)

for eps in eps_values:

    db = DBSCAN(eps=eps, min_samples=20, n_jobs=1, algorithm="brute")
    db.fit(X_test_pca)
    db_labels = db.labels_

    # DBSCAN marks noise points with -1: noise is read as "faulty" (1),
    # membership of any cluster as "normal" (0).
    is_noise = db_labels == -1
    y_pred = np.where(is_noise, 1, 0)

    # Unpack the 2x2 confusion matrix row by row.
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()

    # The 1e-10 terms keep every ratio defined when a denominator is zero.
    precision = tp / (tp + fp + 1e-10)
    recall = tp / (tp + fn + 1e-10)
    f1 = 2 * precision * recall / (precision + recall + 1e-10)
    fnr = fn / (fn + tp + 1e-10)
    fpr = fp / (fp + tn + 1e-10)

    # Flagged points, in absolute terms and relative to the true fault count.
    outliers = np.sum(is_noise)
    ratio_outliers = outliers / (n_true_faults + 1e-10)

    results.append(dict(
        eps=eps,
        precision=precision,
        recall=recall,
        f1=f1,
        fnr=fnr,
        fpr=fpr,
        outliers=outliers,
        outlier_ratio=ratio_outliers,
    ))

# Three best settings, ranked by F1 (highest first).
top3 = sorted(results, key=lambda row: row["f1"], reverse=True)[:3]

In [None]:
results_df = pd.DataFrame(results)
results_df

In [None]:
# Finer grid: every eps value is combined with every min_samples value.
eps_values = [3, 3.2, 3.4, 3.6, 3.8, 4]
min_samples_list = [10, 15, 20, 25]

results = []  # one metrics dict per (eps, min_samples) combination

for eps in eps_values:

    for min_samples in min_samples_list:
        db = DBSCAN(eps=eps, min_samples=min_samples, n_jobs=1, algorithm="brute").fit(X_test_pca)
        db_labels = db.labels_

        # Convert DBSCAN labels: outlier = -1 -> treat as faulty (1)
        y_pred = np.where(db_labels == -1, 1, 0)

        # Compute confusion matrix
        tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()

        # Compute metrics (the 1e-10 terms guard against zero denominators)
        precision = tp / (tp + fp + 1e-10)
        recall = tp / (tp + fn + 1e-10)
        f1 = 2 * precision * recall / (precision + recall + 1e-10)
        fnr = fn / (fn + tp + 1e-10)
        fpr = fp / (fp + tn + 1e-10)

        # Flagged points, in absolute terms and relative to the true fault count
        outliers = np.sum(db_labels == -1)
        ratio_outliers = outliers / (np.sum(y_test == 1) + 1e-10)

        results.append({
            "eps": eps,
            # Recorded so that rows sharing an eps can be told apart.
            "min_samples": min_samples,
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "fnr": fnr,
            "fpr": fpr,
            "outliers": outliers,
            "outlier_ratio": ratio_outliers
        })

# Sort by the metric you care about: choose f1 here
top3 = sorted(results, key=lambda x: x["f1"], reverse=True)[:3]

### Plots and evaluation

In [None]:
# Summary of the DBSCAN labels currently held in db_labels (-1 marks noise).
noise_mask = db_labels == -1
n_outliers = np.sum(noise_mask)
# Distinct cluster ids, not counting the noise marker.
n_clusters = len(set(db_labels) - {-1})

print("Cluster:", n_clusters)
print("Outliers:", n_outliers)
print("Split outliers:", round(n_outliers / len(db_labels) * 100, 2), "%")

# First two principal components, coloured by the raw DBSCAN label.
fig, ax = plt.subplots(figsize=(7, 6))
ax.scatter(
    X_test_pca[:, 0],
    X_test_pca[:, 1],
    c=db_labels,
    cmap="tab10",
    s=5,
    alpha=0.5,
)
ax.set_title("DBSCAN clusters in PCA space")
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.grid(True)
plt.show()

# Same projection in two colours: black = noise, red = member of any cluster.
colors = np.where(noise_mask, "black", "red")

fig, ax = plt.subplots(figsize=(7, 6))
ax.scatter(X_test_pca[:, 0], X_test_pca[:, 1], c=colors, s=5, alpha=0.5)
ax.set_title("Outliers detected by DBSCAN")
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.grid(True)
plt.show()

# true labels: 0 = normal, 1 = fault
faults = df_test_combined["label"] == 1
normals = df_test_combined["label"] == 0

# Store the DBSCAN assignment on the frame before reading it back: no cell
# above creates a "dbscan" column, so the lookup would otherwise raise KeyError.
# db_labels is whatever the most recent DBSCAN fit left behind, i.e. the last
# configuration of the sweep rather than necessarily the best-scoring one.
df_test_combined["dbscan"] = db_labels
outliers = df_test_combined["dbscan"] == -1

# Share of faulty rows flagged as noise, and share of normal rows flagged as noise.
fault_detection_rate = np.mean(outliers[faults]) * 100
false_alarm_rate = np.mean(outliers[normals]) * 100

print("Fault detection rate:", round(fault_detection_rate, 2), "%")
print("False alarm rate:", round(false_alarm_rate, 2), "%")

# Binary prediction from the DBSCAN labels: noise (-1) -> 1, anything else -> 0.
pred = np.where(db_labels == -1, 1, 0)

cf = confusion_matrix(y_test, pred)
heat_ax = sns.heatmap(cf, annot=True, fmt='d', cmap='Blues')
heat_ax.set_title("Confusion Matrix DBSCAN")
heat_ax.set_xlabel("Predicted Label")
heat_ax.set_ylabel("True Label")
plt.show()

print(classification_report(y_test, pred))

# Colour by true label first (red = fault, blue = normal), then paint every
# DBSCAN noise point black regardless of its true label.
colors = np.where(y_test == 1, "red", "blue")
colors = np.where(db_labels == -1, "black", colors)

fig, ax = plt.subplots(figsize=(7, 6))
ax.scatter(X_test_pca[:, 0], X_test_pca[:, 1], c=colors, s=5, alpha=0.5)
ax.set_title("DBSCAN Outlier Detection vs True Labels")
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.grid(True)
plt.show()

## HDBSCAN

In [None]:
from sklearn.cluster import HDBSCAN
# HDBSCAN on the PCA-projected test set.
hdb = HDBSCAN(min_cluster_size=50, min_samples=20, cluster_selection_epsilon=0.0)
hdb.fit(X_test_pca)
hdb_labels = hdb.labels_   # -1 = outlier

# Keep the assignment next to the true labels for the evaluation below.
df_test_combined["hdbscan"] = hdb_labels

In [None]:
# Boolean masks over the test frame: HDBSCAN noise points and truly faulty rows.
labels = df_test_combined["hdbscan"]
outliers = labels == -1
faults = df_test_combined["label"] == 1

# Share of faulty rows, then of non-faulty rows, that HDBSCAN marked as noise.
print("Faults detected as outliers:",
      outliers[faults].mean() * 100, "%")

print("Normal detected as outliers:",
      outliers[~faults].mean() * 100, "%")


# First two principal components, coloured by the raw HDBSCAN label.
fig, ax = plt.subplots(figsize=(7, 6))
ax.scatter(
    X_test_pca[:, 0], X_test_pca[:, 1],
    c=labels,
    cmap='tab10', s=5, alpha=0.5,
)
ax.set_title("HDBSCAN clustering in PCA space")
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.grid(True)
plt.show()

# Same projection in two colours: black = noise, red = member of any cluster.
colors = np.where(outliers, "black", "red")

fig, ax = plt.subplots(figsize=(7, 6))
ax.scatter(X_test_pca[:, 0], X_test_pca[:, 1], c=colors, s=5, alpha=0.5)
ax.set_title("Outliers detected by HDBSCAN")
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.grid(True)
plt.show()

