# Monte Carlo Methods

## Observable environments

In contrast to the DP chapter, we now work with environments whose dynamics are not fully known to us. Instead, we can interact with the environment and observe how it reacts, which lets us draw samples from it.

Monte Carlo methods perform a lot of these sampling steps, and try to draw conclusions from the results. By the law of large numbers, averaging the returns from many samples converges to the true expected value.

One important constraint is that the methods from this chapter only work in environments with terminal states.

In [None]:
import numpy as np

class ObservableEnvironment:
    """
    Interface for an environment that can only be explored by interacting
    with it. Subclasses implement the four query methods; `sample` builds
    on them to generate complete episodes.
    """

    def get_states(self):
        """
        Return the set of possible states
        """
        raise NotImplementedError

    def get_possible_actions(self, state):
        """
        Returns the actions that can be taken from the given state
        """
        raise NotImplementedError

    def execute_action(self, state, action):
        """
        Returns the new state and the given reward. This does not have to
        be deterministic
        """
        raise NotImplementedError

    def is_terminal_state(self, state):
        """
        Returns a boolean indicating whether the state is terminal
        """
        raise NotImplementedError

    def sample(self, policy, state):
        """
        Follows the policy until a terminal state is reached.

        `policy` maps (state, action) to the probability of taking that
        action in that state. Returns a list of (state, action, reward)
        tuples, one per step taken; the list is empty when the start
        state is already terminal.
        """
        result = []

        while not self.is_terminal_state(state):
            actions = self.get_possible_actions(state)
            ps = [policy[(state, action)] for action in actions]

            # Draw an index instead of the action itself: the chosen action
            # then keeps its original Python type (no numpy scalar), and
            # actions that are themselves sequences (e.g. tuples) work too.
            action = actions[np.random.choice(len(actions), p=ps)]

            new_state, reward = self.execute_action(state, action)

            result.append((state, action, reward))
            state = new_state

        return result

## BlackJack

In this chapter, we try to find an optimal policy to play a simplified version of BlackJack. The game is described in detail in Example 5.1 of Sutton & Barto's book.

In [None]:
from itertools import product
from random import choice

class BlackJack(ObservableEnvironment):
    """
    Simplified BlackJack environment.

    A non-terminal state is a tuple (player, dealer, usable_ace): the
    player's current sum, the dealer's showing card (1 stands for an ace)
    and a 0/1 flag telling whether the player holds an ace counted as 11.
    Three sentinel states, which cannot collide with regular states, mark
    the end of a game (won, lost, drawn).
    """

    def __init__(self):
        self._init_deck()

        self.HIT = 0
        self.STICK = 1

        self.WINNING_STATE = (21, 0, 0)
        self.LOSING_STATE = (0, 21, 0)
        self.DRAWING_STATE = (21, 21, 0)
        self.terminal_states = [self.WINNING_STATE, self.LOSING_STATE, self.DRAWING_STATE]

        # (terminal state, reward) pairs returned by execute_action
        self.WON = (self.WINNING_STATE, 1)
        self.LOST = (self.LOSING_STATE, -1)
        self.DRAW = (self.DRAWING_STATE, 0)

    def _init_deck(self):
        """
        Builds the list of card values cards are drawn from.
        """
        # Ace: 1
        # Numbers: 2 to 10
        # Jack/Queen/King: 10
        # list(...) is required on Python 3, where a range cannot be
        # concatenated with a list.
        self.deck = list(range(1, 10 + 1)) + [10] * 3

    def get_states(self):
        """
        Returns every regular state followed by the three terminal states.
        """
        player_state = range(12, 21 + 1)
        dealer_state = range(1, 10 + 1)
        usable_ace = [0, 1]

        return list(product(player_state, dealer_state, usable_ace)) + self.terminal_states

    def get_possible_actions(self, state):
        """
        Returns the actions available in the given state (always both).
        """
        return [self.HIT, self.STICK]

    def execute_action(self, state, action):
        """
        Plays one action and returns (new_state, reward).

        HIT draws a card for the player; the game continues with reward 0
        unless the player goes bust, which loses. STICK lets the dealer
        play out the hand and ends the game as won (+1), drawn (0) or
        lost (-1). Any other action leaves the state unchanged with
        reward 0.
        """
        player, dealer, usable_ace = state

        if action == self.HIT:
            player, usable_ace = self._add_card(player, usable_ace)

            if player > 21:
                return self.LOST
        elif action == self.STICK:
            dealer_total = self._play_dealer(dealer)

            if dealer_total > 21 or player > dealer_total:
                return self.WON
            if player == dealer_total:
                return self.DRAW
            return self.LOST

        return (player, dealer, usable_ace), 0

    def _add_card(self, total, usable_ace):
        """
        Draws one card onto a hand and returns the new (total, usable_ace).
        """
        card = self._sample_card()

        # An ace counts as 11 only while that does not bust the hand.
        if card == 1 and total + 11 <= 21:
            total += 11
            usable_ace = 1
        else:
            total += card

        # A bust can be avoided once by counting a usable ace as 1 instead.
        if total > 21 and usable_ace == 1:
            total -= 10
            usable_ace = 0

        return total, usable_ace

    def _play_dealer(self, showing):
        """
        Plays the dealer's hand from the showing card and returns the final
        sum. The dealer keeps drawing until the sum is 17 or greater.
        """
        if showing == 1:
            total, usable_ace = 11, 1
        else:
            total, usable_ace = showing, 0

        while total < 17:
            total, usable_ace = self._add_card(total, usable_ace)

        return total

    def _sample_card(self):
        """
        Draws a card value with replacement from the deck, so that 10 is
        as likely as the ten and the three face cards together.
        """
        return choice(self.deck)

    def is_terminal_state(self, state):
        """
        Returns whether the state is one of the three end-of-game states.
        """
        return state in self.terminal_states

