In [49]:
import gym
import tensorflow as tf
import random
import numpy as np
from collections import deque
from gym.envs.registration import register        # <<<--- To manipulate the registry of the FrozenLake Source to remove slipping 
from IPython.display import clear_output

In [50]:
class Agent:
    def __init__(self, env):
        self.is_discrete = type(env.action_space) == gym.spaces.discrete.Discrete
        #     ^^^^ <<< Takes 1 if discrete else 0
        
        if self.is_discrete:
            self.action_size = env.action_space.n
        else:
            self.action_low = env.action_space.low
            self.action_high = env.action_space.high
            self.action_size = env.action_space.shape
        
    def get_action(self):
        if self.is_discrete:
            a = random.choice(range(self.action_size))
        else:             # vvvv <<< takes low high and shape of output as args
            a = np.random.uniform(self.action_low,
                                  self.action_high,
                                  self.action_size)
        return a

try:
    register(
        id='FrozenLake-v0',
        entry_point='gym.envs.toy_text:FrozenLakeEnv',
        kwargs={'map_name' : '4x4', 'is_slippery':False},
        max_episode_steps=100,
        reward_threshold=0.78, # optimum = .8196
    )
except:
    pass

env_name = "FrozenLake-v0"
env = gym.make(env_name)
print("Observation space:", env.observation_space)
print("Action space:", env.action_space)
type(env.action_space)

Observation space: Discrete(16)
Action space: Discrete(4)


gym.spaces.discrete.Discrete

In [51]:
class Agent:
    def __init__(self, env):
        self.is_discrete = \
            type(env.action_space) == gym.spaces.discrete.Discrete
        
        if self.is_discrete:
            self.action_size = env.action_space.n
            print("Action size:", self.action_size)
        else:
            self.action_low = env.action_space.low
            self.action_high = env.action_space.high
            self.action_shape = env.action_space.shape
            print("Action range:", self.action_low, self.action_high)
        
    def get_action(self):
        if self.is_discrete:
            action = random.choice(range(self.action_size))
        else:
            action = np.random.uniform(self.action_low,
                                       self.action_high,
                                       self.action_shape)
        return action

In [52]:
class QNAgent(Agent):
    def __init__(self, env, discount_rate=0.97, learning_rate=0.01, memory_size = 1000000, batch_size = 16):
        super().__init__(env)
        self.state_size = env.observation_space.n
        print("State size:", self.state_size)
        self.memory = deque(maxlen = memory_size)
        self.batch_size = batch_size
        self.eps = 1.0
        self.eps_decay = 0.99
        self.eps_min = 0.01
        self.discount_rate = discount_rate
        self.learning_rate = learning_rate
        self.build_model()
        
        
    def build_model(self):
        self.model = tf.keras.models.Sequential([
            tf.keras.layers.Dense(32, 
                                  input_shape = (self.state_size,), 
                                  activation = tf.keras.activations.relu), 
            
            tf.keras.layers.Dense(16, 
                                  activation = tf.keras.activations.relu),
            
            tf.keras.layers.Dense(self.action_size, 
                                  activation = tf.keras.activations.softmax)
        ])
        
        self.model.compile(optimizer = tf.keras.optimizers.Adam(lr = self.learning_rate),
                           loss = tf.keras.losses.mse,
                           metrics = [tf.keras.metrics.mean_squared_error])
    
    def get_action(self, state):
        exploit_action = np.argmax(self.model.predict(state)[0])
        explore_action = random.choice(range(self.action_size))
        return explore_action if random.random() < self.eps else exploit_action

    def append_memory(self, state, action, next_state, reward, done):
        self.memory.append((state, action, next_state, reward, done))
        
    def exp_replay(self):
        if len(self.memory) < self.batch_size:
            return
        
        trainer_batch = random.sample(self.memory, self.batch_size)
        
        for state, action, next_state, reward, done in trainer_batch:
            q_values = self.model.predict(state)
            q_update = reward if done else reward + (self.discount_rate * np.argmax(self.model.predict(next_state)[0]))
            q_values[0][action] = q_update
            
            self.model.fit(state, q_values)
        
        self.eps = max(self.eps_min, self.eps * self.eps_decay)
        
        
        
QNAgent = QNAgent(env)

Action size: 4
State size: 16


In [53]:
train_episodes = 100

for epi in range(train_episodes):
    state = env.reset()
    # state = np.reshape(state, [1, env.observation_space.n])
    
    done = False
    time_steps = 0
    while not done:
        time_steps += 1
        action = QNAgent.get_action(state)
        state_next, reward, done, info = env.step(action)
        if done:
            reward = -reward
        state_next = np.reshape(state_next, [1, QNAgent.state_size])
        
        QNAgent.append_memory(state, action, state_next, reward, done)
        state = state_next
        
        if done:
            print(f"Episode : {epi}, Score : {time_steps}, Epsilon : {QNAgent.eps}")
            break
        

ValueError: cannot reshape array of size 1 into shape (1,16)

In [43]:
state = np.zeros([env.observation_space.n, 1])
state.shape

(16, 1)

In [48]:
env.observation_space.shape

()