# SDL Assignment #4: PID Control and Fruit Ninja
*By Marcus Schwarting*

This assignment covers proportional-integral-derivative (PID) control in a simple example of a heating process, then considers particle filters as a means of identifying target objects and navigating towards them. In order to run this code, there are two packages that you may need to install:

- cv2 for creating images (`pip install opencv-python`)
- ffmpeg for creating videos (`sudo apt install ffmpeg`)

The rubric is broken down as follows:


#### PID Control (25 points)
- Implement PID control in the `Heating_Process.pid_controller` function (10 points).
- Using the function calls below (including changes in setpoint and several kicks), identify values of $K_p, K_i, K_d \neq 0$ that provide ideal control. See `pid_example.png` for an example of what a suitable level of control looks like. Hint: differences are often an order of magnitude, so $K_p \approx 10 K_i \approx 100 K_d$ (5 points).
- Once suitable conditions are identified for the PID controller, increase and decrease the values of $K_p,K_i,K_d$ until abberant behavior occurs. For all eight cases, describe what went wrong and why (10 points).

#### Fruit Ninja (75 points)
- Implement a particle filter for a single fruit (select the watermelon). Using the `fruit_ninja.save_video` function create image frames and save a video where the crosshairs remain in place (no control), but particles and dead reckoning are shown. This should be coded in `CrosshairsController.particle_filter` (20 points).
- Demonstrate the particle filter operating across all fruits. These should be three distinct particle filters performing localization separately (5 points).
- Experiment with different values of $\sigma$ (particle drift per frame), and number of particles, and justify why you selected the value of $\sigma$ and number of particles for later tests (10 points).
- Implement a controller in the `CrosshairsController.control()` function, which takes the crosshairs coordinates as well as the input image, uses the particle filter to assess the location of the fruit in the frame, and makes a decision about where to move the crosshairs, expressed as updated coordinates, passed to `FruitNinja.play_game()`. Justify your controller design decisions, such as how a fruit was selected, how the path of the fruit was anticipated, etc. Note: it is perfectly acceptable to have some fruit that you cannot catch (25 points).
- With the completed particle filter and crosshairs controller, create a Fruit Ninja video (or several) showcasing the performance of your code. Demonstrate that your controller scores better than a controller that does not move (see `stagnant_crosshairs.mp4` for an example) (10 points).
- Adjust the settings associated with the crosshairs speed (`max_controller_move` flag under `FruitNinja.play_game()`), and show that a faster crosshairs speed allows for a higher score while a slower crosshairs speed reduces scoring potential (5 points).

#### Additional Notes
- You are strongly encouraged to work in teams for this project.
- If you run into errors or bugs in this code, reach out to the TA (Marcus Schwarting) via Slack at your earliest convenience.
- You are welcome to make edits to any pre-written code that is part of the assignment (eg. `fruit_ninja.py`), but when you do so, please make a comment detailing what you changed and why.
- When you are ready to submit your assignment, please zip the folder you are working in (including all videos, written report, etc.) and submit the `.zip` file to Canvas. For any portions requiring a write-up, please create a `README.md` to include all these items. Furthermore, for extra files you created, please leave some indication in the `README.md` as to what these files are and do.

In [5]:
import warnings
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

import cv2
from fruit_ninja import *

# Ignore RankWarning
warnings.filterwarnings("ignore", category=np.RankWarning)



## Fruit Ninja

