* annotation list가 아닌 array 형태로 불러오고
* torch.tensor 부분 삭제
* xywh2xyxy, xyxy2xywh 함수 수정

In [1]:
import os
import cv2
from glob import glob 
import os.path as osp
import time
import shutil
from pathlib import Path
import pathlib
import sys
import ast

import matplotlib.pyplot as plt
import numpy as np
import random

import magic
import re
import pandas as pd
import gc

from PIL import Image
from tqdm.notebook import tqdm
import torch

sys.path.append('/home/super/endoai/ugeon/autoAnnotation/ocSort/')
from ocSort.trackers.ocsort.ocsort import OCSort

sys.path.append('/home/super/endoai/ugeon/')
from preprocessingData_v3 import verify_annotation

In [2]:
def xywh2xyxy(x, image_shape = False):
    """
    image_shape = (width, height)
    """
    if not isinstance(x, (np.ndarray)):
        x = np.array(x)

    # Convert nx4 boxes from [cx, cy, w, h, cls] to [x1, y1, x2, y2, cls] where xy1=top-left, xy2=bottom-right
    y = np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    
    if image_shape:
        y[:,:4] = y[:,:4] * [int(image_shape[0]), int(image_shape[1]),int(image_shape[0]), int(image_shape[1])]
    return y


def xyxy2xywh(x, image_shape = False):
    """
    image_shape = (width, height)
    """

    if not isinstance(x, (np.ndarray)):
        x = np.array(x)

    # Convert nx4 boxes from [x1, y1, x2, y2, cls] to [cx, cy, w, h, cls]
    y = np.copy(x)
    y[:, 0] = (x[:, 0] + x[:, 2]) / 2  # center x
    y[:, 1] = (x[:, 1] + x[:, 3]) / 2  # center y
    y[:, 2] = (x[:, 2] - x[:, 0])         # width x
    y[:, 3] = (x[:, 3] - x[:, 1])        # width y

    if image_shape:    
        y[:,:4] = y[:,:4] / [int(image_shape[0]), int(image_shape[1]),int(image_shape[0]), int(image_shape[1])]
    return y


def IoU( box1, box2):
    # box = (x1, y1, x2, y2)
    box1_area = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
    box2_area = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)

    # obtain x1, y1, x2, y2 of the intersection
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    # compute the width and height of the intersection
    w = max(0, x2 - x1 + 1)
    h = max(0, y2 - y1 + 1)

    inter = w * h
    iou = inter / (box1_area + box2_area - inter)
    return iou

