# Noteboot for all of out amazing ideas


In [8]:
#imports
import torch
import copy
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import gym
from collections import deque
import os
import pandas as pd
import matplotlib.pyplot as plt
from collections import namedtuple


In [None]:
# This is the 'standard' neural network
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)



In [12]:
def test_pole_length(env, q_network):
    """
    This function runs your trained network on a specific pole length
    You are not allowed to change this function
    """

    wind = 25
    state = env.reset()[0]
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    done = False
    total_reward = 0


    while not done:

        action = q_network(state).argmax().item()
        next_state, reward, done, _, __ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
        state = next_state
        total_reward += reward

        if total_reward >= 500 and total_reward <= 1000:
            if total_reward % wind == 0:

                env.unwrapped.force_mag = 75

        if total_reward > 1000:
            env.unwrapped.force_mag = 25 + (0.01 * total_reward)

    return total_reward

In [37]:
Transition = namedtuple('Transition', 
                        ('observation', 'action', 'next_observation', 'reward', 'done'))

class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """save a transtion"""
        self.memory.append(Transition(*args))
        print(self.memory)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size) 
    
    def __len__(self):
        return len(self.memory)

In [44]:
def pick_action(epsilon, policy_net, env, obs):
    p = random.uniform(0,1)
    if p < epsilon:
        action = env.action_space.sample()
    else:
        if isinstance(obs, np.ndarray):
            obs = torch.tensor(obs, dtype=torch.float32)

        if obs.ndim == 1:
            obs = obs.unsqueeze(0)

        with torch.no_grad():
            q_values = policy_net(obs)
            action = torch.argmax(q_values)
    return action

def error(current_q, td_target, error_type):
    if error_type == "mse":
        error = (current_q - td_target) ** 2
    
    return error


In [39]:
def sample_batch(replay_buffer, batch_size):
    """Return tensors for observations, actions, rewards, next_observations, dones"""
    transitions = replay_buffer.sample(batch_size)
    batch = Transition(*zip(*transitions))

    obs = torch.stack([torch.tensor(o, dtype=torch.float32) for o in batch.observation])
    next_obs = torch.stack([torch.tensor(o, dtype=torch.float32) for o in batch.next_observation])
    a = torch.tensor(batch.action, dtype=torch.int64).unsqueeze(1)
    r = torch.tensor(batch.reward, dtype=torch.float32).unsqueeze(1)
    done = torch.tensor(batch.done, dtype=torch.float32).unsqueeze(1)
    return obs, a, r, next_obs, done


In [None]:
def baseline_DQN(learning_rate, gamma, episodes, hidden_dim, target_update, epsilon, capacity, batch_size):

    # Initialize the policy network and optimizer
    env = gym.make('CartPole-v1')
    observation, _ = env.reset()
    policy_net = QNetwork(state_dim=4, action_dim=2)
    target_net = QNetwork(state_dim=4, action_dim=2)
    target_net.load_state_dict(policy_net.state_dict())
    optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
    plot_avg_rewards = []

    replay_buffer = ReplayMemory(capacity=capacity)
    
    # Training loop
    for episode in range(episodes):
        observation, _ = env.reset()
        observation = torch.tensor(observation, dtype=torch.float32)
        terminated = False
        truncated = False 
        step_count = 0
        total_reward = 0

        #first action
        action = env.action_space.sample()
        next_observation, reward, terminated, truncated, __ = env.step(action) 
        replay_buffer.push(observation, action, next_observation, reward, terminated)
        observation = next_observation
          
        while not terminated:

            action = pick_action(epsilon, policy_net, env, observation)
            next_observation, reward, terminated, truncated, __ = env.step(action)
            replay_buffer.push(observation, action, next_observation, reward, terminated)
            total_reward += reward

            #do random steps until buffer has at least reached the batch_size
            if step_count < batch_size:
                step_count += 1
                continue 

            #obs, a , r, obs_next, done = replay_buffer.sample(batch_size)
            obs, a , r, obs_next, done = sample_batch(replay_buffer, batch_size)

            if step_count % target_update:
                target_net.load_state_dict(policy_net.state_dict())

            step_count += 1
        
        # update rule
        #sample a batch
            next_q_values = target_net(obs_next)
            next_q_max = next_q_values.max(dim=1)
            td_target = r + gamma * (1-done) * next_q_max

            current_q = policy_net(obs).gather(1, a)

            loss = error(current_q, td_target, "mse")
            

            #update weights of the NN/policy
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            observation = next_observation

    
        # if episode % target_update == 0:
        #     # update target network
        #     target = network.clone()
        if episode % 25 == 0:
            # calculate the avg rewards of the last 25 steps here
            average = total_reward / 25
            plot_avg_rewards.append(average)
            total_reward = 0

    
    env.close()
    
    return plot_avg_rewards


# Hyperparameters, do not change
learning_rate = 0.01
gamma = 0.99
episodes = 500
hidden_dim = 32
target_update = 500
epsilon = 0.6
capacity = 200
batch_size = 20
plot_avg_rewards = baseline_DQN(learning_rate, gamma, episodes, hidden_dim, target_update, epsilon, capacity, batch_size)


deque([Transition(observation=tensor([-0.0027,  0.0318,  0.0185,  0.0030]), action=1, next_observation=array([-0.00207714,  0.22666481,  0.01860359, -0.28375697], dtype=float32), reward=1.0, done=False)], maxlen=200)


AttributeError: 'int' object has no attribute 'item'

he TOLD ME to add that

In [13]:
def plot_results(learning_rate, gamma, episode, hidden_dim, result):

    result = np.array(result)
    
    y = [25*i for i in range(len(result))]
    y_axis = np.array(y)

    #plot graph
    plt.plot(y_axis, result)
    plt.xlabel('Number of Episodes')
    plt.ylabel('Reward')
    plt.title('Average rewards, Learning Rate: {}, Gamma: {}, Episode: {}, Hidden_dim: {}'.format(learning_rate, gamma, episode, hidden_dim))
    plt.show()

    pass

In [None]:
plot_results(learning_rate, gamma, episodes, hidden_dim, plot_avg_rewards)