<a href="https://colab.research.google.com/github/anh1811/trajectory-prediction/blob/main/KITTIDataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import torch
from torch.utils.data import TensorDataset, DataLoader,  SequentialSampler
from sklearn.model_selection import train_test_split
from torch import nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm.notebook import tqdm
from torch.nn import functional as F

In [None]:
from google.colab import drive
# Mount Google Drive into the Colab filesystem at /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd /content/drive/MyDrive
# %cd /content/drive/MyDrive/viettle/miniproject/sequencer

/content/drive/MyDrive


In [None]:
def xywh2xyxy(bbox):
    """Convert a box from (x, y, w, h) to corner form (x1, y1, x2, y2)."""
    left, top, width, height = bbox[0], bbox[1], bbox[2], bbox[3]
    right = width + left
    bottom = height + top
    return left, top, right, bottom

def xyxy2cxcy(bbox):
    """Convert a corner-form box (x1, y1, x2, y2) to (cx, cy, w, h)."""
    x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
    center_x = (x1 + x2) / 2
    center_y = (y1 + y2) / 2
    return center_x, center_y, x2 - x1, y2 - y1


def xywh2cxcy(bbox):
    """Convert a box from (x, y, w, h) to (cx, cy, w, h)."""
    x, y, width, height = bbox[0], bbox[1], bbox[2], bbox[3]
    center_x = x + width / 2.0
    center_y = y + height / 2.0
    return center_x, center_y, width, height

def Euclipe_dis(bbox1, bbox2):
    """Return the Euclidean distance between the first two components of each box."""
    dx = bbox1[0] - bbox2[0]
    dy = bbox1[1] - bbox2[1]
    return np.sqrt(dx ** 2 + dy ** 2)

def scale(bbox, h_pic = 1920, w_pic = 2560):
    """Normalise pixel coordinates by the picture size.

    x-like components (indices 0 and 2) are divided by ``w_pic`` and y-like
    components (indices 1 and 3) by ``h_pic``. A 2-element input returns a
    2-tuple; any other input is treated as a 4-component box.
    """
    is_point = len(bbox) == 2
    x_norm = bbox[0] / w_pic
    y_norm = bbox[1] / h_pic
    if is_point:
        return x_norm, y_norm
    return x_norm, y_norm, bbox[2] / w_pic, bbox[3] / h_pic

def checknois(bboxes):
    """Report whether a box sequence looks noisy.

    A "jump" is a pair of consecutive boxes whose ``Euclipe_dis`` exceeds 0.1.
    Returns True when the sequence contains at least two jumps, else False.
    """
    jumps = sum(
        1
        for current, following in zip(bboxes, bboxes[1:])
        if Euclipe_dis(current, following) > 0.1
    )
    return jumps >= 2

def xywh2cxcyah(bbox):
    """Convert (x, y, w, h) to (cx, cy, h / w, h)."""
    x, y, width, height = bbox[0], bbox[1], bbox[2], bbox[3]
    center_x = x + width / 2.0
    center_y = y + height / 2.0
    ratio = height / width
    return center_x, center_y, ratio, height

def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

def generalized_iou(gt_bboxes, pr_bboxes, reduction='mean'):
    """Compute the IoU between paired ground-truth and predicted boxes.

    Despite its name, this function returns the plain intersection-over-union
    (the GIoU enclosure term is not applied) and it is a score, not a loss.

    Args:
        gt_bboxes: tensor (-1, 4) in xyxy format.
        pr_bboxes: tensor (-1, 4) in xyxy format, paired row-wise with gt_bboxes.
        reduction: 'mean' or 'sum' to reduce over boxes; 'none' (or any other
            value) returns the per-box IoU unreduced.

    Returns:
        A scalar tensor for 'mean'/'sum', otherwise a tensor with one IoU per row.
    """
    gt_area = (gt_bboxes[:, 2] - gt_bboxes[:, 0]) * (gt_bboxes[:, 3] - gt_bboxes[:, 1])
    pr_area = (pr_bboxes[:, 2] - pr_bboxes[:, 0]) * (pr_bboxes[:, 3] - pr_bboxes[:, 1])

    # Intersection rectangle; a negative extent means no overlap, clamp it to 0.
    top_left = torch.max(gt_bboxes[:, :2], pr_bboxes[:, :2])
    bottom_right = torch.min(gt_bboxes[:, 2:], pr_bboxes[:, 2:])
    overlap_wh = (bottom_right - top_left).clamp(min=0)
    inter = overlap_wh[:, 0] * overlap_wh[:, 1]
    union = gt_area + pr_area - inter

    # A zero union would give 0/0 = NaN and poison the mean/sum; divide by 1
    # for those rows instead.
    safe_union = torch.where(union != 0, union, torch.ones_like(union))
    iou = inter / safe_union

    if reduction == 'mean':
        return iou.mean()
    if reduction == 'sum':
        return iou.sum()
    return iou

In [None]:
import os

# Folder holding one tracking label file per sequence.
folder_dir = './training/label_02'
list_dir = os.listdir(folder_dir)

# Maps "<label file name>_<track id>" to the list of that track's scaled
# boxes, in the order they appear in the label file.
b = dict()
for file_name in list_dir:
    label_path = os.path.join(folder_dir, file_name)
    # The context manager closes each file; the original left every handle open.
    with open(label_path, "r") as label_file:
        for line in label_file:
            fields = line.split(' ')
            # Field 1 is used as the track id; negative ids are skipped.
            if int(fields[1]) < 0:
                continue
            track_key = file_name + '_' + fields[1]
            # Fields 6..9 are the box coordinates, scaled by a 1382x512 picture size.
            bbox = scale([float(coordinate) for coordinate in fields[6:10]], h_pic=512, w_pic=1382)
            b.setdefault(track_key, []).append(bbox)

