In [None]:
# 최종본에서 cnn pooling layer 삭제 버전

# Minesweeper

## Import

In [None]:
import random
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import deque
import os, sys

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

## Env_new

In [None]:
class Environment:
    """Minesweeper environment on a 9x9 board with 10 mines.

    Tile encoding used by both ``minefield`` and ``playerfield``:
      * ``-1``       a mine
      * ``0`` - ``8`` number of mines adjacent to the tile
      * ``9``        tile still hidden (``playerfield`` only)

    Actions are flat tile indices in row-major order
    (``action = x * grid_size_Y + y``).
    """

    def __init__(self):
        self.grid_size_X = 9
        self.grid_size_Y = 9
        self.num_mines = 10

        # Ground truth board; mines are only placed by reset().
        self.minefield = np.zeros((self.grid_size_X, self.grid_size_Y), dtype=int)

        # What the player can see; 9 marks a hidden tile.
        self.playerfield = np.full((self.grid_size_X, self.grid_size_Y), 9, dtype=int)

        self.state_size = self.minefield.size

        self.explode = False
        self.done = False
        self.first_move = True
        # Coordinates of every safe tile that has been revealed so far.
        self.visited = set()

        self.rewards = {'explode' : -1, 'noprogress' : -0.1,'progress' : 0.3, 'guess' : 0.1, 'clear' : 1}

    def reset(self):
        """Start a new game and return the initial (fully hidden) board.

        Returns:
            list of numpy rows holding a *copy* of the player board, so the
            returned state is not modified by later calls to ``step``.
        """
        self.minefield = np.zeros((self.grid_size_X, self.grid_size_Y), dtype=int)
        self.playerfield = np.full((self.grid_size_X, self.grid_size_Y), 9, dtype=int)

        self.explode = False
        self.done = False
        self.first_move = True

        self.visited = set()

        self.place_mines()

        # Copy first: list(array) would otherwise yield row views of the live board.
        return list(self.playerfield.copy())

    def place_mines(self):
        """Place ``num_mines`` mines at random and fill in the adjacency counts."""
        mines_placed = 0

        # Plant mines at random coordinates until num_mines distinct tiles are mined.
        while mines_placed < self.num_mines:
            x = random.randint(0, self.grid_size_X - 1)
            y = random.randint(0, self.grid_size_Y - 1)

            if self.minefield[x, y] == 0:
                self.minefield[x, y] = -1
                mines_placed += 1

        # Every mine-free tile stores the number of adjacent mines.
        for x in range(self.grid_size_X):
            for y in range(self.grid_size_Y):
                if self.minefield[x, y] == -1:
                    continue
                self.minefield[x, y] = self.count_adjacent_mines(x, y)

    def count_adjacent_mines(self, x, y):
        """Return the number of mines among the in-bounds neighbours of (x, y)."""
        count = 0
        for i in range(max(0, x - 1), min(self.grid_size_X, x + 2)):
            for j in range(max(0, y - 1), min(self.grid_size_Y, y + 2)):
                if (i, j) != (x, y) and self.minefield[i, j] == -1:
                    count += 1
        return count

    def count_adjacent_hidden(self, x, y):
        """Return the number of still-hidden in-bounds neighbours of (x, y)."""
        count = 0
        for i in range(max(0, x - 1), min(self.grid_size_X, x + 2)):
            for j in range(max(0, y - 1), min(self.grid_size_Y, y + 2)):
                if (i, j) != (x, y) and self.playerfield[i, j] == 9:
                    count += 1
        return count

    def _count_neighbors(self, x, y):
        """Return how many in-bounds neighbours (x, y) has (3 corner, 5 edge, 8 interior)."""
        rows = min(self.grid_size_X, x + 2) - max(0, x - 1)
        cols = min(self.grid_size_Y, y + 2) - max(0, y - 1)
        return rows * cols - 1

    def step(self, action):
        """Reveal the tile addressed by ``action`` and score the move.

        Args:
            action: flat row-major tile index in ``[0, grid_size_X * grid_size_Y)``.

        Returns:
            (next_state, reward, done) where ``next_state`` is a copy of the
            player board after the move.
        """
        # Row-major decoding: the divisor is the number of columns.
        x, y = divmod(action, self.grid_size_Y)

        reward = 0
        done = False

        # explode: picking a mine ends the episode.
        if self.minefield[x, y] == -1:
            self.playerfield[x, y] = self.minefield[x, y]
            self.explode = True
            done = True
            reward = self.rewards['explode']

        # A safe tile was picked.
        else:
            # noprogress: the tile had already been revealed.
            if (x, y) in self.visited:
                reward = self.rewards['noprogress']
            # First visit to this tile.
            else:
                self.playerfield[x, y] = self.minefield[x, y]
                self.visited.add((x, y))
                # guess: every neighbour was still hidden, so the pick carried no
                # information. Comparing against the real neighbour count covers
                # corner (3), edge (5) and interior (8) tiles alike.
                if self.count_adjacent_hidden(x, y) == self._count_neighbors(x, y):
                    reward = self.rewards['guess']
                # progress: at least one neighbour was already open.
                else:
                    reward = self.rewards['progress']
                # A revealed 0 opens its surroundings automatically.
                if self.playerfield[x, y] == 0:
                    self.auto_reveal_tiles(x, y)

            # clear: the only hidden tiles left are the mines -> win.
            if np.count_nonzero(self.playerfield == 9) == self.num_mines:
                done = True
                reward = self.rewards['clear']

        self.done = done
        # Hand out a snapshot so stored transitions are not mutated by later moves.
        next_state = self.playerfield.copy()
        return next_state, reward, done

    def check_boundary(self, x, y):
        """Return True when (x, y) lies on the board."""
        return 0 <= x < self.grid_size_X and 0 <= y < self.grid_size_Y

    def auto_reveal_tiles(self, x, y):
        """Breadth-first reveal starting at (x, y); expansion continues through 0 tiles."""
        queue = deque([(x, y)])
        # Everything ever enqueued, for O(1) duplicate checks.
        queued = {(x, y)}

        while queue:
            cx, cy = queue.popleft()
            self.visited.add((cx, cy))
            self.playerfield[cx, cy] = self.minefield[cx, cy]

            # Only tiles with no adjacent mine spread the reveal to their 8 neighbours.
            if self.minefield[cx, cy] == 0:
                for dx in [-1, 0, 1]:
                    for dy in [-1, 0, 1]:
                        nx, ny = cx + dx, cy + dy
                        if (self.check_boundary(nx, ny)
                                and (nx, ny) not in self.visited
                                and (nx, ny) not in queued):
                            queued.add((nx, ny))
                            queue.append((nx, ny))

    def render(self):
        """Print the player board: '.' for hidden, 'X' for a revealed mine, digits otherwise."""
        for x in range(self.grid_size_X):
            for y in range(self.grid_size_Y):
                tile = self.playerfield[x, y]
                if tile == 9:
                    print('.', end=' ')
                elif tile == -1:
                    print('X', end=' ')
                else:
                    print(tile, end=' ')
                if y == self.grid_size_Y - 1:
                    print()
        print('\n')

