In [1]:
import retro
import tensorflow as tf
from tensorflow import keras
from keras import backend as K

import numpy as np
import cv2

import matplotlib.pylab as plt
from matplotlib.gridspec import GridSpec
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure

from IPython.display import clear_output
from PIL import Image

In [2]:
#The agent is a double DQN, i.e. it is made of two networks:
#1. Behavior - makes the action predictions and is updated every 16 frames.
#2. Target - makes the predictions of the future return.
#Only the target network is loaded here.

#Directory of the saved target network. Change this string to point at your own model!
TARGET_MODEL_DIR = '210612_1_SMB_DQN_target'
target = keras.models.load_model(TARGET_MODEL_DIR)

In [3]:
#Repeats actions frameskips number of times. Stacks the same number of images. 
#The stacked array of images is the final input for the DQN.
def RGB_preprocess(action, env, frameskips):
    """Repeat `action` for up to `frameskips` frames and stack the processed frames.

    Every frame returned by `env.step` is converted from RGB to grayscale,
    resized to 84x84 and scaled to the range [0, 1]. The frames are stacked
    along the last axis to form the input of the DQN.

    Parameters
    ----------
    action : the action forwarded unchanged to `env.step`.
    env : environment whose `step(action)` returns `(frame, reward, done, info)`.
    frameskips : int, number of times the action is repeated (must be >= 1).

    Returns
    -------
    state : float16 array of shape (84, 84, frameskips).
    reward : sum of the rewards of the executed steps.
    done : done flag of the last executed step.
    info : info object of the last executed step.

    Raises
    ------
    ValueError : if `frameskips` is smaller than 1.
    """
    if frameskips < 1:
        raise ValueError('frameskips must be at least 1, got {}'.format(frameskips))

    frames = []
    reward = 0
    for _ in range(frameskips):
        s, r, d, info = env.step(action)
        #Accumulate any rewards given during action repeats.
        reward += r
        #Grayscale the image.
        s = cv2.cvtColor(s, cv2.COLOR_RGB2GRAY)
        #Resize the image to a square 84x84.
        s = cv2.resize(s, (84, 84), interpolation=cv2.INTER_AREA)
        #Scale pixel values to a maximum of 1 by dividing by 255.
        frames.append(s/255.0)
        #Do not keep stepping an environment that has already finished.
        if d:
            break

    #If the episode ended early, repeat the last frame so the stack keeps
    #the depth the network expects.
    frames.extend([frames[-1]] * (frameskips - len(frames)))
    return np.dstack(frames).astype('float16'), reward, d, info

