In [6]:
import gymnasium as gym
import numpy as np
import torch
from torch import nn
import random
from collections import deque
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple
import math

In [12]:
env = gym.make("highway-v0", render_mode="rgb_array")

config = {
    "observation": {
        "type": "OccupancyGrid",
        "vehicles_count": 10,
        "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
        "features_range": {
            "x": [-100, 100],
            "y": [-100, 100],
            "vx": [-20, 20],
            "vy": [-20, 20],
        },
        "grid_size": [[-20, 20], [-20, 20]],
        "grid_step": [5, 5],
        "absolute": False,
    },
    "action": {
        "type": "DiscreteAction",
    },
    "lanes_count": 3,
    "vehicles_count": 10,
    "duration": 20,  # [s]
    "initial_spacing": 0,
    "collision_reward": -1,  # The reward received when colliding with a vehicle.
    "right_lane_reward": 0.5,  # The reward received when driving on the right-most lanes, linearly mapped to
    # zero for other lanes.
    "high_speed_reward": 0.1,  # The reward received when driving at full speed, linearly mapped to zero for
    # lower speeds according to config["reward_speed_range"].
    "lane_change_reward": 0,
    "reward_speed_range": [
        20,
        30,
    ],  # [m/s] The reward for high speed is mapped linearly from this range to [0, HighwayEnv.HIGH_SPEED_REWARD].
    "simulation_frequency": 5,  # [Hz]
    "policy_frequency": 1,  # [Hz]
    "other_vehicles_type": "highway_env.vehicle.behavior.IDMVehicle",
    "screen_width": 600,  # [px]
    "screen_height": 150,  # [px]
    "centering_position": [0.3, 0.5],
    "scaling": 5.5,
    "show_trajectories": True,
    "render_agent": True,
    "offscreen_rendering": False,
    "disable_collision_checks": True,
}


NameNotFound: Environment `highway` doesn't exist.

In [3]:
env.unwrapped.configure(config)
env.reset()

(array([[[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 1., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]],
 
        [[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]],
 
        [[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
     

In [3]:
candidate_actions = []

for steering in np.linspace(-0.5, 0.5, 3):
    for acceleration in np.linspace(0.8, 0.4, 3):
        candidate_actions.append(torch.Tensor([acceleration, steering]))

In [4]:

class DQN(nn.Module):
    def __init__ (self, state_size, hidden_size, action_size):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(state_size, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, action_size)

        # Called with either one element to determine next action, or a batch
        # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

In [5]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [6]:

class ReplayMemory (object) : 
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [8]:
# Hyperparamètres
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10
LEARNING_RATE = 0.001
num_episodes = 50

policy_net = DQN(8*8*7, 128, 9)
target_net = DQN(8*8*7, 128, 9)


# Initialisation de l'optimiseur
optimizer = torch.optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)


import torch

def select_action(state, steps_done):
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    
    # Convertir state en tensor PyTorch si nécessaire
    if not isinstance(state, torch.Tensor):
        # print("state", state)
        state = torch.tensor([state], dtype=torch.float) 
    
    if sample > eps_threshold:
        with torch.no_grad():
            # Utiliser le réseau pour choisir une action
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        # Choisir une action au hasard
        return torch.tensor([[random.randrange(6)]], dtype=torch.long)

# Fonction d'optimisation
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Ici, vous préparerez les données et exécuterez une étape d'optimisation.

steps_done = 0
memory = ReplayMemory(10000)


for i_episode in range(num_episodes):
    # Initialisation de l'environnement et de l'état
    state, _ = env.reset()
    # env.render()

    done = False
    while not done:
        # Sélection et exécution d'une action
        action = select_action(state, steps_done)
        next_state, reward, done, truncated, _ = env.step(action.item())

        # Stockage de la transition dans la mémoire
        memory.push(state, action, next_state, reward)

        # Déplacement vers le nouvel état
        state = next_state

        # Performer une étape d'optimisation sur le batch actuel
        optimize_model()

    # Mise à jour du réseau cible, copiant tous les poids du réseau principal
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Entraînement complet')



KeyError: 5