The purpose of this notebook is to demonstrate the Monte Carlo method for policy evaluation - estimating the value function of a policy from sampled episodes.

The aim is for you to understand the following features of Monte Carlo methods:

- high variance
- inefficient in terms of using experience
- lack of any bootstrapping
- lack of environment model 

In [None]:
import collections

import numpy as np

In [None]:
#  we make use of a class for the Maze Markov Decision Process
#  I have incorporated the logic from the dynamic programming notebook into this class

class Maze_Env(object):
    """
    Five-state maze Markov Decision Process with a reset/step interface

    Transition probabilities are loaded from one CSV file per state
    ('s1.csv' ... 's5.csv'), read relative to the current working directory.
    Each file is indexed by action (row) and is assumed to hold one column per
    next state, in state_space order - TODO confirm against the CSV files.

    Every transition has a reward of -1, except
    - s4 -> s5, which has a reward of 10
    - any transition out of s5, which has a reward of 0
    """
    def __init__(self, verbose=0):
        """
        verbose : set to 1 to print every transition taken in step()
        """
        self.verbose = verbose

        #  creating the functions needed to define a Markov Decision Process
        self.state_space = np.array(['s{}'.format(state) for state in np.arange(1,6)])
        self.action_space = np.array(['left', 'right', 'up', 'down'])
        self.state_transitions = {state:np.genfromtxt('{}.csv'.format(state), delimiter=',') for state in self.state_space}
        #  reward_functions[state][i] is the reward for moving from state to state_space[i]
        self.reward_functions = {state:np.full(len(self.state_space), -1) for state in self.state_space}

        #  our two changes to the reward function
        self.reward_functions['s4'][4] = 10
        self.reward_functions['s5'] = np.full(len(self.state_space), 0)

        self.reset()

    def reset(self):
        """
        Resets the environment to the initial state

        returns the initial state, 's1'
        """
        self.state = 's1'
        return self.state

    def step(self, action):
        """
        Environment response to a given action

        action -> reward + next_state

        action : one of the entries of action_space

        returns the scalar reward of the sampled transition and the next state,
        which also becomes the current state of the environment

        raises ValueError if action is not in action_space
        """
        action_matches = np.flatnonzero(self.action_space == action)
        if action_matches.size == 0:
            #  fail here with a clear message rather than inside np.random.choice
            raise ValueError('action {} is not one of {}'.format(action, list(self.action_space)))
        action_idx = action_matches[0]

        #  row of next-state probabilities for taking action in the current state
        state_transition = self.state_transitions[self.state][action_idx]
        next_state = np.random.choice(self.state_space, p=state_transition)
        next_state_idx = np.flatnonzero(self.state_space == next_state)[0]

        #  scalar index -> scalar reward
        #  a length-1 array here would rely on NumPy's deprecated size-1 array
        #  to scalar conversion when stored into a single array element
        reward = self.reward_functions[self.state][next_state_idx]

        if self.verbose == 1:
            print('state is {}'.format(self.state))
            print('action is {}'.format(action))
            print('reward is {}'.format(reward))
            print('next_state is {}'.format(next_state))
        self.state = next_state
        return reward, next_state

In [None]:
#  we also use our old friend random_policy
#  we don't need the probability distribution now

def random_policy(state, action_space):
    """
    Uniform random policy

    Picks an action uniformly at random from action_space - the state is ignored
    """
    return np.random.choice(action_space)

In [None]:
#  number of steps in each episode, and number of episodes to sample
HORIZION = 30
EPISODES = 10000

env = Maze_Env(verbose=0)

#  one row per episode, one column per step
rewards = np.zeros((EPISODES, HORIZION))
states_visited = np.empty(rewards.shape, dtype=object)  # can I use a different dtype here?

print('running {} episodes'.format(EPISODES))
for episode in range(EPISODES):
    state = env.reset()

    for step in range(HORIZION):
        action = random_policy(state, env.action_space)
        reward, next_state = env.step(action)
        #  store the state the action was taken from, alongside the reward it earned
        states_visited[episode, step] = state
        rewards[episode, step] = reward
        state = next_state
      

In [None]:
#  define a class for our Monte Carlo policy approximator

class MonteCarlo(object):
    """
    Every-visit Monte Carlo estimate of a state-value function

    The value of a state is the mean of the discounted returns that followed
    every visit to that state in the processed episodes.
    """
    def __init__(self, state_space, discount_factor=0.9):
        """
        state_space : the states of the environment (stored, not otherwise used here)
        discount_factor : weight applied to each successive future reward
        """
        self.state_space = state_space

        #  state -> every return observed from that state
        self.returns_lists = collections.defaultdict(list)
        #  state -> mean of returns_lists[state]
        #  the list default means a state that was never seen reads as []
        self.value_function = collections.defaultdict(list)

        self.discount_factor = discount_factor

    def calc_returns(self, rewards):
        """
        Discounted return following each step of a single episode

        rewards : the rewards of one episode, in time order

        returns an array of the same length, where entry t is
        rewards[t] + discount_factor * rewards[t+1] + discount_factor**2 * rewards[t+2] + ...
        """
        R = 0
        returns = []
        #  work backwards so that each return builds on the one that follows it
        for r in reversed(list(rewards)):
            R = r + self.discount_factor * R  # recursive definition of the discounted return
            returns.append(R)
        #  appended last-step-first, so flip back into time order
        returns.reverse()
        return np.array(returns)

    def update_value_function(self, state):
        """
        Sets the value of state to the mean of all the returns stored for it

        returns the whole value function
        """
        self.value_function[state] = np.mean(self.returns_lists[state])
        return self.value_function

    def process_experience(self, states, rewards):
        """
        Updates the value function using a batch of episodes

        states : array of shape (episodes, steps) holding the state at each step
        rewards : array of the same shape holding the reward received at each step

        Prints the value function every 100 episodes

        returns the value function after all episodes have been processed
        """
        print('processing {} episodes'.format(states.shape[0]))

        #  running sum of the returns stored for each state, so that each new
        #  return updates the mean in constant time instead of re-averaging
        #  the entire list
        totals = collections.defaultdict(float)
        for state, stored_returns in self.returns_lists.items():
            totals[state] = float(np.sum(stored_returns))

        for ep, (episode_states, episode_rewards) in enumerate(zip(states, rewards)):
            episode_returns = self.calc_returns(episode_rewards)

            assert episode_states.shape == episode_rewards.shape
            assert episode_rewards.shape == episode_returns.shape

            for state, rtn in zip(episode_states, episode_returns):
                state_returns = self.returns_lists[state]
                state_returns.append(rtn)
                totals[state] += rtn
                self.value_function[state] = totals[state] / len(state_returns)

            if ep % 100 == 0:
                print('episode {} value function is {}'.format(ep, self.value_function))

        return self.value_function
        
#  estimate the value function of the random policy from the sampled episodes
approx = MonteCarlo(state_space=env.state_space)
value_function = approx.process_experience(states=states_visited, rewards=rewards)

In [None]:
#  display the value function returned by process_experience
value_function