In [1]:
import env.env as env
import gym
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from itertools import combinations

env = gym.make('MillionDoubtEnv-v0').unwrapped

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."


In [2]:
import os
import datetime
import math
import random
from collections import namedtuple
from itertools import count
from tqdm import tqdm_notebook as tqdm

リプレイメモリを管理するクラス。
メモリに格納する経験データ（transition）のタプルをnamedtupleで定義。

通常は、経験データは{現在の状態、選択した行動、次の状態、報酬}であるが、~~効率化のために次の状態の合法手の一覧も格納するようにする~~。

In [3]:
######################################################################
# Replay Memory

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [4]:
######################################################################
# DQN
# 10層の畳み込みニューラルネットワークを全結合層に接続。
# 活性化関数: tanh >> 行動価値(-1 ~ 1)を出力

# TODO k...特徴量の合計サイズ
k = 74
fcl_units = 256
num_actions = sum((act.n) for act in env.action_space.values())
class DQN(nn.Module):

    def __init__(self):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(1, k, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(k)
        self.conv2 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(k)
        self.conv3 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(k)
        self.conv4 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(k)
        self.conv5 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(k)
        self.conv6 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(k)
        self.conv7 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn7 = nn.BatchNorm2d(k)
        self.conv8 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn8 = nn.BatchNorm2d(k)
        self.conv9 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn9 = nn.BatchNorm2d(k)
        self.conv10 = nn.Conv2d(k, k, kernel_size=3, padding=1)
        self.bn10 = nn.BatchNorm2d(k)
        # TODO
        self.fcl1 = nn.Linear(k * 74, fcl_units)
        self.fcl2 = nn.Linear(fcl_units, num_actions)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.relu(self.bn6(self.conv6(x)))
        x = F.relu(self.bn7(self.conv7(x)))
        x = F.relu(self.bn8(self.conv8(x)))
        x = F.relu(self.bn9(self.conv9(x)))
        x = F.relu(self.bn10(self.conv10(x)))
        # TODO 
        # print(x.shape)
        x = F.relu(self.fcl1(x.view(-1, k * 74)))
        x = self.fcl2(x)
        return x.tanh()

In [5]:
def obs_to_tensor(obs):

    player_hand_tensor = torch.tensor(obs['player_hand'], dtype=torch.float32).view(1, 28)
    opponent_hand_len_tensor = torch.tensor([obs['opponent_hand_len']], dtype=torch.float32).view(1, 1)
    field_tensor = torch.tensor(obs['field'], dtype=torch.float32).view(1, 39)
    phase_type_tensor = torch.tensor([obs['phase_type']], dtype=torch.float32).view(1, 1)
    is_revolution_tensor = torch.tensor([obs['is_revolution']], dtype=torch.float32).view(1, 1)
    restricted_suits_tensor = torch.tensor([obs['restricted_suits']], dtype=torch.float32).view(1, 4)
    
    # Concatenate all tensors along the second dimension to create a single input tensor
    input_tensor = torch.cat((player_hand_tensor, opponent_hand_len_tensor, field_tensor, phase_type_tensor, is_revolution_tensor, restricted_suits_tensor), dim=1)
    state = input_tensor.view(1, 1, 2, 37).to(device)

    return state


In [6]:
######################################################################
# Training
# 訓練に使用するハイパーパラメータの設定。ニューラルネットワーク、オプティマイザ、リプレイメモリの初期化

# εグリーディー方策で選ぶ関数を定義

from re import L


BATCH_SIZE = 256
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 2000
OPTIMIZE_PER_EPISODES = 16
TARGET_UPDATE = 4

policy_net = DQN().to(device)
target_net = DQN().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters(), lr=1e-5)

memory = ReplayMemory(131072)

def epsilon_greedy(state, legal_moves):
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * episodes_done / EPS_DECAY)

    if sample > eps_threshold:
        with torch.no_grad():
            q = policy_net(state)
            # print(q.shape) # NOTE
            if any(isinstance(sublist, list) for sublist in legal_moves):
                legal_moves = convert_legal_moves(legal_moves)
            print(legal_moves)
            _, select = q[0, legal_moves].max(0)
    else:
        select = random.randrange(len(legal_moves))
        
    return select

# TODO dict
def select_action(obs, game, player):
    action = {
        'play_card': np.empty([0]),
        'play_card_back': np.empty([0]),
        'doubt': np.empty([0]),
        'select_card': np.empty([0]),
    }
    
    legal_moves = []
    state = obs_to_tensor(obs)
    
    if obs['phase_type'] == 1:
        legal_moves = game.searching_legal_move(player)
        
        select = epsilon_greedy(state, legal_moves)
        selected_move = legal_moves[select]
        action['play_card'] = selected_move[0]
        action['play_card_back'] = selected_move[1]
        selected_move = convert_array(selected_move)
        selected_move.append([0] * 13)
        selected_move.append([0] * 13)

# TODO
    elif obs['phase_type'] == 2:
        legal_moves = [0, 1]
        
        select = epsilon_greedy(state, legal_moves)
        action['doubt'] = legal_moves[select]
        selected_move = [[0] * 13]
        selected_move.append([0] * 13)
        converted_move = [legal_moves[select]]
        converted_move.extend([0] * 12)
        selected_move.append(converted_move)
        selected_move.append([0] * 13)

