In [1]:
import time
import tensorflow as tf
import cv2
import os
import glob
import numpy as np
from pathlib import Path
from matplotlib import pyplot as plt
from PIL import Image, ImageDraw, ImageFont
from scipy.spatial import distance
from pathlib import Path
import json

%matplotlib inline

In [2]:
def nms(boxes, scores, classes, overlap_thresh, num_boxes=20):
    """
    Greedy, per-class non-maximum suppression.

    Params:
    ------
    + boxes - sequence of [x1, y1, x2, y2] boxes
    + scores - one confidence score per box
    + classes - one class label per box
    + overlap_thresh - boxes whose IoU with an already kept box of the same
      class exceeds this value are discarded
    + num_boxes - maximum number of boxes to return (None returns all)
    Returns:
    -------
    + tuple (boxes, scores, classes) of numpy arrays, ordered by descending score
    """
    # Ascending score order: the strongest candidate is always the last element.
    order = np.argsort(scores)
    sorted_boxes = np.array(boxes)[order]
    sorted_classes = np.array(classes)[order]
    sorted_scores = np.array(scores)[order]

    kept_boxes, kept_scores, kept_classes = [], [], []
    for label in np.unique(classes):
        members = sorted_classes == label
        member_scores = sorted_scores[members]
        member_boxes = sorted_boxes[members]

        left = member_boxes[:, 0]
        top = member_boxes[:, 1]
        right = member_boxes[:, 2]
        bottom = member_boxes[:, 3]

        # Pixel-inclusive areas (hence the +1).
        box_areas = np.abs(right - left + 1) * np.abs(bottom - top + 1)
        remaining = np.arange(member_scores.shape[0])
        while remaining.size > 0:
            best = remaining[-1]
            rest = remaining[:-1]
            kept_boxes.append(member_boxes[best])
            kept_scores.append(member_scores[best])
            kept_classes.append(label)
            if remaining.size <= 1:
                break

            # Intersection of the best box with every other remaining box.
            inter_top = np.maximum(top[best], top[rest])
            inter_left = np.maximum(left[best], left[rest])
            inter_bottom = np.minimum(bottom[best], bottom[rest])
            inter_right = np.minimum(right[best], right[rest])

            w = np.maximum(0.0, inter_right - inter_left + 1)
            h = np.maximum(0.0, inter_bottom - inter_top + 1)
            intersection = w * h

            iou = 1.0 * intersection / (box_areas[rest] + box_areas[best] - intersection)

            # Drop the box just kept and everything overlapping it too much.
            remaining = rest[iou <= overlap_thresh]

    ranking = np.argsort(kept_scores)[::-1]
    ranked_boxes = np.array(kept_boxes)[ranking]
    ranked_scores = np.array(kept_scores)[ranking]
    ranked_classes = np.array(kept_classes)[ranking]
    return (ranked_boxes[:num_boxes], ranked_scores[:num_boxes], ranked_classes[:num_boxes])

In [3]:
def read_json(path_file):
    """Parse the JSON document stored at `path_file` and return the decoded object."""
    with open(path_file) as json_handle:
        return json.load(json_handle)
def write_json(path_file, data):
    """Serialise `data` as JSON (4-space indent) into `path_file`, overwriting it."""
    with open(path_file, mode="w") as json_handle:
        json.dump(obj=data, fp=json_handle, indent=4)

