## Import Libs

In [None]:
import sys
import time
import math
import random
import logging
import argparse
import itertools
import functools
import numpy as np
from six import StringIO
from random import sample, randint
from collections import deque


import torch
import torch.nn as nn
import torch.optim as optim

from tkinter import *
import matplotlib.pyplot as plt

In [None]:
import torch
print(torch.cuda.is_available())  # True means a CUDA GPU is available


## Args

In [None]:
def _str2bool(value):
    """Convert a command-line string such as "true"/"false" into a real bool.

    ``type=bool`` must not be used with argparse: ``bool("False")`` is True
    because every non-empty string is truthy, so the flag could never be
    switched off from the command line.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("true", "t", "yes", "y", "1", "on"):
        return True
    # The empty string is kept as False, which is what ``type=bool`` produced.
    if text in ("false", "f", "no", "n", "0", "off", ""):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


# Default to the GPU when one is available; --device can still override it.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

parser = argparse.ArgumentParser()

parser.add_argument("--device", type=str, default=device)          # device to run on ('cuda' or 'cpu')

parser.add_argument("--learning_rate", type=float, default=0.001)  # optimizer learning rate
parser.add_argument("--gamma", type=float, default=0.99)           # reward discount factor
parser.add_argument("--epochs", type=int, default=10000)           # number of games (episodes) to play

parser.add_argument("--buffer_size", type=int, default=10000)      # replay buffer capacity
parser.add_argument("--batch_size", type=int, default=128)         # mini-batch size

parser.add_argument("--pre_train_model", type=str, default=None)   # path of a pre-trained model to load, if any

parser.add_argument("--use_nature_dqn", type=_str2bool, default=True)  # whether to use Nature DQN (target network)
parser.add_argument("--target_update_freq", type=int, default=250) # target-network sync period when using Nature DQN

parser.add_argument("--epsilon", type=float, default=0.999)        # initial exploration rate for epsilon-greedy

# parse_known_args tolerates the extra arguments a notebook kernel puts in sys.argv.
args, _ = parser.parse_known_args()

args

## Game Logic

In [None]:
# Raised when the agent picks a move that leaves the board unchanged.
class IllegalMove(Exception):
    """Signal that the requested move did not change the board."""


class Game2048:
    """Rules of 2048 on a ``size`` x ``size`` NumPy integer board.

    Attributes (all set in ``__init__``):
        size: side length of the board.
        matrix: the board; 0 marks an empty cell.
        score: sum of the values of all merged tiles so far.
        no_change_count / no_change_threshold: kept for backward
            compatibility; ``move`` rejects a no-op move immediately, so the
            counter is always 0.
    """

    def __init__(self, size=4):
        self.size = size
        self.matrix = np.zeros((self.size, self.size), dtype=int)
        self.score = 0
        self.no_change_count = 0
        self.no_change_threshold = 2

    def reset(self):
        """Start a new game: clear the board and spawn two initial tiles."""
        # The score must be cleared BEFORE spawning tiles: add_tile reads
        # self.score to pick the tile value, so a stale score from the
        # previous game would bias the first tile of the new one.
        self.score = 0
        self.no_change_count = 0
        self.new_game()
        self.add_tile()
        self.add_tile()

    def new_game(self):
        """Replace the board with an empty one."""
        self.matrix = np.zeros((self.size, self.size), dtype=int)

    def add_tile(self):
        """Spawn one new tile (2 or 4) on an empty cell.

        When the smallest tile on the board is a 2, the new tile goes on the
        empty cell closest (Manhattan distance) to the first such tile and is
        a 2 with probability 0.9. Otherwise a uniformly random empty cell is
        used and the chance of a 4 grows with ``log2(score)``.

        Returns:
            The board if it has no empty cell (nothing is added), else None.
        """
        empty_cells, min_value, min_pos = self.find_empty_and_min()
        if len(empty_cells) == 0:
            return self.matrix

        if min_value == 2:
            distances = [self.distance(pos, min_pos) for pos in empty_cells]
            index_pair = empty_cells[np.argmin(distances)]
            ran = np.random.random()
        else:
            # Covers the empty board too (min_value is inf there).
            index_pair = empty_cells[np.random.randint(len(empty_cells))]
            # With a score of 0 or 1 the factor is 0, so the tile is always a 2.
            ran = np.random.random() * int(math.log(max(self.score, 1), 2))

        new_value = 2 if ran < 0.9 else 4
        self.matrix[index_pair[0], index_pair[1]] = new_value

    def distance(self, pos1, pos2):
        """Return the Manhattan distance between two (row, col) positions."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def find_empty_and_min(self, flag='all'):
        """Locate the empty cells and the smallest non-zero tile.

        Args:
            flag: 'empty' returns only the empty-cell coordinates, 'min'
                returns ``(min_value, min_pos)``, 'all' returns
                ``(empty_cells, min_value, min_pos)``.

        Returns:
            See ``flag``. On an empty board ``min_value`` is ``inf`` and
            ``min_pos`` is ``(-1, -1)``; otherwise ``min_pos`` is the first
            matching cell in row-major order.

        Raises:
            ValueError: if ``flag`` is not one of the three accepted values.
        """
        empty_cells = np.argwhere(self.matrix == 0)
        non_zero_elements = self.matrix[self.matrix != 0]

        if non_zero_elements.size > 0:
            min_value = np.min(non_zero_elements)
            min_pos = np.argwhere(self.matrix == min_value)[0]
        else:
            min_value, min_pos = float('inf'), (-1, -1)

        if flag == 'empty':
            return empty_cells
        if flag == 'min':
            return min_value, min_pos
        if flag == 'all':
            return empty_cells, min_value, min_pos
        raise ValueError(f"Invalid flag value: {flag}. Must be 'empty', 'min', or 'all'.")

    def slide_and_combine(self, row):
        """Slide one row towards index 0 and merge equal neighbours once.

        Every merge adds the merged value to ``self.score``.

        Returns:
            A new array of the same shape as ``row``; ``row`` is not modified.
        """
        tiles = row[row != 0]  # boolean indexing copies, dropping the gaps
        new_row = np.zeros_like(row)

        read = 0
        write = 0
        while read < len(tiles):
            if read + 1 < len(tiles) and tiles[read] == tiles[read + 1]:
                new_row[write] = 2 * tiles[read]
                self.score += new_row[write]
                read += 2  # the partner tile is consumed by the merge
            else:
                new_row[write] = tiles[read]
                read += 1
            write += 1

        return new_row

    def move_left(self):
        """Slide and merge every row to the left."""
        for i in range(self.size):
            self.matrix[i] = self.slide_and_combine(self.matrix[i])

    def move_right(self):
        """Slide and merge every row to the right."""
        for i in range(self.size):
            self.matrix[i] = np.flip(self.slide_and_combine(np.flip(self.matrix[i])))

    def move_up(self):
        """Slide and merge every column upwards (a left move on the transpose)."""
        self.matrix = np.transpose(self.matrix)
        self.move_left()
        self.matrix = np.transpose(self.matrix)

    def move_down(self):
        """Slide and merge every column downwards (a right move on the transpose)."""
        self.matrix = np.transpose(self.matrix)
        self.move_right()
        self.matrix = np.transpose(self.matrix)

    def move(self, direction):
        """Apply one move and return the score gained by it.

        Args:
            direction: 0 = up, 1 = down, 2 = left, 3 = right. Any other value
                leaves the board untouched and is therefore reported as an
                illegal move.

        Returns:
            The increase of ``self.score`` caused by this move.

        Raises:
            IllegalMove: if the board is the same after the move.
        """
        previous_score = self.score
        previous_matrix = self.matrix.copy()

        if direction == 0:
            self.move_up()
        elif direction == 1:
            self.move_down()
        elif direction == 2:
            self.move_left()
        elif direction == 3:
            self.move_right()

        # A move that changes nothing is rejected right away. The former
        # "two no-ops in a row" counter logic sat behind this raise and could
        # never run, so it has been removed.
        if np.array_equal(self.matrix, previous_matrix):
            raise IllegalMove

        self.no_change_count = 0
        return self.score - previous_score

    def is_game_over(self):
        """Return True when no empty cell and no mergeable neighbours remain."""
        board = self.matrix
        if np.any(board == 0):
            return False

        # Any equal horizontal or vertical neighbours can still be merged.
        if np.any(board[:, :-1] == board[:, 1:]) or np.any(board[:-1, :] == board[1:, :]):
            return False

        return True

