In [27]:
!pip install -q gym accelerate torch

In [2]:
import os
import numpy as np
import time

import matplotlib.pyplot as plt

# Initialize env
import random
from gym import Env, spaces 
from IPython.display import clear_output

# Model
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

random.seed(0)

### Game

There is a board 6x6. Objection to move the fastest way from Agent 1 to Goal 3 -> win, but if it catches Trap 2 -> lose.

![](https://miro.medium.com/v2/resize:fit:640/format:webp/1*zx8131dzXB2ku7DJr_QJOw.png)


- The agent is the number 1
- The green square is the number 2
- The red square is the number 3
- The empty squares are the number 0

Cause `gym` requires the environment state to be represented as a single row of values, not a grid of values, we must flatten the grid by taking the top row, adding the 2nd row to the end of it, adding the 3rd row to the end of those combined rows, etc. etc.

![](https://miro.medium.com/v2/resize:fit:720/format:webp/1*ydFqgzVgLGiBQj66JHUFkQ.png)


The `action` of `agent` must be converted to number too. I assume:

- Up: 0
- Down: 1
- Right: 2
- Left: 3

We also have to define `reward` of `agent` after finishing an `action`:

- If the agent wins the game, it will be rewarded with 1 point.
- If the agent loses the game, it will be rewarded with -1 point.
- Every action that does not result in a win or a loss will give a reward of -0.01 points. This is to incentivize the agent to take the fewest actions to win the game

**Notice:**

`Gym` environments have 4 functions that need to be defined within the environment class:

- __init__(self)
- step(self, action)
- reset(self)
- render(self)

In [169]:
### ENVIRONMENT ###
class CustomEnv(Env):
    def __init__(self):
        # Initialize cumulative reward of agent
        self.cumulative_reward = 0

        self.env_dict = {
            "NOTHING": 0,
            "PLAYER": 1,
            "WIN": 3,
            "LOSE": 2
        }
        self.action_dict = {
            "UP": 0,
            "DOWN": 1,
            "LEFT": 2,
            "RIGHT": 3
        }
        
        # Initialize board 6x6 in flatten shape or state
        self.state = [self.env_dict['NOTHING']] * 36 

        # Initialize random position of player, win and lose
        position = np.random.choice(36, size = 3, replace=False)
        self.player_position = position[0]
        self.win_position = position[1]
        self.lose_position = position[2]

        # Assign position of player, win and lose to state
        self.state[self.player_position] = self.env_dict['PLAYER']
        self.state[self.win_position] = self.env_dict['WIN']
        self.state[self.lose_position] = self.env_dict['LOSE']

        # Convert to numpy array to afford requirement from gym library
        self.state = np.array(self.state, dtype=np.int16)

        # Initialize observation space of state
        self.observation_space = spaces.Box(0, 3, [36,], dtype=np.int16)

        # Define action agent can do (0-4)
        self.action_space = spaces.Discrete(4) 
        
    def step(self, action):
        # placeholder for debugging information
        info = {}

        # Define origin value for parameters
        done = False
        reward = -0.01
        previous_position = self.player_position

        # Take action
        if action == self.action_dict['UP']:
            if (self.player_position - 6) >= 0:
                self.player_position -= 6
        elif action == self.action_dict['DOWN']:
            if (self.player_position + 6) < 36:
                self.player_position += 6
        elif action == self.action_dict['LEFT']:
            if (self.player_position % 6) != 0:
                self.player_position -= 1
        elif action == self.action_dict['RIGHT']:
            if (self.player_position % 6) != 5:
                self.player_position += 1
        else:
            # check for invalid actions
            raise Exception("invalid action")

        # Check for win/lose statement
        if self.state[self.player_position] == self.env_dict['WIN']:
            reward = 1.0
            self.cumulative_reward += reward
            done = True
            print("---------------------------------")
            print(f'Cumulative Reward: {self.cumulative_reward}')
            print('YOU WIN!!!!')
            print("---------------------------------")

        elif self.state[self.player_position] == self.env_dict['LOSE']:
            reward = -10.0
            self.cumulative_reward += reward
            done = True
            print("---------------------------------")
            print(f'Cumulative Reward: {self.cumulative_reward}')
            print('YOU LOSE...')
            print("---------------------------------")

        # Update the environment state
        if not done:
            self.state[previous_position] = self.env_dict['NOTHING']
            self.state[self.player_position] = self.env_dict['PLAYER']
            self.cumulative_reward += reward

        return self.state, reward, done, info
        
    def reset(self):
        
        self.cumulative_reward = 0
        
        self.state = [self.env_dict['NOTHING']] * 36
        
        position = np.random.choice(36, size = 3, replace=False)
        self.player_position = position[0]
        self.win_position = position[1]
        self.lose_position = position[2]

        self.state[self.player_position] = self.env_dict['PLAYER']
        self.state[self.win_position] = self.env_dict['WIN']
        self.state[self.lose_position] = self.env_dict['LOSE']

        self.state = np.array(self.state, dtype=np.int16)

        return self.state

        
    def render(self, process: bool = False):
        if process:
            self._screen_print(self.state, self.cumulative_reward)
        else:
            self._clear_screen()
            self._screen_print(self.state, self.cumulative_reward)

    def _clear_screen(self):
        clear_output()
        os.system('cls')
        
    def _screen_print(self, state_array, cumulative_reward):
        print(f'Cumulative Reward: {cumulative_reward}')
        print()
        for i in range(6):
            for j in range(6):
                print('{:4}'.format(state_array[i*6 + j]), end = "")
            print()
    
    def observe(self):
        return self.state

    def show_action(self, action):
        if action == self.action_dict['UP']:
            print("UP")
        elif action == self.action_dict['DOWN']:
            print("DOWN")
        elif action == self.action_dict['LEFT']:
            print("LEFT")
        elif action == self.action_dict['RIGHT']:
            print("RIGHT")


In [71]:
### Test env 
env_object = CustomEnv()
env_object.render()

action = int(input("Enter action:"))

state, reward, done, info = env_object.step(action)

while not done:
    env_object.render()
    action = int(input("Enter action:"))
    state, reward, done, info = env_object.step(action)

env_object.close()

sh: 1: cls: not found


Cumulative Reward: -0.04

   0   0   0   1   0   0
   0   0   2   3   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0


Enter action: 1


---------------------------------
Cumulative Reward: 0.96
YOU WIN!!!!
---------------------------------


This is done for creating env. 

Now we will continue to train a RL Agent.

There are 2 approaches to solve a RL problem:

- Policy-based: Try to build a policy that agent follow to solve the problem.
- Value-based: Not using policy but only value from value function
- Hybrid (actor-critic): combine two approaches above.

In this notebook, I will use value-approach using Q-value.

Environment can be divided into 2 approaches:

- Model-based: RL agent can model the environment, so it can predict what will happen if a new action happens: reward model, transition model. 
- Model-free: RL agent study to optimize action based on reward it gains after finishing one action. RL agent does not predict next action but choose a set of actions to minimum cumulative reward.

The agent can be `off-policy` or `on-policy`:

- on-policy: Agent use policy to generate actions and learn from these actions but does not change the policy. Usually used in situations where policy has less changes.
- off-policy: Agent can learn from actions that come from other policy.

I will use `temporal difference learning` where the Q-value estimates will be learned by experience in replay buffer.

In [186]:
class DQN(nn.Module):
    def __init__(self, input_dim=36, output_dim=4, nf = 64):
        super(DQN, self).__init__()
        
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.nf = nf
        
        self.fc1 = nn.Linear(self.input_dim, self.nf)
        self.fc2 = nn.Linear(self.nf, self.nf*2)
        # self.fc3 = nn.Linear(self.nf*2, self.nf*4)
        self.fc4 = nn.Linear(self.nf*2, self.output_dim)

        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout(x)
        # x = torch.relu(self.fc3(x))
        x = self.fc4(x)
        return x

In [171]:
### Experience replay
class ExperienceReplay:
    def __init__(self, max_memory: int = 10000):
        self.max_memory = max_memory
        self.memory = deque(maxlen=self.max_memory)

    def add_experience(self, sars: list):
        """
        Args:
        sars (List): [previous_state, action, reward, current_state, done]
        """
        self.memory.append(sars)
        
    def get_batch(self, env, batch_size = 8):
        memory_length = len(self.memory)
        env_dim = env.observation_space.shape[0]
        true_batch_size = min(batch_size, memory_length)

        idxes = np.random.choice(memory_length, size=true_batch_size, replace=False)
        
        batch_samples = list(zip(*[self.memory[idx] for idx in idxes]))
        previous_states, actions, rewards, current_states, dones = (np.stack(sample) for sample in batch_samples)

        return previous_states, current_states, actions, rewards, dones

    def _getlen(self):
        return len(self.memory)

In [187]:
class DQNAgent:
    def __init__(self, env, model, exp_replay, gamma, batch_size = 8, learning_rate = 0.001, epochs=10, epsilon=1.0):
        # Initialize variables
        self.model = model
        self.env = env
        self.exp_replay = exp_replay
        self.gamma = gamma

        # Train parameters
        self.optimizer = optim.Adam(self.model.parameters(), lr = learning_rate)
        self.criterion = nn.MSELoss()
        self.batch_size = batch_size
        self.epochs = epochs
        self.epsilon = epsilon
        self.loop = 0

    def _getloss(self, x_previous: np.array, x_current: np.array, actions, rewards, dones):
        
        q_values = self.model(torch.tensor(x_previous, dtype=torch.float32))
        q_targets = q_values.clone()
        
        # Bellman equation
        current_q_values = self.model(torch.tensor(x_current, dtype=torch.float32))
    
        for i in range(x_previous.shape[0]):
            q_targets[i, actions[i]] = rewards[i] + self.gamma * torch.max(current_q_values[i]) * ~dones[i]
        
        loss = self.criterion(q_values, q_targets)
    
        return loss
     
    def train(self):
        print("Initializing model training")
        
        for epoch in range(self.epochs):
            self.env.reset()
            current_state = self.env.observe()
            
            done = False
            loop_count = 0
            time_start = time.time()
            epsilon = self.epsilon
            
            while not done:
                previous_state = current_state
                if random.random() < epsilon:
                    action = self.env.action_space.sample()
                else:
                    with torch.no_grad():
                        q = self.model(torch.tensor(previous_state, dtype=torch.float32))
                        action = torch.argmax(q).item()
    
                current_state, reward, done, _ = self.env.step(action)
    
                exp_replay.add_experience(
                    [previous_state, int(action), reward, current_state, done]
                )
    
                previous_batch, target_batch, action_batch, \
                reward_batch, done_batch = exp_replay.get_batch(
                    self.env, batch_size=self.batch_size
                )

                loss = self._getloss(
                    previous_batch, target_batch, 
                    action_batch, reward_batch, done_batch
                )
                
                loop_count += 1
                
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # Gradient clipping 
                max_norm = 1.0 
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm)
                
                epsilon = epsilon/(1 + 0.1*epsilon)
                    
            time_end = time.time()
            print(f"End of epochs: {epoch+1}, number of loop: {loop_count}, time taken: {round(time_end - time_start, 3)} seconds")

    def save_model(self, path="./saved_model.pth"):
        torch.save(model, path)
        print(f"Model saved to {path}")

    def load_model(self, path=None, train=False):
        assert path is not None, "Add model's path."
        model = torch.load(path)
        if train:
            return model
        else:
            model.eval()
            return model

In [188]:
env_object = CustomEnv()
model = DQN(env_object.observation_space.shape[0],4)
exp_replay = ExperienceReplay()

agent = DQNAgent(
    env=env_object, 
    model=model, 
    exp_replay=exp_replay,
    gamma=0.9,
    batch_size = 64,
    learning_rate = 0.001,
    epochs = 20,
    epsilon = 1.0
)

# Train
agent.train()

Initializing model training
---------------------------------
Cumulative Reward: 0.96
YOU WIN!!!!
---------------------------------
End of epochs: 1, number of loop: 5, time taken: 0.036 seconds
---------------------------------
Cumulative Reward: 0.99
YOU WIN!!!!
---------------------------------
End of epochs: 2, number of loop: 2, time taken: 0.009 seconds
---------------------------------
Cumulative Reward: -10.0
YOU LOSE...
---------------------------------
End of epochs: 3, number of loop: 1, time taken: 0.006 seconds
---------------------------------
Cumulative Reward: -10.1
YOU LOSE...
---------------------------------
End of epochs: 4, number of loop: 11, time taken: 0.074 seconds
---------------------------------
Cumulative Reward: -10.73
YOU LOSE...
---------------------------------
End of epochs: 5, number of loop: 74, time taken: 1.126 seconds
---------------------------------
Cumulative Reward: 0.37999999999999967
YOU WIN!!!!
---------------------------------
End of epoch

In [190]:
exp_replay._getlen()

1046

In [189]:
env_object2 = CustomEnv()
print("----Origin-----")
env_object2.render(process=True)
previous_state = env_object2.state

with torch.no_grad():
    action = torch.argmax(model(torch.tensor(previous_state, dtype=torch.float32))).item()

current_state, reward, done, info = env_object2.step(action)

for i in range(20):
    print("-----------------------")
    env_object2.show_action(action)
    env_object2.render(process=True)
    with torch.no_grad():
        action = torch.argmax(model(torch.tensor(current_state, dtype=torch.float32))).item()
    current_state, reward, done, info = env_object2.step(action)
    if done:
        break

----Origin-----
Cumulative Reward: 0

   0   0   3   0   2   0
   0   0   0   0   0   0
   0   0   1   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
-----------------------
RIGHT
Cumulative Reward: -0.01

   0   0   3   0   2   0
   0   0   0   0   0   0
   0   0   0   1   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
-----------------------
LEFT
Cumulative Reward: -0.02

   0   0   3   0   2   0
   0   0   0   0   0   0
   0   0   1   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
-----------------------
DOWN
Cumulative Reward: -0.03

   0   0   3   0   2   0
   0   0   0   0   0   0
   0   0   0   0   0   0
   0   0   1   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
-----------------------
UP
Cumulative Reward: -0.04

   0   0   3   0   2   0
   0   0   0   0   0   0
   0   0   1   0   0   0
   0   0   0   0   0   0
   0   0   0   0   0   0
   0   0   0   0  