Here is the implementation for my current top bot. So far it has had only middling success, so I thought I would share it and see whether anyone else can improve on it. My idea was to make rule-based workers that could switch between three basic behavior patterns:
1. Build city: Gather wood from the nearest source and build a city tile on the closest open cell.
2. Gather resources: Gather the most potent fuel currently researched from the nearest available source and bring it to the closest city tile.
3. Rest: Move to the nearest city tile and stay there.

The strategy could then be adjusted throughout the match by an RL agent that alters the proportions of workers assigned to each behavior. I implemented a Q-learning agent that takes in a state vector with nine inputs:

> \[(# of worker units) (# of cart units) (# of city tiles) (# of opponent workers) (# of opponent carts) (# of opponent city tiles) (Research points) (Opponent research points) (Turn #)\]

and returns one of 55 action codes, each corresponding to a set of worker behavior proportions. My hope was that the neural net would eventually learn strategic patterns such as building quickly once night ended, focusing more on resource gathering as night approached, and returning to cities to conserve fuel once night fell, but no such patterns ever seemed to emerge.
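
To make the action codes concrete, here is a minimal sketch that enumerates the proportion triples in tenths, following the same loop as `_get_action_space` in `RLAgent.py` further down. The name `build_action_space` and the `steps` parameter are only for this illustration.

```python
def build_action_space(steps=10):
    """Enumerate [build, gather, rest] proportions in multiples of 1/steps."""
    actions = []
    for i in range(steps):
        for j in range(steps - i):
            rest = steps - i - j          # Whatever is left over goes to resting
            actions.append([i / steps, j / steps, rest / steps])
    return actions


action_space = build_action_space()
print(len(action_space))   # 55
print(action_space[0])     # [0.0, 0.0, 1.0]
```

One consequence of this enumeration is that the build and gather shares together never exceed 0.9, so the rest share is always at least 0.1.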

To handle mining, I generated a Mine object for each resource cluster at the beginning of each game. Each Mine stores the tile locations and the resource type of its cluster. When workers were ready to gather resources, they would be assigned a particular resource cell in a mine.
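
The cluster search is essentially a flood fill. Below is a small sketch of the idea on a toy grid, assuming cells are plain strings (or `None` for empty) rather than Lux cell objects. `find_clusters` and `toy_map` are hypothetical names, and this version is iterative, whereas the `Mines` class further down is recursive.

```python
def find_clusters(grid):
    """Group orthogonally adjacent cells that hold the same resource type.

    grid is a list of rows; each cell is a resource name or None.
    Returns a list of (resource_type, set_of_(x, y)_tiles) pairs.
    """
    height, width = len(grid), len(grid[0])
    seen = set()
    clusters = []
    for y in range(height):
        for x in range(width):
            if grid[y][x] is None or (x, y) in seen:
                continue
            resource_type = grid[y][x]
            tiles = set()
            stack = [(x, y)]
            seen.add((x, y))
            while stack:
                cx, cy = stack.pop()
                tiles.add((cx, cy))
                for dx, dy in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
                    nx, ny = cx + dx, cy + dy
                    if (0 <= nx < width and 0 <= ny < height
                            and (nx, ny) not in seen
                            and grid[ny][nx] == resource_type):
                        seen.add((nx, ny))   # Mark when queued so no tile is visited twice
                        stack.append((nx, ny))
            clusters.append((resource_type, tiles))
    return clusters


toy_map = [
    ["wood", "wood", None],
    [None,   None,   "coal"],
    ["wood", None,   "coal"],
]
clusters = find_clusters(toy_map)
print(len(clusters))   # 3
```

The two wood patches are not adjacent, so they come out as separate clusters, which is also how separate Mine objects arise for the same resource type.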

In [1]:
!node --version
!pip install kaggle-environments -U
!pip install tensorflow

v16.6.1


In [2]:
!cp -r ../input/lux-ai-2021/* .
from kaggle_environments import make

Loading environment football failed: No module named 'gfootball'


Each worker unit is wrapped in a WorkerAgent object, which holds things like its current objective and destination. The workers use a rudimentary collision-avoidance technique: workers earlier in the action assignment queue have movement priority over later ones. If a higher-priority worker plans to step into a cell currently occupied by a lower-priority worker, the lower-priority worker is moved out of the way. If a lower-priority worker's planned step would land on a cell already claimed by a higher-priority worker, that step is changed.
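
The priority scheme boils down to reserving target cells in queue order. The sketch below is a simplified illustration using `(x, y)` tuples. `resolve_steps` is a hypothetical helper, and unlike the real `get_step_direction` it ignores map bounds, city tiles, and distance to the destination when picking a detour.

```python
def resolve_steps(workers):
    """workers: list of (worker_id, current_pos, desired_pos) in priority order.

    Returns {worker_id: pos}. Each worker takes its desired cell if no
    higher-priority worker has claimed it, otherwise the first free
    orthogonal neighbour of its current cell, otherwise it stays put.
    """
    reserved = set()
    result = {}
    for worker_id, current, desired in workers:
        x, y = current
        candidates = [desired, (x - 1, y), (x, y + 1), (x + 1, y), (x, y - 1)]
        chosen = current                      # Fallback when everything is taken
        for cell in candidates:
            if cell not in reserved:
                chosen = cell
                break
        reserved.add(chosen)
        result[worker_id] = chosen
    return result


plan = resolve_steps([
    ("u_1", (0, 0), (1, 0)),   # Highest priority: gets the contested cell
    ("u_2", (2, 0), (1, 0)),   # Wants the same cell, so it detours
    ("u_3", (1, 0), (1, 0)),   # Wants to stay, but u_1 is moving in, so it steps aside
])
print(plan)
```

Here `u_1` takes `(1, 0)`, `u_2` detours to `(2, 1)`, and `u_3` vacates to `(0, 0)`.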

In [3]:
%%writefile WorkerAgent.py
from enum import Enum
from lux.game_map import RESOURCE_TYPES, Position
import sys
import math

class WorkerObjective(Enum):
    GatherFuel = "gather fuel"
    BuildCity = "build city"
    Rest = "rest"

# Wrapper class for worker unit objects
class WorkerAgent:
    def __init__(self, worker_obj, debug):
        self.debug = debug
        self.worker = worker_obj
        self.objective = WorkerObjective.BuildCity
        self.objective_changed = False
        self.mine = None
        self.destination = None
        
    # Reset worker object with latest version each turn
    def update(self, worker_obj):
        self.worker = worker_obj
    
    def _find_open_mine(self, resource_type, mines):
        mine = mines.place_in_mine(self.worker, resource_type)
        if mine is not None:
            self.mine = mine
        
    def _get_best_fuel(self, player):
        resource_type = RESOURCE_TYPES.WOOD
        if player.researched_coal():
            resource_type = RESOURCE_TYPES.COAL
        if player.researched_uranium():
            resource_type = RESOURCE_TYPES.URANIUM
            
        return resource_type
    
    def _at_mining_spot(self):
        mining_spot = self.get_mining_spot()
        if mining_spot is None:
            return False
        
        return self.worker.pos == mining_spot
    
    def _update_mining(self, controller):
        if self._at_mining_spot():
            cell = controller.map.get_cell_by_pos(self.worker.pos)
            if not cell.has_resource():
                self.mine.report_resource_depleted(self.worker.pos, self.worker)
                self.mine = None
         
        if self.objective == WorkerObjective.GatherFuel:
            best_fuel = self._get_best_fuel(controller.player)
            if self.mine is not None and self.mine.resource_type != best_fuel:
                self.mine.release_worker(self.worker)
                self._find_open_mine(best_fuel, controller.mines)
                
    def _handle_objective_change(self):
        if self.objective_changed:
            if self.mine is not None:
                self.mine.release_worker(self.worker)
                self.mine = None
            self.destination = None
            self.objective_changed = False
            if self.debug:
                print("Worker", self.worker.id, "assigned new objective", file=sys.stderr)
                
    def _handle_mine_assignment(self, controller):
        if self.destination is None and self.mine is None and self.objective != WorkerObjective.Rest:
            resource_type = RESOURCE_TYPES.WOOD
            if self.objective == WorkerObjective.GatherFuel:
                resource_type = self._get_best_fuel(controller.player)
            self._find_open_mine(resource_type, controller.mines)
            if self.debug:
                if self.mine is not None:
                    print("Worker", self.worker.id, "assigned to mining spot", self.get_mining_spot(), file=sys.stderr)
                else:
                    print("Unable to place worker", self.worker.id, "in mine", file=sys.stderr)
                
    def _handle_destination_arrival(self):
        if self.destination is not None and self.worker.pos == self.destination:
            self.destination = None  
            if self.debug:
                print("Worker", self.worker.id, "arrived at their destination", file=sys.stderr)
        
    def _handle_destination_assignment(self, controller):
        if self.destination is not None:
            return
        
        if self.objective == WorkerObjective.Rest and not self.on_city_tile(controller.map):
            closest_city_tile = controller.cities.get_nearest_city_tile(self.worker.pos)
            if closest_city_tile is not None:
                self.destination = closest_city_tile.pos
                if self.debug:
                    print("Worker", self.worker.id, "destination set to city tile", (self.destination.x, self.destination.y), file=sys.stderr)
            return
            
        if self.worker.get_cargo_space_left() == 0: 
            if self.mine is not None:
                self.mine.release_worker(self.worker)
                self.mine = None

            if self.debug:
                print("Worker", self.worker.id, "is at max cargo", file=sys.stderr)
            if self.objective == WorkerObjective.GatherFuel:
                nearest_city_tile = controller.cities.get_nearest_city_tile(self.worker.pos)
                if nearest_city_tile is not None:
                    self.destination = nearest_city_tile.pos
            elif self.objective == WorkerObjective.BuildCity:
                # nearest_periph = controller.cities.get_nearest_periph_pos(self.worker.pos, controller.map)
                # if nearest_periph is not None:
                #     self.destination = nearest_periph
                # else:
                self.destination = self.find_nearest_empty_tile(self.worker.pos, controller.map)
            if self.debug:
                print("Worker", self.worker.id, "destination changed to", self.destination, file=sys.stderr)
            return
                   
        if not self._at_mining_spot():
            self.destination = self.get_mining_spot()
            if self.debug:
                print("Worker", self.worker.id, "returning to mining spot", self.get_mining_spot(), file=sys.stderr)
            return

    # Converts directions to degrees
    def _to_degrees(self, direction):
        directions = ["w", "s", "e", "n"]
        return 90 * directions.index(direction)

    # Converts degrees to directions
    def _to_dir(self, degrees):
        directions = ["w", "s", "e", "n"]
        return directions[int((degrees % 360) / 90)]
        

    # Returns direction rotated by 90 degrees * times, stepping through w -> s -> e -> n
    def _rotate_dir(self, direction, times):
        if direction == "c":
            return "c"
        return self._to_dir(self._to_degrees(direction) + 90 * times)
                
        
    def get_mining_spot(self):
        if self.mine is None:
            return None
        
        tile = self.mine.get_assigned_spot(self.worker)
        if tile is not None:
            return Position(tile[0], tile[1])
        return None
        
    def set_objective(self, objective):
        if self.objective == objective:
            return
        self.objective = objective
        self.objective_changed = True
        if self.debug:
            print("Worker", self.worker.id, "has new objective", self.objective, file=sys.stderr)
        
    def get_step_direction(self, game_map, steps, avoid_city=False):
        direction = self.worker.pos.direction_to(self.destination)
        step = self.worker.pos.translate(direction, 1)

        if direction == "c" and (step.x, step.y) in steps:              # If the worker plans to stay put but is blocking the step of another worker
            for i in range(4):
                new_dir = self._rotate_dir("w", i)
                step = self.worker.pos.translate(new_dir, 1)
                if (step.x, step.y) not in steps:
                    return new_dir
        
        if avoid_city or (step.x, step.y) in steps:                     # Get best detour
            cell = game_map.get_cell(step.x, step.y)
            if cell.citytile is None and (step.x, step.y) not in steps:
                return direction
            
            shortest_dist = float("inf")
            best_dir = None
            for i in range(4):
                new_dir = self._rotate_dir(direction, i)
                step = self.worker.pos.translate(new_dir, 1)
                if step.x < 0 or step.x >= game_map.width or step.y < 0 or step.y >= game_map.height or (step.x, step.y) in steps:
                    continue
                cell = game_map.get_cell(step.x, step.y)
                dist = step.distance_to(self.destination)
                if cell.citytile is None and dist < shortest_dist:
                    shortest_dist = dist                                # Track the best distance so far
                    best_dir = new_dir
                    
            if best_dir is not None:
                return best_dir
              
        return direction
            
    
    def on_city_tile(self, game_map):
        tile = game_map.get_cell_by_pos(self.worker.pos)
        return tile.citytile is not None
    
    def find_nearest_empty_tile(self, loc, game_map):
        if self.tile_is_empty(loc, game_map):
            return loc
        
        searched = {(loc.x, loc.y)}
        q = [loc]
        
        # Breadth-first search outward from loc
        while len(q) > 0:
            p = q.pop(0)
            
            if self.tile_is_empty(p, game_map):
                return p
            
            for direction in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
                neighbor = Position(p.x + direction[0], p.y + direction[1])
                if neighbor.x >= 0 and neighbor.x < game_map.width and neighbor.y >= 0 and neighbor.y < game_map.height and (neighbor.x, neighbor.y) not in searched:
                    searched.add((neighbor.x, neighbor.y))      # Mark on enqueue so each cell is queued at most once
                    q.append(neighbor)
        
        return None                                             # No empty tile anywhere on the map
            
            
    def tile_is_empty(self, pos, game_map):
        cell = game_map.get_cell(pos.x, pos.y)
        return cell.citytile is None and not cell.has_resource()
    
    def get_action(self, controller, steps):
        self._update_mining(controller)
        
        if not self.worker.can_act():
            return None, (self.worker.pos.x, self.worker.pos.y)
        
        self._handle_objective_change()
        self._handle_mine_assignment(controller)
        self._handle_destination_arrival()
        
        if self.destination is None and self.objective == WorkerObjective.BuildCity and self.worker.can_build(controller.map):
            if self.debug:
                print("Worker", self.worker.id, "building city tile at", self.worker.pos, file=sys.stderr)
            return self.worker.build_city(), (self.worker.pos.x, self.worker.pos.y)
        
        self._handle_destination_assignment(controller)
                
        if self.destination is not None:
            avoid_city = self.objective == WorkerObjective.BuildCity and self.worker.get_cargo_space_left() == 0
            step_dir = self.get_step_direction(controller.map, steps, avoid_city)
            if self.debug:
                print("Worker", self.worker.id, "step direction:", step_dir, file=sys.stderr)
            # step_dir = self.worker.pos.direction_to(self.destination)
            step = self.worker.pos.translate(step_dir, 1)
            return self.worker.move(step_dir), (step.x, step.y)
        
        return None, (self.worker.pos.x, self.worker.pos.y)
    
class Workers:
    def __init__(self, worker_list, debug):
        self.debug = debug
        self.workers = {}                              # Maps worker ids to WorkerAgent objs
        self.task_proportions = [0.5, 0.5, 0.0]
        
        for worker in worker_list:
            self.workers[worker.id] = WorkerAgent(worker, self.debug)
            
        if self.debug:
            print("Workers object initialized", file=sys.stderr)

        self._reassign_objectives()
            
    def _reassign_objectives(self):
        num_city_builders = math.ceil(self.task_proportions[0] * len(self.workers))
        num_fuel_gatherers = math.ceil(self.task_proportions[1] * len(self.workers))
        worker_ids = self.workers.keys()
        
        for i, worker_id in enumerate(worker_ids):
            if i < num_city_builders:
                self.workers[worker_id].set_objective(WorkerObjective.BuildCity)
                continue
            if i < num_city_builders + num_fuel_gatherers:
                self.workers[worker_id].set_objective(WorkerObjective.GatherFuel)
                continue
            self.workers[worker_id].set_objective(WorkerObjective.Rest)
        
    def update(self, worker_list):
        if self.debug:
            print("Updating workers object", file=sys.stderr)
           
        # Remove workers that were lost last turn
        lost_workers = set(self.workers.keys()).difference(set([worker.id for worker in worker_list]))
        for lost_worker in lost_workers:
            self.workers.pop(lost_worker)
            
        for worker in worker_list:
            if worker.id in self.workers:
                self.workers[worker.id].update(worker)
                continue
                
            self.workers[worker.id] = WorkerAgent(worker, self.debug)
            self._reassign_objectives()
            if self.debug:
                print("Worker added", file=sys.stderr)
                
    def update_task_proportions(self, proportions):
        self.task_proportions = proportions
        self._reassign_objectives()
            
            
    def get_actions(self, controller):
        actions = []
        steps = set()
        
        for worker in self.workers.values():
            action, step = worker.get_action(controller, steps)
            steps.add(step)
            if action is not None:
                actions.append(action)
                
        return actions


Writing WorkerAgent.py


The CitiesWrapper and CityWrapper classes hold information about cities and their city tiles. They are mainly used to provide navigation methods and generate city tile actions.
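
As a minimal sketch of the navigation part, the nearest-tile lookup can be written with plain `(x, y)` tuples, assuming Manhattan distance between cells. `manhattan` and `nearest_tile` are hypothetical helpers, not part of the Lux kit.

```python
def manhattan(a, b):
    """Grid distance between two (x, y) cells."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1])


def nearest_tile(tiles, loc):
    """Return the tile closest to loc, or None if there are no tiles."""
    return min(tiles, key=lambda tile: manhattan(tile, loc), default=None)


print(nearest_tile([(0, 0), (3, 3)], (2, 2)))   # (3, 3)
print(nearest_tile([], (2, 2)))                 # None
```

Returning `None` for an empty collection matters here because a player can lose every city tile overnight, and callers need to handle that case.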

In [4]:
%%writefile CityWrapper.py
from lux.game_map import Position
import sys

class CityWrapper:
    def __init__(self, city_obj, debug):
        self.city = city_obj
        self.debug = debug
    
    def get_nearest_periph_pos(self, loc, game_map):
        if self.debug:
            print("Searching for city build location", file=sys.stderr)
        # Return periphery tile obj closest to loc (Only works if loc is not inside city)
        
        # Sort tiles in city according to distance from loc
        sorted_tiles = sorted(self.city.citytiles, key=lambda tile: tile.pos.distance_to(loc))

        for tile in sorted_tiles:
            for direction in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
                neighbor = Position(tile.pos.x + direction[0], tile.pos.y + direction[1])
                if neighbor.x >= 0 and neighbor.x < game_map.width and neighbor.y >= 0 and neighbor.y < game_map.height:
                    cell = game_map.get_cell(neighbor.x, neighbor.y)
                    if cell.citytile is None and not cell.has_resource():
                        return neighbor
                    
        return None
    
    def get_nearest_city_tile(self, loc):
        # Return city tile closest to loc
        shortest_dist = float("inf")
        closest = None
        for tile in self.city.citytiles:
            dist = tile.pos.distance_to(loc)
            if dist < shortest_dist:
                shortest_dist = dist
                closest = tile
        return closest
    
    def get_actions(self, controller, workers_needed):
        actions = []
        workers_built = 0
        for tile in self.city.citytiles:
            if tile.can_act():
                if workers_needed - workers_built > 0:
                    actions.append(tile.build_worker())
                    workers_built += 1
                    if self.debug:
                        print("City tile", tile.pos, "creating new worker.", file=sys.stderr)
                    continue
                actions.append(tile.research())
        return actions, workers_built
    
class CitiesWrapper:
    def __init__(self, cities_list, debug):
        self.cities = [CityWrapper(city, debug) for city in cities_list]
        self.debug = debug
        
    def update(self, cities_list):
        self.cities = [CityWrapper(city, self.debug) for city in cities_list]
    
    def get_nearest_city(self, loc):
        # Return CityWrapper obj closest to loc
        shortest_dist = float("inf")
        closest = None
        
        for city in self.cities:
            dist = city.get_nearest_city_tile(loc).pos.distance_to(loc)
            if dist < shortest_dist:
                shortest_dist = dist
                closest = city
                
        return closest
    
    def get_nearest_city_tile(self, loc):
        # Return CityTile obj closest to loc
        shortest_dist = float("inf")
        closest = None
        
        for city in self.cities:
            tile = city.get_nearest_city_tile(loc)
            dist = tile.pos.distance_to(loc)
            if dist < shortest_dist:
                shortest_dist = dist
                closest = tile
                
        return closest
    
    def get_nearest_periph_pos(self, loc, game_map):
        sorted_cities = sorted(self.cities, key=lambda city: city.get_nearest_city_tile(loc).pos.distance_to(loc))
        
        for city in sorted_cities:
            periph = city.get_nearest_periph_pos(loc, game_map)
            if periph is not None:
                return periph
            
        return None
    
    def get_actions(self, controller):
        actions = []
        workers_needed = max(controller.state.num_city_tiles - controller.state.num_workers, 0)
        
        for city in self.cities:
            city_actions, workers_built = city.get_actions(controller, workers_needed)
            actions += city_actions
            workers_needed -= workers_built
            
        return actions


Writing CityWrapper.py


The Mine class handles mining operations. Mine objects hold the positions of resource tiles and handle the assignment of worker units to said tiles.
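
The bookkeeping can be sketched with a stripped-down stand-in, again using `(x, y)` tuples in place of Lux objects. `ToyMine` is a hypothetical class for illustration only: each worker gets the closest resource tile that no other worker holds.

```python
class ToyMine:
    def __init__(self, tiles):
        self.tiles = set(tiles)     # (x, y) resource tiles in this cluster
        self.assigned = {}          # Maps worker id -> assigned tile

    def has_opening(self):
        return len(self.tiles) > len(self.assigned)

    def assign(self, worker_id, worker_pos):
        """Give the worker the closest unassigned tile, or None if the mine is full."""
        taken = set(self.assigned.values())
        free = [tile for tile in self.tiles if tile not in taken]
        if not free:
            return None
        spot = min(free, key=lambda t: abs(t[0] - worker_pos[0]) + abs(t[1] - worker_pos[1]))
        self.assigned[worker_id] = spot
        return spot

    def release(self, worker_id):
        self.assigned.pop(worker_id, None)


mine = ToyMine([(0, 0), (1, 0)])
first = mine.assign("u_1", (2, 0))    # Closest free tile is (1, 0)
second = mine.assign("u_2", (2, 0))   # (1, 0) is taken, so (0, 0)
third = mine.assign("u_3", (2, 0))    # Mine is full
print(first, second, third)
```

Releasing a worker frees its tile again, so `mine.release("u_1")` would let `u_3` be placed on `(1, 0)`.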

In [5]:
%%writefile Mine.py
from lux.game_map import Position
import sys

class Mine:
    def __init__(self, game_state, resource_tile_set, resource_type, debug):
        self.resource_type = resource_type
        self.resource_tiles = resource_tile_set
        self.assigned_workers = {}                                      # Maps worker IDs to assigned worker_tile
        #self.available_resources = 0
        #self.cart_loc = self.get_cart_loc()
        #self.available_work_tiles = len(self.worker_tiles)              # Number of available worker tiles
        self.debug = debug
    
    def _find_cart_loc(self):
        # Find and return the best location to park the cart
        pass
    
    def _get_open_worker_tile(self, worker_pos):
        available_tiles = list(filter(lambda tile: tile not in self.assigned_workers.values(), self.resource_tiles))
        available_tiles = sorted(available_tiles, key=lambda tile: Position(tile[0], tile[1]).distance_to(worker_pos))
        return available_tiles[0]
    
    def get_resource_tiles(self):
        return self.resource_tiles
    
    def worker_assigned(self, worker_id):                               # Checks if a given worker is assigned to mine
        return worker_id in self.assigned_workers
    
    def get_dist(self, loc):                                            # Returns the shortest distance between loc and all spots in mine
        shortest_dist = float("inf")
        
        for tile in self.resource_tiles:
            tile_pos = Position(tile[0], tile[1])
            dist = tile_pos.distance_to(loc)
            if dist < shortest_dist:
                shortest_dist = dist
                
        return shortest_dist
    
    def has_opening(self):                                              # Checks if there are any available spots in mine
        return len(self.resource_tiles) > len(self.assigned_workers)
    
    def assign_worker(self, worker):
        self.assigned_workers[worker.id] = self._get_open_worker_tile(worker.pos)
        
    def release_worker(self, worker):
        if worker.id in self.assigned_workers:
            self.assigned_workers.pop(worker.id)
        
    def get_assigned_spot(self, worker):
        if worker.id in self.assigned_workers:
            return self.assigned_workers[worker.id]
        return None
    
    def report_resource_depleted(self, pos, assigned_worker):
        self.resource_tiles.remove((pos.x, pos.y))
        self.release_worker(assigned_worker)

    
class Mines:
    def __init__(self, game_state, debug):
        self.mines = []
        self.debug = debug
        
        self._build_mines(game_state)
        
    def _is_valid_tile(self, game_state, x, y, w, h, resource_type, searched):
        if x < 0 or x >= w or y < 0 or y >= h or (x, y) in searched:
            return False
        
        tile = game_state.map.get_cell(x, y)
        if not tile.has_resource() or tile.resource.type != resource_type:
            return False
        
        return True
        
    def _get_resource_cluster(self, game_state, x, y, w, h, resource_type, cluster_tiles=None, searched=None):
        # Given x, y of a starting tile, search game map to find tiles of resource cluster
        # Fresh sets are created per call; mutable default arguments would be shared across calls
        if cluster_tiles is None:
            cluster_tiles = set()
        if searched is None:
            searched = set()
        searched.add((x, y))
        tile = game_state.map.get_cell(x, y)
        
        if not tile.has_resource():                             # Not part of a cluster; make no recursive calls
            return cluster_tiles, searched
        
        cluster_tiles.add((x, y))
        
        for direction in [(1, 0), (0, 1), (-1, 0), (0, -1)]:  # Call function recursively on surrounding tiles
            new_x, new_y = x + direction[0], y + direction[1]
            if self._is_valid_tile(game_state, new_x, new_y, w, h, resource_type, searched):
                new_cluster_tiles, new_searched = self._get_resource_cluster(game_state, new_x, new_y, w, h, resource_type, cluster_tiles, searched)
                cluster_tiles = cluster_tiles.union(new_cluster_tiles)
                searched = searched.union(new_searched)
            
        return cluster_tiles, searched
        
    def _build_mines(self, game_state): 
        # Iterate over map to find clusters of resource tiles
        w, h = game_state.map.width, game_state.map.height
        searched = set()
        clusters = []
        resource_types = []
        
        for x in range(w):
            for y in range(h):
                if (x, y) in searched:
                    continue
                tile = game_state.map.get_cell(x, y)
                if tile.has_resource():
                    resource_types.append(tile.resource.type)
                    cluster, new_searched = self._get_resource_cluster(game_state, x, y, w, h, tile.resource.type, set(), set())
                    searched = searched.union(new_searched)
                    clusters.append(cluster)
        
        # ToDo: Merge mines of same resource type that share borders
        
        # Build Mine objs from clusters and borders
        for cluster, resource_type in zip(clusters, resource_types):
            self.mines.append(Mine(game_state, cluster, resource_type, self.debug))
        
        if self.debug:
            print("Clusters:", clusters, file=sys.stderr)
                    
    def update(self, game_state):
        # Rebuild any mine whose recorded tiles no longer all hold resources.
        # Currently unused: the call in Controller.update is commented out.
        w, h = game_state.map.width, game_state.map.height
        
        for mine in list(self.mines):                           # Iterate over a copy because self.mines is modified below
            mine_tiles = mine.get_resource_tiles()
            live_tiles = [tile for tile in mine_tiles if game_state.map.get_cell(tile[0], tile[1]).has_resource()]
            
            if len(live_tiles) == len(mine_tiles):
                continue
            
            self.mines.remove(mine)
            
            if live_tiles:
                # Seed the cluster search from a tile that still has resources
                x, y = live_tiles[0]
                new_cluster, _ = self._get_resource_cluster(game_state, x, y, w, h, mine.resource_type, set(), set())
                self.mines.append(Mine(game_state, new_cluster, mine.resource_type, self.debug))
    
    def get_closest_mine(self, loc, resource_type):
        closest_mine = None
        shortest_dist = float("inf")
        
        for mine in self.mines:
            if mine.resource_type != resource_type:
                continue
            for tile in mine.resource_tiles:
                tile_pos = Position(tile[0], tile[1])
                dist = tile_pos.distance_to(loc)
                
                if dist < shortest_dist:
                    shortest_dist = dist
                    closest_mine = mine
                    
        return closest_mine
    
    def place_in_mine(self, worker, resource_type):
        sorted_mines = sorted([mine for mine in self.mines if mine.resource_type == resource_type], key=lambda mine: mine.get_dist(worker.pos))
        
        for mine in sorted_mines:
            if mine.has_opening():
                mine.assign_worker(worker)
                return mine 
            
        return None


Writing Mine.py


The Controller object contains all workers, mines, and cities, and provides an interface for the RL agent to modify the current strategy.
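
When the agent's action is applied, the proportions are turned into per-worker objectives. The sketch below mirrors the rounding used in `Workers._reassign_objectives`, with plain strings in place of the `WorkerObjective` enum. `split_objectives` is a hypothetical name for this illustration.

```python
import math


def split_objectives(worker_ids, proportions):
    """Assign objectives in queue order: builders first, then gatherers, then resters."""
    build_share, gather_share, _ = proportions
    num_builders = math.ceil(build_share * len(worker_ids))
    num_gatherers = math.ceil(gather_share * len(worker_ids))

    plan = {}
    for i, worker_id in enumerate(worker_ids):
        if i < num_builders:
            plan[worker_id] = "build city"
        elif i < num_builders + num_gatherers:
            plan[worker_id] = "gather fuel"
        else:
            plan[worker_id] = "rest"
    return plan


plan = split_objectives(["u_1", "u_2", "u_3"], [0.5, 0.5, 0.0])
print(plan)
```

With three workers and a 50/50 split, both shares round up to two, so the builders take two slots and only one worker is left to gather. Rounding up therefore favors whichever objective is filled first.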

In [6]:
%%writefile Controller.py
from Mine import Mines
from WorkerAgent import Workers
from CityWrapper import CitiesWrapper
import numpy as np
import sys

class State:
    def __init__(self, game_state, player, opponent):
        self._update_state(game_state, player, opponent)
        
    def _update_state(self, game_state, player, opponent):
        self.num_workers = sum([1 if unit.is_worker() else 0 for unit in player.units])
        self.num_carts = sum([1 if unit.is_cart() else 0 for unit in player.units])
        self.num_city_tiles = player.city_tile_count
        self.num_opponent_workers = sum([1 if unit.is_worker() else 0 for unit in opponent.units])
        self.num_opponent_carts = sum([1 if unit.is_cart() else 0 for unit in opponent.units])
        self.num_opponent_city_tiles = opponent.city_tile_count
        self.research_points = player.research_points
        self.opponent_research_points = opponent.research_points
        self.turn = game_state.turn
        
    def get_state_vector(self):
        return np.array([
            self.num_workers,
            self.num_carts,
            self.num_city_tiles,
            self.num_opponent_workers,
            self.num_opponent_carts,
            self.num_opponent_city_tiles,
            self.research_points,
            self.opponent_research_points,
            self.turn
        ]).reshape(1, -1)
        

class Controller:
    def __init__(self, game_state, player, opponent, debug):
        self.debug = debug
        self.game_state = game_state
        self.map = game_state.map
        self.state = State(game_state, player, opponent)
        self.player = player
        self.opponent = opponent
        self.mines = Mines(game_state, debug)
        self.workers = Workers([unit for unit in player.units if unit.is_worker()], debug)
#         self.carts = []
        self.cities = CitiesWrapper(self.player.cities.values(), debug)
        
    def update(self, game_state, player, opponent):
        self.game_state = game_state
        self.state._update_state(game_state, player, opponent)
        self.map = game_state.map
        self.player = player
        self.opponent = opponent
        #self.mines.update(game_state)
        self.cities.update(self.player.cities.values())
        self.workers.update([unit for unit in player.units if unit.is_worker()])
        
    def get_state_vector(self):
        return self.state.get_state_vector()
    
    def apply_agent_action(self, action):
        self.workers.update_task_proportions(action)    
        
    def get_actions(self):
        if self.debug:
            print("Turn", self.game_state.turn, file=sys.stderr)
        worker_actions = self.workers.get_actions(self)
        city_actions = self.cities.get_actions(self)
        return worker_actions + city_actions


Writing Controller.py


The RLAgent class defines a Q-learning agent. The agent can either take in a settings dict and a model object, which it saves to disk, or, if none are provided, load them from files.
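
The update the agent trains toward is the standard one-step Q-learning target. Here is a minimal sketch with plain Python lists standing in for the network outputs; `q_target` is a hypothetical helper, not part of `RLAgent`.

```python
def q_target(q_values, action, reward, next_q_values, gamma, last_turn):
    """Copy q_values and replace the entry for `action` with the Q-learning target.

    On the last turn there is no future value, so the target is just the reward.
    Otherwise it is reward + gamma * max over the next state's Q-values.
    """
    target = list(q_values)
    if last_turn:
        target[action] = reward
    else:
        target[action] = reward + gamma * max(next_q_values)
    return target


q_now = [0.0, 1.0, 2.0]
q_next = [0.5, 3.0, 1.0]
mid_game = q_target(q_now, 1, 1.0, q_next, 0.9, last_turn=False)   # Entry 1 becomes 1.0 + 0.9 * 3.0
final = q_target(q_now, 1, 1.0, q_next, 0.9, last_turn=True)       # Entry 1 becomes 1.0
print(mid_game, final)
```

Only the entry for the action actually taken changes; the other entries keep the network's own predictions, so fitting on this target leaves them untouched.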

In [7]:
%%writefile RLAgent.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import numpy as np
import random
import pickle

from tensorflow import keras

class RLAgent:
    def __init__(self, settings=None, model=None):
        if settings is None:
            with open("agent_settings", "rb") as settings_file:
                settings = pickle.load(settings_file)
        else:
            with open("agent_settings", "wb") as settings_file:
                pickle.dump(settings, settings_file)
                
        if model is not None:
            model.save("agent_model")
        self.action_space = self._get_action_space()
        self.replays = []
        self.state_size = 9
        self.action_size = 55
        self.gamma = settings["gamma"]
        self.epsilon = settings["epsilon"]
        self.training_mode_active = settings["training_mode_active"]
        self.reward_weights = settings["reward_weights"]
        self.num_explore_turns = settings["num_explore_turns"]
        self.explore_timer = 0
        self.exploring = False
        self.explore_action = None
        self.q_net = keras.models.load_model("agent_model")
        self.target_net = self.q_net                   # Same object as q_net, not a separate frozen copy
        
    def _get_action_space(self):
        action_space = []
        for i in range(10):
            for j in range(10 - i):
                a, b = i / 10.0, j / 10.0
                c = (10 - i - j) / 10.0
                action_space.append([a, b, c])
                    
        return action_space
    
    def get_action(self, state):
        if self.exploring:
            self.explore_timer -= 1
            if self.explore_timer <= 0:                # <= so that num_explore_turns == 0 still ends exploration
                self.exploring = False
            return self.explore_action
        
        if self.training_mode_active or np.random.rand() < self.epsilon:
            self.exploring = True
            self.explore_timer = self.num_explore_turns
            self.explore_action = random.randrange(self.action_size)
            return self.explore_action

        q_vals = self.q_net.predict(state)
        return np.argmax(q_vals[0])
    
    def lookup_action(self, code):
        return self.action_space[code]
    
    def train(self): 
        for state, action, reward, next_state, last_turn in self.replays:
            target = self.q_net.predict(state)
            
            if last_turn:
                target[0][action] = reward
            else:
                t = self.target_net.predict(next_state)
                target[0][action] = reward + self.gamma * np.amax(t[0])
            
            self.q_net.fit(state, target, epochs=1, verbose=0)
        self.q_net.save("agent_model")
    
    def add_replay(self, replay):
        self.replays.append(replay)


Writing RLAgent.py
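
To make the 55 action codes concrete, here is a small standalone sketch that mirrors the nested loops in `_get_action_space`. The function name `build_action_space` and the `steps` parameter are mine, for illustration only. Which worker behavior each of the three slots maps to is decided in `Controller.apply_agent_action`, so the sketch does not label them.

```python
def build_action_space(steps=10):
    """Enumerate proportion triples in tenths, as in RLAgent._get_action_space.

    i and j count tenths for the first two slots; the remainder goes to the third.
    """
    space = []
    for i in range(steps):
        for j in range(steps - i):
            space.append([i / steps, j / steps, (steps - i - j) / steps])
    return space


action_space = build_action_space()
print(len(action_space))   # number of action codes
print(action_space[0])     # proportions for code 0
print(action_space[54])    # proportions for code 54, the code used on the first turn
```

Because both loops stop at 9, the third proportion is never below 0.1 and the first two never reach 1.0. Code 54 decodes to `[0.9, 0.0, 0.1]`.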


In [8]:
#%%writefile agent.py
from lux.game import Game
from lux.game_map import Cell, RESOURCE_TYPES, Position
from lux.constants import Constants
from lux.game_constants import GAME_CONSTANTS
from lux import annotate
from Controller import Controller
from RLAgent import *
import math
import sys
import pickle

def calculate_reward(s, s_prime, reward_weights):
    reward_vec = (s_prime[0] - s[0]) * np.array(reward_weights)
    return np.sum(reward_vec)

# we declare this global game_state object so that state persists across turns so we do not need to reinitialize it all the time
game_state = None
controller = None
state = None
action_code = None
rl_agent = None
def agent(observation, configuration):
    global game_state
    global controller
    global state
    global action_code
    global rl_agent

    ### Do not edit ###
    if observation["step"] == 0:
        game_state = Game()
        game_state._initialize(observation["updates"])
        game_state._update(observation["updates"][2:])
        game_state.id = observation.player
        
        player = game_state.players[observation.player]
        opponent = game_state.players[(observation.player + 1) % 2]
        controller = Controller(game_state, player, opponent, False)
        state = controller.get_state_vector()
        
        # Load settings and model from disk; a non-dict argument here would be pickled over agent_settings.
        rl_agent = RLAgent()
        action_code = 54
        reward = 0
    else:
        game_state._update(observation["updates"])
        player = game_state.players[observation.player]
        opponent = game_state.players[(observation.player + 1) % 2]
        controller.update(game_state, player, opponent)
    
        s_prime = controller.get_state_vector()
        reward = calculate_reward(state, s_prime, rl_agent.reward_weights)
        rl_agent.add_replay([state, action_code, reward, s_prime, game_state.turn == 359])
        state = s_prime
    
        action_code = rl_agent.get_action(state)
    action = rl_agent.lookup_action(action_code)
#     print(action, state, reward)
    controller.apply_agent_action(action)  
    
    # Train once at the end of the match (RLAgent defines no train_interval attribute).
    if game_state.turn == 359:
        rl_agent.train()

    
    return controller.get_actions()



In [9]:
%%writefile agent.py
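from lux.game import Game
from Controller import Controller
from RLAgent import *
import numpy as np

def calculate_reward(s, s_prime, reward_weights):
    # Reward is the weighted sum of the change in each state entry between consecutive turns.
    reward_vec = (s_prime[0] - s[0]) * np.array(reward_weights)
    return np.sum(reward_vec)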

# we declare this global game_state object so that state persists across turns so we do not need to reinitialize it all the time
game_state = None
controller = None
state = None
action_code = None
rl_agent = None
def agent(observation, configuration):
    global game_state
    global controller
    global state
    global action_code
    global rl_agent

    ### Do not edit ###
    if observation["step"] == 0:
        game_state = Game()
        game_state._initialize(observation["updates"])
        game_state._update(observation["updates"][2:])
        game_state.id = observation.player
        
        player = game_state.players[observation.player]
        opponent = game_state.players[(observation.player + 1) % 2]
        controller = Controller(game_state, player, opponent, False)
        state = controller.get_state_vector()
        
        rl_agent = RLAgent()
        action_code = 54
        reward = 0

    else:
        game_state._update(observation["updates"])
        player = game_state.players[observation.player]
        opponent = game_state.players[(observation.player + 1) % 2]
        controller.update(game_state, player, opponent)
    
        s_prime = controller.get_state_vector()
        reward = calculate_reward(state, s_prime, rl_agent.reward_weights)
        rl_agent.add_replay([state, action_code, reward, s_prime, game_state.turn == 359])
        state = s_prime
    
        action_code = rl_agent.get_action(state)
    action = rl_agent.lookup_action(action_code)
    # print(action, state, reward)
    controller.apply_agent_action(action)

    if (game_state.turn == 359):
        rl_agent.train()  

    return controller.get_actions()


Overwriting agent.py
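
As a worked example of the reward used in agent.py, the sketch below computes the same weighted sum of state deltas on plain Python lists instead of NumPy arrays. The two state vectors are made-up values, and the function name `weighted_delta_reward` is mine. I am assuming the entries follow the order given at the top of the notebook and that the turn counter goes up by one per step.

```python
def weighted_delta_reward(s, s_prime, weights):
    """Sum of weights[k] * (s_prime[k] - s[k]) over all state entries."""
    return sum(w * (after - before) for w, before, after in zip(weights, s, s_prime))


# Hypothetical consecutive states, in the order:
# workers, carts, city tiles, opp. workers, opp. carts, opp. city tiles,
# research points, opp. research points, turn
s       = [3, 0, 2, 2, 0, 2, 10, 8, 40]
s_prime = [3, 0, 3, 2, 0, 2, 11, 8, 41]
weights = (10, 10, 10, 0, 0, 0, 0, 0, -5)

# One new city tile contributes 10 * 1, the turn counter contributes -5 * 1.
print(weighted_delta_reward(s, s_prime, weights))
```

Under those assumptions the -5 weight on the turn entry acts as a constant per-turn penalty, so a turn in which nothing else changes has a reward of -5.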


The base agent is used for training. It runs a strictly rule-based version of the agent, with no RL adjustments, so that the Q-learning agent can be compared against its basic rule-based implementation.

In [10]:
%%writefile base_agent.py
from lux.game import Game
from Controller import Controller

# we declare this global game_state object so that state persists across turns so we do not need to reinitialize it all the time
game_state = None
controller = None

def base_agent(observation, configuration):
    global game_state
    global controller

    ### Do not edit ###
    if observation["step"] == 0:
        game_state = Game()
        game_state._initialize(observation["updates"])
        game_state._update(observation["updates"][2:])
        game_state.id = observation.player
        
        player = game_state.players[observation.player]
        opponent = game_state.players[(observation.player + 1) % 2]
        controller = Controller(game_state, player, opponent, False)
        
    else:
        game_state._update(observation["updates"])
        player = game_state.players[observation.player]
        opponent = game_state.players[(observation.player + 1) % 2]
        controller.update(game_state, player, opponent)
    
    return controller.get_actions()

Writing base_agent.py


The following cell trains the model. Toggling on "training mode" makes the agent always choose a random action, in order to fill in Q-values for as many state-action pairs as possible.
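
The random actions are not re-drawn every turn: each one is held for a number of turns, controlled by the num_explore_turns setting. Below is a simplified, standalone sketch of that idea. The class `StickyExplorer` and its attribute names are hypothetical, and the bookkeeping is not identical to `RLAgent.get_action`.

```python
import random


class StickyExplorer:
    """Holds a randomly chosen action code for several consecutive turns."""

    def __init__(self, num_actions, hold_turns, rng=None):
        self.num_actions = num_actions
        self.hold_turns = hold_turns      # must be at least 1
        self.rng = rng or random.Random()
        self.remaining = 0                # turns left on the current action
        self.current = None

    def next_action(self):
        # Draw a new random code only when the previous one has expired.
        if self.remaining == 0:
            self.current = self.rng.randrange(self.num_actions)
            self.remaining = self.hold_turns
        self.remaining -= 1
        return self.current


explorer = StickyExplorer(num_actions=55, hold_turns=10, rng=random.Random(0))
codes = [explorer.next_action() for _ in range(30)]
print(codes)  # three runs of ten identical codes
```

Holding an action gives its effect time to show up in the state deltas before the next action is tried.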

Settings:
* gamma: The discount rate.
* epsilon: Controls the probability of performing a random action for exploration purposes.
* num_explore_turns: The number of consecutive turns for which a randomly chosen exploration action is held. This is necessary because the effects of an action can only really be observed over an interval of turns.
* training_mode_active: Toggles training mode.
* reward_weights: Sets weights for different state variables when calculating rewards. (See state vector description at top of notebook)
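
To show what gamma does, here is a minimal sketch of the one-step target that `RLAgent.train` writes into the Q-value of the action taken. The function name `q_target` and the Q-values are made up for illustration.

```python
def q_target(reward, next_q_values, gamma, last_turn):
    """One-step Q-learning target.

    On the final turn the target is just the reward; otherwise it is the
    reward plus the discounted best Q-value of the next state.
    """
    if last_turn:
        return reward
    return reward + gamma * max(next_q_values)


next_q = [1.0, 4.0, 2.5]  # hypothetical Q-values for the next state

print(q_target(5, next_q, gamma=0.2, last_turn=False))  # 5 + 0.2 * 4.0
print(q_target(5, next_q, gamma=0.9, last_turn=False))  # 5 + 0.9 * 4.0
print(q_target(5, next_q, gamma=0.2, last_turn=True))   # terminal: reward only
```

With gamma = 0.2, as in the settings below, the bootstrapped term adds only a fifth of the best next-state value, so the target is dominated by the immediate reward.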

In [11]:
import pickle
from RLAgent import *
from agent import *
from base_agent import *
from kaggle_environments import make
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

def toggle_training_mode():
    settings = None
    with open("agent_settings", "rb") as settings_file:
        settings = pickle.load(settings_file)
    settings["training_mode_active"] = not settings["training_mode_active"] 
    with open("agent_settings", "wb") as settings_file:
        pickle.dump(settings, settings_file)

def train(num_episodes):
    toggle_training_mode()
    try:
        for i in range(num_episodes):
            env = make("lux_ai_2021", configuration={"seed": 562124210, "loglevel": 1, "annotations": True}, debug=True)
            steps = env.run([agent, base_agent])
            print("Episode", i + 1, "of", num_episodes, "completed")
    finally:
        # Always restore the saved setting, even if an episode raises.
        toggle_training_mode()

settings = {
    "gamma": 0.2,
    "epsilon": 0.1,
    "num_explore_turns": 10,
    "training_mode_active": False,
    "reward_weights": (10, 10, 10, 0, 0, 0, 0, 0, -5)
}

optimizer = Adam(learning_rate=0.2)
model = Sequential()
model.add(Dense(32, activation='relu', input_dim=9))
model.add(Dense(55, activation='linear'))
model.compile(loss='mse', optimizer=optimizer)

rl_agent = RLAgent(settings=settings, model=model)

num_episodes = 10
train(num_episodes)


Episode 1 of 10 completed
Episode 2 of 10 completed
Episode 3 of 10 completed
Episode 4 of 10 completed
Episode 5 of 10 completed
Episode 6 of 10 completed
Episode 7 of 10 completed
Episode 8 of 10 completed
Episode 9 of 10 completed
Episode 10 of 10 completed


In [12]:
env = make("lux_ai_2021", configuration={"seed": 562124210, "loglevel": 1, "annotations": True}, debug=True)
steps = env.run([agent, base_agent])
env.render(mode="ipython", width=1200, height=800)

In [13]:
!tar -czf submission.tar.gz *