<a href="https://colab.research.google.com/github/githyj-jang/Omok_RL/blob/main/Omok_with_A3C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import tensorflow as tf
import threading
import random
import gym
import time
from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard
from collections import deque

  and should_run_async(code)


In [None]:
# Omok (gomoku) board size: the board is BOARD_SIZE x BOARD_SIZE cells
BOARD_SIZE = 15

GAMMA = 0.99  # discount factor applied to the next-state value estimate in Worker.run
LEARNING_RATE = 0.01  # learning rate handed to the Adam optimizer in the main script
GLOBAL_UPDATE_FREQUENCY = 10  # NOTE(review): not referenced in the visible code — confirm whether it is still needed
NUM_WORKERS = 4  # number of Worker threads created by the main script
MAX_EPISODES = 1000  # upper bound on the training loop in Worker.run

In [None]:
# Shared actor-critic network definition
class GlobalNetwork(tf.keras.Model):
    """Convolutional network with a policy head and a value head.

    The trunk is two 3x3 convolutions followed by a flatten and a 128-unit
    dense layer. The policy head emits one raw logit per board cell
    (BOARD_SIZE * BOARD_SIZE values); the value head emits a single scalar.
    """

    def __init__(self):
        super().__init__()
        # Feature trunk.
        self.conv1 = Conv2D(
            32,
            (3, 3),
            activation='relu',
            input_shape=(BOARD_SIZE, BOARD_SIZE, 1),
        )
        self.conv2 = Conv2D(64, (3, 3), activation='relu')
        self.flatten = Flatten()
        self.dense = Dense(128, activation='relu')
        # Output heads (no activation: raw logits / raw value).
        self.policy_logits = Dense(BOARD_SIZE * BOARD_SIZE)
        self.value = Dense(1)

    def call(self, inputs):
        """Return ``(logits, value)`` for a batch of board tensors."""
        features = self.dense(self.flatten(self.conv2(self.conv1(inputs))))
        move_logits = self.policy_logits(features)
        state_value = self.value(features)
        return move_logits, state_value

In [None]:
# Omok (gomoku) game environment
class OmokEnv:
    """Minimal two-player Omok environment with a gym-like interface.

    The board is a ``board_size x board_size`` float array where 0 is an
    empty cell, 1 is a white stone and -1 is a black stone. Players alternate
    after every legal move, starting with player 1.

    ``reset`` and ``step`` return a *copy* of the board. The internal board is
    mutated in place by ``step``, so handing out the live array would make a
    caller's previously returned state change underneath it (its "current"
    and "next" states would be the same object).
    """

    def __init__(self, board_size=BOARD_SIZE):
        self.board_size = board_size
        self.board = np.zeros((board_size, board_size))
        self.current_player = 1  # 1 is the white stone, -1 is the black stone

    def reset(self):
        """Clear the board, give the move to player 1 and return the board."""
        self.board = np.zeros((self.board_size, self.board_size))
        self.current_player = 1
        return self.board.copy()

    def step(self, action):
        """Place the current player's stone at ``action``.

        Args:
            action: ``(row, col)`` pair of board indices.

        Returns:
            ``(board, reward, done, info)``. ``reward`` is computed by
            ``get_reward`` for the player who just moved. If the board is
            already full the result is ``(board, 0, True, {})``. If the
            target cell is occupied, nothing changes (the turn is not
            passed) and the result is ``(board, 0, False, {})``.
        """
        # No empty cell left: the game cannot continue.
        if not np.any(self.board == 0):
            return self.board.copy(), 0, True, {}

        x, y = action
        # Illegal move on an occupied cell: leave the state untouched.
        if self.board[x, y] != 0:
            return self.board.copy(), 0, False, {}

        self.board[x, y] = self.current_player
        reward = get_reward(self.board, self.current_player, action)
        done = is_terminal_state(self.board)
        self.current_player *= -1
        return self.board.copy(), reward, done, {}

    def render(self):
        """Print the board: '.' for empty, 'O' for player 1, 'X' otherwise."""
        for row in self.board:
            print(' '.join(['.' if x == 0 else 'O' if x == 1 else 'X' for x in row]))
        print()

