In [None]:
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.utils.data import DataLoader
from shapely.geometry import Polygon
import os
import PIL
import pandas as pd
import numpy as np

Initializing the pre-trained model


In [None]:
# Load a custom-trained YOLOv5 checkpoint through torch.hub.
# The `path` value is a placeholder and must point at the weights file.
yolo = torch.hub.load(
    'ultralytics/yolov5',
    'custom',
    path='Path of your pre-trained model',
)

Path to the dataset

In [None]:
# Dataset locations (placeholders - replace them with real paths before running).
# NOTE: the helpers in this notebook build file paths by plain string
# concatenation (base path + file name), so each value should end with a
# path separator.
# Folder holding the training images.
base_img_train_path = 'Training image folder'
# Folder holding the .txt label file of every training image.
base_label_train_path = 'Lable folder of training image'
# Folder holding the test images.
base_img_test_path = 'Testing image folder'
# Folder holding the .txt label file of every test image.
base_label_test_path = 'Lable folder for testing images'

Function to get the ground-truth co-ordinates of the bounding boxes as a NumPy array.

In [None]:
def get_labels_into_pandas(img,base_label_path):
    """Load the ground-truth boxes of one image as a NumPy array.

    The label file is looked up in ``base_label_path`` under the image's
    file name with its extension replaced by ``.txt``.  Each line of the
    file is space separated; the first column (presumably the class id -
    verify against the dataset) is discarded and the remaining columns are
    returned.

    Parameters
    ----------
    img : str
        Image file name, e.g. ``'0001.jpg'`` or ``'0001.jpeg'``.
    base_label_path : str
        Directory containing the label files (with or without a trailing
        path separator).

    Returns
    -------
    numpy.ndarray
        One row per labelled box.  An empty label file yields an array of
        shape ``(0, 4)`` instead of raising.
    """
    # splitext copes with extensions of any length; slicing off the last
    # three characters only worked for names such as "x.jpg".
    stem = os.path.splitext(img)[0]
    path = os.path.join(base_label_path, stem + '.txt')
    try:
        df = pd.read_csv(path, delimiter=' ', header=None)
    except pd.errors.EmptyDataError:
        # An image without any annotated object has an empty label file.
        return np.empty((0, 4))
    # Drop the leading column without mutating the frame in place.
    return df.iloc[:, 1:].to_numpy()

Function to get the results from the pre-trained YOLOv5 model

In [None]:
def get_yolo_results(img,base_img_path):
    """Run the YOLOv5 model on one image and return its boxes as an array.

    The image path is ``base_img_path + img``.  The first ``xywhn`` table
    of the prediction is taken and the ``confidence``, ``class`` and
    ``name`` columns are removed, so only the remaining (box) columns are
    returned, one row per detection.
    """
    detections = yolo(base_img_path + img).pandas().xywhn[0]
    boxes_only = detections.drop(columns=['confidence', 'class', 'name'])
    return np.array(boxes_only)

This function reorders the ground-truth labels to match the predictions we get when passing an image to the YOLOv5 model: for each predicted box it picks the ground-truth box whose absolute distance to the prediction is minimal.

In [None]:
def get_sorted(x,y):
    """Align ground-truth boxes with predicted boxes.

    For every row of ``x`` the row of ``y`` with the smallest L1 distance
    over the first four columns is selected (ties resolve to the earliest
    row of ``y``).  The same ground-truth row may be chosen more than once.

    Parameters
    ----------
    x : array-like or torch.Tensor
        Predicted boxes, one row per box.
    y : array-like
        Ground-truth boxes, one row per box.

    Returns
    -------
    numpy.ndarray
        A new array with ``len(x)`` rows taken from ``y``.  When either
        input is empty, an empty ``(0, 4)`` array is returned so callers
        can keep using 2-D indexing.
    """
    if hasattr(x, 'detach'):
        # Torch tensors: drop autograd bookkeeping before converting.
        x = x.detach().cpu().numpy()
    preds = np.asarray(x, dtype=float)
    labels = np.asarray(y)
    if preds.size == 0 or labels.size == 0:
        return np.empty((0, 4))

    # Pairwise L1 distance: result has shape (num_preds, num_labels).
    gaps = np.abs(
        preds[:, None, :4] - labels[None, :, :4].astype(float)
    ).sum(axis=2)
    nearest = gaps.argmin(axis=1)
    # Fancy indexing copies, so later in-place edits never touch `y`.
    return labels[nearest]

