In [None]:
# Cell 1: Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import xml.etree.ElementTree as ET
import torch
import torchvision
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from PIL import Image
import os
import torch.nn as nn
from sklearn.metrics import precision_score, recall_score, f1_score

# Report which compute device PyTorch can use in this kernel.
cuda_available = torch.cuda.is_available()

print(f"CUDA Available: {cuda_available}")

if not cuda_available:
    print("CUDA is not available. PyTorch will use the CPU.")
else:
    # One line per visible GPU, indexed the same way torch addresses them.
    gpu_names = map(torch.cuda.get_device_name, range(torch.cuda.device_count()))
    for i, gpu_name in enumerate(gpu_names):
        print(f"GPU {i}: {gpu_name}")

In [None]:
# Cell 2: Custom Dataset
class CustomDataset(Dataset):
    """Detection dataset built from image folders and per-image XML annotations.

    Expected layout under ``root_dir``::

        raw-images/<subfolder>/<name>.jpg|.png
        annotations/<subfolder>/<name>.xml

    Only files whose name contains "rgb" (case-insensitive) are used. Within
    each subfolder the sorted file list is cut into train (first 70%),
    val (next 20%) and test (last 10%); any other ``split`` value keeps every
    file. Images without an annotation file, or without at least one valid
    box, are left out and counted in ``no_annotation_count``.

    Args:
        root_dir: Dataset root containing ``raw-images`` and ``annotations``.
        split: "train", "val" or "test".
        transform: Optional callable applied to the PIL image.

    Each item is ``(image, target, image_path)`` where ``target`` holds
    ``boxes`` (float32, ``[xmin, ymin, xmax, ymax]``) and ``labels`` (int64,
    always 1). When the transform changes the image size, the boxes are scaled
    to the new size so they keep pointing at the same objects. Other geometric
    transforms (flips, crops, rotations) are NOT mirrored onto the boxes.
    """

    def __init__(self, root_dir, split="train", transform=None):
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        self.images = []
        self.annotations = []
        self.image_annotation_count = 0
        self.no_annotation_count = 0

        images_dir = os.path.join(root_dir, "raw-images")
        annotations_dir = os.path.join(root_dir, "annotations")

        # Sorted so that sample indices do not depend on filesystem order.
        for subfolder in sorted(os.listdir(images_dir)):
            subfolder_images_dir = os.path.join(images_dir, subfolder)
            subfolder_annotations_dir = os.path.join(annotations_dir, subfolder)

            # Skip stray files and image folders that have no annotation folder.
            if not (os.path.isdir(subfolder_images_dir) and os.path.isdir(subfolder_annotations_dir)):
                continue

            image_files = sorted(
                f for f in os.listdir(subfolder_images_dir)
                if f.endswith((".jpg", ".png")) and "rgb" in f.lower()
            )
            image_files = self._select_split(image_files, split)

            for filename in image_files:
                image_path = os.path.join(subfolder_images_dir, filename)
                annotation_path = os.path.join(
                    subfolder_annotations_dir, os.path.splitext(filename)[0] + ".xml"
                )

                annotation = self._parse_boxes(annotation_path) if os.path.exists(annotation_path) else []

                if annotation:
                    self.images.append(image_path)
                    self.annotations.append(annotation)
                    self.image_annotation_count += 1
                else:
                    self.no_annotation_count += 1

    @staticmethod
    def _select_split(image_files, split):
        """Return the slice of the sorted file list that belongs to ``split``."""
        num_images = len(image_files)
        train_end = int(0.7 * num_images)
        val_end = int(0.9 * num_images)
        if split == "train":
            return image_files[:train_end]
        if split == "val":
            return image_files[train_end:val_end]
        if split == "test":
            return image_files[val_end:]
        return image_files

    @staticmethod
    def _parse_boxes(annotation_path):
        """Read every usable ``[xmin, ymin, xmax, ymax]`` box from one XML file."""
        boxes = []
        root = ET.parse(annotation_path).getroot()
        for obj in root.findall("object"):
            bbox = obj.find("bndbox")
            if bbox is None:
                continue
            # int(float(...)) also accepts coordinates written as "12.0".
            xmin, ymin, xmax, ymax = (
                int(float(bbox.find(tag).text)) for tag in ("xmin", "ymin", "xmax", "ymax")
            )
            # Zero-area or inverted boxes are rejected by torchvision detection models.
            if xmax > xmin and ymax > ymin:
                boxes.append([xmin, ymin, xmax, ymax])
        return boxes

    @staticmethod
    def _image_size(image):
        """Return ``(width, height)`` of a tensor or PIL-like image, else ``None``."""
        if torch.is_tensor(image):
            return int(image.shape[-1]), int(image.shape[-2])
        size = getattr(image, "size", None)
        if isinstance(size, (tuple, list)) and len(size) == 2:
            return int(size[0]), int(size[1])
        return None

    @staticmethod
    def _rescale_boxes(boxes, orig_size, new_size):
        """Scale xyxy ``boxes`` from an ``orig_size`` image to a ``new_size`` image.

        Both sizes are ``(width, height)``. The boxes are returned unchanged
        when the new size is unknown, identical, or the original is empty.
        """
        if new_size is None or tuple(new_size) == tuple(orig_size):
            return boxes
        orig_width, orig_height = orig_size
        new_width, new_height = new_size
        if orig_width <= 0 or orig_height <= 0:
            return boxes
        scale_x = new_width / orig_width
        scale_y = new_height / orig_height
        return boxes * boxes.new_tensor([scale_x, scale_y, scale_x, scale_y])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image_path = self.images[index]
        # The context manager releases the file handle; convert() returns an
        # independent, fully loaded RGB copy.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        orig_size = image.size  # (width, height) before any transform

        boxes = torch.as_tensor(self.annotations[index], dtype=torch.float32).reshape(-1, 4)
        labels = torch.ones((boxes.shape[0],), dtype=torch.int64)  # Assuming all objects are 'vehicle'

        if self.transform:
            image = self.transform(image)
            # Keep the boxes aligned with the (possibly resized) image.
            boxes = self._rescale_boxes(boxes, orig_size, self._image_size(image))

        target = {"boxes": boxes, "labels": labels}

        return image, target, image_path

