In [65]:
from luxai_s2.env import LuxAI_S2
import matplotlib.pyplot as plt
import numpy as np
from scipy.ndimage import distance_transform_cdt

ore1_vs_ice1_mult = 1/2
second_factory_mult = 1/3
# commented out below line because territory metric should take care of this when neccessary
#factory_border_buffer = 3
n_factories = 5
N_second_ice_cutoff = 10 #more than N nsquares away from factory center and we don't care about the 2nd ice anymore
second_ice_default_penalty = 1.3
ice2_vs_ice1_mult = 1/(N_second_ice_cutoff * second_ice_default_penalty + 1) #this way if a factory doesnt have a cutoff and receives the default penalty then it still is only a tie breaker since <1

In [3]:
def manhattan_distance(binary_mask):
    # Get the distance map from every pixel to the nearest positive pixel
    distance_map = distance_transform_cdt(binary_mask, metric='taxicab')
    return distance_map

In [4]:
%%time
#this cell calculates first_vs_second taking into account valid spawns, 1st ice, 1st ore, and factory overlaps



env = LuxAI_S2()
obs = env.reset(seed=2)
#obs = env.reset()
#img = env.render("rgb_array", width=480, height=480)
#plt.imshow(img)

ice = obs['player_0']["board"]["ice"]
ore = obs['player_0']["board"]["ore"]
dist_ice = manhattan_distance(1 - ice)
dist_ore = manhattan_distance(1 - ore)

valid_spawns = obs["player_0"]["board"]["valid_spawns_mask"] 
factory_dist_ice, factory_dist_ore = [], []
for x in range(48):
    for y in range(48):
        #if not valid_spawns[x][y] or (x - factory_border_buffer < 0 or y - factory_border_buffer < 0) or (x + factory_border_buffer >= 48 or y + factory_border_buffer >= 48): 
        if not valid_spawns[x][y]:
            factory_dist_ice.append(999)
            factory_dist_ore.append(999)
            continue
        closest_ice = min(
            dist_ice[x-1,y-1],
            dist_ice[x-1,y],
            dist_ice[x-1,y+1],
            dist_ice[x,y-1],
            #dist_ice[x,y],
            dist_ice[x,y+1],
            dist_ice[x+1,y-1],
            dist_ice[x+1,y],
            dist_ice[x+1,y+1]
        )
        factory_dist_ice.append(closest_ice)
        closest_ore = min(
            dist_ore[x-1,y-1],
            dist_ore[x-1,y],
            dist_ore[x-1,y+1],
            dist_ore[x,y-1],
            #dist_ore[x,y],
            dist_ore[x,y+1],
            dist_ore[x+1,y-1],
            dist_ore[x+1,y],
            dist_ore[x+1,y+1]
        )
        factory_dist_ore.append(closest_ore)
    
scores = factory_dist_ice + np.array(factory_dist_ore) * ore1_vs_ice1_mult
#print(48*48, len(scores))
sorted_indexes = np.argsort(scores)
#sorted_indexes[:100]

no_overlap, nol_scores = [], []
count = -1
#while len(no_overlap) < 2*n_factories:
while len(no_overlap) < 4:
    count += 1
    i = sorted_indexes[count]
    loc = np.unravel_index(i, (48, 48))
    score = scores[i]
    if len(no_overlap) == 0:
        no_overlap.append(loc)
        nol_scores.append(score)
        continue
    if any([np.sum(np.abs(np.array(loc)-np.array(factory))) < 6 for factory in no_overlap]):
        scores[i] = 999 #this is only here to show results in the print loop below, once factories start getting placed can mess things up
        continue
    no_overlap.append(loc)
    nol_scores.append(score)
    
for i in sorted_indexes[:count+1]:
    print(np.unravel_index(i, (48, 48)), factory_dist_ice[i], factory_dist_ore[i], scores[i])
    #break
    
print(no_overlap)
print(nol_scores)
print([dist_ice[x][y] for x,y in no_overlap])
print([dist_ore[x][y] for x,y in no_overlap])
first_vs_second = (nol_scores[0] - nol_scores[1]) + second_factory_mult * (nol_scores[2] - nol_scores[3])
first_vs_second

