In [5]:
import numpy as np
import torch 
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
import random
from collections import deque
	
# Training environment. The episode step cap is requested through gym.make's
# documented `max_episode_steps` argument rather than by assigning the private
# `_max_episode_steps` attribute on the returned wrapper afterwards.
# Add render_mode="human" to the call to watch the agent while it trains.
env = gym.make("CartPole-v1", max_episode_steps=1000)


In [6]:
class ReplayMemory:
    """Bounded FIFO store of (state, action, next_state, reward) tuples."""

    def __init__(self, max_samples):
        # A deque with maxlen silently discards its oldest entry when a new
        # one is appended at capacity.
        self.memory = deque(maxlen=max_samples)

    def push(self, state, action, next_state, reward):
        """Append one transition to the buffer."""
        transition = (state, action, next_state, reward)
        self.memory.append(transition)

    def sample(self, sample_size):
        """Return a list of `sample_size` stored transitions drawn without replacement.

        Raises ValueError (from random.sample) when `sample_size` exceeds the
        number of stored transitions.
        """
        return random.sample(self.memory, k=sample_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)


In [7]:
class DQN(nn.Module):
    """Fully connected network: input -> 512 -> 512 -> output, ReLU between layers.

    The rest of the notebook feeds it observation batches and reads the
    `output_size` outputs as one Q-value per action.
    """

    def __init__(self, input_size, output_size):
        super().__init__()

        width = 512  # units in each hidden layer
        self.hidden1 = nn.Linear(input_size, width)
        self.relu1 = nn.ReLU()
        self.hidden2 = nn.Linear(width, width)
        self.relu2 = nn.ReLU()
        self.out = nn.Linear(width, output_size)

    def forward(self, x):
        """Run `x` through both hidden layers and return the raw output layer values."""
        first = self.relu1(self.hidden1(x))
        second = self.relu2(self.hidden2(first))
        return self.out(second)

In [8]:
# Problem dimensions read from the environment.
n_observations = env.observation_space.shape  # shape tuple; index [0] is used as the network input size
n_actions = env.action_space.n
device = "cuda" if torch.cuda.is_available() else "cpu"

# Seed every RNG this notebook draws from, so a fresh "Restart & Run All"
# starts from the same network weights and the same exploration/sampling
# streams: `random` (replay sampling), NumPy (epsilon draw), torch (layer
# initialisation) and the action space (random actions).
# env.reset() is called without a seed, so initial states still vary.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
env.action_space.seed(SEED)

learning_rate = 0.005      # Adam step size
gamma = 0.99               # discount factor applied to next-state Q-values
tau = 0.01                 # weight of the policy network in each soft target update
replay_memory_size = 5000  # maximum number of stored transitions
batch_size = 100           # transitions sampled per optimisation step

episodes = 2000            # number of training episodes

In [9]:
# Target network (provides the bootstrap values in optimize) and online
# network (the one the optimizer trains). Both are moved to `device` so their
# weights live on the same device as the state tensors the training loop
# creates; without this, a CUDA machine hits a device-mismatch error on the
# first forward pass.
target_policy = DQN(n_observations[0], n_actions).to(device)
policy_net = DQN(n_observations[0], n_actions).to(device)
# Start both networks from identical weights.
target_policy.load_state_dict(policy_net.state_dict())

memory = ReplayMemory(replay_memory_size)

In [10]:
# Adam (AMSGrad variant) over the online network's parameters.
policy_optimizer = optim.Adam(
    policy_net.parameters(),
    lr=learning_rate,
    amsgrad=True,
)
# Huber loss, applied in optimize() between predicted and expected Q-values.
bellmann_error = nn.HuberLoss()

In [11]:
def choose_action(state, steps):
    """Select an action epsilon-greedily, with epsilon decaying as `steps` grows.

    Epsilon falls exponentially from 0.95 towards 0.05 with a decay constant
    of 10000 steps. With probability epsilon a random action is sampled from
    the environment's action space; otherwise the action with the largest
    `policy_net` output for `state` is returned as a Python int.
    """
    eps_max, eps_min, eps_step = 0.95, 0.05, 10000

    decay = np.exp(-(steps / eps_step))
    threshold = eps_min + (eps_max - eps_min) * decay

    draw = np.random.rand()
    exploit = draw > threshold

    if not exploit:
        return env.action_space.sample()

    # Greedy choice; no gradients are needed for action selection.
    with torch.no_grad():
        q_values = policy_net(state)
    return q_values.max(1).indices.item()


In [12]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt

# Set up matplotlib: detect whether an inline (notebook) backend is active;
# IPython's display helpers are imported only in that case and are used by
# plot_durations to refresh the figure in place.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# Interactive mode, so plotting calls do not block the training loop.
plt.ion()

# Length of each finished episode, appended by the training loop and read by
# plot_durations.
episode_durations = []
# NOTE(review): not read or written anywhere else in the visible notebook.
cum_rewards = []

