In [1]:
"""
    SOURCE:
            https://levelup.gitconnected.com/dqn-from-scratch-with-tensorflow-2-eb0541151049
            https://towardsdatascience.com/explaining-double-q-learning-for-openai-environments-using-the-movie-tenet-816dc952f41c
            https://github.com/perseus784/Vehicle_Overtake_Double_DQN
"""

'\n    SOURCE:\n            https://levelup.gitconnected.com/dqn-from-scratch-with-tensorflow-2-eb0541151049\n            https://towardsdatascience.com/explaining-double-q-learning-for-openai-environments-using-the-movie-tenet-816dc952f41c\n            https://github.com/perseus784/Vehicle_Overtake_Double_DQN\n'

In [2]:
import random
import numpy as np
from collections import deque

class ReplayBuffer:
    """
    Replay Buffer Stores and retrieves gameplay experiences
    """
    def __init__(self):
        self.gameplay_experiences = deque(maxlen=100000)
    
    def store_gameplay_experience(self, state, next_state, reward, action, done):
        """
        Records a single step (state transition) of gameplay experience.
        :param state: the current game state
        :param next_state: the game state after taking action
        :param reward: the reward taking action at the current state brings
        :param action: the action taken at the current state
        :param done: a boolean indicating if the game is finished after
        taking the action
        :return: None
        """
        self.gameplay_experiences.append((state, next_state, reward, action, done))
    
    def sample_gameplay_batch(self):
        """
        Samples a batch of gameplay experiences for training.
        :return: a list of gameplay experiences
        """
        batch_size = min(128, len(self.gameplay_experiences))
        sampled_gameplay_batch = random.sample(self.gameplay_experiences, batch_size)
        state_batch = []
        next_state_batch = []
        action_batch = []
        reward_batch = []
        done_batch = []
        
        for gameplay_experience in sampled_gameplay_batch:
            state_batch.append(gameplay_experience[0])
            next_state_batch.append(gameplay_experience[1])
            reward_batch.append(gameplay_experience[2])
            action_batch.append(gameplay_experience[3])
            done_batch.append(gameplay_experience[4])
            
        return np.array(state_batch), np.array(next_state_batch), np.array(
            action_batch), np.array(reward_batch), np.array(done_batch)

In [3]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten, Convolution2D, MaxPool2D, Dropout, Input, Conv2D
from tensorflow.keras.optimizers import Adam

IM_HEIGHT = 96
IM_WIDTH = 96
IM_CHANNEL = 3
NUM_ACTIONS = 8
LEARNING_RATE = .001
EXPLORATION_RATE = .05
GAMMA = .95

