In [15]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

In [16]:
# RNN model
class ntwkRNN(nn.Module):
    """Elman RNN followed by a linear head applied to a per-sequence summary.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Hidden state width of the recurrent layer.
        output_size: Number of values produced by the linear head.
        dropout: Forwarded to ``nn.RNN``. Per the PyTorch docs, recurrent
            dropout is applied between stacked layers only, so with the single
            layer built here it has no effect (PyTorch warns when it is > 0).
        bidirectional: If True, the sequence is processed in both directions
            and the head receives ``2 * hidden_size`` features.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout, bidirectional):
        super(ntwkRNN, self).__init__()
        self.hidden_size = hidden_size
        self.RNN = nn.RNN(input_size, hidden_size, batch_first=True, dropout=dropout, bidirectional=bidirectional)
        directional_multiplier = 2 if bidirectional else 1
        self.fc = nn.Linear(directional_multiplier * hidden_size, output_size)

    def forward(self, x):
        """Run the RNN over a batch-first input and project its final state(s).

        Args:
            x: Batched input laid out as (batch, seq, input_size), because the
                recurrent layer is built with ``batch_first=True``.

        Returns:
            Tensor of shape (batch, output_size).

        Raises:
            ValueError: If the recurrent output is not 3-dimensional (e.g. an
                unbatched 2-D input was supplied).
        """
        output, _ = self.RNN(x)

        if output.ndim != 3:
            raise ValueError(f"Expected output to be 3-dimensional, got {output.ndim} dimensions")

        if self.RNN.bidirectional:
            # The forward direction finishes at the last timestep, but the
            # reverse direction finishes at timestep 0. Taking output[:, -1]
            # for both would give the reverse pass a state that has seen only
            # one element of the sequence.
            forward_final = output[:, -1, :self.hidden_size]
            backward_final = output[:, 0, self.hidden_size:]
            summary = torch.cat((forward_final, backward_final), dim=1)
        else:
            summary = output[:, -1, :]  # last timestep

        return self.fc(summary)


class ntwkGRU(nn.Module):
    """GRU followed by a linear head applied to a per-sequence summary.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Hidden state width of the recurrent layer.
        output_size: Number of values produced by the linear head.
        dropout: Forwarded to ``nn.GRU``. Per the PyTorch docs, recurrent
            dropout is applied between stacked layers only, so with the single
            layer built here it has no effect (PyTorch warns when it is > 0).
        bidirectional: If True, the sequence is processed in both directions
            and the head receives ``2 * hidden_size`` features.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout, bidirectional):
        super(ntwkGRU, self).__init__()
        self.hidden_size = hidden_size
        self.GRU = nn.GRU(input_size, hidden_size, batch_first=True, dropout=dropout, bidirectional=bidirectional)
        directional_multiplier = 2 if bidirectional else 1
        self.fc = nn.Linear(directional_multiplier * hidden_size, output_size)

    def forward(self, x):
        """Run the GRU over a batch-first input and project its final state(s).

        Args:
            x: Batched input laid out as (batch, seq, input_size), because the
                recurrent layer is built with ``batch_first=True``.

        Returns:
            Tensor of shape (batch, output_size).
        """
        output, _ = self.GRU(x)

        if self.GRU.bidirectional:
            # The reverse direction finishes at timestep 0, not at the last
            # timestep, so each direction is read where it has consumed the
            # whole sequence.
            forward_final = output[:, -1, :self.hidden_size]
            backward_final = output[:, 0, self.hidden_size:]
            summary = torch.cat((forward_final, backward_final), dim=1)
        else:
            summary = output[:, -1, :]  # last output of GRU

        return self.fc(summary)
    
