# Particle Filter

A particle filter is a probabilistic estimation technique used in the field of Bayesian filtering and state estimation. It approximates the probability distribution of a system's state by representing it with a set of discrete, weighted particles (sample points). These particles evolve over time through a recursive process that incorporates measurements and dynamic models, allowing for effective tracking of dynamic systems with non-linear and non-Gaussian characteristics, such as object tracking in computer vision

Here we will implement a Particle Filter  for motion tracking

In [83]:
# imports
import cv2
import numpy as np

**Display the Video**

Create a function that reads the video. While the video is opened, we read the frames.

In [89]:
def get_frames(filename):
    video = cv2.VideoCapture(filename)
    while video.isOpened():
        ret, frame =video.read()
        if ret:
            yield frame
        else:
            break
    video.release()
    yield None

We begin by defining a function for particle visualization. Initially, we verify the existence of particles for display. If particles are present, we proceed to iterate through them, drawing circles at their respective pixel coordinates. Additionally, if a particle's location is available, we attempt to track it across the video and mark its position with a red circle.

In [90]:
def display(frame, particles, location, NUM_PARTICLES):
    if len(particles)> 0:
        for i in range(NUM_PARTICLES):
            x = int(particles[i,0])
            y = int(particles[i,1])
            cv2.circle(frame,(x,y),1,(0,255,0),1)
    if len(location) > 0:
        cv2.circle(frame,location,15,(0,0,255),5)
    cv2.imshow(frame)
    #stop the video if pressing the escape button
    if cv2.waitKey(30)==27:
        if cv2.waitKey(0)==27:
            return True
    return False


### **Intialize the particles**


In order to initialize the particles, we have to estimate the state of the target, meaning its position and velocity within the video. At the beginning of the video, we don’t know that state. All we know is that the position should lie within the frame somewhere, and the velocity could be in any direction but not moving too fast.

In [91]:
FILENAME = "./video_test.mp4"
NUM_PARTICLES = 5000
VEL_RANGE = 0.5
frame_height = 720
frame_width = 1280

We start by initializing the number of total particles and the initial velocity range to be a pixel per frame.



In [92]:
def initialize_particles(NUM_PARTICLES= NUM_PARTICLES,frame_width= frame_height,frame_height=frame_height,VEL_RANGE=VEL_RANGE):
    particles = np.random.rand(NUM_PARTICLES,4)
    particles = particles * np.array((frame_width,frame_height, VEL_RANGE,VEL_RANGE))
    particles[:,2:4] -= VEL_RANGE/2.0
    return particles

The particles are generated as an array, with each particle represented by a row consisting of four random numbers. The first two columns denote the particle's coordinates, constrained within the frame dimensions (ranging from zero to the frame size). The remaining two columns correspond to their velocities. Initially, the velocity is set at 0.5, with a centering adjustment to allow movement in both directions. This involves reducing the velocity range by half, effectively shifting velocities to be centered around zero.

Let's display the results:

- Define an empty list for the location of the particles
- Initialize the particle
- Display the results

In [None]:
location =[]
particles = initialize_particles()

for frame in get_frames(FILENAME):
    if frame is None:
      print('None')
      break
    terminate = display(frame, particles, location,NUM_PARTICLES)
    if terminate:
        break

cv2.destroyAllWindows()

### **Moving Particles**

As you can see from the video, the particles are not moving during video playlback even though they have a velocity. We solve this by creating a function apply_velocity in which we increment the particle's x and y velocity component.

In [34]:
def apply_velocity(particles):
    particles[:,0] += particles[:,2]
    particles[:,1] += particles[:,3]

    return particles

# Now, we can see the particles are moving according their velocity

In [35]:
location = []
particles = initialize_particles()


for frame in get_frames(FILENAME):
    if frame is None: break
    particles = apply_velocity(particles)

    terminate = display(frame, particles, location, NUM_PARTICLES)
    if terminate:
        break

cv2.destroyAllWindows()


### **Prevent Particles to fall off the edges**

In [36]:
# We prevent the particles to fall off the edges by putting some limit on the particles location.
# To do so, we will loop over the particles and set an upper and lower boundaries on both x and y coordinates.

def enforce_edges(particles):
    for i in range(NUM_PARTICLES):
        particles[i,0] = max(0,min(frame_width-1, particles[i,0]))
        particles[i,1] = max(0,min(frame_height-1, particles[i,1]))
    return particles

In [37]:
# Display result

location = []
particles = initialize_particles()

for frame in get_frames(FILENAME):
    if frame is None: break
    particles = apply_velocity(particles)
    particles = enforce_edges(particles)
    terminate = display(frame, particles, location,NUM_PARTICLES)
    if terminate:
        break

cv2.destroyAllWindows()

### **Measure the quality of the particle**

Imagine we aim to follow a person's elbow, which involves checking the color of the pixel beneath each particle and comparing it to the desired target color. To achieve this, we establish an array of zeros called "errors" to record the color disparities. We then proceed to examine every particle, calculating the color difference for each. This error is determined by computing the mean squared difference between the two colors.

In [38]:
TARGET_COLOR = np.array((66,63, 105))

def compute_errors(particles, frame):

    errors = np.zeros(NUM_PARTICLES)
    for i in range(NUM_PARTICLES):
        x = int(particles[i,0])
        y= int(particles[i,1])
        pixel_color = frame[y, x, :]
        errors[i] = np.sum((TARGET_COLOR - pixel_color)**2)

    return errors

### **Assign Weights**


