In [None]:
!pip install deeplake[enterprise]

In [None]:
import copy
import deeplake
import os
import matplotlib.pyplot as plt
import numpy as np
import random
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, models

In [None]:
# Activeloop API token used by the deeplake.load calls below.
# Read it from the environment so the secret never has to be typed into (and
# saved with) the notebook; falls back to the original empty string when unset.
token = os.environ.get("ACTIVELOOP_TOKEN", "")

In [None]:
# Mount Google Drive at /content/drive; the checkpoint paths used later in this
# notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Open every split of the dataset in read-only mode, authenticating with `token`.
train_ds, dev_ds, val_ds, test_ds, shap_ds = (
    deeplake.load(dataset_path, token=token, read_only=True)
    for dataset_path in (
        'hub://um_project/art-train',
        'hub://um_project/art-dev',
        'hub://um_project/art-val',
        'hub://um_project/art-test',
        'hub://um_project/art-shap',
    )
)

In [None]:
# Report the number of samples in each split, one line per split.
print(
    f'Size of train dataset: {len(train_ds)}',
    f'Size of dev dataset: {len(dev_ds)}',
    f'Size of validation dataset: {len(val_ds)}',
    f'Size of test dataset: {len(test_ds)}',
    f'Size of shap dataset: {len(shap_ds)}',
    sep='\n',
)

In [None]:
# Class names are read from the metadata of the training split's `labels` tensor.
classes_labels = train_ds.labels.info.class_names
num_classes = len(classes_labels)

print(f'Number of classes: {num_classes}')
# List every class together with its integer index.
for i, label in enumerate(classes_labels):
  line = f'{i}. {label}'
  print(line)

In [None]:
def plot_class_distribution(ax, class_counts, class_labels, dataset_name):
    """Draw a bar chart of per-class sample counts on ``ax``.

    Args:
        ax: Axes-like object the chart is drawn on.
        class_counts: Number of samples for each class, in class order.
        class_labels: Class names, used as (vertical) x tick labels.
        dataset_name: Name of the split, shown in the title.
    """
    bar_positions = np.arange(len(class_labels))
    bold = {'weight': 'bold'}

    ax.bar(bar_positions, class_counts, tick_label=class_labels)
    ax.set_xlabel('Class', **bold)
    ax.set_xticklabels(class_labels, rotation='vertical')
    ax.set_ylabel('Number of Instances', **bold)
    ax.set_title(f'Frequency per Class ({dataset_name})', **bold)

def _count_labels_per_class(dataset, num_classes):
    """Return how many samples of ``dataset`` carry each class index.

    ``minlength`` keeps the result aligned with the class list even when the
    highest-numbered classes do not occur in the split; without it the vector
    would be shorter than the list of class names and the bar chart would fail.
    """
    labels = np.concatenate(dataset.labels.numpy(aslist=True), axis=0)
    return np.bincount(labels, minlength=num_classes)


class_train_count = _count_labels_per_class(train_ds, len(classes_labels))
class_dev_count = _count_labels_per_class(dev_ds, len(classes_labels))
class_val_count = _count_labels_per_class(val_ds, len(classes_labels))
class_test_count = _count_labels_per_class(test_ds, len(classes_labels))

# One panel per split, arranged in a 2x2 grid.
fig, axs = plt.subplots(2, 2, figsize=(12, 12), constrained_layout=True)

plot_class_distribution(axs[0, 0], class_train_count, classes_labels, "Train")
plot_class_distribution(axs[0, 1], class_dev_count, classes_labels, "Dev")
plot_class_distribution(axs[1, 0], class_val_count, classes_labels, "Val")
plot_class_distribution(axs[1, 1], class_test_count, classes_labels, "Test")

plt.show()

In [None]:
# Grid dimensions: one row per class, a fixed number of example images per row.
num_classes = len(classes_labels)
num_images_per_class = 7

# Collect the first `num_images_per_class` training images of every class.
class_images = {label: [] for label in classes_labels}
for sample in train_ds:
    label = sample['labels'].data()['text'][0]  # class name of this sample
    collected = class_images[label]
    if len(collected) < num_images_per_class:
        collected.append(sample['images'].data()['value'])

    # Stop scanning the dataset as soon as every class is full.
    if all(len(images) == num_images_per_class for images in class_images.values()):
        break


# Draw the grid; the class name is placed above the middle image of each row.
fig = plt.figure(figsize=(28, 56))
title_column = int(num_images_per_class / 2)

