In [478]:
import numpy as np
import contextlib

# Configures numpy print options
@contextlib.contextmanager
def _printoptions(*args, **kwargs):
    original = np.get_printoptions()
    np.set_printoptions(*args, **kwargs)
    try:
        yield
    finally:
        np.set_printoptions(**original)


class EnvironmentModel:
    def __init__(self, n_states, n_actions, seed=None):
        self.n_states = n_states
        self.n_actions = n_actions

        self.random_state = np.random.RandomState(seed)

    def p(self, next_state, state, action):
        raise NotImplementedError()

    def r(self, next_state, state, action):
        raise NotImplementedError()

    def draw(self, state, action):
        p = [self.p(ns, state, action) for ns in range(self.n_states)]
        next_state = self.random_state.choice(self.n_states, p=p)
        reward = self.r(next_state, state, action)

        return next_state, reward


class Environment(EnvironmentModel):
    def __init__(self, n_states, n_actions, max_steps, pi, seed=None):
        EnvironmentModel.__init__(self, n_states, n_actions, seed)

        self.max_steps = max_steps

        self.pi = pi
        if self.pi is None:
            self.pi = np.full(n_states, 1. /n_states)

    def reset(self):
        self.n_steps = 0
        self.state = self.random_state.choice(self.n_states, p=self.pi)

        return self.state

    def step(self, action):
        if action < 0 or action >= self.n_actions:
            raise Exception('Invalid action.')

        self.n_steps += 1
        done = (self.n_steps >= self.max_steps)

        self.state, reward = self.draw(self.state, action)

        return self.state, reward, done

    def render(self, policy=None, value=None):
        raise NotImplementedError()


