In [1]:
import copy
import math
from abc import ABC, abstractmethod
from typing import Dict, Generic, Optional

import numpy as np
from scipy.linalg import eig

#### Question 1
Write code for the interface for tabular RL algorithms. The core of this interface should be a mapping from a (state, action) pair to a sampling of the (next state, reward) pair. It is important that this interface doesn't present the state-transition probability model or the reward model.

In [6]:
# some class prototype defined, e.g MDP and Policy
# the function definition of Policy is in accordance with https://github.com/coverdrive/MDP-DP-RL/
# with a simplified test harness
from operator import itemgetter
from typing import Mapping, Set, Tuple, Sequence, Any, Callable, TypeVar

X = TypeVar('X')
A = TypeVar('A')
S = TypeVar('S')

def epsilon_greedy(action_value_dict, epsilon: float) -> Mapping[A, float]:
    """
    Build an epsilon-greedy action distribution from a table of action values.

    The action with the highest value (the first one in iteration order when
    several tie) gets probability ``1 - epsilon + epsilon / m``; every other
    action gets ``epsilon / m``, where ``m`` is the number of actions.

    @param action_value_dict: non-empty mapping from action to its value
    @param epsilon: exploration rate; the result is a valid probability
        distribution only for values in [0, 1] (not checked here)
    @return: a dictionary mapping each action to its selection probability;
        when ``epsilon == 0`` only the greedy action is present, with probability 1
    @raise ValueError: if ``action_value_dict`` is empty (raised by ``max``)
    """
    max_act = max(action_value_dict.items(), key=itemgetter(1))[0]
    if epsilon == 0:
        return {max_act: 1.}
    # With probability epsilon pick uniformly among all m actions (the greedy
    # one included); with probability 1 - epsilon pick the greedy action.
    explore_prob = epsilon / len(action_value_dict)
    return {
        action: explore_prob + (1. - epsilon if action == max_act else 0.)
        for action in action_value_dict.keys()
    }




class Policy(Generic[S, A]):
    """
    Tabular stochastic policy: for every state, a mapping from action to the
    probability of selecting that action.
    """

    def __init__(self, data: Dict[S, Mapping[A, float]]) -> None:
        """
        @param data: mapping state -> {action: probability}
        """
        # Shallow copy: edit_state_action_to_epsilon_greedy replaces per-state
        # entries, and that must not rewrite the dictionary the caller passed in.
        self.policy_data = dict(data)

    def get_state_probabilities(self, state: S) -> Mapping[A, float]:
        """
        @return: the {action: probability} mapping stored for ``state``
        @raise KeyError: if the policy has no entry for ``state``
        """
        return self.policy_data[state]

    def get_state_action_probability(self, state: S, action: A) -> float:
        """
        @return: the probability of taking ``action`` in ``state``; 0.0 when the
            action is not listed for that state
        """
        return self.get_state_probabilities(state).get(action, 0.)

    def edit_state_action_to_epsilon_greedy(
        self,
        state: S,
        action_value_dict: Mapping[A, float],
        epsilon: float
    ) -> None:
        """
        Replace the distribution stored for ``state`` with the epsilon-greedy
        distribution computed by ``epsilon_greedy`` from ``action_value_dict``.
        """
        self.policy_data[state] = epsilon_greedy(action_value_dict, epsilon)

    def __repr__(self):
        return self.policy_data.__repr__()

    def __str__(self):
        return self.policy_data.__str__()



In [14]:
# Policy is defined in an earlier cell of this notebook and MDPforTabular in a
# later one, so nothing is imported here. Names that are not defined yet when
# this cell runs are referenced through string annotations.

class tabularRL(ABC):
    """
    Base class for model-free tabular RL, i.e. RL without a state-transition
    probability model.

    MC and TD0 should inherit from this class; in this assignment, however,
    the MonteCarloMethod and TD0 classes below are independent of it.

    Subclasses must implement ``get_qv_func_dict``.
    """

    def __init__(self, mdp: "MDPforTabular"):
        """
        @param mdp: the MDP the algorithm should learn; only its
            ``state_action_dict`` attribute is read here
        """
        self.mdp = mdp
        self.state_action_dict = mdp.state_action_dict
        # start from a uniformly random policy
        self.policy = self.init_policy()

    def init_policy(self) -> Policy:
        """
        @return: a Policy in which every action available in a state is
            selected with equal probability
        """
        uniform = {
            state: {action: 1.0 / len(action_set) for action in action_set}
            for state, action_set in self.state_action_dict.items()
        }
        return Policy(uniform)

    def get_value_func_dict(self, pol: Policy) -> "VFDictType":
        """
        @return: the result of ``get_vf_dict_from_qf_dict_and_policy`` applied
            to this algorithm's action-value table for ``pol`` and to ``pol``
        """
        # NOTE(review): get_vf_dict_from_qf_dict_and_policy is not defined in the
        # visible part of this notebook - confirm it is in scope before calling.
        return get_vf_dict_from_qf_dict_and_policy(
            self.get_qv_func_dict(pol),
            pol
        )

    @abstractmethod
    def get_qv_func_dict(self, pol: Optional[Policy]) -> "QFDictType":
        """
        Return the action-value table for ``pol``. ``get_optimal_det_policy``
        calls this with ``pol=None``.
        """

    def get_act_value_func_dict(self, pol: Policy) -> "QFDictType":
        """Alias of ``get_qv_func_dict``."""
        return self.get_qv_func_dict(pol)

    def get_optimal_det_policy(self) -> "DetPolicy":
        """
        @return: the result of ``get_det_policy_from_qf_dict`` applied to
            ``get_qv_func_dict(None)``
        """
        # NOTE(review): get_det_policy_from_qf_dict is not defined in the visible
        # part of this notebook - confirm it is in scope before calling.
        return get_det_policy_from_qf_dict(self.get_qv_func_dict(None))

In [None]:
class MDP:
    """
    Base class for an MDP that is compatible with model-free RL: it carries no
    explicit transition probabilities, but is intended to let a caller sample
    an outcome for each (state, action) pair.

    This base class holds no state of its own; subclasses such as
    MDPforTabular provide the actual sampling tables.
    """
    def __init__(self):
        """Nothing to initialise in the base class."""

In [None]:
# TODO: this class needs more thought and will be restructured
# TODO: state_reward_gen_dict still needs to be substantiated
# referenced from  https://github.com/coverdrive/MDP-DP-RL/
class MDPforTabular(MDP):
    """
    MDP for tabular model-free RL, initialised from a state -> actions table
    and a table of (next state, reward) samplers.
    """
    def __init__(self,state_action_dict: Mapping[S, Set[A]], terminal_states: Set[S], state_reward_gen_dict, gamma: float) -> None:
        """
        @param state_action_dict: the set of actions available in each state
        @param terminal_states: the states in which an episode ends
        @param state_reward_gen_dict: nested table indexed as
            ``state_reward_gen_dict[state][action]``; each entry is a
            zero-argument callable whose result callers unpack as
            ``(next_state, reward)``
        @param gamma: discount factor
        """
        self.state_action_dict: Mapping[S, Set[A]] = state_action_dict
        self.terminal_states: Set[S] = terminal_states
        self.state_reward_gen_dict: Mapping[S, Mapping[A, Callable[[], Tuple[S, float]]]] = state_reward_gen_dict
        self.gamma = gamma

        # Function-style accessors over the tables above. These are plain
        # assignments: a trailing comma here would wrap each value in a 1-tuple.
        self.state_action_func = lambda x: self.state_action_dict[x]
        self.terminal_state_func = lambda x: x in self.terminal_states
        self.state_reward_gen_func = lambda x, y: self.state_reward_gen_dict[x][y]()

        # NOTE(review): get_rv_gen_func_single is not defined in the visible part
        # of this notebook; presumably it turns an {outcome: probability} dict
        # into a zero-argument sampler - confirm it is in scope.
        num_states = len(self.state_action_dict)
        # uniform distribution over starting states
        self.init_state_gen = get_rv_gen_func_single(
            {s: 1. / num_states for s in self.state_action_dict.keys()}
        )
        num_state_action_pairs = sum(len(v) for v in self.state_action_dict.values())
        # uniform distribution over starting (state, action) pairs
        self.init_state_action_gen = get_rv_gen_func_single(
            {(s, a): 1. / num_state_action_pairs
             for s, v1 in self.state_action_dict.items() for a in v1}
        )




#### Question 2:
Implement any tabular Monte-Carlo algorithm for Value Function prediction

In [None]:
class MonteCarloMethod:
    """
    Episode generator for first-visit Monte Carlo: every step of a simulated
    episode is tagged with whether it is the first visit to its state.
    """

    def __init__(self, mdp, num_episodes, max_step):
        """
        @param mdp: object exposing ``state_action_dict``, ``terminal_states``
            and ``state_reward_gen_dict``
        @param num_episodes: number of episodes to simulate (stored only)
        @param max_step: maximum number of steps in a single episode
        """
        self.num_episodes = num_episodes
        self.max_step = max_step
        self.mdp = mdp

    def get_one_episode(
        self,
        pol: Policy,
        start_state: S,
        start_action = None,
    ) -> Sequence[Tuple[S, A, float, bool]]:
        """
        Simulate one episode under ``pol``.

        @param pol: policy used to draw actions
        @param start_state: state in which the episode starts
        @param start_action: if given, the action forced at the first step;
            every later action is drawn from ``pol``
        @return: the trace of the episode as a list of
            ``(state, action, reward, first_visit)`` tuples, where
            ``first_visit`` is True the first time ``state`` occurs in this episode
        """
        res = []
        state = start_state
        state_visited_set = set()
        # One action sampler per state, built from the policy's probabilities.
        # NOTE(review): get_rv_gen_func_single is not defined in the visible part
        # of this notebook - confirm it is in scope.
        act_gen_dict = {s: get_rv_gen_func_single(pol.get_state_probabilities(s))
                        for s in self.mdp.state_action_dict.keys()}

        # simulate until a terminal state is processed or max_step is reached
        for step in range(self.max_step):
            first = state not in state_visited_set
            state_visited_set.add(state)
            action = act_gen_dict[state]() if (step > 0 or start_action is None) else start_action
            # sample the next state and reward for the current (state, action)
            next_state, reward = self.mdp.state_reward_gen_dict[state][action]()
            res.append((state, action, reward, first))
            if state in self.mdp.terminal_states:
                # the step taken from a terminal state is recorded, then the episode ends
                break
            state = next_state
        return res

#### Question 3:
Implement tabular 1-step TD algorithm for Value Function prediction

In [16]:
class TD0:
    """
    Tabular one-step temporal-difference, TD(0), prediction of the state-value
    function of a fixed policy.
    """

    def __init__(self, mdp, epsilon: float, learning_rate: float, num_episodes: int,max_steps: int,
                 learning_rate_decay: Optional[float] = None):
        """
        @param mdp: object exposing ``state_action_dict``, ``terminal_states``,
            ``state_reward_gen_dict``, ``gamma`` and ``init_state_gen``
        @param epsilon: exploration rate (stored only; prediction does not use it)
        @param learning_rate: base step size alpha
        @param num_episodes: number of episodes run by ``get_value_func_dict``
        @param max_steps: maximum number of steps per episode
        @param learning_rate_decay: if given, the step size of update number
            ``u`` is ``learning_rate * (u / learning_rate_decay + 1) ** -0.5``;
            if None the step size stays constant
        """
        # Plain assignments: a trailing comma here would store 1-tuples.
        self.mdp = mdp
        self.epsilon = epsilon
        self.learning_rate = learning_rate
        self.num_episodes = num_episodes
        self.max_steps = max_steps
        self.learning_rate_decay = learning_rate_decay
        # number of TD updates applied so far; drives the step-size decay
        self.updates = 0

    def _step_size(self) -> float:
        """Step size for the next update, decayed when a decay constant is set."""
        if self.learning_rate_decay is None:
            return self.learning_rate
        return self.learning_rate * (self.updates / self.learning_rate_decay + 1) ** -0.5

    def get_one_TD_update(self, pol: Policy, state_value_dict, act_gen_dict, max_steps, state=None):
        """
        Run one episode of TD(0) updates.

        @param pol: the policy being evaluated (actions are drawn through
            ``act_gen_dict``, which was built from it)
        @param state_value_dict: current state-value estimates; not modified
        @param act_gen_dict: per-state zero-argument action samplers
        @param max_steps: maximum number of steps in this episode
        @param state: starting state; drawn from ``mdp.init_state_gen`` if None
        @return: a new dictionary holding the updated state-value estimates
        """
        s_v_dict = copy.deepcopy(state_value_dict)
        if state is None:
            state = self.mdp.init_state_gen()
        for _ in range(max_steps):
            action = act_gen_dict[state]()
            next_state, reward = self.mdp.state_reward_gen_dict[state][action]()
            # TD(0): move V(s) towards the one-step target r + gamma * V(s')
            td_error = reward + self.mdp.gamma * s_v_dict[next_state] - s_v_dict[state]
            s_v_dict[state] += self._step_size() * td_error
            self.updates += 1
            if state in self.mdp.terminal_states:
                break
            state = next_state
        return s_v_dict

    def get_value_func_dict(self, pol: Policy):
        """
        Estimate the state-value function of ``pol`` over ``num_episodes``
        episodes, starting from all-zero estimates.

        @return: dictionary mapping every state of the MDP to its estimated value
        """
        sa_dict = self.mdp.state_action_dict
        state_value_dict = {s: 0.0 for s in sa_dict.keys()}
        # NOTE(review): get_rv_gen_func_single is not defined in the visible part
        # of this notebook - confirm it is in scope.
        act_gen_dict = {s: get_rv_gen_func_single(pol.get_state_probabilities(s))
                        for s in sa_dict.keys()}
        self.updates = 0

        for _ in range(self.num_episodes):
            # draw the starting state of this episode
            state = self.mdp.init_state_gen()
            state_value_dict = self.get_one_TD_update(
                pol, state_value_dict, act_gen_dict, self.max_steps, state
            )

        return state_value_dict
        

#### Question 4:
Prove that fixed learning rate (step size alpha) for MC is equivalent to an exponentially decaying average of episode returns

$${V(S_t) \leftarrow V(S_t) + \alpha(G_t - V(S_t))}$$
Denote ${V(S_t)^k}$ as the value of ${V(S_t)}$ after the kth update, ${V(S_t)^0}$ as its initial value, and ${G_t^{k-1}}$ as the episode return used in the kth update.
$${V(S_t)^k = V(S_t)^{k-1} + \alpha(G_t^{k-1} - V(S_t)^{k-1}) = (1-\alpha)V(S_t)^{k-1} + \alpha*G_t^{k-1}}$$
$${V(S_t)^k = (1-\alpha)*((1-\alpha)V(S_t)^{k-2} + \alpha*G_t^{k-2}) + \alpha*G_t^{k-1}}$$
$${V(S_t)^k = (1-\alpha)^2V(S_t)^{k-2} + (1-\alpha)*\alpha*G_t^{k-2} + \alpha*G_t^{k-1}}$$
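Unrolling the recursion all the way down to the initial value ${V(S_t)^0}$ gives
$${V(S_t)^k = (1-\alpha)^{k}V(S_t)^{0} + (1-\alpha)^{k-1}*\alpha*G_t^{0}+ ... +(1-\alpha)*\alpha*G_t^{k-2} + \alpha*G_t^{k-1}}$$
$${V(S_t)^k = (1-\alpha)^{k}V(S_t)^{0} + \alpha\sum_{i=0}^{k-1}(1-\alpha)^{k-1-i}G_t^{i}}$$
The weights sum to one, since ${(1-\alpha)^k + \alpha\sum_{j=0}^{k-1}(1-\alpha)^j = (1-\alpha)^k + 1 - (1-\alpha)^k = 1}$, and the weight on the return ${G_t^{i}}$ shrinks by a factor ${(1-\alpha)}$ with every later update. Hence, for ${0 < \alpha \le 1}$, a fixed step size makes ${V(S_t)}$ an exponentially decaying weighted average of the episode returns (and of the initial value).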