In [72]:
#!/usr/bin/env python
from __future__ import print_function

import argparse
import skimage as skimage
from skimage import transform, color, exposure
from skimage.transform import rotate
from skimage.viewer import ImageViewer

import sys
import os
work_space = os.path.dirname(os.getcwd())
sys.path.append(work_space+"/game/")
import wrapped_flappy_bird as game
import random
import numpy as np
from collections import deque
import json
import os

from bigdl.optim.optimizer import *
from bigdl.nn.criterion import *
from zoo.pipeline.api.keras.models import Sequential
from zoo.pipeline.api.keras.layers import Dense, Dropout, Activation,Flatten,Convolution2D
import tensorflow as tf

In [73]:
def to_RDD(X, y):
    return sc.parallelize(X).zip(sc.parallelize(y)).map(
            lambda x: Sample.from_ndarray(x[0],x[1]))

In [74]:
def normalize(advantages, smallEps=1e-8):
    return (advantages - advantages.mean())/(advantages.std() + smallEps)

In [75]:
ACTION_PER_FRAME = 1
IMAGE_ROWS, IMAGE_COLS = 80, 80
IMAGE_CHANNELS = 4
ACTION_SIZE = 2
LEARNING_RATE = 1e-4
INITIAL_EPSILON = 0.1
FINAL_EPSILON = 0.0001
EXPLORE = 3000000
BATCH_SIZE = 2 # every how many eposides do a parameter update
GAMMA = 0.99
r_reward_moving_average = 0

In [76]:
# the abstraction of birdagent
class BirdAgent:
    def __init__(self, state_size, action_size, gamma=0.95):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = gamma
        self.model = self._build_mode()

    def _build_mode(self):
        print("Now we build the model")
        model = Sequential()
        model.add(Convolution2D(32, 8, 8, subsample=(4, 4), border_mode='same',
                                input_shape=(IMAGE_ROWS, IMAGE_COLS, IMAGE_CHANNELS)))  # 80*80*4
        model.add(Activation('relu'))
        model.add(Convolution2D(64, 4, 4, subsample=(2, 2), border_mode='same'))
        model.add(Activation('relu'))
        model.add(Convolution2D(64, 3, 3, subsample=(1, 1), border_mode='same'))
        model.add(Activation('relu'))
        model.add(Flatten())
        model.add(Dense(512))
        model.add(Activation('relu'))
        model.add(Dense(2))
        # get the 1 * 2 output represent each action's probability
        model.add(Activation('softmax'))
        return model

    # sample the action from the predict of the model
    def action_sampler(self, out):
        # return 1 if result > np.random.random() else 0
        return np.random.choice([0, 1], p=out)

    def act(self, state, epsilon = 0.01):
        explore = False
        random_num = random.random()
        action = np.zeros(2)
        if random_num <= epsilon:
            explore = True
            # ramdomly select an action
            action_index = random.randrange(ACTION_SIZE)
            action[action_index] = 1
            print("*********** Random Action *********** : ", action_index)
            return action,action,explore

        else:
            if isinstance(state, np.ndarray):
                features = to_sample_rdd(state, np.zeros([state.shape[0]]))
            out = self.model.predict(features)
            #print("out type",out)
            out = out.collect()
            
            if self.action_sampler(np.squeeze(out)) == 0:
                action[0] = 1
            else:
                action[1] = 1
            return np.squeeze(out),action,explore

In [77]:
def to_sample_rdd(x, y, numSlices=None):
    """
    Conver x and y into RDD[Sample]
    :param x: ndarray and the first dimension should be batch
    :param y: ndarray and the first dimension should be batch
    :param numSlices:
    :return:
    """
    sc = get_spark_context()
    from bigdl.util.common import Sample
    x_rdd = sc.parallelize(x, numSlices)
    y_rdd = sc.parallelize(y, numSlices)
    return x_rdd.zip(y_rdd).map(lambda item: Sample.from_ndarray(item[0], item[1]))


In [78]:
# the abstraction of rollouts
class RollOuts():
    def __init__(self, next_states, actions, rewards, logprobs, gradients, total_steps):
        self.next_states = next_states
        self.actions = actions
        self.rewards = rewards
        self.steps = total_steps
        self.logprobs = logprobs
        self.gradients = gradients
        self.total_rewards = np.sum(rewards)


    def get_summary(self):
        return {" total_reward ": self.total_rewards,
                " total_steps ": self.steps}


    def prepare_target(self):
        result = []
        for action, adv in list(zip(self.actions, self.advs)):
            adv = adv * action
            print('action and advantages : ',action, adv)
            result.append(adv)
        return np.array(result)

In [79]:
# the abstraction of experience memory
class ExperienceStroe:
    def __init__(self):
        self.rollouts = []

    def add_rollout(self, next_states, actions, rewards,logprobs, gradients, total_steps):
        self.rollouts.append(RollOuts(next_states, actions, rewards, logprobs, gradients,total_steps))

    def num_experiences(self):
        return len(self.rollouts)

    def get_range(self, start, end):
        return self.rollouts[start:end]

    def reset(self):
        self.rollouts = []

