<a href="https://colab.research.google.com/github/gupann/CS269-Parking/blob/anmol-parallel-parking-static-obstacles-env/parallelparking_model_based.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Model-Based Reinforcement Learning

## Parallel Parking
parking-parallel-dynObs-v0

In [None]:
# Models and computation
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import namedtuple

# Visualization
import matplotlib.pyplot as plt
%matplotlib inline

# deleting old videos
import shutil
import os

folder_path = r"/home/aayush_wsl/cs269_rl_parking/CS269-Parking/videos"

if os.path.exists(folder_path):
    shutil.rmtree(folder_path)
    print("Video Folder deleted.")
else:
    print("Video Folder does not exist.")

We also define a simple helper function for visualization of episodes:

In [None]:
import sys
from tqdm import trange # from tqdm.notebook import trange
# !pip install tensorboardx gym pyvirtualdisplay
# !apt-get install -y xvfb ffmpeg
# !git clone https://github.com/gupann/CS269-Parking.git 2> /dev/null
sys.path.insert(0, '/home/aayush_wsl/cs269_rl_parking/CS269-Parking/scripts')
from utils import record_videos, show_videos

In [None]:
# Environment
import gymnasium as gym
import highway_env

gym.register_envs(highway_env)

### Let's try it!

Make the environment, and run an episode with random actions:

Parallel Parking

In [None]:
# Configuration for parallel parking with custom dimensions
import numpy as np

parking_config = {
    # ===== PARKING SLOT DIMENSIONS =====
    "street_length": 80.0,      # Total length of street (default: 60.0)
                                # Increasing this makes parking spots wider
    
    "n_slots": 6,               # Number of slots per side (default: 8)
                                # Decreasing this makes parking spots wider
    
    # NOTE: Parking slot width = (street_length - 2*5.0) / n_slots
    # With above values: (80 - 10) / 6 = 11.67m per slot
    # Default: (60 - 10) / 8 = 6.25m per slot
    
    # ===== LANE DIMENSIONS =====
    "lane_width": 30.0,         # Width of central driving lane (default: 10.0)
                                # This is the vertical space for ego to maneuver
    
    "curb_offset": 15.0,        # Distance from center to parking rows (default: 10.0)
                                # Increase this to move parking spots farther from center
    
    # ===== GOAL/STARTING POSITIONS =====
    "empty_slot_index": 3,      # Which bottom slot is empty (0-based)
    
    # ===== ENVIRONMENT BOUNDARIES =====
    "wall_margin": 4.0,         # Extra space outside parking area
    "add_walls": True,          # Yellow boundary walls
    
    # ===== SIMULATION PARAMETERS =====
    "observation": {
        "type": "KinematicsGoal",
        "features": ["x", "y", "vx", "vy", "cos_h", "sin_h"],
        "scales": [100, 100, 5, 5, 1, 1],
        "normalize": False,
    },
    "action": {"type": "ContinuousAction"},
    "reward_weights": [1, 1, 0, 0, 0.2, 0.2],
    "success_goal_reward": 0.10, #Changed from 0.12 to 0.10 (Tighter)
    "collision_reward": -5,
    "steering_range": np.deg2rad(45),
    "simulation_frequency": 15,
    "policy_frequency": 5,
    "duration": 100,
    "screen_width": 800,        # Wider screen for wider environment
    "screen_height": 400,       # Taller screen for wider lane
    "centering_position": [0.5, 0.5],
    "scaling": 5,               # Adjust zoom level (lower = more zoomed out)
    "show_trajectories": False,
    "controlled_vehicles": 1,
    "offscreen_rendering": False,
    "manual_control": False,
    "real_time_rendering": False,
}

# Calculate and display the resulting dimensions
slot_width = (parking_config["street_length"] - 10.0) / parking_config["n_slots"]
print(f"üìè Configuration Summary:")
print(f"  ‚Ä¢ Driving lane width: {parking_config['lane_width']} m")
print(f"  ‚Ä¢ Parking slot width: {slot_width:.2f} m")
print(f"  ‚Ä¢ Number of slots per row: {parking_config['n_slots']}")
print(f"  ‚Ä¢ Total street length: {parking_config['street_length']} m")

In [None]:
# Test the dynamic obstacle environment
import gymnasium as gym
import highway_env

gym.register_envs(highway_env)

# Create environment
env = gym.make("parking-parallel-dynObs-v0", render_mode="rgb_array", config=parking_config)
env = record_videos(env)
obs, info = env.reset()

