In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.init as init
import torch.optim as optim
from scipy.signal import savgol_filter
import collections
import tqdm
import matplotlib.pyplot as plt
import random
import gymnasium as gym

%matplotlib inline

In [2]:
# --- Environment -----------------------------------------------------------
env = gym.make('CartPole-v1')
num_states = env.observation_space.shape[0]  # length of the observation vector
num_actions = env.action_space.n             # number of discrete actions

# --- Optimisation ----------------------------------------------------------
learning_rate = 1e-3

# --- Training / evaluation schedule ----------------------------------------
num_episodes = 200        # training episodes per repetition
eval_interval = 10        # evaluate after every `eval_interval`-th training episode
num_eval_episodes = 20    # episodes averaged in a single evaluation

# --- Repetitions -----------------------------------------------------------
num_repetitions = 20
num_evaluation_points = num_episodes // eval_interval
# One row per repetition, one column per evaluation point; filled in by
# average_over_repetitions().
returns_over_repetitions = np.zeros(shape=(num_repetitions, num_evaluation_points))

In [3]:
# Pick the best available backend: CUDA first, then Apple MPS, otherwise CPU.
# NOTE(review): `device` is only printed in the cells visible here; no model
# or tensor is moved onto it -- confirm whether that is intended.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

# Seed every RNG the notebook draws from. The stdlib `random` module must be
# seeded too: replay batches are drawn with `random.sample`, so seeding only
# numpy and torch leaves the training runs non-reproducible.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

Using mps device


<torch._C.Generator at 0x10def5a10>

In [4]:
def argmax(x):
    """Own variant of np.argmax with random tie breaking.

    x: array-like of values (lists and tuples are converted to an array first,
       so they get the same random tie breaking as numpy arrays).

    Returns the index of a maximal element, chosen uniformly at random among
    all maximal elements. When no candidate can be identified (e.g. the maximum
    is NaN, which never compares equal to itself), falls back to np.argmax.
    """
    values = np.asarray(x)
    try:
        candidates = np.where(values == np.max(values))[0]
        return np.random.choice(candidates)
    except ValueError:
        # Raised by np.random.choice on an empty candidate set (NaN maximum)
        # and by np.where on 0-d input; any other error is a real bug and
        # must not be swallowed.
        return np.argmax(values)
def smooth(y, window, poly=2):
    """Smooth a vector with a Savitzky-Golay filter.

    y: vector to be smoothed
    window: size of the smoothing window
    poly: order of the polynomial fitted within each window
    """
    smoothed = savgol_filter(y, window_length=window, polyorder=poly)
    return smoothed


def softmax(x, temp):
    """Computes the softmax of vector x with temperature parameter 'temp'.

    x: array-like of scores (numpy array, list or tuple)
    temp: temperature; the scores are divided by it before exponentiation

    Returns a numpy array of the same shape as x whose entries sum to 1.
    The normalisation runs over all elements of x.
    """
    scaled = np.asarray(x) / temp          # scale by temperature
    shifted = scaled - np.max(scaled)      # subtract max to prevent overflow of exp
    weights = np.exp(shifted)              # exponentiate once and reuse
    return weights / np.sum(weights)

class LearningCurvePlot:
    """Small helper that owns one matplotlib figure/axes pair for learning curves."""

    def __init__(self, title=None):
        """Create the figure, label both axes and optionally set a title."""
        figure, axes = plt.subplots()
        axes.set_xlabel("Timestep")
        axes.set_ylabel("Episode Return")
        if title is not None:
            axes.set_title(title)
        self.fig = figure
        self.ax = axes

    def add_curve(self, x, y, label=None):
        """Draw one curve.

        x: vector of x positions
        y: vector of average reward results
        label: string to appear as label in plot legend (omitted when None)
        """
        legend_kwargs = {} if label is None else {"label": label}
        self.ax.plot(x, y, **legend_kwargs)

    def set_ylim(self, lower, upper):
        """Fix the y-axis range to [lower, upper]."""
        limits = [lower, upper]
        self.ax.set_ylim(limits)

    def add_hline(self, height, label):
        """Draw a dashed black horizontal reference line at `height`."""
        dashed_black = {"ls": "--", "c": "k"}
        self.ax.axhline(height, label=label, **dashed_black)

    def save(self, name="test.png"):
        """Add the legend and write the figure to disk at 300 dpi.

        name: string for filename of saved figure
        """
        self.ax.legend()
        self.fig.savefig(name, dpi=300)

