In [1]:
from model import DQN
import numpy as np
import collections
import torch
import torch.nn as nn
import torch.optim as optim
from lib import GetScreen, Actions, env, GetHp
import pickle
import copy


from tensorboardX import SummaryWriter
import time
import random
import pickle

import torch.backends.cudnn as cudnn

cudnn.deterministic = True

cudnn.benchmark = True
torch.backends.cudnn.enabled = True





In [2]:
GAMMA = 0.99
MAX_EPISODE = int(input('play_turn:'))
ESC = 0x1B

epsilon_final = 0.02
epsilon_decay_frame = 10**5
replay_min_size = 2000
batch_size = 32
device = "cuda"
epsilon_start = 1.0

In [3]:

Experience = collections.namedtuple(
    "Experience", field_names=["state", "action", "reward", "done", "new_state"]
)


class ExperienceBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        state, action, reward, dones, next_state = zip(
            *[self.buffer[idx] for idx in indices]
        )
        return (
            torch.stack(state),
            np.array(action),
            np.array(reward, dtype=np.float32),
            np.array(dones, dtype=np.bool8),
            torch.stack(next_state),
        )



In [4]:
# self.hp=15482
# boss.hp=215249
# 15 7 7
class Agent:
    def __init__(self, exp_buffer):
        self.buffer = exp_buffer
        self.get_screen = GetScreen.GetScreen()
        self.total_rewards = 0.0
        self.env = env.env()
        self.hpgetter = GetHp.Hp_getter()
        self.criterion = nn.SmoothL1Loss()
        # self._reset()

    def _reset(self):
        self.env._reset()
        self.state = self.get_screen.grab()
        self.total_rewards = 0.0
        self.bosshp = self.hpgetter.get_boss_hp()
        self.playerhp = self.hpgetter.get_self_hp()

    def play_step(self, net, epsilon, device="cuda"):
        done_reward = None

        if np.random.random() < epsilon:
            action = random.randint(0, 6)

        else:
            q_val_v = net(self.state)
            _, act_v = torch.max(q_val_v, dim=1)
            action = int(act_v[0].item())

        # Actions = [Attack,Shield, Roll, Jump, Move_Left, Move_Right, Nothing]
        reward, is_done, self.playerhp, self.bosshp = self.env.step(
            action, self.playerhp, self.bosshp
        )
        if reward!=0:
            print("reward:%.2f,bosshp:%d,selfhp:%d " % (reward, self.bosshp,self.playerhp),end='\r')
        new_state = self.get_screen.grab()
        self.total_rewards += reward

        exp = Experience(self.state, action, reward, is_done, new_state)
        self.buffer.append(exp)

        self.state = new_state

        if is_done:
            Actions.Nothing()
            done_reward = self.total_rewards
            # print("reward:%.2f" % (done_reward), end='\r')
            time.sleep(11)

        return done_reward

    def cal_loss(self, batch, net, tgt_net, device="cuda"):
        state, action, reward, done, next_state = batch

        state_v = torch.squeeze(state, 1).to(device)
        action_v = torch.tensor(action).to(device)
        reward_v = torch.tensor(reward).to(device)
        next_state_v = torch.squeeze(next_state, 1).to(device)

        state_action_value = (
            net(state_v).gather(
                1, action_v.unsqueeze(-1).type(torch.long)).squeeze(-1)
        )

        next_state_value = tgt_net(next_state_v).max(1)[0]
        next_state_value[done] = 0.0
        next_state_value = next_state_value.detach()
        expected_state_action = next_state_value * GAMMA + reward_v
        return self.criterion(state_action_value, expected_state_action)

In [5]:
net = DQN(7).to(device)
tgt_net = DQN(7).to(device)
# load the model
try:
    net.load_state_dict(torch.load("./checkpoints/best_model.pt"))
    frame_idx = np.load("./checkpoints/frame.npy")
    tgt_net.load_state_dict(net.state_dict())
    print(frame_idx)
    print("load model")
except:
    # if not, set epsilon
    frame_idx = 0
    print("new model")

optimizer = optim.Adam(net.parameters(), lr=0.001)

writer = SummaryWriter(comment="deadcells")



new model


  net.load_state_dict(torch.load("./checkpoints/best_model.pt"))


In [6]:
# try to load the before buffer
try:
    with open("./checkpoints/buffer.pickle", "rb") as f:
        buffer = pickle.load(f)
    print("load buffer")
except:
    buffer = ExperienceBuffer(capacity=2000)
    print("new buffer")


new buffer


In [7]:
agent = Agent(buffer)
sync_time = 1

In [12]:
agent.get_screen.show()

In [None]:

total_rewards = []
best_mean_reward = None
reward = None

# numbers of game
time_start = time.time()
agent._reset()


while len(total_rewards) < MAX_EPISODE:
    frame_idx += 1
    epsilon = max(epsilon_final, epsilon_start -
                  frame_idx / epsilon_decay_frame)

    if reward is not None:
        # one loop ends
        # if something goes wrong, break

        total_rewards.append(reward)
        mean_reward = np.mean(total_rewards[-100:])
        if len(buffer) >= replay_min_size:
            for _ in range(100):
                optimizer.zero_grad()
                batch = buffer.sample(batch_size)
                loss_t = agent.cal_loss(batch, net, tgt_net)
                print(loss_t, end='\r')
                loss_t.backward()
                optimizer.step()
            writer.add_scalar("loss", loss_t, frame_idx)
        print(
            "lenbuffer:%d,frame:%d game:%d, mean reward: %.3f, eps:%.2f"
            % (len(buffer), frame_idx, len(total_rewards), mean_reward, epsilon), end='\r'
        )
        #     print(x, end='\r')
        # print()
        writer.add_scalar("epsilon", epsilon, frame_idx)
        writer.add_scalar("reward_100", mean_reward, frame_idx)
        writer.add_scalar("reward", reward, frame_idx)

        # save model
        if best_mean_reward is None or best_mean_reward < mean_reward:
            torch.save(net.state_dict(), "./checkpoints/best_model.pt")

            if best_mean_reward is not None:
                print(
                    "Best mean reward updated %.3f -> %.3f, model saved"
                    % (best_mean_reward, mean_reward), end='\r'
                )
            best_mean_reward = mean_reward

        np.save("./checkpoints/frame.npy", frame_idx)
        if frame_idx / 10000 > sync_time:
            tgt_net.load_state_dict(net.state_dict())
            sync_time+=1

        # save buffer
        with open("./checkpoints/buffer.pickle", "wb") as f:
            pickle.dump(copy.deepcopy(buffer), f)
        # reset game
        agent._reset()
        # if for some random reason that agent do not enter the boss region
        if not agent.hpgetter.get_boss_hp():
            time.sleep(1)
            Actions.Move_Right()
            time.sleep(8)
            Actions.Nothing()
            Actions.Move_Left()
            time.sleep(5.5)
            Actions.Nothing()
            agent._reset()
    # step between copy the net to tgt_net
    reward = agent.play_step(net, epsilon, device)
    # optimize


writer.close()
print(time.localtime())
print((time.time()-time_start)/60)

lenbuffer:2000,frame:140014 game:9, mean reward: -324.839, eps:0.02

KeyboardInterrupt: 

In [10]:
frame_idx

136235

In [11]:
sync_time

14