<a href="https://colab.research.google.com/github/PTrillat/Reinforcement-Learning/blob/main/Jeu2048_RL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [125]:
import numpy as np
from numpy import random as rd
import tensorflow as tf


class Jeu2048(np.ndarray):
  """2048 board stored directly in an integer ndarray of shape (m, n).

  A cell holds 0 when it is empty and the tile value otherwise.

  Parameters:
    m, n: number of rows and columns of the board.
    p1: after a first tile has been spawned, probability of NOT spawning a
      second one.
    p2: probability that a spawned tile is a 2 (otherwise it is a 4).
  """

  def __new__(cls, m, n, p1, p2):
    new = np.zeros((m,n), dtype=int).view(cls)
    new.p1 = p1
    new.p2 = p2
    return new

  def __array_finalize__(self, obj):
    # Views, slices and copies are created without going through __new__;
    # propagate the spawn probabilities so those objects remain usable boards.
    if obj is None: return
    self.p1 = getattr(obj, 'p1', None)
    self.p2 = getattr(obj, 'p2', None)

  def maz(self):
    """Reset the board, spawn the initial tile(s) and return the state."""
    self[...] = 0
    self._peupler()
    return self._etat()

  def jouer(self, action):
    """Play one move and return (state, gain, fin).

    action: 0 = left, 1 = right, 2 = up, 3 = down.
    gain: sum of the values of the tiles created by merges during this move.
    fin: True when no legal move remains (board full, no equal neighbours).

    New tiles are spawned only when the move actually changed the board.
    Raises ValueError for an unknown action.
    """
    if action == 0: deplacement, gain = self._gauche()
    elif action == 1: deplacement, gain = self._droite()
    elif action == 2: deplacement, gain = self._haut()
    elif action == 3: deplacement, gain = self._bas()
    else: raise ValueError(f"unknown action {action!r}; expected 0, 1, 2 or 3")
    if deplacement: self._peupler()
    # A move that changed the board always frees or keeps an empty cell, so a
    # "board was full before spawning" test can never fire here; the end of
    # the game is detected explicitly instead.
    return self._etat(), gain, self._bloque()

  def _etat(self):
    """Return the flattened board in log2 scale (empty cells map to 0)."""
    # Empty cells are replaced by 1 before the logarithm so that log2 gives 0.
    return np.log2(1*(self==0)+self).flatten()

  def _bloque(self):
    """Return True when the board is full and no two neighbours are equal."""
    grille = np.asarray(self)
    if (grille == 0).any(): return False
    if (grille[:, 1:] == grille[:, :-1]).any(): return False
    if (grille[1:, :] == grille[:-1, :]).any(): return False
    return True

  def _poser_tuile(self, cases_vides):
    """Remove a random cell from `cases_vides` and put a 2 or a 4 in it."""
    i, j = cases_vides.pop(rd.randint(len(cases_vides)))
    self[i,j] = 2 if rd.rand() < self.p2 else 4

  def _peupler(self):
    """Spawn one or two tiles on random empty cells.

    Returns True when the board had no empty cell (nothing spawned),
    False otherwise.
    """
    cases_vides = list(self._cases_vides())
    if not cases_vides: return True
    self._poser_tuile(cases_vides)
    # Stop after one tile when no cell is left or with probability p1.
    if not cases_vides or rd.rand() < self.p1: return False
    self._poser_tuile(cases_vides)
    return False

  def _cases_vides(self):
    """Yield the (row, column) index of every empty cell, in row-major order."""
    for i, j in np.argwhere(np.asarray(self) == 0):
      yield (int(i), int(j))

  def _gauche(self):
    """Slide and merge every row towards the left, in place.

    Returns (deplacement, gain): whether at least one row changed, and the
    sum of the values of the tiles created by merges. Each tile takes part
    in at most one merge per move.
    """
    deplacement, gain = False, 0
    m, n = self.shape
    for i in range(m):
      ancienne = np.asarray(self[i]).tolist()
      tuiles = [v for v in ancienne if v != 0]
      nouvelle = []
      k = 0
      while k < len(tuiles):
        if k + 1 < len(tuiles) and tuiles[k] == tuiles[k+1]:
          fusion = 2*tuiles[k]
          nouvelle.append(fusion)
          gain += fusion
          k += 2
        else:
          nouvelle.append(tuiles[k])
          k += 1
      nouvelle += [0]*(n - len(nouvelle))
      if nouvelle != ancienne:
        self[i, :] = nouvelle
        deplacement = True
    return deplacement, gain

  def _droite(self):
    """Slide right: a left slide applied to a column-reversed view."""
    # The view shares memory with the board, so the board itself is updated.
    return self[:,::-1]._gauche()

  def _haut(self):
    """Slide up: a left slide applied to the transposed view."""
    return self.T._gauche()

  def _bas(self):
    """Slide down: an upward slide applied to a row-reversed view."""
    return self[::-1,:]._haut()

In [126]:
from typing import Any, List, Sequence, Tuple


