In [227]:
import random
from itertools import groupby

import numpy as np
import tensorflow as tf
import tensorflow.keras as keras

### Construction de l'environnement 

L'état de la grille est un vecteur de 12 entiers, un par case du jeu de Yam. Chaque case vaut 0 tant qu'elle est vide ; elle reçoit ensuite le score marqué dans cette case, ou -1 si elle a été barrée.

In [81]:
# Score sheet of the Yam game: one integer slot per cell, all zero (empty)
# at the start of a game.
state = np.zeros(shape=(12,), dtype=int)
state

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [6]:
def roll_dice(sides=6):
    """Roll a single fair die.

    Parameters
    ----------
    sides : int, optional
        Number of faces of the die. Defaults to 6.

    Returns
    -------
    int
        A value drawn uniformly between 1 and ``sides``, both included.
    """
    # random.randint includes both endpoints, so no +1 adjustment is needed.
    return random.randint(1, sides)

In [77]:
# Sample throw: five independent dice rolls; the bare name on the last line
# displays the resulting list as the cell output.
throw = [roll_dice() for _ in range(5)]
throw

[1, 3, 4, 5, 1]

In [99]:
def hist(throw):
    """Count how many dice of a throw show each face.

    Parameters
    ----------
    throw : iterable of int
        Dice values, each between 1 and 6.

    Returns
    -------
    numpy.ndarray
        Integer array of length 6; entry ``k`` is the number of dice showing
        the face ``k + 1``.

    Raises
    ------
    ValueError
        If a die value is outside 1..6. Without this check a value of 0 or
        below would be counted silently through negative indexing (a 0 would
        be tallied as a six).
    """
    count = np.zeros(6, dtype=int)

    for die in throw:
        if not 1 <= die <= 6:
            raise ValueError("die value out of range 1..6: {!r}".format(die))
        count[die - 1] += 1
    return count

In [175]:
def env_step(state, action, throw, n_throw):
    """Apply one player/agent action to the Yam score sheet.

    Parameters
    ----------
    state : numpy.ndarray of int
        Score sheet. Indices 0-11 are the cells: 0 means empty, -1 means
        crossed out, a positive value is the score written in the cell.
        The array is updated in place and also returned.
    action : int
        0-5  : number cells (ones ... sixes), worth face value x occurrences;
        6    : at least three identical dice, worth the sum of the dice;
        7    : straight 1-2-3-4-5, worth 15;
        8    : straight 2-3-4-5-6, worth 20;
        9    : full house (three identical + two identical), worth 30;
        10   : at least four identical dice, worth 40;
        11   : five identical dice, worth 50;
        12   : the player asks to roll again, the sheet is left untouched.
        Any other value is treated as a forbidden action.
    throw : sequence of int
        The dice of the current turn.
    n_throw : int
        Number of throws already made this turn. Not used by the scoring
        logic; kept so the call signature stays stable.

    Returns
    -------
    tuple
        ``(state, reward, done)``. ``reward`` is the score written in the
        cell, -10 when the cell had to be crossed out, -20 for a forbidden
        action, 0 for a re-roll request, plus a 35-point bonus on the move
        that completes the number cells without any of them crossed out.
        ``done`` is True once none of the 12 cells is empty.
    """
    reward = 0

    # Remember whether the number cells (indices 0-5, sixes included) still
    # had an empty slot before the move, so the bonus is granted only on the
    # move that completes them.
    upper_was_open = 0 in state[0:6]

    if action == 12:
        # Re-roll request: nothing is written on the sheet this step.
        print("Veuillez retirer")
    elif not 0 <= action < 12:
        # Unlisted action (the policy network has more outputs than the sheet
        # has cells): penalise it like any other forbidden move instead of
        # indexing outside the 12 cells.
        print("Action non répertoriée")
        reward = -20
    elif state[action] != 0:
        # Overwriting a cell that is already scored or crossed out is forbidden.
        reward = -20
    else:
        count = hist(throw)
        largest_group = max(count)  # size of the biggest set of identical dice

        if action < 6:
            # Face value times number of dice showing it (0 when absent).
            scored = count[action] * (action + 1)
        elif action == 6:
            # ">= 3" so that four or five identical dice also qualify.
            scored = sum(throw) if largest_group >= 3 else 0
        elif action == 7:
            faces_seen_once = [int(c == 1) for c in count]
            scored = 15 if faces_seen_once == [1, 1, 1, 1, 1, 0] else 0
        elif action == 8:
            faces_seen_once = [int(c == 1) for c in count]
            scored = 20 if faces_seen_once == [0, 1, 1, 1, 1, 1] else 0
        elif action == 9:
            scored = 30 if (3 in count) and (2 in count) else 0
        elif action == 10:
            # ">= 4" so that five identical dice also qualify.
            scored = 40 if largest_group >= 4 else 0
        else:  # action == 11
            scored = 50 if largest_group >= 5 else 0

        if scored > 0:
            # The requested combination is present: write the score.
            reward = state[action] = scored
        else:
            # Combination absent: the cell is crossed out.
            state[action] = -1
            reward = -10

    # Bonus: this move filled the last empty number cell and none of the six
    # number cells is crossed out.
    upper_section = state[0:6]
    if upper_was_open and (0 not in upper_section) and (-1 not in upper_section):
        reward += 35

    # The game is over once none of the 12 cells is empty. Only the cells are
    # inspected, so extra trailing entries in `state` cannot affect the result.
    done = 0 not in state[:12]

    return state, reward, done

