### TRAINING CHAMBON 2018 WITH PHYSIOEX DATA

In [None]:
from physioex.physioex.train.utils import train, test
from physioex.physioex.data import PhysioExDataModule

# Settings for the PhysioEx data module; the per-key notes mirror what each
# argument is used for in this notebook.
datamodule_settings = {
    "datasets": ["sleepedf"],        # datasets to draw recordings from
    "batch_size": 64,                # DataLoader batch size
    "preprocessing": "raw",          # preprocessing method applied to the signal
    "selected_channels": ["EEG"],    # channels kept from each recording
    "sequence_length": 21,           # length of each input sequence
    "data_folder": "./data",         # where the data is stored on disk
}
datamodule = PhysioExDataModule(**datamodule_settings)

# Fetch the three DataLoaders in train / validation / test order.
train_loader, val_loader, test_loader = (
    datamodule.train_dataloader(),
    datamodule.val_dataloader(),
    datamodule.test_dataloader(),
)

# Report the number of batches per split (train, validation, test).
for split_loader in (train_loader, val_loader, test_loader):
    print(len(split_loader))

# Directory used later for model checkpoints.
checkpoint_path = "./model/checkpoint/"

2036
570
409


In [None]:
import os
import yaml
import importlib

# Load the experiment configuration from a local YAML file.
# (Alternative location previously used:
#  ./physioex/physioex/train/networks/config/chambon2018.yaml)
with open("./config.yaml", "r", encoding="utf-8") as file:
    config = yaml.safe_load(file)

print(config)
network_config = config["model_config"]

print(network_config)

# PyYAML resolves floats per YAML 1.1, so exponent literals written without a
# decimal point (e.g. "1e-4") are loaded as strings. Convert the known numeric
# hyper-parameters so they reach the model as floats.
# NOTE(review): presumably these are consumed as numbers downstream - confirm.
for numeric_key in ("learning_rate", "adam_epsilon"):
    if isinstance(network_config.get(numeric_key), str):
        network_config[numeric_key] = float(network_config[numeric_key])

# Load the loss class from its "package.module:ClassName" specification.
loss_package, loss_class = network_config["loss_call"].split(":")
model_loss = getattr(importlib.import_module(loss_package), loss_class)

print(model_loss)

# When the config names a library model, also resolve the class given under
# the "model" key. The original code re-read "loss_call" here (copy-paste
# slip), which bound the loss class to `model`.
# NOTE(review): "model" is assumed to be the intended key - confirm.
# `model_name` is always bound so the print below cannot raise NameError.
model_name = config.get("model_name")
if model_name is not None and "model" in config:
    model_package, model_loader = config["model"].split(":")
    model = getattr(importlib.import_module(model_package), model_loader)

print(model_name)

# Load the class named under "module"; it is what gets passed to train()/test().
model_package, model_class = config["module"].split(":")
model_class = getattr(importlib.import_module(model_package), model_class)

print(model_class)

{'model_config': {'loss_call': 'physioex.train.networks.utils.loss:CrossEntropyLoss', 'loss_params': {}, 'sequence_length': 21, 'n_classes': 5, 'in_channels': 1, 'sf': 100, 'n_times': 3000, 'learning_rate': '1e-4', 'adam_beta_1': 0.9, 'adam_beta_2': 0.999, 'adam_epsilon': '1e-8', 'latent_space_dim': 32, 'input_processing': 'raw'}, 'checkpoint_path': './model/checkpoint/', 'input_transform': 'raw', 'model': 'physioex.train.networks.chambon2018:Chambon2018Net', 'model_name': 'chambon2018', 'module': 'physioex.train.networks:Chambon2018Net', 'preprocessing': 'raw', 'target_transform': 'physioex.train.networks.utils.target_transform:get_mid_label'}
{'loss_call': 'physioex.train.networks.utils.loss:CrossEntropyLoss', 'loss_params': {}, 'sequence_length': 21, 'n_classes': 5, 'in_channels': 1, 'sf': 100, 'n_times': 3000, 'learning_rate': '1e-4', 'adam_beta_1': 0.9, 'adam_beta_2': 0.999, 'adam_epsilon': '1e-8', 'latent_space_dim': 32, 'input_processing': 'raw'}
<class 'physioex.train.networks.

In [None]:
# Keyword arguments describing how the data module should be configured;
# values are taken from the loaded config so that data and model agree.
datamodule_kwargs = {
    "selected_channels": ["EEG"],  # needs to match in_channels
    "sequence_length": network_config["sequence_length"],
    "target_transform": config["target_transform"],
    "preprocessing": config["input_transform"],
    "data_folder": "./data",
}

# Train the model; the returned value is joined onto `checkpoint_path` below
# to locate the best checkpoint.
best_checkpoint = train(
    datasets=datamodule,
    datamodule_kwargs=datamodule_kwargs,
    model_class=model_class,
    model_config=network_config,
    checkpoint_path=checkpoint_path,
    batch_size=64,
    max_epochs=10,
)

