In [None]:
#WEEK-8 (MDP AND Dynamic prog)(Policy and value iteration)
import sys
import gym
import numpy as np

env = gym.make('FrozenLake-v1',is_slippery=True)

def calculate_state_value(env, current_state, value_matrix, discount_factor):

    num_actions = env.action_space.n
    action_values = np.zeros(shape=num_actions)
    for action in range(num_actions):
        for transition_prob, next_state, reward, done in env.env.P[current_state][action]:
            action_values[action] += transition_prob * (reward + discount_factor * value_matrix[next_state])
    return action_values

def evaluate_policy(policy_matrix, environment, discount_factor=1.0, convergence_threshold=1e-9, max_iterations=1000):
    num_states = environment.observation_space.n
    evaluation_iterations = 1
    state_values = np.zeros(shape=num_states)

    for iteration in range(int(max_iterations)):
        delta = 0
        for current_state in range(num_states):
            new_state_value = 0
            for action, action_probability in enumerate(policy_matrix[current_state]):
                for state_probability, next_state, reward, done in environment.P[current_state][action]:
                    new_state_value += action_probability * state_probability * (reward + discount_factor * state_values[next_state])
            delta = max(delta, np.abs(state_values[current_state] - new_state_value))
            state_values[current_state] = new_state_value

        evaluation_iterations += 1

        if delta < convergence_threshold:
            print(f'Policy evaluation terminated after {evaluation_iterations} iterations.\n')
            return state_values

        
def policy_iteration_algorithm(environment, discount_factor=1.0, max_iterations=1000):
    num_states = environment.observation_space.n
    num_actions = environment.action_space.n
    policy_matrix = np.ones(shape=[num_states, num_actions]) / num_actions
    evaluated_policies_count = 1

    for iteration in range(int(max_iterations)):
        stable_policy = False
        value_function = evaluate_policy(policy_matrix, environment, discount_factor)

        for current_state in range(num_states):
            current_action = np.argmax(policy_matrix[current_state])
            action_values = calculate_state_value(environment, current_state, value_function, discount_factor)
            best_action = np.argmax(action_values)

            if current_action != best_action:
                stable_policy = True

            policy_matrix[current_state] = np.eye(num_actions)[best_action]

        evaluated_policies_count += 1

        if stable_policy:
            print(f'Found a stable policy after {evaluated_policies_count:,} evaluations.\n')
            return policy_matrix, value_function

        
def value_iteration_algorithm(environment, discount_factor=1e-1, convergence_threshold=1e-9, max_iterations=1e4):
    state_values = np.zeros(environment.observation_space.n)

    for iteration in range(int(max_iterations)):
        delta = 0

        for current_state in range(environment.observation_space.n):
            action_values = calculate_state_value(environment, current_state, state_values, discount_factor)
            best_action_value = np.max(action_values)
            delta = max(delta, np.abs(state_values[current_state] - best_action_value))
            state_values[current_state] = best_action_value

        if delta < convergence_threshold:
            print(f'\nValue iteration converged at iteration #{iteration+1:,}')
            break

    policy_matrix = np.zeros(shape=[environment.observation_space.n, environment.action_space.n])

    for current_state in range(environment.observation_space.n):
        action_values = calculate_state_value(environment, current_state, state_values, discount_factor)
        best_action = np.argmax(action_values)
        policy_matrix[current_state, best_action] = 1.0

    return policy_matrix, state_values


def play_episodes_and_evaluate(env, num_episodes, policy_matrix, max_actions=100, render=False):

    total_wins = 0
    total_rewards, total_actions = 0, 0

    for episode in range(num_episodes):
        current_state = env.reset()
        episode_done, actions_taken = False, 0

        while actions_taken < max_actions:
            selected_action = np.argmax(policy_matrix[current_state])
            next_state, reward, episode_done, _ = env.step(selected_action)

            if render:
                env.render()

            actions_taken += 1
            total_rewards += reward
            current_state = next_state

            if episode_done:
                total_wins += 1
                break

        total_actions += actions_taken

    print(f'Total rewards: {total_rewards:,}\tMax actions: {actions_taken:,}')

    average_reward = total_rewards / num_episodes
    average_actions = total_actions / num_episodes

    print('')
    return total_wins, total_rewards, average_reward, average_actions

num_episodes = 1000

def agent_and_evaluate(env):

    total_rewards_list = []

    action_mapping = {
        0: '\u2191',  # up
        1: '\u2192',  # right
        2: '\u2193',  # down
        3: '\u2190'   # left
    }

    iteration_methods = [
        ('Policy Iteration', policy_iteration_algorithm),
        ('Value Iteration', value_iteration_algorithm)
    ]

    for method_name, method_func in iteration_methods:
        policy_matrix, value_function = method_func(env)

        print(f'Final policy using {method_name}:')
        print(' '.join([action_mapping[action] for action in np.argmax(policy_matrix, axis=1)]))

        total_wins, total_rewards, avg_reward, avg_actions = play_episodes_and_evaluate(env, num_episodes, policy_matrix)
        total_rewards_list.append(total_rewards)

        print(f'Number of wins = {total_wins:,}')
        print(f'Average reward = {avg_reward:.2f}')
        print(f'Average actions = {avg_actions:.2f}')

    return total_rewards_list


rewards = agent_and_evaluate(env)

In [None]:
#WEEK-9 (MDP and Monte carlo methods)
import sys
import gym
import numpy as np
np.bool = np.bool_
import pandas as pd

