This is the demo by Yang Rui on Github.
url: https://github.com/YangRui2015/2048_env

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


#Part 1: utils

In [3]:
import numpy as np
import matplotlib.pyplot as plt
import os
import shutil
import time


BASE_OUTPUT_PATH = './drive/MyDrive/outputs/'
LATEST_MODEL_RECORD_FILE = 'THE_LAST_MODEL'


def log2_shaping(s, divide=16):
    s = np.log2(1 + s) / divide
    return s


def check_path_exist(path, verbose=True):
    if not os.path.exists(path):
        os.mkdir(path)
        if verbose:
            print("make the dir {} finished".format(path))
    else:
        if verbose:
            print("the directory {} already exists".format(path))


def running_average(lis, length=5):
    if len(lis) > 10:
        end = len(lis) // length
        lis = lis[:end * length]
        arr = np.array(lis).reshape(-1, length)
        arr = arr.mean(axis=1)

        return list(arr.reshape(-1))
    else:
        return lis


def plot_save(lis, path, title=None, x_label=None, y_label=None):
    dir = path.split("/")[:-1]
    dir = "/".join(dir) + "/"
    check_path_exist(dir, verbose=False)
    plt.figure()
    if type(lis[0]) == list:
        for li in lis:
            plt.plot(li)
    else:
        plt.plot(lis)

    if title:
        plt.title(title)
    if x_label:
        plt.xlabel(x_label)
    if y_label:
        plt.ylabel(y_label)

    plt.savefig(path)
    plt.close("all")


def del_dir_tree(path):
    if os.path.exists(path):
        try:
            shutil.rmtree(path)
        except:
            print("remove path {} failed!".format(path))


def del_files(path):
    if os.path.isdir(path):
        files = os.listdir(path)
        for file in files:
            os.remove(os.path.join(path, file))
        print("Remove files in {}".format(path))
    elif os.path.isfile(path):
        os.remove(path)
        print("Remove file {}".format(path))
    else:
        print("{} not a file or a directory".format(path))


class Perfomance_Saver():
    '''目前先支持txt'''

    def __init__(self, path='performance_data.txt'):
        self.path = BASE_OUTPUT_PATH + path
        self.clear_file()

    def clear_file(self):
        if os.path.exists(self.path): 
          with open(self.path, 'w') as file:
              file.write('clear since :{}\n\n'.format(time.ctime()))
          print("clear file finished")

    def save(self, performance_list, info):
        with open(self.path, 'a+') as file:
            file.writelines("time: {}\n".format(time.ctime()))
            file.writelines("info: {} \n".format(str(info)))
            performance_str = [str(x) + " " for x in performance_list]
            file.writelines(performance_str)
            file.writelines('\n\n')
        print('write to file finished')


class Model_Saver():
    '''存一定数量高分模型，防止模型存过多'''

    def __init__(self, num=10):
        self.num_max = num
        self.path_list = []

    def save(self, filename):
        if len(self.path_list) >= self.num_max:
            os.remove(self.path_list.pop(0))
            print('del surplus modle files')
        path_model = BASE_OUTPUT_PATH + filename
        self.path_list.append(path_model)

        path_latest_record = BASE_OUTPUT_PATH + LATEST_MODEL_RECORD_FILE
        with open(path_latest_record, 'w') as file:
            file.writelines(filename)

    def read_latest_model_path(self):
        path_latest_record = BASE_OUTPUT_PATH + LATEST_MODEL_RECORD_FILE
        if os.path.exists(path_latest_record):
            with open(path_latest_record, 'r') as file:
                lines = file.readlines()
                if len(lines) >= 1:
                    self.path_list.append(BASE_OUTPUT_PATH + lines[len(lines) - 1])
                    return lines[len(lines) - 1]
                else:
                    return None
        else:
            return None


#Part 2: NN_module

In [4]:
import torch.nn as nn
import torch.nn.functional as F


