In [1]:
import pickle
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output

import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
from gym import wrappers
from nes_py.wrappers import BinarySpaceToDiscreteSpaceEnv


import keras
from keras.models import Sequential, Model
from keras.layers import Input, Conv2D, Dense, Flatten, MaxPooling1D, SeparableConv2D, Activation, Lambda
from keras.layers import AveragePooling2D, MaxPooling2D, LSTM, Concatenate, Reshape, GRU, BatchNormalization
from keras.initializers import Constant
from keras.constraints import MaxNorm
from keras.applications.xception import Xception
from keras.applications.mobilenet_v2 import MobileNetV2

# Suffix appended to 'SuperMarioBros-v' inside make_env to form the gym environment id.
version = 0
# Action list handed to BinarySpaceToDiscreteSpaceEnv inside make_env
# (SIMPLE_MOVEMENT, COMPLEX_MOVEMENT and RIGHT_ONLY are imported above).
movement_type = SIMPLE_MOVEMENT

def make_env(version, movement_type):
    """Create a Super Mario Bros environment exposing a discrete action space.

    Args:
        version: Value whose ``str()`` is appended to ``'SuperMarioBros-v'``
            to form the environment id.
        movement_type: Action list passed to ``BinarySpaceToDiscreteSpaceEnv``.

    Returns:
        The environment wrapped in ``BinarySpaceToDiscreteSpaceEnv``.
    """
    env_id = ''.join(('SuperMarioBros-v', str(version)))
    raw_env = gym_super_mario_bros.make(env_id)
    return BinarySpaceToDiscreteSpaceEnv(raw_env, movement_type)

# Build one environment up front; the values below are read from it and become
# the default arguments of get_mario_model / best_action_vec / get_action_vec.
env = make_env(version, movement_type)
# Shape of the environment's observation space
# (presumably (240, 256, 3), matching the reshape in the training cell — TODO confirm).
obs_shape = env.observation_space.shape
# Passed to get_mario_model as the default for its `square_shape` parameter.
square_shape = (16,16)
# Half of the first entry of square_shape, truncated to int.
strides = int(square_shape[0]/2)
# Number of discrete actions: one per entry returned by env.get_action_meanings().
action_dim = len(env.get_action_meanings())
# Reassigned to 20 by the training cell before the loop there reads it.
buffer = 10

def get_mario_model(obs_shape = obs_shape, square_shape = square_shape, strides = strides, action_dim = action_dim, hidden_size = 10):
    """Build and compile the network that scores one (frame, action) pair.

    The frame is divided by 255, passed through four strided ReLU convolutions,
    flattened and projected to a 20-unit vector. That vector is concatenated
    with the action vector and mapped to a single linear output. The model is
    compiled with the Adam optimizer and mean-absolute-error loss.

    Args:
        obs_shape: Shape of a single observation, without the batch axis. It
            sets the frame input shape (previously the input was hard-coded to
            (1, 240, 256, 3) and this argument was ignored).
        square_shape: Not used by the current architecture; kept so existing
            calls keep working.
        strides: Not used by the current architecture; kept for the same reason.
        action_dim: Length of the action vector fed to the second input.
        hidden_size: Not used by the current architecture; kept for the same
            reason.

    Returns:
        A compiled keras ``Model`` taking ``[state, action]`` with a fixed
        batch size of 1 and producing one value per pair.
    """
    # Batch size is pinned to 1: the notebook always feeds a single frame per call.
    state = Input(batch_shape = (1,) + tuple(obs_shape))
    encoded = Lambda(lambda x: x/255.0)(state)

    # (kernel side, filters, stride) for each conv + ReLU stage, in order.
    conv_stages = ((8, 3, 8), (4, 12, 2), (2, 24, 2), (2, 48, 2))
    for kernel, filters, stride in conv_stages:
        encoded = Conv2D(kernel_size = (kernel, kernel), filters = filters, strides = stride, padding = 'same')(encoded)
        encoded = Activation(activation = 'relu')(encoded)

    encoded = Flatten()(encoded)
    encoded = Dense(20)(encoded)

    action = Input(batch_shape = (1, action_dim))
    joined = Concatenate(axis = 1)([action, encoded])

    # Single linear unit: the predicted value for this frame/action pair.
    value = Dense(1, activation = 'linear')(joined)

    model = Model(inputs = [state, action], outputs = value)
    model.compile(optimizer = 'adam', loss = 'mae')

    return model

