In [1]:
import numpy as np
from typing import Tuple

In [2]:
class Kalaman_Filter:
    """ kalman Filter

    ### 2D Constant linear velocity model based kalman filter

    Usage:

        kf = Kalaman_Filter(old_x=1, old_y=1)
        kf.update(3, 3, 0.1)
        kf.update(4, 4, 0.3)
        kf.update(7, 7, 0.1)
        kf.predict_position()
        kf.predict_speed()
    
    """

    # Static Variable
    dt :int = 1
    A :np.ndarray = np.array([[1, dt],[0, 1]])
    
    # Constructor
    def __init__(self, dt = 1, initial_error = 1000 , old_x = 0, old_y = 0) -> None:
        self.dt = dt
        self.old_x = old_x
        self.old_y = old_y
        self.Xx = np.array([old_x, 0], dtype=np.float16)
        self.Xy = np.array([old_y, 0], dtype=np.float16)
        self.Px = np.eye(2, dtype=np.float16)*initial_error
        self.Py = np.eye(2, dtype=np.float16)*initial_error
    
    def __priori_estimate(self, X :np.ndarray) -> np.ndarray:
        return self.A.dot(X.transpose())
    
    def __estimate_covariance(self, P :np.ndarray) -> np.ndarray:
        return self.A.dot(P.dot(self.A.transpose()))
    
    def __calculate_kalman_gain(self, P :np.ndarray, R :np.ndarray) -> np.ndarray:
        return np.divide(P, (P+R), where=((P+R)!=0))
    
    def __postirior_estimate(self, Y :np.ndarray, K :np.ndarray, X :np.ndarray) -> np.ndarray:
        return (X + K.dot(Y.transpose() - X.transpose()).transpose())
    
    def __corrected_covariance(self, P :np.ndarray, K :np.ndarray) -> np.ndarray:
        return ((np.eye(2) - K).dot(P))
    
    def predict_position(self) -> Tuple[int, int]:
        return round(self.__priori_estimate(self.Xx)[0]), round(self.__priori_estimate(self.Xy)[0]) 
    
    def predict_speed(self) -> Tuple[int, int]:
        return self.__priori_estimate(self.Xx)[1], self.__priori_estimate(self.Xy)[1]

    def __diagonal_filter(self, K :np.ndarray) -> np.ndarray:
        top_left = np.array([[1, 0],[0, 0]])
        bottom_right = np.array([[0, 0],[0, 1]])
        return top_left.dot(K.dot(top_left)) + bottom_right.dot(K.dot(bottom_right))
    
    def update(self, x1 :int, y1 :int, error :float) -> None:
        self.Xx = self.__priori_estimate(self.Xx)
        self.Xy = self.__priori_estimate(self.Xy)
        self.Px = self.__estimate_covariance(self.Px)
        self.Py = self.__estimate_covariance(self.Py)
        Kx = self.__calculate_kalman_gain(self.Px, np.eye(2)*error*(x1-self.old_x))
        Ky = self.__calculate_kalman_gain(self.Py, np.eye(2)*error*(y1-self.old_y))
        Kx = self.__diagonal_filter(Kx)
        Ky = self.__diagonal_filter(Ky)
        self.Xx = self.__postirior_estimate(np.array([x1, (x1 - self.old_x)]), Kx, self.Xx)
        self.old_x = x1
        self.Xy = self.__postirior_estimate(np.array([y1, (y1 - self.old_y)]), Ky, self.Xy)
        self.old_y = y1
        self.Px = self.__corrected_covariance(self.Px, Kx)
        self.Py = self.__corrected_covariance(self.Py, Ky)


In [None]:
kf = Kalaman_Filter(old_x=750, old_y=273)
# kf.update(3, 3, 0.1)
# kf.update(4, 4, 0.3)
# kf.update(7, 7, 0.1)
# kf.update(9, 9, 0.1)
print('After Pos Update: ', kf.predict_position())
print('After Speed Update: ', kf.predict_speed())

In [110]:
class kalmanFilterTracker(Kalaman_Filter):
    def __init__(self, id, dt=1, initial_error=1000, old_x=0, old_y=0) -> None:
        super().__init__(dt, initial_error, old_x, old_y)
        self.time_since_update = 0
        self.id :int = id
    
    def update(self, x1: int, y1: int, error: float):
        super().update(x1, y1, error)
        self.time_since_update = 0

In [None]:
kf = kalmanFilterTracker(1, old_x=750, old_y=273)
kf.predict_position()