# CNN网络
class CNN_Net(nn.Module):
    def __init__(self, input_len, output_num, conv_size=(32, 64), fc_size=(1024, 128), out_softmax=False):
        super(CNN_Net, self).__init__()
        self.input_len = input_len
        self.output_num = output_num
        self.out_softmax = out_softmax 

        self.conv1 = nn.Sequential(
            nn.Conv2d(1, conv_size[0], kernel_size=3, stride=1, padding=1),
            # nn.BatchNorm2d(32),
            nn.ReLU(inplace=True)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(conv_size[0], conv_size[1], kernel_size=3, stride=1, padding=1),
            # nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            # nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        self.fc1 = nn.Linear(conv_size[1] * self.input_len * self.input_len, fc_size[0])
        self.fc2 = nn.Linear(fc_size[0], fc_size[1])
        self.head = nn.Linear(fc_size[1], self.output_num)

    def forward(self, x):
        x = x.reshape(-1,1,self.input_len, self.input_len)
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))

        output = self.head(x)
        if self.out_softmax:
            output = F.softmax(output, dim=1)   #值函数估计不应该有softmax
        return output

#Part 3: Buffer_module

In [5]:
import numpy as np


class SumTree(object):
    data_pointer = 0

    def __init__(self, capacity):
        self.capacity = capacity  # for all priority values
        self.tree = np.zeros(2 * capacity - 1)
        # [--------------Parent nodes-------------][-------leaves to recode priority-------]
        #             size: capacity - 1                       size: capacity
        self.data = np.zeros(capacity, dtype=object)  # for all transitions，格式是对象，相当于指针
        # [--------------data frame-------------]
        #             size: capacity

    def add(self, p, data):
        tree_idx = self.data_pointer + self.capacity - 1  # 在树的叶子节点的位置
        self.data[self.data_pointer] = data  # update data_frame
        self.update(tree_idx, p)  # update tree_frame

        self.data_pointer += 1
        if self.data_pointer >= self.capacity:  # replace when exceed the capacity
            self.data_pointer = 0

    def update(self, tree_idx, p):
        change = p - self.tree[tree_idx]
        self.tree[tree_idx] = p
        # then propagate the change through tree
        while tree_idx != 0:  # this method is faster than the recursive loop in the reference code
            tree_idx = (tree_idx - 1) // 2
            self.tree[tree_idx] += change

    def get_leaf(self, v):
        """
        Tree structure and array storage:

        Tree index:
             0         -> storing priority sum
            / \
          1     2
         / \   / \
        3   4 5   6    -> storing priority for transitions

        Array type for storing:
        [0,1,2,3,4,5,6]
        """
        parent_idx = 0
        while True:  # the while loop is faster than the method in the reference code
            cl_idx = 2 * parent_idx + 1  # this leaf's left and right kids
            cr_idx = cl_idx + 1
            if cl_idx >= len(self.tree):  # reach bottom, end search
                leaf_idx = parent_idx
                break
            else:  # downward search, always search for a higher priority node
                if v <= self.tree[cl_idx]:
                    parent_idx = cl_idx
                else:
                    v -= self.tree[cl_idx]
                    parent_idx = cr_idx

        data_idx = leaf_idx - self.capacity + 1
        return leaf_idx, self.tree[leaf_idx], self.data[data_idx]

    @property
    def total_p(self):
        return self.tree[0]  # the root

