# MPPI (Model Predictive Path-Integral) Control

In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple
from matplotlib import patches
from matplotlib.animation import ArtistAnimation
from IPython import display

## Control Target : Vehicle

<img src="../media/pathtracking.png" width="500px">

In [None]:
class Vehicle():
    def __init__(
            self,
            wheel_base: float = 2.5, # [m]
            vehicle_width = 3.0, # [m]
            vehicle_length = 4.0, # [m]
            max_steer_abs: float = 0.523, # [rad]
            max_accel_abs: float = 2.000, # [m/s^2]
            ref_path: np.ndarray = np.array([[-100.0, 0.0], [100.0, 0.0]]),
            obstacle_circles: np.ndarray = np.array([[-2.0, 1.0, 1.0], [2.0, -1.0, 1.0]]), # [obs_x, obs_y, obs_radius]
            delta_t: float = 0.05, # [s]
            visualize: bool = True,
        ) -> None:
        """initialize vehicle environment
        state variables:
            x: x-axis position in the global frame [m]
            y: y-axis position in the global frame [m]
            yaw: orientation in the global frame [rad]
            v: longitudinal velocity [m/s]
        control input:
            steer: front tire angle of the vehicle [rad] (positive in the counterclockwize direction)
            accel: longitudinal acceleration of the vehicle [m/s^2] (positive in the forward direction)
        Note: dynamics of the vehicle is the Kinematic Bicycle Model. 
        """
        # vehicle parameters
        self.wheel_base = wheel_base#[m]
        self.vehicle_w = vehicle_width
        self.vehicle_l = vehicle_length
        self.max_steer_abs = max_steer_abs # [rad]
        self.max_accel_abs = max_accel_abs # [m/s^2]
        self.delta_t = delta_t #[s]
        self.ref_path = ref_path

        # obstacle parameters
        self.obstacle_circles = obstacle_circles

        # visualization settings
        self.view_x_lim_min, self.view_x_lim_max = -20.0, 20.0
        self.view_y_lim_min, self.view_y_lim_max = -25.0, 25.0
        self.minimap_view_x_lim_min, self.minimap_view_x_lim_max = -40.0, 40.0
        self.minimap_view_y_lim_min, self.minimap_view_y_lim_max = -10.0, 40.0

        # reset environment
        self.visualize_flag = visualize
        self.reset()

    def reset(
            self, 
            init_state: np.ndarray = np.array([0.0, 0.0, 0.0, 0.0]), # [x, y, yaw, v]
        ) -> None:
        """reset environment to initial state"""

        # reset state variables
        self.state = init_state

        # clear animation frames
        self.frames = []

        if self.visualize_flag:
            # prepare figure
            self.fig = plt.figure(figsize=(9,9))
            self.main_ax = plt.subplot2grid((3,4), (0,0), rowspan=3, colspan=3)
            self.minimap_ax = plt.subplot2grid((3,4), (0,3))
            self.steer_ax = plt.subplot2grid((3,4), (1,3))
            self.accel_ax = plt.subplot2grid((3,4), (2,3))

            # graph layout settings
            ## main view
            self.main_ax.set_aspect('equal')
            self.main_ax.set_xlim(self.view_x_lim_min, self.view_x_lim_max)
            self.main_ax.set_ylim(self.view_y_lim_min, self.view_y_lim_max)
            self.main_ax.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
            self.main_ax.tick_params(bottom=False, left=False, right=False, top=False)
            ## mini map
            self.minimap_ax.set_aspect('equal')
            self.minimap_ax.axis('off')
            self.minimap_ax.set_xlim(self.minimap_view_x_lim_min, self.minimap_view_x_lim_max)
            self.minimap_ax.set_ylim(self.minimap_view_y_lim_min, self.minimap_view_y_lim_max)
            ## steering angle view
            self.steer_ax.set_title("Steering Angle", fontsize="12")
            self.steer_ax.axis('off')
            ## acceleration view
            self.accel_ax.set_title("Acceleration", fontsize="12")
            self.accel_ax.axis('off')
            
            # apply tight layout
            self.fig.tight_layout()

    def update(
            self, 
            u: np.ndarray, 
            delta_t: float = 0.0, 
            append_frame: bool = True, 
            optimal_traj: np.ndarray = np.empty(0), # predicted optimal trajectory from mppi
            sampled_traj_list: np.ndarray = np.empty(0), # sampled trajectories from mppi
        ) -> None:
        """update state variables"""
        # keep previous states
        x, y, yaw, v = self.state

        # prepare params
        l = self.wheel_base
        dt = self.delta_t if delta_t == 0.0 else delta_t

        # limit control inputs
        steer = np.clip(u[0], -self.max_steer_abs, self.max_steer_abs)
        accel = np.clip(u[1], -self.max_accel_abs, self.max_accel_abs)

        # update state variables
        new_x = x + v * np.cos(yaw) * dt
        new_y = y + v * np.sin(yaw) * dt
        new_yaw = yaw + v / l * np.tan(steer) * dt
        new_v = v + accel * dt
        self.state = np.array([new_x, new_y, new_yaw, new_v])

        # record frame
        if append_frame:
            self.append_frame(steer, accel, optimal_traj, sampled_traj_list)

    def get_state(self) -> np.ndarray:
        """return state variables"""
        return self.state.copy()

    def append_frame(self, steer: float, accel: float, optimal_traj: np.ndarray, sampled_traj_list: np.ndarray) -> list:
        """draw a frame of the animation."""
        # get current states
        x, y, yaw, v = self.state

        ### main view ###
        # draw the vehicle shape
        vw, vl = self.vehicle_w, self.vehicle_l
        vehicle_shape_x = [-0.5*vl, -0.5*vl, +0.5*vl, +0.5*vl, -0.5*vl, -0.5*vl]
        vehicle_shape_y = [0.0, +0.5*vw, +0.5*vw, -0.5*vw, -0.5*vw, 0.0]
        rotated_vehicle_shape_x, rotated_vehicle_shape_y = \
            self._affine_transform(vehicle_shape_x, vehicle_shape_y, yaw, [0, 0]) # make the vehicle be at the center of the figure
        frame = self.main_ax.plot(rotated_vehicle_shape_x, rotated_vehicle_shape_y, color='black', linewidth=2.0, zorder=3)

        # draw wheels
        ww, wl = 0.4, 0.7 #[m]
        wheel_shape_x = np.array([-0.5*wl, -0.5*wl, +0.5*wl, +0.5*wl, -0.5*wl, -0.5*wl])
        wheel_shape_y = np.array([0.0, +0.5*ww, +0.5*ww, -0.5*ww, -0.5*ww, 0.0])

        ## rear-left wheel
        wheel_shape_rl_x, wheel_shape_rl_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, 0.0, [-0.3*vl, 0.3*vw])
        wheel_rl_x, wheel_rl_y = \
            self._affine_transform(wheel_shape_rl_x, wheel_shape_rl_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_rl_x, wheel_rl_y, color='black', zorder=3)

        ## rear-right wheel
        wheel_shape_rr_x, wheel_shape_rr_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, 0.0, [-0.3*vl, -0.3*vw])
        wheel_rr_x, wheel_rr_y = \
            self._affine_transform(wheel_shape_rr_x, wheel_shape_rr_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_rr_x, wheel_rr_y, color='black', zorder=3)

        ## front-left wheel
        wheel_shape_fl_x, wheel_shape_fl_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, steer, [0.3*vl, 0.3*vw])
        wheel_fl_x, wheel_fl_y = \
            self._affine_transform(wheel_shape_fl_x, wheel_shape_fl_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_fl_x, wheel_fl_y, color='black', zorder=3)

        ## front-right wheel
        wheel_shape_fr_x, wheel_shape_fr_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, steer, [0.3*vl, -0.3*vw])
        wheel_fr_x, wheel_fr_y = \
            self._affine_transform(wheel_shape_fr_x, wheel_shape_fr_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_fr_x, wheel_fr_y, color='black', zorder=3)

        # draw the vehicle center circle
        vehicle_center = patches.Circle([0, 0], radius=vw/20.0, fc='white', ec='black', linewidth=2.0, zorder=6)
        frame += [self.main_ax.add_artist(vehicle_center)]

        # draw the reference path
        ref_path_x = self.ref_path[:, 0] - np.full(self.ref_path.shape[0], x)
        ref_path_y = self.ref_path[:, 1] - np.full(self.ref_path.shape[0], y)
        frame += self.main_ax.plot(ref_path_x, ref_path_y, color='black', linestyle="dashed", linewidth=1.5)

        # draw the information text
        text = "vehicle velocity = {v:>+6.1f} [m/s]".format(pos_e=x, head_e=np.rad2deg(yaw), v=v)
        frame += [self.main_ax.text(0.5, 0.02, text, ha='center', transform=self.main_ax.transAxes, fontsize=14, fontfamily='monospace')]

        # draw the predicted optimal trajectory from mppi
        if optimal_traj.any():
            optimal_traj_x_offset = np.ravel(optimal_traj[:, 0]) - np.full(optimal_traj.shape[0], x)
            optimal_traj_y_offset = np.ravel(optimal_traj[:, 1]) - np.full(optimal_traj.shape[0], y)
            frame += self.main_ax.plot(optimal_traj_x_offset, optimal_traj_y_offset, color='#990099', linestyle="solid", linewidth=1.5, zorder=5)

        # draw the sampled trajectories from mppi
        if sampled_traj_list.any():
            min_alpha_value = 0.25
            max_alpha_value = 0.35
            for idx, sampled_traj in enumerate(sampled_traj_list):
                # draw darker for better samples
                alpha_value = (1.0 - (idx+1)/len(sampled_traj_list)) * (max_alpha_value - min_alpha_value) + min_alpha_value
                sampled_traj_x_offset = np.ravel(sampled_traj[:, 0]) - np.full(sampled_traj.shape[0], x)
                sampled_traj_y_offset = np.ravel(sampled_traj[:, 1]) - np.full(sampled_traj.shape[0], y)
                frame += self.main_ax.plot(sampled_traj_x_offset, sampled_traj_y_offset, color='gray', linestyle="solid", linewidth=0.2, zorder=4, alpha=alpha_value)

        # draw the circular obstacles in the main view
        for obs in self.obstacle_circles:
            obs_x, obs_y, obs_r = obs
            obs_circle = patches.Circle([obs_x-x, obs_y-y], radius=obs_r, fc='white', ec='black', linewidth=2.0, zorder=0)
            frame += [self.main_ax.add_artist(obs_circle)]

        ### mini map view ###
        frame += self.minimap_ax.plot(self.ref_path[:, 0], self.ref_path[:,1], color='black', linestyle='dashed')
        rotated_vehicle_shape_x_minimap, rotated_vehicle_shape_y_minimap = \
            self._affine_transform(vehicle_shape_x, vehicle_shape_y, yaw, [x, y]) # make the vehicle be at the center of the figure
        frame += self.minimap_ax.plot(rotated_vehicle_shape_x_minimap, rotated_vehicle_shape_y_minimap, color='black', linewidth=2.0, zorder=3)
        frame += self.minimap_ax.fill(rotated_vehicle_shape_x_minimap, rotated_vehicle_shape_y_minimap, color='white', zorder=2)

        # draw the circular obstacles in the mini map view
        for obs in self.obstacle_circles:
            obs_x, obs_y, obs_r = obs
            obs_circle = patches.Circle([obs_x, obs_y], radius=obs_r, fc='white', ec='black', linewidth=2.0, zorder=0)
            frame += [self.minimap_ax.add_artist(obs_circle)]

        ### control input view ###
        # steering angle
        MAX_STEER = self.max_steer_abs
        PIE_RATE = 3.0/4.0
        PIE_STARTANGLE = 225 # [deg]
        s_abs = np.abs(steer)
        if steer < 0.0: # when turning right
            steer_pie_obj, _ = self.steer_ax.pie([MAX_STEER*PIE_RATE, s_abs*PIE_RATE, (MAX_STEER-s_abs)*PIE_RATE, 2*MAX_STEER*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        else: # when turning left
            steer_pie_obj, _ = self.steer_ax.pie([(MAX_STEER-s_abs)*PIE_RATE, s_abs*PIE_RATE, MAX_STEER*PIE_RATE, 2*MAX_STEER*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        frame += steer_pie_obj
        frame += [self.steer_ax.text(0, -1, f"{np.rad2deg(steer):+.2f} " + r"$ \rm{[deg]}$", size = 14, horizontalalignment='center', verticalalignment='center', fontfamily='monospace')]

        # acceleration
        MAX_ACCEL = self.max_accel_abs
        PIE_RATE = 3.0/4.0
        PIE_STARTANGLE = 225 # [deg]
        a_abs = np.abs(accel)
        if accel > 0.0:
            accel_pie_obj, _ = self.accel_ax.pie([MAX_ACCEL*PIE_RATE, a_abs*PIE_RATE, (MAX_ACCEL-a_abs)*PIE_RATE, 2*MAX_ACCEL*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        else:
            accel_pie_obj, _ = self.accel_ax.pie([(MAX_ACCEL-a_abs)*PIE_RATE, a_abs*PIE_RATE, MAX_ACCEL*PIE_RATE, 2*MAX_ACCEL*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        frame += accel_pie_obj
        frame += [self.accel_ax.text(0, -1, f"{accel:+.2f} " + r"$ \rm{[m/s^2]}$", size = 14, horizontalalignment='center', verticalalignment='center', fontfamily='monospace')]

        # append frame
        self.frames.append(frame)

    # rotate shape and return location on the x-y plane.
    def _affine_transform(self, xlist: list, ylist: list, angle: float, translation: list=[0.0, 0.0]) -> Tuple[list, list]:
        transformed_x = []
        transformed_y = []
        if len(xlist) != len(ylist):
            print("[ERROR] xlist and ylist must have the same size.")
            raise AttributeError

        for i, xval in enumerate(xlist):
            transformed_x.append((xlist[i])*np.cos(angle)-(ylist[i])*np.sin(angle)+translation[0])
            transformed_y.append((xlist[i])*np.sin(angle)+(ylist[i])*np.cos(angle)+translation[1])
        transformed_x.append(transformed_x[0])
        transformed_y.append(transformed_y[0])
        return transformed_x, transformed_y

    def show_animation(self, interval_ms: int) -> None:
        """show animation of the recorded frames"""
        ani = ArtistAnimation(self.fig, self.frames, interval=interval_ms) # blit=True
        html = display.HTML(ani.to_jshtml())
        display.display(html)
        plt.close()

    def save_animation(self, filename: str, interval: int, movie_writer: str="ffmpeg") -> None:
        """save animation of the recorded frames (ffmpeg required)"""
        ani = ArtistAnimation(self.fig, self.frames, interval=interval)
        ani.save(filename, writer=movie_writer)

## [test simulation] ##
sim_step = 100
delta_t = 0.05
ref_path = np.genfromtxt('../data/ovalpath.csv', delimiter=',', skip_header=1)
vehicle = Vehicle(ref_path=ref_path[:, 0:2])
for i in range(sim_step):
    vehicle.update(u=[0.6 * np.sin(i/3.0), 2.2 * np.sin(i/10.0)], delta_t=delta_t) # u is the control input to the vehicle, [ steer[rad], accel[m/s^2] ]
vehicle.show_animation(interval_ms=delta_t*1000) # show animation
# save animation
vehicle.save_animation("vehicle.mp4", interval=int(delta_t * 1000), movie_writer="ffmpeg") # ffmpeg is required to write mp4 file

## Controller : MPPI Controller

### Note
The following MPPI implementation follows Algorithms 1 and 2 of the reference paper. 

### Reference
1. G. Williams et al. "Information-Theoretic Model Predictive Control: Theory and Applications to Autonomous Driving" 
    - URL : https://ieeexplore.ieee.org/document/8558663
    - PDF : https://arxiv.org/pdf/1707.02342.pdf

### Brief overview of MPPI algorithm
Here is a general process flow to calculate optimal input with mppi algorithm.

**[Step 1]** ramdomly sample input sequence

Mean input sequence $U$ and ramdomly sampled input sequence $V$ are defined as follows.  
Usually, optimal input sequence on the previous step is used as $U$. 
$$
    \begin{align}
        & (\mathbf{u}_0, \mathbf{u}_1, ... \mathbf{u}_{T-1}) = U \in \mathbb{R}^{m \times T}, \nonumber \\
        & (\mathbf{v}_0, \mathbf{v}_1, ... \mathbf{v}_{T-1}) = V \in \mathbb{R}^{m \times T}, \nonumber \\
        & \mathbf{v}_t = \mathbf{u}_t + \epsilon_t, \nonumber \\
        & \epsilon_t \sim \mathcal{N}(0, \Sigma).\nonumber 
    \end{align}
$$


**[Step 2]** predict future states and evaluate cost for each sample

We assume a discrete time, continuous state-action dynamical system as a control target.  
$\mathbf{x}$ is a system state, and $\mathbf{v}$ is a sampled control input.
$$
\begin{align}
\mathbf{x}_t  &\in \mathbb{R}^{n}, \nonumber \\
\mathbf{x}_{t+1} &= \mathbf{F}(\mathbf{x}_t, \mathbf{v}_t).\nonumber 
\end{align}
$$

Then costs (i.e. penalties to be minimized) for sampled sequences $S(V; \mathbf{x}_0)$ can be evaluated with following formulations.
$$
    \begin{align}
        & S(V; \mathbf{x}_0) = C(\mathcal{H}(V; \mathbf{x}_0)), \nonumber \\
        & C(\mathbf{x}_0, \mathbf{x}_1, ... \mathbf{x}_T) = \phi(\mathbf{x}_T) + \sum_{t=0}^{T-1}c(\mathbf{x}_t), \nonumber \\
        & \mathcal{H}(V; \mathbf{x}_0) = \left( \mathbf{x}_0, \mathbf{F}(\mathbf{x}_0, \mathbf{v}_0), \mathbf{F}(\mathbf{F}(\mathbf{x}_0, \mathbf{v}_0), \mathbf{v}_1), ... \right).\nonumber 
    \end{align}
$$

**[Step 3]** calculate weight for each sample sequence

Weight for a each sample sequence is derived on the basis of information theory.  
There are K sample sequences in total, represented with an index k.  
Good control sequence with small cost value get more weight, and vice versa.  

$$
\begin{align}
& w(V) = \frac{1}{\eta} \exp
\left( 
    -\frac{1}{\lambda}
    \left(
        S(V) + \lambda(1-\alpha) \sum^{T-1}_{t=0} \mathbf{u}_t^T \Sigma^{-1} (\mathbf{u}_t + \epsilon_t) - \rho
    \right)
\right) \nonumber \\
& \eta = 
\sum_{k=1}^K \exp
\left( 
    -\frac{1}{\lambda}
    \left(
        S(U + \mathcal{E}_k) + \lambda(1-\alpha) \sum^{T-1}_{t=0} \mathbf{u}_t^T \Sigma^{-1} (\mathbf{u}_t + \epsilon_t^k) - \rho
    \right)
\right)\nonumber \\
& \rho = 
\min_k 
\left( S(V_k) + \lambda(1-\alpha) \sum^{T-1}_{t=0} \mathbf{u}_t^T \Sigma^{-1} (\mathbf{u}_t + \epsilon_t^k) \right)\nonumber
\end{align}
$$

Note that $\rho$ is inserted into the formulation to avoid overflow errors during implementation.

**[Step 4]** get optimal control input sequence

Finally, optimal input trajectory for the next ($i+1$) step is given adding weighted sample sequences to the previous solution.

$$
\begin{align}
    \mathbf{u}_t^{i+1} % &= \mathbb{E}_{\mathbb{Q}_{\hat{U}, \Sigma}}[w(V)\mathbf{v}_t]
                 = u_t^i + \sum_{k=1}^K w(V_k) \epsilon_t^k \nonumber 
\end{align}
$$


In [None]:
class MPPIControllerForPathTracking():
    def __init__(
            self,
            delta_t: float = 0.05,
            wheel_base: float = 2.5, # [m]
            vehicle_width: float = 3.0, # [m]
            vehicle_length: float = 4.0, # [m]
            max_steer_abs: float = 0.523, # [rad]
            max_accel_abs: float = 2.000, # [m/s^2]
            ref_path: np.ndarray = np.array([[0.0, 0.0, 0.0, 1.0], [10.0, 0.0, 0.0, 1.0]]),
            horizon_step_T: int = 30,
            number_of_samples_K: int = 1000,
            param_exploration: float = 0.0,
            param_lambda: float = 50.0,
            param_alpha: float = 1.0,
            sigma: np.ndarray = np.array([[0.5, 0.0], [0.0, 0.1]]), 
            stage_cost_weight: np.ndarray = np.diag([50.0, 50.0, 1.0, 20.0]), # weight for [x, y, yaw, v]
            terminal_cost_weight: np.ndarray = np.diag([50.0, 50.0, 1.0, 20.0]), # weight for [x, y, yaw, v]
            visualize_optimal_traj = True,  # if True, optimal trajectory is visualized
            visualze_sampled_trajs = False, # if True, sampled trajectories are visualized
            obstacle_circles: np.ndarray = np.array([[-2.0, 1.0, 1.0], [2.0, -1.0, 1.0]]), # [obs_x, obs_y, obs_radius]
            collision_safety_margin_rate: float = 1.2, # safety margin for collision check
    ) -> None:
        """initialize mppi controller for path-tracking"""
        # mppi parameters
        self.dim_x = 4 # dimension of system state vector
        self.dim_u = 2 # dimension of control input vector
        self.T = horizon_step_T # prediction horizon
        self.K = number_of_samples_K # number of sample trajectories
        self.param_exploration = param_exploration  # constant parameter of mppi
        self.param_lambda = param_lambda  # constant parameter of mppi
        self.param_alpha = param_alpha # constant parameter of mppi
        self.param_gamma = self.param_lambda * (1.0 - (self.param_alpha))  # constant parameter of mppi
        self.Sigma = sigma # deviation of noise
        self.stage_cost_weight = stage_cost_weight
        self.terminal_cost_weight = terminal_cost_weight
        self.visualize_optimal_traj = visualize_optimal_traj
        self.visualze_sampled_trajs = visualze_sampled_trajs

        # vehicle parameters
        self.delta_t = delta_t #[s]
        self.wheel_base = wheel_base #[m]
        self.vehicle_w = vehicle_width #[m]
        self.vehicle_l = vehicle_length #[m]
        self.max_steer_abs = max_steer_abs # [rad]
        self.max_accel_abs = max_accel_abs # [m/s^2]
        self.ref_path = ref_path

        # obstacle parameters
        self.obstacle_circles = obstacle_circles
        self.collision_safety_margin_rate = collision_safety_margin_rate

        # mppi variables
        self.u_prev = np.zeros((self.T, self.dim_u))

        # ref_path info
        self.prev_waypoints_idx = 0

    def _get_nearest_waypoint(self, x: float, y: float, update_prev_idx: bool = False):
        """search the closest waypoint to the vehicle on the reference path"""

        SEARCH_IDX_LEN = 200 # [points] forward search range
        prev_idx = self.prev_waypoints_idx
        dx = [x - ref_x for ref_x in self.ref_path[prev_idx:(prev_idx + SEARCH_IDX_LEN), 0]]
        dy = [y - ref_y for ref_y in self.ref_path[prev_idx:(prev_idx + SEARCH_IDX_LEN), 1]]
        d = [idx ** 2 + idy ** 2 for (idx, idy) in zip(dx, dy)]
        min_d = min(d)
        nearest_idx = d.index(min_d) + prev_idx

        # get reference values of the nearest waypoint
        ref_x = self.ref_path[nearest_idx,0]
        ref_y = self.ref_path[nearest_idx,1]
        ref_yaw = self.ref_path[nearest_idx,2]
        ref_v = self.ref_path[nearest_idx,3]

        # update nearest waypoint index if necessary
        if update_prev_idx:
            self.prev_waypoints_idx = nearest_idx 

        return  ref_x, ref_y, ref_yaw, ref_v
    
    def _calc_epsilon(self, size_sample: int, size_time_step: int, size_dim_u: int) -> np.ndarray:
        
        sigma = self.Sigma
        # Implement your epsilon calculation logic
        if sigma.shape[0] != sigma.shape[1] or sigma.shape[0] != size_dim_u or size_dim_u < 1:
            print("[ERROR] sigma must be a square matrix with the size of size_dim_u.")
            raise ValueError

        mu = np.zeros((size_dim_u))
        epsilon = np.random.multivariate_normal(mu, sigma, (size_sample, size_time_step))
        return epsilon

    #clamp control inputs
    def _g(self, v: np.ndarray) -> np.ndarray:
        # 0 - steering, 1 - acceleration

        v[0] = np.clip(v[0], -self.max_steer_abs, self.max_steer_abs)
        v[1] = np.clip(v[1], -self.max_accel_abs, self.max_accel_abs)

        return v
    
    #Stage cost
    def _c(self, x_t: np.ndarray) -> float:
        # x_t :x, y, yaw, vel
        #normalize yaw
        x_t[2] = ((x_t[2] + 2.0*np.pi) % (2.0*np.pi))

        #get tracking traj
        ref_x, ref_y, ref_yaw, ref_v = self._get_nearest_waypoint(x_t[0], x_t[1])
        ref_x_t = np.array([ref_x, ref_y, ref_yaw, ref_v])

        delta_x = (x_t - ref_x_t).reshape(-1,1) 
        # print(delta_x.shape)
        stage_cost = (delta_x.T @ self.stage_cost_weight @ delta_x)
        # print(stage_cost)
        #To add collision costs
        stage_cost += 1.0e8*self._is_collided(x_t)

        return stage_cost[0][0]
        

    def _phi(self, x_T: np.ndarray) -> float:
        # Implement your terminal cost calculation logic
        x_T[2] = ((x_T[2] + 2.0*np.pi) % (2.0*np.pi))

        #get tracking traj
        ref_x, ref_y, ref_yaw, ref_v = self._get_nearest_waypoint(x_T[0], x_T[1])
        ref_x_T = np.array([ref_x, ref_y, ref_yaw, ref_v])

        delta_xT = (x_T - ref_x_T).reshape(-1,1)
        # print(delta_xT.shape)
        terminal_cost = delta_xT.T @ self.terminal_cost_weight @ delta_xT
        
        #Add collision cost
        terminal_cost += 1.0e8*self._is_collided(x_T)
        return terminal_cost[0][0]

    def _affine_transform(self, xlist: list, ylist: list, angle: float, translation: list = [0.0, 0.0]) -> Tuple[list, list]:
        # Implement your affine transformation logic
        x_cen, y_cen = translation

        transformed_x = []
        transformed_y = []

        if(len(xlist) != len(ylist)):
            print("[ERROR] sigma must be a square matrix with the size of size_dim_u.")
            raise ValueError

        for i in range(len(xlist)):
            transformed_x.append( xlist[i]*np.cos(angle) - ylist[i]*np.sin(angle) + x_cen)
            transformed_y.append( ylist[i]*np.cos(angle) + xlist[i]*np.sin(angle) + y_cen)

        transformed_x.append(transformed_x[0])
        transformed_y.append(transformed_y[0])
        return transformed_x, transformed_y

    def _is_collided(self, x_t: np.ndarray) -> bool:
        # Implement your collision checking logic
        safe_veh_width = self.vehicle_w * self.collision_safety_margin_rate
        safe_veh_len = self.vehicle_l * self.collision_safety_margin_rate

        veh_x = [0.5*safe_veh_len, 0.5*safe_veh_len, 0.5*safe_veh_len, 0, \
                 -0.5*safe_veh_len, -0.5*safe_veh_len, -0.5*safe_veh_len, 0]
        
        veh_y = [0.5*safe_veh_width, 0, -0.5*safe_veh_width, -0.5*safe_veh_width,\
                -0.5*safe_veh_width, 0, 0.5*safe_veh_width, 0.5*safe_veh_width]
        
        x, y, yaw, _ = x_t
        transform_x, transform_y = self._affine_transform(veh_x, veh_y, yaw, [x,y])

        for obstacle in self.obstacle_circles:
            for i in range(len(transform_x)):
                if ((transform_x[i] - obstacle[0])**2 + (transform_y[i]- obstacle[1])**2 < obstacle[2]**2):
                    return True
        
        return False
        

    def _F(self, x_t: np.ndarray, v_t: np.ndarray) -> np.ndarray:
        x, y, yaw, vel = x_t
        steer, accel = v_t

        wheel_base = self.wheel_base
        dt = self.delta_t
        new_x = x + vel*np.cos(yaw)*dt
        new_y = y + vel*np.sin(yaw)*dt
        new_yaw = yaw + (vel*np.tan(steer)/ wheel_base)*dt
        new_vel = vel + accel*dt

        return np.array([new_x, new_y, new_yaw, new_vel])

    def _compute_weights(self, S: np.ndarray) -> np.ndarray:

        w = np.zeros(self.K)

        rho = S.min()

        eta = 0.0

        for k in range(self.K):
            eta += np.exp( (-1.0/self.param_lambda) * (S[k]-rho) )

        # calculate weight
        for k in range(self.K):
            w[k] = (1.0 / eta) * np.exp( (-1.0/self.param_lambda) * (S[k]-rho) )
        return w
    

    def _moving_average_filter(self, xx: np.ndarray, window_size: int) -> np.ndarray:
        """apply moving average filter for smoothing input sequence
        Ref. https://zenn.dev/bluepost/articles/1b7b580ab54e95
        """
        b = np.ones(window_size)/window_size
        dim = xx.shape[1]
        xx_mean = np.zeros(xx.shape)

        for d in range(dim):
            xx_mean[:,d] = np.convolve(xx[:,d], b, mode="same")
            n_conv = math.ceil(window_size/2)
            xx_mean[0,d] *= window_size/n_conv
            for i in range(1, n_conv):
                xx_mean[i,d] *= window_size/(i+n_conv)
                xx_mean[-i,d] *= window_size/(i + n_conv - (window_size % 2)) 
        return xx_mean
    
    def calc_control_input(self, observed_x: np.ndarray) -> Tuple[float, np.ndarray]:
        # Implement your main control input calculation logic

        #load prev control 
        u = self.u_prev

        #get initial state
        x_0 = observed_x

        #get nearest waypoint and update the idx
        self._get_nearest_waypoint(x_0[0], x_0[1], update_prev_idx=True)

        #buffer to store sampled trajectory costs
        traj_costs = np.zeros(self.K)
        
        #epsilon to sample the control inputs
        epsilon = self._calc_epsilon(self.K, self.T, self.dim_u)

        #buffer to store control inputs of sampled trajectories
        traj_controls = np.zeros((self.K, self.T, self.dim_u))

        #sampling trajectories
        for k in range(self.K):
            #init state to keep updating with the trajectory inputs
            state_xt = x_0

            for dt in range(1, self.T + 1):
                
                #sampled inputs from exploration or exploitation
                if k < (1 - self.param_exploration) * self.K:
                    traj_controls[k, dt-1] = u[dt-1] + epsilon[k, dt-1]
                
                else:
                    traj_controls[k, dt-1] = epsilon[k, dt-1]

                ## update state
                #clamp u
                u_clamp = self._g(traj_controls[k, dt-1])
                state_xt = self._F(state_xt, u_clamp)

                #stage cost
                traj_costs[k] += self._c(state_xt) + self.param_gamma * u[dt-1].T @ np.linalg.inv(self.Sigma) @ traj_controls[k, dt-1]

            #terminal cost
            traj_costs[k] += self._phi(state_xt)

        #compute the IT weights
        w = self._compute_weights(traj_costs)

        w_eps = np.zeros((self.T, self.dim_u))
        
        #compute w*epsilon
        for dt in range(self.T):
            for k in range(self.K):
                w_eps[dt] += w[k] * epsilon[k, dt]
        
        #update the control inputs with w*eps
        u += self._moving_average_filter(w_eps, window_size= 10)

        #finding optimal trajectory
        optimal_traj = np.zeros((self.T, self.dim_x))

        if self.visualize_optimal_traj:
            state_xt = x_0
            for dt in range(self.T):
                #taking latest control input from the current control buffer
                optimal_u = self._g(u[dt-1])
                state_xt = self._F(state_xt, optimal_u)
                optimal_traj[dt] = state_xt
        
        sampled_traj = np.zeros((self.K, self.T, self.dim_x))
        sort_idx = np.argsort(traj_costs)
        if self.visualze_sampled_trajs:
            for k in sort_idx:
                state_xt = x_0
                for dt in range(self.T):
                    #taking controls from traj buffer
                    sampled_u = self._g(traj_controls[k, dt - 1])
                    state_xt = self._F(state_xt, sampled_u)
                    sampled_traj[k, dt] = state_xt

        #updating the control sequence to shift 1 step to the right
        self.u_prev[:-1] = u[1:]
        self.u_prev[-1] = u[-1]
        
        return u[0], u, optimal_traj, sampled_traj

In [None]:
np.diag([50.0, 50.0, 1.0, 20.0])

## Simulation

In [None]:
# simulation settings
delta_t = 0.05 # [sec]
sim_steps = 250 # [steps]
print(f"[INFO] delta_t : {delta_t:.2f}[s] , sim_steps : {sim_steps}[steps], total_sim_time : {delta_t*sim_steps:.2f}[s]")

# obstacle params
OBSTACLE_CIRCLES = np.array([
    [+ 8.0, +5.0, 5.0], # pos_x, pos_y, radius [m] in the global frame
    [+18.0, -5.0, 4.0], # pos_x, pos_y, radius [m] in the global frame
    [+ 23.0, +5.0, 2.5]
])
# OBSTACLE_CIRCLES = None
# load and visualize reference path
ref_path = np.genfromtxt('../data/ovalpath.csv', delimiter=',', skip_header=1)
plt.title("Reference Path")
plt.plot(ref_path[:,0], ref_path[:,1])
plt.show()

# initialize a vehicle as a control target
vehicle = Vehicle(
    wheel_base=2.5,
    max_steer_abs=0.523, # [rad]
    max_accel_abs=2.000, # [m/s^2]
    ref_path = ref_path[:, 0:2], # ndarray, size is <num_of_waypoints x 2>
    obstacle_circles = OBSTACLE_CIRCLES, # [obs_x, obs_y, obs_radius]
)
vehicle.reset(
    init_state = np.array([0.0, 0.0, 0.0, 0.0]), # [x[m], y[m], yaw[rad], v[m/s]]
)

# initialize a mppi controller for the vehicle
mppi = MPPIControllerForPathTracking(
    delta_t = delta_t*2.0, # [s]
    wheel_base = 2.5, # [m]
    max_steer_abs = 0.523, # [rad]
    max_accel_abs = 2.000, # [m/s^2]
    ref_path = ref_path, # ndarray, size is <num_of_waypoints x 2>
    horizon_step_T = 20, # [steps]
    number_of_samples_K = 500, # [samples]
    param_exploration = 0.05,
    param_lambda = 100.0,
    param_alpha = 0.98,
    sigma = np.array([[0.075, 0.0], [0.0, 2.0]]),
    stage_cost_weight = np.diag([50.0, 50.0, 1.0, 50.0]), # weight for [x, y, yaw, v]
    terminal_cost_weight = np.diag([50.0, 50.0, 1.0, 50.0]), # weight for [x, y, yaw, v]
    visualze_sampled_trajs = True, # if True, sampled trajectories are visualized
    obstacle_circles = OBSTACLE_CIRCLES, # [obs_x, obs_y, obs_radius]
    collision_safety_margin_rate = 1.2, # safety margin for collision check
)

# simulation loop
for i in range(sim_steps):

    # get current state of vehicle
    current_state = vehicle.get_state()

    try:
        # calculate input force with MPPI
        optimal_input, optimal_input_sequence, optimal_traj, sampled_traj_list = mppi.calc_control_input(
            observed_x = current_state
        )
    except IndexError as e:
        # the vehicle has reached the end of the reference path
        print("[ERROR] IndexError detected. Terminate simulation.")
        break

    # print current state and input force
    print(f"Time: {i*delta_t:>2.2f}[s], x={current_state[0]:>+3.3f}[m], y={current_state[1]:>+3.3f}[m], yaw={current_state[2]:>+3.3f}[rad], v={current_state[3]:>+3.3f}[m/s], steer={optimal_input[0]:>+6.2f}[rad], accel={optimal_input[1]:>+6.2f}[m/s]")

    # update states of vehicle
    vehicle.update(u=optimal_input, delta_t=delta_t, optimal_traj=optimal_traj[:, 0:2], sampled_traj_list=sampled_traj_list[:, :, 0:2])

# show animation
vehicle.show_animation(interval_ms=int(delta_t * 2000))
# save animation
vehicle.save_animation("mppi_pathtracking_obav_demo.mp4", interval=int(delta_t * 2000), movie_writer="ffmpeg") # ffmpeg is required to write mp4 file