In [1]:
import import_ipynb

In [2]:
from Environment import Environment
import numpy as np
import random
import easydict

importing Jupyter notebook from Environment.ipynb


In [3]:
class QAgent():
    """Tabular Q-learning agent for an environment with discrete states and actions.

    The environment must expose ``observation_space.n``, ``action_space.n``,
    ``action_space.sample()``, ``reset()``, ``step(action)``, ``render()`` and
    ``close()``.  Both step/reset conventions are accepted:

    * ``reset() -> obs`` and ``step() -> (obs, reward, done, info)``
    * ``reset() -> (obs, info)`` and
      ``step() -> (obs, reward, terminated, truncated, info)``
    """

    def __init__(self, args, env):
        """Store the hyper-parameters and allocate a zero-filled Q table.

        Args:
            args: object with attributes ``max_episodes``, ``max_actions``,
                ``learning_rate``, ``discount`` and ``exploration_rate``.
            env: the environment to learn on (see the class docstring).
        """
        # set hyperparameters
        self.max_episodes = int(args.max_episodes)
        self.max_actions = int(args.max_actions)
        self.learning_rate = float(args.learning_rate)
        self.discount = float(args.discount)
        self.exploration_rate = float(args.exploration_rate)
        self.exploration_decay = self._decay_per_episode(self.max_episodes)

        # get environment
        self.env = env

        # initialize Q(s, a)
        self.Q = self._empty_q_table()

    @staticmethod
    def _decay_per_episode(max_episodes):
        """Return the per-episode exploration decrement (0.0 when there are no episodes)."""
        # Guard against max_episodes == 0, which used to raise ZeroDivisionError.
        return 1.0 / max_episodes if max_episodes > 0 else 0.0

    def _empty_q_table(self):
        """Return a zero matrix with one row per state and one column per action."""
        row = self.env.observation_space.n
        col = self.env.action_space.n
        return np.zeros((row, col))

    def _reset_env(self):
        """Reset the environment and return only the initial state."""
        result = self.env.reset()
        # Newer APIs return (obs, info); states here are Q-table row indices,
        # so a 2-tuple ending in a dict can only be that pair.
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    def _step_env(self, action):
        """Apply ``action`` and return ``(new_state, reward, done)``.

        A 5-tuple result is read as (obs, reward, terminated, truncated, info)
        and the episode counts as done when either flag is set; a 4-tuple is
        read as (obs, reward, done, info).
        """
        result = self.env.step(action)
        if len(result) == 5:
            new_state, reward, terminated, truncated, _ = result
            return new_state, reward, bool(terminated or truncated)
        new_state, reward, done, _ = result
        return new_state, reward, done

    def _policy(self, mode, state, e_rate=0):
        """Choose an action for ``state``.

        Args:
            mode: ``'train'`` for epsilon-greedy selection, ``'test'`` for
                purely greedy selection.
            state: row index into the Q table.
            e_rate: exploration probability, only used in ``'train'`` mode.

        Returns:
            The chosen action.

        Raises:
            ValueError: if ``mode`` is neither ``'train'`` nor ``'test'``.
        """
        if mode == 'train':
            if random.random() > e_rate:
                return np.argmax(self.Q[state, :])  # exploitation
            return self.env.action_space.sample()  # exploration
        if mode == 'test':
            return np.argmax(self.Q[state, :])
        # Falling through used to return None, which NumPy indexing treats as
        # np.newaxis, so a typo in `mode` corrupted updates without an error.
        raise ValueError("mode must be 'train' or 'test', got %r" % (mode,))

    def train(self):
        """Reset the Q table and run ``max_episodes`` episodes of Q-learning.

        Each episode takes at most ``max_actions`` steps.  The exploration rate
        starts at ``self.exploration_rate`` and is decreased by
        ``1 / max_episodes`` after every episode while it is above 0.001.
        """
        # get hyper-parameters
        max_episodes = self.max_episodes
        max_actions = self.max_actions
        learning_rate = self.learning_rate
        discount = self.discount
        exploration_rate = self.exploration_rate
        # Recomputed from the current max_episodes (as before), so a value
        # changed after construction is honoured.
        exploration_decay = self._decay_per_episode(max_episodes)

        # reset Q so that every call starts learning from scratch
        self.Q = self._empty_q_table()

        # start training
        for _ in range(max_episodes):
            state = self._reset_env()  # reset the environment per episode
            for _ in range(max_actions):
                action = self._policy('train', state, exploration_rate)
                new_state, reward, done = self._step_env(action)
                # Q(s, a) <- Q(s, a) + lr * (r + discount * max_a' Q(s', a') - Q(s, a))
                td_target = reward + discount * np.max(self.Q[new_state, :])
                self.Q[state, action] = (self.Q[state, action]
                                         + learning_rate * (td_target - self.Q[state, action]))
                state = new_state  # update the current state
                if done:  # the environment reported the end of the episode
                    break
            if exploration_rate > 0.001:
                exploration_rate -= exploration_decay

    def test(self):
        """Run one greedy episode, rendering every state, then close the environment.

        At most ``max_actions`` steps are taken.  The environment is closed
        even if rendering or stepping raises.
        """
        max_actions = self.max_actions
        try:
            state = self._reset_env()  # reset the environment
            for _ in range(max_actions):
                self.env.render()  # show the environment state
                action = self._policy('test', state)  # greedy action from the learned Q table
                state, _, done = self._step_env(action)  # move to the next state
                if done:
                    print("======")
                    self.env.render()
                    break
                print("======")
        finally:
            self.env.close()

In [4]:
if __name__ == '__main__':
    # Hyper-parameters for the run, wrapped so QAgent can read them as attributes.
    hyperparameters = {
        "max_episodes": 200,
        "max_actions": 99,
        "learning_rate": 0.83,
        "discount": 0.95,
        "exploration_rate": 1.0,
    }
    args = easydict.EasyDict(hyperparameters)

    # Build the environment and the agent that learns on it.
    env = Environment().FrozenLakeNoSlippery()
    agent = QAgent(args, env)

    # Learn the Q table first ...
    print("START TRAINING...")
    agent.train()

    # ... then replay one episode with the learned greedy policy.
    print("\n==============\n\nTEST==============\n")
    agent.test()

START TRAINING...




[41mS[0mFFF
FHFH
FFFH
HFFG
  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Right)
SF[41mF[0mF
FHFH
FFFH
HFFG
  (Down)
SFFF
FH[41mF[0mH
FFFH
HFFG
  (Down)
SFFF
FHFH
FF[41mF[0mH
HFFG
  (Down)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m
