In [25]:
import itertools
from copy import deepcopy
import numpy as np
import gym
import matplotlib.pyplot as plt
from tqdm import tqdm
import json
import os
import random
import time
from datetime import datetime
import numpy as np
from tqdm import tqdm

## Value Iteration

In [12]:
## Value Iteration
def get_transitional_func(env, samples=1e5):
    num_states = 0
    num_actions = 0
    id = env.spec.id
    num_states = env.observation_space.n
    num_actions = env.action_space.n

    # transition probability
    T = np.zeros((num_states, num_actions, num_states))
    # reward function
    R = np.zeros((num_states, num_actions, num_states))
    counter_map = np.zeros((num_states, num_actions, num_states))

    counter = 0
    for _ in tqdm(range(int(samples)), desc="Calculating transition function"):
        state = env.reset()
        terminated = False

        while not terminated:
            random_action = env.action_space.sample() # todo consider looping through all actions
            observation, reward, terminated, info = env.step(random_action)
            T[state][random_action][observation] += 1
            R[state][random_action][observation] += reward

            state = observation
            counter += 1

    # normalization
    for i in range(T.shape[0]):
        for j in range(T.shape[1]):
            norm_coeff = np.sum(T[i, j, :])
            if norm_coeff:
                T[i, j, :] /= norm_coeff

    counter_map[counter_map == 0] = 1  # avoid invalid division
    R /= counter_map

    return T, R


# [link](https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fi.stack.imgur.com%2FORobd.png&f=1&nofb=1&ipt=9fb56f6e807783ea6559ee7c1bf9fd0c69a9b1a6b083f4e9aa326cb1dec195b2&ipo=images)
def value_iteration(env, T, R, max_iter=1e5, test_iterations=1e3) -> (np.ndarray, np.ndarray):
    """Value Iteration.

    Args:
        env: OpenAI environment.

    Returns:
        A numpy array of shape [S, A] representing the state value function.
    """
    num_state = T.shape[0]
    num_action = T.shape[1]

    success_rate = []

    # discount factor
    gamma = 0.9
    # θ = 1e-04
    theta = 1e-04

    # value function
    V = np.zeros(num_state)

    # policy
    P = np.zeros(num_state)

    for _ in tqdm(range(int(max_iter)), desc="Value Iteration"):
        # for each state in the state space
        for state_i in range(num_state):
            action_value = 0
            # update the value of the state
            # V(s) ← max a∈A ∑ s′∈S P(s′|s,a)[R(s,a,s′) + γV(s′)]
            action_values = []
            for action_i in range(num_action):
                tmp = 0
                # for each transition in the transition space
                for state_new in range(num_state):
                    # transition P(s′|s,a)
                    t = T[state_i][action_i][state_new]

                    # reward R(s,a,s′)
                    r = R[state_i][action_i][state_new]

                    # new value V(s′)
                    v_new = V[state_new]

                    tmp += t * (r + gamma * v_new)
                action_value = max(action_value, tmp)

            # update the value of the state
            V[state_i] = action_value

            # update the policy
            action_values = []
            for state_i in range(num_state):
                action_value = 0
                for action_i in range(num_action):
                    temp_value = 0
                    for state_new in range(num_state):
                        t = T[state_i][action_i][state_new]
                        r = R[state_i][action_i][state_new]
                        v_new = V[state_new]

                        temp_value += t * (r + gamma * v_new)
                    if temp_value > action_value:
                        P[state_i] = action_i
                        action_value = temp_value

    # π(s) = argmax_a ∑(p(s', r|s, a)[r + γV(s')])
    env.reset()
    success = 0
    for _ in tqdm(range(int(test_iterations)), desc="Testing policy"):
        state = env.reset()
        terminated = False
        while not terminated:
            action = P[state]
            observation, reward, terminated, info = env.step(action)
            if terminated:
                if reward == 1:
                    success += 1
                break

    success_rate.append(success / test_iterations)

    return P, np.asarray(success_rate)


def run_value_iteration(name, max_iter, test_iterations, test_samples):
    # create directory for saving the model
    current_ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    dir_name = f"artifacts/{current_ts}/{name}/value_iteration/"
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)

    env = gym.make(name)
    env.reset()

    # get the transition function
    T, R = get_transitional_func(env, samples=test_samples)

    # get the optimal policy
    P, success_rate = value_iteration(env, T, R, max_iter=max_iter, test_iterations=test_iterations)

    # save all artifacts
    np.save(f"{dir_name}/value_iteration/policy.npy", P)
    np.save(f"{dir_name}/value_iteration/success_rate.npy", success_rate)
    np.save(f"{dir_name}/value_iteration/transition.npy", T)
    np.save(f"{dir_name}/value_iteration/reward.npy", R)
    # save hyperparameters
    with open(f"{dir_name}/value_iteration/hyperparameters.json", "w") as f:
        json.dump(
            {
                "max_iter": max_iter,
                "test_iterations": test_iterations,
                "test_samples": test_samples
            }, f)

    print("Final success rate of Value Iteration: {:.1f}%".format(success_rate[-1] * 100))