In [5]:
class NeuralNetwork(nn.Module):
    """Fully connected network mapping a state vector to one output per action.

    Architecture: num_states -> 64 -> 32 -> 16 -> num_actions, with a ReLU
    after every hidden layer and no activation on the output layer.
    """

    def __init__(self, num_states, num_actions):
        super().__init__()

        # Build the stack layer by layer; the resulting order (and therefore
        # the state_dict keys) is Linear, ReLU, Linear, ReLU, Linear, ReLU, Linear.
        hidden_widths = (64, 32, 16)
        stack = []
        fan_in = num_states
        for width in hidden_widths:
            stack.append(nn.Linear(fan_in, width))
            stack.append(nn.ReLU())
            fan_in = width
        stack.append(nn.Linear(fan_in, num_actions))
        self.dqn_model = nn.Sequential(*stack)

        self.init_weights()

    def forward(self, x):
        """Run `x` through the layer stack and return the network output."""
        return self.dqn_model(x)

    def init_weights(self):
        """Re-initialise the weight matrix of every linear layer with Xavier-uniform.

        Biases keep the default initialisation they received from nn.Linear.
        """
        linear_layers = (m for m in self.dqn_model if isinstance(m, nn.Linear))
        for linear in linear_layers:
            init.xavier_uniform_(linear.weight)