In [None]:
# Return the list of legal moves
def get_available_actions(board):
    """Return every empty cell of ``board`` as a ``(row, col)`` tuple.

    The board dimensions are taken from ``board`` itself, so boards of any
    size (and plain nested lists) are supported. Cells are listed in
    row-major order. A cell is empty when it compares equal to 0.
    """
    return [
        (i, j)
        for i, row in enumerate(board)
        for j, cell in enumerate(row)
        if cell == 0
    ]

# Reward computation
def get_reward(board, player, action):
    """Return the reward for ``player`` having just played ``action``.

    A win for ``player`` yields 1 and a win for the opponent yields -1.
    Otherwise a small shaping bonus is accumulated over the four line
    directions through ``action``, based on the values that
    ``count_consecutive_stones`` reports for the player and for the opponent
    (4 -> 0.05, 3 -> 0.02, 2 -> 0.01, anything else -> nothing).
    """
    row, col = action

    # Decisive outcomes take priority over shaping bonuses.
    if check_win(board, player):
        return 1  # reward for winning
    if check_win(board, -player):
        return -1  # reward for losing

    bonus_by_length = {4: 0.05, 3: 0.02, 2: 0.01}
    total = 0
    for step in ((1, 0), (0, 1), (1, 1), (1, -1)):
        own_run = count_consecutive_stones(board, player, row, col, *step)
        rival_run = count_consecutive_stones(board, -player, row, col, *step)
        # Bonus for extending one's own line.
        total += bonus_by_length.get(own_run, 0)
        # Bonus for playing next to (blocking) the opponent's line.
        total += bonus_by_length.get(rival_run, 0)
    return total

# Count consecutive stones along one line
def count_consecutive_stones(board, player, x, y, dx, dy):
    """Count ``player``'s contiguous stones on the line through ``(x, y)``.

    The line runs along ``(dx, dy)`` in both directions. The count covers the
    unbroken run of ``player`` stones touching ``(x, y)`` on either side,
    plus the cell ``(x, y)`` itself when it holds a ``player`` stone. For a
    cell occupied by someone else this gives the number of ``player`` stones
    directly adjacent to it along the line, which is what a "blocking" check
    needs.

    Each stone is counted exactly once. The earlier implementation restarted
    its backward scan from the far end of the forward run, so it counted the
    forward stones twice.

    Args:
        board: 2-D board (array or nested lists); its own size is used.
        player: stone value to count.
        x, y: row and column of the pivot cell.
        dx, dy: step applied to (row, column) per move along the line.

    Returns:
        The number of stones as an ``int`` (0 if there are none).
    """
    rows = len(board)
    cols = len(board[0]) if rows else 0

    def owned(cx, cy):
        return 0 <= cx < rows and 0 <= cy < cols and board[cx][cy] == player

    def run_length(cx, cy, step_x, step_y):
        # Length of the unbroken run starting at (cx, cy), walking by the step.
        length = 0
        while owned(cx, cy):
            length += 1
            cx += step_x
            cy += step_y
        return length

    pivot = 1 if owned(x, y) else 0
    if dx == 0 and dy == 0:
        # A zero step never leaves the pivot; avoid walking forever.
        return pivot
    forward = run_length(x + dx, y + dy, dx, dy)
    backward = run_length(x - dx, y - dy, -dx, -dy)
    return pivot + forward + backward

# Win-condition check
def check_win(board, player):
    """Return True if ``player`` has five stones in a row on ``board``.

    Every cell is tried as the start of a five-cell segment in each of the
    four line directions. The board size is taken from ``board`` itself, so
    any board size (and plain nested lists) is supported. Runs longer than
    five also count as a win, because they contain a five-cell segment.
    """
    rows = len(board)
    cols = len(board[0]) if rows else 0
    # Steps are applied to (row, column) indices.
    directions = (
        (1, 0),   # down a column (vertical)
        (0, 1),   # along a row (horizontal)
        (1, 1),   # diagonal (\)
        (1, -1),  # anti-diagonal (/)
    )

    def five_from(x, y, dx, dy):
        # True when the five cells starting at (x, y) all belong to player.
        for _ in range(5):
            if not (0 <= x < rows and 0 <= y < cols):
                return False
            if board[x][y] != player:
                return False
            x += dx
            y += dy
        return True

    for i in range(rows):
        for j in range(cols):
            # A segment can only start on one of the player's own stones.
            if board[i][j] != player:
                continue
            if any(five_from(i, j, dx, dy) for dx, dy in directions):
                return True
    return False

