In [7]:
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gym
import matplotlib.pyplot as plt

In [8]:
# CarRacing defaults to a continuous Box action space, but the DQN agents below
# emit integer actions, so request the discrete variant. Observations are RGB
# frames; flatten them so they match the fully connected networks' 1-D input.
env = gym.wrappers.FlattenObservation(gym.make('CarRacing-v2', continuous=False))
n_states = env.observation_space.shape[0]  # length of the flattened frame
n_actions = env.action_space.n  # do nothing, left, right, gas, brake
hidden_size = 256
lr = 0.001
gamma = 0.99      # discount factor
epsilon = 1.0     # initial exploration rate
eps_dec = 0.9995  # multiplicative epsilon decay
eps_min = 0.01    # exploration floor

In [9]:

class DoubleDQN(nn.Module):
    """Fully connected Q-network with two hidden ReLU layers.

    Maps a state vector with ``n_states`` features to one Q-value per action.
    The module owns its Adam optimizer, its MSE loss and the device it lives
    on, so an agent can train it without extra wiring.
    """

    def __init__(self, n_states, n_actions, hidden_size, lr):
        """Build the layers, the optimizer and the loss, then move to the device.

        Args:
            n_states: Number of input features.
            n_actions: Number of outputs (one Q-value per action).
            hidden_size: Width of both hidden layers.
            lr: Learning rate handed to Adam.
        """
        super().__init__()

        self.fc1 = nn.Linear(n_states, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, n_actions)

        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()

        # Use the first CUDA device when one is available, otherwise the CPU.
        if T.cuda.is_available():
            self.device = T.device('cuda:0')
        else:
            self.device = T.device('cpu')
        self.to(self.device)

    def forward(self, state):
        """Return the Q-values for ``state`` (last dimension = ``n_states``)."""
        hidden = state
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return self.fc3(hidden)


In [10]:
class DoubleDQNAgent():
    """Epsilon-greedy Double DQN agent that learns online from single transitions.

    ``Q_eval`` is the network being trained; ``Q_next`` is a periodically
    synchronised copy used to evaluate the bootstrap target.
    """

    def __init__(self, n_states, n_actions, hidden_size, lr, gamma=0.99, epsilon=1.0, eps_min=0.01, eps_dec=0.9995,
                 replace_target_every=1000):
        """Create the online and target networks.

        Args:
            n_states: Number of features in a (flat) observation.
            n_actions: Number of discrete actions.
            hidden_size: Width of the networks' hidden layers.
            lr: Learning rate for the networks' optimizers.
            gamma: Discount factor.
            epsilon: Initial exploration probability.
            eps_min: Lower bound for ``epsilon``.
            eps_dec: Multiplicative decay applied to ``epsilon`` per learn step.
            replace_target_every: Number of learn steps between copies of the
                online weights into the target network (values < 1 mean 1).
        """
        self.lr = lr
        self.n_states = n_states
        self.n_actions = n_actions
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_dec = eps_dec
        self.eps_min = eps_min
        self.action_space = np.arange(self.n_actions)
        self.replace_target_every = max(1, int(replace_target_every))
        self.learn_step_counter = 0

        self.Q_eval = DoubleDQN(self.n_states, self.n_actions, hidden_size, self.lr)
        self.Q_next = DoubleDQN(self.n_states, self.n_actions, hidden_size, self.lr)
        # Start from identical weights so early targets are not produced by an
        # unrelated random network.
        self.update_networks()

    def _as_batch(self, array):
        """Convert one observation into a float32 batch of size 1 on the device."""
        flat = np.asarray(array, dtype=np.float32)
        return T.as_tensor(flat, device=self.Q_eval.device).unsqueeze(0)

    def choose_action(self, observation):
        """Pick an action epsilon-greedily and return it as a Python int."""
        if np.random.random() > self.epsilon:
            state = self._as_batch(observation)
            with T.no_grad():  # action selection needs no gradients
                q_values = self.Q_eval.forward(state)
            action = int(T.argmax(q_values).item())
        else:
            action = int(np.random.choice(self.action_space))
        return action

    def decrement_epsilon(self):
        """Decay ``epsilon`` multiplicatively without going below ``eps_min``."""
        self.epsilon = max(self.epsilon * self.eps_dec, self.eps_min)

    def learn(self, state, action, reward, state_, done=False):
        """Run one Double DQN update on a single transition.

        Args:
            state: Observation before the action.
            action: Index of the action that was taken.
            reward: Reward received.
            state_: Observation after the action.
            done: True when ``state_`` is terminal; the target then does not
                bootstrap from the next state.
        """
        device = self.Q_eval.device
        states = self._as_batch(state)
        states_ = self._as_batch(state_)
        actions = T.tensor([action], dtype=T.long, device=device)
        rewards = T.tensor([reward], dtype=T.float32, device=device)
        batch_idx = T.arange(states.shape[0], device=device)

        self.Q_eval.optimizer.zero_grad()

        # Index (row, action): row selects the sample, column the action taken.
        q_pred = self.Q_eval.forward(states)[batch_idx, actions]

        # The target is a constant for the update: the online net selects the
        # next action, the target net evaluates it.
        with T.no_grad():
            if done:
                q_next = T.zeros_like(rewards)
            else:
                best_next = T.argmax(self.Q_eval.forward(states_), dim=1)
                q_next = self.Q_next.forward(states_)[batch_idx, best_next]
            q_target = rewards + self.gamma * q_next

        loss = self.Q_eval.loss(q_pred, q_target)
        loss.backward()
        self.Q_eval.optimizer.step()

        self.learn_step_counter += 1
        if self.learn_step_counter % self.replace_target_every == 0:
            self.update_networks()

        self.decrement_epsilon()

    def update_networks(self):
        """Copy the online network's weights into the target network."""
        self.Q_next.load_state_dict(self.Q_eval.state_dict())

