In [None]:
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch
import numpy as np
from matplotlib import pyplot as plt
import time
from tqdm.notebook import trange, tqdm
import pandas as pd

# Pour torch si vous avez un GPU
# device = "cpu" if not torch.cuda.is_available() else "cuda"
device = "cpu" # Pour forcer l'utilisation du CPU

# Agent qui utilise les LSTM
Dans se notebook nous allons implémenter un agent qui utilise les LSTM pour prédire le prochain feedback. À la suite de cette prédiction nous allons implémenter un mécanisme capable de décider la meilleur action a faire.

- model : LSTM
- environement : grille 2D 

In [None]:
from environnement.environnement import Environnement as env # mother class
from environnement.small_loop import small_loop

# model machine learning
from model.Tokenizer import *
from model.RNN import *
from model.CustomDataSet import CustomDataSet, CustomDataSetRNN
from outil import *
from inter.simpleInteraction import simpleInteraction as inter

# L'agent :
Nous devons passer à l'agent une valence qui doit contenir chaque action et outcome de l'environement. Cette valence permet à l'agent de connaitre toutes les actions et feedback possible. Elle aussi util au moment de la décision, c'est elle qui définie ce qu'es un bon comportement. 

Nous devons aussi lui passé un model de type RNN dans notre cas un LSTM (non entrainer), qui devra prédire les feedback possible. Pour que le modèl s'entraine nous avons besoin d'un optimizer et d'une fonction de perte. Pour obtenir de meilleur performance, nous allons passé à l'agent un nombre. Ce nombre corespond aux nombre d'action et d'outcome, que le modèl doit utiliser pour s'entrainer et prédire. Ce nombre sera appeler `gap_train` pour l'entrainement et `gap_predi` pour la prédiction. Pour obtenir de meilleur performance il faut que ce nombre soit le même pour les deux cas.

Comme pour les agents développemental d'Olivier Georgeon, l'agent doit pouvoir sélectionner une séquence d'action à réaliser (tant que tout ce passe bien).

