# Definitions



**Importing Libraries**

In [None]:
import os
import random
import numpy as np
import pandas as pd
import warnings
import cv2
from xml.etree import ElementTree as et
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torch
import torchvision
from torchvision import transforms as torchtrans  
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from albumentations.core.transforms_interface import ImageOnlyTransform
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from PIL import Image
from torch.utils.data import random_split
from matplotlib import pyplot as plt
from numpy import asarray
from torchmetrics.classification import Accuracy


**Defining Custom Dataset**

In [None]:
class MyDataset(Dataset):
    """Object-detection dataset read from ``<root_dir>/train`` or ``<root_dir>/test``.

    Train split: every sub-folder of ``<root_dir>/train`` is one class. Folders
    are sorted by name and numbered from 1. When ``mask`` is true the bounding
    boxes are read from ``<root_dir>/train.csv``, which must provide the columns
    ``image`` (``"<folder>/<file>"``), ``x1``, ``y1``, ``x2`` and ``y2``.

    Test split: every entry of ``<root_dir>/test`` is one image.

    Args:
        root_dir: Dataset root, as ``str`` or ``Path`` (a trailing separator
            is optional).
        train: Selects the ``train`` split when true, ``test`` otherwise.
        mask: Load the bounding-box table; required to index train items.
        transforms: Optional callable invoked with ``image=`` (plus ``bboxes=``
            and ``labels=`` for train items) keyword arguments and returning a
            mapping with the same keys.
    """

    def __init__(self, root_dir, train=True,mask=False, transforms=None):
        self.split = "train" if train else "test"
        self.root_dir = Path(root_dir) / self.split
        self.transforms = transforms
        self.files = []
        self.mask = mask
        self.df_mask = pd.DataFrame()
        folders = sorted(os.listdir(self.root_dir))

        if self.split == "train":
            if mask:
                # Path join instead of string concatenation: works for Path
                # objects and for roots given without a trailing separator.
                self.df_mask = pd.read_csv(Path(root_dir) / "train.csv")
            for class_idx, folder in enumerate(folders):
                folder_dir = self.root_dir / folder
                # Sorted so that item order is reproducible across runs.
                for name in sorted(os.listdir(folder_dir)):
                    self.files.append({
                        "mask": folder + "/" + name,
                        "file": folder_dir / name,
                        "class": class_idx + 1,
                        "flag": False,
                    })
        else:
            # File names are kept separately; they are returned with each image.
            self.file = folders
            self.files = [self.root_dir / name for name in folders]

    def __len__(self):
        """Return the number of images in the selected split."""
        return len(self.files)

    @staticmethod
    def _load_image(path):
        """Read ``path`` as a float32 RGB array scaled to ``[0, 1]``."""
        img = cv2.imread(str(path))
        if img is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise OSError(f"could not read image: {path}")
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        rgb /= 255.0
        return rgb

    def _lookup_box(self, image_name):
        """Return ``(xmin, ymin, xmax, ymax)`` for ``image_name``, shrunk by 1 px per side."""
        if "image" not in self.df_mask.columns:
            raise KeyError(
                "bounding boxes are not loaded; build the train dataset with mask=True"
            )
        rows = self.df_mask[self.df_mask["image"] == image_name]
        if rows.empty:
            raise KeyError(f"no bounding box listed for {image_name!r}")
        row = rows.iloc[0]
        return (
            int(row["x1"]) + 1,
            int(row["y1"]) + 1,
            int(row["x2"]) - 1,
            int(row["y2"]) - 1,
        )

    def __getitem__(self, i):
        """Return item ``i``.

        Train split: ``(image, target)`` where ``target["boxes"]`` is a float32
        tensor of shape ``(N, 4)`` and ``target["labels"]`` an int64 tensor of
        shape ``(N,)``. ``N`` is 1 unless the transforms dropped the box, in
        which case it is 0.

        Test split: ``(image, file_name)``.

        Raises:
            KeyError: Train item whose box table is missing or has no row for
                the image.
            OSError: The image file could not be decoded.
        """
        if self.split == "train":
            item = self.files[i]
            img_res = self._load_image(item["file"])
            # Plain Python lists are handed to the transforms; tensors are
            # only built once augmentation is done.
            bboxes = [list(self._lookup_box(item["mask"]))]
            class_labels = [item["class"]]

            if self.transforms:
                sample = self.transforms(
                    image=img_res, bboxes=bboxes, labels=class_labels
                )
                img_res = sample["image"]
                # Take boxes AND labels from the transformed sample so the two
                # stay aligned when an augmentation removes the box.
                bboxes = sample["bboxes"]
                class_labels = sample["labels"]

            target = {
                # reshape keeps the (N, 4) layout even when no box survived.
                "boxes": torch.as_tensor(bboxes, dtype=torch.float32).reshape(-1, 4),
                "labels": torch.as_tensor(class_labels, dtype=torch.int64),
            }
            return img_res, target

        image = self._load_image(self.files[i])
        if self.transforms:
            image = self.transforms(image=image)["image"]
        # Without transforms the raw array is returned (previously unbound).
        return image, self.file[i]

**Defining Transformations**

