# Imports

In [None]:
# Imports for data loading
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Imports for model learning
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Imports for model evaluation
from torchmetrics.classification import MulticlassPrecision, MulticlassRecall, MulticlassSpecificity

# Other imports
import datetime
from typing import Tuple, List

In [None]:
# Check CUDA availability
device = "cuda" if torch.cuda.is_available() else "cpu"
device

# Cells for Vast

In [4]:
# !pip install pandas
# !pip install torchmetrics

# import zipfile

# zip_path = "dataset.zip"
# extract_path = "dataset"

# with zipfile.ZipFile(zip_path, "r") as zip_ref:
#     zip_ref.extractall(extract_path)

# print(os.listdir(extract_path))

# Data Loading

In [None]:
# # Dima's paths
# TRAIN_PATH = r"C:\Users\aquas\Jupyter\ML_Project_git\dataset\train"
# TEST_PATH = r"C:\Users\aquas\Jupyter\ML_Project_git\dataset\test"

# # Tykhin's paths
# TRAIN_PATH = f"D:\\Tykhon\\Downloads\\data_folder\\train"
# TEST_PATH = f"D:\\Tykhon\\Downloads\\data_folder\\test"

# # Colab paths
# #TRAIN_PATH = "/content/train"
# #TEST_PATH = "/content/test"

# Vast paths
TRAIN_PATH = r"dataset/train"
TEST_PATH = r"dataset/test"

train_set = datasets.ImageFolder(root=TRAIN_PATH)
test_set = datasets.ImageFolder(root=TEST_PATH)

# Print dataset configuration
print(f"\nTraining set size: {len(train_set)} images")
print(f"Test set size: {len(test_set)} images")

class_names = train_set.classes
print(f"\nClasses [{len(class_names)}]: {class_names}")

In [None]:
# Apply custom transformations
custom_transform = transforms.Compose([
    transforms.Resize((380, 380)),
    transforms.CenterCrop(380),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.2954581669589034, 0.29541042352786206, 0.29537371119537875], std=[0.31727746077062646, 0.31729778651393764, 0.31726590300772206]),
])
train_set_transformed = datasets.ImageFolder(root=TRAIN_PATH, transform=custom_transform)
test_set_transformed = datasets.ImageFolder(root=TEST_PATH, transform=custom_transform)

# Show 5 pictures
def show_pictures(dataset: datasets.ImageFolder, pictures_num: int=5) -> None:
    start_index = 0

    fig, axes = plt.subplots(nrows=1, ncols=pictures_num, figsize=(15, 45))

    for _, ax in enumerate(axes):
        image, label = dataset[np.random.randint(0, 23787)]
        image = image.permute(1, 2, 0).numpy()
        ax.imshow(image)
        ax.set_title(f"Label: {label}")
        ax.axis("off")
    plt.show()

show_pictures(train_set_transformed)

# Model Loading

## EfficientNet

In [None]:
# Load EfficientNet model
model = torchvision.models.efficientnet_b0(weights=torchvision.models.EfficientNet_B0_Weights.DEFAULT).to(device)
MODEL_NAME = "EfficientNet"

# Set parameter training to True
for param in model.features.parameters():
    param.requires_grad = True

# Define classifier
model.classifier = torch.nn.Sequential(
    #torch.nn.Dropout(p=0.1, inplace=True), LOra!!! binary cross entropy -for multi labeling!!! mini-batching
    torch.nn.Linear(in_features=model.classifier[1].in_features,
                    out_features=4,
                    bias=True)).to(device)

# Define loss function and optimizer
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, threshold=0.01)

## ResNet

In [9]:
# # Load ResNet model
# model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT).to(device)\
# MODEL_NAME = "ResNet"

# # Set parameter training to True
# for param in model.parameters():
#     param.requires_grad = True

# # Define classifier
# model.fc = torch.nn.Linear(
#     in_features=model.fc.in_features,
#     out_features=4,
#     bias=True).to(device)

# # Define loss function and optimizer
# loss_fn = torch.nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, threshold=0.01)

## DenseNet

In [10]:
# # Load DenseNet model
# model = torchvision.models.densenet169(weights=torchvision.models.DenseNet169_Weights.DEFAULT).to(device)
# MODEL_NAME = DenseNet

# # Set parameter training to True
# for param in model.features.parameters():
#     param.requires_grad = True

# # Define classifier
# model.classifier = torch.nn.Sequential(
#     torch.nn.Linear(in_features=model.classifier.in_features,
#                     out_features=4,
#                     bias=True)).to(device)

# # Define loss function and optimizer
# loss_fn = torch.nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, threshold=0.01)

## MobileNet

In [11]:
# # Load MobileNet model
# model = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT).to(device)
# MODEL_NAME = MobileNet

# # Set parameter training to True
# for param in model.features.parameters():
#     param.requires_grad = True

# # Define classifier
# model.classifier = torch.nn.Sequential(
#     #torch.nn.Dropout(p=0.1, inplace=True), LOra!!! binary cross entropy -for multi labeling!!! mini-batching
#     torch.nn.Linear(in_features=model.classifier[1].in_features,
#                     out_features=4,
#                     bias=True)).to(device)

# # Define loss function and optimizer
# loss_fn = torch.nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Settings

In [12]:
# Number of classes
num_classes = 4

In [14]:
# Set batch size
batch_size = 64
train_dataloader = DataLoader(train_set_transformed, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_set_transformed, batch_size=batch_size, shuffle=False)

