# 蛇棋环境搭建

In [1]:
import numpy as np
import gym
from gym.spaces import Discrete

from contextlib import contextmanager
import time

@contextmanager
def timer(name):
    start = time.time()
    yield
    end = time.time()
    print('{} COST:{}'.format(name, end-start))

In [2]:
class SnakeEnv(gym.Env):
    SIZE=100
    
    def __init__(self, ladder_num, dices):
        self.ladder_num = ladder_num
        self.dices = dices
        self.observation_space = Discrete(self.SIZE+1)
        self.action_space = Discrete(len(dices))
        
        if ladder_num == 0:
            self.ladders = {0:0}
        else:
            # 处理梯子值，让梯子的数值无重复地反向赋值
            ladders = set(np.random.randint(1, self.SIZE, size=self.ladder_num*2))
            while len(ladders) < self.ladder_num*2:
                ladders.add(np.random.randint(1, self.SIZE))

            ladders = list(ladders)
            ladders = np.array(ladders)
            np.random.shuffle(ladders)
            ladders = ladders.reshape((self.ladder_num,2))

            re_ladders = list()
            for i in ladders:
                re_ladders.append([i[1],i[0]])

            re_ladders = np.array(re_ladders)
            # dict()可以把nx2维数组转化为字典形式
            self.ladders = dict(np.append(re_ladders, ladders, axis=0))
        print(f'ladders info:{self.ladders} dice ranges:{self.dices}')
        self.pos = 1
        
    def reset(self):
        self.pos = 1
        return self.pos
    
    def step(self, a):
        step = np.random.randint(1, self.dices[a]+1)
        self.pos += step
        if self.pos == 100:
            return 100, 100, 1, {}
        elif self.pos > 100:
            self.pos = 200 - self.pos
            
        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
        return self.pos, -1, 0, {}
    
    def reward(self, s):
        if s == 100:
            return 100
        else:
            return -1
    
    # 无渲染
    def render(self):
        pass

# 智能体构建

In [3]:
class TableAgent(object):
    def __init__(self, env):
        self.s_len = env.observation_space.n
        self.a_len = env.action_space.n
        
        self.r = [env.reward(s) for s in range(0, self.s_len)]
        # 确定性策略
        self.pi = np.zeros(self.s_len, dtype=int)
        # A x S x S
        self.p = np.zeros([self.a_len, self.s_len, self.s_len], dtype=float)
        
        # 函数参数向量化，参数可以传入列表
        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)
        
        # based-model 初始化表格所有位置的概率p[A,S,S]
        for i, dice in enumerate(env.dices):
            prob = 1.0 / dice
            for src in range(1, 100):
                # 因为arange只给一个数字的时候，是从0开始取到end-1，所以在此处+1
                step = np.arange(dice) + 1
                step += src
                step = np.piecewise(step, [step>100, step<=100], [lambda x: 200-x, lambda x: x])
                step = ladder_move(step)
                for dst in step:
                    # 在当前位置pos=src的情况下，采取i投掷色子的方式，得到最终位置dst
                    # 概率直接求和的方式是否合理？
                    self.p[i, src, dst] += prob
        
        # 因为src最多到99，所以p[:, 100, 100]是0，此处进行填补
        self.p[:, 100, 100] = 1
        self.value_pi = np.zeros((self.s_len))
        self.value_q = np.zeros((self.s_len, self.a_len))
        self.gamma = 0.8
        
        
    def play(self, state):
        return self.pi[state]

# 策略评估（reward计算）

In [4]:
def eval_game(env, agent):
    state = env.reset()
    total_reward = 0
    state_action = []
    
    while True:
        act = agent.play(state)
        state_action.append((state,act))
        state, reward, done, _ = env.step(act)
        total_reward += reward
        if done:
            break
    
    return total_reward, state_action

# 算法

## 1. 策略迭代

In [5]:
class PolicyIteration(object):
    
    dice = [3,6]
    
    def policy_evaluation(self, agent, max_iter=-1):
        iteration = 0
        while True:
            iteration += 1
            new_value_pi = agent.value_pi.copy()
            # 遍历所有的state 1~100  (s.len=101)
            for i in range(1, agent.s_len):
                ac = agent.pi[i]
                
                for j in range(0, agent.a_len):
                    # 选择确定性策略的action
                    if ac != j:
                        continue
                    transition = agent.p[ac, i, :]
                    value_sa = np.dot(transition, agent.r + agent.gamma * agent.value_pi)
                    # 放在j循环外部会报错:UnboundLocalError 因为跳过action无法算value_sa的值。
                    # 未求得value_sa就用该值就会报错(UnboundLocalError)
                    new_value_pi[i] = value_sa
            
            diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
            # 判断是否收敛
            if diff < 1e-6:
                print('policy evaluation proceed {} iters.'.format(iteration))
                break
            else:
                agent.value_pi = new_value_pi
            if iteration == max_iter:
                print('policy evaluation proceed {} iters.'.format(iteration))
                break
    
    
    def policy_improvement(self, agent):
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            for j in range(0, agent.a_len):
                transition = agent.p[j, i, :]
                agent.value_q[i,j] = np.dot(transition, agent.r + agent.gamma * agent.value_pi)
            # update policy
            max_act = np.argmax(agent.value_q[i,:])
            # 选择使value_q最大的action
            new_policy[i] = max_act
        
        # 如果没有更新（新策略和旧策略一致），返回False；如果不相等就赋值新策略/优化策略后，返回True
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True
    
    
    def policy_iteration(self, agent, max_iter=-1):
        iteration = 0
        with timer('Timer PolicyIter'):
            while True:
                iteration += 1
                with timer('Timer PolicyEval'):
                    # 通过迭代求得agent.value_pi 准确估计值函数
                    self.policy_evaluation(agent, max_iter)
                with timer('Timer PolicyImprove'):
                    # 获得最优策略
                    ret = self.policy_improvement(agent)
                if not ret:
                    break
        print('Iter {} rounds converge'.format(iteration))

