# CNN + GRU Model

## Import libraries

In [None]:
import torch
from torch import nn
import torch.nn.functional as F

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split

from common_utils import get_dataloaders, setSeeds

# Set seed
# setSeeds() comes from common_utils (presumably it seeds the RNGs — confirm
# there). Its return value is kept because SEED is reused below as the
# random_state of the stratified train/validation split.
SEED = setSeeds()

In [None]:
# class GRUModel(nn.Module):
#     def __init__(self, input_size, hidden_size, num_layers, numClasses, bidirectional=False):
#         super(GRUModel, self).__init__()
        
#         # Define the GRU layer
#         self.gru = nn.GRU(input_size=input_size, 
#                           hidden_size=hidden_size, 
#                           num_layers=num_layers, 
#                           batch_first=True,
#                           bidirectional=bidirectional
#                           )
        
#         # Define a fully connected output layer
#         self.fc = nn.Linear(hidden_size, numClasses)
    
#     def forward(self, x):
#         # Initialize hidden state for GRU
#         h0 = torch.zeros(self.gru.num_layers * (2 if self.gru.bidirectional else 1), x.size(0), self.gru.hidden_size).to(x.device)
        
#         # Forward propagate through GRU
#         outputs, hidden_states = self.gru(x, h0)
        
#         # Take the output from the last time step
#         out = outputs[:, -1, :]
        
#         # Pass through the fully connected layer
#         out = self.fc(out)
#         return out

In [None]:
class EarlyStopper:
    """Signal when the validation loss has stopped improving.

    Attributes are plain and public:
        patience: number of "bad" epochs tolerated before stopping.
        min_delta: margin above the best loss that a loss must exceed to
            count as a bad epoch.
        counter: bad epochs seen since the last new best loss.
        min_validation_loss: lowest loss observed so far.
    """

    def __init__(self, patience=1, min_delta=0):
        self.min_validation_loss = float('inf')
        self.counter = 0
        self.patience = patience
        self.min_delta = min_delta

    def early_stop(self, validation_loss):
        """Record one validation loss and return True if training should stop.

        A loss strictly below the best so far becomes the new best and resets
        the counter. A loss above ``best + min_delta`` increments the counter,
        and the method returns True once the counter reaches ``patience``.
        A loss in between (not better, but within the margin) changes nothing.
        """
        best_so_far = self.min_validation_loss

        if validation_loss < best_so_far:
            self.min_validation_loss = validation_loss
            self.counter = 0
            return False

        if validation_loss > best_so_far + self.min_delta:
            self.counter += 1
            return self.counter >= self.patience

        return False

## Dataset

In [None]:
# Load the train/test CSVs from Heartbeat_Dataset/. header=None makes pandas
# treat the first row as data and assign integer column labels; the code below
# reads the last column as the class label and every other column as a feature.
train_df = pd.read_csv('Heartbeat_Dataset/mitbih_train.csv', header=None)
test_df = pd.read_csv('Heartbeat_Dataset/mitbih_test.csv', header=None)

In [None]:
def get_train_test(train_df, test_df):
    """Split each DataFrame into a feature matrix and a label vector.

    The last column of each frame is taken as the label; all preceding
    columns are the features.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` of NumPy arrays.
    """
    def _features_and_labels(frame):
        # Everything but the last column -> features; last column -> labels.
        return frame.iloc[:, :-1].values, frame.iloc[:, -1].values

    train_features, train_labels = _features_and_labels(train_df)
    test_features, test_labels = _features_and_labels(test_df)

    return train_features, train_labels, test_features, test_labels

In [None]:
X_train, y_train, X_test, y_test = get_train_test(train_df, test_df)

# Hold out 20% of the training rows for validation. The split is stratified so
# the train and validation subsets keep the same class distribution.
X_train_new, X_val, y_train_new, y_val = train_test_split(
    X_train, y_train, test_size=0.2, random_state=SEED, stratify=y_train
)

# Insert a channel axis at position 1, (samples, length) -> (samples, 1, length),
# to match the single input channel of the model's first Conv1d layer.
X_train_new = np.expand_dims(X_train_new, axis=1)
X_val = np.expand_dims(X_val, axis=1)

# Build the loaders from the *split* training subset. Passing the full
# X_train / y_train here would put every validation row into the training set
# as well, so validation metrics and early stopping would be measured on data
# the model has already been fitted to.
train_loader, val_loader = get_dataloaders(X_train_new, X_val, y_train_new, y_val)

## Model

