Implementation of Monte Carlo ES (Sutton and Barto, section 5.3, page 99)

In [2]:
import numpy as np
import gym
from tqdm import tqdm

For OpenAI's gym, blackjack has an observation space consisting of a 3-tuple of

The player's current sum

The value of dealer's one showing card (1 - 10)

Whether or not the player has an ace (0 or 1) - important because aces can be either 1 or 11

The action space consists of two values, stick/stop (0) and hit / draw a card (1)

In [44]:
class Agent:

    def __init__(self, gamma=0.85):
        self.env = gym.make("Blackjack-v1", natural=False) # Blackjack environment from the OpenAI gym
        self.sum_size = self.env.observation_space[0].n # Number of possibilities for the player's current sum
        self.dealer_size = self.env.observation_space[1].n # Number of possibilities for the dealer's face up card
        self.gamma = gamma
        self.policy = self.initializeBlackjackPolicy()
        self.Q = self.initializeQ()
        self.returns = self.initializeReturns()

    '''
    Create an empty returns list
    '''
    def initializeReturns(self):

        returns = {}

        for sum in range(self.sum_size):
            for dealer_value in range(self.dealer_size):
                for action in range(self.env.action_space.n):
                    returns[((sum, dealer_value), action)] = (0, 0) # Initialize returns to a tuple of the form (number_of_samples, average_reward)
        
        return returns
    
    '''
    Initializes each state's policy to a random initial value
    '''
    def initializeBlackjackPolicy(self):
        policy = {}

        for sum in range(self.sum_size):
            for dealer_value in range(self.dealer_size):
                policy[(sum, dealer_value)] = 0 # Initialize policy arbitrarily
        
        return policy
    
    '''
    Creates a Q function of zeros for each state action pair
    '''
    def initializeQ(self):
        Q = {}

        for sum in range(self.sum_size):
            for dealer_value in range(self.dealer_size):
                for action in range(self.env.action_space.n):
                    Q[((sum, dealer_value), action)] = 0 # Initialize Q arbitrarily
        
        return Q
    
    '''
    Returns a random action to take and gain experience from
    Primarily for testing purposes, inefficient in practice
    '''
    def getRandomAction(self, observation):
        return np.random.randint(0, self.env.action_space.n)

    '''
    Follows a policy to select an action
    '''
    def getActionFromPolicy(self, observation):
        S_t_dim_0 = observation[0]
        S_t_dim_1 = observation[1]

        if np.random.randint(0, 10) >= 7: # Adding in some randomness to avoid potential initial pitfalls
            return self.getRandomAction(observation)
        
        return self.policy[(S_t_dim_0, S_t_dim_1)]
    
    '''
    Generates an action based on the policy to follow
    '''
    def generateEpisode(self, actionType=getActionFromPolicy):
        observation = self.env.reset()
        observation = (observation[0][0], observation[0][1])
        episode = []
        done = False
        while not done:
            action = actionType(self, observation)
            new_observation, reward, terminated, truncated, _ = self.env.step(action)

            new_state = (observation[0], observation[1])
            episode.append((new_state, action, reward))
            observation = new_observation

            done = terminated or truncated
        return episode

    def updateAfterEpisode(self, episode):
        G = 0
        for step in reversed(range(len(episode))):
            state = (episode[step][0][0], episode[step][0][1])
            
            A_t = episode[step][1]

            reward = episode[step][2]
            G = self.gamma * G + reward

            if episode[step][0] in episode[:step]: # Implement first look only
                print("not first")
                continue

            self.returns[(state, A_t)] = (self.returns[(state, A_t)][0] + 1, (self.returns[(state, A_t)][0] * self.returns[(state, A_t)][1] + G) / (self.returns[(state, A_t)][0] + 1))
            self.Q[(state, A_t)] = self.returns[(state, A_t)][1]
            self.policy[state] = np.argmax([self.Q[x] for x in self.Q if x[0] == state])

    def monteCarloES(self, gamma=0.85, num_episodes=200000):
        for ep in tqdm(range(num_episodes)):
            observation, info = self.env.reset()
            episode = self.generateEpisode()
            self.updateAfterEpisode(episode)
    
    def displayPolicy(self):
        print("   ", end="")
        for dealer in range(1, 11):
            print(" %03d " % dealer, end="")
        print()
        for sum in range(11, 21):
            print("%d " % sum, end="")
            for dealer in range(1, 11):
                print("  %d  " % self.policy[(sum, dealer)], end="")
            print()

In [45]:
a = Agent()
a.monteCarloES()
a.displayPolicy()

100%|██████████| 200000/200000 [00:55<00:00, 3610.99it/s]

    001  002  003  004  005  006  007  008  009  010 
11   1    1    1    1    1    1    1    1    1    1  
12   1    0    0    0    0    0    1    1    1    1  
13   1    0    0    0    0    0    1    1    1    1  
14   1    0    0    0    0    0    1    1    1    1  
15   1    0    0    0    0    0    1    1    1    1  
16   1    0    0    0    0    0    1    1    0    0  
17   1    0    0    0    0    0    0    0    0    0  
18   0    0    0    0    0    0    0    0    0    0  
19   0    0    0    0    0    0    0    0    0    0  
20   0    0    0    0    0    0    0    0    0    0  





Manual testing to play around with the mechanics of the gym

In [79]:
env = gym.make("Blackjack-v1", natural=False)
while True:
    observation, _ = env.reset()
    if observation[0] > observation[1]:
        continue
    
    print(observation)
    terminated = False
    while not terminated:
        action = int(input())
        observation, reward, terminated, _, _ = env.step(action)
        print(observation, action, reward)

(7, 7, False)
(7, 7, False) 0 -1.0
(7, 9, False)
(7, 9, False) 0 1.0
(9, 10, False)


ValueError: invalid literal for int() with base 10: ''