for row, class_label in enumerate(classes_labels):
    for column, image_array in enumerate(class_images[class_label]):
        subplot_index = row * num_images_per_class + column + 1
        ax = fig.add_subplot(num_classes, num_images_per_class, subplot_index)
        ax.imshow(image_array)
        ax.axis('off')

        if column == title_column:
            ax.set_title(class_label, fontsize=32, ha='center', va='center', weight='bold')

plt.tight_layout(pad=3.0)
plt.show()

In [None]:
# Per-channel ImageNet statistics used to standardise the input images.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

# Preprocessing applied to every image: resize to 224x224, convert to a tensor,
# then normalise each channel.
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
])

def one_hot_encode(label, num_classes):
    """Return a float vector of length ``num_classes`` that is 1 at ``label`` and 0 elsewhere.

    Args:
        label: Class index (anything usable as an index into a 1-D tensor).
        num_classes: Length of the returned vector.
    """
    encoded = torch.zeros(num_classes)
    encoded[label] = 1
    return encoded

# Number of samples per batch used when the data loaders are created.
batch_size = 64
# Number of data-loading worker processes (0 here).
num_workers = 0

In [None]:
def create_data_loader(dataset, batch_size, num_classes, image_transform, shuffle=True, num_workers=0):
    """Build a PyTorch data loader from ``dataset`` via its ``pytorch`` method.

    Images are decoded as PIL images and passed through ``image_transform``;
    labels are converted to one-hot vectors of length ``num_classes``.

    Args:
        dataset: Object exposing a ``pytorch(...)`` method.
        batch_size: Number of samples per batch.
        num_classes: Length of the one-hot label vectors.
        image_transform: Callable applied to every decoded image.
        shuffle: Whether the loader shuffles the samples.
        num_workers: Number of loading workers.

    Returns:
        Whatever ``dataset.pytorch`` returns.
    """
    def encode_label(label):
        return one_hot_encode(label, num_classes)

    per_tensor_transform = {
        'images': image_transform,
        'labels': encode_label,
    }
    loader_options = dict(
        num_workers=num_workers,
        batch_size=batch_size,
        transform=per_tensor_transform,
        shuffle=shuffle,
        decode_method={'images': 'pil'},
    )
    return dataset.pytorch(**loader_options)

# Create data loaders for different datasets.
# `num_workers` from the configuration cell is passed through explicitly so that
# changing it there actually takes effect (it was previously ignored).
train_loader = create_data_loader(train_ds, batch_size, num_classes, image_transform, num_workers=num_workers)
dev_loader = create_data_loader(dev_ds, batch_size, num_classes, image_transform, num_workers=num_workers)
val_loader = create_data_loader(val_ds, batch_size, num_classes, image_transform, num_workers=num_workers)
test_loader = create_data_loader(test_ds, batch_size, num_classes, image_transform, num_workers=num_workers)
# The SHAP loader uses a batch of 13 samples instead of `batch_size`.
shap_loader = create_data_loader(shap_ds, 13, num_classes, image_transform, num_workers=num_workers)

In [None]:
def save_model(model, optimizer, epoch, save_path, model_name):
  """Write a checkpoint to ``<save_path>/<model_name>_epoch_<epoch>.pth``.

  The checkpoint is a dict with the keys ``epoch``, ``model_state_dict`` and
  ``optimizer_state_dict``.

  Args:
    model: Module whose ``state_dict()`` is stored.
    optimizer: Optimizer whose ``state_dict()`` is stored.
    epoch: Epoch number recorded in the checkpoint and in the file name.
    save_path: Target directory; created (including parents) if missing.
    model_name: Prefix of the checkpoint file name.
  """
  # exist_ok=True replaces the check-then-create sequence, which could fail if
  # the directory appeared between the check and the creation.
  os.makedirs(save_path, exist_ok=True)

  # Create the full path for the saved model
  model_file = os.path.join(save_path, f"{model_name}_epoch_{epoch}.pth")

  # Save the model and optimizer state_dicts
  torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
  }, model_file)

  print(f"Model saved: {model_file}")

In [None]:
def load_model(model, optimizer, load_path, device):
  """Restore model and optimizer state from a checkpoint written by ``save_model``.

  Args:
    model: Module that receives ``checkpoint['model_state_dict']``.
    optimizer: Optimizer that receives ``checkpoint['optimizer_state_dict']``.
    load_path: Path of the checkpoint file.
    device: Device the tensors are mapped to and the model is moved to.

  Returns:
    The epoch number stored in the checkpoint.
  """
  # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only
  # machine (and vice versa) instead of failing during deserialisation.
  checkpoint = torch.load(load_path, map_location=device)

  model.load_state_dict(checkpoint['model_state_dict'])

  # Move the model first so the optimizer state is restored against parameters
  # that already live on the target device.
  model.to(device)
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

  # Epoch recorded when the checkpoint was written
  start_epoch = checkpoint['epoch']

  print(f"Model loaded: {load_path}, starting from epoch {start_epoch}")

  return start_epoch

