https://github.com/jackdawkins11/pytorch-alpha-zero

# 게임 상태 준비

In [None]:
# 패키지 임포트
import random
import math

# 게임 상태
class State:
    # 초기화
    def __init__(self, pieces=None, enemy_pieces=None):
        # 돌의 배치
        self.pieces = pieces if pieces != None else [0] * 9
        self.enemy_pieces = enemy_pieces if enemy_pieces != None else [0] * 9

    # 돌의 수 얻기
    def piece_count(self, pieces):
        return sum(1 for i in pieces if i == 1)

    # 패배 여부 판정
    def is_lose(self):
        # 돌 3개 연결 여부 확인
        def is_comp(x, y, dx, dy):
            for k in range(3):
                if y < 0 or 2 < y or x < 0 or 2 < x or \
                        self.enemy_pieces[x + y * 3] == 0:
                    return False
                x, y = x + dx, y + dy
            return True

        # 패배 여부 판정
        if is_comp(0, 0, 1, 1) or is_comp(0, 2, 1, -1):
            return True
        for i in range(3):
            if is_comp(0, i, 1, 0) or is_comp(i, 0, 0, 1):
                return True
        return False

    # 무승부 여부 확인
    def is_draw(self):
        return self.piece_count(self.pieces) + self.piece_count(self.enemy_pieces) == 9

    # 게임 종료 여부 확인
    def is_done(self):
        return self.is_lose() or self.is_draw()

    # 다음 상태 얻기
    def next(self, action):
        pieces = self.pieces.copy()
        pieces[action] = 1
        return State(self.enemy_pieces, pieces)

    # 합법적인 수의 리스트 얻기
    def legal_actions(self):
        return [i for i in range(9) if self.pieces[i] == 0 and self.enemy_pieces[i] == 0]

    # 선 수 여부 확인
    def is_first_player(self):
        return self.piece_count(self.pieces) == self.piece_count(self.enemy_pieces)

    # 문자열 표시
    def __str__(self):
        ox = ('o', 'x') if self.is_first_player() else ('x', 'o')
        str = ''
        for i in range(9):
            if self.pieces[i] == 1:
                str += ox[0]
            elif self.enemy_pieces[i] == 1:
                str += ox[1]
            else:
                str += '-'
            if i % 3 == 2:
                str += '\n'
        return str


# 랜덤으로 행동 선택
def random_action(state):
    legal_actions = state.legal_actions()
    return legal_actions[random.randint(0, len(legal_actions) - 1)]


# 알파베타법을 활용한 상태 가치 계산
def alpha_beta(state, alpha, beta):
    # 패배 시 상태 가치 -1
    if state.is_lose():
        return -1

    # 무승부 시, 상테 가치 0
    if state.is_draw():
        return 0

    # 합법적인 수의 상태 가치 계산
    for action in state.legal_actions():
        score = -alpha_beta(state.next(action), -beta, -alpha)
        if score > alpha:
            alpha = score

        # 현재 노드의 베스트 스코어가 부모 노드보다 크면 탐색 종료
        if alpha >= beta:
            return alpha

    # 합법적인 수의 강태 가치의 최대값을 반환
    return alpha


# 알파베타법을 활용한 생동 선택
def alpha_beta_action(state):
    # 합법적인 수의 상태 가치 계산
    best_action = 0
    alpha = -float('inf')
    for action in state.legal_actions():
        score = -alpha_beta(state.next(action), -float('inf'), -alpha)
        if score > alpha:
            best_action = action
            alpha = score

    # 합법적인 수의 상태 가치의 최대값을 갖는 행동 반환
    return best_action


# 플레이아웃
def playout(state):
    # 패배 시, 상태 가치 -1
    if state.is_lose():
        return -1

    # 무승부 시, 상태 가치 0
    if state.is_draw():
        return 0

    # 다음 상태의 상태 가치
    return -playout(state.next(random_action(state)))


# 최대값의 인덱스 반환
def argmax(collection):
    return collection.index(max(collection))