Function to compute Intersection over Union (IoU)

In [None]:
def compute_IOU(b1,b2):
    """Intersection over Union of two axis-aligned boxes.

    Both boxes are given as ``[x_min, x_max, y_min, y_max]``.  The value is
    computed in closed form, so degenerate boxes (zero width or height) are
    handled without building geometry objects.  Coordinates are converted
    with ``float()``, which lets lists, NumPy rows and torch tensors
    (including ones that track gradients) be passed in.

    Returns
    -------
    float or int
        ``intersection / union``; ``0`` when the union has no area.
    """
    # Sort each pair so a box whose min/max are swapped still describes
    # the same rectangle.
    ax0, ax1 = sorted((float(b1[0]), float(b1[1])))
    ay0, ay1 = sorted((float(b1[2]), float(b1[3])))
    bx0, bx1 = sorted((float(b2[0]), float(b2[1])))
    by0, by1 = sorted((float(b2[2]), float(b2[3])))

    overlap_w = max(0.0, min(ax1, bx1) - max(ax0, bx0))
    overlap_h = max(0.0, min(ay1, by1) - max(ay0, by0))
    inter = overlap_w * overlap_h

    area_a = (ax1 - ax0) * (ay1 - ay0)
    area_b = (bx1 - bx0) * (by1 - by0)
    union = area_a + area_b - inter
    if union <= 0:
        return 0
    return inter / union

Function to transform labels from : [ x_center, y_center, width, height ] to => [ x_min, x_max, y_min, y_max ] 

In [None]:
def transform_y(y,w,h):
    """Convert boxes from centre format to corner format.

    Input rows are ``[x_center, y_center, width, height]``; output rows are
    ``[x_min, x_max, y_min, y_max]`` in the same (normalised) units.

    Parameters
    ----------
    y : array-like
        Boxes, one row per box.  The input is not modified.
    w, h : number
        Image width and height.  Kept for interface compatibility: the
        original scaled to pixels and back again, which cancels out, so
        they do not influence the result.

    Returns
    -------
    numpy.ndarray
        New float array with the converted boxes; ``(0, 4)`` for empty
        input.
    """
    boxes = np.array(y, dtype=float)  # copy, so the caller's array stays intact
    if boxes.size == 0:
        return np.empty((0, 4))

    half_w = boxes[:, 2] / 2
    half_h = boxes[:, 3] / 2
    out = boxes.copy()
    out[:, 0] = boxes[:, 0] - half_w
    out[:, 1] = boxes[:, 0] + half_w
    # Each row uses its own height; the upper edge is centre PLUS half height.
    out[:, 2] = boxes[:, 1] - half_h
    out[:, 3] = boxes[:, 1] + half_h
    return out

Function to write the results we get from DeepQNetwork to a CSV file

In [None]:
def write_csv(full_y,img,out_dir='Path where you wan to save the results'):
    """Write the predicted boxes of one image to a CSV file.

    Parameters
    ----------
    full_y : list of list of float
        One row per predicted box.
    img : str
        Image file name; the CSV gets the same name with a ``.csv``
        extension.
    out_dir : str, optional
        Directory the CSV is written to.  The default is the notebook's
        placeholder and has to be replaced by (or overridden with) a real
        directory.
    """
    # splitext handles extensions of any length (".jpeg", ".png", ...).
    stem = os.path.splitext(img)[0]
    path = os.path.join(out_dir, stem + '.csv')
    pd.DataFrame(full_y).to_csv(path, index=False)

This function computes the mean squared error