In [None]:
def create_train_list(list_ID, len_seq):
    """Cut every track into all sliding windows of ``len_seq`` consecutive items.

    Args:
        list_ID: dict mapping a track key to the ordered list of that track's items.
        len_seq: window length; tracks shorter than this contribute nothing.

    Returns:
        A list of windows (each a list of ``len_seq`` items), track by track
        and in temporal order within a track.
    """
    train_list = []
    for track in list_ID.values():
        # A track of length n has n - len_seq + 1 windows. The original slice
        # ``[:-len_seq - 1]`` stopped one short and dropped the second-to-last
        # window of every track.
        n_windows = len(track) - len_seq + 1
        for start in range(n_windows):
            train_list.append(list(track[start:start + len_seq]))
    return train_list

In [None]:
# Cut every track into windows of 6 consecutive boxes and show the first window.
train_list = create_train_list(b, 6)
train_list[0]

[(0.4012200238784371, 0.32505196875, 0.48187896671490593, 0.530867029296875),
 (0.4142244667149059, 0.324641796875, 0.49494249348769903, 0.52942728125),
 (0.4271068306801737,
  0.324235603515625,
  0.5078791367583213,
  0.528001759765625),
 (0.43986882416787265, 0.32383333203125, 0.5206907358900145, 0.52659025390625),
 (0.45339447973950797,
  0.322442732421875,
  0.5340420390738061,
  0.52388970703125),
 (0.4667688986975398, 0.323074458984375, 0.5472892923299566, 0.52326152734375)]

In [None]:
import numpy as np

# def checkidle(bboxes):
#   return np.conv(bbox)
def create_dataset_kitti_(train_list, type_box = 'xyxy', transformer = False):
    """Turn box windows into (inputs, labels) float32 arrays.

    For each window, all boxes but the last form the input and the last box is
    the prediction target. Windows whose centre-form inputs barely vary
    (variance <= 1e-5) or that ``checknois`` flags as noisy are dropped.

    Args:
        train_list: iterable of windows, each a list of xyxy boxes.
        type_box: 'xyxy' keeps the boxes as-is; 'cxcy' converts inputs and
            label to (cx, cy, w, h).
        transformer: only used with 'xyxy'. When True the label is the input
            sequence shifted by one step (``seq[1:]``) instead of the last box.

    Returns:
        Tuple ``(datas, labels)`` of float32 numpy arrays.

    Raises:
        ValueError: if ``type_box`` is neither 'xyxy' nor 'cxcy'.
    """
    if type_box not in ('xyxy', 'cxcy'):
        # The original silently returned two empty arrays in this case.
        raise ValueError("type_box must be 'xyxy' or 'cxcy', got {!r}".format(type_box))

    datas = []
    labels = []
    for seq in train_list:
        history, target = seq[:-1], seq[-1]
        bbox_cxcy = [xyxy2cxcy(bbox) for bbox in history]
        keep = np.var(bbox_cxcy) > 1e-5 and not checknois(bbox_cxcy)
        if not keep:
            continue
        if type_box == 'xyxy':
            datas.append(history)
            # The original ``seq[1:] + seq[-1]`` added a list to a tuple and
            # raised TypeError; the one-step-shifted sequence has the same
            # length as the input, so use it as the label.
            labels.append(seq[1:] if transformer else target)
        else:
            datas.append(bbox_cxcy)
            labels.append(xyxy2cxcy(target))

    return np.array(datas, dtype='float32'), np.array(labels, dtype='float32')

In [None]:
# Build model inputs/labels from the windows. The function defined in this
# file is ``create_dataset_kitti_`` (trailing underscore); the original call
# used an undefined name.
x_scale, y_scale = create_dataset_kitti_(train_list)

def dataloader(X, y, batch_size = 128):
    """Wrap input/label arrays in a DataLoader that yields batches in order.

    Both arrays are converted with ``torch.tensor`` and iterated with a
    ``SequentialSampler``, so samples are never shuffled.
    """
    dataset = TensorDataset(torch.tensor(X), torch.tensor(y))
    ordered = SequentialSampler(dataset)
    return DataLoader(dataset, sampler=ordered, batch_size=batch_size)

In [None]:
# Hold out 20% of the samples for testing, then 20% of the remainder for
# validation; both splits shuffle with a fixed seed.
x_train, x_test, y_train, y_test = train_test_split(x_scale, y_scale, test_size=0.2, shuffle=True, random_state=42)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2, shuffle = True, random_state=42)

#Model

