In [3]:
import gym
import numpy as np

In [4]:
# Instantiate the "FrozenLake-v0" environment through gym's registry.
# NOTE: `env` is rebound to "FrozenLake8x8-v0" in a later cell of this notebook.
env = gym.make("FrozenLake-v0")

In [5]:
# Record the sizes (`.n`) of the action and observation spaces of the current `env`.
# NOTE(review): in the visible code these module-level values are not read anywhere
# else (PolicyIteration keeps its own copies on the instance), and they are not
# refreshed when `env` is rebound in a later cell.
SZ_ACTION_SPACE = env.action_space.n
SZ_OBS_SPACE = env.observation_space.n

In [60]:
class PolicyIteration:
    """Tabular policy iteration for a discrete environment with a known model.

    The environment must provide:

    * ``action_space.n`` and ``observation_space.n`` -- the number of actions
      and states;
    * ``P`` -- a transition table where ``P[s][a]`` is an iterable of
      ``(probability, next_state, reward, done)`` tuples;
    * ``reset()`` returning the initial state and ``step(a)`` returning
      ``(state, reward, done, info)`` (only needed by :meth:`evaluate`).

    Attributes:
        env: the wrapped environment.
        gamma: discount factor used in every Bellman backup.
        SZ_ACTION_SPACE: number of actions.
        SZ_OBS_SPACE: number of states.
        policy: integer array of length ``SZ_OBS_SPACE``; ``policy[s]`` is the
            action taken in state ``s``. Initialised uniformly at random.
    """

    def __init__(self,env,gamma=1.0):
        self.env = env
        self.gamma = gamma
        self.SZ_ACTION_SPACE = self.env.action_space.n
        self.SZ_OBS_SPACE = self.env.observation_space.n
        # Start from an arbitrary (random) deterministic policy.
        self.policy = np.random.choice(self.SZ_ACTION_SPACE, size=self.SZ_OBS_SPACE)

    def _expected_return(self, s, a, v):
        """One-step Bellman backup: expected value of taking ``a`` in ``s`` under ``v``."""
        return sum([p * (r_ + self.gamma * v[s_]) for p, s_, r_, _ in self.env.P[s][a]])

    def policy_evaluation(self, epsilon=1e-13, max_iterations=None):
        """Iteratively estimate the state-value function of the current policy.

        Each sweep updates every state from the previous sweep's values and
        stops once the summed absolute change drops below ``epsilon``.

        Args:
            epsilon: convergence threshold on the summed absolute change.
            max_iterations: optional cap on the number of sweeps; ``None``
                (the default) keeps sweeping until the threshold is met.

        Returns:
            Float array of length ``SZ_OBS_SPACE`` with the value estimates.
            If ``max_iterations`` is reached first, the latest estimate is
            returned as-is.
        """
        v = np.zeros(self.SZ_OBS_SPACE)
        sweeps = 0
        while max_iterations is None or sweeps < max_iterations:
            prev_v = np.copy(v)
            for s in range(self.SZ_OBS_SPACE):
                v[s] = self._expected_return(s, self.policy[s], prev_v)
            sweeps += 1
            if np.sum(np.fabs(prev_v - v)) < epsilon:
                break
        return v

    def policy_improvement(self,v):
        """Return the policy that is greedy with respect to ``v``.

        Ties are broken in favour of the lowest action index (``np.argmax``).
        The result is a new array; ``self.policy`` is not modified and no
        random numbers are drawn.
        """
        policy = np.zeros(self.SZ_OBS_SPACE, dtype=int)
        for s in range(self.SZ_OBS_SPACE):
            q_sa = [self._expected_return(s, a, v) for a in range(self.SZ_ACTION_SPACE)]
            policy[s] = np.argmax(q_sa)
        return policy

    def run(self,iterations=100000):
        """Alternate evaluation and greedy improvement until the policy is stable.

        Prints ``"Converged at iteration N"`` when an improvement step leaves
        the policy unchanged.

        Args:
            iterations: maximum number of evaluate/improve rounds.

        Returns:
            ``True`` if the policy stabilised within ``iterations`` rounds,
            ``False`` otherwise.
        """
        for i in range(iterations):
            old_policy = self.policy
            V = self.policy_evaluation()
            # policy_improvement returns a fresh array, so no extra copy is needed.
            self.policy = self.policy_improvement(V)
            if np.array_equal(old_policy,self.policy):
                print("Converged at iteration {}".format(i+1))
                return True
        return False

    def evaluate(self,episodes,render=False,verbose=False):
        """Roll out the current policy and return the mean episode reward.

        Args:
            episodes: number of episodes to run.
            render: if true, call ``env.render()`` before every step.
            verbose: if true, print the step count of every episode.

        Returns:
            Total reward divided by ``episodes``; ``0.0`` when ``episodes``
            is not positive (nothing was run, so there is nothing to average).
        """
        if episodes <= 0:
            return 0.0
        total_reward = 0
        for _ in range(episodes):
            obs = self.env.reset()
            done = False
            step = 0
            ep_reward = 0
            while not done:
                if render:
                    self.env.render()
                obs,reward,done,_ = self.env.step(int(self.policy[obs]))
                ep_reward += reward
                step += 1
            if verbose:
                print(f"Took {step} steps")
            total_reward += ep_reward
        return total_reward/episodes

In [62]:
# Solve the "FrozenLake8x8-v0" environment with policy iteration, then report
# the mean reward of the resulting policy over 10,000 rollouts.
env_name = "FrozenLake8x8-v0"
env = gym.make(env_name)
pi = PolicyIteration(env=env)
pi.run()
print(pi.evaluate(episodes=10_000))
# print(pi.policy)


Converged at iteration 12
0.8875
