In [1]:
import os
import sys
from matplotlib import pyplot as plt
import random
import numpy as np
import pandas as pd
import tqdm
from abc import ABCMeta, abstractmethod, abstractproperty
from copy import copy
from collections import Counter
%matplotlib inline
plt.style.use("ggplot")
%load_ext autoreload
%autoreload 2

## Exercise 5.3 求解blackjacket最优策略

In [2]:
class Env(metaclass=ABCMeta):
    def __init__(self,rng_seed=random.randint(0,1000)):
        self.rng = np.random.RandomState(rng_seed)
    
    @staticmethod
    def get_actions(self):
        pass
    
    @abstractmethod
    def step(self,action):
        pass
        # return done,observation,reward,extra
        
    @abstractmethod
    def get_observation(self):
        pass
    
    @abstractmethod
    def observation_to_state(self,observation):
        pass
    
    @abstractmethod
    def get_history(self,):
        pass
    
    @abstractmethod
    def get_state_shape(self):
        pass
        
    @abstractmethod
    def reset(self):
        pass
    
class BlackJackEnv(Env):
    def __init__(self,dealer_stick=17,bust_threshold=21):
        super().__init__()
        self.dealer_stick = dealer_stick
        self.cards = [1,2,3,4,5,6,7,8,9,10,10,10,10]
        self.reset()
        self.actions = ["stick","hit"]
        self.bust_threshold = bust_threshold
    
    def get_actions(self):
        return self.actions
    
    def dealer_play(self):
        while self.get_max_number(self.dealer_cards) < self.dealer_stick:
            self.dealer_cards.append(self.get_random_card())
    
    def get_reward(self):
        dealer_num = self.get_max_number(self.dealer_cards)
        player_num = self.get_max_number(self.player_cards)
        if player_num > self.bust_threshold:
            reward = -1
        elif dealer_num > self.bust_threshold:
            reward = 1
        else:
            if dealer_num > player_num:
                reward = -1
            elif dealer_num == player_num:
                reward = 0
            else:
                # dealer_num < player_num
                reward = 1
        return reward
    
    def step(self,action):
        assert(action in self.actions)
        # nature
        if len(self.player_cards) == 2 and self.get_max_number(self.player_cards) == self.bust_threshold:
            if self.get_max_number(self.dealer_cards) == self.bust_threshold:
                return True,self.get_observation(),0,None
            else:
                return True,self.get_observation(),1,None
        self.action_history.append(action)
        
        if action == "stick":
            self.dealer_play()
            reward = self.get_reward()
            return True,self.get_observation(),reward,None
        elif action == "hit":
            self.player_cards.append(self.get_random_card())
            if self.get_max_number(self.player_cards) < self.bust_threshold:
                return False,self.get_observation(),0,None
            elif self.get_max_number(self.player_cards) == self.bust_threshold:
                return True,self.get_observation(),1,None
            else:
                # player busted
                return True,self.get_observation(),-1,None
        # return done,observation,reward,extra
    
    def has_unused_ace(self,cards):
        if 1 in cards and sum(cards) + 10 <= self.bust_threshold:
            return True
        else:
            return False
    
    def get_max_number(self,cards):
        sumcards = sum(cards)
        if self.has_unused_ace(cards):
            return sumcards + 10
        else:
            return sumcards
    
    def get_max_unbusted_number(self,cards):
        sumcards = sum(cards)
        assert(sumcards <= self.bust_threshold)
        if self.has_unused_ace(cards):
            return sumcards + 10
        else:
            return sumcards
    
    def get_observation(self):
        return (self.has_unused_ace(self.player_cards),self.player_cards,self.dealer_showcard)
    
    def observation_to_state(self,observation):
        unused_ace,player_cards,dealer_showcard = observation
        return int(unused_ace),self.get_max_unbusted_number(self.player_cards),dealer_showcard
        
    def get_state_shape(self):
        return [2,22,11]
        
    def get_random_card(self):
        return self.rng.choice(self.cards)
    
    def reset(self):
        self.player_cards = [self.get_random_card() for i in range(2)]
        
        self.dealer_cards = [self.get_random_card() for i in range(2)]
        self.dealer_showcard = self.dealer_cards[0]
        self.action_history = []
    
    def get_history(self):
        str_player =  f"player [{self.player_cards[:2]}] -> {self.player_cards[2:]} dealer [{self.dealer_cards[:2]}] -> {self.dealer_cards[2:]}"
        return "{} \n {}".format(str_player,"->".join(self.action_history))

