In [1]:
import pickle
import time
import numpy as np
import random
import graphviz as gv
import imageio
import os
import imageio.v2 as imageio
from scipy.optimize import linprog

class States():
    def __init__(self, state,transitions_with_action, transitions_without_action, reward, resolution = False):
        """
        Initialisation des états de la chaine de markov
        
        Args
        ----
        state : str
            Nom de l'état (S0...)
        reward : dictionnaire des rewards
            Récompense de l'état
        transitions_with_action : dict
            Dictionnaire de l'ensemble des transitions avec action
        transitions_without_action : dict
            Dictionnaire de l'ensemble des transitions sans action
        resolution : bool
            Booléen qui permet de savoir si on veut résoudre les problèmes détéctés automatiquement si possible
        """
        self.state = state
        if reward != {}: # Dans le cas où on n'a pas renseigné de récompense pour un état
            self.reward = reward[state] # récompense de l'état
        else:
            self.reward = 0
        self.transitions_without_action = {}
        self.transitions_with_action = {}

        sans_action = True # on crée ce booléen pour vérifier qu'il n'y a pas de transition avec action ensuite
        for transition in transitions_without_action:
            # on parcours l'ensemble des transitions
            transi_active = transitions_without_action[transition]
            if transi_active["from"]==self.state: # on vérifie si l'état de départ est l'état actif
                sans_action = False
                self.transitions_without_action["targets"] = transi_active["targets"]
                self.transitions_without_action["weights"] = transi_active["weights"]

        for transition in transitions_with_action:
            transi_active = transitions_with_action[transition]
            if transi_active["from"]==self.state and sans_action == True:
                # les clés dans transitions_with_action sont les actions et les valeurs des dict des états cibles et leurs poids
                self.transitions_with_action[transi_active["action"]] = {"targets" : transi_active["targets"], "weights" : transi_active["weights"]}
            elif transi_active["from"]==self.state and sans_action == False:
                print("\nWarning : l'état", self.state, "comporte des transitions avec et sans action")
                if resolution == True: # On choisit pour résoudre le problème de supprimer les transitions avec action de l'état, pour ça on ne rajoute pas la transition dans self.transitions_with_action
                    self.transitions_with_action = {}
                    print("Le problème a été résolu automatiquement : une transition avec action de l'état", self.state, "a été supprimée")
                    pass

    def __repr__(self):
        return (f"State : {self.state} \n State reward {self.reward} \n Transitions without action : {self.transitions_without_action} \n Transitions with action : {self.transitions_with_action}")


