Installing dependencies

In [None]:
! pip3 install -e .

# Q1 Dynamic Programming
- To find optimal policies for Markov Decision Processes (MDPs)
- Implement the Policy Iteration (PI) and Value Iteration (VI) algorithms (functions).
- For each algorithm, you can find the functions that you need to implement under Tasks below. 
- Read the code documentation to understand the input and required outputs of these functions. 
- We will mark your submission only based on the correctness of the outputs of these functions.

In [None]:
# from rl2023.exercise1.mdp_solver import ValueIteration, PolicyIteration
from rl2023.exercise1.mdp import MDP, Transition, State, Action
from rl2023 import constants
import numpy as np

In [None]:
constants.EX1_CONSTANTS

In [None]:
def calculate_state_values(policy, rewards, gamma=0.9, theta=0.01):
    """
  Args:
      policy (np.array): Policy giving the probability of taking each action from each state.
      rewards (np.array): Rewards corresponding to reaching the next state from each state (so r(s,s') here rather than the usual r(s,a)).
      gamma (float, optional): Discount factor. Defaults to 0.9.
      theta (float, optional): Minimum value change threshold to terminate iteration. Defaults to 0.01.

  Returns:
      values (np.array): State values for given policy and reward mappings.
  """    
    num_states, num_actions = policy.shape # 2, 3
    values = np.zeros((num_states+1)) # Include terminal state
    
    print(f'Beginning policy evaluation for given policy and MDP...\n')
    iteration = 1

    while True:
        print(f'Iteration: {iteration} \t Current Values: {values}')
        delta=0
        initial_values = values
        values = np.zeros_like(values)
        for state in range(num_states):
            for action in range(num_actions):
                # With probability 0.9, next state = action with corresponding reward. With probability 0.1, next state = state with no reward.
                next_state_probabilities = {action:0.9, state:0.1}
                # Often we leave this next state computation (transition dynamics) to the environment. But to use complete dynamic programming we must know 
                # the transition probablities.
                for next_state in next_state_probabilities.keys():
                  # Note the expectation here is over both the policy and next state probabilities (environment dynamics)
                    aa = policy[state][action]*next_state_probabilities[next_state] * (rewards[state][next_state] + gamma * initial_values[next_state])
                    values[state] += aa
#                     print(aa)

            delta = max(delta, abs(initial_values[state]-values[state]))

        if delta < theta:
            print(f'\nMax difference in state value from previous iteration = {delta} which is less than threshold {theta}. Policy Evaluation terminating...\n')
            break

        iteration+=1

    print(f'Final policy state values: {values}')
    return values

In [None]:
def calculate_greedy_policy(policy, rewards, values, valid_actions, gamma=0.9):
    """Improve policy (take greedy actions) with respect to rewards and state values.

  Args:
      policy (np.array): Policy giving the probability of taking each action from each state.
      rewards (np.array): Rewards corresponding to reaching the next state from each state (so r(s,s') here rather than the usual r(s,a)).
      values (np.array): State values evaluated for current policy.
      gamma (float, optional): Discount factor. Defaults to 0.9.

  Returns:
      policy (np.array): Improved (greedy) policy.
  """

    num_states, num_actions = policy.shape # 2, 3
    greedy_policy = np.zeros_like(policy)
    state_action_values = np.nan*policy
    for action in range(num_actions):
        for state in range(num_states):
            if valid_actions[state, action]==1:
                state_action_values[state][action] = 0.0
                # With probability 0.9, next state = action with corresponding reward. With probability 0.1, next state = state with no reward.
                next_state_probabilities = {action:0.9, state:0.1}
                # Often the environment handles this next state computation (transition dynamics). But to use complete dynamic programming we must know 
                # the transition probablities.
        for next_state in next_state_probabilities.keys():
          # Note the expectation is now only over next state probabilities (environment dynamics) since we want the value for each valid state and action.
            state_action_values[state][action] += next_state_probabilities[next_state]*(rewards[state][next_state]+gamma*values[next_state])
            
    
    greedy_action = np.nanargmax(state_action_values[state]) # Argmax ignoring invalid actions with nan value
  
    for action in range(num_actions):
        greedy_policy[state][action] = 1 if action == greedy_action else 0

    print(f'State action values with previous policy:\n {state_action_values}\n')
    print(f'Greedy policy after policy improvement:\n {greedy_policy}')
    return greedy_policy

In [None]:
# state values
valid_actions = np.array([[0,1,0],[1,0,1]])

policy = np.array([[0,1,0],[0.5,0,0.5]])

rewards = np.array([[0,0,0],[0,0,10]])
###############################################################

