In [1]:
# 1. Imports and Setup
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import platform
import tensorflow as tf
from scipy.signal import butter, filtfilt

from sklearn.model_selection import train_test_split
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.losses import MeanSquaredError, MeanAbsoluteError, Huber, CosineSimilarity
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from keras.saving import register_keras_serializable

In [2]:
def normalize(data):
    min = np.min(data)
    max = np.max(data)
    return (data-min)/(max-min)

In [3]:
print(f"GPU:{tf.config.list_physical_devices('GPU')}, Platform:{platform.platform()}, Processor: {platform.machine()}")

GPU:[], Platform:Windows-11-10.0.26100-SP0, Processor: AMD64


In [4]:
# 2. Utility Functions
def predict(model, data):
    predictions = model.predict(data)
    loss = np.mean(np.abs(data - predictions), axis=1)
    return predictions, loss

def plot_examples(model, data, ax, title):
    pred, loss = predict(model, data)
    ax.plot(data.flatten(), label="Actual")
    ax.plot(pred[0], label="Predicted")
    ax.fill_between(range(1, 189), data.flatten(), pred[0], alpha=0.3, color="r")
    ax.legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=7)
    ax.set_title(f"{title} (loss: {loss[0]:.3f})", fontsize=9.5)

def get_reconstruction_error(model, data):
    predictions = model.predict(data)
    return np.mean(np.abs(data - predictions), axis=1)

def classify_errors(errors, threshold):
    return np.array(errors > threshold, dtype=int)

In [13]:
# 3. Data Loading and Pre-processing
normal_df = pd.read_csv("ptbdb_normal.csv", sep=',').drop("target", axis=1, errors="ignore")
anomaly_df = pd.read_csv("ptbdb_abnormal.csv", sep=',').drop("target", axis=1, errors="ignore")

normal = normal_df.to_numpy()
anomaly = anomaly_df.to_numpy()
X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)


In [None]:
test = pd.read_csv("data.csv").to_numpy()
test = test.transpose()
test.shape

In [None]:
t1 = pd.read_csv("new_data.csv").to_numpy().transpose()
# plt.plot(t1[0])
t2 = (t1[0]*3.3)/4095
# plt.plot(t2)
signals = [t1[0],t2]
fig, axes = plt.subplots(1, 2, figsize=(10,5))

# Plot each signal in the grid
for i, ax in enumerate(axes.flat):
    ax.plot(signals[i])
    ax.set_title(f'Signal {i + 1}')
    ax.set_xlabel('Time')
    ax.set_ylabel('Amplitude')

# Adjust layout to avoid overlap
plt.tight_layout()

# Show the grid of plots
plt.show()

In [None]:
def butter_lowpass_filter(data, cutoff, fs, order=4):
    nyquist = 0.5 * fs  # Nyquist frequency
    normal_cutoff = cutoff / nyquist  # Normalize the cutoff frequency
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = filtfilt(b, a, data)
    return y

In [None]:
fil = butter_lowpass_filter(t1[0],20,125)

In [None]:
fil_t2 = butter_lowpass_filter(t2,20,125)
plt.plot(fil_t2)

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(12,6))

# Plot on each axis
sns.histplot(t2, kde=True, ax=axes[0])
sns.histplot(fil_t2, kde=True, ax=axes[1])
sns.histplot(normalize(fil_t2), kde=True, ax=axes[2])
plt.tight_layout()

# Show the grid of plots
plt.show()

In [None]:
t2_norm = normalize(t2)
sns.histplot(t2_norm,kde=True)
plt.show()

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(10, 8))

# Plot on each axis
sns.histplot(normal[0], kde=True, ax=axes[0, 0])
sns.histplot(normal[1], kde=True, ax=axes[0, 1])
sns.histplot(normal[2], kde=True, ax=axes[1, 0])
sns.histplot(normal[3], kde=True, ax=axes[1, 1])
plt.tight_layout()

# Show the grid of plots
plt.show()

In [None]:
signals_to_plot = anomaly[21:30]
fig, axes = plt.subplots(2, 5, figsize=(20, 10))

# Plot each signal in the grid
for i, ax in enumerate(axes.flat):
    ax.plot(signals_to_plot[i])
    ax.set_title(f'Signal {i + 1}')
    ax.set_xlabel('Time')
    ax.set_ylabel('Amplitude')

# Adjust layout to avoid overlap
plt.tight_layout()

# Show the grid of plots
plt.show()

In [8]:
@register_keras_serializable()
class AutoEncoder(Model):
    def __init__(self, input_dim, latent_dim):
        super(AutoEncoder, self).__init__() #inheritance
        self.input_dim = input_dim
        self.latent_dim = latent_dim

        self.encoder = tf.keras.Sequential([
            layers.Input(shape=(input_dim,)),
            layers.Reshape((input_dim, 1)),  # Reshape to 3D for Conv1D
            layers.Conv1D(32, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
        ])

        self.decoder = tf.keras.Sequential([
            layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),
            layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Conv1D(32, 3, strides=1, activation='relu', padding="same"),
            layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Flatten(),
            layers.Dense(input_dim)
        ])

    def call(self, X):
        encoded = self.encoder(X)
        decoded = self.decoder(encoded)
        return decoded


In [14]:
input_dim = X_train.shape[-1]
latent_dim = 16
epochs = 100
batch_size = 64
early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)
loss_functions = {'MAE': MeanAbsoluteError(), 'MSE': MeanSquaredError(), 'Huber': Huber(), 'CosineSimilarity': CosineSimilarity()}
best_loss = float('inf')
best_model = None
best_loss_function = None


In [15]:
input_dim

188

