In [26]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support

In [27]:
# Step 1: Generate Synthetic Data
np.random.seed(42)
n_samples = 1000  # Number of normal samples
n_features = 10   # Number of features
n_anomalies = 50  # Number of anomalies

# Normal data (good data)
normal_data = np.random.normal(0, 1, (n_samples, n_features))

# Anomalies (outliers)
anomalies = np.random.uniform(-10, 10, (n_anomalies, n_features))

# Combine data
data = np.vstack([normal_data, anomalies])
labels = np.array([0] * n_samples + [1] * n_anomalies)  # 0 = normal, 1 = anomaly

# Step 2: Split Data into Training and Validation Sets
# Training set contains only normal data
X_train, X_val_normal = train_test_split(normal_data, test_size=0.2, random_state=42)

# Validation set contains both normal and anomalous data
X_val = np.vstack([X_val_normal, anomalies])
y_val = np.array([0] * len(X_val_normal) + [1] * len(anomalies))

# Step 3: Normalize the Data
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)

# Convert to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)


In [28]:
# Step 4: Define the Autoencoder Model
class Autoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(Autoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU()
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim // 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Initialize the model
input_dim = X_train.shape[1]
hidden_dim = 8
autoencoder = Autoencoder(input_dim, hidden_dim)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

In [29]:
# Step 5: Train the Autoencoder
num_epochs = 50
batch_size = 32
train_loader = torch.utils.data.DataLoader(X_train, batch_size=batch_size, shuffle=True)

for epoch in range(num_epochs):
    autoencoder.train()
    train_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        output = autoencoder(batch)
        loss = criterion(output, batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss/len(train_loader):.4f}')

Epoch [1/50], Loss: 1.0309
Epoch [2/50], Loss: 1.0155
Epoch [3/50], Loss: 1.0023
Epoch [4/50], Loss: 0.9876
Epoch [5/50], Loss: 0.9715
Epoch [6/50], Loss: 0.9543
Epoch [7/50], Loss: 0.9347
Epoch [8/50], Loss: 0.9137
Epoch [9/50], Loss: 0.8922
Epoch [10/50], Loss: 0.8716
Epoch [11/50], Loss: 0.8517
Epoch [12/50], Loss: 0.8316
Epoch [13/50], Loss: 0.8126
Epoch [14/50], Loss: 0.7971
Epoch [15/50], Loss: 0.7846
Epoch [16/50], Loss: 0.7737
Epoch [17/50], Loss: 0.7633
Epoch [18/50], Loss: 0.7538
Epoch [19/50], Loss: 0.7460
Epoch [20/50], Loss: 0.7388
Epoch [21/50], Loss: 0.7331
Epoch [22/50], Loss: 0.7280
Epoch [23/50], Loss: 0.7242
Epoch [24/50], Loss: 0.7210
Epoch [25/50], Loss: 0.7189
Epoch [26/50], Loss: 0.7160
Epoch [27/50], Loss: 0.7141
Epoch [28/50], Loss: 0.7127
Epoch [29/50], Loss: 0.7116
Epoch [30/50], Loss: 0.7097
Epoch [31/50], Loss: 0.7088
Epoch [32/50], Loss: 0.7075
Epoch [33/50], Loss: 0.7067
Epoch [34/50], Loss: 0.7056
Epoch [35/50], Loss: 0.7049
Epoch [36/50], Loss: 0.7039
E

In [30]:
# Step 6: Detect Anomalies on the Validation Set
autoencoder.eval()
with torch.no_grad():
    val_output = autoencoder(X_val)
    reconstruction_error = torch.mean((X_val - val_output) ** 2, dim=1).numpy()

# Set a threshold for anomaly detection (e.g., 95th percentile of reconstruction error)
threshold = np.percentile(reconstruction_error, 95)

# Detect anomalies
anomalies_detected = reconstruction_error > threshold

# Evaluate performance
precision, recall, f1, _ = precision_recall_fscore_support(y_val, anomalies_detected, average='binary')
print(f'Precision: {precision:.2f}, Recall: {recall:.2f}, F1-Score: {f1:.2f}')

Precision: 1.00, Recall: 0.26, F1-Score: 0.41


In [None]:
# Step 7: Visualize Results
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.scatter(range(len(y_val)), reconstruction_error, c=y_val, cmap='coolwarm', label='True Labels')
plt.axhline(threshold, color='black', linestyle='--', label='Threshold')
plt.xlabel('Data Points')
plt.ylabel('Reconstruction Error')
plt.title('Anomaly Detection')
plt.legend()
plt.show()