In [None]:
class LSTMRegression(nn.Module):
    """Densely connected stack of five LSTMs with a Conv1d + max-pool head.

    Each LSTM after the first receives the concatenation of all previous LSTM
    outputs and starts from the final (h, c) state of the LSTM before it. The
    concatenated outputs of all five layers go through a 1-D convolution, tanh,
    global max-pooling over time, a linear layer and a sigmoid.

    Args:
        input_dim: number of features per time step.
        hidden_dim: hidden size of every LSTM.
        output_dim: size of the prediction vector.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(LSTMRegression, self).__init__()

        self.hidden_dim = hidden_dim

        # Attribute names are kept so existing checkpoints still load.
        self.lstm1 = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.lstm3 = nn.LSTM(2*hidden_dim, hidden_dim, batch_first=True)
        self.lstm4 = nn.LSTM(3*hidden_dim, hidden_dim, batch_first=True)
        self.lstm5 = nn.LSTM(4*hidden_dim, hidden_dim, batch_first=True)
        # (batch, 5*hidden_dim, seq_len) => (batch, 128, seq_len)
        self.cnn1d = nn.Conv1d(in_channels=5*hidden_dim, out_channels=128, kernel_size=3, padding=1)

        self.fc = nn.Linear(128, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a (batch, seq_len, input_dim) tensor to (batch, output_dim) values in [0, 1]."""
        state = tuple(self.init_hidden(x))
        layer_outputs = []
        layer_input = x
        for lstm in (self.lstm1, self.lstm2, self.lstm3, self.lstm4, self.lstm5):
            out, state = lstm(layer_input, state)
            layer_outputs.append(out)
            # Dense connectivity: the next layer sees every earlier output.
            layer_input = torch.cat(layer_outputs, 2)

        features = torch.transpose(layer_input, 1, 2)  # (batch, 5*hidden_dim, seq_len)
        out = torch.tanh(self.cnn1d(features))
        out = F.max_pool1d(out, out.size(2))
        # Squeeze only the pooled time axis; a bare squeeze() would also drop
        # the batch axis when the batch holds a single sample.
        out = out.squeeze(2)
        out = self.fc(out)
        return self.sigmoid(out)

    def init_hidden(self, x):
        """Return zero [h0, c0] of shape (1, batch, hidden_dim) on x's device and dtype."""
        shape = (1, x.size(0), self.hidden_dim)
        # Allocating from x keeps the state on the input's device instead of
        # forcing CUDA.
        return [x.new_zeros(shape), x.new_zeros(shape)]

In [None]:
class Attention(nn.Module):
    """Attention over a sequence of RNN outputs, queried by the final hidden state.

    Scores are ``attn(outputs) @ final_hidden_state``, soft-maxed over time. The
    resulting context vector is concatenated with the final hidden state and
    projected to ``units`` features through a tanh.
    """

    def __init__(self, hidden_size, units = 128):
        super().__init__()
        self.hidden_size = hidden_size
        self.concat_linear = nn.Linear(2 * hidden_size, units)
        self.attn = nn.Linear(hidden_size, hidden_size)
        self.other = torch.FloatTensor(1, hidden_size)

    def forward(self, outputs, final_hidden_state):
        # outputs.shape:            (batch_size, seq_len, hidden_size)
        # final_hidden_state.shape: (batch_size, hidden_size)
        batch_size, seq_len, _ = outputs.shape

        query = final_hidden_state.unsqueeze(2)  # (batch_size, hidden_size, 1)
        projected = self.attn(outputs)  # (batch_size, seq_len, hidden_size)
        scores = torch.bmm(projected, query).squeeze(2)  # (batch_size, seq_len)
        weights = F.softmax(scores, dim=1)

        # Weighted sum of the outputs over the time axis.
        context = torch.bmm(outputs.transpose(1, 2), weights.unsqueeze(2)).squeeze(2)

        combined = torch.cat((context, final_hidden_state), dim=1)
        return torch.tanh(self.concat_linear(combined))


class LSTM_Attention(nn.Module):
    """Three stacked LSTMs followed by attention and a two-layer MLP head.

    Each LSTM starts from the final (h, c) state of the previous one. Dropout
    is applied to the outputs of the first and third LSTM. The attention
    module combines the third LSTM's outputs with its final hidden state; the
    result goes through Linear -> LeakyReLU -> Linear -> Sigmoid.

    Args:
        input_dim: number of features per time step.
        hidden_dim: hidden size of every LSTM.
        output_dim: size of the prediction vector.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(LSTM_Attention, self).__init__()

        self.hidden_dim = hidden_dim

        self.lstm1 = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(p=0.2)
        self.lstm2 = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.lstm3 = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.attention = Attention(hidden_dim)

        # fc1's 128 input features match Attention's default ``units``.
        self.fc1 = nn.Linear(128, 256)
        self.leakyRelu = nn.LeakyReLU()
        self.fc2 = nn.Linear(256, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a (batch, seq_len, input_dim) tensor to (batch, output_dim) values in [0, 1]."""
        h0, c0 = self.init_hidden(x)
        out1, (h1, c1) = self.lstm1(x, (h0, c0))
        out1 = self.dropout(out1)
        out2, (h2, c2) = self.lstm2(out1, (h1, c1))
        out3, (h3, c3) = self.lstm3(out2, (h2, c2))
        out3 = self.dropout(out3)
        # h3 is (1, batch, hidden_dim); drop the layer axis for attention.
        final_state = h3.squeeze(0)
        out = self.attention(out3, final_state)
        out = self.leakyRelu(self.fc1(out))
        return self.sigmoid(self.fc2(out))

    def init_hidden(self, x):
        """Return zero [h0, c0] of shape (1, batch, hidden_dim) on x's device and dtype."""
        shape = (1, x.size(0), self.hidden_dim)
        # Allocating from x keeps the state on the input's device instead of
        # forcing CUDA.
        return [x.new_zeros(shape), x.new_zeros(shape)]