class markov():
    def __init__(self, fichier_mdp):
        """
        Initialisation de la chaine de markov

        Args
        ----
        fichier_mdp : str
            Nom du fichier contenant la modélisation de la chaine de markov
        """
        ! python mdp_lecture/mdp.py < {fichier_mdp} # On lance le fichier mdp.py avec la modélisation voulue
        # On lit le .pickle des données que l'on souhaitent récupérer
        def read_list(nom_liste):
            # for reading also binary mode is important
            with open(nom_liste, 'rb') as fp:
                liste = pickle.load(fp)
                return liste

        L = read_list("liste_donnees")
        # print('Données récupérées', L)
        states, actions, transitions_with_action, transitions_without_action, reward = L['States'], L['Actions'], L['Transitions_with_action'], L['Transitions_without_action'], L['Rewards']
        print("-"*50 + "\n")

        # Pour savoir si on veut résoudre les problèmes détectés automatiquement

        print("\nSouhaitez-vous que les problèmes détéctés prochainement soient résolus automatiquement si cela est possible? (y/n)")
        time.sleep(0.5)
        reponse_resolution_pb = input()
        if reponse_resolution_pb == "y":
            resolution = True
        else:
            resolution = False


        # on peut alors initialiser les éléments de la chaine de markov
        self.states = {}
        self.liste_states = states
        self.actions = actions
        self.transitions_with_action = transitions_with_action
        self.transitions_without_action = transitions_without_action

        for state in states:
            self.states[f"{state}"] = States(state, transitions_with_action, transitions_without_action, reward, resolution)
            delete = [] # on crée une liste pour stocker les transitions à supprimer
            for transition in transitions_with_action: # On supprime les transitions avec action qui ne sont pas dans l'état actif mais qui ont le même état de départ du à une correction de chaine
                if transitions_with_action[transition]["from"] == state : 
                    if transitions_with_action[transition]['action'] not in self.states[f"{state}"].transitions_with_action : 
                        delete.append(transition)
            for transition in delete:
                del transitions_with_action[transition]

        # On vérifie que la chaine de markov est correctement définie et on résout les problèmes si possibles et si souhaité
        markov.parsing(self, states, resolution) # on vérifie qu'on a pas de problèmes de parsing

        
        markov.afficher(self,  etat = "default", file_name = "initial")
        print("\nLa chaine de markov a été initialisée, il est possible de la voir dans le fichier initial.png \n")
        
        print("Que souhaitez-vous faire ?")
        print("1. lancer un parcours")
        print("2. model checking d'un eventually")
        print("3. SMC quantitatif")
        print("4. SMC qualitatif")
        print("5. model checking d'un next")
        print("6. Algorithme d'itération de valeurs")
        print("7. Q learning\n")
        time.sleep(0.5)
        reponse = input()
        while reponse not in ["1", "2", "3","4", "5", "6", "7"]:
            reponse = input()
        if reponse == "1":
            markov.parcours(self)
        elif reponse == "2":
            print("Quel sont les états cibles ? Veuillez les rentrer un par un, et taper 'fin' quand vous avez fini")
            time.sleep(0.5)
            etat = input()
            etats_cibles = []
            while etat != "fin":
                if etat not in self.liste_states:
                    print("L'état", etat, "n'existe pas")
                    etat = input()
                else :
                    etats_cibles.append(etat)
                    etat = input()
            if self.actions[0] == "None":
                markov.eventually_check_DTMC(self, etats_cibles)
            else:
                markov.eventually_check_MDP(self, etats_cibles)
        elif reponse == "3" :
            if self.transitions_with_action == {}:
                markov.smc_quantitatif(self)
            else :
                print("SMC quantitatif impossible car la chaine de markov n'est pas déterministe")
                 # Il faut redemarrer le proramme pour pouvoir lancer autre chose
        elif reponse == "4" :
            if self.transitions_with_action == {}:
                markov.smc_qualitatif(self)
            else :
                print("SMC qualitatif impossible car la chaine de markov n'est pas déterministe")
                 # Il faut redemarrer le proramme pour pouvoir lancer autre chose
        elif reponse =="5":
            markov.next_check(self)
        elif reponse == "6":
            markov.algos_iterations(self)
        elif reponse == "7":
            markov.q_learning(self)
       
    def parsing(self, states, resolution):
        """
        On réalise différents tests pour vérifier que la chaine de markov (ou la MDP) est correctement définie

        Args
        ----
        states : list
            Liste des états de la chaines (avec possiblement des doublons)
        resolution : bool
            Booléen qui permet de savoir si on veut résoudre les problèmes détéctés automatiquement si possible
        """
        # On concatène les transitions avec et sans action pour plus de simplicité pour les test
        Warning = False # passe à True si on a un warning

        all_transitions = []
        transitions_without_action = []
        transitions_with_action = []
        for elem in self.transitions_without_action:
            transitions_without_action.append(self.transitions_without_action[elem])
            all_transitions.append(self.transitions_without_action[elem])
        for elem in self.transitions_with_action:
            transitions_with_action.append(self.transitions_with_action[elem])
            all_transitions.append(self.transitions_with_action[elem])

        # On vérifie que les états déclarés sont utilisés et qu'un état utilisé dans des transitions est déclaré
        used_states = []
        for transitions in all_transitions:
            if transitions["from"] not in self.states:
                print(f"Warning : l'état {transitions['from']} est utilisé dans une transition mais n'est pas déclaré")
                Warning = True
                if resolution:
                    # On rajoute l'état manquant
                    self.states[transitions["from"]] = States(transitions["from"], self.transitions_with_action, self.transitions_without_action)

            used_states.append(transitions["from"])
            used_states.append(transitions["targets"])
            for target in transitions["targets"]:
                if target not in self.states:
                    print(f"Warning : l'état {target} est utilisé dans une transition mais n'est pas déclaré")
                    Warning = True
                    if resolution:
                        # On rajoute l'état manquant
                        print("Rajout de l'état manquant")
                        self.states[target] = States(target, self.transitions_with_action, self.transitions_without_action, {})
        for state in self.states:
            if state not in used_states:
                print(f"Warning : l'état {state} est déclaré mais n'est pas utilisé")
                Warning = True
                if resolution:
                    # On rajoute une transition de l'état vers lui même pour éviter les erreurs
                    print("Rajout d'une transition de l'état vers lui même")
                    self.transitions_without_action[f"{state}"] = {'from': f'{state}', 'targets': [f"{state}"], 'weights': [10]}
                    self.states[f"{state}"].transitions_without_action = self.transitions_without_action[f"{state}"]

        # On vérifie qu'un état n'est pas déclaré plusieurs fois
        if len(states) != len(self.states): # self.states supprime automatiquement les doublons
            print("Warning : un état est déclaré plusieurs fois")
            Warning = True

        # On vérifie qu'un état possède bien une transition de sortie
        etats_sans_sortie = []
        dict_etats = self.states
        bool_etat_sans_sortie = False
        for state in dict_etats:
            if dict_etats[state].transitions_without_action == {} and dict_etats[state].transitions_with_action == {}:
                etats_sans_sortie.append(state)
                bool_etat_sans_sortie = True
                if resolution:
                    # On rajoute une transition de l'état vers lui même pour pour lui créer un état de sortie
                    self.transitions_without_action[f"{state}"] = {'from': f'{state}', 'targets': [f"{state}"], 'weights': [10]}
                    self.states[f"{state}"].transitions_without_action = {'from': f'{state}', 'targets': [f"{state}"], 'weights': [10]}

        if bool_etat_sans_sortie == True:
            print("Warning : les états suivants n'ont pas de transition de sortie : ", etats_sans_sortie)
            print(f"Rajout d'une transition des états {etats_sans_sortie} vers eux mêmes")

        # On vérifie que les actions ne sont pas déclarées plusieurs fois
        actions_uniques = set(self.actions)
        if len(actions_uniques) != len(self.actions):
            print("Warning : une action est déclarée plusieurs fois")
            Warning = True
            if resolution:
                print("Suppressions automatiques des actions déclarées plusieurs fois")
    
        # On vérifie qu'une action déclarée est utilisée
        actions_utilisés = []
        for transitions in transitions_with_action:
            actions_utilisés.append(transitions["action"])
        actions_utilisés = set(actions_utilisés) # on enlève les doublons
        if len(actions_utilisés) != len(self.actions) and self.actions != ["None"]: # On met None de base s'il n'y a pas d'action
            print("Warning : une action est déclarée mais n'est pas utilisée")
            Warning = True
            if resolution:
                print("Suppression des actions non utilisées")
                # On supprime les actions non utilisées
                for action in self.actions:
                    if action not in actions_utilisés:
                        self.actions.remove(action)

        # On vérifie que les actions utilisées sont déclarées
        for action in actions_utilisés:
            if action not in self.actions:
                print(f"Warning : l'action {action} est utilisée mais n'est pas déclarée")
                Warning = True
                if resolution:
                    print("Rajout de l'action manquante")
                    # On rajoute l'action manquante
                    if self.actions == ["None"]:
                        self.actions = [action]
                    else:
                        self.actions.append(action)

        if Warning :
            print("\n Écrire ok pour contiuer")
            print("-"*25 + "\n")
            while True:
                rep = input()
                if rep == "ok":
                    break
            
    def affichage_etat(self,etat_actif,choix = False, reward = False):
        """
        Affiche proprement l'état actif de la chaine de markov et permet de choisir une action le cas échéant

        Args
        ----
        etat_actif : état de la chaine de markov
            Etat actif de la chaine de markov
        choix : bool
            Si on doit afficher des choix pour des actions
        """
        nom = etat_actif.state
        print("-"*50)
        chaine_nom = f"| Etat actif : {nom}"
        l = len(chaine_nom)
        print(chaine_nom + " "*(49-l) + "|")
        if choix :
            chaine_choix = f"| choix possibles pour l'état {etat_actif.state}:"
            l = len(chaine_choix)
            print(chaine_choix + " "*(49-l) + "|")

            chaine_transi = f"| {list(etat_actif.transitions_with_action.keys())}"
            l = len(chaine_transi)
            print(chaine_transi + " "*(49-l) + "|")

            chaine_action = f"| choix de l'action  :"
            l = len(chaine_action)
            print(chaine_action + " "*(49-l) + "|")
            time.sleep(0.5)
            choix = input()
            while choix not in list(etat_actif.transitions_with_action.keys()): # on vérifie que l'action choisie est valide
                choix = input()
                if choix == "stop": # permet d'arrêter le programme à la main
                    break
            chaine_choix = f"| Vous avez choisi l'action {choix}"
            l = len(chaine_choix)
            print(chaine_choix + " "*(49-l) + "|")

        elif reward:
            chaine_choix = f"| Récompense pour l'état {etat_actif.state}:"
            l = len(chaine_choix)
            print(chaine_choix + " "*(49-l) + "|")
            time.sleep(0.5)
            choix = input()
            chaine_rec = f"| Vous avez choisi : {choix}"
            l = len(chaine_rec)
            print(chaine_rec + " "*(49-l) + "|")

            
        print("-"*50)
        print("\n")   
        return choix    

    def parcours(self, file_name="visu_parcours"): # on parcours la chaine (en faisant N étapes)
        """ 
        On parcours la chaine en faisant N étapes
        
        Args
        ----
        file_name : str
            Nom du fichier de sauvegarde de la vidéo
        """
        positionnel = False
        n_pos = False
        without_action = False
        
        print("Quel mode de parcours voulez-vous ?")
        print("1 : sans actions")
        print("2 : avec adversaire positionnel")
        print("3 : avec adversaire non positionnel")
        print("4 : avec adversaire choisi de manière aléatoire")
        time.sleep(0.5)
        choix_parcours = input()
        while choix_parcours not in ["1","2","3","4"]:
            choix_parcours = input()
        if choix_parcours == "1":
            without_action = True
        elif choix_parcours == "2":
            positionnel = True
        elif choix_parcours == "3":
            n_pos = True
        elif choix_parcours == "4":
            random_adv = True
        
        print("Combien d'étapes voulez-vous faire ?")
        time.sleep(0.5)
        N = int(input())

        etat_initial = self.liste_states[0] #état initial : premier élément
        etat_actif = self.states[etat_initial] # on prend l'objet correspondant à l'état initial
        reward_total = 0
        
        self.afficher(etat = etat_initial, file_name='image0')
        images = []
        images.append(imageio.imread('image0'+'.png'))

        detection_erreur = False
        if without_action:
        # On vérifie qu'on ne choisit pas le mode "sans actions" alors qu'il y en a
            for state in self.states:
                if len(self.states[state].transitions_with_action) != 0:
                    print(f"Error : il y a des transitions avec actions dans l'état {state}, il faut choisir le mode avec actions")
                    without_action = False
                    detection_erreur = True
                    break
            if detection_erreur:
                print("Voulez-vous changer de mode ? (y/n, si non, le programme s'arrête)")
                time.sleep(0.5)
                choix = input()
                while choix not in ["y","n"]:
                    choix = input()
                if choix == "y":
                    print("Quel mode de parcours voulez-vous ?")
                    print("1 : avec adversaire positionnel")
                    print("2 : avec adversaire non positionnel")
                    print("3 : avec adversaire choisi de manière aléatoire")
                    time.sleep(0.5)
                    choix_parcours = input()
                    while choix_parcours not in ["1","2","3"]:
                        choix_parcours = input()
                    if choix_parcours == "1":
                        positionnel = True
                    elif choix_parcours == "2":
                        n_pos = True
                    elif choix_parcours == "3":
                        random_adv = True
                else :
                    return
        
        if without_action:
            for i in range(N):
                markov.affichage_etat(self, etat_actif)
                # print(etat_actif.state) # on affiche l'état en cours
                poids = etat_actif.transitions_without_action["weights"]
                poids_total = np.sum(poids)
                poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
                poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

                choix = random.random() # tirage aléatoire entre 0 et 1

                for j in range(len(poids)): # on recherche l'état cible
                    if choix <= poids[j]:
                        reward_total += etat_actif.reward
                        etat_actif = self.states[etat_actif.transitions_without_action["targets"][j]]
                        break
                self.afficher(etat = etat_actif.state, file_name='image'+str(i+1))
                images.append(imageio.imread('image'+str(i+1)+'.png'))
            # create gif
            imageio.mimsave(file_name+'.gif', images, fps=3)
            for i in range(N+1):
                os.remove('image'+str(i)+'.png')

        elif positionnel == True or n_pos == True: # adversaire positionnel
            if positionnel :
                print("Choix d'un adversaire positionnel")
                adv_pos = {} # contient pour chaque état, le choix de l'adversaire
                for state in self.states :
                    etat = self.states[state]
                    if etat.transitions_with_action != {} : # si l'état possèdes des transitions avec actions
                        action = markov.affichage_etat(self, etat, choix = True)
                        adv_pos[state] = action

            for i in range(N):
                if etat_actif.transitions_with_action == {} : # si l'état n'a pas de transitions avec actions :
                    markov.affichage_etat(self, etat_actif)
                    poids = etat_actif.transitions_without_action["weights"]

                    poids_total = np.sum(poids)
                    poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
                    poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

                    choix = random.random() # tirage aléatoire entre 0 et 1

                    for j in range(len(poids)): # on recherche l'état cible
                        if choix <= poids[j]:
                            reward_total += etat_actif.reward
                            etat_actif = self.states[etat_actif.transitions_without_action["targets"][j]]
                            break
                else :
                    if n_pos == True :
                        action_choisie = markov.affichage_etat(self, etat_actif, choix =True)
                    else :
                        action_choisie = adv_pos[etat_actif.state]
                        print(f"Action choisie : {action_choisie}")
                        markov.affichage_etat(self, etat_actif)
                    poids = etat_actif.transitions_with_action[action_choisie]["weights"] # on ne prend que les poids de l'action choisie par l'adversaire
                    poids_total = np.sum(poids)
                    poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
                    poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

                    choix = random.random() # tirage aléatoire entre 0 et 1

                    for j in range(len(poids)): # on recherche l'état cible
                        if choix <= poids[j]:
                            reward_total += etat_actif.reward
                            etat_actif = self.states[etat_actif.transitions_with_action[action_choisie]["targets"][j]]
                            break
                        
                self.afficher(etat = etat_actif.state, file_name='image'+str(i+1))
                images.append(imageio.imread('image'+str(i+1)+'.png'))
            # create gif
            imageio.mimsave(file_name+'.gif', images, fps=3)
            for i in range(N+1):
                os.remove('image'+str(i)+'.png')
            for i in range(N+1):
                os.remove('image'+str(i))
                
        elif random_adv == True :
            for i in range(N):
                    actions_possibles = []
                    for action in etat_actif.transitions_with_action :  # on récupère les actions possibles
                        actions_possibles.append(action)
                    if len(actions_possibles) == 0 : # si l'état n'a pas de transitions avec actions :

                        poids = etat_actif.transitions_without_action["weights"]

                        poids_total = np.sum(poids)
                        poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
                        poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

                        choix = random.random() # tirage aléatoire entre 0 et 1

                        for j in range(len(poids)): # on recherche l'état cible
                            if choix <= poids[j]:
                                reward_total += etat_actif.reward
                                etat_actif = self.states[etat_actif.transitions_without_action["targets"][j]]
                                break
                    else :
                        action_choisie = random.choice(actions_possibles) # on choisit une action aléatoire
                        poids = etat_actif.transitions_with_action[action_choisie]["weights"] # on ne prend que les poids de l'action choisie par l'adversaire
                        poids_total = np.sum(poids)
                        poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
                        poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

                        choix = random.random() # tirage aléatoire entre 0 et 1

                        for j in range(len(poids)): # on recherche l'état cible
                            if choix <= poids[j]:
                                reward_total += etat_actif.reward
                                etat_actif = self.states[etat_actif.transitions_with_action[action_choisie]["targets"][j]]
                                break
                                           
                    self.afficher(etat = etat_actif.state, file_name='image'+str(i+1))
                    images.append(imageio.imread('image'+str(i+1)+'.png'))
            imageio.mimsave(file_name+'.gif', images,fps=3)
            for i in range(N+1):
                os.remove('image'+str(i)+'.png')
            for i in range(N+1):
                os.remove('image'+str(i))

        print(f"Reward total : {reward_total}")
        print("Le graphique est visible dans le fichier visu_parcours.gif")
    

    def parcours_SMC(self, length, etat): # on parcours la chaine (en faisant N étapes)
        """ 
        On parcours la chaine sans les actions, sans afficher le graphe. 
        
        Args:
            length (int): nombre max d'itération 
            etat (str): état d'arrivée
        
        """

        etat_initial = self.liste_states[0] #état initial : premier élément
        etat_actif = self.states[etat_initial] # on prend l'objet correspondant à l'état initial
        reward_total = 0
        if etat_actif.state == etat:
            return etat
                        
        for i in range(length):
            # print(etat_actif.state) # on affiche l'état en cours
            poids = etat_actif.transitions_without_action["weights"]
            poids_total = np.sum(poids)
            poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
            poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

            choix = random.random() # tirage aléatoire entre 0 et 1

            for j in range(len(poids)): # on recherche l'état cible
                if choix <= poids[j]:
                    reward_total += etat_actif.reward
                    etat_actif = self.states[etat_actif.transitions_without_action["targets"][j]]
                    break
            if etat_actif.state == etat:
                break
            
        return etat_actif.state


    
    def afficher(self, etat = "default", file_name = "graph"):
        
        """Fonction qui permet de représenter le graphe de la chaine de Markov avec ou sans action. Dans le mode "default", le graphe de base est affiché.
        Dans le mode "etat", on peut choisir un état et il sera mis en évidence en étant de couleur bleue.
         """
    
        # On récupère les données
        States = self.states
        Actions = self.actions
        Transitions_with_action = self.transitions_with_action
        Transitions_without_action = self.transitions_without_action

        # On crée le graphique
        G = gv.Digraph(format='png')

        if etat == "default":
            for state in States:
                G.node(state)
        else:
            for state in States:
                if state == etat:
                    G.node(state, color = 'blue', style = 'filled')
                else:
                    G.node(state)
        
        #On ajoute les actions
        for actions in Actions:
            G.node(actions, shape = 'point')
            
        for transition in Transitions_with_action:
            G.edge(Transitions_with_action[transition]['from'], Transitions_with_action[transition]['action'], label=str(Transitions_with_action[transition]['action']), color = 'red')
        

        # On ajoute les transitions sans action
        for transition in Transitions_without_action:
            for i in range(len(Transitions_without_action[transition]['targets'])):
                G.edge(Transitions_without_action[transition]['from'],Transitions_without_action[transition]['targets'][i], label = str(Transitions_without_action[transition]['weights'][i])) 

        # On ajoute les transitions avec action
        for transition in Transitions_with_action:
            for i in range(len(Transitions_with_action[transition]['targets'])):
                G.edge(Transitions_with_action[transition]['action'],Transitions_with_action[transition]['targets'][i], label = str(Transitions_with_action[transition]['weights'][i]))
        
        G.render(file_name, view=False)

    def eventually_check_DTMC(self, S1):
        """ Model Checking du eventually pour une chaine de Markov sans action.
        
        Args
        ----
        S1 : list
            Liste des états qu'on cherche à atteindre
        """
        

        S_int = [x for x in self.liste_states if x not in S1] # on crée la liste des étates S_? ("int" pour interrogation)
        S0 = [] # on crée la liste des états S_0 qu'on remplira par récurrence
        S_int_2 = S_int.copy()

        for state in S_int : # Algorithme de recherche en profondeur qu'on applique à chaque état de S_int
            pile = []
            pile.append(state)
            etats_marques = [state]
            while pile != []:
                etat = pile[-1]
                etat_actif = self.states[etat]
                if etat in S1 :
                    break
                etats_faisables = [etat for etat in etat_actif.transitions_without_action["targets"] if etat not in etats_marques]
                if etats_faisables != [] :
                    pile.append(etats_faisables[0])
                    etats_marques.append(etats_faisables[0])
                else:
                    pile.pop()
            if pile == []:
                S_int_2.remove(state)
                S0.append(state) # on ajoute l'état à S0 si on ne peut pas atteindre un état de S1

        print(f"Les états S_0 sont : {S0}")
        print(f"Les états S_? sont : {S_int_2}")
        print(f"Les états S_1 sont : {S1}")

        A = np.zeros((len(S_int_2),len(S_int_2))) # Création de la matrice A
        for i in range(len(S_int_2)):
            etat_actif = self.states[S_int_2[i]]
            for j in range(len(S_int_2)):
                if S_int_2[j] in etat_actif.transitions_without_action["targets"]:
                    index = etat_actif.transitions_without_action["targets"].index(S_int_2[j])
                    A[i,j] = etat_actif.transitions_without_action["weights"][index]/(np.sum(etat_actif.transitions_without_action["weights"])) # on remplit la matrice A avec les bons poids

        b = np.zeros((len(S_int_2),1)) # Création du vecteur b
        for i in range(len(S_int_2)):
            etat_actif = self.states[S_int_2[i]]
            for transitions in etat_actif.transitions_without_action["targets"]:
                if transitions in S1:
                    index = etat_actif.transitions_without_action["targets"].index(transitions)
                    b[i,0] += etat_actif.transitions_without_action["weights"][index]/(np.sum(etat_actif.transitions_without_action["weights"])) # on remplit le vecteur b avec les bons poids

        # On résout (Id-A)^-1 * b
        Id = np.identity(len(S_int_2))
        y = np.dot(np.linalg.inv(Id - A),b)

        print(f"La probabilité de respecter la propriété à partir de l'état initial {self.liste_states[0]} est : {y[0,0]}")
        print(" vecteur y total :")
        print(y)

    def smc_quantitatif(self):
        print("-"*50)
        print("Quel sont les états cibles ? Veuillez les rentrer un par un, et taper 'fin' quand vous avez fini")
        time.sleep(0.5)
        etat = input()
        etats_cibles = []
        while etat != "fin":
            if etat not in self.liste_states:
                print("L'état", etat, "n'existe pas")
                etat = input()
            else :
                etats_cibles.append(etat)
                etat = input()
        print("Etats cibles : ", etats_cibles)
        print("Précision souhaitée :")
        time.sleep(0.5)
        precision = float(input())
        print("Précision choisie : ", precision)
        print("Erreur souhaitée :")
        time.sleep(0.5)
        erreur = float(input())
        print("Erreur choisie : ", erreur)
        print("Longueur des chaines voulues :")
        time.sleep(0.5)
        longueur = int(input())
        print("Longueur des chaines : ", longueur)
        print("-"*50)

        # Calcul de N (nombre d'itérations)
        N =int((np.log(2)-np.log(erreur))/(2*precision)**2)+1

        print("Calcul en cours.. \n")
        # On lance un certain nombre d'itération pour chaque état cible
        res_prob = [] # vecteur des probabilités d'atteindre les étates
        for i in range(len(etats_cibles)): 
            lancers = [markov.parcours_SMC(self, longueur, etats_cibles[i]) for _ in range(N)]
            res_prob.append(round(sum([1 for x in lancers if x==etats_cibles[i]])/N,5))
        print(f"Les probabilité d'atteindre chacun des états cibles {etats_cibles} sont : {res_prob}")

        return

    def smc_qualitatif(self):
        print("-"*50)
        print("Quel est l'état cible ?")
        time.sleep(0.5)
        etat = input()
        while etat not in self.liste_states:
            print("L'état", etat, "n'existe pas")
            etat = input()
        print("Etat cible : ", etat)
        print("Valeur de alpha :")
        time.sleep(0.5)
        alpha = float(input())
        print("alpha = ", alpha)
        print("Valeur de beta :")
        time.sleep(0.5)
        beta = float(input())
        print("beta = ", beta)
        print("Valeur de théta :")
        time.sleep(0.5)
        theta = float(input())
        print("théta = ", theta)
        print("Valeur de epsilon (zone d'indifférence) :")
        time.sleep(0.5)
        epsilon = float(input())
        print("epsilon = ", epsilon)
        print("Longueur des chaines voulues :")
        time.sleep(0.5)
        longueur = int(input())
        print("Longueur des chaines = ", longueur)
        print("-"*50)

        print("Calcul en cours.. \n")

        gamma0 = theta + epsilon
        gamma1 = theta - epsilon
        A = (1-beta)/alpha
        B = beta/(1-alpha)
        Vadd = np.log(gamma1/gamma0)
        Vrem = np.log((1-gamma1)/(1-gamma0))
        FA = np.log(A)
        FB = np.log(B)
        Fm = 0
        m=0 # nombtre de lancers
        dm=0 # nombre de lancers acceptés
        while True:
            lancer = markov.parcours_SMC(self, longueur, etat)
            m+=1
            if  lancer == etat:
                Fm = Fm + Vadd
                dm+=1
            else :
                Fm = Fm + Vrem
            if Fm > FA :
                print(f"La probabilité d'atteindre l'état {etat} est inférieure à {theta}")
                print("Le nombre de simulations nécessaires pour démontrer ce résultat est de", m)
                break
            elif Fm < FB :
                print(f"La probabilité d'atteindre l'état {etat} est bien supérieure ou égal à {theta}")
                print("Le nombre de simulations nécessaires pour démontrer ce résultat est de", m)
                break

    def next_check(self):
        print("-"*50)
        print("Quel sont les états cibles ? Veuillez les rentrer un par un, et taper 'fin' quand vous avez fini")
        time.sleep(0.5)
        etat = input()
        etats_cibles = []
        while etat != "fin":
            if etat not in self.liste_states:
                print("L'état", etat, "n'existe pas")
                etat = input()
            else :
                etats_cibles.append(etat)
                etat = input()
        print("Les états cibles sont :", etats_cibles)

        print("Quel est l'état initial du calcul du next ?")
        time.sleep(0.5)
        etat_initial = input()
        while etat_initial not in self.liste_states:
            print("L'état", etat, "n'existe pas")
            etat_initial = input()
        print("L'état initial est :", etat_initial)
        print("-"*50)
        print("Calcul en cours.. \n")

        probas = 0
        etat_actif = self.states[etat_initial]
        if etat_actif.transitions_without_action != {}:
            for i,etat in enumerate(etat_actif.transitions_without_action["targets"]):
                if etat in etats_cibles:
                    probas += etat_actif.transitions_without_action["weights"][i]
            print("Probabilité du next à partir de l'état initial :", probas/(np.sum(etat_actif.transitions_without_action["weights"])))
        else :
            actions_possibles = []
            for action in etat_actif.transitions_with_action :  # on récupère les actions possibles pour l'état actif
                actions_possibles.append(action)
            res_matrice = np.empty((len(actions_possibles),2), dtype = object)
            res_matrice[:,0] = actions_possibles
            for action in actions_possibles:
                probas = 0
                for i,etat in enumerate(etat_actif.transitions_with_action[action]["targets"]):
                    if etat in etats_cibles:
                        probas += etat_actif.transitions_with_action[action]["weights"][i]
                res_matrice[i,1] = probas/probas/(np.sum(etat_actif.transitions_with_action[action]["weights"]))
            print(f"Probabilité du next à partir de l'état {etat_initial} pour chaque action possible:")
            print(res_matrice)

        
    def calcul_max(self,etat_actif,V0,gamma):
        max = etat_actif.reward
        # print(f"Etat actif {etat_actif.state}reward {etat_actif.reward}")
        max_action = None
        somme = 0
        
        actions_possibles = []
        for action in etat_actif.transitions_with_action :  # on récupère les actions possibles
            actions_possibles.append(action)

        if actions_possibles != []: #Si l'état possèdes des transitions avec actions
            for action in actions_possibles:
                for i,etat in enumerate(etat_actif.transitions_with_action[action]["targets"]): # Si l'état possède des transitions avec une action
                    # On calcule la probabilité de transition de l'état actif vers l'état i en passant par l'action action
                    poids = etat_actif.transitions_with_action[action]["weights"][i]/np.sum(etat_actif.transitions_with_action[action]["weights"])
                    somme += poids*V0[self.liste_states.index(etat)]
                if max < etat_actif.reward + gamma*somme:
                    max = etat_actif.reward + gamma*somme
                    max_action = action
                somme = 0
        else :
            for i,etat in enumerate(etat_actif.transitions_without_action["targets"]): # Si l'état possède des transitions avec une action
                # On calcule la probabilité de transition de l'état actif vers l'état i en passant par l'action action
                poids = etat_actif.transitions_without_action["weights"][i]/np.sum(etat_actif.transitions_without_action["weights"])
                somme += poids*V0[self.liste_states.index(etat)]

            if max < etat_actif.reward + gamma*somme:
                max = etat_actif.reward + gamma*somme
                max_action = None
        return(max, max_action)

    
    def algos_iterations(self):
            """
            Calcul de meilleur adversaire à partir d'un algorithme par itérations
            """
            print("-"*50)
            print("Choix de gamma :")
            time.sleep(0.5)
            gamma = float(input())
            print("Gamma choisi :", gamma)
            print("Choix de epsilon :")
            time.sleep(0.5)
            epsilon = float(input())
            print("Epsilon choisi :", epsilon)
            print("Nombre d'itérations avant arrêt forcé (au cas où il n'y pas de convergence) :")
            time.sleep(0.5)
            nb_iter_max = int(input())
            print("Nombre d'itérations max :", nb_iter_max)
            print("-"*50)
            print("Calcul de Vn par itérations...\n")
            # Initialisation du vecteur
            V0 = np.zeros((len(self.liste_states),1))
            V1 = V0 + 2*epsilon # Pour être sur de ne pas sortir de la boucle à la première itération 
            nb_iter=0
            while True and nb_iter < nb_iter_max:
                for i,etat in enumerate(self.liste_states):
                    etat_actif = self.states[etat]
                    V1[i], _ = markov.calcul_max(self,etat_actif, V0, gamma)
                nb_iter+=1
                if np.linalg.norm(V1-V0) < epsilon:
                    break
                else :
                    V0 = np.copy(V1)
            print("-"*50)
            if nb_iter == nb_iter_max:
                print("Le nombre d'itérations maximum a été atteint")
            print("Valeur de Vn :")
            print(V1)
            print("Nombre d'itérations nécessaires pour la couvergence (s'il y a convergence): ", i)
            print("-"*50 + "\n")
            adv_choisi = np.empty((len(self.liste_states),2), dtype = object)
            adv_choisi[:,0] = self.liste_states
            for i,etat in enumerate(self.liste_states):
                etat_actif = self.states[etat]
                _, max_action = markov.calcul_max(self,etat_actif, V0, gamma)
                adv_choisi[i,1] = max_action
            
            print("-"*50)
            print("Adversaire optimal choisi (si None, c'est qu'il n'y a pas d'actions de disponibles) : ")
            print(adv_choisi)
            print("-"*50 + "\n")
            return

    def parcours_q_learning(self, etat, action_param):
        """
        Fonction de parcours particulière pour le q_learning
        On ne fait qu'une étape à chaque fois, en partant d'un état et d'une action en particulier

        Args
        ----
        etat : str
            état de départ (nom de l'état)
        action : str
            action à effectuer (nom de l'action)

        Returns
        -------
        etat_suivant : str
            état d'arrivée
        reward : float
            reward associé à l'état dont on sort
        """

        etat_initial = self.liste_states[self.liste_states.index(etat)] #état initial : état donné en paramètre
        etat_actif = self.states[etat_initial] # on prend l'objet correspondant à l'état initial     

        actions_possibles = []
        for action in etat_actif.transitions_with_action :  # on récupère les actions possibles
            actions_possibles.append(action)
        if len(actions_possibles) == 0 : # si l'état n'a pas de transitions avec actions :
            poids = etat_actif.transitions_without_action["weights"]

            poids_total = np.sum(poids)
            poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
            poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

            choix = random.random() # tirage aléatoire entre 0 et 1

            for j in range(len(poids)): # on recherche l'état cible
                if choix <= poids[j]:
                    reward_total = etat_actif.reward
                    etat_actif = self.states[etat_actif.transitions_without_action["targets"][j]]
                    break
        else :
            if action_param not in actions_possibles : # si l'action n'est pas possible, on choisit une action aléatoire
                action_choisie = random.choice(actions_possibles)
            else :
                action_choisie = action_param # on choisit l'action passée en argument
            
            poids = etat_actif.transitions_with_action[action_choisie]["weights"] # on ne prend que les poids de l'action choisie par l'adversaire
            poids_total = np.sum(poids)
            poids = poids/poids_total # on normalise les poids pour qu'ils soient entre 0 et 1
            poids = np.cumsum(poids) # on fait la somme cumulée des poids, afin de pouvoir faire un tirage aléatoire (il ne faut pas que par exemple les deux probas soient de 0.5, il en faut une de 0.5 et l'autre de 1)

            choix = random.random() # tirage aléatoire entre 0 et 1

            for j in range(len(poids)): # on recherche l'état cible
                if choix <= poids[j]:
                    reward_total = etat_actif.reward
                    etat_actif = self.states[etat_actif.transitions_with_action[action_choisie]["targets"][j]]
                    break
        return etat_actif.state, reward_total

    def q_learning(self):
        """
        Implémentation du Q learning
        """
        print("-"*50)
        print("Choix de gamma :")
        time.sleep(0.5)
        gamma = float(input())
        print("Gamma choisi :", gamma)
        print("Choix du nombre d'itérations total (Ttot) :")
        time.sleep(0.5)
        Ttot = int(input())
        print("Ttot choisi :", Ttot)
        print("Souhaitez-vous l'affichage de la matrice Q ? (y/n)")
        time.sleep(0.5)
        affichage = input()
        while affichage != "y" and affichage != "n":
            affichage = input()
        print("-"*50 + "\n")

        print("Calcul en cours...\n")
        print("-"*50)

        etats = self.liste_states
        actions = self.actions
        alpha_tab = np.zeros((len(etats),len(actions))) # alpha est une fonction décroissante (plus on a rencontré un état, plus changements effectués seront faibles)
        Q = np.zeros((len(etats),len(actions))) # Création de la matrice Q
        st = self.liste_states[0]
        st_next = st
        DejaVu = False
        for t in range(Ttot):
            if st_next == st and DejaVu == True: # on a déjà vu cet état deux fois d'affilées, on change d'état
                st = "S0"
                DejaVu = False
            elif st_next == st and DejaVu == False:
                DejaVu = True
            else :
                DejaVu = False
                st = st_next
            if random.random() <0.9: # dilemme exploitation/exploration
                i = np.argmax(Q[etats.index(st),:]) # Dans la plupart des cas, ou prend la "meilleure" action
                at = actions[i]
            else :
                at = random.choice(actions) # on choisit une action au hasard dans qq cas
            (st_next,rt) = markov.parcours_q_learning(self,st,at) # On effectue le mouvement
            etat_index = etats.index(st)
            action_index = actions.index(at)

            delta = rt + gamma*max(Q[etats.index(st_next),:]) - Q[etat_index,action_index]
            alpha_tab[etat_index,action_index]+=1
            Q[etat_index,action_index] += (1/alpha_tab[etat_index,action_index])*delta
        
        if affichage == "y":
            print("Matrice Q :")
            print(Q)

        # On fait l'argmax sur chaque ligne de la matrice Q pour avoir le choix optimal pour chaque état
        mat_choix = np.argmax(Q, axis=1)
        res_choix = np.empty(dtype=object, shape=(len(etats),2))
        for i in range(len(etats)):
            # if etats[i] has no transitions with actions :
            if len(self.states[etats[i]].transitions_with_action) == 0:
                res_choix[i,1] = "None"
            else :
                res_choix[i,1] = actions[mat_choix[i]]
        res_choix[:,0] = etats
        print("Choix optimal pour chaque état (Si None c'est qu'il n'y pas de transitions avec actions pour l'état) :")
        print(res_choix)
        print("-"*50)
    
        return Q
    
        
    def eventually_check_MDP(self,S1) : 
        """ Model Checking du eventually pour une chaine de Markov avec action.
        
        Args
        ----
        S1 : list
            Liste des états qu'on cherche à atteindre
        """
        n_etat = len(self.liste_states) - len(S1) # On ne compte pas les états qu'on veut atteindre 
        n_action = [] # Nombre d'actions possibles pour chaque état
        
        for i in range(n_etat):
            n_action.append(len(self.states[self.liste_states[i]].transitions_with_action))
        
        n_tot_ligne = 0 
        for i in range(n_etat):
            if n_action[i] != 0:
                n_tot_ligne += n_action[i] + 2
            else:
                n_tot_ligne += 3
            
        # On crée la matrice A (avec n_etat colonnes et n_etat*(n_action+2) lignes)
        A = np.zeros((n_tot_ligne,n_etat))
        
        # On crée le vecteur b 
        b = np.zeros((n_tot_ligne,1))
        
        # On remplit la matrice A et le vecteur b

        # Pour chaque bloc de taille (n_action[i]+2) lignes et n_etat colonnes on remplit la matrice A 
        indice = 0
        for i in range(n_etat):
            etat_actif = self.states[self.liste_states[i]]
            # noms des actions possibles pour l'état i
            if n_action[i] != 0:
                for j in range(n_action[i]):
                    nom_action = list(etat_actif.transitions_with_action.keys())[j]
                    for k in range(n_etat):
                        etat_cible = self.states[self.liste_states[k]]
                        index_cible = -10
                        if etat_cible.state in etat_actif.transitions_with_action[nom_action]["targets"]:
                            index_cible = self.states[self.liste_states[i]].transitions_with_action[nom_action]["targets"].index(etat_cible.state)
                        if k == i:
                            if index_cible != -10:
                                A[indice+j,k] = 1 - self.states[self.liste_states[i]].transitions_with_action[nom_action]["weights"][index_cible]  / np.sum(self.states[self.liste_states[i]].transitions_with_action[nom_action]["weights"])
                            else:
                                A[indice+j,k] = 1
                        else:
                            if index_cible != -10:    
                                A[indice+j,k] = - self.states[self.liste_states[i]].transitions_with_action[nom_action]["weights"][index_cible]  / np.sum(self.states[self.liste_states[i]].transitions_with_action[nom_action]["weights"])
                            else:
                                A[indice+j,k] = 0
                for transitions in etat_actif.transitions_with_action[nom_action]["targets"]:
                    if transitions in S1:
                        index_cible = etat_actif.transitions_with_action[nom_action]["targets"].index(transitions)
                        b[indice+j] = self.states[self.liste_states[i]].transitions_with_action[nom_action]["weights"][index_cible]  / np.sum(self.states[self.liste_states[i]].transitions_with_action[nom_action]["weights"])
                indice += n_action[i]
                
            else:
                for j in range(1):
                    if j == 0:
                        for k in range(n_etat):
                            etat_cible = self.states[self.liste_states[k]]
                            index_cible = -10
                            if etat_cible.state in etat_actif.transitions_without_action["targets"]:
                                index_cible = self.states[self.liste_states[i]].transitions_without_action["targets"].index(etat_cible.state)
                            if k == i:
                                if index_cible != -10:
                                    A[indice+j,k] = 1 - self.states[self.liste_states[i]].transitions_without_action["weights"][index_cible]  / np.sum(self.states[self.liste_states[i]].transitions_without_action["weights"])
                                else:
                                    A[indice+j,k] = 1
                            else:
                                if index_cible != -10:    
                                    A[indice+j,k] = - self.states[self.liste_states[i]].transitions_without_action["weights"][index_cible]  / np.sum(self.states[self.liste_states[i]].transitions_without_action["weights"])
                                else:
                                    A[indice+j,k] = 0
                    for transitions in etat_actif.transitions_without_action["targets"]:
                        if transitions in S1:
                            index_cible = etat_actif.transitions_without_action["targets"].index(transitions)
                            b[indice+j] = self.states[self.liste_states[i]].transitions_without_action["weights"][index_cible]  / np.sum(self.states[self.liste_states[i]].transitions_without_action["weights"])
                indice += 1

            A[indice,i] = -1
            A[indice+1,i] = 1
            b[indice] = -1
            indice += 2
        
        print(A)
        print(b)
        A = -1*A
        b = -1*b
                    
        # On résout Ax>=b   avec comme fonction objectif (1,1,...,1)
        c = np.ones((n_etat,1))
        res = linprog(c, A_ub=A, b_ub=b, bounds=(None,None), method='simplex')
        print(f"Bravo ! L'algorithme converge ! La probabilité d'atteindre l'ensemble à partir des autre sommets est : \n")
        liste_res = []
        for etat in self.liste_states:
            if etat not in S1:
                liste_res.append(etat)
        for etat in liste_res:
            print(f"Etat {etat} : {res.x[self.liste_states.index(etat)]}")



    def __repr__(self):
        return (f"States : {self.states} \n Actions : {self.actions}")

