In [None]:
import sys
import gym
import numpy as np

# Slippery Nature of the Frozen Lake

In the Frozen Lake environment, the parameter `is_slippery` determines whether the agent moves in the intended direction with certainty or if there's a chance of slipping.

- When `is_slippery` is **True**, the agent may not always move in the intended direction due to the slippery nature of the frozen lake. In this case, there is a probability of 1/3 that the agent will move in the intended direction, while there is an equal probability of 1/3 for the agent to move in either of the perpendicular directions.

- When `is_slippery` is **False**, the agent moves in the intended direction with certainty, without any possibility of slipping.

This parameter allows for different levels of stochasticity in the agent's movement, affecting the difficulty and realism of the environment.


In [None]:
env = gym.make('FrozenLake-v1',is_slippery=True)

In [1]:
def calculate_state_value(env, current_state, value_matrix, discount_factor):

    num_actions = env.action_space.n
    action_values = np.zeros(shape=num_actions)
    for action in range(num_actions):
        for transition_prob, next_state, reward, done in env.env.P[current_state][action]:
            action_values[action] += transition_prob * (reward + discount_factor * value_matrix[next_state])
    return action_values

# Policy Evaluation Function and Parameters

Policy Evaluation Function

The function iterates through the policy evaluation process until either convergence or the maximum number of iterations is reached.
Within each iteration, it updates the state values using the Bellman equation for state values, considering the current policy.
Convergence is determined by comparing the maximum change in state values (delta) against the convergence threshold.
If the convergence threshold is met, the function terminates and returns the estimated state values.
The number of iterations taken for policy evaluation is not printed in this version for brevity.






Parameters:


policy_matrix: The policy matrix specifying the probability of taking each action in each state.
    
environment: The Frozen Lake environment.
    
discount_factor: (Optional) The discount factor used to discount future rewards. Default is 1.0.
    
convergence_threshold: (Optional) The threshold for convergence, indicating when to stop the evaluation process. Default is 1e-9.
    
max_iterations: (Optional) The maximum number of iterations allowed for the evaluation. Default is 1000.


In [None]:
def evaluate_policy(policy_matrix, environment, discount_factor=1.0, convergence_threshold=1e-9, max_iterations=1000):
    num_states = environment.observation_space.n
    evaluation_iterations = 1
    state_values = np.zeros(shape=num_states)

    for iteration in range(int(max_iterations)):
        delta = 0
        for current_state in range(num_states):
            new_state_value = 0
            for action, action_probability in enumerate(policy_matrix[current_state]):
                for state_probability, next_state, reward, done in environment.P[current_state][action]:
                    new_state_value += action_probability * state_probability * (reward + discount_factor * state_values[next_state])
            delta = max(delta, np.abs(state_values[current_state] - new_state_value))
            state_values[current_state] = new_state_value

        evaluation_iterations += 1

        if delta < convergence_threshold:
            print(f'Policy evaluation terminated after {evaluation_iterations} iterations.\n')
            return state_values


# Policy Iteration Algorithm

The function performs policy iteration by iteratively evaluating and improving the policy until convergence or the maximum number of iterations is reached.
It initializes the policy matrix with a uniform distribution over actions for each state.
In each iteration:
It evaluates the current policy using the evaluate_policy function.
It then improves the policy by selecting the action that maximizes the expected value for each state.
If the policy remains unchanged after an iteration, indicating convergence, the algorithm terminates.
Finally, it returns the optimal policy matrix.

In [None]:
def policy_iteration_algorithm(environment, discount_factor=1.0, max_iterations=1000):
    num_states = environment.observation_space.n
    num_actions = environment.action_space.n
    policy_matrix = np.ones(shape=[num_states, num_actions]) / num_actions
    evaluated_policies_count = 1

    for iteration in range(int(max_iterations)):
        stable_policy = False
        value_function = evaluate_policy(policy_matrix, environment, discount_factor)

        for current_state in range(num_states):
            current_action = np.argmax(policy_matrix[current_state])
            action_values = calculate_state_value(environment, current_state, value_function, discount_factor)
            best_action = np.argmax(action_values)

            if current_action != best_action:
                stable_policy = True

            policy_matrix[current_state] = np.eye(num_actions)[best_action]

        evaluated_policies_count += 1

        if stable_policy:
            print(f'Found a stable policy after {evaluated_policies_count:,} evaluations.\n')
            return policy_matrix, value_function


# Value Iteration Algorithm

Finds the optimal value function for a Markov Decision Process (MDP) iteratively.

Computes the optimal value function directly instead of improving a policy iteratively.

Initializes state values to zeros.

Iteratively updates state values until convergence or reaching the maximum number of iterations.

For each state, computes action values and selects the best action value.

Updates state values based on the best action value and tracks the maximum change in state values (delta).

Terminates when the maximum change in state values falls below the convergence threshold.

Extracts the optimal policy by selecting the action with the highest value for each state.

Policy matrix indicates the probability of selecting each action in each state.

Efficient for finding optimal policy in finite MDPs by computing the optimal value function directly.


