In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import sys

CODE_DIR = "/content/drive/MyDrive/lib-test"
if CODE_DIR not in sys.path:
    sys.path.insert(0, CODE_DIR)
    print(sys.path)

['/content/drive/MyDrive/lib-test', '/content', '/env/python', '/usr/lib/python312.zip', '/usr/lib/python3.12', '/usr/lib/python3.12/lib-dynload', '', '/usr/local/lib/python3.12/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.12/dist-packages/IPython/extensions', '/root/.ipython']


In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torchvision
import torchvision.transforms.functional as TF
from PIL import Image

import math

import pandas as pd

from pathlib import Path
import os
from typing import List

from metrics import circle_iou, circle_iou_torch
from helpers import draw_two_circles_on_pil

In [6]:
!rsync -a --info=progress2 "/content/drive/MyDrive/Research-Project-Data/processed_data.zip" "/content/"
!unzip processed_data.zip

  1,541,376,315 100%   39.92MB/s    0:00:36 (xfr#1, to-chk=0/1)
Archive:  processed_data.zip
   creating: processed_data/
  inflating: __MACOSX/._processed_data  
   creating: processed_data/Session1_Light/
  inflating: __MACOSX/processed_data/._Session1_Light  
   creating: processed_data/Session2_Light/
  inflating: __MACOSX/processed_data/._Session2_Light  
   creating: processed_data/Session1_Light/Participant7/
  inflating: __MACOSX/processed_data/Session1_Light/._Participant7  
   creating: processed_data/Session1_Light/Participant9/
  inflating: __MACOSX/processed_data/Session1_Light/._Participant9  
   creating: processed_data/Session1_Light/Participant8/
  inflating: __MACOSX/processed_data/Session1_Light/._Participant8  
   creating: processed_data/Session1_Light/Participant1/
  inflating: __MACOSX/processed_data/Session1_Light/._Participant1  
   creating: processed_data/Session1_Light/Participant6/
  inflating: __MACOSX/processed_data/Session1_Light/._Participant6  
   crea

In [13]:
len(train_pars) + len(val_pars) + len(test_pars)

47

In [None]:
import torch
from torch.utils.data import Dataset
from PIL import Image
from torchvision import transforms
import torchvision.transforms.functional as TF
import torchvision.transforms as T

# ImageNet mean and std
mean = (0.485, 0.456, 0.406)
std  = (0.229, 0.224, 0.225)

train_tf = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=mean, std=std)
])

val_tf = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=mean, std=std)
])

class ProcessedDataset(Dataset):
    def __init__(self, image_paths, targets, transform, out_size=(360, 640)):

        self.image_paths = list(image_paths)
        self.targets = list(targets)
        self.out_h, self.out_w = out_size

        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):

        img = Image.open(self.image_paths[idx]).convert("RGB")
        img = self.transform(img)

        y = torch.tensor(self.targets[idx], dtype=torch.float32).clone()    

        return img, y


In [None]:
from torch.utils.data import DataLoader

train_ds = ProcessedDataset(train_images, train_targets, transform=train_tf)
val_ds = ProcessedDataset(val_images, val_targets, transform=val_tf)
test_ds = ProcessedDataset(test_images, test_targets, transform=val_tf)

train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
val_dl  = DataLoader(val_ds, batch_size=32, shuffle=False)
test_dl = DataLoader(test_ds, batch_size=32, shuffle=False)

In [4]:
from data import get_loaders

train_dl, val_dl, test_dl = get_loaders('./data/processed_data')

In [5]:
from torchvision import models
import torch.nn as nn
from torchvision.models import ResNet18_Weights
import torch
import numpy as np
import cv2
from PIL import Image

from typing import Tuple, Callable, Optional

class CircleRegressor(nn.Module):
    def __init__(self, pretrained=True):
        super().__init__()

        weights = ResNet18_Weights.DEFAULT if pretrained else None
        self.backbone = models.resnet18(weights=weights)
        
        in_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Identity()
        
        self.head = nn.Sequential(
            nn.Linear(in_features, 256),
            nn.ReLU(),
            nn.Linear(256, 6)
        )

    def forward(self, x):
        feats = self.backbone(x)
        out = self.head(feats)
        return out

In [6]:
import torch
import torch.nn.functional as F
import torch.optim as optim
import time
from datetime import datetime
from tqdm import tqdm

import os

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"device is: {device}")

model = CircleRegressor().to(device)

def circle_loss(preds, targets, w_center=1.0, w_radius=2.0, beta=0.02):
    """
    preds/targets: (B, 6) = [cx1, cy1, r1, cx2, cy2, r2], normalized to [0,1]
    """
    preds = preds.view(-1, 2, 3)
    targets = targets.view(-1, 2, 3)

    pc, pr = preds[..., :2], preds[..., 2]   # centers, radii
    tc, tr = targets[..., :2], targets[..., 2]

    lc = F.smooth_l1_loss(pc, tc, beta=beta, reduction="mean")
    lr = F.smooth_l1_loss(pr, tr, beta=beta, reduction="mean")
    return w_center * lc + w_radius * lr

optimizer = optim.Adam(model.parameters(), lr=1e-3)

epochs = 5
LOSS_FN = circle_loss

train_batches_to_use = int(0.45 * len(train_dl))