class FrozenLake(Environment):
    def __init__(self, lake, slip, max_steps, seed=None):
        """
        lake: A matrix that represents the lake. For example:
        lake =  [['&', '.', '.', '.'],
                ['.', '#', '.', '#'],
                ['.', '.', '.', '#'],
                ['#', '.', '.', '$']]
        slip: The probability that the agent will slip
        max_steps: The maximum number of time steps in an episode
        seed: A seed to control the random number generator (optional)
        """
        # start (&), frozen (.), hole (#), goal ($)
        self.lake = np.array(lake)
        self.lake_flat = self.lake.reshape(-1)

        self.slip = slip

        n_states = self.lake.size + 1
        n_actions = 4

        pi = np.zeros(n_states, dtype=float)
        pi[np.where(self.lake_flat == '&')[0]] = 1.0

        self.absorbing_state = n_states - 1

        # TODO: We can pre-calculate p and r here and put them in an array but is not necessary as answered in the forum

        Environment.__init__(self, n_states, n_actions, max_steps, pi, seed=seed)

    def step(self, action):
        state, reward, done = Environment.step(self, action)

        done = (state == self.absorbing_state) or done

        return state, reward, done

    def p(self, next_state, state, action):
        # Convert possible actions to directions for clarity of reading the code:
        up, left, down, right = [0, 1, 2, 3]
        # Probability of transitioning from state to next_state given an action
        # (return value)
        pt = 0.0
        
        
        # 1. If the agent is in the absorbing state the probability of remaining in 
        #    the same place is 1. Any action taken in the absorbing state leads
        #    to the absorbing state. Validating this first to avoid dealing with
        #    'out of index' issues with the self.lake_flat array since the
        #    absorbing state is in the self.lake.size + 1 position
        if state == self.absorbing_state and next_state == self.absorbing_state:
            # Assigning return value to pt just to keep the convention that pt
            # is the return value. Same convention is used in the rest of the
            # function.
            pt = 1.0
            return pt
        elif state == self.absorbing_state:
            return pt
   
        # Is the agent in a hole or the goal ?
        hole_or_goal = True if self.lake_flat[state] == '#' or self.lake_flat[state] == '$' else False


        # 2. If the agent is in a hole or in the goal the probability of moving to 
        #    the absorbing state is 1.
        if hole_or_goal and next_state == self.absorbing_state:
            pt = 1.0
            return pt
        # If the agent is in any position different from a hole or the goal, and it
        # wants to move to the absorbing state the probability is 0, or if it is
        # in a hole or the goal, and it tries to move to any other state different
        # from the absorbing state the probability is 0. 
        elif next_state == self.absorbing_state or hole_or_goal:
            return pt
        
        
        # 3. Validate if the agent can move from the current state to the next_state.
        #    Consider that it has 4 possible actions: up, down, left, right;
        #    this means that it can move in just 1 direction per action, either
        #    x or y. The absolute change in the coordinates x, y from state to
        #    next_state can't be greater than 1
        
        # Get x, y coordinates of state and next_state 
        next_state_y, next_state_x = np.unravel_index(next_state, self.lake.shape)
        state_y, state_x = np.unravel_index(state, self.lake.shape)
        # A negative delta_x indicates that the agent should move to the left,
        # positive delta_x indicates it should move to the right
        delta_x = next_state_x - state_x
        # A negative delta_y indicates that the agent should go down, positive
        # delta_y indicate it should go up
        delta_y = next_state_y - state_y

        # Probability of making an invalid move is 0
        if (np.abs(delta_x) + np.abs(delta_y)) > 1 and not hole_or_goal:
            return pt
        
        
        # 4. Given that the transition from state to next_state is valid, validate
        #    that by executing the action the agent can get from state to
        #    next_state either by the selected action or by slipping.
        
        # Borders of the lake 
        border_y, border_x = self.lake.shape
        border_x -= 1
        border_y -= 1
        
        # Defining walls as the number of borders each tile collides with.
        # Get walls in x and y
        walls_y = 1 if state_y == border_y or state_y == 0 else 0
        walls_x = 1 if state_x == border_x or state_x == 0 else 0
        # Add the number of walls this state collides with x and y
        walls   = walls_y + walls_x
        
        # Transition is from state to next_state, where state is not equal to 
        # next_state. In other words, the agent is trying to move from the
        # current state
        if delta_x or delta_y:
            # For transitioning from state to the next_state the action provided
            # is the correct one. The agent will get to the next state unless it slips
            if ((delta_x > 0 and action == right) or (delta_x < 0 and action == left) or
                (delta_y > 0 and action == down) or (delta_y < 0 and action == up)):
                pt = (1 - self.slip) + (self.slip / self.n_actions)
            # For transitioning from state to the next_state the action provided
            # is incorrect. The only wat to transition is by slipping. 
            else:
                pt = (self.slip / self.n_actions)
        # Transition is to the same state: state == next_state. The agent can remain in 
        # the same state if it crashes with a wall. The number of walls contribute 
        # to how probable is to remain in the same state.
        elif walls:
            # The agent is moving to a wall, it will remain in the same state unless is
            # slips to a place without a wall.
            if (action == left and state_x == 0) or (action == right and state_x == border_x) or \
               (action == up and state_y == 0) or (action == down and state_y == border_y):
                pt = (1 - self.slip) + ((self.slip / self.n_actions) * walls)
            # Action is not in a direction of a wall, the agent will remain in the same
            # state just if it slips into a wall
            else:
                pt = (self.slip / self.n_actions) * walls
        # Is not necessary to put an else case here, since pt was initialized to 0.0
        # If pt has a value of 0.0 at this point means that the agent wanted to remain
        # in the same state but there are no walls to crash to remain in the same place. 
        # So the probability of remaining in the same place is 0 given any action,
        # even if the agent slips it will end up moving. 
                
        # Return the probability of moving from state to next_state given an action and
        # the probability of slipping
        return pt

    def r(self, next_state, state, action):
        # Reward received by transitioning from state to next_state 
        # given an action
        reward = 0.0
        
        # The agent receives a reward of 1 upon taking an action in the goal
        if state != self.absorbing_state and self.lake_flat[state] == '$':
            reward = 1.0
    
        # In any other case there is no reward
        return reward    

    def render(self, policy=None, value=None):
        if policy is None:
            lake = np.array(self.lake_flat)

            if self.state < self.absorbing_state:
                lake[self.state] = '@'

            print(lake.reshape(self.lake.shape))
        else:
            # UTF-8 arrows look nicer, but cannot be used in LaTeX
            # https://www.w3schools.com/charsets/ref_utf_arrows.asp
            actions = ['^', '<', '_', '>']

            print('Lake:')
            print(self.lake)

            print('Policy:')
            policy = np.array([actions[a] for a in policy[:-1]])
            print(policy.reshape(self.lake.shape))

            print('Value:')
            with _printoptions(precision=3, suppress=True):
                print(value[:-1].reshape(self.lake.shape))

