In [1]:
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models import mobilenet_v3_large, MobileNet_V3_Large_Weights
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
from torchvision.transforms.functional import to_tensor, normalize
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from PIL import Image, ImageDraw
import cv2
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Custom Dataset for Oxford-IIIT Pet Dataset
class OxfordPetsDataset(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.label_map = {'cat': 1, 'dog': 2}

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = Image.open(img_path).convert("RGB")
        bbox = self.img_labels.iloc[idx, 4:8].values.astype(float)
        label_str = self.img_labels.iloc[idx, 3]
        label = self.label_map[label_str]

        # Define transformations
        transform = transforms.Compose([
            transforms.ToTensor(),  # Converts to a PyTorch tensor and normalizes to [0, 1]
            transforms.Resize((480, 480)),  # Resizes the image
        ])
        image = transform(image)

        # Convert bbox and label to tensors
        bbox = torch.tensor([bbox], dtype=torch.float32)
        label = torch.tensor([label], dtype=torch.int64)

        target = {'boxes': bbox, 'labels': label}
        return image, target

# Data Augmentation
def get_transforms(train):
    if train:
        return A.Compose([
            A.Resize(height=480, width=480),
            A.HorizontalFlip(p=0.5),
            ToTensorV2(p=1.0)  # Converts to a PyTorch tensor
        ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))
    else:
        return A.Compose([
            A.Resize(height=480, width=480),
            ToTensorV2(p=1.0)  # Also for the validation set
        ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

# Custom collate_fn for DataLoader
def collate_fn(batch):
    images, targets = zip(*batch)
    images = [img for img in images]
    targets = [{k: v for k, v in t.items()} for t in targets]
    return images, targets

# Function to create the model
def create_model(num_classes):
    # Load a pre-trained MobileNetV3 model as the backbone using the new weights argument
    backbone = mobilenet_v3_large(weights=MobileNet_V3_Large_Weights.IMAGENET1K_V1).features
    backbone.out_channels = 960

    # Anchor generator and RPN configuration
    anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256, 512),), aspect_ratios=((0.5, 1.0, 2.0),) * 5)

    # Create the Faster R-CNN model
    model = FasterRCNN(backbone,
                       num_classes=num_classes,
                       rpn_anchor_generator=anchor_generator)

    # Replace the classifier head with the correct number of classes
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

# Evaluation function
def evaluate(model, data_loader, device):
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for images, targets in data_loader:
            images = list(img.to(device) for img in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            # Summing up the losses
            losses = sum(loss.item() for loss in loss_dict.values())
            val_loss += losses
    return val_loss / len(data_loader)

In [3]:
# Prepare the dataset
train_data = OxfordPetsDataset(annotations_file='/content/drive/My Drive/dataset/train/_annotations.csv', img_dir='/content/drive/My Drive/dataset/train', transform=get_transforms(train=True))
train_data_loader = DataLoader(train_data, batch_size=4, shuffle=True, collate_fn=collate_fn)

val_data = OxfordPetsDataset(annotations_file='/content/drive/My Drive/dataset/valid/_annotations.csv', img_dir='/content/drive/My Drive/dataset/valid', transform=get_transforms(train=False))
val_data_loader = DataLoader(val_data, batch_size=4, shuffle=False, collate_fn=collate_fn)

In [4]:
# Model and Training Setup
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
num_classes = 3  # Cats, Dogs and background
model = create_model(num_classes).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)