class Buffer_PER(object):  # stored as ( s, a, r, s_ ) in SumTree
    epsilon = 0.01  # small amount to avoid zero priority
    alpha = 0.6  # [0~1] convert the importance of TD error to priority
    beta = 0.4  # importance-sampling, from initial value increasing to 1
    beta_increment_per_sampling = 0.001
    abs_err_upper = 1.  # clipped abs error

    def __init__(self, capacity):
        self.tree = SumTree(capacity)

    def store(self, transition):
        max_p = np.max(self.tree.tree[-self.tree.capacity:])
        if max_p == 0:
            max_p = self.abs_err_upper
        self.tree.add(max_p, transition)  # set the max p for new p

    def sample(self, n):
        b_idx, b_memory, ISWeights = np.empty((n,), dtype=np.int32), np.empty((n, self.tree.data[0].size)), np.empty(
            (n, 1))
        pri_seg = self.tree.total_p / n  # priority segment
        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])  # max = 1

        min_prob = np.min(self.tree.tree[-self.tree.capacity:]) / self.tree.total_p  # for later calculate ISweight
        for i in range(n):
            a, b = pri_seg * i, pri_seg * (i + 1)
            v = np.random.uniform(a, b)
            idx, p, data = self.tree.get_leaf(v)
            prob = p / self.tree.total_p
            ISWeights[i, 0] = np.power(prob / min_prob, -self.beta)
            b_idx[i], b_memory[i, :] = idx, data

        return b_idx, b_memory, ISWeights

    def batch_update(self, tree_idx, abs_errors):
        abs_errors += self.epsilon  # convert to abs and avoid 0
        clipped_errors = np.minimum(abs_errors, self.abs_err_upper)
        ps = np.power(clipped_errors, self.alpha)
        for ti, p in zip(tree_idx, ps):
            self.tree.update(ti, p)

class Buffer():
  def __init__(self, n_features, buffer_type='', capacity=1e4):
      self.memory_size = capacity
      self.n_features = n_features
      self.type = buffer_type
      self.memory_counter = 0

      if self.type == 'priority':
          self.memory = Buffer_PER(capacity=capacity)
      else:
          self.memory = np.zeros((self.memory_size, n_features * 2 + 2))

  def store(self, transition):
      self.memory_counter += 1

      if self.type == 'priority':
          self.memory.store(transition)
      else:
          index = self.memory_counter % self.memory_size
          self.memory[index, :] = transition

  def sample(self, batch_size):
      info = None
      if self.type == 'priority':
          tree_idx, batch_memory, ISWeights = self.memory.sample(batch_size)
          info = (tree_idx, ISWeights)
      else:
          sample_index = np.random.choice(self.memory_size, size=batch_size)  # 考虑buffer已先填满
          batch_memory = self.memory[sample_index, :]

      return batch_memory, info

  def update(self, tree_idx, td_errors):
      assert self.type == 'priority'
      self.memory.batch_update(tree_idx, td_errors)


