In [None]:
import gym
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import deque

In [None]:
# Create the Gym environment. `environment` is a notebook-wide handle: the agent
# class and the training/evaluation cells below read it directly by name.
environment_name = "CartPole-v1"
environment = gym.make(environment_name)

In [None]:
class NN_Model(nn.Module):
    """Small fully connected network: input -> ReLU hidden layer -> linear output.

    Args:
        state_size: Number of input features. Defaults to 4, the value that was
            previously hard-coded.
        hidden_size: Width of the single hidden layer. Defaults to 32.
        action_size: Number of outputs. Defaults to 2.
    """

    def __init__(self, state_size=4, hidden_size=32, action_size=2):
        super().__init__()
        # The attribute names fc1/fc2 are kept so previously saved state_dicts
        # (keys "fc1.*" / "fc2.*") still load.
        self.fc1 = nn.Linear(state_size, hidden_size)   # input layer -> hidden layer
        self.fc2 = nn.Linear(hidden_size, action_size)  # hidden layer -> output layer

    def forward(self, x):
        """Return the raw output of the second linear layer for input ``x``.

        No activation is applied to the output, so the values are unbounded.
        """
        hidden = F.relu(self.fc1(x))
        return self.fc2(hidden)

In [None]:
class DQN_Agent():
    """Epsilon-greedy Q-learning agent with a bounded replay memory.

    Args:
        q_network: The network whose parameters the optimizer updates. When
            omitted, the notebook-level global ``model`` is used, so the
            original ``DQN_Agent()`` call keeps working.
        env: Object exposing ``action_space.n`` and ``observation_space.shape``.
            When omitted, the notebook-level global ``environment`` is used.

    Note:
        The optimizer is bound to ``q_network`` at construction time. The
        ``model`` passed to :meth:`train` should be that same network, otherwise
        the optimizer steps will not touch the parameters that produced the loss.
    """

    def __init__(self, q_network=None, env=None):
        # Fall back to the notebook globals only when nothing was injected.
        if env is None:
            env = environment
        if q_network is None:
            q_network = model

        self.action_size = env.action_space.n  # Number of discrete actions
        self.state_size = env.observation_space.shape[0]  # Length of the observation vector

        self.gamma = 0.95  # 0 means prioritise immediate rewards, 1 means prioritise future rewards
        self.epsilon = 1  # Exploration rate (probability of a random action)
        self.epsilon_decay = 0.995  # Multiplicative decay applied by update_epsilon()
        self.epsilon_min = 0.01  # Lower bound for the exploration rate
        self.learning_rate = 0.001
        # Bounded FIFO: once 2000 transitions are stored, appending drops the oldest one.
        self.memory = deque(maxlen=2000)
        self.batch_size = 64

        self.optimizer = optim.Adam(q_network.parameters(), lr=self.learning_rate)
        self.mse_loss = nn.MSELoss()

    @staticmethod
    def _to_float_tensor(array_like):
        """Convert an array or tensor to float32 without the copy-construct warning."""
        return torch.as_tensor(array_like, dtype=torch.float32)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition ``(state, action, reward, next_state, done)`` to memory.

        Both states are stored as detached float32 copies, so later changes to
        the caller's buffers cannot alter what is replayed.
        """
        stored_state = self._to_float_tensor(state).detach().clone()
        stored_next_state = self._to_float_tensor(next_state).detach().clone()
        self.memory.append((stored_state, action, reward, stored_next_state, done))

    def action(self, state, model):
        """Pick an action index for ``state``.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the index of the largest output of ``model(state)``.

        Returns:
            int: The chosen action index.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # Inference only: no autograd graph is needed to pick an action.
        with torch.no_grad():
            q_values = model(self._to_float_tensor(state))
        return int(torch.argmax(q_values).item())

    def train(self, model):
        """Run one optimizer step per transition of a random replay batch.

        Does nothing until the memory holds more than ``batch_size`` transitions.
        For each sampled transition the regression target equals the network's
        current prediction for ``state``, except at the taken action, where it is
        ``reward`` (terminal) or ``reward + gamma * max Q(next_state)``. Only the
        taken action's value therefore contributes to the loss.
        """
        if len(self.memory) <= self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in batch:
            current_q_values = model(state)

            # The bootstrap value is treated as a constant: no gradient may flow through it.
            if done:
                action_value = reward
            else:
                with torch.no_grad():
                    action_value = reward + self.gamma * torch.max(model(next_state)).item()

            # Start from the prediction for *this* state, so the untaken actions have zero error.
            target_q_values = current_q_values.detach().clone()
            target_q_values[action] = action_value

            loss = self.mse_loss(current_q_values, target_q_values)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

    def update_epsilon(self):
        """Decay the exploration rate by one step, never going below ``epsilon_min``.

        An ``epsilon`` that is already at or below ``epsilon_min`` is left unchanged.
        """
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        

In [None]:
# Build the network first: DQN_Agent() looks up the global `model` (and `environment`)
# while constructing its optimizer, so `model` must exist before the agent is created.
model = NN_Model()
agent = DQN_Agent()

In [None]:
# Train until one episode reaches the target score or the episode budget is used up.
TARGET_SCORE = 498            # stop once an episode's final step index reaches this value
MAX_EPISODES = 1000           # episode budget shown in the progress message
MAX_STEPS_PER_EPISODE = 2000  # hard cap on the steps of a single episode

score = 0  # zero-based index of the last step of the most recent episode
episodes = 0
while score < TARGET_SCORE and episodes < MAX_EPISODES:
    episodes += 1
    # Keep the observation exactly as the environment returns it; the agent converts
    # states to tensors itself, so the state has one type for the whole episode.
    state = environment.reset()  # Reset the game

    for score in range(MAX_STEPS_PER_EPISODE):
        action = agent.action(state, model)  # Choose action
        next_state, reward, done, _ = environment.step(action)  # Apply the action onto the cartpole
        # The step that ends the episode is stored with a penalty instead of its reward.
        # NOTE(review): this also penalises episodes ended by an environment step limit,
        # if the environment has one; confirm that this is intended.
        reward = reward if not done else -10
        agent.remember(state, action, reward, next_state, done)  # Add transition to memory
        agent.train(model)  # Train agent on a replay batch
        agent.update_epsilon()
        state = next_state
        if done:
            print("Episode:", episodes, f"/{MAX_EPISODES}", "Score:", score)
            break

        environment.render()  # Display the cartpole game
environment.close()  # Close the window

if score < TARGET_SCORE:
    print("Stopped after", episodes, "episodes without reaching the target score")

# Save the trained weights
torch.save(model.state_dict(), 'trained_weights.pt')


In [None]:
# Load the weights written by the training cell ('trained_weights.pt') back into `model`.
model.load_state_dict(torch.load('trained_weights.pt'))
# Switch to evaluation mode. eval() returns the module itself, so `load` is
# another name for `model`, not a separate copy.
load = model.eval()

In [None]:
# Play one evaluation episode, always taking the action the loaded network rates highest.
state = environment.reset()  # Reset the game
for step in range(2000):
    # Act greedily on the network directly. Going through agent.action() would still
    # pick random actions with probability agent.epsilon, which is 1 on a fresh agent
    # that has not been trained in this kernel session.
    with torch.no_grad():
        q_values = load(torch.as_tensor(state, dtype=torch.float32))
    action = int(torch.argmax(q_values).item())
    next_state, reward, done, _ = environment.step(action)  # Apply the action onto the cartpole
    state = next_state
    if done:
        # The score is the zero-based index of the step that ended the episode.
        print("Score:", step)
        break

    environment.render()  # Display the cartpole game
environment.close()  # Close the window