# 1. Développement d'un jeu

In [97]:
# imports
import random
import numpy as np
import matplotlib.pyplot as plt

In [98]:
# configuration
GAMMA = 0.96
ALPHA = 0.81


In [99]:
#for the space
"""
by default the default space is used
the space is a 2D array 4 * 4 of characters
the characters are: 
    'S' : the starting point
    '_' : empty space
    'J' : the goal
    'D' : a dragon
the default space is:
    S___
    D_D_ 
    ___D
    _D_J
"""
def get_default_space():
    return [
        ['S', '_', '_', '_'],
        ['D', '_', 'D', '_'],
        ['_', '_', '_', 'D'],
        ['_', 'D', '_', 'J']
    ]

# a random space with number of lines and columns and a number of dragons
def get_random_space(lines, columns, dragons):
    space = []
    for l in range(lines):
        space.append([])
        for c in range(columns):
            space[l].append('_')

    space[0][0] = 'S'
    space[lines-1][columns-1] = 'J'
    
    i = 0
    while i < dragons:
        l = random.randint(0, lines-1)
        c = random.randint(0, columns-1)
        if space[l][c] == '_':
            space[l][c] = 'D'
            i += 1
        else:
            i -= 1

    return space

# pretty print the space
def print_space(space):
    for l in space:
        for c in l:
            print(c, end='| ')
        
        print()

# get the size of the lines
def get_lines_size(space):
    return len(space)

# get the size of the columns
def get_columns_size(space):
    return len(space[0])

# get the size of the space
def get_size(space):
    return get_lines_size(space) * get_columns_size(space)
    

In [100]:
# a static class for the rewards
class Rewards:
    # the rewards for each character
    rewards = {
        'S': 0,
        '_': 0,
        'J': 1,
        'D': -1
    }

    # a method to get the reward of a character
    def get_reward(character):
        return Rewards.rewards.get(character)

    # a method to set the reward 
    def set_rewards(rewards):
        Rewards.rewards = rewards

    # a method to set the reward of a character
    def set_reward(character, reward):
        Rewards.rewards[character] = reward

In [101]:
# a static class for the Directions
class Directions:
    # the directions
    directions = ["HAUT", "DROITE", "BAS", "GAUCHE"]

    # a method to get the size of the directions
    def get_size():
        return len(Directions.directions)

    # a method to get the index of a direction
    def get_index(direction):
        return Directions.directions.index(direction)

    # a method to get a random direction
    def get_random_direction():
        return random.choice(Directions.directions)

    # a method to get the direction that maximizes the Q value
    def get_max_direction(mat_q, state):
        return Directions.directions[np.argmax(mat_q[state])]

In [142]:
# for the Q matrix
# init the Q matrix with zeros and the size of the space and the directions length
def init_mat_q(space):
    return np.zeros((get_lines_size(space), get_columns_size(space), Directions.get_size()))

# get the Q value of a state and a direction
def get_q_value(mat_q, state, direction):
    return mat_q[state][Directions.get_index(direction)]

# update the Q matrix
# according to state, action, reward, next_state, ALPHA and GAMMA
def update_mat_q(mat_q, state, action, reward, next_state):
    mat_q[state][Directions.get_index(action)] += ALPHA * (reward + GAMMA * np.max(mat_q[next_state]) - mat_q[state][Directions.get_index(action)])
    return mat_q

# pretty print the space
def print_mat_q(mat_q, space):
    def get_best_direction(l, c):
        return Directions.directions[np.argmax(mat_q[l][c])]

    for l in range(get_lines_size(space)):
        for c in range(get_columns_size(space)):
            case      = space[l][c]
            direction = get_best_direction(l, c)
            q_value   = str(round(get_q_value(mat_q, (l, c), direction), 2)).ljust(6)

            content = case + " (" + q_value + ") " + direction
            print(content.ljust(20), end='| ')
        print("\n_______________________________________________________________________________________")

In [103]:
def isWin (space, position):
    (l,c) = position

    if(space[l][c]== 'J'):
        return True

# a method to apply an action to the player
# returns [position, reward, fin]
def applicaion_action(action, position, space):

    # reward -1 every time
    reward = -1

    (l, c) = position
    nextPos = position

    if action == "HAUT":
        nextPos = (l-1,c)
    elif action == "DROITE":
        nextPos = (l,c+1)
    elif action == "BAS":
        nextPos = (l+1,c);
    elif action == "GAUCHE":
        nextPos = (l,c-1);

    # check if the next position is in the space
    if (nextPos[0] < len(space) and nextPos[1] < len(space) and nextPos[0] >=0 and nextPos[1] >=0 ):
        position = nextPos

        # back to the starting point if a dragon is encountered
        # get the current case in the space
        case = space[position[0]][position[1]]
        if case == 'D':
            position = (0, 0)
    
        # set the reward
        reward += Rewards.get_reward(case)

    # check if the player is at the goal
    fin = isWin(space, position)

    # if the player is at the goal, back to the starting point
    if fin:
        position = (0, 0)

    return [position, reward, fin]