# Wrap the Jeu2048 environment calls (`maz` and `jouer`) as operations usable inside a TensorFlow function.
# This allows them to be included in a callable TensorFlow graph.
# Small detail: tf.function is a decorator that turns a function's arguments into tensors,
# so Jeu2048.jouer(self, action) cannot be decorated directly because of `self`...
# To get rid of it, a global variable is used instead... (yes, it is ugly)

# Single module-level 4x4 board shared by the numpy/TensorFlow wrappers below.
grille = Jeu2048(m=4, n=4, p1=0.25, p2=0.25)

def grille_maz() -> np.ndarray:
  """Reset the module-level board and return its state as float32."""
  # `grille` is only read here, so no `global` declaration is required.
  return grille.maz().astype(np.float32)

def grille_jouer(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
  """Play `action` on the module-level board.

  Returns the new state as float32, and the reward and the end-of-game flag
  as int32 arrays.
  """
  etat, gain, fin = grille.jouer(action)
  etat = etat.astype(np.float32)
  gain = np.array(gain, np.int32)
  fin = np.array(fin, np.int32)
  return (etat, gain, fin)

def tf_grille_maz() -> tf.Tensor:
  """Expose `grille_maz` as a TensorFlow op producing a float32 tensor."""
  return tf.numpy_function(func=grille_maz, inp=[], Tout=tf.float32)

def tf_grille_jouer(action: tf.Tensor) -> List[tf.Tensor]:
  """Expose `grille_jouer` as a TensorFlow op.

  The op yields three tensors: state (float32), reward (int32), done (int32).
  """
  types_sortie = [tf.float32, tf.int32, tf.int32]
  return tf.numpy_function(func=grille_jouer, inp=[action], Tout=types_sortie)

In [127]:
from tensorflow.keras import layers
# Huber loss object used by compute_loss for the critic term; configured with SUM reduction.
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)


class ActorCritic(tf.keras.Model):
  """Actor-critic network: a shared trunk followed by two heads.

  Parameters:
    entree: size of the input state vector.
    shape_common: widths of the shared hidden layers (must not be empty).
    shape_actor: widths of the hidden layers specific to the actor head.
    action: number of actions; the actor head outputs a softmax over them.
    shape_critic: widths of the hidden layers specific to the critic head.

  Calling the model returns (action probabilities, state value).
  """

  def __init__(self, entree, shape_common, shape_actor, action, shape_critic):
    super().__init__()
    if len(shape_common) == 0:
      # The heads take the last shared layer as input, so one is required.
      raise ValueError("shape_common must contain at least one layer width")
    # common: entree -> shape_common -> intermediate representation
    self.common = tf.keras.models.Sequential()
    self.common.add(tf.keras.Input(shape=(entree,)))
    for num in shape_common: self.common.add(tf.keras.layers.Dense(num, activation='relu'))
    intermediaire = shape_common[-1]
    # actor: intermediate -> shape_actor -> action probabilities
    self.actor = tf.keras.models.Sequential()
    self.actor.add(tf.keras.Input(shape=(intermediaire,)))
    for num in shape_actor: self.actor.add(tf.keras.layers.Dense(num, activation='relu'))
    self.actor.add(tf.keras.layers.Dense(action, activation=tf.keras.activations.softmax))
    # critic: intermediate -> shape_critic -> 1
    self.critic = tf.keras.models.Sequential()
    self.critic.add(tf.keras.Input(shape=(intermediaire,)))
    for num in shape_critic: self.critic.add(tf.keras.layers.Dense(num, activation='relu'))
    # Linear output: a softmax over a single unit is constantly 1.0, which
    # would make the value estimate constant and untrainable.
    self.critic.add(tf.keras.layers.Dense(1))

  def call(self, inputs):
    """Return (actor output, critic output) for a batch of states."""
    x = self.common(inputs)
    return self.actor(x), self.critic(x)

def run_episode(model, max_steps):
  """Play one episode on the global board and collect training data.

  Parameters:
    model: callable returning (action probabilities, value) for a batched state.
    max_steps: maximum number of moves played.

  Returns (Probas, Values, Rewards), one row per step played:
    Probas: per step, a vector over the actions holding the probability of
      the action that was actually sampled and 1.0 for every other action.
    Values: critic value of the state the action was chosen from.
    Rewards: int32 reward returned by the environment.
  """
  # TensorArray: TensorFlow container that supports efficient appends
  Probas = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  Values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  Rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)
  state = tf_grille_maz()
  for t in tf.range(max_steps):
    state = tf.expand_dims(state, 0) # Convert state into a batched tensor (batch size = 1)
    probas, value = model(state) # Run the model to get action probabilities and critic value
    action = tf.random.categorical(tf.math.log(probas), 1)[0,0] # Sample next action from the action probability distribution
    state, reward, done = tf_grille_jouer(action) # Apply action to the environment to get next state and reward
    # Keep only the sampled action's probability; the other entries are set
    # to 1 so that their logarithm is 0 and a sum of log(Probas) weighted by
    # the advantage reduces to the log-probability of the action taken.
    ligne = tf.squeeze(probas)
    choisie = tf.one_hot(action, tf.shape(ligne)[0], dtype=tf.float32)
    Probas = Probas.write(t, choisie*ligne + (1.0 - choisie))
    Values = Values.write(t, tf.squeeze(value)) # Store the critic value of the state
    Rewards = Rewards.write(t, reward) # Store reward
    if tf.cast(done, tf.bool): break
  Probas = Probas.stack()
  Values = Values.stack()
  Rewards = Rewards.stack()
  return Probas, Values, Rewards