In [3]:
M = markov(fichier_mdp="chaines/casino.mdp")

ANTLR runtime and generated code versions disagree: 4.11.1!=4.12.0
ANTLR runtime and generated code versions disagree: 4.11.1!=4.12.0
States: ['S0', 'S1', 'S2', 'S3', 'S4']
Actions: ['a', 'b']
Transition from S0 with action a and targets ['S1', 'S2'] with weights [5, 5]
Transition from S0 with action b and targets ['S3', 'S4'] with weights [1, 9]
Transition from S1 with no action and targets ['S1'] with weights [10]
Transition from S3 with no action and targets ['S0'] with weights [10]
Transition from S2 with no action and targets ['S0'] with weights [10]
Transition from S4 with no action and targets ['S0'] with weights [10]
{'States': ['S0', 'S1', 'S2', 'S3', 'S4'], 'Actions': ['a', 'b'], 'Transitions_with_action': {0: {'from': 'S0', 'action': 'a', 'targets': ['S1', 'S2'], 'weights': [5, 5]}, 1: {'from': 'S0', 'action': 'b', 'targets': ['S3', 'S4'], 'weights': [1, 9]}}, 'Transitions_without_action': {2: {'from': 'S1', 'targets': ['S1'], 'weights': [10]}, 3: {'from': 'S3', 'targets': [

  res = linprog(c, A_ub=A, b_ub=b, bounds=(None,None), method='simplex')