In [80]:
def stats_summary(rollouts, records, verbose=True):
    rollout_rewards = np.array([rollout.total_rewards for rollout in rollouts])
    print("reward mean %s" % (rollout_rewards.mean()))
    print("reward std %s" % (rollout_rewards.std()))
    print("reward max %s" % (rollout_rewards.max()))
    records.append([rollout_rewards.mean(), rollout_rewards.std()])

In [81]:
# calculate running reward
def cal_reward(rollouts, gamma, discounted=False):
    for rollout in rollouts:
        rewards = rollout.rewards
        r_reward = []
        running_reward = 0
        for reward in rewards[::-1]:
            # discounted reward
            # reward = (-1) * reward
            if discounted == True:
                running_reward = gamma * running_reward + reward
            else:
                running_reward = running_reward + reward
            r_reward.append(running_reward)
        rollout.r_rewards = np.squeeze(np.array(np.vstack(r_reward[::-1])))


In [82]:
def cal_advantage_moving_average(rollouts):
    r_reward_moving_average = 0
    step = 0
    for rollout in rollouts:
        advs = np.zeros([rollout.rewards.shape[0]])
        i = 0
        for r_reward in rollout.r_rewards:
            # calculate moving average
            r_reward_moving_average = (1-0.9) * r_reward_moving_average + 0.1 * r_reward
            # correct the bias
            print ("lalla",1-np.power(0.9, step))
            advs[i] = r_reward - r_reward_moving_average
            i += 1
            step += 1
            rollout.advs = advs       

In [83]:
# calculate the advantage = reward - expected reward at this time step
def cal_advantage(rollouts):
    max_steps = max(rollout.rewards.shape[0] for rollout in rollouts)
    for rollout in rollouts:
        rollout.r_rewards = np.pad(rollout.r_rewards, (0, max_steps - rollout.r_rewards.shape[0]),'constant')
    baselines = np.mean(np.vstack([rollout.r_rewards for rollout in rollouts]), axis=0)
    for rollout in rollouts:
        rollout.advs = rollout.r_rewards - baselines
        rollout.advs = rollout.advs[:len(rollout.r_rewards)]
        rollout.r_rewards = rollout.r_rewards[:len(rollout.rewards)]

In [84]:
def play_game(agent, render=False):
    # 1. initialize
    # open up a game state to communicate with emulator
    game_state = game.GameState()
    # used to save the actions, should be a 1*2 nparray and sum(action) should be 1
    actions = np.zeros([1,2])
    # used to save the rewards
    rewards = np.array([])
    # get the first state by doing nothing and preprocess the image to 80x80x4
  

    gradients = np.zeros([1, 2])
    # should be (1*2) nparray and each col represent the probability to chose that action
    logprobs = np.array([1, 2])


    do_nothing = np.zeros(ACTION_SIZE)
    do_nothing[0] = 1
    x_t, r_0, terminal = game_state.frame_step(do_nothing)

    # preprocess the first frame
    x_t = skimage.color.rgb2gray(x_t)
    x_t = skimage.transform.resize(x_t, (80, 80))
    x_t = skimage.exposure.rescale_intensity(x_t, out_range=(0, 255))

    # rescale to 0-1
    x_t = x_t / 255.0

    # 80 * 80 * 4
    state = np.stack((x_t, x_t, x_t, x_t), axis=2)
    print (state.shape)
    # In Keras, need to reshape 1 * 80 * 80 * 4
    state = state.reshape(1, state.shape[0], state.shape[1], state.shape[2])  # 1*80*80*4
    # the observations will be batch_size * 80 * 80 * 4
    observations = np.zeros([1, state.shape[1], state.shape[2], state.shape[3]])

    for step in range(1, 500):
        # next state's shape, should be (1*80*80*4)
        
        # get the state list
        observations = np.vstack((observations, state))
        # predict the action

        logprob,action,explore = agent.act(state)
        
        
        actions = np.vstack((actions, action))
        logprobs = np.vstack((logprobs ,logprob))
        gradients = np.vstack((gradients , action.astype('float32') - logprob))
        # use the predicted action to determine the next state
        x_t1_colored, reward, terminal = game_state.frame_step(action)
        print ('reward ',reward)
        # rgb to gray and rescale
        x_t1 = skimage.color.rgb2gray(x_t1_colored)
        x_t1 = skimage.transform.resize(x_t1, (80, 80))
        x_t1 = skimage.exposure.rescale_intensity(x_t1, out_range=(0, 255))
        # rescale to 0-1
        x_t1 = x_t1 / 255
        # update state
        x_t1 = x_t1.reshape(1,x_t1.shape[0],x_t1.shape[1],1)
        state = np.append(x_t1,state[:,:,:,:3],axis=3)
        rewards = np.append(rewards, reward)

        print('living steps : ',observations.shape[0])
        # print('state size ',state.shape)
        # print('action size : ',action.shape)
        # print('action : ',action)
        # print('reward : ',reward)
        # print('terminal : ',terminal)
        if terminal or step == 498:
            break
    return observations[1:], actions[1:], rewards , logprobs[1:], gradients[1:], step


