In [1]:
# importing the libraries

import gym
import collections
from tensorboardX import SummaryWriter
import time

In [2]:
# global parameters

GAMMA = 0.8
TEST_EPISODES = 20
ENV = 'FrozenLake-v1'
#ENV = 'FrozenLake8x8-v1'

Value Iteration Methord

In [6]:
# agent class

class Agent:
    def __init__(self):
        '''
        (a) rewards : dtype  : dictionary
                      keys   : [(current state,action,new state)]
                      values : reward obtained due to the transition

        (b) transition : dtype : dictionary
                         keys  : [(current state,action)]
                         values : [Counter([state: number of times the state visited])]
        
        (c) value : dtype : dictionary
                    keys : states
                    values : state values (Updated in each iteration)

        
        '''
        self.env = gym.make(ENV)
        self.cur_state = self.env.reset()
        self.rewards = collections.defaultdict(float)   
        self.transition = collections.defaultdict(collections.Counter)
        self.value = collections.defaultdict(float)

    # play n steps randomly to explore the enviornment
    def play_n_steps(self,count):
        for _ in range(count):
            action = self.env.action_space.sample()  # random action
            new_state, reward, done, _ = self.env.step(action)
            self.rewards[(self.cur_state,action,new_state)] = reward
            self.transition[(self.cur_state,action)][new_state]+=1
            if done:
                self.cur_state = self.env.reset()
            else:
                self.cur_state = new_state

    def calc_state_action_value(self,state,action):
        s_dash_states = self.transition[(state,action)]
        total_count = sum(s_dash_states.values())
        action_value = 0.0
        for s_dash, count in s_dash_states.items():
            reward = self.rewards[(state,action,s_dash)]
            val = reward + GAMMA*self.value[s_dash]
            action_value+= (count/total_count)*val
        return action_value
    
    def best_action_select(self,state):
        best_action = None
        best_action_value = None
        for action in range(self.env.action_space.n):
            action_value = self.calc_state_action_value(state,action)
            if best_action_value is None or best_action_value<action_value:
                best_action_value = action_value
                best_action = action
        return best_action
    

    def update_state_value(self):
        for state in range(self.env.observation_space.n):
            state_values = [self.calc_state_action_value(state,action) 
            for action in range(self.env.action_space.n)]
            self.value[state] = max(state_values)

    def play_episode(self,env,render=False,video_frames = None):
        total_reward = 0.0
        state = env.reset()
        done = False
        while not done:
            action = self.best_action_select(state)
            new_state,reward,done,_ = env.step(action)
            self.rewards[(state,action,new_state)] = reward
            self.transition[(state,action)][new_state]+=1
            total_reward+=reward
            if render:
                env.render()
                video_frames.append(env.render(mode='rgb_array'))
            state = new_state
        env.close()
        return reward

In [None]:
video_frames = []
if __name__=="__main__":
     
    agent = Agent()
    writer = SummaryWriter(comment='-v-iteration')
    test_env = gym.make(ENV)

    iter_no = 0.0
    best_reward = 0.0
    while True:
        iter_no+=1
        print("Iteration number : "+str(iter_no))
        agent.play_n_steps(100)
        agent.update_state_value()

        reward = 0.0
        for i in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)

        reward /= TEST_EPISODES
        writer.add_scalar("Reward FrozenLake 4x4",reward,iter_no)
        if reward>best_reward:
            best_reward = reward
            print("Reward updated : "+str(best_reward))
        if reward>0.8:
            print("Env solved in "+str(iter_no))
            break
    writer.close()
    agent.play_episode(test_env,True,video_frames)

Tabular Q Learning

In [9]:
# agent class

