# Keras Code:

Dependencies:

- CUDA 9.0
- cuDNN 7.4
- Tensorflow-gpu 1.12.0
- Keras 2.2.4
- pillow 5.4



    

## Fileutil:

In [None]:
'''
fileutil.py

Utilities for various file operations
'''

import glob, os
import numpy as np

def makeFullExtensionList(ext_list:list):
    '''
    Build a duplicate-free list holding the lowercase and uppercase form
    of every given extension.

    args:
        ext_list: list of extensions, with or without the leading '.'
                  (empty strings are ignored)

    returns:
        List of extensions, each starting with '.', in first-seen order
        (lowercase form first, then uppercase form)
    '''
    res_list = []
    seen = set()
    for ext in ext_list:
        # Ignore empty entries
        if not ext:
            continue
        # Add the leading dot when it is missing
        if not ext.startswith('.'):
            ext = '.' + ext
        # De-duplicate AFTER normalisation so that 'xml', '.xml' and '.XML'
        # collapse to a single pair of entries
        for variant in (ext.lower(), ext.upper()):
            if variant not in seen:
                seen.add(variant)
                res_list.append(variant)
    return res_list

def getTargetPathList(search_root, ext_list = ['.xml']):
    '''
    Recursively collect the files under search_root that have one of the
    given extensions.

    The search no longer changes the process working directory, so it is
    safe to call from several threads and cannot leave the process in the
    wrong directory when an error occurs.

    args:
        search_root: search target root path
        ext_list: extensions of the files to be searched (never modified)
    returns:
        (absolute search root path, sorted duplicate-free list of paths
        relative to that root). Returns (None, []) when ext_list holds no
        usable extension.
    raises:
        FileNotFoundError: search_root does not exist
        NotADirectoryError: search_root is not a directory
    '''
    res_root = None # Search output root path
    res_list = [] # File path list relative to search root
    target_exts = makeFullExtensionList(ext_list)
    if len(target_exts) == 0:
        return res_root, res_list
    res_root = os.path.abspath(search_root)
    # Keep failing loudly on a bad root, as the former os.chdir() call did
    if not os.path.exists(res_root):
        raise FileNotFoundError('search root does not exist: {}'.format(res_root))
    if not os.path.isdir(res_root):
        raise NotADirectoryError('search root is not a directory: {}'.format(res_root))
    # Escape the root so that special characters in it are matched literally
    escaped_root = glob.escape(res_root)
    found = set() # a set: upper/lower patterns may hit the same file twice
    for ext in target_exts:
        pattern = os.path.join(escaped_root, '**', '*' + ext)
        for path in glob.glob(pattern, recursive=True):
            found.add(os.path.relpath(path, res_root))
    return res_root, sorted(found)

class ODData(object):
    '''
    Object-detection annotation record for a single image.

    The constructor stores its four arguments unchanged as attributes of
    the same name and records the number of bounding boxes in `num`.
    '''
    def __init__(self, img_path:str, size, bboxes, classes):
        self.img_path, self.size = img_path, size
        self.bboxes, self.classes = bboxes, classes
        # Counted once here; later changes to `bboxes` are not reflected
        self.num = len(bboxes)

    def __len__(self):
        '''Number of bounding boxes counted at construction time'''
        return self.num

if __name__ == '__main__':
    # Running this cell/module directly does nothing; it only defines helpers
    pass

## Votutil:

In [None]:
# Utilities for reading data in VOT Challenge format

import os, csv
from random import shuffle
from typing import List, Tuple
from PIL import Image
import numpy as np
from keras.utils import Sequence
from tools import fileutil

In [None]:
def makeTrainValidDirList(mov_root:str, train_rate:float=0.8,
    train_list_name:str='list_train.txt', valid_list_name:str='list_valid.txt'):
    '''
    Randomly split the frame image directories into a training list and a
    validation list and write both lists as text files inside mov_root.
     args:
         mov_root: Root directory where frame image directories are stored
         train_rate: Fraction of the directories assigned to training (0.0-1.0)
         train_list_name: File name of the training list written to mov_root
         valid_list_name: File name of the validation list written to mov_root
    '''
    # One newline-terminated entry per directory directly under mov_root
    lines = ['{}\n'.format(entry.name) for entry in os.scandir(mov_root) if entry.is_dir()]
    shuffle(lines)
    split_at = int(len(lines) * train_rate)
    # Each part is sorted after the random split so the files are readable
    outputs = (
        (train_list_name, sorted(lines[:split_at])),
        (valid_list_name, sorted(lines[split_at:])),
    )
    for list_name, entries in outputs:
        with open(os.path.join(mov_root, list_name), 'w') as f:
            f.writelines(entries)