def play(env):
    actions = ['w', 'a', 's', 'd']

    state = env.reset()
    env.render()

    done = False
    while not done:
        c = input('\nMove: ')
        if c not in actions:
            raise Exception('Invalid action')

        state, r, done = env.step(actions.index(c))

        env.render()
        print('Reward: {0}.'.format(r))
        
"""
    Definition: Create a random deterministic policy. Returns a numpy array with
                the random deterministic policy
"""
def random_deterministic_policy(env):
    rand_policy = list()
    # Get a random action and add them to the policy for each state
    [rand_policy.append(np.random.randint(0, env.n_actions)) for i in range(env.n_states)]

    return np.array(rand_policy)

def policy_evaluation(env, policy, gamma, theta, max_iterations):
    value = np.zeros(env.n_states, dtype=np.float32)

    # Loop the number of iterations or while our error is greater than
    # the tolerance parameter
    for i in range(max_iterations):
        delta = 0
        # Go through all tha states
        for state in range(env.n_states):
            # Store value of the state previous to any modification
            v = value[state]
            policy_state_product = 0

            # We have just one action per state in a deterministic policy, is not necessary
            # to loop through all the actions. Leaving this 'for' just to match the policy
            # evaluation formula 
            # Go through all the actions in the policy
            for a in range(env.n_actions):
                policy_p = 0
                # If the action is not part of the policy don't calculate anything
                if a == policy[state]:
                    policy_p = 1
                else:
                    continue

                state_product = 0

                # Go through all the possible next states
                for state_prime in range(env.n_states):
                    # Get probability of transitioning from state to state_prime
                    # given action
                    pt = env.p(state_prime, state, a)
                    # Get reward of transitioning from state to state_prime given
                    # action
                    rt = env.r(state_prime, state, a)

                    state_product += pt * (rt + (gamma * value[state_prime]))

                # Store product from all possible action in policy
                policy_state_product += state_product * policy_p
              
            # Update value of this state  
            value[state] = policy_state_product

            # Calculate the difference between our previous value and the new one
            delta = max(delta, abs(v - value[state]))

        # If our delta is smaller than the error value theta, stop
        if delta < theta:
            break

    return value


def policy_improvement(env, value, gamma):
    improved_policy = np.zeros(env.n_states, dtype=int)

    # Go through all the states
    for state in range(env.n_states):
        q = list()
        
        # Go through all the actions
        for a in range(env.n_actions):
            state_prime_q = 0
            
            # Go through all next states from the actual state
            for state_prime in range(env.n_states):
                # Get probability of transitioning from state to state_prime
                # given action
                pt = env.p(state_prime, state, a)
                # Get reward of transitioning from state to state_prime given
                # action
                rt = env.r(state_prime, state, a)

                state_prime_q += pt * (rt + (gamma * value[state_prime]))

            # Store value of all actions of possible next states in a list
            q.append(state_prime_q)
            
        q = np.array(q)
        
        # Get the best action in the current state
        improved_policy[state] = np.argmax(q)

    # Return improved policy
    return improved_policy
  
def policy_iteration(env, gamma, theta, max_iterations, policy=None):
    if policy is None:
        policy = np.zeros(env.n_states, dtype=int)
    else:
        policy = np.array(policy, dtype=int)
        
    # Iterate the maximum number of iterations or while there are still
    # improvements in the policy
    for i in range(max_iterations):
        prv_policy = policy
        #print("Iteration ", i)
        # Get value of current policy
        value = policy_evaluation(env, policy, gamma, theta, max_iterations)
        # Improve policy
        policy = policy_improvement(env, value, gamma)
        
        # If policy cannot improve more we have the optimal policy
        equal = prv_policy == policy
        if not np.any(equal == False):
            break

    # Return the optimal policy and value of tha policy
    return policy, value

    