In [None]:


class GameGrid(Game2048, Frame):
    """Environment wrapper around Game2048 exposing ``reset`` and ``step``.

    ``Frame`` is kept as a base class for compatibility, but its initialiser
    is not called here and ``master`` is unused.
    """

    def __init__(self, master=None):
        Game2048.__init__(self)
        self.reset()
        self.set_illegal_move_reward(-100)

    def set_illegal_move_reward(self, reward):
        """Set the reward returned by ``step`` for an illegal move."""
        self.illegal_move_reward = reward

    def step(self, action):
        """Play one move.

        Args:
            action: 0 = up, 1 = down, 2 = left, 3 = right.

        Returns:
            ``(next_state, reward, done)`` where ``next_state`` is a copy of
            the board. An illegal move ends the episode with
            ``illegal_move_reward``.
        """
        try:
            reward = self.move(direction=action)
        except IllegalMove:
            return self.matrix.copy(), self.illegal_move_reward, True

        # A legal move always leaves at least one empty cell, so the new tile
        # is spawned first and the game-over test runs on the resulting board.
        # Testing before the spawn could never report game over, which made
        # every finished game end with the illegal-move penalty instead.
        self.add_tile()
        done = self.is_game_over()

        return self.matrix.copy(), reward, done

    def reset(self):
        """Start a new game and return a copy of the initial board."""
        super().reset()

        # Return a copy, like step(), so later moves cannot change the
        # array that was handed to the caller.
        return self.matrix.copy()



