# Task 1.2 – Modeling and Tuning (ECG Classification)

Read training-data (ECG-time series) as pickle from task 1

In [5]:
import pickle
from collections import Counter

# Load the dataset from task 1
with open("../data/split_data.pkl", "rb") as f:
    X_train_split, X_val_split, y_train_split, y_val_split = pickle.load(f)

# X_train_split: Trainingsdaten
# X_val_split: Validierungsdaten
# y_train_split: Labels für Trainingsdaten
# y_val_split: Labels für Validierungsdaten

print(f"Trainingsdaten: {len(X_train_split)} Zeitreihen")
print(f"Validierungsmenge: {len(X_val_split)} Beispiele")

# class distribution
print("Train-class-distribution:", Counter(y_train_split))
print("Val-class-distribution:", Counter(y_val_split))


Trainingsdaten: 4945 Zeitreihen
Validierungsmenge: 1234 Beispiele
Train-class-distribution: Counter({0: 2911, 2: 1412, 1: 440, 3: 182})
Val-class-distribution: Counter({0: 727, 2: 353, 1: 109, 3: 45})


Padding of all training time series (= adding 0 to the ECG-time series to make them 
evenly sized)

In [6]:
import numpy as np

def pad_sequences(sequences, max_len=None):
    if not max_len:
        max_len = max(len(seq) for seq in sequences)
    return np.array([np.pad(seq, (0, max_len - len(seq))) for seq in sequences])

# Check the maximum length of training and validation sequences
max_len = max(max(len(seq) for seq in X_train_split),
              max(len(seq) for seq in X_val_split))

print("Maximum length of training and validation sequences:", max_len)

# Pad the sequences to the same length
X_train_split = pad_sequences(X_train_split, max_len)
X_val_split = pad_sequences(X_val_split, max_len)

# Check the shape of the padded sequences
print("Shape of padded training data:", X_train_split.shape)
print("Shape of padded validation data:", X_val_split.shape)

Maximum length of training and validation sequences: 18286
Shape of padded training data: (4945, 18286)
Shape of padded validation data: (1234, 18286)


Convert 1D-NumPy-Arrays into 2D-Tensor with short-time Fourier transform (STFT)

In [7]:
import torch

def apply_stft_numpy(X, n_fft=256, hop_length=128):
    """
    Wandelt ein NumPy-Array mit gepaddeten Zeitreihen in STFT-Spektren um.
    
    Args:
        data_np (np.ndarray): Input (n_samples, sequence_length)
        n_fft (int): Größe des STFT-Fensters
        hop_length (int): Schrittweite der Fenster
    
    Returns:
        torch.Tensor: STFT-Magnitude-Spektren (n_samples, freq_bins, time_steps)
    """
    # convert to torch tensor
    data_tensor = torch.tensor(X, dtype=torch.float32)

    # window function for STFT
    window = torch.hann_window(n_fft)

    # empty list to store STFT results
    stft_results = []

    # iterate over each time series
    for i in range(data_tensor.shape[0]):
        # compute STFT
        stft = torch.stft(
            data_tensor[i], 
            n_fft=n_fft, 
            hop_length=hop_length, 
            window=window, 
            return_complex=True
            )
        
        # compute magnitude
        stft_magnitude = torch.abs(stft)
        
        # append to results
        stft_results.append(stft_magnitude)

        # convert into 3D tensor
        stft_tensor = torch.stack(stft_results)
        
    return stft_tensor
    
# Apply STFT to the training and validation data
X_train_stft = apply_stft_numpy(X_train_split)  # Output: Tensor mit Shape (4943, freq_bins, time_steps)
X_val_stft = apply_stft_numpy(X_val_split)

# Check the shape of the STFT results
print("Shape of STFT training data:", X_train_stft.shape)
print("Shape of STFT validation data:", X_val_stft.shape)

# Save the STFT results
torch.save({'X_val_stft': X_val_stft, 'y_val_split': y_val_split}, '../data/val_data.pt')

Shape of STFT training data: torch.Size([4945, 129, 143])
Shape of STFT validation data: torch.Size([1234, 129, 143])


RuntimeError: Parent directory ../data does not exist.

Convert tensors into DataLoader objects for the model training

In [None]:
from torch.utils.data import Dataset, DataLoader

# Create a custom Dataset class for the STFT data
class ECGSpectrogramDataset(Dataset):
    def __init__(self, X, y):
        self.X = X.unsqueeze(1)  # füge Channel-Dimension hinzu → (N, 1, Freq, Time)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# function to create DataLoader objects for training and validation