#Part 4: dqn_agent

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class DQN():
    batch_size = 128
    lr = 1e-4
    epsilon = 0.15   
    memory_capacity = int(1e4)
    gamma = 0.99
    q_network_iteration = 200
    save_path = "./drive/MyDrive/outputs/"
    soft_update_theta = 0.1
    clip_norm_max = 1
    train_interval = 5
    conv_size = (32, 64)   # num filters
    fc_size = (512, 128)

    def __init__(self, num_state, num_action, enable_double=True, enable_priority=True):
        super(DQN, self).__init__()
        self.num_state = num_state
        self.num_action = num_action
        self.state_len = int(np.sqrt(self.num_state))
        self.enable_double = enable_double
        self.enable_priority = enable_priority

        self.eval_net, self.target_net = CNN_Net(self.state_len, num_action,self.conv_size, self.fc_size), CNN_Net(self.state_len, num_action, self.conv_size, self.fc_size)

        self.learn_step_counter = 0
        self.buffer = Buffer(self.num_state, 'priority', self.memory_capacity)  
        self.initial_epsilon = self.epsilon
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=self.lr)

    def select_action(self, state, random=False, deterministic=False):
        state = torch.unsqueeze(torch.FloatTensor(state), 0)
        if not random and np.random.random() > self.epsilon or deterministic:  # greedy policy
            action_value = self.eval_net.forward(state)
            action = torch.max(action_value.reshape(-1, 4), 1)[1].data.numpy()
        else:  # random policy
            action = np.random.randint(0, self.num_action)
        return action

    def store_transition(self, state, action, reward, next_state):
        state = state.reshape(-1)
        next_state = next_state.reshape(-1)

        transition = np.hstack((state, [action, reward], next_state))
        self.buffer.store(transition)

    def save(self, path=None, name='dqn_net.pkl'):
        path = self.save_path if not path else path
        check_path_exist(path)
        torch.save(self.eval_net.state_dict(), path + name)

    def load(self, path=None, name='dqn_net.pkl'):
        path = self.save_path if not path else path
        self.eval_net.load_state_dict(torch.load(path + name))

    def epsilon_decay(self, episode, total_episode):
        self.epsilon = self.initial_epsilon * (1 - episode / total_episode)

    def update(self):
        # soft update the parameters
        if self.learn_step_counter % self.q_network_iteration == 0 and self.learn_step_counter:
            for p_e, p_t in zip(self.eval_net.parameters(), self.target_net.parameters()):
                p_t.data = self.soft_update_theta * p_e.data + (1 - self.soft_update_theta) * p_t.data

        self.learn_step_counter += 1

        # sample batch from memory
        if self.enable_priority:
            batch_memory, (tree_idx, ISWeights) = self.buffer.sample(self.batch_size)
        else:
            batch_memory, _ = self.buffer.sample(self.batch_size)

        batch_state = torch.FloatTensor(batch_memory[:, :self.num_state])
        batch_action = torch.LongTensor(batch_memory[:, self.num_state: self.num_state + 1].astype(int))
        batch_reward = torch.FloatTensor(batch_memory[:, self.num_state + 1: self.num_state + 2])
        batch_next_state = torch.FloatTensor(batch_memory[:, -self.num_state:])

        # q_eval
        q_eval = self.eval_net(batch_state).gather(1, batch_action)
        q_eval_next = self.eval_net(batch_next_state)
        q_target_next = self.target_net(batch_next_state).detach()

        if self.enable_double:
            q_eval_argmax = q_eval_next.max(1)[1].view(self.batch_size, 1)
            q_max = q_target_next.gather(1, q_eval_argmax).view(self.batch_size, 1)
        else:
            q_max = q_target_next.max(1)[0].view(self.batch_size, 1)
        q_target = batch_reward + self.gamma * q_max

        if self.enable_priority:
            abs_errors = (q_target - q_eval.data).abs()
            self.buffer.update(tree_idx, abs_errors)
            loss = (torch.FloatTensor(ISWeights) * (q_target - q_eval).pow(2)).mean()  # with importance sampling weight
            # loss = (q_target - q_eval).pow(2).mean()  # without importance sampling weight
        else:
            loss = F.mse_loss(q_eval, q_target)

        self.optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(self.eval_net.parameters(), self.clip_norm_max)
        self.optimizer.step()

        return loss


#Part 5: gym_2048

In [7]:
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
import itertools
import logging
from six import StringIO
import sys


def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)


class IllegalMove(Exception):
    pass


def stack(flat, layers=16):
    larray = []
    for i in range(1, layers + 1):
        ii = 2 ** i
        layer = np.copy(flat)
        layer[layer != ii] = 0
        layer[layer == ii] = 1
        # print("Layer")
        # print(layer)
        # print(layer.shape)
        larray.append(layer)

    newstack = np.stack(larray, axis=-1)
    return newstack


