In [1]:
# Mount Google Drive (Colab only) so the dataset paths under /content/drive
# used later in the notebook can be read and written.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import models, transforms
from pycocotools.coco import COCO
from PIL import Image
import numpy as np
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import precision_recall_curve, roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from pathlib import Path

# Custom Dataset for COCO
class WasteCocoDataset(Dataset):
    """Image-level classification dataset backed by a COCO annotation file.

    Every sample is labelled with the category of the image's *first*
    annotation. Images without any annotation are not removed from the
    index; asking for one returns the next annotated image instead (wrapping
    around at the end of the list).

    Args:
        image_dir: Directory that contains the image files named in the
            annotation file.
        annotation_file: Path to the COCO-format JSON file.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, image_dir, annotation_file, transform=None):
        self.image_dir = image_dir
        self.coco = COCO(annotation_file)
        self.transform = transform
        self.image_ids = self.coco.getImgIds()
        # Load the category records once and derive both lookup tables from
        # the same enumeration so they cannot disagree.
        categories = self.coco.loadCats(self.coco.getCatIds())
        self.cat_id_to_label = {cat['id']: idx for idx, cat in enumerate(categories)}
        self.label_to_name = {idx: cat['name'] for idx, cat in enumerate(categories)}

    def __len__(self):
        return len(self.image_ids)

    def _resolve_annotated(self, idx):
        """Return ``(img_id, anns)`` for the first annotated image at or after ``idx``.

        Raises:
            IndexError: If ``idx`` is out of range. Plain ``for ... in dataset``
                iteration relies on this to know where the dataset ends.
            RuntimeError: If no image in the dataset has annotations.
        """
        # Deliberate bounds check: must raise IndexError before any wrap-around.
        self.image_ids[idx]
        total = len(self.image_ids)
        # Iterative replacement for the former recursive fallback: bounded by
        # the dataset size, so it cannot blow the recursion limit.
        for offset in range(total):
            img_id = self.image_ids[(idx + offset) % total]
            anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))
            if anns:
                return img_id, anns
        raise RuntimeError("WasteCocoDataset: no image in the annotation file has annotations")

    def __getitem__(self, idx):
        """Return ``(transformed_image, label, raw_image, img_info, anns)``.

        ``raw_image`` is the untransformed RGB PIL image; ``anns`` is the full
        list of COCO annotation dicts for the returned image.
        """
        # Resolve annotations first so unannotated images are never decoded.
        img_id, anns = self._resolve_annotated(idx)
        img_info = self.coco.loadImgs(img_id)[0]
        img_path = os.path.join(self.image_dir, img_info['file_name'])
        # convert() returns an independent in-memory image, so the file handle
        # can be closed right away instead of waiting for garbage collection.
        with Image.open(img_path) as handle:
            image = handle.convert('RGB')

        label = self.cat_id_to_label[anns[0]['category_id']]
        transformed_image = self.transform(image) if self.transform else image

        return transformed_image, label, image, img_info, anns

In [3]:
# Custom collate function
def custom_collate(batch):
    """Merge per-sample 5-tuples into one batch.

    Only the first two fields are turned into tensors (stacked images and a
    label tensor). The raw images, image-info records and annotation lists
    are passed through as plain Python lists, one entry per sample.
    """
    # Transpose the batch by position: columns[k] holds field k of every sample.
    columns = [[sample[position] for sample in batch] for position in range(5)]
    image_batch = torch.stack(columns[0])
    label_batch = torch.tensor(columns[1])
    return image_batch, label_batch, columns[2], columns[3], columns[4]

# Data transforms
# Training pipeline: resize to 256x256, then apply random augmentation
# (crop to 224, horizontal flip, rotation, colour jitter, small
# translate/scale affine) before converting to a normalized tensor.
train_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    # degrees=0: this step only translates/scales; rotation is handled above.
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.8, 1.2)),
    transforms.ToTensor(),
    # Same per-channel mean/std as the validation pipeline below
    # (presumably the ImageNet statistics matching the pretrained backbone).
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Validation/test pipeline: deterministic resize straight to 224x224 with the
# same normalization and no random augmentation.
val_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load datasets with oversampling
def load_datasets(image_dir, annotation_file):
    """Build the training and validation loaders for one COCO dataset.

    The images are split 80/20 at random. The training loader draws samples
    with a ``WeightedRandomSampler`` whose weights are the inverse class
    frequency, so minority classes are oversampled.

    Args:
        image_dir: Directory containing the images.
        annotation_file: COCO-format annotation JSON.

    Returns:
        ``(train_loader, val_loader)``. ``train_loader.dataset`` and
        ``val_loader.dataset`` are ``Subset`` objects, as before.
    """
    # One dataset object per split. A Subset only references its parent, so
    # assigning `.dataset.transform` on two subsets of the SAME parent made
    # the last assignment win: training silently ran with the validation
    # transforms and no augmentation.
    train_base = WasteCocoDataset(image_dir, annotation_file, transform=train_transforms)
    val_base = WasteCocoDataset(image_dir, annotation_file, transform=val_transforms)

    num_items = len(train_base)
    train_size = int(0.8 * num_items)
    shuffled = torch.randperm(num_items).tolist()
    train_dataset = torch.utils.data.Subset(train_base, shuffled[:train_size])
    val_dataset = torch.utils.data.Subset(val_base, shuffled[train_size:])

    def annotation_label(idx):
        # Label lookup from annotations only (no image decoding). Mirrors
        # WasteCocoDataset.__getitem__: first annotation of the first
        # annotated image at or after `idx`, wrapping around.
        for offset in range(num_items):
            img_id = train_base.image_ids[(idx + offset) % num_items]
            anns = train_base.coco.loadAnns(train_base.coco.getAnnIds(imgIds=img_id))
            if anns:
                return train_base.cat_id_to_label[anns[0]['category_id']]
        raise RuntimeError("No annotated images found; cannot build the sampler")

    # Oversample minority classes: weight each sample by 1 / (its class count).
    train_labels = [annotation_label(idx) for idx in train_dataset.indices]
    class_counts = {}
    for label in train_labels:
        class_counts[label] = class_counts.get(label, 0) + 1
    weights = [1.0 / class_counts[label] for label in train_labels]
    sampler = WeightedRandomSampler(weights, len(weights))

    train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler, num_workers=4, collate_fn=custom_collate)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4, collate_fn=custom_collate)
    return train_loader, val_loader

# Initialize model (ResNet-50; layer3, layer4 and the new classifier head are trainable)
def initialize_model(num_classes=4):
    """Create an ImageNet-pretrained ResNet-50 set up for fine-tuning.

    All backbone parameters are frozen except ``layer3`` and ``layer4``. The
    original classifier is replaced with ``Dropout(0.5)`` followed by a new
    linear layer with ``num_classes`` outputs (trainable by default).

    Args:
        num_classes: Number of output classes of the new head.

    Returns:
        The configured ``torchvision`` ResNet-50 module (on CPU).
    """
    # `pretrained=True` is deprecated in current torchvision; the equivalent
    # explicit form is weights=ResNet50_Weights.IMAGENET1K_V1. Fall back to
    # the legacy flag on releases that predate the weights enum.
    weights_enum = getattr(models, 'ResNet50_Weights', None)
    if weights_enum is not None:
        model = models.resnet50(weights=weights_enum.IMAGENET1K_V1)
    else:
        model = models.resnet50(pretrained=True)

    # Freeze everything, then re-enable gradients for the last two stages.
    for param in model.parameters():
        param.requires_grad = False
    for stage in (model.layer3, model.layer4):
        for param in stage.parameters():
            param.requires_grad = True

    model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(model.fc.in_features, num_classes)
    )

    return model

In [4]:
# Focal Loss
class FocalLoss(nn.Module):
    """Multi-class focal loss computed from raw logits.

    The per-sample loss is ``(1 - p_t) ** gamma * CE`` where ``p_t`` is the
    softmax probability of the target class, optionally multiplied by a
    per-class weight ``alpha[target]``.

    Args:
        gamma: Focusing exponent; 0 reduces to (weighted) cross-entropy.
        alpha: Optional per-class weights (tensor or sequence of floats),
            indexed by class label.
        reduction: ``'mean'``, ``'sum'`` or ``'none'``.

    Raises:
        ValueError: If ``reduction`` is not one of the supported values.
    """

    _REDUCTIONS = ('mean', 'sum', 'none')

    def __init__(self, gamma=2.0, alpha=None, reduction='mean'):
        super(FocalLoss, self).__init__()
        # Previously any value other than 'mean' (including 'none' and typos)
        # silently produced the sum; reject unknown values up front instead.
        if reduction not in self._REDUCTIONS:
            raise ValueError(
                f"reduction must be one of {self._REDUCTIONS}, got {reduction!r}"
            )
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Return the focal loss for logits ``inputs`` and class-index ``targets``."""
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
        # CE = -log(p_t), so exp(-CE) recovers the target-class probability.
        pt = torch.exp(-ce_loss)
        focal_loss = (1 - pt) ** self.gamma * ce_loss
        if self.alpha is not None:
            # Keep the weights on the same device/dtype as the loss so that
            # indexing with GPU targets works even if alpha was created on CPU.
            alpha = torch.as_tensor(self.alpha, dtype=focal_loss.dtype, device=focal_loss.device)
            focal_loss = alpha[targets] * focal_loss
        if self.reduction == 'mean':
            return focal_loss.mean()
        if self.reduction == 'sum':
            return focal_loss.sum()
        return focal_loss

