In [1]:
# show game
import gym
from gym import wrappers

env = gym.make('MountainCar-v0')
print(env.action_space.n)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

env = wrappers.Monitor(env, "./gym-results", force=True)
env.reset()
for _ in range(1000):
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done: break
env.close()

3
Box(-1.2000000476837158, 0.6000000238418579, (2,), float32)
[0.6  0.07]
[-1.2  -0.07]


In [2]:
import tensorflow as tf
import numpy as np
import random
from collections import deque

active_n = env.action_space.n

# log_dir = 'tensorboard'
num_episodes = 300
num_exploration_episodes = 100
max_len_episode = 1000
batch_size = 32
learning_rate = 1e-3
gamma = 1.0
initial_epsilon = 1.0
final_epsilon = 0.012

Qmemory = 20000

class DQNetwork(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=32, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=32, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=active_n)

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def predict(self, inputs):
        q_values = self(inputs)
        return tf.argmax(q_values, axis=-1)

In [4]:
if __name__ == '__main__':
    env = gym.make('MountainCar-v0')
    env = wrappers.Monitor(env, "./gym-results", force=True)
    
    model = DQNetwork()
    
#     optimizer = SGD(lr = learning_rate)
#     model.compile(optimizer = optimizer, loss = 'mse')
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
#     summary_writer = tf.summary.create_file_writer(log_dir)
#     tf.summary.trace_on(profiler=True)
    replay_buffer = deque(maxlen=Qmemory)
    epsilon = initial_epsilon
    
    for episode_id in range(num_episodes):
        # total_reward = 0
        state = env.reset()
        
        epsilon = max(
            initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
            final_epsilon)
        
        for t in range(max_len_episode):
            
            env.render()
            
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                # get Q value
                action = model.predict(np.expand_dims(state, axis=0)).numpy()
                action = action[0]

            next_state, reward, done, _ = env.step(action)
            
            # how to design your reward
            position, _ = next_state
            reward =  abs(position - (-0.5))
            # reward = -50.0 if done else reward
            # reward = 100* (new_state[0] * 0.01) + 0.5 * new_state[1] * new_state[1]) - (state[0] * 0.01 + 0.5 * state[1] * state[1])) 
            # total_reward += reward
            
            # update state
            replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
            state = next_state

            if done:
                print(f"num {t}, episode {episode_id}, epsilon {epsilon}, score {reward}")
                break

            if len(replay_buffer) >= batch_size:
                batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
                    *random.sample(replay_buffer, batch_size))
                
                batch_state, batch_reward, batch_next_state, batch_done = \
                    [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
                
                batch_action = np.array(batch_action, dtype=np.int32)

                # action value function Q
                q_value = model(batch_next_state)
                y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done)
                
                with tf.GradientTape() as tape:
                    loss = tf.keras.losses.mean_squared_error(
                        y_true=y,
                        y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=active_n), axis=1)
                    )
                grads = tape.gradient(loss, model.variables)
                optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
                
#             with summary_writer.as_default():
#                 tf.summary.trace_export(name="model_trace", step=0, profiler_outdir=log_dir)
                
        # if total_reward >= max_reward:
            # agent.save_weights("mountaincar-v0.h5")
            # max_reward = total_reward
            
    env.close()

episode 0, epsilon 1.0, score -50.0


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

episode 1, epsilon 0.99, score -50.0
episode 2, epsilon 0.98, score -50.0
episode 3, epsilon 0.97, score -50.0
episode 4, epsilon 0.96, score -50.0
episode 5, epsilon 0.95, score -50.0
episode 6, epsilon 0.94, score -50.0
episode 7, epsilon 0.93, score -50.0
episode 8, epsilon 0.92, score -50.0
episode 9, epsilon 0.91, score -50.0
episode 10, epsilon 0.9, score -50.0
episode 11, epsilon 0.89, score -50.0
episode 12, epsilon 0.88, score -50.0
episode 13, epsilon 0.87, score -50.0
episode 14, epsilon 0.86, score -50.0
episode 15, epsilon 0.85, score -50.0
episode 16, epsilon 0.84, score -50.0
episode 17, epsilon 0.83, score -50.0
episode 18, epsilon 0.82, s