env = gym.make('CliffWalking-v0')

def monte_carlo_es(env, n_episodes=500):
    Q = np.zeros((env.observation_space.n, env.action_space.n))
    N = np.zeros((env.observation_space.n, env.action_space.n))
    gamma = 1.0
    total_steps = []

    for i in range(n_episodes):
        state = env.reset()
        episode = []
        done = False
        steps = 0

        # generate an episode using exploring starts
        while not done:
            action = np.random.choice(env.action_space.n)
            next_state, reward, done, info = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            steps += 1
        total_steps.append(steps)

        # update Q values using the episode
        returns = 0
        for j in range(len(episode)-1, -1, -1):
            state, action, reward = episode[j]
            returns = gamma*returns + reward
            N[state][action] += 1
            Q[state][action] += (returns - Q[state][action])/N[state][action]

    # derive optimal policy from Q values
    policy = np.argmax(Q, axis=1)

    return policy, Q, total_steps

def on_policy_mc_control(env, n_episodes=500, epsilon=0.1):
    Q = np.zeros((env.observation_space.n, env.action_space.n))
    N = np.zeros((env.observation_space.n, env.action_space.n))
    gamma = 1.0
    total_steps = []

    for i in range(n_episodes):
        state = env.reset()
        done = False
        steps = 0

        # generate an episode using Ɛ-soft policy
        while not done:
            if np.random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(Q[state])
            next_state, reward, done, info = env.step(action)
            N[state][action] += 1
            Q[state][action] += (reward + gamma*np.max(Q[next_state]) - Q[state][action])/N[state][action]
            state = next_state
            steps += 1
        total_steps.append(steps)

    # derive optimal policy from Q values
    policy = np.argmax(Q, axis=1)

    return policy, Q, total_steps

monte_carlo_es_policy, monte_carlo_es_q, total_steps_es = monte_carlo_es(env)
on_policy_mc_control_policy, on_policy_mc_control_q, total_steps_control = on_policy_mc_control(env)

print(str.format('Total Number of Steps taken to reach Optimal Policy using Monte Carlo ES: {}', sum(total_steps_es)))
print(str.format('Total Number of Steps taken to reach Optimal Policy using On-Policy First-Visit MC Control: {}', sum(total_steps_control)))

print(str.format('Average Number of Steps per Episode taken to reach Optimal Policy using Monte Carlo ES: {}', sum(total_steps_es)/len(total_steps_es)))
print(str.format('Average Number of Steps per Episode taken to reach Optimal Policy using On-Policy First-Visit MC Control: {}', sum(total_steps_control)/len(total_steps_control)))

In [None]:
#WEEK-10 (Temporal difference SARSA learning And Q-learning)
import gym
import numpy as np
import matplotlib.pyplot as plt

env = gym.make('Taxi-v3')

alpha = 0.4
gamma = 0.9
epsilon = 0.9
num_episodes = 2000

Q = np.zeros([env.observation_space.n, env.action_space.n])

def epsilon_greedy_policy(state, epsilon):
    if np.random.uniform(0, 1) < epsilon:
        action = env.action_space.sample()
    else:
        action = np.argmax(Q[state, :])
    return action

def sarsa():
    rewards = []
    for episode in range(num_episodes):
        state = env.reset()
        action = epsilon_greedy_policy(state, epsilon)
        total_reward = 0
        while True:
            next_state, reward, done, _ = env.step(action)
            next_action = epsilon_greedy_policy(next_state, epsilon)
            Q[state, action] += alpha * (reward + gamma * Q[next_state, next_action] - Q[state, action])
            total_reward += reward
            state = next_state
            action = next_action
            if done:
                rewards.append(total_reward)
                break
    return rewards

Q = np.zeros([env.observation_space.n, env.action_space.n])

def q_learning():
    rewards = []
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        while True:
            action = epsilon_greedy_policy(state, epsilon)
            next_state, reward, done, _ = env.step(action)
            Q[state, action] += alpha * (reward + gamma * np.max(Q[next_state, :]) - Q[state, action])
            total_reward += reward
            state = next_state
            if done:
                rewards.append(total_reward)
                break
    return rewards

Q = np.zeros([env.observation_space.n, env.action_space.n])

def expected_sarsa():
    rewards = []
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        while True:
            action = epsilon_greedy_policy(state, epsilon)
            next_state, reward, done, _ = env.step(action)
            expected_value = np.dot(Q[next_state, :], np.ones(env.action_space.n) * epsilon / env.action_space.n) + \
                             np.max(Q[next_state, :]) * (1 - epsilon)
            Q[state, action] += alpha * (reward + gamma * expected_value - Q[state, action])
            total_reward += reward
            state = next_state
            if done:
                rewards.append(total_reward)
                break
    return rewards

r_sarsa = sarsa()
r_qlearn = q_learning()
r_esarsa = expected_sarsa()

plt.plot(r_sarsa)
plt.title('Rewards generated by SARSA')
plt.xlabel('Episodes')
plt.ylabel('Rewards')

np.mean(r_sarsa)

plt.plot(r_qlearn)
plt.title('Rewards generated by Q-Learning')
plt.xlabel('Episodes')
plt.ylabel('Rewards')

np.mean(r_qlearn)

plt.plot(r_esarsa)
plt.title('Rewards generated by E-SARSA')
plt.xlabel('Episodes')
plt.ylabel('Rewards')

np.mean(r_esarsa)