In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms, models
from torchsummary import summary

from sklearn.metrics import accuracy_score, balanced_accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report
from sklearn.model_selection import LeaveOneGroupOut

In [2]:
from gilbert2d import gilbert2d

In [3]:
# Prefer the GPU whenever CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Model running on {device}.")

Model running on cuda.


## Data Loading

In [4]:
# Category names; the position of each name in this list equals its integer id
# in `int_to_cat`. NOTE(review): spellings are kept verbatim because they are
# runtime labels - confirm against the dataset before "fixing" any of them.
CLASSES = ["asymetric", "banded", "locked", "butterfly", "no_pattern"]

In [5]:
# Lookup table: integer class id -> category name.
# presumably these ids are the values stored in the `cat_num` label column - confirm.
int_to_cat = dict(
    enumerate(("asymetric", "banded", "locked", "butterfly", "no_pattern"))
)

In [6]:
# Training split: image array from the .npy file, label table from the CSV.
# The `cat_num` column of the label table is what later cells use as the target.
X_train = np.load("./data/X_train.npy")
y_train_df = pd.read_csv("./data/y_train.csv")

In [7]:
# Test split: image array from the .npy file, label table from the CSV.
# The `cat_num` column of the label table is what later cells use as the target.
X_test = np.load("./data/X_test.npy")
y_test_df = pd.read_csv("./data/y_test.csv")

## Flatten with Peano curve

In [8]:
def flatten_with_peano(image):
    """Flatten an image into a 1-D array following the order given by `gilbert2d`.

    Args:
        image: Array indexed as ``image[row, col]``.

    Returns:
        A NumPy array of ``np.prod(image.shape)`` elements (NumPy's default
        float dtype) whose k-th entry is the pixel at the k-th coordinate
        produced by ``gilbert2d(image.shape[0], image.shape[1])``.
    """
    n_rows, n_cols = image.shape[0], image.shape[1]
    sequence = np.zeros(np.prod(image.shape))

    # assumes gilbert2d yields (row, col) index pairs for this call - TODO confirm
    for position, (row, col) in enumerate(gilbert2d(n_rows, n_cols)):
        sequence[position] = image[row, col]

    return sequence

In [9]:
# Turn every train / test image into its curve-ordered 1-D sequence.
flattened_img_train = list(map(flatten_with_peano, X_train))
flattened_img_test = list(map(flatten_with_peano, X_test))

In [10]:
from imblearn.over_sampling import SMOTE

# Oversample the flattened training set with SMOTE.
# A fixed random_state makes the synthetic samples - and therefore every metric
# computed further down - reproducible across kernel restarts.
sm = SMOTE(k_neighbors=10, n_jobs=-1, random_state=42)

# NOTE: this rebinds X_train from the raw image array to the resampled feature
# matrix; y_train holds the matching resampled `cat_num` labels.
X_train, y_train = sm.fit_resample(flattened_img_train, y_train_df['cat_num'].values)



In [11]:
# Convert both splits to PyTorch tensors: float32 features, int64 class labels.
X_train, y_train = (
    torch.tensor(np.array(X_train), dtype=torch.float32),
    torch.tensor(y_train, dtype=torch.long),
)
X_test, y_test = (
    torch.tensor(np.array(flattened_img_test), dtype=torch.float32),
    torch.tensor(y_test_df['cat_num'].values, dtype=torch.long),
)

In [12]:
# Place the complete train and test tensors on the selected device up front,
# so every batch drawn from them later already lives there.
X_train, y_train, X_test, y_test = (
    tensor.to(device) for tensor in (X_train, y_train, X_test, y_test)
)

OutOfMemoryError: CUDA out of memory. Tried to allocate 104.00 MiB. GPU 0 has a total capacty of 14.54 GiB of which 41.44 MiB is free. Process 1991428 has 1.63 GiB memory in use. Process 3167848 has 11.08 GiB memory in use. Process 1211536 has 1.71 GiB memory in use. Process 2725564 has 52.00 MiB memory in use. Of the allocated memory 0 bytes is allocated by PyTorch, and 0 bytes is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [None]:
# Sanity check: report the tensor shapes of both splits, one line per split.
print(
    f"X_train shape -> {X_train.shape}, y_train shape -> {y_train.shape}",
    f"X_test shape -> {X_test.shape}, y_test shape -> {y_test.shape}",
    sep="\n",
)

# Train model

## Train functions

In [None]:
batch_size = 32