## DQN


目前的问题：
- DQN全部为全连接层，不具备卷积能力，对图像捕捉能力不足 -v 通过修改DQN类实现了三个卷积层堆叠
- Replay Buffer完全没有，memory靠随机获得
- penalty不足，惩罚和跳出机制不足 -v 目前通过重写game2048以及gamegrid的逻辑实现了非法操作的惩罚机制

目前的进度Nov05 2024：
 - nature DQN 尚不完全
 - replay buffer尚不完全
 - nature dqn的learn尚不完全
 - rainbow dqn没有实现


我记得model会因为误判导致被惩罚，penalty的触发貌似有一丢丢太宽泛了

0916 Model 因为严重的penalty开始迅速收敛

In [None]:

  class ReplayBufferBase:
      """Common interface and bookkeeping for replay buffers.

      Attributes:
          buffer_size: maximum number of transitions to keep.
          size: number of transitions currently stored.
          pos: write position for subclasses that use one.
      """

      def __init__(self, buffer_size):
          self.pos = 0
          self.size = 0
          self.buffer_size = buffer_size

      def add_transition(self, *args):
          """Store one transition; subclasses must override this."""
          raise NotImplementedError("This method should be implemented by subclasses.")

      def get_sample(self, sample_size, sample_priority=False):
          """Return a batch of transitions; subclasses must override this."""
          raise NotImplementedError("This method should be implemented by subclasses.")

      def update_priorities(self, indices, priorities):
          """Do nothing: the base buffer has no priorities to update."""
          return None

  class SimpleReplayBuffer(ReplayBufferBase):
      """Replay buffer backed by a bounded ``deque``; the oldest entries drop out first."""

      def __init__(self, buffer_size):
          super().__init__(buffer_size)
          self.memory = deque(maxlen=buffer_size)

      def add_transition(self, s1, action, s2, done, reward):
          """Append one transition and refresh ``size``."""
          transition = (s1, action, s2, done, reward)
          self.memory.append(transition)
          self.size = len(self.memory)  # the deque caps its own length

      def get_sample(self, sample_size, sample_priority=False):
          """Draw ``sample_size`` distinct transitions uniformly at random.

          ``sample_priority`` is accepted but not used.

          Returns:
              Five arrays: states, actions, next states, done flags, rewards.
          """
          batch = random.sample(self.memory, sample_size)
          states, actions, next_states, dones, rewards = zip(*batch)
          columns = (states, actions, next_states, dones, rewards)
          return tuple(np.array(column) for column in columns)

  class OptimizedReplayBuffer(ReplayBufferBase):
      """Ring-buffer replay memory stored in preallocated NumPy arrays.

      Args:
          buffer_size: maximum number of transitions to keep.
          obs_space: shape of a single observation, e.g. ``(4, 4)``.
      """

      def __init__(self, buffer_size, obs_space):
          super().__init__(buffer_size)
          self.s1 = np.zeros((buffer_size, *obs_space), dtype=np.float32)
          self.s2 = np.zeros((buffer_size, *obs_space), dtype=np.float32)
          self.a = np.zeros(buffer_size, dtype=np.int32)
          self.r = np.zeros(buffer_size, dtype=np.float32)
          self.done = np.zeros(buffer_size, dtype=np.float32)

      def add_transition(self, s1, action, s2, done, reward):
          """Write one transition at ``pos``, overwriting the oldest once full."""
          self.s1[self.pos] = s1
          self.a[self.pos] = action
          # The next state of a terminal transition is stored as all zeros.
          self.s2[self.pos] = s2 if not done else np.zeros_like(s2)
          self.done[self.pos] = done
          self.r[self.pos] = reward
          self.pos = (self.pos + 1) % self.buffer_size
          self.size = min(self.size + 1, self.buffer_size)

      def get_sample(self, sample_size, sample_priority=False):
          """Draw ``sample_size`` distinct transitions uniformly at random.

          Args:
              sample_size: number of transitions to return.
              sample_priority: accepted so the signature matches
                  ``ReplayBufferBase.get_sample``; not used.

          Returns:
              ``(s1, a, s2, done, r)``. The two state arrays get a channel
              axis inserted after the batch axis.

          Raises:
              ValueError: from ``np.random.choice`` when ``sample_size``
                  exceeds the number of stored transitions.
          """
          indices = np.random.choice(self.size, sample_size, replace=False)
          # Indexing with [:, np.newaxis] adds the channel axis for any
          # observation rank, not only 2-D boards.
          return (
              self.s1[indices][:, np.newaxis],
              self.a[indices],
              self.s2[indices][:, np.newaxis],
              self.done[indices],
              self.r[indices],
          )

      def get_buffer_size(self):
          """Return the number of transitions currently stored."""
          return self.size


