In [1]:
# Implementing particle filter for pacman tracking

In [2]:
import random
import matplotlib.pyplot as plt
import cv2
import numpy as np

In [3]:

# Weights for each ghost based on how close they are to Pacman
def manhattanweights(g1, g2, g3, g4, pc):
    """
    Inverse Manhattan-distance weight of each ghost relative to Pac-Man.

    Args:
        g1, g2, g3, g4: ghost positions, each indexable as (x, y).
        pc: Pac-Man position, indexable as (x, y).

    Returns:
        List [w1, w2, w3, w4] where w_i = 1 / (|dx| + |dy| + 1e-7); the small
        constant keeps the division defined when a ghost sits on Pac-Man.
    """
    eps = 1e-7
    pac_x, pac_y = pc[0], pc[1]
    return [
        1 / (abs(ghost[0] - pac_x) + abs(ghost[1] - pac_y) + eps)
        for ghost in (g1, g2, g3, g4)
    ]

# Motion model for ghosts
def motion_model_ghosts(g,pc,prev_action,allow_motion):
    """
    Motion model for a ghost: probability of moving left / right / up / down.

    Every direction starts with a score of 1. On each axis, the direction
    that reduces the ghost's distance to Pac-Man has that distance added to
    its score (smaller y counts as "up"). The score of the previous action is
    scaled by 1.25. Scores are normalised to probabilities, and the
    probability of every blocked direction is redistributed 40% / 40% / 20%
    (two perpendicular directions / opposite direction).

    Args:
        g: ghost position, indexable as (x, y).
        pc: Pac-Man position, indexable as (x, y).
        prev_action: "l", "r", "u" or "d"; any other value gives no bonus.
        allow_motion: mapping with keys "u", "d", "l", "r"; a value equal to 0
            marks the direction as blocked.

    Returns:
        Tuple (p_l, p_r, p_u, p_d). Blocked directions are always 0 and the
        remaining entries sum to 1; if every direction is blocked all four
        entries are 0 (callers are expected to fall back to a default).
    """
    scores = {"l": 1, "r": 1, "u": 1, "d": 1}

    # Favour the direction that closes the gap to Pac-Man on each axis
    if g[0] > pc[0]:
        scores["l"] += g[0] - pc[0]
    elif g[0] < pc[0]:
        scores["r"] += pc[0] - g[0]
    if g[1] > pc[1]:
        scores["u"] += g[1] - pc[1]
    elif g[1] < pc[1]:
        scores["d"] += pc[1] - g[1]

    # Likely to repeat the previous action, so scale its score up
    if isinstance(prev_action, str) and prev_action in scores:
        scores[prev_action] = scores[prev_action] * 1.25

    # normalize probabilities
    total = scores["l"] + scores["d"] + scores["u"] + scores["r"]
    probs = {direction: score / total for direction, score in scores.items()}

    # For each blocked direction: (blocked, ((receiver, share), ...))
    redistribution = (
        ("u", (("r", 0.4), ("l", 0.4), ("d", 0.2))),
        ("d", (("r", 0.4), ("l", 0.4), ("u", 0.2))),
        ("r", (("u", 0.4), ("d", 0.4), ("l", 0.2))),
        ("l", (("u", 0.4), ("d", 0.4), ("r", 0.2))),
    )
    any_blocked = False
    for blocked, shares in redistribution:
        if allow_motion[blocked] == 0:
            any_blocked = True
            mass = probs[blocked]
            for receiver, share in shares:
                probs[receiver] += share * mass
            probs[blocked] = 0

    if any_blocked:
        # A later redistribution may have handed mass back to a direction that
        # is itself blocked; remove it and renormalise over the open ones.
        for direction in probs:
            if allow_motion[direction] == 0:
                probs[direction] = 0.0
        remaining = probs["l"] + probs["r"] + probs["u"] + probs["d"]
        if remaining > 0:
            probs = {direction: p / remaining for direction, p in probs.items()}

    # Returns Probabilities of motion in each direction
    return probs["l"], probs["r"], probs["u"], probs["d"]