class ntwkLSTM(nn.Module):
    """LSTM followed by a linear head applied to a per-sequence summary.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Hidden state width of the recurrent layer.
        output_size: Number of values produced by the linear head.
        dropout: Forwarded to ``nn.LSTM``. Per the PyTorch docs, recurrent
            dropout is applied between stacked layers only, so with the single
            layer built here it has no effect (PyTorch warns when it is > 0).
        bidirectional: If True, the sequence is processed in both directions
            and the head receives ``2 * hidden_size`` features.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout, bidirectional):
        super(ntwkLSTM, self).__init__()
        self.hidden_size = hidden_size

        self.LSTM = nn.LSTM(input_size, hidden_size, batch_first=True, dropout=dropout, bidirectional=bidirectional)
        directional_multiplier = 2 if bidirectional else 1
        self.fc = nn.Linear(directional_multiplier * hidden_size, output_size)

    def forward(self, x):
        """Run the LSTM over a batch-first input and project its final state(s).

        Args:
            x: Batched input laid out as (batch, seq, input_size), because the
                recurrent layer is built with ``batch_first=True``.

        Returns:
            Tensor of shape (batch, output_size).
        """
        output, _ = self.LSTM(x)

        if self.LSTM.bidirectional:
            # The reverse direction finishes at timestep 0, not at the last
            # timestep, so each direction is read where it has consumed the
            # whole sequence.
            forward_final = output[:, -1, :self.hidden_size]
            backward_final = output[:, 0, self.hidden_size:]
            summary = torch.cat((forward_final, backward_final), dim=1)
        else:
            summary = output[:, -1, :]  # last output of LSTM

        return self.fc(summary)

In [17]:
def train_RNN(epochs, model, criterion, optimizer, train_loader, test_loader, device):
    """Fit ``model`` for ``epochs`` passes and track per-epoch mean losses.

    Each epoch performs one optimisation pass over ``train_loader`` followed by
    a gradient-free evaluation pass over ``test_loader``. Both reported losses
    are the mean of the per-batch criterion values (sum of batch losses divided
    by ``len(loader)``). A one-line summary is printed after every epoch.

    Args:
        epochs: Number of passes over the training loader.
        model: Module called as ``model(seq)``.
        criterion: Callable ``criterion(prediction, labels)`` returning a
            scalar tensor.
        optimizer: Optimizer holding the model's parameters.
        train_loader: Sized iterable of ``(seq, labels)`` batches for training.
        test_loader: Sized iterable of ``(seq, labels)`` batches for evaluation.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Tuple ``(train_loss, val_loss)`` of two lists of floats, one entry per
        epoch.
    """
    train_loss, val_loss = [], []

    for epoch_idx in range(epochs):
        # ---- optimisation pass ----
        model.train()
        summed_train = 0.0
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), targets)
            batch_loss.backward()
            optimizer.step()

            summed_train += batch_loss.item()

        epoch_train = summed_train / len(train_loader)
        train_loss.append(epoch_train)

        # ---- evaluation pass (no gradients tracked) ----
        model.eval()
        summed_val = 0.0
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                summed_val += criterion(model(inputs), targets).item()

        epoch_val = summed_val / len(test_loader)
        val_loss.append(epoch_val)

        print(f'Epoch {epoch_idx + 1}, Train Loss: {epoch_train:.4f}, Val Loss: {epoch_val:.4f}')

    return train_loss, val_loss


In [18]:
#transformer model
class ntwkTransformer(nn.Module):
    """Token embedding + Transformer encoder + linear head on the last position.

    Args:
        input_size: Size of the embedding vocabulary (number of distinct
            token indices).
        hidden_size: Embedding width and encoder model dimension.
        output_size: Number of values produced by the linear head.
        num_layers: Number of stacked encoder layers.
        nheads: Number of attention heads per encoder layer.
        dropout: Dropout probability used inside each encoder layer.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers, nheads, dropout):
        super(ntwkTransformer, self).__init__()

        self.embedding = nn.Embedding(input_size, hidden_size)

        # batch_first=True is required: forward() indexes the encoder output as
        # (batch, seq, hidden). With the default (seq-first) layout the encoder
        # would treat the batch axis as the sequence and attend across samples.
        encoder_layers = nn.TransformerEncoderLayer(hidden_size, nheads, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)

        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Encode a batch of index sequences and project the last position.

        Args:
            x: Integer index tensor laid out as (batch, seq).

        Returns:
            Tensor of shape (batch, output_size).
        """
        embedded = self.embedding(x)
        transformer_output = self.transformer_encoder(embedded)
        # Use the encoder output at the last sequence position as the summary.
        output = self.fc(transformer_output[:, -1, :])
        return output

In [19]:
# Positional Encoding
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position signals to a batch-first sequence.

    Even feature indices hold ``sin(pos * freq)`` and odd indices hold
    ``cos(pos * freq)``, with ``freq = exp(-ln(10000) * 2i / d_model)``.

    Args:
        d_model: Feature width of the inputs the encoding is added to.
        max_len: Number of positions precomputed; inputs passed to
            ``forward`` are expected to be no longer than this.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        angles = position * div_term

        encoding = torch.zeros(max_len, d_model)
        encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angle table to match (a no-op when d_model is even).
        encoding[:, 1::2] = torch.cos(angles[:, :d_model // 2])

        # Registered as a buffer so it follows the module through .to(device),
        # .cuda(), .double(), etc. persistent=False keeps it out of the
        # state_dict, so existing checkpoints still load unchanged.
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encoding for its first ``x.size(1)`` positions.

        Args:
            x: Tensor laid out as (batch, seq, d_model).

        Returns:
            Tensor with the same shape as ``x``.
        """
        return x + self.encoding[:, :x.size(1)].detach()

