In [2]:
!pip install pytorch_lightning

Collecting pytorch_lightning
  Downloading pytorch_lightning-2.2.4-py3-none-any.whl (802 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m802.2/802.2 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
Collecting torchmetrics>=0.7.0 (from pytorch_lightning)
  Downloading torchmetrics-1.4.0-py3-none-any.whl (868 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m868.8/868.8 kB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
Collecting lightning-utilities>=0.8.0 (from pytorch_lightning)
  Downloading lightning_utilities-0.11.2-py3-none-any.whl (26 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.13.0->pytorch_lightning)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.13.0->pytorch_lightning)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.13.0->pytorch

In [3]:
import random
import sys
from typing import Literal

import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.optim as optim
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from torch.utils.data import DataLoader, Dataset
from torchmetrics import Accuracy
import numpy as np
import pandas as pd
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

def read_file(file):
    """Parse one trial CSV into a metadata dict plus a samples-by-sensors table.

    Parameters
    ----------
    file
        Anything ``pd.read_csv`` accepts (path or file-like object).

    Returns
    -------
    dict
        ``trial_number`` (int), ``sensor_positions`` (unique sensor labels in
        order of appearance), ``name``, ``time_series`` (DataFrame indexed by
        ``sample_num`` with one column per ``sensor_position``),
        ``matching_condition`` (list of unique values),
        ``subject_identifier`` and ``class`` (1 when the subject identifier
        equals ``"a"``, otherwise 0).

    Raises
    ------
    AssertionError
        If the file does not hold exactly one trial, one subject, one name,
        256 distinct sample numbers and 64 distinct channels.
    """
    spaced_columns = (
        "trial number",
        "sample num",
        "sensor value",
        "sensor position",
        "subject identifier",
        "matching condition",
    )
    # Swap the spaces for underscores so the columns are reachable as attributes.
    renames = {old: old.replace(" ", "_") for old in spaced_columns}
    frame = pd.read_csv(file).rename(columns=renames)

    # Structural sanity checks, in the same order as the fields are extracted.
    trials = frame.trial_number.unique()
    assert len(trials) == 1
    trial_number = int(trials[0])

    assert len(frame.sample_num.unique()) == 256
    positions = frame.sensor_position.unique()

    subjects = frame.subject_identifier.unique()
    assert len(subjects) == 1
    subject = subjects[0]

    assert len(frame.channel.unique()) == 64

    names = frame.name.unique()
    assert len(names) == 1
    name = names[0]

    conditions = list(frame.matching_condition.unique())

    # Long -> wide: one row per sample number, one column per sensor position.
    wide = frame.pivot(
        index="sample_num",
        columns="sensor_position",
        values="sensor_value",
    )

    record = {}
    record["trial_number"] = trial_number
    record["sensor_positions"] = positions
    record["name"] = name
    record["time_series"] = wide
    record["matching_condition"] = conditions
    record["subject_identifier"] = subject
    record["class"] = int(subject == "a")
    return record

In [4]:
# Mount Google Drive at /content/drive; the data-loading cell below reads
# its files from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# Root folder of the dataset on the mounted Drive; edit this one constant to
# point the notebook at a different copy of the data.
DATA_ROOT = Path(r'/content/drive/MyDrive/robotyka_4')


def list_files(split_dir):
    """Return every regular file below ``split_dir`` (recursive), sorted by path.

    Sorting removes the dependence on the order in which the filesystem yields
    entries, so the sequence lists built from these files come out in the same
    order on every run.

    Raises
    ------
    FileNotFoundError
        If no file is found (e.g. the Drive is not mounted or the folder name
        is wrong), instead of silently producing an empty dataset.
    """
    files = sorted(p for p in Path(split_dir).glob('**/*') if p.is_file())
    if not files:
        raise FileNotFoundError(f"no data files found under {split_dir}")
    return files


train_files = list_files(DATA_ROOT / 'SMNI_CMI_TRAIN')
test_files = list_files(DATA_ROOT / 'SMNI_CMI_TEST')

# Map each file path to the record parsed from it by read_file.
train_data = {path: read_file(path) for path in train_files}
test_data = {path: read_file(path) for path in test_files}

In [6]:
# Show the column labels (sensor positions) of the pivoted table of the first
# loaded training file.
first_key = next(iter(train_data.keys()))
train_data[first_key]['time_series'].columns

Index(['AF1', 'AF2', 'AF7', 'AF8', 'AFZ', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6',
       'CP1', 'CP2', 'CP3', 'CP4', 'CP5', 'CP6', 'CPZ', 'CZ', 'F1', 'F2', 'F3',
       'F4', 'F5', 'F6', 'F7', 'F8', 'FC1', 'FC2', 'FC3', 'FC4', 'FC5', 'FC6',
       'FCZ', 'FP1', 'FP2', 'FPZ', 'FT7', 'FT8', 'FZ', 'O1', 'O2', 'OZ', 'P1',
       'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', 'PO1', 'PO2', 'PO7', 'PO8',
       'POZ', 'PZ', 'T7', 'T8', 'TP7', 'TP8', 'X', 'Y', 'nd'],
      dtype='object', name='sensor_position')

In [7]:
# Turn every loaded record into an (array, label) pair, keeping dict order:
# the pivoted table becomes a NumPy array and the label is the record's 'class'.
train_sequences = [
    (record['time_series'].to_numpy(), record['class'])
    for record in train_data.values()
]
test_sequences = [
    (record['time_series'].to_numpy(), record['class'])
    for record in test_data.values()
]

In [8]:
# Carve a fixed-size validation set out of the held-out sequences.
VAL_SIZE = 100
SPLIT_SEED = 42

# A dedicated, seeded generator gives the same permutation on every run (for a
# given input order) and leaves the global `random` state alone; shuffling a
# copy avoids mutating the input list in place.
shuffled_sequences = list(test_sequences)
random.Random(SPLIT_SEED).shuffle(shuffled_sequences)

val_sequences = shuffled_sequences[:VAL_SIZE]
# NOTE: this rebinds `test_sequences` to the remainder, so re-running this cell
# without first re-running the cell that builds `test_sequences` would remove
# another VAL_SIZE items from the test split.
test_sequences = shuffled_sequences[VAL_SIZE:]

In [12]:
# Print the array shape of the first sequence in each split (train, test, val).
for split in (train_sequences, test_sequences, val_sequences):
    print(split[0][0].shape)

(256, 64)
(256, 64)
(256, 64)


In [13]:
class Custom_Dataset(Dataset):
    """Dataset over an indexable collection of ``(sequence, label)`` pairs.

    Each item is returned as a dict with a float tensor under ``"sequence"``
    and an int64 scalar tensor under ``"label"``.
    """

    def __init__(self, sequences):
        # Stored as-is; conversion to tensors happens lazily per item.
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        features, target = self.sequences[idx]
        sample = {
            "sequence": torch.Tensor(features),
            "label": torch.tensor(target).to(torch.long),
        }
        return sample

class Custom_data_module(pl.LightningDataModule):
    """LightningDataModule serving train/val/test loaders from in-memory sequence lists.

    Parameters
    ----------
    train_sequences, val_sequences, test_sequences
        Collections of ``(sequence, label)`` pairs, one per split; each is
        wrapped in a ``Custom_Dataset`` during ``setup``.
    batch_size : int
        Batch size used by all three loaders.
    num_workers : int, default 6
        Number of DataLoader worker processes for all three loaders. The
        default keeps the previously hard-coded value; lower it on machines
        with fewer CPU cores.
    """

    def __init__(self, train_sequences, val_sequences, test_sequences, batch_size, num_workers=6):
        super().__init__()
        self.train_sequences = train_sequences
        self.val_sequences = val_sequences
        self.test_sequences = test_sequences
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage=None):
        """Build the three datasets; ``stage`` is accepted but not used."""
        self.train_dataset = Custom_Dataset(self.train_sequences)
        self.val_dataset = Custom_Dataset(self.val_sequences)
        self.test_dataset = Custom_Dataset(self.test_sequences)

    def _make_loader(self, dataset, shuffle):
        """Create a DataLoader with the module-wide batch size and worker count."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
        )

    def train_dataloader(self):
        # Only the training split is reshuffled.
        return self._make_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        return self._make_loader(self.val_dataset, shuffle=False)

    def test_dataloader(self):
        return self._make_loader(self.test_dataset, shuffle=False)

In [15]:
# NOTE(review): a later cell in this notebook defines `LSTM_Model` again;
# whichever definition is executed last is the one the predictor uses.
class LSTM_Model(nn.Module):
    """Bidirectional LSTM followed by max-pooling over time and a sigmoid output unit.

    Parameters
    ----------
    n_features : int
        Size of each time step's feature vector (LSTM ``input_size``).
    n_hidden : int
        Hidden size per direction; the classifier sees ``2 * n_hidden`` features.
    n_layers : int
        Number of stacked LSTM layers.
    dropout : float
        Dropout between stacked LSTM layers; ignored when ``n_layers == 1``.
    """

    def __init__(self, n_features: int = 64, n_hidden: int = 256, n_layers: int = 3, dropout: float = 0.3):
        super().__init__()

        self.rnn = nn.LSTM(
            input_size=n_features,
            hidden_size=n_hidden,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=True,
            # nn.LSTM only applies dropout between layers and warns when a
            # non-zero value is given for a single-layer network.
            dropout=dropout if n_layers > 1 else 0.0,
        )
        self.classifier = nn.Linear(2 * n_hidden, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one value in [0, 1] per input sequence (last dim of size 1).

        The module is no longer moved to ``x.device`` on every call: device
        placement is the caller's (or the Trainer's) job, and mutating the
        module inside ``forward`` is unnecessary work on each step.
        """
        out, _ = self.rnn(x)
        # Max over the time axis (dim -2 with batch_first=True).
        out = out.max(-2).values
        out = self.classifier(out)
        return self.sigmoid(out)