class Game2048Env(gym.Env):  # directions 0, 1, 2, 3 are up, right, down, left
    metadata = {'render.modes': ['human', 'ansi']}
    max_steps = 10000

    def __init__(self):
        # Definitions for game. Board must be square.
        self.size = 4
        self.w = self.size
        self.h = self.size
        self.squares = self.size * self.size

        # Maintain own idea of game score, separate from rewards
        self.score = 0

        # Members for gym implementation
        self.action_space = spaces.Discrete(4)
        # Suppose that the maximum tile is as if you have powers of 2 across the board.
        layers = self.squares
        self.observation_space = spaces.Box(0, 1, (self.w, self.h, layers), dtype=np.int)
        self.set_illegal_move_reward(0.)
        self.set_max_tile(None)

        self.max_illegal = 10  # max number of illegal actions
        self.num_illegal = 0

        # Initialise seed
        self.seed()

        # # Reset ready for a game
        # self.reset()

    def _get_info(self, info=None):
        if not info:
            info = {}
        else:
            assert type(info) == dict, 'info should be of type dict!'

        info['highest'] = self.highest()
        info['score'] = self.score
        info['steps'] = self.steps
        return info

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def set_illegal_move_reward(self, reward):
        """Define the reward/penalty for performing an illegal move. Also need
            to update the reward range for this."""
        # Guess that the maximum reward is also 2**squares though you'll probably never get that.
        # (assume that illegal move reward is the lowest value that can be returned
        self.illegal_move_reward = reward
        self.reward_range = (self.illegal_move_reward, float(2 ** self.squares))

    def set_max_tile(self, max_tile):
        """Define the maximum tile that will end the game (e.g. 2048). None means no limit.
           This does not affect the state returned."""
        assert max_tile is None or isinstance(max_tile, int)
        self.max_tile = max_tile

    # Implement gym interface
    def step(self, action):
        """Perform one step of the game. This involves moving and adding a new tile."""
        logging.debug("Action {}".format(action))
        self.steps += 1
        score = 0
        done = None
        info = {
            'illegal_move': False,
        }
        try:
            score = float(self.move(action))
            self.score += score
            assert score <= 2 ** (self.w * self.h)
            self.add_tile()
            done = self.isend()
            reward = float(score)
        except IllegalMove as e:
            logging.debug("Illegal move")
            info['illegal_move'] = True
            if self.steps > self.max_steps:
                done = True
            else:
                done = False
            reward = self.illegal_move_reward
            self.num_illegal += 1
            if self.num_illegal >= self.max_illegal:  # exceed the maximum number of illegal actions
                done = True

        info = self._get_info(info)

        # Return observation (board state), reward, done and info dict
        return self.Matrix, reward, done, info

    def reset(self):
        self.Matrix = np.zeros((self.h, self.w), np.int)
        self.score = 0
        self.steps = 0
        self.num_illegal = 0

        logging.debug("Adding tiles")
        self.add_tile()
        self.add_tile()

        return self.Matrix, 0, False, self._get_info()

    def render(self, mode='human'):
        outfile = StringIO() if mode == 'ansi' else sys.stdout
        s = 'Score: {}\n'.format(self.score)
        s += 'Highest: {}\n'.format(self.highest())
        npa = np.array(self.Matrix)
        grid = npa.reshape((self.size, self.size))
        s += "{}\n\n".format(grid)
        outfile.write(s)
        return outfile

    # Implement 2048 game
    def add_tile(self):
        """Add a tile, probably a 2 but maybe a 4"""
        possible_tiles = np.array([2, 4])
        tile_probabilities = np.array([0.9, 0.1])
        val = self.np_random.choice(possible_tiles, 1, p=tile_probabilities)[0]
        empties = self.empties()
        assert empties.shape[0]
        empty_idx = self.np_random.choice(empties.shape[0])
        empty = empties[empty_idx]
        logging.debug("Adding %s at %s", val, (empty[0], empty[1]))
        self.set(empty[0], empty[1], val)

    def get(self, x, y):
        """Return the value of one square."""
        return self.Matrix[x, y]

    def set(self, x, y, val):
        """Set the value of one square."""
        self.Matrix[x, y] = val

    def empties(self):
        """Return a 2d numpy array with the location of empty squares."""
        return np.argwhere(self.Matrix == 0)

    def highest(self):
        """Report the highest tile on the board."""
        return np.max(self.Matrix)

    def move(self, direction, trial=False):
        """Perform one move of the game. Shift things to one side then,
        combine. directions 0, 1, 2, 3 are up, right, down, left.
        Returns the score that [would have] got."""
        if not trial:
            if direction == 0:
                logging.debug("Up")
            elif direction == 1:
                logging.debug("Right")
            elif direction == 2:
                logging.debug("Down")
            elif direction == 3:
                logging.debug("Left")

        changed = False
        move_score = 0
        dir_div_two = int(direction / 2)
        dir_mod_two = int(direction % 2)
        shift_direction = dir_mod_two ^ dir_div_two  # 0 for towards up left, 1 for towards bottom right

        # Construct a range for extracting row/column into a list
        rx = list(range(self.w))
        ry = list(range(self.h))

        if dir_mod_two == 0:
            # Up or down, split into columns
            for y in range(self.h):
                old = [self.get(x, y) for x in rx]
                (new, ms) = self.shift(old, shift_direction)
                move_score += ms
                if old != new:
                    changed = True
                    if not trial:
                        for x in rx:
                            self.set(x, y, new[x])
        else:
            # Left or right, split into rows
            for x in range(self.w):
                old = [self.get(x, y) for y in ry]
                (new, ms) = self.shift(old, shift_direction)
                move_score += ms
                if old != new:
                    changed = True
                    if not trial:
                        for y in ry:
                            self.set(x, y, new[y])
        if changed != True:
            raise IllegalMove

        return move_score

    def combine(self, shifted_row):
        """Combine same tiles when moving to one side. This function always
           shifts towards the left. Also count the score of combined tiles."""
        move_score = 0
        combined_row = [0] * self.size
        skip = False
        output_index = 0
        for p in pairwise(shifted_row):
            if skip:
                skip = False
                continue
            combined_row[output_index] = p[0]
            if p[0] == p[1]:
                combined_row[output_index] += p[1]
                move_score += p[0] + p[1]
                # Skip the next thing in the list.
                skip = True
            output_index += 1
        if shifted_row and not skip:
            combined_row[output_index] = shifted_row[-1]

        return (combined_row, move_score)

    def shift(self, row, direction):
        """Shift one row left (direction == 0) or right (direction == 1), combining if required."""
        length = len(row)
        assert length == self.size
        assert direction == 0 or direction == 1

        # Shift all non-zero digits up
        shifted_row = [i for i in row if i != 0]

        # Reverse list to handle shifting to the right
        if direction:
            shifted_row.reverse()

        (combined_row, move_score) = self.combine(shifted_row)

        # Reverse list to handle shifting to the right
        if direction:
            combined_row.reverse()

        assert len(combined_row) == self.size
        return (combined_row, move_score)

    def isend(self):
        """Has the game ended. Game ends if there is a tile equal to the limit
           or there are no legal moves. If there are empty spaces then there
           must be legal moves."""

        if self.max_tile is not None and self.highest() == self.max_tile:
            return True

        if self.steps >= self.max_steps:
            return True

        for direction in range(4):
            try:
                self.move(direction, trial=True)
                # Not the end if we can do any move
                return False
            except IllegalMove:
                pass
        return True

    def get_board(self):
        """Retrieve the whole board, useful for testing."""
        return self.Matrix

    def set_board(self, new_board):
        """Retrieve the whole board, useful for testing."""
        self.Matrix = new_board


