This notebook shows how to get BizHawk to execute Rapidly-exploring Random Tree (RRT). Idealy there is no need to change the "action.lua" script. However, the rom_path variable needs to be changed to the path to your rom file.

In [3]:
%pylab inline
import os
import subprocess
import shutil
from keras.applications.inception_v3 import InceptionV3

Populating the interactive namespace from numpy and matplotlib


In [30]:
%%time

#Function for saving image and getting embedding
def img_lua():
    img_path = 'temp.png'
    proc.stdin.write(b'client.screenshot("' + bytes(img_path, 'utf-8') + b'");')
    proc.stdin.write(b'io.stdout:write("continue");')
    proc.stdin.flush()
    while proc.stdout.readline() != b'continue\n':
        pass
    
    temp_img = np.expand_dims(imread(img_path), axis=0)
    return embedded_model.predict(temp_img)[0]

#RRT function
def explore_with_rrt(initial_state, successor, get_goal,
                     projection, max_samples=1000):
    edges = []
    states = [initial_state]
    projections = [projection(initial_state)]
    available_actions = [set(np.arange(ACTION_NUM))]
    
    for i in range(max_samples):
        goal = get_goal()
        min_index = 0
        min_value = np.linalg.norm(projections[0] - goal)
        for j in range(len(states)):
            temp_projection = np.linalg.norm(projections[j] - goal)
            if temp_projection <= min_value:
                min_index = j
                min_value = temp_projection
        chosen_index = min_index
        
        chosen_state = states[chosen_index]
        selected_action, successor_state = successor(chosen_state, goal,
                                                     available_actions[chosen_state])
        
        available_actions.append(set(np.arange(ACTION_NUM)))
        chosen_projection = projections[chosen_index]
        successor_projection = projection(successor_state)
        states.append(successor_state)
        projections.append(successor_projection)
        edges.append((goal, chosen_state, selected_action,
                      successor_state, chosen_projection,
                      successor_projection))
        
    return edges
        
#Project states to embeddings
def state_projection(state):
    return embeddings[state]

#Select a random action, do it for 30 frames and return the successor state
def get_random_successor(state, goal, available_action):
    action_name = ['Up', 'Down', 'Left', 'Right', 'Select',
                   'Start', 'B', 'A', 'X', 'Y', 'L', 'R']
    
    #Select a random action based on button distribution
    temp_action = np.random.choice(ACTION_NUM, p=distribution)
    while temp_action not in available_action:
        temp_action = np.random.choice(ACTION_NUM, p=distribution)
    action = '{0:b}'.format(temp_action).rjust(12, '0')[::-1]
    
    next_state = len(embeddings)
    
    #Do the selected action
    for i in range(FRAMES_PER_STEP + 1):
        action_code = b''
        action_code += b'buttons = {};'
        
        for j, name in enumerate(action_name):
            if action[j] == '1':
                action_code += b'buttons["' + str.encode(name) + b'"] = 1;'
                
        action_code += b'joypad.set(buttons, 1);'
        action_code += b'emu.frameadvance();'
        
        if i == 0:
            proc.stdin.write(b'savestate.load("' + str.encode(paths[state]) + b'");')
            proc.stdin.flush()
            proc.stdin.write(action_code)
            proc.stdin.flush()

        elif i == FRAMES_PER_STEP:
            embeddings.append(img_lua())
            paths.append(state_path + str(len(embeddings)) + '.State')
            proc.stdin.write(b'savestate.save("' + str.encode(paths[-1]) + b'");')

        else:
            proc.stdin.write(b'joypad.set(buttons, 1);')
            proc.stdin.write(b'emu.frameadvance();')
            proc.stdin.flush()

    return (temp_action, next_state)

#Initialize a random goal point 
def random_goal():
    goal = np.random.uniform(0, 1, 1000)
    return goal
        
        
FRAMES_PER_STEP = 30
ACTION_NUM = 4096
preparation = False
embedded_model = InceptionV3() #It might take a while to run this line of code
distribution=np.load('SNES_distribution.npy')

Wall time: 504 µs


In [33]:
%%time

bizhawk_path = 'BizHawk-2.2.2/'
rom_path = '' #Path to the .smc rom file
state_path = 'states/'
steps = 200
embeddings = []
paths = []

if not os.path.exists(state_path):
    os.mkdir(state_path)

#Start BizHawk
proc = subprocess.Popen([bizhawk_path + 'EmuHawk.exe',
                         rom_path,
                         '--lua=../action.lua'],
                         stdout=subprocess.PIPE,
                         stdin=subprocess.PIPE)

while True:
    out_line = proc.stdout.readline()
    
    if out_line[:5] == b'start':
        preparation = True
        
    if preparation:
        proc.stdin.write(b'client.speedmode(400);')
        proc.stdin.flush()
        
        #Save the initial state
        embeddings.append(img_lua())
        paths.append(state_path + str(len(embeddings)) + '.State')
        proc.stdin.write(b'savestate.save("' + str.encode(paths[-1]) + b'");')
        proc.stdin.flush()

        #Run RRT
        thelist = explore_with_rrt(0, get_random_successor,
                                   random_goal, state_projection,
                                   max_samples=steps)
        break

    #to terminate program after closing EmuHawk
    if out_line == b'':
        break

proc.terminate()
os.remove('temp.png')
shutil.rmtree('states')