# 2CSSID-Workshop03. Apprentissage par renforcement

In [1]:
import matplotlib
import numpy             as np
import pandas            as pd
import matplotlib.pyplot as plt
%matplotlib inline

np.__version__, pd.__version__, matplotlib.__version__

('1.24.3', '2.0.3', '3.7.2')

In [2]:
from typing import Tuple, List, Type, Dict, Set

## 1. Agent

### 1.1. Création de la table Q

In [3]:
# TODO: Création de la table Q
def creer_Q(nbr_etats: int, nbr_actions: int) -> np.ndarray:
    return np.zeros([nbr_etats, nbr_actions])

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# array([[0., 0., 0.],
#        [0., 0., 0.],
#        [0., 0., 0.],
#        [0., 0., 0.],
#        [0., 0., 0.]])
#---------------------------------------------------------------------

Q5_3 = creer_Q(5, 3)

Q5_3

array([[0., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.]])

### 1.2. Choix de l'action suivante

In [4]:
# TODO: Exploration
def exploration(Q: np.ndarray) -> int:
    return np.random.randint(0, Q.shape[1])

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# un nombre aléatoire dans {0, 1, 2}
#---------------------------------------------------------------------

exploration(Q5_3)

2

In [5]:
# TODO: Exploitation
def exploitation(Q: np.ndarray, etat: int) -> int:
    return np.argmax(Q[etat])

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# (2, 0, 1, 2, 1)
#---------------------------------------------------------------------

Q_t = np.array([
    [0.1, 0.2, 0.3],
    [1.0, 0.5, 0.7],
    [0.5, 1.0, 0.8],
    [0.2, 0.8, 0.9],
    [0.2, 1.0, 0.3]
])

exploitation(Q_t, 0), exploitation(Q_t, 1), exploitation(Q_t, 2), exploitation(Q_t, 3), exploitation(Q_t, 4)

(2, 0, 1, 2, 1)

In [6]:
# TODO: Choix de l'action
def choisir_action(Q: np.ndarray, etat: int, epsilon: float=0.2) -> int:
    return np.random.choice([exploration(Q), exploitation(Q, etat=etat)], p=[epsilon, 1-epsilon])

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# Soit 2 soit un autre nombre dans {0, 1, 2}
#---------------------------------------------------------------------

choisir_action(Q_t, 0)

2

### 1.3. Mise à jours de la table Q

$$
Q(s_t, a_t) = Q(s_t, a_t) + \alpha * (r + \gamma * \max_a Q(s_{t+1}, a) - Q(s_t, a_t))
$$

In [7]:
# TODO: Mise à jours de la table Q
def mettre_ajour_Q(Q: np.ndarray, etat: int, netat: int, action: int, alpha: float, r: float, gamma: float) -> np.ndarray:
    new_Q = Q.copy()
    new_Q[etat, action] = new_Q[etat, action] + alpha * (r + gamma * np.max(new_Q[netat]) - new_Q[etat, action])
    return new_Q

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# array([[0.1 , 0.2 , 0.3 ],
#        [1.  , 0.5 , 1.58],
#        [0.5 , 1.  , 0.8 ],
#        [0.2 , 0.8 , 0.9 ],
#        [0.2 , 1.  , 0.3 ]])
#---------------------------------------------------------------------

mettre_ajour_Q(Q_t, 1, 2, 2, 0.2, 5, 0.1)

array([[0.1 , 0.2 , 0.3 ],
       [1.  , 0.5 , 1.58],
       [0.5 , 1.  , 0.8 ],
       [0.2 , 0.8 , 0.9 ],
       [0.2 , 1.  , 0.3 ]])

### 1.4. Classe Agent

In [8]:
class Agent:
    def __init__(self, nbr_etats: int, nbr_actions: int, alpha: float, epsilon=0.2):
        self.alpha = alpha
        self.epsilon = epsilon
        self.Q = creer_Q(nbr_etats, nbr_actions)
    
    def set_etat(self, etat: int):
        self.etat = etat
        self.action = 0
        
    def choisir_action(self):
        self.action = choisir_action(self.Q, self.etat, self.epsilon)
        return self.action
    
    def appliquer(self, netat: int, action: int, r: float, gamma: float):
        self.Q = mettre_ajour_Q(self.Q, self.etat, netat, self.action, self.alpha, r, gamma)
        self.etat = netat
        

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# array([[-2.  ,  4.  ,  0.  ],
#        [-0.36, -0.2 , -2.  ],
#        [ 0.  ,  0.  ,  0.  ],
#        [-0.2 ,  0.4 ,  0.  ],
#        [ 2.  ,  0.  ,  0.  ]])
#---------------------------------------------------------------------