# 2. Développement du Q-learning

In [121]:
# a class for the game
class Game:
    # constructor that takes :
    # number of episodes : 10000 by default
    # number of steps : 100 by default
    # is_random_space : False by default
    # a Q matrix : initialized with zeros (with the size of the space and the number of directions)
    def __init__(self, episodes = 10000, steps = 100, is_random_space = False):
        self.episodes = episodes
        self.steps = steps
        # the space
        if is_random_space:
            self.space = get_random_space(4, 4, 3)
        else:
            self.space = get_default_space()
    
        # the Q matrix 
        self.mat_q = init_mat_q(self.space)

    # a method to choose an action with the epsilon greedy policy
    def choose_action(self, state, epsilon, mat_q):
        if random.random() < epsilon:
            return Directions.get_random_direction()
        else:
            return Directions.get_max_direction(mat_q, state)

    # a method to play one step (with mat_q, state, epsilon)
    def oneStep(self, mat_q, state, epsilon, verbose):
        # choose an action
        action = self.choose_action(state, epsilon, mat_q)
        if verbose:
            print(action, end=', ')
        # apply the action
        new_state, reward, fin = applicaion_action(action, state, self.space)
        # update the Q matrix
        new_q = update_mat_q(mat_q, state, action, reward, new_state)
        return new_q, new_state, fin

In [144]:
# PLAY
game = Game()

total_steps = 0

# apply the algorithm 
for episode in range(game.episodes):
    # reset the position
    position = (0, 0)
    # calculate the epsilon
    epsilon = game.episodes / (game.episodes + episode)
    #print("epsilon : ", epsilon)

    # play the game
    for step in range(1, game.steps):
        # play one step
        game.mat_q, position, fin = game.oneStep(game.mat_q, position, epsilon, False)
        #print("position : ", game.position)

        # if the game is finished
        if fin:
            total_steps += step
            break

print("total steps : ", total_steps)
print("average steps : ", total_steps / game.episodes)

print(game.mat_q)

total steps :  238132
average steps :  23.8132
[[[-21.396843   -21.24671146 -22.396843   -21.396843  ]
  [-21.24671146 -21.396843   -21.09032444 -21.396843  ]
  [-21.396843   -21.54096928 -22.396843   -21.24671146]
  [-21.54096928 -21.54096928 -21.67933051 -21.396843  ]]

 [[  0.           0.           0.           0.        ]
  [-21.24671146 -22.396843   -20.92742129 -22.396843  ]
  [  0.           0.           0.           0.        ]
  [-21.54096928 -21.67933051 -22.396843   -22.396843  ]]

 [[-22.396843   -20.92742129 -21.24671146 -21.09032444]
  [-21.09032444 -20.75773051 -22.396843   -21.09032444]
  [-22.396843   -22.396843   -20.58096928 -20.92742129]
  [  0.           0.           0.           0.        ]]

 [[-21.09032444 -22.396843   -21.24671146 -21.24671146]
  [  0.           0.           0.           0.        ]
  [-20.75773051 -20.396843   -20.58096928 -22.396843  ]
  [  0.           0.           0.           0.        ]]]


In [146]:
# play a with the optimal policy
for step in range(1, game.steps):
    # play one step
    game.mat_q, position, fin = game.oneStep(game.mat_q, position, 0, True)
    if fin:
        print("fin de partie en", step, "coups")
        break

DROITE, BAS, BAS, DROITE, BAS, DROITE, fin de partie en 6 coups


In [143]:
print_mat_q(game.mat_q, game.space)

S (-21.25) DROITE   | _ (-21.09) BAS      | _ (-21.25) GAUCHE   | _ (-21.4 ) GAUCHE   | 
_______________________________________________________________________________________
D (0.0   ) HAUT     | _ (-20.93) BAS      | D (0.0   ) HAUT     | _ (-21.54) HAUT     | 
_______________________________________________________________________________________
_ (-20.93) DROITE   | _ (-20.76) DROITE   | _ (-20.58) BAS      | D (0.0   ) HAUT     | 
_______________________________________________________________________________________
_ (-21.09) HAUT     | D (0.0   ) HAUT     | _ (-20.4 ) DROITE   | J (0.0   ) HAUT     | 
_______________________________________________________________________________________


# Deep Q-Learning

In [None]:
# IMPORTS
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import sys


##### Test avec une structure 2 couches denses ayant 16 entrées (nombre de cases) et 4 sorties (4 actions)


