In [1]:
import sys
sys.path.append("../")

In [5]:
from __future__ import annotations

import numpy as np
from rl.function_approx import FunctionApprox 
from dataclasses import dataclass
from typing import TypeVar, Iterable, Tuple, Optional
from collections import defaultdict

X = TypeVar('X')

In [6]:
class TabularApprox(FunctionApprox):
    def __init__(self):
        self.count_dict = defaultdict(int)
        self.value_dict = defaultdict(int)
    
    def evaluate(self, x_values_seq: Iterable[X]) -> np.ndarray:
        return np.array([self.value_dict[x] for x in x_values_seq])
    
    def representational_gradient(self, x_value: X) -> TabularApprox[X]:
        pass
    
    def solve(
        self,
        xy_vals_seq: Iterable[Tuple[X, float]],
        error_tolerance: Optional[float] = None
    ) -> TabularApprox[X]:
        tmp = TabularApprox()
        tmp.update(xy_vals_seq=xy_vals_seq)
        return tmp
    
    def update(
        self,
        xy_vals_seq: Iterable[Tuple[X, float]]
    ) -> TabularApprox:
        for (x,y) in xy_vals_seq:
            self.count_dict[x] += 1
            self.value_dict[x] += (1 / self.count_dict[x]) * (y - self.value_dict[x])
        return self
    
    def within(self, other: FunctionApprox[X], tolerance: float) -> bool:
        if isinstance(other, TabularApprox):
            return np.all(
                (k in other) and (other.value_dict[k] == self.value_dict[k])
                for k in self.value_dict.keys()
            )

        return False

In [17]:
from typing import Sequence, Tuple, Mapping

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]


def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    tab = TabularApprox()
    tab.update(state_return_samples)
    return tab.value_dict


def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    from collections import defaultdict, Counter
    next_state_map = defaultdict(list)
    reward_map = defaultdict(list)
    
    for s,r,s_ in srs_samples:
        next_state_map[s].append(s_)
        reward_map[s].append(r)

        
    next_state_map = dict(next_state_map) 
    for s in next_state_map.keys():
        n = len(next_state_map[s])
        tmp = Counter(next_state_map[s]) 
        for s_ in tmp.keys():
            tmp[s_] /= n
        next_state_map[s] = tmp
        
    
    reward_map = dict(reward_map)
    for s in reward_map.keys():
        reward_map[s] = np.sum(reward_map[s])
    
    return next_state_map, reward_map

def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    state_array = list(prob_func.keys())
    
    pMat = []
    for s in state_array:
        pMat.append([ prob_func[s][s_] for s_ in state_array ])
    pMat = np.array(pMat)
    
    rVec = np.array([ reward_func[s] for s in state_array ])
    
    valueFunc = np.linalg.inv(np.eye(len(state_array)) - pMat) @ rVec
    
    return {s: r for s,r in zip(state_array, valueFunc)}


def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """


def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """


if __name__ == '__main__':
    given_data: DataType = [
        [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
        [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
        [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
        [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
        [('B', 8.), ('B', 2.)]
    ]

    sr_samps = get_state_return_samples(given_data)

    print("------------- MONTE CARLO VALUE FUNCTION --------------")
    print(get_mc_value_function(sr_samps))

    srs_samps = get_state_reward_next_state_samples(given_data)

    pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
    print("-------------- MRP VALUE FUNCTION ----------")
    print(get_mrp_value_function(pfunc, rfunc))

    print("------------- TD VALUE FUNCTION --------------")
    print(get_td_value_function(srs_samps))

    print("------------- LSTD VALUE FUNCTION --------------")
    print(get_lstd_value_function(srs_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
defaultdict(<class 'int'>, {'A': 9.571428571428571, 'B': 5.642857142857142})
-------------- MRP VALUE FUNCTION ----------
{'A': 143.7333333333333, 'B': 120.39999999999998}
------------- TD VALUE FUNCTION --------------
None
------------- LSTD VALUE FUNCTION --------------
None