### 21点环境使用示例

In [3]:
bje = BlackJackEnv()

In [4]:
bje.reset()
print(bje.get_observation())
while True:
    print(bje.get_observation())
    print(bje.observation_to_state(bje.get_observation()))
    done,observation,reward,extra = bje.step(random.choice(["hit","stick"]))
    if done:
        break 
print(reward)
print(bje.get_history())

(False, [10, 8], 10)
(False, [10, 8], 10)
(0, 18, 10)
-1
player [[10, 8]] -> [6] dealer [[10, 10]] -> [] 
 hit


### Stragegy 定义

In [5]:
class MdpStragety(metaclass=ABCMeta):
    def __init__(self,env:Env):
        self.env = env
        self.actions = self.env.get_actions()
        self.action2ind = dict(
            zip(
                self.actions,
                list(range(len(self.actions)))
            )
        )
        self.ind2action = dict(
            zip(
                list(range(len(self.actions))),
                self.actions
            )
        )
        
    @abstractmethod
    def choose_action(self,state):
        pass
    
    @abstractmethod
    def set_action(self,state,action):
        pass
    
class BlackJackStrategy(MdpStragety):
    def __init__(self,env:BlackJackEnv):
        super().__init__(env)
        self.strategy = np.zeros(env.get_state_shape(),dtype=np.int)
        
    def choose_action(self,state):
        return self.actions[self.strategy[state]]
    
    def set_action(self,state,action):
        self.strategy[state] = self.action2ind[action]

In [6]:
bje = BlackJackEnv()
bjs = BlackJackStrategy(bje)

In [7]:
bje.reset()
print(bje.get_observation())
while True:
    print(bje.get_observation())
    state = bje.observation_to_state(bje.get_observation())
    print(state)
    done,observation,reward,extra = bje.step(bjs.choose_action(state))
    if done:
        break
print(reward)
print(bje.get_history())

(False, [10, 8], 10)
(False, [10, 8], 10)
(0, 18, 10)
-1
player [[10, 8]] -> [] dealer [[10, 10]] -> [] 
 stick


### Monte Carol Exploring Starts

In [8]:
class MonteCarolExporingStarts(metaclass=ABCMeta):
    def __init__(self,strategy:MdpStragety,env:Env,discount=1.0):
        self.strategy = strategy
        self.env = env
        self.state_shape = self.env.get_state_shape()
        self.actions = self.env.get_actions()
        self.action2ind = dict(
            zip(
                self.actions,
                list(range(len(self.actions)))
            )
        )
        self.ind2action = dict(
            zip(
                list(range(len(self.actions))),
                self.actions
            )
        )
        self.action_number = len(self.actions)
        self.discount = discount
        
        self.Q_sum = np.zeros(list(self.state_shape) + list([self.action_number,]),np.float)
        self.Q_numer = np.zeros(list(self.state_shape) + list([self.action_number,]),np.int)
        
    @abstractmethod
    def set_env_random_initial_state(self):
        pass
    
    @abstractmethod
    def get_random_action(self):
        pass
    
    def rollout(self,steps=100000):
        for one_rollout_ind in tqdm.tqdm(range(steps)):
            self.set_env_random_initial_state()
            
            states = []
            actions = []
            rewards = []
            while True:
                state = self.env.observation_to_state(self.env.get_observation())
                states.append(state)
                if len(actions) == 0:
                    nextstep = self.get_random_action()
                else:
                    nextstep = self.strategy.choose_action(state)
                done,observation,reward,extra = self.env.step(nextstep)
                actions.append(nextstep)
                rewards.append(reward)
                if done:
                    break
                    
            # value calculation
            G = 0
            #print(self.env.get_history())
            for one_state,one_action,one_reward in zip(states[::-1],actions[::-1],rewards[::-1]):
                G = G * self.discount + one_reward
                
                qa_index = tuple(list(one_state) + list([self.action2ind[one_action],]))
                
                self.Q_numer[qa_index] += 1
                self.Q_sum[qa_index] += G
                
                # 这里做了一个假设，即在所有state均可使用所有action，如果不符合这个假设需要更改这里的代码
                Q_average = self.Q_sum[one_state] / (self.Q_numer[one_state] + 1e-6)
                best_action = self.ind2action[np.argmax(Q_average)]
                #print(one_state,best_action)
                
                self.strategy.set_action(one_state,best_action)
        