#transformer w/ Positional Encoding for regression
class ntwkPETransformer(nn.Module):
    """Embedding -> positional encoding -> Transformer encoder -> linear head.

    Unlike a last-position summary model, the linear head is applied to the
    encoder output at every sequence position.

    Args:
        input_size: Size of the embedding vocabulary.
        hidden_size: Embedding width and encoder model dimension.
        output_size: Number of values produced per position by the head.
        num_layers: Number of stacked encoder layers.
        nhead: Number of attention heads per encoder layer.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers, nhead):
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=hidden_size)
        self.pos_encoder = PositionalEncoding(d_model=hidden_size)

        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_size, nhead=nhead, batch_first=True),
            num_layers=num_layers,
        )

        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)
        # NOTE(review): never called in forward(); kept so the module's
        # attributes stay as they were.
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x):
        """Map a batch of index sequences to per-position outputs.

        Args:
            x: Integer index tensor, presumably (batch, seq) given the
                batch-first encoder -- confirm against the data loader.

        Returns:
            The linear head's output for every position, i.e. the input shape
            with a trailing ``output_size`` axis.
        """
        positioned = self.pos_encoder(self.embedding(x))
        encoded = self.transformer_encoder(positioned)
        return self.fc(encoded)

In [20]:
#train loop for transformer
def _logits_for_loss(output):
    """Put the class axis at dim 1 for per-position logits; pass 2-D logits through."""
    return output.transpose(1, 2) if output.ndim == 3 else output


def trainTransformer(model,epochs, criterion, optimizer, train_loader, test_loader, device):
    """Train a classifier and track per-epoch loss and validation accuracy.

    Each epoch runs one optimisation pass over ``train_loader`` and one
    gradient-free evaluation pass over ``test_loader``. Model outputs may be
    per-position logits ``(batch, seq, classes)`` -- transposed to
    ``(batch, classes, seq)`` before the criterion is applied -- or plain
    ``(batch, classes)`` logits. A summary line is printed every 10th epoch.

    Args:
        model: Module called as ``model(seq)`` returning logits.
        epochs: Number of passes over the training loader.
        criterion: Loss callable taking ``(logits, labels)`` with the class
            axis at dim 1 (e.g. ``nn.CrossEntropyLoss``).
        optimizer: Optimizer holding the model's parameters.
        train_loader: Iterable of ``(seq, labels)`` batches for training.
        test_loader: Iterable of ``(seq, labels)`` batches for validation.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Tuple ``(train_loss, val_loss, val_acc)`` of three lists of Python
        floats with one entry per epoch: mean per-batch training loss, mean
        per-batch validation loss, and the fraction of validation labels
        predicted correctly. An entry is ``nan`` if its loader yielded nothing.
    """
    train_loss = []
    val_loss = []
    val_acc = []

    for epoch in range(epochs):
        # ---- optimisation pass ----
        model.train()
        running_loss = 0.0
        train_batches = 0
        for seq, labels in train_loader:
            seq, labels = seq.to(device), labels.to(device)

            optimizer.zero_grad()
            output = model(seq)
            loss = criterion(_logits_for_loss(output), labels)
            loss.backward()
            optimizer.step()

            # .item() detaches the value so the history does not keep every
            # batch's autograd graph alive.
            running_loss += loss.item()
            train_batches += 1

        epoch_train_loss = running_loss / train_batches if train_batches else float("nan")
        train_loss.append(epoch_train_loss)

        # ---- validation pass ----
        model.eval()
        running_val_loss = 0.0
        val_batches = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for seq, labels in test_loader:
                seq, labels = seq.to(device), labels.to(device)

                val_output = model(seq)
                running_val_loss += criterion(_logits_for_loss(val_output), labels).item()
                val_batches += 1

                # Classes live on the last axis of the raw model output.
                predicted = val_output.argmax(dim=-1)
                correct += (predicted == labels).sum().item()
                total += labels.numel()

        epoch_val_loss = running_val_loss / val_batches if val_batches else float("nan")
        epoch_val_acc = correct / total if total else float("nan")
        val_loss.append(epoch_val_loss)
        val_acc.append(epoch_val_acc)

        if (epoch+1) % 10 == 0:
            print(f'Epoch {epoch+1}, Loss: {epoch_train_loss}, Validation Loss: {epoch_val_loss}, Validation Accuracy: {epoch_val_acc}')

    return train_loss, val_loss, val_acc

In [21]:
def lossplot(t_loss,v_loss, epochs):
    """Plot training and validation loss curves on the current figure.

    Args:
        t_loss: Per-epoch training losses.
        v_loss: Per-epoch validation losses.
        epochs: Either the x-axis values (one per loss entry) or an integer
            epoch count, in which case the x-axis is ``1..epochs``.
    """
    # The training functions take `epochs` as an int; accept that here too
    # instead of failing inside matplotlib on mismatched x/y lengths.
    if isinstance(epochs, int):
        epochs = range(1, epochs + 1)

    plt.plot(epochs, t_loss, label="Training Loss")
    plt.plot(epochs, v_loss, label="Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.title("Training and Validation Loss Over Epochs")
    plt.show()