def get_gains(Values, Rewards, gamma, lamed):
  """Compute the per-step training targets by a backward recursion.

  Starting from the last value estimate, each step mixes the critic value and
  the discounted bootstrapped return:
    gain[t] = (1 - lamed) * Values[t] + lamed * (Rewards[t] + gamma * gain[t+1])

  Returns a float32 tensor with one target per step.
  """
  recompenses = tf.cast(Rewards, dtype=tf.float32)
  horizon = tf.shape(recompenses)[0]
  cibles = tf.TensorArray(dtype=tf.float32, size=horizon)
  suivant = Values[-1] # bootstrap from the last value estimate
  for t in tf.range(horizon - 1, -1, -1):
    retour = recompenses[t] + gamma*suivant
    suivant = (1-lamed)*Values[t] + lamed*retour
    cibles = cibles.write(t, suivant)
  # NOTE: the targets are returned as-is; standardisation (zero mean, unit std) is not applied.
  return cibles.stack()

def compute_loss(Probas, Values, Gains):
  """Return the combined actor + critic loss for one episode.

  Parameters:
    Probas: (steps, actions) tensor whose logarithm is weighted by the advantage.
    Values: (steps,) critic values.
    Gains: (steps,) training targets.

  The actor term is minus the sum of log(Probas) times the advantage
  (Gains - Values), broadcast over the action axis. The critic term is the
  Huber loss between Values and Gains, summed over the steps.
  """
  Avantages = tf.expand_dims(Gains - Values, 1)
  actor_loss = -tf.math.reduce_sum(tf.math.log(Probas)*Avantages)
  # Keras losses average over the last axis before applying their reduction.
  # With 1-D inputs that would average over the steps; column vectors make
  # the SUM reduction add the per-step losses, on the same scale as the
  # summed actor term.
  critic_loss = huber_loss(tf.expand_dims(Values, 1), tf.expand_dims(Gains, 1))
  return actor_loss + critic_loss

@tf.function
def train_step(model: tf.keras.Model, optimizer: tf.keras.optimizers.Optimizer, gamma: float, lamed: float, max_steps: int) -> tf.Tensor:
  """Run one episode, apply one gradient update and return the episode reward.

  The episode, the targets and the loss are all computed under the gradient
  tape so that the loss can be differentiated with respect to the model's
  trainable variables.
  """
  with tf.GradientTape() as ruban:
    Probas, Values, Rewards = run_episode(model, max_steps)
    Gains = get_gains(Values, Rewards, gamma, lamed)
    loss = compute_loss(Probas, Values, Gains)
  variables = model.trainable_variables
  gradients = ruban.gradient(loss, variables) # Gradients of the loss w.r.t. the model parameters
  optimizer.apply_gradients(zip(gradients, variables)) # Update the model parameters
  return tf.math.reduce_sum(Rewards) # Total reward collected during the episode

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
cerveau = ActorCritic(grille.size, (16,16), (), 4, ())

from collections import deque
import tqdm

gamma = 0.999
lamed = 0.8
num_episodes = 500
min_steps = 100
max_steps = 5000

# Per-episode history: total reward and largest tile reached.
rewards = deque(maxlen=num_episodes)
maxima = deque(maxlen=num_episodes)

with tqdm.trange(num_episodes) as t:
  for i in t:
    # The step budget grows linearly from min_steps to max_steps over training.
    budget = int(min_steps + i/num_episodes*(max_steps-min_steps))
    # Pass the budget as a tensor: a Python int that changes every episode
    # would make tf.function trace a new graph at every call.
    reward = train_step(cerveau, optimizer, gamma, lamed, tf.constant(budget, dtype=tf.int32))
    maximum = int(np.max(grille)) # plain int for logging and plotting
    reward = int(reward)
    rewards.append(reward)
    maxima.append(maximum)

    t.set_description(f'Episode {i}')
    t.set_postfix(episode_reward=reward, maximum=maximum)

Episode 3:   1%|          | 4/500 [00:12<20:31,  2.48s/it, episode_reward=1008, maximum=64]



Episode 4:   1%|          | 5/500 [00:13<18:27,  2.24s/it, episode_reward=1052, maximum=64]



In [None]:
import matplotlib.pyplot as plt

# One figure per entry; each entry lists the (series, label) curves to draw.
figures = (
  (
    (rewards, 'gain instantané'),
    (np.cumsum(rewards)/np.arange(1, 1+len(rewards)), 'gain moyen'), # running mean of the rewards
  ),
  (
    (maxima, 'tuile maximum'),
  ),
)

for courbes in figures:
  plt.figure()
  for serie, etiquette in courbes:
    plt.plot(serie, label=etiquette)
  plt.legend()
  plt.grid()
  plt.show()