(20, 27) 1 1 1.5
(22, 18) 1 1 1.5
(22, 19) 1 1 999.0
(22, 20) 1 1 999.0
(20, 29) 1 1 999.0
(19, 30) 1 1 999.0
(19, 31) 1 1 999.0
(19, 32) 1 1 1.5
(20, 28) 1 1 999.0
(13, 43) 1 2 2.0
[(20, 27), (22, 18), (19, 32), (13, 43)]
[1.5, 1.5, 1.5, 2.0]
[2, 3, 3, 3]
[3, 3, 3, 4]
CPU times: total: 109 ms
Wall time: 120 ms


-0.16666666666666666

In [5]:
def generate_nsquares(point, n):
    x, y = point
    nsquares = []
    for i in range(-n, n+1):
        for j in range(-n, n+1):
            if abs(i) == n or abs(j) == n:
                nsquares.append((x+i, y+j))
    return nsquares

g = generate_nsquares([0,0], 3)
len(g), g

(24,
 [(-3, -3),
  (-3, -2),
  (-3, -1),
  (-3, 0),
  (-3, 1),
  (-3, 2),
  (-3, 3),
  (-2, -3),
  (-2, 3),
  (-1, -3),
  (-1, 3),
  (0, -3),
  (0, 3),
  (1, -3),
  (1, 3),
  (2, -3),
  (2, 3),
  (3, -3),
  (3, -2),
  (3, -1),
  (3, 0),
  (3, 1),
  (3, 2),
  (3, 3)])

In [6]:
import itertools

def generate_pairings(lst):
    pairings = []
    for i, j in itertools.combinations(lst, 2):
        pairings.append((i,j))
    return pairings
generate_pairings([[0,0],[1,1],[2,2]])

[([0, 0], [1, 1]), ([0, 0], [2, 2]), ([1, 1], [2, 2])]

In [7]:
#10, 12) (12, 9) [9, 15]

np.sum(np.abs(np.array([10,12])-np.array([9,15]))), np.sum(np.abs(np.array([12,9])-np.array([9,15])))

(4, 9)

In [69]:
%%time
#this cell calculates first_vs_second taking into account valid spawns, 1st ice, 1st ore, and factory overlaps
# and also takes into account distance to 2nd ice, only for 2nd ice where manhattan distance > 2 away from 1st ice



env = LuxAI_S2()
obs = env.reset(seed=2)
obs = env.reset()
#img = env.render("rgb_array", width=480, height=480)
#plt.imshow(img)

board_ice = obs['player_0']["board"]["ice"]
board_ore = obs['player_0']["board"]["ore"]
dist_ice = manhattan_distance(1 - board_ice)
dist_ore = manhattan_distance(1 - board_ore)

valid_spawns = obs["player_0"]["board"]["valid_spawns_mask"] 
factory_dist_ice, factory_dist_ore = [], []
for x in range(48):
    for y in range(48):
        #if not valid_spawns[x][y] or (x - factory_border_buffer < 0 or y - factory_border_buffer < 0) or (x + factory_border_buffer >= 48 or y + factory_border_buffer >= 48): 
        if not valid_spawns[x][y]:
            factory_dist_ice.append(999)
            factory_dist_ore.append(999)
            continue
        closest_ice = min([dist_ice[_x, _y] for _x, _y in generate_nsquares([x,y], 1)])
        factory_dist_ice.append(closest_ice)
        
        closest_ore = min([dist_ore[_x, _y] for _x, _y in generate_nsquares([x,y], 1)])
        factory_dist_ore.append(closest_ore)    
    
scores = factory_dist_ice + np.array(factory_dist_ore) * ore1_vs_ice1_mult
#print(48*48, len(scores))
sorted_indexes_1 = np.argsort(scores)
#sorted_indexes[:100]


