In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
import matplotlib.pyplot as plt
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
import seaborn as sns
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

In [None]:
class GRUModelPackUnpack(nn.Module):
    """Bidirectional GRU classifier for padded, variable-length sequences.

    The input batch is packed with the true sequence lengths so the GRU never
    processes padding timesteps. The classification head receives the forward
    direction's output at each sequence's last valid timestep concatenated with
    the backward direction's output at timestep 0.

    Args:
        input_dim: Number of features per timestep.
        hidden_dim: Hidden size of the GRU, per direction.
        layer_dim: Number of stacked GRU layers.
        output_dim: Number of output logits (classes).
        dropout_prob: Dropout applied between stacked GRU layers. Ignored when
            ``layer_dim`` is 1, because there is no inter-layer connection to
            apply it to.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout_prob=0.5):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim

        # nn.GRU only applies dropout between stacked layers and emits a
        # UserWarning when a non-zero value is given for a single layer.
        gru_dropout = dropout_prob if layer_dim > 1 else 0.0

        # GRU layers
        self.gru = nn.GRU(
            input_dim,
            self.hidden_dim,
            self.layer_dim,
            dropout=gru_dropout,
            batch_first=True,
            bidirectional=True,
        )

        # Fully connected layer; input is forward + backward hidden states.
        self.fc1 = nn.Linear(self.hidden_dim * 2, output_dim)

    def forward(self, x, lengths):
        """Compute class logits for a batch of padded sequences.

        Args:
            x: Tensor of shape (batch, max_seq_len, input_dim), padded at the
                end of each sequence.
            lengths: True length of every sequence in ``x``; a 1-D integer
                tensor or any sequence of ints. Every length must be >= 1
                (a requirement of ``pack_padded_sequence``).

        Returns:
            Tensor of shape (batch, output_dim) containing unnormalised logits.
        """
        lengths = torch.as_tensor(lengths, dtype=torch.long)

        # pack_padded_sequence requires the lengths to live on the CPU, even
        # when x is on an accelerator.
        packed_x = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, _ = self.gru(packed_x)
        output, _ = pad_packed_sequence(packed_output, batch_first=True)

        # Forward direction: state at the last valid timestep of each sequence.
        batch_index = torch.arange(output.size(0), device=output.device)
        last_step = (lengths - 1).to(output.device)
        out_forward = output[batch_index, last_step, :self.hidden_dim]
        # Backward direction: it has consumed the whole sequence at timestep 0.
        out_reverse = output[:, 0, self.hidden_dim:]

        out_reduced = torch.cat((out_forward, out_reverse), dim=1)
        out = self.fc1(out_reduced)
        return out

In [None]:
# Seed torch so weight initialisation and DataLoader shuffling are repeatable,
# matching the fixed random_state values already used for the splits below.
SEED = 32
torch.manual_seed(SEED)

data = np.load("X-y.npz") # X is 3 dimensional (batch, sequence-length, sequence-dim), y is bucketed RIFL value
X = data['X']
y = data['y']
X = torch.Tensor(X)
y = torch.Tensor(y).type(torch.long)

# 70 / 15 / 15 stratified split into train / validation / test.
x_train, x_temp, y_train, y_temp = train_test_split(X, y, test_size = 0.3, stratify=y, random_state = 32)
x_val, x_test, y_val, y_test = train_test_split(x_temp, y_temp, test_size = 0.5, stratify=y_temp, random_state = 23)

train = TensorDataset(x_train, y_train)
val = TensorDataset(x_val, y_val)
test = TensorDataset(x_test, y_test)

# hyperparameters
batch_size = 80
num_epochs = 200

# generate data loaders
# Only the training loader is shuffled; evaluation metrics do not depend on
# sample order, so a fixed order keeps evaluation runs deterministic.
train_loader = DataLoader(train, batch_size = batch_size, shuffle = True)
val_loader = DataLoader(val, batch_size = batch_size, shuffle = False)
test_loader = DataLoader(test, batch_size = batch_size, shuffle = False)

# Create RNN
input_dim = 136   # input dimension
hidden_dim = 136  # hidden layer dimension (per GRU direction)
layer_dim = 2    # number of hidden layers
output_dim = 3   # output dimension
dropout_prob = 0.00

model = GRUModelPackUnpack(input_dim, hidden_dim, layer_dim, output_dim, dropout_prob)

# Adam optimizer; the learning rate is halved by the scheduler when the
# monitored loss stops improving for `patience` epochs.
# NOTE(review): 0.1 is far above Adam's default of 1e-3 - confirm this is intentional.
learning_rate = 0.1
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=10, threshold=1e-2)

# Loss function with class weights (inverse class frequency on the training split)
y_train_np = y_train.numpy()
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train_np), y=y_train_np)
class_weights = torch.tensor(class_weights, dtype=torch.float)
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)

# Training history, filled in by the training loop.
train_loss_list = []
val_loss_list = []
accuracy_list = []
best_val_loss = float('inf')

In [None]:
def calculate_sequence_lengths(padded_sequences):
    """Return the unpadded length of every sequence in a zero-padded batch.

    A timestep counts as data when at least one of its features is non-zero.
    The length of a sequence is the position (1-based) of its last data
    timestep, so an all-zero timestep in the middle of a sequence does not
    shorten it; only trailing all-zero timesteps are treated as padding.

    Args:
        padded_sequences: Tensor of shape (batch, seq_len, features), padded
            with zeros at the end of each sequence.

    Returns:
        1-D ``torch.long`` tensor of shape (batch,). A sequence consisting
        only of zeros has length 0.
    """
    has_data = (padded_sequences != 0).any(dim=2)
    num_steps = has_data.size(1)
    if num_steps == 0:
        # max() over an empty dimension raises; no timesteps means length 0.
        return torch.zeros(has_data.size(0), dtype=torch.long, device=has_data.device)

    # Weight each data timestep by its 1-based position; the row maximum is
    # then the position of the last data timestep.
    step_numbers = torch.arange(1, num_steps + 1, device=has_data.device)
    lengths = (has_data.long() * step_numbers).max(dim=1).values
    return lengths

In [None]:
# Train for num_epochs epochs, validating every 5th epoch and checkpointing the
# weights with the lowest validation loss to 'model.pth'.
for epoch in range(1, num_epochs + 1):
    # Re-enter training mode every epoch: the validation pass below (and the
    # evaluation cells, if this cell is re-run) put the model in eval mode.
    model.train()
    avg_loss = 0
    counter = 0
    for x, label in train_loader:
        counter += 1
        optimizer.zero_grad()
        lengths = calculate_sequence_lengths(x)
        out = model(x, lengths)

        loss = loss_function(out, label.long())
        loss.backward()

        optimizer.step()
        avg_loss += loss.item()

    average_epoch_loss = avg_loss / counter
    train_loss_list.append(average_epoch_loss)
    print(f"Epoch: {epoch}, Avg Loss: {average_epoch_loss}")

    if epoch % 5 == 0:
        # Disable dropout for validation so the loss is deterministic.
        model.eval()
        with torch.no_grad():
            total_samples = 0
            validation_loss = 0
            for x, label in val_loader:
                label = label.long()

                lengths = calculate_sequence_lengths(x)

                outputs = model(x, lengths)

                # Weight each batch by its size so a short final batch does not
                # skew the average.
                total_samples += label.size(0)
                validation_loss +=  loss_function(outputs, label).item() * label.size(0)

            average_val_loss = validation_loss / total_samples
            val_loss_list.append(average_val_loss)

            if average_val_loss < best_val_loss:
                best_val_loss = average_val_loss
                torch.save(model.state_dict(), 'model.pth')
                print(f"New best model saved at epoch {epoch} with loss: {best_val_loss}")

        print(f'Average Validation loss: {average_val_loss}')

    # Update the learning rate after each epoch; the scheduler monitors the
    # training loss because the validation loss is only available every 5 epochs.
    scheduler.step(average_epoch_loss)
    print(f'current learning rate: {optimizer.param_groups[0]["lr"]}\n')

In [None]:
# Restore the best checkpoint written by the training loop (if any) so the
# evaluation cells score the best model rather than the last epoch's weights.
model_path = 'model.pth'
if os.path.exists(model_path):
    model.load_state_dict(torch.load(model_path))
    print("Loaded saved model state.")

# Epochs are numbered from 1 in the training loop, and validation runs on every
# 5th epoch. Deriving the validation x-axis from val_loss_list itself keeps the
# two curves aligned and avoids a shape mismatch after an interrupted run.
train_epochs = list(range(1, len(train_loss_list) + 1))
val_epochs = [5 * (i + 1) for i in range(len(val_loss_list))]

plt.figure(figsize=(10, 6))
plt.plot(train_epochs, train_loss_list, label='Training Loss')
plt.plot(val_epochs, val_loss_list, label='Validation Loss')
plt.title('Training and Validation Losses')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
#compute training loss and accuracy
# (the test_* variable names are shared with the other evaluation cells; in
# this cell they hold training-split metrics)
model.eval()
test_loss = 0
correct = 0
total = 0
all_outputs = []
all_labels = []

with torch.no_grad():
    for articles, labels in train_loader:
        # The model's forward() requires the true sequence lengths so that
        # padding timesteps are skipped.
        lengths = calculate_sequence_lengths(articles)
        outputs = model(articles, lengths)
        probabilities = torch.softmax(outputs, dim=1)
        predicted = outputs.argmax(dim=1)

        all_outputs.append(probabilities)
        all_labels.append(labels)

        num_samples = labels.size(0)
        total += num_samples
        correct += (predicted == labels).sum().item()
        # Weight by batch size so the short final batch does not skew the
        # average (same convention as the validation loss in the training loop).
        test_loss += loss_function(outputs, labels).item() * num_samples

all_outputs = torch.cat(all_outputs)
all_labels = torch.cat(all_labels)

test_loss /= total
test_accuracy = 100 * correct / total

print(f"Training Loss: {test_loss:.4f}")
print(f"Training Accuracy: {test_accuracy:.2f}%")

# Convert outputs to predicted class indices
predicted_classes = torch.argmax(all_outputs, dim=1)

# Calculate F1 scores
f1_macro = f1_score(all_labels.cpu(), predicted_classes.cpu(), average='macro')
f1_micro = f1_score(all_labels.cpu(), predicted_classes.cpu(), average='micro')

print(f"F1 Score (Macro): {f1_macro:.4f}")
print(f"F1 Score (Micro): {f1_micro:.4f}")

# Calculate AUC for binary classification
# For multi-class, you need to binarize the labels and use one-vs-rest approach
# auc = roc_auc_score(all_labels.cpu(), all_outputs.cpu(), multi_class='ovr')

# Confusion Matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(all_labels.cpu(), predicted_classes.cpu())
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='g')
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [None]:
#compute validation loss and accuracy
# (the test_* variable names are shared with the other evaluation cells; in
# this cell they hold validation-split metrics)
model.eval()
test_loss = 0
correct = 0
total = 0
all_outputs = []
all_labels = []

with torch.no_grad():
    for articles, labels in val_loader:
        # The model's forward() requires the true sequence lengths so that
        # padding timesteps are skipped.
        lengths = calculate_sequence_lengths(articles)
        outputs = model(articles, lengths)
        probabilities = torch.softmax(outputs, dim=1)
        predicted = outputs.argmax(dim=1)

        all_outputs.append(probabilities)
        all_labels.append(labels)

        num_samples = labels.size(0)
        total += num_samples
        correct += (predicted == labels).sum().item()
        # Weight by batch size so the short final batch does not skew the
        # average (same convention as the validation loss in the training loop).
        test_loss += loss_function(outputs, labels).item() * num_samples

all_outputs = torch.cat(all_outputs)
all_labels = torch.cat(all_labels)

test_loss /= total
test_accuracy = 100 * correct / total

print(f"Validation Loss: {test_loss:.4f}")
print(f"Validation Accuracy: {test_accuracy:.2f}%")

# Convert outputs to predicted class indices
predicted_classes = torch.argmax(all_outputs, dim=1)

# Calculate F1 scores
f1_macro = f1_score(all_labels.cpu(), predicted_classes.cpu(), average='macro')
f1_micro = f1_score(all_labels.cpu(), predicted_classes.cpu(), average='micro')

print(f"F1 Score (Macro): {f1_macro:.4f}")
print(f"F1 Score (Micro): {f1_micro:.4f}")

# Calculate AUC for binary classification
# For multi-class, you need to binarize the labels and use one-vs-rest approach
# auc = roc_auc_score(all_labels.cpu(), all_outputs.cpu(), multi_class='ovr')

# Confusion Matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(all_labels.cpu(), predicted_classes.cpu())
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='g')
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [None]:
#compute test loss and accuracy
model.eval()
test_loss = 0
correct = 0
total = 0
all_outputs = []
all_labels = []

with torch.no_grad():
    for articles, labels in test_loader:
        # The model's forward() requires the true sequence lengths so that
        # padding timesteps are skipped.
        lengths = calculate_sequence_lengths(articles)
        outputs = model(articles, lengths)
        probabilities = torch.softmax(outputs, dim=1)
        predicted = outputs.argmax(dim=1)

        all_outputs.append(probabilities)
        all_labels.append(labels)

        num_samples = labels.size(0)
        total += num_samples
        correct += (predicted == labels).sum().item()
        # Weight by batch size so the short final batch does not skew the
        # average (same convention as the validation loss in the training loop).
        test_loss += loss_function(outputs, labels).item() * num_samples

all_outputs = torch.cat(all_outputs)
all_labels = torch.cat(all_labels)

test_loss /= total
test_accuracy = 100 * correct / total

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Convert outputs to predicted class indices
predicted_classes = torch.argmax(all_outputs, dim=1)

# Calculate F1 scores
f1_macro = f1_score(all_labels.cpu(), predicted_classes.cpu(), average='macro')
f1_micro = f1_score(all_labels.cpu(), predicted_classes.cpu(), average='micro')

print(f"F1 Score (Macro): {f1_macro:.4f}")
print(f"F1 Score (Micro): {f1_micro:.4f}")

# Calculate AUC for binary classification
# For multi-class, you need to binarize the labels and use one-vs-rest approach
# auc = roc_auc_score(all_labels.cpu(), all_outputs.cpu(), multi_class='ovr')

# Confusion Matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(all_labels.cpu(), predicted_classes.cpu())
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='g')
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()