# Game-over check (win or draw)
def is_terminal_state(board):
    """Return True when either player has won or no empty cell remains."""
    if check_win(board, 1):
        return True
    if check_win(board, -1):
        return True
    # Draw: the board holds no cell equal to 0.
    has_empty_cell = any(cell == 0 for row in board for cell in row)
    return not has_empty_cell

# Action selection
def choose_action(state, available_actions, policy):
    """Greedily pick the legal move with the highest policy logit.

    The policy emits one logit per board cell in row-major order, so a cell
    ``(row, col)`` maps to flat index ``row * n_cols + col``. Only the cells
    listed in ``available_actions`` are considered, so the result is always
    a legal move. Previously the argmax over *all* cells was used as an
    index into the shrinking ``available_actions`` list, which selected
    unrelated moves or returned ``None``.

    Softmax is monotonic, so ranking raw logits gives the same winner as
    ranking probabilities. Ties resolve to the first move in
    ``available_actions``.

    Args:
        state: 2-D numpy board.
        available_actions: list of ``(row, col)`` legal moves. If empty, the
            moves are recomputed from ``state`` and one is chosen at random.
        policy: callable taking a ``(1, rows, cols, 1)`` array and returning
            ``(logits, value)``.

    Returns:
        The chosen ``(row, col)`` tuple.
    """
    if not available_actions:
        return random.choice(get_available_actions(state))

    n_rows, n_cols = state.shape[0], state.shape[1]
    logits, _ = policy(state.reshape(1, n_rows, n_cols, 1))
    scores = np.asarray(logits).reshape(-1)

    return max(available_actions, key=lambda move: scores[move[0] * n_cols + move[1]])


In [None]:
# Worker definition
class Worker(threading.Thread):
    """A3C worker thread: plays Omok with a local model and trains the global one.

    Each worker owns an environment and a local copy of the network. After
    every move it computes a one-step actor-critic loss on the local model,
    applies the gradients to the shared global model, then copies the global
    weights back into the local model.
    """

    # Shared by all workers: serializes updates of the global model/optimizer
    # so concurrent threads do not interleave apply/sync steps.
    _update_lock = threading.Lock()

    def __init__(self, global_model, optimizer, worker_id):
        super().__init__()
        self.global_model = global_model
        self.optimizer = optimizer
        self.worker_id = worker_id
        self.local_model = GlobalNetwork()
        self.env = OmokEnv()

        # Create the local variables up front (before the thread starts) and
        # begin from the global weights, so the very first gradient is taken
        # at the same parameters it will be applied to.
        self.local_model(tf.zeros((1, BOARD_SIZE, BOARD_SIZE, 1), dtype=tf.float32))
        global_weights = self.global_model.get_weights()
        if global_weights:  # empty when the global model has not been built yet
            self.local_model.set_weights(global_weights)

    def run(self):
        """Train for MAX_EPISODES episodes, updating the global model every move."""
        for episode in range(1, MAX_EPISODES + 1):
            # Snapshot the board: the environment may mutate its array in place.
            current_state = np.array(self.env.reset(), dtype=np.float32)
            done = False
            while not done:
                available_actions = get_available_actions(current_state)
                action = choose_action(current_state, available_actions, self.local_model)
                if action is None:
                    # No usable move was produced; abandon this episode
                    # instead of crashing the thread on tuple unpacking.
                    break
                row, col = action
                next_state, reward, done, _ = self.env.step((row, col))
                next_state = np.array(next_state, dtype=np.float32)

                self._train_step(current_state, (row, col), reward, next_state, done)

                current_state = next_state

                if done:
                    print(f"Worker {self.worker_id} Episode {episode} Reward {reward}")

    def _train_step(self, state, action, reward, next_state, done):
        """Apply one actor-critic update for a single transition."""
        row, col = action
        state_tensor = tf.convert_to_tensor(
            state.reshape(1, BOARD_SIZE, BOARD_SIZE, 1), dtype=tf.float32)
        next_tensor = tf.convert_to_tensor(
            next_state.reshape(1, BOARD_SIZE, BOARD_SIZE, 1), dtype=tf.float32)

        # The bootstrap value is a fixed target: evaluate it outside the tape
        # so no gradient flows through it.
        _, next_value = self.local_model(next_tensor)
        target = reward + GAMMA * next_value * (1.0 - float(done))

        with tf.GradientTape() as tape:
            logits, value = self.local_model(state_tensor)
            advantage = target - value
            # -log pi(a|s) for the move that was played (row-major index).
            neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(
                labels=[row * BOARD_SIZE + col], logits=logits)
            # Policy gradient: weight the log-probability by the advantage,
            # treated as a constant so the actor loss does not train the critic.
            policy_loss = neg_log_prob * tf.stop_gradient(tf.reshape(advantage, [-1]))
            value_loss = tf.square(advantage)
            loss = tf.reduce_sum(policy_loss) + tf.reduce_sum(value_loss)

        grads = tape.gradient(loss, self.local_model.trainable_weights)
        with Worker._update_lock:
            # Local gradients update the global weights; then resync the local copy.
            self.optimizer.apply_gradients(zip(grads, self.global_model.trainable_weights))
            self.local_model.set_weights(self.global_model.get_weights())

