In [2]:
from collections import OrderedDict
from torch.optim.lr_scheduler import StepLR
import torch
import torchvision
from torchvision.ops import nms
from torchvision.ops.boxes import box_convert,box_iou


from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import os
import cv2
import random
from dataset import ObjectDetectionDataset
from helper import get_train_data_loader, filter_prediction, clean_targets
from conf import *

torch.manual_seed = 0
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
from torchvision.models.detection import ssd, ssd300_vgg16, SSD300_VGG16_Weights
import datetime



In [3]:

def get_object_detection_model(num_classes=NUMBER_OF_CLASSES):
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    for param in model.parameters():
        param.requires_grad = False
    for param in model.roi_heads.parameters():
        param.requires_grad = True
    for idx, param in enumerate(model.backbone.parameters()):
        if(idx > 56):
            param.requires_grad = True
    return model

def get_ssd_detection_model(num_classes=NUMBER_OF_CLASSES):
    ssd_model = ssd300_vgg16(pretrained=True)
    classification_head = ssd_model.head.classification_head
    freeze_layers = [
        ssd_model.backbone.features,    # Freeze the VGG16 backbone
        ssd_model.backbone.extra,       # Optionally, freeze extra layers
        ssd_model.anchor_generator,     # Freeze the anchor generator
    ]
    for layer in classification_head.module_list:
        layer = torch.nn.Conv2d(layer.in_channels, num_classes * 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    for layer in freeze_layers:
        for param in layer.parameters():
            param.requires_grad = False
    return ssd_model

def save_test_img(img, target, prefix):
    # img = img.permute(2,0,1).cpu().numpy()  # Convert to (height, width, channels)
    img = img.cpu().numpy()  # Convert to (height, width, channels)

    # img = img.astype('uint8')
    # img = img
    # Draw bounding boxes on the image
    print(target)
    for box, label in zip(target['boxes'], target['labels']):
        x, y, w, h = box.tolist()
        x, y, w, h = int(x), int(y), int(w), int(h)
        box_color = BOX_COLOR[label.item()]
        cv2.rectangle(img, (x, y), (w, h), box_color, 2)

    # Save the image with bounding boxes
    if not os.path.exists(os.path.join(os.getcwd(), 'test_output')):
        os.makedirs(os.path.join(os.getcwd(), 'test_output'))
    # cv2.imshow(img)
    img_path = f"./test_output/output_image_{prefix}.png"
    cv2.imwrite(img_path, img)
    return img_path


def get_images(image_path):
        # reading the images and converting them to correct size and color
        original_image = cv2.imread(image_path)
        grayscale = to_grayscale(original_image)
        grayscale = normalize_image(grayscale)
        grayscale = torch.from_numpy(grayscale).float()
        grayscale = grayscale.unsqueeze(0)
        
        return grayscale, torch.from_numpy(original_image)

def to_grayscale(image):	
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return gray

def normalize_image(img):
    return img / 255


def train(model, train_loader, optimizer, epoch):
    device = torch.device("mps")
    model.to(device)
    model.train()
    for batch_idx, (data, targets,_) in enumerate(train_loader, 1):
        # Print bounding boxes for debugging
        print(f"=====[ epoch {epoch} batch {batch_idx}  data: {data}")
        print(f"=====[ epoch {epoch} batch {batch_idx}  targets: {targets}")
        data = list(image.to(device) for image in data)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        optimizer.zero_grad()
        print(f"=====[ epoch {epoch} batch {batch_idx}  before model parse")
        output = model(data, targets)
        print(f"=====[ epoch {epoch} batch {batch_idx}  output of the model: {output}")

        total_loss = sum(output.values())
        total_loss.backward()
        optimizer.step()

def run(model, train_loader):
    epochs = 10
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

    for epoch in range(1, epochs + 1):
        train(model, train_loader, optimizer, epoch)
        scheduler.step()



In [4]:
# fasterrcnn = get_object_detection_model()
dataset = get_train_data_loader(16, "../../train_set/")
# timer1 = datetime.datetime.now()
# run(fasterrcnn, dataset)
# timer1 = datetime.datetime.now() - timer1

ssd = get_ssd_detection_model()
timer2 = datetime.datetime.now()
run(ssd, dataset)
timer2 = datetime.datetime.now() - timer2

# model.eval()
# original_image_sizes = []

=====[INFO] Get train data loader training_dir: ./cropped_train
=====[INFO] Got dataset <dataset.ObjectDetectionDataset object at 0x1650feda0>




=====[ epoch 1 batch 1  data: [tensor([[[0.8039, 0.8039, 0.8039,  ..., 0.0275, 0.0314, 0.0314],
         [0.8039, 0.8039, 0.8078,  ..., 0.0196, 0.0235, 0.0235],
         [0.8078, 0.8078, 0.8078,  ..., 0.0196, 0.0235, 0.0275],
         ...,
         [0.0039, 0.0039, 0.0039,  ..., 0.3137, 0.3176, 0.3176],
         [0.0039, 0.0039, 0.0039,  ..., 0.3176, 0.3176, 0.3176],
         [0.0039, 0.0039, 0.0039,  ..., 0.3176, 0.3176, 0.3176]]]), tensor([[[0.8706, 0.8745, 0.8784,  ..., 0.0157, 0.0157, 0.0157],
         [0.8745, 0.8745, 0.8784,  ..., 0.0078, 0.0078, 0.0118],
         [0.8784, 0.8784, 0.8784,  ..., 0.0039, 0.0118, 0.0235],
         ...,
         [0.0000, 0.0000, 0.0039,  ..., 0.4510, 0.4510, 0.4510],
         [0.0000, 0.0078, 0.0196,  ..., 0.4510, 0.4510, 0.4510],
         [0.0000, 0.0118, 0.0275,  ..., 0.4510, 0.4510, 0.4510]]]), tensor([[[0.0039, 0.0039, 0.0078,  ..., 0.1725, 0.1725, 0.1765],
         [0.0000, 0.0000, 0.0039,  ..., 0.1725, 0.1765, 0.1765],
         [0.0000, 0.0000,