In [None]:
import gym
import gridworlds
import numpy as np; np.random.seed(0)
import seaborn as sns; sns.set_theme()
import matplotlib
import matplotlib.pyplot as plt
import cv2 as cv

In [None]:
class Agent():
    def __init__(self, env, epsilon, alpha, gamma, decay_factor, render=False):
        self.env = env
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.decay_factor = decay_factor
        self.render = render      
        self.Q = np.zeros([env.env.height, env.env.width, env.env.action_space.n])
        
        reward_pos = env.config['start_position']
        terminal_pos = env.config['reward_position']
        self.state_list = np.indices((env.env.height, env.env.width)).transpose(1, 2, 0).reshape(-1, 2).astype(str)
        
        self.state_list = [', '.join(ele) for ele in self.state_list]       
        self.action_list = ["UP", "RIGHT", "DOWN", "LEFT"]

            
    def epsilon_greedy(self, state):
        p = np.random.uniform()
        
        if p <= 1-self.epsilon:
            # choose optimal action, if several optimal actions, action is choosen randomly between them
            actions = self.Q[state[0], state[1], :]
            return np.random.choice(np.flatnonzero(actions == actions.max())) 
            
        else:
            # choose random between actions
            return env.env.action_space.sample()
        
    def q_learning(self, state, action, suc_state, reward):
        max_q_suc = np.max(self.Q[suc_state[0], suc_state[1], :])
        current_q = self.Q[state[0], state[1], action]
        
        # update action-value according to Q-learning
        self.Q[state[0], state[1], action] = current_q + self.alpha * (reward + self.gamma * max_q_suc - current_q)

        
    def train_agent(self, episodes, max_timesteps):
        steps = np.zeros(episodes)
        print('[INFO] - Training of the Agent started.')
        
        for episode in range(episodes):
            state = self.env.reset()
                        
            for t in range(max_timesteps):  # cap timesteps
                action = self.epsilon_greedy(state)        
                suc_state, reward, done, _ = self.env.step(action)
                
                self.q_learning(state, action, suc_state, reward)
                state = suc_state
                
                if self.render == True: self.env.render()

                if done:
                    steps[episode] = t+1    # starts at 0
                    break
            
            if self.epsilon >= 0.0005:
                self.epsilon -= self.decay_factor            
            
            if episode%50 == 0 and episode != 0:
                self.plot_q(episode)
                
        if self.render == True: env.close() # env.close not implemented
            
        print(f'[INFO] - Finished training with epsisode: {episode}.')
        self.plot_q(episode)
        self.plot_steps(steps)
        
        print('[INFO] - Running agent with optimal policy.')
        self.behave_optimal()

        print('[INFO] - Finished training.')
            
        return self.Q
    
    def behave_optimal(self):
        state = self.env.reset()
        done = False
        
        while not done:  # max timesteps
            # take optimal action
            actions = self.Q[state[0], state[1], :]
            action = np.random.choice(np.flatnonzero(actions == actions.max()))
            
            suc_state, reward, done, _ = self.env.step(action)

            state = suc_state

            self.env.render()
    
        cv.destroyAllWindows()
    
    def plot_q(self, episode):
        y_labels = self.state_list.copy()        
        fig, ax = plt.subplots(figsize=(10,10))
        ax = sns.heatmap(self.Q.reshape((-1, 4)), vmin=np.min(self.Q), vmax=np.max(self.Q), linewidths=.5, annot=True, fmt='.4f', ax=ax)
        ax.set_yticklabels(y_labels, rotation=0)
        ax.set_xticklabels(self.action_list)
        ax.set_title(f'Q-table after episode {episode}.')
        plt.show()
    
    def plot_steps(self, steps):
        print(steps)
        fig, ax = plt.subplots()
        ax.plot(steps)
        ax.set(xlabel='Episode', ylabel='Steps', title='Amount of steps taken by the Agent during training')
        plt.show()

In [None]:
env = gym.make('gridworld-v0')

q_learning_agent = Agent(env,
                       alpha=0.1,      # learning rate
                       epsilon=0.4,    # epsilon_greedy
                       gamma=0.9,      # discount factor (0-myopic; 1-far sighted)
                       decay_factor=0.0001,
                       render=False)
                       
print(f'The environment has {env.env.observation_space.n} states and {env.env.action_space.n} actions, with an state space of {env.env.observation_space} and an action space of {env.env.action_space}.')

In [None]:
v_star = q_learning_agent.train_agent(episodes=500,
                                max_timesteps=100)

In [None]:
fig, ax = plt.subplots()
ax.plot(np.arange(10))
ax.set(xlabel='Episode', 
       ylabel='Steps', 
       title='Amount of steps taken by the Agent during training')

plt.show() 

In [2]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, ReLU

class DQN(tf.keras.Model):
    def __init__(self, input_dim, output_dim, ):
        super(DQN, self).__init__()

        self.layer = [
                Dense(256, input_dim=input_dim),
                ReLU(),
                Dense(128),
                ReLU(),
                Dense(output_dim, use_bias=False)
        ]

    @tf.function
    def call(self, x, training=False):
        for l in self.layer:
            # training argument for BN and DropOut
            try:
                x = l(x, training)
            except:
                x = l(x)
        return x


In [43]:
import numpy as np

model = DQN((64, 4), 2)

states = tf.constant(np.arange(4*64).reshape(-1, 4).reshape(64, -1))
#tf.reduce_max(model(states), axis=-1, keepdims=True)
states[0]


<tf.Tensor: shape=(4,), dtype=int64, numpy=array([0, 1, 2, 3])>

In [31]:
actions = np.ones(64, dtype=np.int32).reshape(64, -1)
actions[::2] = 0
actions = tf.constant(actions)

In [42]:
out = model(states[0,:].numpy())
print(out)

states[0]



ValueError: in user code:

    <ipython-input-2-034da88ce888>:23 call  *
        x = l(x)
    /home/janosch/miniconda3/envs/tf_2.4/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:998 __call__  **
        input_spec.assert_input_compatibility(self.input_spec, inputs, self.name)
    /home/janosch/miniconda3/envs/tf_2.4/lib/python3.7/site-packages/tensorflow/python/keras/engine/input_spec.py:239 assert_input_compatibility
        str(tuple(shape)))

    ValueError: Input 0 of layer dense_9 is incompatible with the layer: : expected min_ndim=2, found ndim=1. Full shape received: (4,)