In [193]:
def throwing(throw, n_throw, mask):
    """Produce the dice for the next throw of a turn.

    Parameters
    ----------
    throw : sequence of int
        Dice currently on the table.
    n_throw : int
        Number of throws already made in the current turn.
    mask : sequence
        One flag per die; a truthy flag keeps the die, a falsy flag re-rolls
        it. Dice without a matching flag are re-rolled.

    Returns
    -------
    tuple
        ``(throw, n_throw)``: the five dice after this throw and the updated
        throw counter. Once three throws have been made the turn restarts:
        five fresh dice and a counter of 1.
    """
    if n_throw >= 3:
        # The turn is exhausted: start a new one with five fresh dice.
        return [roll_dice() for _ in range(5)], 1

    # Keep the die itself rather than die * flag, so a flag other than 0/1
    # (e.g. 2, or a boolean) cannot alter the value of a kept die.
    kept = [die for die, keep in zip(throw, mask) if keep and die]
    rerolled = [roll_dice() for _ in range(5 - len(kept))]
    return kept + rerolled, n_throw + 1

In [230]:
# Network input: 12 cells of the score sheet + 5 dice thrown + 1 throw number.
input_shape = [18]
# Network output: 12 cells that can be filled + 5 dice that can be kept
# + 1 re-roll flag.
n_outputs = 18

# Small fully connected Q-network: two ELU hidden layers followed by a linear
# layer with one output per action.
model = keras.models.Sequential()
model.add(keras.layers.Dense(64, activation="elu", input_shape=input_shape))
model.add(keras.layers.Dense(32, activation="elu"))
model.add(keras.layers.Dense(n_outputs))

In [232]:
def epsilon_greedy_policy(state, epsilon=0):
    """Choose an action with an epsilon-greedy rule.

    Parameters
    ----------
    state : numpy.ndarray
        Observation fed to the Q-network; a batch axis is added before the
        prediction.
    epsilon : float, optional
        Probability of picking a uniformly random action instead of the
        greedy one. Defaults to 0 (always greedy).

    Returns
    -------
    int
        Index of the chosen action, between 0 and ``n_outputs - 1``.
    """
    if np.random.rand() < epsilon:
        # Explore: draw among all network outputs. Using n_outputs keeps the
        # action range in sync with the model definition.
        return np.random.randint(n_outputs)
    # Exploit: action with the highest predicted Q-value. verbose=0 keeps the
    # per-step prediction from flooding the notebook output.
    Q_values = model.predict(state[np.newaxis], verbose=0)
    return np.argmax(Q_values[0])

In [233]:
from collections import deque

# Replay buffer of past transitions. It is bounded: once 2000 items are
# stored, appending a new one discards the oldest.
replay_memory = deque((), maxlen=2000)

In [234]:
def sample_experiences(batch_size):
    """Draw a random mini-batch from the replay memory.

    Indices are drawn independently, so the same stored transition can
    appear more than once in a batch.

    Parameters
    ----------
    batch_size : int
        Number of transitions to draw.

    Returns
    -------
    tuple of numpy.ndarray
        ``(states, actions, rewards, next_states, dones)``, each stacking the
        matching field (positions 0 to 4) of the drawn transitions.
    """
    picks = np.random.randint(len(replay_memory), size=batch_size)
    drawn = [replay_memory[pick] for pick in picks]

    # Turn the list of transitions into one array per field.
    columns = []
    for field in range(5):
        columns.append(np.array([transition[field] for transition in drawn]))

    states, actions, rewards, next_states, dones = columns
    return states, actions, rewards, next_states, dones