In [None]:
class DQN(nn.Module):
    """Convolutional Q-network: three 2x2 convolutions and a linear head.

    Args:
        state_size: accepted for interface compatibility; not used by the
            network (the input always has a single channel).
        action_size: number of Q-values produced per input.
        board_size: side length of the square input board. Each unpadded 2x2
            convolution with stride 1 shrinks the side by one, so it must be
            at least 4.

    Raises:
        ValueError: if ``board_size`` is smaller than 4.
    """

    def __init__(self, state_size, action_size=4, board_size=4):
        super().__init__()

        if board_size < 4:
            raise ValueError(f"board_size must be at least 4, got {board_size}")

        self.conv1 = nn.Conv2d(1, 128, kernel_size=2, stride=1)
        self.conv2 = nn.Conv2d(128, 64, kernel_size=2, stride=1)
        self.conv3 = nn.Conv2d(64, 16, kernel_size=2, stride=1)
        # Three convolutions leave a (board_size - 3)-sided map with 16
        # channels; for the default 4x4 board that is 16 features, as before.
        feature_side = board_size - 3
        self.fc1 = nn.Linear(16 * feature_side * feature_side, action_size)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Map a ``(batch, 1, board_size, board_size)`` tensor to ``(batch, action_size)`` Q-values."""
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.relu(self.conv3(x))
        # Flatten everything but the batch axis before the linear head.
        x = self.fc1(x.view(x.shape[0], -1))
        return x


写在 ver4 的留言：改进了学习方法，使其从原本的单样本学习变为批量学习；配合 ver3 主要改进的 replay buffer，以及 q_eval 的矢量化操作，理论上可以使模型更好、更快地学习。

In [None]:
class RLAgent:
    """Epsilon-greedy DQN agent with a target network and experience replay.

    Args:
        args: parsed options; ``device``, ``learning_rate``, ``gamma``,
            ``epsilon``, ``buffer_size`` and ``target_update_freq`` are read.
        obs_space: shape of one observation, passed to the replay buffer.
        action_size: number of discrete actions.

    NOTE(review): ``args.use_nature_dqn`` and ``args.pre_train_model`` are not
    consulted here; the target network is always used.
    """

    def __init__(self, args, obs_space, action_size):
        # The board is fed to the network as a single-channel image.
        self.state_size = 1

        self.action_size = action_size
        self.device = torch.device(args.device)
        self.args = args

        # Online network and the target network used to stabilise training.
        # action_size is forwarded so the output size matches the action
        # space that act() samples from.
        self.model = DQN(state_size=self.state_size, action_size=action_size).to(self.device)
        self.target_model = DQN(state_size=self.state_size, action_size=action_size).to(self.device)

        self.lr = args.learning_rate
        # Use the configured learning rate (it used to be hard-coded to 0.001).
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

        self.gamma = args.gamma  # discount factor
        self.epsilon = args.epsilon  # exploration rate for epsilon-greedy
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.99
        self.replay_buffer = OptimizedReplayBuffer(buffer_size=args.buffer_size, obs_space=obs_space)
        self.learn_step_counter = 0

    def act(self, state):
        """Pick an action: random with probability ``epsilon``, else greedy.

        Args:
            state: one board as a NumPy array, list or tensor.

        Returns:
            The index of the chosen action.
        """
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)

        # as_tensor accepts arrays, lists and tensors alike, without the
        # copy-construct warning torch.tensor gives for tensor input.
        state_tensor = torch.as_tensor(state, dtype=torch.float32, device=self.device)
        state_tensor = state_tensor.unsqueeze(0).unsqueeze(0)  # add batch and channel axes

        # Action selection needs no gradients.
        with torch.no_grad():
            q_values = self.model(state_tensor)
        return torch.argmax(q_values).item()

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        self.replay_buffer.add_transition(state, action, next_state, done, reward)

    def train(self, batch_size=64):
        """Run one optimisation step on a random mini-batch.

        Returns:
            The loss value, or 0 when fewer than ``batch_size`` transitions
            are stored and no step was taken.
        """
        if self.replay_buffer.get_buffer_size() < batch_size:
            return 0

        # Sync the target network every target_update_freq training steps.
        # The counter must advance on every step; when it was incremented
        # only inside this branch it stuck at 1 after the first sync and the
        # target network was never refreshed again.
        if self.learn_step_counter % self.args.target_update_freq == 0:
            self.target_model.load_state_dict(self.model.state_dict())
        self.learn_step_counter += 1

        s1, a, s2, done, r = self.replay_buffer.get_sample(batch_size)

        s1 = torch.as_tensor(s1, dtype=torch.float32, device=self.device)      # states
        s2 = torch.as_tensor(s2, dtype=torch.float32, device=self.device)      # next states
        r = torch.as_tensor(r, dtype=torch.float32, device=self.device)        # rewards
        a = torch.as_tensor(a, dtype=torch.long, device=self.device)           # actions
        done = torch.as_tensor(done, dtype=torch.float32, device=self.device)  # terminal flags

        # Bootstrapped target from the target network; for terminal
        # transitions (done == 1) the target is just the reward.
        next_q_values = self.target_model(s2).detach()
        target_q = r + self.gamma * (1 - done) * next_q_values.max(1)[0]
        target_q = target_q.unsqueeze(1)  # match the shape of eval_q

        # Q-value of the action that was actually taken.
        eval_q = self.model(s1).gather(1, a.unsqueeze(1))

        loss = nn.MSELoss()(eval_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay exploration after every training step until the floor is reached.
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        return loss.item()

目前来看，replay buffer 的意义在于可以利用 numpy 更快地采样，因此目前先使用 OptimizedReplayBuffer。


In [None]:
obs_space = (4, 4)
action_size = 4

agent = RLAgent(args, obs_space=obs_space, action_size=action_size)
env = GameGrid()  # game environment
all_rewards = []  # final reward of every episode
all_losses = []   # summed training loss of every episode

device = args.device

# Make sure both networks live on the selected device.
agent.model.to(device)
agent.target_model.to(device)

max_reward = 0
begin_t = time.time()

for episode in range(args.epochs):
    # np.array copies: the board returned by reset() may be the live array
    # that env.step() later modifies in place.
    state = np.array(env.reset(), dtype=np.float32)

    done = False
    total_loss = 0
    steps = 0

    while not done:
        action = agent.act(state)
        next_state, reward, done = env.step(action=action)
        next_state = np.array(next_state, dtype=np.float32)

        # The last transition of an episode also gets the final game score.
        if done:
            reward = reward + env.score

        # States and rewards stay on the CPU: the replay buffer stores NumPy
        # data, and the agent moves each batch to the device itself.
        agent.remember(state, action, reward, next_state, done)

        # Train with the configured --batch_size (train() alone defaults to 64).
        loss = agent.train(batch_size=args.batch_size)
        total_loss += loss
        steps += 1

        state = next_state

    all_rewards.append(reward)
    all_losses.append(total_loss)

    print(f'Episode {episode}, Total loss: {total_loss},Current Reword: {reward}')
    if reward > max_reward:
        print(f'Current Max Reword: {reward}')
        max_reward = reward
        # Keep the best model seen so far.
        torch.save(agent.model, "2048.pt")
print("finish! time cost is {}s".format(time.time() - begin_t))

