In [1]:
import sys
sys.path.insert(1, 'lib')

import lib
import time, argparse
import gym
import numpy as np
from tqdm import tqdm
from lib.common_utils import TabularUtils
from lib.regEnvs import *

class Tabular_DP:
    """Tabular dynamic programming (value iteration) on a discrete environment.

    The environment must expose ``action_space.n``, ``observation_space.n`` and
    a transition table ``P`` such that ``P[state][action]`` is an iterable of
    4-tuples ``(prob, next_state, reward, <ignored>)``.
    """

    def __init__(self, args, gamma=0.99, theta=1e-5, max_iterations=1000):
        """Store the environment and the solver hyper-parameters.

        Args:
            args: Either an object that carries the environment in an ``env``
                attribute, or the environment itself.
            gamma: Discount factor.
            theta: Convergence threshold on the largest value change in a sweep.
            max_iterations: Upper bound on the number of sweeps.
        """
        # An object with an ``env`` attribute is unwrapped one level, exactly as
        # before; anything else is now used as the environment directly instead
        # of raising AttributeError.
        self.env = getattr(args, "env", args)
        self.gamma = gamma  # discount factor
        self.theta = theta  # small threshold for stopping criteria
        self.max_iterations = max_iterations
        self.num_actions = self.env.action_space.n  # number of actions
        self.num_states = self.env.observation_space.n  # number of states

    def compute_q_value_current_state(self, state, value_function):
        """Return the one-step look-ahead action values for ``state``.

        Args:
            state: Index of the state to evaluate.
            value_function: Indexable collection of current state values.

        Returns:
            ``np.ndarray`` of length ``num_actions`` where entry ``a`` is
            ``sum(prob * (reward + gamma * value_function[next_state]))`` over
            the transitions listed in ``P[state][a]``.
        """
        q_values = np.zeros(self.num_actions)
        for action in range(self.num_actions):
            # The fourth field of every transition tuple is not used here.
            for prob, next_state, reward, _ in self.env.P[state][action]:
                q_values[action] += prob * (reward + self.gamma * value_function[next_state])
        return q_values

    def action_to_one_hot(self, action):
        """Return a length-``num_actions`` vector with a 1 at index ``action``."""
        one_hot_action = np.zeros(self.num_actions)
        one_hot_action[action] = 1
        return one_hot_action

    def value_iteration(self):
        """Run value iteration with in-place sweeps.

        Each sweep updates states in index order, so a state's new value is
        already visible to the states processed after it in the same sweep.
        Sweeping stops once the largest change in a sweep drops below
        ``theta`` or after ``max_iterations`` sweeps.

        Returns:
            Tuple ``(value_function, optimal_policy)``: an array of shape
            ``(num_states,)`` and an array of shape
            ``(num_states, num_actions)`` whose rows are one-hot encodings of
            the greedy action found in the last sweep. If no sweep runs
            (``max_iterations`` of 0) the policy rows stay all-zero.
        """
        value_function = np.zeros(self.num_states)
        optimal_policy = np.zeros((self.num_states, self.num_actions))
        for _ in range(self.max_iterations):
            delta = 0
            for state in range(self.num_states):
                q_values = self.compute_q_value_current_state(state, value_function)
                best_action_value = np.max(q_values)
                delta = max(delta, np.abs(best_action_value - value_function[state]))
                value_function[state] = best_action_value
                # np.argmax breaks ties in favour of the lowest action index.
                optimal_policy[state] = self.action_to_one_hot(np.argmax(q_values))
            if delta < self.theta:
                break
        return value_function, optimal_policy

In [18]:
# Environment id handed to gym.make below.
# NOTE(review): presumably registered by the `lib.regEnvs` import — confirm.
env_name = 'FrozenLake-Deterministic-v1'
# env_name = 'FrozenLake-Deterministic-8x8-v1'


my_env = gym.make(env_name)
tabularUtils = TabularUtils(my_env)
# Test value iteration.
# NOTE(review): the constructor parameter of Tabular_DP is named `args`; here
# the environment object itself is passed — confirm this is intended.
dp = Tabular_DP(my_env)
print("================Running value iteration=====================")
# value_iteration returns the state values and a one-hot policy matrix
# (one row per state).
V_optimal, policy_optimal = dp.value_iteration()
print("Optimal value function: ")
print(V_optimal)
print("Optimal policy: ")
print(policy_optimal)
# Optional rendering of the resulting policy (disabled).
# tabularUtils.render(policy_optimal)

