https://gymnasium.farama.org/environments/toy_text/taxi/#taxi_ref
https://github.com/openai/gym/blob/master/gym/envs/toy_text/taxi.py
https://www.gocoder.one/blog/rl-tutorial-with-openai-gym/


In [None]:
!pip install gymnasium

In [None]:
import gymnasium as g
from gymnasium import spaces
import numpy as np
import pygame
import random

class Stop():
    def __init__(self, position, n_waiting_passengers):
        self.position = position
        self.n_waiting_passengers = n_waiting_passengers
        self.visited = False

    def reset(self, position,n_waiting_passengers):
        self.position = position
        self.n_waiting_passengers = n_waiting_passengers
        self.visited = False


class Bus():
    def __init__(self, position, capacity) -> None:
        self.position = position
        self.capacity = capacity
        self.passengers_in_bus = 0


    def reset(self):
        self.location = [0,0]
        self.capacity = 0
        self.passengers_in_bus = 0


class BusBoarding2D(g.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self, screen_width=10, screen_height=10, bus_capacity=20, max_total_passengers_env=50,passenger_spawn_prob=0.02, max_total_stops = 2):

        # Déf des variables d'état
        self.screen_width = screen_width
        self.screen_height = screen_height
        self.passenger_spawn_prob = passenger_spawn_prob
        self.total_passengers_arrived = 0
        self.max_total_passengers_env = max_total_passengers_env

        # Init de la carte : une grille vide
        self.map = np.zeros((self.screen_height, self.screen_width))

        # Init du bus
        self.bus = Bus([0,0],bus_capacity)

        # Init des stops
        self.stops = []
        for _ in range(max_total_stops):
            position_stop = [random.randint(0, self.screen_width - 1), random.randint(0, self.screen_height - 1)]
            stop = Stop(position_stop, random.randint(0, self.max_total_passengers_env))
            self.stops.append(stop)
            self.map[position_stop[1], position_stop[0]] = 1  # Marque l'arrêt de bus sur la carte (1 = présence d'arrêt)

        # Déf des espaces d'observation et d'action
        self.observation_space = spaces.Box(low=0, high=self.max_total_passengers_env, shape=(self.screen_width, self.screen_height, 2))
        self.action_space = spaces.Discrete(6)


    def reset(self):
        # Remise à zéro de la carte
        self.map = np.zeros((self.screen_height, self.screen_width))

        # Réinitialisation du bus
        self.bus.reset()

        # Réinitialisation des arrêts de bus et leur position
        for stop in self.stops:
            if np.random.random() < self.passenger_spawn_prob:
                position_stop = [random.randint(0, self.screen_width - 1), random.randint(0, self.screen_height - 1)]
                stop.reset(position_stop, random.randint(0, self.max_total_passengers_env))
                self.map[position_stop[1], position_stop[0]] = 1  # = arrêt sur la carte

        return self._get_observation()

    def step(self, action):
        assert self.action_space.contains(action), f"Action {action} n'est pas valide"

        reward = 0

        # Déplacement du bus
        if action == 0:  # aller à gauche
            self.bus.position[0] = max(0, self.bus.position[0] - 1)
        elif action == 1:  # aller à droite
            self.bus.position[0] = min(self.screen_width - 1, self.bus.position[0] + 1)
        elif action == 2:  # aller en bas
            self.bus.position[1] = max(0, self.bus.position[1] - 1)
        elif action == 3:  # aller en haut
            self.bus.position[1] = min(self.screen_height - 1, self.bus.position[1] + 1)
        elif action == 4:  # faire monter un passager
          for stop in self.stops:
              if abs(self.bus.position[0] - stop.position[0]) <= 1 and abs(self.bus.position[1] - stop.position[1]) <= 1: # Vérifie si le bus est à proximité d'un arrêt de bus
                  if self.bus.passengers_in_bus < self.bus.capacity:  # Vérifie si le bus a de la place
                      if stop.n_waiting_passengers > 0:  # Vérifie s'il y a des passagers à l'arrêt
                          # Le bus prend tous les passagers qui attendent à l'arrêt
                          passengers_to_board = stop.n_waiting_passengers
                          if self.bus.passengers_in_bus + passengers_to_board <= self.bus.capacity:  # Vérifie que la capacité du bus n'est pas dépassée
                              self.bus.passengers_in_bus += passengers_to_board
                              stop.n_waiting_passengers = 0  # Tous les passagers sont montés dans le bus
                              stop.visited = True  # Marquer l'arrêt comme visité
                              reward += 10  # Récompenser l'action de prendre les passagers
                          else:
                              # Si la capacité du bus est atteinte, ne pas prendre plus de passagers
                              available_space = self.bus.capacity - self.bus.passengers_in_bus
                              self.bus.passengers_in_bus += available_space
                              stop.n_waiting_passengers -= available_space
                              stop.visited = True
                              reward -= 5  # petite pénalité pour ne pas avoir pris tous les passagers
                  else:
                      reward -= 10  # Si le bus est plein, appliquer une pénalité
                      print("Bus est plein, impossible de prendre plus de passagers!")
                  break  # Sortir de la boucle dès que le bus est à un arrêt

        elif action == 5:  # faire descendre un passager
            for stop in self.stops:
                if abs(self.bus.position[0] - stop.position[0]) <= 1 and abs(self.bus.position[1] - stop.position[1]) <= 1: # Vérifie si le bus est à proximité d'un arrêt de bus
                    if self.bus.passengers_in_bus > 0 and random.random() < 0.1 : # Proba de descendre à l'arrêt
                        passengers_descended = random.randint(0, self.max_total_passengers_env + 1)
                        self.bus.passengers_in_bus -= passengers_descended
                        self.total_passengers_arrived += passengers_descended
                        reward += 20


        # Calcul de l'observation
        observation = self._get_observation()

        done = False
        if self.total_passengers_arrived >= self.max_total_passengers_env:
            done = True


        return observation, reward, done

    def render(self, mode='human'):
     if mode == 'human':
         if not hasattr(self, 'screen'):
             pygame.init()
             self.screen = pygame.display.set_mode((self.screen_width * 50, self.screen_height * 50))
             pygame.display.set_caption("BusBoarding-v0")

         self.screen.fill((255, 255, 255))  
         # bus
         pygame.draw.rect(self.screen, (0, 255, 0), (self.bus.position[0] * 50, self.bus.position[1] * 50, 50, 50))

         # stops
         for stop in self.stops:
             color = (255, 0, 0) if stop.visited else (0, 0, 255)  # Red for visited, blue for unvisited
             pygame.draw.circle(self.screen, color, (stop.position[0] * 50 + 25, stop.position[1] * 50 + 25), 20)

         font = pygame.font.SysFont(None, 30)
         text1 = font.render(f"Passengers in Bus: {self.bus.passengers_in_bus}", True, (0, 0, 0))
         text2 = font.render(f"Total Passengers Arrived: {self.total_passengers_arrived}/{self.max_total_passengers_env}", True, (0, 0, 0))
         self.screen.blit(text1, (10, 10))
         self.screen.blit(text2, (10, 40))
         pygame.display.flip()

    def close(self):
        if hasattr(self, 'screen'):
            pygame.quit()
        super().close()

    def _get_observation(self):
        return np.array([self.bus.passengers_in_bus])