In [85]:
def play_n_games(agent, history, n=4, verbose=True):
    start_eps = history.num_experiences()
    total_step = 0
    for i in range(n):
        observations, actions, rewards, logprobs, gradients, step = play_game(agent=agent)
        history.add_rollout(observations, actions, rewards, logprobs, gradients, step)
        total_step += step
    end_eps = history.num_experiences()
    return start_eps, end_eps, total_step

In [86]:
def learn(agent, rollouts):
    cal_reward(rollouts, agent.gamma)
    cal_advantage_moving_average
    #cal_advantage(rollouts)
    X_batch = np.zeros([1, 80, 80, 4])
    Y_batch = np.array([0,0])
    for rollout in rollouts:
        X_batch = np.vstack((X_batch, rollout.next_states))
        # Y_batch: the label of the training should be [[action,adv]]
        Y_batch = np.vstack((Y_batch, rollout.prepare_target()))
    X_batch = X_batch[1:]
    Y_batch = Y_batch[1:]
    Y_batch[:,1] = normalize(Y_batch[:,1])

    # Y_batch[:, 1] = normalize(y_batch[:, 1])
    # prepare to train the model
    print('X_batch size : ',X_batch.shape)
    print('Y_batch size : ',Y_batch.shape)
    rdd_sample = to_RDD(X_batch, Y_batch)
    batch_size = X_batch.shape[0] - X_batch.shape[0]%8
    print ("using batch_size = ",batch_size)
    
    optimizer = Optimizer(model=agent.model,
                                  training_rdd=rdd_sample,
                                  criterion=PGCriterion(),
                                  optim_method= RMSprop(learningrate=0.005),
                                  end_trigger=MaxIteration(1),
                                  batch_size=batch_size)
    #else:
        #optimizer.set_traindata(training_rdd=rdd_sample, batch_size=batch_size)
        #optimizer.set_criterion(RFPGCriterion())
    agent.model = optimizer.optimize()

In [87]:
import timeit
state_size = (1, 80, 80, 4)
action_size = ACTION_SIZE
agent = BirdAgent(state_size, action_size)
history = ExperienceStroe()
record = []
exe_times = []
history.reset()
train_start = 0
played_steps = 0
start_of_play = timeit.default_timer()
for i in range(100):
    # history is a list of rollouts
    start_eps, end_eps, steps = play_n_games(agent, history, n=BATCH_SIZE)
    played_steps += steps
    train_end = end_eps
    end_of_play = timeit.default_timer()
    print
    "*************************"
    rollouts = history.get_range(start_eps, end_eps)
    stats_summary(rollouts, record)
    if (played_steps > BATCH_SIZE * 34):
        print("used in training: ", start_eps, "-", end_eps)
        print("num of total steps in training ", played_steps)
        start_of_train = timeit.default_timer()
        rollouts = history.get_range(train_start, train_end)
        print("training begin")
        learn(agent, rollouts)
        end_of_train = timeit.default_timer()
        exe_time_game_play = end_of_play - start_of_play
        train_time_game_paly = end_of_train - start_of_train
        exe_times.append([exe_time_game_play, train_time_game_paly])
        
        played_steps = 0
        start_of_play = timeit.default_timer()
        

Now we build the model
creating: createZooKerasSequential
creating: createZooKerasConvolution2D
creating: createZooKerasActivation
creating: createZooKerasConvolution2D
creating: createZooKerasActivation
creating: createZooKerasConvolution2D
creating: createZooKerasActivation
creating: createZooKerasFlatten
creating: createZooKerasDense
creating: createZooKerasActivation
creating: createZooKerasDense
creating: createZooKerasActivation
(80, 80, 4)
reward  0.1
living steps :  2
reward  0.1
living steps :  3
reward  0.1
living steps :  4
reward  0.1
living steps :  5
reward  0.1
living steps :  6
reward  0.1
living steps :  7
reward  0.1
living steps :  8
reward  0.1
living steps :  9
reward  0.1
living steps :  10
reward  0.1
living steps :  11
reward  0.1
living steps :  12
reward  0.1
living steps :  13
reward  0.1
living steps :  14
reward  0.1
living steps :  15
reward  0.1
living steps :  16
reward  0.1
living steps :  17
reward  0.1
living steps :  18
reward  0.1
living steps :  19

using batch_size =  96
creating: createPGCriterion
creating: createRMSprop
creating: createMaxIteration
creating: createDistriOptimizer


Py4JJavaError: An error occurred while calling o21428.optimize.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 985.0 failed 1 times, most recent failure: Lost task 0.0 in stage 985.0 (TID 7051, localhost): java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$8.apply(DistriOptimizer.scala:262)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$8.apply(DistriOptimizer.scala:262)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5.apply(DistriOptimizer.scala:262)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5.apply(DistriOptimizer.scala:202)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$.optimize(DistriOptimizer.scala:312)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.optimize(DistriOptimizer.scala:914)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$8.apply(DistriOptimizer.scala:262)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$8.apply(DistriOptimizer.scala:262)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5.apply(DistriOptimizer.scala:262)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5.apply(DistriOptimizer.scala:202)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.OutOfMemoryError: Java heap space
