In [None]:
import gym
import tensorflow as tf
import numpy as np
from tensorflow import keras
import matplotlib.pyplot as plt
from collections import deque
import time
import random
from PIL import Image

# Seed every random number generator this notebook draws from, not just TensorFlow:
#   * `random`    -> replay-memory sampling (random.sample)
#   * `np.random` -> the epsilon-greedy coin flip (np.random.rand)
#   * TensorFlow  -> weight initialisation and training
# NOTE(review): env.action_space.sample() likely uses its own generator, which is
# not seeded here -- confirm whether the environment should be seeded as well.
RANDOM_SEED = 5
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

In [None]:
# Create the raw Assault environment and wrap it with gym's AtariPreprocessing,
# passing the options below straight through to the wrapper.
env = gym.wrappers.AtariPreprocessing(
    gym.make('AssaultNoFrameskip-v0'),
    noop_max=30,
    screen_size=84,
    terminal_on_life_loss=False,
    grayscale_obs=True,
    grayscale_newaxis=True,
    scale_obs=True,
)

# Show the action and observation spaces of the wrapped environment.
print(
    "Action Space: {}".format(env.action_space),
    "State space: {}".format(env.observation_space),
    sep="\n",
)

In [None]:
from time import sleep  # only used if the pacing line in the loop is re-enabled

# Play a single rendered episode with randomly sampled actions,
# counting the steps taken and summing the reward.
epochs = 0
rewards = 0
state = env.reset()
done = False

while True:
    env.render()
    # sleep(0.1)  # uncomment to slow the rendering down
    action = env.action_space.sample()
    state, reward, done, info = env.step(action)
    rewards = rewards + reward
    epochs = epochs + 1
    if done:
        break

env.close()
print(f"Number of steps: {epochs}")

In [None]:
# Random-policy baseline: run 100 episodes with sampled actions and record
# how many steps each one lasts.
Steps = []
for episode in range(100):
    observation = env.reset()
    total_training_rewards = 0
    n_steps_episode = 0
    done = False
    while True:
        action = env.action_space.sample()
        new_observation, reward, done, info = env.step(action)
        total_training_rewards = total_training_rewards + reward
        observation = new_observation

        # The increment is a float, so the stored step counts are floats.
        n_steps_episode = n_steps_episode + 1.
        if done:
            break

    Steps.append(n_steps_episode)

# Average episode length under the random policy (displayed as the cell output).
np.mean(Steps)

