In [35]:
import gym
import math
import numpy as np
from numpy import argmax
import torch
from torch import nn
from random import sample
from collections import deque

 

In [21]:
class Agent:

    def __init__(self, n_states, n_actions, epsilon, learning_rate, discount_value):
        self.learning_rate = learning_rate
        self.discount_value = discount_value
        self.n_states = n_states
        self.n_actions = n_actions
        self.epsilon = epsilon
        self.initialize_q_table(n_states, n_actions)
    
    
    # This is the policy
    def choose_best_action(self, state):
        '''parameters: state 
        returns: action'''
        # Always need choose_action
        return np.argmax(self.q_table[state]) # <-- returns the index with the highest action value for that state
    
    def choose_action(self, state): # Policy
        if np.random.rand(1) < self.epsilon:
            action = np.random.randint(0, self.n_actions)
        else:
            action = self.choose_best_action(state)
            
        return action
    
        
    def initialize_q_table(self, n_states, n_actions):
        self.q_table = np.zeros((n_states, n_actions))
        
       
    
    # HOMEWORK
    def bellman_equation(self, state, action, next_state, reward):
        current_q = self.q_table[state][action]
        max_q_next_state = np.max(self.q_table[next_state])
        new_q =  current_q + self.learning_rate * (reward + (self.discount_value * max_q_next_state) - current_q)
        # single step forecasting?
        # value of state = current value + immediate reward + negative or positive expected reward for that state
        # reward - immediate reward for moving to next_state? 0 for everything except the goal node (1)
        # max_q_next_state - max value of state for all of its actions?
        return new_q
        
    
    def update_q_table(self, state, action, next_state, reward):
        new_q = self.bellman_equation(state, action, next_state, reward)
        self.q_table[state][action] = new_q
        
    
        


In [62]:
class FCNeuralNetwork(nn.Module):
    
    
    def __init__(self, x_size, y_size, hidden_size, learning_rate):
        super().__init__()
        
        self.x_size = x_size
        self.y_size = y_size
        self.hidden_size = hidden_size
        
        self.learning_rate = learning_rate
        
        self.linear_relu_nn = nn.Sequential(
            nn.Linear(x_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, y_size)
        )
        
        self.optimizer = torch.optim.Adam(self.linear_relu_nn.parameters(), lr = self.learning_rate)
        self.loss = torch.nn.MSELoss()
        
        
    def forward(self, x):
        x = torch.tensor(x, dtype=torch.float32)
        out = self.linear_relu_nn(x)
        return out
        
    
    
class ReplayBuffer:
    
    def __init__(self, size):
        self.buffer = deque([], maxlen = size)
        
    def append(self, transition):
        self.buffer.append(transition)
        
    def sample(self, sample_size):
        return sample(self.buffer, sample_size)
    
    def __len__(self):
        return len(self.buffer)
    
    
 
    

