In [None]:
# Partially adapted from code found on the Internet.
import random
import gym
import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
import rl_utils
from tqdm import tqdm
from gym import spaces
import torch.nn as nn


class Qnet(torch.nn.Module):
    """Fully connected Q-network with two ReLU hidden layers.

    Maps a batch of states of width ``state_dim`` to one Q-value per action
    (``action_dim`` outputs). The weights of the two hidden layers use
    Kaiming-normal (fan-in, ReLU) initialisation; the output layer keeps the
    default ``Linear`` initialisation.
    """

    @staticmethod
    def _relu_linear(in_features, out_features):
        """Create a Linear layer and re-initialise its weight for ReLU."""
        layer = nn.Linear(in_features, out_features)
        nn.init.kaiming_normal_(layer.weight, mode='fan_in', nonlinearity='relu')
        return layer

    def __init__(self, state_dim, hidden_dim, action_dim):
        super().__init__()
        # Each layer is created and initialised before the next one is built,
        # so random numbers are drawn in the same order as before.
        self.fc1 = self._relu_linear(state_dim, hidden_dim)
        self.fc2 = self._relu_linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        """Return the Q-values for every action given the input batch ``x``."""
        for hidden_layer in (self.fc1, self.fc2):
            x = F.relu(hidden_layer(x))
        return self.fc3(x)

In [None]:
class DQN:
    """Deep Q-Network agent with an optional Double DQN target.

    The agent owns an online network (``q_net``), a target network
    (``target_q_net``) that is periodically overwritten with the online
    weights, and an Adam optimizer over the online network only.
    """

    def __init__(self,
                 state_dim,
                 hidden_dim,
                 action_dim,
                 learning_rate,
                 gamma,
                 epsilon,
                 target_update,
                 device,
                 dqn_type='VanillaDQN'):
        """Build the networks and optimizer.

        Args:
            state_dim: width of a state vector fed to the network.
            hidden_dim: width of each hidden layer.
            action_dim: number of discrete actions.
            learning_rate: Adam learning rate.
            gamma: discount factor used in the TD target.
            epsilon: probability of picking a uniformly random action.
            target_update: the target network is synchronised every
                ``target_update`` calls to :meth:`update`.
            device: torch device holding the networks and batches.
            dqn_type: ``'DoubleDQN'`` selects the Double DQN target; any
                other value uses the vanilla max-over-target-network target.
        """
        self.action_dim = action_dim
        self.q_net = Qnet(state_dim, hidden_dim, self.action_dim).to(device)
        self.target_q_net = Qnet(state_dim, hidden_dim,
                                 self.action_dim).to(device)
        # Start the target network as an exact copy of the online network so
        # the very first TD targets are not bootstrapped from unrelated
        # random weights.
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(),
                                          lr=learning_rate)
        self.gamma = gamma
        self.epsilon = epsilon
        self.target_update = target_update
        self.count = 0  # number of update() calls performed so far
        self.dqn_type = dqn_type
        self.device = device

    def take_action(self, state):
        """Pick an action epsilon-greedily and return it as a Python int."""
        if np.random.random() < self.epsilon:
            # int() keeps the action type identical to the greedy branch.
            return int(np.random.randint(self.action_dim))
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        with torch.no_grad():  # inference only, no autograd graph needed
            return self.q_net(state).argmax().item()

    def max_q_value(self, state):
        """Return the largest online-network Q-value for ``state`` as a float."""
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        with torch.no_grad():
            return self.q_net(state).max().item()

    def update(self, transition_dict):
        """Run one gradient step on a batch of transitions.

        Args:
            transition_dict: mapping with the keys ``'states'``,
                ``'actions'``, ``'rewards'``, ``'next_states'`` and
                ``'dones'``, each holding one entry per transition.
        """
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        # gather() requires int64 indices; do not rely on dtype inference,
        # which yields int32 for e.g. int32 NumPy actions.
        actions = torch.tensor(transition_dict['actions'],
                               dtype=torch.long).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)

        q_values = self.q_net(states).gather(1, actions)

        # The TD target is a constant with respect to the loss: computing it
        # without autograd avoids back-propagating into the target network
        # (whose gradients would otherwise accumulate and never be cleared).
        with torch.no_grad():
            if self.dqn_type == 'DoubleDQN':
                # Double DQN: the online net selects the action, the target
                # net evaluates it.
                max_action = self.q_net(next_states).max(1)[1].view(-1, 1)
                max_next_q_values = self.target_q_net(next_states).gather(
                    1, max_action)
            else:
                max_next_q_values = self.target_q_net(next_states).max(
                    1)[0].view(-1, 1)
            # (1 - dones) removes the bootstrap term on terminal transitions.
            q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)

        # mse_loss already averages over the batch.
        dqn_loss = F.mse_loss(q_values, q_targets)
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()

        if self.count % self.target_update == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.count += 1

