In [2]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd 
from matplotlib import pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import seaborn as sns

In [3]:
# Directory containing your CSV files
data_dir = 'C:\\Users\\artur\\Desktop\\Studia\\Projekt inzynierski\\data'

# Read CSV files and stack data
def load_eeg_data(data_dir):
    all_data = []
    for filename in os.listdir(data_dir):
        if filename.endswith('.csv'):
            filepath = os.path.join(data_dir, filename)
            data = pd.read_csv(filepath)
            all_data.append(data.values)
    
    # Stack all data samples into a single array
    data_array = np.stack(all_data, axis=0)  # Shape: (N, T, C)
    return data_array

In [4]:
data_array = load_eeg_data(data_dir)

# Check the shape of the data
print(f"Data shape: {data_array.shape}")

# Normalize the data
data_array = data_array.astype(np.float32)
data_array = (data_array - np.min(data_array)) / (np.max(data_array) - np.min(data_array))  # Normalize to [0, 1]

# Ensure the data is in the format (N, C, T, 1)
N, T, C = data_array.shape
data = data_array.transpose(0, 2, 1)[:, :, :, np.newaxis]  # Shape: (N, C, T, 1)

train_data, test_data = train_test_split(data, test_size=0.15, random_state=42)

Data shape: (36, 30999, 19)


In [5]:
class EEGDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = torch.tensor(self.data[idx], dtype=torch.float32)
        return x, x  # In autoencoder, target is the same as input

# Create datasets and dataloaders for training and testing
train_dataset = EEGDataset(train_data)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)

test_dataset = EEGDataset(test_data)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)  # No shuffle for test set

In [6]:
class SimpleAutoencoder(nn.Module):
    def __init__(self, in_channels):
        super(SimpleAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 16, kernel_size=(1, 5), padding=(0, 2)),  # Output: (N, 16, T, C)
            nn.ReLU(),
            nn.Conv2d(16, 8, kernel_size=(1, 5), padding=(0, 2)),  # Output: (N, 8, T, C)
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(8, 16, kernel_size=(1, 5), padding=(0, 2)),  # Output: (N, 16, T, C)
            nn.ReLU(),
            nn.ConvTranspose2d(16, in_channels, kernel_size=(1, 5), padding=(0, 2)),  # Output: (N, in_channels, T, C)
            nn.Sigmoid()  # Sigmoid to ensure output is in range [0, 1]
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

Define the model

In [7]:
in_channels = data.shape[1]
model = SimpleAutoencoder(in_channels=in_channels)
criterion = nn.MSELoss()  # Mean Squared Error Loss for reconstruction
optimizer = optim.Adam(model.parameters(), lr=0.001)

Training loop

In [8]:
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    progress_bar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch')
    for x_batch, _ in progress_bar:
        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, x_batch)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        
        progress_bar.set_postfix(loss=loss.item())

    print(f'Epoch {epoch+1}/{num_epochs}, Average Loss: {epoch_loss/len(train_dataloader)}')

Epoch 1/20: 100%|██████████| 2/2 [00:02<00:00,  1.08s/batch, loss=0.0578]


Epoch 1/20, Average Loss: 0.05789574049413204


Epoch 2/20: 100%|██████████| 2/2 [00:02<00:00,  1.03s/batch, loss=0.0575]


Epoch 2/20, Average Loss: 0.057579297572374344


Epoch 3/20: 100%|██████████| 2/2 [00:01<00:00,  1.02batch/s, loss=0.0572]


Epoch 3/20, Average Loss: 0.05725984647870064


Epoch 4/20: 100%|██████████| 2/2 [00:02<00:00,  1.03s/batch, loss=0.0569]


Epoch 4/20, Average Loss: 0.05693719908595085


Epoch 5/20: 100%|██████████| 2/2 [00:01<00:00,  1.03batch/s, loss=0.0565]


Epoch 5/20, Average Loss: 0.05660444684326649