In [None]:
def get_mse(y1,y2):
    """Mean squared error between two equally long sequences.

    Works element-wise on anything that supports ``len`` and indexing
    (lists, NumPy arrays, 1-D torch tensors); with tensors the result is a
    0-d tensor.

    Raises
    ------
    ValueError
        If ``y1`` is empty (the mean would be undefined).
    """
    n = len(y1)
    if n == 0:
        raise ValueError('get_mse() needs at least one element')
    s = 0
    for i in range(n):
        # Accumulate every squared difference, not just the last one.
        s = s + (y1[i] - y2[i])**2
    return s/n

Defining the Deep Q Network

In [None]:
class DeepQNetwork(nn.Module):
    """Fully connected network mapping a 5-value input to 4 outputs.

    Layout: Linear(5, 100) -> Tanh -> Dropout(0.2) -> Linear(100, 1000)
    -> Tanh -> Dropout(0.2) -> Linear(1000, 4) -> Tanh.  The weight of
    every linear layer is re-initialised with Xavier-uniform.
    """

    def __init__(self):
        super().__init__()
        # Attribute names and creation order are part of the saved
        # state-dict layout, so they are kept exactly as they were.
        self.hidden1 = self._xavier_linear(5, 100)
        self.activation1 = nn.Tanh()
        self.d1 = nn.Dropout(p=0.2)
        self.hidden2 = self._xavier_linear(100, 1000)
        self.activation2 = nn.Tanh()
        self.d2 = nn.Dropout(p=0.2)
        self.hidden3 = self._xavier_linear(1000, 4)
        self.activation3 = nn.Tanh()

    @staticmethod
    def _xavier_linear(n_in, n_out):
        """Create a linear layer whose weight is Xavier-uniform initialised."""
        layer = nn.Linear(n_in, n_out)
        nn.init.xavier_uniform_(layer.weight)
        return layer

    def forward(self,X):
        """Pass ``X`` through the layers in order and return the result."""
        pipeline = (
            self.hidden1, self.activation1, self.d1,
            self.hidden2, self.activation2, self.d2,
            self.hidden3, self.activation3,
        )
        for stage in pipeline:
            X = stage(X)
        return X

Function to train the model

In [None]:
def train(model, epochs=1,
          weights_path='/content/drive/MyDrive/DenseQNet.pt',
          loss_path='/content/drive/MyDrive/loss.csv'):
    """Fit ``model`` on the training images, one box at a time.

    For every image the YOLOv5 boxes are paired with their nearest
    ground-truth boxes.  Each predicted box, extended by a reward value,
    is fed to the model, which is trained with MSE loss against the
    ground-truth box in ``[x_min, x_max, y_min, y_max]`` form.  The reward
    given to the next sample is ``1`` when the IoU between the current
    prediction and the ground truth exceeds 0.8, otherwise ``-1``.

    Parameters
    ----------
    model : torch.nn.Module
        Network to train; it is switched to training mode.
    epochs : int, optional
        Number of passes over the boxes of each image (default 1).
    weights_path : str, optional
        Where the trained ``state_dict`` is saved.
    loss_path : str, optional
        Where the per-image loss log (CSV) is saved.

    Notes
    -----
    Images without detections or without labels are skipped and add no
    entry to the loss log.
    """
    # `import PIL` alone does not guarantee that the Image submodule is loaded.
    from PIL import Image

    mse = nn.MSELoss()
    opt = SGD(model.parameters(),lr = 0.01,momentum = 0.9)
    model.train()
    loss_list = []
    for cnt, img in enumerate(os.listdir(base_img_train_path)):
        # Read the size inside a context manager so the file handle is released.
        with Image.open(base_img_train_path+img) as picture:
            w,h = picture.size
        x = torch.from_numpy(get_yolo_results(img,base_img_train_path))
        # Labels live in the label folder, not in the image folder.
        labels = get_labels_into_pandas(img,base_label_train_path)
        last_loss = None
        if len(x) != 0 and len(labels) != 0:
            y = torch.from_numpy(transform_y(get_sorted(x,labels),w,h))
            # Same dtype as the boxes so torch.cat needs no type promotion.
            reward = torch.ones(1, dtype=x.dtype)
            for epoch in range(epochs):
                for i in range(len(x)):
                    opt.zero_grad()
                    new_x = torch.cat((x[i],reward))
                    yhat = model(new_x.float())
                    loss = mse(yhat.float(),y[i].float())
                    loss.backward()
                    opt.step()
                    # Plain floats for the IoU: no autograd graph involved.
                    iou = compute_IOU(yhat.detach().tolist(),y[i].tolist())
                    reward = torch.full((1,), 1.0 if iou > 0.8 else -1.0,
                                        dtype=x.dtype)
                    last_loss = loss.item()
        if last_loss is not None:
            # Store a Python float, not a tensor that drags its graph along.
            loss_list.append(last_loss)
        if cnt % 200 == 0:
            print('Training sample = ',cnt)
            if last_loss is not None:
                print('loss = ',last_loss)
    torch.save(model.state_dict(),weights_path)
    pd.DataFrame(loss_list).to_csv(loss_path,index = False)