class DQNAgent:
    """
    DQN Agent
    The agent that explores the game and learn how to play the game by
    learning how to predict the expected long-term return, the Q value given
    a state-action pair.
    """

    def __init__(self):
        self.q_net = self._build_dqn_model()
        self.target_q_net = self._build_dqn_model()
    
    @staticmethod
    def _build_dqn_model():
        """
        Builds a deep neural net which predicts the Q values for all possible
        actions given a state. The input should have the shape of the state, and
        the output should have the same shape as the action space since we want
        1 Q value per possible action.
        :return: Q network
        """
        inp = Input((IM_HEIGHT, IM_WIDTH, IM_CHANNEL))
        
        x = Conv2D(32, (3,3), activation='relu')(inp)
        x = Conv2D(32, (3,3), activation='relu')(x)
        x = Conv2D(64, (3,3), activation='relu')(x)
        x = MaxPool2D((2,2))(x)
        x = Dropout(0.2)(x)

        x = Conv2D(64, (3,3), activation='relu')(x)
        x = Conv2D(128, (3,3), activation='relu')(x)
        x = MaxPool2D((2,2))(x)
        x = Dropout(0.2)(x)

        x = Conv2D(128, (3,3), activation='relu')(x)
        x = Conv2D(256, (3,3), activation='relu')(x)
        x = MaxPool2D((2,2))(x)
        x = Dropout(0.2)(x)

        x = Conv2D(512, (3,3), activation='relu')(x)
        x = MaxPool2D((2,2))(x)
        x = Flatten()(x)

        x = Dense(512, activation='relu')(x)
        x = Dropout(0.1)(x)
        x = Dense(512, activation='relu')(x)
        x = Dense(NUM_ACTIONS, activation='linear')(x)

        q_net = Model(inputs=inp, outputs=x)
        q_net.summary()
        
        q_net.compile(optimizer=Adam(learning_rate=LEARNING_RATE), loss='mse')
        
        return q_net
    
    def map_int_to_action(self, n):
        """
        Maps the integer value to an action in env.action_space values for CarRacing-v0
        :param n: an integer value
        :return: action
        """
        discrete_action_space = {
            "turn_left":[-1,0,0],
            "turn_right":[1,0,0],
            "go":[0,1,0],
            "go_left":[-1,1,0],
            "go_right":[1,1,0],
            "brake":[0,0,1],
            "brake_left":[-1,0,1],
            "brake_right":[1,0,1]
        }
        discrete_actions = list(discrete_action_space.values())
        action = (discrete_actions[n])
        return action

    def random_policy(self, num_actions=NUM_ACTIONS):
        """
        Outputs a random action
        :param num_actions: number of actions in the env.action_space
        :return: action
        """
        n = np.random.randint(num_actions)
        action = self.map_int_to_action(n)
        return action

    def collect_policy(self, state, num_actions=NUM_ACTIONS, exploration_rate=EXPLORATION_RATE):
        """
        Similar to policy but with some randomness to encourage exploration.
        :param state: the game state
        :param num_actions: number of actions in the env.action_space
        :return: action
        """
        if np.random.random() < exploration_rate:
            return self.random_policy(num_actions)
        return self.policy(state)

    def policy(self, state):
        """
        Takes a state from the game environment and returns an action that
        has the highest Q value and should be taken as the next step.
        i.e. run the state through the q_net and take the action which is the index that has the highest Q value
        :param state: the current game environment state
        :return: an action
        """
        state_input = tf.convert_to_tensor(state[None, :], dtype=tf.float32)
        action_q = self.target_q_net(state_input)
        best_action = np.argmax(action_q.numpy()[0], axis=0)
        
        # if you are using CarRacing-v0 environment with discrete actions,
        # you have to map the int value to [int, int, int]
        # Creating discrete actions
        best_action = self.map_int_to_action(best_action)
        #print(best_action)
        
        return best_action

    def update_target_network(self):
        """
        Updates the current target_q_net with the q_net which brings all the
        training in the q_net to the target_q_net.
        :return: None
        """
        self.target_q_net.set_weights(self.q_net.get_weights())

    def train(self, batch):
        """
        Trains the underlying network with a batch of gameplay experiences to
        help it better predict the Q values.
        :param batch: a batch of gameplay experiences
        :return: training loss
        """
        # training the model from the feed coming from the replay batch
        state_batch, next_state_batch, action_batch, reward_batch, done_batch = batch
        current_q_values = self.q_net(state_batch).numpy()
        # note that in DQN, only q-value of the actions that were taking will be updated
        # hence copying the q_value for the actions that were not taken in target_q_values
        target_q_values = np.copy(current_q_values)
        # getting the max Q values of the states after transition by running the next_state through target_q_net
        # and taking the max of Q values for all actions for each sample
        next_q_values = self.target_q_net(next_state_batch).numpy()
        max_next_q_values = np.amax(next_q_values, axis=1)
        for i in range(state_batch.shape[0]):
            # updating the q_value of the action taken with the max_q_values of the next_state plus the
            # intermediate reward from the action taken
            target_q_val = reward_batch[i]
            if not done_batch[i]:
                target_q_val += GAMMA * max_next_q_values[i]
            target_q_values[i][action_batch[i]] = target_q_val
        # training the q_net with the target_q_values
        training_history = self.q_net.fit(x=state_batch, y=target_q_values, verbose=0)
        loss = training_history.history['loss']
        return loss

