# UE Computer Vision | Project : Visual tracking of video objects

Students: Morgane AUBERT, Rogerio KACIAVA BOMBARDELLI, Alex SZAPIRO

In [None]:
import argparse
from path import Path

import torch
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import models
from tqdm import tqdm

import torchvision.transforms as transforms
import flow_transforms
from imageio import imread, imwrite
import numpy as np
from util import flow2rgb

import matplotlib.pyplot as plt

from scipy.interpolate import interp2d
from scipy.interpolate import RegularGridInterpolator
from skimage.segmentation import mark_boundaries
from skimage.measure import regionprops

## Functions to run the model and evaluate the results

In [None]:
# Checkpoint file of the pretrained network; handed to load_model() below.
path_to_model = "./pretrained_model/flownets_EPE1.951.pth.tar"
# Folder the sequence frames (.bmp) are read from.
# NOTE: the identifier is misspelled ("sequesnces") but it is referenced under
# this exact name further down, so it must not be renamed here alone.
path_to_sequesnces = "./sequences-train/"

def dice_assessment(groundtruth, estimated, label=255):
    """
    Dice score (in percent) between two label images for one label value.

    Arguments:
        groundtruth : reference array.
        estimated   : array compared against the reference.
        label       : value that marks the region of interest in both arrays.

    Returns:
        100 * 2*TP / (FP + 2*TP + FN), or 0 when that denominator is zero.
    """
    in_truth = groundtruth == label
    in_estimate = estimated == label

    true_pos = int(np.count_nonzero(in_truth & in_estimate))
    false_neg = int(np.count_nonzero(in_truth & ~in_estimate))
    false_pos = int(np.count_nonzero(~in_truth & in_estimate))

    denominator = false_pos + 2*true_pos + false_neg
    if denominator == 0:
        # Neither image contains the label at all.
        return 0*100
    return (2.0*true_pos/denominator)*100