In [None]:
def getMovDirList(mov_root:str, target_list=None):
    '''
    Acquisition of image frame directory list
     args:
         mov_root: Root directory where frame image directories are stored
         target_list: Name of a list file inside mov_root with one directory
                      name per line. If None, every directory directly
                      under mov_root is used.
     returns:
         List of image frame directory names (relative to mov_root)
    '''
    if target_list is None:
        # If not specified, target all directories immediately below
        return [x.name for x in os.scandir(mov_root) if x.is_dir()]
    with open(os.path.join(mov_root, target_list)) as f:
        names = [row.rstrip() for row in f]
    # Drop blank lines: an empty name would later be joined onto mov_root
    # and make the root itself look like a frame directory
    return [name for name in names if name]

In [None]:
class VOTBoxData(object):
    '''
    One frame of VOT Challenge single-object tracking data.
     The polygon vertices are reduced to their axis-aligned bounding
     rectangle, stored in `bbox` as (x_min, y_min, x_max, y_max).
     args:
         img_path: Path to the target image
         points: Flat (x0, y0, x1, y1, ...) vertex list of the target object
    '''
    def __init__(self, img_path:str, points:Tuple[float]):
        self.img_path = img_path
        # Rows are (x, y) vertices
        vertices = np.array(points).reshape((-1,2))
        lower = vertices.min(axis=0)
        upper = vertices.max(axis=0)
        self.bbox = (lower[0], lower[1], upper[0], upper[1])

In [None]:
def encodeBBox(bbox, search_area):
    '''Encode a bounding box relative to a search area
     Converts (x_min, y_min, x_max, y_max) [pixel] into
     (cx, cy, w, h), where the centre is measured from the top-left corner
     of search_area and all four values are divided by its width/height.
     args:
         bbox: Source bounding box (x_min, y_min, x_max, y_max)
         search_area: Object search area (x_min, y_min, x_max, y_max)
     returns:
         Tuple (cx, cy, w, h) as ratios of the search area size
    '''
    left, top, right, bottom = bbox[0], bbox[1], bbox[2], bbox[3]
    # Size of the search area used as the normalisation unit
    area_w = search_area[2] - search_area[0]
    area_h = search_area[3] - search_area[1]
    # Box centre in pixel coordinates
    center_x = (left + right) * 0.5
    center_y = (top + bottom) * 0.5
    return (
        (center_x - search_area[0]) / area_w,
        (center_y - search_area[1]) / area_h,
        (right - left) / area_w,
        (bottom - top) / area_h,
    )

In [None]:
def decodeBBox(bbox, search_area):
    '''Decode a bounding box expressed relative to a search area
     Converts (cx, cy, w, h) (ratios of the search_area size) back into
     (x_min, y_min, x_max, y_max) [pixel].
     args:
         bbox: Source bounding box (cx, cy, w, h)
         search_area: Object search area (x_min, y_min, x_max, y_max)
     returns:
         Tuple (x_min, y_min, x_max, y_max) in pixels
    '''
    area_w = search_area[2] - search_area[0]
    area_h = search_area[3] - search_area[1]
    # Scale the ratios back to pixels and move the centre into image coordinates
    center_x = bbox[0] * area_w + search_area[0]
    center_y = bbox[1] * area_h + search_area[1]
    half_w = 0.5 * (bbox[2] * area_w)
    half_h = 0.5 * (bbox[3] * area_h)
    return (center_x - half_w, center_y - half_h,
            center_x + half_w, center_y + half_h)