less = 80 #number of locations we will do computations for 2nd closest ice
factory_dist_ice2 = []
for i in sorted_indexes_1[:less]:
    x,y = np.unravel_index(i, (48, 48))
    dist_ice1 = dist_ice[x][y]
    dist_ice2 = None
    ltoet_n_away = [] # ltoet <=
    for n in range(0, N_second_ice_cutoff): #if second ice is more than N nsquares away it kinda doesnt matter, can just add some constant penalty
        if n < 2: #these are factory tiles
            continue
        for _x,_y in generate_nsquares([x,y], n):
            if (_x < 0 or _y < 0) or (_x >= 48 or _y >= 48): 
                continue
            if board_ice[_x][_y] == 1:
                ltoet_n_away.append((_x,_y))
        if len(ltoet_n_away) >= 2:
            for ice1, ice2 in generate_pairings(ltoet_n_away):
                if np.sum(np.abs(np.array(ice1)-np.array(ice2))) > 2:
                    if np.sum(np.abs(np.array(ice1)-np.array([x,y]))) == dist_ice1: #our first ice must be the closest ice(s)
                        dist_ice2 = np.sum(np.abs( np.array(ice2)-np.array([x,y]) ))
                        assert dist_ice1 <= dist_ice2
    if not dist_ice2:
        dist_ice2 = N_second_ice_cutoff * second_ice_default_penalty
    factory_dist_ice2.append(dist_ice2)
    
for i in sorted_indexes_1[less:]:
    dist_ice2 = N_second_ice_cutoff * second_ice_default_penalty
    factory_dist_ice2.append(dist_ice2)
assert len(factory_dist_ice2) == len(scores)
#print(factory_dist_ice2[:20])


#update scores to reflect distances to ice2
for _ in range(48*48):
    i = sorted_indexes_1[_] #because we indexed factory_dist_ice2 differently so we dont have to compute for all 48*48
    #print(scores[i], factory_dist_ice2[_])
    scores[i] += factory_dist_ice2[_] * ice2_vs_ice1_mult
#sorted_indexes_2 to reflect new scores accounting for ice2
sorted_indexes_2 = np.argsort(scores) 

no_overlap, nol_scores = [], [] 
corresponding_si1_index = [] #corresponding to sorted_indexes_1
count = -1
#while len(no_overlap) < 2*n_factories:
while len(no_overlap) < 4:
    count += 1
    i = sorted_indexes_2[count]
    loc = np.unravel_index(i, (48, 48))
    score = scores[i]
    if len(no_overlap) == 0:
        no_overlap.append(loc)
        nol_scores.append(score)
        flattened_i = loc[0]*48 + loc[1]
        corresponding_si1_index.append(list(sorted_indexes_1).index(flattened_i))
        continue
    if any([np.sum(np.abs(np.array(loc)-np.array(factory))) < 6 for factory in no_overlap]):
        scores[i] = 999 #this is only here to show results in the print loop below, once factories start getting placed can mess things up
        continue
    no_overlap.append(loc)
    nol_scores.append(score)
    flattened_i = loc[0]*48 + loc[1]
    corresponding_si1_index.append(list(sorted_indexes_1).index(flattened_i))
    
for i in sorted_indexes_2[:count+1]:
    #print(np.unravel_index(i, (48, 48)), factory_dist_ice[i], factory_dist_ore[i], scores[i])
    break
    
print(no_overlap)
print(nol_scores)
print(corresponding_si1_index) #IMPORTANT: these are rarely over 50, so we can pretty comfortably set the 'less' var above to 80
print('ice and ore dist:', [dist_ice[x][y] for x,y in no_overlap], [dist_ore[x][y] for x,y in no_overlap])
first_vs_second = (nol_scores[0] - nol_scores[1]) + second_factory_mult * (nol_scores[2] - nol_scores[3])
first_vs_second

[(22, 19), (26, 31), (24, 25), (20, 15)]
[2.0, 2.142857142857143, 2.357142857142857, 2.4285714285714284]
[3, 6, 2, 4]
ice and ore dist: [3, 3, 3, 2] [2, 3, 3, 3]
CPU times: total: 188 ms
Wall time: 234 ms


-0.16666666666666652

In [None]:
#sorted_indexes corresponds to flattened indexes 0 -- 2304
#if we have loc (19, 30) this corresponds to the Xth value in flatten location which corresponds to some ith value of sorted_ind

for x,y in no_overlap:
    flattened_i = x*48 + y
    print(list(sorted_indexes).index(flattened_i))

