## Faster R-CNN Implementation

**Please Note:** For better usability, the `Training` and `Testing` sections should be placed in separate source files. Furthermore, some functions, such as `getModel`, are defined twice: once in each section.

### Imports

For this project, PyTorch was installed with the command below, since it downloads `PyTorch with CUDA compatibility`, which allows the model to run on the GPU instead of the CPU and `helps decrease execution time`.

In [None]:
!pip install pillow
!pip install matplotlib
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 --no-cache-dir

In [None]:
import os
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.datasets import CocoDetection
from torchvision.transforms import functional
import matplotlib.pyplot as plt
from PIL import Image

%matplotlib inline

## Training

### Helper Functions

In [None]:
# Paired (image, target) transform: tensorise the image, pass the target through
class CocoToTensor:
    """Callable transform that converts the image to a tensor and keeps the target as-is."""

    def __call__(self, image, target):
        """Return ``(functional.to_tensor(image), target)``.

        Args:
            image: The image to convert (a PIL image when used with ``CocoDetection``).
            target: The annotations belonging to ``image``; returned unchanged.
        """
        return functional.to_tensor(image), target

# Build the COCO-format detection dataset
def get_dataset(img_dir, ann_file):
    """Create a ``CocoDetection`` dataset.

    Args:
        img_dir: Forwarded to ``CocoDetection`` as ``root``.
        ann_file: Forwarded to ``CocoDetection`` as ``annFile``.

    Returns:
        A ``CocoDetection`` instance that applies ``CocoToTensor`` as its
        joint image/target ``transforms``.
    """
    dataset_kwargs = {
        'root': img_dir,
        'annFile': ann_file,
        'transforms': CocoToTensor(),
    }
    return CocoDetection(**dataset_kwargs)