In [None]:
def calcSearchArea(bbox, img_size, search_rate=0.8):
    '''Calculation of the square search (crop) range around a bounding box
     args:
         bbox: target bbox (x_min, y_min, x_max, y_max) [pixel]
         img_size: Size of the whole target image as (width, height)
         search_rate: Search radius as a multiple of the bbox diagonal length
     returns:
         List [x_min, y_min, x_max, y_max] of the search range, kept inside
         the image
    '''
    width, height = img_size
    # Based on diagonal length
    w = (bbox[2] - bbox[0])
    h = (bbox[3] - bbox[1])
    cx = (bbox[0] + bbox[2]) * 0.5
    cy = (bbox[1] + bbox[3]) * 0.5
    search_rad = np.sqrt(w**2 + h**2) * search_rate
    # Determine crop range
    crop_area = [cx - search_rad, cy - search_rad, cx + search_rad, cy + search_rad]
    # Both axes get exactly the same treatment: (low index, high index, limit)
    for lo, hi, limit in ((0, 2, width), (1, 3, height)):
        # If the range protrudes from the image, slide it back inside
        offset = 0.0
        if crop_area[lo] < 0.0:
            offset = -crop_area[lo]
        elif crop_area[hi] > limit:
            offset = limit - crop_area[hi]
        # Plain assignment: the y axis used '+=' before, which added the
        # shifted value onto the old one and roughly doubled the coordinates
        crop_area[lo] = int(crop_area[lo] + offset)
        crop_area[hi] = int(crop_area[hi] + offset)
        # Clip whatever still protrudes (range larger than the image)
        if crop_area[lo] < 0:
            crop_area[lo] = 0
        if crop_area[hi] > limit:
            crop_area[hi] = limit
    return crop_area

In [None]:
def makeTrainInput(tgt:VOTBoxData, search:VOTBoxData, input_size=(224, 224)):
    '''Create one training input pair and its correct answer
     args:
         tgt: Frame data of the detection target (reference frame)
         search: Frame data of the frame in which the target is searched
         input_size: (width, height) the cropped images are resized to
     returns:
         Detection target image (numpy array), search target image (numpy array),
         correct answer bbox as (cx, cy, w, h) relative to the search range
    '''
    # Open both images in a with-block so the file handles are released
    with Image.open(tgt.img_path) as img_tgt, Image.open(search.img_path) as img_search:
        # The search range is centred on the target's position in the reference frame
        search_area = calcSearchArea(tgt.bbox, img_tgt.size)
        # The correct answer is where the object is in the SEARCH frame.
        # Encoding tgt.bbox here (as before) gives a label that is centred on
        # the crop by construction and tells the network nothing about the
        # search image.
        bbox_gt = encodeBBox(search.bbox, search_area)
        # Crop first, then force 3 channels so grayscale/RGBA frames still
        # match the network input shape, then resize
        crop_tgt = img_tgt.crop(search_area).convert('RGB').resize(input_size)
        crop_search = img_search.crop(search_area).convert('RGB').resize(input_size)
    # Scale pixel values from [0, 255] to roughly [-1, 1)
    arr_tgt = (np.array(crop_tgt) / 128.0) - 1.0
    arr_search = (np.array(crop_search) / 128.0) - 1.0
    return arr_tgt, arr_search, bbox_gt

In [None]:
def makePredictInput(img_tgt:'Image.Image', bbox_tgt, img_search:'Image.Image', input_size=(224, 224)):
    '''Create inference input data
     args:
         img_tgt: Image containing the object to be detected (PIL Image, not a path)
         bbox_tgt: bbox (x_min, y_min, x_max, y_max) of the object in img_tgt
         img_search: Search target image (PIL Image, not a path)
         input_size: (width, height) the cropped images are resized to
     returns:
         Network input (list of two batches holding one image each),
         search range bbox
    '''
    # Calculation of search range
    search_area = calcSearchArea(bbox_tgt, img_tgt.size)
    # Crop, force 3 channels (grayscale/RGBA frames would otherwise not match
    # the network input shape) and resize
    crop_tgt = img_tgt.crop(search_area).convert('RGB').resize(input_size)
    crop_search = img_search.crop(search_area).convert('RGB').resize(input_size)
    # Scale pixel values from [0, 255] to roughly [-1, 1)
    arr_tgt = (np.array(crop_tgt) / 128.0) - 1.0
    arr_search = (np.array(crop_search) / 128.0) - 1.0
    # Add the batch dimension expected by model.predict
    return [np.array([arr_tgt]), np.array([arr_search])], search_area