In [11]:
class CrosshairsController:
    def __init__(self, total_particles=200):
        self.total_particles = total_particles
        self.W_particles = []
        self.S_particles = []
        self.O_particles = []
        self.W_particle_weights = []
        self.S_particle_weights = []
        self.O_particle_weights = []

        # background color distribution
        self.background = None
        # which if any fruit is being hunted atm
        self.curr_Hunt = None

        # for each fruit maintain
        # color distribution, shape, if its found at the moment
        # a list of average points from each step, and last average point to see if filter is stuck
        self.w_color_gen = None
        self.wShape = (154, 157)
        self.wFound = False
        self.wBestP = []
        self.wLastP = None

        self.s_color_gen = None
        self.sShape = (158, 150)
        self.sFound = False
        self.sBestP = []
        self.sLastP = None

        self.o_color_gen = None
        self.oShape = (190, 152)
        self.oFound = False
        self.oBestP = []
        self.oLastP = None

    # INITALIZING FUNCTIONS

    # scan background and check for overlapping pixels
    def scan_background(self):

        img_size = np.array([512, 1024, 3])
        image = cv2.resize(
            cv2.cvtColor(
                cv2.imread(
                    "/Users/bayardwalsh/Desktop/Assignment4BayardIreneKevin/SDL_Fruit_Ninja/fruit_imgs/wood_background.png"
                ),
                cv2.COLOR_BGR2RGB,
            ),
            (img_size[1], img_size[0]),
        )

        c_dst = np.zeros((256, 256, 256))
        height, width, _ = image.shape
        pixel_counts = {}
        for y in range(height):
            for x in range(width):
                pixel_value = tuple(image[y, x])
                pixel_counts[pixel_value] = pixel_counts.get(pixel_value, 0) + 1

        for pixel_value, count in pixel_counts.items():
            c_dst[pixel_value] = count

        # account for crosshairs location by adding high count crosshair colors (and tracking dots)
        c_dst[(255, 255, 255)] = 100
        c_dst[(255, 255, 0)] = 100
        c_dst[(255, 0, 255)] = 100
        c_dst[(0, 255, 255)] = 100
        c_dst[(255, 0, 0)] = 100
        c_dst[(0, 255, 0)] = 100
        c_dst[(0, 0, 255)] = 100
        c_dst[(0, 0, 0)] = 100

        self.background = c_dst

    # get distrubutions based on likliehood of being a given image for every pixel
    # this is loaded once initially for each fruit and assumes background is run
    # averages pixels in background and fruit to prevent false positives
    # highly weights pixels in fruit and NOT in background
    def particle_weights(self, image, fruit):
        height, width, _ = image.shape
        c_dst = np.zeros((256, 256, 256))

        pixel_counts = {}

        for y in range(height):
            for x in range(width):
                pixel_value = tuple(image[y, x])

                pixel_counts[pixel_value] = pixel_counts.get(pixel_value, 0) + 1

        for pixel_value, count in pixel_counts.items():
            r, g, b, = (
                int(pixel_value[0] * 255),
                int(pixel_value[1] * 255),
                int(pixel_value[2] * 255),
            )
            if self.background[r, g, b] == 0:
                c_dst[r, g, b] = 1
            else:
                c_dst[r, g, b] = (count) / (count + self.background[r, g, b])

        match fruit:
            case "watermelon":
                self.w_color_gen = c_dst
            case "strawberry":
                self.s_color_gen = c_dst
            case "orange":
                self.o_color_gen = c_dst
            case _:
                raise

    # GENERATE PIXEL FUNCTIONS

    # random point search
    def get_random_points(self, img, num_points=1000):
        height, width, _ = img.shape
        random_x = np.random.randint(0, width, num_points)
        random_y = np.random.randint(0, height, num_points)
        random_points = np.column_stack((random_x, random_y))

        return random_points

    # weight point distribution
    def get_weights_from_points(self, img, points, fruit):
        match fruit:
            case "w":
                ref = self.w_color_gen
            case "s":
                ref = self.s_color_gen
            case "o":
                ref = self.o_color_gen
            case _:
                raise ValueError("Invalid fruit type")

        weights = []
        for (x, y) in points:
            r, g, b = img[y, x]
            weights.append(ref[r, g, b])

        return weights

    # given a fruit and points, resample random points with sigma distribution and relative to fruit shape.
    def resample(self, img, points, fruit, sigma=5, num_points=500):
        new_points = []
        height, width, _ = img.shape
        attr_shape = f"{fruit}Shape"
        shape_height, shape_width = getattr(self, attr_shape)
        weights = self.get_weights_from_points(img, points, fruit)

        # no points found, fruit is lost, reset to random search
        if np.sum(weights) == 0:
            setattr(self, f"{fruit}Found", False)
            return self.get_random_points(img)

        weights /= np.sum(weights)

        for _ in range(num_points):
            while True:
                chosen_index = np.random.choice(len(points), p=weights)
                chosen_point = points[chosen_index]
                random_offsets_x = np.random.randint(
                    -shape_height / 4 * 1 / sigma, shape_height / 4 * 1 / sigma
                )
                random_offsets_y = np.random.randint(
                    -shape_width / 4 * 1 / sigma, shape_width / 4 * 1 / sigma
                )
                new_point = chosen_point + np.array(
                    [random_offsets_x, random_offsets_y]
                )
                if (0 <= new_point[0] < width) and (0 <= new_point[1] < height):
                    new_points.append(new_point.astype(int))
                    break

        return np.array(new_points)

    # MOVEMENT FUNCTIONSs

    # calculate dead reckoning from polynomial fnc to get next best point
    def get_dead_reckoning(self, points):
        x = np.array([point[0] for point in points])
        y = np.array([point[1] for point in points])

        try:
            coefficients = np.polyfit(x, y, 2)
        except np.linalg.LinAlgError:
            # Handle the exception, fallback to a linear fit or other method
            print("Warning: SVD did not converge, using linear fit as fallback.")
            coefficients = np.polyfit(x, y, 1)
            a, b = coefficients
            c = 0  # For a linear fit, c will be zero

        if len(coefficients) == 2:
            a, b = coefficients
            c = 0
        else:
            a, b, c = coefficients

        next_x = x[-1] + (x[-1] - x[-2]) if len(x) > 1 else x[-1] + 1
        next_y = a * next_x**2 + b * next_x + c

        return round(next_x), round(next_y)

    # based on the current fruit hunting, move cursor towards fruit based on dead reckoning
    def calculate_next_cursor_position(
        self, cursor_pos, object_pos, cursor_speed, time_step=1
    ):
        cursor_pos = np.array(cursor_pos)
        object_pos = np.array(object_pos)
        direction_vector = object_pos - cursor_pos
        distance_to_object = np.linalg.norm(direction_vector)
        if distance_to_object <= cursor_speed * time_step:
            return tuple(object_pos)
        direction_vector_normalized = direction_vector / distance_to_object
        new_cursor_pos = (
            cursor_pos + direction_vector_normalized * cursor_speed * time_step
        )

        return tuple(new_cursor_pos)

    # calls particle filter for given fruit
    def particle_filter(
        self,
        curr_image,
        curr_particles,
        curr_particle_weights,
        fruit,
        sigma=1.5,
    ):
        hitpoints = []

        f = getattr(self, f"{fruit}Found")

        # if fruit not found generate random search points
        if not f:
            setattr(
                self, f"{fruit.upper()}_particles", self.get_random_points(curr_image)
            )
        # if found resample points with sigma distribution and color weighting
        else:
            bestpart = getattr(self, f"{fruit.upper()}_particles")
            self.get_weights_from_points(curr_image, bestpart, fruit)
            setattr(
                self,
                f"{fruit.upper()}_particles",
                self.resample(curr_image, bestpart, fruit, sigma=sigma),
            )

        for (x, y) in curr_particles:
            attr = f"{fruit}_color_gen"
            r, g, b = curr_image[y, x]
            dist = getattr(self, attr)[r, g, b]
            if (
                dist == 1
            ):  # only take average of "good points" (in fruit and not in background)
                hitpoints.append((x, y))

        if len(hitpoints) != 0:
            match fruit:
                case "w":
                    self.wFound = True
                    self.wBestP.append(np.round(np.mean(hitpoints, axis=0)).astype(int))
                case "o":
                    self.oFound = True
                    self.oBestP.append(np.round(np.mean(hitpoints, axis=0)).astype(int))
                case "s":
                    self.sFound = True
                    self.sBestP.append(np.round(np.mean(hitpoints, axis=0)).astype(int))

    def control(self, crosshairs_pos, curr_image, killedfruits, max_dist=20, show=True, move_cursor=True):

        # reset filters on 3 instances
        # 1) fruit is killed
        # 2) fruit is missed/ offscreen
        # 3) fruit is "stuck" on background ie not moving
        def reset_fruit_status(fruit, bestP_attr, found_attr, lastP_attr):
            if fruit in killedfruits:
                setattr(self, found_attr, False)
                setattr(self, bestP_attr, [])
                setattr(self, lastP_attr, None)
            bestP = getattr(self, bestP_attr)
            lastP = getattr(self, lastP_attr)
            if bestP:
                if bestP[-1][1] + 5 > 512:
                    setattr(self, found_attr, False)
                    setattr(self, bestP_attr, [])
                    setattr(self, lastP_attr, None)
                elif lastP is None:
                    setattr(self, lastP_attr, bestP[-1])
                elif np.array_equal(bestP[-1], lastP):
                    setattr(self, found_attr, False)
                    setattr(self, bestP_attr, [])
                    setattr(self, lastP_attr, None)
                else:
                    setattr(self, lastP_attr, bestP[-1])

        fruits = [
            ("w", "wBestP", "wFound", "wLastP"),
            ("o", "oBestP", "oFound", "oLastP"),
            ("s", "sBestP", "sFound", "sLastP"),
        ]

        for fruit, bestP_attr, found_attr, lastP_attr in fruits:
            reset_fruit_status(fruit, bestP_attr, found_attr, lastP_attr)

        # Call particle filters for each fruit
        self.particle_filter(curr_image, self.W_particles, self.W_particle_weights, "w")

        self.particle_filter(curr_image, self.O_particles, self.O_particle_weights, "o")

        self.particle_filter(curr_image, self.S_particles, self.S_particle_weights, "s")

        #  move cursor or not
        if move_cursor:
            points = []
            coords = {
                "w": (self.wFound, self.wBestP),
                "o": (self.oFound, self.oBestP),
                "s": (self.sFound, self.sBestP),
            }
            positions = {"w": None, "o": None, "s": None}

            for key, (found, bestP) in coords.items():
                if found:
                    positions[key] = self.get_dead_reckoning(bestP)
                    points.append(positions[key])

            if points:
                xP, yP = crosshairs_pos
                cp = np.array(crosshairs_pos)

                if self.curr_Hunt and not coords[self.curr_Hunt][0]:
                    self.curr_Hunt = None

                if self.curr_Hunt is None:
                    distances = {
                        key: np.linalg.norm(pos - cp)
                        for key, pos in positions.items()
                        if pos is not None
                    }
                    if distances:
                        self.curr_Hunt = min(distances, key=distances.get)

                if self.curr_Hunt:
                    p = positions[self.curr_Hunt]
                    crosshairs_pos = self.calculate_next_cursor_position(
                        (xP, yP), p, max_dist
                    )

        # draw particles or not
        if show:
            particle_groups = [
                (self.W_particles, self.wBestP, (0, 0, 255)),
                (self.O_particles, self.oBestP, (0, 255, 0)),
                (self.S_particles, self.sBestP, (255, 0, 0)),
            ]

            for particles, bestP, color in particle_groups:
                for (x, y) in particles:
                    cv2.circle(curr_image, (x, y), 2, color, -1)

                if len(bestP) != 0:
                    (xx, yy) = self.get_dead_reckoning(bestP)
                    cv2.circle(curr_image, (xx, yy), 15, color, -1)

                if len(bestP) > 1:
                    for i in range(1, len(bestP)):
                        p1 = bestP[i - 1]
                        p2 = bestP[i]
                        cv2.line(curr_image, p1, p2, color, thickness=2)

        return crosshairs_pos