class BlackJackMonteCarolExporingStarts(MonteCarolExporingStarts):
    def __init__(self,strategy:MdpStragety,env:BlackJackEnv,discount=1.0):
        super().__init__(strategy,env,discount)
        self.cards = [1,2,3,4,5,6,7,8,9,10]
        self.card_pairs = []
        for one_card in self.cards:
            for another_card in self.cards:
                card_pair = [one_card,another_card]
                self.card_pairs.append(card_pair)
                
        self.card_pairs.append([1,4,6]) # 用于专门模拟 have unused ace 且val=21的情况
    
    def set_env_random_initial_state(self):
        self.env.reset()
        self.env.player_cards = copy(random.choice(self.card_pairs))
    
    def get_random_action(self):
        return random.choice(self.actions)

In [9]:
bje = BlackJackEnv()
bjs = BlackJackStrategy(bje)
bjce = BlackJackMonteCarolExporingStarts(bjs,bje)

In [17]:
bjce.rollout(1000000)

100%|██████████| 1000000/1000000 [02:19<00:00, 7183.71it/s]


### 打印最优策略

In [18]:
pd.DataFrame(bjce.strategy.strategy[0])

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10
0,0,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,0
2,0,0,0,0,0,0,0,0,0,0,0
3,0,0,0,0,0,0,0,0,0,0,0
4,0,1,1,1,1,1,1,1,1,1,1
5,0,1,1,1,1,1,1,1,1,1,1
6,0,1,1,1,1,1,1,1,1,1,1
7,0,1,1,1,1,1,1,1,1,1,1
8,0,1,1,1,1,1,1,1,1,1,1
9,0,1,1,1,1,1,1,1,1,1,1


In [19]:
pd.DataFrame(bjce.strategy.strategy[1])

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10
0,0,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,0
2,0,0,0,0,0,0,0,0,0,0,0
3,0,0,0,0,0,0,0,0,0,0,0
4,0,0,0,0,0,0,0,0,0,0,0
5,0,0,0,0,0,0,0,0,0,0,0
6,0,0,0,0,0,0,0,0,0,0,0
7,0,0,0,0,0,0,0,0,0,0,0
8,0,0,0,0,0,0,0,0,0,0,0
9,0,0,0,0,0,0,0,0,0,0,0


### 使用最优策略评估整个Black Jacket game是否可以exploit

In [20]:
class MonteCoralEvaluator(metaclass=ABCMeta):
    def __init__(self,env:Env,strategy:MdpStragety):
        self.env = env
        self.strategy = strategy
    
    @abstractmethod
    def set_env_random_initial_state(self):
        pass
    
    @abstractmethod
    def rollout_once(self):
        pass
    
    def rollout(self,steps):
        for one_rollout_ind in tqdm.tqdm(range(steps)):
            self.set_env_random_initial_state()
            self.rollout_once()
            
           
    @abstractmethod
    def get_evaluation_metrics(self):
        pass
    
class BlackjackMonteCoralEvaluator(MonteCoralEvaluator):
    def __init__(self,env:Env,strategy:MdpStragety):
        super().__init__(env,strategy)
        self.rewards = []
    
    def set_env_random_initial_state(self):
        self.env.reset()
    
    def rollout_once(self):
        while True:
            state = self.env.observation_to_state(self.env.get_observation())
            nextstep = self.strategy.choose_action(state)
            done,observation,reward,extra = self.env.step(nextstep)
            if done:
                break
        self.rewards.append(reward)
           
    def get_evaluation_metrics(self):
        average_value = np.average(self.rewards)
        win_rate = np.average(np.asarray(self.rewards) == 1)
        lose_rate = np.average(np.asarray(self.rewards) == -1)
        peace_rate = np.average(np.asarray(self.rewards) == 0)
        return {
            "average_value": average_value,
            "win_rate": win_rate,
            "lose_rate": lose_rate,
            "peace_rate": peace_rate,
        }

In [21]:
evaluator = BlackjackMonteCoralEvaluator(bje,bjs)

In [22]:
evaluator.rollout(1000000)

100%|██████████| 1000000/1000000 [01:35<00:00, 10443.65it/s]


In [23]:
evaluator.get_evaluation_metrics()

{'average_value': -0.034379,
 'win_rate': 0.440902,
 'lose_rate': 0.475281,
 'peace_rate': 0.083817}

## 结论

在blackjack的例子中可以发现，即使采用了最优策略player的收益仍然是负的，即使我们使用Exploring Start Monte Carl方法找到了最优解，我们也无法晚会player在对阵dealer时的劣势。