#The video is a stack of matplotlib figure frames. Each figure frame is created through the diag_output function.
#This function is tailored to my specific DQN. You will have to modify this function if your DQN is of a different architecture.
def _tile_feature_maps(ax, feature_maps, columns, tile):
    """Draw every 2D map as a `tile` x `tile` cell of a grid that is `columns` cells wide."""
    for i, feature_map in enumerate(feature_maps):
        x0 = (i % columns) * tile
        y0 = (i // columns) * tile
        ax.imshow(feature_map, origin='upper', extent=[x0, x0 + tile, y0, y0 + tile])
        #Black separator along the left edge of the cell.
        ax.plot([x0, x0], [y0, y0 + tile], color='black')


def _style_dark_axes(ax):
    """Give an axes a black background with a white frame."""
    ax.set_facecolor('black')
    for side in ('bottom', 'top', 'left', 'right'):
        ax.spines[side].set_color('white')


def _figure_to_rgb(fig):
    """Render `fig` with the Agg canvas and return it as a (height, width, 3) uint8 array."""
    canvas = FigureCanvas(fig)
    canvas.draw()
    #Ask the canvas for its pixel size instead of deriving it from inches*dpi,
    #so the reshape always matches the rendered buffer.
    width, height = canvas.get_width_height()
    rgba = np.frombuffer(canvas.buffer_rgba(), dtype='uint8').reshape(height, width, 4)
    #Drop the alpha channel. The copy detaches the result from the canvas buffer
    #and makes it writable.
    return rgba[:, :, :3].copy()


def _draw_diag_panels(fig, s, peeled_layers, scalar_layers, episode_return):
    """Lay out all diagnostic panels on `fig`."""
    #I find GridSpec to be the easiest way to customize the layout of my plots.
    gs = GridSpec(nrows=6, ncols=4)

    #Raw observation
    ax0 = fig.add_subplot(gs[0:4, 0:2])
    ax0.axis('off')
    ax0.imshow(s)
    ax0.set_title('Raw observation', color='white', fontsize=25)

    #Preprocessing - 4-frame stack input for the NN, drawn as a single row.
    ax1 = fig.add_subplot(gs[0,2:])
    ax1.set_xlim(0,84*4)
    ax1.axis('off')
    _tile_feature_maps(ax1, peeled_layers[0], columns=len(peeled_layers[0]), tile=84)
    ax1.set_title('Preprocessing - 84x84, 4-frame stack', color='white', fontsize=25)

    #First Conv2D layer
    ax2 = fig.add_subplot(gs[1,2:])
    ax2.axis('off')
    _tile_feature_maps(ax2, peeled_layers[1], columns=8, tile=20)
    ax2.set_title('Conv2D layer - 20x20x32', color='white', fontsize=25)

    #Second Conv2D layer
    ax3 = fig.add_subplot(gs[2,2:])
    ax3.axis('off')
    _tile_feature_maps(ax3, peeled_layers[2], columns=16, tile=9)
    ax3.set_title('Conv2D layer - 9x9x64', color='white', fontsize=25)

    #Third Conv2D layer
    ax4 = fig.add_subplot(gs[3,2:])
    ax4.axis('off')
    _tile_feature_maps(ax4, peeled_layers[3], columns=16, tile=7)
    ax4.set_title('Conv2D layer - 7x7x64', color='white', fontsize=25)

    #Flatten layer - the 3136 values are shown as a 32x98 image.
    ax5 = fig.add_subplot(gs[4,2:])
    ax5.axis('off')
    ax5.imshow(np.reshape(scalar_layers[0],(32,98)))
    ax5.set_title('Flatten layer - 1x3136', color='white', fontsize=25)

    #Dense layer - the 512 values are shown as an 8x64 image.
    ax6 = fig.add_subplot(gs[5,2:])
    ax6.axis('off')
    ax6.imshow(np.reshape(scalar_layers[1],(8,64)))
    ax6.set_title('Dense layer - 1x512', color='white', fontsize=25)

    #Output layer of neural network showing predicted value of each action (Q value)
    q_values = scalar_layers[2][0]
    ax7 = fig.add_subplot(gs[4:,0])
    ax7.bar(np.arange(len(q_values)), q_values, edgecolor='white', color='black', width=0.25, linewidth=2.0)
    ax7.set_ylabel('Q value', fontsize=25, color='white')
    ax7.set_xlabel('Action', fontsize=25, color='white')
    ax7.tick_params(axis='both', labelsize=25, color='white', labelcolor='white')
    _style_dark_axes(ax7)

    #Reward accumulated throughout the episode.
    ax8 = fig.add_subplot(gs[4:,1])
    ax8.plot(episode_return, color='white', linewidth=2.0)
    ax8.set_ylabel('Cumulative Reward', fontsize=25, color='white')
    ax8.set_xlabel('Step', fontsize=25, color='white')
    ax8.tick_params(axis='x', labelsize=25, color='white', labelcolor='white')
    ax8.tick_params(axis='y', which='both', labelleft=False)
    _style_dark_axes(ax8)


def diag_output(s, peeled_layers, scalar_layers, episode_return):
    """Build one diagnostic video frame and return it as an RGB image array.

    Parameters
    ----------
    s : image shown in the 'Raw observation' panel.
    peeled_layers : sequence of four lists of 2D arrays, in this order: the
        stacked input frames and the feature maps of the first, second and
        third Conv2D layers. The tile sizes (84, 20, 9, 7) and the titles are
        hard-coded for this specific network.
    scalar_layers : sequence whose first entry is reshaped to (32, 98), whose
        second entry is reshaped to (8, 64) and whose third entry holds the
        per-action Q values in its first row.
    episode_return : sequence of cumulative rewards, plotted against its index.

    Returns
    -------
    uint8 array of shape (height, width, 3) in RGB order, where the size in
    pixels is the one of the rendered 25x20 inch figure.

    Notes
    -----
    On success the pyplot figure is left open (as in the original workflow),
    so the caller is expected to close it, e.g. with plt.close(). If drawing
    fails the figure is closed here before the exception propagates.
    """
    fig = plt.figure(figsize=(25, 20))
    try:
        fig.patch.set_facecolor('black')
        _draw_diag_panels(fig, s, peeled_layers, scalar_layers, episode_return)
        #Convert the matplotlib figure into a numpy image array.
        return _figure_to_rgb(fig)
    except Exception:
        #Do not leak the figure into pyplot's registry when rendering fails.
        plt.close(fig)
        raise

def visual_evaluate_NN(output_name, env, epsilon, replicates, model, max_steps):
    """Play one life with an epsilon-greedy policy and record a diagnostic video.

    Every step is rendered with diag_output and written as one frame of
    '<output_name>.avi'. The run stops when a life is lost, when the
    environment reports done, or after `max_steps` steps.

    This function reads the module-level names `frameskips` and
    `action_space`; both must be defined before it is called.

    Parameters
    ----------
    output_name : path of the video file, without the '.avi' extension.
    env : environment passed to RGB_preprocess; it must also provide
        reset() and get_screen().
    epsilon : probability of picking a random action instead of the greedy one.
    replicates : unused; kept so existing calls keep working.
    model : Keras model whose layer outputs are visualised. The first four
        entries of model.layers are treated as image-like outputs and the
        remaining ones as flat outputs.
    max_steps : maximum number of agent steps to record.

    Returns
    -------
    list holding the return of the recorded episode; it is empty when the
    step limit was reached before the episode ended.

    Raises
    ------
    IOError : if the video file cannot be opened for writing.
    """
    #(width, height) declared to the video writer.
    frame_size = (1800, 1440)
    av_returns = []

    #Unwrap layers to get intermediate outputs.
    inp = model.input
    outputs = [layer.output for layer in model.layers]
    functors = K.function(inp, outputs)

    #Create video object. We will write the frames to this object.
    out = cv2.VideoWriter(output_name+'.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 60/frameskips, frame_size)
    try:
        if not out.isOpened():
            raise IOError('Could not open video file for writing: ' + output_name + '.avi')

        #You have to reset the environment to the start of the level.
        env.reset()
        #Prime the frame stack. I chose the first action.
        s, _, done, info = RGB_preprocess(action_space[0], env, frameskips)
        episode_return = 0
        return_list = [0]
        lives = int(info['lives'])

        #Run until the max number of steps is reached or Mario dies. :-(
        for num_of_steps in range(max_steps):
            if num_of_steps%5==0:
                clear_output()
                print ('Step: ', num_of_steps)

            #Choose epsilon greedy action.
            if np.random.random() < epsilon:
                a = np.random.choice(np.arange(len(action_space)))
            else:
                q_values = model(np.expand_dims(s,0), training=False)
                a = tf.argmax(q_values[0]).numpy()

            #Collect information on next state.
            s, reward, done, info = RGB_preprocess(action_space[a], env, frameskips)
            raw_obs = env.get_screen()[:, :, [0, 1, 2]]

            episode_return += reward
            return_list.append(episode_return)

            #Run the state through the functors for intermediate output plots.
            #NOTE(review): the trailing 1. looks like the legacy Keras
            #learning-phase flag - confirm it is still needed.
            layer_outs = functors([np.expand_dims(s,0), 1.])
            #The first 4 layers of the DQN are images, so split each of them
            #into one 2D array per channel. The other layers are 1D outputs.
            peeled_layers = [
                [layer_out[0][:, :, j] for j in range(len(layer_out[0][0][0]))]
                for layer_out in layer_outs[0:4]
            ]
            scalar_layers = layer_outs[4:]

            #Create the image from the output of each layer using diag_output
            image = diag_output(raw_obs, peeled_layers, scalar_layers, return_list)
            plt.close()

            #CV2 arranges the channels as BGR, so reverse the RGB channels.
            frame = np.ascontiguousarray(image[:, :, [2, 1, 0]])
            #The writer only accepts frames of the size it was opened with;
            #resize when the rendered figure has a different pixel size.
            if (frame.shape[1], frame.shape[0]) != frame_size:
                frame = cv2.resize(frame, frame_size, interpolation=cv2.INTER_AREA)
            out.write(frame)

            #End the episode when a life was lost or the environment is done.
            #This results in a video of a single Mario life rather than all lives.
            if done or int(info['lives']) != lives:
                av_returns.append(episode_return)
                break
    finally:
        #Release the video object even when an error interrupts the run.
        out.release()
    clear_output()
    print ('Evaluation complete.')
    return av_returns

In [4]:
#Actions are repeated over 4 frames. These 4 frames are used as input for the next action value prediction.
#visual_evaluate_NN reads frameskips and action_space from the module namespace.
frameskips = 4
epsilon = 0.02

#Only 4 actions are available to the agent. Move left, move right, jump left, and jump right. 
action_space = [[0,0,0,0,0,0,1,0,0],
                [0,0,0,0,0,0,0,1,0],
                [0,0,0,0,0,0,1,0,1],
                [0,0,0,0,0,0,0,1,1]]

#Create a new environment. 
env = retro.make(game='SuperMarioBros-Nes')
try:
    #Run the visual_evaluate_NN function. 
    visual_evaluate_NN('test', env, epsilon, 1, target, 10000)
finally:
    #You can only have one retro env open at a time, so always close it,
    #even when the evaluation fails part-way.
    env.close()

Evaluation complete.