# Training loop with evaluation
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for images, targets in train_data_loader:
        images = [img.to(device) for img in images]
        targets_device = [{k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets_device)
        losses = sum(loss for loss in loss_dict.values())
        train_loss += losses.item()

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    train_loss /= len(train_data_loader)
    val_loss = evaluate(model, val_data_loader, device)

    print(f"Epoch {epoch}: Train loss: {train_loss:.4f}, Val loss: {val_loss:.4f}")

# Save the trained model
torch.save(model.state_dict(), '/content/drive/My Drive/faster_rcnn_oxford_pet.pth')

Downloading: "https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_large-8738ca79.pth
100%|██████████| 21.1M/21.1M [00:00<00:00, 97.8MB/s]
  bbox = torch.tensor([bbox], dtype=torch.float32)


Epoch 0: Train loss: 0.2681
Epoch 1: Train loss: 0.2151
Epoch 2: Train loss: 0.1877
Epoch 3: Train loss: 0.1758
Epoch 4: Train loss: 0.1647
Epoch 5: Train loss: 0.1526
Epoch 6: Train loss: 0.1454
Epoch 7: Train loss: 0.1374
Epoch 8: Train loss: 0.1307
Epoch 9: Train loss: 0.1261
Epoch 10: Train loss: 0.1197
Epoch 11: Train loss: 0.1154
Epoch 12: Train loss: 0.1103
Epoch 13: Train loss: 0.1068
Epoch 14: Train loss: 0.1070
Epoch 15: Train loss: 0.1011
Epoch 16: Train loss: 0.0998
Epoch 17: Train loss: 0.0970
Epoch 18: Train loss: 0.0948
Epoch 19: Train loss: 0.0907


In [4]:
# Load the model
def create_model(num_classes):
    backbone = mobilenet_v3_large(weights=MobileNet_V3_Large_Weights.IMAGENET1K_V1).features
    backbone.out_channels = 960
    anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256, 512),), aspect_ratios=((0.5, 1.0, 2.0),) * 5)
    model = FasterRCNN(backbone, num_classes=num_classes, rpn_anchor_generator=anchor_generator)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

model = create_model(3)  # 3 classes: background, cat, dog
model.load_state_dict(torch.load('/content/drive/My Drive/faster_rcnn_oxford_pet.pth'))
model.eval()

# Function to process and predict on each frame
def process_frame(frame):
    # Convert frame to PIL Image and resize
    original_size = frame.shape[1], frame.shape[0]  # Width, Height
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).convert("RGB")
    pil_image_resized = F.to_tensor(pil_image)
    pil_image_resized = F.resize(pil_image_resized, (480, 480))

    # Predict
    with torch.no_grad():
        prediction = model(pil_image_resized.unsqueeze(0))

    # Rescale bounding boxes
    scale_x, scale_y = original_size[0] / 480, original_size[1] / 480
    prediction[0]['boxes'][:, [0, 2]] *= scale_x
    prediction[0]['boxes'][:, [1, 3]] *= scale_y

    return prediction

# Function to draw bounding boxes on the frame
def draw_boxes(frame, predictions):
    # Convert the OpenCV frame to a PIL Image
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(pil_image)

    # Draw box with the maximum score
    max_score_index = np.argmax(predictions[0]["scores"].numpy())
    max_box = predictions[0]["boxes"][max_score_index].numpy()
    max_label = predictions[0]["labels"][max_score_index].item()
    max_score = np.round(predictions[0]["scores"][max_score_index].numpy(), decimals=4)

    # Define text font
    try:
        font = ImageFont.truetype("Arial-Bold.ttf", 16)
    except IOError:
        print("Arial font not found, using default font.")
        font = ImageFont.load_default()

    label = "Dog" if max_label == 2 else "Cat"
    draw.rectangle([(max_box[0], max_box[1]), (max_box[2], max_box[3])], outline="red", width=3)

    # Draw text box and text
    text = f"{label}: {(max_score * 100):.2f}%"
    text_size = draw.textsize(text, font=font)
    text_position = (max_box[0], max_box[1] - text_size[1])
    # Draw background rectangle for text
    draw.rectangle([text_position, (text_position[0] + text_size[0], text_position[1] + text_size[1])], fill="red")
    # Draw text
    draw.text(text_position, text, fill="white", font=font)

    # Convert the PIL image back to an OpenCV image
    frame_with_boxes = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
    return frame_with_boxes

# Process Video
def process_video(video_path):
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("Error: Unable to open video")
        return

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Define codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter('output_video.mp4', fourcc, fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        predictions = process_frame(frame)
        frame_with_boxes = draw_boxes(frame, predictions)
        out.write(frame_with_boxes)

    cap.release()
    out.release()

video_path = 'input_video.mp4'
process_video(video_path)

  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = draw.textsize(text)
  text_size = 