In [None]:
import numpy as np
import pandas as pd 
import torch
import torch.nn as nn
import cv2
import os
import time
from torch.autograd import Variable
import matplotlib.pyplot as plt

In [None]:
!pip install sentence_transformers

# **YOLO MODEL**

In [None]:
#getting weights from pretrained model
weightsfile = '/kaggle/input/data-for-yolo-v3-kernel/yolov3.weights'
classfile = '/kaggle/input/data-for-yolo-v3-kernel/coco.names'
cfgfile = '/kaggle/working/yolov3.cfg'
sample_img1 = '/kaggle/working/input/insta20881.jpg'
input_dir = '/kaggle/working/input'
output_dir = '/kaggle/working/output'
nms_thesh = 0.5

In [None]:
if not os.path.exists(input_dir):
    os.mkdir(input_dir)

In [None]:
! wget https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
! cp /kaggle/input/instagram-images-with-captions/instagram_data2/img2/insta20881.jpg /kaggle/working/input

We will create the model using the configuration file.First we will parse it and then feed to our model

In [None]:
def parse_cfg(config_file):
    file = open(config_file,'r')
    file = file.read().split('\n')
    file =  [line for line in file if len(line)>0 and line[0] != '#']
    file = [line.lstrip().rstrip() for line in file]

    final_list = []
    element_dict = {}
    for line in file:

        if line[0] == '[':
            if len(element_dict) != 0:     # appending the dict stored on previous iteration
                    final_list.append(element_dict)
                    element_dict = {} # again emtying dict
            element_dict['type'] = ''.join([i for i in line if i != '[' and i != ']'])
            
        else:
            val = line.split('=')
            element_dict[val[0].rstrip()] = val[1].lstrip()  #removing spaces on left and right side
        
    final_list.append(element_dict) # appending the values stored for last set
    return final_list

In [None]:
class DummyLayer(nn.Module):
    def __init__(self):
        super(DummyLayer, self).__init__()
        

        
class DetectionLayer(nn.Module):
    def __init__(self, anchors):
        super(DetectionLayer, self).__init__()
        self.anchors = anchors
        
        

def create_model(blocks):
    darknet_details = blocks[0]
    channels = 3 
    output_filters = []
    modulelist = nn.ModuleList()
    for i,block in enumerate(blocks[1:]):
        seq = nn.Sequential()
        if (block["type"] == "convolutional"):
            activation = block["activation"]
            filters = int(block["filters"])
            kernel_size = int(block["size"])
            strides = int(block["stride"])
            use_bias= False if ("batch_normalize" in block) else True
            pad = (kernel_size - 1) // 2
            
            conv = nn.Conv2d(in_channels=channels, out_channels=filters, kernel_size=kernel_size, 
                             stride=strides, padding=pad, bias = use_bias)
            seq.add_module("conv_{0}".format(i), conv)
            
            if "batch_normalize" in block:
                bn = nn.BatchNorm2d(filters)
                seq.add_module("batch_norm_{0}".format(i), bn)

            if activation == "leaky":
                activn = nn.LeakyReLU(0.1, inplace = True)
                seq.add_module("leaky_{0}".format(i), activn)
            
        elif (block["type"] == "upsample"):
            upsample = nn.Upsample(scale_factor = 2, mode = "bilinear")
            seq.add_module("upsample_{}".format(i), upsample)
        
        elif (block["type"] == 'route'):

            block['layers'] = block['layers'].split(',')
            block['layers'][0] = int(block['layers'][0])
            start = block['layers'][0]
            if len(block['layers']) == 1:               
                filters = output_filters[i + start]
                       
            
            elif len(block['layers']) > 1:
                block['layers'][1] = int(block['layers'][1]) - i 
                end = block['layers'][1]
                filters = output_filters[i + start] + output_filters[i + end]
                  
            
            route = DummyLayer()
            seq.add_module("route_{0}".format(i),route)
      
        elif block["type"] == "shortcut":
            from_ = int(block["from"])
            shortcut = DummyLayer()
            seq.add_module("shortcut_{0}".format(i),shortcut)
            
            
        elif block["type"] == "yolo":
            mask = block["mask"].split(",")
            mask = [int(m) for m in mask]
            anchors = block["anchors"].split(",")
            anchors = [(int(anchors[i]), int(anchors[i + 1])) for i in range(0, len(anchors), 2)]
            anchors = [anchors[i] for i in mask]
            block["anchors"] = anchors
            
            detectorLayer = DetectionLayer(anchors)
            seq.add_module("Detection_{0}".format(i),detectorLayer)
                
        modulelist.append(seq)
        output_filters.append(filters)  
        channels = filters
    
    return darknet_details, modulelist
    

