In [63]:
import numpy as np
import pandas as pd
import os
import torch
import torchvision
from torchvision import datasets, models
from torchvision.transforms import functional as FT
from torchvision import transforms as T
import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
import torchmetrics
from torchmetrics import Metric
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, sampler, random_split, Dataset
from torch.nn.utils.rnn import pad_sequence
from pytorch_lightning.callbacks import ModelCheckpoint
from torchvision.datasets import CocoDetection
import fiftyone.utils.coco as fouc
import copy
import math
from PIL import Image
import cv2
import albumentations as A 
from collections import defaultdict, deque
import datetime
import time
from tqdm import tqdm # progress bar
from torchvision.utils import draw_bounding_boxes
import matplotlib.pyplot as plt
import sys
%matplotlib inline
print(torch.__version__)
print(torchvision.__version__)
from pycocotools.coco import COCO
from albumentations.pytorch import ToTensorV2
#import wandb
import matplotlib.patches as patches



2.2.1+cu121
0.17.1+cu121


# Hyper paramaters

In [64]:
# Hyperparameters
batch_size = 16
num_epochs= 1
lr = 0.001
image_size = [600, 600]
is_Test = False
wandb_on = False
device_cuda = True
num_workers = 0
if wandb_on:
    wandb.login()

    wandb.init(
        # set the wandb project where this run will be logged
        project="Bachelor0386",
        
        # track hyperparameters and run metadata
        config={
        "architecture": "Faster RCNN",
        "dataset": "CustomDataset",
        "epochs": 1,
        }
    )

if device_cuda:
    device = torch.device("cuda") # use GPU to train
else:
    device = "cpu"


# Transforms

In [65]:

def get_transforms(train=False):
    if True:
        transform = A.Compose([
            A.Resize(image_size[0], image_size[1]), 
            A.HorizontalFlip(p=0.3),
            A.VerticalFlip(p=0.3),
            A.RandomBrightnessContrast(p=0.1),
            A.ColorJitter(p=0.1),
            ToTensorV2()
        ], bbox_params=A.BboxParams(format='coco'))
    # else:
    #     transform = A.Compose([
    #         A.Resize(image_size[0], image_size[1]), 
    #         ToTensorV2()
    #     ], bbox_params=A.BboxParams(format='coco'))
    return transform

# Data Import

In [66]:
class PotholeDetectionClass(datasets.VisionDataset):
    def __init__(self, root, stage='/train', transform=get_transforms, transforms = None, target_transform = None, batch_size = batch_size):
        super().__init__(root, transform,transforms=None, target_transform=None)
        self.stage = stage #train, valid, test
        self.coco = COCO(root + stage + "/_annotations.coco.json") # annotations stored here
        self.ids = list(sorted(self.coco.imgs.keys()))
        self.ids = [id for id in self.ids if (len(self._load_target(id)) > 0)]
        self.batch_size = batch_size

    def _load_image(self, id: int):
        path = self.coco.loadImgs(id)[0]['file_name']
        path = "/" + path
        image = cv2.imread(self.root + self.stage + path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return image
    def _load_target(self, id):
        return self.coco.loadAnns(self.coco.getAnnIds(id))

    def __getitem__(self, index):
        coco = self.coco
        img_id = self.ids[index]
        ann_ids = coco.getAnnIds(imgIds=img_id)
        target = coco.loadAnns(ann_ids)

        path = coco.loadImgs(img_id)[0]['file_name']

        img = Image.open(os.path.join(self.root, path)).convert('RGB')
        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target
    
    def __len__(self):
        return len(self.ids)
    
dataset_path = "/Pothole_coco"
dataset_path = os.getcwd() + dataset_path

coco = COCO(dataset_path + "/train" + "/_annotations.coco.json")
categories = coco.cats
n_classes = len(categories.keys())

train_dataset = PotholeDetectionClass(root=dataset_path, transforms=get_transforms(True))
test_dataset = PotholeDetectionClass(root=dataset_path, stage='/test', transforms=get_transforms(True))
valid_dataset = PotholeDetectionClass(root=dataset_path, stage= "/valid", transform=get_transforms(True))



loading annotations into memory...
Done (t=0.01s)
creating index...
index created!


TypeError: PotholeDetectionClass.__init__() got an unexpected keyword argument 'transforms'

# Model 

In [None]:
model = models.detection.fasterrcnn_mobilenet_v3_large_320_fpn(pretrained=True)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = models.detection.faster_rcnn.FastRCNNPredictor(in_features, n_classes)

# Cuda
model = model.to(device)



# DataLoader

In [None]:
def custom_collate(batch):
    return tuple(zip(*batch))

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, collate_fn=custom_collate)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=custom_collate)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=custom_collate)

# Optimizer

In [None]:
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.01, momentum=0.9, nesterov=True, weight_decay=1e-4)
#optimizer = torch.optim.Adam(params, lr=0.001)
#optimizer = torch.optim.PSO(params, lr=0.001) #particle swarm optimization

# Additional