In [24]:
import torch
import torch.nn as nn

class LSTM_Model(nn.Module):
    """Bidirectional LSTM encoder with a max-over-time readout and sigmoid head.

    ``n_features`` is the per-step input size, ``n_hidden`` the hidden size of
    each direction, ``n_layers`` the number of stacked layers and ``dropout``
    the inter-layer dropout, which is only forwarded to ``nn.LSTM`` when there
    is more than one layer.
    """

    def __init__(self, n_features: int = 64, n_hidden: int = 256, n_layers: int = 3, dropout: float = 0.3):
        super().__init__()
        lstm_kwargs = {
            "input_size": n_features,
            "hidden_size": n_hidden,
            "num_layers": n_layers,
            "batch_first": True,
            "bidirectional": True,
        }
        if n_layers > 1:
            # Single-layer LSTMs are built without the dropout argument.
            lstm_kwargs["dropout"] = dropout
        self.rnn = nn.LSTM(**lstm_kwargs)

        # Two directions are concatenated, hence 2 * n_hidden input features.
        self.classifier = nn.Linear(2 * n_hidden, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map input sequences to one sigmoid-activated value each."""
        hidden_seq, _state = self.rnn(x)
        # Element-wise maximum along the time axis.
        pooled = torch.max(hidden_seq, dim=-2).values
        return self.sigmoid(self.classifier(pooled))

In [43]:
class LSTM_Predictor(pl.LightningModule):
    """LightningModule wrapping ``LSTM_Model`` for binary classification.

    Trains with ``nn.BCELoss`` on the model's sigmoid outputs and logs
    ``{stage}_loss`` / ``{stage}_accuracy`` for the train, val and test stages.

    Parameters
    ----------
    n_features
        Forwarded to ``LSTM_Model`` as its first argument.
    *args, **kwargs
        Forwarded unchanged to ``LSTM_Model``.
    """

    def __init__(self, n_features, *args, **kwargs):
        super().__init__()
        self.model = LSTM_Model(n_features, *args, **kwargs)
        self.criterion = nn.BCELoss()
        self.acc = Accuracy(task="binary")

    def forward(self, x):
        return self.model(x)

    def _shared_step(self, batch, stage):
        """Compute and log loss and accuracy for one batch.

        ``batch`` is a dict with ``"sequence"`` and ``"label"`` entries;
        ``stage`` is the prefix used for the logged metric names.
        """
        sequences = batch["sequence"]
        labels = batch["label"]
        outputs = self(sequences)
        # Labels gain a trailing dimension so they line up with the outputs.
        loss = self.criterion(outputs, labels.float().unsqueeze(-1))
        # Threshold the sigmoid output at 0.5 to obtain hard 0/1 predictions.
        predictions = (outputs > 0.5).float().squeeze(-1)
        accuracy = self.acc(predictions, labels.int())
        self.log(f"{stage}_loss", loss, prog_bar=True, logger=True)
        self.log(f"{stage}_accuracy", accuracy, prog_bar=True, logger=True)
        return {"loss": loss, "accuracy": accuracy}

    def training_step(self, batch, batch_idx):
        return self._shared_step(batch, "train")

    def validation_step(self, batch, batch_idx):
        return self._shared_step(batch, "val")

    def test_step(self, batch, batch_idx):
        return self._shared_step(batch, "test")

    def predict(self, sequence):
        """Return hard 0/1 predictions (float tensor) for ``sequence``.

        ``sequence`` may be a tensor or anything ``torch.as_tensor`` accepts
        (e.g. a NumPy array). It is converted to float32 first and only then
        moved to the module's device; the previous order called ``.to`` on the
        raw input, which fails for non-tensor input. The module's
        training/eval mode is restored afterwards so that calling ``predict``
        in the middle of training does not leave dropout disabled.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                inputs = torch.as_tensor(sequence, dtype=torch.float32).to(self.device)
                output = self(inputs)
                predictions = (output > 0.5).float().squeeze(-1)
        finally:
            self.train(was_training)
        return predictions

    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=0.001)