# algorithm
values = calculate_state_values(policy, rewards, gamma=0.9, theta=0.01)

policy = calculate_greedy_policy(policy, rewards, values, valid_actions, gamma=0.9)

# values = calculate_state_values(policy, rewards, gamma=0.9, theta=0.01)

# policy = calculate_greedy_policy(policy, rewards, values, valid_actions, gamma=0.9)

In [None]:
class ValueIteration(MDPSolver):
    """MDP solver using the Value Iteration algorithm
    """

    def _calc_value_func(self, theta: float) -> np.ndarray:
        """Calculates the value function

        **YOU MUST IMPLEMENT THIS FUNCTION FOR Q1**

        **DO NOT ALTER THE MDP HERE**

        Useful Variables:
        1. `self.mpd` -- Gives access to the MDP.
        2. `self.mdp.R` -- 3D NumPy array with the rewards for each transition.
            E.g. the reward of transition [3] -2-> [4] (going from state 3 to state 4 with action
            2) can be accessed with `self.R[3, 2, 4]`
        3. `self.mdp.P` -- 3D NumPy array with transition probabilities.
            *REMEMBER*: the sum of (STATE, ACTION, :) should be 1.0 (all actions lead somewhere)
            E.g. the transition probability of transition [3] -2-> [4] (going from state 3 to
            state 4 with action 2) can be accessed with `self.P[3, 2, 4]`

        :param theta (float): theta is the stop threshold for value iteration
        :return (np.ndarray of float with dim (num of states)):
            1D NumPy array with the values of each state.
            E.g. V[3] returns the computed value for state 3
        """
        V = np.zeros(self.state_dim)
        ### PUT YOUR CODE HERE ###
        # for each state, find the a which is maximum
        num_states, num_actions = self.state_dim, self.action_dim 
        rewards = self.mdp.R # access rewards 
        gamma = self.gamma
        delta = 0
        policies = np.zeros(num_states)
        while delta < theta:
            for s in range(num_states):
                v = V[s].copy()
                # initialise an array for each iteration (state), 
                # which consists of the possible values for each action 
                find_max = np.zeros(num_actions) 
                for a in range(num_actions): 
                    for ns in range(num_states):
                        next_state_probabilities = self.mdp.P[s, a, ns]
                        find_max[a] += next_state_probabilities*(rewards[s, a, ns] + gamma * V[ns])
                V[s] = max(find_max)
                policies[s] = np.nanargmax(find_max)
                
            delta = max(delta, abs(v - V[s]))
        
#         raise NotImplementedError("Needed for Q1")
        return V

    def _calc_policy(self, V: np.ndarray) -> np.ndarray:
        """Calculates the policy

        **YOU MUST IMPLEMENT THIS FUNCTION FOR Q1**

        :param V (np.ndarray of float with dim (num of states)):
            A 1D NumPy array that encodes the computed value function (from _calc_value_func(...))
            It is indexed as (State) where V[State] is the value of state 'State'
        :return (np.ndarray of float with dim (num of states, num of actions):
            A 2D NumPy array that encodes the calculated policy.
            It is indexed as (STATE, ACTION) where policy[STATE, ACTION] has the probability of
            taking action 'ACTION' in state 'STATE'.
            REMEMBER: the sum of policy[STATE, :] should always be 1.0
            For deterministic policies the following holds for each state S:
            policy[S, BEST_ACTION] = 1.0
            policy[S, OTHER_ACTIONS] = 0
        """
        policy = np.zeros([self.state_dim, self.action_dim])
        ### PUT YOUR CODE HERE ###
        num_states, num_actions = self.state_dim, self.action_dim 
        rewards = self.mdp.R # access rewards 
        gamma = self.gamma
# #         raise NotImplementedError("Needed for Q1")
        for s in range(num_states):
            find_max = np.zeros(num_actions) 
            for a in range(num_actions): 
                for ns in range(num_states):
                    next_state_probabilities = self.mdp.P[s, a, ns]
                    find_max[a] += next_state_probabilities*(rewards[s, a, ns] + gamma * V[ns])
            for i in range(len(find_max)):
                if find_max[i] == V[s]:
                    policy[s] = i
            
        return policy

    def solve(self, theta: float = 1e-6) -> Tuple[np.ndarray, np.ndarray]:
        """Solves the MDP

        Compiles the MDP and then calls the calc_value_func and
        calc_policy functions to return the best policy and the
        computed value function

        **DO NOT CHANGE THIS FUNCTION**

        :param theta (float, optional): stop threshold, defaults to 1e-6
        :return (Tuple[np.ndarray of float with dim (num of states, num of actions),
                       np.ndarray of float with dim (num of states)):
            Tuple of calculated policy and value function
        """
        
        self.mdp.ensure_compiled()
        V = self._calc_value_func(theta)
        policy = self._calc_policy(V)

        return policy, V