In [None]:
# 1 modifier la fonction choose_action avec en sortir model.predict(vec_etat)
def choose_action(vect_etat, epsilon, model):
    # L'agent est dans un certain état s, on choisit une action a selon :

    # Au hasard avec une probabilité epsilon
    if random.random() < epsilon:
        # on choisit une action aléatoire
        action = random.choice(DIRECTIONS)
    else:
        # La meilleure avec une probabilité 1-epsilon
        Sortie_Q = model.predict(vect_etat, verbose=0)  # En entrée le vecteur symbolisant l'état
        action = DIRECTIONS[np.argmax(Sortie_Q)] #On sélectionne l'action associée avec la sortie max
    return action


In [None]:
# creation du model
# Une structure simple avec 16 entrées et 4 sorties, la sortie est sans activation
model = Sequential([
    Dense(4, activation='relu', input_shape=[16]),
    Dense(4, activation='relu'),
    Dense(4),
])

# En préambule création d’un second modèle
model_stable = keras.models.clone_model(model)
model_stable.set_weights(model.get_weights())

In [None]:
# choix de l'optimiseur
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01) 

# on va définir la fonction de perte
loss_fn = tf.keras.losses.mean_squared_error

# on va définir la dérivé de la fonction d'activation

# relu
@tf.custom_gradient
def my_relu(x):
    y = tf.nn.relu(x)
    def grad(dy):
        return dy * tf.cast(x > 0, tf.float32)
    return y, grad

# tanh
@tf.custom_gradient
def my_tanh(x):
    y = tf.math.tanh(x)
    def grad(dy):
        return dy * (1 - y ** 2)
    return y, grad


In [None]:
# function d'initialisation
def init():
    position = (0,0)
    # Créer en entrée un vecteur de taille lines * columns avec comme nombre de sorties le nombre d'action possible 
    vect_etat = np.zeros((1,16))
    vect_etat[0, int(len(space) * position[0] + position[1])] = 1

    return position, vect_etat

In [None]:
position, vect_etat = init()

# ITERATIONS D'APPRENTISSAGE

# on applique l'algorithme
for iterationPartie in range(Nparties):
    progress = "Partie " + str(iterationPartie) + "/" + str(Nparties) 
    sys.stdout.write("\r" + progress)

    # on réinitialise la position
    position = (0,0)

    # calcul de epsilon
    epsilon = Nparties/(Nparties+iterationPartie)

    fin = False

    while not fin:
        # on choisit une action
        action = choose_action(vect_etat, epsilon, model)

        # on applique l'action
        new_position, reward, fin = application_action(action, position, space)

        # on met à jour vect_etat
        vect_etat[0, int(len(space) * position[0] + position[1])] = 0
        vect_etat[0, int(len(space) * new_position[0] + new_position[1])] = 1

        # model stable
        sortie_Q_stable = model_stable.predict(vect_etat, verbose=0)
        max_Q = np.max(sortie_Q_stable)

        target = reward + gamma * max_Q

        # descente de gradient
        with tf.GradientTape() as tape:
            predict = model(vect_etat, training=True)  #Ce que l'on pense obtenir 
            mask = tf.one_hot(DIRECTIONS.index(action), len(DIRECTIONS)) #On crée un masque pour sélectionner la sortie correspondant à l'action choisie
            val_predict = tf.reduce_sum(predict * mask, axis=1) #On sélectionne la sortie correspondant à l'action choisie
            loss = loss_fn(target, val_predict) #On calcule la perte
        
        # on applique la descente de gradient
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # optimisation des paramètres

        # on met à jour la position
        position = new_position

        # on met à jour le model stable
        if iterationPartie % 100 == 0:
            model_stable.set_weights(model.get_weights())

    
# sauvegarde du model
model.save('deep_Q_learning_model.h5')

In [None]:
# JEU

def play():

    # affichage de l'espace avec un retour à la ligne à la fin de chaque ligne
    for i in range(len(space)):
        for j in range(len(space)):
            print(space[i][j], end="| ")
        print()
    # on charge le model
    model = keras.models.load_model('deep_Q_learning_model.h5')

    # initialisation de la position
    position = (0,0)
    fin = False

    MAX_ITER = 100
    iter = 0
    while not fin and iter < MAX_ITER:
        iter += 1

        # on crée le vecteur d'état
        vect_etat = np.zeros((1,16))
        vect_etat[0, int(len(space) * position[0] + position[1])] = 1

        # on choisit une action
        action = choose_action(vect_etat, 0, model)

        # on applique l'action
        new_position, reward, fin = application_action(action, position, space)

        # on met à jour la position
        position = new_position

        # on affiche l'action choisie avec ", " en end de print pour ne pas faire de retour à la ligne
        # mais avec un retour à la ligne tout les 10 actions
        print(action, end=", ")
        if iter % 10 == 0:
            print()
        


    if fin:
        print("Victoire ! en " + str(iter) + " itérations")
    else:
        print("Défaite")
        if (iter >= MAX_ITER):
            print("Trop d'itérations")

play()