In [None]:
import os
import time

import torchvision
from torchvision.transforms import v2
from torchvision.models.detection import fasterrcnn_resnet50_fpn

import torch
from torch.utils.data import DataLoader

from poolDatasetV2 import PoolDatasetV2

# Preprocessing

In [None]:
# Folder that holds the dataset images.
ROOT_DIR = "dataset/images"

# Preprocessing pipeline handed to the dataset: resize to 224x224, convert to
# an image tensor, then cast to float32 with value scaling enabled.
# Two optional steps are currently disabled:
#   v2.RandomHorizontalFlip(p=1)
#   v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
transforms = v2.Compose(
    [
        v2.Resize(size=(224, 224)),
        v2.ToImage(),
        v2.ToDtype(dtype=torch.float32, scale=True),
    ]
)

In [None]:
# Build the dataset from the image folder, applying the preprocessing pipeline.
ds = PoolDatasetV2(ROOT_DIR, transforms)

In [None]:
# Split the dataset into the train and test subsets used by the loaders below.
# NOTE(review): 0.33 is presumably the held-out (test) fraction — confirm
# against PoolDatasetV2.split_Data.
train, test = ds.split_Data(0.33)
# Show the size of each subset as the cell output.
len(train), len(test)

In [None]:
def collate_fn(batch):
    """Return the batch untouched instead of stacking it.

    The DataLoader hands over a list of samples; passing that list through
    as-is lets the training/evaluation loops unpack each sample themselves
    (they index every sample as ``d[0]`` and ``d[1]``).

    Args:
        batch: list of samples produced by the dataset.

    Returns:
        The very same ``batch`` object.
    """
    return batch

In [None]:
# Both loaders use the same settings: batches of 22 samples, shuffled, passed
# through `collate_fn` unchanged, with pinned memory only when CUDA is present.
train_loader = DataLoader(
    train,
    batch_size=22,
    shuffle=True,
    collate_fn=collate_fn,
    pin_memory=torch.cuda.is_available(),
)
test_loader = DataLoader(
    test,
    batch_size=22,
    shuffle=True,
    collate_fn=collate_fn,
    pin_memory=torch.cuda.is_available(),
)

# Training

In [None]:
# Number of output classes for the box predictor. torchvision detection
# models reserve index 0 for the background, so 2 means background + one
# foreground class.
num_classes = 2

# `pretrained=False` is the deprecated legacy spelling; `weights=None` is the
# equivalent current argument (no pretrained detection weights). The backbone
# argument is left at its default, exactly as in the legacy call.
model = fasterrcnn_resnet50_fpn(weights=None)

# Swap the stock box-predictor head for one sized to `num_classes`, reusing
# the input feature width of the existing classification layer.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Kaggle kernels are limited to 12-hour sessions, so training cannot finish in one go: the model is saved and then reloaded here to resume training.
# model = torch.load("Faster R_CNN.pt") 

In [None]:
# Training hyperparameters: number of passes over the training set and an
# Adam optimizer over all model parameters with a learning rate of 1e-4.
num_epochs = 80
# num_epochs = 20
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [None]:
model.to(device)

# Lowest summed training loss seen so far. Starting at +inf guarantees the
# first epoch is checkpointed and that later epochs only overwrite the file
# when they actually improve on the best loss (not merely on epoch 0).
min_loss = float("inf")
checkpoint_path = os.path.join(os.getcwd(), "model.pt")

for epoch in range(num_epochs):
    epoch_loss = 0.0
    start = time.time()
    model.train(True)

    for data in train_loader:
        # `data` is the raw list of samples (see `collate_fn`); each sample is
        # indexed as (image, target-dict). Move everything to the device.
        imgs = [d[0].to(device) for d in data]
        targets = [
            {
                'boxes': d[1]['boxes'].to(device),
                'labels': d[1]['labels'].to(torch.int64).to(device),
            }
            for d in data
        ]

        # In training mode the detection model returns a dict of losses;
        # their sum is the quantity that is optimized.
        loss_dict = model(imgs, targets)
        loss = sum(loss_dict.values())
        epoch_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    total_time = time.time() - start

    # Keep only the best checkpoint: update the running minimum and save.
    if epoch_loss < min_loss:
        min_loss = epoch_loss
        torch.save(model, checkpoint_path)

    print(f'Epoch: {epoch}, Loss: {epoch_loss} -> {round(total_time)}s')

# Evaluate

In [None]:
# Load the fully pickled model saved with `torch.save(model, ...)`.
# - `map_location=device` lets a checkpoint written on a GPU machine load on a
#   CPU-only one (and vice versa).
# - `weights_only=False` is required to unpickle a whole model object on
#   PyTorch versions where weights-only loading is the default.
# SECURITY: this unpickles arbitrary Python objects — only load checkpoints
# you created yourself or otherwise trust.
# NOTE(review): the training loop writes "model.pt" in the working directory,
# while this cell reads "Faster R_CNN.pt" — presumably a renamed copy; confirm.
model = torch.load("Faster R_CNN.pt", map_location=device, weights_only=False)

In [None]:
def compute_iou(box1, box2):
    """Intersection-over-union of two boxes given as (x1, y1, x2, y2).

    Widths and heights are computed with a ``+ 1`` term, i.e. the box corners
    are treated as inclusive coordinates.

    Args:
        box1: indexable with four coordinates (x1, y1, x2, y2).
        box2: indexable with four coordinates (x1, y1, x2, y2).

    Returns:
        The IoU value; ``0.0`` when the union is not positive (degenerate
        boxes), instead of dividing by zero.
    """
    # Corners of the intersection rectangle.
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    # Area of the intersection rectangle (zero when the boxes do not overlap).
    intersection = max(0, x2 - x1 + 1) * max(0, y2 - y1 + 1)

    # Areas of the two individual boxes.
    area_box1 = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
    area_box2 = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)

    # Union = sum of both areas minus the intersection counted twice.
    union = area_box1 + area_box2 - intersection
    if union <= 0:
        return 0.0
    return intersection / union