class Q_Agent:
    def __init__(self):
        self.env = gym.make(ENV)
        self.cur_state = self.env.reset()
        self.rewards = collections.defaultdict(float)
        self.transition = collections.defaultdict(collections.Counter)
        self.q_value = collections.defaultdict(float)

    def play_n_steps(self,count):
        for _ in range(count):
            action = self.env.action_space.sample()  # random action
            new_state, reward, done, _ = self.env.step(action)
            self.rewards[(self.cur_state,action,new_state)] = reward
            self.transition[(self.cur_state,action)][new_state]+=1
            if done:
                self.cur_state = self.env.reset()
            else:
                self.cur_state = new_state

    def best_action_select(self,state):
        best_action = None
        best_action_value = None
        for action in range(self.env.action_space.n):
            action_value = self.q_value[(state,action)]
            if best_action_value is None or best_action_value<action_value:
                best_action_value = action_value
                best_action = action
        return best_action
    

    def update_q_table(self):
        for state in range(self.env.observation_space.n):
            for action in range(self.env.action_space.n):
                action_value = 0.0
                s_dash_states = self.transition[(state,action)]
                total_count = sum(s_dash_states.values())
                for s_dash, count in s_dash_states.items():
                    reward = self.rewards[state,action,s_dash]
                    best_action = self.best_action_select(state)
                    val = reward + GAMMA*self.q_value[(s_dash,best_action)]
                    action_value+=(count/total_count)*val
                self.q_value[(state,action)] = action_value

    def play_episode(self,env,render=False,video_frames = None):
        total_reward = 0.0
        state = env.reset()
        done = False
        while not done:
            action = self.best_action_select(state)
            new_state,reward,done,_ = env.step(action)
            self.rewards[(state,action,new_state)] = reward
            self.transition[(state,action)][new_state]+=1
            total_reward+=reward
            state = new_state
            if render:
                env.render()
                video_frames.append(env.render(mode='rgb_array'))
        env.close()
        return reward

In [10]:
video_frames = []
if __name__=="__main__":
    agent = Q_Agent()
    writer = SummaryWriter(comment='-v-iteration')
    test_env = gym.make(ENV)

    
    iter_no = 0.0
    best_reward = float('-inf')
    while True:
        iter_no+=1
        print("Iteration number : "+str(iter_no))
        agent.play_n_steps(100)
        agent.update_q_table()

        reward = 0.0
        for i in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)

        reward /= TEST_EPISODES
        writer.add_scalar("reward",reward,iter_no)
        print('Reward : '+str(reward))
        if reward>best_reward:
            best_reward = reward
            print("Reward updated : "+str(best_reward))
        if reward>0.8:
            print("Env solved in "+str(iter_no))
            break
    writer.close()
    agent.play_episode(test_env,True,video_frames)


Iteration number : 1.0
Reward : 0.0
Reward updated : 0.0
Iteration number : 2.0
Reward : 0.0
Iteration number : 3.0
Reward : 0.0
Iteration number : 4.0
Reward : 0.0
Iteration number : 5.0
Reward : 0.0
Iteration number : 6.0
Reward : 0.0
Iteration number : 7.0
Reward : 0.0
Iteration number : 8.0
Reward : 0.0
Iteration number : 9.0
Reward : 0.0
Iteration number : 10.0
Reward : 0.0
Iteration number : 11.0
Reward : 0.0
Iteration number : 12.0
Reward : 0.0
Iteration number : 13.0
Reward : 0.0
Iteration number : 14.0
Reward : 0.0
Iteration number : 15.0
Reward : 0.0
Iteration number : 16.0
Reward : 0.0
Iteration number : 17.0
Reward : 0.0
Iteration number : 18.0
Reward : 0.0
Iteration number : 19.0
Reward : 0.0
Iteration number : 20.0
Reward : 0.0
Iteration number : 21.0
Reward : 0.0
Iteration number : 22.0
Reward : 0.0
Iteration number : 23.0
Reward : 0.0
Iteration number : 24.0
Reward : 0.0
Iteration number : 25.0
Reward : 0.0
Iteration number : 26.0
Reward : 0.0
Iteration number : 27.0
Re

In [11]:
import numpy as np
import cv2
print(len(video_frames))
arr = np.asarray(video_frames)
size = (256,256)
out = cv2.VideoWriter('project_brown.mp4',cv2.VideoWriter_fourcc(*'DIVX'),15,size)
for i in range(len(video_frames)):
    rgb_img = cv2.cvtColor(arr[i],cv2.COLOR_RGB2BGR)
    out.write(rgb_img)
out.release()

15


OpenCV: FFMPEG: tag 0x58564944/'DIVX' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'