# Instantiate the network with the default shapes derived from the environment above.
agent = get_mario_model()

Using TensorFlow backend.


Instructions for updating:
Colocations handled automatically by placer.


In [2]:
# Print the layer table (output shape, parameter count, connectivity) of the model.
agent.summary()

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (1, 240, 256, 3)     0                                            
__________________________________________________________________________________________________
lambda_1 (Lambda)               (1, 240, 256, 3)     0           input_1[0][0]                    
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (1, 30, 32, 3)       579         lambda_1[0][0]                   
__________________________________________________________________________________________________
activation_1 (Activation)       (1, 30, 32, 3)       0           conv2d_1[0][0]                   
__________________________________________________________________________________________________
conv2d_2 (

In [3]:
import random

def info_reward(info):
    """Collapse an ``info`` dict into a single scalar score.

    The score is the sum of: ``x_pos``; ``score`` divided by 100; a bonus of 10
    for status ``'tall'`` or 20 for ``'fireball'``; ``coins``; 15 per ``life``;
    1000 per stage beyond the first; and 5000 per world beyond the first.

    Args:
        info: Mapping with the keys ``'x_pos'``, ``'score'``, ``'status'``,
            ``'coins'``, ``'life'``, ``'stage'`` and ``'world'``.

    Returns:
        The combined score.
    """
    status_bonus = {'tall': 10, 'fireball': 20}

    total = info['x_pos'] + info['score']/100
    bonus = status_bonus.get(info['status'])
    if bonus is not None:
        total = total + bonus

    # Terms are added left to right in the same order as before.
    return (
        total
        + info['coins']
        + 15*info['life']
        + 1000*(info['stage']-1)
        + 5000*(info['world']-1)
    )

def best_action_vec(agent, state, action_dim = action_dim, return_rewards = False):
    """Score every one-hot action for ``state`` and return the best one.

    Args:
        agent: Model whose ``predict([state, action])`` gives the predicted
            value of taking ``action`` in ``state``.
        state: Batched observation passed to ``agent.predict`` unchanged.
        action_dim: Number of discrete actions, i.e. the length of each
            one-hot action vector.
        return_rewards: When true, return the list of raw predictions (one per
            action, in action order) instead of the best action.

    Returns:
        The one-hot vector of the highest-scoring action with shape
        ``(1, action_dim)``, or the list of predictions when
        ``return_rewards`` is true.
    """
    # One row per candidate action. Sized from action_dim rather than a literal
    # 7 so that action sets of other sizes do not fail at the reshape below.
    actions = np.eye(action_dim)
    rewards = [
        agent.predict([state, action.reshape((1, action_dim))])
        for action in actions
    ]
    if return_rewards:
        return rewards
    return actions[np.argmax(rewards)].reshape((1, action_dim))

def get_action_vec(agent, state, action_dim = action_dim, explore = 0.1):
    """Pick an action epsilon-greedily.

    Args:
        agent: Model forwarded to ``best_action_vec`` for the greedy choice.
        state: Batched observation forwarded to ``best_action_vec``.
        action_dim: Number of discrete actions.
        explore: Probability of taking a uniformly random action instead of
            the greedy one. ``0`` disables exploration and ``1`` always
            explores.

    Returns:
        A one-hot action vector of shape ``(1, action_dim)``.
    """
    # Compare against `explore` (previously a hard-coded 0.1, so the argument
    # had no effect). `>=` makes explore=0 purely greedy, since random() is in [0, 1).
    if random.random() >= explore:
        return best_action_vec(agent, state, action_dim)
    action = random.randrange(action_dim)
    return np.eye(action_dim)[action].reshape((1, action_dim))

In [4]:
# Rebuild the model from scratch, discarding whatever the previous `agent` had learned.
agent = get_mario_model()
# NOTE(review): get_mario_model adds no recurrent layers, so this call is
# presumably a no-op — confirm it is still needed.
agent.reset_states()

In [6]:
import cv2

# Play the game with the current agent while training it online: every `buffer`
# steps the agent picks an action, holds it for the whole window, and is then
# trained to predict the reward that window produced.
env = make_env(version, movement_type)
env = wrappers.Monitor(env, "./gym-results", force=True)

# --- run configuration ---
max_frames = 5000     # total number of env.step calls
buffer = 20           # number of consecutive steps each chosen action is held
max_rest = 100        # only read when early_stop is True
early_stop = False    # set True to stop once Mario stalls or the life count changes

# --- run state ---
frames = []           # one batched frame per decision point (replayed by a later cell)
done = True           # forces env.reset() on the first iteration
action = 0
reward = 0
agg_reward = 0        # reward accumulated over the current action window
np_state = None       # frame the current action was chosen from
action_vec = None     # one-hot form of the current action
life = 2
x_pos = -1
resting = 0

# Never read in this cell; kept defined in case another cell relies on them.
reward_hist = []
fitness = 0
score = 0
prev_reward = 0
prev_state = np.zeros((1,240,256,3))

try:
    for step in range(max_frames):
        if done:
            state = env.reset()

        # Start of a window: remember the frame and choose the action to hold.
        if step%buffer == 0:
            np_state = np.array(state)[np.newaxis]
            frames.append(np_state)
            action_vec = get_action_vec(agent, np_state, explore = .0)
            action = np.argmax(action_vec)

        state, reward, done, info = env.step(action)
        agg_reward += reward

        # End of a window: the accumulated reward belongs to the (frame, action)
        # pair chosen at its start. Training used to run on the same tick that
        # picked a new action, crediting the previous window's reward to the new pair.
        if (step + 1)%buffer == 0:
            target = np.array(agg_reward).reshape((1,1))
            agent.train_on_batch([np_state, action_vec], target)
            agg_reward = 0

        env.render()

        if early_stop:
            if abs(info['x_pos'] - x_pos) < 1:
                resting += 1
            else:
                x_pos = info['x_pos']
                resting = 0
            if resting > max_rest:
                break
            if life != info['life']:
                break
finally:
    # Always release the emulator and the Monitor recorder, even on an error or interrupt.
    env.close()

In [None]:
# Greedy one-hot action for the last decision frame left behind by the training cell.
action_vec = best_action_vec(agent, np_state)

# Model output for that frame/action pair.
agent.predict([np_state, action_vec])

In [None]:
# Show the model's prediction for every one-hot action on the last decision frame.
best_action_vec(agent, np_state, return_rewards = True)

In [None]:
# Manual close of the environment (the training cell also calls env.close() after its loop).
env.close()

In [None]:
# NOTE(review): `opposite` is not defined in any visible cell, and the model from
# get_mario_model takes two inputs ([state, action]) while only np_state is passed.
# This looks like a leftover from an earlier model — confirm, then fix or delete.
agent.train_on_batch(np_state, opposite)

In [None]:
# NOTE(review): the model from get_mario_model takes [state, action]; passing only
# np_state presumably fails — confirm, then fix or delete this cell.
agent.predict(np_state)

In [None]:
# NOTE(review): `opposite` is not defined in any visible cell; on a fresh kernel
# this presumably raises NameError — confirm, then define or delete.
opposite

In [None]:
# NOTE(review): `gameplay` is not defined in any visible cell; on a fresh kernel
# this presumably raises NameError — confirm, then define or delete.
np.shape(gameplay)

In [None]:
# Replay the frames stored in `frames` as a flip-book in the cell output.
for frame in frames:
    # Each stored entry has a leading batch axis of size 1; index 0 drops it for imshow.
    plt.imshow(frame[0])
    plt.show()
    # wait=True defers clearing until the next output arrives.
    clear_output(wait=True)

In [None]:
# Another manual close of the environment; duplicates the earlier env.close() cell.
env.close()

In [None]:
# Debugging aid: list the attributes of the wrapped environment object.
dir(env)

In [None]:
# Debugging aid: list the attributes of the object stored on the wrapper's `.env` attribute.
dir(env.env)