In [None]:
import math
import warnings
def ciou_loss(pred, target, eps=1e-6, reduction='mean'):
    """Complete-IoU loss between paired boxes.

    Implementation of `Enhancing Geometric Factors into Model Learning and
    Inference for Object Detection and Instance Segmentation
    <https://arxiv.org/abs/2005.03572>`_, modified from
    https://github.com/Zzh-tju/CIoU.

    Args:
        pred (Tensor): Predicted bboxes of format (x1, y1, x2, y2),
            shape (n, 4).
        target (Tensor): Corresponding gt bboxes, shape (n, 4).
        eps (float): Small constant added to denominators and heights to
            avoid division by zero.
        reduction (str): 'mean' or 'sum' to reduce over boxes; 'none' (or any
            other value) returns the per-box loss.
    Return:
        Tensor: Loss tensor.
    """
    p_x1, p_y1, p_x2, p_y2 = pred[:, 0], pred[:, 1], pred[:, 2], pred[:, 3]
    t_x1, t_y1, t_x2, t_y2 = target[:, 0], target[:, 1], target[:, 2], target[:, 3]

    # Intersection area.
    inter_tl = torch.max(pred[:, :2], target[:, :2])
    inter_br = torch.min(pred[:, 2:], target[:, 2:])
    inter_wh = (inter_br - inter_tl).clamp(min=0)
    overlap = inter_wh[:, 0] * inter_wh[:, 1]

    # IoU.
    pred_area = (p_x2 - p_x1) * (p_y2 - p_y1)
    target_area = (t_x2 - t_x1) * (t_y2 - t_y1)
    union = pred_area + target_area - overlap + eps
    ious = overlap / union

    # Squared diagonal of the smallest box enclosing both boxes.
    hull_tl = torch.min(pred[:, :2], target[:, :2])
    hull_br = torch.max(pred[:, 2:], target[:, 2:])
    hull_wh = (hull_br - hull_tl).clamp(min=0)
    diag_sq = hull_wh[:, 0]**2 + hull_wh[:, 1]**2 + eps

    # Squared distance between the two box centres.
    dx_sq = ((t_x1 + t_x2) - (p_x1 + p_x2))**2 / 4
    dy_sq = ((t_y1 + t_y2) - (p_y1 + p_y2))**2 / 4
    center_dist_sq = dx_sq + dy_sq

    # Aspect-ratio consistency term.
    w_pred, h_pred = p_x2 - p_x1, p_y2 - p_y1 + eps
    w_tgt, h_tgt = t_x2 - t_x1, t_y2 - t_y1 + eps
    factor = 4 / math.pi**2
    v = factor * torch.pow(torch.atan(w_tgt / h_tgt) - torch.atan(w_pred / h_pred), 2)

    # alpha is a weight, not a learnable path; it is zero when IoU <= 0.5.
    with torch.no_grad():
        alpha = (ious > 0.5).float() * v / (1 - ious + v)

    cious = ious - (center_dist_sq / diag_sq + alpha * v)
    loss = 1 - cious.clamp(min=-1.0, max=1.0)

    if reduction == 'mean':
        return loss.mean()
    if reduction == 'sum':
        return loss.sum()
    return loss


class CIoULoss(nn.Module):
    """Module wrapper around ``ciou_loss``.

    Args:
        eps: forwarded to ``ciou_loss``.
        reduction: default reduction ('none', 'mean' or 'sum').
        loss_weight: scalar multiplied into the returned loss.
    """

    def __init__(self, eps=1e-6, reduction='mean', loss_weight=1.0):
        super().__init__()
        self.eps = eps
        self.reduction = reduction
        self.loss_weight = loss_weight

    def forward(self,
                pred,
                target,
                weight=None,
                avg_factor=None,
                reduction_override=None,
                **kwargs):
        """Return ``loss_weight * ciou_loss(pred, target)``.

        ``weight`` is only used for the all-zero shortcut below and for shape
        validation; it is not passed to ``ciou_loss``. ``avg_factor`` and extra
        keyword arguments are accepted but unused.
        """
        has_weight = weight is not None

        # No positive weight at all: return a zero that still depends on pred.
        if has_weight and not torch.any(weight > 0):
            if pred.dim() == weight.dim() + 1:
                weight = weight.unsqueeze(1)
            return (pred * weight).sum()  # 0

        assert reduction_override in (None, 'none', 'mean', 'sum')
        reduction = reduction_override or self.reduction

        if has_weight and weight.dim() > 1:
            # TODO: remove this in the future
            # reduce the weight of shape (n, 4) to (n,) to match the
            # loss of shape (n,)
            assert weight.shape == pred.shape
            weight = weight.mean(-1)

        raw_loss = ciou_loss(pred, target, eps=self.eps, reduction=reduction)
        return self.loss_weight * raw_loss



class LossMSE_YOLOv1(nn.Module):
    """MSE on columns 0-1 plus MSE on the square roots of columns 2-3."""

    def __init__(self, reduction = 'mean'):
        super().__init__()
        self.mse = nn.MSELoss(reduction = reduction)

    def forward(self, prediction, target):
        """Return the summed loss as a float tensor."""
        first_term = self.mse(prediction[:, :2], target[:, :2])
        pred_root = torch.sqrt(prediction[:, 2:4])
        target_root = torch.sqrt(target[:, 2:4])
        second_term = self.mse(pred_root, target_root)
        return (first_term + second_term).float()

