# IMPORTING LIBRARIES & DATA

In [41]:
import shutil
from pathlib import Path
import random
from torchvision import models, transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn as nn
import torch
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns
from torchvision import datasets, transforms
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

- ImageFolder is a dataset class provided by torchvision.datasets that helps load images organized in folders automatically.
- DataLoader is a PyTorch utility that loads data from a dataset and provides batches of samples during training or evaluation.

In [42]:

# Source folder (one sub-folder per class) and the root folder that receives
# the training / validation / testing copies.
dataset, output = Path("./HG14"), Path("./HG14_split")

# **SPLITTING DATA**

First, 10% of the images of each class are picked at random and reserved for
testing: 1,400 of the 14,000 images in total.

Then, 20% of the remaining 12,600 images (2,520 images) are randomly set aside
for validation.

The remaining 10,080 images are used for training.

In [None]:

# Fix every random number generator used in this notebook so that a fresh
# "Restart & Run All" gives the same results each time (practical for comparing runs).
SEED = 42
random.seed(SEED)        # Python RNG: drives the shuffle behind the data split
np.random.seed(SEED)     # NumPy RNG: drives np.random.dirichlet in the ensemble
torch.manual_seed(SEED)  # PyTorch RNG: layer initialisation and DataLoader shuffling

In [44]:

# Build the on-disk split: <output>/<split>/<class>/<image>.
# Per class: 10% -> testing, 20% of the remainder -> validation, rest -> training.
if output.exists() and any(output.iterdir()):
    # Re-running this cell with a different RNG state would copy a *different*
    # partition on top of the existing one, so the same image could end up in
    # both training and testing. Delete the folder to force a fresh split.
    print(f"{output} already exists - skipping the split")
else:
    for class_folder in sorted(dataset.iterdir()):
        if not class_folder.is_dir():
            continue

        # glob() yields files in arbitrary, filesystem-dependent order; sorting
        # first makes the seeded shuffle reproducible across machines.
        images = sorted(class_folder.glob("*.jpg"))
        random.shuffle(images)

        total = len(images)
        test_count = int(0.10 * total)  # 10% for testing
        val_count = int(0.20 * (total - test_count))  # 20% of the rest for validation

        test_imgs = images[:test_count]
        val_imgs = images[test_count:test_count + val_count]
        train_imgs = images[test_count + val_count:]

        split_dict = {
            "training": train_imgs,
            "validation": val_imgs,
            "testing": test_imgs,
        }

        # Create <output>/<split>/<class> and copy the selected images into it.
        for split_name, split_images in split_dict.items():
            output_dir = output / split_name / class_folder.name
            output_dir.mkdir(parents=True, exist_ok=True)

            for img_path in split_images:
                shutil.copy(img_path, output_dir / img_path.name)

# PREPROCESSING DATA

Global Variables : 
- Resizing images to 128x128. (all images in the batch need to be the same size to stack them into a tensor)
- Using a batch size of 20: we don't feed the entire dataset at once; instead, we divide it into smaller groups of 20 samples, and the model processes one batch at a time.
- We chose 50 epochs, which means the model will see the entire dataset 50 times during training.
- num_classes = 14, it means the model will output 14 scores, each representing how likely the input belongs to each of those 14 categories.

In [45]:
# (height, width) every image is resized to before batching
img_size = 128, 128
# samples per mini-batch, passes over the training set, number of output classes
batch_size, epochs, num_classes = 20, 50, 14

In [46]:
# Preprocessing applied to every image: resize to `img_size`, convert to a
# tensor, then normalise each channel with the standard mean / std values.
transform = transforms.Compose(
    [
        transforms.Resize(img_size),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.485, 0.456, 0.406],  # per-channel mean
            [0.229, 0.224, 0.225],  # per-channel std
        ),
    ]
)


Loading data sets and applying transformation

In [None]:
# Read the three splits from the same `output` root the split cell wrote to,
# instead of repeating the folder name as a string literal, so the two cannot
# drift apart. ImageFolder derives the class labels from the sub-folder names.
train_dataset = ImageFolder(root=str(output / "training"), transform=transform)
val_dataset   = ImageFolder(root=str(output / "validation"), transform=transform)
test_dataset  = ImageFolder(root=str(output / "testing"), transform=transform)