# Motion model for Pacman
def motion_model(g1, g2, g3, g4, pc, prev_action, allow_motion):
    """
    Motion model for Pac-Man: probability of moving left / right / up / down.

    Each direction accumulates a "danger" score starting at 1: every ghost
    contributes its inverse-Manhattan-distance weight divided by its axis
    offset from Pac-Man, to the same direction keys the ghost model uses for
    that offset. The scores are inverted so that a lower danger gives a
    higher preference, the previous action is scaled by 1.25, and the result
    is normalised. The probability of every blocked direction is
    redistributed 40% / 40% / 20% (two perpendicular directions / opposite
    direction).

    Args:
        g1, g2, g3, g4: ghost positions, each indexable as (x, y).
        pc: Pac-Man position, indexable as (x, y).
        prev_action: "l", "r", "u" or "d"; any other value gives no bonus.
        allow_motion: mapping with keys "u", "d", "l", "r"; a value equal to 0
            marks the direction as blocked.

    Returns:
        Tuple (p_l, p_r, p_u, p_d). Blocked directions are always 0 and the
        remaining entries sum to 1; if every direction is blocked all four
        entries are 0 (callers are expected to fall back to a default).
    """
    danger = {"l": 1, "r": 1, "u": 1, "d": 1}
    weights = manhattanweights(g1, g2, g3, g4, pc)

    # weighted sum of ghosts position in each direction
    # higher sum, indicates ghosts closer to pacman in respective direction
    for ghost, weight in zip((g1, g2, g3, g4), weights):
        if ghost[0] > pc[0]:
            danger["l"] += weight / (ghost[0] - pc[0] + 1e-7)
        elif ghost[0] < pc[0]:
            danger["r"] += weight / (pc[0] - ghost[0] + 1e-7)
        if ghost[1] > pc[1]:
            danger["u"] += weight / (ghost[1] - pc[1] + 1e-7)
        elif ghost[1] < pc[1]:
            danger["d"] += weight / (pc[1] - ghost[1] + 1e-7)

    # Inverse to obtain higher values indicate more probable direction of motion
    scores = {direction: 1 / value for direction, value in danger.items()}

    # Likely to repeat action, so scale up probability
    if isinstance(prev_action, str) and prev_action in scores:
        scores[prev_action] = scores[prev_action] * 1.25

    # normalize probabilities
    total = scores["l"] + scores["d"] + scores["u"] + scores["r"]
    probs = {direction: score / total for direction, score in scores.items()}

    # For each blocked direction: (blocked, ((receiver, share), ...))
    redistribution = (
        ("u", (("r", 0.4), ("l", 0.4), ("d", 0.2))),
        ("d", (("r", 0.4), ("l", 0.4), ("u", 0.2))),
        ("r", (("u", 0.4), ("d", 0.4), ("l", 0.2))),
        ("l", (("u", 0.4), ("d", 0.4), ("r", 0.2))),
    )
    any_blocked = False
    for blocked, shares in redistribution:
        if allow_motion[blocked] == 0:
            any_blocked = True
            mass = probs[blocked]
            for receiver, share in shares:
                probs[receiver] += share * mass
            probs[blocked] = 0

    if any_blocked:
        # A later redistribution may have handed mass back to a direction that
        # is itself blocked; remove it and renormalise over the open ones.
        for direction in probs:
            if allow_motion[direction] == 0:
                probs[direction] = 0.0
        remaining = probs["l"] + probs["r"] + probs["u"] + probs["d"]
        if remaining > 0:
            probs = {direction: p / remaining for direction, p in probs.items()}

    # Returns Probabilities of motion in each direction
    return probs["l"], probs["r"], probs["u"], probs["d"]

In [4]:
# Extract per-frame information from the game image, such as the observed position of each entity

