In [1]:
from utils import DQN, ReplayBuffer, greedy_action, epsilon_greedy, update_target, loss, ddqn_loss

import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
import math
import numpy as np

import gym
import matplotlib.pyplot as plt

In [4]:
## HYPERPARAMETERS

EPSILON = 0.95
NUM_RUNS = 10     
num_episodes = 300
batch_size = 128
alpha = 0.01
target_update = 20
hidden_layer = [128,128]  
buffer_size = 100000
input_size = 4 # DONT CHANGE (state_dim=4)
output_size = 2 # DONT CHANGE (action_dim = 2)
dqn_input = [input_size] + hidden_layer + [output_size]

## EPSILON DECAY

A=0.7
B=0.1
C=0.1
def epsilon(x):
    st=(x-A*num_episodes)/(B*num_episodes)
    cosh=np.cosh(math.exp(-st))
    epsilon=1-(1/cosh+(x*C/num_episodes))
    return epsilon

In [None]:
## RUN DQN ALGORITHM

DQN_runs_results = []
observed_omega = []
min_omega = None
max_omega = None

env = gym.make('CartPole-v1', render_mode='human')
for run in range(NUM_RUNS):
    print(f"Starting run {run+1} of {NUM_RUNS}")
    policy_net = DQN(dqn_input)  # [input_size, output_size]
    target_net = DQN(dqn_input)
    update_target(target_net, policy_net)
    target_net.eval()

    optimizer = optim.SGD(policy_net.parameters(), lr=alpha)
    memory = ReplayBuffer(buffer_size)

    steps_done = 0

    episode_durations = []

    for i_episode in range(num_episodes):
        if (i_episode+1) % 50 == 0:
            print("episode ", i_episode+1, "/", 300)

        observation, info = env.reset()
        state = torch.tensor(observation).float()

        done = False
        terminated = False
        t = 0
        while not (done or terminated):

            action = epsilon_greedy(EPSILON, policy_net, state)
            
            EPSILON = epsilon(i_episode)

            observation, reward, done, terminated, info = env.step(action)
            
            if min_omega is None:
                min_omega = observation[3]
                max_omega = observation[3]
            else:
                min_omega = min(observation[3],min_omega)
                max_omega = max(observation[3],max_omega)
                
            observed_omega.append(observation[3])
                
            reward = torch.tensor([reward])
            action = torch.tensor([action])
            next_state = torch.tensor(observation).reshape(-1).float()

            memory.push([state, action, next_state, reward, torch.tensor([done])])

            # Move to the next state
            state = next_state

            # Perform one step of the optimization (on the policy network)
            if not len(memory.buffer) < batch_size:
                transitions = memory.sample(batch_size)
                state_batch, action_batch, nextstate_batch, reward_batch, dones = (torch.stack(x) for x in zip(*transitions))
                # Compute loss
                mse_loss = loss(policy_net, target_net, state_batch, action_batch, reward_batch, nextstate_batch, dones)
                # Optimize the model
                optimizer.zero_grad()
                mse_loss.backward()
                optimizer.step()
            
            if done or terminated:
                episode_durations.append(t + 1)
            t += 1
        # Update the target network, copying all weights and biases in DQN
        if i_episode % target_update == 0: 
            update_target(target_net, policy_net)
            env.render()
    DQN_runs_results.append(episode_durations)
print('Complete')
print(min_omega)
print(max_omega)

Starting run 1 of 10


  cosh=np.cosh(math.exp(-st))


episode  50 / 300
episode  100 / 300
episode  150 / 300
episode  200 / 300
episode  250 / 300
episode  300 / 300
Starting run 2 of 10
episode  50 / 300
episode  100 / 300
episode  150 / 300
episode  200 / 300
episode  250 / 300
episode  300 / 300
Starting run 3 of 10
episode  50 / 300
episode  100 / 300
episode  150 / 300
episode  200 / 300


In [None]:
## ANGULAR VELOCITY RANGE

plt.scatter(range(len(observed_omega)), observed_omega, s=0.1)
plt.xlabel('Observations')
plt.ylabel('Angular Velocity')

In [None]:
## SHAPE OF EPSILON DECAY FUNCTION

e = []
for i in range(num_episodes):
    e.append(epsilon(i))
plt.plot(e)
plt.xlabel('Episode')
plt.ylabel('Epsilon')
plt.title('Epsilon Decay Function')

In [None]:
## PLOT DQN LEARNING CURVE AND BASELINE
 
results = torch.tensor(DQN_runs_results)
means = results.float().mean(0)
stds = results.float().std(0)
plt.figure(figsize=(10,6))
plt.plot(torch.arange(300), means)
for y in [100]:
    plt.axhline(y,linestyle='--', color='r')
plt.ylabel("Return", fontsize=10, labelpad=15)
plt.xlabel("Episode", fontsize=10, labelpad=15)
plt.title('DQN for A=0.7,B=0.1,C=0.1', fontsize=15)
plt.fill_between(np.arange(300), means, means+stds, alpha=0.3, color='b')
plt.fill_between(np.arange(300), means, means-stds, alpha=0.3, color='b')

baseline_results = torch.tensor(baseline_runs_results)
baseline_means = baseline_results.float().mean(0)
baseline_stds = baseline_results.float().std(0)
plt.plot(torch.arange(300), baseline_means, c='g')
plt.fill_between(np.arange(300), baseline_means, baseline_means+baseline_stds, alpha=0.3, color='g')
plt.fill_between(np.arange(300), baseline_means, baseline_means-baseline_stds, alpha=0.3, color='g')

