In [None]:
!git clone https://github.com/JoyPang123/RL-Explore-with-Own-made-Env.git
!mv RL-Explore-with-Own-made-Env/snake ./snake
!pip install -e snake

Cloning into 'RL-Explore-with-Own-made-Env'...
remote: Enumerating objects: 55, done.[K
remote: Counting objects: 100% (55/55), done.[K
remote: Compressing objects: 100% (39/39), done.[K
remote: Total 55 (delta 16), reused 48 (delta 11), pack-reused 0[K
Unpacking objects: 100% (55/55), done.


In [None]:
import math

from collections import deque

import random

import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

import torchvision.transforms as transforms
from torchvision.transforms import InterpolationMode

import gym

In [None]:
class Model(nn.Module):
    """Small convolutional Q-network: image batch in, one Q-value per action out.

    Two stride-2 convolutions (each followed by batch norm and ReLU) halve the
    spatial size twice, then the flattened features go through one linear layer.
    The linear layer expects ``64 * 8`` features, i.e. 8 channels of 8x8, which
    is what a 32x32 input produces; other input sizes will not match.

    Args:
        num_actions: Number of outputs of the final linear layer.
        in_channels: Number of channels of the input image. Defaults to 3.
    """

    def __init__(self, num_actions, in_channels=3):
        super(Model, self).__init__()

        # Create the layers for the model
        self.layers = nn.Sequential(
            nn.Conv2d(
                # Honour the constructor argument (it used to be hard-coded
                # to 3, which silently ignored `in_channels`).
                in_channels=in_channels, out_channels=16,
                kernel_size=5, padding=2, stride=2
            ),  # (16, 16, 16) for a 32x32 input
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True),
            nn.Conv2d(
                in_channels=16, out_channels=8,
                kernel_size=5, padding=2, stride=2
            ),  # (8, 8, 8) for a 32x32 input
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
            nn.Flatten(start_dim=1),
            nn.Linear(64 * 8, num_actions)
        )

    def forward(self, x):
        """Return the ``(batch, num_actions)`` output of the layer stack for ``x``."""
        return self.layers(x)

In [None]:
class DQN():
    """Deep Q-Network agent: networks, replay buffer and exploration state.

    Args:
        num_actions: Size of the discrete action space.
        device: Torch device the networks and input tensors are placed on.
        replace_iter: Threshold compared against ``step_counter`` to decide
            when the target network is refreshed. The counter is advanced by
            the training code, not by this class.
        max_len: Maximum number of experiences kept in the replay buffer;
            once full, the oldest entries are dropped first.
        EPS_START: Exploration probability at the first call to
            ``choose_action``.
        EPS_END: Exploration probability the schedule decays towards.
        EPS_DECAY: Decay constant of the exponential schedule, measured in
            calls to ``choose_action``.
    """

    def __init__(self, num_actions, device,
                 replace_iter=150, max_len=100,
                 EPS_START=0.9, EPS_END=0.05, EPS_DECAY=200):
        # Create network for target and evaluation
        self.eval_net = Model(num_actions=num_actions).to(device)
        self.target_net = Model(num_actions=num_actions).to(device)
        # Start the target network from the same weights as the evaluation
        # network; otherwise the first bootstrap targets come from an
        # unrelated random initialisation.
        self.target_net.load_state_dict(self.eval_net.state_dict())

        # Set up the replay experience (bounded FIFO)
        self.replay = deque(maxlen=max_len)

        # Convert a raw frame to a tensor and resize it to the 32x32 input
        # that Model's linear layer is sized for
        self.transforms = transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize(
                (32, 32), interpolation=InterpolationMode.BICUBIC
            )
        ])

        # Kept for backward compatibility; not read anywhere in this class
        self.target_counter = 0

        # Set up hyperparameters
        self.device = device
        self.num_actions = num_actions
        self.replace_iter = replace_iter
        self.step_counter = 0

        # For exploration probability
        self.EPS_START = EPS_START
        self.EPS_END = EPS_END
        self.EPS_DECAY = EPS_DECAY
        self.step_total_count = 0

    def choose_action(self, cur_state):
        """Choose the action using epsilon greedy policy.

        The exploration probability decays exponentially from ``EPS_START``
        towards ``EPS_END`` as ``step_total_count`` grows; every call advances
        that counter by one.

        Args:
            cur_state: Raw frame accepted by ``self.transforms``.

        Returns:
            The index of the chosen action.
        """
        eps_threshold = self.EPS_END + (self.EPS_START - self.EPS_END) \
            * math.exp(-1. * self.step_total_count / self.EPS_DECAY)
        self.step_total_count += 1

        if np.random.uniform() < eps_threshold:
            # Exploration
            action = np.random.randint(0, self.num_actions)
        else:
            # Exploitation: evaluation mode so batch norm uses its running
            # statistics for this single-sample batch.
            self.eval_net.eval()
            # No gradient is needed to pick an action.
            with torch.no_grad():
                # Use the agent's own device rather than a notebook global.
                cur_state = self.transforms(cur_state).to(self.device).unsqueeze(0)
                action_list = self.eval_net(cur_state)
            action = torch.argmax(action_list, dim=-1).item()

        # Always hand the network back in training mode
        self.eval_net.train()
        return action

    def store_experience(self, state, reward,
                         action, next_state,
                         done):
        """Record the play experience into deque

        The format of the experience:
            [state, reward, action, next_state, done]
        """

        self.replay.append([state, reward, action, next_state, done])

In [None]:
def update(model, batch_size,
           optimizer, criterion,
           tau=0.3,
           gamma=0.9):
    """Run one optimisation step of the evaluation network on a replay sample.

    Args:
        model: Agent exposing ``device``, ``replay``, ``transforms``,
            ``eval_net``, ``target_net``, ``step_counter`` and ``replace_iter``.
        batch_size: Number of experiences drawn (without replacement) from
            ``model.replay``.
        optimizer: Optimizer stepping the evaluation network's parameters.
        criterion: Loss called as ``criterion(Q_eval, Q_target)``.
        tau: Blend factor of the target refresh,
            ``target <- tau * eval + (1 - tau) * target``.
        gamma: Discount applied to the bootstrapped next-state value.

    Returns:
        The loss of this step as a Python float.

    Raises:
        ValueError: From ``random.sample`` when ``batch_size`` is larger than
            the number of stored experiences.
    """
    # Every tensor goes to the agent's device (not to a notebook global)
    used_device = model.device

    # Get the data from the experience
    batch_data = random.sample(model.replay,
                               batch_size)

    # Separate the data into groups
    states, rewards, actions, next_states, dones = zip(*batch_data)

    cur_state_batch = torch.stack(
        [model.transforms(state) for state in states]
    ).to(used_device)
    next_state_batch = torch.stack(
        [model.transforms(state) for state in next_states]
    ).to(used_device)
    reward_batch = torch.tensor([float(reward) for reward in rewards],
                                dtype=torch.float32, device=used_device)
    # Actions are indices for gather(), so build them as integers directly
    action_batch = torch.tensor([int(action) for action in actions],
                                dtype=torch.long, device=used_device)
    done_batch = torch.tensor([float(done) for done in dones],
                              dtype=torch.float32, device=used_device)

    # Q-value the evaluation network assigns to the action actually taken
    Q_eval = model.eval_net(cur_state_batch).gather(
        dim=1,
        index=action_batch.unsqueeze(1)
    ).squeeze(1)

    # The target is treated as a constant: no graph is built for it
    with torch.no_grad():
        Q_next = model.target_net(next_state_batch)
        # (1 - done) removes the bootstrap term for terminal transitions
        Q_target = reward_batch + gamma * (1 - done_batch) * torch.max(Q_next, dim=1)[0]

    # Compute loss and update the model
    loss = criterion(Q_eval, Q_target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Add the counter for the eval
    model.step_counter += 1

    # Blend the evaluation weights into the target network. `>=` (rather than
    # `==`) keeps the refresh firing even if replace_iter is lowered while the
    # counter is already past it.
    if model.step_counter >= model.replace_iter:
        model.step_counter = 0
        with torch.no_grad():
            for eval_parameters, target_parameters in zip(model.eval_net.parameters(),
                                                          model.target_net.parameters()):
                target_parameters.copy_(tau * eval_parameters
                                        + (1.0 - tau) * target_parameters)

    return loss.item()

In [None]:
def train(model, env, optimizer,
          criterion, iteration,
          batch_size,
          tau=0.3,
          gamma=0.9):
    """Play ``iteration`` episodes in ``env``, learning from replay along the way.

    Each step asks ``model`` for an action, stores the resulting transition and,
    once the replay holds more than ``batch_size`` entries, performs one call to
    ``update`` with the given ``optimizer``, ``criterion``, ``tau`` and ``gamma``.

    Returns:
        A tuple ``(average_reward_history, reward_history, loss_history)``:
        the running mean of episode returns, the per-episode returns, and the
        loss of every update performed.
    """
    # Bookkeeping returned to the caller
    loss_history = []
    reward_history = []
    average_reward_history = []
    total_rewards = 0

    for cur_iter in range(iteration):
        print(f"===========Iteration {cur_iter + 1}/{iteration}============")
        state = env.reset()["frame"]
        rewards = 0
        time_step = 0
        done = False

        while not done:
            # Act, observe, remember
            action = model.choose_action(state)
            observation, reward, done, _ = env.step(action)
            next_frame = observation["frame"]
            model.store_experience(state, reward, action, next_frame, done)
            rewards += reward

            # Learn only once the buffer has outgrown a single batch
            if len(model.replay) > batch_size:
                loss_history.append(
                    update(model=model,
                           batch_size=batch_size,
                           optimizer=optimizer,
                           criterion=criterion,
                           tau=tau,
                           gamma=gamma)
                )

            state = next_frame

            # time_step counts the steps completed before the terminal one
            if not done:
                time_step += 1

        # Episode summary
        print(f"Iteration finished after {time_step + 1} timesteps")
        print(f"Get total rewards {rewards}")
        print(f"The length of the snake is {env.snake.length}")

        total_rewards += rewards
        reward_history.append(rewards)
        average_reward_history.append(total_rewards / (cur_iter + 1))

    return average_reward_history, reward_history, loss_history

In [None]:
# Run on the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Used: {device}")

Used: cuda


In [None]:
# Seed every RNG the agent draws from so a Restart & Run All starts from the
# same network weights and the same exploration / sampling sequence.
# NOTE(review): the environment's own randomness is not seeded here — confirm
# how snake-v0 exposes seeding if fully repeatable episodes are required.
SEED = 0
random.seed(SEED)        # replay sampling in update()
np.random.seed(SEED)     # epsilon-greedy draws in DQN.choose_action()
torch.manual_seed(SEED)  # network weight initialisation

env = gym.make("snake:snake-v0")

# Set up environment hyperparameters
num_actions = env.action_space.n

# Set up training hyperparameters
batch_size = 128       # experiences sampled from the replay per update
learning_rate = 0.01   # step size handed to the optimizer
gamma = 0.99           # discount on the bootstrapped next-state value
tau = 1.0              # target <- tau * eval + (1 - tau) * target; 1.0 is a full copy
replace_iter = 10      # number of updates between target-network refreshes
max_len = 1000         # capacity of the replay deque
iteration = 4000       # number of episodes played by train()

# Used for the exploration: the probability of a random action decays
# exponentially from EPS_START towards EPS_END with decay constant EPS_DECAY
# (measured in calls to choose_action)
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200

# Build the model
dqn_model = DQN(
    num_actions=num_actions, 
    device=device, replace_iter=replace_iter,
    max_len=max_len, EPS_START=EPS_START, EPS_END=EPS_END,
    EPS_DECAY=EPS_DECAY
)

In [None]:
# Loss comparing the predicted Q-values with the bootstrapped targets
criterion = nn.SmoothL1Loss()

# Only the evaluation network is optimised directly
optimizer = optim.Adam(
    params=dqn_model.eval_net.parameters(),
    lr=learning_rate,
)

In [None]:
# Run the DQN training loop and keep the per-episode statistics it returns
(average_reward_history,
 reward_history,
 loss_history) = train(
    model=dqn_model,
    env=env,
    optimizer=optimizer,
    criterion=criterion,
    iteration=iteration,
    batch_size=batch_size,
    tau=tau,
    gamma=gamma,
)

Iteration finished after 72 timesteps
Get total rewards -6.420000000000001
The length of the snake is 1
Iteration finished after 15 timesteps
Get total rewards -5.28
The length of the snake is 1
Iteration finished after 33 timesteps
Get total rewards -5.640000000000001
The length of the snake is 1
Iteration finished after 49 timesteps
Get total rewards -5.960000000000001
The length of the snake is 1
Iteration finished after 16 timesteps
Get total rewards -5.3
The length of the snake is 1
Iteration finished after 31 timesteps
Get total rewards -5.6000000000000005
The length of the snake is 1
Iteration finished after 32 timesteps
Get total rewards -5.62
The length of the snake is 1
Iteration finished after 15 timesteps
Get total rewards -5.28
The length of the snake is 1
Iteration finished after 11 timesteps
Get total rewards -5.2
The length of the snake is 1
Iteration finished after 37 timesteps
Get total rewards -5.720000000000001
The length of the snake is 1
Iteration finished after 2