In [68]:
from scipy.stats import poisson
import numpy as np
from enum import IntEnum
from typing import Tuple
import matplotlib.pyplot as plt


class Action(IntEnum):
    """Action"""

    LEFT = 0
    DOWN = 1
    RIGHT = 2
    UP = 3


def actions_to_dxdy(action: Action) -> Tuple[int, int]:
    """
    Helper function to map action to changes in x and y coordinates
    Args:
        action (Action): taken action
    Returns:
        dxdy (Tuple[int, int]): Change in x and y coordinates
    """
    mapping = {
        Action.LEFT: (-1, 0),
        Action.DOWN: (0, -1),
        Action.RIGHT: (1, 0),
        Action.UP: (0, 1),
    }
    return mapping[action]


class Gridworld5x5:
    """5x5 Gridworld"""

    def __init__(self) -> None:
        """
        State: (x, y) coordinates

        Actions: See class(Action).
        """
        self.rows = 5
        self.cols = 5
        self.state_space = [
            (x, y) for x in range(0, self.rows) for y in range(0, self.cols)
        ]
        self.action_space = len(Action)

        # TODO set the locations of A and B, the next locations, and their rewards
        self.A = (1, 4)
        self.A_prime = (1, 0)
        self.A_reward = 10
        self.B = (3, 4)
        self.B_prime = (3, 2)
        self.B_reward = 5

    def transitions(
        self, state: Tuple, action: Action
    ) -> Tuple[Tuple[int, int], float]:
        """Get transitions from given (state, action) pair.

        Note that this is the 4-argument transition version p(s',r|s,a).
        This particular environment has deterministic transitions

        Args:
            state (Tuple): state
            action (Action): action

        Returns:
            next_state: Tuple[int, int]
            reward: float
        """


        # TODO Check if current state is A and B and return the next state and corresponding reward
        # Else, check if the next step is within boundaries and return next state and reward

        if state == self.A:
            next_state = self.A_prime
            reward = self.A_reward
        elif state == self.B:
            next_state = self.B_prime
            reward = self.B_reward
        else:
            move = actions_to_dxdy(action)
            next_state = (state[0] + move[0], state[1] + move[1])
            reward = 0
            if (next_state[0] not in range(5)) or (next_state[1] not in range(5)):
                next_state = state
                reward = -1

        return next_state, reward

    def expected_return(
        self, V, state: Tuple[int, int], action: Action, gamma: float
    ) -> float:
        """Compute the expected_return for all transitions from the (s,a) pair, i.e. do a 1-step Bellman backup.

        Args:
            V (np.ndarray): list of state values (length = number of states)
            state (Tuple[int, int]): state
            action (Action): action
            gamma (float): discount factor

        Returns:
            ret (float): the expected return
        """

        next_state, reward = self.transitions(state, action)
        # TODO compute the expected return
        ret = (1 / self.action_space) * (reward + (gamma * V[next_state]))

        return ret