In [None]:
class AgentLSTM:
    def __init__(self, valence:dict[inter, float], model:nn.Module, max_depth:int, seuil:float,
                optimizer, loss_fn, gap_train:int=11, gap_predi:int=11, nb_epoch:int=100, data_validate=None):
        self.model = model
        self.valence = valence
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.gap_train = gap_train
        self.gap_predi = gap_predi
        self.nb_epoch = nb_epoch
        self.seq_to_exe = [] # Séquence d'actions et d'outcome choisit par la décision
        self.history_act = [] # Historique des actions
        self.history_fb = [] # Historique des feedbacks
        self.data_validate = data_validate
        self.max_depth:int = max_depth
        self.seuil:float = seuil
        
        self.all_outcomes = set()
        self.all_act = set()
        key:inter = None
        for key in valence.keys():
            self.all_outcomes.add(key.getOutcome())
            self.all_act.add(key.getAction())
        self.all_outcomes = list(self.all_outcomes)
        self.all_act = list(self.all_act)
        
        self.action_choice = self.all_act[0] # De base nous choisissons la première action
        self.history_act.append(self.action_choice)
        self.outcome_prediction = None # De base le modèl ne prédi rien
        
        # number_patern = np.sum([(len(self.all_act) * len(self.all_outcomes)) **i for i in range(1, self.max_depth + 1)])
        number_patern = 20000
        self.prealloc_df = pd.DataFrame(np.empty((number_patern, 4)), columns=["proposition", "valence", "action", "probability"])
        # self.prealloc_df = self.prealloc_df.astype({"proposition": "U20", "valence": float, "action": int, "probability": float})
        self.current_index = 0
        
        # Nous avons besoin d'un tokenizer pour transformer les actions et outcomes en entiers
        # Pour des questions de simplicité, nous voulons que les outcomes soient passé en premier
        self.tokenizer = SimpleTokenizerV1(
            vocab={key: i for i, key in enumerate(self.all_outcomes + self.all_act)})
        
        # Variable moniteur
        self.loss_train = [] # Contient toutes les listes des pertes d'entrainement
        self.loss_val = [] # Contient toutes les listes des pertes de validation
        self.acc_train = []
        self.acc_val = []
        
    def fit(self):
        """
        Fonction de l'agent pour entrainer le modèle
        """        
        dataset = CustomDataSetRNN(actions=self.history_act, outcomes=self.history_fb, 
                                 context_lenght=self.gap_train, dim_out=len(self.all_outcomes),
                                 tokenizer=self.tokenizer)
        
        data_loader = DataLoader(dataset, batch_size=16, shuffle=True)
        
        if self.data_validate is not None:
            dataset_test = CustomDataSetRNN(actions=self.data_validate[0], outcomes=self.data_validate[1], 
                                 context_lenght=self.gap_train, dim_out=len(self.all_outcomes),
                                 tokenizer=self.tokenizer)
            data_loader_test = DataLoader(dataset_test, batch_size=32, shuffle=True)
            loss_test = []
        
        for _ in range(self.nb_epoch):
            self.model.train()
            steps = 0
            train_acc = 0
            training_loss = []
            for x,t in data_loader:
                bs = t.shape[0]
                h = torch.zeros(self.model.num_layers, bs, self.model.hidden_size, device=device)
                cell = torch.zeros(self.model.num_layers, bs, self.model.hidden_size, device=device)

                pred, h, cell = self.model(x, h, cell)

                loss = self.loss_fn(pred[:, -1, :], t)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                
                training_loss.append(loss.item())
                
                train_acc += (pred[:, -1, :].argmax(1) == t).sum()
                steps += bs
                
            self.loss_train.append(training_loss)
            self.acc_train.append(train_acc)
            
            if self.data_validate is not None:
                self.model.eval()
                steps = 0
                test_acc = 0
                loss_test = []
                
                for text, label in data_loader_test:
                    text = text.to(device)
                    label = label.to(device)
                    bs = label.shape[0]

                    # Initialize hidden and memory states
                    hidden = torch.zeros(self.model.num_layers, bs, self.model.hidden_size, device=device)
                    memory = torch.zeros(self.model.num_layers, bs, self.model.hidden_size, device=device)
                    
                    # Forward pass through the model
                    pred, hidden, memory = self.model(text, hidden, memory)

                    # Calculate the loss
                    loss = self.loss_fn(pred[:, -1, :], label)
                    loss_test.append(loss.item())

                    # Calculate test accuracy
                    test_acc += (pred[:, -1, :].argmax(1) == label).sum()
                    steps += bs
                    
                self.loss_val.append(loss_test)
                self.acc_val.append(test_acc)
                
    def predict(self, action):
        """
        Fonction de l'agent pour prédire l'outcome en fonction de l'action \
        utilise l'historique des actions et outcomes comme contexte

        Args:
            action : L'action dont on prédit l'outcome 

        Raises:
            Exception: Si l'historique des actions et outcomes est insuffisant

        Returns:
            out : L'outcome prédit
        """        
        # Nous devons recupérer les gap dernières actions/outcomes
        x = []
        for i in range(-(self.gap_predi - 1) // 2, 0, 1):
            x.append(self.history_act[i])
            x.append(self.history_fb[i])
        x.append(action)
        seq_to_pred = self.tokenizer.encode(x)
        # On simule un batch de taille 1
        seq_to_pred = torch.tensor([seq_to_pred], device=device)
        h = torch.zeros(self.model.num_layers, 1, self.model.hidden_size, device=device)
        cell = torch.zeros(self.model.num_layers, 1, self.model.hidden_size, device=device)
        probs, _, _ = self.model(seq_to_pred, h, cell)
        
        pred_feedback = torch.argmax(probs[:, -1, :]).item()
        pred_feedback = self.tokenizer.decode(pred_feedback)
        
        return pred_feedback
    
    # def recursive_expective_valance(self, max_depth:int, proba:int, seuil:float, h, cell, verbose:bool=False):
    #     """
    #     Teste toutes les actions possibles pour leur calculer l'expective valance
        
    #     Args:
    #         max_depth (int): Maximum d'action à tester
    #         seuil (float): Ne continuer que si le seuil n'est pas dépassé
    #         h : La couche cachée du modèle
    #         cell : La mémoire du modèle
    #         verbose (bool, optional): Defaults to False.
    #     """
    #     if max_depth <= 0 or proba < seuil:
    #         return {}
        
    #     dict_seq = {}
    #     for act in self.all_act:
    #         act_to_pred = self.tokenizer.encode(act)
    #         act_to_pred = torch.tensor([act_to_pred], device=device).unsqueeze(0)
    #         # print("Hidden and Cell")
    #         # print(h)
    #         # print(cell)
            
    #         probs, h_new, cell_new = self.model(act_to_pred, h, cell)
    #         probs = torch.nn.functional.softmax(probs[0, -1, :], dim=0).tolist()
    #         for fb in self.all_outcomes:
    #             # print(f'probability \033[0;32m proba {probs} \033[0m')
    #             fb_encode = self.tokenizer.encode(fb)
    #             # print(fb_encode)
    #             prob = probs[fb_encode]
                
    #             dict_seq[str([act, fb])] = prob * self.valence[inter(act, fb)]
    #             if verbose:
    #                 # print(f"Depth {max_depth}, Action: {act}, Outcome: {fb}, Valence: {dict_seq[act]}")
    #                 pass
                    
    #             x = torch.tensor([fb_encode], device=device).unsqueeze(0)                    
    #             _, h_new_to_pass, cell_new_to_pass = self.model(x, h_new, cell_new)
    #             following = self.recursive_expective_valance(max_depth=max_depth - 1, 
    #                                                         proba=proba * prob,
    #                                                         seuil=seuil, 
    #                                                         h=h_new_to_pass, 
    #                                                         cell=cell_new_to_pass, 
    #                                                         verbose=verbose)
                
    #             for key, value in following.items():
    #                 new_key = [act, fb]
    #                 new_key += eval(key)
    #                 dict_seq[str(new_key)] = value + dict_seq[str([act, fb])]
                    
    #     return dict_seq
    
    def recursif_expective_valance(self, context:list, max_depth:int, seuil:float=0.2, proba:float = 1, seq_predi:list = [], valence_precedente:float = 0):
        """
        Create the list of proposed sequences
        """
        max_depth -= 1
        self.model.eval()
        
        # print('debug recu val :')
        # print("probabilite", proba)
        # print("context", context)
        # print("max_depth", max_depth)
        # print("seuil", seuil)
        # print("seq_predi", seq_predi)
        
        # Compute the expected valence of each action
        for act in self.all_act:
            new_seq = seq_predi + [act]
            seq_to_predict = context + [self.tokenizer.encode(act)]
            seq_to_predict = torch.tensor([seq_to_predict], dtype=torch.int).to(device)

            hidden = torch.zeros(self.model.num_layers, 1, self.model.hidden_size, device=device)
            memory = torch.zeros(self.model.num_layers, 1, self.model.hidden_size, device=device)

            x, _, _ = self.model(seq_to_predict, hidden, memory)
            x = x[0, -1, :]

            # Transforme x into list proba
            probs = torch.nn.functional.softmax(x, dim=0).tolist()
            
            # for each outcome, record the expected valence
            for i, out in enumerate(self.all_outcomes):
                tmp_new_seq = new_seq + [out]
                tmp_proba = probs[i] * proba
                # If the probability is above a threshold
                if tmp_proba > seuil:
                    # Record the proposed sequence with its expected valence
                    expected_valence = float(np.round(self.valence[inter(act, out)] * tmp_proba, decimals=4)) + valence_precedente
                    self.prealloc_df.iloc[self.current_index] = [str(tmp_new_seq), expected_valence, tmp_new_seq[0], tmp_proba]
                    self.current_index += 1
                    # If the max_depth is not reached 
                    if max_depth > 0: 
                        # Recursively look for longer sequences
                        new_context = context + self.tokenizer.encode([act, out])
                        self.recursif_expective_valance(context=new_context[2:], max_depth=max_depth, seuil=seuil, 
                            proba=tmp_proba, seq_predi=tmp_new_seq.copy(), valence_precedente=expected_valence)
    
    def expective_valance(self, verbose:bool=False):
        """
        Permet de calculer l'expective valance d'une séquence d'interaction

        Args:
            max_depth (int): _description_
            seuil (float, optional): _description_. Defaults to 0.2.
            verbose (bool, optional): _description_. Defaults to False.
        """
        
        x = []
        for i in range(-(self.gap_predi - 1) // 2, 0, 1):
            x.append(self.history_act[i])
            x.append(self.history_fb[i])
        seq_to_pred = self.tokenizer.encode(x)
        self.prealloc_df[:] = np.nan
        self.prealloc_df["valence"] = -np.inf
        self.current_index = 0
        return self.recursif_expective_valance(context=seq_to_pred,
                                            max_depth=self.max_depth,
                                            proba=1, seq_predi=[],
                                            seuil=self.seuil)
    def decide(self):
        if self.seq_to_exe and len(self.seq_to_exe) > 1:
            out = self.seq_to_exe.pop(0)
            if out == self.history_fb[-1]:
                act = self.seq_to_exe.pop(0)
                return act
            else:
                self.seq_to_exe = []
        self.seq_to_exe = []        
        
        self.expective_valance()
        self.seq_to_exe = self.prealloc_df.sort_values(by="valence", ascending=False).iloc[0].proposition
        self.seq_to_exe = eval(self.seq_to_exe)
        print("after compute ...")
        print(self.seq_to_exe)
        act = self.seq_to_exe.pop(0)
        return act
        
    def action(self, real_outcome, verbose=False, explore:bool=False):
        """
        La fonction action permet à l'agent de choisir une action en fonction de l'outcome réel.
        Cette fonction entraine le modèle a prévoir les outcomes futurs en fonction des actions passées.

        Args:
            real_outcome : L'outcome réel suite à l'action de l'agent
            verbose : Affiche les informations sur l'entrainement ou non
        """
        # La première étape est de sauvegarder l'outcome réel
        self.history_fb.append(real_outcome)
        good_pred:bool = self.outcome_prediction == real_outcome
        if verbose :
            print(f"Action: {self.action_choice}, Prediction: {self.outcome_prediction}, Outcome: {real_outcome}, \033[0;31m Satisfaction: {good_pred} \033[0m")
        
        # Ensuite nous regardons si nous devons entrainer le modèle
        # not(explore) and 
        if not(good_pred) and len(self.history_fb) + len(self.history_fb) > self.gap_train:
            self.fit()
        # Nous devons maintenant choisir une action
        if len(self.history_fb) + len(self.history_fb) > self.gap_predi:
            self.action_choice = self.decide()
            self.outcome_prediction = self.predict(self.action_choice)
        else:
            self.action_choice = np.random.choice(self.all_act)
        # self.action_choice = np.random.choice(self.all_act)
        self.history_act.append(self.action_choice)
        
        return self.action_choice, self.outcome_prediction
        
        
              

# Environement
Nous pouvons entrainer sur divers environement. Ici nous allons tester sur une grille 2D avec un robot qui peut faire ces actions :
- forward
- turn left
- turn right
- feel_front

L'enviroment renvoie simplement :
- Empty
- Wall

# Valence
Pour donner un bon comportement a notre robot nous allons essayer avec cette valence :
- forward, empty : 1
- forward, wall : -30  `Nous voulons que le robot évite les murs`
- turn_left, empty : -7
- turn_left, wall : -7 `Cas impossible, mais nous devons le noter`
- turn_right, empty : -7
- turn_right, wall : -7 `Cas impossible, mais nous devons le noter`
- feel_front, wall : -5
- feel_front, empty : -5

In [None]:
environenment = small_loop(x=1, y=1, theta=0, world=np.array([
                [1, 1, 1, 1, 1],
                [1, 0, 0, 0, 1],
                [1, 0, 1, 0, 1],
                [1, 0, 0, 0, 1],
                [1, 1, 1, 1, 1],
            ]))


valence = {
    inter('forward', 'empty') : 7,
    inter('forward', 'wall') : -30,
    inter('turn_left', 'empty') : -7,
    inter('turn_left', 'wall') : -7,
    inter('turn_right', 'empty') : -7,
    inter('turn_right', 'wall') : -7,
    inter('feel_front', 'wall') : -7,
    inter('feel_front', 'empty') : -7,
}

In [None]:
torch.manual_seed(0)
np.random.seed(0)
hidden_size = 64
num_layers = 1
len_vocab = len(environenment.get_outcomes() + environenment.get_actions())

# Create the LSTM classifier model
lstm_classifier = LSTM(num_emb=len_vocab, output_size=2, 
                       num_layers=num_layers, hidden_size=hidden_size).to(device)

optimizer = torch.optim.Adam(lstm_classifier.parameters(), lr=0.01)
loss_func = nn.CrossEntropyLoss()
tokenizer = SimpleTokenizerV1(create_dico_numerate_word(environenment.get_outcomes() + environenment.get_actions()))

In [None]:
# Pour évaluler la performance du modèle
act_val, fb_val = [], []
for i in trange(1000):
    action = np.random.choice(environenment.get_actions())
    outcome = environenment.outcome(action)
    act_val.append(action)
    fb_val.append(outcome)

In [None]:
torch.manual_seed(0)
np.random.seed(0)
# Définition de l'agent
agent = AgentLSTM(valence=valence, model=lstm_classifier, optimizer=optimizer, loss_fn=loss_func,
    gap_predi=5, gap_train=5, max_depth=4, seuil=0.5, nb_epoch=100,
    data_validate=(act_val, fb_val))
history_good = []
pourcent_by_10  = []

In [None]:
torch.manual_seed(0)
np.random.seed(0)

outcome = environenment.outcome(agent.action_choice)

for i in tqdm(range(80)):
    action, predi = agent.action(outcome, verbose=True, explore=True)
    df = agent.prealloc_df
    outcome = environenment.outcome(action)
    history_good.append(outcome == predi)
    pourcent_by_10.append(sum(history_good[-10:]) * 10 if len(history_good) >= 10 else 0)
    # env_test2.display_world(out)
    environenment.save_world()

print(agent.loss_train)
see_evolued_acc(agent.loss_train)
    
# for i in tqdm(range(40)):
#     action, predi = agent.action(outcome, verbose=True, explore=False)
#     outcome = environenment.outcome(action)
#     history_good.append(outcome == predi)
#     pourcent_by_10.append(sum(history_good[-10:]) * 10 if len(history_good) >= 10 else 0)
#     # env_test2.display_world(out)
#     environenment.save_world()

In [None]:
print(agent.loss_train)
see_evolued_acc(agent.loss_train)