class Extract:
    """
    Extracts all relevant information from the current frame

    Entities are located by HSV colour thresholding followed by contour
    centroids. The class also offers a coarse movement label between two
    frames and a wall check around an entity centre.
    """

    def __init__(self):
        # HSV (lower, upper) thresholds per entity. An entry is either a single
        # range dict or a list of range dicts (red needs two hue ranges).
        self.object_colors = {
            "pacman": {"lower": (25, 100, 100), "upper": (35, 255, 255)},  # Yellow
            "red_ghost": [
                {"lower": (0, 100, 100), "upper": (5, 255, 255)},   # Red range 1
                {"lower": (160, 100, 100), "upper": (180, 255, 255)}  # Red range 2
            ],
            "cyan_ghost": {"lower": (80, 100, 100), "upper": (100, 255, 255)},  # Cyan
            "pink_ghost": {"lower": (110, 20, 100), "upper": (160, 110, 255)},  # Pink
            "orange_ghost": {"lower": (5, 20, 100), "upper": (20, 255, 255)},  # Orange
        }
        # Marker colour per entity — presumably BGR for cv2 drawing; not used
        # inside this class.
        self.ghost_marker_colors = {
            "pacman": (0, 255, 255),
            "red_ghost": (0, 0, 255),
            "cyan_ghost": (255, 255, 0),
            "pink_ghost": (255, 0, 255),
            "orange_ghost": (0, 100, 255),
        }
        # Movement label keyed by 10 * sign(dx) + sign(dy).
        # NOTE(review): sign(dy) = +1 is labelled "up" although image y grows
        # downward — confirm the intended axis convention.
        self.movements = {
            1: "up",
            -1: "down",
            -10: "left",
            10: "right",
            11: "upright",
            -11: "downleft",
            9: "downright",   # 10 + (-1): right + down
            -9: "upleft",     # -10 + 1: left + up
            0: "nomo",
        }

    def find_all_centers(self, mask: cv2.Mat, entity, min_area=250):
        """
        Find all centers of the object in the mask

        Args:
            mask: binary mask of the object
            entity: entity name; contours of "pacman" are accepted at any
                non-zero area
            min_area: contours of every other entity must have an area
                strictly greater than this to count as a detection

        Returns:
            centers: list of (cx, cy) integer centroids, one per accepted contour
        """
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        centers = []
        for contour in contours:
            M = cv2.moments(contour)
            area = M["m00"]
            if area == 0:
                # Degenerate contour: centroid undefined
                continue
            # Each contour contributes at most one centre (previously a large
            # Pac-Man contour was appended twice).
            if entity == "pacman" or area > min_area:
                centers.append((int(M["m10"] / area), int(M["m01"] / area)))
        return centers

    def bgr_to_hsv(self, frame: cv2.Mat):
        """converts BGR image to HSV image"""
        return cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    def create_combined_mask(self, hsv_frame: cv2.Mat, color_ranges):
        """
        Creates a combined mask from multiple color ranges.

        Args:
            hsv_frame: current frame in HSV.
            color_ranges: list or dict of lower and upper HSV bounds.

        Returns:
            combined_mask: a binary mask combining all ranges
            (None if an empty list of ranges is given).
        """
        if not isinstance(color_ranges, list):
            color_ranges = [color_ranges]

        combined_mask = None
        for color_range in color_ranges:
            mask = cv2.inRange(
                hsv_frame, np.array(color_range["lower"]), np.array(color_range["upper"])
            )
            combined_mask = mask if combined_mask is None else cv2.bitwise_or(combined_mask, mask)
        return combined_mask

    def extract_locations(self, frame: cv2.Mat, multi=False, remove_spawn_point=True):
        """
        Extracts the locations of all objects in the frame.

        Args:
            frame: current frame in BGR.
            multi: whether to extract multiple objects of the same type
                (currently unused: all accepted centers are always returned).
            remove_spawn_point: whether to remove the spawn point from the frame.
                NOTE: this blanks frame[220:240, 170:190] in place, so the
                caller's array is modified.

        Returns:
            positions: dictionary mapping each entity name to its list of centers.
        """
        if remove_spawn_point:
            frame[220:240, 170:190, :] = 0
        hsv = self.bgr_to_hsv(frame)

        positions = {}
        for name, color_ranges in self.object_colors.items():
            combined_mask = self.create_combined_mask(hsv, color_ranges)
            positions[name] = self.find_all_centers(combined_mask, name)
        return positions

    @staticmethod
    def _single_center(position):
        """Reduce a detection (one (x, y) pair or a list of them) to one pair, or None."""
        if position is None or len(position) == 0:
            return None
        first = position[0]
        if isinstance(first, (tuple, list, np.ndarray)):
            return first
        return position

    def extract_movement(self, positions: dict, prev_positions: dict):
        """
        Extracts the movement of all objects in the frame

        Args:
            positions: dictionary containing the positions of all objects in
                the current frame; each value is an (x, y) pair, a list of
                such pairs (the first one is used), an empty list or None
            prev_positions: dictionary containing the positions of all objects in the previous frame
                (same value format)

        Returns:
            movements: dictionary containing the movements of all objects
                (None where either position is unavailable)
        """
        movements = {}
        for name in positions:
            current = self._single_center(positions[name])
            previous = self._single_center(prev_positions.get(name))
            if current is None or previous is None:
                movements[name] = None
                continue
            key = int(np.sign(current[0] - previous[0])) * 10 + int(
                np.sign(current[1] - previous[1])
            )
            movements[name] = self.movements[key]
        return movements

    def valid_entity_movements(self, frame: cv2.Mat, entity_center: tuple, offset=20):
        """
        Checks for walls in the immediate vicinity of an entity (up, down, left, right).

        Args:
            frame: Current frame in BGR
            entity_center: (cx, cy) center of the entity, or None
            offset: How many pixels away from the center to check for a wall

        Returns:
            A dictionary indicating whether the entity can move in each direction.
            e.g. {"up": True, "down": False, "left": True, "right": False}
            All directions are False when no usable center is given or the
            check fails.
        """
        blocked = {"up": False, "down": False, "left": False, "right": False}
        try:
            # If we have no entity detected, just return False in all directions
            if entity_center is None or len(entity_center) != 2:
                return blocked

            hsv = self.bgr_to_hsv(frame)

            # Define a rough HSV range for the blue walls
            walls_lower = (120, 100, 30)
            walls_upper = (130, 255, 255)

            # Create a mask that highlights the walls
            walls_mask = cv2.inRange(hsv, walls_lower, walls_upper)

            cx, cy = int(entity_center[0]), int(entity_center[1])
            height, width = walls_mask.shape
            inner = 10  # half-size of the square around the center that is skipped

            # Outer edges of the probed rectangles, kept inside the frame
            up_y = max(cy - offset, 0)
            down_y = max(min(cy + offset, height - 1), 0)
            left_x = max(cx - offset, 0)
            right_x = max(min(cx + offset, width - 1), 0)

            # Inner edges. Negative slice bounds would wrap around to the far
            # side of the image, so clamp them at 0.
            top = max(cy - inner, 0)
            bottom = max(cy + inner, 0)
            left = max(cx - inner, 0)
            right = max(cx + inner, 0)

            # Check pixel values in rectangle in each direction
            up_wall_matrix = walls_mask[up_y:top, left:right]
            down_wall_matrix = walls_mask[bottom:down_y, left:right]
            left_wall_matrix = walls_mask[top:bottom, left_x:left]
            right_wall_matrix = walls_mask[top:bottom, right:right_x]

            # A direction is open when its rectangle contains no wall pixel
            return {
                "up": not bool(np.any(up_wall_matrix)),
                "down": not bool(np.any(down_wall_matrix)),
                "left": not bool(np.any(left_wall_matrix)),
                "right": not bool(np.any(right_wall_matrix)),
            }
        except (cv2.error, TypeError, ValueError) as e:
            # Best effort: report the problem and treat every direction as blocked
            print(e)
            print(entity_center)
            return blocked
        


