In [1]:
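# A3C on GPU for a MarketEnv trading environment (adapted from the OpenGym CartPole-v0 version)
# ---------------------------------------------------------------------------------------------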
#
# A3C implementation with GPU optimizer threads.
# 
# Made as part of blog series Let's make an A3C, available at
# https://jaromiru.com/2017/02/16/lets-make-an-a3c-theory/
#
# author: Jaromir Janisch, 2017

import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display

import gym, time, random, threading

from keras.models import *
from keras.layers import *
from keras.callbacks import TensorBoard
from keras import backend as K
from keras import optimizers


#-- constants: hyper-parameters shared by every thread

# run schedule
RUN_TIME = 30			# seconds the main thread lets training run before stopping workers
THREADS = 16			# number of Environment (actor) threads
OPTIMIZERS = 8			# number of Optimizer threads
THREAD_DELAY = 0.001	# sleep per environment step so other threads get scheduled

# discounting / n-step returns
GAMMA = 0.99
N_STEP_RETURN = 8
GAMMA_N = GAMMA ** N_STEP_RETURN	# discount applied to the bootstrap value of s_{t+n}

# epsilon schedule: linear from EPS_START to EPS_STOP over EPS_STEPS frames
EPS_START, EPS_STOP, EPS_STEPS = 0.4, 0.15, 75000

# optimisation
MIN_BATCH = 32			# optimize() waits until this many samples are queued
LEARNING_RATE = 5e-3
DECAY = 0.99			# RMSProp decay rate

# loss weights
LOSS_V = 0.5			# value-loss coefficient
LOSS_ENTROPY = 0.01		# entropy coefficient

#---------
class Brain:
	"""Global actor-critic network shared by every thread (the A3C "brain").

	Environment threads call predict_p() and train_push(); Optimizer threads
	call optimize(), which drains the shared queue once it holds at least
	MIN_BATCH samples and applies one RMSProp step to the combined loss.
	"""
	train_queue = [ [], [], [], [], [] ]	# s, a, r, s', s' terminal mask
	lock_queue = threading.Lock()	# guards every push to / swap of train_queue

	def __init__(self):
		tf.reset_default_graph()	# start from an empty default graph
		self.session = tf.Session()
		K.set_session(self.session)
		K.manual_variable_initialization(True)	# variables are initialised explicitly below

		self.model = self._build_model()
		self.graph = self._build_graph(self.model)	# (s_t, a_t, r_t, minimize)

		self.session.run(tf.global_variables_initializer())
		self.default_graph = tf.get_default_graph()

		self._log_model()

		self.default_graph.finalize()	# avoid modifications once threads share the graph

	def _build_model(self):
		"""Build the Keras model: one shared dense layer feeding a softmax
		policy head (NUM_ACTIONS outputs) and a linear value head (1 output)."""
		l_input = Input( batch_shape=(None, NUM_STATE) )
		l_dense = Dense(16, activation='relu')(l_input)

		out_actions = Dense(NUM_ACTIONS, activation='softmax')(l_dense)
		out_value   = Dense(1, activation='linear')(l_dense)

		model = Model(inputs=[l_input], outputs=[out_actions, out_value])
		model._make_predict_function()	# have to initialize before threading

		return model

	def _log_model(self):
		"""Attach a Keras TensorBoard callback (log dir ./Graph) to the model."""
		tbCallback = TensorBoard(log_dir='./Graph', histogram_freq=16, write_graph=True, write_grads=False, write_images=True)
		tbCallback.set_model(self.model)

	def _build_graph(self, model):
		"""Build the A3C loss and its RMSProp minimisation op.

		Returns:
			(s_t, a_t, r_t, minimize): placeholders for states, one-hot actions
			and n-step returns, plus the training op.
		"""
		s_t = tf.placeholder(tf.float32, shape=(None, NUM_STATE))
		a_t = tf.placeholder(tf.float32, shape=(None, NUM_ACTIONS))
		r_t = tf.placeholder(tf.float32, shape=(None, 1)) # not immediate, but discounted n step reward
		p, v = model(s_t)

		log_prob = tf.log( tf.reduce_sum(p * a_t, axis=1, keep_dims=True) + 1e-10)
		advantage = r_t - v

		loss_policy = - log_prob * tf.stop_gradient(advantage)	# maximize policy
		loss_value  = LOSS_V * tf.square(advantage)	# minimize value error
		# sum(p * log p) is the *negative* entropy, so minimising it maximises
		# entropy (regularization)
		entropy = LOSS_ENTROPY * tf.reduce_sum(p * tf.log(p + 1e-10), axis=1, keep_dims=True)

		loss_total = tf.reduce_mean(loss_policy + loss_value + entropy)
		optimizer = tf.train.RMSPropOptimizer(LEARNING_RATE, decay=DECAY)
		minimize = optimizer.minimize(loss_total)

		return s_t, a_t, r_t, minimize

	def optimize(self):
		"""Run one training step if at least MIN_BATCH samples are queued.

		The queue is swapped out under lock_queue; bootstrapped targets
		r + GAMMA_N * v(s') * mask are then computed and fed to the minimize op.
		"""
		if len(self.train_queue[0]) < MIN_BATCH:
			time.sleep(0)	# yield
			return

		with self.lock_queue:
			# another optimizer may have drained the queue since the unlocked
			# check above; we can't yield while holding the lock
			if len(self.train_queue[0]) < MIN_BATCH:
				return

			s, a, r, s_, s_mask = self.train_queue
			self.train_queue = [ [], [], [], [], [] ]

		s = np.vstack(s)
		a = np.vstack(a)
		r = np.vstack(r)
		s_ = np.vstack(s_)
		s_mask = np.vstack(s_mask)

		if len(s) > 5*MIN_BATCH: print("Optimizer alert! Minimizing batch of %d" % len(s))

		v = self.predict_v(s_)
		r = r + GAMMA_N * v * s_mask	# set v to 0 where s_ is terminal state
		s_t, a_t, r_t, minimize = self.graph

		self.session.run(minimize, feed_dict={s_t: s, a_t: a, r_t: r})

	def train_push(self, s, a, r, s_):
		"""Queue one n-step sample; s_ of None marks a terminal next state
		(stored as NONE_STATE with mask 0)."""
		with self.lock_queue:
			self.train_queue[0].append(s)
			self.train_queue[1].append(a)
			self.train_queue[2].append(r)

			if s_ is None:
				self.train_queue[3].append(NONE_STATE)
				self.train_queue[4].append(0.)
			else:
				self.train_queue[3].append(s_)
				self.train_queue[4].append(1.)

	def predict(self, s):
		"""Return (policy probabilities, state values) for a batch of states."""
		with self.default_graph.as_default():
			p, v = self.model.predict(s)
			return p, v

	def predict_p(self, s):
		"""Return only the policy probabilities for a batch of states."""
		return self.predict(s)[0]

	def predict_v(self, s):
		"""Return only the state values for a batch of states."""
		return self.predict(s)[1]

#---------
frames = 0	# global count of act() calls across all agents; drives the epsilon schedule
class Agent:
	"""Per-thread actor: chooses actions with the shared `brain` and turns raw
	transitions into n-step training samples.

	Samples pushed to the brain are (s_t, one_hot(a_t), R_t, s_{t+n}), where
	R_t is the discounted sum of up to N_STEP_RETURN rewards. Brain.optimize()
	adds the bootstrap term GAMMA_N * v(s_{t+n}) itself (masked to 0 when
	s_{t+n} is terminal, i.e. None).
	"""
	def __init__(self, eps_start, eps_end, eps_steps):
		self.eps_start = eps_start
		self.eps_end   = eps_end
		self.eps_steps = eps_steps

		self.memory = []	# recent transitions (s, a_one_hot, r, s_) used for n_step return
		self.R = 0.			# running discounted return of the rewards in memory

	@staticmethod
	def _discounted_return(rewards, gamma):
		"""Return sum(rewards[i] * gamma**i) over the whole sequence."""
		R = 0.
		for r in reversed(rewards):
			R = r + gamma * R
		return R

	def getEpsilon(self):
		"""Linearly anneal epsilon from eps_start to eps_end over eps_steps global frames."""
		if(frames >= self.eps_steps):
			return self.eps_end
		else:
			return self.eps_start + frames * (self.eps_end - self.eps_start) / self.eps_steps	# linearly interpolate

	def act(self, s):
		"""With probability eps pick a uniformly random action, otherwise sample
		from the brain's policy. Every call advances the global `frames`."""
		eps = self.getEpsilon()
		global frames; frames = frames + 1

		if random.random() < eps:
			return random.randint(0, NUM_ACTIONS-1)

		else:
			s = np.array([s])
			p = brain.predict_p(s)[0]

			# a = np.argmax(p)
			a = np.random.choice(NUM_ACTIONS, p=p)

			return a

	def train(self, s, a, r, s_):
		"""Record one transition and push any n-step samples that are ready.

		s_ of None marks the end of an episode: every transition still in
		memory is then flushed with its truncated discounted return.
		"""
		a_cats = np.zeros(NUM_ACTIONS)	# turn action into one-hot representation
		a_cats[a] = 1

		self.memory.append( (s, a_cats, r, s_) )

		# Incremental update: once memory holds N_STEP_RETURN transitions,
		# self.R == sum_i memory[i].r * GAMMA**i.
		self.R = ( self.R + r * GAMMA_N ) / GAMMA

		if s_ is None:
			# Compute each tail return directly from memory. The incremental
			# self.R is only valid after N_STEP_RETURN steps; for shorter
			# episodes it carries an extra factor GAMMA**(N_STEP_RETURN - len(memory)).
			while self.memory:
				R = self._discounted_return([m[2] for m in self.memory], GAMMA)
				s0, a0 = self.memory[0][0], self.memory[0][1]
				brain.train_push(s0, a0, R, self.memory[-1][3])	# last s_ is None (terminal)
				self.memory.pop(0)

			self.R = 0.
			return

		if len(self.memory) >= N_STEP_RETURN:
			s0, a0, _, _ = self.memory[0]
			_, _, _, s_n = self.memory[N_STEP_RETURN - 1]
			brain.train_push(s0, a0, self.R, s_n)

			# drop r_0; the next incremental update rescales the rest by 1/GAMMA
			self.R = self.R - self.memory[0][2]
			self.memory.pop(0)
		
#---------
class Environment(threading.Thread):
	"""Actor thread: runs episodes in its own MarketEnv and feeds transitions
	to the global `brain` through its Agent.

	After each episode the total reward ('Perf/Reward') and number of steps
	('Perf/Length') are written to TensorBoard, indexed by this thread's
	episode number.
	"""
	stop_signal = False	# class default; stop() sets a per-instance flag

	def __init__(self, render=False, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
		threading.Thread.__init__(self)

		self.render = render
		self.env = MarketEnv(dir_path = "../../ENV/sample_data/", target_codes = codeMap.keys(), input_codes = [], start_date = "2010-08-25", end_date = "2015-08-25", sudden_death = -1.0)
		self.agent = Agent(eps_start, eps_end, eps_steps)
		self.summary_writer = tf.summary.FileWriter('./Graph')
		self.episode_count = 0	# episodes finished by this thread; TensorBoard step

	def runEpisode(self):
		"""Play one episode (or until stop() is called) and log its reward."""
		s = self.env.reset()
		self.episode_rewards = []
		steps = 0
		R = 0
		while True:
			time.sleep(THREAD_DELAY) # yield

			if self.render: self.env.render()

			a = self.agent.act(s)
			s_, r, done, info = self.env.step(a)

			if done: # terminal state
				s_ = None

			self.agent.train(s, a, r, s_)

			s = s_
			R += r
			steps += 1
			if done or self.stop_signal:
				break

		# Use a monotonically increasing episode index as the summary step;
		# the old per-episode step counter put every point at x = episode length.
		self.episode_count += 1
		summary = tf.Summary()
		summary.value.add(tag='Perf/Reward', simple_value=float(R))
		summary.value.add(tag='Perf/Length', simple_value=float(steps))
		self.summary_writer.add_summary(summary, self.episode_count)
		self.summary_writer.flush()
		print("Total R:", R)

	def run(self):
		"""Thread body: run episodes back to back until stop() is called."""
		while not self.stop_signal:
			self.runEpisode()

	def stop(self):
		"""Ask the thread to finish its current episode and exit."""
		self.stop_signal = True

#---------
class Optimizer(threading.Thread):
	"""Background thread that keeps asking the global `brain` to train
	until stop() is called."""
	stop_signal = False	# class default; stop() sets a per-instance flag

	def __init__(self):
		super(Optimizer, self).__init__()

	def run(self):
		"""Thread body: call brain.optimize() in a loop until stopped."""
		while True:
			if self.stop_signal:
				break
			brain.optimize()

	def stop(self):
		"""Ask run() to exit after its current optimize() call."""
		self.stop_signal = True


#-- main
# Everything below depends on names defined here (codeMap, MarketEnv), so it
# all stays under the guard; an `if` does not create a scope, so brain,
# NUM_STATE, NUM_ACTIONS and NONE_STATE remain module globals for the classes.
if __name__ == "__main__":
	import os
	import sys
	import codecs
	sys.path.append("../../ENV")
	from market_env import MarketEnv
	#from market_model_builder import MarketPolicyGradientModelBuilder

	#codeListFilename = sys.argv[1]
	codeListFilename = './kospi_10.csv'
	#modelFilename = sys.argv[2] if len(sys.argv) > 2 else None
	modelFilename = 'pg.h5'
	historyFilename = sys.argv[3] if len(sys.argv) > 3 else None

	# first column -> second column of a comma- or tab-separated file;
	# the keys are passed to MarketEnv as target_codes
	codeMap = {}
	with codecs.open(codeListFilename, "r", "utf-8") as f:
		for line in f:
			if line.strip() != "":
				tokens = line.strip().split(",") if not "\t" in line else line.strip().split("\t")
				codeMap[tokens[0]] = tokens[1]

	env_test = Environment(render=False, eps_start=0., eps_end=0.)
	# NOTE(review): 2*obs + 3 presumably matches MarketEnv's flattened observation — confirm
	NUM_STATE = 2*env_test.env.observation_space.shape[0] + 3
	NUM_ACTIONS = env_test.env.action_space.n
	NONE_STATE = np.zeros(NUM_STATE)

	brain = Brain()	# brain is global in A3C

	envs = [Environment() for i in range(THREADS)]
	opts = [Optimizer() for i in range(OPTIMIZERS)]

	for o in opts:
		o.start()

	for e in envs:
		e.start()

	time.sleep(RUN_TIME)

	for e in envs:
		e.stop()
	for e in envs:
		e.join()

	for o in opts:
		o.stop()
	for o in opts:
		o.join()

	# env_test.stop() is never called, so this plays greedy (eps = 0)
	# evaluation episodes on the main thread until interrupted.
	env_test.run()

Using TensorFlow backend.


('Total R:', -0.37542188014393096)
('Total R:', -0.7194061097542513)
('Total R:', 0.062194003077353566)
('Total R:', 1.9527258119762188)
('Total R:', -0.15869061204650325)
('Total R:', -1.914809776155471)
('Total R:', 0.8939290855286188)
('Total R:', 0.7055444938825364)
('Total R:', 0.49596945134250237)
('Total R:', 0.7982523037143882)
('Total R:', 0.9152821301804118)
('Total R:', -1.5055041184537128)
('Total R:', -2.3190385655635675)
('Total R:', 0.4716843888594362)
('Total R:', 0.32427439156893356)
('Total R:', -0.24409569107597906)
('Total R:', -0.3959701288333253)
('Total R:', -0.48376724191440257)
('Total R:', -5.359714430100694)
('Total R:', 0.5489624392820076)('Total R:', 0.6108477129701805)
('Total R:', -0.034209602664634464)

('Total R:', 1.5292320811487992)('Total R:', -0.5497928502417229) ('Total R:', 0.3911402113169693) ('Total R:', -0.005789982365044244) 

 
('Total R:', -1.3623246353484588)('Total R:', -0.41348759594527007)('Total R:', 1.4057323113072466)('Total R:', 0.61

[2017-09-25 11:27:41,031] Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/Users/czuend/jupyter/lib/python2.7/site-packages/IPython/core/ultratb.py", line 1132, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/Users/czuend/jupyter/lib/python2.7/site-packages/IPython/core/ultratb.py", line 313, in wrapped
    return f(*args, **kwargs)
  File "/Users/czuend/jupyter/lib/python2.7/site-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/inspect.py", line 1044, in getinnerframes
    framelist.append((tb.tb_frame,) + getframeinfo(tb, context))
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/inspect.py", line 1004, in getframeinfo
    filename = getsourcefile(frame) or getfile(frame)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/inspec

IndexError: string index out of range

In [None]:
# NOTE(review): the two bare string literals below are scratch notes holding
# TensorBoard summary snippets (presumably older TF API names such as
# tf.merge_all_summaries / tf.train.SummaryWriter / tf.scalar_summary).
# They are expression statements whose values are discarded, so none of this
# code runs; the second snippet is not even valid Python on its own
# (an `else:` with no matching `if`).
'''
self.summary_writer = tf.summary.FileWriter("train_"+str(self.number))
mean_reward = np.mean(self.episode_rewards[-50:])
mean_length = np.mean(self.episode_lengths[-50:])
mean_value = np.mean(self.episode_mean_values[-50:])
summary = tf.Summary()
summary.value.add(tag='Perf/Reward', simple_value=float(mean_reward))
summary.value.add(tag='Perf/Length', simple_value=float(mean_length))
summary.value.add(tag='Perf/Value', simple_value=float(mean_value))
'''



'''
		self.summary_op = tf.merge_all_summaries()
		self.summary_writer = tf.train.SummaryWriter("{}/{}".format(self.summ_base_dir, self.actor_id), self.session.graph_def) 


	def setup_summaries(self):
		episode_reward = tf.Variable(0.)
		s1 = tf.scalar_summary("Episode Reward " + str(self.actor_id), episode_reward)
		summary_vars = [episode_reward]
		else:
			episode_ave_max_q = tf.Variable(0.)
			s2 = tf.scalar_summary("Max Q Value " + str(self.actor_id), episode_ave_max_q)
			logged_epsilon = tf.Variable(0.)
			s3 = tf.scalar_summary("Epsilon " + str(self.actor_id), logged_epsilon)
			summary_vars = [episode_reward, episode_ave_max_q, logged_epsilon]
		summary_placeholders = [tf.placeholder("float") for _ in range(len(summary_vars))]
		update_ops = [summary_vars[i].assign(summary_placeholders[i]) for i in range(len(summary_vars))]
		with tf.control_dependencies(update_ops):
			summary_ops = tf.merge_all_summaries()
		return summary_placeholders, update_ops, summary_ops

'''