In [None]:
# Training hyperparameters (consumed by DQN, train_DQN and the replay buffer).
lr = 1e-3             # Adam learning rate
num_episodes = 1000   # total number of training episodes
hidden_dim = 8        # width of each hidden layer of the Q-network
gamma = 0.98          # discount factor
epsilon = 0.01        # probability of a random action (epsilon-greedy)
target_update = 50    # sync the target network every this many updates
buffer_size = 5000    # passed to rl_utils.ReplayBuffer (presumably its capacity)
minimal_size = 3000   # learning starts once the buffer holds more than this
batch_size = 512      # transitions sampled per gradient step

# Seed every RNG the experiment draws from (Python `random` for the
# environment and opponent, NumPy for exploration, torch for weight
# initialisation) so that Restart & Run All reproduces the same run.
seed = 0
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class BlottoGameEnv(gym.Env):
    """
    Blotto game exposed through a Gym-style environment.

    An action is an index into ``strategy_list``, the sorted list of every way
    to split ``S`` soldiers over ``N`` battlefields. Unlike the standard Gym
    signature, ``step`` also receives the opponent's allocation, and the
    observation is the opponent's most recent allocation.
    """

    def __init__(self, N=3, S=5):
        self.N = N  # Number of battlefields
        self.S = S  # Total number of soldiers
        allocations = self.recursive(self.S, self.N)
        allocations.sort()
        self.strategy_list = allocations
        self.action_space = spaces.Discrete(len(self.strategy_list))
        self.observation_space = spaces.MultiDiscrete([self.S + 1] * self.N)
        self.opponent_strategy = self.strategy_list[0]
        self.state = None

    def reset(self):
        """Set the state to a randomly chosen allocation and return it."""
        initial_state = random.choice(self.strategy_list)
        self.state = initial_state
        return initial_state

    def step(self, action, opponent_action):
        """Play one round.

        ``action`` indexes ``strategy_list``; ``opponent_action`` is the
        opponent's allocation itself. Returns the tuple
        ``(state, reward, terminated, player_strategy, info)`` where the new
        state is the opponent's allocation and ``info`` is an empty dict.
        """
        player_strategy = self.strategy_list[action]
        self.opponent_strategy = opponent_action
        outcome = self.resolve_round(player_strategy, self.opponent_strategy)
        reward, terminated = outcome
        self.state = self.opponent_strategy
        return self.state, reward, terminated, player_strategy, {}

    def resolve_round(self, player_strategy, opponent_strategy):
        """Score a round as ``(reward, terminated)``.

        The reward is 1 when the player wins more battlefields than it loses,
        -1 in the opposite case and 0 on a tie; ``terminated`` is True only
        when the player loses.
        """
        battlefields = list(zip(player_strategy, opponent_strategy))
        won = sum(1 for mine, theirs in battlefields if mine > theirs)
        lost = sum(1 for mine, theirs in battlefields if mine < theirs)
        margin = won - lost
        if margin > 0:
            return 1, False  # Player wins
        if margin < 0:
            return -1, True  # Player loses
        return 0, False  # Tie

    def recursive(self, S, N):
        """List every N-tuple of non-negative integers that sums to S."""
        if N == 1:
            return [(S,)]
        splits = []
        for first in range(S + 1):
            for rest in self.recursive(S - first, N - 1):
                splits.append((first,) + rest)
        return splits
# Instantiate the game (3 battlefields, 5 soldiers) and read the network
# input/output sizes off its observation and action spaces.
env_name = 'BlottoGame'
env = BlottoGameEnv(N=3, S=5)
action_dim = env.action_space.n
state_dim = env.observation_space.shape[0]

