In [1]:
import os

import numpy as np
import cv2
from PIL import Image
import time
import pickle
import matplotlib.pyplot as plt
from matplotlib import style
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten, Conv2D
from keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.callbacks import ModelIntervalCheckpoint
from rl.policy import BoltzmannQPolicy, LinearAnnealedPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory

# Use the ggplot look for any matplotlib figures drawn later in the notebook.
style.use('ggplot')
import keras.backend as K

# Make half precision the Keras backend's default float type.
# NOTE(review): the Keras docs advise against float16 as the global floatx for
# training because of numeric-stability issues — confirm this is intentional.
dtype='float16'
K.set_floatx(dtype)

In [2]:
class Cube:
    """A single occupant (player, food or enemy) of a square ``size`` x ``size`` grid.

    The integer coordinates ``x`` and ``y`` are always kept inside
    ``[0, size - 1]``.
    """

    # Displacement (dx, dy) applied by each discrete action id.
    _ACTION_DELTAS = {
        0: (1, 1),
        1: (-1, 1),
        2: (1, -1),
        3: (-1, -1),
        4: (0, 1),
        5: (0, -1),
        6: (1, 0),
        7: (-1, 0),
        8: (0, 0),
    }

    def __init__(self, size):
        """Place the cube on a uniformly random cell of the grid.

        Args:
            size: Side length of the square grid.
        """
        self.size = size
        # randint's upper bound is exclusive: passing ``size`` (not ``size - 1``)
        # is what makes the last row/column reachable.
        self.x = np.random.randint(0, self.size)
        self.y = np.random.randint(0, self.size)

    def __str__(self):
        """Return the position formatted as ``"x,y"``."""
        return f'{self.x},{self.y}'

    def __sub__(self, other):
        """Return the coordinate difference ``(dx, dy)`` to ``other`` as a tuple."""
        return (self.x - other.x, self.y - other.y)

    def __eq__(self, other):
        """Two cubes are equal when they occupy the same cell."""
        return self.x == other.x and self.y == other.y

    def action(self, choise):
        """Apply one of the nine discrete actions.

        Actions 0-3 are the diagonals, 4-7 the axis-aligned moves and 8 keeps
        the cube in place. Unknown action ids are ignored.

        Args:
            choise: Action id in ``0..8``.
        """
        delta = self._ACTION_DELTAS.get(choise)
        if delta is None:
            return
        dx, dy = delta
        self.move(x=dx, y=dy)

    @staticmethod
    def _resolve_step(delta):
        """Return ``delta``, or a random step in {-1, 0, 1} when it was left unset."""
        # Identity checks (not truthiness) so that an explicit 0 means
        # "stay on this axis" instead of being mistaken for "unset".
        if delta is None or delta is False:
            return np.random.randint(-1, 2)
        return delta

    def move(self, x=False, y=False):
        """Shift the cube by ``(x, y)`` and clamp it to the grid.

        Args:
            x: Step along x. Left at the default ``False`` (or ``None``) a
                random step in {-1, 0, 1} is drawn; ``0`` means no movement.
            y: Step along y, with the same conventions as ``x``.
        """
        self.x += self._resolve_step(x)
        self.y += self._resolve_step(y)

        # Keep the cube on the board.
        upper = self.size - 1
        self.x = min(max(self.x, 0), upper)
        self.y = min(max(self.y, 0), upper)