In [None]:
def readVOTDir(mov_dir, img_ext='.jpg')->List[VOTBoxData]:
    '''
    Read one VOT sequence directory: the frame images plus the
    'groundtruth.txt' file that holds one polygon per frame.
    args:
         mov_dir: frame image directory path
         img_ext: Extension of frame image
     returns:
         List of VOTBoxData, one per ground-truth row, in frame order
     raises:
         IndexError: groundtruth.txt has more rows than there are images
    '''
    # Get list of images under specified directory
    img_dir, img_path_list = fileutil.getTargetPathList(mov_dir, ext_list=[img_ext])
    img_path_list.sort()
    with open(os.path.join(img_dir, 'groundtruth.txt'), newline='') as f:
        # Skip blank lines (e.g. a trailing newline); they hold no polygon
        rows = [row for row in csv.reader(f) if row]
    if len(rows) > len(img_path_list):
        raise IndexError('{}: {} ground-truth rows but only {} images'.format(
            img_dir, len(rows), len(img_path_list)))
    res = []
    # The i-th ground-truth row belongs to the i-th image in sorted order
    for rel_path, row in zip(img_path_list, rows):
        img_path = os.path.join(img_dir, rel_path) # Create image path
        # Builtin float: the np.float alias no longer exists in current NumPy
        points = np.array(row, dtype=float).reshape((-1,2)) # Target polygon vertex list
        res.append(VOTBoxData(img_path, points))
    return res

def pickDiffPairIndices(input, diff_list=(-2, -1, 0, 1, 2), sample_per_diff=None):
    '''
    Generate a list of randomly picked index pairs (x, x + diff)
     args:
         input: a sized sequence (list, tuple, ...) or the number of elements
         diff_list: index differences between the two elements of a pair
         sample_per_diff: How many pairs to pick for each difference.
                          If None, every valid pair is returned.
     returns:
         List of (x, x + diff) tuples; both indices always lie inside the
         element range. The order within one difference is random.
    '''
    # Any object with a length counts as a sequence, not only exact lists
    input_len = len(input) if hasattr(input, '__len__') else input
    sample_num = input_len if sample_per_diff is None else sample_per_diff
    res_list = [] # Output list
    for diff in diff_list:
        # x is valid when both x and x + diff lie in [0, input_len)
        indices = list(range(max(0, -diff), min(input_len, input_len - diff)))
        # Randomly extract up to sample_num items
        shuffle(indices)
        res_list.extend((x, x + diff) for x in indices[:sample_num])
    return res_list

In [None]:
class VOTTrainGenerator(Sequence):
    '''
    VOT Challenge Single Tracking learning data generator

    Every sample is a pair of frames from the same sequence whose indices
    differ by one of the values in diff_list. The pairs are re-drawn and
    re-shuffled at the end of every epoch.
    '''
    def __init__(self, mov_root:str, target_list=None, input_shape = (224,224,3), diff_list=(-1, 1), batch_size=32):
        '''
        args:
            mov_root: Root path for storing frame image directories
            target_list: Directory list file handled by the generator. If None, target all directories immediately below
            input_shape: shape of input tensor to model (height, width, channels)
            diff_list: frame index differences used to build (target, search) pairs
            batch_size: Batch size
        '''
        super().__init__()
        self.mov_root = mov_root # Frame image directory group storage route
        self.mov_dirs = getMovDirList(mov_root, target_list) # Get list of frame image directory
        self.img_size = (input_shape[1], input_shape[0]) # Input image size (Width, Height) [pixel]
        self.batch_size = batch_size # Batch size
        self.diff_list = diff_list # Difference set list
        self._detect_cache = None # Parsed sequences, filled on first use
        self.makeTrainSamples() # Training sample creation

    def _loadDetections(self):
        '''Parse every sequence directory once and keep the result.

        makeTrainSamples runs at the end of every epoch; without this cache
        each epoch would glob all directories and re-read every
        groundtruth.txt from disk again.
        '''
        if getattr(self, '_detect_cache', None) is None:
            self._detect_cache = [
                readVOTDir(os.path.join(self.mov_root, mov_dir))
                for mov_dir in self.mov_dirs
            ]
        return self._detect_cache

    def makeTrainSamples(self):
        '''
        Creating training samples (re-draws and shuffles the frame pairs)
        '''
        self.samples = []
        for detect_res in self._loadDetections():
            id_pairs = pickDiffPairIndices(detect_res, diff_list = self.diff_list) # Get index pairs for pickup
            for tgt_id, search_id in id_pairs:
                self.samples.append({'tgt': detect_res[tgt_id], 'search': detect_res[search_id]})
        # Update size information
        self.sample_num = len(self.samples)
        # Ceiling division; evaluates to 0 when there are no samples
        self.batch_num = (len(self.samples) - 1) // self.batch_size + 1
        shuffle(self.samples) # Random shuffle

    def __len__(self):
        '''Batch number'''
        return self.batch_num

    def on_epoch_end(self):
        '''Processing at end of epoch'''
        self.makeTrainSamples() # Training sample creation

    def __getitem__(self, idx):
        '''Get batch data
         args:
             idx: batch index
         return:
             imgs: [reference frame image batch, search target frame batch]
             results: Correct answer bboxes (relative definition with the width and height of the search range set to 1.0)
        '''
        start_pos = self.batch_size * idx
        # The last batch may be shorter than batch_size
        end_pos = min(start_pos + self.batch_size, self.sample_num)
        batch_items = self.samples[start_pos : end_pos]
        # Creating batch content
        x_tgt = []
        x_search = []
        y = []
        for item in batch_items:
            img_tgt, img_search, bbox_gt = makeTrainInput(item['tgt'], item['search'], self.img_size)
            x_tgt.append(img_tgt)
            x_search.append(img_search)
            y.append(bbox_gt)
        return [np.array(x_tgt), np.array(x_search)], np.array(y)

