In [1]:
import tensorflow as tf #Tensorflow handles the Training and Testing
from tensorflow import keras #Keras handles the importing of Data
import numpy as np #NumPy does funny math good
import gym #imports OpenAI Gym which has a bunch of environments(games) to play with
import matplotlib.pyplot as plt
from statistics import mean, median 
from tqdm import tqdm
from keras.models import load_model
from keras.activations import relu, linear
from keras.optimizers import Adam
from keras.losses import mean_squared_error
import random
from collections import deque 

In [7]:
# Create the CarRacing environment used by the cells below, then print the
# number of action components followed by the observation space.
env = gym.make("CarRacing-v0")
for space_detail in (env.action_space.shape[0], env.observation_space):
    print(space_detail)

3
Box(0, 255, (96, 96, 3), uint8)


In [None]:
# Demo rollout: plays a few short episodes with a fixed, hand-written policy
# (straight ahead, throttle pulsed every 3rd frame) to check that the
# environment steps and renders. Nothing is learned here.
NUM_DEMO_EPISODES = 5         # the original comment promised 5 games
DEMO_STEPS_PER_EPISODE = 300  # hard cap on frames per demo episode

# The action space printed above has 3 continuous components, so env.step()
# needs a length-3 array rather than a discrete action index.
# NOTE(review): component order assumed to be [steering, gas, brake] -- confirm
# against the CarRacing documentation.
COAST = np.array([0.0, 0.0, 0.0], dtype=np.float32)
THROTTLE = np.array([0.0, 1.0, 0.0], dtype=np.float32)

for _ in range(NUM_DEMO_EPISODES):
    state = env.reset()
    for s in range(DEMO_STEPS_PER_EPISODE):

        env.render() #Renders Environment. CAUTION: Rendering slows the loop down

        #Fires the throttle every 3rd frame, coasts otherwise
        action = THROTTLE if s % 3 == 0 else COAST

        new_state, reward, done, info = env.step(action) #Takes the action

        #changes states
        state = new_state

        #Handles if game finished (stepping a finished episode is undefined)
        if done:
            break

