In [1]:
import json
from PIL import Image
import torch
import random

In [2]:
class COCODataset(torch.utils.data.Dataset):
    def __init__(self, annotation_file, image_dir, classes, transform=None):
        with open(annotation_file, 'r') as f:
            self.coco_data = json.load(f)
        self.image_dir = image_dir
        self.classes = classes
        self.transform = transform

        self.image_annotations = {img['id']: [] for img in self.coco_data['images']}
        for ann in self.coco_data['annotations']:
            if ann['category_id'] == 1:
                self.image_annotations[ann['image_id']].append(ann)
        
        #Identify the background image
        filtered_images = [
            img for img in self.coco_data['images']
            if len(self.image_annotations[img['id']]) == 0
        ]

        #Select 100% of non annotated to be removed
        num_to_remove = int(len(filtered_images) * 1)
        images_to_remove = random.sample(filtered_images, num_to_remove)
        images_to_remove_ids = {img['id'] for img in images_to_remove}

        #Remove the selected images
        self.coco_data['images'] = [
            img for img in self.coco_data['images']
            if img['id'] not in images_to_remove_ids
        ]

        #Remove the annotations as well
        self.coco_data['annotations'] = [
            ann for ann in self.coco_data['annotations']
            if ann['image_id'] in {img['id'] for img in self.coco_data['images']}
        ]

        #Rebuild the annotations
        self.image_annotations = {img['id']: [] for img in self.coco_data['images']}
        for ann in self.coco_data['annotations']:
            if ann['category_id'] == 1:
                self.image_annotations[ann['image_id']].append(ann)

        self.images = self.coco_data['images']
        self.image_ids = [img['id'] for img in self.coco_data['images']]
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image_data = self.images[idx]
        image_id = image_data['id']
        image_path = f"{self.image_dir}/{image_data['file_name']}"
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        annotations = self.image_annotations[image_id]
        boxes = []
        labels = []
        for ann in annotations:
            x, y, w, h = ann['bbox']
            boxes.append([x, y, x + w, y + h])
            labels.append(int(ann['category_id']))
        
        boxes = torch.as_tensor(boxes, dtype=torch.float32) if len(boxes) > 0 else torch.empty((0,4))
        labels = torch.as_tensor(labels, dtype=torch.int64) if len(labels) > 0 else torch.zeros((0,), dtype=torch.int64)
        image_id = torch.tensor([image_id])

        target = {
            'boxes': boxes,
            'labels': labels, 
            'image_id': image_id
        }
        return image, target

    

In [3]:
import torchvision
from torchvision import transforms
import numpy as np

In [4]:
print(torch.__version__)
print(torchvision.__version__)

2.5.1
0.20.1


In [5]:
annotations_path = "instances_Train.json"
frames_path = "frames"
classes = {0: "No_hit", 1: "Hit"}
transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [6]:
dataset = COCODataset(annotations_path, frames_path, classes, transforms)

In [7]:
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torch.utils.data import DataLoader

In [8]:
remaining_indices = np.arange(len(dataset.image_ids))
train_indices, test_indices = train_test_split(remaining_indices, test_size=0.2, random_state=10)

train_dataset = Subset(dataset, train_indices)
test_dataset = Subset(dataset, test_indices)

In [9]:
def collate_fn(batch):
    max_boxes = max([len(item[1]['boxes']) for item in batch])

    images = []
    targets = []

    for img, target in batch:
        boxes = target['boxes']
        labels = target['labels']

        if len(boxes) < max_boxes:
            pad_size = max_boxes - len(boxes)
            boxes = torch.cat([boxes, torch.empty(0, 4)], dim=0)
            labels = torch.cat([labels, torch.zeros((pad_size, ), dtype=torch.int64)], dim=0)
        
        targets.append({
            'boxes': boxes,
            'labels': labels,
            'image_id': target['image_id']
        })

        images.append(img)
    
    return images, targets

In [10]:
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, pin_memory=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, pin_memory=True, collate_fn=collate_fn)

In [11]:
for batch_idx, (images, targets) in enumerate(train_loader):
    print(f"Batch {batch_idx} loaded successfully.")

Batch 0 loaded successfully.
Batch 1 loaded successfully.
Batch 2 loaded successfully.
Batch 3 loaded successfully.
Batch 4 loaded successfully.
Batch 5 loaded successfully.
Batch 6 loaded successfully.
Batch 7 loaded successfully.
Batch 8 loaded successfully.
Batch 9 loaded successfully.
Batch 10 loaded successfully.
Batch 11 loaded successfully.
Batch 12 loaded successfully.
Batch 13 loaded successfully.
Batch 14 loaded successfully.
Batch 15 loaded successfully.
Batch 16 loaded successfully.
Batch 17 loaded successfully.
Batch 18 loaded successfully.
Batch 19 loaded successfully.
Batch 20 loaded successfully.
Batch 21 loaded successfully.
Batch 22 loaded successfully.
Batch 23 loaded successfully.
Batch 24 loaded successfully.
Batch 25 loaded successfully.
Batch 26 loaded successfully.
Batch 27 loaded successfully.
Batch 28 loaded successfully.
Batch 29 loaded successfully.
Batch 30 loaded successfully.
Batch 31 loaded successfully.
Batch 32 loaded successfully.
Batch 33 loaded succ

In [12]:
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn

In [13]:
model = fasterrcnn_resnet50_fpn(pretrained=True)

num_classes = len(classes)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor.cls_score = torch.nn.Linear(in_features, num_classes)



In [14]:
import torch.optim as optim