if __name__ == '__main__':
    # Smoke test: build the validation generator and create every batch once
    tgt_dir = '/media/bodyscrap/drive_d/Dataset/vot2016'
    gen = VOTTrainGenerator(tgt_dir, target_list='list_valid.txt')
    num = len(gen)
    for i in range(num):
        # Progress as "current/total"
        print('{0}/{1}'.format(i + 1, num))
        gen[i] # only checks that the batch can be built; the result is discarded

## The model:

In [None]:
import keras as K
from keras.models import Model
from keras.layers import Input, concatenate, Dense, Flatten, BatchNormalization, Activation
from keras.applications import MobileNetV2
from keras.engine.network import Network
import numpy as np

class Tracknet(object):
    '''
    GOTURN-style tracking network.
     The original feature extractor is AlexNet; because converting the Caffe
     weights was troublesome, a network from keras.applications
     (MobileNetV2) is used as the feature extractor instead.
    '''
    def __init__(self, input_shape = (224,224,3)):
        # Size of the input images; must suit the feature extractor used
        self.input_shape = input_shape

    def build(self):
        '''Build the two-input model and return it.

        All intermediate tensors are kept as attributes (input_tgt,
        input_search, feature_tgt, feature_search, concat, fc0..fc3,
        output) and the finished model as `model`.
        '''
        self.input_tgt = Input(self.input_shape)    # Image to be detected
        self.input_search = Input(self.input_shape) # Search target image

        # One MobileNetV2 with all layers frozen, wrapped in a Network so the
        # same feature extractor is applied to both inputs
        x_in = Input(self.input_shape)
        backbone = MobileNetV2(input_tensor=x_in, alpha=1.0, include_top=False)
        for layer in backbone.layers:
            layer.trainable = False
        feature_net = Network(x_in, backbone.output, name='feature')
        self.feature_tgt = feature_net(self.input_tgt)
        self.feature_search = feature_net(self.input_search)

        # Join both feature maps along axis 3 and flatten
        self.concat = concatenate([self.feature_tgt, self.feature_search], axis = 3)
        self.fc0 = Flatten()(self.concat)

        # Three identical fully connected stages: Dense(1024) -> BatchNorm -> ReLU
        # (the original uses (4096,) x 3; reduced because it did not fit in memory)
        stages = []
        x = self.fc0
        for _ in range(3):
            x = Dense(1024)(x)
            x = BatchNormalization()(x)
            x = Activation('relu')(x)
            stages.append(x)
        self.fc1, self.fc2, self.fc3 = stages

        # Four regression outputs
        self.output = Dense(4)(self.fc3)
        self.model = Model(inputs=[self.input_tgt, self.input_search], outputs=self.output)
        return self.model

if __name__ == "__main__":
    # Smoke test: build the network with the default input shape and print
    # its layer summary
    tracknet = Tracknet()
    model = tracknet.build()
    model.summary()

## Training Code:

In [None]:
import os, multiprocessing
import glob
from goturn_net import Tracknet
from tools.votutil import VOTTrainGenerator
from keras.models import load_model
from keras.optimizers import Adam
from keras.engine.network import Network

