In [1]:
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from environment import TicTacToe3D

ModuleNotFoundError: No module named 'environment'

In [None]:
# Use the CUDA device when PyTorch reports one is available; otherwise the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

In [None]:
# One replay record: (state, action, next_state, reward).
# optimize_model treats a next_state of None as "no successor state".
Transition = namedtuple(
  'Transition',
  ['state', 'action', 'next_state', 'reward'],
)

class ReplayMemory:
  """Bounded buffer of Transition records used for experience replay.

  Records live in a deque created with maxlen=capacity, so once the buffer
  is full each new record pushes out the oldest one.
  """

  def __init__(self, capacity):
    self.capacity = capacity
    self.memory = deque([], maxlen=capacity)

  def push(self, *args):
    """Wrap the positional arguments in a Transition and store it."""
    record = Transition(*args)
    self.memory.append(record)

  def sample(self, batch_size):
    """Return `batch_size` stored records picked by `random.sample`.

    `random.sample` raises ValueError when `batch_size` is larger than the
    number of stored records.
    """
    return random.sample(self.memory, k=batch_size)

  def __len__(self):
    """Number of records currently stored."""
    return len(self.memory)

In [None]:
class DQN(nn.Module):
  """Fully connected network: two ReLU hidden layers and a linear output.

  Args:
    n_observations: Size of the last dimension of the input.
    n_actions: Size of the last dimension of the output.
    n_hidden: Width of both hidden layers.
  """

  def __init__(self, n_observations, n_actions, n_hidden=128):
    super().__init__()
    self.n_observations, self.n_actions = n_observations, n_actions

    # Attribute names double as state-dict keys, so they are kept stable.
    self.layer1 = nn.Linear(n_observations, n_hidden)
    self.layer2 = nn.Linear(n_hidden, n_hidden)
    self.layer3 = nn.Linear(n_hidden, n_actions)

  def forward(self, x):
    """Map `x` (last dimension n_observations) to n_actions raw outputs."""
    hidden = F.relu(self.layer1(x))
    hidden = F.relu(self.layer2(hidden))
    # No activation on the output layer.
    return self.layer3(hidden)

In [None]:
class QAgent:
  def __init__(self, logging=False, training=False, weight_path=None):
    self.training = training

    self.n_observations = 4*4*4
    self.n_actions = 16

    self.policy_net = DQN(self.n_observations, self.n_actions).to(device)
    self.target_net = DQN(self.n_observations, self.n_actions).to(device)
    self.target_net.load_state_dict(self.policy_net.state_dict())

    if training:
      self.BATCH_SIZE = 128
      self.memory = ReplayMemory(10000)
      self.steps_done = 0

      self.GAMMA = 0.99
      self.EPS_START = 0.9
      self.EPS_END = 0.05
      self.EPS_DECAY = 1000

      self.LR = 1e-4
      self.TAU = 0.005

      self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=self.LR, amsgrad=True)
    else:
      self.policy_net.eval()
      self.target_net.eval()

      if weight_path is not None:
        self.load_weights(weight_path)  # Load weights if path is provided

  def load_weights(self, path):
    state_dict = torch.load(path)
    self.policy_net.load_state_dict(state_dict)
    self.target_net.load_state_dict(state_dict)
    print("Weights loaded successfully from", path)
  
  def save_weights(self, path):
    torch.save(self.policy_net.state_dict(), path)
    print("Weights saved successfully to", path)
  
  def create_indicator_array(self, coords, num_rows=4, num_cols=4):
    indicator_array = torch.zeros(16)
    
    for row, col in coords:
      index = row * num_cols + col
      indicator_array[index] = 1
    
    return indicator_array
  
  def creat_index_array(self, coords, num_rows=4, num_cols=4):
    index_array = []
    
    for row, col in coords:
      index = row * num_cols + col
      index_array.append(index)
    
    return torch.tensor(index_array, device=device)

  def findBestMove(self, board, possible_move, player):
    if len(possible_move) == 0:
      return None
    
    if self.training:
      sample = random.random()
      eps_threshold = self.EPS_END + (self.EPS_START - self.EPS_END) * math.exp(-1. * self.steps_done / self.EPS_DECAY)
      self.steps_done += 1

      if sample > eps_threshold:
        self.policy_net.eval()
        self.target_net.eval()

        with torch.no_grad():
          # Change the board to the player's perspective
          current_board = board * player
          state = torch.tensor(current_board, dtype=torch.float32).to(device)
          score = self.policy_net(state) * self.create_indicator_array(possible_move)
          return score.max(1).indices.view(1, 1)
      else:
        return torch.tensor([[random.choice(self.creat_index_array(possible_move))]], device=device, dtype=torch.long)
    else:
      with torch.no_grad():
        # Change the board to the player's perspective
        current_board = board * player
        state = torch.tensor(current_board, dtype=torch.float32).to(device)
        score = self.policy_net(state) * self.create_indicator_array(possible_move)
        return score.max(1).indices.view(1, 1)
  
  def optimize_model(self):
    self.policy_net.train()
    self.target_net.train()

    if len(self.memory) < self.BATCH_SIZE:
      return
    transitions = self.memory.sample(self.BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    state_action_values = self.policy_net(state_batch).gather(1, action_batch)

    next_state_values = torch.zeros(self.BATCH_SIZE, device=device)
    with torch.no_grad():
      next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1).values
    expected_state_action_values = (next_state_values * self.GAMMA) + reward_batch

    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    self.optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
    # for param in self.policy_net.parameters():
    #   param.grad.data.clamp_(-1, 1)
    self.optimizer.step()