In [None]:
import collections
import random
import warnings

import gym
import numpy as np
import tensorflow as tf

import utils

keras = tf.keras

from keras.layers import Dense, Input
from keras.optimizers import Adam

warnings.filterwarnings("ignore", category=DeprecationWarning)

In [None]:
# Run configuration (kept separate from the tunable hyperparameters).
GAME = "mountain_car_dqn"  # identifier handed to utils.save_progress
# Forwarded to model.fit(verbose=...). Keras documents "auto", 0, 1 or 2 for
# that argument, so this is the integer 0 (silent) rather than the string "0",
# which does not compare equal to 0 inside Keras.
VERBOSITY = 0
SAVE_FREQUENCY = 25  # handed to utils.save_progress along with the episode count

In [None]:
# --- Hyperparameters ---

# Optimisation
EPISODES = 500  # the training loop iterates over range(EPISODES + 1)
LEARNING_RATE = 0.001  # Adam step size used when compiling the model
GAMMA = 0.99  # discount applied to the bootstrapped next-state value

# Exploration (epsilon-greedy)
EPSILON = 1  # starting probability of taking a random action
EPSILON_DECAY = 0.99  # EPSILON is multiplied by this after every episode
EPSILON_MINIMUM = 0.01  # EPSILON is never allowed to drop below this
# Sampling weights for random actions, in the order: left, nothing, right
ACTION_PROBABILITIES = [0.4, 0.2, 0.4]

# Experience replay
BATCH_SIZE = 32  # transitions sampled per update
REPLAY_BUFFER_SIZE = 100_000  # maxlen of the replay deque

In [None]:
def instantiate_model(env):
    """Build and compile the Q-value network for ``env``.

    The network takes an observation and produces one linear output per
    discrete action, passing through two ReLU hidden layers of 32 and 64
    units.

    Args:
        env: Environment exposing ``observation_space.shape`` and
            ``action_space.n``.

    Returns:
        A ``keras.Model`` compiled with MSE loss and an Adam optimizer using
        ``LEARNING_RATE``.
    """
    observation = Input(shape=env.observation_space.shape)

    # Stack the hidden layers in order; each consumes the previous output.
    hidden = observation
    for units in (32, 64):
        hidden = Dense(units, activation="relu")(hidden)

    q_values = Dense(env.action_space.n, activation="linear")(hidden)

    network = keras.Model(inputs=observation, outputs=q_values)
    network.compile(
        optimizer=Adam(learning_rate=LEARNING_RATE),
        loss="mse",
    )
    return network


def take_action(env, action):
    """Advance ``env`` by one step and return ``(next_state, reward, done)``.

    Both Gym step signatures are accepted:

    * 4-tuple ``(obs, reward, done, info)``: ``done`` is passed through
      unchanged.
    * 5-tuple ``(obs, reward, terminated, truncated, info)``: ``done`` is
      ``True`` when the episode either terminated or was truncated.

    Args:
        env: Object with a ``step(action)`` method.
        action: Action forwarded to ``env.step``.

    Returns:
        Tuple ``(next_state, reward, done)``; the trailing info value is
        discarded.

    Raises:
        ValueError: If ``env.step`` returns a sequence that is neither of the
            two shapes above (raised by tuple unpacking).
    """
    result = env.step(action)

    if len(result) == 5:
        next_state, reward, terminated, truncated, _ = result
        # Either flag ends the episode as far as the training loop is concerned.
        done = bool(terminated or truncated)
    else:
        next_state, reward, done, _ = result

    return next_state, reward, done


def shape_reward(state, next_state, reward):
    """Add a bonus proportional to the change in magnitude of component 1.

    The bonus is ``300 * (|next_state[1]| - |state[1]|)``: positive when the
    absolute value of the second state component grew over the step, negative
    when it shrank.

    Args:
        state: Indexable observation before the step.
        next_state: Indexable observation after the step.
        reward: Reward returned by the environment for the step.

    Returns:
        ``reward`` plus the bonus.
    """
    # NOTE(review): index 1 is presumably the car's velocity — confirm
    # against the environment's observation layout.
    magnitude_before = abs(state[1])
    magnitude_after = abs(next_state[1])
    magnitude_gain = magnitude_after - magnitude_before
    return reward + 300 * magnitude_gain


