In [1]:
import cv2
import numpy as np
import pdb

from Managers import *

# Kalman Filter Example

The number of dynamic paramters is the first number and the number of measurements measurements is the second parameter.

There is initial uncertainty over where the object can be. It is best to start at the top corner which seems to be the default initial state of everything.

In [2]:
class KalmanFilterCallback:
    def __init__(self):
        self.kalman_filter = cv2.KalmanFilter(4, 2)
        self.kalman_filter.measurementMatrix = np.array([[1, 0, 0, 0],
                                                         [0, 1, 0, 0]], dtype=np.float32)
        self.kalman_filter.transitionMatrix = np.array([[1, 0, 1, 0], 
                                                        [0, 1, 0, 1], 
                                                        [0, 0, 1, 0], 
                                                        [0, 0, 0, 1]], dtype=np.float32)
        self.kalman_filter.processNoiseCov = np.array([[1, 0, 0, 0], 
                                                       [0, 1, 0, 0], 
                                                       [0, 0, 1, 0], 
                                                       [0, 0, 0, 1]], dtype=np.float32) * 0.03
        # This set-up assumes constant x and y speed
        
        self.current_measurement = None
        self.last_measurement = None
        self.current_prediction = None
        self.last_prediction = None


    def kalmanUpdate(self, measurement):
        self.last_measurement = self.current_measurement
        self.current_measurement = measurement
        self.last_prediction = self.current_prediction
        
        self.kalman_filter.correct(measurement)
        self.current_prediction = self.kalman_filter.predict()
    
    
    def drawOnImage(self, image):
        if not self.last_measurement is None and not self.current_measurement is None:
            lm_x, lm_y = self.last_measurement
            cm_x, cm_y = self.current_measurement
            cv2.line(image, (int(lm_x), int(lm_y)), (int(cm_x), int(cm_y)), (0, 100, 0), 5)

        if not self.last_prediction is None and not self.current_prediction is None:
            lp_x, lp_y = self.last_prediction[0].item(), self.last_prediction[1].item()
            cp_x, cp_y = self.current_prediction[0].item(), self.current_prediction[1].item()
            cv2.line(image, (int(lp_x), int(lp_y)), (int(cp_x), int(cp_y)), (0, 0, 200), 5)
        
        return image
        
        
    def __call__(self, event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            self.kalmanUpdate(np.array([x, y], dtype=np.float32))

In [3]:
class KalmanViewer:
    def __init__(self, kalman_filter):
        self.kalman_filter = kalman_filter

    def __call__(self, image):
        return self.kalman_filter.drawOnImage(image)

Using the Kalman Filter alongside visual tracking can give us information about what the object is doing and how fast it is moving.

In [9]:
kalman_callback = KalmanFilterCallback()
kalmanViewer = KalmanViewer(kalman_callback)

cam = CameraManager('Kalman Filter', cv2.VideoCapture(0), kalmanViewer, mouseCallback=kalman_callback).addCloseCallback()
cam.run()