In [None]:
# Cell 3: Test the dataset
dataset_path = "dataset"
split = "train"

# Image-only preprocessing. A random horizontal flip is deliberately NOT part
# of this pipeline: torchvision's image transforms do not touch the target, so
# a flipped image would be paired with un-flipped bounding boxes.
# NOTE(review): the names say "thermal", but CustomDataset only keeps files
# with "rgb" in their name - confirm which modality is intended here.
thermal_transform = transforms.Compose([
    transforms.Resize((800, 800)),
    transforms.ToTensor(),
    # A single mean/std value is broadcast over all channels.
    transforms.Normalize(mean=[0.5], std=[0.5])
])

thermal_dataset = CustomDataset(dataset_path, split, thermal_transform)
print(f"Number of images in the dataset: {len(thermal_dataset)}")

# Inspect the first sample only when there is one; indexing an empty dataset raises.
if len(thermal_dataset) > 0:
    image, target, image_path = thermal_dataset[0]
    print(f"Image path: {image_path}")
    print(f"Image shape: {image.shape}")
    print(f"Target boxes: {target['boxes']}")
    print(f"Target labels: {target['labels']}")
else:
    print(f"No annotated images found under '{dataset_path}' for split '{split}'.")

In [None]:
def preprocess_dataset(dataset, transform=None):
    """Transform every image of ``dataset`` and normalise its boxes to [0, 1].

    Args:
        dataset: Iterable of samples whose first two elements are a PIL image
            and a target dict with ``'boxes'`` (``[xmin, ymin, xmax, ymax]`` in
            pixels of that image) and ``'labels'``. Extra elements such as the
            image path are ignored. The dataset must therefore be created
            without a tensor-producing transform.
        transform: Callable applied to each PIL image. When omitted, the
            notebook-level ``rgb_transform`` is looked up at call time.

    Returns:
        ``(preprocessed_images, preprocessed_annotations)``: two lists of equal
        length holding the transformed images and the matching target dicts
        with boxes divided by the original image width / height.

    Raises:
        TypeError: If a sample's image is not a PIL image.
    """
    if transform is None:
        # Late lookup keeps the original one-argument call working.
        transform = rgb_transform

    preprocessed_images = []
    preprocessed_annotations = []

    for sample in dataset:
        # Samples may be (image, target) or (image, target, image_path).
        image, target = sample[0], sample[1]

        if not isinstance(image, Image.Image):
            raise TypeError("The dataset must return PIL Image objects.")

        # Read the size while the image is still a PIL image: after the
        # transform it may be a tensor, whose `.size` is a method.
        width, height = image.size

        # Work on a float copy so the dataset's own tensors are not modified.
        boxes = torch.as_tensor(target['boxes'], dtype=torch.float32).clone().reshape(-1, 4)
        labels = target['labels']

        # Relative coordinates do not change under resizing, so dividing by the
        # original size is valid whatever the transform resizes to.
        boxes[:, [0, 2]] = boxes[:, [0, 2]] / width
        boxes[:, [1, 3]] = boxes[:, [1, 3]] / height

        preprocessed_images.append(transform(image))
        preprocessed_annotations.append({'boxes': boxes, 'labels': labels})

    return preprocessed_images, preprocessed_annotations


