**Fast Fiducial Marker Tracking using CNNs**

*This works develops a convolutional neural network model (CNN) for tracking fiducial markers as fast as possible in order to enable real-time application in embedded systems. A comparison is made between classical methods for edge detection implemented in the OpenCV library.*

# Introduction

Fiducial markers are used in many applications which is necessary to track an object in real time, such as flight testing, automobile testing, mixed reality and optical experiments. For having a predefined shape with fixed and distinct characteristics from the ambient, such as high contrast edges, they are easier to track than an arbitary object that it's glued on. Also in applications that require a higher precision and lower misidentification errors it is often used.

There are several classical algorithms that are used for identifying many markers composed from crosses, circles, edges, lines, edges such as: Harris Corner detection, Shi-Tomasi Corner detection, SIFT, SUFT, FAST, BRIEF, ORB. 

Auxiliary code such as dataset generation and image processing has been moved from this notebook to external python files so that this notebook would be more concise and be focused on the machine learning aspect of this research.

In [None]:
import glob
import os
import urllib.request
import cv2
import sklearn.metrics
import seaborn as sns
sns.set_style("whitegrid") # or darkgrid
#from jupyterthemes import jtplot
#jtplot.style(theme='monokai', context='notebook', ticks=True, grid=False)
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
import pandas as pd
import numpy as np

import torch
import torch.nn.functional as F
from torch import nn
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

from libs_utils import gen_dataset, load_datafiles, save_datafiles, evaluate, train, calc_error, negative_samples, incorporate_dataset
from libs_utils import svg_gen

# Methodology
In this work a secchi disk was used, which encompasses an edge inside a circle, as show in below. The SIZE is defined in pixels. The parameters DEVICE selects the gpu if available and the RETRAIN flag trains the networks from scratch, instead of loading the lastest trained model.

In [None]:
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
SIZE = 24
PADD = 4
RETRAIN = False

## Positional Error
The first step was to create a model that estimates only the position of the markers given a cropped region which contains the center of the marker.

### Dataset generation

The first dataset generated is a simpler one with markers with varying position, color, scale and rotation. The default parameters varies within a uniform distribution of 8 to 30 for the marker radius (in pixels) independently for each axis, 0 to 180 degrees to rotation, 0 to 90 for the background color, 0 to to 60 for the darker marker color and 60 to 165 for the brighter marker color.

Given that the dataset occupies a few gigabytes in memory, for optimization puporses, it is keept loaded in memory throughout the training and evaluation phases.

In [None]:
try:
    imgs, pos = load_datafiles()
    SIZE = imgs.shape[2]
except:
    imgs, pos = gen_dataset(320000, SIZE, SIZE, PADD) # already randomized
    save_datafiles(imgs, pos)
rng = np.random.default_rng(42)
imgs = imgs.astype(np.float32)/255
for i in range(imgs.shape[0]):
     noise = rng.uniform(0, 0.01)
     imgs[i, :, :, :] = np.clip(imgs[i] + rng.uniform(-noise, noise, size=imgs[i].shape), 0, 1)
pos = torch.Tensor(pos.astype(np.float32)/SIZE).to(DEVICE)
validation_size = 16000
test_size = 16000
train_size = imgs.shape[0] - validation_size - test_size
train_dataset = TensorDataset(torch.Tensor(imgs[:train_size]).to(DEVICE), pos[:train_size])
train_dataloader = DataLoader(train_dataset, batch_size=64)
val_dataset = TensorDataset(torch.Tensor(imgs[train_size:-test_size]).to(DEVICE), pos[train_size:-test_size])
val_dataloader = DataLoader(train_dataset, batch_size=128)
test_dataset = TensorDataset(torch.Tensor(imgs[-test_size:]).to(DEVICE), pos[-test_size:])
test_dataloader = DataLoader(test_dataset, batch_size=128)

In [None]:
fig = plt.figure(figsize=(16, 9))
grid = ImageGrid(fig, 111, nrows_ncols=(8, 12),axes_pad=0.1)
for i, ax in enumerate(grid):
    ax.imshow(imgs[i, 0], cmap='gray',  vmin=0, vmax=1)
plt.show()

### Model's definition

Those are the first three simpler models that were defined and had its hyperparameters manually selected during the validation.