# select card(sample())
    elif obs['phase_type'] == 3:
        indexed_list = list(range(len(game.field)))
        for i in range(len(game.field)):
            for subset in combinations(indexed_list, i):
                legal_moves.append(list(subset))

        select = epsilon_greedy(state, legal_moves)
        selected_move = legal_moves[select]
        action['select_card'] = selected_move
        selected_move = [[0] * 13]
        selected_move.append([0] * 13)
        selected_move.append([0] * 13)
        converted_move = convert_array(legal_moves[select])
        selected_move.append(converted_move)
    
    else:
        selected_move = [[0] * 13]
        selected_move.append([0] * 13)
        selected_move.append([0] * 13)
        selected_move.append([0] * 13)
        
        print("NotImplemented")

    # print(obs['phase_type']) # NOTE
    # print(selected_move) # NOTE
    tensor_move = torch.tensor(selected_move, device=device, dtype=torch.long)
    
    return action, tensor_move

def convert_array(input_list):
    if any(isinstance(sublist, list) for sublist in input_list):
    # 1つ目のサブリストの変換
        first_sublist = [1 if i in input_list[0] else 0 for i in range(13)]
    
    # 2つ目のサブリストの変換
        second_sublist = [first_sublist[i] for i in input_list[1]]
        second_sublist = [1 if i in second_sublist else 0 for i in range(13)]
    
        return [first_sublist, second_sublist]
    
    else:
        lst = [1 if i in input_list else 0 for i in range(13)]
    
    return lst

def flatten_nested_list(lst):
    flat_list = []
    for item in lst:
        if isinstance(item, list):
            flat_list.extend(item)
        else:
            flat_list.append(item)
    return flat_list

def convert_legal_moves(lst):
    combined_list = [flatten_nested_list(sublist) for sublist in lst]
    converted_list = [int("0".join(map(str, sublist))) if sublist else 0 for sublist in combined_list]
    return converted_list


In [7]:
######################################################################
# Training loop

losses = []

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # 合法手のみ
    non_final_next_actions_list = []
    for next_actions in batch.next_actions:
        if next_actions is not None:
            non_final_next_actions_list.append(next_actions + [next_actions[0]] * (30 - len(next_actions)))
    non_final_next_actions = torch.tensor(non_final_next_actions_list, device=device, dtype=torch.long)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # 合法手のみの最大値
    target_q = target_net(non_final_next_states)
    # 相手番の価値のため反転する
    next_state_values[non_final_mask] = -target_q.gather(1, non_final_next_actions).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = next_state_values * GAMMA + reward_batch

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    losses.append(loss.item())

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

In [8]:
######################################################################
# main training loop

num_episodes = 10000
episodes_done = 0
pbar = tqdm(total=num_episodes)
for i_episode in range(num_episodes):
    # Initialize the environment and state
    env.reset()

    for t in count():
        obs0, obs1 = env.update_obs()
        next_obs0 = []
        next_obs1 = []
        
        # Select and perform an action
        if (env.game.turn and env.game.my_phase == "play") or (not env.game.turn and not env.game.my_phase == "play"):
            move, action = select_action(obs0, env.game, env.game.player0)
            next_obs0, reward0, done, info = env.step(move)

            reward0 = torch.tensor([reward0], device=device)
            
            # Store the transition in memory
            memory.push(obs0, action, next_obs0, reward0)
        else:
            move, action = select_action(obs1, env.game, env.game.player0)
            next_obs1, reward1, done, info = env.step(move)

            reward1 = torch.tensor([reward1], device=device)
            
            # memory.push(obs0, action, next_obs0, reward0)


        if done:
            break

        # Move to the next state
        obs0 = next_obs0
        obs1 = next_obs1

    episodes_done += 1
    pbar.update()

    if i_episode % OPTIMIZE_PER_EPISODES == OPTIMIZE_PER_EPISODES - 1:
        # Perform several episodes of the optimization (on the target network)
        optimize_model()

        pbar.set_description(f'loss = {losses[-1]:.3e}')

        # Update the target network, copying all weights and biases in DQN
        if i_episode // OPTIMIZE_PER_EPISODES % TARGET_UPDATE == 0:
            target_net.load_state_dict(policy_net.state_dict())

modelfile = 'model.pt'
print('save {}'.format(modelfile))
torch.save({'state_dict': target_net.state_dict(), 'optimizer': optimizer.state_dict()}, modelfile)

print('Complete')
env.close()

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  


  0%|          | 0/10000 [00:00<?, ?it/s]

  elif action['doubt']:


your atk turn.
[♠6, ♠K]
MLPlayer(MLPlayer)_1の操作
 
[♡J, ♣Q]
[0, 1]
MLPlayer(MLPlayer)_0の操作
 
[♢6, ♢10]
MLPlayer(MLPlayer)_1の操作
ダウト成功
NotImplemented
 
[♠7, ♠A]
MLPlayer(MLPlayer)_0の操作
 
[0, 10001, 20001, 1020001]


RuntimeError: CUDA error: device-side assert triggered

In [None]:
plt.plot(losses)