# Creating Yolo Model

In [None]:
def prediction(x,inp_dim,anchors,num_classes,CUDA=False):
    batch_size = x.size(0)
    grid_size = x.size(2)
    stride =  inp_dim // x.size(2)   
    
    bbox_attrs = 5 + num_classes
    num_anchors = len(anchors)
    prediction = x.view(batch_size, bbox_attrs*num_anchors, grid_size*grid_size)
    prediction = prediction.transpose(1,2).contiguous()
    prediction = prediction.view(batch_size, grid_size*grid_size*num_anchors, bbox_attrs)
    
    
    anchors = [(a[0]/stride, a[1]/stride) for a in anchors]

    prediction[:,:,0] = torch.sigmoid(prediction[:,:,0])
    prediction[:,:,1] = torch.sigmoid(prediction[:,:,1])
    prediction[:,:,4] = torch.sigmoid(prediction[:,:,4])
    grid = np.arange(grid_size)
    a,b = np.meshgrid(grid, grid)

    x_offset = torch.FloatTensor(a).view(-1,1)
    y_offset = torch.FloatTensor(b).view(-1,1)

    if CUDA:
        x_offset = x_offset.cuda()
        y_offset = y_offset.cuda()

    x_y_offset = torch.cat((x_offset, y_offset), 1).repeat(1,num_anchors).view(-1,2).unsqueeze(0)
    

    prediction[:,:,:2] += x_y_offset

    
    anchors = torch.FloatTensor(anchors)

    if CUDA:
        anchors = anchors.cuda()

    anchors = anchors.repeat(grid_size*grid_size, 1).unsqueeze(0)
    prediction[:,:,2:4] = torch.exp(prediction[:,:,2:4])*anchors 
    prediction[:,:,5: 5 + num_classes] = torch.sigmoid((prediction[:,:, 5 : 5 + num_classes]))    
    prediction[:,:,:4] *= stride    
    return prediction
    

