In [9]:
import os
import torch
import torchvision
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from torchvision.transforms import transforms
from xml.etree import ElementTree as ET
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torch.optim as optim
import matplotlib.pyplot as plt
import cv2
import numpy as np

In [2]:
#defining the dataset class
class FrisbeeDataset(Dataset):
    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        self.image_dir = os.path.join(root, 'images')
        self.annotation_dir = os.path.join(root, 'annotations')
        self.image_files = sorted(os.listdir(self.image_dir))
        
    def __getitem__(self, index):
        img_path = os.path.join(self.image_dir, self.image_files[index])
        annotation_path = os.path.join(self.annotation_dir, self.image_files[index][:-4] + '.xml')
        
        #check if the annotation file exists
        if not os.path.exists(annotation_path):
            raise FileNotFoundError(f'Annotation file {annotation_path} not found for image {img_path}')
        
        img = Image.open(img_path).convert('RGB')
        
        boxes = []
        labels = []
        tree = ET.parse(annotation_path)
        root = tree.getroot()
        
        for obj in root.iter('object'):
            xmin = float(obj.find('bndbox/xmin').text)
            ymin = float(obj.find('bndbox/ymin').text)
            xmax = float(obj.find('bndbox/xmax').text)
            ymax = float(obj.find('bndbox/ymax').text)
            boxes.append([xmin, ymin, xmax, ymax])
            label_name = obj.find('name').text
            if label_name == 'Flying-disc':
                labels.append(1)
            
            if len(boxes) == 0:
                raise ValueError(f'No objects found in {annotation_path}')
            
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
            labels = torch.as_tensor(labels, dtype=torch.int64)
            image_id = torch.tensor([index])
            area = (boxes[:,2] - boxes[:,0]) * (boxes[:,3] - boxes[:,1])
            iscrowd = torch.zeros((len(boxes),), dtype=torch.int64)
            
            target = {'boxes': boxes, 'labels': labels, 'image_id': image_id, 'area': area, 'iscrowd': iscrowd}
            
            if self.transforms is not None: 
                img = self.transforms(img)
                
            return img, target
        
    def __len__(self):
        return len(self.image_files)

In [3]:
#define model returning function

def get_faster_rcnn_model(num_classes):
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

In [4]:
data_transform = transforms.Compose([
    transforms.ToTensor(), 
    transforms.RandomHorizontalFlip(0.5)
])

def custom_collate_fn(batch):
    return tuple(zip(*batch))

dataset = FrisbeeDataset('Frisbee_Data', transforms=data_transform)
data_loader = DataLoader(dataset, batch_size=2, shuffle=True, num_workers=0, collate_fn=custom_collate_fn)

In [5]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = get_faster_rcnn_model(num_classes=2).to(device)

optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.0005)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)



In [6]:
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for images, targets in data_loader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        
        loss_dict = model(images, targets)
        
        losses = sum(loss for loss in loss_dict.values())
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        
        epoch_loss += losses.item()
    
    lr_scheduler.step()
    
    print(f"{epoch + 1}/{num_epochs} | Loss: {epoch_loss / len(data_loader):.4f}")

1/20 | Loss: 0.1579
2/20 | Loss: 0.1121
3/20 | Loss: 0.1056
4/20 | Loss: 0.1030
5/20 | Loss: 0.1063
6/20 | Loss: 0.1070
7/20 | Loss: 0.1009
8/20 | Loss: 0.0917
9/20 | Loss: 0.1029
10/20 | Loss: 0.1021
11/20 | Loss: 0.1003
12/20 | Loss: 0.0981
13/20 | Loss: 0.1032
14/20 | Loss: 0.0976
15/20 | Loss: 0.1034
16/20 | Loss: 0.1042
17/20 | Loss: 0.1004
18/20 | Loss: 0.1051
19/20 | Loss: 0.0976
20/20 | Loss: 0.1018


In [7]:
save_path = 'frisbee_model.pth'
torch.save(model.state_dict(), save_path)
print(f'Model saved at {save_path}')

Model saved at frisbee_model.pth


In [20]:
#inference
model.eval()

def draw_boxes(image, boxes):
    for box in boxes:
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 3)
    return image

def detect_objects(image):
    print('Detecting objects in image...')
    image = Image.open(image).convert('RGB')
    transform = transforms.Compose([transforms.ToTensor()])
    image_tensor = transform(image).unsqueeze(0).to(device)
    
    with torch.no_grad():
        prediction = model(image_tensor)
        
    #extract the bounding boxes
    boxes = prediction[0]['boxes'].cpu().numpy()
    scores = prediction[0]['scores'].cpu().numpy()
    selected_boxes = boxes[scores > 0.5]
    
    image_np = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    result_image = draw_boxes(image_np, selected_boxes)

    cv2.imwrite('result.jpg', result_image)
    print('Object detection complete. Output saved as result.jpg')
    
test_path = "test.jpg"    
detect_objects(test_path)

Detecting objects in image...
Object detection complete. Output saved as result.jpg


In [24]:
#video detection
def detect_objects_video(video_path):
    print('Detecting objects in video...')
    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    
    #define codec and create video writer object
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter('output3.avi', fourcc, fps, (frame_width, frame_height))
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        image_tensor = transforms.ToTensor()(frame).unsqueeze(0).to(device)
        
        with torch.no_grad():
            prediction = model(image_tensor)
            
        boxes = prediction[0]['boxes'].cpu().numpy()
        scores = prediction[0]['scores'].cpu().numpy()
        selected_boxes = boxes[scores > 0.5]
        
        result_frame = draw_boxes(frame, selected_boxes)
        out.write(result_frame)
        
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    print('Object detection complete. Output saved as output.avi')
    
    
video_path = 'test3.mp4'
detect_objects_video(video_path)

Detecting objects in video...
Object detection complete. Output saved as output.avi
