In [None]:
from gym import Env 
from gym import spaces 
import random 
import numpy as np 
from IPython.display import clear_output
from stable_baselines3.common.env_checker import check_env
import torch
from torch import nn
import torch.nn.functional as F
from collections import deque

import os

In [None]:
# Values stored in the cells of the flattened game board
NOTHING = 0  # empty cell
PLAYER = 1  # cell currently occupied by the player
WIN = 2  # goal cell; stepping onto it ends the episode with a win
LOSE_T = 50  # step count at which an unfinished episode is declared lost
# Action identifiers accepted by BasicEnv.step()
UP = 0  # move one row up (index - 6)
DOWN = 1  # move one row down (index + 6)
LEFT = 2  # move one column left (index - 1)
RIGHT = 3  # move one column right (index + 1)

In [None]:
class BasicEnv(Env):
    """6x6 grid world stored as a flat vector of 36 cells.

    The player starts on a random cell (never the goal) and has to reach the
    goal, which always sits in the last cell (index 35), before ``LOSE_T``
    steps have been taken.

    Cell values: ``NOTHING`` (0), ``PLAYER`` (1), ``WIN`` (2).
    Actions: ``UP`` (0), ``DOWN`` (1), ``LEFT`` (2), ``RIGHT`` (3).

    Reward returned by each step:
        * ``+1.0``  the player reached the goal (episode ends)
        * ``-2.0``  the step budget ran out (episode ends)
        * ``-0.5``  the move was blocked by a wall (position unchanged)
        * ``+0.01`` any other move
    """

    GRID_SIDE = 6
    N_CELLS = GRID_SIDE * GRID_SIDE

    def __init__(self):
        # Observation space: one integer per cell, each in [NOTHING, WIN].
        # The dtype is given explicitly so that it matches the integer state
        # array (Box defaults to float32 otherwise).
        self.observation_space = spaces.Box(
            low=NOTHING, high=WIN, shape=(self.N_CELLS,), dtype=np.int64
        )
        # Four discrete actions, encoded as the integers 0..3.
        self.action_space = spaces.Discrete(4)

        self._start_episode()

    def _start_episode(self):
        """Reset the counters and lay out a fresh board (used by __init__ and reset)."""
        # Custom attribute used only for display purposes.
        self.cumulative_reward = 0
        self.lose_time = 0

        self.win_position = self.N_CELLS - 1
        # randrange excludes its stop value, so the player never spawns on the goal.
        self.player_position = random.randrange(0, self.win_position)

        self.state = np.full(self.N_CELLS, NOTHING, dtype=np.int64)
        self.state[self.player_position] = PLAYER
        self.state[self.win_position] = WIN

    def step(self, action):
        """Apply ``action`` and advance the episode by one step.

        Args:
            action: one of ``UP``, ``DOWN``, ``LEFT``, ``RIGHT``.

        Returns:
            ``(observation, reward, done, info)``. The observation is a copy
            of the board, so callers can keep it (e.g. in a replay buffer)
            without it being changed by later steps.

        Raises:
            ValueError: if ``action`` is not one of the four known actions.
        """
        # Placeholder for debugging information.
        info = {}
        done = False
        reward = 0.01
        previous_position = self.player_position
        side = self.GRID_SIDE

        # The grid is flattened row by row: moving vertically changes the
        # index by one row length, moving horizontally by one. A move that
        # would leave the grid is ignored.
        if action == UP:
            if self.player_position - side >= 0:
                self.player_position -= side
        elif action == DOWN:
            if self.player_position + side < self.N_CELLS:
                self.player_position += side
        elif action == LEFT:
            if self.player_position % side != 0:
                self.player_position -= 1
        elif action == RIGHT:
            if self.player_position % side != side - 1:
                self.player_position += 1
        else:
            raise ValueError("invalid action")

        self.lose_time += 1

        # Penalise moves that were blocked by a wall.
        if self.player_position == previous_position:
            reward = -0.5

        outcome = None
        if self.player_position == self.win_position:
            reward = 1.0
            done = True
            outcome = 'YOU WIN!!!!'
        elif self.lose_time >= LOSE_T:
            # ">=" keeps the episode terminated even if step() is called
            # again after the budget ran out without an intervening reset().
            reward = -2.0
            done = True
            outcome = 'YOU LOSE'

        # The reward is accumulated exactly once per step.
        self.cumulative_reward += reward

        if done:
            print(f'Cumulative Reward: {self.cumulative_reward}')
            print(outcome)
        else:
            # Move the player marker on the board. On a terminal step the
            # board is left as it was so the goal marker is not overwritten.
            self.state[previous_position] = NOTHING
            self.state[self.player_position] = PLAYER

        return self.state.copy(), reward, done, info

    def reset(self):
        """Start a new episode and return a copy of the initial observation."""
        self._start_episode()
        return self.state.copy()

    def render(self):
        """Print the current board and the cumulative reward."""
        pretty_print(self.state, self.cumulative_reward)