In [None]:
def train(model, X_train, y_train, X_val, y_val, lr, epochs, writer, path_save, loss_type = 'ciou', patience = 12, transformer = False, early_stop_patience = 25):
    """Train ``model`` with Adam, LR-on-plateau scheduling and early stopping.

    Args:
        model: network to optimise; batches are moved to the device of its parameters.
        X_train, y_train: training inputs and labels (array-like, passed to ``dataloader``).
        X_val, y_val: validation inputs and labels.
        lr: Adam learning rate.
        epochs: maximum number of epochs.
        writer: TensorBoard-style writer exposing ``add_scalars``.
        path_save: where the best checkpoint (lowest validation loss) is saved.
        loss_type: 'ciou' or 'mse'.
        patience: patience of the ``ReduceLROnPlateau`` scheduler.
        transformer: when True, outputs and labels are flattened to
            (-1, num_features) before the loss is computed.
        early_stop_patience: number of consecutive epochs without a new best
            validation loss after which training stops.

    Returns:
        The trained model (weights of the last epoch run, not necessarily the best).

    Raises:
        ValueError: if ``loss_type`` is not 'ciou' or 'mse'.
    """
    train_dataloader = dataloader(X_train, y_train)
    val_dataloader = dataloader(X_val, y_val)

    if loss_type == 'ciou':
        criterion = CIoULoss()
    elif loss_type == 'mse':
        criterion = LossMSE_YOLOv1()
    else:
        # Previously an unknown value left ``criterion`` unbound and failed
        # later with a confusing error.
        raise ValueError("loss_type must be 'ciou' or 'mse', got {!r}".format(loss_type))
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    sched = ReduceLROnPlateau(opt, threshold=1e-4, min_lr=1e-7, patience=patience)

    # Follow the model's device instead of assuming CUDA.
    device = next(model.parameters()).device

    trials = 0
    val_loss_min = float('inf')

    for epoch in range(epochs):
        print('======== Epoch {:} ========'.format(epoch + 1))

        total_loss = 0
        model.train()
        # Iterating the loader directly lets tqdm know the number of batches.
        for batch in tqdm(train_dataloader):
            opt.zero_grad()
            loss = _train_batch_loss(model, criterion, batch, device, transformer)
            total_loss += loss.item()
            loss.backward()
            opt.step()
        train_loss = total_loss / len(train_dataloader)

        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for batch in tqdm(val_dataloader):
                loss = _train_batch_loss(model, criterion, batch, device, transformer)
                total_val_loss += loss.item()
        val_loss = total_val_loss / len(val_dataloader)
        sched.step(val_loss)

        # Log before the early-stopping check so the last epoch is reported too.
        print('Epoch[{}/{}]: train_loss: {:.10f}, val_loss:{:.10f}'.format(epoch, epochs, train_loss, val_loss))
        writer.add_scalars(f'loss traing', {
            'train': train_loss,
            'val': val_loss,
        }, epoch)

        if val_loss < val_loss_min:
            trials = 0
            val_loss_min = val_loss
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
            }, path_save)
        else:
            trials += 1
            if trials >= early_stop_patience:
                print(f'Early stopping on epoch {epoch + 1}')
                break
    return model


def _train_batch_loss(model, criterion, batch, device, transformer):
    """Run the model on one (inputs, labels) batch and return the loss tensor."""
    inputs = batch[0].to(device)
    labels = batch[1].to(device)
    out = model(inputs)
    if transformer:
        num_fea = out.size(2)
        out = out.view(-1, num_fea)
        labels = labels.view(-1, num_fea)
    return criterion(out, labels)

In [None]:
def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

def generalized_iou(gt_bboxes, pr_bboxes, reduction='mean'):
    """Compute the IoU between paired ground-truth and predicted boxes.

    Despite its name, this function returns the plain intersection-over-union
    (no GIoU enclosure term) and it is a score, not a loss.

    Args:
        gt_bboxes: tensor (-1, 4) in xyxy format.
        pr_bboxes: tensor (-1, 4) in xyxy format, paired row-wise with gt_bboxes.
        reduction: 'mean' or 'sum' to reduce over boxes; 'none' (or any other
            value) returns the per-box IoU unreduced.

    Returns:
        A scalar tensor for 'mean'/'sum', otherwise a tensor with one IoU per row.
    """
    gt_area = (gt_bboxes[:, 2] - gt_bboxes[:, 0]) * (gt_bboxes[:, 3] - gt_bboxes[:, 1])
    pr_area = (pr_bboxes[:, 2] - pr_bboxes[:, 0]) * (pr_bboxes[:, 3] - pr_bboxes[:, 1])

    # Intersection rectangle; a negative extent means no overlap, clamp it to 0.
    top_left = torch.max(gt_bboxes[:, :2], pr_bboxes[:, :2])
    bottom_right = torch.min(gt_bboxes[:, 2:], pr_bboxes[:, 2:])
    overlap_wh = (bottom_right - top_left).clamp(min=0)
    inter = overlap_wh[:, 0] * overlap_wh[:, 1]
    union = gt_area + pr_area - inter

    # A zero union would give 0/0 = NaN and poison the mean/sum; divide by 1
    # for those rows instead.
    safe_union = torch.where(union != 0, union, torch.ones_like(union))
    iou = inter / safe_union

    if reduction == 'mean':
        return iou.mean()
    if reduction == 'sum':
        return iou.sum()
    return iou

In [None]:
# Display the first training sample.
x_train[0]

array([[0.4343453 , 0.3570865 , 0.4856105 , 0.4889641 ],
       [0.43418545, 0.35715538, 0.48464862, 0.48699075],
       [0.4338491 , 0.35637265, 0.4838297 , 0.48480797],
       [0.43351907, 0.3556022 , 0.48302627, 0.48266616],
       [0.4331952 , 0.3548437 , 0.48223794, 0.4805642 ]], dtype=float32)

In [None]:
# Experiment configuration: TensorBoard writer, model sizes and checkpoint paths.
num_frames_input = 6
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter('./output/5lstm_KITTI{}'.format(num_frames_input))
input_dim = 4 
hidden_dim = 256
output_dim = 4
# path_pretrained = './output/Lstm_Attetion.bestweight{}'.format(num_frames_input)
# Weights are loaded from path_pretrained; path_save is where train() writes its best checkpoint.
path_pretrained = './output/5lstm_CIOULoss.bestweight{}'.format(num_frames_input)
path_save = './output/5lstm_KITTI.bestweight{}'.format(num_frames_input)

