In [None]:
import numpy as np
import torch
from Gridworld import Gridworld
import random
from matplotlib import pylab as plt
from IPython.display import clear_output

import copy

In [None]:
# Hyperparameters: discount factor (gamma), random-action probability (epsilon)
# and the Adam step size.
gamma = 0.9
epsilon = 0.3
learning_rate = 1e-3

# Online Q-network: maps a 64-feature input to 4 outputs through two hidden
# ReLU layers (150 and 100 units).
model = torch.nn.Sequential(
    torch.nn.Linear(in_features=64, out_features=150),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=150, out_features=100),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=100, out_features=4),
)

# Target network: an independent deep copy of the online Q-network.
model2 = copy.deepcopy(model)

# Load the online network's weights into the target network (they already
# match right after the deep copy; this mirrors the later periodic sync).
model2.load_state_dict(model.state_dict())

loss_fn = torch.nn.MSELoss()
# Only the online network's parameters are handed to the optimiser.
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Lookup from the network's output index (0-3) to the single-letter move code
# that is passed to game.makeMove.
action_set = dict(enumerate(('u', 'd', 'l', 'r')))

In [None]:
from collections import deque
import sys

# --- Training configuration and bookkeeping ---------------------------------
epoches = 5000       # number of games to play
losses = []          # one entry per gradient update
mem_size = 1000      # capacity of the experience replay buffer
batch_size = 200     # transitions sampled per gradient update
relay = deque(maxlen=mem_size)  # experience replay buffer; oldest entries are evicted first
max_moves = 50       # a game is cut off once `mov` exceeds this
h = 0                # not referenced in this cell; kept in case another cell reads it
sync_freq = 500      # target-network sync period, measured in environment steps (j)
j = 0                # environment-step counter across all games

for i in range(epoches):
    game = Gridworld(size=4,mode='random')  # fresh game environment for this epoch
    # Flatten the rendered board to shape (1, 64) and add small uniform noise in [0, 0.01).
    state1_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/100.0
    state1 = torch.from_numpy(state1_).float()
    status = 1  # 1 while the current game is still running
    mov = 0
    print(i)

    while status == 1:
        j += 1
        mov += 1

        # Q-values are only needed to pick an action here, so skip autograd.
        with torch.no_grad():
            qval = model(state1)
        qval_ = qval.numpy()

        # Epsilon-greedy action selection.
        if random.random() < epsilon:
            action_ = np.random.randint(0,4)
        else:
            action_ = np.argmax(qval_)

        action = action_set[action_]
        game.makeMove(action)
        state2_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/100.0
        state2 = torch.from_numpy(state2_).float()
        reward = game.reward()
        # NOTE(review): only positive rewards are flagged as terminal, yet the
        # game also ends on any other reward != -1 (see the check at the bottom
        # of this loop); those transitions still bootstrap from state2.
        # Confirm this is intended.
        done = reward > 0
        exp = (state1,action_,reward,state2,done)
        relay.append(exp)
        state1 = state2

        # Start learning once the buffer holds more than one batch of transitions.
        if len(relay) > batch_size:
            mini_batch = random.sample(relay,batch_size)
            state1_batch = torch.cat([s1 for (s1,a,r,s2,d) in mini_batch])
            action_batch = torch.Tensor([a for (s1,a,r,s2,d) in mini_batch])
            reward_batch = torch.Tensor([r for (s1,a,r,s2,d) in mini_batch])
            state2_batch = torch.cat([s2 for (s1,a,r,s2,d) in mini_batch])
            done_batch = torch.Tensor([d for (s1,a,r,s2,d) in mini_batch])
            Q1 = model(state1_batch)
            with torch.no_grad():
                Q2 = model2(state2_batch)

            # Target: reward plus the discounted max next-state Q-value from the
            # target network; the bootstrap term is zeroed where done == 1.
            Y = reward_batch + gamma * ((1-done_batch) * torch.max(Q2,dim=1)[0])
            # Q-value the online network predicted for the action actually taken.
            X = Q1.gather(dim=1,index=action_batch.long().unsqueeze(dim=1)).squeeze(dim=1)
            loss = loss_fn(X,Y.detach())

            optimizer.zero_grad()
            loss.backward()
            losses.append(loss.item())
            optimizer.step()

            # Copy the online network's parameters into the target network.
            if j % sync_freq == 0:
                model2.load_state_dict(model.state_dict())

        # End the game on any reward other than -1, or when the move budget is
        # exhausted. This check has to run inside the while loop: nothing else
        # changes `status`, so without it the loop would never exit.
        if reward != -1 or mov > max_moves:
            status = 0
            mov = 0

losses = np.array(losses)

In [None]:
# Plot the training loss curve. `losses` receives one value per gradient
# update (not one per game), so the x-axis is labelled in training steps.
plt.figure(figsize=(10,7))
plt.plot(losses)
plt.xlabel("Training steps",fontsize=22)
plt.ylabel("Loss",fontsize=22);  # trailing ';' hides the Text(...) repr in the cell output

In [None]:
# Play `max_games` evaluation games and report how many were won.
# NOTE(review): test_model is defined outside this cell; it is assumed to play
# one game with the given model and return a truthy value on a win — confirm.
max_games = 1000
wins = sum(
    1
    for _ in range(max_games)
    if test_model(model, mode='random', display=False)
)
win_perc = wins / float(max_games)
print("Games played: {0}, # of wins: {1}".format(max_games,wins))
print("Win percentage: {}%".format(100.0*win_perc))