In [None]:
class Darknet(nn.Module):
    def __init__(self, cfgfile):
        super(Darknet, self).__init__()
        self.blocks = parse_cfg(cfgfile)
        self.net_info, self.module_list = create_model(self.blocks)
        
    def forward(self, x, CUDA=False):
        modules = self.blocks[1:]
        outputs = {}  
        
        write = 0    
        
        for i, module in enumerate(modules):        
            module_type = (module["type"])
            if module_type == "convolutional" or module_type == "upsample":
                x = self.module_list[i](x)
                outputs[i] = x
                
            elif module_type == "route":
                layers = module["layers"]
                layers = [int(a) for a in layers]
                if len(layers) == 1:
                    x = outputs[i + layers[0]]
                if len(layers) > 1:
                    map1 = outputs[i + layers[0]]
                    map2 = outputs[i + layers[1]]
                    x = torch.cat((map1,map2),1)
       
                outputs[i] = x
                
            elif  module_type == "shortcut":
                from_ = int(module["from"])

     
                x = outputs[i-1] + outputs[i+from_]  
                outputs[i] = x
                
            elif module_type == 'yolo':
                anchors = self.module_list[i][0].anchors
                
                
                inp_dim = int(self.net_info["height"])
                
                num_classes = int(module["classes"])

                
                x = x.data   
                x = prediction(x,inp_dim,anchors,num_classes)
                
                if not write:              
                    detections = x
                    write = 1
                else:       
                    detections = torch.cat((detections, x), 1)

                outputs[i] = outputs[i-1]
                
        try:
            return detections   
        except:
            return 0
    
    def load_weights(self, weightfile):
        
        
        fp = open(weightfile, "rb")

        header = np.fromfile(fp, dtype = np.int32, count = 5)
        self.header = torch.from_numpy(header)
        self.seen = self.header[3]
        
        weights = np.fromfile(fp, dtype = np.float32)
        
        ptr = 0
        for i in range(len(self.module_list)):
            module_type = self.blocks[i + 1]["type"]
            
            if module_type == "convolutional":
                model = self.module_list[i]
                try:
                    batch_normalize = int(self.blocks[i+1]["batch_normalize"])
                except:
                    batch_normalize = 0
                
                conv = model[0]
                
                if (batch_normalize):
                    bn = model[1]
                    
                    
                    num_bn_biases = bn.bias.numel()
                    
                    
                    bn_biases = torch.from_numpy(weights[ptr:ptr + num_bn_biases])
                    ptr += num_bn_biases
                    
                    bn_weights = torch.from_numpy(weights[ptr: ptr + num_bn_biases])
                    ptr  += num_bn_biases
                    
                    bn_running_mean = torch.from_numpy(weights[ptr: ptr + num_bn_biases])
                    ptr  += num_bn_biases
                    
                    bn_running_var = torch.from_numpy(weights[ptr: ptr + num_bn_biases])
                    ptr  += num_bn_biases
                    
                    bn_biases = bn_biases.view_as(bn.bias.data)
                    bn_weights = bn_weights.view_as(bn.weight.data)
                    bn_running_mean = bn_running_mean.view_as(bn.running_mean)
                    bn_running_var = bn_running_var.view_as(bn.running_var)

                    bn.bias.data.copy_(bn_biases)
                    bn.weight.data.copy_(bn_weights)
                    bn.running_mean.copy_(bn_running_mean)
                    bn.running_var.copy_(bn_running_var)
                
                else:
                    num_biases = conv.bias.numel()
                
                    conv_biases = torch.from_numpy(weights[ptr: ptr + num_biases])
                    ptr = ptr + num_biases
                    
                    
                    conv_biases = conv_biases.view_as(conv.bias.data)
                
                    conv.bias.data.copy_(conv_biases)
                    
                num_weights = conv.weight.numel()
                
                conv_weights = torch.from_numpy(weights[ptr:ptr+num_weights])
                ptr = ptr + num_weights

                conv_weights = conv_weights.view_as(conv.weight.data)
                conv.weight.data.copy_(conv_weights)

# YOLO Predictions 

In [None]:
def bbox_iou(box1, box2):

    b1_x1, b1_y1, b1_x2, b1_y2 = box1[:,0], box1[:,1], box1[:,2], box1[:,3]
    b2_x1, b2_y1, b2_x2, b2_y2 = box2[:,0], box2[:,1], box2[:,2], box2[:,3]
    
    inter_rect_x1 =  torch.max(b1_x1, b2_x1)
    inter_rect_y1 =  torch.max(b1_y1, b2_y1)
    inter_rect_x2 =  torch.min(b1_x2, b2_x2)
    inter_rect_y2 =  torch.min(b1_y2, b2_y2)

    inter_area = torch.clamp(inter_rect_x2 - inter_rect_x1 + 1, min=0) * torch.clamp(inter_rect_y2 - inter_rect_y1 + 1, min=0)
 
    b1_area = (b1_x2 - b1_x1 + 1)*(b1_y2 - b1_y1 + 1)
    b2_area = (b2_x2 - b2_x1 + 1)*(b2_y2 - b2_y1 + 1)
    
    iou = inter_area / (b1_area + b2_area - inter_area)
    
    return iou


def unique(tensor):
    tensor_np = tensor.cpu().numpy()
    unique_np = np.unique(tensor_np)
    unique_tensor = torch.from_numpy(unique_np)
    
    tensor_res = tensor.new(unique_tensor.shape)
    tensor_res.copy_(unique_tensor)
    return tensor_res

