In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np
import torchvision
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, ConfusionMatrixDisplay

### Checking whether a GPU is available for training

In [None]:
# Report whether PyTorch can see a CUDA device
cuda_available = torch.cuda.is_available()
print(cuda_available)

### Defining transformations
These transformations are identical to the ones used for the ResNet50 and VGG models.

In [None]:
# Preprocessing pipelines keyed by split name.
# Both splits resize to 256x256, crop to 224x224, convert to a tensor and
# apply the same per-channel normalisation; only the training pipeline adds
# random augmentation (random crop, horizontal flip, rotation of up to 15 degrees).
_channel_normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)

data_transforms = dict(
    train=transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ToTensor(),
        _channel_normalize,
    ]),
    test=transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        _channel_normalize,
    ]),
)

### Defining constants for training the model
These constants are identical to the ones used for the ResNet50 and VGG models being compared.

In [None]:
# Upper bound on the number of training epochs (passed to train_and_eval as epoch_count).
EPOCH_COUNT = 20
# NOTE(review): not referenced anywhere in the visible notebook — presumably the
# expected number of classes; confirm or remove.
TO_RECOGNIZE = 5

# Fraction of the samples intended for the training split.
TRAIN_PART = 0.8
# Base learning rate: the classifier head trains at this rate, the feature
# extractor at one tenth of it (see train_and_eval).
LEARNING_RATE = 1e-3
# Number of non-improving epochs tolerated by both EarlyStopping and
# ReduceLROnPlateau in train_and_eval.
PATIENCE = 7

# File that train_and_eval writes after every epoch and load_checkpoint reads.
CHECKPOINT_FILENAME = "checkpoint.tar"

In [None]:
# Pick the compute device: CUDA when PyTorch reports it as available, CPU otherwise
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


### Preparing dataset:
I am using the `datasets.ImageFolder` class provided by torchvision to create the dataset. It builds a dataset from a folder structure in which each class has its own directory containing the images that belong to that class.

In [None]:
# Define the data directory
data_dir = 'Larger'  # Update this path as needed

# Index the image folder once without any transform. This instance is used only
# for metadata (class names, targets, length) and to compute the split indices.
full_dataset = datasets.ImageFolder(data_dir, transform=None)
CLASS_LIST = full_dataset.classes

# Stratified train/test split driven by TRAIN_PART. The fixed seed keeps the
# split identical across kernel restarts, so resuming from a checkpoint never
# moves test images into the training set.
train_idx, test_idx = train_test_split(
    list(range(len(full_dataset))),
    train_size=TRAIN_PART,
    stratify=full_dataset.targets,
    random_state=42,
)

# Each split gets its OWN ImageFolder instance carrying its own transform.
# Wrapping one shared dataset in two Subsets and assigning
# `subset.dataset.transform` twice does not work: both Subsets point at the
# same object, so the second assignment silently replaces the first and the
# training data would be loaded without augmentation.
# The indices computed above remain valid because every ImageFolder built from
# the same directory enumerates classes and files in the same sorted order.
train_dataset = Subset(
    datasets.ImageFolder(data_dir, transform=data_transforms['train']), train_idx)
test_dataset = Subset(
    datasets.ImageFolder(data_dir, transform=data_transforms['test']), test_idx)

# Create dataloaders for each set (only the training loader is shuffled)
dataloaders = {
    'train': DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4),
    'test': DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)
}

# Get dataset sizes
dataset_sizes = {
    'train': len(train_dataset),
    'test': len(test_dataset)
}

### Printing meta information about the dataset

In [None]:
# Helper that prints the size of each split and the number of classes
def print_dataset_info(full_dataset, train_dataset, test_dataset):
    """Print sample counts for the full/train/test datasets and the class count.

    Args:
        full_dataset: sized object exposing a ``classes`` sequence.
        train_dataset: sized object holding the training samples.
        test_dataset: sized object holding the test samples.
    """
    class_total = len(full_dataset.classes)
    sized_reports = (
        ("Total number of samples in the full dataset", full_dataset),
        ("Number of samples in the training set", train_dataset),
        ("Number of samples in the test set", test_dataset),
    )
    for caption, collection in sized_reports:
        print(f"{caption}: {len(collection)}")
    print(f"Total number of classes: {class_total}")

# Summarise the split sizes and the class count
print_dataset_info(
    full_dataset=full_dataset,
    train_dataset=train_dataset,
    test_dataset=test_dataset,
)

In [None]:
# Class names taken from the ImageFolder's `classes` attribute; used later for
# the preview titles and to size the classifier's output layer.
class_names = full_dataset.classes
print(class_names)

