In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import time
import json

In [None]:
# Capture video footage
videocapture = cv2.VideoCapture(0)

frame_width = int(videocapture.get(3))
frame_height = int(videocapture.get(4))

video_writer = cv2.VideoWriter(
    "./testview_2.mp4",
    cv2.VideoWriter_fourcc(*"avc1"),
    30,
    (frame_width, frame_height),
)

while True:
    frame_is_ok, current_frame = videocapture.read()
    if frame_is_ok:
        video_writer.write(current_frame)
        cv2.imshow("Frame", current_frame)

    if cv2.waitKey(1) & 0xFF == ord("s"):
        break

videocapture.release()
video_writer.release()

cv2.destroyAllWindows()

In [7]:
def get_middle(array):
    x = (array[0][0] + array[2][0]) // 2
    y = (array[0][1] + array[2][1]) // 2
    return np.asanyarray([x, y], dtype=int)


class MovingObject:
    id = None
    corner = None


class TrackingMovingObject:
    id = None
    corner = None
    millis_last_update = None
    detected_times = None


class TrackingEngine:
    MINIMUM_TRACKED = 3
    MINIMUM_TRACKED_TIME_SECOND = 1000 * 5
    MINIMUM_TO_EXPIRE_TRACK_OBJECT = 1000 * 10

    pending_objects: dict[str, TrackingMovingObject] = {}
    tracking_objects: dict[str, TrackingMovingObject] = {}

    def update(self, objects: list[MovingObject]):
        current_millis = time.time()

        self.remove_timeout_objects(current_millis)
        self.remove_timeout_tracked_object(current_millis)

        for moving_object in objects:
            id = moving_object.id
            corner = moving_object.corner

            object_pending = id in self.pending_objects
            object_tracking = id in self.tracking_objects

            if object_pending and not object_tracking:
                # Update pending object status
                pending_object = self.pending_objects[id]
                pending_object.detected_times += 1
                pending_object.corner = corner
                pending_object.millis_last_update = current_millis
            elif not object_pending and not object_tracking:
                # Add object into pending
                pending_object = TrackingMovingObject()
                pending_object.id = id
                pending_object.corner = corner
                pending_object.detected_times = 1
                pending_object.millis_last_update = current_millis
                self.pending_objects.update({id: pending_object})
            elif not object_pending and object_tracking:
                pending_object = self.tracking_objects[id]
                pending_object.detected_times += 1
                pending_object.corner = corner
                pending_object.millis_last_update = current_millis

        self.promote_object_to_tracking(current_millis)

    def remove_timeout_objects(self, current_millis: float):
        object_id_delete = []
        for id, object in self.pending_objects.items():
            less_than_tracked = object.detected_times < self.MINIMUM_TRACKED
            more_than_tracked_time = (
                current_millis - object.millis_last_update
            ) > self.MINIMUM_TRACKED_TIME_SECOND
            if less_than_tracked and more_than_tracked_time:
                object_id_delete.append(id)

        for id in object_id_delete:
            self.pending_objects.pop(id)

    def remove_timeout_tracked_object(self, current_millis: float):
        object_id_delete = []
        for id, object in self.tracking_objects.items():
            more_than_tracked_time = (
                current_millis - object.millis_last_update
            ) > self.MINIMUM_TO_EXPIRE_TRACK_OBJECT
            if more_than_tracked_time:
                object_id_delete.append(id)
        for id in object_id_delete:
            self.tracking_objects.pop(id)

    def promote_object_to_tracking(self, current_millis: float):
        object_id_to_promote = []
        for id, object in self.pending_objects.items():
            more_than_tracked = object.detected_times >= self.MINIMUM_TRACKED
            less_than_tracked_time = (
                current_millis - object.millis_last_update
            ) <= self.MINIMUM_TRACKED_TIME_SECOND
            if more_than_tracked and less_than_tracked_time:
                object_id_to_promote.append(id)

        for object_id in object_id_to_promote:
            self.tracking_objects.update({object_id: self.pending_objects[object_id]})
            self.pending_objects.pop(object_id)