In [None]:
class CNN_GRU(nn.Module):
    """1D-CNN feature extractor followed by a single-layer GRU and a linear head.

    Args:
        num_inputs: Length of the input signal. Accepted for interface
            compatibility only; no layer built here reads it.
        num_classes: Number of logits produced by the final linear layer.
    """

    def __init__(self, num_inputs, num_classes=5):
        super().__init__()

        # Three Conv1d(kernel 3) -> ReLU -> MaxPool1d(2) stages widening the
        # channels 1 -> 64 -> 128 -> 256, followed by dropout. The stages are
        # created in this order so the Sequential indices (0, 3, 6 for the
        # convolutions) and therefore the state_dict keys stay the same.
        stages = []
        for in_channels, out_channels in ((1, 64), (64, 128), (128, 256)):
            stages.extend([
                nn.Conv1d(in_channels, out_channels, kernel_size=3),
                nn.ReLU(),
                nn.MaxPool1d(kernel_size=2),
            ])
        stages.append(nn.Dropout(0.3))
        self.conv_layers = nn.Sequential(*stages)

        self.gru = nn.GRU(input_size=256, hidden_size=64, batch_first=True)
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        """Map a (batch, 1, length) input to (batch, num_classes) logits."""
        features = self.conv_layers(x)

        # Conv output is (batch, channels, steps); the batch_first GRU expects
        # (batch, steps, channels).
        sequence = features.permute(0, 2, 1)
        gru_out, _ = self.gru(sequence)

        # Classify from the GRU output at the final time step.
        return self.fc(gru_out[:, -1, :])

## Training

In [None]:
# Train loop
def train(model, optimizer, train_loader, device, criterion=nn.CrossEntropyLoss()):
    """Run one training epoch and return ``(mean_loss, accuracy)``.

    Args:
        model: Network to optimise; switched to training mode here.
        optimizer: Optimizer stepped once per batch.
        train_loader: Iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute whose length is the sample count.
        device: Device each batch is moved to before the forward pass.
        criterion: Loss applied to ``(logits, labels.long())``.

    Returns:
        Sample-weighted mean loss and the fraction of correct predictions,
        both taken over ``len(train_loader.dataset)``.
    """
    model.train()

    running_loss = 0
    num_correct = 0

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Clear stale gradients, then forward / backward / update.
        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, targets.long())
        batch_loss.backward()
        optimizer.step()

        # criterion returns a batch mean; scale by the batch size so the final
        # division yields a per-sample average even with a ragged last batch.
        running_loss += batch_loss.item() * inputs.size(0)
        num_correct += (logits.max(dim=1).indices == targets).sum().item()

    num_samples = len(train_loader.dataset)
    return running_loss / num_samples, num_correct / num_samples

In [None]:
# Validation/Test loop
def evaluate(model, scheduler, val_loader, device, criterion=nn.CrossEntropyLoss()):
    """Evaluate ``model`` on ``val_loader`` and advance the LR scheduler once.

    Args:
        model: Network to evaluate; switched to eval mode here.
        scheduler: Learning-rate scheduler to advance after evaluation, or
            ``None`` to skip scheduling. ``ReduceLROnPlateau`` is stepped with
            the computed loss; any other scheduler is stepped without
            arguments.
        val_loader: Iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute whose length is the sample count.
        device: Device each batch is moved to before the forward pass.
        criterion: Loss applied to ``(logits, labels.long())``.

    Returns:
        ``(mean_loss, accuracy)`` over ``len(val_loader.dataset)`` samples.
    """
    model.eval()

    val_loss = 0.0
    correct = 0

    with torch.no_grad():  # No gradients needed for validation/testing
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch.long())
            # criterion returns a batch mean; weight it by the batch size.
            val_loss += loss.item() * X_batch.size(0)

            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()

    num_samples = len(val_loader.dataset)
    val_loss /= num_samples
    val_acc = correct / num_samples

    if scheduler is not None:
        # Only ReduceLROnPlateau takes the monitored metric. For epoch-based
        # schedulers (e.g. CosineAnnealingLR) a positional argument to step()
        # is an epoch index, so passing the loss would corrupt the schedule.
        if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
            scheduler.step(val_loss)
        else:
            scheduler.step()

    return val_loss, val_acc