class envCube:
    """Grid-world environment: a player chases food while avoiding an enemy.

    Exposes ``reset`` / ``step`` / ``render`` in the shape the agent code in
    this notebook calls them. Observations are either a normalised image of
    the board or a 4-tuple of coordinate differences, selected by
    ``RETURN_IMAGE``.
    """

    SIZE = 10
    OBSERVATION_SPACE_VALUES = (SIZE, SIZE, 3)
    # OBSERVATION_SPACE_VALUES = (4,)
    ACTION_SPACE_VALUES = 9
    RETURN_IMAGE = True  # True: observations are images; False: coordinate differences

    FOOD_REWARD = 25  # reward when the player reaches the food
    ENEMY_PENALITY = -300  # penalty when the player meets the enemy
    MOVE_PENALITY = -1  # penalty for every other step

    PLAYER_N = 1
    FOOD_N = 2
    ENEMY_N = 3

    # Pixel value per entity; the colour names assume the BGR channel order
    # in which cv2.imshow (see render) interprets the array.
    d = {PLAYER_N: (255, 0, 0),  # blue
         FOOD_N: (0, 255, 0),  # green
         ENEMY_N: (0, 0, 255)}  # red

    def _observe(self):
        """Build the observation for the current positions."""
        if self.RETURN_IMAGE:
            return np.array(self.get_image()) / 255
        return (self.player - self.food) + (self.player - self.enemy)

    def reset(self):
        """Start a new episode and return its first observation."""
        self.player = Cube(self.SIZE)
        self.food = Cube(self.SIZE)
        self.enemy = Cube(self.SIZE)
        # Re-draw the food until it no longer shares a cell with the player.
        while self.player == self.food:
            self.food = Cube(self.SIZE)
        # Re-draw the enemy until it is clear of both the player and the food.
        while self.enemy in (self.player, self.food):
            self.enemy = Cube(self.SIZE)

        self.episode_step = 0
        return self._observe()

    def step(self, action):
        """Advance the world by one tick.

        The player applies ``action``; the enemy and the food then each take
        a random step.

        Returns:
            ``(observation, reward, done, info)`` where ``info`` is an empty dict.
        """
        self.episode_step += 1
        self.player.action(action)
        self.enemy.move()
        self.food.move()

        new_observation = self._observe()

        reached_food = self.player == self.food
        met_enemy = self.player == self.enemy

        # Food takes precedence over the enemy when both coincide with the player.
        if reached_food:
            reward = self.FOOD_REWARD
        else:
            reward = self.ENEMY_PENALITY if met_enemy else self.MOVE_PENALITY

        # The episode ends on either encounter or after 200 steps.
        done = bool(reached_food or met_enemy or self.episode_step >= 200)

        return new_observation, reward, done, {}

    def render(self, mode='human'):
        """Show the board, upscaled to 800x800, in an OpenCV window."""
        enlarged = self.get_image().resize((800, 800))
        cv2.imshow('Predator', np.array(enlarged))
        cv2.waitKey(1)

    def get_image(self):
        """Return the board as a ``SIZE`` x ``SIZE`` RGB-mode PIL image."""
        board = np.zeros((self.SIZE, self.SIZE, 3), dtype=np.uint8)
        # Paint order matters when cells coincide: later entries overwrite earlier ones.
        paint_order = (
            (self.food, self.FOOD_N),
            (self.player, self.PLAYER_N),
            (self.enemy, self.ENEMY_N),
        )
        for cube, colour_key in paint_order:
            board[cube.x, cube.y] = self.d[colour_key]
        return Image.fromarray(board, 'RGB')

    def get_qtable(self, q_table_name=None):
        """Create or load a Q-table.

        Args:
            q_table_name: Path of a pickled Q-table. When ``None`` a fresh
                table is created instead.

        Returns:
            The loaded object, or a dict mapping every 4-tuple of offsets in
            ``(-SIZE, SIZE)`` to a list of ``ACTION_SPACE_VALUES`` random
            integers in ``[-5, 0)``.
        """
        if q_table_name is not None:
            # NOTE: pickle can execute arbitrary code while loading; only
            # open Q-table files from a trusted source.
            with open(q_table_name, 'rb') as f:
                return pickle.load(f)

        offsets = range(-self.SIZE + 1, self.SIZE)
        return {
            (dx1, dy1, dx2, dy2): [np.random.randint(-5, 0)
                                   for _ in range(self.ACTION_SPACE_VALUES)]
            for dx1 in offsets
            for dy1 in offsets
            for dx2 in offsets
            for dy2 in offsets
        }



In [3]:
def build_model(status, nb_actions):
    """Assemble the (uncompiled) convolutional Q-network.

    Args:
        status: Observation shape as a tuple. A leading axis of length 1 is
            prepended to form the model's ``input_shape`` (presumably the
            agent memory's ``window_length`` of 1 — confirm against the
            agent configuration).
        nb_actions: Number of units in the final linear layer, one per action.

    Returns:
        A ``Sequential`` model: two 3x3 convolutions with 32 filters, a
        flatten, two 32-unit dense layers (each followed by ReLU) and a
        linear output layer.
    """
    stack = [
        # Convolutional feature extractor.
        Conv2D(32, (3, 3), input_shape=(1,) + status),
        Activation('relu'),
        Conv2D(32, (3, 3)),
        Activation('relu'),
        Flatten(),
        # Fully connected head.
        Dense(32),
        Activation('relu'),
        Dense(32),
        Activation('relu'),
        Dense(nb_actions, activation='linear'),
    ]
    return Sequential(stack)