In [12]:
agent = DoubleDQNAgent(n_states, n_actions, hidden_size, lr, gamma, epsilon, eps_min, eps_dec)

n_games = 500
scores = []       # total reward per episode
eps_history = []  # epsilon at the end of each episode

for i in range(n_games):
    done = False
    score = 0
    # gym >= 0.26 returns (observation, info) from reset(); older releases
    # return the observation alone.
    reset_result = env.reset()
    observation = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    while not done:
        action = agent.choose_action(observation)
        step_result = env.step(action)
        if len(step_result) == 5:
            # Newer API: termination and truncation are reported separately.
            obs_, reward, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            obs_, reward, done, info = step_result
        score += reward
        agent.learn(observation, action, reward, obs_)
        observation = obs_
    scores.append(score)
    eps_history.append(agent.epsilon)

    if i % 100 == 0:
        avg_score = np.mean(scores[-100:])
        print('episode ', i, 'score %.1f avg score %.1f epsilon %.2f' % (score, avg_score, agent.epsilon))


In [None]:
# Plot the rewards and epsilon decay over time
window_size = 20

# Trailing moving average, aligned with the episode that closes each window.
# Episodes that do not yet have a full window are NaN so matplotlib leaves them
# blank instead of drawing a placeholder value that would distort the y-axis.
n_windows = max(len(scores) - window_size + 1, 0)
moving_averages = [np.nan] * min(window_size - 1, len(scores))
moving_averages += [
    round(float(np.mean(scores[start:start + window_size])), 2)
    for start in range(n_windows)
]

fig, ax1 = plt.subplots(figsize=(15, 5))
ax1.set_title('Epsilon Decay and Rewards through time')
color = 'tab:red'
ax1.set_xlabel('Episodes')
ax1.set_ylabel('Rewards', color=color)
ax1.plot(scores, color=color)
ax1.plot(moving_averages, color='magenta')
ax1.tick_params(axis='y', labelcolor=color)

