01_dqn_basic.py
#!/usr/bin/env python3
import gym
import ptan
import argparse

import torch
import torch.optim as optim

from tensorboardX import SummaryWriter

# Local helper modules from this repository: the DQN network definition and
# shared training utilities (hyperparameters, loss, reward/epsilon trackers).
from lib2 import dqn_model, common
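
# For reference, a sketch of the 'pong' entry this script expects from
# common.HYPERPARAMS. The authoritative values live in the repository's
# common module and may differ; these are the conventional settings for
# this example:
#
#   'env_name':        "PongNoFrameskip-v4"
#   'stop_reward':     18.0
#   'run_name':        'pong'
#   'replay_size':     100_000
#   'replay_initial':  10_000
#   'target_net_sync': 1000
#   'epsilon_frames':  100_000
#   'epsilon_start':   1.0
#   'epsilon_final':   0.02
#   'learning_rate':   1e-4
#   'gamma':           0.99
#   'batch_size':      32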

if __name__ == "__main__":
    params = common.HYPERPARAMS['pong']
    # params['epsilon_frames'] = 200000
    parser = argparse.ArgumentParser()
    parser.add_argument("--cuda", default=False, action="store_true", help="Enable cuda")
    args = parser.parse_args()
    device = torch.device("cuda" if args.cuda else "cpu")

    # Create the Atari environment and apply ptan's standard DQN wrappers.
    env = gym.make(params['env_name'])
    env = ptan.common.wrappers.wrap_dqn(env)
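    # wrap_dqn applies the usual DeepMind-style preprocessing (frame skipping,
    # 84x84 grayscale observations, frame stacking, clipped rewards); the exact
    # wrapper chain is defined inside ptan.common.wrappers.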
    writer = SummaryWriter(comment="-" + params['run_name'] + "-basic")
    net = dqn_model.DQN(env.observation_space.shape, env.action_space.n).to(device)

    # Target network: a periodically synchronized copy of the online network,
    # used to compute stable bootstrap targets.
    tgt_net = ptan.agent.TargetNet(net)

    # Epsilon-greedy exploration; EpsilonTracker anneals epsilon over time.
    selector = ptan.actions.EpsilonGreedyActionSelector(epsilon=params['epsilon_start'])
    epsilon_tracker = common.EpsilonTracker(selector, params)
    agent = ptan.agent.DQNAgent(net, selector, device=device)

    # One-step transitions (s, a, r, s') feeding an experience replay buffer.
    exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=params['gamma'], steps_count=1)
    buffer = ptan.experience.ExperienceReplayBuffer(exp_source, buffer_size=params['replay_size'])
    optimizer = optim.Adam(net.parameters(), lr=params['learning_rate'])
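
    # A sketch of the schedule EpsilonTracker is assumed to implement (the
    # actual code lives in common): linear decay from epsilon_start down to
    # epsilon_final over the first epsilon_frames frames,
    #
    #   epsilon = max(epsilon_final, epsilon_start - frame_idx / epsilon_frames)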

    frame_idx = 0
    with common.RewardTracker(writer, params['stop_reward']) as reward_tracker:
        while True:
            frame_idx += 1
            # Play one step in the environment and store it in the buffer.
            buffer.populate(1)
            epsilon_tracker.frame(frame_idx)

            # Report finished episodes; stop once the tracked reward passes stop_reward.
            new_rewards = exp_source.pop_total_rewards()
            if new_rewards:
                if reward_tracker.reward(new_rewards[0], frame_idx, selector.epsilon):
                    break

            # Start training only after the buffer holds enough transitions.
            if len(buffer) < params['replay_initial']:
                continue

            optimizer.zero_grad()
            batch = buffer.sample(params['batch_size'])
            loss_v = common.calc_loss_dqn(batch, net, tgt_net.target_model, gamma=params['gamma'], device=device)
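            # calc_loss_dqn is expected to compute the standard one-step DQN
            # loss (its implementation is in common):
            #
            #   L = MSE( Q(s, a),  r + gamma * max_a' Q_tgt(s', a') )
            #
            # with the bootstrap term dropped for terminal transitions.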
            loss_v.backward()
            optimizer.step()

            # Periodically copy the online network's weights into the target net.
            if frame_idx % params['target_net_sync'] == 0:
                tgt_net.sync()
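
# Typical invocation, assuming the repository's helper package is importable:
#   python 01_dqn_basic.py          # train on CPU
#   python 01_dqn_basic.py --cuda   # train on GPU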