In [5]:
from models import *
from utils import *

# Instantiate the network (the .cuda() move is commented out) and load the
# saved weights onto the CPU.
net = GNet()#.cuda()
net.load_state_dict(torch.load('models/g.net', map_location=torch.device('cpu')))

# Switch the module to evaluation mode, then wrap it in an RLAgent created
# with stochastic=False.
net.eval()
player = RLAgent(net, stochastic=False)
# NOTE(review): `obses` is not referenced by any code visible in this notebook.
obses = []
# Module-level cache used by agent(): the last observation it received and one
# "previous head" slot per goose.
prev_obs = None
prev_heads = [None for _ in range(4)]

def agent(obs_dict, config_dict):
    """Agent entry point: choose a move for the current observation.

    Keeps the previous observation in module-level state so that the head
    position each goose had one step earlier can be passed to get_features.

    Args:
        obs_dict: observation mapping; this function reads its 'geese' and
            'step' entries and forwards the whole mapping to get_features.
        config_dict: configuration mapping forwarded to get_features. If it is
            falsy, the module-level ``env.configuration`` is used instead.

    Returns:
        One of 'NORTH', 'EAST', 'SOUTH', 'WEST'.
    """
    global prev_obs, prev_heads
    observation = obs_dict
    geese = observation['geese']

    # Start from a clean cache on the very first call and at the start of
    # every episode. Without the reset, an observation left over from a
    # finished episode can contain empty geese, and indexing their head
    # raises IndexError.
    if prev_obs is None or observation['step'] == 0:
        prev_obs = observation
        prev_heads = [None for _ in geese]

    # cache previous state
    for i, g in enumerate(geese):
        previous = prev_obs['geese'][i]
        if len(g) > 0 and len(previous) > 0:
            prev_heads[i] = previous[0]
    prev_obs = observation

    # Prefer the configuration handed to this call over the global environment.
    config = config_dict if config_dict else env.configuration
    state = get_features(obs_dict, config, prev_heads)
    action, logp, v = player.raw_outputs(state)
    return ['NORTH', 'EAST', 'SOUTH', 'WEST'][action]

CUDA is not available, using CPU...


In [6]:
# Write weight to the head of the python script
weight_base64 = base64.b64encode(bz2.compress(pickle.dumps(net.state_dict())))
w = "weight= %s"%weight_base64
%store w >submission.py

NameError: name 'base64' is not defined

In [None]:
%%writefile -a submission.py
# basic imports
import numpy as np
import itertools
import matplotlib.pyplot as plt
import seaborn as sns
from random import shuffle
from copy import deepcopy
from tqdm.notebook import tqdm

import pickle
import bz2
import base64

# torch imports
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.distributions import Categorical
from torch.optim import Adam

# hungry-geese imports
from kaggle_environments import make, evaluate

# Create a hungry_geese environment at import time, with debug=False.
env = make("hungry_geese", debug=False)


def get_previous_head(ids, last_action, rows, columns):
    """Return the cell a goose's head occupied one step earlier.

    Args:
        ids: cell indices of the goose, head first.
        last_action: integer action index (0-3) of the goose's last move.
        rows: board height, forwarded to shift_head.
        columns: board width, forwarded to shift_head.

    Returns:
        ``ids[1]`` when the goose has more than one segment; otherwise the
        result of shift_head applied to the head with action
        ``(last_action + 2) % 4``.
    """
    if len(ids) <= 1:
        # NOTE(review): shift_head is not defined in the visible cells;
        # presumably it is provided elsewhere - confirm before relying on
        # this branch.
        reverse_action = (last_action + 2) % 4
        return shift_head(ids[0], reverse_action, rows, columns)
    # With at least two segments, the second one sits where the head just was.
    return ids[1]