In [6]:
def policy_iteration_demo(env):
    agent = TableAgent(env)
    pi_algo = PolicyIteration()
    pi_algo.policy_iteration(agent)
    print('agent.pi={}'.format(agent.pi))
    total_reward, state_action = eval_game(env, agent)
    print('total_reward={0}, state_action={1}'.format(total_reward, state_action))

## 2. 价值迭代

In [12]:
def value_iteration(agent, max_iter=-1):
        iteration = 0
        dice = [3,6]
        with timer('Timer ValueIter'):
            while True:
                iteration += 1
                new_value_pi = agent.value_pi.copy()
                for i in range(1, agent.s_len):
                    value_sas = []
                    for j in range(0, agent.a_len):
                        value_sa = np.dot(agent.p[j,i,:], agent.r + agent.gamma * agent.value_pi)
                        value_sas.append(value_sa)
                    new_value_pi[i] = max(value_sas)

                diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
                if diff < 1e-6:
                    break
                else:
                    agent.value_pi = new_value_pi
                if iteration == max_iter:
                    break
            print('Iter {} rounds converge'.format(iteration))
            for i in range(1, agent.s_len):
                for j in range(0, agent.a_len):
                    agent.value_q[i,j] = np.dot(agent.p[j,i,:], agent.r + agent.gamma * agent.value_pi)
                max_act = np.argmax(agent.value_q[i,:])
                agent.pi[i] = max_act

In [8]:
# 价值迭代和策略迭代的对比
def policy_vs_value_demo(env):
    policy_agent = TableAgent(env)
    value_agent = TableAgent(env)
    
    pi_algo = PolicyIteration()
    pi_algo.policy_iteration(policy_agent)
    print('agent.pi={}'.format(policy_agent.pi))
    total_reward, state_action = eval_game(env, policy_agent)
    print('total_reward={0}, state_action={1}'.format(total_reward, state_action))
    
    value_iteration(value_agent)
    print('agent.pi={}'.format(value_agent.pi))
    total_reward, state_action = eval_game(env, value_agent)
    print('total_reward={0}, state_action={1}'.format(total_reward, state_action))

## 3. 泛化迭代

In [9]:
# 泛化迭代和前两种迭代的对比
def generalized_policy_compare(env):
    policy_vs_value_demo(env)
    
    gener_agent = TableAgent(env)
    
    with timer('Timer GeneralizedIter'):
        value_iteration(gener_agent, 10)
        pi_algo = PolicyIteration()
        pi_algo.policy_iteration(gener_agent, 1)
    print('agent.pi={}'.format(gener_agent.pi))
    total_reward, state_action = eval_game(env, gener_agent)
    print('total_reward={0}, state_action={1}'.format(total_reward, state_action))

In [14]:
if __name__ == '__main__':
    # 策略迭代  两个demo梯子数不同
#     env1 = SnakeEnv(0,[3,6])
    env2 = SnakeEnv(10,[3,6])
#     policy_iteration_demo(env1)
#     policy_iteration_demo(env2)
    
    # 价值迭代和策略迭代的对比
#     policy_vs_value_demo(env2)
    
    # 泛化迭代
    generalized_policy_compare(env2)
    

ladders info:{27: 4, 23: 9, 22: 91, 80: 63, 55: 15, 20: 24, 58: 33, 50: 16, 89: 41, 13: 48, 4: 27, 9: 23, 91: 22, 63: 80, 15: 55, 24: 20, 33: 58, 16: 50, 41: 89, 48: 13} dice ranges:[3, 6]
policy evaluation proceed 94 iters.
Timer PolicyEval COST:0.1179966926574707
Timer PolicyImprove COST:0.0030281543731689453
policy evaluation proceed 1 iters.
Timer PolicyEval COST:0.0
Timer PolicyImprove COST:0.003000497817993164
Timer PolicyIter COST:0.12502598762512207
Iter 2 rounds converge
agent.pi=[0 1 1 1 1 1 0 0 0 1 0 1 1 1 1 1 1 1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 0 0 0 1 1 1 1 1 1 0 0 0 1 1 0 0 0 0 0 0 1 1 0 0 0 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0]
total_reward=85, state_action=[(1, 1), (7, 0), (23, 0), (20, 0), (21, 0), (9, 1), (11, 1), (50, 1), (15, 1), (17, 1), (24, 1), (30, 1), (36, 1), (89, 1), (90, 1), (96, 1)]
Iter 94 rounds converge
Timer ValueIter COST:0.2518918514251709
agent.pi=[0 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 0 0 0 0 1 1 1 