print("Training Started ...")
for epoch in range(1, epochs + 1):

    epoch_start = time.time()

    model.train()
    train_loss_sum = 0.0
    train_count = 0

    train_pbar = tqdm(train_dl, total=len(train_dl), desc=f"Train {epoch:02d}/{epochs}", leave=False)
    for imgs, targets in train_pbar:
        imgs = imgs.to(device).float()
        targets = targets.to(device).float()

        preds = model(imgs)
        loss = LOSS_FN(preds, targets)

        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        bs = imgs.size(0)
        train_loss_sum += loss.item() * bs
        train_count += bs

        train_pbar.set_postfix(loss=f"{loss.item():.5f}")

    avg_train_loss = train_loss_sum / max(1, train_count)

    model.eval()
    val_loss_sum = 0.0
    val_count = 0

    val_pbar = tqdm(test_dl, total=len(test_dl), desc=f"Val   {epoch:02d}/{epochs}", leave=False)
    with torch.no_grad():
        for imgs, targets in val_pbar:
            imgs = imgs.to(device).float()
            targets = targets.to(device).float()

            preds = model(imgs)
            loss = LOSS_FN(preds, targets)

            bs = imgs.size(0)
            val_loss_sum += loss.item() * bs
            val_count += bs

            val_pbar.set_postfix(loss=f"{loss.item():.5f}")

    avg_val_loss = val_loss_sum / max(1, val_count)

    epoch_time = time.time() - epoch_start
    finished_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    print(
        f"Epoch {epoch:02d}/{epochs} | Train Loss: {avg_train_loss:.6f} | Val Loss: {avg_val_loss:.6f} "
        f"| Time: {epoch_time:.2f}s | Finished at: {finished_at}"
    )

i = 1
save_path = f"/content/drive/MyDrive/circle_regressor_v{i}.pt"
while os.path.exists(save_path):
    print("Path exists", i)
    i += 1
    save_path = f"/content/drive/MyDrive/circle_regressor_v{i}.pt"

torch.save(model.state_dict(), save_path)

device is: cpu
Training Started ...


                                                                           

KeyboardInterrupt: 

In [28]:
model_path = f"/content/drive/MyDrive/circle_regressor_v1.pt"

device = 'cpu'

model = CircleRegressor(True)
state = torch.load(model_path, map_location=device)
model.load_state_dict(state)
model.eval();

In [23]:
!ls

drive	  processed_data      requirements-colab.txt
__MACOSX  processed_data.zip  sample_data


In [24]:
!ls

drive	  processed_data      requirements-colab.txt
__MACOSX  processed_data.zip  sample_data


In [40]:
import itertools
from typing import Tuple

def evaluate_model_circl_iou(model: CircleRegressor, evaluation_set: 'str'='test', n_batches: int=-1, verbose: bool=False) -> Tuple[float, float]:

    eval_set = {
        "train": train_dl,
        "val": val_dl,
        "test": test_dl
    }[evaluation_set]

    if n_batches > len(eval_set):
        print(f"the chosen set contins less that {n_batches} batches, max n_batch is {len(eval_set)} here")
        print(f"replacing n_batch with {len(eval_set)} ...")
        n_batches = eval_set

    if n_batches == -1:
        eval_iter = eval_set
        n_batches = len(eval_set)
    else:
        print(n_batches)
        eval_iter = itertools.islice(iter(test_dl), n_batches)

    if verbose: print(f"Evaluating {evaluation_set}, on {n_batches=}")

    model = model.to(device)

    running_r_iou = 0.0
    running_g_iou = 0.0

    for images, targets in eval_iter:

        if verbose: print(f"{images.shape=}, {targets.shape=}")

        images = images.to(device)
        targets = targets.to(device)

        outs = model(images)

        r_gt = targets[:, :3]
        g_gt = targets[:, 3:]

        r_preds = outs[:, :3]
        g_preds = outs[:, 3:]

        r_iou = circle_iou_torch(r_gt, r_preds)
        g_iou = circle_iou_torch(g_gt, g_preds)
        
        r_iou = r_iou.detach().mean().item()
        g_iou = g_iou.detach().mean().item()

        if verbose: print(f"{r_iou=}, {g_iou=}")

        running_r_iou += r_iou
        running_g_iou += g_iou

    running_r_iou /= n_batches
    running_g_iou /= n_batches

    return running_r_iou, running_g_iou

def predict_on_cv2_frames(model: CircleRegressor, frame, transform, device='cpu', verbose: bool=False) -> Tuple[Tuple[float, float, float], Tuple[float, float, float]]:

    model.eval()
    model = model.to(device)

    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame_pil = Image.fromarray(frame)
    
    x = transform(frame_pil)
    x = x.unsqueeze(0).to(device)

    out = model(x)
    out = out.squeeze(0).detach().cpu().tolist()

    r_pred = tuple(out[:3])
    g_pred = tuple(out[3:])

    return r_pred, g_pred

img = cv2.imread("/content/processed_data/Session1_Light/Participant17/video_frames/img160.jpg")

r, g = predict_on_cv2_frames(model, img, transform=val_tf)


# running_r_iou, running_g_iou = evaluate_model_circl_iou(model, evaluation_set='test', n_batches=2, verbose=True)
# print(f"Average IOU: {running_r_iou=}, {running_g_iou=}")