class JacksCarRental:
    def __init__(self, modified: bool = False) -> None:
        """JacksCarRental

        Args:
           modified (bool): False = original problem Q6a, True = modified problem for Q6b

        State: tuple of (# cars at location A, # cars at location B)

        Action (int): -5 to +5
            Positive if moving cars from location A to B
            Negative if moving cars from location B to A
        """
        self.modified = modified

        self.action_space = list(range(-5, 6))

        self.rent_reward = 10
        self.move_cost = 2

        # For modified problem
        self.overflow_cars = 10
        self.overflow_cost = 4

        # Rent and return Poisson process parameters
        # Save as an array for each location (Loc A, Loc B)
        self.rent = [poisson(3), poisson(4)]
        self.return_ = [poisson(3), poisson(2)]

        # Max number of cars at end of day
        self.max_cars_end = 20
        # Max number of cars at start of day
        self.max_cars_start = self.max_cars_end + max(self.action_space)

        self.state_space = [
            (x, y)
            for x in range(0, self.max_cars_end + 1)
            for y in range(0, self.max_cars_end + 1)
        ]

        # Store all possible transitions here as a multi-dimensional array (locA, locB, action, locA', locB')
        # This is the 3-argument transition function p(s'|s,a)
        self.t = np.zeros(
            (
                self.max_cars_end + 1,
                self.max_cars_end + 1,
                len(self.action_space),
                self.max_cars_end + 1,
                self.max_cars_end + 1,
            ),
        )

        # Store all possible rewards (locA, locB, action)
        # This is the reward function r(s,a)
        self.r = np.zeros(
            (self.max_cars_end + 1, self.max_cars_end + 1, len(self.action_space))
        )

    def _open_to_close(self, loc_idx: int) -> Tuple[np.ndarray, np.ndarray]:
        """Computes the probability of ending the day with s_end \in [0,20] cars given that the location started with s_start \in [0, 20+5] cars.

        Args:
            loc_idx (int): the location index. 0 is for A and 1 is for B. All other values are invalid
        Returns:
            probs (np.ndarray): list of probabilities for all possible combination of s_start and s_end
            rewards (np.ndarray): average rewards for all possible s_start
        """
        probs = np.zeros((self.max_cars_start + 1, self.max_cars_end + 1))
        rewards = np.zeros(self.max_cars_start + 1)
        for start in range(probs.shape[0]):
            # TODO Calculate average rewards.
            # For all possible s_start, calculate the probability of renting k cars.
            # Be sure to consider the case where business is lost (i.e. renting k > s_start cars)
            avg_rent = self.rent[loc_idx].mean()
            rewards[start] = self.rent_reward * avg_rent

            # TODO Calculate probabilities
            # Loop over every possible s_end
            for end in range(probs.shape[1]):
                prob = 0.0
                # Since s_start and s_end are specified,
                # you must rent a minimum of max(0, start-end)
                min_rent = max(0, start - end)

                # TODO Loop over all possible rent scenarios and compute probabilities
                # Be sure to consider the case where business is lost (i.e. renting k > s_start cars)
                for i in range(min_rent, start + 1):
                    prob += self.rent[loc_idx].pmf(i)

                probs[start, end] = prob

        return probs, rewards

    def _calculate_cost(self, state: Tuple[int, int], action: int) -> float:
        """A helper function to compute the cost of moving cars for a given (state, action)

        Note that you should compute costs differently if this is the modified problem.

        Args:
            state (Tuple[int,int]): state
            action (int): action
        """
        cost = self.move_cost * action

        return cost

    def _valid_action(self, state: Tuple[int, int], action: int) -> bool:
        """Helper function to check if this action is valid for the given state

        Args:
            state:
            action:
        """
        if state[0] < action or state[1] < -(action):
            return False
        else:
            return True

    def precompute_transitions(self) -> None:
        """Function to precompute the transitions and rewards.

        This function should have been run at least once before calling expected_return().
        You can call this function in __init__() or separately.

        """
        # Calculate open_to_close for each location
        day_probs_A, day_rewards_A = self._open_to_close(0)
        day_probs_B, day_rewards_B = self._open_to_close(1)

        # Perform action first then calculate daytime probabilities
        for locA in range(self.max_cars_end + 1):
            for locB in range(self.max_cars_end + 1):
                for ia, action in enumerate(self.action_space):
                    # Check boundary conditions
                    if not self._valid_action((locA, locB), action):
                        self.t[locA, locB, ia, :, :] = 0
                        self.r[locA, locB, ia] = 0
                    else:
                        # TODO Calculate day rewards from renting
                        # Use day_rewards_A and day_rewards_B and _calculate_cost()
                        self.r[locA, locB, ia] = day_rewards_A[locA] + day_rewards_B[locB] + \
                                                 self._calculate_cost((locA, locB), action)

                        # Loop over all combinations of locA_ and locB_
                        for locA_ in range(self.max_cars_end + 1):
                            for locB_ in range(self.max_cars_end + 1):

                                # TODO Calculate transition probabilities
                                # Use the probabilities computed from open_to_close
                                self.t[locA, locB, ia, locA_, locB_] = None

    def expected_return(
        self, V, state: Tuple[int, int], action: Action, gamma: float
    ) -> float:
        """Compute the expected_return for all transitions from the (s,a) pair, i.e. do a 1-step Bellman backup.

        Args:
            V (np.ndarray): list of state values (length = number of states)
            state (Tuple[int, int]): state
            action (Action): action
            gamma (float): discount factor

        Returns:
            ret (float): the expected return
        """

        # TODO compute the expected return
        ret = None
        return ret

    def transitions(self, state: Tuple, action: Action) -> np.ndarray:
        """Get transition probabilities for given (state, action) pair.

        Note that this is the 3-argument transition version p(s'|s,a).
        This particular environment has stochastic transitions

        Args:
            state (Tuple): state
            action (Action): action

        Returns:
            probs (np.ndarray): return probabilities for next states. Since transition function is of shape (locA, locB, action, locA', locB'), probs should be of shape (locA', locB')
        """
        # TODO
        probs = np.zeros((self.max_cars_end + 1, self.max_cars_end + 1))
        return probs

    def rewards(self, state, action) -> float:
        """Reward function r(s,a)

        Args:
            state (Tuple): state
            action (Action): action
        Returns:
            reward: float
        """
        # TODO
        return action * abs(self.rent_reward)