netats_rs = [(0, -1), (1, -10), (3, -1), (1, 2), (4, -1), (1, 10), (1, -10), (0, -1), (0, 20)]

agent = Agent(5, 3, 0.2, epsilon=0.) # exploitation: pour qu'il soit déterministe
agent.set_etat(3) # etat initial = 3

for netat, r in netats_rs:
    action = agent.choisir_action()
    # FeedBack de l'environnement (netat, r)
    agent.appliquer(netat, action, r, gamma=0.5)
    

agent.Q

array([[-2.  ,  4.  ,  0.  ],
       [-0.36, -0.2 , -2.  ],
       [ 0.  ,  0.  ,  0.  ],
       [-0.2 ,  0.4 ,  0.  ],
       [ 2.  ,  0.  ,  0.  ]])

## 2. Environnement


### 2.1. États

In [9]:
def encoder_etat(pos: Tuple[int, int], psg: int, dst: int, nb_l: int, nb_c: int, nb_a: int) -> int:
    return (pos[0] * nb_c + pos[1]) * (nb_a + 1) * nb_a + (psg * nb_a + dst)

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# 153
#---------------------------------------------------------------------

encoder_etat((1, 2), 3, 1, 5, 5, 4)

153

In [10]:
def decoder_etat(etat: int, nb_l:int, nb_c: int, nb_a: int) -> Tuple[int, int, int, int]:
    nb_pa = (nb_a + 1) * nb_a # nombre max des positions passager * arret par case
    
    pa = etat % nb_pa # position passager * position arret
    dst = pa % nb_a
    psg = pa// nb_a
    
    lc = etat // nb_pa # ligne * colonne
    l = lc // nb_c
    c = lc % nb_c
    
    return l, c, psg, dst

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# (1, 2, 3, 1)
#---------------------------------------------------------------------

decoder_etat(153, 5, 5, 4)

(1, 2, 3, 1)

### 2.2. Actions

In [11]:
def repositionner(pos: Tuple[int, int], action: int) -> Tuple[int, int]:  
    if action == 0: # gauche
        return pos[0], pos[1]  - 1
    if action == 1: # droit
        return pos[0], pos[1]  + 1
    if action == 2: # avant 
        return pos[0] + 1, pos[1]
    if action == 3: # arrière
        return pos[0] - 1, pos[1]
    
    return pos # action > 3

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# ((2, 2), (1, 2), (1, 1))
#---------------------------------------------------------------------

repositionner((1, 2), 2), repositionner((1, 2), 4), repositionner((1, 2), 0)

((2, 2), (1, 2), (1, 1))

### 2.3. Récompense

In [12]:
# TODO: Récompense
def verifier_action(etat: int, action: int, 
                    nb_l: int, nb_c: int, 
                    arrets: List[Tuple[int, int]], 
                    bar: Dict[Tuple[int, int], Set[int]]) -> Tuple[int, bool]: 
    
    nb_a = len(arrets)
    l, c, psg, dst = decoder_etat(etat, nb_l, nb_c, nb_a)
    match action:
        case 5: #deposer
            if((psg != nb_a) or ((l,c) != arrets[dst])):# le passager n'est pas dans le taxi, or deposer mais pas dans son dst
                return (-10, False)
            else:
                return (20, True)
            
        case 4: #prendre
            if((psg == nb_a) or ((l, c) != arrets[psg])):# le passager est déjà dans la voiture ou il n'est pas dans la pos actuelle du taxi
                return (-10, False)
            else:
                return (0, True)
            
        case _:#moving action (0, 1, 2, 3)
            barriere = (l,c) in bar.keys() and bar[(l,c)] == set([action]) 
            l, c = repositionner((l, c), action=action)

            if( 
                (l >= 0) and (l<= nb_l-1) and (c >= 0) and (c <= nb_c-1)  # on est toujours dans la carte
                and not barriere # pas des barrières
                ): 
                return (0, True) #la nouvelle position est juste
            else:
                return (0, False)