Optimal value function: 
[0.46089055 0.56655611 0.673289   0.56655611 0.56655611 0.
 0.7811     0.         0.673289   0.7811     0.89       0.
 0.         0.89       1.         0.        ]
Optimal policy: 
[[0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]]


In [19]:
class Tabular_TD:
    """Tabular temporal-difference control (SARSA) on a discrete environment."""

    def __init__(self, env, num_episodes=10000, gamma=0.99, alpha=0.05):
        """Store the environment and the learning hyper-parameters.

        Args:
            env: Environment exposing ``action_space.n``,
                ``observation_space.n``, ``reset()`` and ``step(action)``.
            num_episodes: Number of training episodes run by ``sarsa``.
            gamma: Discount factor for future rewards.
            alpha: Learning rate of the TD update.
        """
        self.env = env
        self.num_episodes = num_episodes  # Number of episodes for training
        self.gamma = gamma  # Discount factor for future rewards
        self.alpha = alpha  # Learning rate
        self.env_nA = self.env.action_space.n  # Number of actions in the environment
        self.env_nS = self.env.observation_space.n  # Number of states in the environment
        self.tabular_utils = TabularUtils(self.env)  # Utility functions for the tabular environment

    def sarsa(self, epsilon=0.1):
        """SARSA: on-policy TD control.

        Args:
            epsilon: Value forwarded to
                ``tabular_utils.epsilon_greedy_policy`` for action selection.

        Returns:
            Tuple ``(Q_values, greedy_policy)``: the learned table of shape
            ``(env_nS, env_nA)`` and whatever
            ``tabular_utils.Q_value_to_greedy_policy`` derives from it.
        """
        # Size the table from this instance's own state/action counts rather
        # than reaching into the utility object for duplicate attributes.
        Q_values = np.zeros((self.env_nS, self.env_nA))
        for _ in range(self.num_episodes):
            # reset() must return the start state directly (it is used as a
            # row index) and step() must return a 4-tuple.
            current_state = self.env.reset()
            current_action = self.tabular_utils.epsilon_greedy_policy(Q_values[current_state], epsilon)
            done = False
            while not done:
                # Execute the action in the environment
                next_state, reward, done, _ = self.env.step(current_action)
                next_action = self.tabular_utils.epsilon_greedy_policy(Q_values[next_state], epsilon)
                # SARSA update; `(not done)` zeroes the bootstrap term on the
                # transition that ends the episode.
                td_target = reward + (self.gamma * Q_values[next_state][next_action] * (not done))
                td_error = td_target - Q_values[current_state][current_action]
                Q_values[current_state][current_action] += self.alpha * td_error
                current_state, current_action = next_state, next_action

        # Derive the greedy policy from the Q values
        greedy_policy = self.tabular_utils.Q_value_to_greedy_policy(Q_values)
        return Q_values, greedy_policy

In [20]:
def action_to_one_hot(action, num_actions=4):
    """Encode ``action`` as a one-hot float vector of length ``num_actions``.

    Args:
        action: Index of the entry to set to 1.
        num_actions: Length of the returned vector (defaults to 4).

    Returns:
        ``np.ndarray`` of zeros with a single 1 at position ``action``.
    """
    encoded = np.zeros(num_actions)
    encoded[action] = 1
    return encoded

# Build a fresh environment; `env_name` is defined in an earlier cell.
my_env = gym.make(env_name)
tabularUtils = TabularUtils(my_env)

# Create an instance of the Tabular_TD class and test SARSA
td = Tabular_TD(my_env)
Q_sarsa, policy_sarsa = td.sarsa()
print("Policy from SARSA")
# Convert the policy to a deterministic policy and represent it as a list of one-hot vectors
# NOTE(review): presumably onehot_policy_to_deterministic_policy yields one
# action index per state — confirm against TabularUtils.
best_policy = tabularUtils.onehot_policy_to_deterministic_policy(policy_sarsa)
# action_to_one_hot is called with its default num_actions=4 here.
best_policy = [action_to_one_hot(int(i)) for i in best_policy]


print(np.array(best_policy))
# Visualize the best policy on the environment
# tabularUtils.render(best_policy)

Policy from SARSA
[[0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]]
