In [54]:
import numpy as np
import itertools
from tqdm import tqdm

学长写的环境

In [1]:
"""
21点环境
=======
state: [玩家手牌列表], 庄家明牌
action: {叫牌(0)，停牌(1)}
reward: 胜利 1，失败 -1，平局 0
"""
import numpy as np


class BasePolicy:
    """策略基类
    """
    def act(self, obs):
        raise NotImplementedError('Policy.act Not Implemented')


class DealerPolicy(BasePolicy):
    """庄家策略

    手牌小于17要牌，否则停止
    """
    def act(self, obs):
        if obs < 17:
            return 0   #hit
        else:
            return 1   #stick


class BlackJack:
    def __init__(self):
        super().__init__()

        # 动作空间
        # 0: 要牌 hit
        # 1: 停止 stick
        self.action_space = (0, 1)

        # 游戏状态:
        # 0 玩家抽卡阶段，玩家停止抽卡时进入下一阶段
        # 1 庄家抽卡阶段
        # 2 结算阶段
        self.state = 0

        # 玩家与庄家的卡
        self.player_trajectory = []
        self.dealer_trajectory = []

        # 庄家策略
        self.dealer_policy = DealerPolicy()

    def reset(self):
        # 给每人发两张卡
        # 返回当前的观察（玩家的牌列表、庄家的明牌）
        self.player_trajectory = []
        self.player_trajectory.append(self._get_card())
        self.player_trajectory.append(self._get_card())
        self.dealer_trajectory = []
        self.dealer_trajectory.append(self._get_card())
        self.dealer_trajectory.append(self._get_card())

        self.state = 0

        return self._get_obs()

    def step(self, action):
        if action == 0:  # 玩家抽卡
            assert self.state == 0, '只能在 0 状态抽卡'

            # 抽卡
            self.player_trajectory.append(self._get_card())

            # 检测是否爆牌
            if self._is_blast(self.player_trajectory):
                return self._get_obs(), -1, True, {}

            return self._get_obs(), 0, False, {}
        
        elif action == 1: # 玩家停止要牌
            # 进入庄家决策阶段
            self.state += 1

            # 获取庄家观测
            dealer_obs = max_point(self.dealer_trajectory)
            # 庄家决策
            action = self.dealer_policy.act(dealer_obs)
            while action == 0:
                # 庄家抽排
                self.dealer_trajectory.append(self._get_card())
                # 计算当前点数之和
                dealer_obs = max_point(self.dealer_trajectory)
                # 爆牌检测，如果庄家爆牌，玩家得到1的回报
                if self._is_blast(self.dealer_trajectory):
                    return self._get_obs(), 1, True, {}
                # 如果没有爆牌，根据当前点数计算新的动作
                action = self.dealer_policy.act(dealer_obs)

            # 庄家停止要牌，开始结算
            self.state += 1
            player_point = max_point(self.player_trajectory)
            dealer_point = max_point(self.dealer_trajectory)
            # 比较点数，计算回报
            if player_point > dealer_point:
                reward = 1
            elif player_point == dealer_point:
                reward = 0
            else:
                reward = -1
            return self._get_obs(), reward, True, {}

        else:
            raise ValueError('非法动作')

    def _get_card(self):
        """抽卡

        Returns
        -------
        int
            抽到的卡(1-10)
        """
        card = np.random.randint(1, 14)
        card = min(card, 10)
        return card

    def _get_obs(self):
        """获取观测

        Returns
        -------
        Cards : list of int
            手牌列表
        Dealer's card 1 : int
            庄家的第一张牌
        """
        return (self.player_trajectory.copy(), self.dealer_trajectory[0])

    def _is_blast(self, traj):
        """检测是否爆牌

        Parameters
        ----------
        traj : list of int
            牌列表

        Returns
        -------
        blast : bool
            如果爆牌返回 True，否则 False
        """
        return max_point(traj) > 21


def max_point(traj):
    """工具函数，计算一个牌列表的最大点数

    Parameters
    ----------
    traj : list of int
        牌列表
    
    Returns
    -------
    point : int
        最大点数
    """
    s = 0
    num_ace = 0
    for card in traj:
        if card == 1:
            num_ace += 1
            s += 11
        else:
            s += card
    
    while s > 21 and num_ace > 0:
        s -= 10
        num_ace -= 1
    
    return s



根据该环境写一个简单的小游戏

