## Tracking without bells and whistles
- Tracking without bells and whistles의 Tracktor.py에 해당하는 코드
- paper : https://arxiv.org/pdf/1903.05625.pdf
- code : https://github.com/phil-bergmann/tracking_wo_bnw


#### Tracktor에 대한 설명

- for t=0, detection의 결과로 tracker initialize
- for t>0, 2 step :   
    ○ __bbox regression__(아래 그림에서 파란색 arrow)  
    ○ __track initialization__(아래 그림에서 빨간색 arrow)
<img src='img/bell_whistles.png' width='600'>

#### Bounding box regression
- __$b^k_{t-1}$ 를 이용하여 $b_t^k$ 를 regression__
- Faster R-CNN의 경우에는, __이전 프레임 좌표에 대한 RoI pooling__을 현재 프레임에 대하여 적용 (가정 : high frame rates에 의해 target이 약간만 움직였을 것)
- 그래서 자동으로 __ID가 현재 프레임으로 이동__할 것이고, 이로 인해 짧은 trajectory가 만들어진다.
- trajectory가 deactivate되는 두 가지 경우  
    ○ object가 프레임을 벗어나거나 non-object에 의해 가려지는 경우  
    ○ object 간의 occlusion이 일어났을 때, 이에 대해 NMS를 적용하였더니 bounding box가 삭제되는 경우

#### Bounding box initialization
- 이전 프레임에 등장하지 않았던 __새로운 target__ 또한 고려하기 위해 t시점의 전체 프레임에 대한 __detection $D_t$__ 가 주어진다. 
- __$D_t$ 중 active trajectories $b_t^k$ 와의 IoU가 $\lambda_{new}$ 보다 작은 경우에만 initialize__ (즉, 어떤 trajectory로도 설명되지 않는 새로운 tracklet을 만들어주기 위함)
- 주목할 점은 어떠한 tracking specific training이나 optimization을 필요로 하지 않고 단순히 object detection method에만 영향을 받는다는 점이다 (따라서 다른 dataset이나 scenarios에도 쉽게 적용가능)

In [None]:
from collections import deque

import numpy as np
import torch
import torch.nn.functional as F
from torch.autograd import Variable
from scipy.optimize import linear_sum_assignment
import cv2

from .utils import bbox_overlaps, warp_pos, get_center, get_height, get_width, make_pos

from torchvision.ops.boxes import clip_boxes_to_image, nms