Epoch 6/20: 100%|██████████| 2/2 [00:02<00:00,  1.02s/batch, loss=0.0562]


Epoch 6/20, Average Loss: 0.056268395856022835


Epoch 7/20: 100%|██████████| 2/2 [00:02<00:00,  1.01s/batch, loss=0.0559]


Epoch 7/20, Average Loss: 0.05592348240315914


Epoch 8/20: 100%|██████████| 2/2 [00:02<00:00,  1.01s/batch, loss=0.0555]


Epoch 8/20, Average Loss: 0.05556577444076538


Epoch 9/20: 100%|██████████| 2/2 [00:02<00:00,  1.03s/batch, loss=0.0551]


Epoch 9/20, Average Loss: 0.055197011679410934


Epoch 10/20: 100%|██████████| 2/2 [00:02<00:00,  1.01s/batch, loss=0.0547]


Epoch 10/20, Average Loss: 0.05481266602873802


Epoch 11/20: 100%|██████████| 2/2 [00:02<00:00,  1.01s/batch, loss=0.0543]


Epoch 11/20, Average Loss: 0.05441712774336338


Epoch 12/20: 100%|██████████| 2/2 [00:02<00:00,  1.01s/batch, loss=0.0539]


Epoch 12/20, Average Loss: 0.054003043100237846


Epoch 13/20: 100%|██████████| 2/2 [00:02<00:00,  1.01s/batch, loss=0.0534]


Epoch 13/20, Average Loss: 0.053564876317977905


Epoch 14/20: 100%|██████████| 2/2 [00:01<00:00,  1.01batch/s, loss=0.053] 


Epoch 14/20, Average Loss: 0.053102822974324226


Epoch 15/20: 100%|██████████| 2/2 [00:02<00:00,  1.03s/batch, loss=0.0525]


Epoch 15/20, Average Loss: 0.0526100005954504


Epoch 16/20: 100%|██████████| 2/2 [00:02<00:00,  1.01s/batch, loss=0.0519]


Epoch 16/20, Average Loss: 0.052089499309659004


Epoch 17/20: 100%|██████████| 2/2 [00:02<00:00,  1.05s/batch, loss=0.0514]


Epoch 17/20, Average Loss: 0.051534635946154594


Epoch 18/20: 100%|██████████| 2/2 [00:02<00:00,  1.04s/batch, loss=0.0508]


Epoch 18/20, Average Loss: 0.050935545936226845


Epoch 19/20: 100%|██████████| 2/2 [00:02<00:00,  1.00s/batch, loss=0.0501]


Epoch 19/20, Average Loss: 0.05029614642262459


Epoch 20/20: 100%|██████████| 2/2 [00:02<00:00,  1.00s/batch, loss=0.0494]

Epoch 20/20, Average Loss: 0.04960709437727928





In [9]:
# Assuming you have a trained model and a test DataLoader
def calculate_reconstruction_error(model, dataloader):
    model.eval()  # Set the model to evaluation mode
    reconstruction_errors = []

    with torch.no_grad():  # Disable gradient calculation
        for inputs, _ in dataloader:  # No need for labels
            reconstructed = model(inputs)  # Get reconstructed output
            
            # Calculate the reconstruction error (Mean Squared Error)
            mse = torch.mean((inputs - reconstructed) ** 2, dim=[1, 2, 3])  # Mean over all dimensions
            reconstruction_errors.extend(mse.cpu().numpy())  # Store errors

    return np.array(reconstruction_errors)

# Call the function and get reconstruction errors
reconstruction_errors = calculate_reconstruction_error(model, test_dataloader)

# Set a threshold for anomaly detection (this can be tuned)
threshold = np.percentile(reconstruction_errors, 95)  # E.g., 95th percentile

# Identify anomalies
anomalies = reconstruction_errors > threshold

# Print some information
print(f"Threshold for anomalies: {threshold:.4f}")
print(f"Number of anomalies detected: {np.sum(anomalies)}")

Threshold for anomalies: 0.0490
Number of anomalies detected: 1