# Run a short episode to see the moving obstacle
for step in range(100):
    action = env.action_space.sample()  # Random action for testing
    obs, reward, done, truncated, info = env.step(action)
    
    # Print positions every 20 steps
    if step % 20 == 0:
        ego_pos = env.unwrapped.vehicle.position
        moving_obs = env.unwrapped.road.vehicles[1]  # The moving obstacle
        print(f"Step {step}:")
        print(f"  Ego position: {ego_pos}")
        print(f"  Moving obstacle position: {moving_obs.position}")
        print(f"  Moving obstacle speed: {moving_obs.speed} m/s")
    
    if done or truncated:
        break

env.close()
show_videos()
print("\n‚úÖ Dynamic obstacle is working!")

In [None]:
env = gym.make("parking-parallel-dynObs-v0", render_mode="rgb_array", config=parking_config)
env = record_videos(env)
env.reset()
done = False
while not done:
    action = env.action_space.sample()
    obs, reward, done, truncated, info = env.step(action)
env.close()
show_videos()

## Studying the environment, action space, reward  
1.) How and what modifications can be made to the environment  
2.) Figuring out how the action space looks like  
3.) How can I change the reward definition  
4.) How is collision detected in the environment  
5.) How is goal position specified in the environment  

In [None]:
print(env.unwrapped.config["observation"])

The environment is a `GoalEnv`, which means the agent receives a dictionary containing both the current `observation` and the `desired_goal` that conditions its policy.

In [None]:
print("Observation format:", obs)

There is also an `achieved_goal` that won't be useful here (it only serves when the state and goal spaces are different, as a projection from the observation to the goal space).

Alright! We are now ready to apply the model-based reinforcement learning paradigm.

## Experience collection
First, we randomly interact with the environment to produce a batch of experiences

$$D = \{s_t, a_t, s_{t+1}\}_{t\in[1,N]}$$

In [None]:
Transition = namedtuple('Transition', ['state', 'action', 'next_state'])

def collect_interaction_data(env, size=1000, action_repeat=2):
    data, done = [], True
    for _ in trange(size, desc="Collecting interaction data"):
        action = env.action_space.sample()
        for _ in range(action_repeat):
            if done:
              previous_obs, info = env.reset()
            obs, reward, done, truncated, info = env.step(action)
            data.append(Transition(torch.Tensor(previous_obs["observation"]),
                                   torch.Tensor(action),
                                   torch.Tensor(obs["observation"])))
            previous_obs = obs
    return data

env = gym.make("parking-parallel-dynObs-v0", render_mode="rgb_array", config=parking_config)
data = collect_interaction_data(env)
print("Sample transition:", data[0])

## Build a dynamics model

We now design a model to represent the system dynamics. We choose  a **structured model** inspired from *Linear Time-Invariant (LTI) systems*

$$\dot{x} = f_\theta(x, u) = A_\theta(x, u)x + B_\theta(x, u)u$$

where the $(x, u)$ notation comes from the Control Theory community and stands for the state and action $(s,a)$. Intuitively, we learn at each point $(x_t, u_t)$ the **linearization** of the true dynamics $f$ with respect to $(x, u)$.

We parametrize $A_\theta$ and $B_\theta$ as two fully-connected networks with one hidden layer.


In [None]:
class DynamicsModel(nn.Module):
    STATE_X = 0
    STATE_Y = 1

    def __init__(self, state_size, action_size, hidden_size, dt):
        super().__init__()
        self.state_size, self.action_size, self.dt = state_size, action_size, dt
        A_size, B_size = state_size * state_size, state_size * action_size
        self.A1 = nn.Linear(state_size + action_size, hidden_size)
        self.A2 = nn.Linear(hidden_size, A_size)
        self.B1 = nn.Linear(state_size + action_size, hidden_size)
        self.B2 = nn.Linear(hidden_size, B_size)

    def forward(self, x, u):
        """
            Predict x_{t+1} = f(x_t, u_t)
        :param x: a batch of states
        :param u: a batch of actions
        """
        xu = torch.cat((x, u), -1)
        xu[:, self.STATE_X:self.STATE_Y+1] = 0  # Remove dependency in (x,y)
        A = self.A2(F.relu(self.A1(xu)))
        A = torch.reshape(A, (x.shape[0], self.state_size, self.state_size))
        B = self.B2(F.relu(self.B1(xu)))
        B = torch.reshape(B, (x.shape[0], self.state_size, self.action_size))
        dx = A @ x.unsqueeze(-1) + B @ u.unsqueeze(-1)
        return x + dx.squeeze()*self.dt


dynamics = DynamicsModel(state_size=env.observation_space.spaces["observation"].shape[0],
                         action_size=env.action_space.shape[0],
                         hidden_size=64,
                         dt=1/env.unwrapped.config["policy_frequency"])
print("Forward initial model on a sample transition:",
      dynamics(data[0].state.unsqueeze(0), data[0].action.unsqueeze(0)).detach())

## Fit the model on data
We can now train our model $f_\theta$ in a supervised fashion to minimize an MSE loss $L^2(f_\theta; D)$ over our experience batch $D$ by stochastic gradient descent:

$$L^2(f_\theta; D) = \frac{1}{|D|}\sum_{s_t,a_t,s_{t+1}\in D}||s_{t+1}- f_\theta(s_t, a_t)||^2$$

In [None]:
optimizer = torch.optim.Adam(dynamics.parameters(), lr=0.01)

# Split dataset into training and validation
train_ratio = 0.7
train_data, validation_data = data[:int(train_ratio * len(data))], data[int(train_ratio * len(data)):]

def compute_loss(model, data_t, loss_func = torch.nn.MSELoss()):
    states, actions, next_states = data_t
    predictions = model(states, actions)
    return loss_func(predictions, next_states)

def transpose_batch(batch):
    return Transition(*map(torch.stack, zip(*batch)))

def train(model, train_data, validation_data, epochs=1500):
    train_data_t = transpose_batch(train_data)
    validation_data_t = transpose_batch(validation_data)
    losses = np.full((epochs, 2), np.nan)
    for epoch in trange(epochs, desc="Train dynamics"):
        # Compute loss gradient and step optimizer
        loss = compute_loss(model, train_data_t)
        validation_loss = compute_loss(model, validation_data_t)
        losses[epoch] = [loss.detach().numpy(), validation_loss.detach().numpy()]
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Plot losses
    plt.plot(losses)
    plt.yscale("log")
    plt.xlabel("epochs")
    plt.ylabel("loss")
    plt.legend(["train", "validation"])
    plt.show()

train(dynamics, data, validation_data)

## Visualize trained dynamics

In order to qualitatively evaluate our model, we can choose some values of steering angle *(right, center, left)* and acceleration *(slow, fast)* in order to predict and visualize the corresponding trajectories from an initial state.  


In [None]:
def predict_trajectory(state, actions, model, action_repeat=1):
    states = []
    for action in actions:
        for _ in range(action_repeat):
            state = model(state, action)
            states.append(state)
    return torch.stack(states, dim=0)

def plot_trajectory(states, color):
    scales = np.array(env.unwrapped.config["observation"]["scales"])
    states = np.clip(states.squeeze(1).detach().numpy() * scales, -100, 100)
    plt.plot(states[:, 0], states[:, 1], color=color, marker='.')
    plt.arrow(states[-1,0], states[-1,1], states[-1,4]*1, states[-1,5]*1, color=color)

def visualize_trajectories(model, state, horizon=15):
    plt.cla()
    # Draw a car
    plt.plot(state.numpy()[0]+2.5*np.array([-1, -1, 1, 1, -1]),
             state.numpy()[1]+1.0*np.array([-1, 1, 1, -1, -1]), 'k')
    # Draw trajectories
    state = state.unsqueeze(0)
    colors = iter(plt.get_cmap("tab20").colors)
    # Generate commands
    for steering in np.linspace(-0.5, 0.5, 3):
        for acceleration in np.linspace(0.8, 0.4, 2):
            actions = torch.Tensor([acceleration, steering]).view(1,1,-1)
            # Predict trajectories
            states = predict_trajectory(state, actions, model, action_repeat=horizon)
            plot_trajectory(states, color=next(colors))
    plt.axis("equal")
    plt.show()

visualize_trajectories(dynamics, state=torch.Tensor([0, 0, 0, 0, 1, 0]))

## Reward model
We assume that the reward $R(s,a)$ is known (chosen by the system designer), and takes the form of a **weighted L1-norm** between the state and the goal.

In [None]:
def reward_model(states, goal, gamma=None):
    """
        The reward is a weighted L1-norm between the state and a goal
    :param Tensor states: a batch of states. shape: [batch_size, state_size].
    :param Tensor goal: a goal state. shape: [state_size].
    :param float gamma: a discount factor
    """
    goal = goal.expand(states.shape)
    reward_weigths = torch.Tensor(env.unwrapped.config["reward_weights"])
    rewards = -torch.pow(torch.norm((states-goal)*reward_weigths, p=1, dim=-1), 0.5)
    if gamma:
        time = torch.arange(rewards.shape[0], dtype=torch.float).unsqueeze(-1).expand(rewards.shape)
        rewards *= torch.pow(gamma, time)
    return rewards

obs, info = env.reset()
print("Reward of a sample transition:", reward_model(torch.Tensor(obs["observation"]).unsqueeze(0),
                                                     torch.Tensor(obs["desired_goal"])))
print(obs["desired_goal"])                                                     

## Leverage dynamics model for planning

We now use the learnt dynamics model $f_\theta$ for planning.
In order to solve the optimal control problem, we use a sampling-based optimization algorithm: the **Cross-Entropy Method** (`CEM`). It is an optimization algorithm applicable to problems that are both **combinatorial** and **continuous**, which is our case: find the best performing sequence of actions.