def value_iteration(env, gamma, theta, max_iterations, value=None):
    if value is None:
        value = np.zeros(env.n_states)
    else:
        value = np.array(value, dtype=np.float32)
        
    # Keep iterating while we have budget, or if the delta error is reached
    for i in range(max_iterations):
        #print("Iteration ", i)
        delta = 0

        # Go through all the states
        for state in range(env.n_states):
            v = value[state]
            policy_state_product = list()

            # Go through all the actions
            for a in range(env.n_actions):
                state_product = 0

                # Go through all the next states given the current state
                for state_prime in range(env.n_states):
                    pt = env.p(state_prime, state, a)
                    rt = env.r(state_prime, state, a)

                    state_product += pt * (rt + (gamma * value[state_prime]))

                # Store evaluation of states in a list, we will decide which is
                # the best outside this loop
                policy_state_product.append(state_product)

            # Update value whit best evaluation
            policy_state_product = np.array(policy_state_product)
            value[state] = np.max(policy_state_product)

            # Update delta
            delta = max(delta, abs(v - value[state]))

        # If our delta is smaller than the error value theta, stop
        if delta < theta:
            break

    # Update policy given the optimal value
    policy = policy_improvement(env, value, gamma)

    # Return optimal policy and value
    return policy, value    



### 1. Testing values of p function, they should match the ones provided in the p.npy file

In [479]:
# We have 17 * 17 * 4 shape of the matrix, absorving state is the last one
# This means that we can move betweeen the 16 states, and wwe can transition to the absorbing state that is 17
# next_state, state, action
lake_p = np.load("p.npy")
seed = 0
gamma = 0.9

# Small frozen lake array
lake =  [['&', '.', '.', '.'],
        ['.', '#', '.', '#'],
        ['.', '.', '.', '#'],
        ['#', '.', '.', '$']]

# Big lake
#lake = [['&', '.', '.', '.', '.', '.', '.', '.'],
#        ['.', '.', '.', '.', '.', '.', '.', '.'],
#        ['.', '.', '.', '#', '.', '.', '.', '.'],
#        ['.', '.', '.', '.', '.', '#', '.', '.'],
#        ['.', '.', '.', '#', '.', '.', '.', '.'],
#        ['.', '#', '#', '.', '.', '.', '#', '.'],
#        ['.', '#', '.', '.', '#', '.', '#', '.'],
#        ['.', '.', '.', '#', '.', '.', '.', '$']]

# Create small frozen lake environment
env = FrozenLake(lake, slip=0.1, max_steps=16, seed=seed)

# Create an array of transitioning to all the states given all
# the actions in the lake. Using this array to compare it to the
# one provided in the assignment that was loaded at the begining
# of this document 
probb = list()
for nextState in range (env.n_states):
    for sttate in range (env.n_states):
        for act in range (env.n_actions):
            probb.append((env.p(nextState, sttate, act)))

lake = np.array(lake)
lake_flat = lake.reshape(-1)

# Convert list to numpy array
probb = np.array(probb)
# Give correct shape, trusting in reshapes black magic
probb = probb.reshape(lake.size + 1, lake.size + 1, 4)

# Validate that the array is the same
#equal = probb == lake_p
# If there is no False, means that all probabilities match
# with the probabilities given in the assignment.
#print(np.where(equal == False))


### 2. Testing of: Tabular model-based reinforcement learning
**Testing of:** Policy evaluation, policy improvement, policy iteration and value iteration<br>
**Hint from document:** A *deterministic* policy may be represented by an array that contains the action prescribed for each state  

In [480]:
# Taken from document in the main function side
theta = 0.001
max_iterations = 128

print('# Model-based algorithms')

print('')

print('## Policy iteration')
policy, value = policy_iteration(env, gamma, theta=0.001, max_iterations=128)
env.render(policy, value)

print('')

print('## Value iteration')
policy, value = value_iteration(env, gamma, theta=0.001, max_iterations=128)
env.render(policy, value)


# Model-based algorithms

## Policy iteration
Lake:
[['&' '.' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '$']]
Policy:
[['_' '>' '_' '<']
 ['_' '^' '_' '^']
 ['>' '_' '_' '^']
 ['^' '>' '>' '^']]
Value:
[[0.455 0.504 0.579 0.505]
 [0.508 0.    0.653 0.   ]
 [0.584 0.672 0.768 0.   ]
 [0.    0.771 0.887 1.   ]]

## Value iteration
Lake:
[['&' '.' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '$']]
Policy:
[['_' '>' '_' '<']
 ['_' '^' '_' '^']
 ['>' '_' '_' '^']
 ['^' '>' '>' '^']]
Value:
[[0.455 0.504 0.579 0.505]
 [0.508 0.    0.653 0.   ]
 [0.584 0.672 0.768 0.   ]
 [0.    0.771 0.887 1.   ]]
