AC.py
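"""Actor-critic on CartPole-v0 in which the critic learns an action-value Q(s, a).

The actor is a softmax policy updated with the policy-gradient weight
log pi(a|s) * Q(s, a); the critic is trained toward the Q-learning-style target
r + GAMMA * max_a' Q(s', a'). The script uses the TensorFlow 1.x graph API
(tf.placeholder / tf.Session), so it requires TF 1.x (or tf.compat.v1 with v2
behaviour disabled) plus gym and pandas.
"""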
import numpy as np
import tensorflow as tf
import gym
import pandas as pd

OUTPUT_GRAPH = False
MAX_EPISODE = 500
DISPLAY_REWARD_THRESHOLD = 200  # render the environment once the running reward exceeds this threshold
MAX_EP_STEPS = 2000             # maximum time steps in one episode
RENDER = False                  # rendering slows training down
GAMMA = 0.9                     # reward discount in the TD target
LR_A = 0.001                    # learning rate for the actor
LR_C = 0.001                    # learning rate for the critic
class Actor(object):
    def __init__(self, sess, n_features, n_actions, lr=0.001):
        self.sess = sess

        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.a = tf.placeholder(tf.int32, None, "action")
        self.q = tf.placeholder(tf.float32, None, "q")  # critic's Q(s, a), used to weight the policy gradient

        with tf.variable_scope('Actor'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,  # number of hidden units
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )

            self.acts_prob = tf.layers.dense(
                inputs=l1,
                units=n_actions,  # output units
                activation=tf.nn.softmax,  # get action probabilities
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='acts_prob'
            )

        with tf.variable_scope('exp_v'):
            log_prob = tf.log(self.acts_prob[0, self.a])
            self.exp_v = tf.reduce_mean(log_prob * self.q)  # Q-value guided policy-gradient objective

        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)  # minimize(-exp_v) = maximize(exp_v)

    def learn(self, s, a, q):
        s = s[np.newaxis, :]
        feed_dict = {self.s: s, self.a: a, self.q: q}
        _, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
        return exp_v

    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})  # get probabilities for all actions
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())  # return an int
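# Note: unlike the classic advantage actor-critic, the actor above is weighted by
# the critic's raw Q(s, a) rather than a TD error or advantage, so no baseline is
# subtracted when scaling log pi(a|s).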
class Critic(object):
    def __init__(self, sess, n_features, n_actions, lr=0.01):
        self.sess = sess

        self.s = tf.placeholder(tf.float32, [None, n_features], "state")
        self.a = tf.placeholder(tf.int32, [None, 1], "action")
        self.r = tf.placeholder(tf.float32, None, 'r')
        self.q_ = tf.placeholder(tf.float32, [None, 1], 'q_next')

        # concatenate the state with a one-hot action so the network outputs Q(s, a)
        self.a_onehot = tf.one_hot(self.a, n_actions, dtype=tf.float32)
        self.a_onehot = tf.squeeze(self.a_onehot, axis=1)
        self.input = tf.concat([self.s, self.a_onehot], axis=1)

        with tf.variable_scope('Critic'):
            l1 = tf.layers.dense(
                inputs=self.input,
                units=20,  # number of hidden units
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )

            self.q = tf.layers.dense(
                inputs=l1,
                units=1,  # output units
                activation=None,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='Q'
            )

        with tf.variable_scope('squared_TD_error'):
            self.td_error = self.r + GAMMA * self.q_ - self.q
            self.loss = tf.square(self.td_error)  # TD_error = (r + gamma * max_a' Q(s', a')) - Q(s, a)

        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

    def learn(self, s, a, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]

        # evaluate Q(s_, a') for every action and bootstrap with the maximum
        next_a = [[i] for i in range(N_A)]
        s_ = np.tile(s_, [N_A, 1])
        q_ = self.sess.run(self.q, {self.s: s_, self.a: next_a})
        q_ = np.max(q_, axis=0, keepdims=True)

        q, _ = self.sess.run([self.q, self.train_op],
                             {self.s: s, self.q_: q_, self.r: r, self.a: [[a]]})
        return q
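# Note: Critic.learn() bootstraps with max_a' Q(s', a') (a Q-learning-style target)
# and returns the current Q(s, a), which the training loop passes to Actor.learn()
# as the policy-gradient weight. It relies on the module-level N_A defined below.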
env = gym.make('CartPole-v0')
env.seed(1)  # reproducible
env = env.unwrapped

N_F = env.observation_space.shape[0]
N_A = env.action_space.n

sess = tf.Session()

actor = Actor(sess, n_features=N_F, n_actions=N_A, lr=LR_A)
critic = Critic(sess, n_features=N_F, n_actions=N_A, lr=LR_C)

sess.run(tf.global_variables_initializer())

res = []
for i_episode in range(MAX_EPISODE):
    s = env.reset()
    t = 0
    track_r = []
    while True:
        if RENDER:
            env.render()

        a = actor.choose_action(s)
        s_, r, done, info = env.step(a)

        if done:
            r = -20  # penalize the terminal step

        track_r.append(r)

        q = critic.learn(s, a, r, s_)  # critic update toward r + gamma * max_a' Q(s_, a'); returns Q(s, a)
        actor.learn(s, a, q)           # actor update along grad[log pi(a|s) * Q(s, a)]

        s = s_
        t += 1

        if done or t >= MAX_EP_STEPS:
            ep_rs_sum = sum(track_r)

            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.95 + ep_rs_sum * 0.05
            if running_reward > DISPLAY_REWARD_THRESHOLD:
                RENDER = True  # start rendering once the running reward is high enough
            print("episode:", i_episode, " reward:", int(running_reward))
            res.append([i_episode, running_reward])
            break

pd.DataFrame(res, columns=['episode', 'ac_reward']).to_csv('./ac_reward.csv')
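# A minimal sketch (not part of the original script) for inspecting the saved
# learning curve; assumes matplotlib is installed:
#
#   import pandas as pd
#   import matplotlib.pyplot as plt
#   df = pd.read_csv('./ac_reward.csv', index_col=0)
#   df.plot(x='episode', y='ac_reward')
#   plt.show()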