# In [None]:
import random
import time
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import copy
import os
from collections import deque
import matplotlib.pyplot as plt
from collections import namedtuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import gym
from gym import spaces

class MinesweeperEnv(gym.Env):
    """Minesweeper as a Gym environment using the classic 4-tuple ``step`` API.

    Internal layout:
      * ``board``  -- (N, N) ints; ``-1`` marks a mine, otherwise the number
        of mines in the 8-neighbourhood.
      * ``state``  -- (2, N, N) ints; channel 0 is the "revealed" mask,
        channel 1 holds the board number of every revealed safe cell.

    Observations are a float32 copy of ``state`` with channel 1 divided by 8,
    so every observed value lies in ``[0, 1]``.

    Rewards: ``-1`` for clicking an already revealed cell, ``-10`` for hitting
    a mine (episode ends), ``+1`` for revealing a safe cell and ``+10`` when
    that reveal clears the board (episode ends).
    """

    def __init__(self, board_size=9, num_mines=10):
        """Create an environment with a ``board_size`` x ``board_size`` grid.

        Args:
            board_size: Side length of the square board.
            num_mines: Number of mines placed on every reset.
        """
        super(MinesweeperEnv, self).__init__()

        self.board_size = board_size
        self.num_mines = num_mines

        self.action_space = spaces.Discrete(board_size * board_size)
        # Must describe what _get_observation() really returns: float32
        # values in [0, 1], not the raw integer state.
        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=(2, board_size, board_size), dtype=np.float32
        )

        self.reset()

    def reset(self):
        """Lay out a fresh random board and return the initial observation."""
        self.board = np.zeros((self.board_size, self.board_size), dtype=int)
        self.state = np.zeros((2, self.board_size, self.board_size), dtype=int)

        self.mines = np.random.choice(self.board_size * self.board_size, self.num_mines, replace=False)
        # A flat (row-major) index i corresponds to cell divmod(i, board_size).
        self.board.flat[self.mines] = -1
        self._update_board_counts()

        self.done = False
        self.steps = 0
        self.total_reward = 0
        self.mine_hit = False
        self.first_click = True

        return self._get_observation()

    def step(self, action):
        """Click the cell encoded by ``action`` (flat row-major index).

        Returns:
            ``(observation, reward, done, info)`` where ``info`` is an empty
            dict.
        """
        x, y = divmod(action, self.board_size)
        if self.first_click:
            # The first click of an episode is never allowed to lose.
            if self.board[x, y] == -1:
                self._relocate_mine(x, y)
            self.first_click = False

        if self.state[0, x, y] == 1:
            # Clicking a revealed cell changes nothing; penalise it mildly.
            reward = -1
            done = False
        elif self.board[x, y] == -1:
            self.state[0, x, y] = 1
            reward = -10
            self.mine_hit = True
            done = True
        else:
            self.reveal_cells(x, y)
            reward = 1
            done = self.check_done()

            if done and not self.mine_hit:
                reward = 10

        self.total_reward += reward
        self.steps += 1
        # Keep the attribute in sync with the value handed to the caller.
        self.done = done

        return self._get_observation(), reward, done, {}

    def _relocate_mine(self, x, y):
        """Move the mine at ``(x, y)`` to a random mine-free cell and recount."""
        clicked = x * self.board_size + y
        # The clicked cell is itself listed in self.mines, so it can never be
        # selected as the new location.
        free_cells = np.setdiff1d(np.arange(self.board_size * self.board_size), self.mines)
        new_mine_position = np.random.choice(free_cells)

        self.board[x, y] = 0
        self.board.flat[new_mine_position] = -1

        # Stay an ndarray, the same type reset() produces.
        remaining = np.asarray(self.mines)
        self.mines = np.append(remaining[remaining != clicked], new_mine_position)
        self._update_board_counts()

    def _update_board_counts(self):
        """Recompute the neighbouring-mine count of every non-mine cell."""
        is_mine = self.board == -1
        # Zero-pad so border cells need no special casing, then add up the
        # nine shifted views (the centre adds 0 for every non-mine cell).
        padded = np.pad(is_mine.astype(int), 1)
        n = self.board_size
        neighbour_mines = sum(
            padded[dx:dx + n, dy:dy + n] for dx in range(3) for dy in range(3)
        )
        self.board[~is_mine] = neighbour_mines[~is_mine]

    def reveal_cells(self, x, y):
        """Reveal ``(x, y)`` and flood-fill outwards through zero-count cells."""
        stack = [(x, y)]
        while stack:
            cx, cy = stack.pop()
            if cx < 0 or cx >= self.board_size or cy < 0 or cy >= self.board_size:
                continue
            if self.state[0, cx, cy] == 1:
                continue

            self.state[0, cx, cy] = 1
            self.state[1, cx, cy] = self.board[cx, cy]

            # A zero has no adjacent mines, so all its neighbours are safe.
            if self.board[cx, cy] == 0:
                for dx in range(-1, 2):
                    for dy in range(-1, 2):
                        if dx != 0 or dy != 0:
                            stack.append((cx + dx, cy + dy))

    def _get_observation(self):
        """Return a float32 copy of ``state`` with the number channel scaled by 1/8."""
        norm_state = self.state.astype(np.float32)
        norm_state[1, :, :] = norm_state[1, :, :] / 8.0
        return norm_state

    def check_done(self):
        """Return True when the number of unrevealed cells equals ``num_mines``."""
        unopened_cells = np.sum(self.state[0, :, :] == 0)
        return bool(unopened_cells == self.num_mines)

    def _answer_symbols(self):
        """Return the full board as strings, with mines shown as ``"M"``."""
        symbols = self.board.astype(str)
        symbols[self.board == -1] = "M"
        return symbols

    def _show_grid(self, symbols):
        """Display a grid of symbols as a colour-styled DataFrame.

        Relies on the ``display`` function that IPython/Jupyter injects into
        the notebook namespace.
        """
        styler = pd.DataFrame(symbols).style
        # Newer pandas renamed Styler.applymap to Styler.map; support both.
        colorize = getattr(styler, "map", None) or styler.applymap
        display(colorize(self.render_color))

    def render(self, state=None):
        """Display the board as the player sees it (hidden cells shown as ".").

        Args:
            state: Optional (2, N, N) state whose channel 0 is used as the
                revealed mask; defaults to the environment's current state.
        """
        if state is None:
            state = self.state

        self._show_grid(np.where(state[0] == 0, ".", self._answer_symbols()))

    def render_answer(self):
        """Display the fully uncovered board, including every mine."""
        self._show_grid(self._answer_symbols())

    def render_color(self, var):
        """Map a cell symbol to the CSS colour rule used when rendering."""
        color = {
            '0': 'black', '1': "skyblue", '2': 'lightgreen', '3': 'red', '4': 'violet',
            '5': 'brown', '6': 'turquoise', '7': 'grey', '8': 'black', 'M': 'white', '.': 'black'
        }
        return f"color: {color[var]}"