#Part 6: main_dqn

In [None]:
import numpy as np
import time
import datetime
import re

train_episodes = 50000
test_episodes = 50
ifrender = False
ifprinteachepisode = False
ifevaluate = True
eval_interval = 25
epsilon_decay_interval = 100
log_interval = 5
backup_interval = 100


def train():
    episodes = train_episodes
    agent = DQN(num_state=16, num_action=4)
    env = Game2048Env()

    pf_saver = Perfomance_Saver()
    model_saver = Model_Saver(num=10)

    # If there is already a model, use it
    starting_episode = 0
    existing_model = model_saver.read_latest_model_path()
    if existing_model:
        matches = re.findall(r'\d+', existing_model)
        starting_episode = int(matches[0])
        agent.load(name=existing_model)

    eval_max_score = 0
    for i in range(starting_episode+1, episodes):
        state, reward, done, info = env.reset()
        state = log2_shaping(state)

        # start = time.time()
        loss = None
        while True:
            if ifrender:
                env.render()

            if agent.buffer.memory_counter <= agent.memory_capacity and starting_episode==0:
                action = agent.select_action(state, random=True)
            else:
                action = agent.select_action(state)

            next_state, reward, done, info = env.step(action)
            next_state = log2_shaping(next_state)
            reward = log2_shaping(reward, divide=1)

            agent.store_transition(state, action, reward, next_state)
            state = next_state

            if agent.buffer.memory_counter % agent.train_interval == 0 and agent.buffer.memory_counter > agent.memory_capacity:  # 相当于填满后才update
                loss = agent.update()

            if done:
                if i % log_interval == 0:
                    
                    if loss and ifprinteachepisode:
                        print(
                            'loss {0}, training progress {1}, episode reward {2}, episode steps {3}, highest {4}, epsilon {5}'.format(
                                loss, (i + 1) / episodes, info['score'], info['steps'], info['highest'], agent.epsilon))

                    loss = None

                if i % epsilon_decay_interval == 0:  # episilon decay
                    agent.epsilon_decay(i, episodes)
                break

        # end = time.time()
        # print('episode start time:{} end time: {} s\n'.format(start, end))

        # eval 
        if ifevaluate and i % eval_interval == 0 and i:
            eval_info = test(episodes=test_episodes, agent=agent)
            average_score, max_score, score_lis = eval_info['mean'], eval_info['max'], eval_info['list']

            pf_saver.save(score_lis, info=f'episode:{i}')

            # Save the highest score and every 10 episode.
            if i % backup_interval == 0 or int(average_score) > eval_max_score:
                name = 'dqn_{}.pkl'.format(i)
                agent.save(name=name)
                model_saver.save(name)
            
            datetime_object = datetime.datetime.now()
            print(
                'episolde{0} eval average score {1}, eval max socre {2}, current time: {3}'.format(
                 i, average_score, max_score, datetime_object))


