<a href="https://colab.research.google.com/github/hrumst/ML/blob/master/dqn_pong_v3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import gym
import random

# Discrete action ids handed to env.step(); each is named after the paddle
# direction it is meant to produce.
UP_ACTION = 2
DOWN_ACTION = 3

# Build the Pong environment and reset it to obtain the initial observation.
env = gym.make("Pong-v0")
next_state = env.reset()

In [0]:
# Install pyvirtualdisplay and the xvfb / python-opengl / ffmpeg system
# packages; all installer output is discarded to keep the cell quiet.
# NOTE(review): presumably these are what lets the environment render and
# record video on a headless machine — confirm against the runtime in use.
!pip install pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

from pyvirtualdisplay import Display
# Start an invisible (visible=0) 1400x900 virtual display.
display = Display(visible=0, size=(1400, 900))
display.start()

In [0]:
from gym.wrappers import Monitor
# Wrap the shared environment in a Monitor that writes into ./video
# (the directory show_video() later searches for .mp4 files).
env_sim = Monitor(env, './video', force=True)

# Play 500 steps with a uniformly random policy restricted to UP / DOWN,
# restarting the game whenever an episode finishes.
_ = env_sim.reset()
for i in range(500):
    # random.randint is inclusive on both ends, so this picks 2 or 3
    action = random.randint(UP_ACTION, DOWN_ACTION)
    next_state, reward, done, info = env_sim.step(action)
    if done:
        env_sim.reset()

# NOTE(review): closing the wrapper presumably also closes the wrapped `env`,
# which a later cell reuses for training — confirm this is harmless.
env_sim.close()

