# Baseline: Classical Autoencoder

A classical benchmark for the quantum enhanced autoencoder, following the structure given in "Applying Quantum Autoencoders for Time Series Anomaly Detection", published 10/10/2024, by Robin Frehner, Kurt Stockinger.

Dataset avaliable here: https://www.cs.ucr.edu/~eamonn/time_series_data_2018/ upload it on your Google Drive to make the code work.

In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import MinMaxScaler
from google.colab import drive

In [3]:
# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Paths to your dataset
train_file_path = '/content/drive/My Drive/UCRArchive_2018/Fish/Fish_TRAIN.tsv'
test_file_path = '/content/drive/My Drive/UCRArchive_2018/Fish/Fish_TEST.tsv'

In [5]:
# Load the data
def load_data(file_path):
    data = pd.read_csv(file_path, sep='\t', header=None)
    data = data.iloc[:, 1:]  # Remove the first column (it's extra info we don't use)
    return data.values

train_data = load_data(train_file_path)
test_data = load_data(test_file_path)

# Normalize the data
scaler = MinMaxScaler()
train_data = scaler.fit_transform(train_data)
test_data = scaler.transform(test_data)


In [6]:
# Preprocess the data into sliding windows
window_size = 128
stride = 1

def create_sliding_windows(data, window_size, stride):
    windows = []
    for i in range(0, data.shape[0] - window_size + 1, stride):
        window = data[i:i + window_size]
        windows.append(window)
    return np.array(windows)

train_windows = create_sliding_windows(train_data, window_size, stride)
test_windows = create_sliding_windows(test_data, window_size, stride)

# Verify dimensions
print("Train windows shape:", train_windows.shape)
print("Test windows shape:", test_windows.shape)

Train windows shape: (48, 128, 463)
Test windows shape: (48, 128, 463)


In [12]:
# PyTorch Dataset and DataLoader
class TimeSeriesDataset(Dataset):
    def __init__(self, data):
        self.data = torch.tensor(data, dtype=torch.float32)

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        return self.data[idx]

train_dataset = TimeSeriesDataset(train_windows)
test_dataset = TimeSeriesDataset(test_windows)

train_loader = DataLoader(train_dataset, batch_size=150, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=150, shuffle=False)


In [8]:
class Autoencoder(nn.Module):
    def __init__(self, window_size, input_size):
        super(Autoencoder, self).__init__()
        self.encoder_lstm = nn.LSTM(input_size=input_size, hidden_size=64, batch_first=True)
        self.encoder_fc = nn.Sequential(
            nn.ReLU(),
            nn.Linear(64, 10),
            nn.ReLU(),
            nn.Linear(10, 4)
        )
        self.decoder_fc = nn.Sequential(
            nn.Linear(4, 10),
            nn.ReLU(),
            nn.Linear(10, 64),
            nn.ReLU()
        )
        self.decoder_lstm = nn.LSTM(input_size=64, hidden_size=input_size, batch_first=True)

    def forward(self, x):
        batch_size, sequence_length, feature_size = x.size()

        # Encoding
        x, _ = self.encoder_lstm(x)  # (batch_size, sequence_length, hidden_size)
        x = self.encoder_fc(x[:, -1, :])  # (batch_size, latent_dim)

        # Decoding
        x = self.decoder_fc(x)  # (batch_size, hidden_size)
        x = x.unsqueeze(1).repeat(1, sequence_length, 1)  # Repeat for sequence length
        x, _ = self.decoder_lstm(x)  # (batch_size, sequence_length, input_size)

        return x


In [22]:
# Initialize model, loss, and optimizer
input_size = train_windows.shape[2]  # 80 features per time step
model = Autoencoder(window_size=window_size, input_size=input_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
def train_model(model, train_loader, criterion, optimizer, epochs=250):
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for batch in train_loader:
            optimizer.zero_grad()
            outputs = model(batch)
            loss = criterion(outputs, batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss / len(train_loader):.4f}")

# Train the autoencoder
train_model(model, train_loader, criterion, optimizer)

Epoch 1/250, Loss: 0.2786
Epoch 2/250, Loss: 0.2693
Epoch 3/250, Loss: 0.2585
Epoch 4/250, Loss: 0.2434
Epoch 5/250, Loss: 0.2157
Epoch 6/250, Loss: 0.1288
Epoch 7/250, Loss: 0.1298
Epoch 8/250, Loss: 0.1228
Epoch 9/250, Loss: 0.1061
Epoch 10/250, Loss: 0.0863
Epoch 11/250, Loss: 0.0697
Epoch 12/250, Loss: 0.0603
Epoch 13/250, Loss: 0.0577
Epoch 14/250, Loss: 0.0566
Epoch 15/250, Loss: 0.0543
Epoch 16/250, Loss: 0.0513
Epoch 17/250, Loss: 0.0487
Epoch 18/250, Loss: 0.0466
Epoch 19/250, Loss: 0.0448
Epoch 20/250, Loss: 0.0433
Epoch 21/250, Loss: 0.0419
Epoch 22/250, Loss: 0.0406
Epoch 23/250, Loss: 0.0395
Epoch 24/250, Loss: 0.0386
Epoch 25/250, Loss: 0.0380
Epoch 26/250, Loss: 0.0377
Epoch 27/250, Loss: 0.0376
Epoch 28/250, Loss: 0.0374
Epoch 29/250, Loss: 0.0372
Epoch 30/250, Loss: 0.0368
Epoch 31/250, Loss: 0.0365
Epoch 32/250, Loss: 0.0361
Epoch 33/250, Loss: 0.0357
Epoch 34/250, Loss: 0.0354
Epoch 35/250, Loss: 0.0351
Epoch 36/250, Loss: 0.0350
Epoch 37/250, Loss: 0.0348
Epoch 38/2

In [21]:
# Evaluate the model on training data to find a threshold
def evaluate_model_train(model, train_loader, criterion):
    model.eval()
    reconstruction_losses = []
    with torch.no_grad():
        for batch in train_loader:
            outputs = model(batch)
            loss = criterion(outputs, batch)
            reconstruction_losses.append(loss.item())
    return np.array(reconstruction_losses)

# Evaluate the model on test data
def evaluate_model_test(model, test_loader):
    model.eval()
    reconstruction_losses = []
    with torch.no_grad():
        for batch in test_loader:
            outputs = model(batch)
            loss = criterion(outputs, batch)
            reconstruction_losses.append(loss.item())
    return np.array(reconstruction_losses)


train_losses = evaluate_model_train(model, train_loader, criterion)
threshold = np.percentile(train_losses, 95) # Find the threshold on the train data

test_losses = evaluate_model_test(model, test_loader)

# Identify the single anomaly as the test instance with the highest loss
anomaly_index = np.argmax(test_losses)
anomalies = np.zeros_like(test_losses, dtype=bool)
anomalies[anomaly_index] = True

# Save results
results = pd.DataFrame({
    'Reconstruction Loss': test_losses,
    'Anomaly': anomalies
})
results.to_csv('/content/drive/My Drive/anomaly_results.csv', index=False)

print("Threshold for anomalies:", threshold)
print("Anomalies detected:", np.sum(anomalies))

Threshold for anomalies: 0.03479915112257004
Anomalies detected: 1