## Net

In [None]:
class Net(nn.Module):
    """Fully convolutional Q-network.

    Takes a single-channel board of shape (batch, 1, H, W) and returns one
    Q-value per action, shape (batch, action_size). The last convolution
    produces ``action_size`` feature maps which are averaged over the spatial
    dimensions.
    """

    def __init__(self, action_size):
        super(Net, self).__init__()

        # padding=2 with a 3x3 kernel grows the map from HxW to (H+2)x(W+2);
        # the remaining layers keep that size.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=2, bias=False)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.conv4 = nn.Conv2d(64, action_size, kernel_size=3, stride=1, padding=1, bias=False)

        self.bn2 = nn.BatchNorm2d(64)
        self.bn3 = nn.BatchNorm2d(64)

        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        """Return Q-values of shape (batch, action_size) for boards ``x``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        # Regularise the hidden features rather than the Q-values themselves.
        x = self.dropout(x)
        # Linear output head: Q-values must be able to go negative because the
        # environment hands out negative rewards; a ReLU here would clamp them at 0.
        x = self.conv4(x)
        x = torch.mean(x, dim=[2, 3])  # Global Average Pooling
        return x

## Agent

In [None]:
# Discount applied to the bootstrapped next-state value in the TD target.
DISCOUNT_FACTOR = 0.1
# Learning rate handed to the Adam optimizer.
LEARNING_RATE = 1e-2

# Epsilon-greedy exploration: start value, per-episode multiplicative decay, floor.
EPSILON, EPSILON_DECAY, EPSILON_MIN = 0.99, 0.9999, 1e-2

# Target network: counter start value and training steps between weight syncs.
TARGET_UPDATE_COUNTER, UPDATE_TARGET_EVERY = 0, 5

# Replay memory: mini-batch size, samples required before training, capacity.
BATCH_SIZE = 64
TRAIN_START = 1_000
MAX_LEN = 50_000

In [None]:
class MineSweeper(nn.Module):
    """DQN agent: online/target Q-networks, replay memory and epsilon-greedy policy.

    Args:
        state_size: number of tiles on the board.
        action_size: number of selectable actions (one per tile).
        grid_size_X: number of board rows.
        grid_size_Y: number of board columns.
        environment: the environment instance the agent is paired with.
    """

    def __init__(self, state_size, action_size, grid_size_X, grid_size_Y, environment):
        super(MineSweeper, self).__init__()
        self.render = False

        self.state_size = state_size
        self.action_size = action_size
        self.grid_size_X = grid_size_X
        self.grid_size_Y = grid_size_Y

        self.environment = environment

        self.discount_factor = DISCOUNT_FACTOR
        self.learning_rate = LEARNING_RATE
        self.epsilon = EPSILON
        self.epsilon_decay = EPSILON_DECAY
        self.epsilon_min = EPSILON_MIN

        self.target_update_counter = TARGET_UPDATE_COUNTER
        self.update_target_every = UPDATE_TARGET_EVERY

        self.batch_size = BATCH_SIZE
        self.train_start = TRAIN_START
        self.maxlen = MAX_LEN
        # No MIN_LEN constant is defined anywhere in this notebook, so reading it
        # raised NameError on a fresh kernel. The attribute is kept and mirrors
        # train_start. NOTE(review): confirm this is the intended minimum.
        self.minlen = self.train_start

        self.memory = deque(maxlen=self.maxlen)

        # Q-values of the most recent greedy action; zeros until one has been taken,
        # so the attribute always exists for visualisation.
        self.q_value = np.zeros(self.action_size, dtype=np.float32)

        self.model = Net(self.action_size).to(device)
        self.target_model = Net(self.action_size).to(device)
        self.loss = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        # Stepped once per optimisation step in train_model().
        self.scheduler = optim.lr_scheduler.CyclicLR(optimizer=self.optimizer, base_lr=0.0001, max_lr=0.1, step_size_up=10000, mode='exp_range')

        self.update_target_model()

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())
        # The target network is only used for inference: keep BatchNorm on its
        # running statistics and Dropout disabled.
        self.target_model.eval()

    def get_action(self, state):
        """Pick an action epsilon-greedily.

        Args:
            state: board convertible to an array of grid_size_X * grid_size_Y values.

        Returns:
            int: flat action index in ``[0, action_size)``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        state = np.array(state).reshape(1, 1, self.grid_size_X, self.grid_size_Y)
        state = torch.FloatTensor(state).to(device)

        # Evaluate deterministically (no Dropout noise, no batch-of-one BatchNorm
        # statistics) and without building an autograd graph, then restore the mode.
        was_training = self.model.training
        self.model.eval()
        with torch.no_grad():
            q_value = self.model(state)
        self.model.train(was_training)

        self.q_value = q_value.cpu().numpy().flatten()
        return torch.argmax(q_value).item()

    def append_sample(self, state, action, reward, next_state, done):
        """Store one transition in the replay memory.

        The boards are copied so that later in-place changes to the caller's
        arrays cannot alter transitions that were already stored.
        """
        self.memory.append((np.array(state), action, reward, np.array(next_state), done))

    def train_model(self):
        """Run one optimisation step on a random mini-batch from the replay memory.

        Does nothing while the memory holds fewer than ``batch_size`` samples.
        """
        if len(self.memory) < self.batch_size:
            return

        minibatch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        states = np.array(states)
        actions = np.array(actions)
        rewards = np.array(rewards)
        next_states = np.array(next_states)
        dones = np.array(dones)

        states = states.reshape(self.batch_size, 1, self.grid_size_X, self.grid_size_Y)
        next_states = next_states.reshape(self.batch_size, 1, self.grid_size_X, self.grid_size_Y)

        states = torch.tensor(states, dtype=torch.float32).to(device)
        actions = torch.tensor(actions, dtype=torch.long).to(device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
        next_states = torch.tensor(next_states, dtype=torch.float32).to(device)
        dones = torch.tensor(dones, dtype=torch.float32).to(device)

        self.model.train()
        pred = self.model(states)
        # The bootstrap value is a constant with respect to the online weights.
        with torch.no_grad():
            target_pred = self.target_model(next_states).max(1)[0]

        # TD target: terminal transitions (done == 1) do not bootstrap.
        targets = rewards + (1 - dones) * self.discount_factor * target_pred

        # Q-value of the action that was actually taken in each transition.
        pred = pred.gather(1, actions.unsqueeze(1))
        trg = targets.unsqueeze(1)

        loss = self.loss(pred, trg)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.scheduler.step()

        # Sync the target network every update_target_every optimisation steps.
        self.target_update_counter += 1
        if self.target_update_counter >= self.update_target_every:
            self.target_update_counter = 0
            self.update_target_model()

## 모델 저장

In [None]:
def save_checkpoint(model, optimizer, epoch, path):
    """Write the epoch index plus the model and optimizer state dicts to ``path``."""
    checkpoint = dict(
        epoch=epoch,
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
    )
    torch.save(checkpoint, path)

def load_checkpoint(model, optimizer, path, map_location=None):
    """Restore model and optimizer state from a checkpoint written by save_checkpoint.

    Args:
        model: module whose ``load_state_dict`` receives the stored weights.
        optimizer: optimizer whose ``load_state_dict`` receives the stored state.
        path: checkpoint file to read.
        map_location: forwarded to ``torch.load``; pass e.g. ``"cpu"`` to open a
            checkpoint saved on a GPU on a machine without CUDA. The default
            ``None`` keeps the storages on the devices they were saved from.

    Returns:
        (model, optimizer, epoch): the two objects passed in, now restored, and
        the epoch index stored in the checkpoint.
    """
    checkpoint = torch.load(path, map_location=map_location)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    return model, optimizer, epoch

## 시각화

In [None]:
def plot_training_results(episodes, episodes_N, scores, win_rates, timesteps):
    """Draw three stacked line charts: scores, win rate and timesteps.

    ``scores`` and ``timesteps`` are plotted against ``episodes``;
    ``win_rates`` is plotted against ``episodes_N``.
    """
    fig, axs = plt.subplots(3, figsize=(12, 18))

    # One entry per panel: x values, y values, legend label, y-axis label, title, line options.
    panels = [
        (episodes, scores, 'Score', 'Score', 'Episode Scores', {}),
        (episodes_N, win_rates, 'Win Rate', 'Win Rate (%)', 'Win Rate over Episodes', {'color': 'orange'}),
        (episodes, timesteps, 'Timesteps', 'Timesteps', 'Timesteps per Episode', {'color': 'green'}),
    ]

    for ax, (xs, ys, legend_label, y_label, title, line_options) in zip(axs, panels):
        ax.plot(xs, ys, label=legend_label, **line_options)
        ax.set_xlabel('Episode')
        ax.set_ylabel(y_label)
        ax.set_title(title)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()

def visualize_q_values(q_values, grid_size_X, grid_size_Y):
    """Show the Q-values as a heat map with each value printed on its tile."""
    grid = q_values.reshape(grid_size_X, grid_size_Y)

    plt.figure(figsize=(8, 8))
    plt.imshow(grid, cmap='viridis', interpolation='none')
    plt.colorbar()
    plt.title("Q-values")

    label_style = dict(ha='center', va='center', color='white', fontsize=8, fontweight='bold')
    # Walk the tiles in row-major order; imshow puts the column on the x axis.
    for flat_index in range(grid_size_X * grid_size_Y):
        row, col = divmod(flat_index, grid_size_Y)
        plt.text(col, row, f'{grid[row, col]:.2f}', **label_style)

    plt.show()

## Train

In [None]:
env = Environment()

state_size = env.state_size
action_size = env.state_size
grid_size_X = env.grid_size_X
grid_size_Y = env.grid_size_Y

agent = MineSweeper(state_size, action_size, grid_size_X, grid_size_Y, env)

EPISODES = 100000
RENDER_PROCESS = False
RENDER_END = False

total_moves = []
scores = np.zeros(EPISODES)
length_memory = np.zeros(EPISODES)
wins = np.zeros(EPISODES)
episodes = np.zeros(EPISODES)
timesteps = np.zeros(EPISODES)
win_rates = {}

N = 500
CHECKPOINT_INTERVAL = 10000

# Step budget per episode, derived from the board: number of safe tiles (71 on
# the 9x9 board with 10 mines). The loop below allows one step more than this.
MAX_TIME_STEP = env.state_size - env.num_mines


def scale_state(field):
    """Return a normalised copy of a board: mine (-1) -> 0, 8 -> 1, hidden (9) -> 10/9.

    A new array is always created, so the result is decoupled from the
    environment's live board.
    """
    return (np.array(field) - (-1)) / (8 - (-1))


for epi in range(EPISODES):
    done = False
    score = 0
    time_step = 0
    actions = []
    rewards = []

    # The network sees the same normalised representation for the state it acts
    # on, the state stored in memory, and the next state.
    state = scale_state(env.reset())

    last_loss = None

    while not done and time_step <= MAX_TIME_STEP:
        time_step += 1
        if env.first_move:
            # The first move is a random tile that is guaranteed not to be a mine.
            mine_state = env.minefield.flatten()
            first_action = random.randint(0, len(mine_state)-1)
            first_state = mine_state[first_action]
            while first_state == -1:
                first_action = random.randint(0, len(mine_state)-1)
                first_state = mine_state[first_action]
            action = first_action
            env.first_move = False
        else:
            action = agent.get_action(state)

        next_state, reward, done = env.step(action)
        score += reward

        # Row-major decoding of the flat action index, for logging only.
        (action_x, action_y) = divmod(action, env.grid_size_Y)
        actions.append((action_x, action_y))
        rewards.append(reward)

        # Normalise the network input.
        next_state = scale_state(next_state)

        agent.append_sample(state, action, reward, next_state, done)

        if len(agent.memory) >= agent.train_start:
            agent.train_model()

        state = next_state

    scores[epi] = score
    timesteps[epi] = time_step

    # Record the outcome: a win is an episode that finished without an explosion,
    # including one that is cleared on the very last allowed step.
    if done and not env.explode:
        wins[epi] = 1
        print(f"episode: {epi}")
        print(f"episode score: {score}")
        print(f"time step: {time_step}")
        print(f"epsilon: {agent.epsilon:.4f}")
        env.render()
    else:
        wins[epi] = 0

    if agent.epsilon > agent.epsilon_min:
        agent.epsilon *= agent.epsilon_decay

    if (epi+1) % N == 0:
        scores_N = np.median(scores[max(0, epi-N+1):epi+1])  # median over the last N episodes
        win_rate = np.mean(wins[max(0, epi-N+1):epi+1]) * 100  # win rate over the last N episodes
        win_rates[epi] = win_rate
        length_memory[epi] = len(agent.memory)
        print(f"episode: {epi:3d} | time step: {time_step}")
        print(f"episode score: {score} | epsilon: {agent.epsilon:.4f}\n")
        print(f"<last {N} episode> score: {scores_N:.2f} | win rate: {win_rate:.2f}%\n")
        print(f"wins: {np.sum(wins[max(0, epi-N+1):epi+1])}\n")
        print(f"length of memory: {length_memory[epi]}\n")
        env.render()
        print(f"chosen_coordinate: {actions}")
        print(f"reward per time step: {rewards}")
        print("--------------------------------------------------")

    # Save the model every CHECKPOINT_INTERVAL episodes.
    if (epi+1) % CHECKPOINT_INTERVAL == 0:
        checkpoint_path = f"checkpoint_{epi}.tar"
        save_checkpoint(agent, agent.optimizer, epi, checkpoint_path)
        print(f"Checkpoint saved at episode {epi} to {checkpoint_path}.")

episode: 499 | time step: 12
episode score: 0.10000000000000009 | epsilon: 0.9417

<last 500 episode> score: -0.80 | win rate: 0.00%

wins: 0.0

length of memory: 4895.0

0 1 . . . 1 . . . 
0 2 . . 2 . . . . 
1 2 . . . . . . . 
. . . . . . . . 1 
1 1 2 . . . . . . 
0 0 1 X 2 2 2 2 1 
0 0 1 1 1 0 0 0 0 
1 1 1 0 0 0 0 0 0 
1 . 1 0 0 0 0 0 0 


chosen_coordinate: [(5, 5), (8, 0), (5, 0), (0, 0), (4, 1), (1, 4), (3, 8), (7, 2), (6, 7), (0, 0), (0, 5), (5, 3)]
reward per time step: [0.1, 0.3, 0.1, 0.3, -0.1, 0.1, 0.1, -0.1, 0.1, -0.1, 0.3, -1]
--------------------------------------------------
episode: 999 | time step: 2
episode score: -0.9 | epsilon: 0.8958

<last 500 episode> score: -0.80 | win rate: 0.00%

wins: 0.0

length of memory: 9702.0

. . . X . . . . . 
. . . . . . . . . 
. . . . . . . . . 
. . . . . . . . . 
. . . . . . . . . 
. . . . . . . . . 
. . . . . . . . . 
. . . . . . . . . 
. . . . . . 1 . . 


chosen_coordinate: [(8, 6), (0, 3)]
reward per time step: [0.1, -1]
--------

In [None]:
# Visualise Q-values once training has finished.
# agent.q_value is assigned in MineSweeper.get_action only when a greedy
# (non-random) action is taken, and holds the Q-values of the latest such call.
q_values = agent.q_value
visualize_q_values(q_values, grid_size_X, grid_size_Y)

In [None]:
# Episode indices 0 .. EPISODES-1, used as the x axis of the training plots.
episodes = list(range(EPISODES))

In [None]:
# Rolling statistics over the trailing N episodes. min_periods=1 lets the first
# N-1 episodes use the data available so far instead of producing NaN.
scores_rolling = pd.Series(scores).rolling(N, min_periods=1).median()
win_rate_rolling = pd.Series(wins).rolling(N, min_periods=1).mean() * 100
timesteps_rolling = pd.Series(timesteps).rolling(N, min_periods=1).median()

# plot_training_results takes a separate x axis for the win-rate panel; every
# series here has one value per episode, so the same axis is passed twice.
plot_training_results(episodes, episodes, scores_rolling, win_rate_rolling, timesteps_rolling)

## 저장한 모델 불러오기

In [None]:
# # 체크포인트 불러오기
# checkpoint_path = 'checkpoint_9999.tar'  # 예를 들어 10000번째 에피소드 체크포인트 (파일명은 checkpoint_{epi}.tar 이고 epi는 0부터 시작)
# agent, optimizer, start_epoch = load_checkpoint(agent, agent.optimizer, checkpoint_path)

# print(f"Checkpoint loaded from {checkpoint_path}. Starting from epoch {start_epoch}.")


## Test

In [None]:
# TEST_EPI = 1000

# test_scores = np.zeros(TEST_EPI)
# test_wins = np.zeros(TEST_EPI)
# test_timesteps = np.zeros(TEST_EPI)

# for i in range(TEST_EPI):
#     state = env.reset()
#     done = False
#     score = 0
#     steps = 0
#     agent.epsilon = 0

#     while not done:
#         action = agent.get_action(state)
#         next_state, reward, done = env.step(action)
#         score += reward
#         steps += 1
#         state = next_state

#     if done and not env.explode:
#         test_wins[i] = 1
#     else:
#         test_wins[i] = 0

#     test_scores[i] = score
#     test_timesteps[i] = steps

#     if (i+1) % 100 == 0:
#         print(f"Episode {i+1} | Score: {score}, Steps: {steps}")
#         env.render()
#         visualize_q_values(agent.q_value, agent.grid_size_X, agent.grid_size_Y) # Q-values 시각화
#         print("\n")
#         print("--------------------------------------------------------------------------------------")
#         print("\n")

# # 테스트 승률 출력
# test_win_rate = np.mean(test_wins) * 100
# print(f"Test Win Rate: {test_win_rate:.2f}%")

# test_epi = list(range(TEST_EPI))
# fig, axs = plt.subplots(2, figsize=(12, 18))
# # 에피소드 점수
# axs[0].plot(test_epi, test_scores, label='Score')
# axs[0].set_xlabel('Episode')
# axs[0].set_ylabel('Score')
# axs[0].set_title('Episode Scores')
# axs[0].legend()
# axs[0].grid(True)

# # 타임스텝
# axs[1].plot(test_epi, test_timesteps, label='Timesteps', color='green')
# axs[1].set_xlabel('Episode')
# axs[1].set_ylabel('Timesteps')
# axs[1].set_title('Timesteps per Episode')
# axs[1].legend()
# axs[1].grid(True)

# plt.tight_layout()
# plt.show()