if __name__ == "__main__":
    # Number of workers handed to fit_generator (all cores but one)
    proc_count = multiprocessing.cpu_count() - 1
    # Image generator initialization
    img_root = '/media/bodyscrap/drive_d/Dataset/vot2016'
    gen_train = VOTTrainGenerator(img_root, target_list='list_train.txt', batch_size=64)
    gen_valid = VOTTrainGenerator(img_root, target_list='list_valid.txt', batch_size=64)
    # Model initialization
    model_dir = 'models'
    model_name = 'model_goturn'
    # Create the output directory BEFORE training: otherwise model.save()
    # at the very end fails on a fresh checkout and the whole run is lost
    os.makedirs(model_dir, exist_ok=True)
    models = glob.glob(model_dir + '/' + model_name + '*.h5')
    train_epochs = 100
    initial_epoch = 0
    if len(models) == 0:
        # No saved model yet: start from a freshly built network
        net = Tracknet()
        model = net.build()
        model.compile(loss='mean_squared_error', optimizer=Adam())
    else:
        # Resume from the newest model. File names end in a zero-padded
        # epoch number, so the last name in sorted order is the newest.
        models.sort()
        path_last_model = models[-1] # Final model Path
        model = load_model(path_last_model, compile=True, custom_objects={'Network':Network})
        # The epoch count is the text between the last '_' and the extension
        cnt_start = path_last_model.rfind('_') + 1
        cnt_end = path_last_model.rfind('.')
        initial_epoch = int(path_last_model[cnt_start:cnt_end])
    final_epoch = initial_epoch + train_epochs
    model.fit_generator(gen_train, validation_data=gen_valid,
    initial_epoch=initial_epoch, epochs=final_epoch, workers=proc_count)
    # Save model
    path_save = '{0}/{1}_{2:08}.h5'.format(model_dir, model_name, final_epoch)
    model.save(path_save)

## Predict:


In [None]:
# predict file

import os
from keras.models import load_model
from keras.applications import mobilenetv2
from keras.engine.network import Network
from tools.votutil import readVOTDir, makePredictInput, decodeBBox
import numpy as np
from PIL import Image, ImageDraw

def drawBBox(img_org:Image, bboxes=[], colors=[(0, 255, 0), (255, 0, 0), (0, 0, 255)])->Image:
    '''
    Return a copy of an image with bounding boxes drawn on it
     args:
         img_org: Original image (PIL.Image); it is not modified
         bboxes: List of bounding boxes, each given as rectangle coordinates
         colors: Outline colors, used in order and repeated cyclically
     returns:
         Copy of the image with every box outlined, in input order (PIL.Image)
    '''
    canvas = img_org.copy()
    pen = ImageDraw.Draw(canvas)
    for index, box in enumerate(bboxes):
        # Coordinates are truncated to integers before drawing
        pen.rectangle(list(map(int, box)), outline=colors[index % len(colors)])
    return canvas

if __name__ == "__main__":
    # Target video directory
    mov_dir = '/media/bodyscrap/drive_d/Dataset/vot2016/fish4'
    # Loading the model
    model_path = 'model_goturn.h5'
    model = load_model(model_path, compile=False, custom_objects={'Network':Network})
    model.summary()
    # Read the video frame directory
    frames = readVOTDir(mov_dir)
    indices = list(range(1, len(frames))) # Every frame after the first one
    # Results go to <res_root>/<sequence name>/
    res_root = '/media/bodyscrap/drive_d/goturn/GOTURN-Keras/result'
    res_dir = os.path.join(res_root, os.path.basename(mov_dir))
    os.makedirs(res_dir, exist_ok=True)
    # Frame 0 has no prediction: draw the correct bounding box only
    img = drawBBox(Image.open(frames[0].img_path), bboxes=[frames[0].bbox])
    path_save = os.path.join(res_dir, os.path.basename(frames[0].img_path))
    img.save(path_save)
    # Later frames: correct bounding box + estimated bounding box
    for idx in indices:
        prev_frame, curr_frame = frames[idx - 1], frames[idx]
        # The previous frame and its ground-truth box define the target
        img_tgt = Image.open(prev_frame.img_path)
        bbox_tgt = prev_frame.bbox
        img_search = Image.open(curr_frame.img_path)
        input_data, search_area = makePredictInput(img_tgt, bbox_tgt, img_search)
        result = model.predict(input_data)
        if result is not None and len(result) > 0:
            # Convert the network output back to pixel coordinates
            res_box = decodeBBox(result[0], search_area)
            img = drawBBox(img_search, bboxes=[curr_frame.bbox, res_box])
            path_save = os.path.join(res_dir, os.path.basename(curr_frame.img_path))
            img.save(path_save)
