# YOLO and OWL-ViT Check Annotation Quality

In [1]:
import os 
import json
import copy

from tqdm import tqdm
import pandas as pd
from ultralytics import YOLO
from PIL import Image, ImageDraw
import PIL
from transformers import OwlViTProcessor, OwlViTForObjectDetection
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

PIL.Image.MAX_IMAGE_PIXELS = 108000001

%matplotlib inline  
%load_ext autoreload
%autoreload 2

In [2]:
img_dir = "../data_round_2/final"
annotations_csv = "../data_round_2/phase2_train_v0.csv"
yolo_path = 'yolo/runs/detect/classic/weights/best.pt'

In [3]:
df_img_label = df[["img_fName", "class_label"]].drop_duplicates()

_train_data, _val_data = train_test_split(
    df_img_label,
    test_size=0.2,
    stratify=df_img_label["class_label"],
    random_state=200,
)
val_list = list(set(_val_data["img_fName"]))
df = df[df['img_fName'].isin(val_list)]

NameError: name 'df' is not defined

In [16]:
det = YOLO(yolo_path, task="detect")
df = pd.read_csv(annotations_csv)


#_, df = train_test_split(
#        df,
#        test_size=0.2,
#        #stratify=df["class_label"],
#        random_state=200,
#    )


train_df = df.sample(frac=0.8, random_state=200)
df = df.drop(train_df.index)

owl_processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32", cache_dir='models/owl/')
owl_model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32", cache_dir='models/owl/').cuda()

In [5]:
def nms_pytorch(P : torch.tensor ,thresh_iou : float):
    """
    Apply non-maximum suppression to avoid detecting too many
    overlapping bounding boxes for a given object.
    Args:
        boxes: (tensor) The location preds for the image 
            along with the class predscores, Shape: [num_boxes,5].
        thresh_iou: (float) The overlap thresh for suppressing unnecessary boxes.
    Returns:
        A list of filtered boxes, Shape: [ , 5]

    Not smart so copy paste:
        https://learnopencv.com/non-maximum-suppression-theory-and-implementation-in-pytorch/
    """
 
    # we extract coordinates for every 
    # prediction box present in P
    x1 = P[:, 0]
    y1 = P[:, 1]
    x2 = P[:, 2]
    y2 = P[:, 3]
 
    # we extract the confidence scores as well
    scores = P[:, 4]
 
    # calculate area of every block in P
    areas = (x2 - x1) * (y2 - y1)
     
    # sort the prediction boxes in P
    # according to their confidence scores
    order = scores.argsort()
 
    # initialise an empty list for 
    # filtered prediction boxes
    keep = []
     
 
    while len(order) > 0:
         
        # extract the index of the 
        # prediction with highest score
        # we call this prediction S
        idx = order[-1]
 
        # push S in filtered predictions list
        keep.append(P[idx])
 
        # remove S from P
        order = order[:-1]
 
        # sanity check
        if len(order) == 0:
            break
         
        # select coordinates of BBoxes according to 
        # the indices in order
        xx1 = torch.index_select(x1,dim = 0, index = order)
        xx2 = torch.index_select(x2,dim = 0, index = order)
        yy1 = torch.index_select(y1,dim = 0, index = order)
        yy2 = torch.index_select(y2,dim = 0, index = order)
 
        # find the coordinates of the intersection boxes
        xx1 = torch.max(xx1, x1[idx])
        yy1 = torch.max(yy1, y1[idx])
        xx2 = torch.min(xx2, x2[idx])
        yy2 = torch.min(yy2, y2[idx])
 
        # find height and width of the intersection boxes
        w = xx2 - xx1
        h = yy2 - yy1
         
        # take max with 0.0 to avoid negative w and h
        # due to non-overlapping boxes
        w = torch.clamp(w, min=0.0)
        h = torch.clamp(h, min=0.0)
 
        # find the intersection area
        inter = w*h
 
        # find the areas of BBoxes according the indices in order
        rem_areas = torch.index_select(areas, dim = 0, index = order) 
 
        # find the union of every prediction T in P
        # with the prediction S
        # Note that areas[idx] represents area of S
        union = (rem_areas - inter) + areas[idx]
         
        # find the IoU of every prediction in P with S
        IoU = inter / union
 
        # keep the boxes with IoU less than thresh_iou
        mask = IoU < thresh_iou
        order = order[mask]


    boxes = [k.numpy().tolist()[:4] for k in keep]
    scores = [k.numpy().tolist()[-1] for k in keep]
    return boxes, scores

In [24]:
@torch.no_grad()
def owl_detect_images(img_path, t=0.5):
    image = Image.open(os.path.join(img_dir, img_path)).convert("RGB")
    texts = [["a photo of a mosquito"]]
    inputs = owl_processor(text=texts, images=image, return_tensors="pt").to('cuda')
    outputs = owl_model(**inputs)
    target_sizes = torch.Tensor([image.size[::-1]]).to('cuda')
    results = owl_processor.post_process_object_detection(outputs=outputs, 
                                                          target_sizes=target_sizes, 
                                                          threshold=0.1)
    boxes, scores = results[0]["boxes"].cpu().detach(), results[0]["scores"].cpu().detach()
    #print('Before nms: ', len(boxes))

    P = torch.cat((boxes, torch.unsqueeze(scores, 1)), 1)
    boxes, scores = nms_pytorch(P, t)
    #print('After nms: ', len(boxes))

    return boxes, scores

    