def write_results(prediction, confidence, num_classes, nms_conf = 0.4):

    conf_mask = (prediction[:,:,4] > confidence).float().unsqueeze(2)
    prediction = prediction*conf_mask
    
    box_corner = prediction.new(prediction.shape)
    box_corner[:,:,0] = (prediction[:,:,0] - prediction[:,:,2]/2)
    box_corner[:,:,1] = (prediction[:,:,1] - prediction[:,:,3]/2)
    box_corner[:,:,2] = (prediction[:,:,0] + prediction[:,:,2]/2) 
    box_corner[:,:,3] = (prediction[:,:,1] + prediction[:,:,3]/2)
    prediction[:,:,:4] = box_corner[:,:,:4]
    
    batch_size = prediction.size(0)
    write = False
    
    for ind in range(batch_size):  
        image_pred = prediction[ind] 
        max_conf, max_conf_score = torch.max(image_pred[:,5:5+ num_classes], 1)
        max_conf = max_conf.float().unsqueeze(1)
        max_conf_score = max_conf_score.float().unsqueeze(1)
        seq = (image_pred[:,:5], max_conf, max_conf_score)
    
        image_pred = torch.cat(seq, 1) 
        
        non_zero_ind =  (torch.nonzero(image_pred[:,4])) 
        image_pred_ = image_pred[non_zero_ind.squeeze(),:].view(-1,7)
        try:
            
            img_classes = unique(image_pred_[:,-1]) 
        except:
             continue
    
        for cls in img_classes:
            
            cls_mask = image_pred_*(image_pred_[:,-1] == cls).float().unsqueeze(1)
            class_mask_ind = torch.nonzero(cls_mask[:,-2]).squeeze()
            image_pred_class = image_pred_[class_mask_ind].view(-1,7)
            
            
            conf_sort_index = torch.sort(image_pred_class[:,4], descending = True )[1]
            image_pred_class = image_pred_class[conf_sort_index]
            idx = image_pred_class.size(0)
            
            for i in range(idx):
                
                try:
                    ious = bbox_iou(image_pred_class[i].unsqueeze(0), image_pred_class[i+1:])
                except ValueError:
                    break
                except IndexError:
                    break
                
               
                iou_mask = (ious < nms_conf).float().unsqueeze(1)
                image_pred_class[i+1:] *= iou_mask
                
          
                non_zero_ind = torch.nonzero(image_pred_class[:,4]).squeeze()
                image_pred_class = image_pred_class[non_zero_ind].view(-1,7)
          
           
            batch_ind = image_pred_class.new(image_pred_class.size(0), 1).fill_(ind)
            seq = batch_ind, image_pred_class
            if not write:
                output = torch.cat(seq,1)
                write = True
            else:
                out = torch.cat(seq,1)
                output = torch.cat((output,out))
    
    return output
            

In [None]:

def load_classes(namesfile):
    '''outputs a dicionary wthat has its keys as class indices and their names as strings as values. '''
    fp = open(namesfile, "r")
    names = fp.read().split("\n")[:-1]
    return names


def prep_image(img, inp_dim):

    orig_im = cv2.imread(img)
    dim = orig_im.shape[1], orig_im.shape[0]
    img = (letterbox_image(orig_im, (inp_dim, inp_dim)))
    img_ = img[:,:,::-1].transpose((2,0,1)).copy()
    img_ = torch.from_numpy(img_).float().div(255.0).unsqueeze(0)
    return img_, orig_im, dim