### Defining an early stopping method:

In [None]:
class EarlyStopping:
    """Track a validation loss and flag when it has stopped improving.

    The tracker works on ``score = -val_loss``. A call counts as an improvement
    unless the score is below ``best_score + min_delta``; every non-improving
    call increments ``counter`` and, once ``counter`` reaches ``patience``,
    ``early_stop`` is set to True. An improvement stores the new best score
    and resets ``counter`` to zero (``early_stop`` is never cleared here).
    """

    def __init__(self, patience=5, min_delta=0):
        # Configuration
        self.patience, self.min_delta = patience, min_delta
        # Running state
        self.best_score = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        """Update the tracker with the latest validation loss."""
        score = -val_loss

        # First observation: just remember it.
        if self.best_score is None:
            self.best_score = score
            return

        # Not enough improvement over the best score seen so far.
        if score < self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return

        # Sufficient improvement: record it and restart the patience window.
        self.best_score = score
        self.counter = 0

## Showcasing an array of images and their labels

In [None]:
# Helper that displays a normalised image tensor with matplotlib
def imshow(inp, title=None):
    """Show a channel-first image tensor, undoing the input normalisation.

    Args:
        inp: tensor indexed as (channel, height, width); it is converted to
            NumPy and transposed to (height, width, channel) for plotting.
        title: optional title passed to ``plt.title``.
    """
    # Same per-channel statistics as used by the Normalize transform.
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    image = inp.numpy().transpose((1, 2, 0))
    # Invert the normalisation, then clamp to the displayable [0, 1] range.
    image = np.clip(channel_std * image + channel_mean, 0, 1)

    plt.imshow(image)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated

# Pull a single batch from the training loader
inputs, classes = next(iter(dataloaders['train']))

# Tile the batch into one image grid with 20px of white padding between tiles
out = torchvision.utils.make_grid(inputs, padding=20, pad_value=1, scale_each=True)

# Show the grid, titled with the class name of every image in the batch
batch_titles = [class_names[label] for label in classes]
imshow(out, title=batch_titles)
plt.show()


### Defining the training epoch function:

In [None]:
from tqdm import tqdm