max_iter = 20
test_iterations = 5
test_samples = 1e3
run_value_iteration('FrozenLake8x8-v1', max_iter, test_iterations, test_samples)

Calculating transition function: 100%|██████████| 1000/1000 [00:00<00:00, 1633.28it/s]
Value Iteration: 100%|██████████| 20/20 [00:17<00:00,  1.17it/s]
Testing policy: 100%|██████████| 5/5 [00:00<?, ?it/s]


FileNotFoundError: [Errno 2] No such file or directory: 'artifacts/20221118-084740/FrozenLake8x8-v1/value_iteration//value_iteration/policy.npy'

## Policy Iteration

In [23]:
def execute(env, policy, gamma=1.0, render=False):
    state = env.reset()
    totalReward = 0
    stepIndex = 0
    while True:
        if render:
            env.render()
        state_new, reward, terminated, info = env.step(int(policy[state]))
        totalReward += (gamma ** stepIndex * reward)
        stepIndex += 1
        if terminated:
            break
    return totalReward

#Evaluation
def evaluate_policy(env, policy, gamma=1.0, n=100):
    scores = []
    for _ in range(n):
        state = env.reset()
        total_reward = 0
        index = 0
        while True:
            state_new, reward, terminated, info = env.step(int(policy[state]))
            total_reward += (gamma ** index * reward)
            index += 1
            if terminated:
                break
        scores.append(total_reward)
    scores = [execute(env, policy, gamma, False) for _ in range(n)]
    return np.mean(scores)

#Extract the policy given a value-function
def extract_policy(v, gamma=1.0):
    policy = np.zeros(env.observation_space.n)
    for s in range(env.observation_space.n):
        q_sa = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            q_sa[a] = sum([p * (r + gamma * v[s_]) for p, s_, r, _ in env.env.P[s][a]])
            policy[s] = np.argmax(q_sa)
    return policy

#Iteratively calculates value-function under policy
def get_policy_value(env, policy, gamma=1.0, epsilon=0.1):
    value = np.zeros(env.observation_space.n)
    while True:
        V = np.copy(value)
        for states in range(env.observation_space.n):
            policy_a = policy[states]
            value[states] = sum([p * (r + gamma * V[s_]) for p,s_, r, _ in env.env.P[states][policy_a]])
        if np.sum((np.fabs(V - value))) <= epsilon:
            break
    return value


#Policy Iteration algorithm
def policy_iteration(env, gamma=1.0, max_iterations=1000, epsilon=0.1):
    # policy = np.random.choice(env.action_space.n, size=env.observation_space.n)
    policy = np.random.choice(env.action_space.n, env.observation_space.n)
    value_function = np.zeros(env.observation_space.n)
    new_policy = np.zeros(env.observation_space.n, dtype=int)
    record = []
    record.append(value_function)

    for _ in range(max_iterations):
        policy_stable = True

        # policy evaluation
        p_e_value_function = np.zeros(env.observation_space.n)
        while True:
            delta = 0
            for s in range(env.observation_space.n):
                v = p_e_value_function[s]
                p_temp = env.P[s][policy[s]]
                new_value = 0
                for k in p_temp:
                    new_value += k[0] * (k[2] + gamma * p_e_value_function[k[1]])
                p_e_value_function[s] = new_value
                delta = max(delta, abs(v - p_e_value_function[s]))
            if delta <= epsilon:
                break

        value_function = p_e_value_function
        record.append(value_function)

        policy = new_policy.copy()
        while True:
            delta = 0
            for s in range(env.observation_space.n):
                v = p_e_value_function[s]
                p_temp = env.P[s][policy[s]]
                new_value = 0
                for k in p_temp:
                    new_value += k[0] * (k[2] + gamma * p_e_value_function[k[1]])
                p_e_value_function[s] = new_value
                delta = max(delta, abs(v - p_e_value_function[s]))
            if delta <= epsilon:
                break

        # policy improvement
        new_policy

        #

        V = get_policy_value(env, policy, gamma, epsilon)
        V_next = extract_policy(V, gamma)
        if np.all(policy == V_next):
            print('Policy Iteration converged at %d' %(i+1))
            break
        policy = V_next
    return policy

## Policy search
env = gym.make('FrozenLake8x8-v1')
gamma = 0.9
max_iterations = 100000
epsilon = 0.001
startTime = time.time()
optimal_policy = policy_iteration(env, gamma = gamma, max_iterations=max_iterations, epsilon=epsilon)
scores = evaluate_policy(env, optimalPolicy, gamma=gamma)
endTime = time.time()
print("Best score = %0.2f. Time taken = %4.4f seconds" %(np.max(scores), endTime-startTime))
print(scores)

