# Use Autoencoder to implement anomaly detection. Build the model by using:
a. Import required libraries<br>
b. Upload / access the dataset<br>
c. Encoder converts the input into a latent representation<br>
d. Decoder network converts the latent representation back to the original input<br>
e. Compile the models with Optimizer, Loss, and Evaluation Metrics<br>


In [None]:
# Install pandas (data handling library). In notebooks, it's usually better to use !pip or %pip.
pip install pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
# Install scikit-learn (machine learning utilities). In notebooks, prefer !pip or %pip.
pip install scikit-learn

Collecting scikit-learn
  Using cached scikit_learn-1.7.2-cp313-cp313-win_amd64.whl.metadata (11 kB)
Collecting scipy>=1.8.0 (from scikit-learn)
  Using cached scipy-1.16.3-cp313-cp313-win_amd64.whl.metadata (60 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Using cached joblib-1.5.2-py3-none-any.whl.metadata (5.6 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn)
  Using cached threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Using cached scikit_learn-1.7.2-cp313-cp313-win_amd64.whl (8.7 MB)
Using cached joblib-1.5.2-py3-none-any.whl (308 kB)
Using cached scipy-1.16.3-cp313-cp313-win_amd64.whl (38.5 MB)
Using cached threadpoolctl-3.6.0-py3-none-any.whl (18 kB)
Installing collected packages: threadpoolctl, scipy, joblib, scikit-learn
Successfully installed joblib-1.5.2 scikit-learn-1.7.2 scipy-1.16.3 threadpoolctl-3.6.0
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
# Import libraries and load the ECG dataset (we'll use it to learn normal vs anomaly patterns)
# Importing libraries
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Dense, Dropout
from sklearn.model_selection import train_test_split

# Location of the ECG dataset (CSV). It is downloaded over HTTPS so the file cannot be
# tampered with in transit; replace this with a local file path to work offline.
path = 'https://storage.googleapis.com/download.tensorflow.org/data/ecg.csv'

# Read the ECG dataset into a Pandas DataFrame.
# header=None: every row is treated as data and columns are named by integer position (0, 1, 2, ...).
data = pd.read_csv(path, header=None)


In [None]:
# Peek at the first few rows to understand the data format.
# Columns are named by integer position because the CSV was read with header=None.
data.head()

In [None]:
# Print a summary of the DataFrame (index range, column dtypes, memory usage).
# NOTE(review): this frame has more than 100 columns (column 140 is used below), so with
# pandas' default display.max_info_columns the output is condensed and omits per-column
# non-null counts; use data.info(verbose=True, show_counts=True) to see them.
data.info()


In [None]:
# Split features (X) and target (y), then create train/test sets
# - features: all columns except the last
# - target: last column (labels: 1 = normal rhythm, 0 = anomaly; this is the same
#   mapping get_predictions uses when it emits 1.0 for normal and 0.0 for anomaly)
# We then keep only the NORMAL (label 1) training rows: the autoencoder learns to
# reconstruct normal beats, so anomalies later stand out by their high reconstruction error.
features = data.drop(140, axis=1)  # Features are all columns except the last (column 140)
target = data[140]  # Target is the last column (column 140)

# Split the data into training and testing sets (80% training, 20% testing).
# No random_state is passed, so the split (and every number derived from it) changes per run.
x_train, x_test, y_train, y_test = train_test_split(
    features, target, test_size=0.2
)

# Get the indices of the training data points labeled as "1" (normal rhythms)
train_index = y_train[y_train == 1].index

# Select the training data points that are normal; these are what the autoencoder is fitted on
train_data = x_train.loc[train_index]

In [None]:
# Scale features to the 0-1 range so training is stable.
# The scaler is fitted only on train_data (the training rows labeled 1, i.e. the normal
# rhythms in this dataset) and then reused unchanged for the test set, so test values
# can fall slightly outside [0, 1] when they exceed the range seen during fitting.
min_max_scaler = MinMaxScaler(feature_range=(0, 1))

# fit_transform learns the per-column min/max and returns a new NumPy array.
# MinMaxScaler copies its input by default, so train_data itself is left untouched.
x_train_scaled = min_max_scaler.fit_transform(train_data)

# Scale the testing data using the same (already fitted) scaler
x_test_scaled = min_max_scaler.transform(x_test)


In [None]:
# Define an Autoencoder: encoder compresses input to a small representation; decoder tries to reconstruct it
class AutoEncoder(Model):
    """Fully connected autoencoder with a mirrored encoder/decoder.

    Args:
        output_units: Width of the final decoder layer, i.e. the number of
            values to reconstruct per sample.
        ldim: Size of the latent (bottleneck) layer at the end of the encoder.
        hidden_units: Widths of the encoder's hidden Dense layers, in order.
            The decoder uses the same widths in reverse order.
        dropout_rate: Dropout rate applied after every hidden Dense layer.

    With the default arguments the architecture is
    64 -> 32 -> 16 -> ldim -> 16 -> 32 -> 64 -> output_units.
    """

    def __init__(self, output_units, ldim=8, hidden_units=(64, 32, 16), dropout_rate=0.1):
        super().__init__()
        # Kept as a local tuple so it can be iterated twice (forward and reversed).
        hidden_units = tuple(hidden_units)

        # Encoder: gradually reduce dimensions to a latent space of size ldim
        encoder_layers = []
        for units in hidden_units:
            encoder_layers.append(Dense(units, activation='relu'))
            encoder_layers.append(Dropout(dropout_rate))
        encoder_layers.append(Dense(ldim, activation='relu'))
        self.encoder = Sequential(encoder_layers)

        # Decoder: expand latent back to original feature size.
        # The sigmoid output keeps reconstructions in (0, 1).
        decoder_layers = []
        for units in reversed(hidden_units):
            decoder_layers.append(Dense(units, activation='relu'))
            decoder_layers.append(Dropout(dropout_rate))
        decoder_layers.append(Dense(output_units, activation='sigmoid'))
        self.decoder = Sequential(decoder_layers)

    def call(self, inputs, training=None):
        """Encode then decode `inputs` and return the reconstruction.

        `training` is forwarded explicitly so the Dropout layers are active
        only while fitting and disabled during predict/evaluate.
        """
        encoded = self.encoder(inputs, training=training)
        decoded = self.decoder(encoded, training=training)
        return decoded


In [None]:
# Build the autoencoder and fit it to reproduce its own input.
# The output layer gets one unit per column of the scaled training matrix.
model = AutoEncoder(output_units=x_train_scaled.shape[1])

# Training configuration:
# - optimizer='adam': adaptive optimizer
# - loss='msle': Mean Squared Logarithmic Error is the quantity being minimised
# - metrics=['mse']: Mean Squared Error is additionally reported each epoch
model.compile(optimizer='adam', loss='msle', metrics=['mse'])

# Input and target are the same array (that is what makes this an autoencoder).
# The scaled test split is used, again as both input and target, to track validation loss.
history = model.fit(
    x=x_train_scaled,
    y=x_train_scaled,
    batch_size=512,
    epochs=20,
    shuffle=True,
    validation_data=(x_test_scaled, x_test_scaled),
)


In [None]:
# Plot training vs validation loss per epoch to see how learning progressed
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
# Label the axes so the figure is readable on its own (the model is compiled with loss='msle')
plt.xlabel("Epoch")
plt.ylabel("Loss (MSLE)")
plt.legend()
# Explicit show() so the figure also appears when this code runs outside a notebook
plt.show()

In [None]:
# Helper functions: determine a threshold and classify anomalies

def find_threshold(model, x_train_scaled, num_std=1.0):
    """Derive an anomaly cutoff from reconstruction errors on the training data.

    Args:
        model: Fitted model exposing `predict`; it is asked to reconstruct
            `x_train_scaled`.
        x_train_scaled: Scaled samples the threshold is computed from.
        num_std: How many standard deviations above the mean error the cutoff
            is placed. The default of 1.0 reproduces the simple
            "mean + 1 std" heuristic.

    Returns:
        The threshold as a NumPy scalar: mean(error) + num_std * std(error).
    """
    # Reconstruct training samples
    recons = model.predict(x_train_scaled)
    # Mean Squared Log Error per sample. Keras loss functions take (y_true, y_pred);
    # the same tf.keras.losses.msle call is used when scoring test samples, so
    # thresholds and test errors are computed identically.
    recons_error = np.asarray(tf.keras.losses.msle(x_train_scaled, recons))
    return np.mean(recons_error) + num_std * np.std(recons_error)


def get_predictions(model, x_test_scaled, threshold):
    """Classify samples as normal (1.0) or anomaly (0.0) by reconstruction error.

    Args:
        model: Fitted model exposing `predict`.
        x_test_scaled: Scaled samples to classify.
        threshold: Reconstruction-error cutoff; errors strictly above it are
            treated as anomalies.

    Returns:
        A pandas Series of floats with a default integer index, one entry per
        sample: 0.0 where the error exceeds `threshold`, otherwise 1.0.
    """
    # Reconstruct the samples and compute one MSLE value per sample.
    # Keras loss functions take (y_true, y_pred).
    reconstructions = model.predict(x_test_scaled)
    errors = np.asarray(tf.keras.losses.msle(x_test_scaled, reconstructions))
    # Vectorized labelling: anomalies -> 0.0, normal -> 1.0. This avoids a per-element
    # `x is True` identity check, which only works when elements are Python bools.
    return pd.Series(np.where(errors > threshold, 0.0, 1.0))

# Find the anomaly threshold on training data: the cutoff is derived from the model's
# reconstruction error on x_train_scaled, the same array the autoencoder was fitted on.
threshold = find_threshold(model, x_train_scaled)
print(f"Threshold: {threshold}")

In [None]:
# Use the threshold to label test samples (1.0 = normal, 0.0 = anomaly)
predictions = get_predictions(model, x_test_scaled, threshold)

# Compare predicted labels to true labels and compute accuracy.
# accuracy_score expects (y_true, y_pred). The comparison is positional, so it does not
# matter that y_test keeps its shuffled index while predictions has a fresh 0..n-1 index.
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy Score: {accuracy}")