# Build the Q-network for the environment's observation shape and action
# count, then print its layer summary and the dtype the model reports.
model = build_model(envCube.OBSERVATION_SPACE_VALUES, envCube.ACTION_SPACE_VALUES)
model.summary()
print(model.dtype)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 1, 8, 8, 32)       896       
                                                                 
 activation (Activation)     (None, 1, 8, 8, 32)       0         
                                                                 
 conv2d_1 (Conv2D)           (None, 1, 6, 6, 32)       9248      
                                                                 
 activation_1 (Activation)   (None, 1, 6, 6, 32)       0         
                                                                 
 flatten (Flatten)           (None, 1152)              0         
                                                                 
 dense (Dense)               (None, 32)                36896     
                                                                 
 activation_2 (Activation)   (None, 32)                0

ValueError: The model cannot be compiled because it has no loss to optimize.

In [None]:
def build_agent(model, nb_actions):
    """Create and compile a double-DQN agent with a Boltzmann policy.

    Args:
        model: Q-network passed to ``DQNAgent``.
        nb_actions: Number of discrete actions.

    Returns:
        The compiled ``DQNAgent`` (Adam, learning rate 1e-3, tracking ``mae``).
    """
    agent = DQNAgent(
        model=model,
        nb_actions=nb_actions,
        # Replay buffer of up to 50000 entries, one observation per state.
        memory=SequentialMemory(limit=50000, window_length=1),
        policy=BoltzmannQPolicy(),
        nb_steps_warmup=1000,
        enable_double_dqn=True,
        target_model_update=5000,
    )
    agent.compile(Adam(learning_rate=1e-3), metrics=['mae'])
    return agent


def build_duel_agent(model, nb_actions):
    """Create and compile a dueling DQN agent with an annealed epsilon-greedy policy.

    Args:
        model: Q-network passed to ``DQNAgent``.
        nb_actions: Number of discrete actions.

    Returns:
        The compiled ``DQNAgent`` (Adam, learning rate 1e-3).
    """
    # Epsilon is annealed linearly from 1.0 down to 0.1 over 500000 steps;
    # 0.05 is the value configured for testing.
    exploration = LinearAnnealedPolicy(
        EpsGreedyQPolicy(),
        attr='eps',
        value_max=1.,
        value_min=.1,
        value_test=.05,
        nb_steps=500000,
    )
    replay = SequentialMemory(limit=50000, window_length=1)

    agent = DQNAgent(
        model=model,
        nb_actions=nb_actions,
        memory=replay,
        policy=exploration,
        nb_steps_warmup=1000,
        enable_dueling_network=True,
        dueling_type='avg',
        target_model_update=1e-2,
    )
    agent.compile(Adam(learning_rate=1e-3))
    return agent

In [None]:
# Instantiate the environment and wrap the model built above in the dueling
# DQN agent (build_agent is the double-DQN alternative).
env = envCube()
dqn = build_duel_agent(model, envCube.ACTION_SPACE_VALUES)

In [None]:
# Write intermediate weights every 10000 steps; `{step}` in the file name is
# filled in by the checkpoint callback.
checkpoint_weights_filename = 'dqn_weights_{step}.h5f'
callbacks = [ModelIntervalCheckpoint(checkpoint_weights_filename, interval=10000)]

# Okay, now it's time to learn something! Rendering is switched off
# (visualize=False) because it slows training down quite a lot. You can always
# safely abort the training prematurely using Ctrl + C (or by interrupting the
# kernel). The callbacks list must be handed to fit(), otherwise the
# checkpoint above is never invoked.
dqn.fit(env, nb_steps=100000, visualize=False, verbose=1, callbacks=callbacks)

In [None]:
# After training is done, save the final weights; overwrite=True replaces any
# existing file of the same name.
dqn.save_weights('duel_dqn_weights.h5f', overwrite=True)

In [None]:
# Reload the weights saved by the previous cell.
dqn.load_weights('duel_dqn_weights.h5f')
# Finally, evaluate our algorithm for 5 episodes. visualize=True presumably
# makes the agent call env.render(), which opens an OpenCV window — confirm a
# display is available.
score = dqn.test(env, nb_episodes=5, visualize=True)
# Mean episode reward over the test episodes.
print(np.mean(score.history['episode_reward']))