

Referenced notebooks:
https://www.kaggle.com/stonet2000/lux-ai-season-1-jupyter-notebook-tutorial
https://www.kaggle.com/stefanschulmeister87/all-properties-and-methods-on-one-page



In [1]:
!rm -rf log
!mkdir log

In [2]:
!node --version

v15.14.0


We will also need the Kaggle Environments package (`kaggle-environments`).

In [3]:
!pip install kaggle-environments -U

Collecting kaggle-environments
  Downloading kaggle_environments-1.14.0-py2.py3-none-any.whl (1.3 MB)
Collecting PettingZoo==1.12.0
  Downloading PettingZoo-1.12.0.tar.gz (756 kB)
Collecting stable-baselines3==1.7.0
  Downloading stable_baselines3-1.7.0-py3-none-any.whl (171 kB)
Collecting pickle5==0.0.12
  Downloading pickle5-0.0.12-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (256 kB)
Collecting vec-noise>=1.1.4
  Downloading vec_noise-1.1.4.zip (134 kB)
Collecting gym==0.21.0
  Downloading gym-0.21.0.tar.gz (1.5 MB)
Collecting importlib_metadata>=4.8.1
  Downloading importlib_metadata

Next, we import the `make` function from the `kaggle_environments` package.

In [4]:
from kaggle_environments import make

Loading environment lux_ai_s2 failed: No module named 'numpy.typing'
Loading environment llm_20_questions failed: /opt/conda/lib/python3.7/site-packages/nvidia/cublas/lib/libcublas.so.11: symbol cublasLtHSHMatmulAlgoInit version libcublasLt.so.11 not defined in file libcublasLt.so.11 with link time reference


## Building from Scratch

The following bit of code copies the competition's starter kit files into the working directory, which is all you need for an empty agent that does nothing.

In [5]:
# run this if using kaggle notebooks
!cp -r ../input/lux-ai-2021/* .

We have something that survives! We are now ready to submit something to the leaderboard. The cells below write everything we have built so far into Python files (`game_info.py`, `mission.py`, `game_logger.py`, and `agent.py`) that you can then submit to the competition leaderboard.

In [6]:
%%writefile game_info.py
import numpy as np
from sklearn.cluster import KMeans
from lux.game_map import Position
from lux.constants import Constants
import math

class GameInfo:
    # get clusters by resource
    def __init__(self, game_state, observation):
        self.clusters = {}
        player, opponent = self.get_players(game_state, observation)
        self.player = player
        X, self.resource_array = GameInfo.find_resource_map(game_state, player)
        max_clusters = game_state.map_width//3
        min_length = game_state.map_width//5
        (self.kmeans, self.n_clusters) = GameInfo.make_kmeans(max_clusters, X, min_length)
        if self.kmeans is None:
            return
        for label in range(self.n_clusters):
            self.add_label_info(label)
        self.update(game_state, observation)
        
    def add_label_info(self, label):
        self.clusters[label] = {}
        self.clusters[label]['label'] = label
        center = self.kmeans.cluster_centers_[label]
        self.clusters[label]['center'] = Position(round(center[0]), round(center[1]))

    def get_players(self, game_state, observation):
        return (game_state.players[observation.player], game_state.players[(observation.player + 1) % 2])

    def update(self, game_state, observation):
        player, opponent = self.get_players(game_state, observation)
        for label in range(self.n_clusters):
            self.update_resource(label)
        self.add_units('units', player.units)
        self.add_units('opponent_units', opponent.units)
        self.add_citytiles('player_cities', player.cities)
        self.add_citytiles('opponent_cities', opponent.cities)
        
    def get_cluster_label_unit(self, unit, player):
        if (player == self.player):
            key = 'units'
        else:
            key = 'opponent_units'
        for label in range(self.n_clusters):
            if (unit.id in self.clusters[label][key]):
                return label
        return None

    def find_closest_cluster(self, unit, unit_number=None):
        closest_dist = math.inf
        closest_cluster = None
        for label, cluster in self.clusters.items():
            dist = cluster["center"].distance_to(unit.pos)
            if dist < closest_dist and (unit_number is None or len(cluster['units']) <= unit_number):
                closest_dist = dist
                closest_cluster = cluster
        return closest_cluster

    @staticmethod
    def make_kmeans(max_clusters, X, min_distance):
        max_clusters = min(max_clusters,X.size)
        for n_clusters in reversed(range(1, max_clusters)):
            try:
                kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(X)
                positions = [Position(center[0], center[1]) for center in kmeans.cluster_centers_]
                is_near = False
                for p1 in positions:
                    for p2 in positions:
                        if p1 != p2 and p1.distance_to(p2) < min_distance:
                            is_near = True
                if not is_near:
                    return (kmeans, n_clusters)
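            except Exception:
                # KMeans can fail (for example, with fewer samples than clusters); try a smaller n_clusters.
                continue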
        return (None, 1)

    def add_units(self, key, units):
        X = np.array([[unit.pos.x, unit.pos.y] for unit in units])
        if X.size > 0:
            labels = self.kmeans.predict(X)
            for label in range(self.n_clusters):
                self.clusters[label][key] = {units[index].id: units[index] for index in range(len(labels)) if
                                             labels[index] == label}
        else:
            for label in range(self.n_clusters):
                self.clusters[label][key] = {}

    def add_citytiles(self, key, cities):
        citytiles = [citytile for city in cities.values() for citytile in city.citytiles]
        X = np.array([[citytile.pos.x, citytile.pos.y] for citytile in citytiles])
        if X.size > 0:
            labels = self.kmeans.predict(X)
            for label in range(self.n_clusters):
                self.clusters[label][key] = {
                    citytiles[index].cityid + '_{}_{}'.format(citytiles[index].pos.x, citytiles[index].pos.y):
                        citytiles[index] for index in range(len(labels)) if
                    labels[index] == label}
        else:
            for label in range(self.n_clusters):
                self.clusters[label][key] = {}

    @staticmethod
    def find_resource_map(game_state, player):
        width, height = game_state.map_width, game_state.map_height
        resource_x_y_array = []
        resource_array = []
        for y in range(height):
            for x in range(width):
                cell = game_state.map.get_cell(x, y)
                if cell.has_resource():
                    if cell.resource.type == Constants.RESOURCE_TYPES.COAL and not player.researched_coal(): continue
                    if cell.resource.type == Constants.RESOURCE_TYPES.URANIUM and not player.researched_uranium(): continue
                    resource_x_y_array.append([x, y])
                    resource_array.append(cell)
        return np.array(resource_x_y_array), resource_array


    def update_resource(self, label):
        self.clusters[label]['resources'] = {}
        self.clusters[label]['amount'] = 0
        for index, cell_label in enumerate(self.kmeans.labels_):
            cell = self.resource_array[index]
            if cell_label == label and cell.has_resource():
                key = str(cell.pos.x) + '_' + str(cell.pos.y)
                self.clusters[label]['resources'][key] = cell
                self.clusters[label]['amount'] += cell.resource.amount

Writing game_info.py
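
`make_kmeans` accepts a clustering only when no two cluster centers are closer than `min_distance`. The check can be sketched on its own, assuming the Manhattan distance that the kit's `Position.distance_to` uses on the grid. The helper names and the sample centers below are hypothetical and only illustrate the idea; they are not part of `game_info.py`.

```python
from itertools import combinations


def manhattan(a, b):
    """Manhattan distance between two (x, y) points on the game grid."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1])


def centers_well_separated(centers, min_distance):
    """Return True if every pair of centers is at least min_distance apart."""
    return all(manhattan(p, q) >= min_distance for p, q in combinations(centers, 2))


# Hypothetical cluster centers on a 12x12 map, where min_distance = 12 // 5 = 2.
print(centers_well_separated([(1.0, 1.0), (8.5, 9.0)], 2))  # True
print(centers_well_separated([(1.0, 1.0), (1.5, 1.5)], 2))  # False
```

Checking each unordered pair once with `combinations` avoids comparing a center against itself, which the nested loop in `make_kmeans` handles with `p1 != p2`.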


In [7]:
%%writefile mission.py
from lux.constants import Constants
from lux.game_map import Cell, RESOURCE_TYPES, Position
from lux import annotate
import math


class Mission:
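    def __init__(self, mission_func, append_values=None):
        self.mission_func = mission_func
        # Avoid a shared mutable default: each Mission gets its own dict.
        self.append_values = append_values if append_values is not None else {}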

    @staticmethod
    def get_max_action(dic_mission, dic_weight, unit, **kwargs):
        dic_weight = sorted(dic_weight.items(), key=lambda x: x[1])
        actions = []
        choice = []
        for action_name, weight in dic_weight:
            action_mission = dic_mission[action_name]
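            # Merge per-mission overrides into a copy so they do not leak into later missions.
            mission_kwargs = {**kwargs, **action_mission.append_values}
            actions,action,target_position = action_mission.get_action(action_name, unit=unit, **mission_kwargs)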
            choice.append(action_name+":"+action)
            if "none" not in action:
                return actions,action,action_name,target_position,choice
        return actions,None,None,None,choice
    
    def get_action(self, action_name, unit, **kwargs):
        (action, target_position) = self.mission_func(unit=unit, **kwargs)
        actions = []
        if "none" not in action:
            actions.append(annotate.sidetext(action_name + ":" + action))
            if target_position is not None and unit.pos != target_position :
                actions.append(annotate.circle(target_position.x, target_position.y))
                actions.append(annotate.line(unit.pos.x, unit.pos.y, target_position.x, target_position.y))
            if "skip" not in action:
                actions.append(action)
        return actions,action,target_position

class Empty():
    def __init__(self, pos):
        self.pos = pos    
    
class Helper:
    @staticmethod
    def str_pos(pos: Position):
        return str(pos.x) + '-' + str(pos.y)

    @staticmethod
    def str_pos_xy(x, y):
        return str(x) + '-' + str(y)

    @staticmethod
    def find_resources(game_state):
        resource_tiles = {}
        width, height = game_state.map_width, game_state.map_height
        for y in range(height):
            for x in range(width):
                cell = game_state.map.get_cell(x, y)
                if cell.has_resource():
                    resource_tiles[Helper.str_pos_xy(x, y)] = cell
        return resource_tiles
    
    @staticmethod    
    def find_unit_tiles(player, check_can_not_act=False, is_worker=None):
        tiles = {}
        for unit in player.units:
            if check_can_not_act and unit.can_act():
                continue
            # Keep only units whose type matches the requested is_worker flag.
            if is_worker is not None and is_worker != unit.is_worker():
                continue            
            tiles[Helper.str_pos(unit.pos)] = unit
        return tiles
    
    @staticmethod    
    def find_city_tiles(player):
        tiles = {}
        for k, city in player.cities.items():
            for city_tile in city.citytiles:
                city_tile.city = city
                tiles[Helper.str_pos(city_tile.pos)] = city_tile
        return tiles
    
    @staticmethod    
    def find_empty_tiles(game_state, player, used_positions):
        empty_tiles = {}
        # Build the city-tile lookup once instead of once per map cell.
        city_tiles = Helper.find_city_tiles(player)
        width, height = game_state.map_width, game_state.map_height
        for y in range(height):
            for x in range(width):
                pos_str = Helper.str_pos_xy(x, y)
                cell = game_state.map.get_cell(x, y)
                if cell.has_resource() or pos_str in city_tiles or pos_str in used_positions:
                    continue
                empty_tiles[pos_str] = Empty(Position(x, y))
        return empty_tiles
    
    @staticmethod    
    def find_closest_city_tile(pos, city_tiles, fuel=None, light_upkeep=None):
        closest_city_tile = None
        closest_dist = math.inf
        for str_pos, city_tile in city_tiles.items():
            dist = city_tile.pos.distance_to(pos)
            if (dist < closest_dist):
                if fuel is not None and fuel < city_tile.city.fuel:
                    continue
                if light_upkeep is not None and light_upkeep < city_tile.city.light_upkeep:
                    continue
                closest_dist = dist
                closest_city_tile = city_tile
        return closest_city_tile
    
    @staticmethod    
    def find_closest_tile(pos, tiles):
        closest_dist = math.inf
        closest_tile = None
        for key, tile in tiles.items():
            dist = tile.pos.distance_to(pos)
            if dist < closest_dist:
                closest_dist = dist
                closest_tile = tile
        return closest_tile
    
    @staticmethod    
    def find_closest_resources(pos, player, resource_tiles):
        closest_dist = math.inf
        closest_resource_tile = None
        for key, resource_tile in resource_tiles.items():
            dist = resource_tile.pos.distance_to(pos)
            if resource_tile.resource.type == Constants.RESOURCE_TYPES.COAL and not player.researched_coal(): continue
            if resource_tile.resource.type == Constants.RESOURCE_TYPES.URANIUM and not player.researched_uranium(): continue
            if resource_tile.resource.amount > 0 and dist < closest_dist:
                closest_dist = dist
                closest_resource_tile = resource_tile
        return closest_resource_tile
    
    @staticmethod    
    def calc_move_pos(unit, player, pos, used_positions):
        unit_tiles = Helper.find_unit_tiles(player)
        check_dirs = [
            Constants.DIRECTIONS.NORTH,
            Constants.DIRECTIONS.EAST,
            Constants.DIRECTIONS.SOUTH,
            Constants.DIRECTIONS.WEST,
        ]
        adjacent_pos = [unit.pos.translate(direction,1) for direction in check_dirs]
        min_distance = pos.distance_to(unit.pos)-1
        root_pos = [ adjacent_pos for adjacent_pos in adjacent_pos if adjacent_pos.distance_to(pos) == min_distance]
        not_used_pos = [pos for pos in root_pos if Helper.str_pos(pos) not in used_positions]
        best_pos = [pos for pos in not_used_pos if Helper.str_pos(pos) not in unit_tiles]
        if len(best_pos) > 0:
            return best_pos[0]
        if len(not_used_pos) > 0:
            return not_used_pos[0]
        return None

class Action:
    @staticmethod
    def build_city(unit, player,used_positions,game_state, **kwargs):
        if unit.can_build(game_state.map):
            used_positions[Helper.str_pos(unit.pos)] = unit
            return unit.build_city(), unit.pos
        if (unit.cargo.wood + unit.cargo.coal + unit.cargo.uranium) >= 100:
            return Action.go_to_empty(unit, player, used_positions=used_positions,**kwargs)
        return "none_not_enough_cargo", None
    
    @staticmethod  
    def go_to_city(unit, player, city_tiles, fuel=None, light_upkeep=None, cargo_have=None, **kwargs):
        if (cargo_have is not None
                and cargo_have >= 100 - unit.get_cargo_space_left()):
            return "none_not_enough_cargo", None
        closest_city_tile = Helper.find_closest_city_tile(unit.pos, city_tiles, fuel, light_upkeep)
        if closest_city_tile is not None:
            del city_tiles[Helper.str_pos(closest_city_tile.pos)]
            return  Action.go_to_pos(unit, player, closest_city_tile.pos,**kwargs)
        return "none_not_found", None
  
    @staticmethod  
    def go_to_pos(unit, player, pos, used_positions, **kwargs):
        if pos != unit.pos:
            go_pos = Helper.calc_move_pos(unit, player, pos, used_positions )
            if go_pos is None:
                action = "skip_collided"
            else:
                direction = unit.pos.direction_to(go_pos)
                action = unit.move(direction)
                pos_str = Helper.str_pos(go_pos)
                used_positions[pos_str] = unit
        else:
            action = "skip_here"
    
        if "skip" in action:
            used_positions[Helper.str_pos(unit.pos)] = unit
        return action, pos
    
    @staticmethod  
    def go_to_empty(unit, player, empty_tiles, **kwargs):
        empty_tile = Helper.find_closest_tile(unit.pos, empty_tiles)
        if empty_tile is None:
            return "none_not_found", None
        if unit.pos == empty_tile.pos:
            return "skip_here", None
        del empty_tiles[Helper.str_pos(empty_tile.pos)]
        return  Action.go_to_pos(unit, player, empty_tile.pos, **kwargs)
  
    @staticmethod  
    def go_to_closest_cluster(unit, player, game_info, unit_count=None, **kwargs):
        # unit_count comes from the mission's append_values and caps how many
        # of our units the target cluster may already hold.
        cluster = game_info.find_closest_cluster(unit, unit_count)
        if cluster is None:
            return "none_not_found", None
        return  Action.go_to_pos(unit, player, cluster["center"], **kwargs)
        
    
    @staticmethod
    def do_pillage(unit, player, opponent_unit_tiles, **kwargs):
        unit_tile = Helper.find_closest_tile(unit.pos, opponent_unit_tiles)
        if unit_tile is None:
            return "none_not_found", None
        if unit_tile.pos.distance_to(unit.pos) > 1:
            return Action.go_to_pos(unit, player, unit_tile.pos, **kwargs)
        return unit.pillage(), None
  
    @staticmethod  
    def go_to_closest_resource(unit, player, resource_tiles, **kwargs):
        resource_tile = Helper.find_closest_resources(unit.pos, player, resource_tiles)
        return  Action.go_to_resource(unit, player, resource_tile, resource_tiles, **kwargs)
    
    @staticmethod
    def go_to_resource(unit, player, resource_tile, tiles, **kwargs):
        if unit.get_cargo_space_left() > 5:
            if resource_tile is not None:
                del tiles[Helper.str_pos(resource_tile.pos)]
                return  Action.go_to_pos(unit, player, resource_tile.pos, **kwargs)
        return "none_not_found", None
    
    @staticmethod
    def go_to_pos_resource(unit, player, pos, resource_tiles, **kwargs):
        resource_tile = Helper.find_closest_resources(pos, player, resource_tiles)
        return  Action.go_to_resource(unit, player, resource_tile, resource_tiles, **kwargs)


Writing mission.py
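
`Mission.get_max_action` tries missions in ascending weight order and stops at the first one whose action string does not contain `"none"`. The following is a minimal sketch of that selection rule only. The mission functions and the action strings are hypothetical placeholders standing in for the `Action` methods, and the `cargo` argument is an assumption made for the example.

```python
def pick_action(missions, weights, **context):
    """Try missions in ascending weight order; return the first usable one."""
    tried = []
    for name, _weight in sorted(weights.items(), key=lambda item: item[1]):
        action = missions[name](**context)
        tried.append(f"{name}:{action}")
        # By convention, any action string containing "none" means "not applicable".
        if "none" not in action:
            return name, action, tried
    return None, None, tried


# Hypothetical mission functions; the returned strings are placeholders.
def build_city(cargo, **_):
    return "bcity u_1" if cargo >= 100 else "none_not_enough_cargo"


def gather(cargo, **_):
    return "m u_1 n" if cargo < 100 else "none_not_found"


missions = {"build_city": build_city, "gather": gather}
weights = {"gather": 3, "build_city": 1}

print(pick_action(missions, weights, cargo=40))
print(pick_action(missions, weights, cargo=100))
```

A lower weight means a higher priority, so `build_city` is always tried before `gather` here, and the `tried` list records every rejected option for logging.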


In [8]:
%%writefile game_logger.py
import pandas as pd
class GameLogger:
    def __init__(self,game_id):
        self.game_id = game_id
        self.turn_logs = {}

    def append_obj(self, turn, uniq_id, obj):
        attr = vars(obj)
        for key, value in attr.items():
            self.append(turn, uniq_id, key, value)
    
    def append(self, turn, uniq_id, key, value):
        dic_key = "{}-{}".format(uniq_id,key)
        if turn not in self.turn_logs:
            self.turn_logs[turn] = {}
        self.turn_logs[turn][dic_key] = value
    
    def get_pandas(self):
        df = pd.DataFrame.from_dict(
            {turn: pd.Series(v) for turn, v in self.turn_logs.items()}, orient='index')
        df = df.reindex(sorted(df.columns), axis=1)
        return df
      
    
    def save(self,team):
        df = self.get_pandas()
        df.to_csv('log/{}-{}.csv'.format(self.game_id,team))
        

Writing game_logger.py
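
`GameLogger` stores one dictionary per turn, keyed by `"{uniq_id}-{key}"`, and turns it into a table with one row per turn and one sorted column per key. The sketch below shows the same layout using only the standard library's `csv` module in place of pandas, so it is an illustration of the data shape rather than the logger's actual code. The function name and the sample values are hypothetical.

```python
import csv
import io


def logs_to_csv(turn_logs):
    """Render {turn: {column: value}} as CSV text, one row per turn."""
    # Collect every column seen on any turn and sort them, as get_pandas does.
    columns = sorted({col for row in turn_logs.values() for col in row})
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=["turn"] + columns, restval="")
    writer.writeheader()
    for turn in sorted(turn_logs):
        writer.writerow({"turn": turn, **turn_logs[turn]})
    return buffer.getvalue()


# Hypothetical log content for two turns.
turn_logs = {
    0: {"u_1-cooldown": 0, "player-researched_coal": False},
    1: {"u_1-cooldown": 2},
}
print(logs_to_csv(turn_logs))
```

Keys that are missing on a given turn become empty cells, which corresponds to the `NaN` entries pandas would produce for the same input.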


In [9]:
%%writefile agent.py
from lux.game import Game
from game_info import * 
from mission import *
from game_logger import *
import pandas as pd

game_state = None
game_info = None
game_logger = GameLogger('test')
DEBUG = False

def agent(observation, configuration):
    global game_state
    global game_info
    ### Do not edit ###
    if observation["step"] == 0:
        game_state = Game()
        game_state._initialize(observation["updates"])
        game_state._update(observation["updates"][2:])
        game_state.id = observation.player
    else:
        game_state._update(observation["updates"])
    
    if observation["step"] % 20 == 0:
        game_info = GameInfo(game_state, observation)
    elif len(game_info.clusters) > 0:
        game_info.update(game_state, observation)
        
    if observation["step"] % 50 == 0:
        if DEBUG: game_logger.save(observation.player)
   
    actions = []

    ### AI Code goes down here! ###
    player, opponent = game_info.get_players(game_state, observation)
    info = {
        'game_state': game_state,
        'player': player,
        'game_info': game_info,
        'used_positions': {},
        'resource_tiles': Helper.find_resources(game_state),
        'city_tiles': Helper.find_city_tiles(player),
        'unit_tiles': Helper.find_unit_tiles(player,False),
        'no_act_unit_tiles': Helper.find_unit_tiles(player,True),
        'opponent_city_tiles': Helper.find_city_tiles(opponent),
        'opponent_unit_tiles': Helper.find_unit_tiles(opponent),
    }
    info['used_positions'].update(info['opponent_city_tiles'])
    info['used_positions'].update(info['opponent_unit_tiles'])
    info['used_positions'].update(info['no_act_unit_tiles'])
    info['empty_tiles'] = Helper.find_empty_tiles(game_state, player, info['used_positions'])
    
    # Logging
    game_logger.append_obj(observation["step"],'observation',observation)
    game_logger.append_obj(observation["step"],'player',player)
    game_logger.append(observation["step"], 'player', 'researched_coal' , player.researched_coal() )
    game_logger.append(observation["step"], 'player', 'researched_uranium' , player.researched_uranium() )
    game_logger.append(observation["step"], 'tiles', 'city_tiles' , [ city.cityid for city in info['city_tiles'].values()]  )
    game_logger.append(observation["step"], 'tiles', 'opponent_city_tiles' , [ city.cityid for city in info['opponent_city_tiles'].values()] )
    game_logger.append(observation["step"], 'tiles', 'resource_tiles' , [ str_pos for str_pos in info['resource_tiles']]  )    
    game_logger.append(observation["step"], 'tiles', 'unit_tiles' , [ unit.id for unit in info['unit_tiles'].values()]  )
    game_logger.append(observation["step"], 'units', 'all_'+str(player.team) , [unit.id for unit in player.units] )
    game_logger.append(observation["step"], 'units', 'act_'+str(player.team) , [unit.id for unit in player.units if unit.can_act()] )
    for city in player.cities.values():
        game_logger.append_obj(observation["step"], 'city_'+str(city.cityid) , city)
        for city_tile in city.citytiles:
            game_logger.append_obj(observation["step"], 'citytile_'+str(city_tile.cityid) , city_tile)
    for city in opponent.cities.values():
        game_logger.append_obj(observation["step"], 'city_'+str(city.cityid) , city)
        for city_tile in city.citytiles:
            game_logger.append_obj(observation["step"], 'citytile_'+str(city_tile.cityid) , city_tile)
    width, height = game_state.map_width, game_state.map_height
    for y in range(height):
        for x in range(width):
            cell = game_state.map.get_cell(x, y)
            game_logger.append_obj(observation["step"],'cell_'+Helper.str_pos(cell.pos),cell)
    for unit in player.units:
        game_logger.append_obj(observation["step"], unit.id, unit)
        game_logger.append(observation["step"], unit.id, 'is_worker', unit.is_worker())
    for unit in opponent.units:
        game_logger.append_obj(observation["step"], unit.id, unit)
        game_logger.append(observation["step"], unit.id, 'is_worker', unit.is_worker())
    
    info['dic_mission'] = {
        'build_city': Mission(Action.build_city),
        'go_to_closest_resource': Mission(Action.go_to_closest_resource),
        'go_to_warn_city': Mission(Action.go_to_city,
                                   {'light_upkeep': 5, 'cargo_have': 10}),
        'go_to_city': Mission(Action.go_to_city),
        'go_to_enemy_city': Mission(Action.go_to_city, {'player': opponent,'city_tiles': info['opponent_city_tiles']}),
        'do_pillage': Mission(Action.do_pillage, {'opponent': opponent,'city_tiles': info['opponent_city_tiles']}),
        'go_to_closest_empty_cluster': Mission(Action.go_to_closest_cluster,
                                               {'unit_count': 0}),
        'go_to_closest_few_unit_cluster': Mission(Action.go_to_closest_cluster,
                                               {'unit_count': 3}),
        'go_to_closest_cluster': Mission(Action.go_to_closest_cluster),
    }
    unit_mission = {
            'good_citizen': {
                'build_city': 1,
                'go_to_warn_city': 2,
                'go_to_closest_resource': 3,
                'go_to_city': 4
            },
            'adventure':  {
                'go_to_closest_few_unit_cluster': 0,              
                'build_city': 1,
                'go_to_warn_city': 2,
                'go_to_closest_resource': 3,
                'go_to_city': 4
            },
            'adventure_resource':  {
                'go_to_closest_few_unit_cluster': 0,
                'go_to_warn_city': 2,
                'go_to_closest_resource': 3,
                'go_to_city': 4
            },
            'thieves': {
                'build_city': 1,
                'do_pillage': 2,
                'go_to_enemy_city': 3
            },
    }

    unit_count = len(player.units)

    for city in player.cities.values():
        for city_tile in city.citytiles:
            action = 'skip'
            if city_tile.can_act():
                if unit_count < player.city_tile_count:
                    action = city_tile.build_worker()
                    actions.append(action)
                    unit_count += 1
                elif not player.researched_uranium():
                    action = city_tile.research()
                    actions.append(action)
            game_logger.append(observation["step"], 'citytile_'+str(city_tile.cityid) , 'action' , action)
        
            
    def get_unit_mission(game_info,cluster,unit):
        unit_count = len(cluster["units"])
        if cluster["active_cluster_unit"] <= 2:
            return 'good_citizen'
        if cluster["active_cluster_unit"] <= max(5,unit_count//2):
            return 'adventure'
        if cluster["active_cluster_unit"] <= max(10,unit_count-unit_count//10):
            return 'adventure_resource'
        return 'thieves'
    
    cluster_units = {}
    for label, cluster in info["game_info"].clusters.items():
        cluster["active_cluster_unit"] = 0
        for unit_id, unit in cluster['units'].items():
            cluster_units[Helper.str_pos(unit.pos)] = unit
            if not unit.can_act():
                continue
            cluster["active_cluster_unit"] += 1
            mission = get_unit_mission(info["game_info"],cluster,unit)
            info['dic_weight'] = unit_mission[mission]
            unit_actions,action,action_name,target_position,choice = Mission.get_max_action(unit=unit, **info)
            actions +=unit_actions
            game_logger.append(observation["step"], unit.id, "mission", mission)
            game_logger.append(observation["step"], unit.id, "action", action)
            game_logger.append(observation["step"], unit.id, "action_name", action_name)
            game_logger.append(observation["step"], unit.id, "target_position", target_position)
            game_logger.append(observation["step"], unit.id, "choice", choice)
            game_logger.append(observation["step"], unit.id, "cluster", label)

    game_logger.append(observation["step"], 'cluster_units', "all", list(cluster_units.keys()))
    for pos_str in [pos_str_unit for pos_str_unit in info['unit_tiles'] if pos_str_unit not in cluster_units.keys()]:
        # Units that are not assigned to any cluster (possibly a bug upstream)
        # fall back to the 'good_citizen' mission.
        unit = info['unit_tiles'][pos_str]
        if not unit.can_act():
            continue
        info['dic_weight'] = unit_mission['good_citizen']
        unit_actions,action,action_name,target_position,choice = Mission.get_max_action(unit=unit, **info)
        actions += unit_actions
        game_logger.append(observation["step"], unit.id, "mission", 'no_cluster_good_citizen')
        game_logger.append(observation["step"], unit.id, "action", action)
        game_logger.append(observation["step"], unit.id, "action_name", action_name)
        game_logger.append(observation["step"], unit.id, "target_position", target_position)
        game_logger.append(observation["step"], unit.id, "choice", choice)
        game_logger.append(observation["step"], unit.id, "cluster", None)
        
    game_logger.append(observation["step"], "actions", "all", actions)
    return actions


Overwriting agent.py


In [10]:
from kaggle_environments import make

env = make("lux_ai_2021", configuration={"width": 12, "height": 12, "loglevel": 2, "annotations": True}, debug=True)
steps = env.run(['agent.py', 'agent.py'])
env.render(mode="ipython", width=800, height=800)

import json
replay = env.toJSON()
with open("replay.json", "w") as f:
    json.dump(replay, f)

[WARN] (match_tK3hpxZHZdqG) - turn 35; Unit u_1 collided when trying to move s to (4, 3)
[WARN] (match_tK3hpxZHZdqG) - turn 36; Unit u_1 collided when trying to move s to (4, 3)
[WARN] (match_tK3hpxZHZdqG) - turn 37; Unit u_1 collided when trying to move s to (4, 3)
[WARN] (match_tK3hpxZHZdqG) - turn 38; Unit u_1 collided when trying to move s to (4, 3)
[WARN] (match_tK3hpxZHZdqG) - turn 39; Unit u_1 collided when trying to move s to (4, 3)
[WARN] (match_tK3hpxZHZdqG) - turn 40; Unit u_1 collided when trying to move s to (4, 3)
[WARN] (match_tK3hpxZHZdqG) - turn 43; Unit u_1 collided when trying to move n to (4, 2)
[WARN] (match_tK3hpxZHZdqG) - turn 50; Unit u_4 collided when trying to move s to (4, 4)
[WARN] (match_tK3hpxZHZdqG) - turn 50; Unit u_3 collided when trying to move n to (4, 4)
[WARN] (match_tK3hpxZHZdqG) - turn 53; Unit u_1 collided when trying to move n to (5, 4)
[WARN
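
The collision warnings in this output show which units keep getting blocked. One way to summarize them, assuming the engine output has been captured as a list of text lines, is to count collisions per unit with a regular expression. The pattern below is derived only from the lines shown in this output, and the helper name is hypothetical.

```python
import re
from collections import Counter

# Matches e.g. "turn 35; Unit u_1 collided when trying to move s to (4, 3)".
COLLISION = re.compile(
    r"turn (\d+); Unit (\w+) collided when trying to move (\w) to \((\d+), (\d+)\)"
)


def count_collisions(log_lines):
    """Count collision warnings per unit id."""
    counts = Counter()
    for line in log_lines:
        match = COLLISION.search(line)
        if match:
            counts[match.group(2)] += 1
    return counts


sample = [
    "[WARN] (match_tK3hpxZHZdqG) - turn 35; Unit u_1 collided when trying to move s to (4, 3)",
    "[WARN] (match_tK3hpxZHZdqG) - turn 50; Unit u_4 collided when trying to move s to (4, 4)",
    "[WARN] (match_tK3hpxZHZdqG) - turn 53; Unit u_1 collided when trying to move n to (5, 4)",
]
print(count_collisions(sample))
```

A unit with a high count, such as `u_1` in the run above, is a good candidate for inspecting its logged `choice` and `target_position` values.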

## Create a submission
Now we need to create a .tar.gz file with main.py (and agent.py) at the top level. We can then upload this!

In [11]:
!tar -czf submission.tar.gz *
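
Before uploading, it can help to confirm that `main.py` really sits at the top level of the archive. The sketch below uses Python's standard `tarfile` module. It builds a throwaway archive in a temporary directory with placeholder files so that it runs anywhere; on a real run you would point the hypothetical `top_level_names` helper at your own `submission.tar.gz` instead.

```python
import os
import tarfile
import tempfile


def top_level_names(archive_path):
    """Return the set of top-level entries in a .tar.gz archive."""
    names = set()
    with tarfile.open(archive_path, "r:gz") as archive:
        for member in archive.getmembers():
            name = member.name
            # Archives created from "." prefix every entry with "./".
            if name.startswith("./"):
                name = name[2:]
            if name and name != ".":
                names.add(name.split("/")[0])
    return names


# Demonstration with placeholder files in a temporary directory.
with tempfile.TemporaryDirectory() as workdir:
    for filename in ("main.py", "agent.py"):
        with open(os.path.join(workdir, filename), "w") as f:
            f.write("# placeholder\n")
    archive_path = os.path.join(workdir, "submission.tar.gz")
    with tarfile.open(archive_path, "w:gz") as archive:
        for filename in ("main.py", "agent.py"):
            archive.add(os.path.join(workdir, filename), arcname=filename)
    names = top_level_names(archive_path)
    print("main.py" in names)
```

Note that `tar -czf submission.tar.gz *` packs everything in the working directory, including the `log` folder and `replay.json`, so listing the top-level entries also shows what else is being uploaded.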