In [30]:
import gym
import numpy as np

class DynamicProgramming:
    def __init__(self) -> None:
        self.env = gym.make('FrozenLake-v1')
        self.nS = 16
        self.nA = 4
        self.gamma = 1
        self.theta = 1e-4
        self.random_policy = np.ones((self.nS, self.nA)) / self.nA

    def policy_evaluation(self, policy):
        prev_V = np.zeros(self.nS)
        iteration = 0
        while True:
            delta = 0
            iteration += 1
            V = np.zeros(self.nS)
            for state in range(self.nS):
                for action in range(self.nA):
                    for prob, next_state, reward, _ in self.env.P[state][action]:
                        V[state] += policy[state][action] * prob * (reward + self.gamma * prev_V[next_state])
            delta = np.max(np.abs(V - prev_V))
            if delta <= self.theta:
                break
            prev_V = np.copy(V)
        return V, iteration
    
    def policy_improvement(self, V):
        policy = np.zeros((self.nS, self.nA))
        q = np.zeros((self.nS, self.nA))
        for state in range(self.nA):
            for action in range(self.nA):
                for prob, next_state, reward, _ in self.env.P[state][action]:
                    q[state][action] += prob * (reward + self.gamma * V[next_state])

        max_index = np.argmax(q, axis=1)

        for i, optimal_action in enumerate(max_index):
            policy[i][optimal_action] = 1

        return policy

    def policy_iteration(self, policy=None):
        policy = self.random_policy if policy == None else policy
        old_policy = policy
        iteration = 0
        while True:
            V, _ = self.policy_evaluation(policy)
            policy = self.policy_improvement(V)
            iteration += 1
            comparison = policy == old_policy
            if comparison.all() == True:
                break
            old_policy = policy
        return policy, iteration
    
    def value_iteration(self):
        policy = np.zeros((self.nS, self.nA))
        prev_V = np.zeros((self.nS, 1))
        iteration = 0
        while True:
            iteration += 1
            Q = np.zeros((self.nS, self.nA))
            for state in range(self.nS):
                for action in range(self.nA):
                    for prob, next_state, reward, _ in self.env.P[state][action]:
                        Q[state][action] += prob * (reward + self.gamma * prev_V[next_state])
            V = np.max(Q, axis=1)
            if np.max(np.abs(prev_V - V)) < self.theta:
                break
            prev_V = V
            max_index = np.argmax(Q, axis=1)
            for i, optimal_action in enumerate(max_index):
                policy[i][optimal_action]  = 1
        return policy, iteration
    
    
DP = DynamicProgramming()
optimal_policy , iteration = DP.policy_iteration()
optimal_policy2 , iteration2 = DP.value_iteration()

if np.all(optimal_policy == optimal_policy2):
    print("same policy")

print(f"{optimal_policy}\n{iteration}")
print(f"{optimal_policy2}\n{iteration2}")