## On-policy

On-policy methods are RL methods where the policy that we try to optimize is the same policy that we use to explore. This implies that we always keep exploring.

Here, we use an $\epsilon$-greedy approach to make sure we always keep exploring.

### First-visit

First-visit means that the value associated with a state is the mean, taken over all episodes, of the return that follows the first visit to that state within each episode.

In [None]:
class Agent:
    """
    Base class for agents that keep a table of action values.

    Subclasses are expected to provide `self.env` (the environment) and
    `self.Q` (a dict mapping (state, action) to the estimated value).
    """

    def print_policy(self):
        """
        Prints one line per non-terminal state: the state followed by the
        action with the highest estimated value in `self.Q`. Ties go to
        the action listed first by the environment.
        """
        for state in self.env.get_states():
            if self.env.is_terminal_state(state):
                continue

            actions = self.env.get_possible_actions(state)
            best_action = max(actions, key=lambda a: self.Q[(state, a)])

            # Formatted explicitly so the output is the same whether print
            # is the Python 2 statement or the Python 3 function.
            print("%s %s" % (state, best_action))

In [None]:
from operator import itemgetter

class OnPolicyFirstVisitMonteCarloAgent(Agent):
    """
    On-policy first-visit Monte Carlo control with an epsilon-greedy policy.

    `Q` maps (state, action) to the mean of the returns stored in
    `returns` for that pair; `policy` maps (state, action) to the
    probability of choosing the action in the state.
    """

    def __init__(self, env, epsilon=.05):
        """
        Starts with all values at 0 and a uniformly random policy.
        """
        self.env = env
        self.epsilon = epsilon

        self.Q = {}
        self.returns = {}
        self.policy = {}

        for state in self.env.get_states():
            actions = self.env.get_possible_actions(state)
            num_actions = len(actions)

            for action in actions:
                self.Q[(state, action)] = 0
                self.returns[(state, action)] = []
                self.policy[(state, action)] = 1. / num_actions

    def learn(self, num_samples):
        """
        Samples `num_samples` episodes with the current policy. After each
        episode the value estimates are updated and the policy is made
        epsilon-greedy in every state the episode visited.
        """
        for _ in range(num_samples):
            episode = self.env.sample(self.policy, self._get_start_state())

            # A terminal start state yields an empty episode: nothing to learn.
            if not episode:
                continue

            self._calculate_returns(episode)

            for state in set(step[0] for step in episode):
                self._improve_policy(state)

    def _improve_policy(self, state):
        """
        Makes the policy epsilon-greedy with respect to Q in one state.
        """
        actions = self.env.get_possible_actions(state)
        best_action = max(actions, key=lambda a: self.Q[(state, a)])

        # float() keeps the division exact on Python 2 for an int epsilon.
        exploration = self.epsilon / float(len(actions))
        for action in actions:
            self.policy[(state, action)] = exploration

        self.policy[(state, best_action)] += 1 - self.epsilon

    def _calculate_returns(self, episode):
        """
        Records, for each (state, action) pair of the episode, the return
        that followed its first occurrence, and updates Q accordingly.
        """
        returns = {}
        reward_after = 0

        # Walking backwards, an earlier occurrence overwrites a later one,
        # so each pair ends up with the return after its first visit.
        for state, action, reward in reversed(episode):
            reward_after += reward
            returns[(state, action)] = reward_after

        for key, ret in returns.items():
            history = self.returns[key]
            history.append(ret)
            # Incremental mean: Q[key] is the mean of the earlier entries
            # of `history`, so this equals the mean of the whole list
            # without re-summing it on every update.
            self.Q[key] += (ret - self.Q[key]) / float(len(history))

    def _get_start_state(self):
        """
        Picks a start state uniformly among all states of the environment.
        """
        return choice(self.env.get_states())

