**UXO Classification using RESNET18:**

This notebook serves to outline the code used in developing a classification model using RESNET18. The classification is performed on sonar data, positive entailing to unexploded ordnances (UXOs) underwater and negative from numerous common objects found also underwater. The full project can be accessed on this github repository: https://github.com/MansourSaliba/UXO_detection_model

***Initial steps:***

In [None]:
# Mount Google Drive

from google.colab import drive

drive.mount('/content/drive')

In [None]:
# Import libraries
# Install packages before this step if not installed (e.g. !pip install ultralytics mlflow)

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import numpy as np
import mlflow
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
# Set paths (change based on your specific path)

MLFLOW_TRACKING_URI = '/content/drive/MyDrive/UXO_project/mlruns'
MODEL_SAVE_PATH = '/content/drive/MyDrive/UXO_project/models/resnet18_uxo.pt'
UXO_PATH = '/content/drive/MyDrive/UXO_project/UXO_dataset/processed/UXO/aris_polar_standardized'
NON_UXO_PATH = '/content/drive/MyDrive/UXO_project/UXO_dataset/processed/Negative'


In [None]:
# Initialize MLflow (to track metrics)

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment("UXO_Classification")

In [None]:
# Check runtime

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

***Preparing dataset:***

In [None]:
class UXODataset(Dataset):
    def __init__(self, uxo_files, non_uxo_files, transform=None):
        self.file_paths = uxo_files + non_uxo_files
        self.labels = [1]*len(uxo_files) + [0]*len(non_uxo_files)
        self.transform = transform

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        img = Image.open(self.file_paths[idx]).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            img = self.transform(img)

        return img, torch.tensor(label, dtype=torch.float32)

# Get all files
uxo_files = [os.path.join(UXO_PATH, f) for f in os.listdir(UXO_PATH) if f.endswith('.png')]
non_uxo_files = [os.path.join(NON_UXO_PATH, f) for f in os.listdir(NON_UXO_PATH) if f.endswith('.png')]

In [None]:
# Noticed an imbalance between positive and negative data so implemented the below steps

# Step 1: Mild subsampling (reduce UXO from 2,573 to 2,200, keeping most data)
uxo_files = uxo_files[:2200]  # Now 2,200 vs 1,868 (15% → 8.2% imbalance)

# Step 2: Add class weights
uxo_weight = len(non_uxo_files) / (len(uxo_files) + len(non_uxo_files))
non_uxo_weight = len(uxo_files) / (len(uxo_files) + len(non_uxo_files))
criterion = nn.BCEWithLogitsLoss(
    pos_weight=torch.tensor([non_uxo_weight / uxo_weight], device=device)
)

# Step 3: Slightly augment Non-UXO
train_transform_non_uxo = transforms.Compose([
    transforms.Resize(TARGET_SIZE),
    transforms.RandomHorizontalFlip(p=0.3),
    transforms.ToTensor(),
])

# Verify balanced classes
assert abs(len(uxo_files) - len(non_uxo_files)) / (len(uxo_files) + len(non_uxo_files)) < 0.1, "Class imbalance >10%"

In [None]:
# Data preprocessing

TARGET_SIZE = (224, 414) # Maintaining aspect ratio (original 1636×3025 → scaled to 224×414)