def calculer_recompense(etat: int, action: int, 
                        nb_l:int, nb_c: int, 
                        arrets: List[Tuple[int, int]],
                        bar: Dict[Tuple[int, int], Set[int]]) -> Tuple[float, int, bool]:
    
    recompense = -1 # Toujours on applique cette récompense
    netat = etat
    fin = False
    nb_a = len(arrets)
    l, c, psg, dst = decoder_etat(etat, nb_l, nb_c, nb_a)
    pos = (l, c)
    # Compléter ici
    checking_action = verifier_action(etat=etat, action=action, nb_l=nb_l, nb_c=nb_c, arrets=arrets, bar=bar)
    recompense += checking_action[0]

    if(checking_action[1] == True):
        if(action == 4): #prendre
            psg = nb_a
        elif(action == 5): #deposer
            psg = dst
            fin = True
        elif(action <= 3):
            pos = repositionner(pos=pos, action=action)
        
        netat = encoder_etat(pos=pos, psg=psg, dst=dst, nb_l=nb_l, nb_c=nb_c, nb_a=nb_a)

    return recompense, netat, fin

#=====================================================================
# TEST UNITAIRE
#=====================================================================
# Resultat : 
# [(-11, 56, False),
#  (-11, 44, False),
#  (-1, 96, False),
#  (19, 0, True),
#  (-11, 4, False),
#  (-11, 56, False),
#  (-11, 44, False),
#  (-1, 44, False),
#  (-1, 304, False),
#  (-1, 224, False)]
#---------------------------------------------------------------------

nb_l, nb_c = 5, 5
arrets = [(0,0), (0,4), (4,0), (4,3)]
nb_a = len(arrets)
barrieres = {
    (0, 1): set([1]), # barrière à droit
    (0, 2): set([0]), # barrière à gauche
    (3, 0): set([1]), # barrière à droit
    (4, 0): set([1]), # barrière à droit
    (3, 1): set([0]), # barrière à gauche
    (4, 1): set([0]), # barrière à gauche
    (3, 2): set([1]), # barrière à droit
    (4, 2): set([1]), # barrière à droit
    (3, 3): set([0]), # barrière à gauche
    (4, 3): set([0]), # barrière à gauche
}

resultats = []

tests = [# (etat, action)
    # action = 4 (prendre un passager)
    (encoder_etat((0, 2), 4, 0, nb_l, nb_c, nb_a), 4), # pos=(0,2); psg=dans la voiture; dst=arrêt0(0, 0)
    (encoder_etat((0, 2), 1, 0, nb_l, nb_c, nb_a), 4), # pos=(0,2); psg=arrêt1; dst=arrêt0(0, 0)
    (encoder_etat((0, 4), 1, 0, nb_l, nb_c, nb_a), 4), # pos=arrêt1; psg=arrêt1; dst=arrêt0(0, 0)
    # action = 5 (déposer un passager)
    (encoder_etat((0, 0), 4, 0, nb_l, nb_c, nb_a), 5), # pos=arrêt0; psg=dans la voiture; dst=arrêt0(0, 0)
    (encoder_etat((0, 0), 1, 0, nb_l, nb_c, nb_a), 5), # pos=arrêt0; psg=arrêt1; dst=arrêt0(0, 0)
    (encoder_etat((0, 2), 4, 0, nb_l, nb_c, nb_a), 5), # pos=(0, 2); psg=dans la voiture; dst=arrêt0(0, 0)
    (encoder_etat((0, 2), 1, 0, nb_l, nb_c, nb_a), 5), # pos=(0, 2); psg=arrêt1; dst=arrêt0(0, 0)
    # action = 0 (aller à gauche)
    (encoder_etat((0, 2), 1, 0, nb_l, nb_c, nb_a), 0), # il existe une barrière à gauche
    (encoder_etat((3, 0), 1, 0, nb_l, nb_c, nb_a), 0), # il existe une barrière à droite
    (encoder_etat((2, 2), 1, 0, nb_l, nb_c, nb_a), 0), # il n'existe aucune barrière
]

for etat, action in tests:
    resultats.append(calculer_recompense(etat, action, nb_l, nb_c, arrets, barrieres))