In [None]:
def train_val_loop(model, optimizer, scheduler, train_loader, val_loader, num_epochs=50):
    """Train with per-epoch validation and early stopping.

    Each epoch runs ``train`` on ``train_loader`` and ``evaluate`` on
    ``val_loader``. ``scheduler`` is handed to ``evaluate``, which advances it
    once per epoch. Training stops early once ``EarlyStopper(patience=10)``
    reports that the validation loss has stopped improving.

    Args:
        model: Network to train; moved to CUDA when available, else CPU.
        optimizer: Optimizer passed to ``train``.
        scheduler: LR scheduler; must provide ``get_last_lr()`` for logging.
        train_loader: Training batches.
        val_loader: Validation batches.
        num_epochs: Maximum number of epochs.

    Returns:
        ``(train_losses, train_accuracies, val_losses, val_accuracies)``,
        each a list with one entry per completed epoch.
    """
    early_stopper = EarlyStopper(patience=10)

    # Move model to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    train_losses, train_accuracies = [], []
    val_losses, val_accuracies = [], []

    for epoch in range(1, num_epochs + 1):
        train_loss, train_acc = train(model, optimizer, train_loader, device)
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)

        val_loss, val_acc = evaluate(model, scheduler, val_loader, device)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        # The ', ' after the validation accuracy keeps it from running
        # straight into the learning-rate field in the printed line.
        print(f'Epoch [{epoch}/{num_epochs}], '
              f'Train Loss: {train_loss:.4f}, '
              f'Train Accuracy: {train_acc * 100:.2f}%, '
              f'Validation Loss: {val_loss:.4f}, '
              f'Validation Accuracy: {val_acc * 100:.2f}%, '
              f'Learning rate: {scheduler.get_last_lr()}')

        if early_stopper.early_stop(val_loss):
            print(f'STOPPED AT EPOCH {epoch}')
            break

    return train_losses, train_accuracies, val_losses, val_accuracies

In [None]:
# Instantiate model
# Adam at lr=0.001. The cosine schedule's T_max=50 equals train_val_loop's
# default num_epochs=50.
# NOTE(review): CosineAnnealingLR.step() does not take a metric; whatever code
# steps this scheduler must not pass the validation loss to it (only
# ReduceLROnPlateau accepts one).
model = CNN_GRU(num_inputs=187)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

In [None]:
# Train with per-epoch validation and early stopping; the four returned lists
# hold one loss/accuracy entry per completed epoch.
train_losses, train_accuracies, val_losses, val_accuracies = train_val_loop(model, optimizer, scheduler, train_loader, val_loader)

In [None]:
import os

PATH = './model/CNN_GRU_model.pth'
# torch.save does not create missing parent directories, so make sure the
# target folder exists before writing the weights.
os.makedirs(os.path.dirname(PATH), exist_ok=True)
torch.save(model.state_dict(), PATH)

### Plot training and validation loss and accuracy curves

In [None]:
import matplotlib.pyplot as plt

def plot_graphs(model, train_accuracies, train_losses, val_accuracies, val_losses):
    """Plot per-epoch loss and accuracy curves side by side.

    Args:
        model: Model the curves belong to; only used for the plot titles.
            Its ``getName()`` is used when it has one, otherwise the class
            name.
        train_accuracies: Training accuracy per epoch.
        train_losses: Training loss per epoch.
        val_accuracies: Validation accuracy per epoch.
        val_losses: Validation loss per epoch.
    """
    # Not every model defines getName() (CNN_GRU in this notebook does not);
    # fall back to the class name instead of raising AttributeError.
    name_getter = getattr(model, 'getName', None)
    model_name = name_getter() if callable(name_getter) else type(model).__name__

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))

    # Epochs are numbered from 1 on the x-axis.
    ax1.plot(range(1, len(train_losses)+1), train_losses, label='Training Loss')
    ax1.plot(range(1, len(val_losses)+1), val_losses, label='Validation Loss')
    ax1.set_title(f'{model_name} Loss')
    ax1.legend(loc='lower right')

    ax2.plot(range(1, len(train_accuracies)+1), train_accuracies, label='Training Accuracy')
    ax2.plot(range(1, len(val_accuracies)+1), val_accuracies, label='Validation Accuracy')
    ax2.set_title(f'{model_name} Accuracy')
    ax2.legend(loc='lower right')

In [None]:
# Plot the train/validation histories returned by train_val_loop. Note the
# argument order: accuracies come before losses for each split.
plot_graphs(model, train_accuracies, train_losses, val_accuracies, val_losses)

## Test