In [None]:
def plot_image(img_tensor, annotation,phase='train'):

    fig,ax = plt.subplots(1)
    img = img_tensor.cpu().data



    ax.imshow(img.permute(1, 2, 0))
    for idx,box in enumerate(annotation["boxes"]):


      if phase=='test':
        test=1
      else:
        test=annotation['scores'][idx] 
      if test>0.95 :
        xmin, ymin, xmax, ymax = box
        color=['r','g','b','r']
        classes=['no mask','Masked','Improper masking','No-mask']
        # Create a Rectangle patch
        rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=3,edgecolor=color[annotation['labels'][idx]],facecolor='none')
        ax.text(xmin, ymin, classes[annotation['labels'][idx]],color='black',bbox=dict(facecolor=color[annotation['labels'][idx]], alpha=0.8))
        # Add the patch to the Axes
        ax.add_patch(rect)

    plt.show()

def accuracyMetric(preds,annotations):
    non_accurate=0
    accurate=0
    def csm(A,B,corr):
        if corr:
            B=B-B.mean(axis=1)[:,np.newaxis]
            A=A-A.mean(axis=1)[:,np.newaxis]
        num=np.dot(A,B.T)
        p1=np.sqrt(np.sum(A**2,axis=1))[:,np.newaxis]
        p2=np.sqrt(np.sum(B**2,axis=1))[np.newaxis,:]
        return 1-(num/(p1*p2))
    inds=torch.where((preds['scores'])>0.91)
    distMatrix=csm(np.array(preds['boxes'][inds].cpu()),np.array(annotations['boxes'].cpu()),True)

    for i in range (distMatrix.shape[0]):
        cla=np.argmin(distMatrix[i,:])

        if preds['labels'][i]%3==annotations['labels'][cla]:
            accurate+=1
        else:
            non_accurate+=1
    allSamp=np.max(((accurate+non_accurate),len(annotations['labels'])))
    return (accurate/allSamp)

# Neural Network

In [None]:
class NN(pl.LightningModule):
    def __init__(self, model, optimizer, train_loader, test_loader):
        super().__init__()
        self.model = model
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.losses_dict = []
        self.loss = nn.HingeEmbeddingLoss()

    def forward(self, images, targets):
        return self.model(images, targets)

    def training_step(self, batch, batch_idx):
        imgs, annotations = batch
        imgs = list(img for img in imgs)
        annotations = [{k: v for k, v in t.items()} for t in annotations]
        

        # Forward pass
        loss_dict = self(imgs,annotations)
        losses = sum(loss for loss in loss_dict.values())
        if wandb_on:
            wandb.log({"train/loss": losses})
        occurrences = np.count_nonzero(annotations[0]['labels'].cpu() == 2)
        occurrences2=np.count_nonzero(annotations[0]['labels'].cpu() == 1)
        occurrences=occurrences/(occurrences2+1 )

        if occurrences>=1:

            occurrences=np.clip(occurrences,1,4)
            loss_dict['loss_classifier']=occurrences*4*loss_dict['loss_classifier']
            print(f'Weighted {occurrences}')

        elif losses<0.2:
            for k,v in zip(loss_dict,loss_dict.values()):
                loss_dict[k]=v*0
        loss = sum(loss for loss in loss_dict.values())
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        tensorboard_logs = {'train_loss': loss , 'classifier_loss': loss_dict['loss_classifier'],
                            'box_reg_loss':loss_dict['loss_box_reg']}
        # use key 'log'

        return {"loss": loss, 'log': tensorboard_logs}


    def test_step(self, batch, batch_idx):
        imgs, annotations = batch
        logits = self.model(imgs)
        loss = self.loss(logits, annotations)
        self.log('test_loss', loss)
        return {"test_loss": loss}

    def configure_optimizers(self):
        return self.optimizer

    def train_dataloader(self):
        return self.train_loader

    def test_dataloader(self):
        return self.test_loader

    def load_from_checkpoint(cls, checkpoint_path, model, optimizer, train_loader, test_loader):
        model = model.load_from_checkpoint(checkpoint_path)
        return cls(model, optimizer, train_loader, test_loader)

    

# Training

In [None]:
lightning_module = NN(model, optimizer, train_loader, test_loader)
# Initialize a Lightning Trainer
if wandb_on:
    wandb_logger = WandbLogger(project='Bachelor', job_type='train')
    wandb_logger.watch(model, log="all")
    trainer = pl.Trainer(max_epochs=num_epochs,logger=wandb_logger)  # You can adjust the Trainer options
else:
    trainer = pl.Trainer(max_epochs=num_epochs)
# Start training


trainer.fit(lightning_module, train_loader)

if wandb_on:
    wandb.finish()

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type               | Params
---------------------------------------------
0 | model | FasterRCNN         | 18.9 M
1 | loss  | HingeEmbeddingLoss | 0     
---------------------------------------------
18.9 M    Trainable params
58.9 K    Non-trainable params
18.9 M    Total params
75.721    Total estimated model params size (MB)


c:\Users\tobia\AppData\Local\Programs\Python\Python312\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.
c:\Users\tobia\AppData\Local\Programs\Python\Python312\Lib\site-packages\pytorch_lightning\loops\fit_loop.py:298: The number of training batches (30) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Epoch 0:   0%|          | 0/30 [00:00<?, ?it/s] 

NotImplementedError: 

# Testing

In [None]:
trainer.test(lightning_module, test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\Users\tobia\AppData\Local\Programs\Python\Python312\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:441: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Testing DataLoader 0:   0%|          | 0/5 [00:00<?, ?it/s]

TypeError: hinge_embedding_loss(): argument 'input' (position 1) must be Tensor, not list