In [None]:
def value_iteration_algorithm(environment, discount_factor=1e-1, convergence_threshold=1e-9, max_iterations=1e4):
    state_values = np.zeros(environment.observation_space.n)

    for iteration in range(int(max_iterations)):
        delta = 0

        for current_state in range(environment.observation_space.n):
            action_values = calculate_state_value(environment, current_state, state_values, discount_factor)
            best_action_value = np.max(action_values)
            delta = max(delta, np.abs(state_values[current_state] - best_action_value))
            state_values[current_state] = best_action_value

        if delta < convergence_threshold:
            print(f'\nValue iteration converged at iteration #{iteration+1:,}')
            break

    policy_matrix = np.zeros(shape=[environment.observation_space.n, environment.action_space.n])

    for current_state in range(environment.observation_space.n):
        action_values = calculate_state_value(environment, current_state, state_values, discount_factor)
        best_action = np.argmax(action_values)
        policy_matrix[current_state, best_action] = 1.0

    return policy_matrix, state_values


# Playing Episodes and Evaluating Policies

This function is useful for evaluating the performance of a given policy matrix in the Frozen Lake environment by simulating gameplay over multiple episodes

In [None]:
def play_episodes_and_evaluate(env, num_episodes, policy_matrix, max_actions=100, render=False):

    total_wins = 0
    total_rewards, total_actions = 0, 0

    for episode in range(num_episodes):
        current_state = env.reset()
        episode_done, actions_taken = False, 0

        while actions_taken < max_actions:
            selected_action = np.argmax(policy_matrix[current_state])
            next_state, reward, episode_done, _ = env.step(selected_action)

            if render:
                env.render()

            actions_taken += 1
            total_rewards += reward
            current_state = next_state

            if episode_done:
                total_wins += 1
                break

        total_actions += actions_taken

    print(f'Total rewards: {total_rewards:,}\tMax actions: {actions_taken:,}')

    average_reward = total_rewards / num_episodes
    average_actions = total_actions / num_episodes

    print('')
    return total_wins, total_rewards, average_reward, average_actions


# Agent and Evaluation

This function serves as a comprehensive evaluation tool for comparing the performance of different iteration methods in the Frozen Lake environment.

In [None]:
num_episodes = 1000

def agent_and_evaluate(env):

    total_rewards_list = []

    action_mapping = {
        0: '\u2191',  # up
        1: '\u2192',  # right
        2: '\u2193',  # down
        3: '\u2190'   # left
    }

    iteration_methods = [
        ('Policy Iteration', policy_iteration_algorithm),
        ('Value Iteration', value_iteration_algorithm)
    ]

    for method_name, method_func in iteration_methods:
        policy_matrix, value_function = method_func(env)

        print(f'Final policy using {method_name}:')
        print(' '.join([action_mapping[action] for action in np.argmax(policy_matrix, axis=1)]))

        total_wins, total_rewards, avg_reward, avg_actions = play_episodes_and_evaluate(env, num_episodes, policy_matrix)
        total_rewards_list.append(total_rewards)

        print(f'Number of wins = {total_wins:,}')
        print(f'Average reward = {avg_reward:.2f}')
        print(f'Average actions = {avg_actions:.2f}')

    return total_rewards_list


In [None]:
rewards = agent_and_evaluate(env)

Policy evaluation terminated after 66 iterations.

Found a stable policy after 2 evaluations.

Final policy using Policy Iteration:
↑ ← ↑ ← ↑ ↑ ↑ ↑ ← → ↑ ↑ ↑ ↓ → ↑


  if not isinstance(terminated, (bool, np.bool8)):


Total rewards: 735.0	Max actions: 45

Number of wins = 1,000
Average reward = 0.73
Average actions = 41.14

Value iteration converged at iteration #8
Final policy using Value Iteration:
→ ← ↓ ← ↑ ↑ ↑ ↑ ← → ↑ ↑ ↑ ↓ → ↑
Total rewards: 438.0	Max actions: 24

Number of wins = 1,000
Average reward = 0.44
Average actions = 27.64


# Policy Iteration: and Conclusion

Policy Iteration:

Convergence: Policy evaluation converged after 66 iterations.
Stability: A stable policy was found after 2 evaluations.
Final Policy: Represents recommended actions for each state.
Total Rewards: Achieved 735.0 cumulative rewards across all episodes.
Max Actions: 45 actions in the longest episode.
Wins: All 1,000 episodes resulted in wins.
Average Reward: 0.73 per episode.
Average Actions: 41.14 actions per episode.
Value Iteration:

Convergence: Value iteration converged after 8 iterations.
Final Policy: Similar representation as Policy Iteration.
Total Rewards: Achieved 438.0 cumulative rewards.
Max Actions: 24 actions in the longest episode.
Wins: All 1,000 episodes resulted in wins.
Average Reward: 0.44 per episode.
Average Actions: 27.64 actions per episode.
Comparison:

Average Reward: Policy Iteration outperformed with 0.73 compared to 0.44.
Average Actions: Value Iteration was more efficient with 27.64 compared to 41.14.
Conclusion:

Policy Iteration offers higher rewards per episode.
Value Iteration requires fewer actions, potentially finding more efficient paths.
Selection depends on specific goals and requirements.