# Build a Faster R-CNN (ResNet50-FPN) whose box predictor is sized for our classes
def getModel(numOfClasses):
    """Return a pre-trained Faster R-CNN with a freshly initialised box-predictor head.

    Args:
        numOfClasses: Number of classes the new ``FastRCNNPredictor`` head outputs.

    Returns:
        The detection model with ``roi_heads.box_predictor`` replaced.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    )

    # The new head must accept the same feature width the old classifier consumed
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, numOfClasses)

    return detector

### Training One Epoch Function

In [None]:
# Training
def trainEpoch(model, optimizer, data_loader, device, epoch):
    """Train ``model`` for one pass over ``data_loader``.

    Every batch yielded by ``data_loader`` is unpacked as ``(images, targets)``.
    Each entry of ``targets`` is iterated as a collection of annotation dicts
    that provide ``'bbox'`` (``[x, y, width, height]``) and ``'category_id'``.
    Boxes with non-positive width or height are dropped; images left without
    any valid box are excluded from the batch, and batches left without any
    image are skipped entirely.

    Args:
        model: Detection model; called as ``model(images, targets)`` and
            expected to return a dict of loss tensors.
        optimizer: Optimiser stepping the model's parameters.
        data_loader: Sized iterable of ``(images, targets)`` batches.
        device: Device the images and target tensors are moved to.
        epoch: Zero-based epoch index (only used for the summary line).

    Returns:
        None. Progress and the loss of the last processed batch are printed.
    """
    # Set model to training mode
    model.train()

    total_batches = len(data_loader)
    # Stays None when no batch is ever processed (empty loader, or every batch
    # skipped), so the summary below cannot hit an unbound variable.
    losses = None

    for batch_index, (images, targets) in enumerate(data_loader):
        batch_number = batch_index + 1
        if batch_number == total_batches:
            print(f"Processing Batch {batch_number}/{total_batches}\n")
        else:
            print(f"Processing Batch {batch_number}/{total_batches}", end='\r')

        # Keep images and targets aligned: only images with >= 1 valid box survive
        valid_images = []
        processed_targets = []

        for image, target in zip(images, targets):
            boxes = []
            labels = []

            for obj in target:
                x, y, w, h = obj['bbox']  # [x, y, width, height]

                # Degenerate boxes are dropped before they reach the model
                if w > 0 and h > 0:
                    boxes.append([x, y, x + w, y + h])  # -> [x_min, y_min, x_max, y_max]
                    labels.append(obj['category_id'])

            if not boxes:
                continue

            # Only images that are actually used get transferred to the device
            valid_images.append(image.to(device))
            processed_targets.append({
                "boxes": torch.tensor(boxes, dtype=torch.float32, device=device),
                "labels": torch.tensor(labels, dtype=torch.int64, device=device),
            })

        # Skip iteration if no valid targets
        if not processed_targets:
            print(f"Batch {batch_number}: No valid targets, skipping.")
            continue

        # Forward pass: in training mode the model returns a dict of losses
        loss_dict = model(valid_images, processed_targets)
        losses = sum(loss for loss in loss_dict.values())

        # Backward pass
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    if losses is None:
        print(f"Epoch [{epoch + 1}] Loss: n/a (no batch contained valid targets)")
    else:
        # Loss of the last batch that was actually processed
        print(f"Epoch [{epoch + 1}] Loss: {losses.item():.3f}")

### Training Main Pipeline

#### Loading Dataset and Creating DataLoader

In [None]:
# Load the training split; the annotation file sits inside the image folder
print("\n----- <Loading Training Dataset> -----")
training_data = get_dataset(
    ann_file='./trashy-dataset-roboflow.coco/train/_annotations.coco.json',
    img_dir='./trashy-dataset-roboflow.coco/train',
)
print("----- <Dataset Loaded Successfully> -----")

# Wrap the dataset in a shuffled DataLoader. The collate function transposes a
# list of (image, target) samples into an (images, targets) pair of tuples.
print("\n----- <Creating Training DataLoader> -----")
training_dataloader = DataLoader(
    training_data,
    collate_fn=lambda batch: tuple(zip(*batch)),
    shuffle=True,
    batch_size=8,
)
print("----- <DataLoader Created Successfully> -----")

#### Load Faster R-CNN Model

In [None]:
# Number of classes the detector head is built for
numOfClasses = 5 # Background, Mixed Waste -Black Bag-, Organic Waste -White Bag-, Other, Recycled Waste -Grey or Green Bag-

# Build the model
model = getModel(numOfClasses)

# Prefer the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device.type == 'cuda':
    print(f"\nGPU: {torch.cuda.get_device_name(0)} is available - moving model to GPU")
else:
    print("No GPU available. Moving training to CPU.")

# Transfer the model to the selected device
model.to(device)

#### Define Hyperparameters and Train Model

In [None]:
# Epoch count, optimiser and learning-rate schedule
num_epochs = 15
parameters = [p for p in model.parameters() if p.requires_grad]
optimiser = torch.optim.SGD(
    parameters,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)
# Multiply the learning rate by 0.1 every 3 epochs
learningRate_scheduler = torch.optim.lr_scheduler.StepLR(optimiser, step_size=3, gamma=0.1)

print("\n----- <Training Model> -----")
for epoch in range(num_epochs):
    print(f"\n----- <Starting Epoch {epoch + 1}/{num_epochs}> -----")
    trainEpoch(model, optimiser, training_dataloader, device, epoch)
    learningRate_scheduler.step()

    # Only the final epoch is checkpointed
    if (epoch + 1) != num_epochs:
        continue

    # Make sure the output directory exists before writing into it
    outputFolder = './modelOut'
    if not os.path.exists(outputFolder):
        os.makedirs(outputFolder)

    # Persist the weights only (state dict), not the whole model object
    model_path = f"{outputFolder}/model_epoch{epoch + 1}.pth"
    torch.save(model.state_dict(), model_path)

    print(f"\nEpoch {epoch + 1}: Model saved at {model_path}")

print("\n----- <Training Completed> -----")

## Testing

### Helper Functions

In [None]:
# Build a Faster R-CNN (ResNet50-FPN) whose box predictor is sized for our classes
def getModel(numOfClasses):
    """Return a pre-trained Faster R-CNN with a freshly initialised box-predictor head.

    Args:
        numOfClasses: Number of classes the new ``FastRCNNPredictor`` head outputs.

    Returns:
        The detection model with ``roi_heads.box_predictor`` replaced.
    """
    build_detector = torchvision.models.detection.fasterrcnn_resnet50_fpn
    detector = build_detector(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT)

    # Size the replacement head from the feature width of the existing classifier
    predictor_inputs = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(predictor_inputs, numOfClasses)

    return detector

# Load an image file as a batched tensor on the target device
def preprocess_image(img_path, device):
    """Read ``img_path`` and return it as a single-image batch tensor on ``device``.

    Args:
        img_path: Path of the image file to open.
        device: Device the resulting tensor is moved to.

    Returns:
        The RGB image converted with ``functional.to_tensor`` and given a
        leading batch dimension.
    """
    rgb_image = Image.open(img_path).convert('RGB')

    # unsqueeze(0) adds the batch dimension in front of the image dimensions
    batched = functional.to_tensor(rgb_image).unsqueeze(0)
    batched = batched.to(device)

    return batched

# Map a class id to its display name
def get_class_name(class_id, COCO_CLASSES):
    """Return the name stored for ``class_id`` in ``COCO_CLASSES``, or ``'Unknown'``.

    Args:
        class_id: Key to look up.
        COCO_CLASSES: Mapping from class id to class name.
    """
    if class_id in COCO_CLASSES:
        return COCO_CLASSES[class_id]
    # Fallback for ids that are not part of the mapping
    return 'Unknown'

### Function to Draw Bounding Boxes on Test Images

In [None]:
# Draw bounding box with correct class name and increase image size
def draw_bboxes(output_dir, image, prediction, fig_size, COCO_CLASSES, saved_images_counter, total_images,
                threshold=0.5, file_name=None):
    """Render the detections for one image and save the figure to ``output_dir``.

    Args:
        output_dir: Directory the rendered figure is written to.
        image: Image handed to ``ax.imshow``. When ``file_name`` is omitted,
            its ``filename`` attribute supplies the output file name.
        prediction: Model output; ``prediction[0]`` must provide the
            ``'boxes'``, ``'labels'`` and ``'scores'`` tensors.
        fig_size: Figure size forwarded to ``plt.subplots(figsize=...)``.
        COCO_CLASSES: Mapping from class id to class name.
        saved_images_counter: Number of images saved before this one
            (zero-based); only used for the progress line.
        total_images: The progress line reports ``total_images - 1`` as the
            total. Presumably the caller passes the directory entry count,
            which includes one annotations file - verify against the caller.
        threshold: Detections are drawn only when their score is strictly
            greater than this value.
        file_name: Name of the output file inside ``output_dir``. Defaults to
            the base name of ``image.filename``.

    Raises:
        ValueError: If ``file_name`` is not given and ``image`` carries no
            usable ``filename``.
    """
    # Resolve the output name up front, instead of reading a global loop
    # variable, so that a failure happens before any figure is created.
    if file_name is None:
        file_name = os.path.basename(getattr(image, 'filename', '') or '')
    if not file_name:
        raise ValueError(
            "draw_bboxes needs an output file name: pass file_name=... or an "
            "image opened from a file path"
        )

    detections = prediction[0]
    boxes = detections['boxes'].cpu().numpy()  # predicted bounding boxes
    labels = detections['labels'].cpu().numpy()  # predicted labels
    scores = detections['scores'].cpu().numpy()  # predicted scores

    # Create a figure and axes using subplots
    fig, ax = plt.subplots(figsize=fig_size)

    # Display the image
    ax.imshow(image)

    for box, label, score in zip(boxes, labels, scores):
        # Only confident detections are drawn
        if score > threshold:
            x_min, y_min, x_max, y_max = box
            class_name = get_class_name(label, COCO_CLASSES)

            # Box outline
            ax.add_patch(
                plt.Rectangle(
                    (x_min, y_min), x_max - x_min, y_max - y_min,
                    fill=False,
                    edgecolor='red',
                    linewidth=2
                )
            )
            # Class name and score at the box's (x_min, y_min) corner
            ax.text(
                x_min, y_min,
                f'{class_name} ({score:.3f})',
                color='blue',
                fontsize=10,
            )

    # Turn off axis
    ax.axis('off')
    # Save the rendered figure under the resolved file name
    fig.savefig(f'{output_dir}/{file_name}', bbox_inches='tight', pad_inches=0)

    # Progress line: overwritten in place until the last expected image
    image_number = saved_images_counter + 1
    expected_images = total_images - 1
    if image_number == expected_images:
        print(f"Saved image {image_number}/{expected_images}")
    else:
        print(f"Saved image {image_number}/{expected_images}", end='\r')

### Main Pipeline

#### Parameters 

In [None]:
# Size of each output figure (passed to matplotlib as figsize)
fig_size = (8, 8)
# Number of classes the detector head is built for
numOfClasses = 5 # Trash, Mixed Waste -Black Bag-, Organic Waste -White Bag-, Other, Recycled Waste -Grey or Green Bag-
# Class id -> display name lookup (5 classes)
COCO_CLASSES = {
    0: 'Trash',
    1: 'Mixed Waste -Black Bag-',
    2: 'Organic Waste -White Bag-',
    3: 'Other',
    4: 'Recycled Waste -Grey or Green Bag-',
}
# Directory holding the test images
testing_dir = './trashy-dataset-roboflow.coco/test'
# Directory the annotated images are written to
output_dir = './imagesOut'
# Running count of images saved so far
saved_images_counter = 0

#### Checking for Device Availability and Loading Model

In [None]:
# Select the inference device: GPU when available, otherwise CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f"\nGPU: {torch.cuda.get_device_name(0)} is available - moving model to GPU\n")
else:
    device = torch.device('cpu')
    print("\nNo GPU available. Moving training to CPU\n")

# Rebuild the architecture so the saved state dict has matching keys and shapes
model = getModel(numOfClasses)
# Load the state dictionary (safe way):
#   weights_only=True restricts what the unpickler is allowed to load;
#   map_location=device remaps the stored tensors onto the selected device, so
#   a checkpoint saved on a GPU still loads when only the CPU is available.
state_dict = torch.load('./modelOut/model_epoch15.pth', map_location=device, weights_only=True)
model.load_state_dict(state_dict)
# Move model to the appropriate device
model.to(device)
# Set model to evaluation mode
model.eval()

#### Obtain Predictions For Each Test Image and Output To Folder

In [None]:
# Create the output images directory (no error if it already exists)
os.makedirs(output_dir, exist_ok=True)

# List the test directory once; the listing is reused for the progress total
test_entries = os.listdir(testing_dir)

for img in test_entries:
    # The COCO annotations file lives next to the images and is not an image
    if img == '_annotations.coco.json':
        continue

    # Get full image path
    img_path = os.path.join(testing_dir, img)
    # Convert image to a batched tensor on the inference device
    image_tensor = preprocess_image(img_path, device)

    # Inference only: no gradients needed
    with torch.no_grad():
        prediction = model(image_tensor)

    # Draw and save the detections; the context manager closes the image file
    # once the figure has been written
    with Image.open(img_path) as source_image:
        draw_bboxes(output_dir, source_image, prediction, fig_size, COCO_CLASSES, saved_images_counter, total_images=len(test_entries))
    # Increment saved images counter
    saved_images_counter += 1

    # Close all figures so they do not accumulate across iterations
    plt.close('all')

print("\n----- <Testing Completed> ----- ")

### Precision-Recall and Confusion Matrix 