class CNN(nn.Module):
    """Small convolutional network mapping a board state to one value per action.

    Architecture: two conv(3x3) + ReLU + 2x2 max-pool stages, followed by a
    256-unit hidden layer and a linear output layer of size ``action_size``.

    Args:
        state_shape: ``(channels, height, width)`` of a single input state.
        action_size: Number of outputs (one per action).
    """

    def __init__(self, state_shape, action_size):
        super(CNN, self).__init__()
        # Take the channel count from the declared state shape rather than
        # hard-coding it, so the network follows the environment.
        in_channels = state_shape[0]
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=16, kernel_size=(3, 3), stride=1, padding=2)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(3, 3), stride=1, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        conv_output_size = self._get_conv_output(state_shape)
        self.fc1 = nn.Linear(conv_output_size, 256)
        self.fc2 = nn.Linear(256, action_size)

    def _features(self, x):
        """Run the two conv/ReLU/pool stages."""
        x = self.pool1(torch.relu(self.conv1(x)))
        return self.pool2(torch.relu(self.conv2(x)))

    def _get_conv_output(self, shape):
        """Return the flattened feature count the conv stack yields for ``shape``."""
        # Only the output size matters, so skip autograd bookkeeping.
        with torch.no_grad():
            probe = torch.zeros(1, *shape)
            return int(self._features(probe).numel())

    def forward(self, x):
        """Compute per-action values for a batch shaped ``(batch, *state_shape)``."""
        x = torch.flatten(self._features(x), start_dim=1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

# --- Replay memory -----------------------------------------------------------
# Capacity of the agent's replay deque (used as its maxlen).
MEM_SIZE_MAX = 50000
# DQNAgent.replay() does nothing until the memory holds at least this many
# transitions.
MEM_SIZE_MIN = 1000
# Number of transitions sampled from memory per replay() call.
BATCH_SIZE = 64

# --- Optimisation --------------------------------------------------------------
# Learning rate passed to the Adam optimizer.
LEARNING_RATE = 0.001
# Multiplier on the target network's max next-state value in the TD target.
# NOTE(review): 0.1 is a very short horizon -- confirm this is intentional.
DISCOUNT = 0.1

# --- Epsilon-greedy exploration --------------------------------------------------
# Initial epsilon; act() picks a random action when rand() <= epsilon.
EPSILON = 1.0
# Factor epsilon is multiplied by in update_epsilon() (called once per episode
# by the training loop).
EPSILON_DECAY = 0.995
# update_epsilon() stops decaying once epsilon is no longer above this value.
EPSILON_MIN = 0.01

# --- Training schedule -------------------------------------------------------------
# The target network is synchronised on episodes divisible by this number.
UPDATE_TARGET_EVERY = 5
# Number of training episodes.
EPISODES = 10000
# Per-episode step cap used by the training and evaluation loops.
# 71 == 9 * 9 - 10, i.e. the number of safe cells on the default board --
# presumably chosen for that reason; verify if the board size changes.
MAX_STEPS = 71

class DQNAgent:
    """DQN agent with a replay memory and a periodically synchronised target net.

    States passed to ``act`` / ``remember`` are expected to be the
    observations returned by the environment, which are already float32 with
    the number channel scaled by 1/8. They are fed to the network as-is;
    ``normalize_state`` remains available for raw integer states.

    Attributes:
        device: ``cuda`` when available, otherwise ``cpu``.
        model: Online Q-network that is trained.
        target_model: Frozen copy used to compute bootstrap targets.
        epsilon: Current exploration probability.
        losses: Loss value of every optimisation step performed so far.
    """

    def __init__(self, state_shape, action_size):
        self.state_shape = state_shape
        self.action_size = action_size
        self.memory = deque(maxlen=MEM_SIZE_MAX)

        # Fall back to the CPU instead of crashing on machines without CUDA.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model = CNN(state_shape, action_size).to(self.device)
        self.target_model = CNN(state_shape, action_size).to(self.device)
        self.target_model.load_state_dict(self.model.state_dict())
        self.target_model.eval()

        self.optimizer = optim.Adam(self.model.parameters(), lr=LEARNING_RATE)
        self.loss_fn = nn.MSELoss()

        self.epsilon = EPSILON
        self.losses = []

    def update_epsilon(self):
        """Decay epsilon by one step, never dropping below ``EPSILON_MIN``.

        An epsilon that is already at or below the minimum (for example one
        set to 0 for evaluation) is left untouched.
        """
        if self.epsilon > EPSILON_MIN:
            self.epsilon = max(EPSILON_MIN, self.epsilon * EPSILON_DECAY)

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action epsilon-greedily for a single observation."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # The observation is already normalised by the environment; dividing
        # the number channel by 8 a second time would shrink it to 1/64.
        state_t = torch.tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            act_values = self.model(state_t)
        return torch.argmax(act_values).item()

    def _to_tensor(self, values, dtype):
        """Convert a sequence or array to a tensor of ``dtype`` on the agent's device."""
        return torch.as_tensor(np.asarray(values), dtype=dtype, device=self.device)

    def replay(self):
        """Run one optimisation step on a random minibatch from memory.

        Does nothing until the memory holds at least ``MEM_SIZE_MIN``
        transitions (and enough to draw a full batch).
        """
        if len(self.memory) < max(MEM_SIZE_MIN, BATCH_SIZE):
            return

        minibatch = random.sample(self.memory, BATCH_SIZE)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Stack into single arrays first: building a tensor from a Python
        # list of ndarrays is very slow.
        states = self._to_tensor(np.stack(states), torch.float32)
        next_states = self._to_tensor(np.stack(next_states), torch.float32)
        actions = self._to_tensor(actions, torch.int64)
        rewards = self._to_tensor(rewards, torch.float32)
        dones = self._to_tensor(dones, torch.float32)

        q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Targets are constants for the optimiser, so no graph is needed.
        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0]
            # (1 - dones) removes the bootstrap term for terminal transitions.
            target_q_values = rewards + (DISCOUNT * next_q_values * (1 - dones))

        loss = self.loss_fn(q_values, target_q_values)
        self.losses.append(loss.item())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def normalize_state(self, state):
        """Return a float32 copy of a raw state with channel 1 divided by 8.

        Intended for raw integer states; do not apply it to observations that
        have already been scaled.
        """
        norm_state = state.astype(np.float32)
        norm_state[1, :, :] = norm_state[1, :, :] / 8.0
        return norm_state