# 몬테카를로 트리 탐색을 활용한 행동 선택
def mcts_action(state):
    # 몬테카를로 트리 탐색 노드
    class node:
        # 초기화
        def __init__(self, state):
            self.state = state  # 상태
            self.w = 0  # 가치 누계
            self.n = 0  # 시행 횟수
            self.child_nodes = None  # 자녀 노드군

        # 평가
        def evaluate(self):
            # 게임 종료 시
            if self.state.is_done():
                # 승패 결과로 가치 얻기
                value = -1 if self.state.is_lose() else 0  # 패배 시 -1, 무승부 시 0

                # 가치 누계와 시행 횟수 갱신
                self.w += value
                self.n += 1
                return value

            # 자녀 노드가 존재하지 않는 경우
            if not self.child_nodes:
                # 플레이아웃으로 가치 얻기
                value = playout(self.state)

                # 가치 누계와 시행 횟수 갱신
                self.w += value
                self.n += 1

                # 자녀 노드 전개
                if self.n == 10:
                    self.expand()
                return value

            # 자녀 노드가 존재하는 경우
            else:
                # UCB1이 가장 큰 자녀 노드를 평가해 가치 얻기
                value = -self.next_child_node().evaluate()

                # 보상 누계와 시행 횟수 갱신
                self.w += value
                self.n += 1
                return value

        # 자녀 노드 전개
        def expand(self):
            legal_actions = self.state.legal_actions()
            self.child_nodes = []
            for action in legal_actions:
                self.child_nodes.append(node(self.state.next(action)))

        # UCB1가 가장 큰 자녀 노드 얻기
        def next_child_node(self):
            # 시행 횟수 n이 0인 자녀 노드를 반환
            for child_node in self.child_nodes:
                if child_node.n == 0:
                    return child_node

            # UCB1 계산
            t = 0
            for c in self.child_nodes:
                t += c.n
            ucb1_values = []
            for child_node in self.child_nodes:
                ucb1_values.append(-child_node.w / child_node.n + 2 * (2 * math.log(t) / child_node.n) ** 0.5)

            # UCB1가 가장 큰 자녀 노드를 반환
            return self.child_nodes[argmax(ucb1_values)]

    # 루트 노드 생성
    root_node = node(state)
    root_node.expand()

    # 루트 노드를 100회 평가
    for _ in range(100):
        root_node.evaluate()

    # 시행 횟수 최대값을 갖는 행동 반환
    legal_actions = state.legal_actions()
    n_list = []
    for c in root_node.child_nodes:
        n_list.append(c.n)
    return legal_actions[argmax(n_list)]


# 동작 확인
if __name__ == '__main__':
    # 상태 생성
    state = State()

    # 게임 종료 시까지 반복
    while True:
        # 게임 종료 시
        if state.is_done():
            break

        # 다음 상태 얻기
        state = state.next(random_action(state))

        # 문자열 표시
        print(state)
        print()

---
---
o--


-x-
---
o--


-xo
---
o--


xxo
---
o--


xxo
---
oo-


xxo
---
oox


xxo
--o
oox


xxo
-xo
oox




# 듀얼 네트워크 생성

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os

# 파라미터 준비
FILTERS = 128  # 컨볼루션 레이어 커널 수(오리지널 256）
RESIDUAL_NUM = 16  # 레지듀얼 블록 수(오리지널 19)
INPUT_SHAPE = (3,3,2)  # 입력 셰이프
OUTPUT_SIZE = 9  # 행동 수(배치 수(3*3))

