In [None]:
#@title MPPI
!pip install git+https://gitlab.com/neu-autonomy/gym-neu-racing.git

import gymnasium
import numpy as np
import gym_neu_racing
from gymnasium import spaces
from gym_neu_racing.envs.wrappers import StateFeedbackWrapper
import matplotlib.pyplot as plt
from typing import Callable
import matplotlib.cm as cmx
import matplotlib.colors as colors
from gym_neu_racing import motion_models
import cvxpy as cp

class MPPIRacetrack:
    def __init__(
        self,
        static_map,
        motion_model=motion_models.Unicycle(),
        sigma = 3,
        lmbd = 10,
        num_samples = 70,
        timesteps = 10,

    ):
        """ Your implementation here """
        self.static_map = static_map
        self.motion_model = motion_model
        self.sigma = sigma
        self.lmbd = lmbd
        self.num_samples = num_samples
        self.timesteps = timesteps
        self.u = np.zeros((self.timesteps, 2))
        self.count = 0


    def clip(self, velocity, angular):
      velocity = np.clip(velocity, self.motion_model.action_space.low[0], self.motion_model.action_space.high[0])
      angular = np.clip(angular, self.motion_model.action_space.low[1], self.motion_model.action_space.high[1])

      return np.array([velocity, angular])

    def cost(self, current_state, next_state, cur_action):
      coord, in_map = self.static_map.world_coordinates_to_map_indices(next_state[:2])
      if self.static_map.static_map[coord[0], coord[1]]:
        return 100

      # current_state = current_state - 0.6
      d = np.abs( ( (next_state[0] - 0.6) / 3.3)**2 + ( (next_state[1] - 0.7) / 3.3)**2 - 1 )
      cost = 60 * d**2 + (cur_action[0] - 2)**2

      return cost

    def plot(self, initial_state, goal_pos, rollout):
      plt.axis('equal')
      rollout = np.array(rollout)
      plt.plot(initial_state[0], initial_state[1], 'o')
      plt.plot(goal_pos[0], goal_pos[1], 'x')
      plt.plot(rollout[:, 0], rollout[:, 1])

      # plt.show()

    def get_action(self, initial_state: np.ndarray):
      """ Your implementation here """
      #set nominal control for unicycle, vx, vw
      self.count += 1
      u = self.u.copy()

      #sample perturbations from normal dist
      perturbations = np.random.normal(0, self.sigma, size = (self.num_samples, self.timesteps, 2))

      weights = np.zeros((self.num_samples, 1))
      rollouts = []
      #for every sample
      for i, perturbation in enumerate(perturbations):
        cur_state = initial_state
        cur_rollout = [initial_state]
        cost = 0
        #for every time step
        for t in range(self.timesteps):
          cur_action = u[t] + perturbation[t]
          cur_action = self.clip(cur_action[0], cur_action[1])
          next_state = self.motion_model.step(cur_state, cur_action)
          cur_rollout.append(next_state)
          cost += self.cost(cur_state, next_state, cur_action)
          cur_state = next_state

        # self.plot(initial_state, goal_pos, cur_rollout)
        rollouts.append(cur_rollout)
        weights[i] = np.exp(-(1/self.lmbd) * cost)

      weights = weights / np.sum(weights)
      for i in range(self.timesteps):
        u[i] = u[i] + np.sum(weights * perturbations[:, i, :], axis = 0)

      action = u[0]
      action[0] = 1

      self.u = u

      return rollouts, self.u.copy()

In [None]:
#@title Iterative LQG
class IterativeLQG:
  def __init__(self):
    pass


In [None]:
#@title Tube MPC
class TubeMPC:
    def __init__(self):
      self.nominal = MPPIRaceTrack()
      self.z = np.zeros(3)
      self.ancillary = None
      pass

    def solve_ancillary(self):
      """The ancillary controller finds the control seq which minimizes the
      difference between the uncertain state and the nominal state"""

      pass

    def solve_nominal(self):
      rollouts, controls = self.mppi.get_action(self.z)

      return rollouts, controls

    def solve(self):
      #current state of uncertain system
      x = None
      #current state of nomial system
      z = None

      #solve the nominal system and get the states and controls
      rollouts, controls = self.solve_nominal()

      #solve ancillary problem and obtain control
      self.solve_ancillary()

      #apply control found from solving ancillary problem to the uncertain system
      #measure the successor state

      #simulate the next state of nominal system

      #repeat

In [None]:
#@title Run Racetrack
def run_planner_on_racetrack(
    env: gymnasium.Env,
    planner_class=MPPIRacetrack,
    seed: int = 0,
    num_laps: int = 3,
) -> int:

    np.random.seed(seed)
    obs, _ = env.reset()
    env.unwrapped.laps_left = num_laps

    # Create an instance of your planner
    planner = planner_class(static_map=env.unwrapped.map)

    # Draw a map of the environment with the finish line + initial position
    ax = env.unwrapped.map.draw_map(show=False)
    ax.plot(
        env.unwrapped.finish_line[:, 0],
        env.unwrapped.finish_line[:, 1],
        "g",
        lw=3,
    )
    ax.plot(obs[0], obs[1], "rx")

    # Run the environment for num_timesteps, unless the robot hits an obstacle
    # or successfully completes the number of laps needed
    num_timesteps = 500
    success = False
    for t in range(num_timesteps):
      if t % 50 == 0:
        print("Time step", t)
      action = planner.get_action(obs)
      obs, _, terminated, _, _ = env.step(action)

      ax.plot(obs[0], obs[1], "bx")

      if terminated:
          success = True
          break

    num_timesteps_used = t

    plt.show()

    if success:
        return num_timesteps_used
    else:
        return -1


# seed = 0
seed = np.random.randint(0, 3000)
num_laps = 3
planner_class = MPPIRacetrack
num_timesteps_used = run_planner_on_racetrack(
    env, planner_class=planner_class, seed=seed, num_laps=num_laps
)
print(f"num timesteps used: {num_timesteps_used}")