# Training batches are reshuffled at every epoch.
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Evaluation batches keep a fixed order: shuffling brings nothing at evaluation
# time and makes repeated evaluations see different batches.
# NOTE(review): the models below are fed 2-D (batch, features) tensors, which
# nn.LSTM appears to treat as one unbatched sequence, so predictions may depend
# on which samples share a batch - confirm.
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch.nn.functional as F

class ImageLSTM(nn.Module):
    """LSTM encoder followed by a two-layer fully connected classification head.

    Args:
        input_size: Number of input features per step.
        lstm_outsize: Hidden size of the LSTM (per direction).
        num_layers: Number of stacked LSTM layers.
        hidden_n: Width of the hidden fully connected layer.
        num_classes: Number of output logits.
        bidirectional: Whether the LSTM runs in both directions.
    """

    def __init__(self, input_size, lstm_outsize, num_layers, hidden_n, num_classes, bidirectional=True):
        super(ImageLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, lstm_outsize, num_layers, batch_first=True, bidirectional=bidirectional)
        # The LSTM output width is hidden_size times the number of directions;
        # stacking more layers does not widen it. (The previous
        # `lstm_outsize * num_layers` only matched for 2 layers + bidirectional.)
        num_directions = 2 if bidirectional else 1
        self.fc1 = nn.Linear(lstm_outsize * num_directions, hidden_n)
        self.fc_out = nn.Linear(hidden_n, num_classes)

    def forward(self, x):
        """Return class logits with the same leading dimensions as the LSTM output.

        The head is applied to every position of the LSTM output, so the result
        has ``num_classes`` as its last dimension.
        """
        lstm_out, _ = self.lstm(x)
        hidden = F.relu(self.fc1(lstm_out))
        # Return the logits, not the hidden activation.
        return self.fc_out(hidden)

In [None]:
class ImageLSTMWithAttention(nn.Module):
    """Two stacked LSTM blocks with dropout and a fully connected head.

    An attention-pooling helper (`attention`) is provided but is not used by
    `forward`.

    NOTE: a later cell of this notebook defines another class with the same
    name; once that cell has run, it replaces this definition.

    Args:
        input_size: Number of input features per step.
        lstm1: Hidden size (per direction) of the first LSTM.
        lstm2: Hidden size (per direction) of the second LSTM.
        num_layers: Number of stacked layers inside each LSTM.
        hidden_n: Width of the hidden fully connected layer.
        num_classes: Number of output logits.
        bidirectional: Whether both LSTMs run in both directions.
        dropout_rate: Dropout probability after the second LSTM and after fc1.
    """

    def __init__(self, input_size, lstm1, lstm2, num_layers, hidden_n, num_classes, bidirectional=True, dropout_rate=0.5):
        super(ImageLSTMWithAttention, self).__init__()
        # LSTM output width = hidden_size * number of directions (independent of
        # num_layers). The previous `* num_layers` sizing only matched for
        # 2 layers + bidirectional.
        num_directions = 2 if bidirectional else 1
        self.lstm1 = nn.LSTM(input_size, lstm1, num_layers, batch_first=True, bidirectional=bidirectional)
        self.lstm2 = nn.LSTM(num_directions * lstm1, lstm2, num_layers, batch_first=True, bidirectional=bidirectional)
        self.dropout_lstm = nn.Dropout(dropout_rate)
        self.fc1 = nn.Linear(lstm2 * num_directions, hidden_n)
        self.dropout_fc1 = nn.Dropout(dropout_rate)
        self.fc_out = nn.Linear(hidden_n, num_classes)
        self.attention_fc = nn.Linear(lstm2 * num_directions, 1)

    def attention(self, lstm_out):
        """Pool `lstm_out` over its second-to-last (step) axis with learned weights.

        Args:
            lstm_out: Tensor of shape (..., steps, features).

        Returns:
            Tensor of shape (..., features): the softmax-weighted sum of the steps.
        """
        # One scalar score per step: (..., steps, 1) -> (..., steps)
        energy = self.attention_fc(lstm_out).squeeze(-1)
        # Normalise over the step axis and restore a trailing axis so the
        # weights broadcast against the feature axis.
        weights = F.softmax(energy, dim=-1).unsqueeze(-1)
        return torch.sum(weights * lstm_out, dim=-2)

    def forward(self, x):
        """Return class logits for every position of the second LSTM's output."""
        lstm_out1, _ = self.lstm1(x)
        lstm_out2, _ = self.lstm2(lstm_out1)

        # Attention pooling is intentionally not wired in here; the head is
        # applied directly to the (dropped-out) output of the second LSTM.
        combined_output = self.dropout_lstm(lstm_out2)
        fc1_out = F.relu(self.fc1(combined_output))
        fc1_out = self.dropout_fc1(fc1_out)
        out = self.fc_out(fc1_out)
        return out