def seg2bmap(seg,width=None,height=None):
    """
    From a segmentation, compute a binary boundary map with 1 pixel wide
    boundaries.  The boundary pixels are offset by 1/2 pixel towards the
    origin from the actual segment boundary.

    Arguments:
        seg     : Segmentation; every non-zero value is treated as foreground.
        width   : Width of desired bmap  <= seg.shape[1]
        height  : Height of desired bmap <= seg.shape[0]

    Returns:
        bmap (ndarray): Boolean boundary map, same size as seg unless a
                        smaller width/height was requested.

    Raises:
        AssertionError: if seg has more than one channel, if the requested size
                        is larger than seg, or if the aspect ratios differ by
                        more than 0.01.
    """

    # astype(bool) already maps every non-zero label to True.
    seg = seg.astype(bool)

    assert np.atleast_3d(seg).shape[2] == 1

    width  = seg.shape[1] if width  is None else width
    height = seg.shape[0] if height is None else height

    h,w = seg.shape[:2]

    ar1 = float(width) / float(height)
    ar2 = float(w) / float(h)

    # Logical `or` is required here: with the bitwise `|` the expression was
    # parsed as a chained comparison and the check never triggered.
    assert not (width > w or height > h or abs(ar1-ar2) > 0.01),\
        "Can't convert %dx%d seg to %dx%d bmap."%(w,h,width,height)

    # Copies of seg shifted by one pixel: east, south and south-east neighbours.
    e  = np.zeros_like(seg)
    s  = np.zeros_like(seg)
    se = np.zeros_like(seg)

    e[:,:-1]    = seg[:,1:]
    s[:-1,:]    = seg[1:,:]
    se[:-1,:-1] = seg[1:,1:]

    # A pixel is on the boundary when it differs from one of those neighbours.
    b        = seg^e | seg^s | seg^se
    # Last row / last column only have one valid neighbour direction.
    b[-1,:]  = seg[-1,:]^e[-1,:]
    b[:,-1]  = seg[:,-1]^s[:,-1]
    b[-1,-1] = 0

    if w == width and h == height:
        bmap = b
    else:
        # Downscale: map every boundary pixel (0-based y, x) to
        # floor(y*height/h), floor(x*width/w) of the smaller map. Integer
        # arithmetic keeps the indices valid for array indexing.
        bmap = np.zeros((height,width), dtype=bool)
        ys, xs = np.nonzero(b.reshape(h, w))
        bmap[(ys*height)//h, (xs*width)//w] = True

    return bmap

def centroid_assessment(groundtruth,estimated):
    """
    Euclidean distance between the centroids of two masks.

    Only the first entry returned by regionprops() is used for each input;
    indexing it raises IndexError when regionprops() returns nothing.

    Arguments:
        groundtruth : reference mask, passed unchanged to regionprops().
        estimated   : predicted mask, passed unchanged to regionprops().

    Returns:
        Norm of the difference between the two centroids.
    """
    regions_gt = regionprops(groundtruth)
    regions_est = regionprops(estimated)
    centroid_gt = np.asarray(regions_gt[0].centroid)
    centroid_est = np.asarray(regions_est[0].centroid)
    return np.linalg.norm(centroid_gt - centroid_est)

def db_eval_boundary(foreground_mask,gt_mask,bound_th=0.008):
    """
    Boundary F-measure between a predicted mask and an annotated mask.

    The boundaries of both masks are extracted with seg2bmap(), each one is
    dilated with a disk, and precision/recall count how much of one boundary
    falls inside the dilated version of the other.

    Arguments:
        foreground_mask (ndarray): binary segmentation image.
        gt_mask         (ndarray): binary annotated image.
        bound_th        (float)  : dilation radius. Values >= 1 are used as-is;
                                   smaller values are multiplied by the norm of
                                   foreground_mask.shape and rounded up.

    Returns:
        F (float): boundaries F-measure, scaled to 0..100.
    """
    from skimage.morphology import binary_dilation,disk

    assert np.atleast_3d(foreground_mask).shape[2] == 1

    if bound_th >= 1:
        bound_pix = bound_th
    else:
        bound_pix = np.ceil(bound_th*np.linalg.norm(foreground_mask.shape))

    # One-pixel-wide contours of both masks.
    fg_boundary = seg2bmap(foreground_mask)
    gt_boundary = seg2bmap(gt_mask)

    # Thicken each contour so that nearby pixels still count as matches.
    footprint_fg = disk(bound_pix)
    fg_dil = binary_dilation(fg_boundary,footprint_fg)
    footprint_gt = disk(bound_pix)
    gt_dil = binary_dilation(gt_boundary,footprint_gt)

    # Contour pixels of one mask that lie within the thickened other contour.
    gt_match = gt_boundary * fg_dil
    fg_match = fg_boundary * gt_dil

    # Number of contour pixels in each mask.
    n_fg = np.sum(fg_boundary)
    n_gt = np.sum(gt_boundary)

    if n_fg == 0 or n_gt == 0:
        # At least one contour is empty: an empty prediction scores precision 1,
        # an empty annotation scores recall 1, the non-empty side scores 0.
        precision = 1 if n_fg == 0 else 0
        recall = 1 if n_gt == 0 else 0
    else:
        precision = np.sum(fg_match)/float(n_fg)
        recall    = np.sum(gt_match)/float(n_gt)

    # Harmonic mean of precision and recall, guarded against 0/0.
    denominator = precision + recall
    f_measure = 0 if denominator == 0 else 2*precision*recall/(precision+recall)

    return f_measure*100.

def concatenation(unary_flow, to_ref_flow):
    """
    Chain two flow fields into one.

    For every pixel (x, y) of unary_flow the result is
        unary_flow[x, y] + to_ref_flow sampled at
        (x + unary_flow[x, y, 1], y + unary_flow[x, y, 0]),
    where to_ref_flow is sampled with a bicubic spline. Channel 1 is the
    displacement along the first array axis, channel 0 along the second.
    Sample positions outside the grid are clamped to its border by the
    underlying FITPACK evaluation (same behaviour as the former interp2d call).

    scipy.interpolate.interp2d, used previously, was removed from SciPy;
    RectBivariateSpline is its documented replacement on a regular grid. It
    also evaluates all pixels in one call instead of two Python calls per pixel.

    Arguments:
        unary_flow  : array (H, W, 2), flow applied first.
        to_ref_flow : array of at least (H, W, 2), flow that is looked up at
                      the displaced positions. H and W must be >= 4 for the
                      cubic spline.

    Returns:
        float32 array of shape (H, W, 2).
    """
    # Local import keeps this function usable without touching the import cell.
    from scipy.interpolate import RectBivariateSpline

    unary_flow = np.asarray(unary_flow)
    to_ref_flow = np.asarray(to_ref_flow)

    n_rows, n_cols = unary_flow.shape[:2]
    rows = np.arange(n_rows)
    cols = np.arange(n_cols)

    # One interpolating bicubic spline (default smoothing s=0) per flow channel.
    spline_ch1 = RectBivariateSpline(rows, cols, to_ref_flow[:n_rows, :n_cols, 1], kx=3, ky=3)
    spline_ch0 = RectBivariateSpline(rows, cols, to_ref_flow[:n_rows, :n_cols, 0], kx=3, ky=3)

    # Positions each pixel is displaced to by unary_flow.
    row_grid, col_grid = np.meshgrid(rows, cols, indexing='ij')
    target_rows = row_grid + unary_flow[..., 1]
    target_cols = col_grid + unary_flow[..., 0]

    flow = np.zeros((n_rows, n_cols, 2), dtype=np.float32)
    flow[..., 1] = unary_flow[..., 1] + spline_ch1.ev(target_rows, target_cols)
    flow[..., 0] = unary_flow[..., 0] + spline_ch0.ev(target_rows, target_cols)
    return flow

def set_cuda():
    """Select the torch device ("cuda" if available, otherwise "cpu"), print it and return it."""
    backend = "cpu"
    if torch.cuda.is_available():
        backend = "cuda"
    device = torch.device(backend)
    print(device)
    return device

def load_model(model_path, cuda_device):
    """
    Load a checkpoint and build the network it describes.

    Arguments:
        model_path  : path of the checkpoint file given to torch.load().
        cuda_device : device used as map_location and as target of the model.

    Returns:
        (model, div_flow): the network in eval mode and the 'div_flow' value
        stored in the checkpoint, or 20.0 when the checkpoint has no such key.
    """
    checkpoint = torch.load(model_path, map_location=cuda_device)
    arch_name = checkpoint['arch']
    print("=> using pre-trained model '{}'".format(arch_name))

    # The architecture name stored in the checkpoint selects the constructor
    # from the `models` module; the constructor receives the whole checkpoint.
    build_network = models.__dict__[checkpoint['arch']]
    model = build_network(checkpoint).to(cuda_device)
    model.eval()
    cudnn.benchmark = True

    div_flow = checkpoint['div_flow'] if 'div_flow' in checkpoint.keys() else 20.0

    return model, div_flow

@torch.no_grad()
def inference(nb_start, nb_end, name_of_the_figure, cuda_device, model, div_flow):
    """
    Estimate and save the flow between consecutive frames of one sequence.

    For every i in [nb_start, nb_end) the frames i and i+1 are read from
    ./sequences-train/, concatenated and passed to the model. The output is
    resized to the frame size, multiplied by div_flow and written to
    ./my_results/ as a colour image (.png) and as an array (.npy).

    Arguments:
        nb_start, nb_end   : first frame number and exclusive upper bound.
        name_of_the_figure : sequence name, used as file-name prefix.
        cuda_device        : device the network input is moved to.
        model              : network called on the concatenated frame pair.
        div_flow           : factor applied to the network output.
    """
    # Output folder is created on first use.
    results_dir = Path('my_results')
    if not results_dir.exists():
        results_dir.mkdir()

    # Frame -> tensor, scaled by 1/255, then shifted by the per-channel means.
    to_network_input = transforms.Compose([
        flow_transforms.ArrayToTensor(),
        transforms.Normalize(mean=[0,0,0], std=[255,255,255]),
        transforms.Normalize(mean=[0.411,0.432,0.45], std=[1,1,1])
    ])

    # Local copy of the frame folder and the "<sequence>-" file prefix.
    path_to_sequesnces = "./sequences-train/"
    name = name_of_the_figure + '-'

    for n_img_1 in range(nb_start, nb_end):
        n_img_2 = n_img_1 + 1
        tag_1 = str(n_img_1).zfill(3)
        tag_2 = str(n_img_2).zfill(3)

        frame_1 = imread(path_to_sequesnces + name + tag_1 + ".bmp")
        frame_2 = imread(path_to_sequesnces + name + tag_2 + ".bmp")
        frame_1 = to_network_input(frame_1)
        frame_2 = to_network_input(frame_2)

        batch = torch.cat([frame_1, frame_2]).unsqueeze(0)
        batch = batch.to(cuda_device)
        prediction = model(batch)
        # Bring the prediction to the spatial size of the input frame.
        prediction = F.interpolate(prediction, size=frame_1.size()[-2:], mode = "bilinear", align_corners=False)

        flow_output = prediction.squeeze(0)

        file_name = "./my_results/" + name + tag_1 + "-" + tag_2

        # flow2rgb gets its own scaled tensor; the array saved below is built
        # from a second, independent product (flow2rgb is defined elsewhere and
        # may modify its argument - TODO confirm).
        rgb_flow = flow2rgb(div_flow*flow_output, max_value=None)
        imwrite(file_name + '.png', (rgb_flow * 255).astype(np.uint8).transpose(1,2,0))

        flow_hwc = (div_flow*flow_output).cpu().numpy().transpose(1,2,0)
        np.save(file_name + '.npy', flow_hwc)

@torch.no_grad()
def simple_inference(img1, img2, name, model, cuda_device, save = False, div_flow = 20.0):
    """
    Estimate the flow between two already-loaded frames.

    Arguments:
        img1, img2  : the two frames as arrays accepted by ArrayToTensor().
        name        : label used in the output file names when save is True.
        model       : network called on the concatenated frame pair.
        cuda_device : device the network input is moved to.
        save        : when True, also write the colour rendering
                      ("./my_results/simple_inference_<name>.png") and the flow
                      array (".npy"). Previously this flag was ignored and the
                      rendering was computed but never written.
        div_flow    : factor applied to the network output (default 20.0, the
                      value that used to be hard-coded).

    Returns:
        numpy array (H, W, 2): network output resized to the frame size and
        multiplied by div_flow.
    """
    # The folder is created unconditionally: callers write their own files
    # into ./my_results/ after this function returns.
    path_to_save = Path('my_results')
    if not path_to_save.exists():
        path_to_save.mkdir()

    # Frame -> tensor, scaled by 1/255, then shifted by the per-channel means.
    input_transform = transforms.Compose([
        flow_transforms.ArrayToTensor(),
        transforms.Normalize(mean=[0,0,0], std=[255,255,255]),
        transforms.Normalize(mean=[0.411,0.432,0.45], std=[1,1,1])
    ])

    img1 = input_transform(img1)
    img2 = input_transform(img2)

    input_var = torch.cat([img1, img2]).unsqueeze(0)
    input_var = input_var.to(cuda_device)
    output = model(input_var)
    # Bring the prediction to the spatial size of the input frame.
    output = F.interpolate(output, size=img1.size()[-2:], mode="bilinear", align_corners=False)
    flow_output = output.squeeze(0)

    # Detach the tensor before converting to numpy
    to_save_np = (div_flow * flow_output).detach().cpu().numpy().transpose(1, 2, 0)

    if save:
        file_name = "./my_results/" + "simple_inference_" + name
        # flow2rgb receives its own scaled tensor so the returned array cannot
        # be affected by it (flow2rgb is defined elsewhere - TODO confirm
        # whether it modifies its argument).
        rgb_flow = flow2rgb(div_flow * flow_output, max_value=None)
        to_save_rgb = (rgb_flow * 255).astype(np.uint8).transpose(1, 2, 0)
        imwrite(file_name + '.png', to_save_rgb)
        np.save(file_name + '.npy', to_save_np)

    return  to_save_np
    
@torch.no_grad()
def complete_inferece_saving_seq(nb_start, nb_end, name):
    """
    Run the sequential integration on one sequence and save masks and scores.

    For every i in [nb_start, nb_end): the flow between frames i and i+1 is
    estimated, chained onto the flow accumulated so far, and used to warp the
    annotated mask of frame i+1 with propagate_mask(). The warped mask is
    written to ./my_results/ and scored against the mask of frame 001 (Dice,
    boundary F-measure, centroid distance). The three score lists are rewritten
    to .npy files after every frame.

    Arguments:
        nb_start, nb_end : first frame number and exclusive upper bound.
        name             : sequence name (file prefix in ./sequences-train/).
    """
    device = set_cuda()
    # simple_inference() applies its own default scale, so the div_flow value
    # of the checkpoint is not used here.
    model, _div_flow = load_model(path_to_model, device)

    # Reference mask: always frame 001, independent of nb_start.
    mask = imread(path_to_sequesnces + name + "-001.png")

    dice_seq, fmeasures_seq, centroid_assessment_seq = [], [], []
    # Accumulated flow; None until the first pair has been processed. The
    # former test `i == 1` left this undefined whenever nb_start was not 1.
    flow_conc = None
    for i in tqdm(range(nb_start, nb_end)):

        n_img_1 = i
        n_img_2 = i+1
        tag_1 = str(n_img_1).zfill(3)
        tag_2 = str(n_img_2).zfill(3)

        mask_cur = imread(path_to_sequesnces + name + "-" + tag_2 + ".png")

        img1 = imread(path_to_sequesnces + name + "-" + tag_1 + ".bmp")
        img2 = imread(path_to_sequesnces + name + "-" + tag_2 + ".bmp")

        flow = simple_inference(img1, img2, name + tag_1, model, device)

        if flow_conc is None:
            flow_conc = flow
        else:
            flow_conc = concatenation(flow, flow_conc)

        mask_predict = propagate_mask(flow_conc, img_current= img2, mask_begin = mask_cur)

        imwrite( "./my_results/" + name +"-mask_pro_sequential"+ tag_2 +'.png', mask_predict)

        dice_seq.append(dice_assessment(mask, mask_predict))
        fmeasures_seq.append(db_eval_boundary(mask,mask_predict))
        centroid_assessment_seq.append(centroid_assessment(mask,mask_predict))

        # Score files are rewritten on every iteration with the lists so far.
        np.save("./my_results/" + name + "-dice_seq.npy", dice_seq)
        np.save("./my_results/" + name + "-fmeasures_seq.npy", fmeasures_seq)
        np.save("./my_results/" + name + "-centroid_assessment.npy", centroid_assessment_seq)

    print("ok " + name)        

@torch.no_grad()
def inference_direct(nb_start, nb_end, name_of_the_figure, cuda_device, model, div_flow):
    """
    Estimate and save the flow between frame 001 and each later frame.

    Frame 001 is loaded once. For every i in [nb_start, nb_end) it is paired
    with frame i+1, the pair is passed to the model, and the output (resized to
    the frame size and multiplied by div_flow) is written to ./my_results/ with
    the prefix "direct", as a colour image (.png) and as an array (.npy).

    Arguments:
        nb_start, nb_end   : loop bounds; the second frame of each pair is i+1.
        name_of_the_figure : sequence name, used as file-name prefix.
        cuda_device        : device the network input is moved to.
        model              : network called on the concatenated frame pair.
        div_flow           : factor applied to the network output.
    """
    # Output folder is created on first use.
    results_dir = Path('my_results')
    if not results_dir.exists():
        results_dir.mkdir()

    # Frame -> tensor, scaled by 1/255, then shifted by the per-channel means.
    to_network_input = transforms.Compose([
        flow_transforms.ArrayToTensor(),
        transforms.Normalize(mean=[0,0,0], std=[255,255,255]),
        transforms.Normalize(mean=[0.411,0.432,0.45], std=[1,1,1])
    ])

    # Local copy of the frame folder and the "<sequence>-" file prefix.
    path_to_sequesnces = "./sequences-train/"
    name = name_of_the_figure + '-'

    # The reference frame is fixed to number 1.
    n_img_1 = 1
    reference_tag = str(n_img_1).zfill(3)
    reference = to_network_input(imread(path_to_sequesnces + name + reference_tag + ".bmp"))

    for i in range(nb_start, nb_end):
        target_tag = str(i+1).zfill(3)
        target = imread(path_to_sequesnces + name + target_tag + ".bmp")
        target = to_network_input(target)

        batch = torch.cat([reference, target]).unsqueeze(0)
        batch = batch.to(cuda_device)
        prediction = model(batch)
        # Bring the prediction to the spatial size of the reference frame.
        prediction = F.interpolate(prediction, size=reference.size()[-2:], mode = "bilinear", align_corners=False)

        flow_output = prediction.squeeze(0)

        file_name = "./my_results/" + "direct" + name + reference_tag + "-" + target_tag

        # flow2rgb gets its own scaled tensor; the array saved below is built
        # from a second, independent product (flow2rgb is defined elsewhere and
        # may modify its argument - TODO confirm).
        rgb_flow = flow2rgb(div_flow*flow_output, max_value=None)
        imwrite(file_name + '.png', (rgb_flow * 255).astype(np.uint8).transpose(1,2,0))

        flow_hwc = (div_flow*flow_output).cpu().numpy().transpose(1,2,0)
        print(file_name + '.npy')
        np.save(file_name + '.npy', flow_hwc)

    print("End of flow calculation for direct integration")


def propagate_mask(flow, img_current, mask_begin):
    """
    Warp a mask with a flow field.

    Output pixel (x, y) is set to 255 when the pixel of mask_begin at
    (x + flow[x, y, 1], y + flow[x, y, 0]), rounded to the nearest integer
    position, is greater than 0. Positions that fall outside the image leave
    the output pixel at 0.

    The lookup is done for all pixels at once with array indexing instead of a
    Python loop over every pixel.

    Arguments:
        flow        : array (H, W, 2); channel 1 is the offset along the first
                      axis, channel 0 along the second.
        img_current : array whose first two dimensions give the output size.
        mask_begin  : 2-D mask that is sampled.

    Returns:
        uint8 array of shape img_current.shape[:2] with values 0 or 255.
    """
    flow = np.asarray(flow)
    mask_begin = np.asarray(mask_begin)
    height, width = img_current.shape[:2]

    # Source position of every output pixel, rounded to the pixel grid.
    rows, cols = np.indices((height, width))
    src_rows = np.rint(rows + flow[:height, :width, 1]).astype(int)
    src_cols = np.rint(cols + flow[:height, :width, 0]).astype(int)

    # Only positions inside the image can be looked up.
    inside = (src_rows >= 0) & (src_rows < height) & (src_cols >= 0) & (src_cols < width)

    foreground = np.zeros((height, width), dtype=bool)
    foreground[inside] = mask_begin[src_rows[inside], src_cols[inside]] > 0

    new_mask = np.zeros(shape=(height, width), dtype=np.uint8)
    new_mask[foreground] = 255
    return new_mask


def propagate_mask_direct(nb_start, nb_end, name_in_the_figure):
    """
    Generate all the propagated masks for the direct method.

    For every frame number in [nb_start+1, nb_end] the flow saved as
    "./my_results/direct<name>-001-<frame>.npy" is loaded, the mask of frame
    001 is warped with it by propagate_mask(), and the result is written to
    "./my_results/<name>-mask_pro_dir-001-<frame>.png".

    Arguments:
        nb_start, nb_end   : frame range; nb_start itself is skipped, nb_end is included.
        name_in_the_figure : sequence name (file prefix).
    """
    original_mask = imread(f"./sequences-train/{name_in_the_figure}-001.png")
    first_img = imread(f'./sequences-train/{name_in_the_figure}-001.bmp')

    for frame_number in range(nb_start+1, nb_end+1):
        frame_tag = str(frame_number).zfill(3)

        flow_file = "./my_results/" + "direct" + name_in_the_figure + "-001-" + frame_tag + '.npy'
        flow = np.load(flow_file)
        current_mask = propagate_mask(flow, img_current= first_img, mask_begin = original_mask)

        mask_file = f"{name_in_the_figure}-mask_pro_dir-001-" + frame_tag + '.png'
        imwrite("./my_results/" + mask_file, current_mask)
        print("ok " + mask_file)

    print("End of mask propagation for direct integration")



## Sequential integration

**Inference and evaluation of the results using the sequential approach**

In [None]:
# Run the sequential integration. Each call loads the model, processes the
# frame range [start, end) of one sequence and writes masks and scores to
# ./my_results/. The commented-out lines are earlier runs on other sequences.

# name  = [ "bear", "book", "camel", "rhino", "swan"]
# start = [ 1     ,  1    , 1      , 1      ,  1]  
# end   = [ 26    ,  51   , 90     , 90     ,  50]


# for i in range(len(name)):
#     complete_inferece_saving_seq(start[i], end[i], name[i])

# complete_inferece_saving_seq(1,104,"cow")

# complete_inferece_saving_seq(1,26,"fish")

# Sequence currently selected for processing.
complete_inferece_saving_seq(1,26,"octopus")







## Direct Integration

In [None]:
# Run the direct integration: estimate the flow between frame 001 and every
# later frame, then warp the first mask with each of those flows.

# NOTE: nb_start / nb_end are not referenced by the calls in this cell, which
# pass their frame range as literals.
nb_start = 1
nb_end   = 50

# The model is loaded once and shared by all inference_direct() calls.
device = set_cuda()
model, div_flow = load_model(path_to_model, device)


# Earlier runs on the other sequences:
# name  = [ "bear", "book", "camel", "rhino", "swan"]
# start = [ 1     ,  1    , 1      , 1      ,  1]  
# end   = [ 26    ,  51   , 90     , 90     ,  50]

# for i in range(len(name)):
#     inference_direct(start[i], end[i], name[i], device, model, div_flow)
#     propagate_mask_direct(start[i], end[i], name[i])


# Sequence currently selected for processing.
inference_direct(1, 104, "cow", device, model, div_flow)
propagate_mask_direct(1, 104, "cow")



In [None]:
# calculate the dice and f-measure for the direct integration

# Sequences to evaluate and their frame ranges. They are defined here because
# this cell previously relied on `name`, `start` and `end` from cells that only
# appear further down, which fails with a NameError on a top-to-bottom run.
name  = ["bear", "book", "camel", "rhino", "swan"]
start = [1, 1, 1, 1, 1]
end   = [26, 51, 90, 90, 50]

for seq_name, first_frame, last_frame in zip(name, start, end):
    # Reference mask of the sequence (frame 001).
    mask = imread("./sequences-train/"+ seq_name +"-001.png")
    dice_dir, fmeasures_dir = [], []
    for j in range(first_frame+1, last_frame):
        frame_tag = str(j).zfill(3)

        # NOTE(review): the annotated mask of frame j is loaded but the scores
        # below are computed against the frame-001 mask - confirm this is the
        # intended reference.
        mask_cur = imread("./sequences-train/"+ seq_name +"-"+ frame_tag +".png")
        mask_predict = imread("./my_results/"+ seq_name + "-mask_pro_dir-001-"+ frame_tag +'.png')
        dice_dir.append(dice_assessment(mask, mask_predict))
        fmeasures_dir.append(db_eval_boundary(mask,mask_predict))
        # Score files are rewritten on every iteration with the lists so far.
        np.save("./my_results/" + seq_name + "-dice_dir.npy", dice_dir)
        np.save("./my_results/" + seq_name + "-fmeasures_dir.npy", fmeasures_dir)

## Results visualization

In [None]:
# Plot the Dice and F-measure curves of the sequential integration, one figure
# per sequence, from the score files written to ./my_results/.

name = ["bear", "book", "camel", "rhino", "swan"]
start = [1, 1, 1, 1, 1]
end = [26, 51, 90, 90, 50]


for seq_name, first_frame, last_frame in zip(name, start, end):
    dice_scores = np.load("./my_results/" + seq_name + "-dice_seq.npy")
    fmeasure_scores = np.load("./my_results/" + seq_name + "-fmeasures_seq.npy")

    # One score per predicted frame: first_frame+1 .. last_frame.
    frames = range(first_frame + 1, last_frame + 1)
    print(seq_name)

    fig, ax = plt.subplots(figsize=(17, 6))
    ax.set_title("Sequential " + seq_name)
    ax.plot(frames, dice_scores, marker='o', color='r')
    ax.plot(frames, fmeasure_scores, marker='v', color='g')
    # The x axis starts one frame after the first plotted point.
    ax.set_xlim((first_frame + 2, last_frame + 1))
    ax.set_ylim((0, 100))
    ax.set_ylabel('score')
    ax.set_xlabel("im")
    ax.grid()
    ax.legend(['dice', 'Fmeasure'])
    plt.savefig('results_' + seq_name + '.png')
    plt.show()






 **Results Direct integration**

In [None]:
# Plot the Dice and F-measure curves of the direct integration, one figure per
# sequence, from the score files written to ./my_results/.

name = ["bear", "book", "camel", "rhino", "swan"]
start = [1, 1, 1, 1, 1]
end = [26, 51, 90, 90, 50]


for seq_name, first_frame, last_frame in zip(name, start, end):
    dice_scores = np.load("./my_results/" + seq_name + "-dice_dir.npy")
    fmeasure_scores = np.load("./my_results/" + seq_name + "-fmeasures_dir.npy")

    # One score per evaluated frame: first_frame+1 .. last_frame-1.
    frames = range(first_frame + 1, last_frame)
    print(seq_name)

    fig, ax = plt.subplots(figsize=(17, 6))
    ax.set_title("Direct " + seq_name)
    ax.plot(frames, dice_scores, marker='o', color='r')
    ax.plot(frames, fmeasure_scores, marker='v', color='g')
    # The x axis starts one frame after the first plotted point.
    ax.set_xlim((first_frame + 2, last_frame + 1))
    ax.set_ylim((0, 100))
    ax.set_ylabel('score')
    ax.set_xlabel("im")
    ax.grid()
    ax.legend(['dice', 'Fmeasure'])
    # NOTE(review): same file name as the sequential plot cell, so this
    # overwrites that figure - confirm whether a separate name is wanted.
    plt.savefig('results_' + seq_name + '.png')
    plt.show()

In [None]:
# Plot the direct and the sequential scores of each sequence in one figure.

name = ["book", "camel", "rhino", "swan"]
start = [1, 1, 1, 1]
end = [51, 90, 90, 50]


for seq_name, first_frame, last_frame in zip(name, start, end):
    results_prefix = "./my_results/" + seq_name

    # direct
    dice_direct = np.load(results_prefix + "-dice_dir.npy")
    fmeasure_direct = np.load(results_prefix + "-fmeasures_dir.npy")

    print(seq_name)

    # sequential: the last entry is dropped before plotting
    dice_sequential = np.load(results_prefix + "-dice_seq.npy")[:-1]
    fmeasure_sequential = np.load(results_prefix + "-fmeasures_seq.npy")[:-1]

    # Shapes are printed to check that all four curves have the same length.
    for scores in (dice_direct, fmeasure_direct, dice_sequential, fmeasure_sequential):
        print(np.shape(scores))

    frames = range(first_frame, last_frame - 1)
    print(frames)

    fig, ax = plt.subplots(figsize=(17, 6))
    ax.set_title("Sequential X Direct " + seq_name)
    ax.plot(frames, dice_direct, marker='o', color='r')
    ax.plot(frames, fmeasure_direct, marker='v', color='g')
    ax.plot(frames, dice_sequential, marker='o', color='b')
    ax.plot(frames, fmeasure_sequential, marker='x', color='pink')

    ax.set_xlim((first_frame, last_frame - 1))
    ax.set_ylim((0, 100))
    ax.set_ylabel('score')
    ax.set_xlabel("im")
    ax.grid()
    ax.legend(['dice direct', 'Fmeasure direct ', 'dice sequential ', 'Fmeasure sequential'])
    plt.savefig('results_total_' + seq_name + '.png')
    plt.show()


In [None]:
# Collect the frames for the gifs: predicted masks (sequential and direct) and
# the annotated masks of one sequence.
import imageio


name = ["bear", "book", "camel", "rhino", "swan"]
start = [ 1     ,  1    , 1      , 1      ,  1]  
end =   [ 26    ,  51   , 90     , 90     ,  50]


j = 3 # rhino
rhino_seq_gif = []
rhino_dir_gif = []
rhino_gd_gif = []
# The frame counter has its own name: it used to reuse `j` and thereby
# overwrote the sequence index while the loop was running.
for frame in range(start[j]+1, end[j]):
    frame_tag = str(frame).zfill(3)
    rhino_seq_gif.append(imread("./my_results/" + name[j] + "-mask_pro_sequential"+ frame_tag +'.png'))
    rhino_dir_gif.append(imread("./my_results/" + name[j] + "-mask_pro_dir-001-"+ frame_tag +'.png'))
    rhino_gd_gif.append(imread("./sequences-train/" + name[j]+ "-"+ frame_tag + ".png"))


# Writing the gifs is currently disabled:
# imageio.mimsave('rhino_seq.gif', rhino_seq_gif)
# imageio.mimsave('rhino_dir.gif', rhino_dir_gif)
# imageio.mimsave('rhino_gd.gif', rhino_gd_gif)