The error plays a crucial role in determining the weight assigned to each particle. When the error is minimal, we desire a higher weight, indicating that a particle is positioned where the pixel color closely matches the target. To achieve this, we start by subtracting the errors array from the highest error value. Additionally, to prevent particles from clustering along the frame's edges, particles positioned at the edge are assigned a weight of zero.

In [40]:
def compute_weights(errors):
    weights = np.max(errors) - errors

    weights[
        (particles[:,0]==0) |
        (particles[:,0]==frame_width-1) |
        (particles[:,1]==0) |
        (particles[:,1]==frame_height-1) ] = 0

    return weights

### **Resample the weights**


By normalizing the weights to ensure their sum equals one, we can treat them as a probability distribution among the particles. Consequently, we construct a new particle array by sampling from the existing particles. Particles with high weights will have a greater likelihood of being selected multiple times, while those with low weights might not be chosen at all. This process is facilitated by the numpy function choice:

- The first argument is the sampling range (NUM_PARTICLES)
- The second argument is how many samples we have to take (we need as many sample as we have particles)
- The third argument is the probability distribution

In [41]:
def resample(particles, weights,NUM_PARTICLES):
    probabilities = weights / np.sum(weights)
    index_numbers = np.random.choice(
        NUM_PARTICLES,
        size=NUM_PARTICLES,
        p=probabilities)
    particles = particles[index_numbers, :]

    x = np.mean(particles[:,0])
    y = np.mean(particles[:,1])

    return particles, [int(x), int(y)]

The best guess is the mean over the new particles array

In [42]:
particles = initialize_particles(NUM_PARTICLES,frame_width,frame_height,VEL_RANGE)

for frame in get_frames(FILENAME):
    if frame is None: break
    particles = apply_velocity(particles)
    particles = enforce_edges(particles)
    errors = compute_errors(particles, frame)
    weights = compute_weights(errors)
    particles, location = resample(particles, weights,NUM_PARTICLES)

    terminate = display(frame, particles, location,NUM_PARTICLES)
    if terminate:
        break

cv2.destroyAllWindows()

However it wasn't quite a pixel on the target. We need to locate the target and keep tracking the target, even if it moves around the frame or the lighting conditions change. The solution for this is to just add some noise.

In [43]:
POS_SIGMA = 0.75 # standard deviation on position
VEL_SIGMA = 0.1 # standard deviation on velocity

def apply_noise(particles,POS_SIGMA,VEL_SIGMA,NUM_PARTICLES):
    noise= np.concatenate(
    (
        np.random.normal(0.0, POS_SIGMA, (NUM_PARTICLES,1)),
        np.random.normal(0.0, POS_SIGMA, (NUM_PARTICLES,1)),
        np.random.normal(0.0, VEL_SIGMA, (NUM_PARTICLES,1)),
        np.random.normal(0.0, VEL_SIGMA, (NUM_PARTICLES,1))

    ),
    axis=1)

    particles += noise
    return particles

In [44]:
particles = initialize_particles(NUM_PARTICLES,frame_width,frame_height,VEL_RANGE)

for frame in get_frames(FILENAME):
    if frame is None: break
    particles = apply_velocity(particles)
    particles = enforce_edges(particles)
    errors = compute_errors(particles, frame)
    weights = compute_weights(errors)
    particles, location = resample(particles, weights,NUM_PARTICLES)
    particles = apply_noise(particles,POS_SIGMA,VEL_SIGMA,NUM_PARTICLES)

    terminate = display(frame, particles, location,NUM_PARTICLES)
    if terminate:
        break

cv2.destroyAllWindows()

From the result. It looks like the particle cloud is distributed along different subject and not drawn in the target. So we have to make the weights more sensitive to color differences. One possible solution is to square the weights.

In [45]:
def compute_weights(errors):
    weights = np.max(errors) - errors

    weights[
        (particles[:,0]==0) |
        (particles[:,0]==frame_width-1) |
        (particles[:,1]==0) |
        (particles[:,1]==frame_height-1) ] = 0

    weights = weights**2

    return weights

In [46]:
particles = initialize_particles(NUM_PARTICLES,frame_width,frame_height,VEL_RANGE)

for frame in get_frames(FILENAME):
    if frame is None: break
    particles = apply_velocity(particles)
    particles = enforce_edges(particles)
    errors = compute_errors(particles, frame)
    weights = compute_weights(errors)
    particles, location = resample(particles, weights,NUM_PARTICLES)
    particles = apply_noise(particles,POS_SIGMA,VEL_SIGMA,NUM_PARTICLES)

    terminate = display(frame, particles, location,NUM_PARTICLES)
    if terminate:
        break

cv2.destroyAllWindows()

Now, let's increase the power of the weights to 8.




In [47]:
def compute_weights(errors):
    weights = np.max(errors) - errors

    weights[
        (particles[:,0]==0) |
        (particles[:,0]==frame_width-1) |
        (particles[:,1]==0) |
        (particles[:,1]==frame_height-1) ] = 0

    weights = weights**8

    return weights

In [48]:
particles = initialize_particles(NUM_PARTICLES,frame_width,frame_height,VEL_RANGE)


for frame in get_frames(FILENAME):
    if frame is None: break
    particles = apply_velocity(particles)
    particles = enforce_edges(particles)
    errors = compute_errors(particles, frame)
    weights = compute_weights(errors)
    particles, location = resample(particles, weights,NUM_PARTICLES)
    particles = apply_noise(particles,POS_SIGMA,VEL_SIGMA,NUM_PARTICLES)

    terminate = display(frame, particles, location,NUM_PARTICLES)
    if terminate:
        break

cv2.destroyAllWindows()

Now, it is much better, but we have some spreading of particle cloud and it takes long time to go to the target.