In [126]:
class SORT:
    INITIAL_ID :int = 0
    Tlost_max :int = 3
    iou_min :float = 0.3

    def __init__(self, Tlost_max = 3, iou_min = 0.3) -> None:
        self.Tlost_max = Tlost_max
        self.iou_min = iou_min
        self.tracker :list[kalmanFilterTracker] = []
    
    def __generate_tracker_predictions(self) -> np.ndarray:
        """ Returns (X_prediction, Y_Prediction, id) """
        predictions = np.zeros((len(self.tracker),3))
        for i, j in enumerate(self.tracker):
            x, y = j.predict_position()
            predictions[i][0] =  x
            predictions[i][1] =  y
            predictions[i][2] = j.id
        return predictions
    
    def __generate_id(self) -> int:
        self.INITIAL_ID += 1
        return self.INITIAL_ID
    
    def __intersection_over_union(self, boxA, boxB):
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[2], boxB[2])
        yB = min(boxA[3], boxB[3])
        interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
        boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
        boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
        iou = interArea / float(boxAArea + boxBArea - interArea)
        return iou


    def __iou_bulk(self, model_predictions :np.ndarray, tracker_predictions :np.ndarray):
        if len(tracker_predictions) == 0:
            return np.zeros((model_predictions.shape[0], tracker_predictions.shape[0]))
        
        ret = np.zeros((model_predictions.shape[0], tracker_predictions.shape[0]))
        for i in range(model_predictions.shape[0]):
            for j in range(tracker_predictions.shape[0]):
                x1 = model_predictions[i][0]
                y1 = model_predictions[i][1]
                w = model_predictions[i][2]
                h = model_predictions[i][3]
                x2 = tracker_predictions[j][0]
                y2 = tracker_predictions[j][1]
                box1 = [x1, y1, x1+w, y1+h]
                box2 = [x2, y2, x2+w, y2+h]
                ret[i][j] = self.__intersection_over_union(box1, box2)
        return ret
    
    def update(self, model_predictions :np.ndarray) -> np.ndarray :
        """
        Takes in [[x, y, l, b, error_percentage]]
        Results [[x, y, id, tracker_index]]
        """
        result = np.concatenate((model_predictions.copy(), np.full((model_predictions.shape[0], 1), -1)), axis=1)
        tracker_predictions = self.__generate_tracker_predictions()
        iou_bulk = self.__iou_bulk(model_predictions, tracker_predictions)
        tracked_ids = []

        for i in range(iou_bulk.shape[1]):
            max_iou = iou_bulk[i].argmax()
            if iou_bulk[i][max_iou] >= self.iou_min and result[i][5] == -1:
                result[i][5] = tracker_predictions[max_iou][2]
                tracked_ids.append(result[i][5])
                for v in self.tracker:
                    if v.id == result[i][5]:
                        v.update(model_predictions[i][0], model_predictions[i][1], (1 - model_predictions[i][4])/100)
            else:
                result[i][5] = -1

        # creating ID's for not matching kalman filter
        for i in range(len(result)):
            if result[i][5] == -1:
                id = self.__generate_id()
                kf = kalmanFilterTracker(id, old_x=result[i][0], old_y=result[i][1])
                result[i][5] = id
                self.tracker.append(kf)
                tracked_ids.append(result[i][5])
        
        # Increase the strike and predict kalman filter, Handling excess tracker_predictions
        to_be_deleted = []
        for i, v in enumerate(self.tracker):
            if v.id not in tracked_ids:
                v.update(*v.predict_position(), 0.1)
                if v.time_since_update > self.Tlost_max :
                    to_be_deleted.append(i)
        
        for i in to_be_deleted:
            del self.tracker[i]
        
        return result

In [127]:
prediction1 = np.array([[ 536,365, 39, 35, 19],
 [ 858,436, 75, 61, 25],
 [ 444,326, 32, 22, 29],
 [ 377,413, 64, 48, 31],
 [ 678,305, 29, 21, 38],
 [1029,489,122, 88, 40],
 [ 496,327, 39, 27, 42],
 [ 565,313, 28, 20, 46],
 [ 750,273, 59, 57, 28],
 [ 200,441, 99, 54, 41]])

prediction2 = np.array([[ 536,365, 39, 35, 19],
 [ 858,436, 75, 61, 25],
 [ 444,326, 32, 22, 28],
 [ 377,413, 64, 48, 31],
 [ 678,305, 29, 21, 38],
 [1029,489,122, 88, 39],
 [ 496,327, 39, 27, 42],
 [ 565,313, 28, 20, 46],
 [ 750,273, 59, 57, 29],
 [ 200,441, 99, 54, 41]])

prediction2 = np.array([[ 536,365, 39, 35, 19],
 [ 858,436, 75, 61, 25],
 [ 444,326, 32, 22, 28],
 [ 377,413, 64, 48, 31],
 [ 678,305, 29, 21, 38],
 [1029,489,122, 88, 39],
 [ 496,327, 39, 27, 42],
 [ 565,313, 28, 20, 46],
 [ 750,273, 59, 57, 29],
 [ 210,481, 99, 54, 41]])

print('Prediction 1:\n', prediction1, '\n')
print('Prediction 2:\n', prediction2)

Prediction 1:
 [[ 536  365   39   35   19]
 [ 858  436   75   61   25]
 [ 444  326   32   22   29]
 [ 377  413   64   48   31]
 [ 678  305   29   21   38]
 [1029  489  122   88   40]
 [ 496  327   39   27   42]
 [ 565  313   28   20   46]
 [ 750  273   59   57   28]
 [ 200  441   99   54   41]] 

Prediction 2:
 [[ 536  365   39   35   19]
 [ 858  436   75   61   25]
 [ 444  326   32   22   28]
 [ 377  413   64   48   31]
 [ 678  305   29   21   38]
 [1029  489  122   88   39]
 [ 496  327   39   27   42]
 [ 565  313   28   20   46]
 [ 750  273   59   57   29]
 [ 210  481   99   54   41]]


In [128]:
A = SORT()
A.update(prediction1)
A.update(prediction2)

array([[ 536,  365,   39,   35,   19,    1],
       [ 858,  436,   75,   61,   25,    2],
       [ 444,  326,   32,   22,   28,    3],
       [ 377,  413,   64,   48,   31,    4],
       [ 678,  305,   29,   21,   38,    5],
       [1029,  489,  122,   88,   39,    6],
       [ 496,  327,   39,   27,   42,    7],
       [ 565,  313,   28,   20,   46,    8],
       [ 750,  273,   59,   57,   29,    9],
       [ 210,  481,   99,   54,   41,   11]])