lr = 0.0001
n_epochs = 30
# model = LSTM_Attention(input_dim, hidden_dim, output_dim)
model = LSTMRegression(input_dim, hidden_dim, output_dim)
# model = model.cuda()
# model = train(model, x_train, y_train, x_val, y_val,lr = lr, epochs = n_epochs, writer = writer, path_save = path_save)

# Restore the saved weights, then move the model to the GPU.
checkpoint = torch.load(path_pretrained)
model.load_state_dict(checkpoint['model_state_dict'])
model = model.cuda()

In [None]:
# Forward the first 10 training samples and check the output shape.
out = model(torch.as_tensor(x_train[:10]).cuda())
out.size()



torch.Size([10, 4])

In [None]:
# Continue training from the loaded weights; the best checkpoint is written to path_save.
model = train(model, x_train, y_train, x_val, y_val,lr = lr, epochs = n_epochs, writer = writer, path_save = path_save)



0it [00:00, ?it/s]



  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[0/30]: train_loss: 0.1503484082, val_loss:0.1305411586


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[1/30]: train_loss: 0.1189330717, val_loss:0.1169470498


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[2/30]: train_loss: 0.1099972064, val_loss:0.1094960915


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[3/30]: train_loss: 0.1061674618, val_loss:0.1060918726


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[4/30]: train_loss: 0.1033574283, val_loss:0.1038693613


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[5/30]: train_loss: 0.1010441753, val_loss:0.0998689272


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[6/30]: train_loss: 0.0990255190, val_loss:0.1011085600


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[7/30]: train_loss: 0.0964804500, val_loss:0.0930847205


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[8/30]: train_loss: 0.0967188901, val_loss:0.1016970751


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[9/30]: train_loss: 0.0949817279, val_loss:0.0987045411


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[10/30]: train_loss: 0.0941465413, val_loss:0.0883299050


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[11/30]: train_loss: 0.0942792857, val_loss:0.0947961092


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[12/30]: train_loss: 0.0923908126, val_loss:0.0891271913


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[13/30]: train_loss: 0.0901020499, val_loss:0.0897107741


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[14/30]: train_loss: 0.0905776504, val_loss:0.0867000829


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[15/30]: train_loss: 0.0905538257, val_loss:0.0887271368


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[16/30]: train_loss: 0.0889824625, val_loss:0.0894962500


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[17/30]: train_loss: 0.0905504797, val_loss:0.0897289270


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[18/30]: train_loss: 0.0897158181, val_loss:0.0921696963


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[19/30]: train_loss: 0.0879813855, val_loss:0.0891527397


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[20/30]: train_loss: 0.0879857904, val_loss:0.0895667624


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[21/30]: train_loss: 0.0877922621, val_loss:0.0848890998


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[22/30]: train_loss: 0.0861173721, val_loss:0.0869002889


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[23/30]: train_loss: 0.0871789245, val_loss:0.0830645485


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[24/30]: train_loss: 0.0863016999, val_loss:0.0864532948


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[25/30]: train_loss: 0.0841270044, val_loss:0.0866921508


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[26/30]: train_loss: 0.0855763067, val_loss:0.0830957485


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[27/30]: train_loss: 0.0860814358, val_loss:0.0847714812


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[28/30]: train_loss: 0.0848659504, val_loss:0.0817291627