# Augmentations
train_transform = transforms.Compose([
    transforms.Resize(TARGET_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize(TARGET_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Get ALL files and labels first
all_files = np.array(uxo_files + non_uxo_files)
all_labels = np.array([1]*len(uxo_files) + [0]*len(non_uxo_files))

# 80/10/10 stratified split
train_files, temp_files, train_labels, temp_labels = train_test_split(
    all_files, all_labels,
    test_size=0.2,
    stratify=all_labels,
    random_state=42
)

val_files, test_files, val_labels, test_labels = train_test_split(
    temp_files, temp_labels,
    test_size=0.5,
    stratify=temp_labels,
    random_state=42
)

# Verify
assert len(set(val_files) & set(test_files)) == 0
print(f"Final counts - Train: {len(train_files)}, Val: {len(val_files)}, Test: {len(test_files)}")

In [None]:
# Create datasets
train_dataset = UXODataset(
    [f for f, l in zip(train_files, train_labels) if l == 1],
    [f for f, l in zip(train_files, train_labels) if l == 0],
    transform=train_transform
)

val_dataset = UXODataset(
    [f for f, l in zip(val_files, val_labels) if l == 1],
    [f for f, l in zip(val_files, val_labels) if l == 0],
    transform=val_transform
)

test_dataset = UXODataset(
    [f for f, l in zip(test_files, test_labels) if l == 1],
    [f for f, l in zip(test_files, test_labels) if l == 0],
    transform=val_transform
)

# DataLoaders
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

***Setting up the model:***

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = models.resnet18(pretrained=True)
model.fc = nn.Sequential(
    nn.Linear(512, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, 1),
    nn.Sigmoid()
)
model = model.to(device)

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

***Training loop:***

In [None]:
# Training Loop with Early Stopping

def train_model():
    best_val_loss = float('inf')
    patience = 5
    patience_counter = 0

    with mlflow.start_run():
        # Log parameters
        mlflow.log_params({
            "model": "ResNet18",
            "batch_size": BATCH_SIZE,
            "optimizer": "Adam",
            "learning_rate": 0.001,
            "weight_decay": 1e-4,
            "scheduler": "ReduceLROnPlateau",
            "dropout": 0.3,
            "target_size": TARGET_SIZE
        })

        for epoch in range(50):  # Max epochs
            model.train()
            train_loss = 0.0
            correct = 0
            total = 0

            # Training phase
            for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
                inputs, labels = inputs.to(device), labels.to(device).unsqueeze(1)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                predicted = (outputs > 0.5).float()
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            train_acc = 100 * correct / total
            avg_train_loss = train_loss / len(train_loader)

            # Validation phase
            val_loss, val_acc = evaluate(model, val_loader)
            scheduler.step(val_loss)

            # Log metrics
            mlflow.log_metrics({
                "train_loss": avg_train_loss,
                "train_acc": train_acc,
                "val_loss": val_loss,
                "val_acc": val_acc,
                "lr": optimizer.param_groups[0]['lr']
            }, step=epoch)

            print(f"Epoch {epoch+1}: "
                  f"Train Loss: {avg_train_loss:.4f}, Acc: {train_acc:.2f}% | "
                  f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.2f}%")

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                torch.save(model.state_dict(), MODEL_SAVE_PATH)
                mlflow.log_artifact(MODEL_SAVE_PATH)
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break

def evaluate(model, loader):
    model.eval()
    loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device).unsqueeze(1)
            outputs = model(inputs)
            loss += criterion(outputs, labels).item()
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return loss / len(loader), 100 * correct / total

# Start training
train_model()


In [None]:
# Test Set Evaluation

def test_model():
    model.load_state_dict(torch.load(MODEL_SAVE_PATH))
    test_loss, test_acc = evaluate(model, test_loader)

    with mlflow.start_run(run_name="Test_Evaluation"):
        mlflow.log_metrics({
            "test_loss": test_loss,
            "test_acc": test_acc
        })

        # Log confusion matrix
        model.eval()
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                outputs = model(inputs)
                preds = (outputs > 0.5).float().cpu()
                all_preds.extend(preds.numpy())
                all_labels.extend(labels.numpy())

        from sklearn.metrics import confusion_matrix
        cm = confusion_matrix(all_labels, all_preds)

        plt.figure(figsize=(8, 6))
        plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        plt.title('Confusion Matrix')
        plt.colorbar()
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.xticks([0, 1], ['Non-UXO', 'UXO'])
        plt.yticks([0, 1], ['Non-UXO', 'UXO'])

        for i in range(2):
            for j in range(2):
                plt.text(j, i, str(cm[i][j]), ha='center', va='center')

        mlflow.log_figure(plt.gcf(), "confusion_matrix.png")
        plt.close()

        print(f"\nTest Results: Loss: {test_loss:.4f}, Accuracy: {test_acc:.2f}%")
        print(f"Confusion Matrix:\n{cm}")

test_model()


***Sample predictions:***

In [None]:
def show_predictions(num_samples=3):
    model.load_state_dict(torch.load(MODEL_SAVE_PATH))
    model.eval()

    samples = []
    for i, (img, label) in enumerate(test_dataset):
        if len(samples) >= num_samples:
            break
        samples.append((img, label))

    plt.figure(figsize=(15, 5))
    for i, (img, label) in enumerate(samples):
        with torch.no_grad():
            output = model(img.unsqueeze(0).to(device))
            pred = "UXO" if output > 0.5 else "Non-UXO"
            confidence = output.item() if output > 0.5 else 1 - output.item()

        plt.subplot(1, num_samples, i+1)
        plt.imshow(img.permute(1, 2, 0).cpu().numpy() * 0.5 + 0.5)  # Unnormalize
        plt.title(f"True: {'UXO' if label == 1 else 'Non-UXO'}\n"
                  f"Pred: {pred} ({confidence:.2f})")
        plt.axis('off')

    mlflow.log_figure(plt.gcf(), "sample_predictions.png")
    plt.show()

show_predictions()