In [None]:
class SelfAttention(nn.Module):
    """Multi-head self-attention wrapper that returns a tensor shaped like its input.

    Args:
        hidden_size: Embedding dimension of the attention layer.
        num_heads: Number of attention heads.
    """

    def __init__(self, hidden_size, num_heads):
        super(SelfAttention, self).__init__()
        self.hidden_size = hidden_size
        self.attention = nn.MultiheadAttention(embed_dim=hidden_size, num_heads=num_heads)

    def forward(self, x):
        """Apply self-attention to `x`.

        Args:
            x: Either a batch-first 3-D tensor (batch, steps, hidden_size) or a
               2-D tensor (batch, hidden_size).

        Returns:
            A tensor with the same shape as `x`.
        """
        if x.dim() == 3:
            # The attention layer is sequence-first, so swap the first two axes.
            # A plain reshape here would interleave different samples.
            seq_first = x.transpose(0, 1)
            attn_output, _ = self.attention(seq_first, seq_first, seq_first)
            # Back to batch-first so the caller can concatenate with its input.
            return attn_output.transpose(0, 1)

        # 2-D input: the first axis is taken as the batch, giving the layout
        # (steps, batch, hidden_size) expected by the attention layer.
        cur_batch_size = x.shape[0]
        seq_first = x.reshape(-1, cur_batch_size, self.hidden_size)

        # Apply self-attention
        attn_output, _ = self.attention(seq_first, seq_first, seq_first)

        # Collapse back to 2-D (rows, hidden_size)
        return attn_output.reshape(-1, self.hidden_size)

class ImageLSTMWithAttention(nn.Module):
    """Two stacked LSTM blocks, optional self-attention, and a fully connected head.

    Args:
        input_size: Number of input features per step.
        lstm1: Hidden size (per direction) of the first LSTM.
        lstm2: Hidden size (per direction) of the second LSTM.
        num_layers: Number of stacked layers inside each LSTM.
        hidden_n: Width of the hidden fully connected layer.
        num_classes: Number of output logits.
        num_attention_heads: Number of heads of the self-attention layer.
        bidirectional: Whether both LSTMs run in both directions.
        dropout_rate: Dropout probability.
        add_attention: If True, the self-attention output is concatenated to
            the second LSTM's output before the head.
    """

    def __init__(self, input_size, lstm1, lstm2, num_layers, hidden_n, num_classes, num_attention_heads,
                 bidirectional=True, dropout_rate=0.5, add_attention=True):
        super(ImageLSTMWithAttention, self).__init__()
        self.add_attention = add_attention

        # LSTM output width = hidden_size * number of directions (independent of
        # num_layers). The previous `* num_layers` sizing only matched for
        # 2 layers + bidirectional.
        num_directions = 2 if bidirectional else 1
        lstm2_features = lstm2 * num_directions

        self.lstm1 = nn.LSTM(input_size, lstm1, num_layers, batch_first=True, bidirectional=bidirectional)
        self.lstm2 = nn.LSTM(num_directions * lstm1, lstm2, num_layers, batch_first=True, bidirectional=bidirectional)
        # Built even when add_attention is False, so the set of parameters
        # (and state_dict keys) does not depend on the flag.
        self.self_attention = SelfAttention(lstm2_features, num_attention_heads)
        self.dropout_lstm = nn.Dropout(dropout_rate)
        # With attention the head sees [lstm features, attention features].
        self.fc1 = nn.Linear(lstm2_features * (2 if self.add_attention else 1), hidden_n)
        self.dropout_fc1 = nn.Dropout(dropout_rate)
        self.fc_out = nn.Linear(hidden_n, num_classes)

    def forward(self, x):
        """Return class logits for every position of the second LSTM's output."""
        lstm_out1, _ = self.lstm1(x)
        lstm_out1 = self.dropout_lstm(lstm_out1)
        lstm_out2, _ = self.lstm2(lstm_out1)

        if self.add_attention:
            attention_output = self.self_attention(lstm_out2)
            lstm_out2 = torch.cat([lstm_out2, attention_output], dim=-1)
            # NOTE(review): dropout after the second LSTM is only applied on
            # this branch - confirm that skipping it without attention is intended.
            lstm_out2 = self.dropout_lstm(lstm_out2)

        fc1_out = F.relu(self.fc1(lstm_out2))
        fc1_out = self.dropout_fc1(fc1_out)
        out = self.fc_out(fc1_out)
        return out