In [None]:
class Tracker:
    """The main tracking file, here is where magic happens."""
    # only track pedestrian
    cl = 1

    def __init__(self, obj_detect, reid_network, tracker_cfg):
        """Store the two models, copy the configuration and start with empty state.

        Args:
            obj_detect: Detection model. Inside this class it is used through
                ``load_image``, ``detect`` and ``predict_boxes`` (the original
                notes say a Faster R-CNN is used).
            reid_network: Appearance model, used through ``test_rois`` (the
                original notes describe it as a siamese CNN).
            tracker_cfg: Mapping that provides every configuration key read
                below; a missing key raises ``KeyError``.
        """
        cfg = tracker_cfg

        # Models.
        self.obj_detect = obj_detect
        self.reid_network = reid_network

        # Score and NMS thresholds for fresh detections and regressed tracks.
        self.detection_person_thresh = cfg['detection_person_thresh']
        self.regression_person_thresh = cfg['regression_person_thresh']
        self.detection_nms_thresh = cfg['detection_nms_thresh']
        self.regression_nms_thresh = cfg['regression_nms_thresh']

        # When true, step() takes boxes from blob['dets'] and only refines them
        # with predict_boxes instead of running the detector itself.
        self.public_detections = cfg['public_detections']

        # Inactive tracks whose count_inactive exceeds this value are dropped.
        self.inactive_patience = cfg['inactive_patience']

        # Re-identification settings.
        self.do_reid = cfg['do_reid']  # enable / disable re-ID
        self.max_features_num = cfg['max_features_num']  # features kept per Track
        self.reid_sim_threshold = cfg['reid_sim_threshold']  # max appearance distance for a match
        self.reid_iou_threshold = cfg['reid_iou_threshold']  # min IoU for a match to be possible

        # When true, step() calls align() to compensate camera motion.
        self.do_align = cfg['do_align']

        # Motion model settings; this class reads the keys 'enabled',
        # 'center_only' and 'n_steps' from it.
        self.motion_model_cfg = cfg['motion_model']

        # Parameters handed to cv2.findTransformECC in align(); 'warp_mode' is
        # the name of a cv2 attribute and is resolved here.
        self.warp_mode = getattr(cv2, cfg['warp_mode'])
        self.number_of_iterations = cfg['number_of_iterations']
        self.termination_eps = cfg['termination_eps']

        # Mutable tracking state.
        self.tracks = []  # active tracks
        self.inactive_tracks = []  # inactive tracks
        self.track_num = 0  # number of tracks created so far
        self.im_index = 0  # index of the frame being processed
        self.results = {}

    # reset tracks
    def reset(self, hard=True):
        self.tracks = []  # tracker 초기화
        self.inactive_tracks = []

        if hard:
            self.track_num = 0
            self.results = {}
            self.im_index = 0

    # Make inactive tracks list 
    def tracks_to_inactive(self, tracks):
        self.tracks = [t for t in self.tracks if t not in tracks]
        for t in tracks:
            t.pos = t.last_pos[-1]
        self.inactive_tracks += tracks

    # Add New tracks
    def add(self, new_det_pos, new_det_scores, new_det_features):
        """Initializes new Track objects and saves them."""
        num_new = new_det_pos.size(0)  # size : 몇 개의 요소가 있는지 ex) 3*4인 2차원 array -> 12
        for i in range(num_new):
            self.tracks.append(Track(
                new_det_pos[i].view(1, -1),
                new_det_scores[i],
                self.track_num + i,
                new_det_features[i].view(1, -1),
                self.inactive_patience,
                self.max_features_num,
                self.motion_model_cfg['n_steps'] if self.motion_model_cfg['n_steps'] > 0 else 1
            ))
        self.track_num += num_new

    def regress_tracks(self, blob):
        """Regress every active track to the current frame and filter by score.

        The current track boxes are passed to ``obj_detect.predict_boxes`` and
        the returned boxes are clipped to the image. Tracks whose new score is
        not above ``self.regression_person_thresh`` are moved to the inactive
        list; the others take over the regressed box.

        Args:
            blob: Frame data; ``blob['img'].shape[-2:]`` is used as the image
                size for clipping.

        Returns:
            1-D tensor with the scores of the tracks that stayed active, in the
            order of ``self.tracks``. It lives on the same device as the
            regressed boxes, so it can be passed to ``nms`` together with
            ``self.get_pos()``.
        """
        pos = self.get_pos()  # boxes of the active tracks (from the previous frame)

        # regress
        boxes, scores = self.obj_detect.predict_boxes(pos)
        pos = clip_boxes_to_image(boxes, blob['img'].shape[-2:])  # clamp coordinates to the image

        s = []
        # Walk backwards: tracks_to_inactive() removes entries from self.tracks,
        # and removing index i never shifts the not-yet-visited indices below i.
        for i in range(len(self.tracks) - 1, -1, -1):
            t = self.tracks[i]
            t.score = scores[i]

            if scores[i] <= self.regression_person_thresh:
                # score too low: deactivate the track
                self.tracks_to_inactive([t])
            else:
                s.append(scores[i])
                t.pos = pos[i].view(1, -1)

        # Scores were collected back to front, so reverse them. Follow the
        # device of the regressed boxes instead of hard-coding CUDA; this keeps
        # the scores next to the track boxes and also works without a GPU.
        return torch.Tensor(s[::-1]).to(pos.device)

    def get_pos(self):
        """Get the positions of all active tracks."""
        if len(self.tracks) == 1:
            pos = self.tracks[0].pos
        elif len(self.tracks) > 1:
            pos = torch.cat([t.pos for t in self.tracks], 0)
        else:
            pos = torch.zeros(0).cuda()
        return pos

    def get_features(self):
        """Get the features of all active tracks."""
        if len(self.tracks) == 1:
            features = self.tracks[0].features
        elif len(self.tracks) > 1:
            features = torch.cat([t.features for t in self.tracks], 0)
        else:
            features = torch.zeros(0).cuda()
        return features

    def get_inactive_features(self):
        """Get the features of all inactive tracks."""
        if len(self.inactive_tracks) == 1:
            features = self.inactive_tracks[0].features
        elif len(self.inactive_tracks) > 1:
            features = torch.cat([t.features for t in self.inactive_tracks], 0)
        else:
            features = torch.zeros(0).cuda()
        return features

    def reid(self, blob, new_det_pos, new_det_scores):
        """Tries to ReID inactive tracks with new detections."""
        new_det_features = [torch.zeros(0).cuda() for _ in range(len(new_det_pos))]

        if self.do_reid:
            # new_det_pos의 appearance feature 구하기
            # 이때 blob['img'] (BS, c, h, w)에 대해서 net_det_pos 부분을 crop해서 reid network에 forward 시킴
            new_det_features = self.reid_network.test_rois(
                blob['img'], new_det_pos).data

            # inactive track과의 reID (active track과 matching된 게 없으므로)
            if len(self.inactive_tracks) >= 1:

                # calculate appearance distances
                dist_mat, pos = [], []
                for t in self.inactive_tracks:

                    # 기존 (inactive) track과의 appearance distance matrix
                    dist_mat.append(torch.cat([t.test_features(feat.view(1, -1))   
                                               for feat in new_det_features], dim=1))
                    pos.append(t.pos)

                if len(dist_mat) > 1:
                    dist_mat = torch.cat(dist_mat, 0)
                    pos = torch.cat(pos, 0)
                else:
                    dist_mat = dist_mat[0]
                    pos = pos[0]

                # calculate IoU distance (IoU가 threshold보다 작은 값들에 대해 dist_mat 행렬에 큰 값을 부여하기 위해, 
                # 즉 거리를 멀게 하여 assignment를 어렵게 하기 위해)
                iou = bbox_overlaps(pos, new_det_pos)
                iou_mask = torch.ge(iou, self.reid_iou_threshold)
                iou_neg_mask = ~iou_mask

                # make all impossible assignments to the same add big value
                dist_mat = dist_mat * iou_mask.float() + iou_neg_mask.float() * 1000  # iou가 threshold보다 작은 값들에 대하여 (iou_neg_mask) 1000를 더해줌
                dist_mat = dist_mat.cpu().numpy()

                row_ind, col_ind = linear_sum_assignment(dist_mat)  # Hungarian algorithm을 이용해 assignment 해주기
                # row : inactive_tracks, col : new_det_pos

                # row_ind, col_ind를 이용하여 row_ind에 해당하는 inactive_trackers를 self.tracks에 넣어주고,
                # position update, 새로 계산한 new_det_feature를 features에 add 해주기
                assigned = []
                remove_inactive = []
                for r, c in zip(row_ind, col_ind):
                    if dist_mat[r, c] <= self.reid_sim_threshold:
                        t = self.inactive_tracks[r]
                        self.tracks.append(t)
                        t.count_inactive = 0
                        t.pos = new_det_pos[c].view(1, -1)
                        t.reset_last_pos()
                        t.add_features(new_det_features[c].view(1, -1))
                        assigned.append(c)
                        remove_inactive.append(t)

                for t in remove_inactive:
                    self.inactive_tracks.remove(t)

                # re-ID되지 않은, 즉 처음으로 나온 object를 keep해서 이것을 리턴해줌
                keep = torch.Tensor([i for i in range(new_det_pos.size(0)) if i not in assigned]).long().cuda()
                if keep.nelement() > 0:
                    new_det_pos = new_det_pos[keep]
                    new_det_scores = new_det_scores[keep]
                    new_det_features = new_det_features[keep]
                else:
                    new_det_pos = torch.zeros(0).cuda()
                    new_det_scores = torch.zeros(0).cuda()
                    new_det_features = torch.zeros(0).cuda()

        return new_det_pos, new_det_scores, new_det_features

    def get_appearances(self, blob):
        """Uses the siamese CNN to get the features for all active tracks."""
        new_features = self.reid_network.test_rois(blob['img'], self.get_pos()).data
        return new_features

    def add_features(self, new_features):
        """Adds new appearance features to active tracks."""
        for t, f in zip(self.tracks, new_features):
            t.add_features(f.view(1, -1))

    def align(self, blob):
        """Aligns the positions of active and inactive tracks depending on camera motion."""
        if self.im_index > 0:
            im1 = np.transpose(self.last_image.cpu().numpy(), (1, 2, 0))
            im2 = np.transpose(blob['img'][0].cpu().numpy(), (1, 2, 0))
            im1_gray = cv2.cvtColor(im1, cv2.COLOR_RGB2GRAY)
            im2_gray = cv2.cvtColor(im2, cv2.COLOR_RGB2GRAY)
            warp_matrix = np.eye(2, 3, dtype=np.float32)
            criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, self.number_of_iterations,  self.termination_eps)
            cc, warp_matrix = cv2.findTransformECC(im1_gray, im2_gray, warp_matrix, self.warp_mode, criteria)
            warp_matrix = torch.from_numpy(warp_matrix)

            for t in self.tracks:
                t.pos = warp_pos(t.pos, warp_matrix)
                # t.pos = clip_boxes(Variable(pos), blob['im_info'][0][:2]).data

            if self.do_reid:
                for t in self.inactive_tracks:
                    t.pos = warp_pos(t.pos, warp_matrix)

            if self.motion_model_cfg['enabled']:
                for t in self.tracks:
                    for i in range(len(t.last_pos)):
                        t.last_pos[i] = warp_pos(t.last_pos[i], warp_matrix)

    def motion_step(self, track):
        """Updates the given track's position by one step based on track.last_v"""
        if self.motion_model_cfg['center_only']:
            center_new = get_center(track.pos) + track.last_v
            track.pos = make_pos(*center_new, get_width(track.pos), get_height(track.pos))
        else:
            track.pos = track.pos + track.last_v

    def motion(self):
        """Applies a simple linear motion model that considers the last n_steps steps."""
        for t in self.tracks:
            last_pos = list(t.last_pos)

            # avg velocity between each pair of consecutive positions in t.last_pos
            if self.motion_model_cfg['center_only']:
                vs = [get_center(p2) - get_center(p1) for p1, p2 in zip(last_pos, last_pos[1:])]
            else:
                vs = [p2 - p1 for p1, p2 in zip(last_pos, last_pos[1:])]

            t.last_v = torch.stack(vs).mean(dim=0)  
            self.motion_step(t)

        if self.do_reid:
            for t in self.inactive_tracks:
                if t.last_v.nelement() > 0:
                    self.motion_step(t)


    def step(self, blob):
        """
        매 시점마다 실행되는 함수

        This function should be called every timestep to perform tracking with a blob
        containing the image information.
        """

        # 각 track들의 현재 pos를 last_pos에 append
        for t in self.tracks:
            t.last_pos.append(t.pos.clone())

        ###########################
        # Look for new detections #
        ###########################
        '''
        공개된 데이터셋이라면 그 결과(detection 결과)를 이용하고 아니라면 detection 진행
        detect된 box가 있다면 해당 box와 score를 det_pos, det_score로 설정
        '''

        self.obj_detect.load_image(blob['img'])

        # 공개된 데이터셋이면 결과가 있으므로 그 결과를 이용
        if self.public_detections:
            dets = blob['dets'].squeeze(dim=0)  # dets를 가져옴
            
            # dets가 있으면, RPN(Region Proposal Network)를 진행하지 않고 bbox regression과 classification만 진행
            if dets.nelement() > 0:
                boxes, scores = self.obj_detect.predict_boxes(dets)
            # dets가 없으면 boxes와 scores를 빈 텐서로 만들어줌
            else:
                boxes = scores = torch.zeros(0).cuda()

        # 공개된 데이터셋이 아니면 detection 진행
        else:
            boxes, scores = self.obj_detect.detect(blob['img'])


        # boxes가 있다면 image안에 들어오도록 좌표를 clamp 해줌
        if boxes.nelement() > 0:
            boxes = clip_boxes_to_image(boxes, blob['img'].shape[-2:])

            # Filter out tracks that have too low person score
            inds = torch.gt(scores, self.detection_person_thresh).nonzero().view(-1)  
                # torch.gt : Computes \text{input} > \text{other}input>other element-wise.
        else:
            inds = torch.zeros(0).cuda()

        # inds가 있다면 해당 boxes, score를 det_pos, det_scores로 설정
        if inds.nelement() > 0:
            det_pos = boxes[inds]
            det_scores = scores[inds]
        else:
            det_pos = torch.zeros(0).cuda()
            det_scores = torch.zeros(0).cuda()

        ##################
        # Predict tracks #
        ##################
        '''
        카메라 Motion 기반 조정 및 box regression
        '''

        num_tracks = 0
        nms_inp_reg = torch.zeros(0).cuda()

        # tracks이 하나라도 있으면
        if len(self.tracks):

            # align 
            # 카메라 모션에 따라 active/inactive tracks의 position을 align
            if self.do_align:
                self.align(blob)

            # apply motion model
            if self.motion_model_cfg['enabled']:
                self.motion()
                self.tracks = [t for t in self.tracks if t.has_positive_area()]  # area가 양수인 것만 self.tracks로!

            # regress
            person_scores = self.regress_tracks(blob)

            if len(self.tracks):
                # NMS를 통해 keep할 track/inactive track 분리하기
                keep = nms(self.get_pos(), person_scores, self.regression_nms_thresh)
                self.tracks_to_inactive([self.tracks[i] for i in list(range(len(self.tracks))) if i not in keep])

                # do_reid 이면 appearance vector 구해서 track에 feature 넣어주기
                if keep.nelement() > 0 and self.do_reid:
                        new_features = self.get_appearances(blob)
                        self.add_features(new_features)

        #####################
        # Create new tracks #
        #####################

        # 새로운 트랙이 생성될 때 (이전에 트랙한 object가 아니라고 판단 되었을 때)

        # !!! Here NMS is used to filter out detections that are already covered by tracks. This is
        # !!! done by iterating through the active tracks one by one, assigning them a bigger score
        # !!! than 1 (maximum score for detections) and then filtering the detections with NMS.
        # !!! In the paper this is done by calculating the overlap with existing tracks, but the
        # !!! result stays the same.

        # detection된 것이 하나라도 있으면 track들과 NMS 해주기
        if det_pos.nelement() > 0:
            keep = nms(det_pos, det_scores, self.detection_nms_thresh)
            det_pos = det_pos[keep]
            det_scores = det_scores[keep]

            # 각 track에 대하여 det_pos와 비교
            # check with every track in a single run (problem if tracks delete each other)
            for t in self.tracks:
                nms_track_pos = torch.cat([t.pos, det_pos])  # track의 position과 det의 position concatenate
                nms_track_scores = torch.cat(
                    [torch.tensor([2.0]).to(det_scores.device), det_scores]) 
                keep = nms(nms_track_pos, nms_track_scores, self.detection_nms_thresh)  # nms : return the indices

                keep = keep[torch.ge(keep, 1)] - 1   # nms_track_pos에는 0번째 인덱스에 t.pos가 있기 때문에
                                                     # det_pos에서 keep할 index를 구해주기 위해서 
                                                     # 1보다 큰 keep(index)에 대해서 1 빼주기
                                                     # torch.ge : Computes input(왼)≥other(오) element-wise.        
                det_pos = det_pos[keep]
                det_scores = det_scores[keep]
                if keep.nelement() == 0:
                    break

        # 모든 track과의 nms 적용 후, 살아남은 det_pos가 있다면 initialize
        if det_pos.nelement() > 0:
            new_det_pos = det_pos
            new_det_scores = det_scores

            # re-ID matching을 해주고 matching되지 않은 (즉, 처음 나온) object를 리턴
            new_det_pos, new_det_scores, new_det_features = self.reid(blob, new_det_pos, new_det_scores)

            # 새로운 object에 대한 tracker를 만들어주고 self.tracks에 추가해줌
            if new_det_pos.nelement() > 0:
                self.add(new_det_pos, new_det_scores, new_det_features)

        ####################
        # Generate Results #
        ####################

        for t in self.tracks:
            if t.id not in self.results.keys():
                self.results[t.id] = {}
            self.results[t.id][self.im_index] = np.concatenate([t.pos[0].cpu().numpy(), np.array([t.score])])
            # 즉, t.id : {"im_index" : [t.pos[0].cpu().numpy(), np.array([t.score])], ...}

        # inactive track의 count_inactive +1 해주기
        for t in self.inactive_tracks:
            t.count_inactive += 1

        # 정해준 임계값 (inactive_patience) 보다 count_inactive 큰 경우는 remove
        self.inactive_tracks = [
            t for t in self.inactive_tracks if t.has_positive_area() and t.count_inactive <= self.inactive_patience
        ]

        self.im_index += 1  # image index
        self.last_image = blob['img'][0]

    def get_results(self):
        return self.results