class DQNAgent(): 

    
    def __init__(self, num_actions, num_states, model, epsilon, use_replay_buffer=True, use_target_model=True):
        
        # Hyperparameters
        self.sync_network_rate = 200
        self.batch_size = 25
        self.buffer_size = 10_000

        self.num_states = num_states,
        self.num_actions = num_actions
        self.model = model
        self.epsilon = epsilon
        self.step_counter = 0


        # Target model initialisation
        if use_target_model == True:
            self.target_model = type(model)(model.x_size, model.y_size,
                                            model.hidden_size, model.learning_rate) # Creates another instance of that model
            self.target_model.load_state_dict(self.model.state_dict()) # Transfers the weights
            self.use_target_model = True
        else:
            self.use_target_model = False

        # Replay buffer initialisation
        if use_replay_buffer == True:
            self.replay_buffer = ReplayBuffer(self.buffer_size)
            self.use_replay_buffer = True
        else:
             self.use_replay_buffer = False

    def choose_best_action(self, state):
        if use_target_model == True:
            action = argmax(self.target_model(state))
        else:
            action = argmax(self.model(state))
        return action

    def choose_action(self, state):
        if np.random.rand(1) < self.epsilon:
            action = np.random.randint(0, self.num_actions)
        else:
            action = argmax(self.model(state))
        return action

    def sync_network(self):
        if use_target_model == False:
            return
        elif sync_network_rate % step_counter == 0 and step_counter != 0:
            self.target_model.load_state_dict(self.model.state_dict()) 


    def learn(self, samples):

        self.sync_network()

        if self.use_replay_buffer == True:
            # this will batch samples from buffer together into the samples list
            if len(self.replay_buffer) < batch_size:
                return
            samples = self.replay_buffer.sample(batch_size)


        for sample in samples:

            self.step_counter += 1

            # Q value predicted for that state, action pair
            predicted_value = self.model(state)[action] # predicted value = Q(s, a)

            # Predict the next Q value using either the target network or the same network
            if self.use_target_network == True:
                predicted_next_value = self.target_model(next_state)
            else:
                predicted_next_value = self.model(next_state)

            target_value = reward + self.discount_factor * max(predicted_next_value)  # target value = r + γQ(s', a')

            # Loss function and backpropogation
            loss = self.model.loss(predicted_value, target_value)
            loss.backward()
            self.model.optimizer.step()


    
        
    
    
## TO DO
# FIGURE OUT LOSS FUNCTION AND BACKPROPOGATION - DONE
# IMPLEMENT OPTIONAL BUFFER SAMPLING - DONE
# IMPLEMENT OPTIONAL TARGET NETWORK - DONE
# USE MATPLOTLIB OR SOMETHING TO VISUALIZE THE PERFORMANCE OF THE THREE AGENT TYPES

    
    

In [63]:
episodes = 3000

# Epsilon values
start_epsilon = 0.8
min_epsilon = 0.1
epsilon_reduction = 0.1
epsilon_decay = 100

# Visualization
render_decay = 1000

# Environment setup
env_name = "FrozenLake-v1"
env = gym.make(env_name, render_mode="human")
num_actions = env.action_space.n
num_states = env.observation_space.n

seed = 42
np.random.seed(seed)


# Neural network hyperparameters
hidden_size = 128
learning_rate = 0.01

# Neural network setup
model = FCNeuralNetwork(num_actions, num_states, hidden_size, learning_rate)


# Agent setup
agent_plain = DQNAgent(num_actions, num_states, model, start_epsilon, use_replay_buffer=False, use_target_model=False)
agent_buffer = DQNAgent(num_actions, num_states, model, start_epsilon, use_replay_buffer=True, use_target_model=False)
agent_buffer_target = DQNAgent(num_actions, num_states, model, start_epsilon, use_replay_buffer=True, use_target_model=True)

agent_list = [agent_plain, agent_buffer, agent_buffer_target]


for agent in agent_list:
    

    for episode in range(episodes):

        if (episode + 1) % render_decay == 0:
            env=gym.make(env_name, render_mode="human")
        else:
            env=gym.make(env_name)

        if (episode + 1) % epsilon_decay == 0 and (agent.epsilon - epsilon_reduction) > min_epsilon:
            agent.epsilon -= epsilon_reduction


        state, _ = env.reset()
        terminated, truncated = False, False
        samples = []

        while not terminated and not truncated:
            

            action = agent.choose_action(state)
            new_state, reward, terminated, truncated, _ = env.step(action)
            
            transition = (state, action, new_state, reward)
            
            if agent.use_replay_buffer == True:
                agent.replay_buffer.append(transition)
            else: 
                samples.append(transition)
                
            agent.step_counter += 1
                
            if agent.step_counter % agent.batch_size == 0 and agent.step_counter != 0:
                agent.learn(samples)
                
            state = new_state
    
    

RuntimeError: both arguments to matmul need to be at least 1D, but they are 0D and 2D

In [51]:
env.observation_space.n

16