In [1]:
!python FluxuatingSkillSC2.py

^C


### Hierarchical network conversion

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import copy

class PolicyNet(nn.Module):
    """Actor-critic network over a flat observation vector and a minimap.

    The network has three parts:

    * a controller MLP that embeds the observation vector into 64 features;
    * a four-layer convolutional encoder for the minimap (3x3 kernels, no
      padding, 2x2 max-pooling after the first three layers, 3 output
      channels);
    * two softmax policy heads ("battle" action and "battle position") fed
      with the concatenated controller and minimap features, plus a
      separate value MLP fed with the raw observation vector.

    Args:
        ob_space: Length of the flat observation vector.
        act_space_array: Indexable of head sizes; element 0 is the number of
            battle actions, element 1 the number of battle positions.
        activation: Callable applied after every hidden layer. Defaults to
            ReLU, which matches the previously hard-coded behaviour.
        map_width: Side length of the (square) minimap the network is sized
            for. Must be large enough to survive the conv/pool stack.
        map_channels: Number of minimap input channels. When ``None`` the
            value is read from ``C.MAP_CHANNELS``; ``C`` is not defined in
            this block and must be provided by the surrounding module.

    Raises:
        ValueError: If ``map_width`` is too small for the convolution stack.
    """

    def __init__(self, ob_space, act_space_array, activation=nn.ReLU(),
                 map_width=64, map_channels=None):
        super().__init__()
        self.map_width = map_width
        self.map_channels = C.MAP_CHANNELS if map_channels is None else map_channels
        self.activation = activation

        self.use_norm = False
        self.sl_training = False

        # Controller: observation -> 64-dimensional embedding.
        self.obs_layer = nn.Linear(ob_space, 256)
        self.controller_layer1 = nn.Linear(256, 256)
        self.controller_layer2 = nn.Linear(256, 64)

        # Minimap encoder.
        self.minimap_info_layer = nn.Conv2d(self.map_channels, 32, kernel_size=3)
        self.minimap_info_layer2 = nn.Conv2d(32, 64, kernel_size=3)
        self.minimap_info_layer3 = nn.Conv2d(64, 64, kernel_size=3)
        self.minimap_info_layer4 = nn.Conv2d(64, 3, kernel_size=3)

        # The flattened minimap feature count depends on the input size, so
        # derive it from the conv/pool arithmetic instead of hard-coding it
        # (a hard-coded 3 * 3 cannot match any square input).
        final_width = self._conv_stack_output_width(map_width)
        if final_width < 1:
            raise ValueError(
                f"map_width={map_width} is too small for the minimap encoder"
            )
        self.map_feature_dim = 3 * final_width * final_width

        self.battle_info_layer = nn.Linear(64 + self.map_feature_dim, 256)
        self.battle_probs_layer = nn.Linear(256, act_space_array[0])
        self.battle_pos_probs_layer = nn.Linear(256, act_space_array[1])

        # Value head works on the raw observation, hence ob_space inputs.
        self.value_layer1 = nn.Linear(ob_space, 256)
        self.value_layer2 = nn.Linear(256, 128)
        self.value_layer3 = nn.Linear(128, 128)
        self.value_layer4 = nn.Linear(128, 1)

    @staticmethod
    def _conv_stack_output_width(width):
        """Return the spatial width left after the four conv / three pool layers."""
        for _ in range(3):
            # 3x3 conv without padding removes 2; 2x2 max-pool floors the half.
            width = (width - 2) // 2
        return width - 2

    def _encode_map(self, map_data):
        """Run the minimap encoder and flatten to one feature row per sample."""
        act = self.activation
        features = act(self.minimap_info_layer(map_data))
        features = F.max_pool2d(features, kernel_size=2)
        features = act(self.minimap_info_layer2(features))
        features = F.max_pool2d(features, kernel_size=2)
        features = act(self.minimap_info_layer3(features))
        features = F.max_pool2d(features, kernel_size=2)
        features = act(self.minimap_info_layer4(features))
        # Keep the batch dimension intact; flatten everything else.
        return features.flatten(start_dim=1)

    def forward(self, obs, map_data):
        """Compute both policy distributions and the state value.

        Args:
            obs: Float tensor whose last dimension is ``ob_space``; the
                concatenation below requires it to be 2-D (batch first).
            map_data: Float tensor passed to the 2-D convolutions; sized for
                ``(batch, map_channels, map_width, map_width)``.

        Returns:
            Tuple ``(battle_probs, battle_pos_probs, value)`` where the two
            probability tensors are softmax-normalised over dim 1 and
            ``value`` has one column.
        """
        act = self.activation

        hidden = act(self.obs_layer(obs))
        hidden = act(self.controller_layer1(hidden))
        controller_out = self.controller_layer2(hidden)

        map_features = self._encode_map(map_data)

        battle_info = torch.cat((controller_out, map_features), dim=1)
        battle_info = act(self.battle_info_layer(battle_info))

        battle_probs = F.softmax(self.battle_probs_layer(battle_info), dim=1)
        battle_pos_probs = F.softmax(self.battle_pos_probs_layer(battle_info), dim=1)

        # The value head must see the raw observation: value_layer1 is sized
        # for ob_space, not for the 64-wide controller embedding.
        value = act(self.value_layer1(obs))
        value = act(self.value_layer2(value))
        value = act(self.value_layer3(value))
        value = self.value_layer4(value)

        return battle_probs, battle_pos_probs, value

    def get_action(self, obs, map_data):
        """Sample one battle action and one battle position per batch row.

        Returns:
            Tuple ``(battle_act, battle_pos)`` of integer tensors with one
            sampled index per row (shape ``(batch, 1)``).
        """
        # Sampling is not differentiable, so no graph is needed here.
        with torch.no_grad():
            battle_probs, battle_pos_probs, _ = self.forward(obs, map_data)
            battle_act = torch.multinomial(battle_probs, num_samples=1)
            battle_pos = torch.multinomial(battle_pos_probs, num_samples=1)
        return battle_act, battle_pos

    def get_values(self, obs, map_data):
        """Return only the value-head output of :meth:`forward`."""
        _, _, value = self.forward(obs, map_data)
        return value

