# Assignment 12

In [1]:
# Change notebook width
# `display` and `HTML` belong to the public `IPython.display` API; importing
# them from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:70% !important; }</style>"))

In [1]:
from dataclasses import dataclass
from typing import *
from rl.markov_decision_process import *
from rl.markov_process import *
from rl.distribution import *
from rl.dynamic_programming import *
from rl.returns import *
from scipy.stats import poisson
import pprint as pp
import numpy as np
import itertools
import time
from rl.monte_carlo import mc_prediction
from rl.td import td_prediction
from rl.function_approx import Tabular
from rl.function_approx import FunctionApprox
from rl.markov_process import FiniteMarkovRewardProcess
import matplotlib.pyplot as plt

# Problem 1

In [2]:
# Generic type variable used for states in the annotations below.
S = TypeVar('S')
# Second generic type variable (presumably for actions; it is not referenced
# by any function visible in this part of the notebook).
A = TypeVar('A')
# Alias for a tabular value function: a mapping from a state to a float.
V = Mapping[S, float]

# Build the zero-initialised tables used in both Problem 1 and Problem 2.
def initiate_VF_plus_Counts(states):
    """Return a zeroed value function and a zeroed visit counter.

    Args:
        states: Iterable of hashable states; it is traversed exactly once.

    Returns:
        A ``(vf, state_counts)`` pair of dictionaries keyed by state, where
        every value estimate is ``0.0`` and every visit count is ``0``.
    """
    # Build the value table first; the counter reuses its keys so that a
    # one-shot iterator passed as `states` is only consumed once.
    vf = dict.fromkeys(states, 0.0)
    state_counts = dict.fromkeys(vf, 0)
    return vf, state_counts

def calculate_Gt_n(transitions, vf, γ, idx, n, approx=False):
    """Compute the n-step bootstrapped return starting at step `idx`.

    With ``last_idx = min(idx + n, len(transitions) - 1)`` the result is

        sum_{k=0}^{last_idx-idx-1} γ**k * transitions[idx+k].reward
            + γ**(last_idx-idx) * V(transitions[last_idx].state)

    so the look-ahead is shortened automatically near the end of the data,
    and the final recorded step is only ever used for its state.

    Args:
        transitions: Sized iterable of steps exposing `.state` and `.reward`.
        vf: Value function. A mapping indexed with `vf[state]` when `approx`
            is false, or a callable evaluated as `vf(state)` when it is true.
        γ: Discount factor.
        idx: Index of the step the return is computed for.
        n: Maximum number of reward steps to look ahead (non-negative).
        approx: Selects call syntax (`True`) or subscript syntax (`False`)
            for reading `vf`.

    Returns:
        The n-step bootstrapped return as a float.

    Raises:
        ValueError: If `n` is negative.
        IndexError: If `idx` does not address an element of `transitions`.
    """
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")
    num_steps = len(transitions)
    if not 0 <= idx < num_steps:
        raise IndexError(
            f"idx {idx} is out of range for {num_steps} transitions"
        )

    last_idx = min(idx + n, num_steps - 1)

    # Steps idx .. last_idx-1 contribute discounted rewards; the step at
    # last_idx is read only for the state the return bootstraps from. Reading
    # it here (rather than the last reward step's state) keeps the state
    # aligned with the γ**(last_idx - idx) discount applied to its value.
    G = 0.0
    bootstrapped_state = None
    for k, T in enumerate(itertools.islice(transitions, idx, last_idx + 1)):
        if idx + k < last_idx:
            G += γ**k * T.reward
        else:
            bootstrapped_state = T.state

    if approx:
        bootstrap_value = vf(bootstrapped_state)
    else:  # Tabular / not using an approximate vf
        bootstrap_value = vf[bootstrapped_state]

    return G + γ**(last_idx - idx) * bootstrap_value

In [3]:


