## Question 1 : Implementation for finite episodes

In [93]:
from typing import Sequence, Tuple, Mapping
from typing import Iterable, Tuple, Mapping, TypeVar, Iterator, Sequence
from operator import itemgetter
import numpy as np
from itertools import *
from numpy.random import randint
from collections import defaultdict
import itertools

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]


def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    def full_group_by(l, key=lambda x: x[0]):
        d = defaultdict(list)
        for item in l:
            d[key(item)].append(item[1])
        return d.items()
    return {s: np.mean([r for r in l]) for s, l in full_group_by(state_return_samples)}



def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    count : Mapping[S,int] = {}
    for s1,r,s2 in srs_samples :
        if s1 in count.keys():
            count[s1] +=1
        else :
            count[s1] = 1
    prob_func : Mapping[S, Mapping[S, float]] = {s : {} for s in count.keys()}
    reward_func : Mapping[S, float] = {s : 0 for s in count.keys()}
    for s1,r,s2 in srs_samples :
        if s2 in prob_func[s1].keys():
            prob_func[s1][s2] += 1/count[s1]
        else:
            prob_func[s1][s2] = 1/count[s1]
        reward_func[s1] += r/count[s1]
    return (prob_func, reward_func)

    


def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    def get_transition_matrix(prob_func) -> np.ndarray:
        sz = len(prob_func.keys())
        mat = np.zeros((sz, sz))
        for i, s1 in enumerate(prob_func.keys()):
            for j, s2 in enumerate(prob_func.keys()):
                mat[i, j] = prob_func[s1][s2]
        return mat
    prob_mat = get_transition_matrix(prob_func)
    reward_vec = np.zeros(len(reward_func.keys()))
    for i, s1 in enumerate(reward_func.keys()):
        reward_vec[i] = reward_func[s1]
    value_vec = np.linalg.solve(np.eye(len(prob_func.keys())) - prob_mat, reward_vec) #gamma = 1
    return(value_vec)
    


def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    nb_samples = len(srs_samples)
    vf = {s : 0 for s in set(x for x,_,__ in srs_samples)}
    for updates in range(num_updates):
        s,r,s1 = srs_samples[randint(nb_samples, size=1)[0]]
        if s1 =='T':
            vf[s] =  vf[s] + (learning_rate * (updates / learning_rate_decay + 1) ** -0.5)*(r + 0 - vf[s]) #gamma = 1
        else:
            vf[s] = vf[s] + (learning_rate * (updates / learning_rate_decay + 1) ** -0.5)*(r + 1*vf[s1] - vf[s]) #gamma = 1
    return vf


#We can calculate the solution w∗ as A−1 · b, where the m × m Matrix A is accumulated at
# each each atomic experience (Si, Ri, Si′) as: A←A+φ(Si)·(φ(Si)−γ·φ(Si′))T (notetheOuter-Product)
# and the m-Vector b is accumulated at each atomic experience (Si, Ri, Si′) as: b ← b+φ(Si)·Ri


def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    nt_states = list(set(x for x,_,__ in srs_samples))
    nb_nt_states = len(nt_states)
    A = np.zeros((nb_nt_states,nb_nt_states))
    b = np.zeros(nb_nt_states)
    for s,r,s1 in srs_samples : #Tabular
        phi = np.zeros(nb_nt_states)
        phi[nt_states.index(s)] +=1
        phi_1 = np.zeros(nb_nt_states)
        if s1 != 'T': #If terminal, then 0
            phi_1[nt_states.index(s1)] +=1
        A = A + np.outer(phi,(phi-phi_1).T) #gamma = 1 
        b = b + r*phi
    vf = {s : 0 for s in nt_states}
    sol = np.linalg.inv(A).dot(b)
    for s in nt_states:
        vf[s] = sol[nt_states.index(s)]
    return vf




In [94]:
given_data: DataType = [
    [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
    [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
    [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
    [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
    [('B', 8.), ('B', 2.)]
]

sr_samps = get_state_return_samples(given_data)

print("------------- MONTE CARLO VALUE FUNCTION --------------")
print(get_mc_value_function(sr_samps))

srs_samps = get_state_reward_next_state_samples(given_data)
(pfunc, rfunc) = get_probability_and_reward_functions(srs_samps)
print("-------------- MRP VALUE FUNCTION ----------")
print(get_mrp_value_function(pfunc, rfunc))

print("------------- TD VALUE FUNCTION --------------")
print(get_td_value_function(srs_samps))

print("------------- LSTD VALUE FUNCTION --------------")
print(get_lstd_value_function(srs_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
{'A': 9.571428571428571, 'B': 5.642857142857143}
-------------- MRP VALUE FUNCTION ----------
[12.93333333  9.6       ]
------------- TD VALUE FUNCTION --------------
{'B': 9.898222375600405, 'A': 13.309504556473101}
------------- LSTD VALUE FUNCTION --------------
{'B': 9.600000000000001, 'A': 12.933333333333334}


In [58]:
test ={'a': 1, 'b' : 2, 'c' : 3}

In [14]:
for i,s in enumerate(test.keys()):
    print(i,s)

0 a
1 b
2 c