In [None]:
# Cell 4: Dataset and DataLoader Creation
# Image-only preprocessing: fixed 800x800 resize, then conversion to a tensor.
rgb_transform = transforms.Compose([
    transforms.Resize((800, 800)),
    transforms.ToTensor()
])

rgb_dataset = CustomDataset(dataset_path, split, rgb_transform)

# Every annotated object is treated as a "vehicle". The parsed annotations
# already say whether any object exists, so there is no need to load and
# transform every image just to fill this set.
class_labels = set()
if any(len(image_boxes) > 0 for image_boxes in rgb_dataset.annotations):
    class_labels.add("vehicle")

# CustomDataset emits label 1 for every object; torchvision detection models
# reserve label 0 for the background class.
class_to_idx = {"vehicle": 1}
print("Class labels:", class_to_idx)

def collate_fn(batch):
    """Merge ``(image, target, image_path)`` samples into one batch.

    Images are stacked along a new leading dimension (so they must share one
    shape); targets and paths stay plain lists in sample order.
    """
    image_list, targets, image_paths = [], [], []
    for sample in batch:
        image_list.append(sample[0])
        targets.append(sample[1])
        image_paths.append(sample[2])

    images = torch.stack(image_list, dim=0)
    return images, targets, image_paths

# Shuffled training loader over the RGB dataset, four samples per batch.
train_rgb_loader = DataLoader(
    dataset=rgb_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)

# Peek at a single batch to sanity-check the collated shapes.
first_batch = next(iter(train_rgb_loader), None)
if first_batch is not None:
    images, targets, image_paths = first_batch
    first_target = targets[0]
    print(f"Batch images shape: {images.shape}")
    print(f"Batch targets boxes shape: {first_target['boxes'].shape}")
    print(f"Batch targets labels shape: {first_target['labels'].shape}")

In [None]:
# Cell 5: Model Training
# Vehicle categories; names and ids are paired by position.
coco_vehicle_labels = ['car', 'truck', 'bus', 'motorcycle']
coco_vehicle_ids = [3, 8, 6, 4]

dataset_label_to_coco_id = {'vehicle': coco_vehicle_ids}

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Pretrained Faster R-CNN, moved to the selected device.
pretrained_weights = torchvision.models.detection.FasterRCNN_ResNet50_FPN_Weights.DEFAULT
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=pretrained_weights)
model.to(device)

# Freeze every pretrained parameter. This must happen BEFORE the box
# predictor is replaced below, so that only the new head stays trainable.
for param in model.parameters():
    param.requires_grad_(False)

# New prediction head: one output per vehicle label plus one extra class.
in_features = model.roi_heads.box_predictor.cls_score.in_features
num_classes = 1 + len(coco_vehicle_labels)
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.roi_heads.box_predictor.to(device)

# Parameters handed to the optimizer: only those still requiring gradients.
params = list(filter(lambda p: p.requires_grad, model.parameters()))