resultats

[(-11, 56, False),
 (-11, 44, False),
 (-1, 96, False),
 (19, 0, True),
 (-11, 4, False),
 (-11, 56, False),
 (-11, 44, False),
 (-1, 44, False),
 (-1, 304, False),
 (-1, 224, False)]

### 2.3. Classe Envinonnement

In [13]:
import time, sys
from IPython.display import HTML, display, clear_output

class TaxiEnv():
    def __init__(self, nb_l:int, nb_c: int, 
                 arrets: List[Tuple[int, int]], 
                 bar: Dict[Tuple[int, int], Set[int]], gamma: float = 0.5):
        self.actions = ['gauche', 'droit', 'avant', 'arriere', 'prendre', 'deposer']
        self.arrets = arrets
        self.nb_l = nb_l
        self.nb_c = nb_c
        self.nb_etats = nb_l * nb_c * (len(arrets) + 1) * len(arrets)
        self.bar = bar
        self.gamma = gamma
        
        for i in range(nb_l):
            pos = (i, 0)
            if pos not in bar:
                bar[pos] = set()
            bar[pos].add(0) # on ne peut pas aller à gauche
            pos = (i, nb_c-1)
            if pos not in bar:
                bar[pos] = set()
            bar[pos].add(1) # on ne peut pas aller à droit
        for j in range(nb_c):
            pos = (0, j)
            if pos not in bar:
                bar[pos] = set()
            bar[pos].add(3) # on ne peut pas aller en avant
            pos = (nb_l-1, j)
            if pos not in bar:
                bar[pos] = set()
            bar[pos].add(2) # on ne peut pas aller en arrière    
        
    def ajouter_agent(self, alpha: float, epsilon=0.2):
        self.agent = Agent(self.nb_etats, len(self.actions), alpha, epsilon=epsilon)
    
    def encoder_etat(self, pos: Tuple[int, int], psg: int, dst: int):
        return encoder_etat(pos, psg, dst, self.nb_l, self.nb_c, len(self.arrets))
    
    def decoder_etat(self, etat: int) -> Tuple[int, int, int]:
        return decoder_etat(etat, self.nb_l, self.nb_c, len(self.arrets))
    
    def initialiser(self, pos: Tuple[int, int], psg: int, dst: int):
        etat = self.encoder_etat(pos, psg, dst)
        self.agent.set_etat(etat)
    
    def transporter(self, plot=False):
        
        nb_l = self.nb_l
        nb_c = self.nb_c
        arrets = self.arrets
        bar = self.bar
        nb_a = len(arrets)
        actions = self.actions
        
        etat = self.agent.etat
        
        etapes = []
        fin = False
        rt = 0
        
        while not fin:
            action = self.agent.choisir_action()
            r, netat, fin = calculer_recompense(etat, action, nb_l, nb_c, arrets, bar)
            etapes.append((self.decoder_etat(etat), actions[action], r, fin))
            self.agent.appliquer(netat, action, r, self.gamma)
            if plot:
                rt += r
                html = self.dessiner()
                html += '<div class="cont">'
                html += f'<p>Etape: {len(etapes)}</p>'
                html += f'<p>Etat: {etat}</p>'
                html += f'<p>Action: {actions[action]}</p>'
                html += f'<p>Récompense: {r}</p>'
                html += f'<p>Récompense totale: {rt}</p>'
                html += '</div>'
                time.sleep(0.5)
                clear_output(wait=True)
                display(HTML(html))
                sys.stdout.flush()
            etat = netat
 
        return etapes
    
    def dessiner(self):
        bordures = ['l', 'r', 'b', 't']
        
        nb_a = len(self.arrets)
        
        if hasattr(self.agent, 'etat'):
            l, c, psg, dst = decoder_etat(self.agent.etat, self.nb_l, self.nb_c, nb_a)
        else:
            l, c, psg, dst = None, None, None, None
        
        html = """<style>
                div.cont {display:inline-block; margin:5px; vertical-align: top; float: left;}
                div.table {display:table;}
                div.row {display:table-row; clear: both; width:auto;}
                div.cell {display:table-cell; height: 50px; width: 50px; text-align: center; border: 1px dotted black; vertical-align: middle;}
                div.l {border-left: 2px solid red ;}
                div.r {border-right: 2px solid red ;}
                div.b {border-bottom: 2px solid red ;}
                div.t {border-top: 2px solid red ; }
                div.arret {background: yellow;}
                </style>
                <div class="cont">
                <div class="table">
                """
        for i in range(self.nb_l):
            html += '<div class="row">'
            for j in range(self.nb_c):
                html += '<div class="cell '
                if (i, j) in self.bar:
                    for b in self.bar[(i, j)]:
                        html += bordures[b] + ' '
                if (i, j) in self.arrets:
                    html += 'arret'
                html += '">'
                cont = ''
                if dst != None and self.arrets[dst] == (i, j):
                    cont = '🏲'
                if psg != None and psg != nb_a and self.arrets[psg] == (i, j):
                    cont += '👽'
                if (l, c) == (i, j):
                    if psg != None and psg != nb_a:
                        cont += '🚖'
                    else:
                        cont += '🚍'
                if not cont:
                    cont = ':'
                html += cont + '</div>'
            html += '</div>'
            
        html += '</div></div>'

        # print(html)
        
        return html
        