# Example usage

# NOTE: reset the file paths for each fruit and scan background
controller = CrosshairsController()
controller.scan_background()
imagew = mpimg.imread(
    "/Users/bayardwalsh/Desktop/Assignment4BayardIreneKevin/SDL_Fruit_Ninja/fruit_imgs/watermelon.png"
)
imageo = mpimg.imread(
    "/Users/bayardwalsh/Desktop/Assignment4BayardIreneKevin/SDL_Fruit_Ninja/fruit_imgs/orange.png"
)
images = mpimg.imread(
    "/Users/bayardwalsh/Desktop/Assignment4BayardIreneKevin/SDL_Fruit_Ninja/fruit_imgs/strawberry.png"
)

controller.particle_weights(imagew, "watermelon")
controller.particle_weights(imageo, "orange")
controller.particle_weights(images, "strawberry")

FNinja = FruitNinja()
FNinja.play_game(controller.control, game_length=300, save_vid_folder="testing")


100%|██████████| 300/300 [00:44<00:00,  6.74it/s]


<Figure size 640x480 with 0 Axes>

ffmpeg version 7.0 Copyright (c) 2000-2024 the FFmpeg developers
  built with Apple clang version 15.0.0 (clang-1500.3.9.4)
  configuration: --prefix=/opt/homebrew/Cellar/ffmpeg/7.0_1 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags='-Wl,-ld_classic' --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libharfbuzz --enable-libjxl --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libopenvino -