In [0]:
import cv2
import numpy as np
from keras.utils import Sequence
import copy
import matplotlib.pyplot as plt

In [0]:
from yolo_utils_new import BoundBox, box_iou

In [0]:
class ImageReader:
  """Read an image (e.g. from the SVT dataset) and preprocess/encode it.

  The image is resized to ``IMAGE_H`` x ``IMAGE_W``, converted from OpenCV's
  BGR channel order to RGB and divided by 255.  Bounding-box annotations that
  accompany the image are rescaled to the resized frame.
  """

  def __init__(self, IMAGE_H, IMAGE_W):
    '''
    IMAGE_H, IMAGE_W - height, width of normalized image
    '''
    self.IMAGE_H = IMAGE_H
    self.IMAGE_W = IMAGE_W

  def encode(self, image):
    '''
    Resize a BGR image to (IMAGE_H, IMAGE_W), convert it to RGB and divide by 255.

        Input-
        image : array as returned by cv2.imread (rows, cols, channels)

        Output-
        array with IMAGE_H rows and IMAGE_W columns
    '''
    # cv2.resize takes dsize as (width, height), i.e. (columns, rows).
    image = cv2.resize(image, (self.IMAGE_W, self.IMAGE_H))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = image/255
    return image

  def fit(self, train_instance):
    '''
    read in and resize the image, annotations are resized accordingly
        Input-
        train_instance : either a filename, or a dictionary containing the
        filename and (optionally) the annotated objects, e.g.

        {'filename': 'img/14_03.jpg',
         'height' :   880,
         'width'  :   1280,
         'object' : [{'label': 'room',
                     'xmax': 318,
                     'xmin': 284,
                     'ymax': 184,
                     'ymin': 100 }]
        }

        Only 'filename' and 'object' are read here; the original image size is
        taken from the loaded image itself.

        Output-
        image              when train_instance has no 'object' entry
        (image, all_objs)  otherwise; all_objs is a deep copy of the
                           annotations with coordinates rescaled and clamped to
                           [0, IMAGE_W] / [0, IMAGE_H]

        Raises-
        FileNotFoundError if cv2.imread cannot load the file
    '''
    if not isinstance(train_instance, dict):
      train_instance = {'filename': train_instance}

    image_name = train_instance['filename']
    image = cv2.imread(image_name)

    # cv2.imread signals failure by returning None, so this has to be checked
    # before the image is touched in any way.
    if image is None:
      raise FileNotFoundError('Cannot find ' + str(image_name))

    h, w = image.shape[:2]
    image = self.encode(image)

    if 'object' not in train_instance:
      return image

    # Deep copy is necessary: the annotations are resized in place below and
    # the caller's dictionary must stay untouched.
    all_objs = copy.deepcopy(train_instance['object'])

    # fix object's position and size
    for obj in all_objs:
      for attr in ['xmin', 'xmax']:
        obj[attr] = int(obj[attr] * float(self.IMAGE_W) / w)  # resize annotations
        obj[attr] = max(min(obj[attr], self.IMAGE_W), 0)      # take care of boundary conditions

      for attr in ['ymin', 'ymax']:
        obj[attr] = int(obj[attr] * float(self.IMAGE_H) / h)
        obj[attr] = max(min(obj[attr], self.IMAGE_H), 0)

    return image, all_objs