FileNotFoundError: [WinError 3] The system cannot find the path specified: 'HG14_split/train'

Creating data loaders (splitting dataset into small batches)

In [None]:
# One loader per split; only the training split is reshuffled every epoch.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=reshuffle)
    for split, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

# Loading Models
We're using transfer learning: we don't start training from scratch; instead, we start with a model that already knows useful image features.

In [None]:
# Three backbones, each initialised with its IMAGENET1K_V1 pretrained weights
# (downloaded to the local torch cache on first use).
vgg16 = models.vgg16(
    weights=models.VGG16_Weights.IMAGENET1K_V1,
)
vgg19 = models.vgg19(
    weights=models.VGG19_Weights.IMAGENET1K_V1,
)
mobilenet_v2 = models.mobilenet_v2(
    weights=models.MobileNet_V2_Weights.IMAGENET1K_V1,
)

0.0%

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to C:\Users\Bleu/.cache\torch\hub\checkpoints\vgg16-397923af.pth


100.0%
0.1%

Downloading: "https://download.pytorch.org/models/vgg19-dcbb9e9d.pth" to C:\Users\Bleu/.cache\torch\hub\checkpoints\vgg19-dcbb9e9d.pth


100.0%
4.6%

Downloading: "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" to C:\Users\Bleu/.cache\torch\hub\checkpoints\mobilenet_v2-b0353104.pth


100.0%


We can see here for example the structure of the vgg16 model,
it has a part of feature extraction (convolutional layers)
and a part of classifier (fully connected layers)


The classifier takes the features learned by the convolutional layers, combines them, and outputs a prediction.
This is the part we are going to modify.

In [None]:
# Print the layer-by-layer structure of the VGG16 model.
print(vgg16)

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

Modifying the last layer of the classifier for transfer learning

In [None]:
# Swap only the final fully connected layer of each pretrained classifier for
# one with `num_classes` outputs. That layer sits at index 6 of the 7-layer
# VGG classifiers (indices 0-6) and at index 1 of MobileNetV2's classifier.
# Its input width is read from the layer being replaced
# (4096 for VGG16 / VGG19, 1280 for MobileNetV2).
vgg16.classifier[6] = nn.Linear(vgg16.classifier[6].in_features, num_classes)
vgg19.classifier[6] = nn.Linear(vgg19.classifier[6].in_features, num_classes)
mobilenet_v2.classifier[1] = nn.Linear(mobilenet_v2.classifier[1].in_features, num_classes)

We freeze the feature extracting layer (the models already know how to extract features so no need to do it again)

The attribute `requires_grad` is a flag that tells PyTorch whether to compute gradients for a parameter. If True, the parameter is updated during training.
If False, the parameter is not updated: it is "frozen".

In [None]:
def freeze_features(model):
    """Turn off gradient tracking for every parameter of ``model.features``.

    Parameters outside ``model.features`` (e.g. the classifier) are left
    untouched. The model is modified in place; nothing is returned.
    """
    for weight in model.features.parameters():
        weight.requires_grad_(False)

# Freeze the `features` part of all three backbones.
for backbone in (vgg16, vgg19, mobilenet_v2):
    freeze_features(backbone)

According to the article: 

We're gonna modify our classifier like this: 

We use a dropout rate of 0.005 (0.5%), meaning 0.5% of the neurons are randomly "dropped" (disabled) at each training step to reduce overfitting.
We reduce the output layer to 14 neurons (one per class).
We use ReLU and Softmax as activations.