In [None]:
# Main entry point: build the shared model, run all workers, report wall time
start_time = time.time()

global_model = GlobalNetwork()
# One dummy forward pass so the model's variables get created (model build)
global_model(tf.convert_to_tensor(np.zeros((1, BOARD_SIZE, BOARD_SIZE, 1)), dtype=tf.float32))
optimizer = Adam(learning_rate=LEARNING_RATE)

# Every worker shares the same global model and optimizer.
workers = [Worker(global_model, optimizer, worker_id) for worker_id in range(NUM_WORKERS)]

for worker in workers:
    worker.start()

# Block until every worker thread has finished.
for worker in workers:
    worker.join()

print("A3C 학습 완료")

elapsed_time = time.time() - start_time
print(elapsed_time)


Worker 0 Episode 45 Reward 1
Worker 2 Episode 63 Reward 1
Worker 1 Episode 62 Reward 1
Worker 3 Episode 67 Reward 1


Exception in thread Thread-29:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Exception in thread Thread-28:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "<ipython-input-49-fa8bd705760e>", line 20, in run
  File "<ipython-input-48-ef9762923006>", line 78, in choose_action
  File "/usr/local/lib/python3.10/dist-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "<ipython-input-46-38e62529152d>", line 15, in call
tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer 'flatten_23' (type Flatten).

{{function_node __wrapped__Reshape_device_/job:localhost/replica:0/task:0/device:GPU:0}} Input to reshape is a tensor with 7744 values, but the requested shape has 1 [Op:Reshape] name: 

Call arguments received by layer 'flatt

Worker 0 Episode 146 Reward 1
Worker 1 Episode 137 Reward 1
Worker 0 Episode 203 Reward 1
Worker 1 Episode 194 Reward 1
Worker 0 Episode 260 Reward 1
Worker 1 Episode 251 Reward 1
Worker 0 Episode 317 Reward 1
Worker 1 Episode 308 Reward 1
Worker 0 Episode 374 Reward 1
Worker 1 Episode 365 Reward 1
Worker 0 Episode 431 Reward 1
Worker 1 Episode 422 Reward 1
Worker 0 Episode 488 Reward 1
Worker 1 Episode 479 Reward 1
Worker 0 Episode 545 Reward 1
Worker 1 Episode 536 Reward 1
Worker 0 Episode 602 Reward 1
Worker 0 Episode 659 Reward 1
Worker 1 Episode 593 Reward 1
Worker 0 Episode 716 Reward 1
Worker 1 Episode 650 Reward 1
Worker 0 Episode 773 Reward 1
Worker 1 Episode 707 Reward 1
Worker 0 Episode 830 Reward 1
Worker 1 Episode 764 Reward 1
Worker 0 Episode 887 Reward 1
Worker 1 Episode 821 Reward 1
Worker 0 Episode 944 Reward 1
Worker 1 Episode 868 Reward 1
Worker 0 Episode 995 Reward 1
Worker 0 Episode 1082 Reward 1
Worker 1 Episode 970 Reward 1


Exception in thread Thread-27:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "<ipython-input-49-fa8bd705760e>", line 21, in run
TypeError: cannot unpack non-iterable NoneType object


A3C 학습 완료
110.95767951011658