print('FIN')

FIN


## 3. Experimentation

In [14]:
arrets = [(0,0), (0,4), (4,0), (4,3)]
barrieres = {
    (0, 1): set([1]), # barrière à droit
    (0, 2): set([0]), # barrière à gauche
    (3, 0): set([1]), # barrière à droit
    (4, 0): set([1]), # barrière à droit
    (3, 1): set([0]), # barrière à gauche
    (4, 1): set([0]), # barrière à gauche
    (3, 2): set([1]), # barrière à droit
    (4, 2): set([1]), # barrière à droit
    (3, 3): set([0]), # barrière à gauche
    (4, 3): set([0]), # barrière à gauche
}

taxi = TaxiEnv(5, 5, arrets, barrieres)
taxi.ajouter_agent(0.1, 0.1)
taxi.initialiser((3, 1), 2, 0)


html = taxi.dessiner()
display(HTML(html))
# hist = taxi.transporter(plot=True)

In [15]:
# Tester après l'exécution de la même initialisation plusieurs fois
for i in range(1000):
    taxi.initialiser((3, 1), 2, 0)
    taxi.transporter() 

taxi.initialiser((3, 1), 2, 0)
hist = taxi.transporter(plot=True)

In [16]:
# historique des étapes
hist

[((3, 1, 2, 0), 'avant', -1, False),
 ((4, 1, 2, 0), 'prendre', -11, False),
 ((4, 1, 2, 0), 'gauche', -1, False),
 ((4, 0, 2, 0), 'prendre', -1, False),
 ((4, 0, 4, 0), 'arriere', -1, False),
 ((3, 0, 4, 0), 'arriere', -1, False),
 ((2, 0, 4, 0), 'arriere', -1, False),
 ((1, 0, 4, 0), 'arriere', -1, False),
 ((0, 0, 4, 0), 'deposer', 19, True)]

In [17]:
# tester l'appentissage avec des initialisations aléatoires
arrets2 = [(0,0), (0,4), (4,0), (4,3)]
barrieres2 = {
    (0, 1): set([1]), # barrière à droit
    (0, 2): set([0]), # barrière à gauche
    (3, 0): set([1]), # barrière à droit
    (4, 0): set([1]), # barrière à droit
    (3, 1): set([0]), # barrière à gauche
    (4, 1): set([0]), # barrière à gauche
    (3, 2): set([1]), # barrière à droit
    (4, 2): set([1]), # barrière à droit
    (3, 3): set([0]), # barrière à gauche
    (4, 3): set([0]), # barrière à gauche
}

taxi2 = TaxiEnv(5, 5, arrets2, barrieres2)
taxi2.ajouter_agent(0.1, 0.1)

def exec_aleatoire(taxi_env, plot=False):
    pos = np.random.randint(5), np.random.randint(5)
    psg, dst = np.random.randint(len(arrets2)), np.random.randint(len(arrets2))
    taxi_env.initialiser(pos, psg, dst)
    return taxi_env.transporter(plot=plot) 

for i in range(10000):
    exec_aleatoire(taxi2, plot=False)
    
print('fin')

fin


In [18]:
hist = exec_aleatoire(taxi2, plot=True)