def train_epoch(model, train_loader, criterion, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: network to train; switched to training mode here.
        train_loader: iterable of ``(images, labels)`` batches.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped once per batch.

    Returns:
        ``(train_loss, train_accuracy)``: the mean of the per-batch loss values
        and the fraction of samples whose arg-max prediction equals the label.
    """
    model.train()

    # Seeded with an empty float32 array so the concatenated result has the
    # same dtype behaviour as repeatedly appending to a float32 array.
    batch_losses = [np.array([], dtype=np.float32)]
    n_correct = 0
    n_seen = 0

    # tqdm renders a progress bar over the batches
    for images, labels in tqdm(train_loader, desc="Training Batches"):
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)
        batch_losses.append(np.ravel(loss.cpu().detach().numpy()))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        predicted = outputs.argmax(dim=1)
        n_correct += (predicted == labels).sum().item()
        n_seen += labels.size(0)

    train_loss = np.mean(np.concatenate(batch_losses))
    train_accuracy = n_correct / n_seen

    return train_loss, train_accuracy


### Calculation of metrics for all the predictions

In [None]:
def calculate_metrics(correct_predictions, total_predictions, y_true, y_pred):
    """Print the prediction counts and compute aggregate classification metrics.

    Args:
        correct_predictions: number of correctly classified samples.
        total_predictions: number of evaluated samples.
        y_true: ground-truth class labels.
        y_pred: predicted class labels.

    Returns:
        ``(accuracy, precision, recall, f1, conf_matrix)`` where precision,
        recall and F1 are the support-weighted averages over classes.
    """
    print('Total predictions: ', total_predictions)
    print('Correct predictions: ', correct_predictions)

    accuracy = correct_predictions / total_predictions
    weighted_scores = precision_recall_fscore_support(y_true, y_pred, average='weighted')
    # The fourth element (support) is not needed.
    precision, recall, f1 = weighted_scores[:3]

    return accuracy, precision, recall, f1, confusion_matrix(y_true, y_pred)

### Calculation of metrics for each class at a selected threshold

In [None]:
def calculate_metrics_per_class(all_confidences, all_true_labels, threshold=0.8):
    """Compute one-vs-rest accuracy/precision/recall/F1 for every class.

    A sample counts as predicted positive for a class when its confidence for
    that class is ``>= threshold``.

    Args:
        all_confidences: array-like of shape ``(n_samples, n_classes)`` holding
            per-class confidences. A 1-D input is treated as a single column.
        all_true_labels: either a 2-D ``(n_samples, n_classes)`` 0/1 indicator
            matrix, or a 1-D vector. A 1-D vector is interpreted as class
            indices (and one-hot encoded) when ``all_confidences`` has more
            than one column, and as a 0/1 indicator for the single class
            otherwise.
        threshold: confidence cut-off for a positive prediction.

    Returns:
        dict mapping class index to a dict with the float-valued keys
        ``'accuracy'``, ``'precision'``, ``'recall'`` and ``'f1_score'``.
        A metric whose denominator is zero is reported as 0.
    """
    all_confidences = np.asarray(all_confidences)
    all_true_labels = np.asarray(all_true_labels)

    if all_confidences.ndim == 1:
        all_confidences = all_confidences[:, None]

    if all_true_labels.ndim == 1:
        n_confidence_columns = all_confidences.shape[1]
        if n_confidence_columns > 1:
            # Class indices: expand to an indicator matrix so that every class
            # is scored. Reshaping to a single column instead would evaluate
            # only column 0 and misread the indices as 0/1 flags.
            class_ids = np.arange(n_confidence_columns)
            all_true_labels = all_true_labels.astype(int)[:, None] == class_ids[None, :]
        else:
            all_true_labels = all_true_labels[:, None]

    num_classes = all_true_labels.shape[1]
    stats = {}
    for class_index in range(num_classes):
        predictions = (all_confidences[:, class_index] >= threshold).astype(int)
        true_labels = all_true_labels[:, class_index].astype(int)

        TP = int(np.sum((predictions == 1) & (true_labels == 1)))
        TN = int(np.sum((predictions == 0) & (true_labels == 0)))
        FP = int(np.sum((predictions == 1) & (true_labels == 0)))
        FN = int(np.sum((predictions == 0) & (true_labels == 1)))
        total = TP + TN + FP + FN

        accuracy = (TP + TN) / total if total > 0 else 0.0
        precision = TP / (TP + FP) if (TP + FP) > 0 else 0.0
        recall = TP / (TP + FN) if (TP + FN) > 0 else 0.0
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

        stats[class_index] = {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1_score
        }

    return stats

In [None]:
def evaluate_epoch(model, test_loader, criterion):
    """Evaluate ``model`` on ``test_loader`` without gradient tracking.

    Besides the aggregate counts, per-class one-vs-rest metrics at a 0.8
    confidence threshold are computed and printed.

    Args:
        model: network to evaluate; switched to eval mode here.
        test_loader: iterable of ``(images, labels)`` batches, where labels are
            class indices.
        criterion: loss called as ``criterion(outputs, labels)``.

    Returns:
        ``(mean_val_loss, correct_predictions, total_predictions, y_true,
        y_pred, stats)``: the mean of the per-batch losses, the number of
        correct and total predictions, the lists of true and predicted labels,
        and the per-class metrics dict from ``calculate_metrics_per_class``
        (empty when the loader yields no samples).
    """
    model.eval()
    batch_losses = []
    correct_predictions, total_predictions = 0, 0
    y_true, y_pred, all_confidences = [], [], []

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # NOTE(review): confidences are element-wise sigmoids of the logits
            # although the training cell uses CrossEntropyLoss (softmax).
            # Kept as-is so the thresholded per-class numbers stay comparable
            # with earlier runs — confirm whether softmax was intended.
            confidences = torch.sigmoid(outputs)
            batch_losses.append(criterion(outputs, labels).item())

            predicted = torch.argmax(outputs, dim=1)

            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
            all_confidences.extend(confidences.cpu().numpy())

    all_confidences = np.array(all_confidences)

    if all_confidences.ndim == 2:
        # The labels are class indices. Expand them to an
        # (n_samples, n_classes) 0/1 indicator matrix so every class is scored
        # against its own confidence column; passing the 1-D index vector
        # would score only a single column.
        class_ids = np.arange(all_confidences.shape[1])
        true_indicator = (np.asarray(y_true)[:, None] == class_ids[None, :]).astype(int)
        stats = calculate_metrics_per_class(all_confidences, true_indicator, threshold=0.8)
    else:
        # No samples were seen, so there is nothing to score.
        stats = {}

    for class_index, class_stats in stats.items():
        print(f"Class {class_index}:")
        print(f" Accuracy: {class_stats['accuracy']}")
        print(f" Precision: {class_stats['precision']}")
        print(f" Recall: {class_stats['recall']}")
        print(f" F1 Score: {class_stats['f1_score']}")

    mean_val_loss = np.mean(np.asarray(batch_losses, dtype=np.float32))
    return mean_val_loss, correct_predictions, total_predictions, y_true, y_pred, stats


In [None]:
def load_checkpoint(filename, model, optimizer):
    """Restore model/optimizer state from a checkpoint and return its history.

    Args:
        filename: path of a checkpoint written with ``torch.save``; it must
            contain the keys ``'state_dict'`` and ``'optimizer'``.
        model: module whose parameters are overwritten in place.
        optimizer: optimizer whose state is overwritten in place.

    Returns:
        dict with the key ``'epoch'`` (0 when absent from the checkpoint) and
        the metric histories; any history missing from the checkpoint is
        returned as an empty list.
    """
    print("Loading checkpoint...")
    # Deserialize onto the CPU so that a checkpoint saved on a GPU can also be
    # opened on a CPU-only machine. load_state_dict then copies the values into
    # the existing tensors on whatever device the model already lives on.
    checkpoint = torch.load(filename, map_location="cpu")

    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])

    history_keys = (
        "train_loss_history",
        "accuracy_history",
        "train_accuracy_history",
        "val_loss_history",
        "precision_history",
        "recall_history",
        "f1_score_history",
        "stats_history",
        "conf_matrix",
    )
    training_history = {"epoch": checkpoint.get("epoch", 0)}
    for key in history_keys:
        training_history[key] = checkpoint.get(key, [])

    return training_history

def seconds_to_time(seconds):
    """Format a duration in seconds as ``'Ss'``, ``'MmSs'`` or ``'HhMmSs'``.

    The value is truncated to an integer first. Minutes are shown only from
    one minute upwards and hours only from one hour upwards.
    """
    total_minutes, s = divmod(int(seconds), 60)
    if total_minutes < 1:
        return f'{s}s'

    h, m = divmod(total_minutes, 60)
    if h < 1:
        return f'{m}m{s}s'
    return f'{h}h{m}m{s}s'

### Loading and finetuning model

In [None]:
# Start from torchvision's MobileNetV2 with pretrained weights
model_ft = models.mobilenet_v2(pretrained=True)

# Replace the final linear layer so the output size equals the number of classes
num_ftrs = model_ft.classifier[1].in_features
model_ft.classifier[1] = nn.Linear(in_features=num_ftrs, out_features=len(class_names))

# Place the model on the selected device
model_ft = model_ft.to(device)

In [None]:
from datetime import datetime

# When True, train_and_eval restores model/optimizer state and the metric
# history from the checkpoint file before training.
LOAD_CHECKPOINT = False

def train_and_eval(model, loader_train, loader_valid, filename, epoch_count, lr):
    """Fine-tune ``model``, evaluating and checkpointing after every epoch.

    Args:
        model: network exposing ``features`` and ``classifier`` sub-modules;
            the feature extractor is optimised with ``lr * 0.1`` and the
            classifier with ``lr``.
        loader_train: loader of ``(images, labels)`` training batches.
        loader_valid: loader used for the per-epoch evaluation.
        filename: checkpoint path, written after every epoch and read first
            when ``LOAD_CHECKPOINT`` is true.
        epoch_count: exclusive upper bound of the epoch index.
        lr: base learning rate.

    Returns:
        dict with the same keys ``load_checkpoint`` returns: ``'epoch'`` (index
        of the last completed epoch, -1 if none ran on a fresh start), the
        per-epoch metric histories, and ``'conf_matrix'`` (confusion matrix of
        the last evaluated epoch).
    """
    loss_func = torch.nn.CrossEntropyLoss()
    # Build the parameter groups from the `model` argument, not from a
    # notebook global, so the function trains the model it was given.
    optimizer = optim.Adam([
        {'params': model.features.parameters(), 'lr': lr * 0.1},
        {'params': model.classifier.parameters(), 'lr': lr}
    ])
    early_stopping = EarlyStopping(patience=PATIENCE, min_delta=0.01)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=PATIENCE, verbose=True)

    if LOAD_CHECKPOINT:
        training_history = load_checkpoint(filename, model, optimizer)
    else:
        training_history = {
            "epoch": -1,
            "train_loss_history": [],
            "accuracy_history": [],
            "train_accuracy_history": [],
            "val_loss_history": [],
            "precision_history": [],
            "recall_history": [],
            "f1_score_history": [],
            "stats_history": [],
            "conf_matrix": [],
        }
    start_epoch = training_history["epoch"] + 1

    for epoch in range(start_epoch, epoch_count):
        print('Starting training epoch... ', epoch)
        start_time = datetime.now()

        train_loss, train_accuracy = train_epoch(model, loader_train, loss_func, optimizer)

        elapsed = seconds_to_time((datetime.now() - start_time).total_seconds())
        print(f'Epoch: {epoch}, Time: {elapsed}, Loss: {train_loss}')

        # Restart the clock first so the printed timestamp is the evaluation start.
        start_time = datetime.now()
        print('Starting evaluation... ', start_time)

        avg_val_loss, correct_predictions, total_predictions, y_true, y_pred, stats = evaluate_epoch(model, loader_valid, loss_func)

        accuracy, precision, recall, f1, conf_matrix = calculate_metrics(correct_predictions, total_predictions, y_true, y_pred)

        per_image = (datetime.now() - start_time).total_seconds() / total_predictions
        print(f'Time: {per_image * 1000}ms, Epoch {epoch}, Train Loss: {train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 score: {f1:.4f}')
        print(f'Confusion matrix: {conf_matrix}')

        training_history["epoch"] = epoch
        training_history["accuracy_history"].append(accuracy)
        training_history["train_accuracy_history"].append(train_accuracy)
        training_history["train_loss_history"].append(train_loss)
        training_history["val_loss_history"].append(avg_val_loss)
        training_history["precision_history"].append(precision)
        training_history["recall_history"].append(recall)
        training_history["f1_score_history"].append(f1)
        training_history["stats_history"].append(stats)
        training_history["conf_matrix"] = conf_matrix

        print("Saving checkpoint...")
        checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
            **training_history,
        }
        torch.save(checkpoint, filename)

        # The scheduler lowers the learning rate when the validation loss plateaus.
        scheduler.step(avg_val_loss)

        early_stopping(avg_val_loss)
        if early_stopping.early_stop:
            # Training deliberately continues; only the tracker is re-armed.
            # The flag must be cleared as well, otherwise this branch would
            # fire on every remaining epoch.
            print("Early stopping triggered. Reducing learning rate and resetting early stopping.")
            early_stopping.counter = 0
            early_stopping.early_stop = False

    return training_history

### Training the model

In [None]:
# Count the trainable parameters, then run the full train/evaluate loop
trainable_params = sum(p.numel() for p in model_ft.parameters() if p.requires_grad)
print(f'Parameter count: {trainable_params:,}')

training_history = train_and_eval(
    model_ft,
    dataloaders["train"],
    dataloaders["test"],
    CHECKPOINT_FILENAME,
    epoch_count=EPOCH_COUNT,
    lr=LEARNING_RATE,
)

## Display of statistics

In [None]:
# Rebuild the MobileNetV2 architecture. No pretrained weights are requested
# because every parameter is overwritten by the checkpoint below.
model_ft = models.mobilenet_v2()

# Modify the classifier to match the number of classes
num_ftrs = model_ft.classifier[1].in_features
model_ft.classifier[1] = nn.Linear(num_ftrs, len(class_names))

# Move the model to the appropriate device
model_ft = model_ft.to(device)

# Create the optimizer AFTER the model so that it references the parameters of
# the model being restored, not those of a previous `model_ft` instance.
optimizer = optim.Adam([
        {'params': model_ft.features.parameters(), 'lr': LEARNING_RATE * 0.1},
        {'params': model_ft.classifier.parameters(), 'lr': LEARNING_RATE}
])

# Restore the weights, the optimizer state and the recorded metric history
training_history = load_checkpoint(CHECKPOINT_FILENAME, model_ft, optimizer)

train_accuracy_history = training_history['train_accuracy_history']
accuracy_history = training_history['accuracy_history']
train_loss_history = training_history['train_loss_history']
val_loss_history = training_history['val_loss_history']
conf_matrix = training_history['conf_matrix']

# One figure per metric: training curve vs. validation curve over the epochs
curve_figures = (
    (
        ((train_accuracy_history, 'Train Accuracy'), (accuracy_history, 'Validation Accuracy')),
        'Accuracy',
        'Training and Validation Accuracy',
    ),
    (
        ((train_loss_history, 'Train Loss'), (val_loss_history, 'Validation Loss')),
        'Loss',
        'Training and Validation Loss',
    ),
)
for curves, y_label, figure_title in curve_figures:
    for series, series_label in curves:
        plt.plot(series, label=series_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.title(figure_title)
    plt.show()

# The checkpoint may hold a list of matrices (take the last one) or a single matrix
last_conf_matrix = np.array(conf_matrix[-1] if isinstance(conf_matrix, list) else conf_matrix)

disp = ConfusionMatrixDisplay(confusion_matrix=last_conf_matrix, display_labels=full_dataset.classes)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.show()