In [None]:
def evaluate_model(global_model, episodes=100):
    """Play the model (Player 1, moves first) against a random Player 2.

    The reward returned by ``env.step`` is from the point of view of the
    player who just moved: 1 means the mover won, -1 means the mover's
    opponent won. Results are therefore credited according to whose turn it
    was. Previously a win on Player 2's move was counted for Player 1.

    Args:
        global_model: policy network passed to ``choose_action``.
        episodes: number of games to play.

    Returns:
        ``(player1_win_rate, player2_win_rate, draw_rate)`` as fractions of
        ``episodes``.
    """
    env = OmokEnv()
    wins = [0, 0]  # wins[0] -> Player 1 (model), wins[1] -> Player 2 (random)
    draws = 0
    for _ in range(episodes):
        state = env.reset()
        done = False
        mover = 0  # index into `wins`; Player 1 opens every game
        while not done:
            available_actions = get_available_actions(state)
            if mover == 0:
                # Player 1 (trained model) picks its move
                action = choose_action(state, available_actions, global_model)
                if action is None:
                    # The policy produced no usable move; play a random legal
                    # one rather than crashing inside env.step.
                    action = random.choice(available_actions)
            else:
                # Player 2 (random policy) picks its move
                action = random.choice(available_actions)

            state, reward, done, _ = env.step(action)

            if done:
                if reward == 1:
                    wins[mover] += 1        # the mover completed five in a row
                elif reward == -1:
                    wins[1 - mover] += 1    # the mover's opponent has five in a row
                else:
                    draws += 1

            mover = 1 - mover

    player1_win_rate = wins[0] / episodes
    player2_win_rate = wins[1] / episodes
    draw_rate = draws / episodes

    return player1_win_rate, player2_win_rate, draw_rate

# Evaluate the trained model against a random opponent and report the rates
player1_win_rate, player2_win_rate, draw_rate = evaluate_model(global_model, episodes=100)

print(
    f"Player 1 (학습된 모델)의 승률: {player1_win_rate * 100}%",
    f"Player 2 (무작위 행동)의 승률: {player2_win_rate * 100}%",
    f"무승부 비율: {draw_rate * 100}%",
    sep="\n",
)

Player 1 (학습된 모델)의 승률: 100.0%
Player 2 (무작위 행동)의 승률: 0.0%
무승부 비율: 0.0%


In [None]:
def evaluate_model_equal(global_model, episodes=100):
    """Play the model against itself; Player 1 moves first.

    The reward returned by ``env.step`` is from the point of view of the
    player who just moved: 1 means the mover won, -1 means the mover's
    opponent won. Results are therefore credited according to whose turn it
    was. Previously a win on Player 2's move was counted for Player 1.

    Args:
        global_model: policy network used by both players via
            ``choose_action``.
        episodes: number of games to play.

    Returns:
        ``(player1_win_rate, player2_win_rate, draw_rate)`` as fractions of
        ``episodes``.
    """
    env = OmokEnv()
    wins = [0, 0]  # wins[0] -> Player 1, wins[1] -> Player 2
    draws = 0
    for _ in range(episodes):
        state = env.reset()
        done = False
        mover = 0  # index into `wins`; Player 1 opens every game
        while not done:
            # Both players (trained model) pick their move the same way
            available_actions = get_available_actions(state)
            action = choose_action(state, available_actions, global_model)
            if action is None:
                # The policy produced no usable move; play a random legal one
                # rather than crashing inside env.step.
                action = random.choice(available_actions)

            state, reward, done, _ = env.step(action)

            if done:
                if reward == 1:
                    wins[mover] += 1        # the mover completed five in a row
                elif reward == -1:
                    wins[1 - mover] += 1    # the mover's opponent has five in a row
                else:
                    draws += 1

            mover = 1 - mover

    player1_win_rate = wins[0] / episodes
    player2_win_rate = wins[1] / episodes
    draw_rate = draws / episodes

    return player1_win_rate, player2_win_rate, draw_rate

# Evaluate the trained model in self-play and report the rates
player1_win_rate, player2_win_rate, draw_rate = evaluate_model_equal(global_model, episodes=100)

print(
    f"Player 1 (학습된 모델)의 승률: {player1_win_rate * 100}%",
    f"Player 2 (학습된 모델)의 승률: {player2_win_rate * 100}%",
    f"무승부 비율: {draw_rate * 100}%",
    sep="\n",
)

Player 1 (학습된 모델)의 승률: 100.0%
Player 2 (학습된 모델)의 승률: 0.0%
무승부 비율: 0.0%