def ids2locations(ids, prev_head, step, rows, columns):
    """Encode one goose as four flat occupancy planes.

    Args:
        ids: cell indices of the goose, head first (may be empty).
        prev_head: cell index of the head one step earlier; only read when the
            goose is non-empty and ``step != 0``.
        step: current step number.
        rows: board height.
        columns: board width.

    Returns:
        Array of shape ``(4, rows * columns)`` with planes, in order:
        head, body (segments between head and tail), tail, previous head.
        All zeros when ``ids`` is empty.
    """
    planes = np.zeros((4, rows * columns))
    n_segments = len(ids)
    if n_segments > 0:
        planes[0, ids[0]] = 1  # goose head
        if n_segments > 1:
            planes[1, ids[1:-1]] = 1  # goose body
            planes[2, ids[-1]] = 1  # goose tail
        if step != 0:
            planes[3, prev_head] = 1  # goose head one step before
    return planes

def get_features(observation, config, prev_heads):
    """Build the 12-channel board tensor fed to the network.

    Args:
        observation: mapping with 'geese', 'index', 'step' and 'food'.
        config: mapping with 'rows', 'columns', 'hunger_rate' and
            'episodeSteps'.
        prev_heads: per-goose head cell from the previous step, indexed like
            ``observation['geese']``.

    Returns:
        Tensor of shape ``(12, rows, columns)``, rolled so that the head of
        the goose at ``observation['index']`` lands on
        ``(rows // 2, columns // 2)``. Channels:
          0-3  occupancy (head + body + tail) of each goose, with the
               observing goose swapped into slot 0;
          4-7  heads, bodies, tails and previous heads summed over all geese;
          8    food cells;
          9    ``(step % hunger_rate) / hunger_rate``;
          10   ``step / episodeSteps``;
          11   1.0 when ``(step + 1) % hunger_rate == 0``, else 0.0.
    """
    n_rows, n_cols = config['rows'], config['columns']
    board_size = n_rows * n_cols
    all_geese = observation['geese']
    me = observation['index']
    t = observation['step']

    # One (4, board_size) stack of planes per goose.
    per_goose = np.zeros((len(all_geese), 4, board_size))
    for slot, cells in enumerate(all_geese):
        per_goose[slot] = ids2locations(cells, prev_heads[slot], t, n_rows, n_cols)

    # Swap so that the observing goose always occupies slot 0.
    if me != 0:
        per_goose[[0, me]] = per_goose[[me, 0]]

    planes = np.zeros((12, board_size))
    for k in range(4):
        planes[k] = per_goose[k, :3].sum(axis=0)      # goose k: head + body + tail
        planes[4 + k] = per_goose[:, k].sum(axis=0)   # plane k summed over geese

    hunger_rate = config['hunger_rate']
    planes[8, observation['food']] = 1                         # food channel
    planes[9, :] = (t % hunger_rate) / hunger_rate             # hunger danger channel
    planes[10, :] = t / config['episodeSteps']                 # timesteps channel
    planes[11, :] = float((t + 1) % hunger_rate == 0)          # hunger milestone indicator

    board = torch.Tensor(planes).reshape(-1, n_rows, n_cols)

    # Roll the board so the observing goose's head sits at the centre cell.
    head_row, head_col = divmod(all_geese[me][0], n_cols)
    shift = ((n_rows // 2) - head_row, (n_cols // 2) - head_col)
    return torch.roll(board, shift, dims=(-2, -1))


class SEBlock(nn.Module):
    """Squeeze-Excitation block: rescales every channel by a learned gate."""

    def __init__(self, dim, reduction_ratio=4):
        """
        Args:
            dim: number of input (and output) channels.
            reduction_ratio: the hidden width is ``dim // reduction_ratio``.
        """
        super().__init__()
        hidden = dim // reduction_ratio
        self.f1 = nn.Linear(dim, hidden)
        self.f2 = nn.Linear(hidden, dim)

    def forward(self, x):
        """Multiply ``x`` channel-wise by a sigmoid gate from its spatial mean."""
        # Squeeze: average over the last two (spatial) dimensions.
        pooled = x.mean(dim=(-1, -2))
        # Excite: two linear layers, SiLU in between, sigmoid on the output.
        gate = torch.sigmoid(self.f2(F.silu(self.f1(pooled))))
        # Re-add the two spatial axes so the gate broadcasts over the map.
        return x * gate[..., None, None]

class BasicBlock(nn.Module):
    """Basic residual block: two 3x3 convolutions, an SE gate and a shortcut."""

    def __init__(self, dim, downscale=False):
        """
        Args:
            dim: number of input channels.
            downscale: when true, the first convolution and the shortcut use
                stride 2 and the block outputs ``2 * dim`` channels; otherwise
                width and resolution are kept and the shortcut is the identity.
        """
        super().__init__()
        out_dim = 2 * dim if downscale else dim
        first_stride = 2 if downscale else 1
        self.conv1 = nn.Conv2d(dim, out_dim, 3, stride=first_stride, padding=1)
        self.conv2 = nn.Conv2d(out_dim, out_dim, 3, stride=1, padding=1)
        self.bnorm1 = nn.BatchNorm2d(dim)
        self.bnorm2 = nn.BatchNorm2d(out_dim)
        # A 1x1 strided convolution matches the shortcut to the new shape.
        self.proj = nn.Conv2d(dim, out_dim, 1, stride=2) if downscale else nn.Identity()
        self.se = SEBlock(out_dim)

    def forward(self, x):
        """Apply SiLU -> batch norm -> conv twice, gate with SE, add the shortcut."""
        shortcut = self.proj(x)
        hidden = self.conv1(self.bnorm1(F.silu(x)))
        hidden = self.conv2(self.bnorm2(F.silu(hidden)))
        return self.se(hidden) + shortcut
    
class BottleneckBlock(nn.Module):
    """Bottleneck residual block: 1x1 -> 3x3 -> 1x1 convolutions, SE gate, shortcut."""

    def __init__(self, dim, downscale=False):
        """
        Args:
            dim: number of input channels.
            downscale: when true, the 3x3 convolution and the shortcut use
                stride 2 and the block outputs ``2 * dim`` channels; otherwise
                width and resolution are kept and the shortcut is the identity.
        """
        super().__init__()
        out_dim = 2 * dim if downscale else dim
        mid_stride = 2 if downscale else 1
        self.conv1 = nn.Conv2d(dim, dim, 1)
        self.conv2 = nn.Conv2d(dim, out_dim, 3, stride=mid_stride, padding=1)
        self.conv3 = nn.Conv2d(out_dim, out_dim, 1)
        self.bnorm1 = nn.BatchNorm2d(dim)
        self.bnorm2 = nn.BatchNorm2d(dim)
        self.bnorm3 = nn.BatchNorm2d(out_dim)
        # A 1x1 strided convolution matches the shortcut to the new shape.
        self.proj = nn.Conv2d(dim, out_dim, 1, stride=2) if downscale else nn.Identity()
        self.se = SEBlock(out_dim)

    def forward(self, x):
        """Apply SiLU -> batch norm -> conv three times, gate with SE, add the shortcut."""
        shortcut = self.proj(x)
        hidden = self.conv1(self.bnorm1(F.silu(x)))
        hidden = self.conv2(self.bnorm2(F.silu(hidden)))
        hidden = self.conv3(self.bnorm3(F.silu(hidden)))
        return self.se(hidden) + shortcut

class ResLayers(nn.Module):
    """A stack of residual blocks that ends with one downscaling block."""

    def __init__(self, block, dim, depth):
        """
        Args:
            block: block class, called as ``block(dim, downscale=...)``.
            dim: channel count passed to every block.
            depth: total number of blocks; the first ``depth - 1`` are built
                with ``downscale=False`` and the last with ``downscale=True``.
        """
        super().__init__()
        stack = nn.ModuleList()
        for _ in range(depth - 1):
            stack.append(block(dim, downscale=False))
        # The downscaling block is always appended, whatever the depth.
        stack.append(block(dim, downscale=True))
        self.blocks = stack

    def forward(self, x):
        """Run ``x`` through every block in order."""
        out = x
        for layer in self.blocks:
            out = layer(out)
        return out
    
class Encoder(nn.Module):
    """Res-Net encoder: an input gate followed by three residual stages."""

    def __init__(self, dim_in, depths):
        """
        Args:
            dim_in: channel width produced by the input gate.
            depths: number of blocks in each of the three stages
                (``depths[0]``, ``depths[1]``, ``depths[2]``).
        """
        super().__init__()
        # 1x1 convolution over the 12 input channels, with circular padding of
        # 3 along the first spatial axis and 5 along the second.
        self.gate = nn.Conv2d(12, dim_in, 1, padding=(3, 5), padding_mode='circular')
        # (block class, input width) for each stage; widths are 1x, 2x, 4x dim_in.
        stage_specs = (
            (BasicBlock, dim_in),
            (BasicBlock, 2 * dim_in),
            (BottleneckBlock, 4 * dim_in),
        )
        self.layers = nn.ModuleList([
            ResLayers(block_cls, width, depths[stage])
            for stage, (block_cls, width) in enumerate(stage_specs)
        ])

    def forward(self, x):
        """Apply the gate, then each stage in order."""
        features = self.gate(x)
        for stage in self.layers:
            features = stage(features)
        return features

class Actor(nn.Module):
    """Actor head: maps a feature map to log-probabilities over 4 actions."""

    def __init__(self, dim_in, head_dim):
        """
        Args:
            dim_in: channel count of the incoming feature map.
            head_dim: channel count after the compression stack.
        """
        super().__init__()
        compression = [
            nn.Conv2d(dim_in, dim_in, 3, padding=1),
            nn.SiLU(inplace=True),
            nn.BatchNorm2d(dim_in),
            nn.Conv2d(dim_in, head_dim, 1),
            nn.SiLU(inplace=True),
        ]
        self.compr = nn.Sequential(*compression)
        self.fc = nn.Linear(head_dim, 4)

    def forward(self, state):
        """Return log-softmax action scores of shape ``(batch, 4)``."""
        # Compress, then average over the last two (spatial) dimensions.
        pooled = self.compr(state).mean(dim=(-1, -2))
        return F.log_softmax(self.fc(pooled), dim=1)
    
class Critic(nn.Module):
    """Critic head: maps a feature map to one value in [-1, 1] per sample."""

    def __init__(self, dim_in, head_dim):
        """
        Args:
            dim_in: channel count of the incoming feature map.
            head_dim: channel count after the compression stack.
        """
        super().__init__()
        compression = [
            nn.Conv2d(dim_in, dim_in, 3, padding=1),
            nn.SiLU(inplace=True),
            nn.BatchNorm2d(dim_in),
            nn.Conv2d(dim_in, head_dim, 1),
            nn.SiLU(inplace=True),
        ]
        self.compr = nn.Sequential(*compression)
        # The final linear layer is wrapped with weight normalisation.
        self.fc = nn.utils.weight_norm(nn.Linear(head_dim, 1))

    def forward(self, state):
        """Return tanh-squashed values of shape ``(batch, 1)``."""
        # Compress, then average over the last two (spatial) dimensions.
        pooled = self.compr(state).mean(dim=(-1, -2))
        return torch.tanh(self.fc(pooled))

class GNet(nn.Module):
    """G-Net: a shared encoder with one actor head and two critic heads."""

    def __init__(self):
        super().__init__()
        # hyperparameters
        width, head_width, stage_depths = 64, 32, (2, 2, 2)
        # Each of the three encoder stages ends by doubling the channel count.
        latent_channels = 8 * width
        # modules
        self.encoder = Encoder(width, stage_depths)
        self.actor = Actor(latent_channels, head_width)
        self.critic1 = Critic(latent_channels, head_width)
        self.critic2 = Critic(latent_channels, head_width)

    def forward(self, state):
        """Return ``(logp, (v1, v2))``: actor log-probabilities and both critic values."""
        latent = self.encoder(state)
        logp = self.actor(latent)
        values = (self.critic1(latent), self.critic2(latent))
        return logp, values

class RLAgent:
    """Wraps a policy/value network as a callable game agent."""

    def __init__(self, net, stochastic=False):
        """
        Args:
            net: callable returning ``(log_probs, (v1, v2))`` for a batched state.
            stochastic: sample the action from the policy when true; take the
                arg-max when false.
        """
        self.net = net
        self.stochastic = stochastic
        # -1 marks "no previous head known" for each of the four geese.
        self.prev_heads = [-1] * 4

    def raw_outputs(self, state):
        """Evaluate the network on one state without tracking gradients.

        Returns:
            ``(action, log_prob_of_action, (v1, v2))`` with the batch
            dimension squeezed away.
        """
        with torch.no_grad():
            batch = state.unsqueeze(0)  # .cuda()
            logits, (v1, v2) = self.net(batch)
            logits, v1, v2 = logits.squeeze(0), v1.squeeze(0), v2.squeeze(0)
            if not self.stochastic:
                action = np.argmax(logits.cpu().detach().numpy())
            else:
                # The network emits log-probabilities; exponentiate to sample.
                probs = torch.exp(logits).cpu().detach().numpy()
                action = np.random.choice(range(4), p=probs)
            return action, logits[action], (v1, v2)

    def __call__(self, observation, configuration):
        """Pick a move name for ``observation`` and remember the current heads."""
        if observation['step'] == 0:
            # New episode: forget the heads from the previous one.
            self.prev_heads = [-1] * 4
        state = get_features(observation, configuration, self.prev_heads)
        action = self.raw_outputs(state)[0]
        heads = []
        for goose in observation['geese']:
            heads.append(goose[0] if len(goose) > 0 else -1)
        self.prev_heads = heads
        return ['NORTH', 'EAST', 'SOUTH', 'WEST'][action]


# Rebuild the network and restore its weights from the `weight` name:
# base64-decode, bz2-decompress, then unpickle into a state dict.
# NOTE(review): `weight` is not defined in this cell; presumably it is the
# assignment written to the head of submission.py before this cell is appended
# - confirm. pickle.loads can execute arbitrary code, so the payload must come
# from a trusted source.
net = GNet()
state_dict = pickle.loads(bz2.decompress(base64.b64decode(weight)))
net.load_state_dict(state_dict)

# Switch the module to evaluation mode, then wrap it in an RLAgent created
# with stochastic=False.
net.eval()
player = RLAgent(net, stochastic=False)
# NOTE(review): `obses` is not referenced by any code visible in this file.
obses = []
# Module-level cache used by agent(): the last observation it received and one
# "previous head" slot per goose.
prev_obs = None
prev_heads = [None for _ in range(4)]

def agent(obs_dict, config_dict):
    """Agent entry point: choose a move for the current observation.

    Keeps the previous observation in module-level state so that the head
    position each goose had one step earlier can be passed to get_features.

    Args:
        obs_dict: observation mapping; this function reads its 'geese' and
            'step' entries and forwards the whole mapping to get_features.
        config_dict: configuration mapping forwarded to get_features. If it is
            falsy, the module-level ``env.configuration`` is used instead.

    Returns:
        One of 'NORTH', 'EAST', 'SOUTH', 'WEST'.
    """
    global prev_obs, prev_heads
    observation = obs_dict
    geese = observation['geese']

    # Start from a clean cache on the very first call and at the start of
    # every episode. Without the reset, an observation left over from a
    # finished episode can contain empty geese, and indexing their head
    # raises IndexError.
    if prev_obs is None or observation['step'] == 0:
        prev_obs = observation
        prev_heads = [None for _ in geese]

    # cache previous state
    for i, g in enumerate(geese):
        previous = prev_obs['geese'][i]
        if len(g) > 0 and len(previous) > 0:
            prev_heads[i] = previous[0]
    prev_obs = observation

    # Prefer the configuration handed to this call over the global environment.
    config = config_dict if config_dict else env.configuration
    state = get_features(obs_dict, config, prev_heads)
    action, logp, v = player.raw_outputs(state)
    return ['NORTH', 'EAST', 'SOUTH', 'WEST'][action]

Appending to submission.py


In [None]:
from kaggle_environments import make
# Create a hungry_geese environment with debug=True.
env = make("hungry_geese", debug=True)

# Reset the environment, run one episode with submission.py loaded as the
# agent in all four seats, then render the result inline in the notebook.
env.reset()
env.run(['submission.py', 'submission.py','submission.py','submission.py'])
env.render(mode="ipython", width=500, height=450)

Goose Collision: SOUTH
Goose Collision: NORTH
Goose Collision: SOUTH