In [44]:
# Run configuration.
N_EPOCHS = 1
# The original constant was spelled with a digit zero ("EP0CHS"); keep that
# name bound as well because other cells still read it.
N_EP0CHS = N_EPOCHS
BATCH_SIZE = 32
SEED = 42

# Seed the Python, NumPy and torch RNGs before the model is created so weight
# initialisation and data shuffling are repeatable across runs.
pl.seed_everything(SEED, workers=True)

data_module = Custom_data_module(train_sequences, val_sequences, test_sequences, BATCH_SIZE)
model = LSTM_Predictor(n_features=64, n_hidden=256, n_layers=3, dropout=0.25)

In [45]:
# Keep only the single checkpoint with the lowest logged "val_loss".
checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",
    mode="min",
    save_top_k=1,
    dirpath="checkpoints",
    filename="best-checkpoint",
    verbose=True,
)

# TensorBoard event files are written under lightning_logs/EEG.
logger = TensorBoardLogger(save_dir="lightning_logs", name="EEG")

trainer = pl.Trainer(
    max_epochs=N_EP0CHS,
    logger=logger,
    callbacks=[checkpoint_callback],
)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: False, used: False
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [46]:
# Train the model on the loaders provided by the data module.
trainer.fit(model=model, datamodule=data_module)

INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type           | Params
---------------------------------------------
0 | model     | LSTM_Model     | 3.8 M 
1 | criterion | BCELoss        | 0     
2 | acc       | BinaryAccuracy | 0     
---------------------------------------------
3.8 M     Trainable params
0         Non-trainable params
3.8 M     Total params
15.256    Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 0, global step 30: 'val_loss' reached 0.73966 (best 0.73966), saving model to '/content/checkpoints/best-checkpoint-v2.ckpt' as top 1
INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=1` reached.


In [47]:
# Evaluate on the test split with the best checkpoint of the preceding fit.
# Naming the checkpoint explicitly documents which weights are scored (the log
# below shows the best checkpoint being restored), and passing the data module
# lets the Trainer run setup() itself instead of relying on an earlier call.
trainer.test(datamodule=data_module, ckpt_path="best")

INFO:pytorch_lightning.utilities.rank_zero:Restoring states from the checkpoint path at /content/checkpoints/best-checkpoint-v2.ckpt
INFO:pytorch_lightning.utilities.rank_zero:Loaded model weights from the checkpoint at /content/checkpoints/best-checkpoint-v2.ckpt


Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_loss': 0.5831756591796875, 'test_accuracy': 0.7709302306175232}]

In [22]:
# NOTE(review): this cell repeats the configuration / trainer / fit cells above
# with a larger epoch count, and its execution count (22) is lower than theirs,
# i.e. the notebook was executed out of order. Consider folding both runs into
# one parameterised function.
N_EPOCHS = 5
# The original constant was spelled with a digit zero ("EP0CHS"); keep that
# name bound as well because other cells still read it.
N_EP0CHS = N_EPOCHS
BATCH_SIZE = 32
SEED = 42

# Seed the Python, NumPy and torch RNGs before the model is created so weight
# initialisation and data shuffling are repeatable across runs.
pl.seed_everything(SEED, workers=True)

data_module = Custom_data_module(train_sequences, val_sequences, test_sequences, BATCH_SIZE)
model = LSTM_Predictor(n_features=64, n_hidden=256, n_layers=3, dropout=0.25)

# Keep only the single checkpoint with the lowest logged "val_loss".
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    filename="best-checkpoint",
    save_top_k=1,
    verbose=True,
    monitor="val_loss",
    mode="min"
)

logger = TensorBoardLogger("lightning_logs", name="EEG")

trainer = pl.Trainer(
    logger=logger,
    callbacks=[checkpoint_callback],
    max_epochs=N_EPOCHS,
)

trainer.fit(model, datamodule=data_module)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: False, used: False
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
/usr/local/lib/python3.10/dist-packages/pytorch_lightning/callbacks/model_checkpoint.py:653: Checkpoint directory /content/checkpoints exists and is not empty.
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type           | Params
---------------------------------------------
0 | model     | LSTM_Model     | 3.8 M 
1 | criterion | BCELoss        | 0     
2 | acc       | BinaryAccuracy | 0     
---------------------------------------------
3.8 M     Trainable params
0         Non-trainable params
3.8 M     Total params
15.256    Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 0, global step 30: 'val_loss' reached 0.37660 (best 0.37660), saving model to '/content/checkpoints/best-checkpoint-v1.ckpt' as top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 1, global step 60: 'val_loss' was not in top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 2, global step 90: 'val_loss' reached 0.23285 (best 0.23285), saving model to '/content/checkpoints/best-checkpoint-v1.ckpt' as top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 3, global step 120: 'val_loss' was not in top 1


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 4, global step 150: 'val_loss' was not in top 1
INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.


In [23]:
# Evaluate on the test split with the best checkpoint of the preceding fit.
# Naming the checkpoint explicitly documents which weights are scored (the log
# below shows the best checkpoint being restored), and passing the data module
# lets the Trainer run setup() itself instead of relying on an earlier call.
trainer.test(datamodule=data_module, ckpt_path="best")

INFO:pytorch_lightning.utilities.rank_zero:Restoring states from the checkpoint path at /content/checkpoints/best-checkpoint-v1.ckpt
INFO:pytorch_lightning.utilities.rank_zero:Loaded model weights from the checkpoint at /content/checkpoints/best-checkpoint-v1.ckpt


Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_loss': 0.20669136941432953, 'test_accuracy': 0.9220930337905884}]

Next steps:
 - tune hyperparameters
 - feature engineering: select only a subset of the sensors [PRIORITY]
 - try time-series augmentation with tsaug: https://tsaug.readthedocs.io/en/stable/