In [None]:
def agent(state_shape, action_shape, learning_rate=0.001):
    """Build and compile the convolutional Q-network.

    The network is one strided convolution, max-pooling and batch
    normalisation, followed by two 128-unit ReLU dense layers and a linear
    output layer with one unit per action.

    Args:
        state_shape: Shape of a single observation, used as the input shape.
        action_shape: Number of output units (one Q-value per action).
        learning_rate: Step size for the Adam optimizer (defaults to 0.001,
            the value that was previously hard-coded).

    Returns:
        A compiled ``keras.Sequential`` model using the Huber loss.
    """
    init = tf.keras.initializers.HeUniform()

    model = keras.Sequential()
    model.add(keras.layers.Input(shape=state_shape))
    model.add(keras.layers.Conv2D(32, kernel_size=(3, 3), strides=(2, 2)))
    model.add(keras.layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.Flatten())

    model.add(keras.layers.Dense(128, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(128, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(action_shape, activation='linear', kernel_initializer=init))

    # `learning_rate=` is the supported keyword; the old `lr=` alias is deprecated
    # and rejected by recent Keras releases. No 'accuracy' metric is attached:
    # the outputs are regressed Q-values, so classification accuracy is meaningless.
    model.compile(
        loss=tf.keras.losses.Huber(),
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    )

    return model

In [None]:
# Number of episodes used by the training loop and by the evaluation loop.
train_episodes, test_episodes = 250, 100

# Epsilon-greedy exploration schedule: epsilon starts at max_epsilon and is
# decayed exponentially (rate `decay`, per episode) towards min_epsilon.
max_epsilon = 1
min_epsilon = 0.01
decay = 0.01
epsilon = max_epsilon

In [None]:
# Both networks share the same architecture, sized from the environment's spaces.
network_args = (env.observation_space.shape, env.action_space.n)

# Online (prediction) network: the one that is trained.
model = agent(*network_args)
# Target network: supplies the next-state Q-values used as training targets.
target_model = agent(*network_args)

# Start the target network as an exact copy of the online network.
target_model.set_weights(model.get_weights())

# Bounded replay buffer; once 50,000 transitions are stored the oldest are dropped.
replay_memory = deque(maxlen=50_000)

In [None]:
def train(env, replay_memory, model, target_model, done,
          learning_rate=0.7, discount_factor=0.618,
          batch_size=500, min_replay_size=1000):
    """Run one fitting step of the online network on a sampled replay batch.

    A random mini-batch of transitions is drawn from ``replay_memory``. For
    each transition the Q-value of the action taken is blended towards the
    Bellman target (reward plus discounted best next-state Q-value from
    ``target_model``, or just the reward for terminal transitions), and
    ``model`` is fitted on the resulting Q-value vectors.

    Args:
        env: Unused; kept so existing call sites keep working.
        replay_memory: Sequence of transitions stored as
            ``[observation, action, reward, new_observation, done]``.
        model: Online network; must provide ``predict`` and ``fit``.
        target_model: Target network; must provide ``predict``.
        done: Unused; kept so existing call sites keep working. Each
            transition's own terminal flag is read from the replay memory.
        learning_rate: Blend factor between the old Q-value and the target.
        discount_factor: Discount applied to the best next-state Q-value.
        batch_size: Number of transitions sampled per call.
        min_replay_size: No training happens until the memory holds at least
            this many transitions.

    Returns:
        None. Returns early, without training, while the replay memory is
        smaller than ``min_replay_size`` (or smaller than ``batch_size``).
    """
    # Skip until enough experience is stored; also never ask random.sample for
    # more items than exist, which would raise ValueError.
    if len(replay_memory) < max(min_replay_size, batch_size):
        return

    mini_batch = random.sample(replay_memory, batch_size)

    # Unpack the transition columns into arrays.
    current_states = np.array([transition[0] for transition in mini_batch])
    actions = np.array([transition[1] for transition in mini_batch], dtype=np.intp)
    rewards = np.array([transition[2] for transition in mini_batch], dtype=np.float64)
    new_current_states = np.array([transition[3] for transition in mini_batch])
    terminal = np.array([transition[4] for transition in mini_batch], dtype=bool)

    # Current Q-values come from the online network, next-state Q-values from
    # the target network. verbose=0 keeps Keras from printing a progress bar
    # on every call.
    current_qs_list = model.predict(current_states, verbose=0)
    future_qs_list = target_model.predict(new_current_states, verbose=0)

    # Bellman target: reward only for terminal transitions, otherwise reward
    # plus the discounted best next-state value.
    best_future_q = np.max(future_qs_list, axis=1)
    targets = np.where(terminal, rewards, rewards + discount_factor * best_future_q)

    # Move only the Q-value of the action actually taken towards its target;
    # the other actions keep their predicted values.
    rows = np.arange(batch_size)
    current_qs_list[rows, actions] = (
        (1 - learning_rate) * current_qs_list[rows, actions] + learning_rate * targets
    )

    # Fit the online network on (state, updated Q-values).
    model.fit(current_states, current_qs_list, batch_size=batch_size, verbose=0, shuffle=True)

In [None]:
X = []
y = []
Steps = []

# Counts environment steps; used both for the "train every 4 steps" rule and
# for deciding when the target network is due for a weight sync.
steps_to_update_target_model = 0

# Train the agent for a fixed number of episodes.
for episode in range(train_episodes):
    # Per-episode counters.
    n_steps_episode = 0
    total_training_rewards = 0
    # Start a fresh episode.
    observation = env.reset()
    done = False

    while not done:
        steps_to_update_target_model += 1

        # Epsilon-greedy action selection.
        if np.random.rand() <= epsilon:
            # Explore: take a randomly sampled action.
            action = env.action_space.sample()
        else:
            # Exploit: add the leading batch axis the network expects and run a
            # SINGLE forward pass (previously the network was evaluated twice
            # per step and the raw Q-values were printed as debug output).
            observation_reshaped = np.expand_dims(observation, axis=0)
            predicted = model.predict(observation_reshaped, verbose=0).flatten()
            action = np.argmax(predicted)

        # Apply the action and store the transition for replay.
        new_observation, reward, done, info = env.step(action)
        total_training_rewards += reward
        replay_memory.append([observation, action, reward, new_observation, done])

        # Fit the online network every 4 steps and at the end of each episode.
        if steps_to_update_target_model % 4 == 0 or done:
            train(env, replay_memory, model, target_model, done)

        # Advance to the next state.
        observation = new_observation
        n_steps_episode += 1

        if done:
            # Report and record the finished episode.
            print('{} Total training rewards: {} after n steps = {}'.format(episode, total_training_rewards, n_steps_episode))
            Steps.append(n_steps_episode)

            # At episode end, sync the target network if at least 100 steps
            # have been counted since the last sync, then restart the counter.
            if steps_to_update_target_model >= 100:
                target_model.set_weights(model.get_weights())
                steps_to_update_target_model = 0

    # Decay epsilon exponentially after each episode.
    epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay * episode)
env.close()

In [None]:
def plot_res(values, title='', goal=800, hist_goal=200):
    '''Plot per-episode values as a curve and as a histogram of the last 50.

    Args:
        values: Sequence of per-episode numbers to plot.
        title: Figure title.
        goal: Height of the dashed horizontal "goal" line on the curve plot.
        hist_goal: Position of the vertical "goal" line on the histogram.

    The left panel shows the values with a linear trend line when one can be
    fitted; the right panel shows a histogram of the last 50 values.
    '''
    f, ax = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
    f.suptitle(title)

    # Left panel: raw curve plus goal line.
    ax[0].plot(values, label='score per run')
    ax[0].axhline(goal, c='red', ls='--', label='goal')
    ax[0].set_xlabel('Episodes')
    ax[0].set_ylabel('Reward')

    # Linear trend. Only fitting failures are caught, so unrelated errors
    # (and KeyboardInterrupt) are no longer silently swallowed.
    x = range(len(values))
    try:
        z = np.polyfit(x, values, 1)
        p = np.poly1d(z)
        ax[0].plot(x, p(x), "--", label='trend')
    except (TypeError, ValueError, np.linalg.LinAlgError) as err:
        print('Trend line skipped: {}'.format(err))
    # Build the legend after the trend line exists so its label is included.
    ax[0].legend()

    # Right panel: histogram of the most recent 50 values.
    ax[1].hist(values[-50:])
    ax[1].axvline(hist_goal, c='red', label='goal')
    ax[1].set_xlabel('Scores per Last 50 Episodes')
    ax[1].set_ylabel('Frequency')
    ax[1].tick_params(axis='x', colors='white')
    ax[1].tick_params(axis='y', colors='white')
    ax[1].legend()
    plt.show()

In [None]:
# Plot the per-episode step counts currently held in `Steps`.
# NOTE(review): the title says 'Random actions', but `Steps` was last filled by
# the training loop -- confirm the intended title.
plot_res(Steps, title='Random actions')

In [None]:
#import joblib
#model.save('assault_deepQ.h5')  # creates an HDF5 file 'assault_deepQ.h5'

In [None]:
# Watch the trained agent play one rendered episode, always taking the action
# with the highest predicted Q-value.
epochs, rewards = 0, 0
observation = env.reset()
done = False

while not done:
    env.render()
    # Add the leading batch axis the network expects.
    observation_reshaped = np.expand_dims(observation, axis=0)
    predicted = model.predict(observation_reshaped, verbose=0).flatten()
    action = np.argmax(predicted)
    # Store the new frame in `observation` so the next prediction sees it.
    # Previously the result went into `state`, so the agent kept choosing its
    # action from the very first frame for the whole episode.
    observation, reward, done, info = env.step(action)
    rewards += reward
    epochs += 1
env.close()
print(f"Reward: {rewards}")

In [None]:
# Evaluate the trained network greedily for `test_episodes` episodes and
# record how many steps each episode lasts.
Steps = []
for episode in range(test_episodes):
    observation = env.reset()
    total_training_rewards = 0
    n_steps_episode = 0
    done = False
    while True:
        # Wrap the observation in a batch of one, shaped (1, 84, 84, 1).
        observation_reshaped = np.array([observation], order='C')
        observation_reshaped.resize(1, 84, 84, 1)

        # Greedy action: index of the largest predicted Q-value.
        predicted = model.predict(observation_reshaped).flatten()
        action = np.argmax(predicted)

        new_observation, reward, done, info = env.step(action)
        total_training_rewards = total_training_rewards + reward
        observation = new_observation

        n_steps_episode = n_steps_episode + 1
        if done:
            break
    print('{} Total steps = {}'.format(episode,n_steps_episode))
    Steps.append(n_steps_episode)