In [None]:
#metric for placing factory: 'intended territory'

#pseudocode: proceeding through argsorted ice-ore distances, a factory location has N intended territory defined as size
    # of contested tiles w rubble <= 35

#rubble=0 tile at distance n from factory center loc has value (46+46)/(n-1) --- (since n >= 2 for non factory tiles)
#disregard all eight factory tiles (i+-1/0, j+-1/0),
#


In [None]:
# judge new positions by change in total board state if we were to build there

In [None]:
from tqdm import tqdm

In [None]:
for _ in range(len(valid_spawns)):
    print(_)
    print(valid_spawns[_])

In [None]:
rubble = obs['player_0']['board']['rubble']
rubble.shape
conv_size = 8
y = rubble.reshape(conv_size,conv_size, int(rubble.shape[0]/conv_size), int(rubble.shape[1]/conv_size))
y

In [None]:
rubble_all = []
rubble_means = []
mountain_count, cave_count = 0,0
for n in np.arange(300,800):
    obs = env.reset(seed=n)
    m = np.mean(obs['player_0']['board']['rubble'])
    if m < 35:
        mountain_count += 1
    else:
        cave_count += 1
    rubble_means.append(m)
plt.hist(rubble_means, bins=50)
plt.show()
mountain_count, cave_count

In [None]:
def animate(imgs, _return=True):
    # using cv2 to generate videos as moviepy doesn't work on kaggle notebooks
    import cv2
    import os
    import string
    import random
    video_name = ''.join(random.choice(string.ascii_letters) for i in range(18))+'.webm'
    height, width, layers = imgs[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'VP90')
    video = cv2.VideoWriter(video_name, fourcc, 10, (width,height))

    for img in imgs:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        video.write(img)
    video.release()
    if _return:
        from IPython.display import Video
        return Video(video_name)
def interact(env, agents, steps):
    # reset our env
    obs = env.reset(seed=41)
    np.random.seed(0)
    imgs = []
    step = 0
    # Note that as the environment has two phases, we also keep track a value called 
    # `real_env_steps` in the environment state. The first phase ends once `real_env_steps` is 0 and used below

    # iterate until phase 1 ends
    while env.state.real_env_steps < 0:
        if step >= steps: break
        actions = {}
        for player in env.agents:
            o = obs[player]
            a = agents[player].early_setup(step, o)
            actions[player] = a
        step += 1
        obs, rewards, dones, infos = env.step(actions)
        imgs += [env.render("rgb_array", width=400, height=400)]
    done = False
    while not done:
        if step >= steps: break
        actions = {}
        for player in env.agents:
            o = obs[player]
            a = agents[player].act(step, o)
            actions[player] = a
        step += 1
        obs, rewards, dones, infos = env.step(actions)
        imgs += [env.render("rgb_array", width=400, height=400)]
        done = dones["player_0"] and dones["player_1"]
    return animate(imgs)

In [None]:
# recreate our agents and run
agents = {player: Agent(player, env.state.env_cfg) for player in env.agents}
interact(env, agents, 25)

Some factories are surviving for for more than 100 steps thanks to the delivery of additional ice, but more work will need to be done to keep them alive longer.

Puting all those pieces together the full starter agent looks like this (and we will save it to agent.py)