In [4]:
"""
Training loop
This module trains the DQN agent by trial and error. In this module the DQN
agent will play the game episode by episode, store the gameplay experiences
and then use the saved gameplay experiences to train the underlying model.
"""
import gym


def evaluate_training_result(env, agent, num_episodes=10):
    """
    Evaluates the performance of the current DQN agent by using it to play a
    few episodes of the game and then calculates the average reward it gets.
    The higher the average reward is the better the DQN agent performs.
    :param env: the game environment
    :param agent: the DQN agent
    :return: average reward across episodes
    """
    total_reward = 0.
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        episode_reward = 0.
        while not done:
            action = agent.policy(state)
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            state = next_state
        total_reward += episode_reward
    average_reward = total_reward / num_episodes
    return average_reward


def collect_gameplay_experiences(env, agent, buffer):
    """
    Collects gameplay experiences by playing env with the instructions
    produced by agent and stores the gameplay experiences in buffer.
    :param env: the game environment
    :param agent: the DQN agent
    :param buffer: the replay buffer
    :return: None
    """
    state = env.reset()
    done = False
    while not done:
        action = agent.collect_policy(state)
        next_state, reward, done, _ = env.step(action)
        if done:
            reward = -1.0
        buffer.store_gameplay_experience(state, next_state,
                                         reward, action, done)
        state = next_state


def train_model(max_episodes=50000, update_indicator=10, visualize=False):
    """
    Trains a DQN agent to play the highway-v0 game by trial and error
    :return: None
    """
    agent = DQNAgent()
    buffer = ReplayBuffer()
    env = gym.make('CarRacing-v0')
    for episode in range(1, max_episodes+1):
        collect_gameplay_experiences(env, agent, buffer)
        gameplay_experience_batch = buffer.sample_gameplay_batch()
        loss = agent.train(gameplay_experience_batch)
        average_reward = evaluate_training_result(env, agent)
        print('episode: %d/%d\tavg(reward): %.2f\tloss: %.2f'%(episode, max_episodes, average_reward, loss[0]))
        if episode % update_indicator == 0:
            agent.update_target_network()
    env.close()
        


train_model(max_episodes=2, update_indicator=1, visualize=False)

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 96, 96, 3)]       0         
_________________________________________________________________
conv2d (Conv2D)              (None, 94, 94, 32)        896       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 92, 92, 32)        9248      
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 90, 90, 64)        18496     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 45, 45, 64)        0         
_________________________________________________________________
dropout (Dropout)            (None, 45, 45, 64)        0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 43, 43, 64)        36928 



Track generation: 1139..1428 -> 289-tiles track
Track generation: 1217..1525 -> 308-tiles track
Track generation: 1155..1448 -> 293-tiles track
Track generation: 1236..1549 -> 313-tiles track
Track generation: 1128..1422 -> 294-tiles track
Track generation: 1100..1379 -> 279-tiles track
Track generation: 1053..1320 -> 267-tiles track
Track generation: 1055..1323 -> 268-tiles track
Track generation: 1228..1539 -> 311-tiles track
Track generation: 1121..1405 -> 284-tiles track
Track generation: 1197..1505 -> 308-tiles track
episode: 1/2	avg(reward): -74.43	loss: 234.93
Track generation: 1196..1508 -> 312-tiles track
Track generation: 1156..1449 -> 293-tiles track
Track generation: 1168..1464 -> 296-tiles track
Track generation: 1172..1469 -> 297-tiles track
Track generation: 1104..1384 -> 280-tiles track
Track generation: 1224..1534 -> 310-tiles track
Track generation: 1004..1268 -> 264-tiles track
Track generation: 1022..1288 -> 266-tiles track
Track generation: 1264..1584 -> 320-tiles 