def detect_images(img_path, t_iou=0.5, t_conf=0.5, shrink=5):
    results = det(os.path.join(img_dir, img_path), iou=t_iou, verbose=False) 

    bboxes = []
    confs = []

    conf_max = 0.0
    box_max = []
    
    for result in results:
        _bboxes = result.boxes.xyxy.tolist()
        _confs = result.boxes.conf.tolist()

        for bbox, conf in zip(_bboxes, _confs):
            #if conf > conf_max:
            if conf > t_conf:
                #conf_max = conf
                #box_max = [bbox[0]+shrink, bbox[1]+shrink, bbox[2]-shrink, bbox[3]-shrink]
                confs.append(conf)
                bboxes.append(bbox)
    #bboxes.append(box_max)
    #confs.append(conf_max)
                
    return bboxes, confs


def iou(box1, box2):
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    
    dif_x = (min(box1[2], box2[2]) - max(box1[0], box2[0]))
    dif_y = (min(box1[3], box2[3]) - max(box1[1], box2[1]))
    
    inter = dif_x * dif_y
    
    if dif_x < 0 or dif_y < 0:
        return 0
    return inter / (area1 + area2 - inter)

def plot_image(img_path, bboxes, true_box=[], plot=True, save_dir='examples'):
    img = Image.open(os.path.join(img_dir, img_path)).convert("RGB")
    img_ = ImageDraw.Draw(img)  
    
    for bbox in bboxes:    
        img_.rectangle(bbox, fill=None, outline="blue", width=int(0.005*max(img.size)))

    if true_box:
        img_.rectangle(true_box, fill=None, outline="green", width=int(0.005*max(img.size)))

    

    if plot:
        plt.imshow(img)
    else:
        os.makedirs(save_dir, exist_ok=True)
        img.save(os.path.join(save_dir, img_path))

In [25]:
# find all the annotations 

__model = "yolo" # "owl"
annotations_owl_path = f'annoations_{__model}.json'

if not os.path.exists(annotations_owl_path):
    data = {}
    
    for img_path in tqdm(df.img_fName):
        if __model == "owl":
            boxes, scores = owl_detect_images(img_path, t=0.1) # this should be equivalent
        else:
            boxes, scores = detect_images(img_path, t_conf=0.4, shrink=0)
        data[img_path] = boxes

    with open(annotations_owl_path, 'w') as f:
        json.dump(data, f, indent=4)
else:
    data = json.load(open(annotations_owl_path))

100%|███████████████████████████████████████████████████████████████████████████████████████| 2071/2071 [01:34<00:00, 21.90it/s]


In [7]:
# do again so I don't run 20 min of owl guy

for key, boxes in data.items():
    if len(boxes) > 1:
        P = torch.tensor([box + [len(boxes) - i] for i, box in enumerate(boxes)])
        boxes, _ = nms_pytorch(P, 0.1)
        data[key] = boxes

In [28]:
# find bad annotations

def find_bad_annotations(df, data, t=0.1, shrink=0):
    bad_annotations = []


    for img_name, pred_boxes in tqdm(data.items()):
        
        # god i am bad at pandas
        true_box = df[df.img_fName == img_name].values.tolist()[0][3:7]

        good_annotation = False
        for pred_box in pred_boxes:
            if not pred_box:
                continue

            pred_box = [pred_box[0]+shrink, pred_box[1]+shrink, pred_box[2]-shrink, pred_box[3]-shrink]
            if iou(true_box, pred_box) > t:
                good_annotation = True
                break

        if not good_annotation:
            bad_annotations.append(img_name)

    return bad_annotations


bad_annotations = find_bad_annotations(df, data, 0.75, 0)

100%|█████████████████████████████████████████████████████████████████████████████████████| 2071/2071 [00:00<00:00, 4826.08it/s]


In [29]:
len(bad_annotations), len(data)

(236, 2071)

In [31]:
for img_name in tqdm(bad_annotations):
    boxes = data[img_name]
    if boxes == [[]]:
        boxes = []
    true_box = df[df.img_fName == img_name].values.tolist()[0][3:7]
    plot_image(img_name, boxes, true_box, False, f'examples_failed_cases_{__model}')


100%|█████████████████████████████████████████████████████████████████████████████████████████| 236/236 [00:07<00:00, 31.83it/s]


In [11]:
# show images with multiple detection
data_multiple = {}
for k, v in data.items():
    if len(v) > 1:
        data_multiple[k] = v
len(data_multiple)

0

In [12]:
for img_name, boxes in tqdm(data_multiple.items()):
    true_box = df[df.img_fName == img_name].values.tolist()[0][3:7]
    plot_image(img_name, boxes, true_box, False, f'example_multiple_annotations_{__model}')

0it [00:00, ?it/s]