def pretty_print(state_array, cumulative_reward):
    """Print the cumulative reward, a blank line, then the board as a 6x6 grid.

    Args:
        state_array: indexable sequence of at least 36 cell values, laid out
            row by row.
        cumulative_reward: value shown in the header line.
    """
    print(f'Cumulative Reward: {cumulative_reward}')
    print()
    # Each cell is formatted in a field of width 4; one text line per grid row.
    for row_start in range(0, 36, 6):
        row_cells = (state_array[row_start + column] for column in range(6))
        print(''.join('{:4}'.format(cell) for cell in row_cells))
    



In [None]:
class DQN(nn.Module):
    """Small fully connected Q-network: Linear -> ReLU -> Linear.

    Args:
        in_states: size of the input vector.
        nodes: number of units in the hidden layer.
        actions: number of outputs (one Q-value per action).
    """

    def __init__(self, in_states,nodes,actions) -> None:
        super().__init__()

        # Input layer followed by the output layer.
        self.input = nn.Linear(in_features=in_states, out_features=nodes)
        self.out = nn.Linear(in_features=nodes, out_features=actions)

    def forward(self, x):
        """Return the Q-values computed for input tensor ``x``."""
        hidden = torch.relu(self.input(x))
        return self.out(hidden)
    
#Experience Replay

class ReplayMemory():
    """Bounded FIFO buffer of experiences with uniform random sampling.

    Once ``maxlen`` items are stored, appending a new one discards the oldest.
    """

    def __init__(self,maxlen) -> None:
        self.memory = deque(maxlen=maxlen)

    def append(self,value):
        """Store ``value`` as the newest entry."""
        self.memory.append(value)

    def sample(self, s_size):
        """Return ``s_size`` distinct entries chosen at random."""
        return random.sample(self.memory, k=s_size)

    def __len__(self):
        """Number of entries currently stored."""
        return len(self.memory)



In [None]:
def state_to_input(state,nb_states)->torch.Tensor:
    """Encode a board array as a float tensor of length ``nb_states``.

    Every position where ``state`` equals 1 is set to 1 and every position
    where it equals 2 is set to 2; all other entries stay 0.
    """
    encoded = torch.zeros(nb_states)
    for cell_value in (1, 2):
        # Indices (along the first axis) of the cells holding this value.
        positions = np.nonzero(state == cell_value)[0]
        encoded[positions] = cell_value
    return encoded