The SimpleConvOnlyPos was superseded by its Leaky counterpart that uses a LeakyReLU activation function. The wider model has more parameters that increase its accuracy at the expense of more processing time.

In [None]:
class SimpleConvOnlyPos(nn.Module):
    def __init__(self):
        super().__init__()
        self.actv_func = F.relu
        self.conv0 = nn.Conv2d(1, 4, 3)
        self.conv1 = nn.Conv2d(4, 8, 5)
        self.conv2 = nn.Conv2d(8, 16, 5)
        self.conv3 = nn.Conv2d(16, 8, 5)
        self.conv4 = nn.Conv2d(8, 4, 5)
        self.conv5 = nn.Conv2d(4, 2, 6)
    
    def forward(self, x):
        x = self.actv_func(self.conv0(x))
        x = self.actv_func(self.conv1(x))
        x = self.actv_func(self.conv2(x))
        x = self.actv_func(self.conv3(x))
        x = self.actv_func(self.conv4(x))
        x = torch.flatten(self.conv5(x), 1)
        return x

class SimpleConvOnlyPosLeaky(SimpleConvOnlyPos):
    def __init__(self):
        super().__init__()
        self.actv_func = F.leaky_relu
    
    def forward(self, x):
        return torch.sigmoid(super().forward(x))

class SimpleConvOnlyPosWider(SimpleConvOnlyPosLeaky):
    def __init__(self):
        super().__init__()
        self.conv0 = nn.Conv2d(1, 64, 3)
        self.conv1 = nn.Conv2d(64, 32, 5)
        self.conv2 = nn.Conv2d(32, 16, 5)
        self.conv3 = nn.Conv2d(16, 8, 5)
        self.conv4 = nn.Conv2d(8, 4, 5)
        self.conv5 = nn.Conv2d(4, 2, 6)


simpleConvOnlyPos = SimpleConvOnlyPos().to(DEVICE)
simpleConvOnlyPosLeaky = SimpleConvOnlyPosLeaky().to(DEVICE)
simpleConvOnlyPosWider = SimpleConvOnlyPosWider().to(DEVICE)
onlyPosModels = [simpleConvOnlyPos, simpleConvOnlyPosLeaky, simpleConvOnlyPosWider]

### Training
The training has an early stopping implemented to it.

In [None]:
for model in onlyPosModels:
    name = model.__class__.__name__
    log_run_num = len(glob.glob('saved_models/%s_*'%name)) + 1
    n = '%s_%d' % (name, log_run_num)
    if RETRAIN or log_run_num == 1:
        print(n)
        loss_fn = nn.MSELoss()
        optim = Adam(model.parameters(), lr=0.001)
        writer = SummaryWriter('tb_logs/%s' % n, flush_secs=1)
        print('Training')
        train_loss, val_loss = train(model, loss_fn, optim, train_dataloader, val_dataloader, epochs=500, tensorboard_writer=writer, path='saved_models/%s.pth'%n, patience=50)
    else:
        model.load_state_dict(torch.load('saved_models/%s_%d.pth' % (name, log_run_num - 1)))

In [None]:
for model in onlyPosModels:
    outputs, mse = evaluate(model, nn.MSELoss(), val_dataloader)
    print(model.__class__.__name__, '%.4f' % (np.sqrt(mse) * SIZE))

## Classifying images

### Dataset augmentation with negative samples

In [None]:
neg_imgs = negative_samples(imgs.shape[0], H=SIZE, W=SIZE, images_glob_path='./neg_imgs/*.jpg')
train_dataloader2 = incorporate_dataset(neg_imgs[:train_size], train_dataset, batch_size=32)
val_dataloader2 = incorporate_dataset(neg_imgs[train_size:-test_size], val_dataset, batch_size=128)
np.random.seed(42)
test_neg_imgs = negative_samples(test_size, H=SIZE, W=SIZE, images_glob_path='./neg_imgs_test/*.jpg')
test_neg_imgs = neg_imgs[-test_size:]
test_dataloader2 = incorporate_dataset(test_neg_imgs, test_dataset, batch_size=128)

In [None]:
imgs2 = train_dataloader2.dataset.tensors[0][:256].cpu().numpy()
fig = plt.figure(figsize=(16, 9))
grid = ImageGrid(fig, 111, nrows_ncols=(8, 12),axes_pad=0.1)
for i, ax in enumerate(grid):
    ax.imshow(imgs2[i, 0], cmap='gray',  vmin=0, vmax=1)
