import numpy as np
import matplotlib.pyplot as plt
import random
from environment import *


def policy_evaluation(grid, policy, gamma=1.0, threshold=0.0001):
    """Iterative policy evaluation: returns the state values V and the number of sweeps."""
    # Make sure the policy has the right dimensions
    assert policy.shape == (grid.state_size, grid.action_size)
    # Make sure delta is bigger than the threshold to start with
    delta = 3 * threshold
    # Get the reward and transition matrices
    R = grid.get_reward_matrix()
    T = grid.get_transition_matrix()
    # The value function is initialised at 0
    V = np.zeros(grid.state_size)
    V_new = V.copy()
    epochs = 0
    # While the value function has not yet converged:
    while delta > threshold:
        epochs += 1
        for state_idx in range(grid.state_size):
            # If it is one of the absorbing states, ignore
            if grid.absorbing[0, state_idx]:
                continue
            # Accumulator variable for the value of the state
            tmpV = 0
            for action_idx in range(grid.action_size):
                # Accumulator variable for the state-action value
                tmpQ = 0
                for state_idx_prime in range(grid.state_size):
                    tmpQ += T[state_idx_prime, state_idx, action_idx] * \
                        (R[state_idx_prime, state_idx, action_idx] + gamma * V[state_idx_prime])
                tmpV += policy[state_idx, action_idx] * tmpQ
            # Update the value of the state
            V_new[state_idx] = tmpV
        # After updating the values of all states, update delta
        delta = np.abs(V_new - V).max()
        # and save the new values into the old ones
        V = V_new.copy()
    return V, epochs
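

# A vectorised variant of the same Bellman expectation backup (an illustrative
# sketch, not part of the original API). It assumes, as above, that T and R are
# indexed as [s', s, a] and that `policy` has shape [state_size, action_size].
def policy_evaluation_vectorised(grid, policy, gamma=1.0, threshold=0.0001):
    T = grid.get_transition_matrix()
    R = grid.get_reward_matrix()
    absorbing = np.asarray(grid.absorbing).reshape(-1).astype(bool)
    V = np.zeros(grid.state_size)
    epochs = 0
    delta = 3 * threshold
    while delta > threshold:
        epochs += 1
        # Q[s, a] = sum_{s'} T[s', s, a] * (R[s', s, a] + gamma * V[s'])
        Q = np.einsum('psa,psa->sa', T, R) + gamma * np.einsum('psa,p->sa', T, V)
        # V[s] = sum_a policy[s, a] * Q[s, a], keeping absorbing states at 0
        V_new = np.where(absorbing, 0.0, np.sum(policy * Q, axis=1))
        delta = np.abs(V_new - V).max()
        V = V_new
    return V, epochs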


def policy_iteration(grid, gamma=1.0, threshold=0.0001):
    """Policy iteration: alternate evaluation and greedy improvement until the policy is stable."""
    policy = np.zeros((grid.state_size, grid.action_size))
    # Initialise with a deterministic policy that always picks the first action
    policy[:, 0] = 1.0
    T = grid.get_transition_matrix()
    R = grid.get_reward_matrix()
    epochs = 0
    while True:
        # Policy evaluation
        V, epochs_eval = policy_evaluation(grid, policy, gamma, threshold)
        epochs += epochs_eval
        # Policy improvement
        policy_stable = True
        for state_idx in range(grid.state_size):
            if grid.absorbing[0, state_idx]:
                continue
            old_action = np.argmax(policy[state_idx, :])
            Q = np.zeros(grid.action_size)
            for state_idx_prime in range(grid.state_size):
                Q += T[state_idx_prime, state_idx, :] * (R[state_idx_prime, state_idx, :] + gamma * V[state_idx_prime])
            # Make the policy greedy with respect to Q
            new_policy = np.zeros(grid.action_size)
            new_policy[np.argmax(Q)] = 1.0
            policy[state_idx] = new_policy
            if old_action != np.argmax(policy[state_idx]):
                policy_stable = False
        if policy_stable:
            return V, policy, epochs


def value_iteration(grid, gamma=1.0, threshold=0.0001):
    """Value iteration: returns the greedy policy extracted from the converged values and the number of sweeps."""
    # The value function is initialised at 0
    V = np.zeros(grid.state_size)
    # Get the reward and transition matrices
    T = grid.get_transition_matrix()
    R = grid.get_reward_matrix()
    epochs = 0
    while True:
        epochs += 1
        delta = 0
        for state_idx in range(grid.state_size):
            if grid.absorbing[0, state_idx]:
                continue
            v = V[state_idx]
            Q = np.zeros(grid.action_size)
            for state_idx_prime in range(grid.state_size):
                Q += T[state_idx_prime, state_idx, :] * \
                    (R[state_idx_prime, state_idx, :] + gamma * V[state_idx_prime])
            # Bellman optimality backup
            V[state_idx] = np.max(Q)
            delta = max(delta, np.abs(v - V[state_idx]))
        if delta < threshold:
            # Extract the greedy policy from the converged value function
            optimal_policy = np.zeros((grid.state_size, grid.action_size))
            for state_idx in range(grid.state_size):
                Q = np.zeros(grid.action_size)
                for state_idx_prime in range(grid.state_size):
                    Q += T[state_idx_prime, state_idx, :] * \
                        (R[state_idx_prime, state_idx, :] + gamma * V[state_idx_prime])
                optimal_policy[state_idx, np.argmax(Q)] = 1
            return optimal_policy, epochs


def MC_policy_evaluation(grid, policy, num_episodes, gamma=1.0):
    """First-visit Monte Carlo policy evaluation: average the sampled returns per state."""
    V = [0.0 for i in range(grid.state_size)]
    # Returns observed for each state
    R = [[] for i in range(grid.state_size)]
    for i in range(num_episodes):
        # Start each episode at a random non-absorbing location
        starting_loc = random.choice(list(set(grid.locs) - set(grid.absorbing_locs)))
        starting_state = grid.loc_to_state(starting_loc, grid.locs)
        trace = grid.sample_episode(policy, starting_loc, gamma=gamma, max_episode_len=100)
        state_trace = [s[0] for s in trace]
        # First-visit MC: only the first occurrence of each state contributes
        visited_states = set(transition[0] for transition in trace)
        for state in visited_states:
            state_1st_occurrence_idx = state_trace.index(state)
            # Discounted return from the first visit onwards
            G = [transition[2] * gamma ** k for (k, transition) in enumerate(trace[state_1st_occurrence_idx:])]
            G = np.sum(G)
            R[grid.loc_to_state(state, grid.locs)].append(G)
            V[grid.loc_to_state(state, grid.locs)] = np.mean(R[grid.loc_to_state(state, grid.locs)])
    return np.array(V)


def TD_estimation(grid, policy, num_episodes, gamma=1.0, alpha=None):
    """TD(0) policy evaluation: bootstrap state values from sampled transitions."""
    # Initialise the state value function
    V = [0.0 for i in range(grid.state_size)]
    # State visit counter
    C = [0 for i in range(grid.state_size)]
    for i in range(num_episodes):
        # Start at a random non-absorbing location
        starting_loc = random.choice(list(set(grid.locs) - set(grid.absorbing_locs)))
        # Sample a trace
        trace = grid.sample_episode(policy, starting_loc, gamma=gamma, max_episode_len=100)
        for transition in trace:
            state = grid.loc_to_state(transition[0], grid.locs)
            C[state] += 1
            state_prime = grid.loc_to_state(transition[-1], grid.locs)
            reward = transition[2]
            # TD(0) update: V(s) <- V(s) + alpha * (r + gamma * V(s') - V(s))
            td_target = reward + gamma * V[state_prime]
            td_error = td_target - V[state]
            # alpha_t = 1/N(s) gives an incremental running average of the targets;
            # a constant alpha weights recent visits more heavily
            alpha_t = 1 / C[state] if alpha is None else alpha
            V[state] += alpha_t * td_error
    return np.array(V)


def on_policy_eps_greedy_MC_control(grid, policy, num_episodes, gamma=1.0, epsilon=1.0):
    """On-policy first-visit Monte Carlo control with an epsilon-greedy policy."""
    # Initialise the state-action value function to zeros
    Q = np.zeros((grid.state_size, grid.action_size))
    # Sum of returns per state-action pair
    R = [[0.0 for j in range(grid.action_size)] for i in range(grid.state_size)]
    # Visit counter used to compute the average return
    C = [[0 for j in range(grid.action_size)] for i in range(grid.state_size)]
    for i in range(num_episodes):
        # Start at a random non-absorbing location
        starting_loc = random.choice(list(set(grid.locs) - set(grid.absorbing_locs)))
        # Sample a trace
        trace = grid.sample_episode(policy, starting_loc, gamma=gamma, max_episode_len=100)
        # Decay epsilon: explore heavily in early episodes, exploit more later
        epsilon = max(epsilon * 0.99995, 0.05)
        # Create a (state, action) trace
        trace_state_actions = [(s, a) for (s, a, _, _) in trace]
        # List of visited states
        visited_states = list(set(grid.loc_to_state(transition[0], grid.locs) for transition in trace))
        visited_states.sort()
        # List of visited state-action pairs (first occurrences only, since this is first-visit MC)
        visited_state_actions = [state_action for t, state_action in
                                 enumerate(trace_state_actions) if state_action not in trace_state_actions[:t]]
        for state_action in visited_state_actions:
            state_action_1st_occurrence_idx = trace_state_actions.index(state_action)
            # Discounted return from the first occurrence onwards
            G = np.sum([transition[2] * gamma ** k for (k, transition) in
                        enumerate(trace[state_action_1st_occurrence_idx:])])
            state_loc, action = state_action
            state = grid.loc_to_state(state_loc, grid.locs)
            R[state][grid.action_to_idx(action)] += G
            C[state][grid.action_to_idx(action)] += 1
            Q[state, grid.action_to_idx(action)] = R[state][grid.action_to_idx(action)] / C[state][grid.action_to_idx(action)]
        # Make the policy epsilon-greedy with respect to Q for the visited states
        best_actions = np.argmax(Q[visited_states], 1)
        policy[visited_states, best_actions] = 1 - epsilon + epsilon / grid.action_size
        other_actions = [[i for i in range(grid.action_size) if i != j] for j in best_actions]
        other_actions = np.array(other_actions).T
        policy[visited_states, other_actions] = epsilon / grid.action_size
    return Q, policy


def MC_iterative_eps_greedy_control(grid, num_episodes, gamma=1.0, epsilon=0.9, alpha=0.1):
    """First-visit Monte Carlo control with an incremental (constant-alpha) update of Q."""
    # Initialise the state-action value function to zeros
    Q = np.zeros((grid.state_size, grid.action_size))
    # Initialise a uniform random policy
    policy = np.ones((grid.state_size, grid.action_size)) / grid.action_size
    for i in range(num_episodes):
        # Start at a random non-absorbing location
        starting_loc = random.choice(list(set(grid.locs) - set(grid.absorbing_locs)))
        # Sample a trace
        trace = grid.sample_episode(policy, starting_loc, gamma=gamma, max_episode_len=100)
        # Decay epsilon: explore heavily in early episodes, exploit more later
        epsilon = max(epsilon * 0.99995, 0.05)
        # Create a (state, action) trace
        trace_state_actions = [(s, a) for (s, a, _, _) in trace]
        # List of visited states
        visited_states = list(set(grid.loc_to_state(transition[0], grid.locs) for transition in trace))
        visited_states.sort()
        # List of visited state-action pairs (first occurrences only, since this is first-visit MC)
        visited_state_actions = [state_action for t, state_action in
                                 enumerate(trace_state_actions) if state_action not in trace_state_actions[:t]]
        for state_action in visited_state_actions:
            state_action_1st_occurrence_idx = trace_state_actions.index(state_action)
            # Discounted return from the first occurrence onwards
            G = np.sum([transition[2] * gamma ** k for (k, transition) in
                        enumerate(trace[state_action_1st_occurrence_idx:])])
            state_loc, action = state_action
            state = grid.loc_to_state(state_loc, grid.locs)
            # Incremental update towards the sampled return
            Q[state, grid.action_to_idx(action)] += alpha * (G - Q[state, grid.action_to_idx(action)])
        # Make the policy epsilon-greedy with respect to Q for the visited states
        best_actions = np.argmax(Q[visited_states], 1)
        policy[visited_states, best_actions] = 1 - epsilon + epsilon / grid.action_size
        other_actions = [[i for i in range(grid.action_size) if i != j] for j in best_actions]
        other_actions = np.array(other_actions).T
        policy[visited_states, other_actions] = epsilon / grid.action_size
    return Q, policy


def SARSA(grid, num_episodes, max_episode_len=100, gamma=1.0, epsilon=1.0, alpha=0.1):
    """On-policy TD control (SARSA): bootstrap Q from the action actually taken next."""
    # Initialise the state-action value function to zeros
    Q = np.zeros((grid.state_size, grid.action_size))
    for i in range(num_episodes):
        # Start at a random non-absorbing location
        state_loc = random.choice(list(set(grid.locs) - set(grid.absorbing_locs)))
        state_idx = grid.loc_to_state(state_loc, grid.locs)
        # Choose a from s using the policy derived from Q (epsilon-greedy)
        action_idx = epsilon_greedy_action(Q, epsilon, state_idx)
        steps = 0
        while (steps <= max_episode_len) and (not grid.absorbing[0, state_idx]):
            steps += 1
            # Take a step: take action a, observe reward and s'
            _, _, reward, state_prime_idx = grid.state_action_step(state_idx, action_idx)
            # Choose a' from s' using the policy derived from Q (epsilon-greedy)
            action_prime_idx = epsilon_greedy_action(Q, epsilon, state_prime_idx)
            # SARSA update: Q(s, a) <- Q(s, a) + alpha * (r + gamma * Q(s', a') - Q(s, a))
            Q[state_idx, action_idx] += alpha * \
                (reward + gamma * Q[state_prime_idx, action_prime_idx] - Q[state_idx, action_idx])
            # Move on: s <- s', a <- a'
            state_idx, action_idx = state_prime_idx, action_prime_idx
        # Decay epsilon: explore heavily in early episodes, exploit more later
        epsilon = max(epsilon * 0.99995, 0.05)
    policy = Q.argmax(1)
    return Q, policy


def Q_learning(grid, num_episodes, max_episode_len=100, gamma=1.0, epsilon=1.0, alpha=0.1):
    """Off-policy TD control (Q-learning): bootstrap Q from the greedy action in s'."""
    # Initialise the state-action value function to zeros
    Q = np.zeros((grid.state_size, grid.action_size))
    for i in range(num_episodes):
        # Start at a random non-absorbing location
        state_loc = random.choice(list(set(grid.locs) - set(grid.absorbing_locs)))
        state_idx = grid.loc_to_state(state_loc, grid.locs)
        steps = 0
        while (steps <= max_episode_len) and (not grid.absorbing[0, state_idx]):
            steps += 1
            # Choose a from s using the policy derived from Q (epsilon-greedy)
            action_idx = epsilon_greedy_action(Q, epsilon, state_idx)
            # Take a step: take action a, observe reward and s'
            _, _, reward, state_prime_idx = grid.state_action_step(state_idx, action_idx)
            # Q-learning update: bootstrap from the greedy action in s'
            Q[state_idx, action_idx] += alpha * \
                (reward + gamma * Q[state_prime_idx].max() - Q[state_idx, action_idx])
            # Move on: s <- s'
            state_idx = state_prime_idx
        # Decay epsilon: explore heavily in early episodes, exploit more later
        epsilon = max(epsilon * 0.99995, 0.05)
    policy = Q.argmax(1)
    return Q, policy


def epsilon_greedy_policy(Q, epsilon):
    """Return an epsilon-greedy policy matrix derived from the state-action values Q."""
    policy = np.zeros(Q.shape)
    best_actions = np.argmax(Q, 1)
    # Greedy action: probability 1 - epsilon + epsilon / |A|
    policy[range(Q.shape[0]), best_actions] = 1 - epsilon + epsilon / Q.shape[1]
    # All other actions: probability epsilon / |A|
    other_actions = [[i for i in range(Q.shape[1]) if i != j] for j in best_actions]
    other_actions = np.array(other_actions).T
    policy[range(Q.shape[0]), other_actions] = epsilon / Q.shape[1]
    return policy


def epsilon_greedy_action(Q, epsilon, state_idx):
    """Sample an action index for state_idx from an epsilon-greedy policy over Q."""
    if np.random.rand() < 1 - epsilon:
        # With probability 1 - epsilon, take the greedy action,
        # so overall p(a = a*|s) = 1 - epsilon + epsilon / |A(s)|
        action_idx = Q[state_idx].argmax()
    else:
        # With probability epsilon, take a uniformly random action,
        # so p(a = a', a' != a*|s) = epsilon / |A(s)|
        action_idx = np.random.choice(range(Q.shape[1]))
    return action_idx
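

# A minimal usage sketch (not part of the original file). The planning and learning
# functions above expect a grid-world object from `environment` that exposes the
# attributes and methods used in this module (state_size, action_size, locs,
# absorbing, get_transition_matrix, sample_episode, ...); the class name and
# constructor in the commented lines are assumptions, so adapt them as needed.
if __name__ == "__main__":
    # Self-contained check of the epsilon-greedy helpers on a toy Q table
    Q_toy = np.array([[1.0, 0.0, 0.0, 0.0],
                      [0.0, 0.5, 0.2, 0.1]])
    pi = epsilon_greedy_policy(Q_toy, epsilon=0.2)
    print(pi)  # greedy actions get 1 - 0.2 + 0.2/4 = 0.85, the others 0.05 each
    print(epsilon_greedy_action(Q_toy, epsilon=0.2, state_idx=0))

    # Hypothetical grid-world usage (assumes a GridWorld() constructor exists):
    # grid = GridWorld()
    # V, epochs = policy_evaluation(grid, np.ones((grid.state_size, grid.action_size)) / grid.action_size)
    # optimal_policy, epochs = value_iteration(grid, gamma=0.9)
    # Q, policy = Q_learning(grid, num_episodes=5000, gamma=0.9, alpha=0.1)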