# Usage example:
#load_path = "/content/drive/MyDrive/SSN_Projekt/Saved_Models/MultiLabelCNN_epoch_1.pth"
#load_model(model, optimizer, load_path, device)

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, otherwise evaluate.

    Each batch is a mapping with an ``'images'`` tensor and one-hot ``'labels'``.

    Returns:
        Tuple ``(mean per-batch loss, accuracy in percent)``.

    Raises:
        ValueError: If the loader yields no samples.
    """
    training = optimizer is not None
    model.train(training)

    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    # Gradients are only tracked while training.
    with torch.set_grad_enabled(training):
        for data in loader:
            inputs, labels = data['images'].to(device), data['labels'].to(device)

            # Convert one-hot encoded labels to class indices
            labels = torch.argmax(labels, dim=1)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
            predicted = outputs.detach().argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            num_batches += 1

    if num_batches == 0 or total == 0:
        raise ValueError("data loader yielded no samples")

    return running_loss / num_batches, correct / total * 100


def train_validate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs, patience,
                   save_path="/content/drive/MyDrive/UM_Projekt/Saved_Models", model_name="Net"):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    After every epoch a checkpoint is written with ``save_model``; whenever the
    validation accuracy improves, an additional ``<model_name>_Best`` checkpoint
    is written and the weights are remembered. Training stops once ``patience``
    consecutive epochs bring no improvement.

    Args:
        model: Network to train (already on ``device``).
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches.
        criterion: Loss taking ``(logits, class_indices)``.
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the batches are moved to.
        num_epochs: Maximum number of epochs.
        patience: Number of non-improving epochs tolerated before stopping.
        save_path: Directory for checkpoints.
        model_name: Prefix for checkpoint file names.

    Returns:
        ``model`` with the best validation weights loaded (unchanged if no
        epoch was run).
    """
    # Start below any reachable accuracy so the first epoch always becomes the
    # best one; starting at 0.0 left best_model as None when accuracy stayed 0%.
    best_val_accuracy = float('-inf')
    best_model = None
    counter = 0

    for epoch in range(num_epochs):
        train_epoch_loss, train_epoch_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
        val_epoch_loss, val_epoch_accuracy = _run_epoch(model, val_loader, criterion, device)

        print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {train_epoch_loss:.4f}, Training Accuracy: {train_epoch_accuracy:.2f}%, Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_accuracy:.2f}%")

        # Save the model after each epoch
        save_model(model, optimizer, epoch + 1, save_path, model_name)

        # Save the best model and implement early stopping
        if val_epoch_accuracy > best_val_accuracy:
            best_val_accuracy = val_epoch_accuracy
            best_model = copy.deepcopy(model.state_dict())
            counter = 0

            save_model(model, optimizer, epoch + 1, save_path, f"{model_name}_Best")
        else:
            counter += 1
            if counter >= patience:
                print(f"Early stopping at epoch {epoch + 1}. Best Validation Accuracy: {best_val_accuracy:.2f}%")
                break

    # Load the best model (nothing to restore when no epoch was run)
    if best_model is not None:
        model.load_state_dict(best_model)
    return model

In [None]:
def test_model(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for i, data in enumerate(test_loader, 0):
            inputs, labels = data['images'].to(device), data['labels'].to(device)

            # Convert one-hot encoded labels to class indices
            labels = torch.argmax(labels, dim=1)

            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_accuracy = correct / total * 100
    print(f"Test Accuracy: {test_accuracy:.2f}%")

    return test_accuracy

In [None]:
# Number of output classes, hard-coded here.
# NOTE(review): this overwrites the value derived earlier from the dataset's
# class names — confirm both agree.
num_classes = 13
# NOTE(review): flatten_out is not referenced anywhere in the visible notebook.
flatten_out = 325
# blocks for creating larger network
class SubNet(nn.Module):
  """Convolutional building block: two conv / batch-norm / LeakyReLU stages,
  followed by max pooling and dropout.

  Both convolutions share ``kernel_size``, ``stride`` and ``padding``; the first
  maps ``in_channels`` to ``out_channels``, the second keeps ``out_channels``.
  """

  def __init__(self, in_channels, out_channels, kernel_size, stride, padding, pool_kernel, pool_stride, dropout_rate):
    super().__init__()
    conv_options = dict(kernel_size=kernel_size, stride=stride, padding=padding)

    # First stage
    self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_options)
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.relu1 = nn.LeakyReLU(inplace=True)
    # Second stage
    self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_options)
    self.bn2 = nn.BatchNorm2d(out_channels)
    self.relu2 = nn.LeakyReLU(inplace=True)
    # Down-sampling and regularisation
    self.max_pool = nn.MaxPool2d(kernel_size=pool_kernel, stride=pool_stride)
    self.drop = nn.Dropout(dropout_rate)

  def forward(self, x):
    """Apply the layers in definition order and return the result."""
    pipeline = (
      self.conv1, self.bn1, self.relu1,
      self.conv2, self.bn2, self.relu2,
      self.max_pool, self.drop,
    )
    for layer in pipeline:
      x = layer(x)
    return x