In [None]:
#%%writefile test_agent6.py
from lux.kit import obs_to_game_state, GameState, EnvConfig
from lux.utils import direction_to, my_turn_to_place_factory
import numpy as np
import sys
class TestAgent_0():
    def __init__(self, player: str, env_cfg: EnvConfig) -> None:
        self.player = player
        self.opp_player = "player_1" if self.player == "player_0" else "player_0"
        np.random.seed(0)
        self.env_cfg: EnvConfig = env_cfg

    def early_setup(self, step: int, obs, remainingOverageTime: int = 60):
        #print('asdfasdf')
        if step == 0:
            # bid 0 to not waste resources bidding and declare as the default faction
            return dict(faction="AlphaStrike", bid=0)
        else:
            game_state = obs_to_game_state(step, self.env_cfg, obs)
            # factory placement period

            # how much water and metal you have in your starting pool to give to new factories
            water_left = game_state.teams[self.player].water
            metal_left = game_state.teams[self.player].metal

            # how many factories you have left to place
            factories_to_place = game_state.teams[self.player].factories_to_place
            # whether it is your turn to place a factory
            my_turn_to_place = my_turn_to_place_factory(game_state.teams[self.player].place_first, step)
            if factories_to_place > 0 and my_turn_to_place:
                # we will spawn our factory in a random location with 150 metal and water if it is our turn to place
                potential_spawns = np.array(list(zip(*np.where(obs["board"]["valid_spawns_mask"] == 1))))
                spawn_loc = potential_spawns[np.random.randint(0, len(potential_spawns))]
                return dict(spawn=spawn_loc, metal=150, water=150)
            return dict()

    def act(self, step: int, obs, remainingOverageTime: int = 60):
        #print('asdf')
        actions = dict()
        game_state = obs_to_game_state(step, self.env_cfg, obs)
        factories = game_state.factories[self.player]
        
        #print('pla')
        #display(game_state.factories[self.player])
        #print('opp')
        #display(game_state.factories[self.opp_player])
        
        game_state.teams[self.player].place_first
        factory_tiles, factory_units = [], []
        for unit_id, factory in factories.items():
            if factory.power >= self.env_cfg.ROBOTS["HEAVY"].POWER_COST and \
            factory.cargo.metal >= self.env_cfg.ROBOTS["HEAVY"].METAL_COST:
                actions[unit_id] = factory.build_heavy()
            if self.env_cfg.max_episode_length - game_state.real_env_steps < 50:
                if factory.water_cost(game_state) <= factory.cargo.water:
                    actions[unit_id] = factory.water()
            factory_tiles += [factory.pos]
            factory_units += [factory]
        factory_tiles = np.array(factory_tiles)

        units = game_state.units[self.player]
        ice_map = game_state.board.ice
        ice_tile_locations = np.argwhere(ice_map == 1)
        for unit_id, unit in units.items():

            # track the closest factory
            closest_factory = None
            adjacent_to_factory = False
            if len(factory_tiles) > 0:
                factory_distances = np.mean((factory_tiles - unit.pos) ** 2, 1)
                closest_factory_tile = factory_tiles[np.argmin(factory_distances)]
                closest_factory = factory_units[np.argmin(factory_distances)]
                adjacent_to_factory = np.mean((closest_factory_tile - unit.pos) ** 2) == 0

                # previous ice mining code
                if unit.cargo.ice < 40:
                    ice_tile_distances = np.mean((ice_tile_locations - unit.pos) ** 2, 1)
                    closest_ice_tile = ice_tile_locations[np.argmin(ice_tile_distances)]
                    if np.all(closest_ice_tile == unit.pos):
                        if unit.power >= unit.dig_cost(game_state) + unit.action_queue_cost(game_state):
                            actions[unit_id] = [unit.dig(repeat=0)]
                    else:
                        direction = direction_to(unit.pos, closest_ice_tile)
                        move_cost = unit.move_cost(game_state, direction)
                        if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
                            actions[unit_id] = [unit.move(direction, repeat=0)]
                # else if we have enough ice, we go back to the factory and dump it.
                elif unit.cargo.ice >= 40:
                    direction = direction_to(unit.pos, closest_factory_tile)
                    if adjacent_to_factory:
                        if unit.power >= unit.action_queue_cost(game_state):
                            actions[unit_id] = [unit.transfer(direction, 0, unit.cargo.ice, repeat=0)]
                    else:
                        move_cost = unit.move_cost(game_state, direction)
                        if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
                            actions[unit_id] = [unit.move(direction, repeat=0)]
        return actions

In [None]:
env.agents

In [None]:
#ctrlf

#from test_agent6 import TestAgent


agents = {
    env.agents[0]: TestAgent_0(env.agents[0], env.state.env_cfg),
    env.agents[1]: TestAgent_1(env.agents[1], env.state.env_cfg),
}
interact(env, agents, 10)

## Create a submission
Now we need to create a .tar.gz file with main.py (and agent.py) at the top level. We can then upload this!