def plot_durations(show_result=False):
    """Plot the episode durations recorded so far on the current figure.

    Reads the module-level `episode_durations` list. Once at least 100
    episodes exist, a 100-episode moving average is drawn as a second line
    (padded with 99 zeros so it aligns with the episode axis).

    Args:
        show_result: when False (during training) the figure is cleared
            first, titled 'Training...', and the notebook output is replaced
            on the next display; when True the figure is titled 'Result' and
            is left in the output.
    """
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.display(plt.gcf())
        if not show_result:
            # wait=True defers clearing until the next output arrives, so the
            # plot is swapped in place instead of flickering.
            display.clear_output(wait=True)

In [19]:
def optimize():
    """Run one gradient step on `policy_net` from a sampled replay batch.

    Does nothing until `memory` holds at least `batch_size` transitions.
    Otherwise it samples a batch, computes the Huber loss between the policy
    network's Q-value for each taken action and the target
    `reward + gamma * max_a target_policy(next_state)` (the bootstrap term is
    0 for transitions whose next_state is None), clips gradients element-wise
    to [-100, 100] and applies one optimizer step.

    Returns:
        None.
    """
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    states, actions, next_states, rewards = zip(*transitions)

    # States are never None, so they can be concatenated directly. Actions and
    # rewards are plain numbers; create them on `device` so they match the
    # network outputs they are combined with.
    state_batch = torch.cat(states)
    action_batch = torch.tensor(actions, dtype=torch.int64, device=device).unsqueeze(-1)
    reward_batch = torch.tensor(rewards, dtype=torch.float32, device=device)

    # next_state is None for terminal transitions; a boolean mask marks the
    # rows that still have a successor state.
    non_final_mask = torch.tensor(
        [s is not None for s in next_states], dtype=torch.bool, device=device
    )

    q_values_policy_net = policy_net(state_batch).gather(1, action_batch)

    q_values_next_states = torch.zeros(batch_size, dtype=torch.float32, device=device)

    # Skip the target forward pass when every sampled transition is terminal:
    # torch.cat rejects an empty list, and the bootstrap term is all zeros.
    if non_final_mask.any():
        non_final_next_states = torch.cat([s for s in next_states if s is not None])
        with torch.no_grad():
            q_values_next_states[non_final_mask] = (
                target_policy(non_final_next_states).max(1).values
            )

    expected_q_values = reward_batch + (gamma * q_values_next_states)

    loss = bellmann_error(q_values_policy_net, expected_q_values.unsqueeze(-1))

    policy_optimizer.zero_grad()
    loss.backward()

    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)

    policy_optimizer.step()
        


In [20]:
# Environment steps taken across all episodes; drives the epsilon schedule in
# choose_action. It is not advanced on the final step of an episode.
step = 0

for episode in range(episodes):
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    t = 0  # steps completed in this episode before the current one

    while True:
        action = choose_action(state, step)
        raw_next, reward, terminated, truncated, _ = env.step(action)

        # Shape the reward: scale it down the further the first observation
        # component is from 0 (4.8 is the maximum observation in each direction).
        centering = 1 - np.abs(raw_next[0] / 4.8)
        reward = reward * centering

        done = terminated or truncated

        # Terminal transitions are stored with next_state=None so that
        # optimize() leaves their bootstrap term at zero.
        new_state = (
            None
            if terminated
            else torch.tensor(raw_next, dtype=torch.float32, device=device).unsqueeze(0)
        )

        memory.push(state, action, new_state, reward)
        state = new_state

        # One gradient step on the policy network.
        optimize()

        # Soft update: target <- tau * policy + (1 - tau) * target.
        target_dic = target_policy.state_dict()
        policy_dic = policy_net.state_dict()
        for name, policy_param in policy_dic.items():
            target_dic[name] = policy_param * tau + target_dic[name] * (1-tau)
        target_policy.load_state_dict(target_dic)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

        step += 1
        t += 1
    

    

KeyboardInterrupt: 

<Figure size 640x480 with 0 Axes>

In [3]:
# Persist the target network's weights; the evaluation cell loads this file.
trained_weights = target_policy.state_dict()
torch.save(trained_weights, "cartpole_target")

NameError: name 'target_policy' is not defined

In [13]:
# Roll out one greedy episode with the saved weights in a rendered window.
# The step cap goes through gym.make's documented `max_episode_steps` argument
# instead of the private `_max_episode_steps` attribute.
# NOTE: this rebinds the global `env` (which choose_action also reads) and
# closes it at the end; re-run the environment setup cell before training again.
env = gym.make("CartPole-v1", render_mode="human", max_episode_steps=5000)

evaluation_net = DQN(n_observations[0], n_actions).to(device)
# map_location lets a checkpoint saved on one device load on whichever device
# is in use now.
state_dict = torch.load("cartpole_target", map_location=device)
evaluation_net.load_state_dict(state_dict)
evaluation_net.eval()

try:
    state, _ = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    done = False

    # Pure inference: no gradients are needed.
    with torch.no_grad():
        while not done:
            action = evaluation_net(state).max(1).indices.item()

            new_state, reward, terminated, truncated, _ = env.step(action)

            done = terminated or truncated

            state = torch.tensor(new_state, dtype=torch.float32, device=device).unsqueeze(0)
finally:
    # Always release the render window, even if the rollout is interrupted.
    env.close()