# Second y-axis sharing the episode axis, used for the exploration rate.
ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Epsilon Decay', color=color)
ax2.plot(eps_history, color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()
plt.show()

In [None]:
class DuelingDQN(nn.Module):
    """Dueling Q-network: a shared trunk feeding separate value and advantage heads.

    The heads are combined as ``Q = V + A - mean_a(A)``, where the mean is
    taken over the action dimension of each state.
    """

    def __init__(self, n_states, n_actions, hidden_size, lr):
        """Build the trunk, both heads, the optimizer and the loss.

        Args:
            n_states: Number of input features.
            n_actions: Number of actions (outputs of the advantage head).
            hidden_size: Width of every hidden layer.
            lr: Learning rate handed to Adam.
        """
        super(DuelingDQN, self).__init__()

        # Shared feature trunk.
        self.fc1 = nn.Linear(n_states, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)

        # Advantage head: one output per action.
        self.advantage_fc1 = nn.Linear(hidden_size, hidden_size)
        self.advantage_fc2 = nn.Linear(hidden_size, n_actions)

        # Value head: a single scalar per state.
        self.value_fc1 = nn.Linear(hidden_size, hidden_size)
        self.value_fc2 = nn.Linear(hidden_size, 1)

        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, state):
        """Return Q-values for ``state`` (last dimension = ``n_states``).

        Works for a single state vector or a batch of them; the output has
        the same leading dimensions with ``n_actions`` as the last one.
        """
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))

        advantage = F.relu(self.advantage_fc1(x))
        advantage = self.advantage_fc2(advantage)

        value = F.relu(self.value_fc1(x))
        value = self.value_fc2(value)

        # Centre the advantages per state (over actions only). Averaging over
        # the whole tensor would mix different states of a batch together.
        q_values = value + advantage - advantage.mean(dim=-1, keepdim=True)
        return q_values


In [None]:

class DuelingDQNAgent():
    """Epsilon-greedy Q-learning agent backed by a single ``DuelingDQN`` network.

    Learns online from one transition at a time; the same network produces
    both the prediction and the bootstrap target.
    """

    def __init__(self, n_states, n_actions, hidden_size, lr, gamma=0.99, epsilon=1.0, eps_min=0.01, eps_dec=0.9995):
        """Create the Q-network and store the exploration schedule.

        Args:
            n_states: Number of features in a (flat) observation.
            n_actions: Number of discrete actions.
            hidden_size: Width of the network's hidden layers.
            lr: Learning rate for the network's optimizer.
            gamma: Discount factor.
            epsilon: Initial exploration probability.
            eps_min: Lower bound for ``epsilon``.
            eps_dec: Multiplicative decay applied to ``epsilon`` per learn step.
        """
        self.lr = lr
        self.n_states = n_states
        self.n_actions = n_actions
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_dec = eps_dec
        self.eps_min = eps_min
        self.action_space = np.arange(self.n_actions)

        self.Q_eval = DuelingDQN(self.n_states, self.n_actions, hidden_size, self.lr)

    def _as_batch(self, array):
        """Convert one observation into a float32 batch of size 1 on the device."""
        flat = np.asarray(array, dtype=np.float32)
        return T.as_tensor(flat, device=self.Q_eval.device).unsqueeze(0)

    def choose_action(self, observation):
        """Pick an action epsilon-greedily and return it as a Python int."""
        if np.random.random() > self.epsilon:
            state = self._as_batch(observation)
            with T.no_grad():  # action selection needs no gradients
                q_values = self.Q_eval.forward(state)
            action = int(T.argmax(q_values).item())
        else:
            action = int(np.random.choice(self.action_space))
        return action

    def decrement_epsilon(self):
        """Decay ``epsilon`` multiplicatively without going below ``eps_min``."""
        self.epsilon = max(self.epsilon * self.eps_dec, self.eps_min)

    def learn(self, state, action, reward, state_, done=False):
        """Run one Q-learning update on a single transition.

        Args:
            state: Observation before the action.
            action: Index of the action that was taken.
            reward: Reward received.
            state_: Observation after the action.
            done: True when ``state_`` is terminal; the target then does not
                bootstrap from the next state.
        """
        device = self.Q_eval.device
        states = self._as_batch(state)
        states_ = self._as_batch(state_)
        actions = T.tensor([action], dtype=T.long, device=device)
        rewards = T.tensor([reward], dtype=T.float32, device=device)

        self.Q_eval.optimizer.zero_grad()

        # Index (row, action): row selects the sample, column the action taken.
        q_values = self.Q_eval.forward(states)
        q_pred = q_values[T.arange(q_values.shape[0], device=device), actions]

        # Treat the bootstrap target as a constant so the update only moves
        # the prediction for the action that was actually taken.
        with T.no_grad():
            if done:
                q_next = T.zeros_like(rewards)
            else:
                q_next = self.Q_eval.forward(states_).max(dim=1).values
            q_target = rewards + self.gamma * q_next

        loss = self.Q_eval.loss(q_pred, q_target)
        loss.backward()
        self.Q_eval.optimizer.step()
        self.decrement_epsilon()