def test(episodes=20, agent=None, load_path=None, ifrender=False, log=False):
    if agent is None:
        agent = DQN(num_state=16, num_action=4)
        if load_path:
            agent.load(load_path)

    env = Game2048Env()
    score_list = []
    highest_list = []

    for i in range(episodes):
        state, _, done, info = env.reset()
        state = log2_shaping(state)

        start = time.time()
        while True:
            action = agent.select_action(state, deterministic=True)
            next_state, _, done, info = env.step(action)
            next_state = log2_shaping(next_state)
            state = next_state

            if ifrender:
                env.render()

            if done:
                if log:
                    print(
                        'episode number {0}, episode reward {1}, episode steps {2}, highest {3}'.format(
                            i + 1, info['score'], info['steps'], info['highest']))
                break

        end = time.time()
        if log:
            print('episode time:{} s\n'.format(end - start))

        score_list.append(info['score'])
        highest_list.append(info['highest'])

    print('mean score:{}, mean highest:{}'.format(np.mean(score_list), np.mean(highest_list)))
    print('max score:{}, max hightest:{}'.format(np.max(score_list), np.max(highest_list)))
    result_info = {'mean': np.mean(score_list), 'max': np.max(score_list), 'list': score_list}
    return result_info


if __name__ == "__main__":
    # test(episodes=test_episodes, ifrender=ifrender)
    train()

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations


clear file finished


  ary = asanyarray(ary)


mean score:1450.56, mean highest:146.72
max score:3060.0, max hightest:256
write to file finished
the directory ./drive/MyDrive/outputs/ already exists
episolde19375 eval average score 1450.56, eval max socre 3060.0, current time: 2022-11-09 01:01:50.070216
mean score:1656.88, mean highest:165.92
max score:3188.0, max hightest:256
write to file finished
the directory ./drive/MyDrive/outputs/ already exists
episolde19400 eval average score 1656.88, eval max socre 3188.0, current time: 2022-11-09 01:01:59.945048