def _pairwise_iou(boxes_a, boxes_b):
    """Return the ``(len(boxes_a), len(boxes_b))`` IoU matrix of xyxy boxes."""
    boxes_a = np.asarray(boxes_a, dtype=np.float64).reshape(-1, 4)
    boxes_b = np.asarray(boxes_b, dtype=np.float64).reshape(-1, 4)

    area_a = (boxes_a[:, 2] - boxes_a[:, 0]) * (boxes_a[:, 3] - boxes_a[:, 1])
    area_b = (boxes_b[:, 2] - boxes_b[:, 0]) * (boxes_b[:, 3] - boxes_b[:, 1])

    top_left = np.maximum(boxes_a[:, None, :2], boxes_b[None, :, :2])
    bottom_right = np.minimum(boxes_a[:, None, 2:], boxes_b[None, :, 2:])
    overlap = np.clip(bottom_right - top_left, 0.0, None)
    intersection = overlap[..., 0] * overlap[..., 1]

    union = area_a[:, None] + area_b[None, :] - intersection
    return np.where(union > 0, intersection / np.maximum(union, 1e-12), 0.0)


def _count_true_positives(pred_boxes, pred_labels, gt_boxes, gt_labels, iou_threshold):
    """Greedily match detections (best score first) to ground-truth boxes.

    A detection is a true positive when its best still-unmatched ground-truth
    box of the same label overlaps it with IoU >= ``iou_threshold``. Each
    ground-truth box can be matched once.
    """
    if len(pred_boxes) == 0 or len(gt_boxes) == 0:
        return 0

    iou = _pairwise_iou(pred_boxes, gt_boxes)
    gt_taken = np.zeros(len(gt_boxes), dtype=bool)
    true_positives = 0
    for pred_index in range(len(pred_boxes)):
        candidates = iou[pred_index].copy()
        candidates[gt_taken | (gt_labels != pred_labels[pred_index])] = -1.0
        best_gt = int(np.argmax(candidates))
        if candidates[best_gt] >= iou_threshold:
            gt_taken[best_gt] = True
            true_positives += 1
    return true_positives