In [3]:
l=['平局','你赢了','你输了']
env=BlackJack()
while(1):
    print('\n开始游戏！')
    env.reset()
    count=0
    while(1):
        count+=1
        print('round{}'.format(count))
        obs=env._get_obs()
        print('这是当前你的手牌:{}'.format(obs[0]),'\n','这是庄家的一张牌:{}'.format(obs[1]))

        while(1):
            action=int(input(r'怎么说？(0：要牌，1：停牌)'))
            if action!=0 and action!=1:
                print('打咩得斯哟！')
            else:
                break

        x=env.step(action)
        obs=x[0]
        if not x[2]:
            #玩家可以继续摸牌
            continue
        if env.state==0 and x[2]:
            #游戏是否在第一阶段结束
            print('----------')
            print('这是当前你的手牌:{}'.format(obs[0]),'\n','这是庄家的牌:{}'.format(env.dealer_trajectory))
            print('你的总和大于21，你输了')
            break
        elif env.state==1 and x[2]:
            #游戏是否在第二阶段结束
            print('----------')
            print('这是当前你的手牌:{}'.format(obs[0]),'\n','这是庄家的牌:{}'.format(env.dealer_trajectory))
            print('庄家的总和大于21，你赢了')
            break
        elif env.state==2 and x[2]:
            #游戏是否在第三阶段结束
            print('----------')
            print('这是当前你的手牌:{}'.format(obs[0]),'\n','这是庄家的牌:{}'.format(env.dealer_trajectory))
            reward=x[1]
            print(l[reward])
            break
    f=input('是否继续？')
    if not f:
        break


开始游戏！
round1
这是当前你的手牌:[8, 10] 
 这是庄家的一张牌:2
怎么说？(0：要牌，1：停牌)1
----------
这是当前你的手牌:[8, 10] 
 这是庄家的牌:[2, 1, 4]
你赢了
是否继续？1

开始游戏！
round1
这是当前你的手牌:[9, 10] 
 这是庄家的一张牌:5
怎么说？(0：要牌，1：停牌)1
----------
这是当前你的手牌:[9, 10] 
 这是庄家的牌:[5, 10, 6]
你输了
是否继续？1

开始游戏！
round1
这是当前你的手牌:[1, 10] 
 这是庄家的一张牌:10
怎么说？(0：要牌，1：停牌)1
----------
这是当前你的手牌:[1, 10] 
 这是庄家的牌:[10, 1]
平局
是否继续？1

开始游戏！
round1
这是当前你的手牌:[10, 10] 
 这是庄家的一张牌:10
怎么说？(0：要牌，1：停牌)1
----------
这是当前你的手牌:[10, 10] 
 这是庄家的牌:[10, 7]
你赢了
是否继续？1

开始游戏！
round1
这是当前你的手牌:[1, 10] 
 这是庄家的一张牌:9
怎么说？(0：要牌，1：停牌)1
----------
这是当前你的手牌:[1, 10] 
 这是庄家的牌:[9, 4, 4]
你赢了
是否继续？1

开始游戏！
round1
这是当前你的手牌:[9, 10] 
 这是庄家的一张牌:8
怎么说？(0：要牌，1：停牌)1
----------
这是当前你的手牌:[9, 10] 
 这是庄家的牌:[8, 2, 10]
你输了
是否继续？1

开始游戏！
round1
这是当前你的手牌:[4, 1] 
 这是庄家的一张牌:8
怎么说？(0：要牌，1：停牌)0
round2
这是当前你的手牌:[4, 1, 7] 
 这是庄家的一张牌:8
怎么说？(0：要牌，1：停牌)0
----------
这是当前你的手牌:[4, 1, 7, 10] 
 这是庄家的牌:[8, 5]
你的总和大于21，你输了
是否继续？1

开始游戏！
round1
这是当前你的手牌:[8, 7] 
 这是庄家的一张牌:10
怎么说？(0：要牌，1：停牌)0
round2
这是当前你的手牌:[8, 7, 5] 
 这是庄家的一张牌:1

根据书上的分析，该游戏有10\*2\*10=200个状态：

* 由于当手牌小于11时必要牌，当前手牌总和只可能为12-21，10种
* 当前手牌是否有可以作11用的A，2种可能
* 庄家的第一张牌，1-10两种可能

该游戏只有两个动作，hit和stick。

下面我们定义这些状态和动作。

In [51]:
player_sum_list=[12+i for i in range(10)]
if_usabelA=[0,1]
dealer_sum_list=[1+i for i in range(10)]

states_id_list=[-1+i for i in range(201)]   #-1代表terminal -1-199
states_list=list(itertools.product(player_sum_list,if_usabelA,dealer_sum_list))
states_dic={-1:'T'}
for i in range(200):
    states_dic[i]=states_list[i]
state_action_pair=np.zeros((200,2))

In [52]:
def GetStateID(state):
    #将state转换成id
    #输入:三元组(player_sum,if_usabelA,dealer_card)
    #输出:id
    if state=='T':
        return -1
    else:
        return (state[0]-12)*20+state[1]*10+(state[2]-1)
    