In [None]:
class Game_P():
    """Deep Q-Learning trainer and evaluator for ``BasicEnv``.

    ``train`` learns a policy network with an epsilon-greedy behaviour policy,
    an experience replay buffer and a periodically synchronised target
    network, then saves the policy weights to ``model_path``. ``test`` loads
    those weights and plays episodes greedily.
    """

    # Hyperparameters
    learning_rate = 0.001
    discount_factor = 0.9
    network_sync_rate = 10          # steps taken before the target network is synced with the policy network
    replay_memory_size = 1000       # capacity of the replay memory
    mini_batch_size = 32            # number of transitions sampled per optimisation step

    # Neural network training objects
    loss = nn.MSELoss()
    optimizer = None                # created in train()

    # File the trained policy weights are written to / read from
    model_path = "game_dql.pt"

    def train(self, episodes, render=True):
        """Train a policy on ``BasicEnv`` and save its weights to ``model_path``.

        Args:
            episodes: number of episodes to play.
            render: when True (default) the board is printed after every step.
        """
        env = BasicEnv()

        nb_states = env.observation_space.shape[0]
        nb_actions = env.action_space.n

        epsilon = 1  # 1 = 100% random actions
        memory = ReplayMemory(self.replay_memory_size)

        # Policy and target networks start out with identical weights.
        policy_dqn = DQN(in_states=nb_states, nodes=nb_states, actions=nb_actions)
        target_dqn = DQN(in_states=nb_states, nodes=nb_states, actions=nb_actions)
        target_dqn.load_state_dict(policy_dqn.state_dict())

        self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.learning_rate)

        rewards_par_episode = np.zeros(episodes)
        epsilon_history = []
        step_count = 0

        for i in range(episodes):
            print("EPISODES = ",i)
            state = env.reset()
            done = False

            while not done:
                # Epsilon-greedy action selection.
                if random.random() < epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        action = policy_dqn(state_to_input(state, nb_states)).argmax().item()

                # Snapshot the observation BEFORE stepping: the environment
                # may update its board array in place, which would otherwise
                # make the stored "state" and "new_state" the same object.
                state_before = np.array(state, copy=True)

                new_state, reward, done, info = env.step(action)
                if render:
                    env.render()

                memory.append(
                    (state_before, action, np.array(new_state, copy=True), reward, done)
                )

                state = new_state
                step_count += 1

            # An episode counts as a success when its last reward is the win reward.
            if reward == 1:
                rewards_par_episode[i] = 1

            # Start learning only once enough experience has been collected
            # and at least one episode has been won.
            if len(memory) > self.mini_batch_size and np.sum(rewards_par_episode) > 0:
                mini_batch = memory.sample(self.mini_batch_size)
                self.optimize(mini_batch, policy_dqn, target_dqn)

                # Decay epsilon linearly.
                epsilon = max(epsilon - 1/episodes, 0)
                epsilon_history.append(epsilon)

                # Copy the policy network into the target network after a
                # certain number of steps.
                if step_count > self.network_sync_rate:
                    target_dqn.load_state_dict(policy_dqn.state_dict())
                    step_count = 0

        env.close()

        torch.save(policy_dqn.state_dict(), self.model_path)

    def optimize(self, mini_batch, policy_dqn, target_dqn):
        """Run one gradient step on ``policy_dqn`` using ``mini_batch``.

        Args:
            mini_batch: iterable of ``(state, action, new_state, reward, done)``.
            policy_dqn: network being trained.
            target_dqn: network used to compute the regression targets; it is
                only read, never updated here.
        """
        # Number of input nodes of the network.
        nb_states = policy_dqn.input.in_features

        current_q_list = []
        target_q_list = []

        for state, action, new_state, reward, done in mini_batch:
            # Targets are constants for the regression, so nothing computed
            # with the target network needs gradient tracking.
            with torch.no_grad():
                if done:
                    # Terminal transition: the target is just the reward.
                    target = reward
                else:
                    # Q-learning target: r + gamma * max_a' Q_target(s', a')
                    next_q = target_dqn(state_to_input(new_state, nb_states))
                    target = reward + self.discount_factor * next_q.max()

                # Start from the target network's Q-values for this state and
                # replace only the entry of the action that was taken.
                target_q = target_dqn(state_to_input(state, nb_states))
                target_q[action] = target

            # Q-values currently predicted by the policy network.
            current_q_list.append(policy_dqn(state_to_input(state, nb_states)))
            target_q_list.append(target_q)

        loss = self.loss(torch.stack(current_q_list), torch.stack(target_q_list))

        # Gradient descent step.
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def test(self, episodes, render=True):
        """Play ``episodes`` episodes greedily with the policy saved in ``model_path``.

        Args:
            episodes: number of episodes to play.
            render: when True (default) the board is printed after every step.
        """
        env = BasicEnv()
        nb_states = env.observation_space.shape[0]
        nb_actions = env.action_space.n

        # Load the saved policy.
        policy_dqn = DQN(in_states=nb_states, nodes=nb_states, actions=nb_actions)
        policy_dqn.load_state_dict(torch.load(self.model_path))
        policy_dqn.eval()    # switch the model to evaluation mode

        for i in range(episodes):
            print("EPISODES = ",i)
            state = env.reset()
            done = False

            while not done:
                with torch.no_grad():
                    action = policy_dqn(state_to_input(state, nb_states)).argmax().item()

                state, reward, done, info = env.step(action)

                if render:
                    env.render()

        env.close()


          
          

       

In [None]:
# Create the trainer and run DQN training for 10,000 episodes.
# train() saves the learned policy weights to "game_dql.pt".
Game = Game_P()
Game.train(10000)

In [None]:
# Play 10 episodes greedily with the policy weights loaded from "game_dql.pt".
Game.test(10)