In [5]:
# ParticleFilter for Pac-Man


class ParticleFilter:
    """
    Particle filter that tracks Pac-Man and the four ghosts.

    One independent particle set is kept per entity. Every particle is a row
    (x, y, prev_action) of an object-dtype array of shape (num_particles, 3).
    """

    STEP_PX = 2                    # pixels a particle moves per step
    MATCH_THRESHOLD_PX = 15        # max distance at which a particle matches a detection
    ABSENT_GHOST_POS = (999, 999)  # stand-in position for a ghost with no detection

    def __init__(
        self,
        num_particles,
        frame_width,
        frame_height,
        extract_instance,
        spawn_point=(190, 315),
        init_prev_action="r",
        init_method="random",
        resample_method="multinomial",
        no_detection_randomise=False,
        motion_noise_std=3.0,  # <--- Gaussian noise for motion
        measurement_noise_px=2,  # <--- Pixel noise for measurements
    ):
        """
        Particle Filter for tracking Pac-Man

        Args:
            num_particles (int): Number of particles
            frame_width (int): Width of the game frame
            frame_height (int): Height of the game frame
            extract_instance (Extract): An instance of the Extract class
            spawn_point (tuple): (x, y) around which half of Pac-Man's particles
                                 are placed when init_method is 'spawn_point'
            init_prev_action (str): An initial guess for Pac-Man's previous action
            init_method (str): Method to initialize particles
                               ('random', 'center' or 'spawn_point')
            resample_method (str): 'multinomial' or 'systematic'
            no_detection_randomise (bool): Whether to randomize particles in case of no detection
            motion_noise_std (float): Standard deviation of the Gaussian noise added
                                      to each motion step
            measurement_noise_px (int): Max random integer offset added to each
                                        measured detection location
        """
        self.num_particles = num_particles
        self.frame_width = frame_width
        self.frame_height = frame_height
        self.extract = extract_instance
        self.init_prev_action = init_prev_action
        self.init_method = init_method
        self.no_detection_randomise = no_detection_randomise
        self.spawn_point = spawn_point
        self.resample_method = resample_method
        self.motion_noise_std = motion_noise_std
        self.measurement_noise_px = measurement_noise_px

        # Each particle: (x, y, prev_action)
        self.entities = ["pacman", "red_ghost", "cyan_ghost", "pink_ghost", "orange_ghost"]
        self.particles = {entity: [] for entity in self.entities}
        self.weights = {entity: np.ones(num_particles, dtype=np.float32) / num_particles for entity in self.entities}

        for entity in self.entities:
            self.initialize_particles(entity)

    def initialize_particles(self,entity):
        """
        Initialize particles in the valid region of the frame.

        'center' puts every particle at the frame center. 'spawn_point' puts
        about half of Pac-Man's particles in a box around the spawn point.
        Every other case (including ghosts under 'spawn_point') draws a
        uniformly random pixel.
        """
        fresh = []
        for _ in range(self.num_particles):
            if self.init_method == "center":
                x = self.frame_width // 2
                y = self.frame_height // 2
            elif (
                self.init_method == "spawn_point"
                and entity == "pacman"
                and random.random() < 0.5
            ):
                x = random.randint(self.spawn_point[0] - 40, self.spawn_point[0] + 50)
                y = random.randint(self.spawn_point[1] - 25, self.spawn_point[1] + 25)
            else:
                x = random.randint(0, self.frame_width - 1)
                y = random.randint(0, self.frame_height - 1)

            fresh.append((x, y, self.init_prev_action))

        self.particles[entity] = np.array(fresh, dtype=object)

    @staticmethod
    def _first_center(detection):
        """Reduce a detection (one (x, y) pair or a list of them) to one pair, or None if empty."""
        if detection is None or len(detection) == 0:
            return None
        first = detection[0]
        if isinstance(first, (tuple, list, np.ndarray)):
            return first
        return detection

    def _particle_mean(self, entity):
        """Unweighted integer mean (x, y) of an entity's particles, or None without particles."""
        particles = self.particles[entity]
        if particles is None:
            return None
        return (int(np.mean(particles[:, 0])), int(np.mean(particles[:, 1])))

    def _snapshot_particles(self):
        """Copy of every particle array, so later in-place edits cannot alter the snapshot."""
        return {
            entity: (None if particles is None else particles.copy())
            for entity, particles in self.particles.items()
        }

    def step(self, frame, prev_frame_positions):
        """
        Main PF step for one iteration.

        1) Build a best guess of every entity's position from the previous
           detections (falling back to the particle mean)
        2) Determine allowed movements for Entity's (walls, etc.)
        3) Motion update: apply motion_model to each particle + motion noise
        4) Observation update: weigh each particle by how well it matches the detection
           (with measurement noise)
        5) Resample
        6) Estimate the best Entity's position from the particles

        Args:
            frame (np.ndarray): Current game frame (BGR)
            prev_frame_positions (dict): Positions from the previous frame
                (entity -> list of centers, single center or None); may be None

        Returns:
            (best_estimate, positions, motion_update_positions,
            observation_update_positions): the estimated (x, y) per entity,
            the detections of this frame per entity (None when nothing was
            detected), and the particle sets after the motion update and
            after resampling.
        """
        ghost_keys = ["red_ghost", "cyan_ghost", "pink_ghost", "orange_ghost"]
        prev = prev_frame_positions or {}

        # Best Guess for Pacman: previous detection, else mean of the particles
        best_guess_pacman = self._first_center(prev.get("pacman"))
        if best_guess_pacman is None:
            best_guess_pacman = self._particle_mean("pacman")

        # Best Guess for all Ghosts. A missing entry falls back to the particle
        # mean; an empty detection list means "ghost absent" (None).
        ghostspos = {}
        for gkey in ghost_keys:
            prev_ghost_pos = prev.get(gkey)
            if prev_ghost_pos is None:
                ghostspos[gkey] = self._particle_mean(gkey)
            else:
                ghostspos[gkey] = self._first_center(prev_ghost_pos)

        # Compute allowable direction of motion for each entity based on best guess
        allow_dict = self._compute_allow_dict(frame, best_guess_pacman)
        ghostallow = {
            gkey: self._compute_allow_dict(frame, ghostspos[gkey]) for gkey in ghost_keys
        }

        # Absent ghosts are placed far away so they barely affect Pac-Man's motion
        ghosts = [
            ghostspos[gkey] if ghostspos[gkey] is not None else self.ABSENT_GHOST_POS
            for gkey in ghost_keys
        ]

        # Motion Update for Each Entity
        self._motion_update(ghosts, allow_dict)
        for gkey in ghost_keys:
            if self.particles[gkey] is not None:
                self.ghost_motion(gkey, best_guess_pacman, ghostallow[gkey])
        motion_update_positions = self._snapshot_particles()

        # Observation Update for Each Entity. The frame is analysed once and
        # the detections are shared by all entities.
        detections = self.extract.extract_locations(
            frame, multi=True, remove_spawn_point=False
        )
        positions = {}
        for entity in self.entities:
            positions[entity] = self._observation_update(frame, entity, detections)

        # Resample all particles
        for entity in self.entities:
            if self.particles[entity] is not None:
                self._resample(entity, method=self.resample_method)
        observation_update_positions = self._snapshot_particles()

        # Compute best estimate of all particles
        best_estimate = {}
        for entity in self.entities:
            if self.particles[entity] is not None:
                best_estimate[entity] = self.estimate(entity)
            else:
                best_estimate[entity] = None

        return best_estimate, positions, motion_update_positions, observation_update_positions

    def _propagate(self, x, y, direction_probs):
        """
        Move one particle: sample an action, step, add Gaussian noise, clamp.

        Args:
            x, y: current particle position
            direction_probs: (p_left, p_right, p_up, p_down); renormalised
                here, uniform if the sum is not positive

        Returns:
            (new_x, new_y, action)
        """
        probs = np.array(direction_probs, dtype=float)
        probs_sum = probs.sum()
        if probs_sum > 0:
            probs /= probs_sum
        else:
            # fallback: uniform
            probs = np.ones(4) / 4.0

        action = np.random.choice(["l", "r", "u", "d"], p=probs)

        # Move the particle ("u" decreases y: image coordinates)
        new_x, new_y = x, y
        if action == "l":
            new_x = x - self.STEP_PX
        elif action == "r":
            new_x = x + self.STEP_PX
        elif action == "u":
            new_y = y - self.STEP_PX
        elif action == "d":
            new_y = y + self.STEP_PX

        # Clamp bounds
        new_x = max(0, min(self.frame_width - 1, new_x))
        new_y = max(0, min(self.frame_height - 1, new_y))

        # Add motion noise (Gaussian).
        # Clamp again in case noise pushes it out of bounds.
        noise_dx = int(round(random.gauss(0, self.motion_noise_std)))
        noise_dy = int(round(random.gauss(0, self.motion_noise_std)))

        new_x = max(0, min(self.frame_width - 1, new_x + noise_dx))
        new_y = max(0, min(self.frame_height - 1, new_y + noise_dy))

        return new_x, new_y, action

    def ghost_motion(self,ghost,pc,allow_motion_dict):
        """
        Motion update for one ghost's particles.

        Args:
            ghost: entity key of the ghost
            pc: best guess of Pac-Man's (x, y) position
            allow_motion_dict: {"u","d","l","r"} -> 1 (open) / 0 (blocked)
        """
        updated_particles = []
        for x, y, prev_act in self.particles[ghost]:
            # motion_model_ghosts returns p_left, p_right, p_up, p_down
            probs = motion_model_ghosts((x, y), pc, prev_act, allow_motion_dict)
            updated_particles.append(self._propagate(x, y, probs))

        # Change Particle Set to newly modified set based on movement
        self.particles[ghost] = np.array(updated_particles, dtype=object)

    def _motion_update(self, ghosts, allow_motion_dict):
        """
        Motion update for Pac-Man's particles.

        Args:
            ghosts: sequence of exactly four ghost (x, y) positions
            allow_motion_dict: {"u","d","l","r"} -> 1 (open) / 0 (blocked)
        """
        g1, g2, g3, g4 = ghosts
        updated_particles = []
        for x, y, prev_act in self.particles["pacman"]:
            # motion_model returns p_left, p_right, p_up, p_down
            probs = motion_model(g1, g2, g3, g4, (x, y), prev_act, allow_motion_dict)
            updated_particles.append(self._propagate(x, y, probs))

        self.particles["pacman"] = np.array(updated_particles, dtype=object)

    def _observation_update(self, frame, entity, positions=None):
        """
        Weigh each particle by how well it matches the detected positions of Entity.
        If multiple centers are found, we split probability among them.

        Adds measurement noise by artificially jittering the positions we get from
        extract_locations. This simulates uncertain detection.

        Args:
            frame: current frame (BGR); only used when positions is None
            entity: entity key whose particles are weighted
            positions: optional detections (entity -> list of centers) already
                extracted from this frame; extracted here when omitted

        Returns:
            The list of detected centers of the entity, or None when nothing
            was detected (or the entity has no particles).
        """
        if positions is None:
            positions = self.extract.extract_locations(
                frame, multi=True, remove_spawn_point=False
            )
        entity_centers = positions[entity]

        if not entity_centers:
            if self.no_detection_randomise:
                # If no detection, randomize few particles
                particles = self.particles[entity]
                for i in range(len(particles)):
                    if random.random() < 0.3:
                        particles[i, 0] = random.randint(0, self.frame_width - 1)
                        particles[i, 1] = random.randint(0, self.frame_height - 1)
                        self.weights[entity][i] *= 0.1
                    else:
                        self.weights[entity][i] *= 0.8
            else:
                # If no detection, only reduce the weights
                self.weights[entity] *= 0.1
            return None

        if self.particles[entity] is None:
            self.weights[entity] *= 0.8
            return None

        # Inject measurement noise into each detected center
        noisy_centers = []
        for c in entity_centers:
            # small random offset in x and y
            # ± self.measurement_noise_px
            nx = c[0] + random.randint(
                -self.measurement_noise_px, self.measurement_noise_px
            )
            ny = c[1] + random.randint(
                -self.measurement_noise_px, self.measurement_noise_px
            )
            # clamp to bounds
            nx = max(0, min(self.frame_width - 1, nx))
            ny = max(0, min(self.frame_height - 1, ny))
            noisy_centers.append((nx, ny))

        match_threshold = self.MATCH_THRESHOLD_PX
        center_prob = 1.0 / len(noisy_centers)

        # Distance from every particle to its closest noisy center
        coords = np.asarray(self.particles[entity][:, :2], dtype=float)  # (N, 2)
        centers = np.asarray(noisy_centers, dtype=float)                  # (K, 2)
        dists = np.hypot(
            coords[:, None, 0] - centers[None, :, 0],
            coords[:, None, 1] - centers[None, :, 1],
        )
        min_dist = dists.min(axis=1)

        # The closer, the higher the weighting; particles that cannot be
        # associated with any center have their weight halved.
        factors = np.where(
            min_dist < match_threshold,
            1.0 + center_prob * (match_threshold - min_dist) / match_threshold,
            0.5,
        )
        self.weights[entity] *= factors

        # Normalize weights
        weight_sum = np.sum(self.weights[entity])
        if weight_sum > 0:
            self.weights[entity] /= weight_sum
        else:
            self.weights[entity][:] = 1.0 / len(self.weights[entity])

        return positions[entity]

    def _resample(self, entity,method="multinomial"):
        """
        Systematic or multinomial resampling of the particles
        based on current weights.

        After resampling all weights are reset to 1/N. Any other method name
        leaves the particles untouched (the weights are only normalised).
        Weights that do not sum to a positive finite value are treated as
        uniform.
        """
        N = self.num_particles

        # normalized weights (float64 for the sampling itself)
        w = np.asarray(self.weights[entity], dtype=np.float64)
        total = w.sum()
        if not np.isfinite(total) or total <= 0:
            w = np.full(len(w), 1.0 / len(w))
        else:
            w = w / total
        self.weights[entity] = w.astype(np.float32)

        if method == "multinomial":
            indices = np.random.choice(N, size=N, p=w)
        elif method == "systematic":
            # N evenly spaced pointers sharing one random offset. All N slots
            # are filled (previously the last one was left at index 0).
            pointers = (np.random.rand() + np.arange(N)) / N
            cumulative = np.cumsum(w)
            indices = np.searchsorted(cumulative, pointers, side="left")
            # Guard against rounding leaving the last pointer above cumulative[-1]
            indices = np.minimum(indices, N - 1)
        else:
            return

        self.particles[entity] = self.particles[entity][indices]
        self.weights[entity] = np.ones(N, dtype=np.float32) / N

    def estimate(self,entity):
        """
        Estimate Entity's position from the particles
        Returns:
            (est_x, est_y): weighted mean of the particles truncated to int;
            the unweighted mean if the weights sum to less than 1e-7
        """
        coords = np.asarray(self.particles[entity][:, :2], dtype=float)
        w = np.asarray(self.weights[entity], dtype=np.float64)
        total = w.sum()

        if total < 1e-7:
            # Fallback
            return (int(np.mean(coords[:, 0])), int(np.mean(coords[:, 1])))

        # Weighted average
        w_norm = w / total
        est_x = int(np.sum(coords[:, 0] * w_norm))
        est_y = int(np.sum(coords[:, 1] * w_norm))
        return (est_x, est_y)

    def _compute_allow_dict(self, frame, entity_center):
        """
        Returns Dictionary of Possible Movements for an Entity

        Keys are "u", "d", "l", "r"; values are 1 (open) or 0 (blocked).
        """
        valid_moves = self.extract.valid_entity_movements(frame, entity_center)
        allow_dict = {
            "u": 1 if valid_moves["up"] else 0,
            "d": 1 if valid_moves["down"] else 0,
            "l": 1 if valid_moves["left"] else 0,
            "r": 1 if valid_moves["right"] else 0,
        }
        return allow_dict

