In [None]:
import torch
import torch.nn as nn
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions with batch norm.

    The skip connection is merged with the main path through a
    ``FloatFunctional`` ``add_relu`` call instead of ``out += identity``
    followed by ReLU (presumably so the addition can be handled by PyTorch's
    quantization tooling -- confirm against the quantization workflow used).

    Args:
        inplanes: Number of input channels.
        planes: Number of output channels of both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input before it is added
            to the main path (needed when shape or channel count changes).
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        # Submodules are registered in the original order so that parameter
        # ordering and random initialisation stay exactly the same.
        self.add_relu = torch.nn.quantized.FloatFunctional()
        self.conv1 = nn.Conv2d(
            inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        # Main path: conv -> bn -> relu -> conv -> bn.
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        # Shortcut: the raw input, or its projection when one was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        return self.add_relu.add_relu(residual, shortcut)


class ResNet(nn.Module):
    """ResNet-style feature extractor wrapped in quantization stubs.

    The network is a stem (7x7 stride-2 convolution, batch norm, ReLU and a
    3x3 stride-2 max-pool) followed by four residual stages with 64, 128, 256
    and 512 planes.  ``forward`` returns the output of the last stage; no
    pooling or classification head is defined here.

    Args:
        block: Residual block class, called as
            ``block(inplanes, planes, stride, downsample)``.
        layers: Sequence of four ints giving the number of blocks per stage.
        num_classes: Accepted but not used anywhere in this class.
    """

    def __init__(self, block, layers, num_classes=5):
        super().__init__()

        # Running channel count; `_make_layer` updates it stage by stage.
        self.inplanes = 64

        # Stem.
        self.conv1 = nn.Conv2d(
            3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = nn.BatchNorm2d(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages; every stage after the first halves the resolution.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # Stubs applied at the very start and very end of `forward`.
        self.quant = torch.quantization.QuantStub()
        self.dequant = torch.quantization.DeQuantStub()

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks with ``planes`` channels.

        Only the first block receives ``stride`` and, when the stride or the
        channel count changes, a 1x1-conv + batch-norm projection for the
        shortcut.
        """
        projection = None
        if stride != 1 or self.inplanes != planes:
            projection = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, 1, stride, bias=False),
                nn.BatchNorm2d(planes),
            )

        stage = [block(self.inplanes, planes, stride, projection)]
        self.inplanes = planes
        # Remaining blocks keep both resolution and width.
        stage.extend(block(self.inplanes, planes) for _ in range(1, blocks))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Run quant stub -> stem -> four stages -> dequant stub."""
        features = self.quant(x)
        features = self.maxpool(self.relu(self.bn1(self.conv1(features))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        return self.dequant(features)


def resnet34():
    """Build the backbone with ``BasicBlock`` stages of depth 3, 4, 6 and 3.

    Prints a short banner and returns a freshly initialised ``ResNet``.
    """
    print('RESNET34 BACKBONE')
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, stage_depths)


def get_object_detection_model(num_classes=5):
    """Assemble a Faster R-CNN detector on top of the custom ``resnet34`` backbone.

    Args:
        num_classes: Number of classes passed to ``FasterRCNN``.

    Returns:
        The constructed ``FasterRCNN`` model.
    """
    feature_extractor = resnet34()
    # Channel count of the backbone output: its last stage (layer4) is built
    # with 512 planes.
    feature_extractor.out_channels = 512

    # One feature map, three anchor sizes x three aspect ratios.
    anchors = AnchorGenerator(
        sizes=((128, 256, 512),),
        aspect_ratios=((0.5, 1.0, 2.0),),
    )

    return FasterRCNN(
        feature_extractor,
        num_classes=num_classes,
        rpn_anchor_generator=anchors,
    )


# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = get_object_detection_model(num_classes=5)
# NOTE(review): the file name suggests quantized weights, while the model
# built above is a float model with Quant/DeQuant stubs and no
# prepare/convert step in this cell -- confirm the state-dict keys match.
model.load_state_dict(
    torch.load(
        '/kaggle/input/fpus23/saved_models/OD_34/quant_model.pth',
        map_location=device,
    )
)
model = model.to(device)
model.eval()

print("Model loaded successfully on: ", device)


In [None]:
from PIL import Image
import torchvision.transforms as T

# Only conversion to a tensor is applied; no resizing or normalisation.
transform = T.Compose([T.ToTensor()])

# Read the sample image as 3-channel RGB and move the tensor to `device`.
image = transform(
    Image.open('/kaggle/input/fpus23/Dataset_Plane/FL_PLANE/0zXampIL.png').convert("RGB")
).to(device)

In [None]:
# Run the detector on the single loaded image.
# eval() is called again here so the cell also works when run on its own.
model.eval()

# Gradients are not needed for inference.
with torch.no_grad():
    # The model is called with a list of image tensors; `outputs` is indexed
    # per image by the following cells (outputs[0]['boxes'], ['scores'], ['labels']).
    outputs = model([image])

print(outputs)

In [None]:
# Minimum confidence a detection needs in order to be reported.
threshold = 0.7

# Pull the predictions for the first (only) image back to NumPy on the CPU.
pred_boxes, pred_scores, pred_labels = (
    outputs[0][key].cpu().numpy() for key in ('boxes', 'scores', 'labels')
)

for box, score, label in zip(pred_boxes, pred_scores, pred_labels):
    if not (score > threshold):
        continue
    print(f"Detected class {label} with confidence {score} at box {box}")

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image

# Display name and box colour for each foreground class id.
CLASSES = {1: "Head", 2: "Abdomen", 3: "Arms", 4: "Legs"}
CLASS_COLORS = {
    1: 'red',
    2: 'blue',
    3: 'green',
    4: 'orange'
}

fig, ax = plt.subplots(1)
ax.imshow(Image.open('/kaggle/input/fpus23/Dataset_Plane/FL_PLANE/0zXampIL.png'))

for box, score, label in zip(pred_boxes, pred_scores, pred_labels):
    if score > threshold:
        # Use a plain int key and fall back for BOTH lookups, so an
        # unexpected class id is drawn generically instead of raising
        # KeyError on the name lookup.
        class_id = int(label)
        color = CLASS_COLORS.get(class_id, 'white')
        class_name = CLASSES.get(class_id, f'class {class_id}')

        # Boxes are (x_min, y_min, x_max, y_max); Rectangle wants the
        # top-left corner plus width and height.
        rect = patches.Rectangle(
            (box[0], box[1]),
            box[2] - box[0],
            box[3] - box[1],
            linewidth=2,
            edgecolor=color,
            facecolor='none'
        )
        ax.add_patch(rect)
        ax.text(
            box[0], box[1],
            f'{class_name}: {score:.2f}',
            color='white',
            bbox=dict(facecolor=color, alpha=0.5)
        )

plt.show()

In [8]:
!git clone https://github.com/pytorch/vision.git

fatal: destination path 'vision' already exists and is not an empty directory.


In [9]:
import sys
sys.path.append("/kaggle/working/vision/references/detection")

from engine import train_one_epoch, evaluate
import utils 


In [10]:
%%writefile OD_US_Dataset.py

import os
import numpy as np
import cv2
import torch
from xml.etree import ElementTree as ET
from torchvision import transforms as torchtrans
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2


class OD_US_Dataset(torch.utils.data.Dataset):
    """Object-detection dataset built from per-folder ``annotations.xml`` files.

    Every entry of ``ano_path`` is treated as a folder containing an
    ``annotations.xml`` whose ``<image>`` elements hold ``<box>`` children
    with ``xtl``/``ytl``/``xbr``/``ybr`` corner attributes and a ``label``
    attribute.  The matching image is read from
    ``img_path/<folder>/<image name>``.  Images without any ``<box>``
    element are skipped.

    Args:
        ano_path: Root directory of the annotation folders.
        img_path: Root directory of the image folders (same folder names).
        transforms: Optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` that returns a
            mapping with ``'image'``, ``'bboxes'`` and ``'labels'`` entries.

    Instance attributes:
        X: image paths, one per sample.
        Y: per-sample list of ``[xtl, ytl, xbr, ybr]`` boxes.
        LA: per-sample list of class indices, aligned with ``Y``.
    """

    # Lower-cased annotation label -> class index.  'arms' is accepted next
    # to 'arm' because `classes` spells that class 'Arms'.
    # NOTE(review): confirm which spelling the XML files actually use.
    LABEL_MAP = {'bkg': 0, 'head': 1, 'abdomen': 2, 'arm': 3, 'arms': 3, 'legs': 4}

    def __init__(self, ano_path, img_path, transforms=None):
        self.classes = ['bkg', 'Head', 'Abdomen', 'Arms', 'Legs']
        self.ano_path = ano_path
        self.img_path = img_path
        self.transforms = transforms
        self.X = []
        self.Y = []
        self.LA = []

        for obj in os.listdir(ano_path):
            file_name = os.path.join(ano_path, obj, 'annotations.xml')
            dom = ET.parse(file_name)

            for n in dom.findall('image'):
                box_elems = n.findall('box')
                if not box_elems:
                    continue

                bbox = []
                la = []
                for elem in box_elems:
                    label = (elem.attrib.get('label') or '').lower()
                    class_idx = self.LABEL_MAP.get(label)
                    if class_idx is None:
                        # Drop the box together with its unknown label so
                        # boxes and labels always have the same length.
                        continue
                    bbox.append([float(elem.attrib.get(key))
                                 for key in ('xtl', 'ytl', 'xbr', 'ybr')])
                    la.append(class_idx)

                self.Y.append(bbox)
                self.X.append(os.path.join(img_path, obj, n.attrib.get('name')))
                self.LA.append(la)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``image`` is RGB float32 scaled to [0, 1] (or whatever ``transforms``
        returns for it).  ``target`` is a dict with ``boxes`` (N, 4) float32,
        ``labels`` (N,) int64, ``area`` (N,), ``iscrowd`` (N,) int64 and
        ``image_id``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        img_name = self.X[idx]
        img = cv2.imread(img_name)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_name}")
        img_res = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0

        labels = self.LA[idx]
        boxes = np.array(self.Y[idx]).astype(float)

        # A sample without usable boxes gets one dummy background box.
        if len(boxes) == 0:
            boxes = np.array([[0, 0, 1, 1]], dtype=np.float32)
            labels = [0]

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        if self.transforms:
            sample = self.transforms(image=img_res,
                                     bboxes=boxes.tolist(),
                                     labels=labels.tolist())
            img_res = sample['image']
            # Take boxes AND labels from the transform output so they stay
            # aligned; reshape keeps an empty result at shape (0, 4).
            boxes = torch.as_tensor(
                np.asarray(sample['bboxes'], dtype=np.float32).reshape(-1, 4))
            labels = torch.as_tensor(
                np.asarray(sample['labels'], dtype=np.int64).reshape(-1))

        # Computed after the transform so the values match the final boxes.
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)
        image_id = torch.tensor([idx])

        target = {
            "boxes": boxes,
            "labels": labels,
            "area": area,
            "iscrowd": iscrowd,
            "image_id": image_id
        }

        return img_res, target

    def __len__(self):
        """Number of annotated images collected in ``__init__``."""
        return len(self.X)


Overwriting OD_US_Dataset.py


In [11]:
import sys
sys.path.append("/kaggle/working/")
from OD_US_Dataset import OD_US_Dataset

In [7]:
pip install -U albumentations

Collecting albumentations
  Downloading albumentations-2.0.5-py3-none-any.whl.metadata (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.7/41.7 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
Collecting albucore==0.0.23 (from albumentations)
  Downloading albucore-0.0.23-py3-none-any.whl.metadata (5.3 kB)
Collecting simsimd>=5.9.2 (from albucore==0.0.23->albumentations)
  Downloading simsimd-6.2.1-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (66 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.0/66.0 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
Downloading albumentations-2.0.5-py3-none-any.whl (290 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m290.6/290.6 kB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading albucore-0.0.23-py3-none-any.whl (14 kB)
Downloading simsimd-6.2.1-cp310-cp310-manylinux_2_28_x86_64.whl (632 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m632.7/632.7 kB[0m [31m32.

In [None]:
import os
import random
import numpy as np
import pandas as pd
import warnings
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
import torchvision
from torchvision import transforms as torchtrans
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
import cv2
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from engine import train_one_epoch, evaluate
import utils
import transforms as T
from OD_US_Dataset import OD_US_Dataset
warnings.filterwarnings('ignore')

# Dataset roots: images and their annotations live in parallel folder trees
# (both are handed to OD_US_Dataset below).
image_path = '/kaggle/input/fpus23/Dataset/four_poses/'
annotation_path = '/kaggle/input/fpus23/Dataset/boxes/annotation/'

# Let cuDNN benchmark convolution algorithms; get_transform resizes every
# image to 512x512, so the input shape is constant.
torch.backends.cudnn.benchmark = True  

def get_transform(train):
    """Return the Albumentations pipeline applied to every sample.

    The pipeline resizes to 512x512 and converts the image to a tensor;
    boxes are given in ``pascal_voc`` format with class ids in the
    ``labels`` field.

    Args:
        train: Accepted for call-site symmetry but not consulted -- the same
            pipeline is returned for training and evaluation.
    """
    steps = [
        A.Resize(512, 512),
        ToTensorV2(p=1.0),
    ]
    box_config = A.BboxParams(format='pascal_voc', label_fields=['labels'])
    return A.Compose(steps, bbox_params=box_config)

def collate_fn(batch):
    """Transpose a batch of samples into per-field tuples.

    ``None`` samples are discarded first.  Returns ``None`` when nothing is
    left, otherwise e.g. ``((img1, img2), (target1, target2))``.
    """
    kept = list(filter(lambda sample: sample is not None, batch))
    if not kept:
        return None
    return tuple(zip(*kept))

dataset = OD_US_Dataset(annotation_path, image_path, transforms=get_transform(train=True))
print(f"Dataset Length: {len(dataset)}\n")

# Fixed seed so the train/test permutation is the same on every run.
torch.manual_seed(1)
indices = torch.randperm(len(dataset)).tolist()
test_split = 0.2
tsize = int(len(dataset) * test_split)

# Split on an explicit index rather than on `-tsize`: with tsize == 0 the
# slices `[:-0]` / `[-0:]` would give an EMPTY train set and put every
# sample in the test set.
split_at = len(indices) - tsize
dataset_train = Subset(dataset, indices[:split_at])
dataset_test = Subset(dataset, indices[split_at:])

data_loader = DataLoader(dataset_train, batch_size=8, shuffle=True, num_workers=8, pin_memory=True, collate_fn=collate_fn)
data_loader_test = DataLoader(dataset_test, batch_size=8, shuffle=False, num_workers=8, pin_memory=True, collate_fn=collate_fn)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using Device: {device}")

def get_object_detection_model(num_classes=5):
    """Build a Faster R-CNN detector on a ResNet-34 FPN backbone.

    NOTE: a function with the same name is defined earlier in this notebook
    (custom non-FPN backbone); running this cell replaces that definition.

    Args:
        num_classes: Number of classes passed to ``FasterRCNN``.

    Returns:
        The constructed ``FasterRCNN`` model.
    """
    # `backbone_name` is passed by keyword: positional use is deprecated in
    # torchvision and only triggers a warning.
    backbone = resnet_fpn_backbone(backbone_name='resnet34', weights="DEFAULT")
    return FasterRCNN(backbone, num_classes=num_classes)

num_classes = 5
model = get_object_detection_model(num_classes).to(device)

# Optimise only the parameters that are left trainable.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)
# Multiply the learning rate by 0.1 every 3 epochs.
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Loss scaler for the mixed-precision training loop.
scaler = torch.cuda.amp.GradScaler()

# NOTE(review): relative path outside the current directory -- confirm it is
# persisted by the hosting environment.
save_path = '../saved_models/OD_34/'
os.makedirs(save_path, exist_ok=True)

num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs} - Training Started...")

    model.train()
    for images, targets in data_loader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        optimizer.zero_grad()

        # Forward pass under autocast; in train mode the model returns a dict
        # of losses, which are summed into one scalar.
        with torch.cuda.amp.autocast():
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

        # Scaled backward, then optimizer step and scale update via the scaler.
        scaler.scale(losses).backward()
        scaler.step(optimizer)
        scaler.update()

    # The scheduler advances once per epoch, after all optimizer steps.
    lr_scheduler.step()

    print(f"Epoch {epoch+1}/{num_epochs} - Evaluating...")
    evaluate(model, data_loader_test, device=device)

# Save everything needed to resume: besides model and optimizer state, the
# LR-scheduler and GradScaler state are stored so a resumed run continues
# the schedule instead of restarting it.  Existing keys are unchanged.
torch.save({
    'epoch': num_epochs,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'lr_scheduler_state_dict': lr_scheduler.state_dict(),
    'scaler_state_dict': scaler.state_dict()
}, os.path.join(save_path, "OD_34_ckpt_final.pth"))

print("Training Completed Successfully")


Dataset Length: 9455

Using Device: cuda
Epoch 1/10 - Training Started...