In [None]:
def train_test_loop(model, optimizer, scheduler, train_loader, test_loader, num_epochs=50):
    """Train for a fixed number of epochs, evaluating on the test loader each epoch.

    Unlike the validation loop there is no early stopping: all ``num_epochs``
    epochs are run.

    NOTE(review): ``scheduler`` is passed to ``evaluate`` together with
    ``test_loader``, so any metric-driven schedule is steered by the test
    loss — confirm this is intended.

    Returns:
        ``(train_losses, train_accuracies, test_losses, test_accuracies)``,
        each a list with one entry per epoch.
    """
    # Use the GPU when one is available.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # Order: train loss, train accuracy, test loss, test accuracy.
    history = ([], [], [], [])

    for epoch in range(1, num_epochs + 1):
        epoch_metrics = (
            *train(model, optimizer, train_loader, device),
            *evaluate(model, scheduler, test_loader, device),
        )
        for series, value in zip(history, epoch_metrics):
            series.append(value)

        train_loss, train_acc, test_loss, test_acc = epoch_metrics
        summary = (
            f'Epoch [{epoch}/{num_epochs}], '
            f'Train Loss: {train_loss:.4f}, '
            f'Train Accuracy: {train_acc * 100:.2f}%, '
            f'Test Loss: {test_loss:.4f}, '
            f'Test Accuracy: {test_acc * 100:.2f}%'
        )
        print(summary)

    return history

In [None]:
# Instantiate model
# Fresh model and optimizer for the final run; these rebind the `model`,
# `optimizer` and `scheduler` names used by the validation run above.
# ReduceLROnPlateau in 'min' mode lowers the learning rate once the monitored
# value has stopped decreasing for more than `patience` epochs.
# NOTE(review): this scheduler is stepped inside evaluate() with the loss
# computed on test_loader, so the LR schedule is driven by test data —
# confirm this is intended.
model = CNN_GRU(num_inputs=187)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5)

In [None]:
X_train, y_train, X_test, y_test = get_train_test(train_df, test_df)

# Insert a channel axis at position 1, (samples, length) -> (samples, 1, length),
# to match the single input channel of the model's first Conv1d layer.
X_train = np.expand_dims(X_train, axis=1)
# Expand the test features themselves. Expanding X_val here would pair
# validation-set features (which already carry a channel axis) with the test
# labels.
X_test = np.expand_dims(X_test, axis=1)

# Get dataloaders: full training set vs. held-out test set
train_loader, test_loader = get_dataloaders(X_train, X_test, y_train, y_test)

In [None]:
# Final run: train on the full training set and evaluate on the test loader
# after every epoch (train_test_loop has no early stopping).
train_losses, train_accuracies, test_losses, test_accuracies = train_test_loop(model, optimizer, scheduler, train_loader, test_loader)

In [None]:
import os

# Same path as the save after the validation run, so this overwrites that
# checkpoint with the weights of the final model.
PATH = './model/CNN_GRU_model.pth'
# torch.save does not create missing parent directories, so make sure the
# target folder exists before writing the weights.
os.makedirs(os.path.dirname(PATH), exist_ok=True)
torch.save(model.state_dict(), PATH)

In [None]:
y_pred, y_true = [], []
criterion = nn.CrossEntropyLoss()
# is_available must be *called*: the bare function object is always truthy and
# would select 'cuda' even on a CPU-only machine.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = model.to(device)
model.eval()  # make sure dropout is disabled while predicting

with torch.no_grad():  # No need to calculate gradients for testing
    # This is the Test section: score the held-out test set, not the
    # validation loader left over from the earlier run.
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(device)
        outputs = model(X_batch)
        # Keep the predicted class index per sample (argmax over the logits),
        # not the raw logits: the sklearn metrics below compare class labels.
        y_pred.append(outputs.argmax(dim=1).cpu().numpy())
        y_true.append(y_batch.long().cpu().numpy())

# Merge the per-batch arrays into flat 1-D label vectors.
y_pred = np.concatenate(y_pred)
y_true = np.concatenate(y_true)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

# Both calls expect y_true / y_pred as flat sequences of class labels.
# output_dict=True makes `report` a nested dict rather than printable text.
# `conf_matrix` is an array with rows = true class and columns = predicted
# class, ordered as in `labels` (0..4 named N, S, V, F, Q in the report).
report = classification_report(y_true, y_pred, labels=[0.0, 1.0, 2.0, 3.0, 4.0], target_names=['N', 'S', 'V', 'F', 'Q'], output_dict=True)
conf_matrix = confusion_matrix(y_true, y_pred, labels=[0.0, 1.0, 2.0, 3.0, 4.0])

In [None]:
# `report` is the dict produced with output_dict=True, not a function, so
# calling it raises TypeError. Render it as a table instead (one row per class
# or average).
pd.DataFrame(report).transpose()

In [None]:
# `conf_matrix` is already the computed array, not a function; display it
# rather than calling it.
conf_matrix