def evaluate_model(model, dataloader, device, iou_threshold=0.5, score_threshold=0.0,
                   nms_iou_threshold=0.5):
    """Compute detection precision, recall and F1 over a whole dataloader.

    Detections are filtered by score, de-duplicated with class-agnostic NMS and
    then matched one-to-one to ground-truth boxes of the same label by IoU.
    True positives, false positives and false negatives are summed over all
    images, so images with no detections or with surplus detections count
    against the model instead of being skipped or truncated.

    Args:
        model: Detection model returning, per image, a dict with ``boxes``,
            ``labels`` and ``scores``.
        dataloader: Yields ``(images, targets, image_paths)`` batches.
        device: Device the images are moved to before inference.
        iou_threshold: Minimum IoU for a detection to match a ground-truth box.
        score_threshold: Detections scoring below this are ignored.
        nms_iou_threshold: IoU above which overlapping detections are suppressed.

    Returns:
        ``(precision, recall, f1)`` as floats; each is 0.0 when undefined.
    """
    model.eval()

    true_positives = 0
    false_positives = 0
    false_negatives = 0

    with torch.no_grad():
        for images, targets, _ in dataloader:
            images = [image.to(device) for image in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                boxes = output['boxes'].detach().cpu()
                labels = output['labels'].detach().cpu()
                scores = output['scores'].detach().cpu()

                confident = scores >= score_threshold
                boxes, labels, scores = boxes[confident], labels[confident], scores[confident]

                # nms returns the kept indices ordered by decreasing score,
                # which is the order the greedy matching relies on.
                keep = torchvision.ops.nms(boxes, scores, iou_threshold=nms_iou_threshold)
                predicted_boxes = boxes[keep].numpy().reshape(-1, 4)
                predicted_labels = labels[keep].numpy().reshape(-1)

                target_boxes = target['boxes'].detach().cpu().numpy().reshape(-1, 4)
                target_labels = target['labels'].detach().cpu().numpy().reshape(-1)

                matched = _count_true_positives(
                    predicted_boxes, predicted_labels, target_boxes, target_labels, iou_threshold
                )
                true_positives += matched
                false_positives += len(predicted_labels) - matched
                false_negatives += len(target_labels) - matched

    detected = true_positives + false_positives
    expected = true_positives + false_negatives
    avg_precision = true_positives / detected if detected > 0 else 0.0
    avg_recall = true_positives / expected if expected > 0 else 0.0
    if avg_precision + avg_recall > 0:
        avg_f1 = 2 * avg_precision * avg_recall / (avg_precision + avg_recall)
    else:
        avg_f1 = 0.0

    print(f"Average Precision: {avg_precision:.4f}")
    print(f"Average Recall: {avg_recall:.4f}")
    print(f"Average F1-score: {avg_f1:.4f}")

    return avg_precision, avg_recall, avg_f1

# Validation split used to compare hyper-parameter combinations.
val_split = "val"
val_rgb_dataset = CustomDataset(dataset_path, val_split, rgb_transform)
val_rgb_loader = DataLoader(val_rgb_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

lr_values = [0.001, 0.0005, 0.0001]
momentum_values = [0.9, 0.8]
weight_decay_values = [0.0005, 0.0001]
best_accuracy = 0.0
best_model_state_dict = None

# Snapshot of the untrained starting point. state_dict() returns references to
# the live tensors, so the values are cloned; every combination is restored to
# this snapshot so that runs are compared from the same starting weights.
initial_model_state_dict = {
    key: value.detach().clone() for key, value in model.state_dict().items()
}

for lr in lr_values:
    for momentum in momentum_values:
        for weight_decay in weight_decay_values:
            # load_state_dict copies in place, so the tensors in `params`
            # remain the ones the model actually uses.
            model.load_state_dict(initial_model_state_dict)

            optimizer = torch.optim.SGD(params, lr=lr, momentum=momentum, weight_decay=weight_decay)
            lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)  # Adjust the learning rate every 5 epochs

            # Train the model
            num_epochs = 10
            for epoch in range(num_epochs):
                model.train()
                epoch_loss = 0.0
                for images, targets, _ in train_rgb_loader:
                    images = list(img.to(device) for img in images)
                    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                    # In training mode the model returns a dict of losses.
                    loss_dict = model(images, targets)
                    losses = sum(loss for loss in loss_dict.values())

                    optimizer.zero_grad()
                    losses.backward()
                    optimizer.step()

                    epoch_loss += losses.item()
                # max(..., 1) avoids a division by zero on an empty loader.
                print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/max(len(train_rgb_loader), 1):.4f}")

                lr_scheduler.step()

            # evaluate_model switches to eval mode and disables gradients itself.
            avg_precision, avg_recall, avg_f1 = evaluate_model(model, val_rgb_loader, device)

            if avg_f1 > best_accuracy:
                best_accuracy = avg_f1
                # Clone the values: a plain state_dict() would keep pointing at
                # weights that the next combination overwrites.
                best_model_state_dict = {
                    key: value.detach().clone() for key, value in model.state_dict().items()
                }

            print(f"Hyperparameters: lr={lr}, momentum={momentum}, weight_decay={weight_decay}")
            print(f"Validation Accuracy: {avg_f1:.4f}\n")

# Restore the best weights, if any combination scored above zero.
if best_model_state_dict is not None:
    model.load_state_dict(best_model_state_dict)
else:
    print("No hyperparameter combination reached a non-zero F1-score; keeping the last trained weights.")

torch.save(model.state_dict(), "trained_model.pth")

In [None]:
# Cell 6: Evaluation and Testing
# Build the held-out test split and score the current model on it.
test_split = "test"
test_rgb_dataset = CustomDataset(root_dir=dataset_path, split=test_split, transform=rgb_transform)
test_rgb_loader = DataLoader(
    dataset=test_rgb_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=collate_fn,
)

test_metrics = evaluate_model(model, test_rgb_loader, device)
avg_precision, avg_recall, avg_f1 = test_metrics

print(f"Number of images without annotations: {test_rgb_dataset.no_annotation_count}")
for metric_name, metric_value in zip(("Precision", "Recall", "F1-score"), test_metrics):
    print(f"Average {metric_name}: {metric_value:.4f}")

In [None]:
# Cell 7: Load the trained model
# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU machine also loads on a CPU-only one.
model.load_state_dict(torch.load("trained_model.pth", map_location=device))
model.eval()

