<a href="https://colab.research.google.com/github/ArunMunagala7/ENGR-E503-Introduction-to-Intelligent-Systems-Final-Project/blob/main/ISE_Project_Final_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Q-Learning Integrated EKF-SLAM for Unicycle Robot Trajectory Control

This script implements an Extended Kalman Filter (EKF) for Simultaneous Localization and Mapping (SLAM)
combined with Q-Learning for adaptive trajectory control on a racetrack. The robot uses a unicycle model,
range-bearing sensor, and learns to adjust angular velocity based on lateral and heading errors.

Author: Generated from Q_Learning.ipynb
Date: December 15, 2025
"""

# Install necessary packages for headless environment (e.g., in Colab or server)
!apt-get install -y ffmpeg xvfb
!pip install pyvirtualdisplay

# Set up virtual display for headless plotting
from pyvirtualdisplay import Display
display = Display(visible=0, size=(800, 600))
display.start()

# Import required libraries
import numpy as np  # Numerical computations
import matplotlib.pyplot as plt  # Plotting
from matplotlib.patches import Ellipse  # For covariance ellipses
from matplotlib.animation import FuncAnimation  # For animations
from IPython.display import HTML  # For displaying animations in notebooks

class EKFSLAM:
    """
    Extended Kalman Filter for SLAM with unicycle robot model.
    Maintains joint state estimate of robot pose and landmark positions.
    """
    def __init__(self, robot_sigma_x=0.01, robot_sigma_y=0.01, robot_sigma_theta=0.01,
                 measurement_sigma_r=0.1, measurement_sigma_b=0.05,
                 gating_threshold=9.21):
        """
        Initialize EKF-SLAM.

        Args:
            robot_sigma_x, robot_sigma_y, robot_sigma_theta: Initial pose uncertainties
            measurement_sigma_r, measurement_sigma_b: Measurement noise std devs
            gating_threshold: Mahalanobis distance threshold for data association
        """
        """
        The state vector mu contains robot pose [x, y, theta] followed by landmark positions [mx1, my1, mx2, my2, ...].
        Sigma is the covariance matrix for the entire state.
        Q is the measurement noise covariance for range and bearing.
        lm_size tracks the number of landmarks in the map.
        """
        self.mu = np.array([0.0, 0.0, 0.0], dtype=float)  # State mean: [x, y, theta]
        self.Sigma = np.diag([robot_sigma_x, robot_sigma_y, robot_sigma_theta])  # Initial covariance
        self.Q = np.diag([measurement_sigma_r**2, measurement_sigma_b**2])  # Measurement noise covariance
        self.lm_size = 0  # Number of landmarks in map
        self.gating = gating_threshold  # Gating threshold for association

    def prediction(self, v, omega, dt, sigma_v=0.1, sigma_omega=0.1):
        """
        Prediction step: Update state and covariance based on motion model.

        Args:
            v: Linear velocity
            omega: Angular velocity
            dt: Time step
            sigma_v, sigma_omega: Process noise std devs
        """
        """
        The unicycle motion model updates the robot's position and orientation.
        For small omega, it's approximately straight line; otherwise, circular arc.
        The Jacobian G linearizes the motion for covariance propagation.
        """
        theta = self.mu[2]
        if abs(omega) < 1e-6:  # Straight line motion
            dx = v * dt * np.cos(theta)
            dy = v * dt * np.sin(theta)
            dtheta = omega * dt
            G = np.eye(3)  # Jacobian matrix
            G[0, 2] = -v * dt * np.sin(theta)
            G[1, 2] = v * dt * np.cos(theta)
        else:  # Circular motion
            dx = (v / omega) * (np.sin(theta + omega * dt) - np.sin(theta))
            dy = (v / omega) * (-np.cos(theta + omega * dt) + np.cos(theta))
            dtheta = omega * dt
            G = np.eye(3)
            G[0, 2] = (v / omega) * (np.cos(theta + omega * dt) - np.cos(theta))
            G[1, 2] = (v / omega) * (np.sin(theta + omega * dt) - np.sin(theta))

        # Update state mean
        self.mu[0] += dx
        self.mu[1] += dy
        self.mu[2] += dtheta
        self.mu[2] = (self.mu[2] + np.pi) % (2 * np.pi) - np.pi  # Normalize angle

        # Update covariance
        n = len(self.mu)
        full_G = np.eye(n)
        full_G[0:3, 0:3] = G
        R_robot = np.diag([(sigma_v * dt)**2, (sigma_v * dt)**2, (sigma_omega * dt)**2])
        full_R = np.zeros((n, n))
        full_R[0:3, 0:3] = R_robot
        self.Sigma = full_G @ self.Sigma @ full_G.T + full_R

    def update(self, zs):
        """
        Update step: Incorporate measurements using data association.

        Args:
            zs: List of measurements [(r, b), ...]
        """
        """
        For each measurement, find the best matching landmark using Mahalanobis distance.
        If no good match, initialize a new landmark. Otherwise, update the existing one.
        This implements the core of SLAM: associating observations to map features.
        """
        for z in zs:
            r, b = z
            min_d = float('inf')
            best_j = -1
            best_H = None
            best_S = None
            best_innov = None

            # Find best matching landmark
            for lm in range(self.lm_size):
                j = 3 + 2 * lm  # Landmark index in state vector
                mx = self.mu[j]
                my = self.mu[j + 1]
                dx = mx - self.mu[0]
                dy = my - self.mu[1]
                q = dx**2 + dy**2
                sqrtq = np.sqrt(q)
                z_hat = np.array([sqrtq, np.arctan2(dy, dx) - self.mu[2]])  # Predicted measurement
                z_hat[1] = (z_hat[1] + np.pi) % (2 * np.pi) - np.pi

                # Jacobian H
                H = np.zeros((2, len(self.mu)))
                H[0, 0] = -dx / sqrtq
                H[0, 1] = -dy / sqrtq
                H[0, 2] = 0
                H[1, 0] = dy / q
                H[1, 1] = -dx / q
                H[1, 2] = -1
                H[0, j] = dx / sqrtq
                H[0, j + 1] = dy / sqrtq
                H[1, j] = -dy / q
                H[1, j + 1] = dx / q

                S = H @ self.Sigma @ H.T + self.Q  # Innovation covariance
                innov = np.array([r - z_hat[0], b - z_hat[1]])
                innov[1] = (innov[1] + np.pi) % (2 * np.pi) - np.pi
                d = innov.T @ np.linalg.inv(S) @ innov  # Mahalanobis distance

                if d < min_d:
                    min_d = d
                    best_j = j
                    best_H = H
                    best_S = S
                    best_innov = innov

            """
            After checking all landmarks, decide whether to initialize a new landmark
            or update an existing one based on the gating threshold.
            This handles the exploration aspect of SLAM.
            """
            if best_j == -1 or min_d > self.gating:  # No match or poor match: initialize new landmark
                theta = self.mu[2]
                cos_tb = np.cos(theta + b)
                sin_tb = np.sin(theta + b)
                mx = self.mu[0] + r * cos_tb
                my = self.mu[1] + r * sin_tb
                G_pose = np.array([[1, 0, -r * sin_tb],
                                   [0, 1, r * cos_tb]])
                G_z = np.array([[cos_tb, -r * sin_tb],
                                [sin_tb, r * cos_tb]])
                self.mu = np.append(self.mu, [mx, my])
                n = len(self.mu)
                old_n = n - 2
                new_Sigma = np.zeros((n, n))
                new_Sigma[0:old_n, 0:old_n] = self.Sigma
                lm_cov = G_pose @ self.Sigma[0:3, 0:3] @ G_pose.T + G_z @ self.Q @ G_z.T
                new_Sigma[old_n:, old_n:] = lm_cov
                cross = self.Sigma[0:old_n, 0:3] @ G_pose.T
                new_Sigma[0:old_n, old_n:] = cross
                new_Sigma[old_n:, 0:old_n] = cross.T
                self.Sigma = new_Sigma
                self.lm_size += 1
            else:  # Update existing landmark
                K = self.Sigma @ best_H.T @ np.linalg.inv(best_S)  # Kalman gain
                self.mu += K @ best_innov
                self.mu[2] = (self.mu[2] + np.pi) % (2 * np.pi) - np.pi
                I = np.eye(len(self.mu))
                self.Sigma = (I - K @ best_H) @ self.Sigma  # Joseph form update

def plot_cov_ellipse(ax, pos, cov, color='blue', alpha=0.3):
    """
    Plot covariance ellipse on given axes.

    Args:
        ax: Matplotlib axes
        pos: Position [x, y]
        cov: 2x2 covariance matrix
        color: Ellipse color
        alpha: Transparency
    """
    if cov.shape == (2, 2):
        lambda_, v = np.linalg.eig(cov)
        lambda_ = np.sqrt(np.abs(lambda_))
        angle = np.rad2deg(np.arccos(v[0, 0]))
        width, height = 2 * lambda_[0], 2 * lambda_[1]
        ell = Ellipse(xy=pos, width=width, height=height, angle=angle)
        ell.set_facecolor(color)
        ell.set_alpha(alpha)
        ax.add_artist(ell)

class TargetCourse:
    """
    Represents the reference trajectory (racetrack path).
    Provides methods to find nearest point on path.
    """
    def __init__(self, cx, cy):
        """
        Initialize with path coordinates.

        Args:
            cx, cy: Arrays of x and y coordinates of the path
        """
        self.cx = cx
        self.cy = cy
        self.old_nearest_point_index = None

    def search_target_index(self, x, y, yaw):
        """
        Find the target point on the path for pure pursuit.

        Args:
            x, y, yaw: Robot position and orientation

        Returns:
            ind: Index of target point
            Lf: Look-ahead distance
        """
        if self.old_nearest_point_index is None:
            dx = [x - icx for icx in self.cx]
            dy = [y - icy for icy in self.cy]
            d = np.hypot(dx, dy)
            ind = np.argmin(d)
            self.old_nearest_point_index = ind
        else:
            ind = self.old_nearest_point_index
            distance_this_index = np.hypot(x - self.cx[ind], y - self.cy[ind])
            while True:
                next_ind = (ind + 1) % len(self.cx)
                distance_next_index = np.hypot(x - self.cx[next_ind], y - self.cy[next_ind])
                if distance_this_index < distance_next_index:
                    break
                ind = next_ind
                distance_this_index = distance_next_index
            self.old_nearest_point_index = ind

        Lf = 0.1 * 1.0 + 2.0  # Look-ahead distance: k * v + Lfc

        while Lf > np.hypot(x - self.cx[ind], y - self.cy[ind]):
            ind = (ind + 1) % len(self.cx)
        return ind, Lf

def pure_pursuit(x, y, yaw, course, pind):
    """
    Pure pursuit algorithm for steering control.

    Args:
        x, y, yaw: Robot pose
        course: TargetCourse object
        pind: Previous target index

    Returns:
        delta: Steering angle
        ind: New target index
    """
    ind, Lf = course.search_target_index(x, y, yaw)
    if pind >= ind:
        ind = pind
    if ind < len(course.cx):
        tx = course.cx[ind]
        ty = course.cy[ind]
    else:
        tx = course.cx[-1]
        ty = course.cy[-1]
        ind = len(course.cx) - 1
    alpha = np.arctan2(ty - y, tx - x) - yaw
    WB = 0.5  # Wheelbase
    delta = np.arctan2(2.0 * WB * np.sin(alpha) / Lf, 1.0)
    return delta, ind

class QLearningController:
    """
    Q-Learning controller for adjusting angular velocity based on errors.
    Learns optimal ω offsets to minimize lateral and heading errors.
    """
    def __init__(self, num_bins=10, num_actions=5, alpha=0.1, gamma=0.9, epsilon=0.1):
        """
        Initialize Q-Learning controller.

        Args:
            num_bins: Number of bins for state discretization
            num_actions: Number of possible actions (ω offsets)
            alpha: Learning rate
            gamma: Discount factor
            epsilon: Exploration rate
        """
        """
        The Q-table is 3D: [lat_bins, head_bins, actions].
        States are discretized lateral and heading errors.
        Actions are offsets to angular velocity for fine-tuning control.
        """
        self.num_bins = num_bins
        self.num_actions = num_actions
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.actions = np.linspace(-0.2, 0.2, num_actions)  # ω offsets
        self.q_table = np.zeros((num_bins, num_bins, num_actions))  # Q-table
        self.lat_bins = np.linspace(-2, 2, num_bins)  # Lateral error bins
        self.head_bins = np.linspace(-np.pi/2, np.pi/2, num_bins)  # Heading error bins

    def discretize(self, lat_err, head_err):
        """
        Discretize continuous errors into state indices.

        Args:
            lat_err: Lateral error
            head_err: Heading error

        Returns:
            lat_idx, head_idx: Discretized state
        """
        lat_idx = np.digitize(lat_err, self.lat_bins) - 1
        head_idx = np.digitize(head_err, self.head_bins) - 1
        lat_idx = np.clip(lat_idx, 0, self.num_bins - 1)
        head_idx = np.clip(head_idx, 0, self.num_bins - 1)
        return lat_idx, head_idx

    def choose_action(self, state, train=True):
        """
        Choose action using ε-greedy policy.

        Args:
            state: (lat_idx, head_idx)
            train: Whether in training mode

        Returns:
            action_idx: Index of chosen action
        """
        if train and np.random.rand() < self.epsilon:
            return np.random.choice(self.num_actions)
        return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        """
        Update Q-table using temporal difference learning.

        Args:
            state: Current state
            action: Action taken
            reward: Reward received
            next_state: Next state
        """
        q_predict = self.q_table[state][action]
        q_target = reward + self.gamma * np.max(self.q_table[next_state])
        self.q_table[state][action] += self.alpha * (q_target - q_predict)

def generate_racetrack(L=20, W=10, num_straight=50, num_curve=50):
    """
    Generate oval racetrack coordinates.

    Args:
        L: Length of straight sections
        W: Width (diameter of semicircles)
        num_straight, num_curve: Number of points per section

    Returns:
        cx, cy: Arrays of x and y coordinates
    """
    r = W / 2
    # Bottom straight
    x_bottom = np.linspace(0, L, num_straight)
    y_bottom = np.zeros(num_straight)
    # Right semicircle
    theta_right = np.linspace(-np.pi/2, np.pi/2, num_curve)
    x_right = L + r * np.cos(theta_right)
    y_right = r + r * np.sin(theta_right)
    # Top straight
    x_top = np.linspace(L, 0, num_straight)
    y_top = np.ones(num_straight) * W
    # Left semicircle
    theta_left = np.linspace(np.pi/2, np.pi/2 + np.pi, num_curve)
    x_left = 0 + r * np.cos(theta_left)
    y_left = r + r * np.sin(theta_left)
    cx = np.concatenate([x_bottom[:-1], x_right[:-1], x_top[:-1], x_left])
    cy = np.concatenate([y_bottom[:-1], y_right[:-1], y_top[:-1], y_left])
    return cx, cy

def get_errors(x, y, yaw, course):
    """
    Compute lateral and heading errors relative to path.

    Args:
        x, y, yaw: Robot pose
        course: TargetCourse object

    Returns:
        lat_err: Signed lateral error
        head_err: Heading error
    """
    ind, _ = course.search_target_index(x, y, yaw)
    tx, ty = course.cx[ind], course.cy[ind]
    head_err = np.arctan2(ty - y, tx - x) - yaw
    head_err = (head_err + np.pi) % (2 * np.pi) - np.pi
    dx = [x - icx for icx in course.cx]
    dy = [y - icy for icy in course.cy]
    lat_err = np.min(np.hypot(dx, dy))
    sign = np.sign(np.cross([tx - x, ty - y], [np.cos(yaw), np.sin(yaw)]))
    lat_err *= sign if sign != 0 else 1
    return lat_err, head_err

def train_qlearning(course, ql, num_episodes=50, steps_per_ep=600, dt=0.1, v=1.0, sigma_v=0.05, sigma_omega=0.02):
    """
    Train Q-Learning controller on the racetrack.

    Args:
        course: TargetCourse object
        ql: QLearningController object
        num_episodes: Number of training episodes
        steps_per_ep: Steps per episode
        dt: Time step
        v: Linear velocity
        sigma_v, sigma_omega: Process noise std devs
    """
    """
    Training involves multiple episodes where the robot learns to adjust ω
    to minimize lateral and heading errors. Each episode resets the pose,
    and the agent explores actions while updating the Q-table based on rewards.
    """
    for ep in range(num_episodes):
        true_pose = np.array([0.0, 0.0, 0.0])  # Reset pose each episode
        for t in range(steps_per_ep):
            x, y, theta = true_pose
            lat_err, head_err = get_errors(x, y, theta, course)
            state = ql.discretize(lat_err, head_err)
            action_idx = ql.choose_action(state, train=True)
            omega_offset = ql.actions[action_idx]
            delta, _ = pure_pursuit(x, y, theta, course, 0)
            WB = 0.5
            omega_base = v / WB * np.tan(delta)
            omega = omega_base + omega_offset
            v_true = v + np.random.normal(0, sigma_v)
            omega_true = omega + np.random.normal(0, sigma_omega)
            # Update true pose
            if abs(omega_true) < 1e-6:
                true_pose[0] += v_true * dt * np.cos(true_pose[2])
                true_pose[1] += v_true * dt * np.sin(true_pose[2])
                true_pose[2] += omega_true * dt
            else:
                true_pose[0] += (v_true / omega_true) * (np.sin(true_pose[2] + omega_true * dt) - np.sin(true_pose[2]))
                true_pose[1] += (v_true / omega_true) * (-np.cos(true_pose[2] + omega_true * dt) + np.cos(true_pose[2]))
                true_pose[2] += omega_true * dt
            true_pose[2] = (true_pose[2] + np.pi) % (2 * np.pi) - np.pi
            next_lat_err, next_head_err = get_errors(true_pose[0], true_pose[1], true_pose[2], course)
            next_state = ql.discretize(next_lat_err, next_head_err)
            reward = - (abs(next_lat_err) + 0.5 * abs(next_head_err))  # Negative error as reward
            ql.update(state, action_idx, reward, next_state)

def simulate_and_animate():
    """
    Run full simulation with training and animation.

    Returns:
        ani: Animation object
    """
    """
    The simulation consists of:
    1. Training the Q-Learning controller.
    2. Running the integrated SLAM with Q-Learning control for multiple laps.
    3. Generating an animation of the SLAM process, showing trajectories and landmarks.
    This demonstrates the combined performance of learning-based control and probabilistic mapping.
    """
    np.random.seed(42)  # For reproducibility
    cx, cy = generate_racetrack()
    course = TargetCourse(cx, cy)
    num_laps = 3
    num_steps = 600 * num_laps
    dt = 0.1
    v = 1.0
    max_range = 4.0
    sigma_r = 0.1
    sigma_b = 0.05
    sigma_v = 0.05
    sigma_omega = 0.02
    num_true_lm = 15
    landmarks = np.random.uniform(-5, 25, (num_true_lm, 2))  # Random landmarks around track
    true_pose = np.array([0.0, 0.0, 0.0], dtype=float)
    slam = EKFSLAM(robot_sigma_x=0.0, robot_sigma_y=0.0, robot_sigma_theta=0.0,
                   measurement_sigma_r=sigma_r, measurement_sigma_b=sigma_b)
    slam.mu = np.array([0.5, 0.2, 0.1], dtype=float)  # Initial estimate
    slam.Sigma = np.diag([0.2**2, 0.2**2, 0.1**2])
    ql = QLearningController()
    train_qlearning(course, ql, num_episodes=50)  # Train Q-Learning
    true_poses = [true_pose.copy()]
    est_poses = [slam.mu[:3].copy()]
    est_lms_hist = [slam.mu[3:].reshape(-1, 2).copy() if slam.lm_size > 0 else np.array([])]
    cov_hist = [slam.Sigma.copy()]
    target_ind = 0
    """
    Main simulation loop: For each time step, compute errors, choose Q-Learning action,
    predict SLAM state, simulate true motion with noise, generate measurements,
    update SLAM, and record history for animation.
    """
    for t in range(1, num_steps):
        est_x, est_y, est_theta = slam.mu[0], slam.mu[1], slam.mu[2]
        lat_err, theta_err = get_errors(est_x, est_y, est_theta, course)
        state = ql.discretize(lat_err, theta_err)
        action_idx = ql.choose_action(state, train=False)  # Greedy action
        omega_offset = ql.actions[action_idx]
        delta, target_ind = pure_pursuit(est_x, est_y, est_theta, course, target_ind)
        WB = 0.5
        omega_base = v / WB * np.tan(delta)
        omega = omega_base + omega_offset
        slam.prediction(v, omega, dt, sigma_v, sigma_omega)
        v_true = v + np.random.normal(0, sigma_v)
        omega_true = omega + np.random.normal(0, sigma_omega)
        # Update true pose
        if abs(omega_true) < 1e-6:
            true_pose[0] += v_true * dt * np.cos(true_pose[2])
            true_pose[1] += v_true * dt * np.sin(true_pose[2])
            true_pose[2] += omega_true * dt
        else:
            true_pose[0] += (v_true / omega_true) * (np.sin(true_pose[2] + omega_true * dt) - np.sin(true_pose[2]))
            true_pose[1] += (v_true / omega_true) * (-np.cos(true_pose[2] + omega_true * dt) + np.cos(true_pose[2]))
            true_pose[2] += omega_true * dt
        true_pose[2] = (true_pose[2] + np.pi) % (2 * np.pi) - np.pi
        zs = []  # Measurements
        for lm in landmarks:
            dx = lm[0] - true_pose[0]
            dy = lm[1] - true_pose[1]
            dist = np.sqrt(dx**2 + dy**2)
            if dist < max_range:
                bearing = np.arctan2(dy, dx) - true_pose[2]
                bearing = (bearing + np.pi) % (2 * np.pi) - np.pi
                r_noisy = dist + np.random.normal(0, sigma_r)
                b_noisy = bearing + np.random.normal(0, sigma_b)
                zs.append([r_noisy, b_noisy])
        slam.update(zs)
        true_poses.append(true_pose.copy())
        est_poses.append(slam.mu[:3].copy())
        est_lms_hist.append(slam.mu[3:].reshape(-1, 2).copy() if slam.lm_size > 0 else np.array([]))
        cov_hist.append(slam.Sigma.copy())
    true_poses = np.array(true_poses)
    est_poses = np.array(est_poses)
    """
    Set up matplotlib figure and animation. The update function redraws the plot
    for each frame, showing the evolution of trajectories, landmarks, and uncertainties.
    """
    fig, ax = plt.subplots(figsize=(8, 8))
    def update(frame):
        ax.clear()
        ax.set_xlim(-5, 25)
        ax.set_ylim(-5, 15)
        ax.plot(true_poses[:frame, 0], true_poses[:frame, 1], 'g-', label='True Trajectory')
        ax.plot(est_poses[:frame, 0], est_poses[:frame, 1], 'b-', label='Estimated Trajectory')
        ax.scatter(landmarks[:, 0], landmarks[:, 1], c='k', marker='x', label='True Landmarks')
        if est_lms_hist[frame].size > 0:
            ax.scatter(est_lms_hist[frame][:, 0], est_lms_hist[frame][:, 1], c='r', marker='o', label='Est Landmarks')
        plot_cov_ellipse(ax, est_poses[frame, :2], cov_hist[frame][:2, :2], color='blue')
        for lm_idx in range(len(est_lms_hist[frame])):
            j = 3 + 2 * lm_idx
            pos = est_lms_hist[frame][lm_idx]
            cov = cov_hist[frame][j:j+2, j:j+2]
            plot_cov_ellipse(ax, pos, cov, color='red')
        ax.plot(cx, cy, 'k--', label='Racetrack Path')
        ax.legend()
        ax.set_title(f'SLAM Convergence - Step {frame}')
    ani = FuncAnimation(fig, update, frames=range(0, num_steps, 5), interval=50)
    ani.save('slam.mp4', writer='ffmpeg')  # Save animation
    plt.close(fig)
    return ani

# Run simulation and display animation
ani = simulate_and_animate()
HTML(ani.to_jshtml())