In [2]:
import numpy as np
import sys
from keras.models import Sequential
from keras.layers import Dense
from collections import deque
import random

# Simulation environment: the PLE (PyGame-Learning-Environment) library
from ple import PLE
from ple.games.flappybird import FlappyBird
import os

In [3]:
# Simulation environment: the PLE (PyGame-Learning-Environment) library
from ple import PLE
from ple.games.flappybird import FlappyBird

# Define the learning agent
class Agent():
    """Tabular Q-learning agent for the FlappyBird game.

    The raw game state is discretised into three categories (height relative
    to the next pipe, horizontal distance to the next pipe, and velocity),
    each with six levels, and a Q-value is stored for each of the two actions
    in every discretised state.
    """

    # Exclusive upper bounds of the velocity buckets; values at or above the
    # last bound fall into the final bucket (index 5).
    _VELOCITY_BOUNDS = (-15, -10, -5, 0, 5)
    # Exclusive upper bounds shared by the vertical and horizontal distance
    # buckets (very close, close, not close, mid, far, beyond).
    _DISTANCE_BOUNDS = (8, 20, 50, 125, 250)

    def __init__(self, action_space):
        """Create an agent with an all-zero Q-table.

        Args:
            action_space: Sequence of actions supported by the game, indexed
                by the action ids (0 and 1) that this agent produces.
        """
        # Actions supported by the game
        self.action_set = action_space
        # Q-table indexed as [height, distance, velocity, action]
        self.q_table = np.zeros((6, 6, 6, 2))
        # Learning rate
        self.alpha = 0.7
        # Discount factor
        self.gamma = 0.8
        # Probability of taking a random action when exploring
        self.greedy = 0.8

    @staticmethod
    def _bucket(value, bounds):
        """Return the index of the first bound that ``value`` is below.

        If ``value`` is not below any bound, ``len(bounds)`` is returned.
        """
        for index, bound in enumerate(bounds):
            if value < bound:
                return index
        return len(bounds)

    def get_state(self, state):
        """Extract and discretise the parts of the game state we need.

        Args:
            state: Mapping with the keys ``"next_pipe_dist_to_player"``,
                ``"player_y"``, ``"next_pipe_top_y"`` and ``"player_vel"``.

        Returns:
            Integer array ``[height_category, dist_category,
            velocity_category]``, each entry in the range 0..5.
        """
        # Horizontal distance to the next pipe
        dist_to_pipe_horz = state["next_pipe_dist_to_player"]
        # Vertical offset of the player from the top of the next pipe gap
        dist_to_pipe_bottom = state["player_y"] - state["next_pipe_top_y"]
        # Velocity of the bird
        velocity = state['player_vel']

        return_state = np.zeros((3,), dtype=int)
        return_state[0] = self._bucket(dist_to_pipe_bottom, self._DISTANCE_BOUNDS)
        return_state[1] = self._bucket(dist_to_pipe_horz, self._DISTANCE_BOUNDS)
        return_state[2] = self._bucket(velocity, self._VELOCITY_BOUNDS)
        return return_state

    def update_q_table(self, old_state, current_action, next_state, r):
        """Apply one Q-learning update to the table.

        Args:
            old_state: Discretised state before the action was taken.
            current_action: Index (0 or 1) of the action that was taken.
            next_state: Discretised state after the action was taken.
            r: Reward received for the transition.
        """
        # Best value reachable from the next state
        next_max_value = np.max(self.q_table[next_state[0], next_state[1], next_state[2]])
        entry = (old_state[0], old_state[1], old_state[2], current_action)
        # The future value is discounted by gamma; the previous code omitted
        # the discount factor entirely.
        target = r + self.gamma * next_max_value
        self.q_table[entry] += self.alpha * (target - self.q_table[entry])

    def get_best_action(self, state, greedy=False):
        """Choose an action for ``state``.

        Args:
            state: Discretised state as returned by ``get_state``.
            greedy: When true, use the epsilon-greedy policy: with probability
                ``self.greedy`` a uniformly random action is returned.

        Returns:
            0 if the Q-value of action 0 is strictly greater than that of
            action 1, otherwise 1 (ties resolve to 1), unless a random
            exploratory action is drawn.
        """
        if greedy and np.random.rand() < self.greedy:
            return np.random.choice([0, 1])
        jump = self.q_table[state[0], state[1], state[2], 0]
        no_jump = self.q_table[state[0], state[1], state[2], 1]
        return 0 if jump > no_jump else 1

    def update_greedy(self):
        """Decay the exploration probability by a factor of 0.95."""
        self.greedy *= 0.95

    def act(self, env, action):
        """Execute an action in the environment and shape the reward.

        Args:
            env: Environment object; its ``act`` method is called with the
                game action and must return the raw reward.
            action: Index into ``self.action_set``.

        Returns:
            1 when the raw reward is 0 (still alive), 10 when the raw reward
            is 1 (passed a pipe), and -1000 for any other raw reward
            (treated as game over).
        """
        # Per the original author's note the action set is (119, None),
        # where 119 means "jump".
        r = env.act(self.action_set[action])
        # These branches must be mutually exclusive: with two independent
        # `if` statements a survival tick (0 -> 1) was immediately promoted
        # to the pipe reward of 10.
        if r == 0:  # still alive
            return 1
        elif r == 1:  # passed a pipe
            return 10
        else:  # game over
            return -1000