In [None]:
def build_classifier_head(in_features, hidden_units=512, dropout=0.005):
    """Build the classifier that replaces each backbone's original one.

    Layout: Linear(in_features, hidden_units) -> ReLU -> Dropout(dropout)
    -> Linear(hidden_units, num_classes) -> Softmax over the class dimension.

    Args:
        in_features: width of the flattened feature vector fed to the head.
        hidden_units: size of the hidden fully connected layer.
        dropout: probability passed to ``nn.Dropout``.
            NOTE(review): 0.005 drops 0.5% of activations - confirm against
            the reference article that this is the intended rate.

    Returns:
        An ``nn.Sequential`` whose output rows are class probabilities.
    """
    return nn.Sequential(
        nn.Linear(in_features, hidden_units),
        nn.ReLU(),
        nn.Dropout(dropout),
        nn.Linear(hidden_units, num_classes),
        # The head emits probabilities, which the Dirichlet ensemble averages.
        # NOTE(review): losses that expect raw logits (e.g. nn.CrossEntropyLoss)
        # must not be fed these outputs unchanged.
        nn.Softmax(dim=1),
    )


# Same head for every backbone; only the input width differs
# (25088 for VGG16 / VGG19, 1280 for MobileNetV2).
vgg16.classifier = build_classifier_head(25088)
vgg19.classifier = build_classifier_head(25088)
mobilenet_v2.classifier = build_classifier_head(1280)


# Training 

In [None]:

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:

def train_one_epoch(model, optimizer, criterion, dataloader):
    """Run one optimisation pass over ``dataloader``.

    Each batch is moved to the global ``device``, pushed through the model,
    and used for one optimizer step.

    Returns:
        ``(mean_batch_loss, accuracy_percent)``: the sum of the batch losses
        divided by the number of batches, and the percentage of samples whose
        highest-scoring class equals the label.
    """
    model.train()

    # Running totals: summed batch losses, correct predictions, samples seen.
    running_loss = 0
    n_correct = 0
    n_seen = 0

    for batch_images, batch_labels in dataloader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        scores = model(batch_images)
        batch_loss = criterion(scores, batch_labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        predicted = scores.max(1)[1]  # index of the highest score per sample
        n_correct += predicted.eq(batch_labels).sum().item()
        n_seen += batch_labels.size(0)

    mean_loss = running_loss / len(dataloader)
    accuracy = 100. * n_correct / n_seen
    return mean_loss, accuracy


In [None]:
def validate(model, dataloader):
    """Return the accuracy (in percent) of ``model`` on ``dataloader``.

    The model is switched to evaluation mode and no gradients are tracked.
    Batches are moved to the global ``device`` before the forward pass.
    """
    model.eval()
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch_images, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            scores = model(batch_images)
            predicted = scores.max(1)[1]  # index of the highest score per sample

            n_correct += predicted.eq(batch_labels).sum().item()
            n_seen += batch_labels.size(0)

    accuracy = 100. * n_correct / n_seen
    return accuracy


In [None]:
def train_model(model, train_loader, val_loader, num_epochs=50, lr=0.001):
    """Train the classifier of ``model`` and print the metrics of every epoch.

    Only ``model.classifier`` parameters are given to the Adam optimizer.

    Assumes ``model`` outputs class probabilities (its last layer is a
    Softmax, as in the classifier heads built in this notebook).

    Args:
        model: network exposing a ``classifier`` sub-module.
        train_loader: batches of ``(images, labels)`` used for optimisation.
        val_loader: batches of ``(images, labels)`` used for validation.
        num_epochs: number of passes over ``train_loader``.
        lr: learning rate of the Adam optimizer.

    Returns:
        The same ``model`` instance, trained in place and living on ``device``.
    """
    # The batches are moved to `device` inside the epoch loops, so the weights
    # must live there too; otherwise the first forward pass fails on a GPU.
    model.to(device)

    # nn.CrossEntropyLoss expects raw logits and applies log-softmax itself.
    # Feeding it Softmax outputs applies softmax twice and flattens the
    # gradients. Cross-entropy on probabilities is NLLLoss of their logarithm.
    nll_loss = nn.NLLLoss()

    def criterion(probabilities, targets):
        # clamp avoids log(0) = -inf when a probability underflows to zero
        return nll_loss(torch.log(probabilities.clamp_min(1e-12)), targets)

    optimizer = optim.Adam(model.classifier.parameters(), lr=lr)

    for epoch in range(num_epochs):
        train_loss, train_acc = train_one_epoch(model, optimizer, criterion, train_loader)
        val_acc = validate(model, val_loader)
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {train_loss:.4f} - Train Acc: {train_acc:.2f}% - Val Acc: {val_acc:.2f}%")

    return model

Once the models are trained, we save their weights to disk.

In [None]:
def save_model(model, path):
    """Write the ``state_dict`` of ``model`` to ``path`` with ``torch.save``."""
    weights = model.state_dict()
    torch.save(weights, path)

In [None]:
 
# Train each network in turn and store its weights right afterwards.
# train_model returns the model it was given, so the names vgg16 / vgg19 /
# mobilenet_v2 keep pointing at the trained networks.
for banner, net, checkpoint in (
    ("Training VGG16", vgg16, 'vgg16_hg14.pth'),
    ("Training VGG19", vgg19, 'vgg19_hg14.pth'),
    ("Training MobileNetV2", mobilenet_v2, 'mobilenet_v2_hg14.pth'),
):
    print(banner)
    train_model(net, train_loader, val_loader, epochs)
    save_model(net, checkpoint)

# Dirichlet Ensemble Learning

When we have multiple models trained on the same task, instead of just picking one or averaging their outputs evenly, we can combine their predictions using weights that come from a Dirichlet distribution.

So now, time for testing and combining the three models ! 


In [None]:
def load_model(model, path):
    """Load the weights stored at ``path`` into ``model`` and prepare it for inference.

    The checkpoint is mapped onto the global ``device``, the model is moved
    there and switched to evaluation mode. Returns the same ``model`` instance.
    """
    state_dict = torch.load(path, map_location=device)
    model.load_state_dict(state_dict)
    # Module.to() and Module.eval() both return the module itself.
    return model.to(device).eval()

def test_model(model, test_loader):
    """Evaluate ``model`` on ``test_loader``.

    Returns:
        ``(accuracy_percent, predictions, labels)`` where ``predictions`` and
        ``labels`` are flat lists covering every sample, in loader order.
    """
    model.eval()
    n_correct = 0
    n_seen = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            scores = model(batch_images)
            predicted = scores.max(1)[1]  # index of the highest score per sample

            n_correct += predicted.eq(batch_labels).sum().item()
            n_seen += batch_labels.size(0)

            # Move back to the CPU so the values can be collected as NumPy scalars.
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(batch_labels.cpu().numpy())

    accuracy = 100. * n_correct / n_seen
    return accuracy, all_preds, all_labels


In [None]:
# Restore the saved weights into each network (load_model also moves the
# network to `device` and puts it in evaluation mode).
vgg16, vgg19, mobilenet_v2 = (
    load_model(net, checkpoint)
    for net, checkpoint in (
        (vgg16, 'vgg16_hg14.pth'),
        (vgg19, 'vgg19_hg14.pth'),
        (mobilenet_v2, 'mobilenet_v2_hg14.pth'),
    )
)


Testing the models individually measures the performance of each one (we skip this step because the article already reports how the individual models perform).

In [None]:

# Test each model
# acc_vgg16, preds_vgg16, labels = test_model(vgg16, test_loader)
# acc_vgg19, preds_vgg19, _ = test_model(vgg19, test_loader)
# acc_mobilenet, preds_mobilenet, _ = test_model(mobilenet_v2, test_loader)

# print(f"Test Accuracy - VGG16: {acc_vgg16:.2f}%")
# print(f"Test Accuracy - VGG19: {acc_vgg19:.2f}%")
# print(f"Test Accuracy - MobileNetV2: {acc_mobilenet:.2f}%")


We can visualise the confusion matrix, a table used to evaluate the performance of a classification model.

In [None]:
# cm = confusion_matrix(labels, preds_vgg16)  
# disp = ConfusionMatrixDisplay(confusion_matrix=cm)
# disp.plot()
# plt.title("Confusion Matrix - VGG16")
# plt.show()


In [None]:
def _model_device(model):
    """Return the device holding ``model``'s parameters (CPU if it has none)."""
    first_param = next(model.parameters(), None)
    return torch.device("cpu") if first_param is None else first_param.device


def dirichlet_ensemble(models, test_loader, num_runs=10, seed=None):
    """Combine several models with randomly drawn Dirichlet weights.

    Every model scores the whole ``test_loader`` once. Each run then draws one
    weight per model from a symmetric Dirichlet distribution, forms the
    weighted sum of the model outputs and takes the arg-max class per sample.

    Args:
        models: iterable of trained networks (not the torchvision ``models``
            module). Their outputs are summed as they are, so they should be
            class probabilities.
        test_loader: iterable of ``(images, labels)`` batches.
        num_runs: number of Dirichlet weight draws.
        seed: optional integer; when given, the weights are drawn from a
            dedicated ``np.random.RandomState(seed)`` so the runs are
            reproducible. ``None`` keeps using the global NumPy RNG.

    Returns:
        ``(all_accuracies, avg_acc, std_acc, final_predictions, final_labels)``:
        the accuracy (in percent) of every run, their mean and standard
        deviation, and the predictions / labels of the last run
        (both ``None`` when ``num_runs`` is 0).

    Raises:
        ValueError: if ``models`` is empty, or (from ``np.concatenate``) if
            ``test_loader`` yields no batch.
    """
    models = list(models)
    if not models:
        raise ValueError("dirichlet_ensemble needs at least one model")

    devices = [_model_device(model) for model in models]
    for model in models:
        model.eval()

    # The model outputs do not depend on the sampled weights, so inference is
    # done once, up front. Labels are collected in the same pass so they stay
    # aligned with the predictions whatever order the loader yields.
    prob_batches = [[] for _ in models]
    label_batches = []
    with torch.no_grad():
        for images, labels in test_loader:
            label_batches.append(labels.cpu().numpy())
            for batches, model, model_device in zip(prob_batches, models, devices):
                outputs = model(images.to(model_device))
                batches.append(outputs.cpu().numpy())

    all_probs = [np.concatenate(batches, axis=0) for batches in prob_batches]
    all_labels = np.concatenate(label_batches, axis=0)

    rng = np.random if seed is None else np.random.RandomState(seed)

    all_accuracies = []
    final_predictions = None
    final_labels = None

    for run in range(num_runs):
        weights = rng.dirichlet(np.ones(len(models)))
        print(f"Run {run+1} Dirichlet weights: {weights}")

        weighted_probs = sum(w * p for w, p in zip(weights, all_probs))
        predictions = np.argmax(weighted_probs, axis=1)

        # Fraction of samples whose predicted class equals the label.
        accuracy = float(np.mean(predictions == all_labels))
        print(f"Run {run+1} Accuracy: {accuracy * 100:.2f}%")
        all_accuracies.append(accuracy * 100)

        if run == num_runs - 1:
            final_predictions = predictions
            final_labels = all_labels.tolist()

    avg_acc = np.mean(all_accuracies)
    std_acc = np.std(all_accuracies)
    print(f"\nAverage Accuracy over {num_runs} runs: {avg_acc:.2f}% ± {std_acc:.2f}%")

    return all_accuracies, avg_acc, std_acc, final_predictions, final_labels


In [None]:
# `models` is the torchvision module imported at the top of the notebook, not
# our networks; the ensemble needs the list of trained models.
ensemble_members = [vgg16, vgg19, mobilenet_v2]
all_accuracies, avg_acc, std_acc, final_preds, final_labels = dirichlet_ensemble(
    ensemble_members, test_loader, num_runs=10
)

print(f"Final Ensemble Accuracy: {avg_acc:.2f}% ± {std_acc:.2f}%")

In [None]:
# Confusion matrix of the last ensemble run (true labels vs. predictions).
cm = confusion_matrix(y_true=final_labels, y_pred=final_preds)
disp = ConfusionMatrixDisplay(cm)
disp.plot(cmap=plt.cm.Blues)
# plot() stores the axes it drew on in `ax_`; title that axes explicitly.
disp.ax_.set_title("Confusion Matrix for Dirichlet Ensemble")
plt.show()