In [1]:
import matplotlib.pyplot as plt
import numpy as np
from scipy import linalg as la
from scipy.stats import multivariate_normal as mvn
import pandas as pd
import numpy as np
import cv2

plt.style.use("seaborn-v0_8")

In [2]:
finger_tracking_data = pd.read_csv('data/kalman_task/txys_missingdata.csv')

In [3]:
def convert_coordinates(x, y, orig_res=(1980, 1080), new_res=(1280, 720)):
    orig_width, orig_height = orig_res
    new_width, new_height = new_res

    # Compute scaling factors for x and y dimensions.
    scale_x = new_width / orig_width
    scale_y = new_height / orig_height

    # Convert the coordinates to the new scale.
    x_new = x * scale_x
    y_new = y * scale_y

    return int(x_new), int(y_new)

def annotate_video(video_path, time, coordinates, output_video_path='kalman_filter_annotated_video.mp4'):
  
    # Open the video file using OpenCV
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Unable to open video file.")
        return
    
    # Retrieve video properties for output video writing
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break  # End of video

        # Current time in the video in milliseconds.
        current_time_ms = int(cap.get(cv2.CAP_PROP_POS_MSEC))

        if(idx<len(time) and current_time_ms == time[idx]):
            x = int(coordinates[idx, 0])  
            y = int(coordinates[idx, 1])

            x, y = convert_coordinates(x, y)
            # Draw a red circle as a marker
            cv2.circle(frame, (x, y), radius=10, color=(255, 0, 0), thickness=-1)
            idx += 1
        
        # Write the annotated frame to the output video
        out.write(frame)

        # Optionally, display the frame in a window
        cv2.imshow("Annotated Video", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cap.release()
    out.release()
    cv2.destroyAllWindows()
    print(f"Annotated video saved as {output_video_path}")


In [4]:
## We assume that finger movement speed is constant (as in behaviour is linear).
class KalmanFilter2d:

    def __init__(self, dt=100, process_noise_var=1e-2, measurement_noise_var=1e-1, x_init=[0.0, 0.0, 0.0, 0.0]):
        '''
            dt = time step in milli seconds
        '''
        self.dt = dt/1000 # converting to seconds
        self.R_var = measurement_noise_var
        self.Q_var = process_noise_var

        x_init = x_init
        self.x = np.array(x_init).reshape(4, 1)

        # dynamics matrix
        self.A = np.array([  [1, 0, self.dt, 0],
                             [0, 1, 0, self.dt],
                             [0, 0, 1,  0],
                             [0, 0, 0,  1]])

        # Sensor matrix
        self.C = np.array([ [1, 0, 0, 0],
                            [0, 1, 0, 0]])

        self.P = np.eye(4)  # covariance matrix for the state estimate

        # Process noise for covariance Q
        dt2 = self.dt ** 2
        dt3 = self.dt ** 3 / 2
        dt4 = self.dt ** 4 / 4
         
         # covariance matrix for the process disturbance
        self.Q = self.Q_var * np.array([[dt4,    0, dt3,    0],
                                        [0,    dt4,   0,  dt3],
                                        [dt3,    0, dt2,    0],
                                        [0,    dt3,   0,  dt2]])

        # Measurement noise covariance matrix
        self.R = self.R_var * np.eye(2)

    def __calculate_kalman_gain(self):
        S = self.C @ self.P @ self.C.T + self.R
        K = self.P @ self.C.T @ la.inv(S)
        return K

    def predict(self):
        
        self.x = self.A @ self.x
        self.P = self.A @ self.P @ self.A.T + self.Q

        return self.x
    
    def measure(self, y_hat):
        '''
            Update the measures based on the received measurements
        '''
        K = self.__calculate_kalman_gain()

        self.x = self.x + K @ (y_hat - self.C @ self.x)

        self.P = self.P - K @ self.C @ self.P

        return self.x

In [5]:
time = []
state_estimates = []
true_points = []
    
# Initialize the filter
dt = 100
kf = KalmanFilter2d(dt=dt, process_noise_var=0.001, measurement_noise_var=0.001)

previous_time = None
for index, row in finger_tracking_data.iterrows():
    current_time = row['t_ms']

    if(previous_time is not None and current_time != previous_time + dt):

        while(current_time != previous_time+dt):
            previous_time = previous_time + dt
            kf.predict()
            state_estimates.append(kf.x.flatten())
            time.append(previous_time)
            true_points.append(kf.x.flatten()[0:2])

    # Predict step
    kf.predict()

    if(previous_time is None or current_time == previous_time + dt):
        z = np.array([[row['x_px']], [row['y_px']]])
        kf.measure(z)
    
    previous_time = current_time
    
    # Save the current state estimate
    state_estimates.append(kf.x.flatten())
    time.append(current_time)
    true_points.append(np.array([[row['x_px']], [row['y_px']]]).flatten())

state_estimates = np.array(state_estimates)

In [20]:
annotate_video('data/kalman_task/hand_tracking_output.mov', time, np.array(state_estimates), output_video_path='kalman_annotated_video.mp4')

Annotated video saved as kalman_annotated_video.mp4