[[1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]]
3
[[1. 1. 1. 0.]
 [1. 1. 1. 1.]
 [1. 0. 1. 1.]
 [1. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 1. 0. 1.]
 [1. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 1. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
243


  Q[state][action] += prob * (reward + self.gamma * prev_V[next_state])


In [5]:
def decay_schedule(
 init_value, min_value,
 decay_ratio, max_steps,
 log_start=-2, log_base=10):
 decay_steps = int(max_steps * decay_ratio)
 rem_steps = max_steps - decay_steps
 values = np.logspace(
 log_start, 0, decay_steps,
 base=log_base, endpoint=True)[::-1]
 values = (values - values.min()) / \
 (values.max() - values.min())
 values = (init_value - min_value) * values + min_value
 values = np.pad(values, (0, rem_steps), 'edge')
 return values

In [6]:
def sarsa(env, gamma=1.0, init_alpha=0.5, min_alpha=0.01, alpha_decay_ratio=0.5, init_epsilon=1.0, min_epsilon=0.1, epsilon_decay_ratio=0.9, n_episodes=3000):
    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []
    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)
    select_action = lambda state, Q, epsilon: np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(len(Q[state]))
    alphas = decay_schedule(
    init_alpha, min_alpha,
    alpha_decay_ratio,
    n_episodes)
    epsilons = decay_schedule(
    init_epsilon, min_epsilon,
    epsilon_decay_ratio,
    n_episodes)
    for e in range(n_episodes):
        state, info = env.reset()
        done = False
        action = select_action(state, Q, epsilons[e])
        while not done:
            next_state, reward, done, _, _ = env.step(action)
            next_action = select_action(next_state,
            Q,
            epsilons[e])
            td_target = reward + gamma * \
            Q[next_state][next_action] * (not done)
            td_error = td_target - Q[state][action]
            Q[state][action] = Q[state][action] + \
            alphas[e] * td_error
            state, action = next_state, next_action
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    V = np.max(Q, axis=1)
    pi = lambda s: {s:a for s, a in enumerate(\
    np.argmax(Q, axis=1))}[s]
    return Q, V, pi, Q_track, pi_track

sarsa(gym.make('CliffWalking-v0'))

  if not isinstance(terminated, (bool, np.bool8)):


(array([[-8.89511142e+02, -3.35860825e+02, -8.94840151e+02,
         -8.00611151e+02],
        [-6.55317976e+02, -2.85211875e+02, -1.01693394e+03,
         -7.75454299e+02],
        [-5.93507492e+02, -2.33538568e+02, -9.71345855e+02,
         -6.54893328e+02],
        [-5.10552830e+02, -1.89777075e+02, -9.25950500e+02,
         -6.05440462e+02],
        [-4.76184816e+02, -1.50542794e+02, -8.33810700e+02,
         -4.70718175e+02],
        [-3.97823675e+02, -1.06883003e+02, -7.16501499e+02,
         -5.17392722e+02],
        [-2.54500422e+02, -7.72456669e+01, -4.45501625e+02,
         -3.57382116e+02],
        [-2.50989175e+02, -7.29309741e+01, -4.91405638e+02,
         -2.80286252e+02],
        [-1.82539799e+02, -4.99279587e+01, -4.10015674e+02,
         -2.14165863e+02],
        [-1.12899723e+02, -2.91493268e+01, -2.89498443e+02,
         -1.63697242e+02],
        [-8.54004652e+01, -2.25432176e+01, -1.25428277e+02,
         -1.35955179e+02],
        [-6.10561703e+01, -6.13869191e+01, 

In [28]:
import gym 
import numpy as np 
import matplotlib.pyplot as plt 
 
# Q-Learning Algorithm 
def q_learning(env,
    gamma=1.0,
    init_alpha=0.5,
    min_alpha=0.01,
    alpha_decay_ratio=0.5,
    init_epsilon=1.0,
    min_epsilon=0.1,
    epsilon_decay_ratio=0.9,
    n_episodes=3000):

    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []
    pi_track=[] 
    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)
    select_action = lambda state, Q, epsilon: \
        np.argmax(Q[state]) \
        if np.random.random() > epsilon \
        else np.random.randint(len(Q[state]))
    
    alphas = decay_schedule(
        init_alpha, min_alpha,
        alpha_decay_ratio,
        n_episodes) 
    epsilons = decay_schedule(
        init_epsilon, min_epsilon,
        epsilon_decay_ratio,
        n_episodes)
    for e in range(n_episodes): 
        state, _ = env.reset()
        done = False
        while not done: 
            action = select_action(state, Q, epsilons[e])
            next_state, reward, done, _, _ = env.step(action)
            td_target = reward + gamma * Q[next_state].max() * (not done)
            td_error = td_target - Q[state][action]
            Q[state][action] = Q[state][action] + alphas[e] * td_error
            state = next_state
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    V = np.max(Q, axis=1)
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s] 
    return Q, V, pi, Q_track, pi_track
q_learning(gym.make('CliffWalking-v0'))

  if not isinstance(terminated, (bool, np.bool8)):


(array([[ -15.,  -14.,  -14.,  -15.],
        [ -14.,  -13.,  -13.,  -15.],
        [ -13.,  -12.,  -12.,  -14.],
        [ -12.,  -11.,  -11.,  -13.],
        [ -11.,  -10.,  -10.,  -12.],
        [ -10.,   -9.,   -9.,  -11.],
        [  -9.,   -8.,   -8.,  -10.],
        [  -8.,   -7.,   -7.,   -9.],
        [  -7.,   -6.,   -6.,   -8.],
        [  -6.,   -5.,   -5.,   -7.],
        [  -5.,   -4.,   -4.,   -6.],
        [  -4.,   -4.,   -3.,   -5.],
        [ -15.,  -13.,  -13.,  -14.],
        [ -14.,  -12.,  -12.,  -14.],
        [ -13.,  -11.,  -11.,  -13.],
        [ -12.,  -10.,  -10.,  -12.],
        [ -11.,   -9.,   -9.,  -11.],
        [ -10.,   -8.,   -8.,  -10.],
        [  -9.,   -7.,   -7.,   -9.],
        [  -8.,   -6.,   -6.,   -8.],
        [  -7.,   -5.,   -5.,   -7.],
        [  -6.,   -4.,   -4.,   -6.],
        [  -5.,   -3.,   -3.,   -5.],
        [  -4.,   -3.,   -2.,   -4.],
        [ -14.,  -12.,  -14.,  -13.],
        [ -13.,  -11., -113.,  -13.],
        [ -1