def train_on_batch(model, replay_buffer):
    """Run one Q-learning update on a random minibatch from ``replay_buffer``.

    For each sampled transition the regression target of the taken action is
    ``reward + GAMMA * max_a Q(next_state, a) * (1 - done)``; the targets of
    the other actions are the model's own current predictions, so only the
    taken action contributes to the MSE loss. The same ``model`` provides both
    the current and the bootstrapped predictions.

    Args:
        model: Object exposing ``predict_on_batch(states)`` returning an array
            of shape ``(batch, n_actions)`` and ``fit(x, y, verbose=...)``.
        replay_buffer: Sequence of ``(state, action, reward, next_state, done)``
            tuples holding at least ``BATCH_SIZE`` entries.

    Raises:
        ValueError: If ``replay_buffer`` holds fewer than ``BATCH_SIZE``
            transitions (raised by ``random.sample``).
    """
    batch = random.sample(replay_buffer, BATCH_SIZE)

    # Transpose the list of transitions into one array per field.
    states, actions, rewards, next_states, dones = (
        np.array(column) for column in zip(*batch)
    )

    # The (batch, n_actions) shape is kept as returned. Squeezing here would
    # drop a size-1 batch or action axis and break the axis=1 reduction and
    # the row/column indexing below.
    next_q_values = np.asarray(model.predict_on_batch(next_states))
    # (1 - dones) zeroes the bootstrapped term for transitions flagged done.
    targets = rewards + GAMMA * np.amax(next_q_values, axis=1) * (1 - dones)

    # Copy so the in-place assignment never writes into a buffer owned by the
    # model or backend.
    targets_full = np.array(model.predict_on_batch(states))
    targets_full[np.arange(len(batch)), actions] = targets

    model.fit(states, targets_full, verbose=VERBOSITY)

In [None]:
# Build the environment. new_step_api=False is presumably what selects the
# 4-tuple (obs, reward, done, info) step return that take_action() unpacks —
# confirm against the installed gym version.
env = gym.make("MountainCar-v0", new_step_api=False)

# Q-network sized from the environment's observation and action spaces.
model = instantiate_model(env)

# Bounded store of (state, action, reward, next_state, done) transitions; once
# REPLAY_BUFFER_SIZE entries are held, each append discards the oldest one.
replay_buffer = collections.deque(maxlen=REPLAY_BUFFER_SIZE)

# Total shaped reward of every finished episode, in order.
reward_history = []

# Training
# NOTE(review): range(EPISODES + 1) runs EPISODES + 1 episodes (indices
# 0..EPISODES) — confirm the extra episode is intended.
for episode in range(EPISODES + 1):
    # Assumes reset() returns just the observation (not an (obs, info) pair)
    # — TODO confirm for the installed gym version.
    state = env.reset()
    episode_reward = 0
    done = False

    # Episode loop: act, store the transition, train, until the env reports done.
    while not done:
        # Epsilon-greedy: with probability EPSILON draw a random action using
        # ACTION_PROBABILITIES, otherwise take the action with the largest
        # model output. The literal 3 must match len(ACTION_PROBABILITIES).
        if np.random.uniform(0, 1) < EPSILON:
            action = np.random.choice(3, p=ACTION_PROBABILITIES)
        else:
            # expand_dims adds a leading batch axis of size 1; argmax without
            # an axis works on the flattened output, which for a single row
            # is the action index.
            action = np.argmax(model(np.expand_dims(state, axis=0)))

        next_state, reward, done = take_action(env, action)
        # From here on `reward` is the shaped value, so both the episode total
        # and the stored transition use shaped rather than raw rewards.
        reward = shape_reward(state, next_state, reward)
        episode_reward += reward

        # Store transition in replay buffer
        replay_buffer.append((state, action, reward, next_state, done))

        state = next_state

        # Sample batch and update model (one update per environment step once
        # the buffer holds at least BATCH_SIZE transitions).
        if len(replay_buffer) >= BATCH_SIZE:
            train_on_batch(model, replay_buffer)

    # Decay exploration once per episode, clamped at EPSILON_MINIMUM.
    # NOTE(review): this rebinds the notebook-global EPSILON, so re-running
    # this cell without re-running the hyperparameter cell starts from the
    # already-decayed value.
    EPSILON *= EPSILON_DECAY
    EPSILON = max(EPSILON_MINIMUM, EPSILON)

    reward_history.append(episode_reward)

    # utils.save_progress and utils.log are defined outside this file. Note
    # that save_progress receives the 1-based episode count while log receives
    # the 0-based index.
    utils.save_progress(model, reward_history, episode + 1, SAVE_FREQUENCY,
                        GAME)

    utils.log(episode, episode_reward, EPSILON)