def plot_metrics(episode_list, avg_rewards, avg_steps, success_rates, avg_loss):
    """Show a 2x2 grid of training curves plotted against ``episode_list``.

    Panels, in order: average reward, average steps, success rate and
    average loss. Each panel gets axis labels, a title and a legend.
    """
    # (series, legend label / title stem, y-axis label, line colour or None
    # to use matplotlib's default colour).
    panels = (
        (avg_rewards, 'Average Reward', 'Average Reward', None),
        (avg_steps, 'Average Steps', 'Average Steps', 'orange'),
        (success_rates, 'Success Rate', 'Success Rate (%)', 'green'),
        (avg_loss, 'Average Loss', 'Average Loss', 'red'),
    )

    plt.figure(figsize=(20, 12))

    for slot, (series, name, y_label, line_color) in enumerate(panels, start=1):
        # Only pass a colour when one was specified for the panel.
        line_style = {} if line_color is None else {'color': line_color}

        plt.subplot(2, 2, slot)
        plt.plot(episode_list, series, label=name, **line_style)
        plt.xlabel('Episode')
        plt.ylabel(y_label)
        plt.title(name + ' over Episodes')
        plt.legend()

    plt.tight_layout()
    plt.show()

# ---------------------------------------------------------------------------
# Training: play EPISODES games, learning from replayed transitions after
# every step, and report aggregate metrics every 100 episodes.
# ---------------------------------------------------------------------------
env = MinesweeperEnv()
state_shape = env.observation_space.shape
action_size = env.action_space.n