In [14]:
# Ground truth for validation
y_true_normal = np.zeros(len(X_test))
y_true_anomaly = np.ones(len(anomaly))
y_test = np.concatenate([y_true_normal, y_true_anomaly])
X_combined_test = np.concatenate([X_test, anomaly])


In [16]:
m1 = AutoEncoder(input_dim, latent_dim)
# m1.build((None, input_dim))
m1.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss=Huber())

In [17]:
m1.encoder.summary()
m1.decoder.summary()

In [None]:
m1.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,
              validation_split=0.1, callbacks=[early_stopping], verbose=0)

In [None]:
train_errors = get_reconstruction_error(m1, X_train)
threshold = np.percentile(train_errors, 95)  # 95th percentile as threshold
val_errors = get_reconstruction_error(m1, X_combined_test)
y_pred_val = np.array(val_errors > threshold, dtype=int)
accuracy = accuracy_score(y_test, y_pred_val)
print(f"Validation Accuracy using : {accuracy:.2%}")

In [None]:
testing_array = np.append(fil_t2,[0 for i in range(89)])
ts_norm = normalize(testing_array)
sns.histplot(ts_norm,kde=True)
plt.show()

In [None]:
ts_norm = ts_norm.reshape((1,188))

In [None]:
t2_norm.shape

In [None]:
sns.histplot(np.append(t2_norm,[0 for i in range(89)]),kde=1)

In [None]:
error = get_reconstruction_error(m1,np.append(t2_norm,[0 for i in range(89)]).reshape((1,188)))
print(error)

In [None]:
threshold = np.percentile(get_reconstruction_error(m1, X_train), 95)
print(threshold)
pred = classify_errors(error,threshold)
print(pred)

In [None]:
data = pd.read_csv("samples.csv",header=None).to_numpy().transpose().reshape((8,100))

In [None]:
data[0].shape

In [None]:
np.append(normalize(data[0]),[0 for i in range(88)]).reshape((1,188)).shape

In [None]:
for i in range(8):
    print(get_reconstruction_error(m1,np.append(normalize(data[i]),[0 for i in range(88)]).reshape((1,188))))

In [None]:
# Train and evaluate models for each loss function
for name, loss_function in loss_functions.items():
    # Define and compile the model
    model = AutoEncoder(input_dim, latent_dim)
    # model.build((None, input_dim))
    # Check if the machine is using an M1/M2 processor
    # if "macOS-13.5.2-arm64-arm-64bit" in platform.platform() and "arm64" in platform.machine():
    #     model.compile(optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.01), loss=loss_function)
    #     print("Using legacy optimizer for M1/M2 Mac.")
    # elif tf.config.list_physical_devices('GPU'):  # Check if there are any GPUs available
    #     model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss=loss_function)
    #     print("Using standard optimizer for GPU.")
    # else:
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss=loss_function)
    # print("Using standard optimizer for conventional CPU.")


    # Train the model
    model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,
              validation_split=0.1, callbacks=[early_stopping], verbose=0)

    # Calculate the threshold based on the training data
    train_errors = get_reconstruction_error(model, X_train)
    threshold = np.percentile(train_errors, 95)  # 95th percentile as threshold

    # Evaluate the model on combined validation data (normal + anomaly) using the calculated threshold
    val_errors = get_reconstruction_error(model, X_combined_test)
    y_pred_val = np.array(val_errors > threshold, dtype=int)

    accuracy = accuracy_score(y_test, y_pred_val)

    print(f"Validation Accuracy using {name}: {accuracy:.2%}")
    print(f"Validation Reconstruction Error using {name}: {np.mean(val_errors)}")

    # After training, visualize the reconstructions for this specific model
    # fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
    # random_indexes = np.random.randint(0, len(X_train), size=5)
    # for i, idx in enumerate(random_indexes):
    #     data = X_train[[idx]]
    #     plot_examples(model, data, ax=axes[0, i], title="Normal")
    # for i, idx in enumerate(random_indexes):
    #     data = anomaly[[idx]]
    #     plot_examples(model, data, ax=axes[1, i], title="Anomaly")
    # plt.tight_layout()
    # fig.suptitle(f"Sample plots for {name} loss (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
    # plt.show()

    # Check if current model is the best
    if np.mean(val_errors) < best_loss:
        best_loss = np.mean(val_errors)
        best_model = model
        best_loss_function = name

print(f"Best Model uses {best_loss_function} with average validation error: {best_loss}")

In [None]:
# 6. Evaluation
threshold = np.percentile(get_reconstruction_error(best_model, X_train), 95)
normal_errors = get_reconstruction_error(best_model, X_test)
anomaly_errors = get_reconstruction_error(best_model, anomaly)
y_pred_normal = classify_errors(normal_errors, threshold)
y_pred_anomaly = classify_errors(anomaly_errors, threshold)
y_true_normal = np.zeros_like(y_pred_normal)
y_true_anomaly = np.ones_like(y_pred_anomaly)
y_pred = np.concatenate([y_pred_normal, y_pred_anomaly])
y_true = np.concatenate([y_true_normal, y_true_anomaly])
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
print(f"Threshold: {threshold}")
print(f"Accuracy: {accuracy * 100:.2f}%")
print(f"Precision: {precision * 100:.2f}%")
print(f"Recall: {recall * 100:.2f}%")
print(f"F1 Score: {f1 * 100:.2f}%")

In [None]:
data[0].shape

In [None]:
for i in range(8):
    print(get_reconstruction_error(best_model,np.append(normalize(data[i]),[0 for i in range(88)]).reshape((1,188))))

In [None]:
conf_matrix = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8,6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=['Normal', 'Anomaly'], yticklabels=['Normal', 'Anomaly'])
plt.ylabel('Actual Class')
plt.xlabel('Predicted Class')