By learning from 500,000 samples we can learn the optimal policy. The same policy is shown in the book, but visualized a bit nicer.

In [None]:
# Train the on-policy first-visit agent on BlackJack with 500,000 sampled
# episodes, then print the best action found for every non-terminal state.
env = BlackJack()
agent = OnPolicyFirstVisitMonteCarloAgent(env)

agent.learn(num_samples=500000)

agent.print_policy()

### Every-visit

Every-visit means that the value associated with a state is the mean of the returns that follow all visits to that state, not just the first visit in each episode.

In [None]:
class OnPolicyEveryVisitMonteCarloAgent(OnPolicyFirstVisitMonteCarloAgent):
    """
    Variant of the on-policy agent that averages the returns following
    every occurrence of a (state, action) pair, not only the first one.
    """

    def _calculate_returns(self, episode):
        """
        Records the return following each step of the episode and keeps
        Q equal to the mean of all returns recorded for the pair.
        """
        reward_after = 0

        for state, action, reward in reversed(episode):
            reward_after += reward

            key = (state, action)
            history = self.returns[key]
            history.append(reward_after)
            # Incremental mean: Q[key] is the mean of the earlier entries
            # of `history`, so after this update it is the mean of the
            # whole list again.
            self.Q[key] += (reward_after - self.Q[key]) / float(len(history))

As expected, this method yields the same results as the first-visit method.

In [None]:
# Same experiment with the every-visit agent: 500,000 sampled episodes,
# then print the best action found for every non-terminal state.
env = BlackJack()
agent = OnPolicyEveryVisitMonteCarloAgent(env)

agent.learn(num_samples=500000)

agent.print_policy()

## Off-policy

The on-policy approach has a fundamental flaw: We try to optimize a policy that also needs to keep exploring, which means it can never really be optimal.

The off-policy approach tries to fix this by using a different policy just for exploration. Importance sampling is then used to make sure the results are weighted correctly.

In [None]:
from collections import defaultdict

class OffPolicyEveryVisitMonteCarloAgent(OnPolicyEveryVisitMonteCarloAgent):
    """
    Off-policy Monte Carlo control with weighted importance sampling.

    Episodes are generated with the uniformly random policy `u_policy`,
    while `policy` is the greedy policy being learned. `C` holds the
    cumulative importance weight seen for each (state, action) pair.
    """

    def learn(self, num_samples):
        """
        Samples `num_samples` episodes with `u_policy` and uses them to
        update Q and the greedy policy.
        """
        self._init_u_policy()

        # Keep the weights of earlier calls: Q is kept as well, and a reset
        # C would let the next single return overwrite each estimate.
        if not hasattr(self, 'C'):
            self.C = defaultdict(float)

        for _ in range(num_samples):
            episode = self.env.sample(self.u_policy, self._get_start_state())

            # A terminal start state yields an empty episode: nothing to learn.
            if not episode:
                continue

            G = 0.
            W = 1.

            # Walk the episode backwards so that G is the return following
            # each step, including the reward of the step itself.
            for state, action, reward in reversed(episode):
                key = (state, action)

                G += reward
                self.C[key] += W
                self.Q[key] += W / self.C[key] * (G - self.Q[key])

                actions = self.env.get_possible_actions(state)
                best_action = self._greedy_action(state, actions)

                for possible_action in actions:
                    self.policy[(state, possible_action)] = 0

                self.policy[(state, best_action)] = 1

                # The greedy policy would not have taken this action, so
                # earlier steps of the episode carry zero weight.
                if action != best_action:
                    break

                # The greedy policy takes `action` with probability 1.
                W *= 1. / self.u_policy[key]

    def _greedy_action(self, state, actions):
        """
        Returns the action with the highest Q value in the given state;
        ties go to the action listed first.
        """
        return max(actions, key=lambda a: self.Q[(state, a)])

    def _init_u_policy(self):
        """
        Builds the exploration policy: uniform over the possible actions
        of every state.
        """
        self.u_policy = {}

        for state in self.env.get_states():
            actions = self.env.get_possible_actions(state)
            num_actions = len(actions)

            for action in actions:
                self.u_policy[(state, action)] = 1. / num_actions

In [None]:
# Same experiment with the off-policy agent: 500,000 sampled episodes,
# then print the best action found for every non-terminal state.
env = BlackJack()
agent = OffPolicyEveryVisitMonteCarloAgent(env)

agent.learn(num_samples=500000)

agent.print_policy()

In [None]:
# Inspect the value table of the most recently created agent; presumably
# relies on the notebook echoing the last expression of a cell.
agent.Q