In [None]:
!tar -czf submission.tar.gz *

## Submit
Now open the /kaggle/working folder and find submission.tar.gz, download that file, navigate to the "MySubmissions" tab in https://www.kaggle.com/c/lux-ai-season-2/ and upload your submission! It should play a validation match against itself and once it succeeds it will be automatically matched against other players' submissions. Newer submissions will be prioritized for games over older ones. Your team is limited in the number of succesful submissions per day so we highly recommend testing your bot locally before submitting.

## CLI Tool

To test your agent without using the python API you can also run

In [None]:
!luxai-s2 main.py main.py -v 2 -s 101 -o replay.json

which uses a random seed and generates a replay.html file that you can click and watch. Optionally if you specify `-o replay.json` you can upload replay.json to http://s2vis.lux-ai.org/.

The CLI tool enables you to easily run episodes between any two agents (python or not) and provides a flexible tournament running tool to evaluate many agents together. Documentation on this tool can be found here: https://github.com/Lux-AI-Challenge/Lux-Design-S2/tree/main/luxai_runner/README.md

In [None]:
import IPython
IPython.display.HTML(filename='replay.html')

And they're off! The heavy robots have started to move towards the ice tiles and some have begun mining.

#### Delivering Resources and Keep Factories Alive
We now have ice being mined, but we now need to deliver that back to the factories so they can refine that ice into water and sustain themselves.

In [None]:
def act(self, step: int, obs, remainingOverageTime: int = 60):
    actions = dict()
    game_state = obs_to_game_state(step, self.env_cfg, obs)
    factories = game_state.factories[self.player]
    factory_tiles, factory_units = [], []
    for unit_id, factory in factories.items():
        if factory.power >= self.env_cfg.ROBOTS["HEAVY"].POWER_COST and \
        factory.cargo.metal >= self.env_cfg.ROBOTS["HEAVY"].METAL_COST:
            actions[unit_id] = factory.build_heavy()
        factory_tiles += [factory.pos]
        factory_units += [factory]
    factory_tiles = np.array(factory_tiles)

    units = game_state.units[self.player]
    ice_map = game_state.board.ice
    ice_tile_locations = np.argwhere(ice_map == 1)
    for unit_id, unit in units.items():
        
        # track the closest factory
        closest_factory = None
        adjacent_to_factory = False
        if len(factory_tiles) > 0:
            factory_distances = np.mean((factory_tiles - unit.pos) ** 2, 1)
            closest_factory_tile = factory_tiles[np.argmin(factory_distances)]
            closest_factory = factory_units[np.argmin(factory_distances)]
            adjacent_to_factory = np.mean((closest_factory_tile - unit.pos) ** 2) == 0
        
            # previous ice mining code
            if unit.cargo.ice < 40:
                ice_tile_distances = np.mean((ice_tile_locations - unit.pos) ** 2, 1)
                closest_ice_tile = ice_tile_locations[np.argmin(ice_tile_distances)]
                if np.all(closest_ice_tile == unit.pos):
                    if unit.power >= unit.dig_cost(game_state) + unit.action_queue_cost(game_state):
                        actions[unit_id] = [unit.dig(repeat=0)]
                else:
                    direction = direction_to(unit.pos, closest_ice_tile)
                    move_cost = unit.move_cost(game_state, direction)
                    if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
                        actions[unit_id] = [unit.move(direction, repeat=0)]
            # else if we have enough ice, we go back to the factory and dump it.
            elif unit.cargo.ice >= 40:
                direction = direction_to(unit.pos, closest_factory_tile)
                if adjacent_to_factory:
                    if unit.power >= unit.action_queue_cost(game_state):
                        actions[unit_id] = [unit.transfer(direction, 0, unit.cargo.ice, repeat=0)]
                else:
                    move_cost = unit.move_cost(game_state, direction)
                    if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
                        actions[unit_id] = [unit.move(direction, repeat=0)]
    return actions
Agent.act = act

In [None]:
# recreate our agents and run
agents = {player: Agent(player, env.state.env_cfg) for player in env.agents}
interact(env, agents, 300)

In [None]:
img = env.render("rgb_array", width=640, height=640)
plt.imshow(img)