# n step bootstrap algorithm for the tabular case where all states are defined and discrete
def n_step_tabular_bootstrap(
    transitions: Iterable[TransitionStep[S]],
    vf: V,
    state_counts: Mapping[S, int],    # Dictionary of counts per state
    γ: float = .9,                    # Discount Factor
    n: int = 3,                       # Number of steps to look ahead
    αi: float = .1,                   # Initial learning rate
    num_transitions: float = 1000,    # Number of transitions to loop through
    half_life: float = 1000.,
    exponent: float = 0.5,
    tolerance: float = 1e-6
) -> V: # Value function
    """Run tabular n-step bootstrapped prediction over a stream of transitions.

    For each of the first `num_transitions` steps the visited state's value is
    moved towards the n-step return: ``V(s) <- V(s) + α * (G - V(s))``, where
    ``α = αi / (1 + ((count(s) - 1) / half_life) ** exponent)`` and `G` is
    obtained from `calculate_Gt_n`.

    Args:
        transitions: Steps exposing `.state` (and whatever `calculate_Gt_n`
            reads). May be a one-shot iterator; at most
            `num_transitions + n` items are consumed.
        vf: Value table, updated in place. States missing from it are treated
            as having value 0.0 when they are first updated.
        state_counts: Visit counter per state, updated in place.
        γ: Discount factor.
        n: Number of steps to look ahead.
        αi: Initial learning rate.
        num_transitions: Maximum number of steps to learn from (finite;
            truncated to an integer).
        half_life: Scale of the learning-rate decay.
        exponent: Exponent of the learning-rate decay.
        tolerance: Accepted for interface compatibility; not used.

    Returns:
        The same `vf` object that was passed in, after the updates.
    """
    # Materialise just enough of the stream for every processed step to have
    # its look-ahead window available, even when `transitions` is a generator.
    horizon = max(int(num_transitions), 0)
    steps = list(itertools.islice(transitions, horizon + max(n, 0)))

    for i, transition in enumerate(steps[:horizon]):
        # Get current state from transition
        cur_state = transition.state

        # Increment the visit count, starting from zero for unseen states
        state_counts[cur_state] = state_counts.get(cur_state, 0) + 1

        # Decay the learning rate with the number of visits to this state
        α = αi / (1 + ((state_counts[cur_state] - 1) / half_life)**exponent)

        # Get G_{t,n}, the n-step bootstrapped return
        G = calculate_Gt_n(steps, vf, γ, i, n, False)

        # Move the current estimate a fraction α of the way towards G
        old_value = vf.get(cur_state, 0.0)
        vf[cur_state] = old_value + α * (G - old_value)

    return vf
    



# n step bootstrap algorithm for the function approximation case where the value function is approximated
def n_step_funcapprox_bootstrap(
    transitions: Iterable[TransitionStep[S]],
    state_counts: Mapping[S, int],
    vf: FunctionApprox[S],
    γ: float,
    n: int,
    αi: float,
    num_transitions: int = 1000,
    half_life: float = 1000.,
    exponent: float = 0.5
) -> FunctionApprox[S]:
    """Run n-step bootstrapped prediction with an approximate value function.

    For each of the first `num_transitions` steps the approximation is updated
    with the pair ``(s, V(s) + α * (G - V(s)))``, where
    ``α = αi / (1 + ((count(s) - 1) / half_life) ** exponent)`` and `G` is
    obtained from `calculate_Gt_n`.

    Args:
        transitions: Steps exposing `.state` (and whatever `calculate_Gt_n`
            reads). May be a one-shot iterator; at most
            `num_transitions + n` items are consumed.
        state_counts: Visit counter per state, updated in place.
        vf: Initial approximation; read with `vf(state)` and updated through
            `vf.update([(state, target)])`.
        γ: Discount factor.
        n: Number of steps to look ahead.
        αi: Initial learning rate.
        num_transitions: Maximum number of steps to learn from.
        half_life: Scale of the learning-rate decay.
        exponent: Exponent of the learning-rate decay.

    Returns:
        The approximation obtained after the last update.
    """
    # Materialise just enough of the stream for every processed step to have
    # its look-ahead window available, even when `transitions` is a generator.
    horizon = max(int(num_transitions), 0)
    steps = list(itertools.islice(transitions, horizon + max(n, 0)))

    for i, transition in enumerate(steps[:horizon]):
        # Get current state from transition
        cur_state = transition.state

        # Increment the visit count, starting from zero for unseen states
        state_counts[cur_state] = state_counts.get(cur_state, 0) + 1

        # Decay the learning rate with the number of visits to this state
        α = αi / (1 + ((state_counts[cur_state] - 1) / half_life)**exponent)

        # Get G_{t,n}, the n-step bootstrapped return
        G = calculate_Gt_n(steps, vf, γ, i, n, True)

        # `update` takes (state, target) pairs and hands back the updated
        # approximation, so the result has to be rebound.
        # NOTE(review): any step size built into `vf` itself compounds with α
        # here - confirm this is the intended damping.
        old_value = vf(cur_state)
        vf = vf.update([(cur_state, old_value + α * (G - old_value))])

    return vf

# Problem 2