plt.show()

### Model definition

In [None]:
class SimpleConv(SimpleConvOnlyPosLeaky):
    def __init__(self):
        super().__init__()
        self.conv5 = nn.Conv2d(4, 3, 6)
    
    def load_from_onlypos_model(self, onlypos_model):
        new_dict = onlypos_model.state_dict()
        self.conv5.weight.data[:2, :, :, :] = new_dict['conv5.weight']
        new_dict['conv5.weight'] = self.conv5.weight
        self.conv5.bias.data[:2] = new_dict['conv5.bias']
        new_dict['conv5.bias'] = self.conv5.bias
        self.load_state_dict(new_dict)

### Transfer learning from previous models

In [None]:
model = SimpleConvOnlyPosLeaky().to(DEVICE)
model.load_state_dict(torch.load('saved_models/SimpleConvOnlyPosLeaky_1.pth'))
sconv = SimpleConv()
sconv.load_from_onlypos_model(model)
sconv = sconv.to(DEVICE)

### Loss definition for positional and classification error

In [None]:
bce = nn.BCELoss()
K = SIZE * SIZE
def mse_bce(output, target):
    return torch.mean((output[:,:2]-target[:,:2])**2*target[:,2:]*K) + bce(output[:,2], target[:,2])

### Multitask training for localization and classification

In [None]:
name = 'SimpleConv'
log_run_num = len(glob.glob('saved_models/%s_*'%name)) + 1
n = '%s_%d' % (name, log_run_num)
if RETRAIN or log_run_num == 1:
    optimizer_ce = torch.optim.Adam(sconv.parameters(), lr=0.001)
    writer = SummaryWriter('tb_logs/%s' % n)
    train_loss, val_loss = train(sconv, mse_bce, optimizer_ce, train_dataloader2, val_dataloader2, epochs=500, tensorboard_writer=writer, path='saved_models/%s.pth'%n, patience=50)
else:
    sconv.load_state_dict(torch.load('saved_models/%s_%d.pth' % (name, log_run_num - 1)))

### Wider model

In [None]:
class SimpleConvWider(SimpleConvOnlyPosWider):
    def __init__(self):
        super().__init__()
        self.conv5 = nn.Conv2d(4, 3, 6)
    
    def load_from_onlypos_model(self, onlypos_model):
        new_dict = onlypos_model.state_dict()
        self.conv5.weight.data[:2, :, :, :] = new_dict['conv5.weight']
        new_dict['conv5.weight'] = self.conv5.weight
        self.conv5.bias.data[:2] = new_dict['conv5.bias']
        new_dict['conv5.bias'] = self.conv5.bias
        self.load_state_dict(new_dict)
        
model = SimpleConvOnlyPosWider().to(DEVICE)
model.load_state_dict(torch.load('saved_models/SimpleConvOnlyPosWider_1.pth'))
sconvw = SimpleConvWider()
sconvw.load_from_onlypos_model(model)
sconvw = sconvw.to(DEVICE)

name = 'SimpleConvWider'
log_run_num = len(glob.glob('saved_models/%s_*'%name)) + 1
n = '%s_%d' % (name, log_run_num)
if RETRAIN or log_run_num == 1:
    optimizer_ce = torch.optim.Adam(sconvw.parameters(), lr=0.001)
    writer = SummaryWriter('tb_logs/%s' % n)
    train_loss, val_loss = train(sconvw, mse_bce, optimizer_ce, train_dataloader2, val_dataloader2, epochs=500, tensorboard_writer=writer, path='saved_models/%s.pth'%n, patience=50)
else:
    sconvw.load_state_dict(torch.load('saved_models/%s_%d.pth' % (name, log_run_num - 1)))

### Centroid Model