In [6]:
class DQN_Agent:
    """DQN agent: policy/target networks, optimizer, replay memory and action selection.

    Attributes:
        env: the environment the agent acts in (also used by `evaluate`).
        policy_net: network that is trained and used to pick actions.
        target_net: network used for bootstrap targets; starts as a copy of
            `policy_net` and is re-synchronised by the training loop.
        optimizer: Adam optimizer over the `policy_net` parameters.
        memory: bounded deque of (state, action, next_state, reward) tuples.
        batch_size: number of transitions drawn per optimisation step.
        gamma: discount factor.
        steps_done: number of optimisation steps performed so far.
    """

    def __init__(self, env, learning_rate=1e-3, batch_size=32, gamma=0.95, memory_size=1000000):
        """Build the networks, the optimizer and an empty replay memory.

        env: environment exposing `observation_space.shape` and `action_space.n`
        learning_rate: Adam step size
        batch_size: replay batch size (needs tuning)
        gamma: discount factor (needs tuning)
        memory_size: maximum number of transitions kept in the replay memory
        """
        self.env = env
        self.learning_rate = learning_rate
        # Size the networks from the environment instead of hard-coding the
        # CartPole dimensions, so the agent works for other environments too.
        state_dim = env.observation_space.shape[0]
        action_dim = int(env.action_space.n)
        self.policy_net = NeuralNetwork(num_states=state_dim, num_actions=action_dim)
        self.target_net = NeuralNetwork(num_states=state_dim, num_actions=action_dim)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        # Synchronize weights of target and policy network
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.memory = collections.deque([], maxlen=memory_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.steps_done = 0

    def select_action(self, state, policy="egreedy", epsilon=0.05):
        """Choose an action for `state`.

        state: tensor accepted by `policy_net` (a single state)
        policy: "egreedy" (random action with probability `epsilon`, otherwise
            greedy) or "greedy" (always the highest-valued action)
        epsilon: exploration probability, only used by "egreedy"

        Returns a long tensor of shape (1, 1) holding the action index, for
        every policy and branch.

        Raises ValueError for an unsupported policy name (previously this
        silently returned None).
        """
        if policy not in ("egreedy", "greedy"):
            raise ValueError(
                f"Unknown policy {policy!r}; expected 'egreedy' or 'greedy'"
            )
        # Strict '<' so that epsilon=0 never explores.
        if policy == "egreedy" and np.random.rand() < epsilon:
            return torch.tensor([[self.env.action_space.sample()]], dtype=torch.long)
        with torch.no_grad():
            action_values = self.policy_net(state)
        return torch.argmax(action_values).reshape(1, 1)

    def get_sample(self):
        """Draw `batch_size` distinct transitions uniformly from the replay memory.

        Raises ValueError (from random.sample) when the memory holds fewer than
        `batch_size` transitions.
        """
        return random.sample(self.memory, self.batch_size)

    def evaluate(self, max_episode_length=1000, num_episodes=None):
        """Run greedy episodes and return their mean undiscounted return.

        max_episode_length: hard cap on the number of steps per episode
        num_episodes: number of evaluation episodes; defaults to the
            module-level `num_eval_episodes` when None

        Note: the episodes are played on `self.env`, i.e. the same environment
        instance the agent is trained on, which is reset for every episode.
        """
        if num_episodes is None:
            num_episodes = num_eval_episodes
        returns = []
        for _ in range(num_episodes):
            state = self.env.reset()[0]
            episode_return = 0
            for _ in range(max_episode_length):
                state_tensor = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
                action = self.select_action(state_tensor, policy="greedy")
                state, reward, episode_done, episode_truncated, _ = self.env.step(action.item())
                episode_return += reward
                if episode_done or episode_truncated:
                    break
            returns.append(episode_return)
        return np.mean(returns)

In [7]:
def DQN_learning():
    """Train one fresh DQN agent and return its evaluation curve.

    Uses the module-level configuration (`env`, `learning_rate`,
    `num_episodes`, `eval_interval`). For every environment step the transition
    is stored in the agent's replay memory and, once the memory holds at least
    one batch, a single optimisation step is performed on a random batch. The
    target network is synchronised with the policy network every 50
    optimisation steps.

    Returns:
        list with the mean evaluation return recorded after every
        `eval_interval`-th training episode (episodes 0, eval_interval, ...).
    """
    agent = DQN_Agent(env, learning_rate=learning_rate)
    # The loss object is stateless, so build it once instead of per step.
    # (torch.nn.SmoothL1Loss, i.e. Huber loss, was noted to work better.)
    criterion = torch.nn.MSELoss()
    target_sync_interval = 50  # optimisation steps between target-network syncs
    reward_means = []
    for e in range(num_episodes):
        state = agent.env.reset()[0]  # Sample initial state
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

        episode_done = False       # Completed the episode
        episode_truncated = False  # For example reaching the maximum number of steps

        while not (episode_done or episode_truncated):
            # Sample action (epsilon-greedy) and simulate the environment
            action = agent.select_action(state, policy="egreedy").reshape(1, 1)
            next_state, reward, episode_done, episode_truncated, _ = agent.env.step(action.item())
            # Explicit float32 keeps the targets in the same dtype as the
            # network output even if the environment returns a numpy float64.
            reward = torch.tensor([reward], dtype=torch.float32)
            # If the episode terminates there is no next state
            next_state = None if episode_done else torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)

            # Store experience in buffer
            agent.memory.append((state, action, next_state, reward))
            state = next_state

            # Wait until the buffer can supply a full batch
            if len(agent.memory) < agent.batch_size:
                continue

            # Sample a batch of experiences and unpack it column-wise
            experiences = agent.get_sample()
            states_tuple, actions_tuple, next_states_tuple, rewards_tuple = zip(*experiences)
            states_batch = torch.cat(states_tuple)
            actions_batch = torch.cat(actions_tuple)
            rewards_batch = torch.cat(rewards_tuple)

            # Q-values of the actions that were actually taken
            current_q_values = agent.policy_net(states_batch).gather(1, actions_batch)

            # Bootstrap values: one batched target-network pass over all
            # non-terminal next states (terminal transitions keep value 0),
            # instead of one forward pass per sample.
            has_next = torch.tensor([ns is not None for ns in next_states_tuple], dtype=torch.bool)
            next_state_values = torch.zeros(agent.batch_size)
            if has_next.any():
                next_states_batch = torch.cat([ns for ns in next_states_tuple if ns is not None])
                with torch.no_grad():  # targets are constants; no gradients needed
                    next_state_values[has_next] = agent.target_net(next_states_batch).max(1)[0]
            target_q_values = (next_state_values * agent.gamma) + rewards_batch

            # Update current policy
            loss = criterion(current_q_values, target_q_values.unsqueeze(1))
            agent.optimizer.zero_grad()
            loss.backward()
            agent.optimizer.step()

            # Synchronize target and policy network to stabilize learning
            agent.steps_done += 1
            if agent.steps_done % target_sync_interval == 0:
                agent.target_net.load_state_dict(agent.policy_net.state_dict())

        # Evaluate the performance every eval_interval episodes
        if e % eval_interval == 0:
            print("Episode: {}".format(e))
            returns = agent.evaluate()
            print(f"Evaluation: reward for episode {e} is {returns}")
            reward_means.append(returns)

    return reward_means