This method approximates the optimal importance sampling estimator by repeating two phases:
1. **Draw samples** from a probability distribution. We use Gaussian distributions over sequences of actions.
2. Minimize the **cross-entropy** between this distribution and a **target distribution** to produce a better sample in the next iteration. We define this target distribution by selecting the top-k performing sampled sequences.

![Credits to Olivier Sigaud](https://github.com/yfletberliac/rlss2019-hands-on/blob/master/imgs/cem.png?raw=1)

Note that as we have a local linear dynamics model, we could instead choose an `Iterative LQR` planner which would be more efficient. We prefer `CEM` in this educational setting for its simplicity and generality.

In [None]:
def cem_planner(state, goal, action_size, horizon=5, population=100, selection=10, iterations=5):
    state = state.expand(population, -1)
    action_mean = torch.zeros(horizon, 1, action_size)
    action_std = torch.ones(horizon, 1, action_size)
    for _ in range(iterations):
        # 1. Draw sample sequences of actions from a normal distribution
        actions = torch.normal(mean=action_mean.repeat(1, population, 1), std=action_std.repeat(1, population, 1))
        actions = torch.clamp(actions, min=env.action_space.low.min(), max=env.action_space.high.max())
        states = predict_trajectory(state, actions, dynamics, action_repeat=5)
        # 2. Fit the distribution to the top-k performing sequences
        returns = reward_model(states, goal).sum(dim=0)
        _, best = returns.topk(selection, largest=True, sorted=False)
        best_actions = actions[:, best, :]
        action_mean = best_actions.mean(dim=1, keepdim=True)
        action_std = best_actions.std(dim=1, unbiased=False, keepdim=True)
    return action_mean[0].squeeze(dim=0)


# Run the planner on a sample transition
action = cem_planner(torch.Tensor(obs["observation"]),
                     torch.Tensor(obs["desired_goal"]),
                     env.action_space.shape[0])
print("Planned action:", action)

## Quick Diagnostic Script  
1.) To address issues like non-moving ego vehicle

In [None]:
print("="*60)
print("DIAGNOSTIC CHECK")
print("="*60)

# 1. Check if model was trained
print("\n1. Model parameters (first few):")
print(list(dynamics.parameters())[0][0, :5])
# If all zeros or very small, model not trained!

# 2. Check action space
print("\n2. Action space:")
print(f"   Low: {env.action_space.low}")
print(f"   High: {env.action_space.high}")

# 3. Test model prediction
obs, info = env.reset()
test_action = torch.Tensor([0.5, 0.2])
current = torch.Tensor(obs["observation"])
predicted = dynamics(current.unsqueeze(0), test_action.unsqueeze(0))
print("\n3. Model prediction test:")
print(f"   Current position: {current[:2].numpy()}")
print(f"   Predicted position: {predicted.squeeze()[:2].detach().numpy()}")
print(f"   Position change: {(predicted.squeeze()[:2] - current[:2]).detach().numpy()}")

# 4. Test CEM planner
print("\n4. CEM planner test:")
action = cem_planner(current, torch.Tensor(obs["desired_goal"]), 
                     env.action_space.shape[0])
print(f"   Planned action: {action.numpy()}")
print(f"   Action magnitude: {torch.norm(action).item()}")

print("\n" + "="*60)

In [None]:
print(env.unwrapped.config["reward_weights"])

## Visualize a few episodes

En voiture, Simone !

In [None]:
env = gym.make("parking-parallel-dynObs-v0", render_mode='rgb_array', config=parking_config)
env = record_videos(env)
obs, info = env.reset()

for step in trange(3*env.unwrapped.config["duration"], desc="Testing 3 episodes..."):
    action = cem_planner(torch.Tensor(obs["observation"]),
                         torch.Tensor(obs["desired_goal"]),
                         env.action_space.shape[0])
    obs, reward, done, truncated, info = env.step(action.numpy())
    if done or truncated:
        obs, info = env.reset()
env.close()
show_videos()

In [18]:
#Analysis of the results
print('Desired Goal: ', obs['desired_goal'])
print('Observed Goal: ', obs['observation'][:2])
print('Action Taken: ', action)
print('Reward: ', reward)
print('Done: ', done)
print('Truncated: ', truncated)
print('Info: ', info)

Desired Goal:  [ 0.45833333 -0.15        0.          0.          1.          0.        ]
Observed Goal:  [0.22202574 0.04363642]
Action Taken:  tensor([0.7805, 0.9334])
Reward:  -0.8069012728880963
Done:  False
Truncated:  False
Info:  {'speed': 7.840243625640871, 'crashed': False, 'action': array([0.7805107, 0.9333933], dtype=float32), 'is_success': np.False_}