# neural network created out of blocks
class Net(nn.Module):
  """Image classifier built from seven ``SubNet`` blocks, global average
  pooling and one linear layer.

  Args:
    num_classes: Number of output logits. Defaults to 13, the value that was
      previously hard-coded, so ``Net()`` builds the same architecture as before.
  """

  def __init__(self, num_classes=13):
    super().__init__()
    # SubNet arguments: in_channels, out_channels, kernel_size, stride, padding,
    # pool_kernel, pool_stride, dropout_rate
    self.block0 = SubNet(3, 16, 3, 1, 1, 2, 2, 0.35)
    self.block1 = SubNet(16, 32, 3, 1, 1, 2, 2, 0.35)
    self.block2 = SubNet(32, 64, 3, 1, 1, 2, 2, 0.35)
    self.block3 = SubNet(64, 128, 3, 1, 1, 2, 2, 0.35)
    self.block4 = SubNet(128, 256, 3, 1, 1, 2, 2, 0.35)
    self.block5 = SubNet(256, 512, 3, 1, 1, 2, 2, 0.35)
    self.block6 = SubNet(512, 512, 3, 1, 1, 2, 2, 0.35)
    # Collapse the remaining spatial extent to 1x1 so the classifier input is
    # always 512 features.
    self.avg_pool = nn.AdaptiveAvgPool2d((1,1))
    self.fc = nn.Linear(512, num_classes)

  def forward(self, x):
    """Return the class logits for a batch of images ``x``."""
    x = self.block0(x)
    x = self.block1(x)
    x = self.block2(x)
    x = self.block3(x)
    x = self.block4(x)
    x = self.block5(x)
    x = self.block6(x)
    x = self.avg_pool(x)
    x = x.view(x.shape[0], -1) # flatten the tensor
    x = self.fc(x)
    return x

# Set device: use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Model is currently running on: {device}")
model = Net().to(device=device)

# Set the loss function and optimizer
# (CrossEntropyLoss is fed raw logits and class indices by the training loop)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Set the number of epochs and patience
# (patience = number of epochs without a validation-accuracy improvement that
# train_validate tolerates before it stops early)
num_epochs = 6
patience = 5

# Train and validate the model with early stopping
# NOTE(review): the call below is commented out, so the following cells depend
# on a checkpoint written by an earlier run.
# best_model = train_validate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs, patience)

In [None]:
# Location and naming of the checkpoint to evaluate.
save_path = "/content/drive/MyDrive/UM_Projekt/Saved_Models"
model_name = "Net"
best_epoch = 16

# Checkpoints of the best model are named "<model_name>_Best_epoch_<epoch>.pth".
best_checkpoint_file = f"{model_name}_Best_epoch_{best_epoch}.pth"
best_model_path = os.path.join(save_path, best_checkpoint_file)

# Restore the weights, then measure accuracy on the test split.
load_model(model, optimizer, best_model_path, device)
test_accuracy = test_model(model, test_loader, device)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd

y_pred = []
y_true = []

# Evaluate in inference mode: dropout/batch-norm must use their eval behaviour
# and no autograd graph is needed for predictions.
model.eval()
with torch.no_grad():
    # iterate over test data
    for data in test_loader:
        # Feed Network; the arg-max of the logits is the predicted class
        # (exponentiating first does not change it and can overflow).
        logits = model(data['images'].to(device))
        # Save Prediction
        y_pred.extend(logits.argmax(dim=1).cpu().numpy())

        # reverse one-hot encoding and save Truth
        y_true.extend(torch.argmax(data['labels'], dim=1).cpu().numpy())

# Build confusion matrix.
# Passing `labels` keeps one row/column per class even if a class never occurs
# in the test predictions or ground truth.
class_indices = np.arange(len(classes_labels))
cf_matrix = confusion_matrix(y_true, y_pred, labels=class_indices)