In [None]:
class PolicyIteration(MDPSolver):
    """MDP solver using the Policy Iteration algorithm
    """

    def _policy_eval(self, policy: np.ndarray) -> np.ndarray:
        """Computes one policy evaluation step

        **YOU MUST IMPLEMENT THIS FUNCTION FOR Q1**

        :param policy (np.ndarray of float with dim (num of states, num of actions)):
            A 2D NumPy array that encodes the policy.
            It is indexed as (STATE, ACTION) where policy[STATE, ACTION] has the probability of
            taking action 'ACTION' in state 'STATE'.
            REMEMBER: the sum of policy[STATE, :] should always be 1.0
            For deterministic policies the following holds for each state S:
            policy[S, BEST_ACTION] = 1.0
            policy[S, OTHER_ACTIONS] = 0
        :return (np.ndarray of float with dim (num of states)): 
            A 1D NumPy array that encodes the computed value function
            It is indexed as (State) where V[State] is the value of state 'State'
        """
        ### PUT YOUR CODE HERE ###
        # initilise V(s)
        theta = 0.01 # self.theta
#         policy = np.zeros([self.state_dim, self.action_dim])
#         policy = np.array([[1,0,0],[0,1,0],[0.5,0,0.5]])
        num_states, num_actions = policy.shape # 2, 3
    
        V = np.zeros(self.state_dim)
#         values = np.zeros((num_states + 1)) # Include terminal state
        gamma = 0.9 #self.gamma # obtain gamma value
        
        rewards = self.mdp.R # access rewards 
#         print(rewards)
#         print(self.mdp.P) 

        
        print(f'Beginning policy evaluation for given policy and MDP...\n')
        iteration = 1

        while True:
            print(f'Iteration: {iteration} \t Current Values: {V}')
            delta = 0
            # set v = V
            initial_values = V.copy()
            # values = V(s)
#             V = np.zeros_like(V)
            for s in range(num_states):
                for a in range(num_actions):
                    for ns in range(num_states):
                        next_state_probabilities = self.mdp.P[s, a, ns] # *** how to get te next state policy?
                      # Note the expectation here is over both the policy and next state probabilities (environment dynamics)
                        V[s] += policy[s][a]*next_state_probabilities*(rewards[s, a, ns] + gamma * initial_values[ns])

                delta = max(delta, abs(initial_values[s]-V[s]))
        
            if delta < theta:
                print(f'\nMax difference in state value from previous iteration = {delta} which is less than threshold {theta}. Policy Evaluation terminating...\n')
                break

            iteration+=1

            print(f'Final policy state values: {V}')
        
        
#         raise NotImplementedError("Needed for Q1")
        return np.array(V)

    def _policy_improvement(self) -> Tuple[np.ndarray, np.ndarray]:
        """Computes policy iteration until a stable policy is reached

        **YOU MUST IMPLEMENT THIS FUNCTION FOR Q1**

        Useful Variables (As with Value Iteration):
        1. `self.mpd` -- Gives access to the MDP.
        2. `self.mdp.R` -- 3D NumPy array with the rewards for each transition.
            E.g. the reward of transition [3] -2-> [4] (going from state 3 to state 4 with action
            2) can be accessed with `self.R[3, 2, 4]`
        3. `self.mdp.P` -- 3D NumPy array with transition probabilities.
            *REMEMBER*: the sum of (STATE, ACTION, :) should be 1.0 (all actions lead somewhere)
            E.g. the transition probability of transition [3] -2-> [4] (going from state 3 to
            state 4 with action 2) can be accessed with `self.P[3, 2, 4]`

        :return (Tuple[np.ndarray of float with dim (num of states, num of actions),
                       np.ndarray of float with dim (num of states)):
            Tuple of calculated policy and value function
        """
        num_states, num_actions = self.state_dim, self.action_dim 
        policy = np.zeros([num_states, num_actions])
#         policy = np.array([[1,0,0],[0,1,0],[0.5,0,0.5]])
        policy = np.array([[1,0,0],[1,0,0],[1,0,0]])
        V = np.zeros([num_states])
        ### PUT YOUR CODE HERE ###