KeyboardInterrupt: 

In [24]:
def policyEval(policy, value, trans_prob, reward, gamma, max_itr=20):
    """
    Policy evaluation
    : param policy: ndarray, given policy
    : param value: ndarray, given value function
    : param trans_prob: ndarray, transition probabilities p(s'|a, s)
    : param reward: ndarray, reward function r(s, a, s')
    : param gamma: float, discount factor
    : param max_itr: int, maximum number of iteration
    : return: updated value function
    """
    counter = 0
    num_state = policy.shape[0]

    while counter < max_itr:
        counter += 1
        for s in range(num_state):
            val = 0
            for s_new in range(num_state):
                val += trans_prob[s][policy[s]][s_new] * (
                        reward[s][policy[s]][s_new] + gamma * value[s_new]
                )
            value[s] = val
    return value


def policyImprove(policy, value, trans_prob, reward, gamma):
    """
    Policy improvement
    : param policy: ndarray, given policy
    : param value: ndarray, given value function
    : param trans_prob: ndarray, transition probabilities p(s'|a, s)
    : param reward: ndarray, reward function r(s, a, s')
    : param gamma: float, discount factor
    : return:
        policy: updated policy
        policy_stable, bool, True if no change in policy
    """
    policy_stable = True
    num_state = trans_prob.shape[0]
    num_action = trans_prob.shape[1]

    for s in range(num_state):
        old_action = policy[s]
        val = value[s]
        for a in range(num_action):
            tmp = 0
            for s_new in range(num_state):
                tmp += trans_prob[s][a][s_new] * (
                        reward[s][a][s_new] + gamma * value[s_new]
                )
            if tmp > val:
                policy[s] = a
                val = tmp
        if policy[s] != old_action:
            policy_stable = False
    return policy, policy_stable


def policyItr(trans_prob, reward, gamma=0.99, max_itr=30, stop_if_stable=False):
    """
    Policy iteration
    : param trans_prob: ndarray, transition probabilities p(s'|a, s)
    : param reward: ndarray, reward function r(s, a, s')
    : param gamma: float, discount factor
    : param max_itr: int, maximum number of iteration
    : param stop_if_stable: bool, stop the training if reach stable state
    : return:
        policy: updated policy
        success_rate: list, success rate for each iteration
    """
    success_rate = []
    num_state = trans_prob.shape[0]

    # init policy and value function
    policy = np.zeros(num_state, dtype=int)
    value = np.zeros(num_state)

    counter = 0
    while counter < max_itr:
        counter += 1
        value = policyEval(policy, value, trans_prob, reward, gamma)
        policy, stable = policyImprove(policy, value, trans_prob, reward, gamma)

        # test the policy for each iteration
        success_rate.append(testPolicy(policy))

        if stable and stop_if_stable:
            print("policy is stable at {} iteration".format(counter))
    return policy, success_rate


if __name__ == "__main__":
    env = gym.make("FrozenLake-v0")
    env.reset()

    # toy policy
    policy = [(s + 1) % 4 for s in range(15)]
    print("Success rate of toy policy: {:.1f}%".format(testPolicy(policy) * 100))

    # get transitional probability and reward function
    trans_prob, reward = learnModel(env)

    # Policy Iteration
    PI_policy, PI_success_rate = policyItr(trans_prob, reward, max_itr=50)
    print("Final success rate of PI: {:.1f}%".format(PI_success_rate[-1] * 100))

    # plot
    plot(PI_success_rate, "Average success rate v.s. Episode (Policy Iteration)")

DeprecatedEnv: Env FrozenLake-v0 not found (valid versions include ['FrozenLake-v1'])

In [68]:
"""
    Args:
    poicy [S,A] shaped matrix representing policy.
    env. OpenAi gym env.v.
      env.P represents the transition propablities of the env
      env.P[s][a] is a list of transition tuples
      env.nS = is a number of states
      env.nA is a number of actions
    gamma: discount factor
    render: boolean to turn rendering on/off
"""

def execute(env, policy, gamma=1.0, render=False):
    start = env.reset()
    totalReward = 0
    stepIndex = 0
    while True:
        if render:
            env.render()
        # action = start[0]
        if type(start) == type(()):
            action = start[0]
        else:
            action = start
        start, reward, done, truncated, info = env.step(int(policy[action]))
        totalReward += (gamma ** stepIndex * reward)
        stepIndex += 1
        if done:
            break
    return totalReward

#Evaluation
def evaluatePolicy(env, policy, gamma=1.0, n=100):
    scores = [execute(env, policy, gamma, False) for _ in range(n)]
    return numpy.mean(scores)