In [None]:
class ConvNet(nn.Module):
    def __init__(self, H, W, device=DEVICE):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 4, 5, padding=2, stride=2)
        #self.batch_norm = nn.BatchNorm2d(4)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.conv2 = nn.Conv2d(4, 8, 5, padding=2)
        self.conv3 = nn.Conv2d(8, 16, 5, padding=2)
        #self.conv4 = nn.Conv2d(16, 32, 3, padding=1)
        #self.fc1 = nn.Linear(96, 32)
        self.fc2 = nn.Linear(48, 16)
        self.fc3 = nn.Linear(16, 3)
        self.width = (torch.arange(0, W).reshape((1, 1, 1, W))/float(W)).to(device)
        self.height = (torch.arange(0, H).reshape((1, 1, H, 1))/float(H)).to(device)
        self.mean = (W*H)**0.5
        self.init_weights()
    
    def init_weights(self):
        c1 = self.conv1.weight.data
        c1 = c1 / 2
        c1[0, 0, ::2, ::2] = 1
        c1[1, 0, ::2, 1::2] = 1
        c1[2, 0, 1::2, ::2] = 1
        c1[3, 0, 1::2, 1::2] = 1
        self.conv1.weight.data = c1
        c2 = self.conv2.weight.data
        c2 = c2 / 2
        c2[0, :4, :2, :2] = 1
        c2[0, :4, 3:, :2] = -1
        c2[0, :4, :2, 3:] = -1
        c2[0, :4, 3:, 3:] = 1
        c2[1, :4, :2, 2] = 1
        c2[1, :4, 2, :2] = -1
        c2[1, :4, 3:, 2] = 1
        c2[1, :4, 2, 3:] = -1
        c2[2, :4, :2, :] = 1
        c2[2, :4, 3:, :] = -1
        c2[3, :4, :2, :] = 1
        c2[3, :4, 3:, :] = -1
        self.conv2.weight.data = c2
        c3 = self.conv3.weight.data
        c3[:, :8, 2, 2] = 1
        self.conv3.weight.data = c3
    
    def __image_forward__(self, x):
        x = self.conv1(x)
        #x = self.batch_norm(x)
        x = self.upsample(x)
        x = torch.tanh(self.conv2(x))
        x = F.leaky_relu(self.conv3(x))
        #x = F.leaky_relu(self.conv4(x))
        x = torch.exp(x)
        return x
    
    def forward(self, x):
        x = self.__image_forward__(x)
        sum_img = torch.sum(torch.sum(x, dim=2), dim=2)
        H_activ = torch.sum(torch.sum(self.height * x, dim=2), dim=2) / sum_img
        W_activ = torch.sum(torch.sum(self.width * x, dim=2), dim=2) / sum_img
        x = torch.cat([sum_img/self.mean, H_activ, W_activ], dim=1)
        #x = F.leaky_relu(self.fc1(x))
        x = F.leaky_relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

In [None]:
name = 'ConvNet'
convnet_upsam = ConvNet(SIZE, SIZE).to(DEVICE)
log_run_num = len(glob.glob('saved_models/%s_*'%name)) + 1
n = '%s_%d' % (name, log_run_num)
if RETRAIN or log_run_num == 1:
    optimizer_ce = torch.optim.Adam(convnet_upsam.parameters(), lr=0.001)
    writer = SummaryWriter('tb_logs/%s' % n)
    train_loss, val_loss = train(convnet_upsam, mse_bce, optimizer_ce, train_dataloader2, val_dataloader2, epochs=500, tensorboard_writer=writer, path='saved_models/%s.pth'%n, patience=50)
else:
    convnet_upsam.load_state_dict(torch.load('saved_models/%s_%d.pth' % (name, log_run_num - 1)))