In [None]:
def train_epoch(model, train_loader, criterion, optimizer):
    """Run one optimisation pass over `train_loader`.

    Args:
        model: Module to train (switched to train mode).
        train_loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping the model's parameters.

    Returns:
        Tuple ``(mean_batch_loss, accuracy, balanced_accuracy)`` over the epoch.
    """
    model.train()

    # Send each batch to wherever the model lives. This is a no-op when the
    # data is already there, and lets the loader serve CPU tensors so the whole
    # dataset does not have to sit on the GPU.
    first_param = next(model.parameters(), None)
    target_device = None if first_param is None else first_param.device

    epoch_train_loss = 0.0
    all_predictions = []
    all_labels = []

    for images, labels in train_loader:
        if target_device is not None:
            images, labels = images.to(target_device), labels.to(target_device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        epoch_train_loss += loss.item()

        # Collect predictions and labels for the accuracy metrics
        # (detach: the arg-max must not be tracked by autograd).
        _, predictions = torch.max(outputs.detach(), 1)
        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    # Mean of the per-batch losses
    epoch_train_loss /= len(train_loader)

    # Accuracy metrics on the training set
    train_balanced_acc = balanced_accuracy_score(all_labels, all_predictions)
    train_acc = accuracy_score(all_labels, all_predictions)

    return epoch_train_loss, train_acc, train_balanced_acc

In [None]:
def validate(model, val_loader, criterion):
    """Evaluate `model` on `val_loader` without updating it.

    Args:
        model: Module to evaluate (switched to eval mode).
        val_loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        Tuple ``(mean_batch_loss, accuracy, balanced_accuracy)``.
    """
    model.eval()

    # Send each batch to wherever the model lives (no-op when already there).
    first_param = next(model.parameters(), None)
    target_device = None if first_param is None else first_param.device

    epoch_val_loss = 0.0
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for images, labels in val_loader:
            if target_device is not None:
                images, labels = images.to(target_device), labels.to(target_device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            epoch_val_loss += loss.item()

            _, predictions = torch.max(outputs, 1)
            all_predictions.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Mean of the per-batch losses
    epoch_val_loss /= len(val_loader)
    balanced_acc = balanced_accuracy_score(all_labels, all_predictions)
    acc = accuracy_score(all_labels, all_predictions)
    return epoch_val_loss, acc, balanced_acc

In [None]:
def train_with_early_stopping(model, train_loader, val_loader, criterion, optimizer, num_epochs, plot_every=None, patience=np.inf):
    """Train for up to `num_epochs` epochs, stopping early on stalled validation loss.

    Args:
        model: Module to train.
        train_loader: Loader passed to `train_epoch`.
        val_loader: Loader passed to `validate`.
        criterion: Loss function.
        optimizer: Optimiser.
        num_epochs: Maximum number of epochs.
        plot_every: Print metrics every this many epochs (and on the last
            epoch). Defaults to a tenth of `num_epochs`, but at least 1.
        patience: Stop once the validation loss has failed to improve for this
            many consecutive epochs. The default (infinity) never stops early.

    Returns:
        Six per-epoch lists: train losses, validation losses, train accuracies,
        validation accuracies, train balanced accuracies, validation balanced
        accuracies.
    """
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    train_balanced_accuracies = []
    val_balanced_accuracies = []
    best_val_loss = float('inf')
    counter = 0

    def print_metrics(epoch):
        # Reads the current epoch's metrics from the enclosing scope.
        print(f"Epoch [{epoch+1}/{num_epochs}] - "
              f"Train Loss: {epoch_train_loss:.4f}, "
              f"Validation Loss: {epoch_val_loss:.4f}, "
              f"Train Accuracy: {train_accuracy:.4f}, "
              f"Valid Accuracy: {val_accuracy:.4f}, "
              f"Train Balanced Accuracy: {train_balanced_acc:.4f}, "
              f"Validation Balanced Accuracy: {val_balanced_acc:.4f}, "
              f"Time per epoch: {round(stop-start, 2)}")

    if plot_every is None:
        # Clamp to 1: for num_epochs < 10 the plain tenth is 0 and
        # `epoch % plot_every` below would raise ZeroDivisionError.
        plot_every = max(1, num_epochs // 10)

    for epoch in range(num_epochs):
        # Training
        start = time.time()
        epoch_train_loss, train_accuracy, train_balanced_acc = train_epoch(model, train_loader, criterion, optimizer)
        train_losses.append(epoch_train_loss)
        train_accuracies.append(train_accuracy)
        train_balanced_accuracies.append(train_balanced_acc)

        # Validation
        epoch_val_loss, val_accuracy, val_balanced_acc = validate(model, val_loader, criterion)
        val_losses.append(epoch_val_loss)
        val_accuracies.append(val_accuracy)
        val_balanced_accuracies.append(val_balanced_acc)
        stop = time.time()

        if epoch % plot_every == 0 or epoch == num_epochs-1:
            print_metrics(epoch)

        # Patience mechanism: reset on improvement, otherwise count the stall
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            counter = 0
        else:
            counter += 1
            if counter >= patience:
                print_metrics(epoch)
                break

    return train_losses, val_losses, train_accuracies, val_accuracies, train_balanced_accuracies, val_balanced_accuracies

## Training

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# 'balanced' per-class weights computed from the training labels, then turned
# into a float32 tensor on the selected device.
train_labels_np = y_train.cpu().numpy()
balanced_weights = compute_class_weight(
    'balanced',
    classes=np.unique(train_labels_np),
    y=train_labels_np,
)
class_weights = torch.tensor(balanced_weights, dtype=torch.float32).to(device)

In [None]:
# --- Architecture -----------------------------------------------------------
input_size = X_train.shape[1]  # size of the second axis of the training tensor
lstm1 = 2048
lstm2 = 2048
num_layers = 2
hidden_n = 1024
bidirectional = True
num_classes = 5
num_attention_heads = 16
add_attention = False

# --- Optimisation -----------------------------------------------------------
num_epochs = 100
learning_rate = 1e-4
# Adding np.inf makes the patience infinite, i.e. early stopping never fires.
patience = int(num_epochs/10)+np.inf

# --- Model, loss, optimiser -------------------------------------------------
model = ImageLSTMWithAttention(
    input_size,
    lstm1,
    lstm2,
    num_layers,
    hidden_n,
    num_classes,
    num_attention_heads,
    bidirectional=True,
    dropout_rate=0.5,
    add_attention=add_attention,
).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0)

In [None]:
# Run the training loop. Note that `test_loader` is what gets passed as the
# validation loader here.
history = train_with_early_stopping(
    model=model,
    train_loader=train_loader,
    val_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
    plot_every=2,
    patience=patience,
)
(
    train_losses,
    val_losses,
    train_accuracies,
    val_accuracies,
    train_balanced_accuracies,
    val_balanced_accuracies,
) = history

In [None]:
# 2 x 3 grid of learning curves: top row = train, bottom row = test;
# columns = loss, balanced accuracy, accuracy.
fig, axs = plt.subplots(2, 3, figsize=(16, 8))

curves_by_row = (
    (
        (train_losses, 'Train loss'),
        (train_balanced_accuracies, 'Train Balanced Acc'),
        (train_accuracies, 'Train Accuracy'),
    ),
    (
        (val_losses, 'Test loss'),
        (val_balanced_accuracies, 'Test Balanced Acc'),
        (val_accuracies, 'Test Accuracy'),
    ),
)

for axes_row, curve_row in zip(axs, curves_by_row):
    for panel, (series, label) in zip(axes_row, curve_row):
        panel.plot(series, label=label)
        panel.legend()
        panel.grid('on')

plt.tight_layout()
plt.show()

In [None]:
def test_model(model, test_loader):
    """Print and return the balanced accuracy of `model` on `test_loader`.

    Args:
        model: Module to evaluate (switched to eval mode).
        test_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        The balanced accuracy over all batches.
    """
    model.eval()

    # Send each batch to wherever the model lives (no-op when already there).
    first_param = next(model.parameters(), None)
    target_device = None if first_param is None else first_param.device

    test_predictions = []
    test_labels = []

    with torch.no_grad():
        for images, labels in test_loader:
            if target_device is not None:
                images = images.to(target_device)
            outputs = model(images)
            _, predictions = torch.max(outputs, 1)
            test_predictions.extend(predictions.cpu().numpy())
            test_labels.extend(labels.cpu().numpy())

    test_balanced_acc = balanced_accuracy_score(test_labels, test_predictions)
    print(f"Test Balanced Accuracy: {test_balanced_acc:.4f}")
    return test_balanced_acc

In [None]:
# Test the model: print and keep the balanced accuracy of the trained `model`
# on the batches served by `test_loader`.
test_balanced_acc = test_model(model, test_loader)