In [235]:
def play_one_step(env, state, epsilon):
    """Play one action and record the transition in the replay memory.

    Parameters
    ----------
    env : object
        Not used by this function; kept so the call signature stays stable.
    state : numpy.ndarray
        Current observation, passed both to the policy and to ``env_step``.
    epsilon : float
        Exploration rate forwarded to ``epsilon_greedy_policy``.

    Returns
    -------
    tuple
        ``(next_state, reward, done)`` exactly as returned by ``env_step``.

    Notes
    -----
    The dice (``throw``) and the throw counter (``n_throw``) are read from
    module scope.
    NOTE(review): no visible cell assigns ``n_throw`` at module level; it has
    to be defined before this function is called.
    """
    action = epsilon_greedy_policy(state, epsilon)
    # env_step writes into the array it receives, so snapshot the state first;
    # otherwise the stored "state" and "next_state" would be the same object
    # and every stored transition would change as the game goes on.
    state_before = np.array(state, copy=True)
    next_state, reward, done = env_step(state, action, throw, n_throw)
    # Five fields, in the order sample_experiences() unpacks them.
    replay_memory.append(
        (state_before, action, reward, np.array(next_state, copy=True), done))
    return next_state, reward, done

In [236]:
# Training hyper-parameters.
batch_size = 32        # transitions sampled per gradient step
discount_rate = 0.95   # weight of the bootstrapped next-state value
# `learning_rate` is the supported keyword; `lr` is a deprecated alias that
# recent Keras releases no longer accept.
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
loss_fn = keras.losses.mean_squared_error

def training_step(batch_size):
    """Run one Q-learning gradient step on a random mini-batch.

    Parameters
    ----------
    batch_size : int
        Number of transitions drawn from the replay memory.

    The target for each drawn transition is
    ``reward + (1 - done) * discount_rate * max_a Q(next_state, a)``; only
    the Q-value of the action actually taken is fitted to it.
    """
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)

    # Value of the best next action; verbose=0 keeps the notebook output clean.
    next_Q_values = model.predict(next_states, verbose=0)
    max_next_Q_values = np.max(next_Q_values, axis=1)

    # Explicit float cast so the arithmetic does not depend on how the
    # "done" flags were stored (Python bools, numpy bools or ints).
    not_done = 1.0 - dones.astype(np.float32)
    target_Q_values = rewards + not_done * discount_rate * max_next_Q_values
    target_Q_values = target_Q_values.reshape(-1, 1)

    # One-hot mask selecting, for each sample, the output of the action taken.
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [None]:
# Bookkeeping that the loop reads and updates.
rewards = []                         # total reward collected in each episode
best_score = float("-inf")
best_weights = model.get_weights()   # fallback if no episode improves on it

for episode in range(600):
    # Start of a game. No `env` object is defined in this notebook, so the
    # reset is done here: five fresh dice (throwing() restarts a turn when
    # told three throws were made) and an 18-value observation matching the
    # network input: 12 empty cells + 5 dice + throw number.
    throw, n_throw = throwing([], 3, [])
    obs = np.concatenate([np.zeros(12, dtype=int), throw, [n_throw]])
    episode_reward = 0
    epsilon = max(1 - episode / 500, 0.01)  # constant within an episode

    for step in range(200):
        # play_one_step ignores its first argument and reads the dice from
        # the module-level `throw` / `n_throw` set above.
        obs, reward, done = play_one_step(None, obs, epsilon)
        episode_reward += reward
        if done:
            break
        # Next turn: roll five fresh dice and expose them in the observation.
        # NOTE(review): partial re-rolls (keeping some dice) are not wired to
        # the action space yet; every step starts from a full new throw.
        throw, n_throw = throwing([], 3, [])
        obs[12:17] = throw
        obs[17] = n_throw

    # Score an episode by the reward it collected, not by its step count:
    # a longer game is not a better game here.
    rewards.append(episode_reward)
    if episode_reward > best_score:
        best_weights = model.get_weights()
        best_score = episode_reward
    print("\rEpisode: {}, Steps: {}, eps: {:.3f}".format(episode, step + 1, epsilon), end="")
    if episode > 50:
        training_step(batch_size)

model.set_weights(best_weights)