def evaluate_detections(pred_boxes, gt_boxes, iou_threshold=0.5):
    """Count true positives, false positives and false negatives for one image.

    Each prediction is matched to the first not-yet-matched ground-truth box
    whose IoU reaches ``iou_threshold``. Ground-truth boxes are tracked by
    index, so every ground-truth box can be matched at most once regardless of
    the box type (tensor rows, tuples, lists); surplus detections of the same
    object therefore count as false positives.

    Args:
        pred_boxes: sequence of predicted boxes (x1, y1, x2, y2).
        gt_boxes: sequence of ground-truth boxes (x1, y1, x2, y2).
        iou_threshold: minimum IoU for a prediction to match a ground truth.

    Returns:
        Tuple ``(tp, fp, fn)`` of integer counts.
    """
    # Little trick to handle the case when there are no predictions (because
    # of an image with no pool in it): with exactly one ground-truth box, the
    # ground truth is used as the prediction, so the image counts as one TP.
    # NOTE(review): this also turns a genuinely missed single box into a TP;
    # it presumably relies on how the dataset encodes "no pool" images —
    # confirm against the dataset class.
    if len(pred_boxes) == 0 and len(gt_boxes) == 1:
        pred_boxes = gt_boxes

    tp, fp = 0, 0
    matched_gt_indices = set()

    for pred_box in pred_boxes:
        for gt_index, gt_box in enumerate(gt_boxes):
            if gt_index in matched_gt_indices:
                continue  # this ground truth already has a detection
            if compute_iou(pred_box, gt_box) >= iou_threshold:
                matched_gt_indices.add(gt_index)
                tp += 1
                break
        else:
            # No free ground-truth box overlaps enough with this prediction.
            fp += 1

    # Every ground-truth box left unmatched is a miss.
    fn = len(gt_boxes) - len(matched_gt_indices)

    return tp, fp, fn

def calculate_metrics(TP, FP, FN):
    """Derive precision, recall and F1 from detection counts.

    Any ratio whose denominator is not positive is reported as 0.

    Args:
        TP: number of true positives.
        FP: number of false positives.
        FN: number of false negatives.

    Returns:
        Tuple ``(precision, recall, f1_score)``.
    """
    predicted_positives = TP + FP
    actual_positives = TP + FN

    precision = 0
    if predicted_positives > 0:
        precision = TP / predicted_positives

    recall = 0
    if actual_positives > 0:
        recall = TP / actual_positives

    # F1 is the harmonic mean of precision and recall.
    f1_score = 0
    if (precision + recall) > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)

    return precision, recall, f1_score

def calculate_accuracy(TP, FP, FN):
    """Detection accuracy: true positives over all predictions and ground truths.

    The denominator is (predictions + ground truths - TP), which counts each
    true positive once; the result is 0 when that denominator is not positive.

    Args:
        TP: number of true positives.
        FP: number of false positives.
        FN: number of false negatives.

    Returns:
        The accuracy ratio, or 0 when there is nothing to evaluate.
    """
    denominator = (TP + FP) + (TP + FN) - TP
    if denominator > 0:
        return TP / denominator
    return 0

def evaluate_model(model, dataloader, device, iou_threshold=0.5, min_confidence = 0.7):
    """Run the detector over a dataloader and aggregate detection metrics.

    Args:
        model: detection model; called as ``model(imgs)`` in eval mode and
            expected to return one dict per image with 'boxes' and 'scores'.
        dataloader: yields lists of samples, each indexed as
            (image, target-dict with a 'boxes' entry).
        device: device the images are moved to before inference.
        iou_threshold: IoU needed for a prediction to match a ground truth.
        min_confidence: predictions scoring below this are discarded.

    Returns:
        Tuple ``(precision, recall, f1_score, accuracy)`` computed from the
        counts summed over every image.
    """
    model.eval()
    TP, FP, FN = 0, 0, 0

    with torch.no_grad():
        for data in dataloader:
            imgs = [d[0].to(device) for d in data]
            # Ground-truth boxes are used as provided by the dataset (they are
            # not moved to `device`); labels are not needed for matching.
            gt_boxes_per_image = [d[1]['boxes'] for d in data]

            outputs = model(imgs)

            for output, gt_boxes in zip(outputs, gt_boxes_per_image):
                confident = output['scores'] >= min_confidence
                # Move the kept predictions off the accelerator once, so the
                # per-box Python IoU code does not mix devices or trigger a
                # device sync on every scalar comparison.
                pred_boxes = output['boxes'][confident].cpu()

                tp, fp, fn = evaluate_detections(pred_boxes, gt_boxes, iou_threshold)
                TP += tp
                FP += fp
                FN += fn

    precision, recall, f1_score = calculate_metrics(TP, FP, FN)
    accuracy = calculate_accuracy(TP, FP, FN)

    return precision, recall, f1_score, accuracy

In [None]:
# Pick the GPU when available and place the loaded model on it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Score the model on the held-out loader. To inspect the fit on the training
# data instead, pass `train_loader`:
# precision, recall, f1_score, accuracy = evaluate_model(model, train_loader, device)
precision, recall, f1_score, accuracy = evaluate_model(model, test_loader, device)

# Report every metric with four decimal places.
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Accuracy: {accuracy:.4f}")