#Extract the policy given a value-function
def extractPolicy(v, gamma=1.0):
    policy = numpy.zeros([s.n for s in env.observation_space.spaces])
    state_shape = [s.n for s in env.observation_space.spaces]
    state_iterations = list(itertools.product(*[range(s.n) for s in env.observation_space.spaces]))
    for s in state_iterations:
        env_copy = deepcopy(env)
        q_sa = numpy.zeros(state_shape)
        for a in range(env.action_space.n):
            env_copy = deepcopy(env_copy)
            state_new, reward, terminated, truncated, info = env_copy.step(a)
            for a2 in range(env.action_space.n):
                s2, r2, t2, tt2, i2 = env_copy.step(a2)
                q_sa[a] += 1.0 * (r2 + gamma * v[s2])
        policy[s] = numpy.argmax(q_sa)
    return policy

#Iteratively calculates value-function under policy
def CalcPolicyValue(env, policy, gamma=1.0):
    state_shape = [s.n for s in env.observation_space.spaces]
    value = numpy.zeros(state_shape)
    eps = 0.1
    while True:
        previousValue = numpy.copy(value)
        state_iterations = list(itertools.product(*[range(s.n) for s in env.observation_space.spaces]))
        for states in state_iterations:
            policy_a = policy[states]
            total_val = 0
            env_copy = deepcopy(env)
            state_new, reward, terminated, truncated, info = env_copy.step(policy_a)
            i, j, k = state_new
            k = int(k)
            if terminated:
                continue
            for action in range(env.action_space.n):
                s2, r2, t2, tt2, i2 = env_copy.step(action)
                total_val += 1.0 * (r2 + gamma * previousValue[(i,j,k)])
            value[states] = total_val
        if (numpy.sum((numpy.fabs(previousValue - value))) <= eps):
            break
    return value


#Policy Iteration algorithm
def policyIteration(env, gamma=1.0):
    policy = numpy.random.choice(env.action_space.n, size=(len(state_space_linear)))
    maxIterations = 1000
    gamma = 1.0
    for i in range(maxIterations):
        oldPolicyValue = CalcPolicyValue(env, policy, gamma)
        newPolicy = extractPolicy(oldPolicyValue, gamma)
        if (numpy.all(policy == newPolicy)):
            print('Policy Iteration converged at %d' %(i+1))
            break
        policy = newPolicy
    return policy

def state_idx(state):
    try:
        if type(state) == type(()) and len(state) == 3:
            return state_space_linear.index((state[0], state[1], int(state[2])))
        if type(state) == type(()) and len(state) == 2:
            state = state[0]
            return state_space_linear.index((state[0], state[1], int(state[2])))
        if type(state) == type(1):
            return state
    except:
        return Exception('ugh oh')

env = gym.make('FrozenLake-v1')
env.reset()
## Policy search
state_space_linear = list(itertools.product(*[range(s.n) for s in env.observation_space.spaces]))
startTime = time.time()
optimalPolicy = policyIteration(env, gamma = 1.0)
scores = evaluatePolicy(env, optimalPolicy, gamma=1.0)
endTime = time.time()
print("Best score = %0.2f. Time taken = %4.4f seconds" %(numpy.max(scores), endTime-startTime))
print(scores)

AttributeError: 'Discrete' object has no attribute 'spaces'

In [69]:
# deep q learning

env = gym.make('FrozenLake-v1')
state = env.reset()

discount = 0.9
learning_rate = 0.025
epsilon = 0.5
state_space_linear = list(itertools.product(*[range(s.n) for s in env.observation_space.spaces]))
Q = np.zeros([len(state_space_linear), env.action_space.n])

def state_idx(state):
    try:
        if type(state) == type(()) and len(state) == 3:
            return state_space_linear.index((state[0], state[1], int(state[2])))
        if type(state) == type(()) and len(state) == 2:
            state = state[0]
            return state_space_linear.index((state[0], state[1], int(state[2])))
        if type(state) == type(1):
            return state
    except:
        return Exception('ugh oh')


for _ in range(50000):
    state = state_idx(state)
    if np.random.rand() < epsilon:
        action = env.action_space.sample()
    else:
        action = np.argmax(Q[state])

    # Step the environment
    next_state, reward, done, _, _ = env.step(action)
    next_state = state_idx(next_state)

    # Q-Learning update:
    # Q(s,a) <-- Q(s,a) + α * (r + γ max_a' Q(s',a') - Q(s,a))
    target = reward - Q[state, action]
    if not done:
        target += discount * np.max(Q[next_state])
    Q[state, action] += learning_rate * target

    # Reset the environment if we're done
    state = env.reset() if done else next_state

# Now let's see what the value function looks like after training:
V = np.max(Q, axis=1)
print(V)

AttributeError: 'Discrete' object has no attribute 'spaces'

In [16]:
state_idx(state[0])

224

In [15]:
state

((10, 2, False), {})