In [None]:

# CarRacing defaults to a continuous Box action space, but the DQN agent
# emits integer actions, so request the discrete variant. Observations are RGB
# frames; flatten them so they match the fully connected network's 1-D input.
env = gym.wrappers.FlattenObservation(gym.make('CarRacing-v2', continuous=False))
n_states = env.observation_space.shape[0]  # length of the flattened frame
n_actions = env.action_space.n  # do nothing, left, right, gas, brake
hidden_size = 256
lr = 0.001
gamma = 0.99      # discount factor
epsilon = 1.0     # initial exploration rate
eps_dec = 0.9995  # multiplicative epsilon decay
eps_min = 0.01    # exploration floor



In [None]:
agent = DuelingDQNAgent(n_states, n_actions, hidden_size, lr, gamma, epsilon, eps_min, eps_dec)

n_games = 500
scores = []       # total reward per episode
eps_history = []  # epsilon at the end of each episode

for i in range(n_games):
    done = False
    score = 0
    # gym >= 0.26 returns (observation, info) from reset(); older releases
    # return the observation alone.
    reset_result = env.reset()
    observation = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    while not done:
        action = agent.choose_action(observation)
        step_result = env.step(action)
        if len(step_result) == 5:
            # Newer API: termination and truncation are reported separately.
            obs_, reward, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            obs_, reward, done, info = step_result
        score += reward
        agent.learn(observation, action, reward, obs_)
        observation = obs_
    scores.append(score)
    eps_history.append(agent.epsilon)

    if i % 100 == 0:
        avg_score = np.mean(scores[-100:])
        print('episode ', i, 'score %.1f avg score %.1f epsilon %.2f' % (score, avg_score, agent.epsilon))


In [None]:

# Plot the rewards and epsilon decay over time
window_size = 20

# Trailing moving average, aligned with the episode that closes each window.
# Episodes that do not yet have a full window are NaN so matplotlib leaves them
# blank instead of drawing a placeholder value that would distort the y-axis.
n_windows = max(len(scores) - window_size + 1, 0)
moving_averages = [np.nan] * min(window_size - 1, len(scores))
moving_averages += [
    round(float(np.mean(scores[start:start + window_size])), 2)
    for start in range(n_windows)
]

fig, ax1 = plt.subplots(figsize=(15, 5))
ax1.set_title('Epsilon Decay and Rewards through time')
color = 'tab:red'
ax1.set_xlabel('Episodes')
ax1.set_ylabel('Rewards', color=color)
ax1.plot(scores, color=color)
ax1.plot(moving_averages, color='magenta')
ax1.tick_params(axis='y', labelcolor=color)

# Second y-axis sharing the episode axis, used for the exploration rate.
ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Epsilon Decay', color=color)
ax2.plot(eps_history, color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()
plt.show()