agent = DQNAgent(state_shape, action_size)

# Per-episode histories.
episode_rewards = []
total_steps = []
# Series sampled every 100 episodes (these are what gets plotted).
success_rates = []
episode_list = []
losses = []  # not used below; kept so the name stays defined for later cells
avg_rewards = []
avg_steps = []
avg_loss = []

success_count = 0

for episode in range(EPISODES):
    state = env.reset()
    total_reward = 0
    done = False
    steps = 0

    while not done and steps < MAX_STEPS:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        total_reward += reward

        agent.remember(state, action, reward, next_state, done)
        state = next_state

        agent.replay()
        steps += 1

    agent.update_epsilon()
    total_steps.append(steps)
    episode_rewards.append(total_reward)

    if episode % UPDATE_TARGET_EVERY == 0:
        agent.update_target_model()

    # An episode that ended without hitting a mine is a win (the +10 reward
    # is only ever given in that situation, so checking it adds nothing).
    if done and not env.mine_hit:
        success_count += 1

    if (episode + 1) % 100 == 0:
        avg_reward_last_100 = np.mean(episode_rewards[-100:])
        avg_steps_last_100 = np.mean(total_steps[-100:])
        # Cumulative over all episodes so far, not just the last 100.
        success_rate = (success_count / (episode + 1)) * 100

        avg_rewards.append(avg_reward_last_100)
        avg_steps.append(avg_steps_last_100)
        success_rates.append(success_rate)
        episode_list.append(episode + 1)

        # Mean of the last 100 optimisation steps. No step has happened yet
        # while the replay memory is still filling up, so avoid calling
        # np.mean on an empty list (it warns) and report NaN explicitly.
        recent_losses = agent.losses[-100:]
        avg_loss_last_100 = np.mean(recent_losses) if recent_losses else float("nan")
        avg_loss.append(avg_loss_last_100)

        print(f"Episode: {episode + 1}, Average Reward : {avg_reward_last_100:.2f}, Average Steps: {avg_steps_last_100:.2f}, Success Rate: {success_rate:.2f}%, Avg Loss: {avg_loss_last_100:.4f}, Epsilon: {agent.epsilon:.4f}")

plot_metrics(episode_list, avg_rewards, avg_steps, success_rates, avg_loss)

# ---------------------------------------------------------------------------
# Evaluation: play `episodes` games with exploration switched off and report
# the share of games won. No transitions are stored and no learning happens.
# ---------------------------------------------------------------------------
# act() only explores when rand() <= epsilon, so 0.0 makes it (effectively)
# always take the network's highest-valued action.
agent.epsilon = 0.0
success_count = 0

episodes = 1000

for episode in range(episodes):
    state = env.reset()
    total_reward = 0
    done = False
    steps = 0

    # NOTE(review): nothing prevents the greedy policy from choosing a cell
    # that is already revealed; the state then does not change, so the same
    # action would be repeated until MAX_STEPS ends the episode -- confirm
    # whether action masking is wanted here.
    while not done and steps < MAX_STEPS:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        state = next_state
        steps += 1

    # Won: the episode terminated and no mine was hit.
    success = done and (reward == 10 or not env.mine_hit)
    if success:
        success_count += 1

    print(f"Episode {episode + 1}: Success: {success}, Steps: {steps}, Total Reward: {total_reward:.2f}, Epsilon: {agent.epsilon:.2f}")

# Percentage of evaluation episodes won.
success_rate = (success_count / episodes) * 100
print(f"Success Rate: {success_rate:.2f}%")