In [0]:
class NEW(Sequence):
  """Batch generator: yields x_batch (np.array of images) and y_batch
  (np.array of labels), optionally with scale/translate augmentation.

  config keys read by this class: 'ANCHORS', 'N_ANCHORS', 'BATCH_SIZE',
  'IMAGE_H', 'IMAGE_W', 'GRID_H', 'GRID_W', 'TRUE_BOX_BUFFER'.
  """

  def __init__(self, images, config, shuffle = True, aug = True):
    '''
    images  - list of dicts with annotation data ('filename' and 'object' are read).
              NOTE: the list is kept by reference and shuffled in place by
              on_epoch_end when shuffle is True.
    config  - dict of generator parameters (see class docstring)
    shuffle - shuffle `images` at the end of every epoch
    aug     - apply random scaling/translation in __getitem__
    '''
    self.images = images
    self.config = config
    self.shuffle = shuffle
    self.aug = aug
    # doesn't matter that encoding is done in min-max format. As long as its 0 centered
    # or one of the points is 0, IOU remains unchanged.
    self.anchors = [BoundBox(0, 0, config['ANCHORS'][2*i], config['ANCHORS'][2*i+1])
                    for i in range(self.config['N_ANCHORS'])]

  def __len__(self):
    '''Number of batches per epoch (the last, partial batch counts as one).'''
    return int(np.ceil(float(len(self.images))/self.config['BATCH_SIZE']))

  def on_epoch_end(self):
    '''Shuffle the image list in place at the end of an epoch (if enabled).'''
    if self.shuffle:
      np.random.shuffle(self.images)

  def load_image(self, i):
    '''Load the original (unprocessed, BGR) image of the i-th instance.'''
    return cv2.imread(self.images[i]['filename'])

  def aug_image(self, train_instance, aug):
    '''
    Read an image, optionally augment it by scaling and translating, then
    resize it to (IMAGE_H, IMAGE_W), divide by 255 and flip BGR -> RGB.

        Input-
        train_instance : dict with 'filename' and 'object' (list of dicts with
                         'xmin', 'xmax', 'ymin', 'ymax')
        aug            : whether to apply the random scale/translate step

        Output-
        (img, all_objs) : processed image and a deep copy of the annotations
                          transformed to the processed image's coordinates and
                          clamped to the image bounds

        Raises-
        FileNotFoundError if cv2.imread cannot load the file
    '''
    image_name = train_instance['filename']
    image = cv2.imread(image_name)
    # cv2.imread returns None on failure; check before using the array.
    if image is None:
      raise FileNotFoundError('Cannot find ' + str(image_name))
    image = image.astype(np.float32)

    h, w = image.shape[:2]
    if aug:
      ### scale the image, by 0 to 10%
      scale = np.random.uniform()/10. + 1.
      img = cv2.resize(image, (0,0), fx = scale, fy = scale)

      ### translate the image: crop an (h, w) window out of the enlarged image
      max_offx = (scale-1.) * w
      max_offy = (scale-1.) * h
      offx = int(np.random.uniform() * max_offx)
      offy = int(np.random.uniform() * max_offy)

      img = img[offy : (offy + h), offx : (offx + w)]
    else:
      # No augmentation: use the image as loaded (identity scale, zero offset).
      scale, offx, offy = 1., 0, 0
      img = image

    # Normalise in both modes so augmented and plain batches share a value range.
    img /= 255

    # resize the image to standard size and convert color;
    # cv2.resize takes dsize as (width, height).
    img = cv2.resize(img, (self.config['IMAGE_W'], self.config['IMAGE_H']))
    img = img[:,:,::-1]

    all_objs = copy.deepcopy(train_instance['object'])  # all bbox coords and labels in an image
    axes = (
      (('xmin', 'xmax'), offx, self.config['IMAGE_W'], w),
      (('ymin', 'ymax'), offy, self.config['IMAGE_H'], h),
    )
    for obj in all_objs:
      for attrs, offset, target_size, source_size in axes:
        for attr in attrs:
          value = obj[attr]
          if aug:
            value = int(value * scale - offset)                # fix scaling and translation
          value = int(value * float(target_size) / source_size)  # resize annotations
          obj[attr] = max(min(value, target_size), 0)          # take care of boundary conditions

    return img, all_objs

  def _best_anchor(self, box_w, box_h):
    '''Index of the anchor whose IOU with an origin-pinned (box_w, box_h) box is largest.'''
    shifted_box = BoundBox(0, 0, box_w, box_h)
    best_anchor = -1
    max_iou     = -1
    for i, anchor in enumerate(self.anchors):
      iou = box_iou(shifted_box, anchor)  # GLOBAL FUNCTION
      if max_iou < iou:
        best_anchor = i
        max_iou = iou
    return best_anchor

  def __getitem__(self, idx):
    '''
    input-
        idx- non-negative integer
    outputs-
        Returns ([x_batch, b_batch], y_batch).  N below is BATCH_SIZE, or
        len(images) when the dataset holds fewer than BATCH_SIZE images.

        x_batch: The numpy array of shape  (N, IMAGE_H, IMAGE_W, 3).

        y_batch: The numpy array of shape  (N, GRID_H, GRID_W, N_ANCHORS, 4 + 1).

          y_batch[iframe, igrid_h, igrid_w, ianchor, :4] contains (center_x, center_y, center_w, center_h)
          of object (in grid-cell units) if the object exists in this (grid cell, anchor) pair, else  0.
          y_batch[iframe,igrid_h,igrid_w,ianchor,4] contains 1 if the object exists in this
          (grid cell, anchor) pair, else 0.

        b_batch: The numpy array of shape (N, 1, 1, 1, TRUE_BOX_BUFFER, 4).

          b_batch[iframe, 0, 0, 0, ibuffer, :] contains ibufferth object's
          (center_x, center_y, center_w, center_h) in iframeth frame.
          If ibuffer > N objects in iframeth frame, then the values are simply 0.
          If a frame has more than TRUE_BOX_BUFFER objects the index wraps around
          and earlier entries are overwritten.
          This is just a hack to easily calculate loss.
    '''
    batch_size = self.config['BATCH_SIZE']
    l_bound = idx * batch_size
    r_bound = (idx + 1) * batch_size

    if r_bound > len(self.images):
      # Last batch: slide the window back so it is still full.  Clamp at 0 so a
      # dataset smaller than one batch does not produce a negative slice start.
      r_bound = len(self.images)
      l_bound = max(r_bound - batch_size, 0)

    n_frames = r_bound - l_bound
    # size of one grid cell in pixels; used to convert pixels -> grid-cell units
    cell_w = float(self.config['IMAGE_W']) / self.config['GRID_W']
    cell_h = float(self.config['IMAGE_H']) / self.config['GRID_H']

    ## prepare empty storage space: this will be output
    x_batch = np.zeros((n_frames, self.config['IMAGE_H'], self.config['IMAGE_W'], 3))        # input images
    b_batch = np.zeros((n_frames, 1, 1, 1, self.config['TRUE_BOX_BUFFER'], 4))               # buffer of GT boxes
    y_batch = np.zeros((n_frames, self.config['GRID_H'], self.config['GRID_W'],
                        self.config['N_ANCHORS'], 4+1))                                      # desired network output

    for instance_count, train_instance in enumerate(self.images[l_bound: r_bound]):
      img, all_objs = self.aug_image(train_instance, aug = self.aug)
      true_box_index = 0

      for obj in all_objs:
        # skip boxes that collapsed to zero width/height after clamping
        if not (obj['xmax'] > obj['xmin'] and obj['ymax'] > obj['ymin']):
          continue

        center_x = .5*(obj['xmin'] + obj['xmax']) / cell_w   # unit: grid cell
        center_y = .5*(obj['ymin'] + obj['ymax']) / cell_h   # unit: grid cell

        grid_x = int(np.floor(center_x))
        grid_y = int(np.floor(center_y))

        if not (grid_x < self.config['GRID_W'] and grid_y < self.config['GRID_H']):
          continue

        center_w = (obj['xmax'] - obj['xmin']) / cell_w      # unit: grid cell
        center_h = (obj['ymax'] - obj['ymin']) / cell_h      # unit: grid cell
        box = [center_x, center_y, center_w, center_h]

        # find the anchor that best predicts this box
        best_anchor = self._best_anchor(center_w, center_h)

        # assign ground truth x, y, w, h and confidence to y_batch
        y_batch[instance_count, grid_y, grid_x, best_anchor, 0:4] = box
        y_batch[instance_count, grid_y, grid_x, best_anchor, 4  ] = 1.

        # assign the true box to b_batch
        b_batch[instance_count, 0, 0, 0, true_box_index] = box

        true_box_index += 1
        true_box_index = true_box_index % self.config['TRUE_BOX_BUFFER']

      # image assignment to x_batch
      x_batch[instance_count] = img

    return [x_batch, b_batch], y_batch

