<center><h1>Introduction to Reinforcement Learning</h1></center>




<p>
 In this tutorial, we will be discussing the basics of Reinforcement Learning and how to implement a simple RL model using the DQN (Deep Q-Network) algorithm.
</p>

# I) What is Reinforcement Learning?

Reinforcement Learning is a machine learning method driven by feedback: an agent learns how to interact with its environment by carrying out actions and observing their consequences. When the agent makes a favorable choice, it receives positive feedback (a reward); detrimental decisions earn negative feedback, or penalties.

The main components of a Reinforcement Learning system are:

- Agent: The learning entity that makes decisions and takes actions.
- Environment: The world the agent interacts with and acts upon.
- State: A snapshot of the current situation in the environment.
- Action: A decision made by the agent that affects the environment.
- Reward: Feedback given to the agent based on the outcome of an action.

<div style="text-align: center;">
  <figure>
    <img src="Agent-RL-Illustration.png" alt="Agent RL Illustration" width="500">
    <figcaption>Source: <a href="https://vitalflux.com/reinforcement-learning-real-world-examples/?utm_content=cmp-true">vitalflux</a></figcaption>
  </figure>
</div>
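
To make the interaction loop concrete, here is a minimal sketch of an agent acting in an environment. The `CoinFlipEnv` class, the `run_episode` helper, and the fixed policy are hypothetical names invented for this illustration; they are not part of the model built later in this tutorial.

```python
import random


class CoinFlipEnv:
    """Toy environment (hypothetical): the agent guesses a coin flip."""

    def __init__(self, episode_length=5, seed=0):
        self.episode_length = episode_length
        self.rng = random.Random(seed)
        self.steps = 0

    def reset(self):
        self.steps = 0
        return self.steps  # the state is simply the step index

    def step(self, action):
        coin = self.rng.randint(0, 1)
        reward = 1 if action == coin else -1  # positive or negative feedback
        self.steps += 1
        done = self.steps >= self.episode_length
        return self.steps, reward, done


def run_episode(env, policy):
    """Run one episode and return the list of rewards received."""
    state = env.reset()
    done = False
    rewards = []
    while not done:
        action = policy(state)                  # the agent picks an action
        state, reward, done = env.step(action)  # the environment responds
        rewards.append(reward)
    return rewards


# A trivial agent that always guesses 0.
rewards = run_episode(CoinFlipEnv(), policy=lambda state: 0)
print(len(rewards), sum(rewards))
```

Each pass through the loop involves all five components: the agent observes a state, chooses an action, and the environment returns a reward and the next state.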

# II) Basic Concepts in RL

### a) Q-Values


<p>Q-values represent the expected cumulative reward an agent can obtain by performing an action in a given state. In Q-learning, we use a Q-table or Q-function to estimate these values for each state-action pair. The goal is to learn the optimal Q-function, which can be used to determine the best action in each state.</p>

<p>The Q-learning algorithm updates the Q-values iteratively using the following update rule:</p>

<pre>
Q(s, a) = Q(s, a) + α * (r + γ * max<sub>a'</sub> Q(s', a') - Q(s, a))
</pre>

<p>Here, <b>s</b> represents the current state, <b>a</b> is the action taken, <b>r</b> is the immediate reward received, <b>s'</b> is the next state, <b>a'</b> ranges over the actions available in the next state, <b>α</b> is the learning rate, and <b>γ</b> is the discount factor.</p>
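
As a worked example of the update rule, the sketch below applies one update to a small Q-table stored as a list of lists. The function name `q_update` and the values chosen for α, γ, and the table entries are assumptions made for illustration only.

```python
def q_update(q_table, s, a, r, s_next, alpha=0.1, gamma=0.9):
    """Apply one tabular Q-learning update in place and return the new value."""
    best_next = max(q_table[s_next])   # max over a' of Q(s', a')
    td_target = r + gamma * best_next  # r + gamma * max_a' Q(s', a')
    q_table[s][a] += alpha * (td_target - q_table[s][a])
    return q_table[s][a]


# Two states and two actions; every estimate is zero except Q(1, 1) = 2.
q_table = [[0.0, 0.0], [0.0, 2.0]]
new_value = q_update(q_table, s=0, a=1, r=1.0, s_next=1)
print(new_value)
```

With these numbers the target is 1 + 0.9 × 2 = 2.8, so Q(0, 1) moves one tenth of the way from 0 toward 2.8, giving roughly 0.28.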


### b) Value function
The value function, denoted as V(s), represents the expected cumulative reward an agent can obtain starting from a given state and following a specific policy. When the agent follows the greedy policy with respect to the Q-values, the value of a state is the maximum Q-value for that state:

<b> V(s) = max_a Q(s, a)</b>

### c) Policy function

The policy function, denoted as π(s), represents the action the agent should take in a given state to maximize its expected cumulative reward. The policy function is derived from the Q-values, as it selects the action with the highest Q-value for each state:

<b>π(s) = argmax_a Q(s, a)</b>
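
The two formulas for V(s) and π(s) can be read directly off one row of Q-values. The following sketch assumes the Q-values of a single state are stored in a plain list indexed by action; the helper names are hypothetical.

```python
def state_value(q_row):
    """V(s) = max_a Q(s, a) for one row of Q-values."""
    return max(q_row)


def greedy_action(q_row):
    """pi(s) = argmax_a Q(s, a); ties go to the lowest action index."""
    return max(range(len(q_row)), key=lambda a: q_row[a])


# Illustrative Q-values for four actions: up, down, left, right.
q_row = [0.5, 2.0, -1.0, 1.5]
print(state_value(q_row), greedy_action(q_row))
```

Here the best action is index 1 (down), and the value of the state is that action's Q-value, 2.0.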

# III) Deep Q-Networks (DQN) 

In many practical problems, the state and action spaces are too large to represent the Q-values in a table. In such cases, we can use a neural network, called a Deep Q-Network (DQN), to approximate the Q-function. A DQN takes the state as input and produces Q-values for each action as output.

##### Experience Replay

- Experience replay is a technique used to improve the stability and efficiency of the DQN. Instead of updating the network with consecutive samples, the agent stores the experiences (state, action, reward, next state, done) in a memory buffer and samples a mini-batch of experiences to update the network. This helps to break the correlation between samples and improves the learning process.
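
A replay memory can be sketched in a few lines with a bounded `deque`. The `ReplayBuffer` class below is a hypothetical stand-alone illustration of the idea, not the exact structure used by the agent later in this tutorial.

```python
import random
from collections import deque


class ReplayBuffer:
    """Fixed-size memory of (state, action, reward, next_state, done) tuples."""

    def __init__(self, capacity):
        # When the buffer is full, the oldest experience is dropped first.
        self.memory = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform random sampling breaks the correlation between
        # consecutive steps of the same episode.
        return random.sample(list(self.memory), batch_size)

    def __len__(self):
        return len(self.memory)


buffer = ReplayBuffer(capacity=3)
for t in range(5):
    buffer.push(t, 0, 0.0, t + 1, False)
batch = buffer.sample(2)
print(len(buffer), len(batch))
```

Because the capacity is 3, only the three most recent experiences remain after five pushes.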

##### Target Networks

- Another technique used in DQN is the use of a separate target network to estimate the Q-values for the next state during the update step. This network has the same architecture as the original DQN but with separate parameters. The target network's parameters are periodically updated with the main DQN's parameters to provide more stable Q-value estimates.
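
The periodic synchronization can be illustrated without a real neural network. In the sketch below, a plain dictionary stands in for the network parameters and adding 1.0 stands in for a gradient step; both are simplifying assumptions, and `train_with_target` is a hypothetical helper.

```python
import copy


def train_with_target(num_steps, sync_every):
    """Return (online, target) parameter values after each training step."""
    online = {"w": 0.0}
    target = copy.deepcopy(online)  # the target starts as a copy of the online net
    history = []
    for step in range(1, num_steps + 1):
        online["w"] += 1.0          # stand-in for one gradient update
        if step % sync_every == 0:
            target = copy.deepcopy(online)  # periodic hard update
        history.append((online["w"], target["w"]))
    return history


history = train_with_target(num_steps=6, sync_every=3)
print(history)
```

Between synchronizations the target parameters stay frozen while the online parameters keep changing, which is what keeps the Q-value targets stable.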

# IV) First RL Model

In this part of the tutorial, we will implement a simple reinforcement learning model using the concepts discussed above. The model consists of a custom game environment, a DQN to estimate Q-values, and an agent that uses the DQN to navigate through the environment.

### a) Game Environment Class


Imagine a grid-based labyrinth where an agent must find its way out. The agent can take 4 actions: up, down, left, and right. The grid has different types of cells, including:

- Empty cells: The agent can move freely in these cells.
- Walls (obstacle 1): If the agent tries to move into a wall, it receives a negative reward (-1) and stays in its current cell.
- Holes (obstacle 2): If the agent falls into a hole, it dies and receives a negative reward (-50).
- Magic doors (obstacle 3): When the agent steps on a magic door, it is teleported closer to the exit and receives a +10 reward.

Two further rules end an episode:

- If the agent does not find the exit after 300 steps, it dies and receives a negative reward (-100).
- If the agent finds the exit, it gets a positive reward (+100).

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import os

class GameEnv:
    def __init__(self):
        self.board_size = 10
        self.board = np.zeros((self.board_size, self.board_size), dtype=int)
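        # Level-1 obstacles: walls
        self.obstacles_1 = [(4, 8), (4, 2), (7, 3), (3, 5), (8, 0)]
        # Level-2 obstacles: holes
        self.obstacles_2 = [(6, 6), (3, 4), (4, 4), (7, 7)]
        # Level-3 obstacles: magic doors
        self.obstacles_3 = [(4, 9), (3, 2)]

        # Number of steps taken in the current episode
        self.numbre_de_tour = 0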

        self.max_steps = 300
        
        self.reset()

    def reset(self):
        self.board.fill(0)
        self.p1_pos = (0, 0)
        self.numbre_de_tour = 0  # start every episode with a fresh step count
        
        self.board[self.p1_pos] = 1
        for obs in self.obstacles_1:
            self.board[obs] = -1

        for obs in self.obstacles_2:
            self.board[obs] = -50

        for obs in self.obstacles_3:
            self.board[obs] = 20
        return self.get_state()

    def get_state(self):
        state = np.zeros((4, self.board_size, self.board_size), dtype=int)
        state[0][self.p1_pos] = 1

        for obs in self.obstacles_1:
            y, x = obs
            state[1][y][x] = 1

        for obs in self.obstacles_2:
            y, x = obs
            state[2][y][x] = 1

        for obs in self.obstacles_3:
            y, x = obs
            state[3][y][x] = 1

        return state

    def reset_turns(self):
        self.numbre_de_tour = 0

    def step(self, player, action):
        new_pos = self.move(self.p1_pos, action)

        if new_pos in self.obstacles_1:
            # Wall hit: the agent stays in its current cell
            mur = 'Positif'
        else:
            self.p1_pos = new_pos
            mur = 'RAF'  # no wall was hit

        self.numbre_de_tour += 1  
        reward, done = self.get_reward(player, mur)
        return self.get_state(), reward, done



    def move(self, pos, action):
        y, x = pos
        if action == 0:  # Up
            y = max(0, y - 1)
        elif action == 1:  # Down
            y = min(self.board_size - 1, y + 1)
        elif action == 2:  # Left
            x = max(0, x - 1)
        elif action == 3:  # Right
            x = min(self.board_size - 1, x + 1)
        new_pos = (y, x)
        return new_pos

    
    def render(self):
        print("\n")
        for i in range(self.board_size):
            row = ""
            for j in range(self.board_size):
                if (i, j) == self.p1_pos:
                    row += "P1 "

                elif (i, j) in self.obstacles_1:
                    row += "O "
                elif (i, j) in self.obstacles_2:
                    row += "X "

                elif (i, j) in self.obstacles_3:
                    row += "A "
                else:
                    row += ". "
            print(row)

    def get_reward(self, player, mur):
        reward, done = 0, False

        if self.p1_pos == (9, 9):
            reward, done = 100, True
        elif mur == 'Positif':
            reward = -1
        elif self.p1_pos in self.obstacles_2:
            self.p1_pos = (0, 0)
            reward, done = -50, True
        elif self.p1_pos in self.obstacles_3:
            self.p1_pos = (8, 9)
            reward = 10
        else:
            reward = 0.0

        if self.numbre_de_tour >= self.max_steps:  
            reward = -100 
            done = True  
        return reward, done


### b) DQN Class

Let's create the DQN class, which defines the architecture of the neural network.
As we said above, the Deep Q-Network is a neural network that approximates the Q-function in Q-learning. The Q-function represents the expected future reward of taking a specific action in a given state. The DQN takes the current state as input and outputs Q-values for all possible actions.



In [None]:
class DQN(nn.Module):
    def __init__(self):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(4, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.fc1 = nn.Linear(128 * 10 * 10, 512)
        self.dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 256)
        self.dropout2 = nn.Dropout(0.5)
        self.fc3 = nn.Linear(256, 4)

    def forward(self, x):
        x = torch.relu(self.bn1(self.conv1(x)))
        x = torch.relu(self.bn2(self.conv2(x)))
        x = torch.relu(self.bn3(self.conv3(x)))
        x = x.view(-1, 128 * 10 * 10)
        x = torch.relu(self.fc1(x))
        x = self.dropout1(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout2(x)
        x = self.fc3(x)

        return x

### c) Agent class

The Agent class handles the learning process of the RL model. It includes the DQN model, the target model, the optimizer, the memory for experience replay, and the action selection.

The agent uses the ε-greedy strategy for action selection, which allows it to balance exploration and exploitation. It also uses experience replay and a target network to stabilize learning.
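
The ε-greedy rule and its decay schedule can be sketched independently of the network. The helper names below are hypothetical, and the decay is written as a clamp at the minimum value, which is a slight simplification of the check used in the class that follows.

```python
import random


def epsilon_greedy(q_values, epsilon, rng=random):
    """Pick a random action with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))  # explore
    return max(range(len(q_values)), key=lambda a: q_values[a])  # exploit


def decay(epsilon, rate=0.995, minimum=0.02):
    """Shrink epsilon by a constant factor without going below the minimum."""
    return max(minimum, epsilon * rate)


q_values = [0.1, 0.9, 0.3]
print(epsilon_greedy(q_values, epsilon=0.0))  # always the greedy action
print(decay(1.0))
```

With ε = 1 every action is random (pure exploration); as ε decays toward its minimum, the agent relies more and more on its learned Q-values (exploitation).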

In [None]:
class Agent:
    def __init__(self, player_id):
        self.player_id = player_id
        self.model = DQN()
        self.target_model = DQN()
        self.update_target_model()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)
        self.memory = deque(maxlen=10000)
        self.batch_size = 4
        self.gamma = 0.98
        self.epsilon = 1
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.02
        self.save_path = f"model_player_{player_id}.pth"

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def memorize(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if random.random() <= self.epsilon:
            return random.randint(0, 3)
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        q_values = self.model(state_tensor)
        return torch.argmax(q_values).item()

    def replay(self):
        if len(self.memory) < self.batch_size:
            return

        minibatch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in minibatch:
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
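            # Start from the current predictions, detached so that no
            # gradient flows through the target
            target = self.model(state_tensor).detach().clone()

            if done:
                target[0][action] = reward
            else:
                with torch.no_grad():
                    next_q = torch.max(self.target_model(next_state_tensor))
                target[0][action] = reward + self.gamma * next_q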

            self.optimizer.zero_grad()
            loss = nn.MSELoss()(self.model(state_tensor), target)
            loss.backward()
            self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def save(self, file_name=None):
        # Fall back to the default path when no file name is given
        if file_name is None:
            file_name = self.save_path
        torch.save(self.model.state_dict(), file_name)

    def load(self):
        self.model.load_state_dict(torch.load(self.save_path))
        self.update_target_model()

### d) Training Function

The train_agent function trains the agent using the custom game environment. It plays multiple games (epochs) and adjusts the agent's behavior based on the experience gained during each game. The agent's performance is tracked using the total reward of each game and the mean reward over the last 100 games.
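
The mean reward over recent games is a simple rolling average. The sketch below uses a hypothetical `rolling_mean` helper and made-up reward values to show the calculation.

```python
def rolling_mean(history, window=100):
    """Average of the last `window` entries (or of all entries if fewer)."""
    recent = history[-window:]
    return sum(recent) / len(recent)


reward_history = [-100, -50, 10, 100]  # made-up episode totals
print(rolling_mean(reward_history, window=2))
```

Early in training the window holds fewer than 100 games, so the average simply covers every game played so far.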

In [None]:
def train_agent(epochs=1000):
    env = GameEnv()
    agent = Agent(player_id=1)
    reward_history = []

    for e in range(epochs):
        total_reward = 0
        state = env.reset()
        env.reset_turns()
        print(f"Starting epoch {e+1}/{epochs}")
        done = False
        step_counter = 0  # Step counter for the current episode
        rec =[]
        while not done:
            
            step_counter += 1
            

            
            action = agent.act(state)
            next_state, reward, done = env.step(player=1, action=action)
            
            agent.memorize(state, action, reward, next_state, done)
            total_reward += reward
            state = next_state

            rec.append(reward)
            
            if step_counter % 100 == 0:
                
                print(f"Epoch {e+1}/{epochs}, Step {step_counter}")  # Progress message every 100 steps
                print("Total reward:", total_reward)
                

            agent.replay()

        reward_history.append(total_reward)

        mean_reward = np.mean(reward_history[-100:])
            
        print(f"Epoch {e+1}/{epochs} - Total reward for player : {total_reward}")
        print(f"Epoch {e+1}/{epochs} - Mean reward for player : {mean_reward}")

       
        

        if e % 5 == 0:
            agent.update_target_model()
            
        if e % 10 == 0:
            torch.save(agent.model.state_dict(), "model_custom_name.pth")
            
    # Save the final model at the end of training
    torch.save(agent.model.state_dict(), "model_custom_name.pth")


### e) Testing Function

After training the agent, you can test its performance using the test function. The function plays a single game using the trained agent and reports the total reward and the number of steps taken to reach the goal.

In [None]:
def test(env, agent):
    total_reward = 0
    state = env.reset()
    done = False
    step_counter = 0
    while not done:
        step_counter += 1
        action = agent.act(state)
        next_state, reward, done = env.step(player=1, action=action)
        total_reward += reward
        state = next_state
    print(f"Total reward during test: {total_reward}")
    print(f"Steps taken during test: {step_counter}")
    return total_reward, step_counter


### f) Run 

- Run rl_train_script.py to train your first agent.