In [None]:
        """
        elif action == 7:  # aller en diagonale vers le haut et la gauche
            self.bus_position[0] = max(0, self.bus_position[0] - 1)
            self.bus_position[1] = min(self.height - 1, self.bus_position[1] + 1)
        elif action == 8:  # aller en diagonale vers le haut et la droite
            self.bus_position[0] = min(self.width - 1, self.bus_position[0] + 1)
            self.bus_position[1] = min(self.height - 1, self.bus_position[1] + 1)




            # Génération aléatoire de nouveaux passagers
        if random.random() < self.passenger_spawn_prob:
            self.waiting_passengers += 1
            self.passenger_locations.append((random.randint(0, self.screen_width - 1), random.randint(0, self.screen_height - 1)))
        """

# observation_space
Il s'agit de l'espace dans lequel se trouvent les états possibles de l'environnement. Dans l'exemple de l'environnement BusBoarding2D-v0, l'espace d'observation est une boîte de dimension 4 qui représente l'état actuel de l'environnement. Cette boîte contient 4 valeurs qui sont :

- ~~la position x du bus sur l'axe horizontal (entre 0 et 10)~~
- ~~la position y du bus sur l'axe vertical (entre 0 et 10)~~
- le nombre de passagers actuellement dans le bus (entre 0 et 10)
- le nombre de passagers qui attendent à l'arrêt (entre 0 et 5)
- La boîte d'observation peut être obtenue en appelant env.observation_space.sample().

# action_space
Il s'agit de l'espace dans lequel se trouvent les actions possibles pour l'agent. Dans l'exemple de l'environnement BusBoarding2D-v0, l'espace d'action est une discrétisation de l'espace 2D dans lequel évolue le bus. Plus précisément, l'espace d'action contient 9 valeurs discrètes qui correspondent aux actions suivantes :

- aller à gauche
- aller à droite
- aller en bas
- aller en haut
- faire monter un passager dans le bus
- faire descendre un passager du bus
- ~~ne rien faire~~
- ~~aller en diagonale vers le haut et la gauche~~
- ~~aller en diagonale vers le haut et la droite~~

Les actions sont identifiées par un nombre entier entre 0 et 6 ~~8~~. L'agent choisit une action en appelant la méthode env.step(action) avec l'action souhaitée en paramètre.

In [None]:
env = BusBoarding2D()
observation = env.reset()

done = False
while not done:
    action = env.action_space.sample()
    observation, reward, done = env.step(action)
    print(f"Action: {action}, Observation: {observation}, Reward: {reward}, Done: {done}")

env.close()

In [None]:
env = BusBoarding2D()

n_episodes = 10000
alpha = 0.1
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995

# Table Q
q_table = np.zeros((10, 10, 6, 5))  # (x, y, passagers, actions)

# Boucle pour un nombre d'épisodes
for episode in range(n_episodes):
    # Réinitialiser l'environnement
    observation = env.reset()
    done = False
    episode_reward = 0

    # Boucle pour un nombre d'étapes
    while not done:
        x, y, passengers = observation

        # Choisir une action en utilisant la politique epsilon-greedy
        if np.random.uniform(0, 1) < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[x, y, passengers])

        # Appliquer l'action et récupérer les informations
        next_observation, reward, done, info = env.step(action)
        next_x, next_y, next_passengers = next_observation

        # Màj table Q
        old_value = q_table[x, y, passengers, action]


        next_x_clipped = np.clip(next_x, 0, 9)
        next_y_clipped = np.clip(next_y, 0, 9)

        next_max = np.max(q_table[next_x_clipped, next_y_clipped, next_passengers])
        new_value = (1 - alpha) + old_value + alpha * (reward + gamma * next_max)
        q_table[x, y, passengers, action] = new_value

        observation = next_observation
        episode_reward += reward

    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    print(f"Épisode {episode+1} terminé avec une récompense totale de {episode_reward}.")