In [None]:
def train_transformations():
    """Build the training augmentation pipeline.

    The pipeline flips the image with probability 0.5, applies a random
    shift/scale/rotate with probability 0.7 and finally converts the image to
    a tensor. Bounding boxes are expected in ``pascal_voc`` format and are
    transformed together with the ``labels`` field.
    """
    return A.Compose([
        # The probability must be passed by keyword: in albumentations
        # releases where the first positional parameter is ``always_apply``,
        # ``A.Flip(0.5)`` made the flip unconditional.
        A.Flip(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.25, rotate_limit=45, p=0.7),
        ToTensorV2(p=1.0),
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

def test_transformations():
    """Build the inference pipeline, which only converts the image to a tensor."""
    steps = [ToTensorV2(p=1.0)]
    return A.Compose(steps)

# Training

**Creating Dataloaders**

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def collate_fn(batch):
    """Transpose a batch of per-sample tuples into per-field tuples.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``;
    nothing is stacked, so samples may differ in size.
    """
    transposed = zip(*batch)
    return tuple(transposed)

# Dataset root; MyDataset reads the "train"/"test" folders below it.
root_dir = "../input/aiunict-2023/"

# Training split with bounding boxes and augmentation enabled.
train_dataset = MyDataset(
    root_dir,
    train=True,
    mask=True,
    transforms=train_transformations(),
)

# collate_fn keeps images and targets as tuples instead of stacking them.
train_data_loader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn,
)


**Importing pretrained FasterRCNN Model**

In [None]:
# Size of the new classification head.
# NOTE(review): presumably 8 object classes plus background (dataset labels
# start at 1) - confirm against the number of class folders.
classes_count = 9

# Faster R-CNN (ResNet-50 FPN backbone) initialised from pretrained weights.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# Swap the box predictor for one sized to classes_count, reusing the input
# width of the existing classification layer.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, classes_count)

**Averager class to keep track of the average loss**

In [None]:
class Averager:
    """Running mean of the values passed to :meth:`send`."""

    def __init__(self):
        # Start from the same cleared state that reset() produces.
        self.reset()

    def send(self, value):
        """Add ``value`` to the running total and count one more sample."""
        self.iterations += 1
        self.current_total += value

    @property
    def value(self):
        """Mean of the values sent since the last reset, or 0 if there are none."""
        if self.iterations == 0:
            return 0
        return 1.0 * self.current_total / self.iterations

    def reset(self):
        """Forget every value received so far."""
        self.current_total = 0.0
        self.iterations = 0.0

**Defining non-maximum suppression (NMS) to remove overlapping detections**

In [None]:
def apply_nms(orig_prediction, iou_thresh=0.3):
    """Filter one image's detections with non-maximum suppression.

    Args:
        orig_prediction: Mapping with at least the tensors ``'boxes'``,
            ``'scores'`` and ``'labels'``, aligned along their first dimension.
        iou_thresh: IoU threshold forwarded to ``torchvision.ops.nms``.

    Returns:
        A new dict in which ``'boxes'``, ``'scores'`` and ``'labels'`` hold only
        the kept detections. Any other entries are copied over unchanged, and
        ``orig_prediction`` itself is not modified.
    """
    keep = torchvision.ops.nms(orig_prediction['boxes'], orig_prediction['scores'], iou_thresh)
    # Shallow copy so the caller's prediction is not overwritten in place.
    final_prediction = dict(orig_prediction)
    for key in ('boxes', 'scores', 'labels'):
        final_prediction[key] = orig_prediction[key][keep]
    return final_prediction

**Setting parameters**

In [None]:
# Move the model to the selected device before collecting its parameters.
model.to(device)

# Only parameters that require gradients are handed to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0001,
)

# No learning-rate schedule is configured.
lr_scheduler = None

**Training Loop**

In [None]:
import time

num_epochs = 10
loss_hist = Averager()   # running mean of the loss within the current epoch
itr = 1                  # global iteration counter across epochs
lossHistoryiter = []     # loss of every iteration
lossHistoryepoch = []    # mean loss of every epoch
true_lab = []
start = time.time()

for epoch in range(num_epochs):
    loss_hist.reset()
    model.train()
    for images, targets in train_data_loader:

        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        # The model is called with targets here and returns a dict of losses.
        loss_dict = model(images, targets)

        losses = sum(loss_dict.values())
        loss_value = losses.item()

        loss_hist.send(loss_value)
        lossHistoryiter.append(loss_value)
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if itr % 50 == 0:
            print(f"Iteration #{itr} loss: {loss_value}")

        itr += 1

    # Honour a configured schedule; with lr_scheduler set to None this is a no-op.
    if lr_scheduler is not None:
        lr_scheduler.step()

    lossHistoryepoch.append(loss_hist.value)
    print(f"Epoch #{epoch} loss: {loss_hist.value}")

end = time.time()
hours, rem = divmod(end-start, 3600)
minutes, seconds = divmod(rem, 60)
print("Time taken to Train the model :{:0>2}:{:0>2}:{:05.2f}".format(int(hours),int(minutes),seconds))

# Testing

**Creating Test Dataset**

In [None]:
# Test split: no bounding boxes, only the tensor conversion as transform.
test_dataset = MyDataset(
    root_dir,
    train=False,
    transforms=test_transformations(),
)

**Running the model on the test dataset and saving the CSV file**

In [None]:
# Class written to the submission when the model detects nothing in an image.
# Without a fallback a single empty prediction aborts the run with IndexError.
NO_DETECTION_CLASS = 0

image_list = []
class_list = []

# Evaluation mode and no-grad are loop-invariant, so set them once.
model.eval()
with torch.no_grad():
    for idx in range(len(test_dataset)):
        img, name_file = test_dataset[idx]

        prediction = model([img.to(device)])[0]
        nms_prediction = apply_nms(prediction, iou_thresh=0.2)
        labels = nms_prediction['labels']

        if labels.numel() > 0:
            # torchvision's NMS returns kept indices by decreasing score, so
            # the first label is the top-scoring detection. Model labels start
            # at 1; the submission uses labels starting at 0.
            pred = int(labels[0].item()) - 1
        else:
            pred = NO_DETECTION_CLASS

        image_list.append(name_file)
        class_list.append(pred)

d = {'image': image_list, 'class': class_list}
df = pd.DataFrame(data=d)
df.to_csv("submission.csv", index=False)