def create_spectrogram_dataloaders(X_train_stft, y_train, X_val_stft, y_val, batch_size=32):
    """
    Erstellt trainierbare DataLoader aus STFT-Tensoren und Label-Listen.

    Args:
        X_train_stft (torch.Tensor): Trainingsdaten (N_train, Freq, Time)
        y_train (List[int]): Trainingslabels
        X_val_stft (torch.Tensor): Validierungsdaten (N_val, Freq, Time)
        y_val (List[int]): Validierungslabels
        batch_size (int): Größe der Batches

    Returns:
        train_loader, val_loader: DataLoader-Objekte
    """
    train_dataset = ECGSpectrogramDataset(X_train_stft, y_train)
    val_dataset = ECGSpectrogramDataset(X_val_stft, y_val)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader

# Create DataLoader objects for training and validation
train_loader, val_loader = create_spectrogram_dataloaders(
    X_train_stft, y_train_split, X_val_stft, y_val_split
)

# Check the shape of a batch from the DataLoader and the labels
X_batch, y_batch = next(iter(train_loader))
print("Batch shape (X):", X_batch.shape)
print("Batch shape (y):", y_batch.shape)
#print("Labels im Batch:", X_batch.tolist())
#print("Labels im Batch:", y_batch.tolist())


Batch shape (X): torch.Size([32, 1, 129, 143])
Batch shape (y): torch.Size([32])


Definition and training of CNN-model (Convolutional Neural Network)

In [None]:
import torch.nn as nn
# from model import ECGCNN

# # method to train the model 
# def train_model(model, train_loader, val_loader, epochs=10, lr=0.001):
    
#     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#     model.to(device)

#     criterion = nn.CrossEntropyLoss()
#     optimizer = torch.optim.Adam(model.parameters(), lr=lr)

#     for epoch in range(epochs):
#         model.train()
#         train_loss, correct, total = 0.0, 0, 0

#         for X_batch, y_batch in train_loader:
#             X_batch, y_batch = X_batch.to(device), y_batch.to(device)
#             optimizer.zero_grad()
#             outputs = model(X_batch)
#             loss = criterion(outputs, y_batch)
#             loss.backward()
#             optimizer.step()

#             train_loss += loss.item()
#             _, predicted = torch.max(outputs, 1)
#             total += y_batch.size(0)
#             correct += (predicted == y_batch).sum().item()

#         train_acc = correct / total

#         # evaluate on validation set
#         model.eval()
#         val_loss, val_correct, val_total = 0.0, 0, 0
#         with torch.no_grad():
#             for X_val, y_val in val_loader:
#                 X_val, y_val = X_val.to(device), y_val.to(device)
#                 outputs = model(X_val)
#                 loss = criterion(outputs, y_val)
#                 val_loss += loss.item()
#                 _, predicted = torch.max(outputs, 1)
#                 val_total += y_val.size(0)
#                 val_correct += (predicted == y_val).sum().item()

#         val_acc = val_correct / val_total

#         # print the training and validation loss and accuracy
#         print(f"Epoch {epoch+1}: "
#               f"Train Loss={train_loss:.4f}, Train Acc={train_acc:.4f}, "
#               f"Val Loss={val_loss:.4f}, Val Acc={val_acc:.4f}")


# # only once for testing the model architecture
# # X_batch, _ = next(iter(train_loader))
# # cnn_model = ECGCNN()
# # cnn_model(X_batch)  # ruft automatisch forward() auf

# # instantiate the model and train it   
# cnn_model = ECGCNN()

# # train the model
# train_model(cnn_model, train_loader, val_loader, epochs=10)

# # # Modell speichern
# # torch.save(cnn_model.state_dict(), "../models/cnn_model_wt_tune.pth")

# # load the model
# # cnn_model.load_state_dict(torch.load("model.pth"))

# # delete the model to free up memory
# # del cnn_model

# # Check the number of parameters in the model
# print("Anzahl der Modell-Parameter:", sum(p.numel() for p in cnn_model.parameters()))

Tuning approch 1

In [None]:
# import torch.optim as optim
# from itertools import product

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# # Trainings- & Evaluationsfunktion
# def train_and_eval(model, train_loader, val_loader, lr, weight_decay, optimizer_type, epochs=10):
#     criterion = nn.CrossEntropyLoss()
#     if optimizer_type == "adam":
#         optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
#     elif optimizer_type == "sgd":
#         optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
#     else:
#         raise ValueError("Unsupported optimizer")

#     model.to(device)
#     best_acc = 0
#     val_loss_final = 0.0

#     for epoch in range(epochs):
#         model.train()
#         for X_batch, y_batch in train_loader:
#             X_batch, y_batch = X_batch.to(device), y_batch.to(device)
#             optimizer.zero_grad()
#             outputs = model(X_batch)
#             loss = criterion(outputs, y_batch)
#             loss.backward()
#             optimizer.step()

#         # Evaluation
#         model.eval()
#         val_correct, val_total, val_loss = 0, 0, 0.0
#         with torch.no_grad():
#             for X_val, y_val in val_loader:
#                 X_val, y_val = X_val.to(device), y_val.to(device)
#                 outputs = model(X_val)
#                 loss = criterion(outputs, y_val)
#                 val_loss += loss.item()
#                 _, predicted = torch.max(outputs, 1)
#                 val_total += y_val.size(0)
#                 val_correct += (predicted == y_val).sum().item()