plt.title('DQN Performance', fontsize=15)
plt.legend(['DQN','Target','Baseline'])
plt.show()

In [None]:
## GREEDY POLICY/Q VALUE PLOTS

import matplotlib.patches as mpatches
q = True    # whether q values or greedy policy is visualised q(False for policy)

angle_range = .2095 
omega_range = 1     

angle_samples = 100
omega_samples = 100
angles = torch.linspace(angle_range, -angle_range, angle_samples)
omegas = torch.linspace(-omega_range, omega_range, omega_samples)

greedy_q_array = torch.zeros((angle_samples, omega_samples))
policy_array = torch.zeros((angle_samples, omega_samples))
for i, angle in enumerate(angles):
    for j, omega in enumerate(omegas):
        state = torch.tensor([0., 0, angle, omega])  # second value is velocity
        with torch.no_grad():
            q_vals = policy_net(state)
            greedy_action = q_vals.argmax()
            greedy_q_array[i, j] = q_vals[greedy_action]
            policy_array[i, j] = greedy_action
if q:
    plt.figure(figsize=(10,6))
    plt.contourf(angles, omegas, greedy_q_array.T, cmap='cividis', levels=100)
    bar = plt.colorbar()
    bar.set_label('Q Values')
    plt.title('Cart Velocity = 0m/s')
    for x in [0]:
        plt.axvline(x,linestyle='--', color='r')
    for y in [0]:
        plt.axhline(y,linestyle='--', color='r')
else:
    plt.contourf(angles, omegas, policy_array.T, cmap='cividis')
    left_patch = mpatches.Patch(color='blue',label='Left')
    right_patch = mpatches.Patch(color='yellow',label='Right')
    plt.legend(handles=[left_patch,right_patch])
    plt.title('Cart Velocity = 0m/s')
    for x in [0]:
        plt.axvline(x,linestyle='--', color='r')
    for y in [0]:
        plt.axhline(y,linestyle='--', color='r')
    
plt.xlabel("Angle (rad)")
plt.ylabel("Angular Velocity (rad/s)")
plt.show()

In [None]:
## RUN DDQN ALGORITHM

DDQN_runs_results = []

env = gym.make('CartPole-v1')
for run in range(NUM_RUNS):
    print(f"Starting run {run+1} of {NUM_RUNS}")
    policy_net = DQN(dqn_input)  # [input_size, output_size]
    target_net = DQN(dqn_input)
    update_target(target_net, policy_net)
    target_net.eval()

    optimizer = optim.SGD(policy_net.parameters(), lr=alpha)
    memory = ReplayBuffer(buffer_size)

    steps_done = 0

    episode_durations = []

    for i_episode in range(num_episodes):
        if (i_episode+1) % 50 == 0:
            print("episode ", i_episode+1, "/", 300)

        observation, info = env.reset()
        state = torch.tensor(observation).float()

        done = False
        terminated = False
        t = 0
        while not (done or terminated):

            # Select and perform an action
            action = epsilon_greedy(EPSILON, policy_net, state)
            
            EPSILON = epsilon(i_episode)

            observation, reward, done, terminated, info = env.step(action)
                
            reward = torch.tensor([reward])
            action = torch.tensor([action])
            next_state = torch.tensor(observation).reshape(-1).float()

            memory.push([state, action, next_state, reward, torch.tensor([done])])

            # Move to the next state
            state = next_state

            # Perform one step of the optimization (on the policy network)
            if not len(memory.buffer) < batch_size:
                transitions = memory.sample(batch_size)
                state_batch, action_batch, nextstate_batch, reward_batch, dones = (torch.stack(x) for x in zip(*transitions))
                # Compute loss
                mse_loss = ddqn_loss(policy_net, target_net, state_batch, action_batch, reward_batch, nextstate_batch, dones)
                # Optimize the model
                optimizer.zero_grad()
                mse_loss.backward()
                optimizer.step()
            
            if done or terminated:
                episode_durations.append(t + 1)
            t += 1
        # Update the target network, copying all weights and biases in DQN
        if i_episode % target_update == 0: 
            update_target(target_net, policy_net)
    DDQN_runs_results.append(episode_durations)
print('Complete')

In [None]:
## DQN VS DDQN PERFORMANCE

DQN_results = torch.tensor(DQN_runs_results)
DQN_means = DQN_results.float().mean(0)
DQN_stds = DQN_results.float().std(0)
plt.figure(figsize=(10,6))
plt.plot(torch.arange(300), DQN_means, c='orange')
for y in [100]:
    plt.axhline(y,linestyle='--', color='r')
plt.ylabel("Return", fontsize=10, labelpad=15)
plt.xlabel("Episode", fontsize=10, labelpad=15)
plt.fill_between(np.arange(300), DQN_means, DQN_means+DQN_stds, alpha=0.3, color='orange')
plt.fill_between(np.arange(300), DQN_means, DQN_means-DQN_stds, alpha=0.3, color='orange')

DDQN_results = torch.tensor(DDQN_runs_results)
DDQN_means = DDQN_results.float().mean(0)
DDQN_stds = DDQN_results.float().std(0)
plt.plot(torch.arange(300), DDQN_means, c='m')
plt.fill_between(np.arange(300), DDQN_means, DDQN_means+DDQN_stds, alpha=0.3, color='m')
plt.fill_between(np.arange(300), DDQN_means, DDQN_means-DDQN_stds, alpha=0.3, color='m')

plt.title('DQN and DDQN Performance', fontsize=15)
plt.legend(['DQN','Target','DDQN'])
plt.show()