In [8]:
import gym
from gym.envs.registration import register
import numpy as np
import random
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
from IPython.display import clear_output
from collections import deque

In [9]:

try:
    register(
        id="FrozenLakeNoSlip-v0",
        entry_point="gym.envs.toy_text:FrozenLakeEnv",
        kwargs={'map_name': "4x4", "is_slippery": False},
        max_episode_steps=100,
        reward_threshold=0.78  # optimum = .8194
    )
except:
    pass
env = gym.make("FrozenLakeNoSlip-v0")
discount_rate=0.97
learning_rate=0.001


In [10]:
class CustomSquaredError(keras.losses.Loss):
  def call(self, y_true, y_pred):
    #y_true[0] is q_target and y_true[1] is action
    reduced_pred = tf.reduce_sum(tf.multiply(y_pred,tf.one_hot([y_true[1]],depth=4)))
    #ans = tf.math.squared_difference(reduced_pred,y_true[0])
    ans = tf.reduce_sum(tf.math.squared_difference(y_true[0],reduced_pred))
    return ans

The custom squared error function was raising this error:

ValueError: Index out of range using input dim 0; input has only 0 dims for '{{node CustomMeanSquaredError/strided_slice}} = StridedSlice[Index=DT_INT32, T=DT_FLOAT, begin_mask=0, ellipsis_mask=0, end_mask=0, new_axis_mask=0, shrink_axis_mask=1](Cast, CustomMeanSquaredError/strided_slice/stack, CustomMeanSquaredError/strided_slice/stack_1, CustomMeanSquaredError/strided_slice/stack_2)' with input shapes: [], [1], [1], [1] and with computed input tensors: input[3] = <1>.

The fix was, instead of passsing in q_target as the true value, pass in a tensor containing q_target and reference its first index.

For reducing the prediction from a q state to one q value, you have to use sum because sometimes the q value is negative, so max won't work

Taking the max of the q-state as the q-value for the current action doesn't work because sometimes the action is random, so then its q value isn't the max of the q-state.

In [11]:
model = keras.Sequential([
    layers.Dense(units=4, input_shape=[16])])
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=CustomSquaredError()
)
epsilon = 1.0
exp_buffer = deque(maxlen=1000)
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 4)                 68        
Total params: 68
Trainable params: 68
Non-trainable params: 0
_________________________________________________________________


In [12]:
def makeDecision(state):
    action_greedy = np.argmax(model.predict(tf.one_hot([state],depth=16)))
    action_random = random.randint(0,3)
    return action_greedy if random.random()>epsilon else action_random
    

In [13]:
def train(experience):
    global epsilon
    batch_size=50
    exp_buffer.append(experience)
    samples = random.choices(exp_buffer,k=batch_size)
    state,action,next_state,reward,done=(list(col) for col in zip(*samples))
    q_next = model.predict(tf.one_hot([next_state],depth=16))[0]#next_state is state, but after the agent made an action
    #q_next is now the predicted q value for the next state, but wrapped like [[value]]
    q_next[done]=np.zeros(4)#syntax shortcut. for each index where the corresponding index of done is the same, set it to zeros
    q_target = reward +  discount_rate*np.max(q_next,axis=1)#for a win, q_target will be 1, and -0.5 for a loss
    current_state = tf.one_hot([state],depth=16)
    model.train_on_batch(x=current_state,y=tf.constant([q_target,action]))#q_target*tf.ones(shape=(4,)))

    if experience[4]:#if the current iteration is done(the done variable is now a list of the values for each experience in the batch)
        epsilon*=0.99


IF the below snippet raises "ValueError: Creating variables on a non-first call to a function decorated with tf.function.", run the model initialization cell again and it should work

In [14]:
total_reward = 0
wins=0
episodes = 100
samples=30#For central limit theorem
verbose=True
for experiment in range(30):
    model = keras.Sequential([
    layers.Dense(units=4, input_shape=[16])])
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=CustomSquaredError()
    )
    epsilon = 1.0
    exp_buffer = deque(maxlen=1000)
    for episode in range(episodes):
        done = False
        state = env.reset()
        while not done:
            action = makeDecision(state)
            next_state, reward, done, info = env.step(action)#reward is one if the goal is reached and zero if the goal isn't reached
            if state==next_state or (done and next_state!=15):
                reward=-1
            train((state,action,next_state,reward,done))
            state=next_state
            total_reward+=reward
            if reward>0:
                wins+=1
            if verbose:
                print(f"Epsiode: {episode}, total_reward: {total_reward}, wins: {wins}, epsilon: {epsilon}")
                env.render()
                print(model.layers[0].get_weights()[0])
                clear_output(wait=True)


print(f"wins: {wins}, total reward: {total_reward}")

wins: 1, total reward: -573.0