#         raise NotImplementedError("Needed for Q1")
        gamma = 0.9 #self.gamma 
        rewards = self.mdp.R # access rewards 
        policy_stable = False
        if (policy_stable == False):
            V = self._policy_eval(policy)
            print(V)
            for s in range(num_states):
                action = policy[s].copy()
                policy_max = np.zeros(num_actions)
                for a in range(num_actions): ### in action?
                    for ns in range(num_states):
                        next_state_probabilities = self.mdp.P[s, a, ns] 
                        policy_max[a] += next_state_probabilities*(rewards[s, a, ns] + gamma * V[ns])
                        
                # for each state, take the action which leads to maximum policy[s]
#                 policy[s] = max(find_max_a, key=find_max_a.get)
                policy[s] = np.nanargmax(policy_max)

                # if action == policy[s], then policy_stable = true, and break loop 
                print("action =", action, "policy = ", policy[s])
            
                if (action == policy[s]).all():
                    print(policy_stable)
                    policy_stable = True
                    break

        return V, policy
    # 3.
    # set (random?) possible actions (a) from the policy
    # find the best policy
    # if a is not equal to the best policy, repeat the steps until they are equal

    def solve(self, theta: float = 1e-6) -> Tuple[np.ndarray, np.ndarray]:
        """Solves the MDP

        This function compiles the MDP and then calls the
        policy improvement function that the student must implement
        and returns the solution

        **DO NOT CHANGE THIS FUNCTION**

        :param theta (float, optional): stop threshold, 
        defaults to 1e-6
        :return (Tuple[np.ndarray of float with dim (num of states, num of actions),
                       np.ndarray of float with dim (num of states)]):
            Tuple of calculated policy and value function
        """
        self.mdp.ensure_compiled()
        self.theta = theta
        return self._policy_improvement()

In [None]:
mdp = MDP()
mdp.add_transition(
    #         start action end prob reward
    Transition("rock0", "jump0", "rock0", 1, 0),
    Transition("rock0", "stay", "rock0", 1, 0),
    Transition("rock0", "jump1", "rock0", 0.1, 0),
    Transition("rock0", "jump1", "rock1", 0.9, 0),
    Transition("rock1", "jump0", "rock1", 0.1, 0),
    Transition("rock1", "jump0", "rock0", 0.9, 0),
    Transition("rock1", "jump1", "rock1", 0.1, 0),
    Transition("rock1", "jump1", "land", 0.9, 10),
    Transition("rock1", "stay", "rock1", 1, 0),
    Transition("land", "stay", "land", 1, 0),
    Transition("land", "jump0", "land", 1, 0),
    Transition("land", "jump1", "land", 1, 0),
)
# solver = ValueIteration(mdp, CONSTANTS["gamma"])
# policy, valuefunc = solver.solve()
# print("---Value Iteration---")
# print("Policy:")
# print(solver.decode_policy(policy))
# print("Value Function")
# print(valuefunc)

solver = PolicyIteration(mdp, CONSTANTS["gamma"])
policy, valuefunc = solver.solve()
print("---Policy Iteration---")
print("Policy:")
print(solver.decode_policy(policy))
print("Value Function")
print(valuefunc)

In [1]:
from abc import ABC, abstractmethod
import numpy as np
from typing import List, Tuple, Dict, Optional, Hashable

from rl2023.constants import EX1_CONSTANTS as CONSTANTS
from rl2023.exercise1.mdp import MDP, Transition, State, Action

class MDPSolver(ABC):
    """Base class for MDP solvers

    **DO NOT CHANGE THIS CLASS**

    :attr mdp (MDP): MDP to solve
    :attr gamma (float): discount factor gamma to use
    :attr action_dim (int): number of actions in the MDP
    :attr state_dim (int): number of states in the MDP
    """

    def __init__(self, mdp: MDP, gamma: float):
        """Constructor of MDPSolver

        Initialises some variables from the MDP, namely the state and action dimension variables

        :param mdp (MDP): MDP to solve
        :param gamma (float): discount factor (gamma)
        """
        self.mdp: MDP = mdp
        self.gamma: float = gamma

        self.action_dim: int = len(self.mdp.actions)
        self.state_dim: int = len(self.mdp.states)

    def decode_policy(self, policy: Dict[int, np.ndarray]) -> Dict[State, Action]:
        """Generates greedy, deterministic policy dict

        Given a stochastic policy from state indeces to distribution over actions, the greedy,
        deterministic policy is generated choosing the action with highest probability

        :param policy (Dict[int, np.ndarray of float with dim (num of actions)]):
            stochastic policy assigning a distribution over actions to each state index
        :return (Dict[State, Action]): greedy, deterministic policy from states to actions
        """
        new_p = {}
        for state, state_idx in self.mdp._state_dict.items():
            new_p[state] = self.mdp.actions[np.argmax(policy[state_idx])]
        return new_p

    @abstractmethod
    def solve(self):
        """Solves the given MDP
        """
        ...