In [4]:
def tabular_td_lambda(
    transitions: Iterable[TransitionStep[S]],
    vf: V,
    state_counts: Mapping[S, int],
    γ: float,
    α: float,
    λ:float,
    eligibility_trace: Dict[S, float],
    num_transitions: float = 1000,
    half_life: float = 1000.,
    exponent: float = 0.5,
) -> V: # Value function
    """Run tabular TD(λ) prediction with accumulating eligibility traces.

    For every processed step all eligibilities are decayed by ``γ * λ``, the
    visited state's eligibility is incremented by one, and every state in the
    trace is updated with ``V(s) <- V(s) + α * δ * e(s)``, where
    ``δ = reward + γ * V(next_state) - V(state)``.

    Args:
        transitions: Steps exposing `.state`, `.next_state` and `.reward`.
        vf: Value table, updated in place. States missing from it (for
            example a terminal `next_state`) are read as 0.0.
        state_counts: Visit counter per state, updated in place.
        γ: Discount factor.
        α: Learning rate, applied as given on every step.
        λ: Trace-decay parameter.
        eligibility_trace: Initial eligibilities; copied, never modified.
        num_transitions: Maximum number of steps to process.
        half_life: Accepted for interface compatibility; not used.
        exponent: Accepted for interface compatibility; not used.

    Returns:
        The same `vf` object that was passed in, after the updates.
    """
    # Work on a copy so the caller's trace dictionary is left untouched.
    trace = dict(eligibility_trace)

    for i, step in enumerate(transitions):
        # Break if number of transitions reached
        if i == num_transitions:
            break

        # Get State, Next State, and Reward values
        cur_state = step.state
        next_state = step.next_state
        reward = step.reward

        # Decay every eligibility, then credit the state just visited
        for state in trace:
            trace[state] *= γ * λ
        trace[cur_state] = trace.get(cur_state, 0.0) + 1.0

        # Increment state counts, starting from zero for unseen states
        state_counts[cur_state] = state_counts.get(cur_state, 0) + 1

        # The TD error is computed once, before any value changes, and then
        # shared with every state in proportion to its eligibility.
        td_error = reward + γ * vf.get(next_state, 0.0) - vf.get(cur_state, 0.0)
        for state, eligibility in trace.items():
            vf[state] = vf.get(state, 0.0) + α * td_error * eligibility

    return vf


def approx_td_lambda(
    transitions: Iterable[TransitionStep[S]],
    state_counts: Mapping[S, int],
    γ: float,
    λ: float,
    vf: FunctionApprox[S],
    α: float = 0.01,
    num_transitions: float = 1000
) -> FunctionApprox[S]:
    """Run TD(λ) prediction through an approximate value function.

    A per-state accumulating eligibility trace is kept, starting empty. For
    every processed step all eligibilities are decayed by ``γ * λ``, the
    visited state's eligibility is incremented by one, and the approximation
    is updated with the pairs ``(s, V(s) + α * δ * e(s))`` for every state in
    the trace, where ``δ = reward + γ * V(next_state) - V(state)``.

    Args:
        transitions: Steps exposing `.state`, `.next_state` and `.reward`.
        state_counts: Visit counter per state, updated in place.
        γ: Discount factor.
        λ: Trace-decay parameter.
        vf: Initial approximation; read with `vf(state)` and updated through
            `vf.update([(state, target), ...])`.
        α: Learning rate.
        num_transitions: Maximum number of steps to process.

    Returns:
        The approximation obtained after the last update.
    """
    trace = {}

    for i, step in enumerate(transitions):
        # Break if number of transitions reached
        if i == num_transitions:
            break

        # Get State, Next State, and Reward values
        cur_state = step.state
        next_state = step.next_state
        reward = step.reward

        # Decay every eligibility, then credit the state just visited
        for state in trace:
            trace[state] *= γ * λ
        trace[cur_state] = trace.get(cur_state, 0.0) + 1.0

        # Increment state counts, starting from zero for unseen states
        state_counts[cur_state] = state_counts.get(cur_state, 0) + 1

        # All targets are built from the pre-update approximation; `update`
        # hands back the updated approximation, so the result is rebound.
        # NOTE(review): any step size built into `vf` itself compounds with α
        # here - confirm this is the intended damping.
        td_error = reward + γ * vf(next_state) - vf(cur_state)
        targets = [
            (state, vf(state) + α * td_error * eligibility)
            for state, eligibility in trace.items()
        ]
        vf = vf.update(targets)

    return vf