In [48]:
from typing import Sequence, Tuple, Mapping
import numpy as np
from numpy.random import randint

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]

In [6]:
def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    V = {key[0]:0.0 for key in state_return_samples}; 
    count = {key[0]:0 for key in state_return_samples};
    for pair in state_return_samples:
        state, return_ = pair
        V[state] += return_; count[state] += 1 
    for state in V.keys():
        V[state] = V[state] / count[state]
    return V

In [7]:
if __name__ == '__main__':
    given_data: DataType = [
        [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
        [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
        [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
        [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
        [('B', 8.), ('B', 2.)]
    ]

    sr_samps = get_state_return_samples(given_data)

    print("------------- MONTE CARLO VALUE FUNCTION --------------")
    print(get_mc_value_function(sr_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
{'A': 9.571428571428571, 'B': 5.642857142857143}


In [44]:
def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    count = {key[0]: {key1[2]: 0 for key1 in srs_samples} for key in srs_samples}
    reward = {key[0]: 0.0 for key in srs_samples}
    for pair in srs_samples:
        state, reward_, next_state = pair
        count[state][next_state] += 1
        reward[state] += reward_        
    for key in count:
        total_count = 0
        for key1 in count[key]:
            total_count += count[key][key1]
        for key1 in count[key]:
            count[key][key1] = count[key][key1] / total_count
        reward[key] = reward[key] / total_count    
    return count, reward


def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    state_list = list(reward_func.keys())
    reward_vec = np.array([reward_func[state] for state in state_list])
    P = np.zeros((len(state_list), len(state_list)))
    i = 0; 
    for state1 in state_list:
        j = 0
        for state2 in state_list:
            P[i][j] = prob_func[state1][state2]
            j += 1
        i += 1
    V = np.linalg.inv(np.eye(len(state_list)) - P) .dot(reward_vec)
    return V

In [45]:
srs_samps = get_state_reward_next_state_samples(given_data)
pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
print("-------------- MRP VALUE FUNCTION ----------")
print(get_mrp_value_function(pfunc, rfunc))

-------------- MRP VALUE FUNCTION ----------
[12.93333333  9.6       ]


In [115]:
def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    sample_num = len(srs_samples)
    V = {s[0]: [0.0] for s in srs_samples}
    for i in range (num_updates):
        state, reward, next_state = srs_samples[randint(sample_num, size=1)[0]]
        alpha = learning_rate * ((i+1) / learning_rate_decay + 1) ** (-0.5)
        if next_state != 'T':
            V[state].append(V[state][-1] + alpha * (reward + V[next_state][-1] - V[state][-1]))
        else:
            V[state].append(V[state][-1] + alpha * (reward - V[state][-1]))
    return {s: np.mean(v[-int(num_updates * 0.9):]) for s, v in V.items()}

In [116]:
print("------------- TD VALUE FUNCTION --------------")
print(get_td_value_function(srs_samps))

------------- TD VALUE FUNCTION --------------
{'A': 12.956945160966981, 'B': 9.625251336084178}


In [141]:
def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    sample_num = len(srs_samples)
    nt_state = list(set(s[0] for s in srs_samples))
    num_state = len(nt_state)
    A = np.zeros((num_state, num_state))
    b = np.zeros((num_state, 1))
    for i in range (sample_num):
        state, reward, next_state = srs_samples[i]
        A[nt_state.index(state), nt_state.index(state)] += 1
        if next_state != "T":
            A[nt_state.index(state), nt_state.index(next_state)] -= 1
        b[nt_state.index(state)] += reward
    return {nt_state[i]: v for i, v in enumerate(np.linalg.inv(A).dot(b))}

In [142]:
print("------------- LSTD VALUE FUNCTION --------------")
print(get_lstd_value_function(srs_samps))

------------- LSTD VALUE FUNCTION --------------
{'B': array([9.6]), 'A': array([12.93333333])}