def calculateZOOM(img1, img2, scale_factor = 0.7):
    small1 = cv2.resize(img1, None, fx=scale_factor, fy=scale_factor)
    small2 = cv2.resize(img2, None, fx=scale_factor, fy=scale_factor)

    # Convert to grayscale
    gray1 = cv2.cvtColor(small1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(small2, cv2.COLOR_BGR2GRAY)

    # Detect keypoints and compute the ORB descriptors
    orb = cv2.ORB_create()
    keypoints1, descriptors1 = orb.detectAndCompute(gray1, None)
    keypoints2, descriptors2 = orb.detectAndCompute(gray2, None)

    # Match the descriptors
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    matches = bf.match(descriptors1, descriptors2)

    # Sort the matches in the order of their distances
    matches = sorted(matches, key = lambda x : x.distance)

    # Calculate the zoom level
    if len(matches) > 4:
        src_pts = np.float32([keypoints1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
        dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)

        M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
        zoom = np.abs(np.linalg.det(M))
    return zoom

In [8]:
class autoAnnotate:
    
    def __init__(self, dataset_path):
        self.dataset_path = dataset_path
    
    def prepare_Detection_df(self, recursive = True):
        """
        caution! Only Available for jpg images
        """
        images = sorted(glob(self.dataset_path + '**/*.jpg', recursive = recursive))
        img_path_dict = {osp.basename(f) : f for f in images}
        img_annote_dict = dict()
        image_shape = list(map(int, re.findall('(\d+)x(\d+)', magic.from_file(images[0]))[-1]))
        for image in images:
            img_name = image.split('/')[-1]
            try :
                with open(image[:-4]+ '.txt', 'r') as f:
                    temp = np.asarray(list(map(lambda x: list(np.float64(x.strip().split( )))+ [1], f.readlines())))
                    annotation = temp[:,1:]
                    annotation = np.append(annotation, temp[:,0]).reshape(1,6)
                    annotation = xywh2xyxy(annotation, image_shape)
                    img_annote_dict[img_name]= annotation
            except :
                img_annote_dict[img_name]= np.nan
                
        df = pd.DataFrame(columns = ['image', 'path','dets', 'pred_forward', 'pred_backward', 'final_pred'])
        df['image'] = img_path_dict.keys()
        df['path'] = df['image'].apply(lambda x : img_path_dict[x])
        df['dets'] = df['image'].apply(lambda x : img_annote_dict[x])
        print(f"found {len(df)} images / shape : {image_shape[0]}x{image_shape[1]}")
        return df.set_index('image'), image_shape

    def get_properDets(self, prev_dets, new_dets, tracking_valid_iou=0.5):
        valid_idx = []
        for idx, prev_det in enumerate(prev_dets):
            for new_det in new_dets:
                iou = IoU(new_det, prev_det)
                if iou >= tracking_valid_iou:
                    valid_idx.append(idx)
        valid_idx = list(set(valid_idx))
        valid_prev_dets = prev_dets[valid_idx]
        return valid_prev_dets

    
    def create_ocSort_tracker(self):
        ocsort_forward = OCSort(det_thresh = 0.1, iou_threshold = 0.1, min_hits = 1, delta_t = 7,)
        ocsort_backward = OCSort(det_thresh = 0.1, iou_threshold = 0.1, min_hits = 1, delta_t = 7,)
        return ocsort_forward, ocsort_backward

    
    def interpolate(self, tracker, df, direction, tracking_valid_iou, interpolate_count, image_shape):
        """
        direction : forward, backward
        """
        try:
            df.set_index('image', inplace=True)
        except:
            pass
        
        if direction  == 'forward':
            out_df = df.loc[sorted(df.index)]
        elif direction  == 'backward':
            out_df = df.loc[sorted(df.index, reverse=True)]
        
        predictable_state = False
        prev_Detected = False
        prev_img = None
        prev_TrueDets = None
        prev_TrueDets_xywh = None
        zoom_factor = []
        predDet_count = 1
        interpolate_det_count = 0

        for image_name in tqdm(out_df.index):
            det = out_df._get_value(image_name, 'dets')
            img_path = out_df._get_value(image_name, 'path')
            img = cv2.imread(img_path)

            if isinstance(det, (np.ndarray)) :
                if prev_Detected == False:
                    prev_Detected = True
                    predictable_state = False
                elif prev_Detected:
                    predictable_state = True

                prev_img = img
                prev_TrueDets = det
                prev_TrueDets_xywh = xyxy2xywh(det, image_shape)
                predDet_count = 1
                zoom_factor = []
                tracker.update(det,img)
            else :
                det = np.empty((0, 6))
                pred_dets = tracker.update(det,img, return_predict = True)
                pred_dets = self.get_properDets(pred_dets,prev_TrueDets, tracking_valid_iou)
                
                if predictable_state and predDet_count < interpolate_count + 1 and len(pred_dets) >=1:
                    zoom = calculateZOOM(prev_img, img)
                    zoom_factor.append(zoom)
                    if zoom >= 0.6 and zoom <= 1.6:
                        pred_dets_xywh = xyxy2xywh(pred_dets, image_shape)
                        pred_dets_xywh[:,2] = prev_TrueDets_xywh[:,2].max() * np.mean(zoom_factor)
                        pred_dets_xywh[:,3] = prev_TrueDets_xywh[:,3].max() * np.mean(zoom_factor)
                        _, pred_dets_xywh = verify_annotation(pred_dets_xywh)
                        out_df._set_value(image_name, f'pred_{direction}', pred_dets_xywh)        
                        predDet_count += 1
                        interpolate_det_count += 1
                    else:
                        predictable_state = False
                        zoom_factor = []

                prev_img = img
                prev_Detected = False

#         print(f'interpolated {direction}', interpolate_det_count)
        return out_df


    def update_autoAnnotate(self, rst_df, tracking_valid_iou):
        part_images = rst_df.loc[(rst_df.pred_forward.notna())|(rst_df.pred_backward.notna())].index
        for image in part_images:
            pred1 = rst_df._get_value(image, 'pred_forward')
            pred2 = rst_df._get_value(image, 'pred_backward')

            if type(pred1) == float:
                rst_df._set_value(image, 'final_pred', pred2)
            elif type(pred2) == float:
                rst_df._set_value(image, 'final_pred', pred1)

            elif len(pred1) == 1 and len(pred2) == 1: 
                iou = IoU(pred1[0], pred2[0])
                if iou >= tracking_valid_iou:
                    new_pred = (pred1  + pred2)/2
                    _, new_pred = verify_annotation(new_pred)
                    rst_df._set_value(image, 'final_pred', new_pred)
                else:
                    continue
        print(f"{rst_df.final_pred.notna().sum()}/{rst_df.dets.isna().sum()} images interpolated")
        return rst_df


    def save_newAnnotations(self, df, save_path):
        if osp.isdir(save_path) == False:
            os.mkdir(save_path)
            
        new_annotes = df.loc[df['final_pred'].notnull()].index
        for image in new_annotes:
            new_preds = df._get_value(image, 'final_pred')
            assert type(new_preds) != float
            with open(save_path + image[:-4] + '.txt', 'w') as f:
                for pred_det in new_preds:
                    f.write(f"{int(pred_det[-1])} {pred_det[0]} {pred_det[1]} {pred_det[2]} {pred_det[3]}\n")


    def execute(self, tracking_valid_iou = 0.5, interpolate_count = 10, recursive = True):
        df, image_shape = self.prepare_Detection_df(recursive = recursive)
        ocsort_tracker_forward, ocsort_tracker_backward = self.create_ocSort_tracker()
        
        temp_df1 = self.interpolate(ocsort_tracker_forward, df, direction='forward', interpolate_count = interpolate_count, 
                                    tracking_valid_iou = tracking_valid_iou, image_shape=image_shape)
        temp_df2 = self.interpolate(ocsort_tracker_backward, temp_df1, direction='backward', interpolate_count = interpolate_count,
                                    tracking_valid_iou = tracking_valid_iou, image_shape=image_shape)
        final_df  = self.update_autoAnnotate(temp_df2, tracking_valid_iou)
        return final_df

In [14]:
try:
    del obj
except:
    pass

In [15]:
dataset_path = '/home/super/endoai/ugeon/autoAnnotation/testSet/V1697_p5_AD/'

In [16]:
tracking_valid_iou = 0.1
interpolate_count = 10

obj = autoAnnotate(dataset_path = dataset_path)
ocsort_tracker_forward, ocsort_tracker_backward = obj.create_ocSort_tracker()
df, image_shape = obj.prepare_Detection_df()
# df

found 1379 images / shape : 1240x1080


In [19]:
rst = obj.execute()

found 1379 images / shape : 1240x1080


  0%|          | 0/1379 [00:00<?, ?it/s]

  0%|          | 0/1379 [00:00<?, ?it/s]

97/295 images interpolated


In [22]:
rst

Unnamed: 0_level_0,path,dets,pred_forward,pred_backward,final_pred
image,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
V1697_p5_AD_01379.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
V1697_p5_AD_01378.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
V1697_p5_AD_01377.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
V1697_p5_AD_01376.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
V1697_p5_AD_01375.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
...,...,...,...,...,...
V1697_p5_AD_00005.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
V1697_p5_AD_00004.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
V1697_p5_AD_00003.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
V1697_p5_AD_00002.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,


In [24]:
rst.loc['V1697_p5_AD_01365.jpg', 'final_pred']

array([[    0.45367,       0.562,       0.148,      0.1503,           0]])

In [131]:
images = sorted(glob(dataset_path + '**/*.jpg', recursive = True))
img_path_dict = {osp.basename(f) : f for f in images}
img_annote_dict = dict()
for image in images:
    image_shape = list(map(int, re.findall('(\d+)x(\d+)', magic.from_file(images[0]))[-1]))
    img_name = image.split('/')[-1]
    try :
        with open(image[:-4]+ '.txt', 'r') as f:
            temp = np.asarray(list(map(lambda x: list(np.float64(x.strip().split( ))), f.readlines())))
            annotation = temp[:,1:]
            annotation = np.append(annotation, temp[:,0]).reshape(1,5)
            annotation = xywh2xyxy(annotation, image_shape)
            annotation[:,:4] = annotation[:,:4] * [int(image_shape[0]), int(image_shape[1]),int(image_shape[0]), int(image_shape[1])]
            img_annote_dict[img_name]= annotation

    except :
        img_annote_dict[img_name]= np.nan
df = pd.DataFrame(columns = ['image', 'path','dets', 'pred_forward', 'pred_backward', 'final_pred'])
df['image'] = img_path_dict.keys()
df['path'] = df['image'].apply(lambda x : img_path_dict[x])
df['dets'] = df['image'].apply(lambda x : img_annote_dict[x])
print(f"found {len(df)} images")
df

found 1379 images


Unnamed: 0,image,path,dets,pred_forward,pred_backward,final_pred
0,V1697_p5_AD_00001.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
1,V1697_p5_AD_00002.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
2,V1697_p5_AD_00003.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
3,V1697_p5_AD_00004.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
4,V1697_p5_AD_00005.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
...,...,...,...,...,...,...
1374,V1697_p5_AD_01375.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
1375,V1697_p5_AD_01376.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
1376,V1697_p5_AD_01377.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
1377,V1697_p5_AD_01378.jpg,/home/super/endoai/ugeon/autoAnnotation/testSe...,,,,