In [5]:
# Training function
def _training_label_counts(train_loader, num_classes):
    """Count how many training-split samples carry each label."""
    split = train_loader.dataset
    base = getattr(split, 'dataset', split)
    indices = getattr(split, 'indices', None)
    if indices is None:
        indices = range(len(base))

    has_coco = all(hasattr(base, name) for name in ('coco', 'image_ids', 'cat_id_to_label'))

    def label_of(idx):
        if not has_coco:
            # Generic fallback: ask the dataset for the sample itself.
            return int(base[idx][1])
        # Annotation-only lookup (no image decoding). Mirrors
        # WasteCocoDataset.__getitem__: first annotation of the first
        # annotated image at or after `idx`, wrapping around.
        n_images = len(base.image_ids)
        for offset in range(n_images):
            img_id = base.image_ids[(idx + offset) % n_images]
            anns = base.coco.loadAnns(base.coco.getAnnIds(imgIds=img_id))
            if anns:
                return base.cat_id_to_label[anns[0]['category_id']]
        raise RuntimeError("No annotated images found; cannot compute class weights")

    counts = [0] * num_classes
    for idx in indices:
        counts[label_of(idx)] += 1
    return counts


def _run_training_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader``; return the mean sample loss."""
    model.train()
    running_loss = 0.0
    for images, labels, _, _, _ in loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        loss = criterion(model(images), labels)
        loss.backward()
        optimizer.step()
        # criterion returns a batch mean; re-weight by batch size.
        running_loss += loss.item() * images.size(0)
    return running_loss / len(loader.dataset)


def _run_validation_epoch(model, loader, criterion, device):
    """Evaluate on ``loader``; return ``(mean sample loss, accuracy in %)``."""
    model.eval()
    loss_sum = 0.0
    correct = 0
    seen = 0
    with torch.no_grad():
        for images, labels, _, _, _ in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss_sum += criterion(outputs, labels).item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()
    mean_loss = loss_sum / max(len(loader.dataset), 1)
    accuracy = 100 * correct / seen if seen else 0.0
    return mean_loss, accuracy


def train_model(model, train_loader, val_loader, num_epochs=20, num_classes=4):
    """Fine-tune ``model`` with focal loss, step LR decay and early stopping.

    Class weights for the focal loss are ``total / (num_classes * count)``,
    computed from the *training split only*. The weights with the lowest
    validation loss are checkpointed to ``best_model.pth`` and restored before
    returning; the train/validation loss curves are saved to ``loss_plot.png``.

    Args:
        model: Network exposing ``layer3``, ``layer4`` and ``fc`` (e.g. the
            ResNet-50 from ``initialize_model``).
        train_loader: Loader whose ``dataset`` is the training split.
        val_loader: Loader for the validation split.
        num_epochs: Maximum number of epochs.
        num_classes: Number of classes (length of the class-weight vector).

    Returns:
        ``(model, best_val_loss)``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # Focal loss with class weights. Counting only the training indices keeps
    # validation data out of the weights, and max(count, 1) avoids a division
    # by zero when a class is absent from the split.
    class_counts = _training_label_counts(train_loader, num_classes)
    total_samples = sum(class_counts)
    alpha = torch.tensor(
        [total_samples / (num_classes * max(count, 1)) for count in class_counts],
        dtype=torch.float,
    ).to(device)
    criterion = FocalLoss(gamma=2.0, alpha=alpha)

    # Smaller learning rates for the pretrained stages than for the new head.
    optimizer = optim.Adam([
        {'params': model.layer3.parameters(), 'lr': 0.00005},
        {'params': model.layer4.parameters(), 'lr': 0.0001},
        {'params': model.fc.parameters(), 'lr': 0.001}
    ], weight_decay=0.01)

    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    patience = 5
    counter = 0
    checkpoint_saved = False

    for epoch in range(num_epochs):
        epoch_train_loss = _run_training_epoch(model, train_loader, criterion, optimizer, device)
        scheduler.step()
        train_losses.append(epoch_train_loss)

        epoch_val_loss, val_accuracy = _run_validation_epoch(model, val_loader, criterion, device)
        val_losses.append(epoch_val_loss)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_train_loss:.4f}, Val Loss: {epoch_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            checkpoint_saved = True
            counter = 0
        else:
            counter += 1
            if counter >= patience:
                print("Early stopping triggered")
                break

    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('loss_plot.png')

    if checkpoint_saved:
        # map_location keeps the restore working if the device changed.
        model.load_state_dict(torch.load('best_model.pth', map_location=device))
    else:
        # Never load a stale best_model.pth left over from an earlier run.
        print("No checkpoint was saved in this run; returning the model as last trained")
    return model, best_val_loss

In [6]:
def plot_bounding_boxes(model, test_loader, output_dir="output", svm=None, scaler=None, max_images=100):
    """Save test images with their ground-truth boxes drawn on top.

    Each box is captioned with its ground-truth class name and the model's
    confidence in its *image-level* prediction (the classifier predicts one
    class per image, so all boxes of an image share the same confidence).

    Args:
        model: Trained network; when ``svm`` is given only its backbone
            (``conv1`` ... ``avgpool``) is used as a feature extractor.
        test_loader: Loader yielding the 5-tuples built by ``custom_collate``.
        output_dir: Directory for ``test_image_<n>.png`` (created if needed).
        svm: Optional fitted classifier exposing ``predict`` and
            ``predict_proba``.
        scaler: Feature scaler used together with ``svm``.
        max_images: Maximum number of images to write.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    Path(output_dir).mkdir(parents=True, exist_ok=True)
    class_names = ['Alucan', 'Glass', 'HDPEM', 'PET']

    # Translate COCO category ids with the dataset's own mapping (the one the
    # labels were built from) instead of assuming ids are exactly 1..N.
    cat_id_to_label = getattr(getattr(test_loader, 'dataset', None), 'cat_id_to_label', None)
    # predict_proba columns follow svm.classes_, which need not equal 0..N-1.
    svm_classes = list(svm.classes_) if svm is not None and hasattr(svm, 'classes_') else None
    backbone_stages = ('conv1', 'bn1', 'relu', 'maxpool',
                       'layer1', 'layer2', 'layer3', 'layer4', 'avgpool')

    def class_name_for(category_id):
        if cat_id_to_label is not None and category_id in cat_id_to_label:
            label_idx = cat_id_to_label[category_id]
        else:
            label_idx = category_id - 1  # legacy assumption: 1-based ids
        if 0 <= label_idx < len(class_names):
            return class_names[label_idx]
        return f'class {category_id}'

    count = 0
    with torch.no_grad():
        for images, _labels, raw_images, _img_infos, anns in test_loader:
            if count >= max_images:
                return  # do not run inference on batches that will not be drawn

            images = images.to(device)
            if svm is not None:
                feats = images
                for stage in backbone_stages:
                    feats = getattr(model, stage)(feats)
                feats = feats.view(feats.size(0), -1).cpu().numpy()
                feats = scaler.transform(feats)
                scores = svm.predict_proba(feats)
                predictions = svm.predict(feats)
            else:
                outputs = model(images)
                scores = torch.softmax(outputs, dim=1).cpu().numpy()
                _, predictions = torch.max(outputs, 1)
                predictions = predictions.cpu().numpy()

            for i in range(len(raw_images)):
                if count >= max_images:
                    return

                predicted = predictions[i]
                column = svm_classes.index(predicted) if svm_classes is not None else int(predicted)
                confidence = scores[i][column]

                fig, ax = plt.subplots(1)
                ax.imshow(raw_images[i])

                for ann in anns[i]:
                    x, y, w, h = ann['bbox']
                    rect = patches.Rectangle((x, y), w, h, linewidth=2, edgecolor='r', facecolor='none')
                    ax.add_patch(rect)

                    # Ground-truth class for the box, predicted confidence for the image.
                    true_label = class_name_for(ann['category_id'])
                    label_text = f'{true_label}: {confidence:.2f}'
                    ax.text(x, y - 10, label_text, color='white', fontsize=10,
                            bbox=dict(facecolor='red', alpha=0.5, pad=2))

                output_path = os.path.join(output_dir, f"test_image_{count}.png")
                ax.axis('off')
                fig.savefig(output_path, bbox_inches='tight')
                plt.close(fig)
                count += 1
                print(f"Saved visualization: {output_path}")

In [7]:
# Test function
def test_model(model, test_loader, svm=None, scaler=None):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels, _, _, _ in test_loader:
            images, labels = images.to(device), labels.to(device)
            if svm is not None:
                feats = model.conv1(images)
                feats = model.bn1(feats)
                feats = model.relu(feats)
                feats = model.maxpool(feats)
                for layer in [model.layer1, model.layer2, model.layer3, model.layer4]:
                    feats = layer(feats)
                feats = model.avgpool(feats)
                feats = feats.view(feats.size(0), -1)
                feats = feats.cpu().numpy()
                feats = scaler.transform(feats)
                predicted = torch.tensor(svm.predict(feats)).to(device)
            else:
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_accuracy = 100 * correct / total
    print(f"Test Accuracy: {test_accuracy:.2f}%")
    return test_accuracy

In [8]:
def _ap_above_confidence(precision, recall, thresholds, min_confidence):
    """Area under the PR curve restricted to thresholds >= ``min_confidence``."""
    keep = np.where(thresholds >= min_confidence)[0]
    if len(keep) < 2:
        return 0.0  # fewer than two points enclose no area
    # precision_recall_curve returns recall in DEcreasing order; auc() accounts
    # for that, whereas a plain trapezoid over (recall, precision) yields a
    # negative area.
    return float(auc(recall[keep], precision[keep]))


def evaluate_metrics(model, test_loader, output_dir="output", svm=None, scaler=None):
    """Write classification diagnostics for the test set to ``output_dir``.

    Produces ``confusion_matrix.png``, ``pr_curves.png`` (precision-recall,
    precision-vs-confidence, recall-vs-confidence) and ``roc_auc.png``, and
    prints two summary scores.

    Note: the printed "mAP@50" / "mAP@95" values are the mean over classes of
    the one-vs-rest PR-curve area restricted to confidence thresholds >= 0.5
    and >= 0.95. They are confidence-based classification scores, not the
    IoU-based detection metrics that usually carry those names.

    Args:
        model: Trained network; with ``svm`` set only its backbone stages
            (``conv1`` ... ``avgpool``) are used to extract features.
        test_loader: Loader yielding the 5-tuples built by ``custom_collate``.
        output_dir: Directory for the figures (created if needed).
        svm: Optional fitted classifier exposing ``predict``/``predict_proba``.
        scaler: Feature scaler used together with ``svm``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    Path(output_dir).mkdir(parents=True, exist_ok=True)
    class_names = ['Alucan', 'Glass', 'HDPEM', 'PET']
    backbone_stages = ('conv1', 'bn1', 'relu', 'maxpool',
                       'layer1', 'layer2', 'layer3', 'layer4', 'avgpool')
    all_labels = []
    all_preds = []
    all_scores = []

    with torch.no_grad():
        for images, labels, _, _, _ in test_loader:
            images, labels = images.to(device), labels.to(device)
            if svm is not None:
                feats = images
                for stage in backbone_stages:
                    feats = getattr(model, stage)(feats)
                feats = feats.view(feats.size(0), -1).cpu().numpy()
                feats = scaler.transform(feats)
                scores = svm.predict_proba(feats)
                preds = svm.predict(feats)
            else:
                outputs = model(images)
                scores = torch.softmax(outputs, dim=1).cpu().numpy()
                _, preds = torch.max(outputs, 1)
                preds = preds.cpu().numpy()

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds)
            all_scores.extend(scores)

    all_labels = np.array(all_labels)
    all_preds = np.array(all_preds)
    all_scores = np.array(all_scores)

    # Confusion Matrix
    # Pin the label set: without it the matrix shrinks when a class is absent
    # from the test data and no longer matches `display_labels`.
    cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.savefig(os.path.join(output_dir, 'confusion_matrix.png'))
    plt.close()

    # One-vs-rest PR curves per class, reused below for the summary scores.
    pr_curves = [
        precision_recall_curve(all_labels == i, all_scores[:, i])
        for i in range(len(class_names))
    ]

    # Precision-Recall, Precision-Confidence, Recall-Confidence Curves
    plt.figure(figsize=(15, 10))
    for class_name, (precision, recall, thresholds) in zip(class_names, pr_curves):
        plt.subplot(2, 2, 1)
        plt.plot(recall, precision, label=f'{class_name}')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Curve')
        plt.legend()

        # precision/recall have one more entry than thresholds; drop the
        # trailing point that has no associated threshold.
        plt.subplot(2, 2, 2)
        plt.plot(thresholds, precision[:-1], label=f'{class_name}')
        plt.xlabel('Confidence Threshold')
        plt.ylabel('Precision')
        plt.title('Precision-Confidence Curve')
        plt.legend()

        plt.subplot(2, 2, 3)
        plt.plot(thresholds, recall[:-1], label=f'{class_name}')
        plt.xlabel('Confidence Threshold')
        plt.ylabel('Recall')
        plt.title('Recall-Confidence Curve')
        plt.legend()

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'pr_curves.png'))
    plt.close()

    # ROC-AUC Curve
    plt.figure(figsize=(8, 6))
    for i, class_name in enumerate(class_names):
        fpr, tpr, _ = roc_curve(all_labels == i, all_scores[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f'{class_name} (AUC = {roc_auc:.2f})')

    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC-AUC Curve')
    plt.legend()
    plt.savefig(os.path.join(output_dir, 'roc_auc.png'))
    plt.close()

    # mAP@50 and mAP@95 (confidence-thresholded, see docstring)
    map_50_scores = [_ap_above_confidence(p, r, t, 0.5) for p, r, t in pr_curves]
    map_95_scores = [_ap_above_confidence(p, r, t, 0.95) for p, r, t in pr_curves]

    map_50 = np.mean(map_50_scores)
    map_95 = np.mean(map_95_scores)
    print(f"mAP@50: {map_50:.4f}")
    print(f"mAP@95: {map_95:.4f}")

In [9]:
# Check class distribution
def check_class_distribution(dataset):
    """Print how many samples of each class ``dataset`` contains.

    Args:
        dataset: Iterable of 5-tuples whose second element is the integer
            label, with a ``label_to_name`` dict mapping labels to names.
    """
    # Start from the dataset's own label set (so empty classes still show up
    # as 0) instead of assuming exactly four classes.
    class_counts = {label: 0 for label in dataset.label_to_name}
    for _, label, _, _, _ in dataset:
        class_counts[label] = class_counts.get(label, 0) + 1
    print("Class Distribution:", {dataset.label_to_name.get(k, k): v for k, v in class_counts.items()})

In [None]:

# Main execution
def main():
    """Train the classifier, save it, and evaluate it on the test set.

    Steps: print the class distribution, build the train/val/test loaders,
    fine-tune the network, save the weights to ``output_dir`` and write the
    visualisations/metrics. If the best validation loss is >= 1.0 and a
    ``train_svm_pipeline`` function is defined in the notebook, an SVM on top
    of the backbone features is used for evaluation instead of the CNN head.
    """
    image_dir = "/content/drive/MyDrive/MiniProject/TrainYolov8CustomDataset/train1000/images"  # Update this path
    annotation_file = "/content/drive/MyDrive/MiniProject/TrainYolov8CustomDataset/train1000/coco_annotations1000.json"  # Update this path
    test_image_dir = "/content/drive/MyDrive/MiniProject/TrainYolov8CustomDataset/train1000/val_mix/"  # Update this path
    # NOTE(review): unlike the paths above this one is relative, so it is
    # resolved against the notebook's working directory - confirm.
    test_annotation_file = "val_mix_annotations.json"  # Update this path
    output_dir = "/content/drive/MyDrive/MiniProject/TrainYolov8CustomDataset/train1000/mix_output"

    # Create the output directory up front: the model is saved into it right
    # after training, before any of the plotting helpers would create it.
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Check class distribution
    full_dataset = WasteCocoDataset(image_dir, annotation_file, transform=val_transforms)
    check_class_distribution(full_dataset)

    train_loader, val_loader = load_datasets(image_dir, annotation_file)
    test_dataset = WasteCocoDataset(test_image_dir, test_annotation_file, transform=val_transforms)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4, collate_fn=custom_collate)

    # Initialize and move model to device
    model = initialize_model(num_classes=4).to(device)
    trained_model, val_loss = train_model(model, train_loader, val_loader, num_epochs=20)

    # Save the trained model
    model_path = os.path.join(output_dir, "trained_model.pth")
    torch.save(trained_model.state_dict(), model_path)
    print(f"Model saved to: {model_path}")

    # train_svm_pipeline is not defined in the cells above; look it up at run
    # time so a missing definition degrades to CNN-only evaluation instead of
    # a NameError after the whole training run.
    svm_trainer = globals().get("train_svm_pipeline")
    use_svm = val_loss >= 1.0
    if use_svm and svm_trainer is None:
        print("Validation loss >= 1 but train_svm_pipeline is not defined; testing the CNN directly...")
        use_svm = False

    if use_svm:
        print("Validation loss >= 1, training SVM pipeline...")
        svm, scaler = svm_trainer(trained_model, train_loader, val_loader)
        plot_bounding_boxes(trained_model, test_loader, output_dir, svm, scaler, max_images=100)
        evaluate_metrics(trained_model, test_loader, output_dir, svm, scaler)
        test_accuracy = test_model(trained_model, test_loader, svm, scaler)
    else:
        if val_loss < 1.0:
            print("Validation loss < 1, testing directly...")
        plot_bounding_boxes(trained_model, test_loader, output_dir, max_images=100)
        evaluate_metrics(trained_model, test_loader, output_dir)
        test_accuracy = test_model(trained_model, test_loader)

    print(f"Final Test Accuracy: {test_accuracy:.2f}%")

if __name__ == "__main__":
    main()