#         val_acc = val_correct / val_total
#         val_loss_final = val_loss / len(val_loader)

#         if val_acc > best_acc:
#             best_acc = val_acc
#             filename = f"best_lr{lr}_opt{weight_decay}.pth"
#             torch.save(model.state_dict(), f"../models/tuning_approach_without_weights{filename}")

#     return best_acc, val_loss_final

# # Hyperparameter-Raster
# param_grid = {
#     'lr': [0.001],
#     'weight_decay': [1e-4],
#     'optimizer_type': ['adam'],
#     'epochs': [10]
# }

# results = []

# # Tuning Loop
# for params in product(*param_grid.values()):
#     param_dict = dict(zip(param_grid.keys(), params))
#     print(f"\nTrainiere mit Parametern: {param_dict}")

#     #model = ECGCNN()

#     val_acc, val_loss = train_and_eval(model, train_loader, val_loader, **param_dict)

#     param_dict["val_accuracy"] = val_acc
#     param_dict["val_loss"] = val_loss
#     results.append(param_dict)

#     #del model
#     torch.cuda.empty_cache()



Trainiere mit Parametern: {'lr': 0.001, 'weight_decay': 0.0001, 'optimizer_type': 'adam', 'epochs': 10}


Tuning approach 2

In [None]:
# import torch.optim as optim
# from itertools import product
# import torch.nn as nn
# import torch
# import numpy as np

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# class_counts = np.bincount(y_train_split)
# class_weights = 1. / class_counts
# class_weights = class_weights / class_weights.sum()
# weights_tensor = torch.tensor(class_weights, dtype=torch.float32).to(device)

# def train_and_eval(model, train_loader, val_loader, lr, weight_decay, optimizer_type, epochs=10):
#     criterion = nn.CrossEntropyLoss(weight=weights_tensor)
#     if optimizer_type == "adam":
#         optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
#     elif optimizer_type == "sgd":
#         optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
#     else:
#         raise ValueError("Unsupported optimizer")

#     model.to(device)
#     best_acc = 0
#     val_loss_final = 0.0

#     for epoch in range(epochs):
#         model.train()
#         for X_batch, y_batch in train_loader:
#             X_batch, y_batch = X_batch.to(device), y_batch.to(device)
#             optimizer.zero_grad()
#             outputs = model(X_batch)
#             loss = criterion(outputs, y_batch)
#             loss.backward()
#             optimizer.step()

#         # Evaluation
#         model.eval()
#         val_correct, val_total, val_loss = 0, 0, 0.0
#         with torch.no_grad():
#             for X_val, y_val in val_loader:
#                 X_val, y_val = X_val.to(device), y_val.to(device)
#                 outputs = model(X_val)
#                 loss = criterion(outputs, y_val)
#                 val_loss += loss.item()
#                 _, predicted = torch.max(outputs, 1)
#                 val_total += y_val.size(0)
#                 val_correct += (predicted == y_val).sum().item()

#         val_acc = val_correct / val_total
#         val_loss_final = val_loss / len(val_loader)

#         if val_acc > best_acc:
#             best_acc = val_acc
#             filename = f"best_lr{lr}_opt{weight_decay}.pth"
#             torch.save(model.state_dict(), f"../models/tuning_approach_2/{filename}")

#     return best_acc, val_loss_final

# # Hyperparameter-Raster
# param_grid = {
#     'lr': [0.001],
#     'weight_decay': [1e-2, 1e-3, 1e-4, 1e-5, 0],
#     'optimizer_type': ['adam'],
#     'epochs': [10]
# }

# results = []

# # Tuning Loop
# for params in product(*param_grid.values()):
#     param_dict = dict(zip(param_grid.keys(), params))
#     print(f"\nTrainiere mit Parametern: {param_dict}")

#     #model = ECGCNN()

#     val_acc, val_loss = train_and_eval(model, train_loader, val_loader, **param_dict)

#     param_dict["val_accuracy"] = val_acc
#     param_dict["val_loss"] = val_loss
#     results.append(param_dict)

#     del model
#     torch.cuda.empty_cache()


Trainiere mit Parametern: {'lr': 0.001, 'weight_decay': 0.01, 'optimizer_type': 'adam', 'epochs': 10}

Trainiere mit Parametern: {'lr': 0.001, 'weight_decay': 0.001, 'optimizer_type': 'adam', 'epochs': 10}

Trainiere mit Parametern: {'lr': 0.001, 'weight_decay': 0.0001, 'optimizer_type': 'adam', 'epochs': 10}

Trainiere mit Parametern: {'lr': 0.001, 'weight_decay': 1e-05, 'optimizer_type': 'adam', 'epochs': 10}

Trainiere mit Parametern: {'lr': 0.001, 'weight_decay': 0, 'optimizer_type': 'adam', 'epochs': 10}
