In [1]:
import cv2 as cv
import numpy as np
import io

import pyautogui
import time

from IPython.html import widgets
from ipywidgets import interact, interact_manual, IntProgress
from IPython.display import display, Image, clear_output
from PIL import Image as pil_image



# Utilities

In [2]:
def track_mouse(maximum = 2000, frequency = 0.01):
    f = IntProgress(min=0, max=maximum, description='Recording:', bar_style='info')
    result = []
    try:
        print('\nStart mouse tracking...')
        display(f)
        while True:
            x, y = pyautogui.position()
            result.append((x, y))
            time.sleep(frequency)
            f.value += 1
            if f.value == maximum:
                raise KeyboardInterrupt
    except KeyboardInterrupt:
        print('\nFinish mouse tracking.')
        return result

def display_frames_sequence(frames):
    def exec(i):
        a = np.uint8(frames[i])
        f = io.StringIO()

        imgByteArr = io.BytesIO()
        pil_image.fromarray(a, 'RGB').save(imgByteArr, format='PNG')
        display(Image(data=imgByteArr.getvalue()))
        
    widgets.interact(exec, i=widgets.IntSlider(min=0, max=len(frames) - 1, step=1, value=0))
    
def write_video(path, frames):
    fourcc = cv.VideoWriter_fourcc(*'MP4V')
    h, w = frames[0].shape[:2]
    out = cv.VideoWriter(path + ".mp4", fourcc, 60, (w, h)) 
    for frame in frames:
        out.write(cv.cvtColor(frame, cv.COLOR_BGR2RGB))
    cv.destroyAllWindows()
    out.release()
        
def draw_points(points, screen_size, frames, points_to_draw = 100, color=(0,255,0), point_size=6):
    recent = []
    for i, point in enumerate(points):
        img = frames[i] if i < len(frames) else np.zeros((screen_size[1],screen_size[0], 3), np.uint8)
        cv.circle(img, point, point_size, color, -11)
        # draw recent points
        for prev_point in recent:
            cv.circle(img, prev_point, point_size, color, -11)
        recent.append(point)
        if i > points_to_draw:
            recent.pop(0)
            
        if i < len(frames):
            frames[i] = img
        else:
            frames.append(img)
    return frames

# Create dataset

In [10]:
true_data = track_mouse(frequency=0.005)
screen_size = pyautogui.size()
print("Screen size: {}.\nTracked {} events".format(screen_size, len(true_data)))


Start mouse tracking...


IntProgress(value=0, bar_style='info', description='Recording:', max=2000)


Finish mouse tracking.
Screen size: Size(width=1440, height=900).
Tracked 2000 events


In [11]:
frames = draw_points(true_data, screen_size, [])
display_frames_sequence(frames)

interactive(children=(IntSlider(value=0, description='i', max=1999), Output()), _dom_classes=('widget-interact…

# Add random noise

Lets add some random noise to our true data. 

We assume that true value is a mean (expectation) of normal distribution with some variance), so we need to substitute the true value with some noisy one laying on this distrubution.

In [12]:
sigma = 20 # value defined by user
noisy_data = [(int(np.random.normal(x, sigma, 1)), int(np.random.normal(y, sigma, 1)))for (x, y) in true_data]

In [13]:
with_noise = draw_points(noisy_data, screen_size, frames, color=(255,0,0), point_size=3)
display_frames_sequence(with_noise)

interactive(children=(IntSlider(value=0, description='i', max=1999), Output()), _dom_classes=('widget-interact…

# Kalman filter

[Application example](https://www.cs.utexas.edu/~teammco/misc/kalman_filter/)

[Article with explenation and intuition for Kalman filter](https://www.bzarg.com/p/how-a-kalman-filter-works-in-pictures/)

**The Kalman Filter estimates the true state of an object given noisy input (input with some inaccuracy)**. In the case of this simulation, the Kalman Filter estimates the true position of your cursor when there is random input noise. It can also predict the future state using past readings (i.e. the most likely position of your cursor after n seconds).

In [14]:
class KalmanFilter:
    def __init__(self):
        # A Matrix - State Transition
        self.A = np.array([
            [1, 0, 0.2,  0],
            [0, 1, 0,  0.2],
            [0, 0, 1,    0],
            [0, 0, 0,    1]
        ])
        
        # H Matrix - Measurement
        self.H = np.array([
            [1, 0, 1, 0],
            [0, 1, 0, 1],
            [0, 0, 0, 0],
            [0, 0, 0, 0]
        ])
        
        # Q Matrix - Action Uncertainty
        self.Q = np.diag([0, 0, .1, .1])

        # R Matrix - Sensor Noise
        self.R = np.diag([.1, .1, .1, .1])
        
        self.x = np.array([0, 0, 0, 0])
        
        # P Matrix - prediction matrix
        self.P = np.zeros((4, 4))
        
        self.I = np.identity(4)
    
    def predict(self):
        self.x = self.A.dot(self.x)
        self.P = self.A.dot(self.P).dot(self.A.T) + self.Q
        return self.x
    
    def correct(self, x, y, prev_x, prev_y):
        """
            x, y - coordinates of current position
            prev_x, prev_y - coordinates of previous position (used for calculation the velocity). 
            we assume that position and velocity correlate
        """
        m = np.array([x, y, x - prev_x, y - prev_y])
        
        self.S = self.H.dot(self.P).dot(self.H.T) + self.R
        K = self.P.dot(self.H.T).dot(np.linalg.inv(self.S))
        Y = m - self.H.dot(self.x)
        self.x = self.x + K.dot(Y)
        self.P = (self.I - K.dot(self.H)).dot(self.P)
    
    @staticmethod
    def run(measurements):
        kf = KalmanFilter()
        predictions = []
        prev_x, prev_y = 0, 0
        for (x, y) in measurements:
            pred_x, pred_y, _, _ = kf.predict()
            predictions.append((int(pred_x), int(pred_y)))
            kf.correct(x, y, prev_x, prev_y)
            prev_x = x
            prev_y = y
        return predictions

# Test

In [15]:
estimated = KalmanFilter.run(noisy_data)
with_estimated = draw_points(estimated, screen_size, frames, color=(0,0,255))
display_frames_sequence(with_estimated)

interactive(children=(IntSlider(value=0, description='i', max=1999), Output()), _dom_classes=('widget-interact…

# Save result of the experiment

In [16]:
write_video('result', with_estimated)