In [None]:
class ConvNetCompat(nn.Module):
    def __init__(self, H, W, device=DEVICE):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 4, 5, padding=2)
        self.conv2 = nn.Conv2d(4, 8, 5, padding=2)
        self.conv3 = nn.Conv2d(8, 16, 5, padding=2)
        #self.conv4 = nn.Conv2d(16, 32, 3, padding=1)
        #self.fc1 = nn.Linear(96, 32)
        self.fc2 = nn.Linear(48, 16)
        self.fc3 = nn.Linear(16, 3)
        self.width = (torch.arange(0, W).reshape((1, 1, 1, W))/float(W)).to(device)
        self.height = (torch.arange(0, H).reshape((1, 1, H, 1))/float(H)).to(device)
        self.mean = (W*H)**0.5
        self.init_weights()
    
    def init_weights(self):
        c1 = self.conv1.weight.data
        c1 = c1 / 2
        c1[0, 0, ::2, ::2] = 1
        c1[1, 0, ::2, 1::2] = 1
        c1[2, 0, 1::2, ::2] = 1
        c1[3, 0, 1::2, 1::2] = 1
        self.conv1.weight.data = c1
        c2 = self.conv2.weight.data
        c2 = c2 / 2
        c2[0, :4, :2, :2] = 1
        c2[0, :4, 3:, :2] = -1
        c2[0, :4, :2, 3:] = -1
        c2[0, :4, 3:, 3:] = 1
        c2[1, :4, :2, 2] = 1
        c2[1, :4, 2, :2] = -1
        c2[1, :4, 3:, 2] = 1
        c2[1, :4, 2, 3:] = -1
        c2[2, :4, :2, :] = 1
        c2[2, :4, 3:, :] = -1
        c2[3, :4, :2, :] = 1
        c2[3, :4, 3:, :] = -1
        self.conv2.weight.data = c2
        c3 = self.conv3.weight.data
        c3[:, :8, 2, 2] = 1
        self.conv3.weight.data = c3
    
    def __image_forward__(self, x):
        x = self.conv1(x)
        #x = self.batch_norm(x)
        x = torch.tanh(self.conv2(x))
        x = F.leaky_relu(self.conv3(x))
        #x = F.leaky_relu(self.conv4(x))
        x = torch.exp(x)
        return x
    
    def forward(self, x):
        x = self.__image_forward__(x)
        sum_img = torch.sum(torch.sum(x, dim=2), dim=2)
        H_activ = torch.sum(torch.sum(self.height * x, dim=2), dim=2) / sum_img
        W_activ = torch.sum(torch.sum(self.width * x, dim=2), dim=2) / sum_img
        x = torch.cat([sum_img/self.mean, H_activ, W_activ], dim=1)
        #x = F.leaky_relu(self.fc1(x))
        x = F.leaky_relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

In [None]:
name = 'ConvNetCompat'
convnet = ConvNetCompat(SIZE, SIZE).to(DEVICE)
log_run_num = len(glob.glob('saved_models/%s_*'%name)) + 1
n = '%s_%d' % (name, log_run_num)
if RETRAIN or log_run_num == 1:
    optimizer_ce = torch.optim.Adam(convnet.parameters(), lr=0.001)
    writer = SummaryWriter('tb_logs/%s' % n)
    train_loss, val_loss = train(convnet, mse_bce, optimizer_ce, train_dataloader2, val_dataloader2, epochs=500, tensorboard_writer=writer, path='saved_models/%s.pth'%n, patience=50)
else:
    convnet.load_state_dict(torch.load('saved_models/%s_%d.pth' % (name, log_run_num - 1)))

### Extended Model

In [None]:
class ConvNetExtended(nn.Module):
    def __init__(self, convnet):
        super().__init__()
        self.h, self.w = convnet.height.shape[2], convnet.width.shape[3]
        self.h2, self.w2 = self.h//2, self.w//2
        self.convnet = convnet
        self.sumconv = nn.Conv2d(1, 1, (self.h, self.w), stride=(self.h2, self.w2))
        self.hconv = nn.Conv2d(1, 1, (self.h, self.w), stride=(self.h2, self.w2))
        self.wconv = nn.Conv2d(1, 1, (self.h, self.w), stride=(self.h2, self.w2))
        self.fconv2 = nn.Conv2d(48, 16, 1)
        self.fconv3 = nn.Conv2d(16, 3, 1)    
        self.sumconv.weight.data[:,:,:,:] = 1
        self.hconv.weight.data[:,:,:,:] = convnet.height.data
        self.wconv.weight.data[:,:,:,:] = convnet.width.data
        self.fconv2.weight.data[:,:,0,0] = convnet.fc2.weight.data
        self.fconv3.weight.data[:,:,0,0] = convnet.fc3.weight.data
        
    def forward(self, x):
        x = self.convnet.__image_forward__(x)
        b, c, h, w = x.size()
        rh, rw = int((h-self.h)/self.h2)+1, int((w-self.w)/self.w2)+1
        x = x.view(-1, 1, h, w)
        sum_img =  self.sumconv(x)
        H_activ = (self.hconv(x)/sum_img).view(b, c, rh, rw)
        W_activ = (self.wconv(x)/sum_img).view(b, c, rh, rw)
        sum_img = (sum_img/self.convnet.mean).view(b, c, rh, rw)
        x = torch.cat([sum_img, H_activ, W_activ], dim=1)
        x = F.leaky_relu(self.fconv2(x))
        x = torch.sigmoid(self.fconv3(x))
        return x