def letterbox_image(img, inp_dim):
    
    img_w, img_h = img.shape[1], img.shape[0]
    w, h = inp_dim
    new_w = int(img_w * min(w/img_w, h/img_h))
    new_h = int(img_h * min(w/img_w, h/img_h))
    resized_image = cv2.resize(img, (new_w,new_h), interpolation = cv2.INTER_CUBIC)
    
    canvas = np.full((inp_dim[1], inp_dim[0], 3), 128)

    canvas[(h-new_h)//2:(h-new_h)//2 + new_h,(w-new_w)//2:(w-new_w)//2 + new_w,:] = resized_image
    
    return canvas

In [None]:
CUDA = False
batch_size = 2
print("Loading network.....")
model = Darknet(cfgfile)
model.load_weights(weightsfile)
print("Network successfully loaded")
classes = load_classes(classfile)
print('Classes loaded')
inp_dim = int(model.net_info["height"])
assert inp_dim % 32 == 0 
assert inp_dim > 32
query=''

if CUDA:
    model.cuda()

model.eval()

read_dir = time.time()

try:
    imlist = [os.path.join(os.path.realpath('.'), input_dir, img) for img in os.listdir(input_dir)]
except NotADirectoryError:
    imlist = []
    imlist.append(os.path.join(os.path.realpath('.'), input_dir))
except FileNotFoundError:
    print ("No file or directory with the name {}".format(input_dir))
    exit()
    
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    
load_batch = time.time()


batches = list(map(prep_image, imlist, [inp_dim for x in range(len(imlist))]))
im_batches = [x[0] for x in batches] 
orig_ims = [x[1] for x in batches] 
im_dim_list = [x[2] for x in batches] 
im_dim_list = torch.FloatTensor(im_dim_list).repeat(1,2) 
    
    
if CUDA:
    im_dim_list = im_dim_list.cuda()

    
    
reminder = 0
if (len(im_dim_list) % batch_size): 
    reminder = 1

if batch_size != 1:
    num_batches = len(imlist) // batch_size + reminder            
    im_batches = [torch.cat((im_batches[i*batch_size : min((i +  1)*batch_size,len(im_batches))])) 
                 for i in range(num_batches)] 
    
    
i = 0
write = False
    
objs = {}    

for batch in im_batches:

        start = time.time()
        if CUDA:
            batch = batch.cuda()       
    
        with torch.no_grad():
            prediction = model(Variable(batch), CUDA)
        
        prediction = write_results(prediction, confidence=0.5, num_classes=80, nms_conf = nms_thesh)
        
        if type(prediction) == int:
            i += 1
            continue


        prediction[:,0] += i*batch_size
                  
        if not write:
            output = prediction
            write = 1
        else:
            output = torch.cat((output,prediction))  
        i += 1
        
        if CUDA:
            torch.cuda.synchronize()
    
try:
    output
except NameError:
    print("No detections were made")
    exit()



im_dim_list = torch.index_select(im_dim_list, 0, output[:,0].long())
scaling_factor = torch.min(inp_dim/im_dim_list,1)[0].view(-1,1)
output[:,[1,3]] -= (inp_dim - scaling_factor*im_dim_list[:,0].view(-1,1))/2
output[:,[2,4]] -= (inp_dim - scaling_factor*im_dim_list[:,1].view(-1,1))/2
output[:,1:5] /= scaling_factor
    
for i in range(output.shape[0]):
    output[i, [1,3]] = torch.clamp(output[i, [1,3]], 0.0, im_dim_list[i,0])
    output[i, [2,4]] = torch.clamp(output[i, [2,4]], 0.0, im_dim_list[i,1])
    
def getLabels(labels):
    global query
    query=labels
    

def write(x, batches, results):
    c1 = tuple(x[1:3].int())
    c2 = tuple(x[3:5].int())
    img = results[int(x[0])]
    cls = int(x[-1])
    label = "{0}".format(classes[cls])
    color = (0,0,255)
    cv2.rectangle(img, c1, c2,color, 2)
    t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1 , 1)[0]
    c2 = c1[0] + t_size[0] + 3, c1[1] + t_size[1] + 4
    cv2.rectangle(img, c1, c2,color, -1)
    cv2.putText(img, label, (c1[0], c1[1] + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 1, [225,255,255], 1)
    getLabels(label)
    return img
    
            
list(map(lambda x: write(x, im_batches, orig_ims), output))
      
det_names = pd.Series(imlist).apply(lambda x: "{}/det_{}".format(output_dir,x.split("/")[-1]))
    
list(map(cv2.imwrite, det_names, orig_ims))

    

torch.cuda.empty_cache()
    

In [None]:
img = cv2.imread('/kaggle/working/output/det_insta20881.jpg') 
img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
plt.figure(figsize=(20,10))
plt.imshow(img)

In [None]:
print (query)

In [None]:
#%pip install openai