In [0]:
def checkgen():
  """Visual sanity check of the batch generator.

  Reads the module-level ``val_images`` and ``config_1``, builds a generator
  over val_images[18:54], fetches batch 0 and shows the first generated frame
  next to the original image resized to the network input size.  It then
  prints the (center_x, center_y, center_w, center_h) entry of every
  (grid cell, anchor) slot of that frame whose objectness flag is 1.
  """
  md = copy.copy(val_images[18:54])  # copy, since the generator may shuffle its list in place
  bg = NEW(md, config_1)
  [xxx, _], yyy = bg[0]
  lbl = yyy[0]

  # Original image, brought to the same size and channel order as the generated
  # frame so the two are directly comparable (cv2 loads BGR, matplotlib expects
  # RGB; cv2.resize takes (width, height)).
  mi = bg.load_image(0)
  mi = cv2.resize(mi, (config_1['IMAGE_W'], config_1['IMAGE_H']))
  mi = cv2.cvtColor(mi, cv2.COLOR_BGR2RGB)

  # Separate axes: drawing both images on one axes would hide the first one.
  fig, (ax_gen, ax_raw) = plt.subplots(1, 2, figsize=(12, 6))
  ax_gen.imshow(xxx[0])
  ax_gen.set_title('generator output')
  ax_raw.imshow(mi)
  ax_raw.set_title('original (resized)')
  plt.show()

  # argwhere walks the array in (h, w, anchor) order
  for h, w, an in np.argwhere(lbl[..., 4] == 1):
    print(lbl[h,w,an,:4], '\n')

