In [28]:
import gym
from gym import wrappers
import numpy as np
import _pickle as pickle

In [40]:
env = gym.make('MsPacman-v0')
# Define hyperparameters
learning_rate = 0.00025
gamma = 0.99
epsilon = 0.01
decay_rate = 0.99
explotation_rate = 0.8
batch_size = 5

prev_x = None
resume = False 

# Initialize weights and RMSprop variables
input_size = env.observation_space.shape[0]
hidden_size = 500
output_size = env.action_space.n
D = 185 * 95 # input size 

if resume:
    model = pickle.load(open('save.p', 'rb'))
else:
    model = {}
    model['W1'] = np.random.randn(hidden_size, D) / np.sqrt(D)
    model['W2'] = np.random.randn(hidden_size, output_size) / np.sqrt(hidden_size)


r_w1 = np.zeros_like(model['W1'])
r_w2 = np.zeros_like(model['W2'])

grad_buffer = { k : np.zeros_like(v) for k,v in model.items() } # update buffers that add up gradients over a batch
rmsprop_cache = { k : np.zeros_like(v) for k,v in model.items() }

In [41]:
# Define helper function to preprocess observations
def preprocess(observation):
    if type(observation) is tuple: 
        o = observation[0]
    else:
        o = observation
    processed_observation = np.mean(o[15:200, 30:125], axis=2)
    processed_observation[processed_observation == 144] = 0
    processed_observation[processed_observation == 109] = 0
    processed_observation[processed_observation != 0] = 1
    return processed_observation.astype(np.float).ravel()

# Define function to compute forward pass of neural network
def forward(observation, w1, w2):
    h = np.dot(w1, observation)
    h[h<0] = 0
    y = np.dot(h, w2)
    return y, h

In [42]:
def discount_rewards(r):

    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(0, r.size)): # xrange is no longer supported in Python 3, replace with range
        
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r

In [43]:
def policy_backward(eph, epx, epdlogp):
    dW2 = np.dot(eph.T, epdlogp).ravel()
    dh = np.outer(epdlogp, model['W2'])
    dh[eph <= 0] = 0 # backpro prelu
    dW1 = np.dot(dh.T, epx)
    return {'W1':dW1, 'W2':dW2}

In [None]:
for episode in range(1, 101):
    observation = env.reset()
    done = False
    reward_sum = 0
    w1 = model['W1']
    w2 = model['W2']     
    while not done:
        x = preprocess(observation)
        y, h = forward(x, model['W1'], model['W2'])
        
        a = np.argmax(y) 
        if np.random.uniform() < explotation_rate:
            a = (np.argmax(y) + np.random.randint(1,9)) % 9 
        else:
            a = np.argmax(y)
            
        observation, reward, terminated, turncated, info = env.step(a)
        done = terminated or turncated
        
        reward_sum += reward
        y_target = np.zeros(output_size)
        y_target[a] = reward + gamma * np.max(forward(preprocess(observation), model['W1'], model['W2'])[0])
            
        delta3 = y - y_target
        delta2 = np.dot(delta3, model['W2'].T)
        delta2[h<=0] = 0
        dw2 = np.outer(h, delta3)
        dw1 = np.outer(x, delta2)
        r_w2 = decay_rate * r_w2 + (1 - decay_rate) * dw2**2
        r_w1 = decay_rate * r_w1 + (1 - decay_rate) * np.multiply(dw1.T, dw1.T)
        w2 -= learning_rate * dw2 / (np.sqrt(r_w2) + epsilon)
        w1 -= learning_rate * dw1.T / (np.sqrt(r_w1) + epsilon)
        
        grad = {'W1': w1, 'W2': w2}
        for k in model: grad_buffer[k] += grad[k]
            
    print('Episode %d - Total Reward: %d' % (episode, reward_sum))
    if episode % batch_size == 0:
        for k,v in model.items():
            g = grad_buffer[k] # gradient
            rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2
            model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
            grad_buffer[k] = np.zeros_like(v) # reset batch gradient buffer
        fileName = f'save{episode}.p'
        pickle.dump(model, open(fileName, 'wb'))

env.close()
pickle.dump(model, open('save.p', 'wb'))

In [46]:
def policy(obs):
    x = preprocess(obs)
    y, h = forward(x, model['W1'], model['W2'])
    a = np.argmax(y)
    return a

In [None]:
env = gym.make('MsPacman-v0', render_mode = 'human')
observation = env.reset()
done = False
while not done:
    a = policy(observation)
    observation, reward, terminated, turncated, info = env.step(a)
    reward_sum += reward
    done = terminated or turncated
    
print('Total reward: ', reward_sum)
env.close()

  logger.warn(
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  return processed_observation.astype(np.float).ravel()
