In [34]:
import cv2
import numpy as np

from filterpy.kalman import KalmanFilter, rts_smoother
from filterpy.common import Q_discrete_white_noise

def setup_kalman_filter():
    kf = KalmanFilter(dim_x=4, dim_z=2)
    dt = 1/30  # Assuming 30 fps

    # Define the state transition matrix
    kf.F = np.array([[1, 0, dt, 0],
                     [0, 1, 0, dt],
                     [0, 0, 1, 0],
                     [0, 0, 0, 1]])

    # Define the observation matrix
    kf.H = np.array([[1, 0, 0, 0],
                     [0, 1, 0, 0]])

    kf.R = np.array([[0.001, 0.],
                     [0., 0.001]])

    kf.Q = np.array([[1, 0, 0, 0],   # Variance for x position
                     [0, 1, 0, 0],   # Variance for y position
                     [0, 0, 1, 0],     # Variance for x velocity
                     [0, 0, 0, 1]])    # Variance for y velocity

    kf.x = np.array([0., 0., 0., 0.])  # Initial state

    kf.P = np.array([[1, 0, 0, 0],    # Lower variance for position
                     [0, 1, 0, 0],
                     [0, 0, 1, 0],  # Higher variance for velocity
                     [0, 0, 0, 1]])

    return kf

In [1]:
import sys
import os
from tqdm import tqdm

path_append = '../../'
print(f'Adding path {os.path.abspath(path_append)} to sys.path')
sys.path.append(path_append)

from core.detections import load_video_detections

def process_video(video_path, kalman_filter):
    _, detections = load_video_detections(video_path, module='detect')

    last_position = None
    max_frames = 1000

    # Arrays to store states and covariances for each time step
    Xs = []
    Ps = []
    Qs = []
    Fs = []

    for idx, detection in enumerate(tqdm(detections[:max_frames])):
        closest_detection = None
        min_distance = float('inf')
        
        if len(detection) > 0:
            for bbox in detection.xyxy:
                x_center = (bbox[0] + bbox[2]) / 2
                y_center = (bbox[1] + bbox[3]) / 2
                distance = np.linalg.norm([x_center - last_position[0], y_center - last_position[1]]) if last_position is not None else 0

                if distance < min_distance:
                    min_distance = distance
                    closest_detection = (x_center, y_center)

            kalman_filter.update(np.array(closest_detection))

            Xs.append(kalman_filter.x.copy())
            Ps.append(kalman_filter.P.copy())
            Qs.append(kalman_filter.Q.copy())
            Fs.append(kalman_filter.F.copy())

            estimated_position = kalman_filter.x[:2]
        else:
            kalman_filter.predict()

            Xs.append(kalman_filter.x.copy())
            Ps.append(kalman_filter.P.copy())
            Qs.append(kalman_filter.Q.copy())
            Fs.append(kalman_filter.F.copy())

            estimated_position = kalman_filter.x[:2]

        last_position = estimated_position

    Xs = np.array(Xs)
    Ps = np.array(Ps)
    Fs = np.array(Fs)
    Qs = np.array(Qs)
    
    return Xs, Ps, Fs, Qs

Adding path /home/luis/workspace/vacocam to sys.path


In [36]:
kf = setup_kalman_filter()
video_path = "/home/luis/workspace/vacocam_render/source/prerender.mp4"

Xs, Ps, Fs, Qs = process_video(video_path, kf)

smoothed_states, *_ = rts_smoother(Xs, Ps, Fs, Qs)

video_capture = cv2.VideoCapture(video_path)

fourcc = cv2.VideoWriter_fourcc(*'XVID')
frame_width, frame_height = int(video_capture.get(3)), int(video_capture.get(4))
out = cv2.VideoWriter('kalman.avi', fourcc, 30, (frame_width, frame_height))

for idx, state in enumerate(smoothed_states):
    ret, frame = video_capture.read()
    if not ret:
        break

    cv2.circle(frame, (int(state[0]), int(state[1])), 10, (0, 255, 0), -1)
    out.write(frame)

video_capture.release()
out.release()

loading detections from /home/luis/workspace/vacocam_render/source/detect/prerender_detections.npy
loaded 112300 detections from /home/luis/workspace/vacocam_render/source/detect/prerender_detections.npy


100%|██████████| 1000/1000 [00:00<00:00, 38194.97it/s]