In [15]:
params = [
    {'params': model.backbone.parameters(), 'lr': 0.0001},
    {'params': model.rpn.parameters(), 'lr': 0.01},
    {'params': model.roi_heads.parameters(), 'lr': 0.01}
]
optimizer = optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0001)

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [33]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    train_loss = 0

    for images, targets in train_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        #print(loss_dict)
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        lr_scheduler.step()
        
        train_loss += losses.item()
    
    print(f"Epoch {epoch+1} training loss: {train_loss/len(train_loader):.4f}")

    del images, targets, loss_dict, losses
    torch.cuda.empty_cache()

print("Finished training")

Epoch 1 training loss: 30.0052
Epoch 2 training loss: 30.0374
Epoch 3 training loss: 29.9313
Epoch 4 training loss: 30.1253
Epoch 5 training loss: 30.1487
Epoch 6 training loss: 30.2467
Epoch 7 training loss: 29.9111
Epoch 8 training loss: 30.1641
Epoch 9 training loss: 30.0033
Epoch 10 training loss: 30.1246
Finished training


In [34]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for images, targets in test_loader:
        img = images[0].to(device)
        target = targets[0]
        target = {k: v.to(device) for k, v in target.items()}

        pred = model([img])

        all_preds.append(pred)
        all_labels.append(target)
print("Finished evaluating")
print(all_preds)
print(all_labels)

Finished evaluating
[[{'boxes': tensor([], device='cuda:0', size=(0, 4)), 'labels': tensor([], device='cuda:0', dtype=torch.int64), 'scores': tensor([], device='cuda:0')}], [{'boxes': tensor([], device='cuda:0', size=(0, 4)), 'labels': tensor([], device='cuda:0', dtype=torch.int64), 'scores': tensor([], device='cuda:0')}], [{'boxes': tensor([], device='cuda:0', size=(0, 4)), 'labels': tensor([], device='cuda:0', dtype=torch.int64), 'scores': tensor([], device='cuda:0')}], [{'boxes': tensor([[212.0339, 173.0699, 241.5187, 200.9351]], device='cuda:0'), 'labels': tensor([1], device='cuda:0'), 'scores': tensor([0.2763], device='cuda:0')}], [{'boxes': tensor([], device='cuda:0', size=(0, 4)), 'labels': tensor([], device='cuda:0', dtype=torch.int64), 'scores': tensor([], device='cuda:0')}], [{'boxes': tensor([], device='cuda:0', size=(0, 4)), 'labels': tensor([], device='cuda:0', dtype=torch.int64), 'scores': tensor([], device='cuda:0')}], [{'boxes': tensor([[229.0957, 170.1881, 252.8866, 19

In [18]:
from pycocotools.cocoeval import COCOeval
from pycocotools.coco import COCO
import json

In [31]:
#Prepare the ground truth annotations in COCO format
coco_gt = {'images': [], 'annotations': [], 'categories': []}
for i, target in enumerate(all_labels):
    coco_gt['images'].append({'id': i})
    for j, box in enumerate(target['boxes']):
        coco_gt['annotations'].append({
            'image_id': i,
            'bbox': box.tolist(),
            'category_id': target['labels'][j].item(),
            'id': len(coco_gt['annotations']) + 1
        })

# Save ground truth in COCO format
with open('gt_annotations.json', 'w') as f:
    json.dump(coco_gt, f)

# Prepare predictions in COCO format
coco_pred = {'images': [], 'annotations': []}
for i, pred in enumerate(all_preds):
    coco_pred['images'].append({'id': i})
    if len(pred[0]['boxes']) == 0:
        continue
    for j, (box, score, label) in enumerate(zip(pred[0]['boxes'], pred[0]['scores'], pred[0]['labels'])):
        if score > 0.1:
            coco_pred['annotations'].append({
                'image_id': i,
                'bbox': box.tolist(),
                'category_id': label.item(),
                'score': score.item(),
                'id': len(coco_gt['annotations']) + 1
            })

with open('pred_annotations.json', 'w') as f:
    json.dump(coco_pred, f)

coco_gt = COCO('gt_annotations.json')
coco_pred = COCO('pred_annotations.json')

coco_eval = COCOeval(coco_gt, coco_pred, iouType='bbox')
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()

loading annotations into memory...
Done (t=0.00s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *bbox*
DONE (t=0.00s).
Accumulating evaluation results...
Please run evaluate() first
DONE (t=0.00s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = -1.000
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = -1.000
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = -1.000
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = -1.000
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = -1.000
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = -1.000
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=  1 ] = -1.000
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 10 ] = -1.000
 Average Recall     (AR) @[

# Testing with video

In [20]:
import cv2

In [41]:
video_path = "fight.mp4"
cap = cv2.VideoCapture(video_path)

output_path = "result_video.avi"
fourcc = cv2.VideoWriter_fourcc(*'XVID')
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

model.eval()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    pil_img = Image.fromarray(frame_rgb)

    input_frame = transforms(pil_img).unsqueeze(0)

    input_frame = input_frame.to(device)

    with torch.no_grad():
        predictions = model(input_frame)
    
    boxes = predictions[0]['boxes'].cpu().numpy()
    labels = predictions[0]['labels'].cpu().numpy()
    scores = predictions[0]['scores'].cpu().numpy()

    filtered_boxes = boxes[scores > 0.275]
    filtered_labels = labels[scores > 0.275]

    scale_x, scale_y = width / 256, height / 256
    filtered_boxes[:, [0, 2]] *= scale_x
    filtered_boxes[:, [1, 3]] *= scale_y

    for box, label, in zip(filtered_boxes, filtered_labels):
        x1, y1, x2, y2 = x1, y1, x2, y2 = np.clip(box, 0, [width, height, width, height])
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)  # Draw bounding box
        cv2.putText(frame, f'Label: {label}', (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    out.write(frame)
    
    # Display the frame with predictions
    cv2.imshow('Video', frame)

    # Exit the loop if 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()
