In [28]:
# Install environment and visualization dependencies
!pip install git+https://github.com/eleurent/highway-env#egg=highway-env  > /dev/null 2>&1
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

In [29]:
!sudo apt-get install xvfb

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.11).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [30]:
!pip install swig



In [31]:
!pip install gym[box2d]



In [1]:
!pip install highway-env



In [2]:
# Environment
import gymnasium as gym
import highway_env

# Models and computation
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import namedtuple
# torch.set_default_tensor_type("torch.cuda.FloatTensor")

# Visualization
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
from tqdm import tnrange
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
from gym.wrappers.record_video import RecordVideo
import base64

# IO
from pathlib import Path

In [3]:
# useful visualization of the episodes:

display = Display(visible=0, size=(1400, 900))
display.start()

def show_videos(path="video"):
    html = []
    for mp4 in Path(path).glob("*.mp4"):
        video_b64 = base64.b64encode(mp4.read_bytes())
        html.append('''<video alt="{}" autoplay
                      loop controls style="height: 400px;">
                      <source src="data:video/mp4;base64,{}" type="video/mp4" />
                 </video>'''.format(mp4, video_b64.decode('ascii')))
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))

In [5]:
# env = gym.make("CarRacing")
# env = gym.make("parking-v0")
# env = gym.make("Pendulum-v1")
# env = gym.make("CartPole-v1")

gym.register_envs(highway_env)

env = gym.make("highway-v0", render_mode="human")
env = RecordVideo(env, './video', episode_trigger = lambda episode_number: True)
env.reset()
done = False
while not done:
  action = env.action_space.sample()
  obs, reward, done, info = env.step(action)
env.close()
show_videos()

  and should_run_async(code)
  deprecation(
  logger.warn(
  logger.warn(


In [36]:
!pip install gym-parking

[31mERROR: Could not find a version that satisfies the requirement gym-parking (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for gym-parking[0m[31m
[0m

In [6]:
print("Observation format:", obs)

Observation format: [[ 1.0000000e+00  1.0000000e+00  4.9913275e-01  1.7235106e-01
  -3.3384189e-04]
 [ 1.0000000e+00  2.5009662e-02  7.6147559e-04 -4.2023949e-02
   3.3384189e-04]
 [ 1.0000000e+00 -2.7061906e-02 -4.9913275e-01  6.5367825e-02
   3.3384189e-04]
 [ 1.0000000e+00  2.0049107e-01  2.5086725e-01  9.5239505e-02
   3.3384189e-04]
 [ 1.0000000e+00  2.5919056e-01 -2.4913275e-01  8.8559426e-02
   3.3384189e-04]]


## Experience Collection

In [7]:
Transition = namedtuple('Transition', ['state', 'action', 'next_state'])

def collect_interaction_data(env, size):
  transitions = []

  # how many types to repeat a samples action to collect experience (for exploration)
  action_repeat = 1
  # get initial state
  state = torch.tensor(env.reset(), dtype=torch.float64)

  for _ in range(size):
    # sample random action from env space, action must be a torch.tensor
    action = torch.tensor(env.action_space.sample(), dtype=torch.float64)

    for _ in range(action_repeat):
      next_state, _, done, _ = env.step(action.numpy())
      # state must be torch.tensor
      next_state = torch.tensor(next_state, dtype=torch.float64)

      # append this experience into defined tuple
      transitions.append(Transition(state, action, next_state))

      # set next_state into as current state for next iteration
      state = next_state

      if done:
        state = torch.tensor(env.reset(), dtype=torch.float64)
        break

  return transitions

  raise NotImplementedError

data_size = 1000
data = collect_interaction_data(env, size=data_size)

# the code below will check that collect_interaction_data implementation is correct-ish
assert isinstance(data, list) and len(data) == data_size, "return value should be a list of length data_size"
assert isinstance(data[0], Transition), "return value should be a list whose elements are Transition tuples"
assert all([isinstance(field, torch.Tensor) and field.dtype == torch.float64 for field in data[0]]), "Transition tuples should contain torch tensors whose types are float64"

# print the first transition
print("Sample transition:", data[0])

  logger.warn(
  state = torch.tensor(env.reset(), dtype=torch.float64)


ValueError: expected sequence of length 5 at dim 1 (got 4)

## Build dynamics model

In [25]:
'''
In this dynamics model, we are computing two matrices:
  A:  captures how the current state affects the state evolution
  B:  captures how the current actions affect the state evolution
'''
class DynamicsModel(nn.Module):
    STATE_X = 0
    STATE_Y = 1

    def __init__(self, state_size, action_size, hidden_size, dt):
        super().__init__()
        self.state_size, self.action_size, self.dt = state_size, action_size, dt
        A_size, B_size = state_size * state_size, state_size * action_size
        self.A1 = nn.Linear(state_size + action_size, hidden_size)
        self.A2 = nn.Linear(hidden_size, A_size)
        self.B1 = nn.Linear(state_size + action_size, hidden_size)
        self.B2 = nn.Linear(hidden_size, B_size)

    def forward(self, x, u):
      '''
      S is state size, A is action, N is batch size
      given:
        x: batch of states of shape (N, S)
        u: batch of actions of shape (N, A)

      return:
        tensor of predicted next state of shape (N, S)
      '''
      tensor_xu = torch.cat([x, u], dim=1)

      # compute A and B matrix with two connected networks with ReLU activations
      A = F.relu(self.A1(tensor_xu))
      A = self.A2(A)
      B = F.relu(self.B1(tensor_xu))
      B = self.B2(B)

      # reshape into tensors withs ize (N, S, S) and (N, S, A) to match dimensions required for matrix multiplication
      N = x.shape[0] # batch size
      A = A.view(N, self.state_size, self.state_size)
      B = B.view(N, self.state_size, self.action_size)

      # compute derivatives
      x_dot = A @ x.unsqueeze(-1) + B @ u.unsqueeze(-1)
      x_dot = x_dot.squeeze(-1)

      # compute integrals to obtain the next states x_{t+1}
      x_next = x + self.dt * x_dot

      # return next states of shape (N, S)
      return x_next

      raise NotImplementedError

dynamics = DynamicsModel(state_size=env.observation_space.spaces["observation"].shape[0],
                         action_size=env.action_space.shape[0],
                         hidden_size=16,
                         dt=1/env.unwrapped.config["policy_frequency"])

#  Forward a sample transition.
#  unqueeze(0) is used to generate a batch of 1 element, by adding a new batch dimension of size 1.
state, action = data[0].state.unsqueeze(0), data[0].action.unsqueeze(0)
next_state = dynamics(state, action).detach()  # detach() is used here because gradients are unnecessary
assert state.shape == next_state.shape
assert not torch.equal(next_state, state)
print("Forward initial model on a sample transition:", next_state)

AttributeError: 'Box' object has no attribute 'spaces'