Function to test the model. It stores the co-ordinates of the predicted bounding boxes in a CSV file.

In [None]:
def test(model):
    """Evaluate ``model`` on the test images and write its boxes to CSV.

    For every image the YOLOv5 boxes are paired with their nearest
    ground-truth boxes, each box (extended by a reward of 1) is passed
    through the model, and the absolute values of the outputs are written
    with ``write_csv``.  Mean squared error and IoU are measured between
    the model output and the ground-truth box, both in
    ``[x_min, x_max, y_min, y_max]`` form, and averaged per image.

    Parameters
    ----------
    model : torch.nn.Module
        Trained network; it is switched to evaluation mode.

    Returns
    -------
    tuple of (list of float, list of float)
        ``(mse_l, iou_l)``: per-image mean MSE and mean IoU.  Images
        without detections or labels add no entry (an empty CSV is still
        written for them).
    """
    # `import PIL` alone does not guarantee that the Image submodule is loaded.
    from PIL import Image

    mse_l = []
    iou_l = []
    model.eval()  # dropout must be off while evaluating
    for cnt, img in enumerate(os.listdir(base_img_test_path)):
        with Image.open(base_img_test_path+img) as picture:
            w,h = picture.size
        x = torch.from_numpy(get_yolo_results(img,base_img_test_path))
        labels = get_labels_into_pandas(img,base_label_test_path)
        full_y = []
        if len(x) != 0 and len(labels) != 0:
            y = torch.from_numpy(transform_y(get_sorted(x,labels),w,h))
            reward = torch.ones(1, dtype=x.dtype)
            iou_sum = 0.0
            mse_sum = 0.0
            with torch.no_grad():
                for i in range(len(x)):
                    new_x = torch.cat((x[i],reward))
                    yhat = model(new_x.float())
                    mse_sum += float(get_mse(yhat,y[i]))
                    # Compare the model's box with the ground truth; both
                    # are in corner format here.
                    iou_sum += compute_IOU(yhat.tolist(),y[i].tolist())
                    full_y.append(list(map(abs,yhat.tolist())))
            iou_l.append(iou_sum/len(x))
            mse_l.append(mse_sum/len(x))
        write_csv(full_y,img)
        if cnt % 20 == 0:
            print('Testing Sample = ',cnt)
    return mse_l, iou_l

In [None]:
# Build a freshly initialised network and fit it on the training images.
# train() also writes the weights and the loss log to disk.
model = DeepQNetwork()
train(model)

In [None]:
# Rebuild the architecture and restore the weights from the checkpoint
# path that train() saves to.
model = DeepQNetwork()
model.load_state_dict(torch.load('/content/drive/MyDrive/DenseQNet.pt'))
# Evaluation mode turns dropout off for inference.
model.eval()

In [None]:
# Run the evaluation loop over the test images; one CSV per image is written.
test(model)