In [15]:
# Set number of epochs
epochs_num = 25

# Functions

### Some functions, idk

In [16]:
# Log results of every epoch
def save_log(train_loss: float, train_acc: float, test_loss:float, test_acc: float):
    log_data = f"\nTrain loss: {train_loss:.5f} | Train acc: {((train_acc)*100):.2f}% | Test loss: {test_loss:.5f} | Test acc: {((test_acc)*100):.2f}% [{datetime.datetime.now().strftime('%H:%M:%S')}]\n"
    print(log_data)

    with open(f"log_{MODEL_NAME}.txt", "a") as file:
        file.write(log_data)

### Save Results

In [17]:
# Save weight of the model with best accuracy
def model_and_weight_save(test_acc: float, best_acc: float) -> float:
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model, f"model_{MODEL_NAME}.pt")
        torch.save(model.state_dict(), f"model_best_weights_{MODEL_NAME}.pt")

    return best_acc

In [21]:
# Save top confused images
def save_top_mistakes(mistakes: list, X, y, test_pred_labels, test_y_prob):
    wrong = test_pred_labels != y
    for idx in range(len(wrong)):
        if wrong[idx]:
            mistakes.append({
                'image': X[idx].cpu(),
                'true': y[idx].item(),
                'pred': test_pred_labels[idx].item(),
                'conf': test_y_prob[idx][test_pred_labels[idx]].item()
            })

# Training

In [None]:
# Variables for evaluation
best_acc = 0
train_loss_array = []
train_acc_array = []
test_loss_array = []
test_acc_array = []
precision_array = []
specificity_array = []
recall_array = []
f1_score_array = []
mistakes = []
all_predictions = torch.empty(0, dtype=torch.long)
all_probabilities = torch.empty(0, num_classes)
all_labels = torch.empty(0, dtype=torch.long)

precision_metric = MulticlassPrecision(num_classes=num_classes, average="macro").to(device)
recall_metric = MulticlassRecall(num_classes=num_classes, average="macro").to(device)
specificity_metric = MulticlassSpecificity(num_classes=num_classes, average='macro').to(device)

# Training loop
for epoch in range(epochs_num):
    print(f"Epoch: {epoch}\n---------")

    model.train()
    train_loss, train_acc = 0, 0
    for batch, (X, y) in enumerate(train_dataloader):
        X, y = X.to(device), y.to(device)
        y_pred = model(X)

        loss = loss_fn(y_pred, y)
        train_loss += loss.item()
        train_pred_labels = y_pred.argmax(dim=1)
        train_acc += (train_pred_labels == y).sum().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (batch % 10 == 0):
            print(f"Looked at {batch * len(X)}/{len(train_dataloader.dataset)} samples [{datetime.datetime.now().strftime('%H:%M:%S')}]")

    train_loss /= len(train_dataloader)
    train_acc /= len(train_dataloader.dataset)
    train_loss_array.append(train_loss)
    train_acc_array.append(train_acc)

    # Evaluation
    model.eval()
    test_loss, test_acc = 0, 0
    print(f"Evaluation... [{datetime.datetime.now().strftime('%H:%M:%S')}]")

    with torch.inference_mode():
        for X, y in test_dataloader:
            X, y = X.to(device), y.to(device)
            test_y_pred = model(X)
            test_y_prob = test_y_pred.softmax(dim=1)
            test_loss += loss_fn(test_y_pred, y).item()
            test_pred_labels = test_y_pred.argmax(dim=1)
            test_acc += (test_pred_labels == y).sum().item()

            if best_acc <= test_acc:
                all_probabilities = test_y_prob.cpu(), dim=0
                all_predictions = test_pred_labels.cpu(), dim=0
                all_labels =y.cpu(), dim=0

            save_top_mistakes(mistakes, X, y, test_pred_labels, test_y_prob)

    test_loss /= len(test_dataloader)
    test_acc /= len(test_dataloader.dataset)
    test_loss_array.append(test_loss)
    test_acc_array.append(test_acc)

    scheduler.step(test_loss)

    save_log(train_loss, train_acc, test_loss, test_acc)
    best_acc = model_and_weight_save(test_acc, best_acc)

# Saving Results

In [None]:
# Save plot for top mistakes
mistakes.sort(key=lambda x: x['conf'], reverse=True)
mistakes = mistakes[:10]

fig, axes = plt.subplots(2, 5, figsize=(15, 6))
for idx, mistake in enumerate(mistakes):
    ax = axes[idx//5, idx%5]
    img = mistake['image']
    img = np.transpose(img, (1, 2, 0))
    ax.imshow(img, cmap='gray')
    ax.axis('off')
    ax.set_title(f'Pred: {mistake["pred"]}\nTrue: {mistake["true"]}')

plt.subplots_adjust(hspace=0.5)
plt.savefig(f"top_mistakes_{MODEL_NAME}.png")

In [None]:
# Save arrays for acc / loss graphs
visualisation_arrays = pd.DataFrame({
    "Train loss": train_loss_array,
    "Test loss": test_loss_array,
    "Train acc": train_acc_array,
    "Test acc": test_acc_array
})
visualisation_arrays.to_csv("visualisation_arrays.csv", index=False)

In [29]:
# Save tensors for all_predictions
torch.save({
    "all_probabilities": all_probabilities,
    "all_predictions": all_predictions,
    "all_labels": all_labels
}, "visualisation_tensors.pt")