In [1]:
!python3 -m pip install --force-reinstall --no-deps numpy==1.25.2
!python3 -m pip uninstall -y importlib_metadata
!python3 -m pip install importlib_metadata
!pip install git+https://github.com/DLR-RM/stable-baselines3@feat/gymnasium-support
!pip install git+https://github.com/Stable-Baselines-Team/stable-baselines3-contrib@feat/gymnasium-support
!pip install 'shimmy>=0.2.1'

Collecting numpy==1.25.2
  Downloading numpy-1.25.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.2/18.2 MB[0m [31m40.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.23.5
    Uninstalling numpy-1.23.5:
      Successfully uninstalled numpy-1.23.5
Successfully installed numpy-1.25.2
Found existing installation: importlib-metadata 6.7.0
Uninstalling importlib-metadata-6.7.0:
  Successfully uninstalled importlib-metadata-6.7.0
Collecting git+https://github.com/DLR-RM/stable-baselines3@feat/gymnasium-support
  Cloning https://github.com/DLR-RM/stable-baselines3 (to revision feat/gymnasium-support) to /tmp/pip-req-build-u7wc5m6v
  Running command git clone --filter=blob:none --quiet https://github.com/DLR-RM/stable-baselines3 /tmp/pip-req-build-u7wc5m6v
[0m  Running command git checkout -q

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from random import choice

from kaggle_environments import make, evaluate
from kaggle_environments.envs.halite.helpers import *

import torch as th
from torch import nn

from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

import gymnasium as gym
from gymnasium.spaces import MultiDiscrete, Dict, Box

Loading environment lux_ai_s2 failed: No module named 'vec_noise'


## Define Board and Actions

Helper functions to check if actions are valid

In [3]:
def check_actions(board):
    for ship in board.ships.values():
        if not check_ship_action(ship, board.configuration):
            return False
    for yard in board.shipyards.values():
        if not check_yard_action(yard, board.configuration):
            return False
    return True

def check_ship_action(ship, cfg):
    if ship.next_action == ShipAction.NORTH and ship.position[1] == cfg.size-1:
        return False
    if ship.next_action == ShipAction.EAST and ship.position[0] == cfg.size-1:
        return False
    if ship.next_action == ShipAction.SOUTH and ship.position[1] == 0:
        return False
    if ship.next_action == ShipAction.WEST and ship.position[0] == 0:
        return False
    if ship.next_action == ShipAction.CONVERT and ship.halite < cfg.convert_cost:
        return False
    return True

def check_yard_action(yard, cfg):
    if yard.next_action == ShipyardAction.SPAWN and yard.halite < cfg.spawn_cost:
        return False
    return True

Create tensor representation of board. Three layers - one for board/halite and one for each player.

Create function to set actions for all players' units.

In [4]:
def board_to_tensor(board):
    size = board.configuration.size
    feat_arr = np.dstack([[cell.halite, cell.ship, cell.shipyard]
                      for cell in board.cells.values()]).reshape(3,size,size)
    num_arr = np.array([0]*(3*size*size)).reshape((3,size,size))
    num_arr[0,:,:] = feat_arr[0,:,:]
    
    for i in range(size):
        for j in range(size):
            # Add 1 to make 1-based instead of 0-based
            # since 0 represents no ship(yard)
            if feat_arr[1,i,j] != None:
                num_arr[1,i,j] = feat_arr[1,i,j].player_id + 1
            if feat_arr[2,i,j] != None:
                num_arr[2,i,j] = feat_arr[2,i,j].player_id + 1
                
    return num_arr

def fill_actions(act_arr, player):
    idx = 0
    
    for ship in player.ships:
        if act_arr[idx] == 0:
            ship.next_action = ShipAction.NORTH
        elif act_arr[idx] == 1:
            ship.next_action = ShipAction.EAST
        elif act_arr[idx] == 2:
            ship.next_action = ShipAction.SOUTH
        elif act_arr[idx] == 3:
            ship.next_action = ShipAction.WEST
        elif act_arr[idx] == 4:
            ship.next_action = ShipAction.CONVERT
        else:
            ship.next_action = None
        idx += 1
            
    for yard in player.shipyards:
        if act_arr[idx] == 0:
            yard.next_action = ShipyardAction.SPAWN
        else:
            yard.next_action = None
        idx += 1
        
    return player.next_actions
            

## Define Environment

Override gym environment for reinforcement learning for our particular application.

In [5]:
class HaliteGym(gym.Env):
    def __init__(self, agent2="random", nplayers=2):
        ks_env = make("halite", debug=True)
        size = ks_env.configuration.size
        max_halite = ks_env.configuration.maxCellHalite
        self.env = ks_env.train([None, agent2])
        self.board = Board(ks_env.reset(nplayers)[0].observation, ks_env.configuration)
        self.action_space = MultiDiscrete([len(ShipAction)+1])
        self.observation_space = Dict(
            {"halite": Box(low=0, high=max_halite, shape=(size,size), dtype=int),
             "ships": Box(low=0, high=nplayers, shape=(size,size), dtype=int),
             "yards": Box(low=0, high=nplayers, shape=(size,size), dtype=int)})
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-10, 1)
        # StableBaselines throws error if these are not defined
        self.size = size
        self.spec = None
        self.metadata = None
    def reset(self, seed=69):
        self.obs = self.env.reset()
        board_arr = board_to_tensor(self.board)
        board_dict = {}
        board_dict['halite'] = board_arr[0]
        board_dict['ships'] = board_arr[1]
        board_dict['yards'] = board_arr[2]
        return board_dict, _
    def change_reward(self, old_reward, done):
        if old_reward == 1: # The agent won the game
            return 1
        elif done: # The opponent won the game
            return -1
        else: # Reward 1/42
            return 1/(self.size*self.size)
    def step(self, action):
        # Check if agent's move is valid
        is_valid = check_actions(self.board)
        if is_valid: # Play the move
            self.obs, old_reward, done, _ = self.env.step(action)
            reward = self.change_reward(old_reward, done)
        else: # End the game and penalize agent
            reward, done, _ = -10, True, {}
        
        board_arr = board_to_tensor(self.board)
        board_dict = {}
        board_dict['halite'] = board_arr[0]
        board_dict['ships'] = board_arr[1]
        board_dict['yards'] = board_arr[2]
        return board_dict, reward, done, False, _

In [6]:
env = HaliteGym()

Set up neural network to be used as predictive engine in determining player moves.

In [7]:
class CustomCombinedExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.spaces.Dict, features_dim):
        # We do not know features-dim here before going over all the items,
        # so put something dummy for now. PyTorch requires calling
        # nn.Module.__init__ before adding modules
        super().__init__(observation_space, features_dim=1)

        extractors = {}

        total_concat_size = 0
        # We need to know size of the output of this extractor,
        # so go over all the spaces and compute output feature sizes
        for key, subspace in observation_space.spaces.items():
            # We will just downsample one channel of the image by 4x4 and flatten.
            # Assume the image is single-channel (subspace.shape[0] == 0)
            extractors[key] = nn.Sequential(nn.MaxPool2d(4), nn.Flatten())
            total_concat_size += (subspace.shape[0] // 4) * (subspace.shape[1] // 4)

        self.extractors = nn.ModuleDict(extractors)

        # Update the features dim manually
        self._features_dim = total_concat_size

    def forward(self, observations) -> th.Tensor:
        encoded_tensor_list = []

        # self.extractors contain nn.Modules that do all the processing.
        for key, extractor in self.extractors.items():
            encoded_tensor_list.append(extractor(observations[key]))
        # Return a (B, self._features_dim) PyTorch tensor, where B is batch dimension.
        return th.cat(encoded_tensor_list, dim=1)
    
# Neural network for predicting ship action values
class CustomCNN(BaseFeaturesExtractor):
    
    def __init__(self, observation_space: gym.spaces.Dict, features_dim: int=128):
        super().__init__(observation_space, features_dim)
        # CxHxW images (channels first)
        #n_input_channels = len(observation_space.sample())
        
        extractors = {}
        total_concat_size = 0
        
        for key, subspace in observation_space.spaces.items():
            extractors[key] = nn.Sequential(
                nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=0),
                nn.ReLU(),
                nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
                nn.ReLU(),
                nn.Flatten(),
            )
            # Compute shape by doing one forward pass
            with th.no_grad():
                n_flatten = extractors[key](
                    th.as_tensor(np.array(observation_space.sample()[key][None])).float()
                ).shape[1]
                total_concat_size += n_flatten
            
        self.linear = nn.Sequential(nn.Linear(total_concat_size, features_dim), nn.ReLU())
        self.extractors = nn.ModuleDict(extractors)
        self._features_dim = total_concat_size

    def forward(self, observations: th.Tensor) -> th.Tensor:
        encoded_tensor_list = []

        # self.extractors contain nn.Modules that do all the processing.
        for key, extractor in self.extractors.items():
            encoded_tensor_list.append(extractor(observations[key]))
        print(th.cat(encoded_tensor_list, dim=0).shape)
        # Return a (B, self._features_dim) PyTorch
        return th.cat(encoded_tensor_list, dim=1)

Define multiple input PPO policy.

In [8]:
policy_kwargs = dict(
    features_extractor_class=CustomCombinedExtractor,
    features_extractor_kwargs=dict(features_dim=128),
)
model = PPO("MultiInputPolicy", env, policy_kwargs=policy_kwargs, verbose=1)
model.learn(total_timesteps=60000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 399      |
|    ep_rew_mean     | -0.0975  |
| time/              |          |
|    fps             | 42       |
|    iterations      | 1        |
|    time_elapsed    | 48       |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 399         |
|    ep_rew_mean          | -0.0975     |
| time/                   |             |
|    fps                  | 41          |
|    iterations           | 2           |
|    time_elapsed         | 98          |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.017727578 |
|    clip_fraction        | 0.173       |
|    clip_range           | 0.2         |
|    entropy_loss   

<stable_baselines3.ppo.ppo.PPO at 0x7d932ddadc90>

Sample board representation

In [9]:
board_arr = board_to_tensor(env.board)
board_dict = {}
board_dict['halite'] = board_arr[0]
board_dict['ships'] = board_arr[1]
board_dict['yards'] = board_arr[2]
actions, _ = model.predict(board_dict)
print(env.board)

| 5 | 0 | 0 | 4 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 4 | 0 | 0 | 5 |
| 1 | 1 | 1 | 0 | 0 | 2 | 0 | 3 | 1 | 0 | 0 | 0 | 1 | 3 | 0 | 2 | 0 | 0 | 1 | 1 | 1 |
| 6 | 0 | 0 | 0 | 0 | 0 | 6 | 0 | 0 | 3 | 0 | 3 | 0 | 0 | 6 | 0 | 0 | 0 | 0 | 0 | 6 |
| 6 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 6 | 0 | 6 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 6 |
| 0 | 4 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 7 | 0 | 7 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 4 | 0 |
| 0 | 0 | 0 | 0 | 6 | 0 | 0 | 1 | 0 | 2 | 1 | 2 | 0 | 1 | 0 | 0 | 6 | 0 | 0 | 0 | 0 |
| 0 | 0 | 2 | 0 | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 0 | 2 | 0 | 0 |
| 0 | 2 | 0 | 0 | 0 | 4 | 0 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 0 | 4 | 0 | 0 | 0 | 2 | 0 |
| 0 | 0 | 1 | 0 | 0 | 4 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 4 | 0 | 0 | 1 | 0 | 0 |
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 3 | 0 | 3 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 0 | 0 | 0 | 0 | 0 |a0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |b0 | 0 | 0 | 0 | 0 | 0 |
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 3 | 0 | 3 | 1 | 

Prospective agents: first uses NN engine with model.predict() and second just uses random actions.

In [10]:
def agent(obs,config):
    
    board = Board(obs,config)
    board_arr = board_to_tensor(board)
    board_dict = {}
    board_dict['halite'] = board_arr[0]
    board_dict['ships'] = board_arr[1]
    board_dict['yards'] = board_arr[2]
    actions, _ = model.predict(board_dict)
    
    me = board.current_player

    return fill_actions(actions, me)

In [11]:
def agent1(obs,config):

    board = Board(obs,config)
    me = board.current_player

    # Set actions for each ship
    for ship in me.ships:
        ship.next_action = choice([ShipAction.NORTH,ShipAction.EAST,ShipAction.SOUTH,ShipAction.WEST,None])

    # Set actions for each shipyard
    for shipyard in me.shipyards:
        shipyard.next_action = None
        
    print(me.next_actions)

    return me.next_actions

In [12]:
me = env.board.current_player

# Set actions for each ship
for ship in me.ships:
    ship.next_action = choice([ShipAction.NORTH,ShipAction.EAST,ShipAction.SOUTH,ShipAction.WEST,ShipAction.CONVERT,None])

# Set actions for each shipyard
for shipyard in me.shipyards:
    shipyard.next_action = choice([ShipyardAction.SPAWN,None])

print(me.next_actions)

board = env.board
board_arr = board_to_tensor(board)
board_dict = {}
board_dict['halite'] = board_arr[0]
board_dict['ships'] = board_arr[1]
board_dict['yards'] = board_arr[2]
actions, _ = model.predict(board_dict)

me = board.current_player

print(fill_actions(actions, me))

{}
{'0-1': 'EAST'}


## Run game

In [13]:
game = make("halite")
game.run([agent, "random"])
game.render(mode="ipython", width=800, height=600)