In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; every dataset, log and checkpoint path
# used further down in this notebook is located under that mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import tensorflow as tf
import pandas as pd

# Training split folder on the mounted Drive; its annotation CSV sits next to the images.
image_dir = '/content/drive/MyDrive/Aquarium Combined.v2-raw-1024.tensorflow/train/'
df = pd.read_csv(image_dir + '_annotations.csv')

# Distinct class names, in the order they first appear in the training annotations.
labels = df['class'].unique()
print(labels)

['starfish' 'shark' 'fish' 'puffin' 'stingray' 'penguin' 'jellyfish']


In [None]:
import os
import pandas as pd
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import functional as F


# Class name -> integer id, numbered from 1 in the order the names appear in `labels`.
class_to_id = dict(zip(labels, range(1, len(labels) + 1)))



class ObjectDetectionDataset(Dataset):
    """Detection dataset backed by one annotation CSV and a folder of images.

    The CSV is read with pandas and must provide the columns ``filename``,
    ``class``, ``xmin``, ``ymin``, ``xmax`` and ``ymax`` (one row per box).
    One dataset item corresponds to one distinct ``filename``.

    Args:
        annotations_file: Path of the annotation CSV.
        img_dir: Directory that contains the image files named in the CSV.
        transform: Optional callable ``(image, target) -> (image, target)``.
        class_to_id: Optional mapping from class name to integer id. When
            omitted, the mapping is derived from the notebook-level ``labels``
            sequence (ids start at 1), exactly as before.
    """

    def __init__(self, annotations_file, img_dir, transform=None, class_to_id=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        if class_to_id is None:
            # Backward-compatible default: relies on the global `labels`
            # computed from the training annotations earlier in the notebook.
            class_to_id = {label: i + 1 for i, label in enumerate(labels)}
        self.class_to_id = dict(class_to_id)

        # Computed once here instead of on every __len__/__getitem__ call:
        # the distinct file names (in order of first appearance) and, for each
        # file name, the positional indices of its annotation rows.
        self.filenames = self.img_labels['filename'].unique()
        self._row_positions = self.img_labels.groupby('filename', sort=False).indices

    def __len__(self):
        """Number of distinct image files referenced by the CSV."""
        return len(self.filenames)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the ``idx``-th distinct file name.

        ``target`` holds ``boxes`` (float32, one ``[xmin, ymin, xmax, ymax]``
        row per annotation), ``labels`` (int64 class ids) and ``orig_size``
        (``[height, width]`` of the image before any transform).
        """
        img_name = self.filenames[idx]
        img_path = os.path.join(self.img_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        # Original dimensions
        original_width, original_height = image.size

        # Annotation rows of the current image, in their CSV order
        img_boxes = self.img_labels.iloc[self._row_positions[img_name]]

        boxes = img_boxes[['xmin', 'ymin', 'xmax', 'ymax']].values
        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # Convert class names to class IDs
        class_ids = img_boxes['class'].map(self.class_to_id).astype(int)
        class_ids = torch.as_tensor(class_ids.to_numpy(), dtype=torch.int64)

        target = {"boxes": boxes, "labels": class_ids}

        if self.transform:
            image, target = self.transform(image, target)

        # Include original image dimensions in the target dictionary
        target["orig_size"] = torch.tensor([original_height, original_width])

        return image, target


from torch.utils.data.dataloader import default_collate

def collate_fn(batch):
    """Collate ``(image, target)`` samples for a detection model.

    ``None`` samples are dropped. Images are stacked with ``default_collate``
    (so they must all share one shape), while targets stay a plain list of
    per-image dicts because the number of boxes differs between images.

    Returns:
        A ``(images, targets)`` pair. For a batch with no usable sample the
        pair is ``(empty tensor, [])`` so callers that unpack two values keep
        working instead of failing on a bare tensor.
    """
    samples = [sample for sample in batch if sample is not None]
    if not samples:
        return torch.empty(0), []

    images = default_collate([sample[0] for sample in samples])
    targets = [sample[1] for sample in samples]

    return images, targets



def transform(image, target):
    """Resize a PIL image to 224x224, convert it to a tensor and rescale its boxes.

    ``target["boxes"]`` is scaled in place by the same per-axis factors that
    are applied to the image; the (mutated) target dict is returned together
    with the image tensor.
    """
    target_width, target_height = 224, 224
    source_width, source_height = image.size
    ratio_x = target_width / source_width
    ratio_y = target_height / source_height

    # F.resize takes (height, width)
    image = F.to_tensor(F.resize(image, (target_height, target_width)))

    bboxes = target["boxes"]
    bboxes[:, 0::2] *= ratio_x  # xmin, xmax
    bboxes[:, 1::2] *= ratio_y  # ymin, ymax
    target["boxes"] = bboxes

    return image, target





In [None]:
import random
import torchvision.transforms as transforms
from torchvision.transforms import functional as F

class RandomHorizontalFlip(object):
    """Mirror the image left-to-right with probability ``p`` (0.5 unless overridden).

    When the flip happens, the x-coordinates stored in ``target["boxes"]``
    are mirrored in place so the boxes keep covering the same objects.
    """

    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, image, target):
        # Guard clause: leave the sample untouched when no flip is drawn.
        if not (random.random() < self.p):
            return image, target

        image = F.hflip(image)
        width, _ = image.size

        boxes = target["boxes"]
        # Both mirrored columns are computed before either one is written back.
        mirrored_xmin = width - boxes[:, 2]
        mirrored_xmax = width - boxes[:, 0]
        boxes[:, 0] = mirrored_xmin
        boxes[:, 2] = mirrored_xmax
        return image, target

class Resize(object):
    """Resize a PIL image and rescale its bounding boxes accordingly.

    Args:
        size: Target size as ``(width, height)``.
    """

    def __init__(self, size):
        self.size = size  # Expected size format: (width, height)

    def __call__(self, image, target):
        original_width, original_height = image.size
        new_width, new_height = self.size[0], self.size[1]
        scale_x = new_width / original_width
        scale_y = new_height / original_height

        # F.resize expects (height, width); passing self.size unchanged would
        # swap the axes for non-square targets and misalign image and boxes.
        image = F.resize(image, (new_height, new_width))

        # Boxes are scaled in place: columns 0/2 are x, columns 1/3 are y.
        boxes = target["boxes"]
        boxes[:, [0, 2]] *= scale_x
        boxes[:, [1, 3]] *= scale_y
        target["boxes"] = boxes
        return image, target

class ToTensor(object):
    """Turn the PIL image into a tensor via ``F.to_tensor``; the target passes through unchanged."""

    def __call__(self, image, target):
        return F.to_tensor(image), target

class ColorJitter(transforms.ColorJitter):
    """``transforms.ColorJitter`` adapted to the ``(image, target)`` call convention.

    Only the image is jittered; the target is handed back untouched.
    """

    def __call__(self, image, target):
        jittered = super().__call__(image)
        return jittered, target

def get_transform(train):
    """Build the ``(image, target)`` preprocessing pipeline.

    Every pipeline resizes to 224x224 first and converts to a tensor last.
    With ``train`` truthy, a random horizontal flip and colour jitter are
    inserted in between.
    """
    pipeline = [Resize((224, 224))]  # Resize the image
    if train:
        pipeline += [
            RandomHorizontalFlip(),  # Randomly flip
            ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.1),  # Color Jitter
        ]
    pipeline.append(ToTensor())  # Convert to tensor
    return Compose(pipeline)

class Compose(object):
    """Chain ``(image, target)`` transforms, feeding each one the previous result."""

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        for apply_step in self.transforms:
            transformed = apply_step(image, target)
            image, target = transformed
        return image, target


In [None]:
# Dataset root on the mounted Drive; each split lives in its own sub-folder
# together with its `_annotations.csv`.
_dataset_root = '/content/drive/MyDrive/Aquarium Combined.v2-raw-1024.tensorflow'

# Training split: augmentations enabled.
train_dataset = ObjectDetectionDataset(
    annotations_file=f'{_dataset_root}/train/_annotations.csv',
    img_dir=f'{_dataset_root}/train',
    transform=get_transform(train=True),
)

# Validation split: resize + tensor conversion only.
val_dataset = ObjectDetectionDataset(
    annotations_file=f'{_dataset_root}/valid/_annotations.csv',
    img_dir=f'{_dataset_root}/valid',
    transform=get_transform(train=False),
)

# Test split: resize + tensor conversion only.
test_dataset = ObjectDetectionDataset(
    annotations_file=f'{_dataset_root}/test/_annotations.csv',
    img_dir=f'{_dataset_root}/test',
    transform=get_transform(train=False),
)

from torch.utils.data import DataLoader

# Evaluation loaders share one configuration; only the training loader shuffles.
_eval_loader_options = dict(batch_size=16, shuffle=False, collate_fn=collate_fn)

train_data_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
val_data_loader = DataLoader(val_dataset, **_eval_loader_options)
test_data_loader = DataLoader(test_dataset, **_eval_loader_options)



In [None]:
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.backbone_utils import mobilenet_backbone
from torchvision.models.detection.rpn import AnchorGenerator

from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights

def create_mobilenet_v3_backbone(pretrained=True, trainable_layers=3):
    """Return the MobileNetV3-Large feature extractor for use as a detection backbone.

    Args:
        pretrained: Load the ``IMAGENET1K_V1`` weights when true, otherwise
            start from random initialisation.
        trainable_layers: Number of trailing blocks of ``model.features`` left
            trainable; all earlier blocks are frozen. ``0`` freezes everything.

    Returns:
        ``model.features`` with an ``out_channels`` attribute set to 960.

    Raises:
        ValueError: If ``trainable_layers`` is negative.
    """
    if trainable_layers < 0:
        raise ValueError(f"trainable_layers must be >= 0, got {trainable_layers}")

    # `weights=None` already means "no pretrained weights", so one call covers both cases.
    weights = MobileNet_V3_Large_Weights.IMAGENET1K_V1 if pretrained else None
    model = mobilenet_v3_large(weights=weights)

    # Extract the backbone
    backbone = model.features
    backbone.out_channels = 960  # Number of output channels for MobileNetV3's backbone

    # Freeze everything, then re-enable gradients for the last blocks only.
    for param in model.parameters():
        param.requires_grad = False
    # Guarded because `features[-0:]` is the *whole* sequence, which would
    # silently unfreeze every layer when trainable_layers == 0.
    if trainable_layers > 0:
        for layer in backbone[-trainable_layers:]:
            for param in layer.parameters():
                param.requires_grad = True

    return backbone



def get_object_detection_model(num_classes):
    """Assemble a Faster R-CNN detector on top of the pretrained MobileNetV3 backbone.

    ``num_classes`` is forwarded to ``FasterRCNN`` unchanged. The backbone
    yields a single feature map, so one anchor tuple and the feature-map
    name ``'0'`` are used for the RPN and the RoI pooler respectively.
    """
    return FasterRCNN(
        create_mobilenet_v3_backbone(pretrained=True),
        num_classes=num_classes,
        # Anchor generator for the RPN (Region Proposal Network)
        rpn_anchor_generator=AnchorGenerator(
            sizes=((32, 64, 128, 256, 512),),
            aspect_ratios=((0.5, 1.0, 2.0),),
        ),
        # RoI (Region of Interest) pooler for the detection heads
        box_roi_pool=torchvision.ops.MultiScaleRoIAlign(
            featmap_names=['0'],
            output_size=7,
            sampling_ratio=2,
        ),
    )



# Seven annotated classes plus the background class.
num_classes = 8

# Prefer the GPU when one is present, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Build the detector and place it on the selected device.
model = get_object_detection_model(num_classes)
model.to(device)

# Define your optimizer, learning rate scheduler, and other training details here


FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
   

In [None]:
def iou(boxA, boxB):
    """Intersection over union of two ``[x1, y1, x2, y2]`` boxes.

    Returns ``0.0`` when the union has no area (e.g. both boxes are
    degenerate) instead of dividing by zero.
    """
    # Corners of the intersection rectangle
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    # Width/height are clamped at zero so disjoint boxes yield no intersection.
    interArea = max(0, xB - xA) * max(0, yB - yA)

    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    union = boxAArea + boxBArea - interArea
    if union <= 0:
        return 0.0

    return interArea / float(union)

def calculate_precision(val_data_loader, model, device, iou_threshold=0.5, confidence_threshold=0.5):
    """Compute detection precision and recall over a data loader.

    For every image the predictions scoring above ``confidence_threshold``
    are passed through ``non_max_suppression`` (overlap threshold 0.5) and
    then greedily matched, in the order they are returned, to ground-truth
    boxes of the same label. A prediction is a true positive when its best
    still-unmatched ground-truth box reaches ``iou_threshold``; each
    ground-truth box can be matched at most once.

    Side effects: the model is switched to eval mode and moved to ``device``.

    Returns:
        ``(precision, recall)``; each is ``0`` when its denominator is zero.
    """
    true_positives = 0
    false_positives = 0
    total_ground_truth = 0

    model.eval()
    model.to(device)

    with torch.no_grad():
        for images, targets in val_data_loader:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                pred_boxes = output['boxes'].cpu().numpy()
                pred_scores = output['scores'].cpu().numpy()
                pred_labels = output['labels'].cpu().numpy()

                # Drop low-confidence detections (boxes, scores and labels together).
                high_confidence_mask = pred_scores > confidence_threshold
                pred_boxes = pred_boxes[high_confidence_mask]
                pred_scores = pred_scores[high_confidence_mask]
                pred_labels = pred_labels[high_confidence_mask]

                # Apply NMS and keep the three arrays aligned.
                keep_indices = non_max_suppression(pred_boxes, pred_scores, 0.5)
                pred_boxes = pred_boxes[keep_indices]
                pred_scores = pred_scores[keep_indices]
                pred_labels = pred_labels[keep_indices]

                gt_boxes = target['boxes'].cpu().numpy()
                gt_labels = target['labels'].cpu().numpy()

                matched_gt = set()  # indices of ground-truth boxes already claimed
                for pred_box, pred_label in zip(pred_boxes, pred_labels):
                    best_iou = 0
                    best_gt_idx = -1
                    for gt_idx, (gt_box, gt_label) in enumerate(zip(gt_boxes, gt_labels)):
                        if gt_idx in matched_gt or pred_label != gt_label:
                            continue
                        current_iou = iou(pred_box, gt_box)
                        if current_iou > best_iou:
                            best_iou = current_iou
                            best_gt_idx = gt_idx

                    # `best_gt_idx >= 0` keeps a prediction with no candidate at
                    # all from counting as a hit when iou_threshold is <= 0.
                    if best_gt_idx >= 0 and best_iou >= iou_threshold:
                        true_positives += 1
                        matched_gt.add(best_gt_idx)
                    else:
                        false_positives += 1

                total_ground_truth += len(gt_boxes)

    detections = true_positives + false_positives
    precision = true_positives / detections if detections > 0 else 0
    recall = true_positives / total_ground_truth if total_ground_truth > 0 else 0

    return precision, recall


In [None]:
import numpy as np

def filter_boxes(boxes, scores, confidence_threshold):
    """
    Keep only the boxes whose score reaches the confidence threshold.

    Args:
    - boxes (list of lists): Bounding boxes.
    - scores (list): One confidence score per bounding box.
    - confidence_threshold (float): Minimum score (inclusive) for a box to be kept.

    Returns:
    - Tuple ``(filtered_boxes, filtered_scores)`` of two lists in input order.
    """
    survivors = [
        (box, score)
        for box, score in zip(boxes, scores)
        if score >= confidence_threshold
    ]
    filtered_boxes = [box for box, _ in survivors]
    filtered_scores = [score for _, score in survivors]
    return filtered_boxes, filtered_scores

def non_max_suppression(boxes, scores, threshold):
    """
    Perform greedy non-maximum suppression.

    Boxes are visited from highest to lowest score. A remaining box is
    suppressed when the part of it covered by the current box exceeds
    ``threshold``. Note that the overlap measure is intersection divided by
    the area of the *other* box, not intersection over union.

    Args:
    - boxes (list of lists): List of bounding boxes (each box is a list of [x1, y1, x2, y2]).
    - scores (list): List of confidence scores for each bounding box.
    - threshold (float): Overlap threshold for suppressing redundant boxes.

    Returns:
    - List of indices corresponding to the boxes that have been kept,
      ordered by decreasing score. Empty input yields an empty list.
    """

    # Convert to numpy arrays for vectorized operations
    boxes = np.array(boxes)
    scores = np.array(scores)

    # An empty list becomes a 1-D array, on which the column indexing below
    # would raise; there is nothing to keep in that case anyway.
    if boxes.size == 0:
        return []

    # Per-box area (pixel-inclusive, hence the +1) and visiting order by descending score
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]

    keep = []  # Indices of boxes that will be kept
    while order.size > 0:
        i = order[0]  # Index of the current box with the highest score
        keep.append(i)
        rest = order[1:]

        # Compute overlap between the current box and the rest
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])

        w = np.maximum(0, xx2 - xx1 + 1)
        h = np.maximum(0, yy2 - yy1 + 1)
        overlap = (w * h) / areas[rest]  # Intersection over area of other boxes

        # Keep boxes with an overlap less than or equal to the threshold
        inds = np.where(overlap <= threshold)[0]
        order = order[inds + 1]

    return keep

In [None]:
from torch.utils.tensorboard import SummaryWriter
import itertools
import numpy as np
import torch.optim as optim


# NOTE(review): the SummaryWriter instances in this notebook write under
# '/content/drive/My Drive/TensorBoard/...', while the dashboard below reads
# from `runs` — confirm the intended log directory, otherwise the dashboard
# will not show these runs.
%load_ext tensorboard
%tensorboard --logdir runs

# Hyperparameter grid: every combination of the three lists is trained from scratch.
hyperparams = {
    'learning_rate': [0.001, 0.0005, 0.0001],
    'num_epochs': [5, 10, 15],
    'batch_size': [4, 8, 16]
}

# Initialize the TensorBoard writer
writer = SummaryWriter('/content/drive/My Drive/TensorBoard/hparam_tuning')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for lr, epochs, batch_size in itertools.product(hyperparams['learning_rate'], hyperparams['num_epochs'], hyperparams['batch_size']):
    print(f"\nTraining with learning rate: {lr}, epochs: {epochs}, batch size: {batch_size}")

    # The epoch budget is part of the tag: without it, configurations that
    # share lr and batch size but differ in `epochs` would all be written to
    # the same scalar series and overlap in TensorBoard.
    run_tag = f'lr_{lr}_ep_{epochs}_bs_{batch_size}'

    model = get_object_detection_model(8)  # Fresh model per configuration (7 classes + background)
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=0.0005)
    # NOTE(review): this scheduler is created but `scheduler.step()` is never
    # called, so the learning rate stays constant. Left as-is to keep the
    # sweep comparable with the final run below, which uses no scheduler.
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        total_batches = len(train_data_loader)

        for batch_index, (images, targets) in enumerate(train_data_loader, start=1):
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            optimizer.zero_grad()
            # In train mode the detector returns a dict of losses; their sum is optimised.
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            losses.backward()
            optimizer.step()

            running_loss += losses.item()
            print(f"Epoch [{epoch+1}/{epochs}], Batch [{batch_index}/{total_batches}], Loss: {losses.item():.4f}")

        epoch_loss = running_loss / total_batches
        writer.add_scalar(f'Loss/train_{run_tag}', epoch_loss, epoch)

        # Validation after every epoch (IoU threshold 0.1, confidence threshold 0.5).
        # calculate_precision puts the model in eval mode; model.train() above restores it.
        val_precision, val_recall = calculate_precision(val_data_loader, model, device, 0.1, 0.5)
        writer.add_scalar(f'Precision/val_{run_tag}', val_precision, epoch)
        writer.add_scalar(f'Recall/val_{run_tag}', val_recall, epoch)

        print(f"End of Epoch {epoch+1}/{epochs}, Average Loss: {epoch_loss:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}")

    # Log hyperparameters and their final performance metrics
    writer.add_hparams({'lr': lr, 'epochs': epochs, 'batch_size': batch_size},
                       {'hparam/precision': val_precision, 'hparam/recall': val_recall})

    print(f"Completed training with lr={lr}, epochs={epochs}, batch_size={batch_size}. Final Precision: {val_precision:.4f}, Recall: {val_recall:.4f}\n")

writer.close()


In [None]:
from torch.utils.tensorboard import SummaryWriter
import torch.optim as optim
from torch.utils.data import DataLoader
import sys

# Initialize TensorBoard writer
writer = SummaryWriter('/content/drive/My Drive/TensorBoard/final_run')

# Set hyperparameters based on tuning results
batch_size = 4
learning_rate = 0.0001
num_epochs = 15

# Prepare the data loaders
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_data_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Initialize the model
model = get_object_detection_model(8)  # 7 classes + background
# Check for GPU availability
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU is available. Using GPU...")
else:
    device = torch.device("cpu")
    print("GPU is not available. Using CPU...")
model.to(device)

# Set the optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.0005)

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_batches = len(train_data_loader)
    for batch_index, (images, targets) in enumerate(train_data_loader, start=1):
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        optimizer.zero_grad()
        # In train mode the detector returns a dict of losses; their sum is optimised.
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        losses.backward()
        optimizer.step()

        # Print progress update (carriage return keeps it on a single line)
        sys.stdout.write(f'\rEpoch {epoch+1}/{num_epochs}, Batch {batch_index}/{total_batches}, Loss: {losses.item():.4f}')
        sys.stdout.flush()

    # Validation step - evaluate precision and recall after each epoch
    precision, recall = calculate_precision(val_data_loader, model, device, iou_threshold=0.1, confidence_threshold=0.5)

    # Log precision and recall to TensorBoard
    writer.add_scalar('Precision/validation', precision, epoch)
    writer.add_scalar('Recall/validation', recall, epoch)

    print(f'\nEpoch {epoch+1}/{num_epochs}, Precision: {precision:.4f}, Recall: {recall:.4f}')

    # Save after the last epoch. Tied to `num_epochs` rather than a literal
    # epoch index so that changing the epoch budget above still saves weights.
    if epoch == num_epochs - 1:
        torch.save(model.state_dict(), '/content/drive/My Drive/ModelWeights')
        print(f"Saved model weights at epoch {epoch+1}")

# Close the writer when done
writer.close()


GPU is available. Using GPU...
Epoch 1/15, Batch 112/112, Loss: 0.4887
Epoch 1/15, Precision: 0.9167, Recall: 0.0121
Epoch 2/15, Batch 112/112, Loss: 0.3496
Epoch 2/15, Precision: 0.9189, Recall: 0.0374
Epoch 3/15, Batch 112/112, Loss: 0.8015
Epoch 3/15, Precision: 0.8657, Recall: 0.2057
Epoch 4/15, Batch 112/112, Loss: 0.4501
Epoch 4/15, Precision: 0.8315, Recall: 0.2552
Epoch 5/15, Batch 112/112, Loss: 0.3679
Epoch 5/15, Precision: 0.8185, Recall: 0.2926
Epoch 6/15, Batch 112/112, Loss: 0.2776
Epoch 6/15, Precision: 0.7862, Recall: 0.4246
Epoch 7/15, Batch 112/112, Loss: 0.2658
Epoch 7/15, Precision: 0.8465, Recall: 0.4246
Epoch 8/15, Batch 112/112, Loss: 0.6420
Epoch 8/15, Precision: 0.8440, Recall: 0.4048
Epoch 9/15, Batch 112/112, Loss: 0.7402
Epoch 9/15, Precision: 0.8223, Recall: 0.4631
Epoch 10/15, Batch 112/112, Loss: 0.9006
Epoch 10/15, Precision: 0.7803, Recall: 0.5314
Epoch 11/15, Batch 112/112, Loss: 0.2207
Epoch 11/15, Precision: 0.7992, Recall: 0.4686
Epoch 12/15, Batch 

In [None]:
from torchvision.transforms import functional as F

def val_transform(image, target):
    """Resize a PIL image to 224x224, convert it to a tensor and rescale its boxes.

    The image is resized, so any ``target["boxes"]`` must be scaled by the
    same per-axis factors; otherwise the boxes would still refer to the
    original resolution. The input target is not modified: a shallow copy
    with the rescaled boxes is returned. A ``None`` target or a target
    without ``"boxes"`` is passed through unchanged.
    """
    original_width, original_height = image.size
    new_width, new_height = 224, 224

    image = F.resize(image, (new_height, new_width))  # F.resize takes (height, width)
    image = F.to_tensor(image)

    if target is not None and "boxes" in target:
        scale_x = new_width / original_width
        scale_y = new_height / original_height
        boxes = torch.as_tensor(target["boxes"], dtype=torch.float32)
        # Column order is [xmin, ymin, xmax, ymax].
        scale = torch.tensor([scale_x, scale_y, scale_x, scale_y], dtype=torch.float32)
        target = dict(target)
        target["boxes"] = boxes * scale

    return image, target


# Usage example
# Validation split folder on the mounted Drive (images plus `_annotations.csv`).
_valid_dir = '/content/drive/MyDrive/Aquarium Combined.v2-raw-1024.tensorflow/valid/'

val_dataset = ObjectDetectionDataset(
    annotations_file=_valid_dir + '_annotations.csv',
    img_dir=_valid_dir,
    transform=get_transform(train=False),
)

# Validation loader: fixed order, custom collate; lower batch_size if GPU memory is tight.
val_data_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, collate_fn=collate_fn)


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# Reverse lookup: numeric class id -> class name.
id_to_class = dict((class_id, class_name) for class_name, class_id in class_to_id.items())


def visualize_with_boxes(image, boxes, labels, scores):
    """
    Show an image with its detections drawn on top.

    Args:
    - image (tensor): Image tensor in channel-first layout (it is permuted to
      channel-last before plotting).
    - boxes (array): Bounding boxes, each as [x1, y1, x2, y2].
    - labels (array): Class id of each box, looked up in `id_to_class`.
    - scores (array): Confidence score of each box.
    """
    # Channel-last numpy copy for matplotlib
    pixels = image.permute(1, 2, 0).cpu().numpy()
    plt.figure(figsize=(8, 8))
    plt.imshow(pixels)
    ax = plt.gca()

    for (x1, y1, x2, y2), label, score in zip(boxes, labels, scores):
        # Red outline around the detection
        outline = patches.Rectangle(
            (x1, y1), x2 - x1, y2 - y1,
            linewidth=2, edgecolor='r', facecolor='none',
        )
        ax.add_patch(outline)

        # Class name and score at the box's top-left corner
        caption = f'{id_to_class[label]}: {score:.2f}'
        ax.text(x1, y1, caption,
                bbox=dict(facecolor='yellow', alpha=0.5), fontsize=8, color='black')

    plt.show()


# Set model to evaluation mode
model.eval()

# Process a single batch from the validation data loader
images, _ = next(iter(val_data_loader))  # Targets are not needed for visualisation
images = [img.to(device) for img in images]  # Move images to the appropriate device

# Gradients are disabled only for this forward pass. A global
# torch.set_grad_enabled(False) would stay in effect for every later cell and
# break any subsequent training step.
with torch.no_grad():
    outputs = model(images)  # Get predictions from the model
outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]  # Move everything to CPU

confidence_threshold = 0.5
# For each image in the batch
for i in range(len(images)):
    # Extract boxes, labels, and scores
    pred_boxes = outputs[i]['boxes'].numpy()
    pred_scores = outputs[i]['scores'].numpy()
    pred_labels = outputs[i]['labels'].numpy()
    print(pred_scores)

    # Apply the same confidence mask to all three arrays so that the NMS
    # indices below refer to the same detection in each of them.
    high_confidence_mask = pred_scores > confidence_threshold
    pred_boxes = pred_boxes[high_confidence_mask]
    pred_scores = pred_scores[high_confidence_mask]
    pred_labels = pred_labels[high_confidence_mask]

    # Apply NMS
    keep_indices = non_max_suppression(pred_boxes, pred_scores, 0.3)
    final_boxes = pred_boxes[keep_indices]
    final_labels = pred_labels[keep_indices]
    final_scores = pred_scores[keep_indices]

    # Visualize predictions for each image
    visualize_with_boxes(images[i], final_boxes, final_labels, final_scores)

In [None]:
# Call the function
# Validation-set precision/recall: IoU threshold 0.2, confidence threshold 0.5.
precision, recall = calculate_precision(
    val_data_loader, model, device, iou_threshold=0.2, confidence_threshold=0.5
)
print(f'Precision: {precision}\nRecall: {recall}')


Precision: 0.7140804597701149
Recall: 0.5467546754675467


In [None]:

# Call the function
# Test-set precision/recall: IoU threshold 0.2, confidence threshold 0.5.
precision, recall = calculate_precision(
    test_data_loader, model, device, iou_threshold=0.2, confidence_threshold=0.5
)
print(f'Precision: {precision}\nRecall: {recall}')


Precision: 0.760806916426513
Recall: 0.4520547945205479


In [None]:
import cv2
from google.colab import drive
from google.colab.patches import cv2_imshow
# Google Drive is expected to be mounted already (see the first cell).


# Source clip on the mounted Drive.
mp4_path = '/content/drive/My Drive/CoralReef.mp4'
# Reverse lookup: numeric class id -> class name.
id_to_class = dict((class_id, class_name) for class_name, class_id in class_to_id.items())



import torch
import cv2
import numpy as np
from PIL import Image

# Assume 'model' is your object detection model and it's already loaded and moved to the correct device (CPU or GPU)

# Function to preprocess input images
def preprocess_image(frame):
    """Turn an OpenCV BGR frame into a 224x224 image tensor.

    The frame is converted from BGR to RGB, wrapped in a PIL image, resized
    to 224x224 and converted with ``ToTensor``.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(rgb_frame)

    # Same two steps the detector's data pipeline applies: resize, then to tensor.
    resize = torchvision.transforms.Resize((224, 224))
    to_tensor = torchvision.transforms.ToTensor()
    return to_tensor(resize(pil_image))

def draw_boxes(frame, boxes, labels, scores, threshold=0.5, input_size=(224, 224)):
    """Draw detections with a score above ``threshold`` onto ``frame``.

    Args:
        frame: Image array of shape ``(height, width, ...)``; drawn on in place.
        boxes: Iterable of ``[x1, y1, x2, y2]`` boxes expressed in the
            coordinate space of the model input.
        labels: Class id per box (int, numpy integer or 0-dim tensor).
        scores: Confidence per box.
        threshold: Boxes scoring at or below this value are skipped.
        input_size: ``(width, height)`` of the model input the boxes refer
            to; used to scale them back to the frame's own resolution.

    Returns:
        The same ``frame`` object.
    """
    # The scale is derived from the frame itself rather than from notebook
    # globals, so the function works wherever it is called.
    frame_height, frame_width = frame.shape[:2]
    scale_x = frame_width / input_size[0]
    scale_y = frame_height / input_size[1]

    for box, label, score in zip(boxes, labels, scores):
        score = float(score)
        if score <= threshold:
            continue

        # Scale first, truncate afterwards: truncating before scaling loses
        # up to one model-input pixel, i.e. several frame pixels.
        bx1, by1, bx2, by2 = (float(coord) for coord in box)
        x1 = int(bx1 * scale_x)
        y1 = int(by1 * scale_y)
        x2 = int(bx2 * scale_x)
        y2 = int(by2 * scale_y)

        # Draw rectangle and caption on the frame
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # int() accepts Python ints, numpy integers and 0-dim tensors alike.
        class_name = id_to_class[int(label)]
        cv2.putText(frame, f'{class_name}: {score:.2f}', (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,255,0), 2)
    return frame

# Open the source video and read one frame to learn its dimensions.
cap = cv2.VideoCapture(mp4_path)
ret, frame = cap.read()
if not ret:
    # Without a first frame the output size is unknown; fail with a clear
    # message instead of a NameError on `original_width` further down.
    cap.release()
    raise RuntimeError(f"Could not read a frame from {mp4_path}")
original_height, original_width = frame.shape[:2]

# Factors that map the 224x224 model-input coordinates back to frame
# coordinates. They do not change per frame, so they are computed once.
scale_x = original_width / 224
scale_y = original_height / 224

# Write at the source frame rate so playback speed is preserved; fall back
# to 20 fps when the container does not report one.
fps = cap.get(cv2.CAP_PROP_FPS) or 20.0
fourcc = cv2.VideoWriter_fourcc(*'MP4V')  # Define the codec for .mp4 files
out = cv2.VideoWriter('/content/drive/My Drive/CoralReefUpdated2.mp4', fourcc, fps, (original_width, original_height))

# Rewind so that the frame read above is processed as well.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

model.eval()

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break  # Exit loop if no more frames to read

        # Preprocess the frame: add a batch dimension and move to the model's device
        img = preprocess_image(frame).unsqueeze(0).to(device)

        # Perform inference
        with torch.no_grad():
            predictions = model(img)

        # Model outputs for the single image in the batch
        pred_boxes = predictions[0]['boxes'].float()
        pred_scores = predictions[0]['scores'].float()

        # Apply NMS (IoU threshold 0.1; adjust as needed)
        indices = torchvision.ops.nms(pred_boxes, pred_scores, 0.1)

        # Use indices to select boxes, labels, and scores
        nms_boxes = pred_boxes[indices]
        nms_labels = predictions[0]['labels'][indices]
        nms_scores = pred_scores[indices]

        # Draw bounding boxes on the original frame and append it to the output video
        frame = draw_boxes(frame, nms_boxes, nms_labels, nms_scores, threshold=0.5)
        out.write(frame)

        # Press 'q' to exit early
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release everything even if inference fails, so the output file is finalised.
    cap.release()
    out.release()
    cv2.destroyAllWindows()