In [0]:
def show_video(video_dir='video'):
    """Display the first recorded ``.mp4`` of a directory inline in the notebook.

    The file content is base64-encoded and embedded as a data URI inside an
    HTML ``<video>`` element, which is then rendered through IPython.

    Parameters
    ----------
    video_dir : str, optional
        Directory searched (non-recursively) for ``*.mp4`` files.
        Defaults to ``'video'``.

    Returns
    -------
    None
        When no ``.mp4`` file is found, ``"Could not find video"`` is printed
        and nothing is displayed.
    """
    import base64
    import glob
    import os

    # sorted() makes the pick deterministic; glob returns files in arbitrary order
    mp4list = sorted(glob.glob(os.path.join(video_dir, '*.mp4')))
    if not mp4list:
        print("Could not find video")
        return

    # IPython is only required once there is actually something to render
    from IPython.display import HTML
    from IPython import display as ipythondisplay

    # read-only binary mode is sufficient; the context manager closes the handle
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
            loop controls style="height: 400px;">
            <source src="data:video/mp4;base64,{0}" type="video/mp4" />
            </video>'''.format(encoded.decode('ascii'))))
    
# Render the clip recorded by the random-play run (looked up under video/).
show_video()

In [0]:
# import necessary modules from keras
from keras.layers import Dense
from keras.models import Sequential

# Fully connected policy network, declared in a single Sequential call:
#   * hidden layer - 200 ReLU units fed with a flattened 80x80 pre-processed frame
#   * output layer - one sigmoid unit
model = Sequential([
    Dense(units=200, input_dim=80*80, activation='relu', kernel_initializer='glorot_uniform'),
    Dense(units=1, activation='sigmoid', kernel_initializer='RandomNormal'),
])

# binary cross-entropy loss, Adam optimizer, accuracy reported as the metric
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [0]:
import numpy as np

# preprocessing used by Karpathy (cf. https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5)
def prepro(img):
    """Turn a 210x160x3 uint8 frame into a 6400-element (80x80) 1D float vector.

    The frame is cropped to rows 35..194, downsampled by a factor of two along
    both axes, reduced to its first colour channel and binarised: the two
    background values (144 and 109) and 0 map to 0.0, every other value
    (paddles, ball) maps to 1.0.

    Parameters
    ----------
    img : numpy.ndarray
        Frame indexed as ``[row, column, channel]``.  It is NOT modified.

    Returns
    -------
    numpy.ndarray
        Flat ``float64`` vector holding only 0.0 and 1.0.
    """
    # crop + downsample + channel selection in one slicing step (a view, never written to)
    small = img[35:195:2, ::2, 0]
    # build a fresh boolean mask instead of assigning into the view, so the
    # caller's frame stays untouched
    foreground = (small != 0) & (small != 144) & (small != 109)
    # np.float64 replaces the np.float alias, which recent NumPy releases no longer provide
    return foreground.astype(np.float64).ravel()

# reward discount used by Karpathy (cf. https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5)
def discount_rewards(r, gamma):
    """Compute normalised discounted rewards for a 1D sequence of rewards.

    Walking backwards through ``r``, a running sum is multiplied by ``gamma``
    and increased by the current reward.  Every non-zero reward first resets
    the running sum, so discounting never leaks across a scored point.  The
    result is shifted to zero mean and, when its standard deviation is
    non-zero, scaled to unit standard deviation.

    Parameters
    ----------
    r : sequence of numbers
        Per-step rewards.  Integers are accepted and converted to float.
    gamma : float
        Discount factor applied per step.

    Returns
    -------
    numpy.ndarray
        ``float64`` array with the same length as ``r``.  An empty input gives
        an empty array; a constant discounted sequence gives all zeros.
    """
    # force a float dtype: with integer rewards the accumulator would be
    # truncated and the in-place normalisation below would fail
    r = np.asarray(r, dtype=np.float64)
    discounted_r = np.zeros_like(r)
    if r.size == 0:
        return discounted_r

    running_add = 0.0
    # go from the last reward to the first one so no exponentiation is needed
    for t in reversed(range(r.size)):
        if r[t] != 0:
            running_add = 0.0  # a point was scored (in Pong): reset the reward sum
        running_add = running_add * gamma + r[t]  # Horner-style accumulation
        discounted_r[t] = running_add

    discounted_r -= np.mean(discounted_r)
    spread = np.std(discounted_r)
    # skip the scaling when all values are equal, otherwise 0/0 would give NaN
    if spread > 0:
        discounted_r /= spread
    return discounted_r

In [0]:
# Training loop: play Pong with the current policy, log every
# (frame difference, sampled action, reward) triple, and fit the network once
# per finished episode using the output of discount_rewards as sample weights.

N_STEPS = 50000  # total number of environment steps to play
gamma = .9       # discount factor handed to discount_rewards

next_state = env.reset()

x_train = []   # network inputs (frame differences) of the running episode
y_train = []   # sampled actions of the running episode: 1 = UP, 0 = DOWN
rewards = []   # per-step rewards of the running episode
total_reward = 0

episode_nb = 0
prev_input = None
cur_input = None

try:
    for i in range(N_STEPS):
        cur_input = prepro(next_state)
        # the network sees the change between consecutive frames; the first
        # frame of an episode has no predecessor and gets an all-zero input
        x = cur_input - prev_input if prev_input is not None else np.zeros(80 * 80)
        prev_input = cur_input

        # predict() returns a (1, 1) array; take the scalar so the comparison
        # below is a plain float test. verbose=0 silences the per-call
        # progress output that some Keras versions print by default.
        proba = model.predict(x.reshape(1, -1), verbose=0)[0, 0]
        # sample the action: UP with probability `proba`, DOWN otherwise
        action = UP_ACTION if np.random.uniform() < proba else DOWN_ACTION
        y = 1 if action == UP_ACTION else 0  # 0 and 1 are our labels

        # log the input and label to train later
        x_train.append(x)
        y_train.append(y)

        next_state, reward, done, info = env.step(action)
        rewards.append(reward)
        total_reward += reward

        if done:
            print('At the end of episode', episode_nb, 'the total reward was :', total_reward)
            # increment episode number
            episode_nb += 1
            # training: every logged step is weighted by its discounted reward
            model.fit(x=np.vstack(x_train), y=np.vstack(y_train), verbose=1, sample_weight=discount_rewards(rewards, gamma))

            # start the next episode with empty buffers and no previous frame
            x_train, y_train, rewards = [], [], []
            total_reward = 0
            prev_input = None
            next_state = env.reset()
finally:
    # release the environment even when the loop is interrupted or raises
    env.close()