# Normalise each row by the number of true samples of that class; rows without
# samples stay 0 instead of becoming NaN.
row_totals = cf_matrix.sum(axis=1, keepdims=True)
cf_matrix_normalized = np.divide(
    cf_matrix,
    row_totals,
    out=np.zeros(cf_matrix.shape, dtype=float),
    where=row_totals != 0,
)

df_cm = pd.DataFrame(cf_matrix_normalized, index=list(classes_labels),
                     columns=list(classes_labels))
plt.figure(figsize = (12,7))
sn.heatmap(df_cm, annot=True)
plt.show()

In [None]:
!pip install shap

In [None]:
import shap

# get images from shap dataset
# (batches are accessed by key, as in the training and evaluation loops)
batch = next(iter(shap_loader))
images, images_labels = batch['images'], batch['labels']

shap_class_names = shap_ds.labels.info.class_names

# save label assigned to every image (one-hot row -> class name);
# iterating over the batch avoids assuming it holds exactly 13 samples
shap_true_labels = [
    shap_class_names[int(torch.argmax(one_hot))]
    for one_hot in images_labels
]

# create array (number of images) x (number of classes) with all labels copied
all_labels_cloned = np.repeat(
    np.array(shap_class_names).reshape(1, -1),
    len(images),
    axis=0,
)

# create background
batch_background = next(iter(test_loader))
images_background = batch_background['images']

In [None]:
# sorting images and labels for Confusion Matrix style shap analysis
# label of every image (current order) and the class names (target order)
unsortedLabelsArray = np.array(shap_true_labels)
sortedLabelsArray = np.array(shap_ds.labels.info.class_names)

# for the i-th class name, position of the first image carrying that label
sorted_positions = [
  np.where(unsortedLabelsArray == str(sortedLabelsArray[i]))[0][0]
  for i in range(len(images))
]

# reorder images and labels accordingly
temp_images = [images[position] for position in sorted_positions]
temp_labels = [unsortedLabelsArray[position] for position in sorted_positions]

# assign temp to initial tensors / arrays
images = torch.stack(temp_images)
shap_true_labels = temp_labels

In [None]:
# take up to 50 images from the background batch and init DeepExplainer
background = images_background[:50].to(device)
e = shap.DeepExplainer(model, background)

# take up to 13 images and compute their SHAP values
shap_images = images[:13].to(device)
shap_values = e.shap_values(shap_images)

# convert explained images and original images to numpy, moving axis 1 to the end
# (for a 4-D (N, C, H, W) array the two swaps yield (N, H, W, C))
# NOTE(review): assumes shap_values is a sequence of arrays laid out like the images — confirm for the installed shap version
shap_numpy = [np.swapaxes(np.swapaxes(s, 1, -1), 1, 2) for s in shap_values]
test_numpy = np.swapaxes(np.swapaxes(shap_images.cpu().numpy(), 1, -1), 1, 2)

#normalize
def normalize_data(data):
    """Linearly rescale ``data`` so its minimum maps to 0 and its maximum to 1.

    Constant input (max == min) is mapped to all zeros instead of producing
    NaN through a division by zero.
    """
    low = np.min(data)
    span = np.max(data) - low
    if span == 0:
        # Every value equals the minimum: (data - low) is already all zeros.
        span = 1
    return (data - low) / span
# rescale the (ImageNet-normalised) images to [0, 1] so they can be displayed
test_numpy = normalize_data(test_numpy)

# plot grid with all images and their explanations;
# each row is labelled with the image's true class and every column with a class name
shap.image_plot(shap_numpy, test_numpy, true_labels=shap_true_labels, labels=all_labels_cloned, labelpad=0.9, width=25)

In [None]:
# get images and labels from shap dataset and make predictions on them
actual_indices = []
predicted_indices = []

model.eval()
with torch.no_grad():
    for data in shap_loader:
        inputs, labels = data['images'].to(device), data['labels'].to(device)

        # Convert one-hot encoded labels to class indices
        labels = torch.argmax(labels, dim=1)

        # predict label using the model
        outputs = model(inputs)
        predicted = outputs.argmax(dim=1)

        # keep the results of every batch (previously only the last batch survived)
        actual_indices.extend(labels.tolist())
        predicted_indices.extend(predicted.tolist())

# print original labels and prediction results for every evaluated image
# (no assumption that exactly 13 samples were seen)
shap_class_names = shap_ds.labels.info.class_names
print("{: >20} {: >20}".format("Actual:", "Predicted:"))
for actual_index, predicted_index in zip(actual_indices, predicted_indices):
  print("{: >20} {: >20}".format(shap_class_names[actual_index], shap_class_names[predicted_index]))