In [None]:
def train_DQN(agent, env, num_episodes, replay_buffer, minimal_size,
              batch_size, step_limit=100):
    """Train ``agent`` on ``env`` against a uniformly random opponent.

    Training runs as 10 progress-bar iterations of ``int(num_episodes / 10)``
    episodes each. An episode ends as soon as its cumulative return becomes
    negative or once more than ``step_limit`` steps have been taken; the
    termination flag returned by ``env.step`` is not used.

    Args:
        agent: object providing ``take_action(state)``, ``max_q_value(state)``
            and ``update(transition_dict)``.
        env: environment providing ``reset()``, ``strategy_list`` and
            ``step(action, opponent_action)`` returning a 5-tuple whose first
            two items are the next state and the reward.
        num_episodes: total number of episodes to run.
        replay_buffer: object providing ``add(...)``, ``size()`` and
            ``sample(batch_size)``.
        minimal_size: the agent is updated only while the buffer holds more
            than this many transitions.
        batch_size: number of transitions sampled per update.
        step_limit: an episode is cut off once its step count exceeds this.

    Returns:
        ``(return_list, max_q_value_list)``: the return of every episode and
        the exponentially smoothed maximum Q-value recorded at every step.
    """
    return_list = []
    max_q_value_list = []
    max_q_value = 0
    episodes_per_iteration = int(num_episodes / 10)
    for i in range(10):
        with tqdm(total=episodes_per_iteration,
                  desc='Iteration %d' % i) as pbar:
            for i_episode in range(episodes_per_iteration):
                episode_return = 0
                state = env.reset()
                done = False
                steps_taken = 0
                while not done:
                    steps_taken += 1
                    action = agent.take_action(state)
                    # Exponential moving average of the greedy Q-value.
                    max_q_value = agent.max_q_value(
                        state) * 0.005 + max_q_value * 0.995
                    max_q_value_list.append(max_q_value)
                    opponent_action = random.choice(env.strategy_list)
                    next_state, reward, _, _, _ = env.step(
                        action, opponent_action)
                    # Add the reward BEFORE testing for termination, so the
                    # transition that drives the return below zero is the one
                    # stored as terminal (previously the flag lagged a step).
                    episode_return += reward
                    done = episode_return < 0 or steps_taken > step_limit
                    replay_buffer.add(state, action, reward, next_state, done)
                    state = next_state
                    if replay_buffer.size() > minimal_size:
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(
                            batch_size)
                        transition_dict = {
                            'states': b_s,
                            'actions': b_a,
                            'next_states': b_ns,
                            'rewards': b_r,
                            'dones': b_d
                        }
                        agent.update(transition_dict)
                return_list.append(episode_return)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({
                        'episode':
                        '%d' % (num_episodes / 10 * i + i_episode + 1),
                        'return':
                        '%.3f' % np.mean(return_list[-10:])
                    })
                pbar.update(1)
    return return_list, max_q_value_list

In [None]:
# Train a Double DQN agent, then plot the learning curves.
replay_buffer = rl_utils.ReplayBuffer(buffer_size)
agent = DQN(state_dim,
            hidden_dim,
            action_dim,
            lr,
            gamma,
            epsilon,
            target_update,
            device,
            dqn_type='DoubleDQN')
return_list, max_q_value_list = train_DQN(
    agent, env, num_episodes, replay_buffer, minimal_size, batch_size)

# Figure 1: per-episode return smoothed with a 5-episode moving average.
mv_return = rl_utils.moving_average(return_list, 5)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, mv_return)
plt.title('DQN on {}'.format(env_name))
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.show()

# Figure 2: smoothed maximum Q-value per step, with reference lines at 0 and 10.
frames_list = list(range(len(max_q_value_list)))
plt.plot(frames_list, max_q_value_list)
for level, line_colour in ((0, 'orange'), (10, 'red')):
    plt.axhline(level, color=line_colour, linestyle='--')
plt.title('DQN on {}'.format(env_name))
plt.xlabel('Frames')
plt.ylabel('Q value')
plt.show()

In [None]:
# savgol_filter is only needed for an optional Savitzky-Golay trend line;
# the curve drawn below is a plain rolling mean.
from scipy.signal import savgol_filter

# Figure 1: 5-episode moving average of the return, overlaid with a
# 50-episode rolling mean of that curve.
episodes_list = list(range(len(return_list)))
mv_return = rl_utils.moving_average(return_list, 5)
window_size = 50
rolling_mean = np.convolve(mv_return, np.ones(window_size)/window_size, mode='valid')
# mode='valid' drops the first window_size - 1 points, so shift the x axis.
rolling_x = episodes_list[(window_size-1):]
plt.plot(episodes_list, mv_return, label='Moving Average')
plt.plot(rolling_x, rolling_mean, label='Rolling Mean', color='darkorange', linewidth=2, linestyle='-')
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DQN on {}'.format(env_name))
plt.legend()  # without this call the curve labels are never displayed
plt.show()

# Figure 2: smoothed maximum Q-value per step, with reference lines at 0 and 10.
frames_list = list(range(len(max_q_value_list)))
plt.plot(frames_list, max_q_value_list)
plt.axhline(0, c='orange', ls='--')
plt.axhline(10, c='red', ls='--')
plt.xlabel('Frames')
plt.ylabel('Q value')
plt.title('DQN on {}'.format(env_name))
plt.show()