class ImagePatcher():
    '''
    Cuts an image into overlapping square patches and stitches patches
    (together with their bounding boxes) back into a full-size image.

    Assumes that the input images to the patcher will be of same size.
    The grid/padding geometry is computed by generate_patches and stored on
    the instance, so generate_patches must be called before
    generate_img_from_patches.
    '''
    def __init__(self, patch_sz, overlap):
        '''
        Params:
        ------
        + patch_sz - side length of the square patch in pixels
        + overlap - overlap between two neighbouring patches in percentage
        '''
        self.window_sz = patch_sz
        self.overlap = overlap
    
    def calculate_padding_for_same_pad(self, input_sz, kernel_sz, stride):
        '''
        Calculate required padding such that kernel convolves on the
        entire input

        Returns:
        -------
        + pad on the low-index side (gets the smaller half when the total is odd)
        + pad on the high-index side
        '''
        output_sz = np.ceil(input_sz / stride)
        total_pad = (output_sz - 1) * stride - input_sz + kernel_sz
        
        low_priority_pad = total_pad // 2
        high_priority_pad = total_pad - low_priority_pad
        return low_priority_pad, high_priority_pad
    
    def get_num_windows_along_axis(self, axis_len, window_len, overlap):
        '''
        Params:
        ------
        + axis_len - number of pixels along an axis of image
        + window_len - number of pixels along an axis of window
        + overlap - overlap between two windows in percentage
        Returns:
        -------
        + number of windows along the axis
        + stride - number of pixels for the jump
        + number of pad with low priority (left pad along width, top pad along height)
        + number of pad with high priority (right pad along width, bottom pad along height)
        Raises:
        ------
        + ValueError - when window size and overlap yield a stride below one pixel
        '''
        stride = np.floor(window_len * (100 - overlap) * 0.01)
        if stride < 1:
            # A zero/negative stride would never advance the window.
            raise ValueError(
                "window_len={} with overlap={}% gives a stride below one pixel".format(window_len, overlap))
        low_priority_pad, high_priority_pad = self.calculate_padding_for_same_pad(input_sz=axis_len, kernel_sz=window_len, stride=stride)
        num_windows = int(((axis_len - window_len + low_priority_pad + high_priority_pad) // stride)) + 1
        return int(num_windows), int(stride), int(low_priority_pad), int(high_priority_pad)
    
    def is_inside_bbox(self, point, bbox):
        '''
        Return True when point (x, y) lies inside bbox
        (xmin, ymin, xmax, ymax), borders included.
        '''
        xmin, ymin, xmax, ymax = bbox
        point_x, point_y = point
        return point_x >= xmin and point_x <= xmax and point_y >= ymin and point_y <= ymax
    
    def get_patch_annotations(self, patch_xmin, patch_xmax, patch_ymin, patch_ymax, annotations):
        '''
        Collect the annotations that overlap a patch, clipped to the patch
        and expressed in patch-local coordinates.

        Params:
        ------
        + patch_xmin, patch_xmax, patch_ymin, patch_ymax - patch extent in
          padded-image coordinates
        + annotations - iterable of [xmin, ymin, xmax, ymax] boxes in
          original (unpadded) image coordinates
        Returns:
        -------
        + list of [xmin, ymin, xmax, ymax] boxes relative to the patch origin
        '''
        patch_bboxs = []
        for bbox in annotations:
            xmin, ymin, xmax, ymax = bbox
            # Move the box into padded-image coordinates.
            xmin = xmin + self.left_pad
            xmax = xmax + self.left_pad
            ymin = ymin + self.top_pad
            ymax = ymax + self.top_pad
            
            # Rectangle-overlap test (borders included). A corner-in-patch test
            # would miss boxes that cover the whole patch or cross it from
            # edge to edge without any corner falling inside.
            overlaps_patch = xmin <= patch_xmax and xmax >= patch_xmin and \
                             ymin <= patch_ymax and ymax >= patch_ymin
            if overlaps_patch:
                patch_bbox_xmin = max(xmin, patch_xmin) - patch_xmin
                patch_bbox_ymin = max(ymin, patch_ymin) - patch_ymin
                patch_bbox_xmax = min(xmax, patch_xmax) - patch_xmin
                patch_bbox_ymax = min(ymax, patch_ymax) - patch_ymin
                patch_bboxs.append([patch_bbox_xmin, patch_bbox_ymin, patch_bbox_xmax, patch_bbox_ymax])
        return patch_bboxs

    def generate_patches(self, path_img_file, path_json_file=None, path_patch_dir="dataset/patches/"):
        """
        Generate patches from image provided

        The image is zero-padded so that the window grid covers it entirely,
        then every window is written as <stem>_<row>_<col>.jpg into
        path_patch_dir. When path_json_file is given, the boxes overlapping
        each patch are written next to it as <stem>_<row>_<col>.json.

        Params:
        ------
        + path_img_file - path of the image to split
        + path_json_file - optional JSON file holding a list of
          [xmin, ymin, xmax, ymax] boxes for the image
        + path_patch_dir - output directory (created when missing)
        Raises:
        ------
        + FileNotFoundError - when the image cannot be read
        """
        img = cv2.imread(path_img_file)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: {}".format(path_img_file))
        if path_json_file is not None:
            annotations = read_json(path_json_file)

        self.height, self.width, self.channels = img.shape
        self.dtype = img.dtype
        # Rows run along the height, so their pads are top/bottom;
        # columns run along the width, so their pads are left/right.
        self.num_rows, self.row_stride, self.top_pad, self.bottom_pad = self.get_num_windows_along_axis(axis_len=self.height, window_len=self.window_sz,overlap=self.overlap)
        self.num_cols, self.col_stride, self.left_pad, self.right_pad = self.get_num_windows_along_axis(axis_len=self.width, window_len=self.window_sz,overlap=self.overlap)
        
        self.padded_img_height = self.height+self.top_pad+self.bottom_pad
        self.padded_img_width = self.width+self.left_pad+self.right_pad
        padded_img = np.zeros(shape=(self.padded_img_height, self.padded_img_width, self.channels), dtype=self.dtype)
        padded_img[self.top_pad:self.top_pad+self.height, self.left_pad:self.left_pad+self.width, :] = img

        path_patch_dir = Path(path_patch_dir)
        path_patch_dir.mkdir(parents=True, exist_ok=True)
        image_filename = Path(path_img_file).stem

        for row_idx in range(0, self.num_rows):
            for col_idx in range(0, self.num_cols):
                h_grid_start = row_idx * self.row_stride
                w_grid_start = col_idx * self.col_stride
                
                patch_xmin = w_grid_start
                patch_xmax = w_grid_start+self.window_sz
                patch_ymin = h_grid_start
                patch_ymax = h_grid_start+self.window_sz
                
                patch = padded_img[patch_ymin:patch_ymax, patch_xmin:patch_xmax, :] # slice 
                
                patch_name = "{}_{}_{}".format(image_filename, row_idx, col_idx)
                if path_json_file is not None:
                    patch_bboxs = self.get_patch_annotations(patch_xmin, patch_xmax, patch_ymin, patch_ymax, annotations)
                    write_json(path_file="{}/{}.json".format(path_patch_dir, patch_name), data=patch_bboxs)
                cv2.imwrite("{}/{}.jpg".format(path_patch_dir, patch_name), patch)
    
    def generate_img_from_patches(self, path_patch_dir, img_name, score_threshold=0.0, path_annotation_dir=None):
        '''
        Stitch the patches of an image back together and, optionally, merge
        the per-patch detections into full-image boxes.

        Uses the geometry stored by the last generate_patches call.

        Params:
        ------
        + path_patch_dir - directory holding <img_name>_<row>_<col>.jpg patches
        + img_name - stem of the original image
        + score_threshold - detections scoring below this are ignored
        + path_annotation_dir - optional directory holding
          <img_name>_<row>_<col>.json files, each [boxes, scores, classes]
        Returns:
        -------
        + mask image with the merged boxes filled in white when
          path_annotation_dir is given, otherwise None
        Raises:
        ------
        + FileNotFoundError - when a patch image cannot be read
        '''
        padded_img = np.zeros(shape=(self.padded_img_height, self.padded_img_width, self.channels), dtype=self.dtype)
        img_bboxs, scores, classes = [], [], []
        for row_idx in range(0, self.num_rows):
            for col_idx in range(0, self.num_cols):
                h_grid_start = row_idx * self.row_stride
                w_grid_start = col_idx * self.col_stride
                
                patch_xmin = w_grid_start
                patch_xmax = w_grid_start+self.window_sz
                patch_ymin = h_grid_start
                patch_ymax = h_grid_start+self.window_sz
                
                patch_name = "{}_{}_{}".format(img_name, row_idx, col_idx)
                path_patch = "{}/{}.jpg".format(path_patch_dir, patch_name)
                patch = cv2.imread(path_patch)
                if patch is None:
                    raise FileNotFoundError("Could not read patch: {}".format(path_patch))
                
                padded_img[patch_ymin:patch_ymax, patch_xmin:patch_xmax, :] = patch
                if path_annotation_dir is not None:
                    patch_bboxs, _score, _classes = read_json("{}/{}.json".format(path_annotation_dir, patch_name))
                    for bbox, tmp_score, tmp_class in zip(patch_bboxs,_score, _classes):
                        if tmp_score >= score_threshold:
                            # Patch-local -> original-image coordinates, clamped to the image.
                            xmin, ymin, xmax, ymax = bbox
                            xmin = max(xmin + patch_xmin - self.left_pad, 0)
                            ymin = max(ymin + patch_ymin - self.top_pad, 0)
                            xmax = min(xmax + patch_xmin - self.left_pad, self.width)
                            ymax = min(ymax + patch_ymin - self.top_pad, self.height)
                            img_bboxs.append([xmin, ymin, xmax, ymax])
                            scores.append(tmp_score)
                            classes.append(tmp_class)
        
        cv2.imwrite("./padded_img_from_patches_{}.png".format(img_name), padded_img)
        # Crop the padding away with the same offsets used when pasting the image.
        original_img = padded_img[self.top_pad:self.top_pad+self.height, self.left_pad:self.left_pad+self.width, :]
        cv2.imwrite("./original_img_from_patches_{}.png".format(img_name), np.copy(original_img))
        if path_annotation_dir is not None:
            mask = np.zeros_like(original_img)
            # Neighbouring patches overlap, so the same object can be detected twice.
            img_bboxs, scores, _ = nms(boxes=np.array(img_bboxs), scores=np.array(scores), classes=np.array(classes), 
                                  overlap_thresh=0.001, num_boxes=None) # TODO: change overlap threshold
            img_bboxs = img_bboxs.tolist()
            print("Penguins after NMS:", len(img_bboxs))
            scores = scores.tolist()
            for bbox in img_bboxs:
                original_img = cv2.rectangle(original_img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
                mask = cv2.rectangle(mask, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (255, 255, 255), -1)
            write_json("./original_img_bboxes_{}.json".format(img_name), data = [img_bboxs, scores])
            # NOTE(review): the two paths below are placeholders without a "{}" field,
            # so .format(img_name) leaves them unchanged — replace before running.
            cv2.imwrite("PATH_TO_INFERENCE_RESULT".format(img_name), original_img)
            cv2.imwrite("PATH_TO_INFERENCE_MASK".format(img_name), mask)
            return mask
        return None

In [4]:
def do_inference_for_single_img(model, img, score_threshold):
    """
    Run the detector on one BGR image and return its de-duplicated detections.

    Params:
    ------
    + model - loaded SavedModel exposing a 'serving_default' signature
    + img - image array in BGR channel order (as returned by cv2.imread)
    + score_threshold - only detections scoring strictly above this are kept
    Returns:
    -------
    + list of [xmin, ymin, xmax, ymax] boxes in pixel coordinates
    + list of scores
    + list of class ids (every detection is assigned class 0)
    """
    rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Add the leading batch dimension expected by the serving signature.
    batch = tf.convert_to_tensor(np.asarray(rgb_img))[tf.newaxis, ...]

    # Do inference here
    detect_fn = model.signatures['serving_default']
    raw_outputs = detect_fn(batch)

    # Strip the batch dimension and keep only the valid detections.
    detection_count = int(raw_outputs.pop('num_detections'))
    detections = {}
    for name, tensor in raw_outputs.items():
        detections[name] = tensor[0, :detection_count].numpy()
    detections['num_detections'] = detection_count

    # Filter bboxs
    img_h, img_w = rgb_img.shape[0], rgb_img.shape[1]
    kept_boxes, kept_scores = [], []
    for box_normalised, score in zip(detections['detection_boxes'], detections['detection_scores']):
        if score > score_threshold:
            ymin, xmin, ymax, xmax = box_normalised
            ymin = int(img_h * ymin)
            xmin = int(img_w * xmin)
            ymax = int(img_h * ymax)
            xmax = int(img_w * xmax)
            # bbox should be in [xmin, ymin, xmax, ymax]
            kept_boxes.append([xmin, ymin, xmax, ymax])
            kept_scores.append(score)

    single_class = np.array([0] * len(kept_scores))
    final_boxes, final_scores, final_classes = nms(boxes=np.array(kept_boxes), scores=np.array(kept_scores),
                                                   classes=single_class, overlap_thresh=0.5, num_boxes=None)
    return final_boxes.tolist(), final_scores.tolist(), final_classes.tolist()

In [5]:
def do_inference(model, path_img, score_threshold):
    """
    Count objects in a large image by running the detector patch by patch.

    The image is split into patches under ./tmp, each patch is run through
    the detector and its detections are stored next to it as JSON, the
    patches are stitched back into a mask of merged boxes, the mask is
    eroded, and the remaining contours are counted and printed.

    Params:
    ------
    + model - loaded SavedModel exposing a 'serving_default' signature
    + path_img - path of the image to analyse
    + score_threshold - minimum detection score
    """
    path_patch_dir = "./tmp"
    img_name = Path(path_img).stem
    image_patcher = ImagePatcher(patch_sz=224, overlap=25) # TODO: change patch size and overlap %
    image_patcher.generate_patches(path_img_file=path_img, path_patch_dir=path_patch_dir)
    
    # Escape the stem so glob metacharacters in the file name are matched literally.
    path_patch_imgs = list(glob.glob("{}/{}_*.jpg".format(path_patch_dir, glob.escape(img_name))))
    for path_patch_img in path_patch_imgs:
        patch_img = cv2.imread(path_patch_img)
        bboxs, score, classes = do_inference_for_single_img(model=model, img=patch_img, score_threshold=score_threshold)
        # Swap only the extension; str.replace would also rewrite a ".jpg"
        # occurring earlier in the path (e.g. a stem such as "photo.jpg").
        path_patch_json = os.path.splitext(path_patch_img)[0] + ".json"
        write_json(path_file=path_patch_json, data=[bboxs, score, classes])

    img_mask = image_patcher.generate_img_from_patches(path_patch_dir=path_patch_dir, 
                                                       img_name=img_name, 
                                                       score_threshold=score_threshold,
                                                       path_annotation_dir=path_patch_dir)
    # All mask channels are identical; keep one.
    img_mask = img_mask[:, :, 0]

    # erode
    kernel = np.ones((5,5),np.uint8)
    erosion = cv2.erode(img_mask, kernel, iterations = 2)

    # [-2] selects the contour list for both 2-value and 3-value findContours returns.
    cnts = cv2.findContours(erosion, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)[-2]
    num_penguins = len(cnts)
    # NOTE(review): bbox_viz is never written or returned; the imwrite below saves
    # img_mask instead — confirm which image was meant to be stored.
    bbox_viz = cv2.drawContours(image=np.zeros(shape=(img_mask.shape[0], img_mask.shape[1], 3)), 
                                contours=cnts, contourIdx=-1, color=(255, 255, 255), thickness=2, lineType=cv2.LINE_AA)
    # NOTE(review): both imwrite calls use the same placeholder path (no "{}" field),
    # so the second one overwrites the first — replace the paths before running.
    cv2.imwrite("PATH_TO_SAVE_CONTOURS".format(img_name), img_mask)
    img = cv2.imread(path_img)
    img_viz = cv2.drawContours(image=img, contours=cnts, contourIdx=-1, color=(0, 255, 0), thickness=2, lineType=cv2.LINE_AA)
    cv2.imwrite("PATH_TO_SAVE_CONTOURS".format(img_name), img_viz)
    print("[mask method]: Number of penguins in {} : {}".format(img_name, num_penguins))

In [6]:
# Load the exported detection model once; later cells reuse `model`.
# The path below is a placeholder and must be set before running this cell.
PATH_TO_SAVED_MODEL = "PATH_TO_SAVED_MODEL" # Path to saved model
# Clear Keras' global state before loading, so re-running this cell starts clean.
tf.keras.backend.clear_session()
model = tf.saved_model.load(PATH_TO_SAVED_MODEL);

In [None]:
# Run the patch-based inference pipeline on one image and report the
# wall-clock time it took. "TEST_IMAGE_PATH" is a placeholder.
print('Inferencing... ', end='')
start_time = time.time()
do_inference(model=model, path_img="TEST_IMAGE_PATH", score_threshold=0.7) # TODO: change score threshold and path to image
end_time = time.time()
elapsed_time = end_time - start_time
print('Done inferencing! Took {} seconds'.format(elapsed_time))