# 레지듀얼 블록 생성
class ResidualBlock(nn.Module):
    def __init__(self, filters):
        super(ResidualBlock,self).__init__()
        self.conv1 = nn.Conv2d(filters, filters, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(filters)
        self.conv2 = nn.Conv2d(filters, filters, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(filters)

    def forward(self, x):
        sc = x
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        x += sc
        x = F.relu(x)
        return x


# 듀얼 네트워크 생성
class DualNetwork(nn.Module):
    def __init__(self, input_shape, output_size, filters=128, residual_num=16):
        super(DualNetwork, self).__init__()
        self.input_shape = input_shape
        self.input_conv = nn.Conv2d(input_shape[0], filters, kernel_size=3, padding=1, bias=False)
        self.input_bn = nn.BatchNorm2d(filters)
        self.residual_blocks = nn.Sequential(*[ResidualBlock(filters) for _ in range(residual_num)])
        self.global_pooling = nn.AdaptiveAvgPool2d(1)
        self.policy_head = nn.Linear(filters, output_size)
        self.value_head = nn.Linear(filters, 1)

    def forward(self, x):
        x = F.relu(self.input_bn(self.input_conv(x)))
        x = self.residual_blocks(x)
        x = self.global_pooling(x)  # [batch_size, filters, 1, 1]
        x = x.view(x.size(0), -1)  # [batch_size, filters]

        # Policy head
        policy = F.softmax(self.policy_head(x), dim=1)

        # Value head
        value = torch.tanh(self.value_head(x))

        return policy, value


# 모델 저장 함수
def save_dual_network(model, path='./model/best.pth'):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    torch.save(model.state_dict(), path)


# 모델 로드 함수
def load_dual_network(input_shape, output_size, filters=128, residual_num=16, path='./model/best.pth'):
    model = DualNetwork(input_shape, output_size, filters, residual_num)
    if os.path.exists(path):
        model.load_state_dict(torch.load(path))
    return model


# 동작 확인
if __name__ == '__main__':
    # 모델 생성
    input_shape =INPUT_SHAPE # [channels, height, width]
    output_size = OUTPUT_SIZE
    model = DualNetwork(input_shape, output_size, FILTERS, RESIDUAL_NUM)

    # 모델 저장
    save_dual_network(model)

    # 모델 로드
    loaded_model = load_dual_network(input_shape, output_size)
    print("모델이 성공적으로 생성 및 저장/로드되었습니다.")

모델이 성공적으로 생성 및 저장/로드되었습니다.


  model.load_state_dict(torch.load(path))


# 몬테카를로 트리 탐색 구현

In [None]:
# 패키지 임포트
import torch
import torch.nn.functional as F
import numpy as np
from pathlib import Path
from math import sqrt

# 파라미터 준비
PV_EVALUATE_COUNT = 50  # 추론 1회당 시뮬레이션 횟수(오리지널: 1600회)


# 추론
def predict(model, state, device):
    # 추론을 위한 입력 데이터 셰이프 변환
    a, b, c = INPUT_SHAPE
    x = np.array([state.pieces, state.enemy_pieces])
    x = x.reshape(c, a, b).transpose(1, 2, 0).reshape(1, a, b, c)

    # 텐서로 변환 및 GPU 이동
    x = torch.tensor(x, dtype=torch.float32).to(device)

    # 추론
    with torch.no_grad():
        policy, value = model(x)

    # 정책 얻기
    policies = policy.squeeze(0).cpu().numpy()[list(state.legal_actions())]  # 합법적인 수만 선택
    policies /= sum(policies) if sum(policies) else 1  # 합계 1의 확률 분포로 변환

    # 가치 얻기
    value = value.squeeze(0).item()
    return policies, value

# 노드 리스트를 시행 횟수 리스트로 변환
def nodes_to_scores(nodes):
    scores = [node.n for node in nodes]
    return scores


# 몬테카를로 트리 탐색 스코어 얻기
def pv_mcts_scores(model, state, temperature, device):
    # 몬테카를로 트리 탐색 노드 정의
    class Node:
        # 노드 초기화
        def __init__(self, state, p):
            self.state = state  # 상태
            self.p = p  # 정책
            self.w = 0  # 가치 누계
            self.n = 0  # 시행 횟수
            self.child_nodes = None  # 자녀 노드군

        # 국면 가치 계산
        def evaluate(self):
            # 게임 종료 시
            if self.state.is_done():
                # 승패 결과로 가치 얻기
                value = -1 if self.state.is_lose() else 0

                # 누계 가치와 시행 횟수 갱신
                self.w += value
                self.n += 1
                return value

            # 자녀 노드가 존재하지 않는 경우
            if not self.child_nodes:
                # 뉴럴 네트워크 추론을 활용한 정책과 가치 얻기
                policies, value = predict(model, self.state, device)

                # 누계 가치와 시행 횟수 갱신
                self.w += value
                self.n += 1

                # 자녀 노드 전개
                self.child_nodes = []
                for action, policy in zip(self.state.legal_actions(), policies):
                    self.child_nodes.append(Node(self.state.next(action), policy))
                return value

            # 자녀 노드가 존재하지 않는 경우
            else:
                # 아크 평가값이 가장 큰 자녀 노드의 평가로 가치 얻기
                value = -self.next_child_node().evaluate()

                # 누계 가치와 시행 횟수 갱신
                self.w += value
                self.n += 1
                return value

        # 아크 평가가 가장 큰 자녀 노드 얻기
        def next_child_node(self):
            # 아크 평가 계산
            C_PUCT = 1.0
            t = sum(nodes_to_scores(self.child_nodes))
            pucb_values = []
            for child_node in self.child_nodes:
                pucb_values.append((-child_node.w / child_node.n if child_node.n else 0.0) +
                                   C_PUCT * child_node.p * sqrt(t) / (1 + child_node.n))

            # 아크 평가값이 가장 큰 자녀 노드 반환
            return self.child_nodes[np.argmax(pucb_values)]

    # 현재 국면의 노드 생성
    root_node = Node(state, 0)

    # 여러 차례 평가 실행
    for _ in range(PV_EVALUATE_COUNT):
        root_node.evaluate()

    # 합법적인 수의 확률 분포
    scores = nodes_to_scores(root_node.child_nodes)
    if temperature == 0:  # 최대값인 경우에만 1
        action = np.argmax(scores)
        scores = np.zeros(len(scores))
        scores[action] = 1
    else:  # 볼츠만 분포를 기반으로 분산 추가
        scores = boltzman(scores, temperature)
    return scores


# 몬테카를로 트리 탐색을 활용한 행동 선택
def pv_mcts_action(model, temperature, device='cuda'):
    def pv_mcts_action(state):
        scores = pv_mcts_scores(model, state, temperature, device)
        return np.random.choice(state.legal_actions(), p=scores)

    return pv_mcts_action


# 볼츠만 분포
def boltzman(xs, temperature):
    xs = [x ** (1 / temperature) for x in xs]
    return [x / sum(xs) for x in xs]


# 동작 확인
if __name__ == '__main__':
    # GPU/CPU 설정
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # 모델 로드
    model_path = sorted(Path('./model').glob('*.pth'))[-1]
    model = DualNetwork(INPUT_SHAPE, OUTPUT_SIZE, filters=128, residual_num=16)  # 모델 파라미터 설정
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    # 상태 생성
    state = State()

    # 몬테카를로 트리 탐색을 활용해 행동을 얻는 함수 생성
    next_action = pv_mcts_action(model, temperature=1.0, device=device)

    # 게임 종료 시까지 반복
    while True:
        # 게임 종료 시
        if state.is_done():
            break

        # 행동 얻기
        action = next_action(state)

        # 다음 상태 얻기
        state = state.next(action)

        # 문자열 출력
        print(state)

  model.load_state_dict(torch.load(model_path, map_location=device))


---
---
o--

---
-x-
o--

---
-x-
o-o

---
-xx
o-o

---
oxx
o-o

x--
oxx
o-o

x--
oxx
ooo



# self play

In [None]:
# 패키지 임포트
import torch
import torch.nn as nn
from datetime import datetime
import numpy as np
import pickle
import os
from pathlib import Path

# 파라미터 준비
SP_GAME_COUNT = 10  # 셀프 플레이를 수행할 게임 수(오리지널: 25,000)



# 선 수를 둔 플레이어 가치
def first_player_value(ended_state):
    # 1: 선 수 플레이어 승리, -1: 선 수 플레이어 패배, 0: 무승부
    if ended_state.is_lose():
        return -1 if ended_state.is_first_player() else 1
    return 0


# 학습 데이터 저장
def write_data(history):
    now = datetime.now()
    os.makedirs('./data/', exist_ok=True)  # 폴더가 없는 경우에는 생성
    path = './data/{:04}{:02}{:02}{:02}{:02}{:02}.history'.format(
        now.year, now.month, now.day, now.hour, now.minute, now.second)
    with open(path, mode='wb') as f:
        pickle.dump(history, f)


# 1 게임 실행
def play(model):
    # 학습 데이터
    history = []

    # 상태 생성
    state = State()

    while True:
        # 게임 종료 시
        if state.is_done():
            break

        # 합법적인 수의 확률 분포 얻기
        scores = pv_mcts_scores(model, state, temperature=1.0, device=device)

        # 학습 데이터에 상태와 정책 추가
        policies = [0] * OUTPUT_SIZE
        for action, policy in zip(state.legal_actions(), scores):
            policies[action] = policy
        history.append([[state.pieces, state.enemy_pieces], policies, None])

        # 행동 얻기
        action = np.random.choice(state.legal_actions(), p=scores)

        # 다음 상태 얻기
        state = state.next(action)

    # 학습 데이터에 가치 추가
    value = first_player_value(state)
    for i in range(len(history)):
        history[i][2] = value
        value = -value
    return history


# 셀프 플레이
def self_play():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # 학습 데이터
    history = []

    # 베스트 플레이어 모델 로드
    model = DualNetwork(INPUT_SHAPE, OUTPUT_SIZE, filters=128, residual_num=16)
    model.to(device)

    # 여러 차례 게임 실행
    for i in range(SP_GAME_COUNT):
        # 게임 1회 실행
        h = play(model)
        history.extend(h)

        # 출력
        print('\rSelfPlay {}/{}'.format(i+1, SP_GAME_COUNT), end='')
    print('')

    # 학습 데이터 저장
    write_data(history)

    # 모델 파기
    del model


# 동작 확인
if __name__ == '__main__':
    self_play()

SelfPlay 10/10


# 파라미터 갱신

In [None]:
# 패키지 임포트
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from pathlib import Path
import numpy as np
import pickle
from torch.optim.lr_scheduler import StepLR  # 학습률 스케줄러 임포트

# 파라미터 준비
RN_EPOCHS = 100  # 학습 횟수
BATCH_SIZE = 128  # 배치 크기

# 학습 데이터 로드
def load_data():
    history_path = sorted(Path('./data').glob('*.history'))[-1]
    with history_path.open(mode='rb') as f:
        return pickle.load(f)


# 듀얼 네트워크 학습
def train_network():
    # 학습 데이터 로드
    history = load_data()
    xs, y_policies, y_values = zip(*history)

    # 학습을 위한 입력 데이터 셰이프로 변환
    a, b, c = INPUT_SHAPE
    xs = np.array(xs)
    xs = xs.reshape(len(xs), c, a, b).transpose(0, 2, 3, 1)
    y_policies = np.array(y_policies)
    y_values = np.array(y_values)

    # PyTorch 텐서로 변환
    xs = torch.tensor(xs, dtype=torch.float32)
    y_policies = torch.tensor(y_policies, dtype=torch.float32)
    y_values = torch.tensor(y_values, dtype=torch.float32)

    dataset = TensorDataset(xs, y_policies, y_values)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

    # 베스트 플레이어 모델 로드
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = DualNetwork(INPUT_SHAPE, OUTPUT_SIZE, filters=128, residual_num=16)
    # model = torch.load('./model/best.pth')  # .h5 -> .pth로 변경

    # 모델을 GPU로 이동
    # model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    # 손실 함수 설정
    criterion_policy = torch.nn.CrossEntropyLoss()
    criterion_value = torch.nn.MSELoss()

    # 학습률 스케줄러 설정
    lr_scheduler = StepLR(optimizer, step_size=50, gamma=0.5)  # 50번 에포크마다 학습률을 0.5배로 줄임

    # 학습 시작
    for epoch in range(RN_EPOCHS):
        model.train()
        running_loss_policy = 0.0
        running_loss_value = 0.0
        for batch_idx, (inputs, policies, values) in enumerate(dataloader):
            # 옵티마이저 초기화
            optimizer.zero_grad()

            # 모델 예측
            policy_output, value_output = model(inputs)

            # 손실 계산
            loss_policy = criterion_policy(policy_output, policies)
            loss_value = criterion_value(value_output, values)

            # 전체 손실 계산
            loss = loss_policy + loss_value

            # 역전파
            loss.backward()
            optimizer.step()

            # 손실 출력
            running_loss_policy += loss_policy.item()
            running_loss_value += loss_value.item()

        # 학습률 조정
        lr_scheduler.step()

        # 매 epoch 출력
        print(f'\rTrain {epoch + 1}/{RN_EPOCHS}, Loss Policy: {running_loss_policy:.4f}, Loss Value: {running_loss_value:.4f}, LR: {optimizer.param_groups[0]["lr"]:.6f}', end='')

    print('')

    # 최신 플레이어 모델 저장
    torch.save(model, './model/latest.pth')  # .h5 -> .pth로 변경

    # 모델 파기
    del model

# 동작 확인
if __name__ == '__main__':
    train_network()

  return F.mse_loss(input, target, reduction=self.reduction)


Train 100/100, Loss Policy: 1.9135, Loss Value: 1.5909, LR: 0.000250


# 신규 파라미터 평가 파트

In [None]:
# 패키지 임포트
import torch
import torch.nn as nn
from pathlib import Path
from shutil import copy
import numpy as np

# 파라미터 준비
EN_GAME_COUNT = 10  # 평가 1회 당 게임 수(오리지널: 400)
EN_TEMPERATURE = 1.0  # 볼츠만 분포 온도


# 선 수를 둔 플레이어의 포인트
def first_player_point(ended_state):
    # 1: 선 수 플레이어 승리, 0: 선 수 플레이어 패배, 0.5: 무승부
    if ended_state.is_lose():
        return 0 if ended_state.is_first_player() else 1
    return 0.5


# 1 게임 실행
def play(next_actions):
    # 상태 생성
    state = State()

    # 게임 종료 시까지 반복
    while True:
        # 게임 종료 시
        if state.is_done():
            break

        # 행동 얻기
        next_action = next_actions[0] if state.is_first_player() else next_actions[1]
        action = next_action(state)

        # 다음 상태 얻기
        state = state.next(action)

    # 선 수 플레이어의 포인트 반환
    return first_player_point(state)


# 베스트 플레이어 교대
def update_best_player():
    copy('./model/latest.h5', './model/best.h5')
    print('Change BestPlayer')


# 네트워크 평가
def evaluate_network():
    # 최신 플레이어 모델 로드
    model0 = torch.load('./model/latest.pth', map_location=device, weights_only=False)
    # model0.eval()  # 평가 모드로 설정

    # 베스트 플레이어 모델 로드
    model1 = torch.load('./model/best.pth', map_location=device, weights_only=False)
    # model1.eval()  # 평가 모드로 설정

    # PV MCTS를 활용해 행동 선택을 수행하는 함수 생성
    next_action0 = pv_mcts_action(model0, EN_TEMPERATURE)
    next_action1 = pv_mcts_action(model1, EN_TEMPERATURE)
    next_actions = (next_action0, next_action1)

    # 여러 차례 대전을 반복
    total_point = 0
    for i in range(EN_GAME_COUNT):
        # 1 게임 실행
        if i % 2 == 0:
            total_point += play(next_actions)
        else:
            total_point += 1 - play(list(reversed(next_actions)))

        # 출력
        print('\rEvaluate {}/{}'.format(i + 1, EN_GAME_COUNT), end='')
    print('')

    # 평균 포인트 계산
    average_point = total_point / EN_GAME_COUNT
    print('AveragePoint', average_point)

    # 모델 파기
    del model0
    del model1

    # 베스트 플레이어 교대
    if average_point > 0.5:
        update_best_player()
        return True
    else:
        return False

# 동작 확인
if __name__ == '__main__':
    evaluate_network()

TypeError: 'collections.OrderedDict' object is not callable