# Results

## Classical methodologies

In [None]:
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
mask = np.zeros((SIZE, SIZE), dtype=np.uint8)
mask[PADD:-PADD, PADD:-PADD] = 255
DIFF_MAX = 4

def cveval(cvfunc, imgs, t, param):
  pos, dets, metrics = [], [], []
  for i in range(imgs.shape[0]):
      p, detect, metric = cvfunc((imgs[i, 0]*255).astype(np.uint8), np.round(t[i, :2]*SIZE-0.5), param)
      pos.append(p)
      dets.append(detect)
      metrics.append(metric)
  pos = np.array(pos)
  dets = np.array(dets, dtype = bool)
  metrics = np.array(metrics)
  f1 = sklearn.metrics.f1_score(t[:, 2], dets)
  rmse = np.sqrt(np.mean(np.square(t[:, :2] * SIZE - pos)[np.logical_and(dets, t[:, 2]==1), :]))
  err = np.linalg.norm(t[:, :2] * SIZE - pos, axis=-1)[np.logical_and(dets, t[:, 2]==1)]
  sigma_err = err.std()
  return rmse, f1, np.hstack([pos, dets.reshape(-1, 1).astype(np.float64)]), metrics, sigma_err

def subpix(img, ti, param):
    pos = cv2.cornerSubPix(img, np.array([[[ti[0], ti[1]]]], dtype=np.float32), (8,8), (-1,-1), criteria).reshape(-1) + 0.5
    detect = True
    quality = 100 - np.linalg.norm(ti-pos)
    if quality < param or np.all(pos == ti) or np.linalg.norm(ti - pos) > DIFF_MAX:
        detect = False
    return pos, detect, quality

def shitomasi(img, ti, param):
    corners, quality =  cv2.goodFeaturesToTrackWithQuality(img, 1, 0.01, 10, mask=mask, blockSize=4)
    if corners is None or len(corners)==0 or np.linalg.norm(ti - corners.ravel()) > DIFF_MAX:
        return np.array([0, 0.]), False, 0
    return corners.ravel(), quality.ravel()[0] > param, quality.ravel()[0]

def harris(img, ti, param):
    corners, quality = cv2.goodFeaturesToTrackWithQuality(img, 1, 0.01, 10,  mask=mask, blockSize=4, useHarrisDetector=True)
    if corners is None or len(corners)==0 or np.linalg.norm(ti - corners.ravel()) > DIFF_MAX:
        return np.array([0, 0.]), False, 0
    return corners.ravel(), quality.ravel()[0] > param, quality.ravel()[0]

base_img = svg_gen(SIZE, SIZE, PADD, pos=[SIZE/2, SIZE/2], scale=[20, 20], rot=[120, 0], colors=[[150]*3, [40]*3, [60]*3])[0][:, :, 0]
matcher = cv2.BFMatcher.create(cv2.NORM_L2, crossCheck=False)
def matchers(kps, des, base_des):
    if kps is None or len(kps) == 0:
        return np.array([0, 0.]), False, 0
    pos = np.mean(np.array([kp.pt for kp in kps]), axis=0)
    matches = matcher.knnMatch(base_des, des, k=2)
    if len(matches) == 0 or len(matches[0]) < 2:
        metric = 0
    else:
        metric = np.sum([a.distance < 0.8 * b.distance  for a, b in matches])
    return pos, metric > param, metric

sift_config = cv2.SIFT_create(0, 3, 0.03, 10, 1)
base_sift_kp, base_sift_des = sift_config.detectAndCompute(base_img, mask)
def sift(img, ti, param):
    kps, des = sift_config.detectAndCompute(img, mask)
    return matchers(kps, des, base_sift_des)

# surf = cv2.xfeatures2d.SURF_create(350)
# base_surf_kp, base_surf_des = surf.detectAndCompute(base_img, mask)
# def surf(img, ti, param):
#     kps, des = surf.detectAndCompute(img, mask)
#     return matchers(kps, des, base_surf_des)