# Test the model restored from the best checkpoint.
# The keyword was previously misspelled as `chekcpoint_path`; it now matches
# the `checkpoint_path` spelling used by train() above.
# NOTE(review): confirm that `results_dir` is the keyword test() expects for
# the output location (the library signature is not visible here).
results_dataframe = test(
    datasets=datamodule,
    datamodule_kwargs=datamodule_kwargs,
    model_class=model_class,
    model_config=network_config,
    checkpoint_path=os.path.join(checkpoint_path, best_checkpoint),
    batch_size=64,
    results_dir=checkpoint_path,  # if you want to save the test results
                                  # in your checkpoint directory
)

### TRAINING SIMPLE CNN WITH SIMULATED DATA

In [6]:
# Load and shape synthetic data
import os
import torch
import numpy as np
from torch.utils.data import DataLoader, TensorDataset

# Load the synthetic data: files whose name contains "samples_0" are labelled
# 0 and files whose name contains "samples_1" are labelled 1. Markers are
# checked in this order, matching the original if/elif chain.
data_folder = "./data/synthetic/train_1"
label_by_marker = {"samples_0": 0, "samples_1": 1}

sample_chunks = []
label_chunks = []
for file_name in os.listdir(data_folder):
    label = next(
        (value for marker, value in label_by_marker.items() if marker in file_name),
        None,
    )
    if label is None:
        print("File not recognized")
        continue

    samples = np.load(os.path.join(data_folder, file_name))
    sample_chunks.append(samples)
    # Size the label vector from the loaded array instead of assuming that
    # every file holds exactly 5000 samples.
    label_chunks.append(np.full(len(samples), label, dtype=np.int64))

if not sample_chunks:
    # Fail with a clear message rather than a TypeError on None further down.
    raise FileNotFoundError(
        f"No 'samples_0' / 'samples_1' files found in {data_folder!r}"
    )

# Concatenate once (instead of re-copying the growing array for every file)
# and insert a singleton channel axis at position 1.
x = np.concatenate(sample_chunks)[:, np.newaxis, :]
y = np.concatenate(label_chunks)

print(x.shape)
print(y.shape)

# convert the data to torch tensors
x = torch.tensor(x, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.long)

# create the dataset
dataset = TensorDataset(x, y)

# create the DataLoader
train_loader = DataLoader(dataset, batch_size=128, shuffle=True)


(50000, 1, 1000)
(50000,)


In [7]:
# Report how many batches the loader yields, then the tensor shapes of the
# first batch only.
print(len(train_loader))

# Note: `x` and `y` are rebound here to the tensors of that first batch.
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    x, y = first_batch
    print(x.shape)
    print(y.shape)

391
torch.Size([128, 1, 1000])
torch.Size([128])


In [8]:
import torch.nn as nn
import torch.optim as optim

def train_model(model, train_loader, epochs=20, lr=0.001):
    """Train ``model`` in place with Adam and cross-entropy loss.

    Prints the selected device once, then the mean batch loss and the
    accuracy over the training data after every epoch.

    Args:
        model: ``nn.Module`` whose output has one score per class along
            dimension 1.
        train_loader: iterable yielding ``(inputs, labels)`` batches.
        epochs: number of passes over ``train_loader``.
        lr: learning rate passed to Adam.

    Returns:
        None. ``model`` is updated in place and left on the selected device.

    Raises:
        ValueError: if ``train_loader`` yields no batches during an epoch.
    """
    # `is_available` has to be *called*: the bare function object is always
    # truthy, which selected "mps" even on machines without MPS support.
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    print(device)
    model.to(device)  # Move model to the accelerator if one is available

    criterion = nn.CrossEntropyLoss()  # Classification loss over class scores
    optimizer = optim.Adam(model.parameters(), lr=lr)  # Adam optimizer

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        n_batches = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()  # Clear previous gradients
            outputs = model(inputs)  # Forward pass
            loss = criterion(outputs, labels)  # Compute loss
            loss.backward()  # Backpropagation
            optimizer.step()  # Update weights

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)  # Index of the highest score
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            n_batches += 1

        if n_batches == 0:
            # Avoid a bare ZeroDivisionError when there is nothing to train on.
            raise ValueError("train_loader yielded no batches; nothing to train on")

        train_acc = correct / total
        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/n_batches:.4f}, Accuracy: {train_acc:.4f}")

    print("Training complete!")


In [9]:
# train the SimpleCNN model
from src.models.simple import SimpleCNN

# Constructor arguments for the SimpleCNN classifier.
cnn_hparams = dict(in_channels=1, out_channels=2, hidden_size=64, kernel_size=31)
model = SimpleCNN(**cnn_hparams)

# Fit the network on the training loader for five epochs.
train_model(model, train_loader, epochs=5, lr=0.001)


  from .autonotebook import tqdm as notebook_tqdm


mps
Epoch 1/5, Loss: 0.0531, Accuracy: 0.9899
Epoch 2/5, Loss: 0.0003, Accuracy: 1.0000
Epoch 3/5, Loss: 0.0001, Accuracy: 1.0000
Epoch 4/5, Loss: 0.0001, Accuracy: 1.0000
Epoch 5/5, Loss: 0.0000, Accuracy: 1.0000
Training complete!


In [10]:
# Save the trained weights (state dict only, not the full module object).
import os

checkpoint_file = "./model/checkpoint/simpleCNN_3.pth"
# torch.save does not create missing parent directories, so make sure the
# target folder exists before writing.
os.makedirs(os.path.dirname(checkpoint_file), exist_ok=True)
torch.save(model.state_dict(), checkpoint_file)