In [None]:
#Setting up the DQN
class DQN():
    """Deep Q-Network agent with an experience-replay buffer.

    The agent owns a small fully connected Keras model that maps a flattened
    observation to one Q-value per discrete action. `train` plays episodes in
    `self.env`, stores every transition in `self.training_data` and calls
    `learn` after each step to fit the model on a random minibatch.

    Attributes set by `__init__`:
        lr, gamma, epsilon, epsilon_decay: hyperparameters as passed in.
        epsilon_min: floor below which epsilon is no longer decayed (0.01).
        env, action_space, observation_space: the environment and its spaces.
        num_action_space: number of discrete actions (`action_space.n`).
        num_observation_space: number of scalars in one flattened observation.
        training_data: deque of (state, action, reward, new_state, done).
        rewards_list: total reward of every finished training episode.
        batch_size: number of transitions sampled per `learn` call (64).
        high_score: best episode reward seen so far (starts at -8000).
        model: the compiled Keras model.
    """

    def __init__(self, env, lr, gamma, epsilon, epsilon_decay):
        """Store hyperparameters, inspect the environment and build the model.

        Args:
            env: Gym-style environment with a discrete action space.
            lr: learning rate handed to the Adam optimizer.
            gamma: discount factor applied to the next state's best Q-value.
            epsilon: initial probability of picking a random action.
            epsilon_decay: factor epsilon is multiplied by after each episode.

        Raises:
            AttributeError: if `env.action_space` does not expose `.n`
                (for example a continuous `Box` action space).
        """
        # Hyperparameters
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = 0.01

        # Environment variables
        self.env = env
        self.action_space = env.action_space
        self.observation_space = env.observation_space
        if not hasattr(env.action_space, "n"):
            # Same exception type as the bare attribute access, but with a
            # message that says what is actually wrong.
            raise AttributeError(
                "DQN needs a discrete action space exposing `.n`; got %r"
                % (env.action_space,)
            )
        self.num_action_space = env.action_space.n
        # Flattened observation size. Equals shape[0] for 1-D observations and
        # also covers multi-dimensional ones (e.g. image frames), which `train`
        # reshapes to a single row of this length.
        self.num_observation_space = int(np.prod(env.observation_space.shape))

        # Training Variables
        self.training_data = deque(maxlen=500000)
        self.rewards_list = []
        self.batch_size = 64
        self.high_score = -8000

        # Creating DQN with Architecture 512-256-<num actions>.
        # Activations, loss and optimizer all come from the same `keras`
        # namespace as the model so objects from two Keras installs never mix.
        model = keras.Sequential()
        model.add(keras.layers.Dense(512, input_dim=self.num_observation_space, activation="relu"))
        model.add(keras.layers.Dense(256, activation="relu"))
        model.add(keras.layers.Dense(self.num_action_space, activation="linear"))

        # Compiling Model using MSE Loss and Adam Optimizer
        model.compile(
            loss="mean_squared_error",
            optimizer=keras.optimizers.Adam(learning_rate=self.lr),
        )

        self.model = model
        model.summary()  # summary() prints by itself and returns None

    # Chooses an action based on the Epsilon value (Random action Epsilon% of the time)
    def get_action(self, state):
        """Return an action index for `state` using an epsilon-greedy policy.

        Args:
            state: one observation shaped (1, num_observation_space).

        Returns:
            int: a random action with probability `self.epsilon`, otherwise
            the index of the largest predicted Q-value.
        """
        if np.random.rand() < self.epsilon:
            return random.randrange(self.num_action_space)
        q_values = self.model.predict(state, verbose=0)[0]
        return int(np.argmax(q_values))

    # Trains model based off of Cumulative Training Data
    def learn(self):
        """Fit the model on one random minibatch from the replay buffer.

        Does nothing when fewer than `batch_size` transitions are stored, or
        when the mean reward of the last 10 episodes is above 180.
        NOTE(review): the 180 threshold looks tuned to one environment's reward
        scale -- confirm it suits the environment in use.
        """
        # Cancels Training if there is insufficient data or if the model is sufficiently trained
        if len(self.training_data) < self.batch_size:
            return
        # Guard the empty list: np.mean([]) warns and yields nan.
        if self.rewards_list and np.mean(self.rewards_list[-10:]) > 180:
            return

        # Randomly Samples frames out of Training Data based on self.batch_size
        sample = random.sample(self.training_data, self.batch_size)

        # Extracts components from each frame and condenses them into arrays.
        # vstack keeps the batch axis even when batch_size or the observation
        # size is 1, which a blanket squeeze would collapse.
        states = np.vstack([frame[0] for frame in sample])
        actions = np.array([frame[1] for frame in sample], dtype=int)
        rewards = np.array([frame[2] for frame in sample], dtype=float)
        new_states = np.vstack([frame[3] for frame in sample])
        not_done = 1.0 - np.array([frame[4] for frame in sample], dtype=float)

        # Creates "targets" for model.fit(): reward plus the discounted best
        # next-state Q-value, with the bootstrap term zeroed on terminal frames.
        next_q = np.asarray(self.model.predict_on_batch(new_states))
        targets = rewards + self.gamma * np.amax(next_q, axis=1) * not_done

        # Only the Q-value of the action actually taken is moved to its target;
        # np.array() guarantees a writable copy of the predictions.
        target_vec = np.array(self.model.predict_on_batch(states))
        target_vec[np.arange(self.batch_size), actions] = targets

        self.model.fit(states, target_vec, epochs=1, verbose=0)

    # Handles Generating Training Episodes and Trains Model
    def train(self, episodes = 500):
        """Play up to `episodes` episodes in `self.env`, learning after each step.

        Each episode runs for at most 1000 steps. After every episode the
        reward is recorded, the high score updated and epsilon decayed (while
        above `epsilon_min`). Training stops early once the mean reward of the
        last 100 episodes exceeds 200.
        NOTE(review): reset()/step() are used with the classic Gym signatures
        (observation / 4-tuple) -- confirm against the installed gym version.

        Args:
            episodes: maximum number of episodes to play.
        """
        MAX_STEPS = 1000
        progress = tqdm(total=episodes, position=0, leave=False)

        try:
            # Episodes Loop
            for _ in range(episodes):
                progress.update(1)

                # Use the agent's own environment, not a notebook global.
                state = self.env.reset()
                episode_reward = 0
                state = np.reshape(state, [1, self.num_observation_space])

                # Step Loop
                for s in range(MAX_STEPS):
                    #self.env.render()

                    action = self.get_action(state) # Chooses action

                    new_state, reward, done, _info = self.env.step(action) # Takes Action and records New State
                    new_state = np.reshape(new_state, [1, self.num_observation_space])

                    self.training_data.append((state, action, reward, new_state, done)) # adds information about the frame to training data

                    episode_reward += reward # Reward tally

                    state = new_state #Progressing of game

                    self.learn()

                    if done:
                        break

                self.rewards_list.append(episode_reward) # Tracks rewards and keeps a high score
                if self.high_score < episode_reward:
                    self.high_score = episode_reward

                if self.epsilon > self.epsilon_min: # Handles epsilon decay, applied once per episode
                    self.epsilon *= self.epsilon_decay #(episodes-e)/episodes

                average_reward = np.mean(self.rewards_list[-100:])
                if average_reward > 200: # Stops training if Scores are above 200
                    print("Average Score: 200. Training Completed...")
                    break

                print(" || Reward: ", "%.2f" % episode_reward, "\t|| Average Reward: ", "%.2f" % average_reward, "\t epsilon: ", "%.4f" % self.epsilon )
        finally:
            # Release the progress bar even if an episode raises.
            progress.close()

        print("Training Complete...")
        print("Highest Training Score:", self.high_score)

    # Saves Model in .h5 format
    def save(self, name):
        """Save the Keras model to `name` via `model.save`."""
        self.model.save(name)