def GetState(obs):
    #从当前情况得到state三元组
    #input:obs(list,int)
    #output:三元组
    player_sum=max_point(obs[0])
    dealer_card=obs[1]
    if not 1 in obs[0]:
        if_usabelA=0
        return (player_sum,if_usabelA,dealer_card)
    else:
        if_usabelA= 0 if sum(obs[0])>=12 else 1   #当所有牌的总和大于等于12时，其中的A不能换为11；当总和小于等于11时，其中的一个A可以换为11
        return (player_sum,if_usabelA,dealer_card)
        

for i in states_dic.keys():
    assert GetStateID(states_dic[i])==i,'有问题'

下面定义策略，我这里使用Off-policy n-step Sarsa，因为这个算法可以概括前面的MC、TD(0)的on-policy和off-policy、n-step TD等算法。

当然，后面还有$Q(\sigma)$算法，可以把这些都包含进去，还能加上Tree Backup。我暂时不用那个。

In [61]:
class NStepSarsa:
    
    def __init__(self,gamma=1,alpha=0.1,n=3,num_state=200,num_action=2,off_policy=True,epsilon=None):
        super().__init__()
        self.gamma=gamma
        self.alpha=alpha
        self.n=n
        self.num_state=num_state
        self.num_action=num_action   #实际上严格不应该这么写，因为每个state对应的action不一定一样，在这个问题中我们可以这么写，后面也有很多这种不太严格的地方
        self.off_policy=off_policy
        self.Q=np.zeros((num_state,num_action))   #估计
        self.pi=np.zeros((num_state,num_action))   #pi策略
        self.b=np.full((num_state,num_action),0.5)   #默认的b策略，可以改
        
        if not off_policy:
            self.epsilon=epsilon
    
    def UpdatePiFromQ(self):
        #根据估计值得到当前的最优策略(off-policy:determinal,on-policy:epsilon-greedy)
        tmp=np.argmax(self.Q,axis=1)
        if self.off_policy:
            self.pi=np.zeros((self.num_state,self.num_action))
            for i in range(self.num_state):
                self.pi[i][tmp[i]]=1
        else:
            self.pi=np.full((self.num_state,self.num_action),self.epsilon/(self.num_action-1))
            for i in range(self.num_state):
                self.pi[i][tmp[i]]=1-self.epsilon
    
    def GenerateAction(self,s):
        #根据当前b策略生成动作
        #input:s(id)
        return np.random.choice([0,1],p=self.b[s])
    
    def learn(self,num_episode,env):
        #估计
        for i in tqdm(range(num_episode)):
            env.reset()
            s_list=[]
            a_list=[]
            reward=[]   #注意reward列表的序号！跟书上有区别！
            s0=np.random.randint(0,200)   #初始状态
            s_list.append(s0)
            a0=self.GenerateAction(s0)   #初始动作
            a_list.append(a0)
            T=np.inf
            t=0
            while(1):
                
                if t<T:
                    #take action A
                    x=env.step(a_list[t])
                    if x[2]:
                        #一轮游戏结束
                        final_reward=x[1]   #最终时刻的奖励
                        reward.append(0)
                        reward.append(final_reward)
                        s_list.append(-1)
                        T=t+1
                    else:
                        #尚未结束
                        obs=x[0]
                        s=GetStateID(GetState(obs))
                        s_list.append(s)
                        reward.append(0)
                        a=self.GenerateAction(s)
                        a_list.append(a)
                tao=t-self.n+1   #当前需要更新的位置
                
                if tao>=0:
                    #需要更新
                    ro=1
                    for i in range(tao+1,min(tao+self.n,T-1)):
                        s=s_list[i]
                        a=a_list[i]
                        ro*=self.pi[s][a]/self.b[s][a]
                    G=0 if tao+self.n<T else reward[-1]   #书上公式在这个特定问题上的简化
                    if tao+self.n<T:
                        G+=(self.gamma**self.n)*self.Q[s_list[tao+self.n]][a_list[tao+self.n]]
                    self.Q[s_list[tao]][a_list[tao]]=self.Q[s_list[tao]][a_list[tao]]+self.alpha*ro*(G-self.Q[s_list[tao]][a_list[tao]])
                    self.UpdatePiFromQ()
                t+=1
                if tao==T-1:
                    break
                        
                    

In [70]:
Sarsa=NStepSarsa(gamma=1,num_state=200,num_action=2,off_policy=True,n=50)
env=BlackJack()
Sarsa.learn(100000,env)

100%|████████████████████████████████████████████████████████████████████████| 100000/100000 [00:32<00:00, 3071.84it/s]


In [73]:
Sarsa.pi

array([[0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.

In [46]:
T=np.inf
T-1

inf