0it [00:00, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch[29/30]: train_loss: 0.0833238756, val_loss:0.0857878219


In [None]:
def batch_xyxy2cxcy(bboxes):
    """Return the (cx, cy) centres of a batch of xyxy boxes.

    The result is a freshly allocated float32 tensor of shape (batch, 2),
    created with ``torch.empty`` independently of the input's device and dtype.
    """
    centers = torch.empty((bboxes.size(0), 2), dtype=torch.float32)
    # Columns (x1 + x2, y1 + y2) halved; copy_ casts into the output buffer.
    centers.copy_((bboxes[:, :2] + bboxes[:, 2:4]) / 2.)
    return centers


In [None]:
# Evaluate on the held-out test split in a single forward pass:
# MSE between predicted and true box centres, and the mean IoU of the boxes.
model.eval()
with torch.no_grad():
  pred = model(torch.as_tensor(x_test).cuda())
  y_true = torch.as_tensor(y_test).cuda()
  loss = nn.MSELoss()
  print('mseLoss = {}'.format(loss(batch_xyxy2cxcy(pred), batch_xyxy2cxcy(y_true))))
  print('IOU = {}'.format(generalized_iou(y_true, pred)))

mseLoss = 8.240666829806287e-06
IOU = 0.9142970442771912




# Kalman Filter


In [None]:
!pip install filterpy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
[K     |████████████████████████████████| 177 kB 31.2 MB/s 
Building wheels for collected packages: filterpy
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
  Created wheel for filterpy: filename=filterpy-1.4.5-py3-none-any.whl size=110474 sha256=239d0580a0b3214d1b6f92089d60440ee2cd80dba22b8e94469b28f25d482eec
  Stored in directory: /root/.cache/pip/wheels/ce/e0/ee/a2b3c5caab3418c1ccd8c4de573d4cbe13315d7e8b0a55fbc2
Successfully built filterpy
Installing collected packages: filterpy
Successfully installed filterpy-1.4.5


In [None]:
from filterpy.kalman import KalmanFilter
def convert_bbox_to_z(bbox):
    """
    Convert a box [x1,y1,x2,y2] into the measurement column vector [x,y,s,r],
    where x,y is the box centre, s is the area (scale) and r is the
    width / height aspect ratio. The result has shape (4, 1).
    """
    x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
    width = x2 - x1
    height = y2 - y1
    center_x = x1 + width/2.
    center_y = y1 + height/2.
    area = width * height
    aspect = width / float(height)
    measurement = np.array([center_x, center_y, area, aspect])
    return measurement.reshape((4, 1))


def convert_x_to_bbox(x,score=None):
    """
    Convert a centre-form state [x,y,s,r] (centre, area, aspect ratio) into a
    corner-form box [x1,y1,x2,y2], where x1,y1 is the top left and x2,y2 is the
    bottom right.

    Without a score the coordinates are clipped at 0 from below and the result
    has shape (1, 4). With a score the box is returned unclipped with the score
    appended, shape (1, 5).
    """
    w = np.sqrt(x[2] * x[3])
    h = x[2] / w
    corners = [x[0]-w/2., x[1]-h/2., x[0]+w/2., x[1]+h/2.]
    # Identity test: ``score == None`` is the wrong way to test for None and
    # is evaluated element-wise when score is a numpy array.
    if score is None:
        return np.clip(np.array(corners), 0, None).reshape((1,4))
    return np.array(corners + [score]).reshape((1,5))

class KalmanBoxTracker(object):
    """
    Internal state of one tracked object observed as a bounding box.

    The filter state has 7 components: the 4 measurement components produced by
    convert_bbox_to_z ([x, y, s, r]) followed by velocities for the first three.
    """
    # Number of trackers created so far; also the id given to the next one.
    count = 0

    def __init__(self, bbox):
        """
        Initialise a tracker from an initial bounding box [x1,y1,x2,y2].
        """
        # Constant velocity model: x, y and s each advance by their velocity;
        # the aspect ratio r has no velocity term.
        kf = KalmanFilter(dim_x=7, dim_z=4)
        kf.F = np.array([
            [1, 0, 0, 0, 1, 0, 0],
            [0, 1, 0, 0, 0, 1, 0],
            [0, 0, 1, 0, 0, 0, 1],
            [0, 0, 0, 1, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 0, 1],
        ])
        kf.H = np.array([
            [1, 0, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 0, 0, 0],
        ])

        kf.R[2:, 2:] *= 10.
        # High uncertainty for the initial velocities, which are not observed.
        kf.P[4:, 4:] *= 1000.
        kf.P *= 10.
        kf.Q[-1, -1] *= 0.01
        kf.Q[4:, 4:] *= 0.01

        kf.x[:4] = convert_bbox_to_z(bbox)
        self.kf = kf

        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

    def update(self, bbox):
        """
        Correct the state with an observed bounding box and reset the
        since-last-update bookkeeping.
        """
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        measurement = convert_bbox_to_z(bbox)
        self.kf.update(measurement)

    def predict(self):
        """
        Advance the state one step and return the predicted bounding box,
        which is also appended to the history.
        """
        state = self.kf.x
        # Zero the area velocity if the step would make the area non-positive.
        if (state[6] + state[2]) <= 0:
            state[6] *= 0.0
        self.kf.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        predicted = convert_x_to_bbox(self.kf.x)
        self.history.append(predicted)
        return predicted

    def get_state(self):
        """
        Return the current bounding box estimate.
        """
        return convert_x_to_bbox(self.kf.x)

In [None]:
# !pip install tensorflow_addons
# import tensorflow_addons as tfa
# metrics = tfa.losses.GIoULoss(mode = 'iou')

In [None]:
import numpy as np
import scipy.linalg
"""
Table for the 0.95 quantile of the chi-square distribution with N degrees of
freedom (contains values for N=1, ..., 9). Taken from MATLAB/Octave's chi2inv
function and used as Mahalanobis gating threshold.
"""
# Key: degrees of freedom N; value: 0.95 quantile of the chi-square distribution.
chi2inv95 = {
    1: 3.8415,
    2: 5.9915,
    3: 7.8147,
    4: 9.4877,
    5: 11.070,
    6: 12.592,
    7: 14.067,
    8: 15.507,
    9: 16.919}


class Kalman_Filter(object):
    """
    A simple Kalman filter for tracking bounding boxes in image space.

    The 8-dimensional state space

        x, y, a, h, vx, vy, va, vh

    contains the bounding box center position (x, y), aspect ratio a, height h,
    and their respective velocities.

    Object motion follows a constant velocity model. The bounding box location
    (x, y, a, h) is taken as direct observation of the state space (linear
    observation model).

    Parameters
    ----------
    std_weight_position : float, optional
        Multiplier applied to the box height to obtain the standard deviation
        of the position-like components (x, y, h). Defaults to 1/20.
    std_weight_velocity : float, optional
        Multiplier applied to the box height to obtain the standard deviation
        of the velocity components (vx, vy, vh). Defaults to 1/160.
    """

    def __init__(self, std_weight_position=1. / 20,
                 std_weight_velocity=1. / 160):
        ndim, dt = 4, 1.

        # Constant-velocity transition: identity, plus `dt` on the entries
        # that feed each velocity into its matching position component.
        self._motion_mat = np.eye(2 * ndim, 2 * ndim)
        for i in range(ndim):
            self._motion_mat[i, ndim + i] = dt
        # Observation matrix: selects the first `ndim` state components.
        self._update_mat = np.eye(ndim, 2 * ndim)

        # Motion and observation uncertainty are chosen relative to the
        # current state estimate (the box height). These weights control the
        # amount of uncertainty in the model and are heuristic, which is why
        # they are exposed as constructor arguments.
        self._std_weight_position = std_weight_position
        self._std_weight_velocity = std_weight_velocity

    def initiate(self, measurement):
        """Create track from unassociated measurement.

        Parameters
        ----------
        measurement : ndarray
            Bounding box coordinates (x, y, a, h) with center position (x, y),
            aspect ratio a, and height h.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector (8 dimensional) and covariance matrix (8x8
            dimensional) of the new track. Unobserved velocities are initialized
            to 0 mean.
        """
        mean_pos = measurement
        mean_vel = np.zeros_like(mean_pos)
        mean = np.r_[mean_pos, mean_vel]

        # Initial uncertainty scales with the measured height; the aspect
        # ratio and its velocity use fixed, small standard deviations.
        height = measurement[3]
        pos_std = 2 * self._std_weight_position * height
        vel_std = 10 * self._std_weight_velocity * height
        std = [
            pos_std, pos_std, 1e-2, pos_std,
            vel_std, vel_std, 1e-5, vel_std]
        covariance = np.diag(np.square(std))
        return mean, covariance

    def predict(self, mean, covariance):
        """Run Kalman filter prediction step.

        Parameters
        ----------
        mean : ndarray
            The 8 dimensional mean vector of the object state at the previous
            time step.
        covariance : ndarray
            The 8x8 dimensional covariance matrix of the object state at the
            previous time step.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state, i.e. the previous state propagated through the constant
            velocity model with height-scaled process noise added.
        """
        # Process noise is proportional to the current height estimate.
        height = mean[3]
        pos_std = self._std_weight_position * height
        vel_std = self._std_weight_velocity * height
        std_pos = [pos_std, pos_std, 1e-2, pos_std]
        std_vel = [vel_std, vel_std, 1e-5, vel_std]
        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))

        mean = np.dot(self._motion_mat, mean)
        covariance = np.linalg.multi_dot((
            self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

        return mean, covariance

    def project(self, mean, covariance):
        """Project state distribution to measurement space.

        Parameters
        ----------
        mean : ndarray
            The state's mean vector (8 dimensional array).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).

        Returns
        -------
        (ndarray, ndarray)
            Returns the projected mean and covariance matrix of the given state
            estimate. The returned covariance already includes the
            height-scaled measurement noise.
        """
        pos_std = self._std_weight_position * mean[3]
        std = [pos_std, pos_std, 1e-1, pos_std]
        innovation_cov = np.diag(np.square(std))

        mean = np.dot(self._update_mat, mean)
        covariance = np.linalg.multi_dot((
            self._update_mat, covariance, self._update_mat.T))
        return mean, covariance + innovation_cov

    def update(self, mean, covariance, measurement):
        """Run Kalman filter correction step.

        Parameters
        ----------
        mean : ndarray
            The predicted state's mean vector (8 dimensional).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).
        measurement : ndarray
            The 4 dimensional measurement vector (x, y, a, h), where (x, y)
            is the center position, a the aspect ratio, and h the height of the
            bounding box.

        Returns
        -------
        (ndarray, ndarray)
            Returns the measurement-corrected state distribution.
        """
        projected_mean, projected_cov = self.project(mean, covariance)

        # Solve S K^T = H P for the gain through a Cholesky factorization of
        # the innovation covariance S instead of forming its inverse.
        chol_factor, lower = scipy.linalg.cho_factor(
            projected_cov, lower=True, check_finite=False)
        kalman_gain = scipy.linalg.cho_solve(
            (chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
            check_finite=False).T
        innovation = measurement - projected_mean

        new_mean = mean + np.dot(innovation, kalman_gain.T)
        new_covariance = covariance - np.linalg.multi_dot((
            kalman_gain, projected_cov, kalman_gain.T))
        return new_mean, new_covariance

    def gating_distance(self, mean, covariance, measurements,
                        only_position=False):
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : array_like
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height. A single
            measurement of shape (4,) is accepted as well.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`. For a single 1-D measurement the result is a
            scalar.
        """
        mean, covariance = self.project(mean, covariance)
        measurements = np.asarray(measurements)
        if only_position:
            mean, covariance = mean[:2], covariance[:2, :2]
            # Ellipsis indexing keeps this valid for both (N, 4) and (4,).
            measurements = measurements[..., :2]

        # With covariance = L L^T, the squared Mahalanobis distance is
        # |L^-1 d|^2, so a triangular solve replaces an explicit inverse.
        cholesky_factor = np.linalg.cholesky(covariance)
        d = measurements - mean
        z = scipy.linalg.solve_triangular(
            cholesky_factor, d.T, lower=True, check_finite=False,
            overwrite_b=True)
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha

In [None]:
# Kalman baseline: for every test sequence, seed a tracker with the first box,
# feed it the remaining boxes, and keep one further predict() as the forecast.
y_predict = []
# NOTE(review): nothing in this cell calls `model`; eval()/no_grad() look
# carried over from the network evaluation cells -- confirm before removing.
model.eval()
with torch.no_grad():
  for x in x_test:
    kalman = KalmanBoxTracker(x[0])
    for seq in x[1:]:
      # predict-then-update once per observed box
      kalman.predict()
      kalman.update(seq)
    forecast = kalman.predict()
    y_predict.append(forecast)

# Stack the per-sequence forecasts and drop singleton axes.
y_predict = np.array(y_predict).squeeze()
loss = nn.MSELoss()

target_boxes = torch.from_numpy(y_test)
predicted_boxes = torch.from_numpy(y_predict)

mse_value = loss(batch_xyxy2cxcy(target_boxes), batch_xyxy2cxcy(predicted_boxes))
print('mseLoss = {}'.format(mse_value))
# NOTE(review): the label says IOU, but generalized_iou's docstring describes
# a GIoU loss -- confirm which quantity is actually printed here.
iou_value = generalized_iou(target_boxes, predicted_boxes)
print('IOU = {}'.format(iou_value))

mseLoss = 1.7858867067843676e-05
IOU = 0.8835493537051713