class Track:
    """State kept for one tracked object.

    Holds the current box, score and id, a bounded history of appearance
    features and a bounded history of past positions.
    """

    def __init__(self, pos, score, track_id, features, inactive_patience, max_features_num, mm_steps):
        """Create a track from its first detection.

        Args:
            pos: Current box; indexed as ``pos[0, k]`` elsewhere in this class.
            score: Current score.
            track_id: Identifier stored as ``id``.
            features: First appearance feature.
            inactive_patience: Stored as given; not used inside this class.
            max_features_num: Maximum number of features kept after an
                ``add_features`` call.
            mm_steps: The position history keeps ``mm_steps + 1`` entries.
        """
        # Identity and latest estimate.
        self.id = track_id
        self.gt_id = None
        self.pos = pos
        self.score = score

        # Appearance history (oldest entry on the left).
        self.max_features_num = max_features_num
        self.features = deque([features])
        self.ims = deque([])

        # Inactivity bookkeeping.
        self.inactive_patience = inactive_patience
        self.count_inactive = 0

        # Position history and last velocity (empty until one is computed).
        self.last_pos = deque([pos.clone()], maxlen=mm_steps + 1)
        self.last_v = torch.Tensor([])

    def has_positive_area(self):
        """Tell whether the box has positive width and height (x2 > x1 and y2 > y1)."""
        box = self.pos[0]
        return box[2] > box[0] and box[3] > box[1]

    def add_features(self, features):
        """Append an appearance feature, dropping the oldest one when over the limit."""
        history = self.features
        history.append(features)
        if len(history) > self.max_features_num:
            history.popleft()

    def test_features(self, test_features):
        """Return the pairwise distance between the mean stored feature and ``test_features``."""
        if len(self.features) > 1:
            stacked = torch.cat(list(self.features), dim=0)
        else:
            stacked = self.features[0]
        mean_feature = stacked.mean(0, keepdim=True)
        return F.pairwise_distance(mean_feature, test_features, keepdim=True)

    def reset_last_pos(self):
        """Restart the position history with a copy of the current position."""
        history = self.last_pos
        history.clear()
        history.append(self.pos.clone())