In [8]:
def average_over_repetitions():
    """Repeat DQN training, average the evaluation curves and plot the result.

    Runs `DQN_learning()` `num_repetitions` times, writing each run's
    evaluation returns into the corresponding row of the module-level array
    `returns_over_repetitions`, then plots the per-evaluation-point mean and
    saves the figure as "dqn.png".

    Returns:
        numpy array of length `num_evaluation_points` with the mean evaluation
        return over all repetitions.
    """
    for i in tqdm.tqdm(range(num_repetitions)):
        returns_over_repetitions[i] = np.array(DQN_learning())

    # Evaluations happen at training episodes 0, eval_interval, 2 * eval_interval, ...
    episodes = np.arange(num_evaluation_points) * eval_interval
    average_returns = np.mean(returns_over_repetitions, axis=0)

    # Plotting the average performance
    plot = LearningCurvePlot(title="Average DQN Performance Over Repetitions")
    # The x values are episode indices, not environment timesteps, so replace
    # the plot helper's default x-axis label.
    plot.ax.set_xlabel("Episode")
    plot.add_curve(episodes, average_returns, label="Average Return")
    plot.save(name="dqn.png")
    return average_returns

In [9]:
def experiment():
    """Entry point of the notebook: run the repeated DQN experiment.

    Delegates to average_over_repetitions(); returns nothing.
    """
    average_over_repetitions()

In [10]:

# Run the full experiment: every training repetition plus the final averaged plot.
experiment()

  0%|          | 0/20 [00:00<?, ?it/s]

Episode: 0
Evaluation: reward for episode 0 is 9.65
Episode: 10
Evaluation: reward for episode 10 is 9.45
Episode: 20
Evaluation: reward for episode 20 is 9.25
Episode: 30
Evaluation: reward for episode 30 is 9.5
Episode: 40
Evaluation: reward for episode 40 is 9.65
Episode: 50
Evaluation: reward for episode 50 is 9.3
Episode: 60
Evaluation: reward for episode 60 is 10.25
Episode: 70
Evaluation: reward for episode 70 is 61.05
Episode: 80
Evaluation: reward for episode 80 is 213.65
Episode: 90
Evaluation: reward for episode 90 is 238.1
Episode: 100
Evaluation: reward for episode 100 is 320.95
Episode: 110
Evaluation: reward for episode 110 is 142.4