fast_config = cv2.FastFeatureDetector_create(threshold = 10, nonmaxSuppression = True , type=cv2.FAST_FEATURE_DETECTOR_TYPE_9_16)
base_fast_kp = fast_config.detect(base_img, mask)
def fast(img, ti, param):
    kps = fast_config.detect(img, mask)
    if kps is None or len(kps) == 0:
        return np.array([0., 0]), False, 90
    pos = np.array([kp.pt for kp in kps]).mean(axis=0)
    quality = 100 - np.linalg.norm(ti-pos)
    return pos, quality > param, quality

def fit_param(quality, label):
    qual_false = np.median(quality[label==0])
    qual_true = np.median(quality[label==1])
    param = np.mean([qual_false, qual_true])
    return param

methods = {
    harris: 0.001,
    shitomasi: 0.02,
    subpix: 96.8,
    sift: 0.5,
    fast: 96
}

Each method is first evaluated in the training set to find the optimal hyperparameters. Then they are evaluated on the validadation set in order to verify its performance and finally they are evaluated on the test set in order to get unbiased metrics. The mask, the block (and window) size, the "k" and other parameters where manually selected in the train set.

In [None]:
train_imgs = train_dataloader2.dataset.tensors[0].cpu().numpy()
train_res = train_dataloader2.dataset.tensors[1].cpu().numpy()
val_imgs = val_dataloader2.dataset.tensors[0].cpu().numpy()
val_res = val_dataloader2.dataset.tensors[1].cpu().numpy()
test_imgs = test_dataloader2.dataset.tensors[0].cpu().numpy()
test_res = test_dataloader2.dataset.tensors[1].cpu().numpy()

Some methods have quality measures for the corner detection which are used to decide whether the image contains the center of the marker or not. The problem is that this corner detection technique has a high rate of false positives, and even the external circunference of the marker may be incorrectly identified as its center.
For the cornerSubPix function, it has a metric of distancing between the original position and the refined, and there is also an internal metric of the error of the gradient that could also be used as a discriminator for marker detection. Nevertheless the positional method is simpler to implement.

Therefore the training set was used to determine the values of those parameters that could discriminate between the markers

In [None]:
if RETRAIN:
    method_res = {}
    for method, param in methods.items():
        method_res[method] = cveval(method, train_imgs, train_res, param)
        methods[method] = fit_param(method_res[method][3], train_res[:, 2])
        print(methods)
method_res_val = {}
for method, param in methods.items():
    method_res_val[method] = cveval(method, val_imgs, val_res, param)
    print(method.__name__, '   \t RMSE: %.4f / %.4f' % (method_res_val[method][0], method_res_val[method][-1]), ', F1: %.4f' % method_res_val[method][1], ', Acc: %.4f' % np.mean(method_res_val[method][2][:,-1] == val_res[:,-1]))

In [None]:
method_res_test = {}
for method, param in methods.items():
    method_res_test[method] = cveval(method, test_imgs, test_res, param)
    print(method.__name__, '   \t RMSE: %.4f' % method_res_val[method][0], ', F1: %.4f' % method_res_val[method][1], ', Acc: %.4f' % np.mean(method_res_test[method][2][:,2] == test_res[:,2]))

## Localization Errors

### For models that only estimate the position

In [None]:
for model in onlyPosModels:
    outputs, mse = evaluate(model, nn.MSELoss(), test_dataloader)
    print(model.__class__.__name__, '%.4f' % (np.sqrt(mse) * SIZE))

## Classification Errors

In [None]:
for model in [sconv, sconvw, convnet_upsam, convnet]:
    outputs, mse = evaluate(model, mse_bce, test_dataloader2)
    f1 = sklearn.metrics.f1_score(test_res[:, 2], outputs[:, 2]>0.5)
    rmse = np.sqrt(np.mean(np.square(test_res[:, :2] -  outputs[:, :2])[np.logical_and(test_res[:, 2]==1, outputs[:, 2]>0.5), :])) * SIZE
    err = np.linalg.norm(test_res[:, :2] -  outputs[:, :2], axis=-1)[np.logical_and(test_res[:, 2]==1, outputs[:, 2]>0.5),] * SIZE
    sigma_err = err.std()
    print(model.__class__.__name__, '   \t RMSE: %.4f / %.4f' % (rmse, sigma_err), ', F1: %.6f' % f1, ', Acc: %.6f' % np.mean((outputs[:, 2]>0.5) == test_res[:, 2]))