In [0]:
# Define the parameters, global variables to be used 
def initialize(anchors_path='/content/drive/My Drive/SVT/anchors_.npy'):
  """Build the configuration dictionary used by the generator and the model.

  Input-
      anchors_path : path of a .npy file holding the anchor boxes as
                     (width, height) pairs in any array shape; it is flattened
                     to [w0, h0, w1, h1, ...].  Defaults to the location on the
                     mounted Google Drive.

  Output-
      dict with keys IMAGE_W, IMAGE_H, GRID_WSIZE, GRID_HSIZE, GRID_W, GRID_H,
      ANCHORS, N_ANCHORS, TRUE_BOX_BUFFER, LAMBDA_COORD, LAMBDA_NO_OBJECT,
      LAMBDA_OBJECT, BATCH_SIZE.

  Raises-
      ValueError if the anchors file is empty or holds an odd number of values.
  """
  IMAGE_DIMS = (416, 416)   # (height, width) of the network input
  IMAGE_W = IMAGE_DIMS[1]
  IMAGE_H = IMAGE_DIMS[0]

  GRID_DIMS = (32, 32)      # (height, width) of one grid cell, in pixels
  GRID_WSIZE = GRID_DIMS[1]
  GRID_HSIZE = GRID_DIMS[0]

  # number of grid cells along each axis
  GRID_W = IMAGE_W//GRID_WSIZE
  GRID_H = IMAGE_H//GRID_HSIZE

  # Flatten whatever shape was saved instead of assuming exactly 5 anchors.
  ANCHORS = np.load(anchors_path).reshape(-1)
  if ANCHORS.size == 0 or ANCHORS.size % 2 != 0:
    raise ValueError('anchors file must hold a non-empty list of (w, h) pairs, got %d values'
                     % ANCHORS.size)
  N_ANCHORS = len(ANCHORS)//2

  TRUE_BOX_BUFFER = 50
  LAMBDA_COORD = 1
  LAMBDA_NO_OBJECT = 1
  LAMBDA_OBJECT = 5
  BATCH_SIZE = 4
  config_dict = {
    'IMAGE_W'         : IMAGE_W,
    'IMAGE_H'         : IMAGE_H,
    'GRID_WSIZE'      : GRID_WSIZE,
    'GRID_HSIZE'      : GRID_HSIZE,
    'GRID_W'          : GRID_W,
    'GRID_H'          : GRID_H,
    'ANCHORS'         : ANCHORS,
    'N_ANCHORS'       : N_ANCHORS,
    'TRUE_BOX_BUFFER' : TRUE_BOX_BUFFER,
    'LAMBDA_COORD'     : LAMBDA_COORD,
    'LAMBDA_NO_OBJECT' : LAMBDA_NO_OBJECT,
    'LAMBDA_OBJECT'    : LAMBDA_OBJECT,
    'BATCH_SIZE'      : BATCH_SIZE
  }

  return config_dict

In [0]:
if __name__ == "__main__":
  # Manual verification run (Google Colab only): mount Drive, load the pickled
  # validation annotations, build the config and eyeball one generated batch.
  import pickle
  from google.colab import drive

  drive.mount('/content/drive')

  # NOTE: pickle.load can execute arbitrary code; only load files you trust.
  with open('/content/drive/My Drive/SVT/val_images_.pickle', 'rb') as fp:
    val_images = pickle.load(fp)

  # checkgen() reads val_images and config_1 as module-level globals.
  config_1 = initialize()
  checkgen()