In [None]:

# --- Configuration -----------------------------------------------------------
SEED = 42  # fixed seed so a Restart & Run All reproduces the same particle draws
random.seed(SEED)
np.random.seed(SEED)

video_source = "4m.mp4"
video_filename = "kidnapcase.avi"
fps = 30
no_of_particles = 500
init_method = "random"
resample_method = "systematic"

extract = Extract()

cap = cv2.VideoCapture(video_source)
if not cap.isOpened():
    raise IOError(f"Could not open input video: {video_source}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*"XVID")
out = cv2.VideoWriter(video_filename, fourcc, fps, (frame_width, frame_height))

# Testing Randomize On for No Detection
pf1 = ParticleFilter(
    num_particles=no_of_particles,
    frame_width=frame_width,
    frame_height=frame_height,
    extract_instance=extract,
    init_prev_action="r",
    init_method=init_method,
    no_detection_randomise=True,
    resample_method=resample_method,
    motion_noise_std=5,
    measurement_noise_px=10,
)

# Drawing colours (OpenCV uses BGR order) for particles and detections
colors = {
    "pacman": (0, 255, 255),       # Yellow
    "red_ghost": (0, 0, 255),      # Red
    "cyan_ghost": (255, 255, 0),   # Cyan
    "pink_ghost": (255, 0, 0),     # pink ghost — NOTE(review): (255, 0, 0) is blue in BGR
    "orange_ghost": (0, 165, 255)  # Orange
}

# Contrasting BGR colours for the best-estimate marker of each entity
colors_best_estimate = {
    "pacman": (0, 120, 120),       # dark yellow
    "red_ghost": (255, 0, 255),    # magenta
    "cyan_ghost": (255, 0, 0),     # blue
    "pink_ghost": (255, 255, 0),   # cyan
    "orange_ghost": (0, 0, 255)    # red
}

prev_positions = None
innovations1 = {entity: [] for entity in ["pacman"]}
variances1 = {entity: [] for entity in ["pacman"]}

# --- Tracking loop -----------------------------------------------------------
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        best_estimates, positions, motion_update, observation_update = pf1.step(frame, prev_positions)

        for entity in innovations1.keys():
            # Summed absolute coordinate difference between the particle set
            # after the motion update and after resampling
            motion_xy = motion_update[entity][:, :2].astype(float)
            resampled_xy = observation_update[entity][:, :2].astype(float)
            total_mean_distance = np.sum(np.abs(motion_xy - resampled_xy))
            innovations1[entity].append(total_mean_distance)
            if pf1.particles[entity].size > 0:
                variance = np.var(pf1.particles[entity][:, :2].astype(float), axis=0).sum()
                variances1[entity].append(variance)

        for entity in pf1.entities:
            if best_estimates[entity] is not None:
                entity_particles = pf1.particles[entity]
                for x, y, _ in entity_particles:
                    cv2.circle(frame, (int(x), int(y)), 1, colors[entity], -1)
                best_estimate = best_estimates[entity]
                cv2.circle(frame, best_estimate, 7, colors_best_estimate[entity], -1)

                if positions and positions.get(entity, None):
                    if isinstance(positions[entity], list):
                        for c in positions[entity]:
                            cv2.circle(frame, c, 5, colors[entity], 2)
                    elif positions[entity] is not None:
                        cv2.circle(frame, positions[entity], 5, colors[entity], 2)

        prev_positions = positions

        cv2.imshow("Pacman Tracking PF (with noise)", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break  # Esc to quit
        print("Best Estimate Locations:")
        for entity in pf1.entities:
            print(f"{entity.capitalize()}: {best_estimates[entity]}")
            if positions and positions.get(entity, None):
                print(f"Detected {entity.capitalize()} Locations: {positions[entity]}")

        out.write(frame)
finally:
    # Always release the video handles and windows, even if a frame fails
    out.release()
    cap.release()
    cv2.destroyAllWindows()

# colors_best_estimate rescaled to 0..1 (not used by the plots below)
entity_colors = {
"pacman": (0.0, 0.47, 0.47),       # Scaled from (0, 120, 120) BGR
"red_ghost": (1.0, 0.0, 1.0),      # Scaled from (255, 0, 255)
"cyan_ghost": (1.0, 0.0, 0.0),     # Scaled from (255, 0, 0)
"pink_ghost": (1.0, 1.0, 0.0),     # Scaled from (255, 255, 0)
"orange_ghost": (0.0, 0.0, 1.0)    # Scaled from (0, 0, 255)
}

# Short legend label per entity (not used by the plots below)
entity_legend = {
"pacman": "pacman",
"red_ghost": "red",
"cyan_ghost": "cyan",
"pink_ghost": "pink",
"orange_ghost": "orange"
}

# --- Plots -------------------------------------------------------------------
# Plot innovation graphs for each tracked entity (per-particle average)
plt.figure()
for entity in innovations1.keys():
    plt.plot(np.array(innovations1[entity]) / no_of_particles, label="Randomize True ", color="r")
plt.xlabel("Iterations")
plt.ylabel("Innovation")
plt.legend()
plt.show()

# Plot variance graphs
plt.figure()
for entity in variances1.keys():
    plt.plot(np.array(variances1[entity])/no_of_particles, label="Randomize True ", linestyle='--', color="r")

plt.xlabel("Iterations")
plt.ylabel("Particle Variance")
plt.legend()
plt.show()