In [None]:
#plt.rc('text', usetex = True ) # Enable tex for article
plt.rcParams.update({'font.family': 'serif' })
plt.rcParams.update({'font.size': 16})

In [None]:
train_loss = pd.read_csv('./tb_logs/run-ConvNet_1-tag-Training Loss.csv')['Value'].values
val_loss = pd.read_csv('./tb_logs/run-ConvNet_1-tag-Validation Loss.csv')['Value'].values
plt.figure(figsize=(16,7))
plt.plot(list(range(len(train_loss))), train_loss)
plt.plot(list(range(len(val_loss))), val_loss, '--')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend(['Train', "Validation"])
plt.yscale('log')
#plt.grid(color='gray', linestyle='-', linewidth=1)
plt.savefig('traininghistory.pdf', bbox_inches='tight')

In [None]:
s = method_res_test[subpix][2]
o, mse = evaluate(sconvw, mse_bce, test_dataloader2)
conv_rmse = np.sqrt(np.mean(np.square(test_res[:, :2] -  o[:, :2])[np.logical_and(test_res[:, 2]==1, o[:, 2]>0.5), :], axis=1)) * SIZE
subpix_rmse = np.sqrt(np.mean(np.square(test_res[:, :2] * SIZE - s[:,:2])[np.logical_and(s[:,2]==1, test_res[:, 2]==1), :], axis=1))

In [None]:
g = sns.displot({'Convolutional':conv_rmse[conv_rmse<1], 'Sub. Pix.': subpix_rmse[subpix_rmse<1]}, aspect=2.2)
sns.move_legend(g, "upper center")
plt.xlabel('Pixel Error (RMSE)')
plt.savefig('errordist.pdf', bbox_inches='tight')

In [None]:
i = (imgs[0][0]*255).astype(np.uint8)
p = np.array([12., 12])

In [None]:
%%timeit
harris(i, p, methods[harris])

In [None]:
%%timeit
shitomasi(i, p, methods[shitomasi])

In [None]:
%%timeit
subpix(i, p, methods[subpix])

In [None]:
%%timeit
sift(i, p, methods[sift])

In [None]:
%%timeit
fast(i, p, methods[fast])

# Exports

The models should be exported and run with OpenVINO in order to assess its throughput (FPS).
```
python3 /opt/intel/openvino_2021/deployment_tools/model_optimizer/mo.py --input_model ./onxx/SimpleConv/sconv.onnx -o ~/openvino_models/
path_to_openvino_build/inference_engine_samples_build/intel64/Release/benchmark_app -d GPU -i input_img.png -m ~/openvino_models/sconv.xml -pc -niter 1000
```

In [None]:
os.makedirs('onxx', exist_ok=True)
os.makedirs('onxx/SimpleConv', exist_ok=True)
os.makedirs('onxx/SimpleConvWider', exist_ok=True)
os.makedirs('onxx/ConvNetCompat', exist_ok=True)

In [None]:
X = next(iter(train_dataloader2))[0]
torch.onnx.export(sconv, X, "onxx/SimpleConv/sconv.onnx", verbose=True)

In [None]:
torch.onnx.export(sconvw, X, "onxx/SimpleConvWider/sconvw.onnx", verbose=True)

In [None]:
torch.onnx.export(convnet, X, "onxx/ConvNetCompat/convnet.onnx", verbose=True)

In [None]:
for d,i in [('SimpleConv','sconv'), ('SimpleConvWider','sconvw'), ('ConvNetCompat','convnet')]:
    mo_command = f"""mo --input_model "onxx/{d}/{i}.onnx" --compress_to_fp16 --output_dir "onnx/{d}/" """
    mo_result = %sx $mo_command
    print("\n".join(mo_result))

In [None]:
for d,i in [('SimpleConv','sconv'), ('SimpleConvWider','sconvw'), ('ConvNetCompat','convnet')]:
    benchmark_cmd = f"""~/openvino_cpp_samples_build/intel64/Release/benchmark_app -m "onnx/{d}/{i}.xml" -d GPU -t 30"""
    benchmark_result = %sx $benchmark_cmd
    print("\n".join(benchmark_result))

# Final Considerations

The models were refined on real images from the flight tests and were qualitatively evaluated on those videos. The real data and the refined model has a restricted access, due to data leakage concerns, but the model architecture is about the same as presented in this work.