In [50]:
def policy_improve(V, g, pi):
    policy_stable = True
    for s in g.state_space:
        old_action = pi[s]
        a = []
        for i in range(g.action_space):
            a.append(g.expected_return(V, s, i, 0.9))
        pi[s] = np.argwhere(a == np.amax(a))

        if isinstance(old_action, int):
            old_action = np.empty(1)
        if old_action.all() != pi[s].all():
            policy_stable = False

    if policy_stable:
        for i in V.T[::-1]:
            print(i)

        print()
        for row in pi.T[::-1]:
            col = [a.flatten().tolist() for a in row]
            for element in col:
                print('[', end='')
                for a in element:
                    if a == 0:
                        print(u"\u2190", end='')  #Left
                    if a == 1:
                        print(u"\u2193", end='')  #Down
                    if a == 2:
                        print(u"\u2192", end='')  #Right
                    if a == 3:
                        print(u"\u2191", end='')  #Up
                print('] ', end='')
                if len(element) == 1:
                    print('   ', end='')
                elif len(element) == 2:
                    print('  ', end='')
            print()
        return policy_stable
    else:
        return policy_stable


def policy_eval(V, g, pi):
    theta = 0.001
    error = 2 * theta

    while error > theta:
        for s in g.state_space:
            v = V[s]
            action = pi[s].flatten()[0]
            V[s] = 4*g.expected_return(V, s, action, 0.9)


            error = abs(v - V[s])



In [58]:
def Q5_a():

    grid = Gridworld5x5()
    value_function = np.zeros(shape=(5, 5))
    convergence_threshold = 0.001
    error = 2 * convergence_threshold

    while error > convergence_threshold:
        for state in grid.state_space:
            previous_value = value_function[state]

            action_returns = 0

            # Iterate through possible actions
            for action in range(grid.action_space):
                action_returns += grid.expected_return(value_function, state, action, 0.9)

            # Update the value function
            value_function[state] = action_returns
            error = abs(previous_value - value_function[state])

    # Print the updated value function
    print("Value function to 1 decimal:-\n")
    for row in value_function.T[::-1]:
        print(row)
    print()

    # Print the value function with formatting
    print("Value function:-\n")
    for row in value_function.T[::-1]:
        formatted_row = [f"{value:.1f}" for value in row]
        print(formatted_row)


def Q5_b():

    grid = Gridworld5x5()
    value_function = np.zeros(shape=(5, 5))
    policy = np.empty(shape=(5, 5), dtype=object)
    convergence_threshold = 0.001
    error = 2 * convergence_threshold

    max_iterations = 100

    iterations = 0
    while iterations < max_iterations:
        for state in grid.state_space:
            previous_value = value_function[state]

            expected_returns = []

            for action in range(grid.action_space):
                expected_returns.append(4 * grid.expected_return(value_function, state, action, 0.9))

            value_function[state] = max(expected_returns)

            # Find the best actions
            best_actions = np.argwhere(expected_returns == np.amax(expected_returns))
            policy[state] = best_actions
            error = abs(previous_value - value_function[state])

        iterations += 1


    for row in value_function.T[::-1]:
        print(row)
    print()


    for row in value_function.T[::-1]:
        formatted_row = [f"{value:.1f}" for value in row]
        print(formatted_row)
    print()

    # Print the policy with arrows
    for row in policy.T[::-1]:
        for col in row:
            if len(col) == 0:
                print('[ ]   ', end='')
            else:
                print('[', end='')
                for action in col:
                    if action == 0:
                        print(u"\u2190", end='')  # Left
                    elif action == 1:
                        print(u"\u2193", end='')  # Down
                    elif action == 2:
                        print(u"\u2192", end='')  # Right
                    elif action == 3:
                        print(u"\u2191", end='')  # Up
                print(']   ', end='')
        print()



def Q5_c():
    grid = Gridworld5x5()
    value_function = np.zeros(shape=(5, 5))
    policy = np.zeros(shape=(5, 5), dtype=object)
    convergence_threshold = 0.1

    error = 2 * convergence_threshold

    while not policy_improve(value_function, grid, policy):
        policy_eval(value_function, grid, policy)


# def Q6_a():
#     env = JacksCarRental()
#     # grid.precompute_transitions()
#     threshold = 0.001
#     gamma = 0.9
#     results_list = run_policy_iteration(env, threshold, gamma)

#     # x = grid._open_to_close(1)[0]

#     # f = plt.imshow(x)
#     # plt.colorbar(f)

#     # plt.show()





In [69]:
# Q5_a()
# Q5_b()
Q5_c()
# Q6_a()

[21.97748529 24.4194281  21.97748529 19.4194281  17.47748529]
[19.77973676 21.97748529 19.77973676 17.80176308 16.02158677]
[17.80176308 19.77973676 17.80176308 16.02158677 14.4194281 ]
[16.02158677 17.80176308 16.02158677 14.4194281  12.97748529]
[14.4194281  16.02158677 14.4194281  12.97748529 11.67973676]

[→]    [←↓→↑] [←]    [←↓→↑] [←]    
[→↑]   [↑]    [←↑]   [←]    [←]    
[→↑]   [↑]    [←↑]   [←↑]   [←↑]   
[→↑]   [↑]    [←↑]   [←↑]   [←↑]   
[→↑]   [↑]    [←↑]   [←↑]   [←↑]   