In [None]:
# Cell 8: Prepare the test dataset
# Test split of the RGB data, served one image at a time in a fixed order.
test_split = "test"
test_rgb_dataset = CustomDataset(
    root_dir=dataset_path,
    split=test_split,
    transform=rgb_transform,
)
test_rgb_loader = DataLoader(
    dataset=test_rgb_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
# Cell 9: Label distribution of the test dataset
# Gather every ground-truth label of the test split, image by image.
per_image_labels = (target['labels'] for _, target, _ in test_rgb_dataset)
labels = [label.item() for image_labels in per_image_labels for label in image_labels]

# Distinct label values and how often each one occurs.
label_summary = np.unique(labels, return_counts=True)
unique_labels, label_counts = label_summary
print("Unique labels:", unique_labels)
print("Label counts:", label_counts)

In [None]:
import matplotlib.pyplot as plt
import cv2
from PIL import Image

# Display names keyed by numeric detection label. An earlier cell may have
# left `class_labels` as a set, which supports neither `.get` nor item
# assignment, so start from a dict (keeping existing entries if it is one).
class_labels = dict(class_labels) if isinstance(class_labels, dict) else {}
class_labels.setdefault(1, 'vehicle')

def visualize_detections(image, boxes, labels, scores, class_labels, confidence_threshold=0.0):
    """Draw detection boxes and captions on a copy of ``image``.

    Args:
        image: Image array accepted by OpenCV drawing functions; not modified.
        boxes: Iterable of ``[xmin, ymin, xmax, ymax]`` in pixels of ``image``.
        labels: Iterable of numeric labels, one per box.
        scores: Iterable of confidence scores, one per box.
        class_labels: Mapping from label to display name; labels missing from
            it are captioned "Unknown".
        confidence_threshold: Detections scoring below this are not drawn.

    Returns:
        The annotated copy of ``image``.
    """
    image_with_detections = image.copy()
    for box, label, score in zip(boxes, labels, scores):
        if score < confidence_threshold:
            continue

        # Plain Python ints (truncated like astype(int)) are accepted by every
        # OpenCV build and also allow boxes given as lists or tensors.
        xmin, ymin, xmax, ymax = (int(value) for value in box)

        # numpy / torch scalars are unwrapped so the dict lookup uses a plain key.
        label_key = label.item() if hasattr(label, "item") else label
        class_name = class_labels.get(label_key, "Unknown")

        # Caption above the box, or just inside it when the box touches the
        # top of the image and the caption would otherwise be cut off.
        text_y = ymin - 10 if ymin >= 30 else ymin + 25

        cv2.rectangle(image_with_detections, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
        cv2.putText(image_with_detections, f"{class_name}: {score:.2f}", (xmin, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
    return image_with_detections

# Number of test images to run through the detector and display.
num_visualizations = min(5, len(test_rgb_dataset.images))

model.eval()
for i in range(num_visualizations):
    image_path = test_rgb_dataset.images[i]
    image = cv2.imread(image_path)  # BGR channel order
    if image is None:
        # cv2.imread signals an unreadable file by returning None.
        print(f"Image {i+1} Path: {image_path} (could not be read, skipped)")
        continue

    # The training images were opened as RGB, so convert before building the
    # PIL image; otherwise the model sees swapped colour channels.
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_tensor = rgb_transform(Image.fromarray(image_rgb)).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(image_tensor)

    boxes = outputs[0]['boxes'].cpu().numpy()
    labels = outputs[0]['labels'].cpu().numpy()
    scores = outputs[0]['scores'].cpu().numpy()

    # Predictions are in the coordinates of the transformed model input;
    # scale them back to the original image they are drawn on.
    orig_height, orig_width = image.shape[:2]
    input_height, input_width = image_tensor.shape[-2:]
    scale_x = orig_width / input_width
    scale_y = orig_height / input_height
    boxes = boxes * np.array([scale_x, scale_y, scale_x, scale_y], dtype=boxes.dtype)

    print(f"Image {i+1} Path: {image_path}")
    print(f"Boxes: {boxes}")
    print(f"Labels: {labels}")
    print(f"Scores: {scores}")

    image_with_detections = visualize_detections(image, boxes, labels, scores, class_labels)

    image_with_detections_rgb = cv2.cvtColor(image_with_detections, cv2.COLOR_BGR2RGB)

    plt.figure(figsize=(12, 8))
    plt.imshow(image_with_detections_rgb)
    plt.axis('off')
    plt.show()