class LapseEngine:
    def __init__(self) -> None:
        # self.camera = cv2.VideoCapture("testview.mp4")
        self.camera = cv2.VideoCapture(0)
        self.tracking_engine = TrackingEngine()
        self.qr_detector = cv2.aruco.ArucoDetector()
        self.tracking: dict[str, list[float]] = {}
        self.pending: dict[str, list[float]] = {}

    def visualisation(self, frame):
        for moving_object in self.tracking_engine.tracking_objects.values():
            color = np.random.uniform(low=0, high=255, size=(3,)).astype(int)
            frame = cv2.circle(
                frame,
                get_middle(moving_object.corner),
                10,
                (0, 255, 255),
                5,
            )

        # draw a single line in the middle
        frame = cv2.line(
            frame,
            (0, 1080 // 2 - 5),
            (1920, 1080 // 2 - 5),
            (0, 255, 0),
            thickness=1,
            lineType=cv2.LINE_4,
        )

        frame = cv2.line(
            frame,
            (0, 1080 // 2 + 5),
            (1920, 1080 // 2 + 5),
            (0, 255, 0),
            thickness=1,
            lineType=cv2.LINE_4,
        )
        return frame

    def count_lapse(self):
        for moving_object in self.tracking_engine.tracking_objects.values():
            passed_finish_line = get_middle(moving_object.corner)[1] > (1080 // 2)

            object_in_tracking = moving_object.id in self.tracking
            object_in_pending = moving_object.id in self.pending

            if not passed_finish_line and not object_in_pending:
                self.pending.update(
                    {moving_object.id: [moving_object.millis_last_update]}
                )
            elif passed_finish_line and object_in_pending and not object_in_tracking:
                self.pending.pop(moving_object.id)
                self.tracking.update(
                    {moving_object.id: [moving_object.millis_last_update]}
                )
                self.get_lapses()
            elif passed_finish_line and object_in_pending and object_in_tracking:
                self.pending.pop(moving_object.id)
                self.tracking[moving_object.id].append(moving_object.millis_last_update)
                self.get_lapses()

    def get_lapses(self):
        lapse_per_vehicle = {}
        for id, millis_list in self.tracking.items():
            lapses = []
            for i in range(1, len(millis_list)):
                lapses.append(millis_list[i] - millis_list[i - 1])
            lapse_per_vehicle.update({id: lapses})
        print(lapse_per_vehicle)

    def run(self):
        while True:
            ret, frame = self.camera.read()
            if not ret:
                break

            height, width, channel = frame.shape
            corners, ids, rejected = self.qr_detector.detectMarkers(frame)

            all_moving_objects: list[MovingObject] = []
            if len(corners) != 0:
                for marker_corner, marker_id in zip(corners, ids):
                    marker_id = marker_id.reshape(1)
                    marker_corner = marker_corner.reshape([4, 2])

                    moving_object = MovingObject()
                    moving_object.id = marker_id[0]
                    moving_object.corner = marker_corner

                    all_moving_objects.append(moving_object)

            self.tracking_engine.update(all_moving_objects)

            frame = self.visualisation(frame)
            self.count_lapse()

            cv2.imshow("Frame", frame)
            if cv2.waitKey(1) & 0xFF == ord("s"):
                break
        self.camera.release()
        cv2.destroyAllWindows()


lapse_engine = LapseEngine()
lapse_engine.run()

{16: []}
{16: [2.4762139320373535]}
{16: [2.4762139320373535, 2.0967681407928467]}
{16: [2.4762139320373535, 2.0967681407928467, 0.7321209907531738]}
{16: [2.4762139320373535, 2.0967681407928467, 0.7321209907531738, 0.7741189002990723]}
{16: [2.4762139320373535, 2.0967681407928467, 0.7321209907531738, 0.7741189002990723, 0.6564102172851562]}
{16: [2.4762139320373535, 2.0967681407928467, 0.7321209907531738, 0.7741189002990723, 0.6564102172851562, 0.7102279663085938]}
{16: [2.4762139320373535, 2.0967681407928467, 0.7321209907531738, 0.7741189002990723, 0.6564102172851562, 0.7102279663085938, 0.6325187683105469]}


KeyboardInterrupt: 

In [None]:
class KalmanFilter:
    def __init__(
        self,
    ):
        self.measurement_covariance = 20
        self.state_covariance = 1

        self.last_state_covariance = 25
        self.last_estimate_state = 0

    def updateEstimate(self, measurement):
        kalman_gain = (self.last_state_covariance) / (
            self.last_state_covariance + self.measurement_covariance
        )
        measurement_residual = measurement - self.last_estimate_state
        current_estimate = self.last_estimate_state + kalman_gain * measurement_residual

        # Update error estimate
        self.last_state_covariance = (
            1.0 - kalman_gain
        ) * self.last_state_covariance + abs(
            self.last_estimate_state - current_estimate
        ) * 0.1
        self.last_estimate_state = current_estimate

    def latestEstimate(self):
        return self.last_estimate_state


kalman_filter = KalmanFilter()
real_input = []
noisy_input = []
estimate_input = []
for i in range(100):
    noise_input = i + np.random.normal(-10, 10)
    kalman_filter.updateEstimate(noise_input)
    real_input.append(i)
    noisy_input.append(noise_input)
    estimate_input.append(kalman_filter.latestEstimate())

plt.plot(noisy_input, color="blue")
plt.plot(real_input, color="green")
plt.plot(estimate_input, color="red")