class PPOTrain:
    """Clipped-surrogate PPO trainer for a two-headed battle policy.

    ``policy`` and ``old_policy`` must expose ``forward(obs, map_data)``
    returning ``(battle_probs, battle_pos_probs, value)`` and
    ``parameters()``. ``policy`` is optimised; ``old_policy`` supplies the
    denominator of the probability ratio and is overwritten with the
    current weights at the end of :meth:`ppo_train_dis`.

    Args:
        policy: Network being trained.
        old_policy: Frozen snapshot used for the importance ratio.
        gamma: Discount factor.
        clip_value: PPO clipping range; ratios are clamped to
            ``[1 - clip_value, 1 + clip_value]``.
        c_1: Weight of the value-function loss.
        c_2: Weight of the entropy bonus.
        epoch_num: Number of gradient steps per call to
            :meth:`ppo_train_dis`.
        lamda: GAE decay factor used by :meth:`get_gaes`. It was read but
            never set before; the 0.95 default is a conventional choice,
            not a value taken from the project configuration.

    The Adam learning rate is read from ``P.lr``; ``P`` is not defined in
    this block and must be provided by the surrounding module.
    """

    # Lower bound applied to probabilities before taking logs, so that a
    # probability of exactly zero cannot produce -inf / NaN.
    _PROB_FLOOR = 1e-10

    def __init__(self, policy, old_policy, gamma=0.995, clip_value=0.2, c_1=0.01, c_2=1e-6, epoch_num=20,
                 lamda=0.95):
        self.policy = policy
        self.old_policy = old_policy
        self.gamma = gamma
        self.lamda = lamda
        self.clip_value = clip_value
        self.c_1 = c_1
        self.c_2 = c_2
        self.epoch_num = epoch_num

        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=P.lr, eps=1e-5)

    def get_loss(self, obs, map_data, battle_actions, battle_pos, gaes, rewards, v_preds_next):
        """Return the scalar PPO loss for one batch.

        The loss is ``L_clip + c_1 * L_value - c_2 * entropy`` where
        ``L_clip`` is the negated clipped surrogate, ``L_value`` the mean
        squared one-step TD error and ``entropy`` the mean summed entropy of
        both policy heads.

        Args:
            obs, map_data: Inputs forwarded unchanged to both policies.
            battle_actions: Integer tensor of taken battle-action indices,
                one per batch row.
            battle_pos: Taken battle-position indices. Accepted for
                interface compatibility but not used in the ratio.
                NOTE(review): only the battle-action head enters the
                probability ratio; confirm whether the position head should
                as well.
            gaes: Advantage estimates, one per batch row.
            rewards: Rewards, one per batch row.
            v_preds_next: Value estimates of the successor states.
        """
        battle_probs, battle_pos_probs, v_preds = self.policy.forward(obs, map_data)
        # The old policy is a constant in the objective; skip graph building.
        with torch.no_grad():
            old_battle_probs = self.old_policy.forward(obs, map_data)[0]

        # Pick the probability of the action actually taken in each row.
        action_index = battle_actions.reshape(-1, 1)
        act_probs = battle_probs.gather(1, action_index).squeeze(1)
        act_probs_old = old_battle_probs.gather(1, action_index).squeeze(1)

        floor = self._PROB_FLOOR
        ratios = torch.exp(
            torch.log(torch.clamp(act_probs, min=floor))
            - torch.log(torch.clamp(act_probs_old, min=floor))
        )
        clipped_ratios = torch.clamp(ratios, 1 - self.clip_value, 1 + self.clip_value)
        advantages = gaes.reshape(-1)
        loss_clip = -torch.minimum(advantages * ratios, advantages * clipped_ratios).mean()

        battle_act_entropy = -torch.sum(
            battle_probs * torch.log(torch.clamp(battle_probs, min=floor)), dim=1
        )
        battle_pos_entropy = -torch.sum(
            battle_pos_probs * torch.log(torch.clamp(battle_pos_probs, min=floor)), dim=1
        )
        entropy = (battle_act_entropy + battle_pos_entropy).mean()

        # Flatten everything to 1-D so a (batch, 1) value tensor does not
        # broadcast against (batch,) targets into a (batch, batch) matrix.
        td_error = v_preds.reshape(-1) - rewards.reshape(-1) - self.gamma * v_preds_next.reshape(-1)
        loss_vf = (td_error ** 2).mean()

        return loss_clip + self.c_1 * loss_vf - self.c_2 * entropy

    def train(self, obs, map_data, battle_actions, battle_pos, gaes, rewards, v_preds_next):
        """Perform exactly one optimiser step and return the loss as a float."""
        self.optimizer.zero_grad()
        loss = self.get_loss(obs, map_data, battle_actions, battle_pos, gaes, rewards, v_preds_next)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def assign_policy_parameters(self):
        """Copy the current policy's parameters into the old policy."""
        with torch.no_grad():
            for param, old_param in zip(self.policy.parameters(), self.old_policy.parameters()):
                old_param.copy_(param)

    def get_gaes(self, rewards, v_preds, v_preds_next):
        """Compute generalised advantage estimates for one trajectory.

        Args:
            rewards, v_preds, v_preds_next: Equal-length sequences of
                per-step rewards, value estimates and next-state value
                estimates.

        Returns:
            List of advantages, same length as the inputs.
        """
        deltas = [r_t + self.gamma * v_next - v for r_t, v_next, v in zip(rewards, v_preds_next, v_preds)]
        gaes = list(deltas)
        # Backward recursion: A_t = delta_t + gamma * lamda * A_{t+1}.
        for t in reversed(range(len(gaes) - 1)):
            gaes[t] = gaes[t] + self.gamma * self.lamda * gaes[t + 1]
        return gaes

    def ppo_train_dis(self, observations, map_data, battle_actions, battle_pos, rewards, v_preds, v_preds_next, gaes, returns):
        """Run ``epoch_num`` PPO updates on one batch, then sync the old policy.

        ``v_preds`` and ``returns`` are accepted for interface
        compatibility but are not used.

        Returns:
            The loss of the last update as a float, or ``None`` when
            ``epoch_num`` is 0.
        """
        observations = torch.as_tensor(observations, dtype=torch.float32)
        map_data = torch.as_tensor(map_data, dtype=torch.float32)
        battle_actions = torch.as_tensor(battle_actions, dtype=torch.int64)
        battle_pos = torch.as_tensor(battle_pos, dtype=torch.int64)
        gaes = torch.as_tensor(gaes, dtype=torch.float32)
        rewards = torch.as_tensor(rewards, dtype=torch.float32)
        v_preds_next = torch.as_tensor(v_preds_next, dtype=torch.float32)

        loss = None
        for _ in range(self.epoch_num):
            # train() already zeroes gradients and steps the optimiser;
            # stepping again here would apply every update twice.
            loss = self.train(observations, map_data, battle_actions, battle_pos, gaes, rewards, v_preds_next)

        self.assign_policy_parameters()
        return loss