In [15]:
from typing import Dict, List, Tuple, Type, Union
import numpy as np

import gym
import torch as th
from gym import spaces
from torch import nn

from stable_baselines3.common.preprocessing import get_flattened_obs_dim, is_image_space
from stable_baselines3.common.type_aliases import TensorDict
from stable_baselines3.common.utils import get_device


class BaseFeaturesExtractor(nn.Module):
    """
    Base class that represents a features extractor.

    :param observation_space:
    :param features_dim: Number of features extracted.
    """

    def __init__(self, observation_space: gym.Space, features_dim: int = 0) -> None:
        super().__init__()
        assert features_dim > 0
        self._observation_space = observation_space
        self._features_dim = features_dim

    @property
    def features_dim(self) -> int:
        return self._features_dim


class FlattenExtractor(BaseFeaturesExtractor):
    """
    Feature extract that flatten the input.
    Used as a placeholder when feature extraction is not needed.

    :param observation_space:
    """

    def __init__(self, observation_space: gym.Space) -> None:
        super().__init__(observation_space, get_flattened_obs_dim(observation_space))
        self.flatten = nn.Flatten()

    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.flatten(observations)


class NatureCNN(BaseFeaturesExtractor):
    """
    CNN from DQN Nature paper:
        Mnih, Volodymyr, et al.
        "Human-level control through deep reinforcement learning."
        Nature 518.7540 (2015): 529-533.

    :param observation_space:
    :param features_dim: Number of features extracted.
        This corresponds to the number of unit for the last layer.
    :param normalized_image: Whether to assume that the image is already normalized
        or not (this disables dtype and bounds checks): when True, it only checks that
        the space is a Box and has 3 dimensions.
        Otherwise, it checks that it has expected dtype (uint8) and bounds (values in [0, 255]).
    """

    def __init__(
        self,
        observation_space: spaces.Box,
        action_space,
        features_dim: int = 512,
        normalized_image: bool = False,
    ) -> None:
        super().__init__(observation_space, features_dim)
        # We assume CxHxW images (channels first)
        # Re-ordering will be done by pre-preprocessing or wrapper
        assert is_image_space(observation_space, check_channels=False, normalized_image=normalized_image), (
            "You should use NatureCNN "
            f"only with images not with {observation_space}\n"
            "(you are probably using `CnnPolicy` instead of `MlpPolicy` or `MultiInputPolicy`)\n"
            "If you are using a custom environment,\n"
            "please check it using our env checker:\n"
            "https://stable-baselines3.readthedocs.io/en/master/common/env_checker.html.\n"
            "If you are using `VecNormalize` or already normalized channel-first images "
            "you should pass `normalize_images=False`: \n"
            "https://stable-baselines3.readthedocs.io/en/master/guide/custom_env.html"
        )
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 32, kernel_size=8, stride=4, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with th.no_grad():
            n_flatten = self.cnn(th.as_tensor(observation_space.sample()[None]).float()).shape[1]
            print(n_flatten)

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())
        self.action_output = nn.Linear(features_dim, len(action_space))

    def forward(self, observations: th.Tensor) -> th.Tensor:
        # print(self.cnn(observations).flatten().shape)
        out = self.linear(self.cnn(observations).flatten())
        return self.action_output(out)

In [16]:
img_size = (64, 64)
cameraSettings = {
    # "resolution": (1920, 1080),
    "resolution": img_size,
    "fov": {"diagonal": 77},  # realsense diagonal fov is 77 degrees IIRC
    "angle": {
        "roll": 0,
        "pitch": 0,
        "yaw": 0,
    },  # don't go too crazy with these, my code should be good up to like... 45 degrees probably? But the math gets unstable
    # "angle": {"roll": 13, "pitch": 30, "yaw": 30}, # don't go too crazy with these, my code should be good up to like... 45 degrees probably? But the math gets unstable
    "height": 66,  # 8 pixels/inch - represents how high up the camera is relative to the road
}

mapParameters = {"loops": 1, "size": (6, 6), "expansions": 5, "complications": 4}

# Can also pass car parameters for max/min speed, etc
carParameters = {
    "wheelbase": 6.5,  # inches, influences how quickly the steering will turn the car.  Larger = slower
    "maxSteering": 30.0,  # degrees, extreme (+ and -) values of steering
    "steeringOffset": 0.0,  # degrees, since the car is rarely perfectly aligned
    "minVelocity": 0.0,  # pixels/second, slower than this doesn't move at all.
    "maxVelocity": 480.0,  # pixels/second, 8 pixels/inch, so if the car can move 5 fps that gives us 480 pixels/s top speed
}

# taken from https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/hyperparams/dqn.yml
config = {
    "n_timesteps": 1e6,  # sb3 dqn runs go up to 1e7 at most
    "policy": "CnnPolicy",
    "env": "CustomDuckieTown",
    "actions": [-30,-15, 0, 15, 30],
    "camera_settings": cameraSettings,
    "map_parameters": mapParameters,
    "car_parameters": carParameters,
    "learning_rate": 1e-4,
    "batch_size": 32,
    "buffer_size": 100000,
    "learning_starts": 100000,
    "gamma": 0.99,
    "target_update_interval": 1000,
    "train_freq": 4,
    "gradient_steps": 1,
    "exploration_fraction": 0.1,
    "exploration_final_eps": 0.01,
}

In [17]:
import torch


N_DISCRETE_ACTIONS = len(config['actions'])
action_space = spaces.Discrete(N_DISCRETE_ACTIONS)

N_CHANNELS = 3
(HEIGHT, WIDTH) = cameraSettings["resolution"]
observation_space = spaces.Box(
    low=0, high=1, shape=(N_CHANNELS, HEIGHT, WIDTH), dtype=np.uint8
)
#!had to change channels to beginning!!! it was at the end before

model = NatureCNN(observation_space, config['actions'], normalized_image=True)

# print(torch.load("./CUSTOM_SAVE.pt").keys())
model.load_state_dict(torch.load("./CUSTOM_SAVE.pt"))

print(model)


1024
NatureCNN(
  (cnn): Sequential(
    (0): Conv2d(3, 32, kernel_size=(8, 8), stride=(4, 4))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
    (3): ReLU()
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
    (6): Flatten(start_dim=1, end_dim=-1)
  )
  (linear): Sequential(
    (0): Linear(in_features=1024, out_features=512, bias=True)
    (1): ReLU()
  )
  (action_output): Linear(in_features=512, out_features=5, bias=True)
)


In [18]:
from CustomDuckieTownEnv import CustomDuckieTownSim
from stable_baselines3.common.monitor import Monitor

def make_env(display, config):
    env = CustomDuckieTownSim(
        config["camera_settings"],
        config["map_parameters"],
        config["car_parameters"],
        config["actions"],
        display,
    )
    env = Monitor(env)  # record stats such as returns
    return env

In [23]:
env = make_env(True, config)
obs = env.reset()
# print(torch.from_numpy(obs).shape)
while True:
    print((torch.from_numpy(obs/255).float()).shape)
    action = model(torch.from_numpy(obs/255).float()).max(0)[1].view(1,1)
    print(config['actions'][action])
    obs, reward, done, info = env.step(action)
    if done:
        obs = env.reset()

torch.Size([3, 64, 64])


: 

: 