if __name__ == "__main__":
    # Number of training episodes
    episodes = 2_0000_0000
    # Instantiate the game
    game = FlappyBird()
    # Wrap the game in the PLE interface. force_fps=True is the setting used
    # for training; the original author also kept a force_fps=False variant
    # as an alternative.
    env = PLE(game, fps=30, display_screen=True, force_fps=True)
    # Initialise the environment
    env.init()
    # Instantiate the agent with the environment's action set
    agent = Agent(env.getActionSet())
    max_score = 0

    for episode in range(episodes):
        # Reset the game for a new episode
        env.reset_game()
        # Observe the initial state
        state = agent.get_state(game.getGameState())
        # New episode: decay the exploration probability
        agent.update_greedy()
        while True:
            # Pick an action with the epsilon-greedy policy. Without
            # greedy=True the exploration rate decayed above is never used.
            action = agent.get_best_action(state, greedy=True)
            # Execute the action and obtain the shaped reward
            reward = agent.act(env, action)
            # Observe the state that results from the action
            next_state = agent.get_state(game.getGameState())
            # Update the Q-table with this transition
            agent.update_q_table(state, action, next_state, reward)
            state = next_state
            if env.game_over():
                # The score is only needed once the episode has ended
                current_score = env.score()
                max_score = max(current_score, max_score)
                print('Episodes: %s, Current score: %s, Max score: %s' % (episode, current_score, max_score))
                # Save the Q-table of high-scoring episodes
                if current_score > 300:
                    np.save("{}_{}.npy".format(current_score, episode), agent.q_table)
                break

Episodes: 0, Current score: -5.0, Max score: 0
Episodes: 1, Current score: -5.0, Max score: 0
Episodes: 2, Current score: -5.0, Max score: 0
Episodes: 3, Current score: -5.0, Max score: 0
Episodes: 4, Current score: -5.0, Max score: 0
Episodes: 5, Current score: -5.0, Max score: 0
Episodes: 6, Current score: -5.0, Max score: 0
Episodes: 7, Current score: -5.0, Max score: 0
Episodes: 8, Current score: -5.0, Max score: 0
Episodes: 9, Current score: -5.0, Max score: 0
Episodes: 10, Current score: -5.0, Max score: 0
Episodes: 11, Current score: -5.0, Max score: 0
Episodes: 12, Current score: -5.0, Max score: 0
Episodes: 13, Current score: -5.0, Max score: 0
Episodes: 14, Current score: -5.0, Max score: 0
Episodes: 15, Current score: -5.0, Max score: 0
Episodes: 16, Current score: -5.0, Max score: 0
Episodes: 17, Current score: -5.0, Max score: 0
Episodes: 18, Current score: -5.0, Max score: 0
Episodes: 19, Current score: -5.0, Max score: 0
Episodes: 20, Current score: -5.0, Max score: 0
Ep

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
