In [1]:
%load_ext lab_black

In [2]:
%%capture
# !pip install --upgrade luxai2022
# !cp -r ../input/lux-ai-2022-beta/* .
import sys
sys.path.append('lux_kit')  #  lux_kit is a copy of https://github.com/Lux-AI-Challenge/Lux-Design-2022/tree/main/kits/python

### Set up Lux Eye

In [3]:
from lux_eye import run_agents

### Start AgentV2

# TODOs

## Moving:
- Move queue
    - A* pathfinding with weights based on move_cost
    - Add high cost where another bot will collide 
        - But position of other bots changes ... 
        - Maybe first just increase cost of entire path?
        - Can bots switch position?
        - Maybe A*, evaluate collisions, if collision, add cost to collision point and repeat until max repeats
        - Cost of collision decreases with distance (i.e. things further ahead are more likely to change anyway)
            - Still want fairly high cost since better not to change work queue
 
- Avoid collisions
    - Record trajectory of all units
        - Keep light/heavy enemy/friendly separate (heavies should only avoid heavies for example)
        - If mining add X turns at that location
    - Ensure no collisions within X turns, otherwise change route
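
A minimal sketch of the A* idea above, assuming a 4-connected grid where `cost_map[x][y]` is the cost of stepping onto a tile (e.g. derived from `move_cost`). The `astar` function and its `extra_cost` argument are hypothetical names, not part of the Lux kit; `extra_cost` is where a penalty for a predicted collision point could be added before re-running the search.

```python
import heapq
from typing import Dict, List, Optional, Tuple

Pos = Tuple[int, int]


def astar(
    cost_map: List[List[float]],
    start: Pos,
    goal: Pos,
    extra_cost: Optional[Dict[Pos, float]] = None,
) -> Optional[List[Pos]]:
    """Cheapest 4-connected path from start to goal, or None if unreachable.

    cost_map[x][y] is the (non-negative) cost of stepping ONTO tile (x, y).
    extra_cost adds non-negative penalties on top, e.g. at collision points.
    """
    extra_cost = extra_cost or {}
    width, height = len(cost_map), len(cost_map[0])
    # Cheapest possible step; scaling Manhattan distance by it keeps the heuristic admissible.
    min_step = min(min(row) for row in cost_map)

    def heuristic(p: Pos) -> float:
        return min_step * (abs(p[0] - goal[0]) + abs(p[1] - goal[1]))

    frontier = [(heuristic(start), 0.0, start)]
    best_cost: Dict[Pos, float] = {start: 0.0}
    came_from: Dict[Pos, Pos] = {}
    while frontier:
        _, cost_so_far, current = heapq.heappop(frontier)
        if current == goal:
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            return path[::-1]
        if cost_so_far > best_cost[current]:
            continue  # stale heap entry, a cheaper route was already found
        for dx, dy in ((0, -1), (1, 0), (0, 1), (-1, 0)):
            nxt = (current[0] + dx, current[1] + dy)
            if not (0 <= nxt[0] < width and 0 <= nxt[1] < height):
                continue
            new_cost = cost_so_far + cost_map[nxt[0]][nxt[1]] + extra_cost.get(nxt, 0.0)
            if new_cost < best_cost.get(nxt, float("inf")):
                best_cost[nxt] = new_cost
                came_from[nxt] = current
                heapq.heappush(frontier, (new_cost + heuristic(nxt), new_cost, nxt))
    return None


# Toy map: the centre tile is expensive, so the plain search walks around it.
toy_costs = [[1, 1, 1], [1, 9, 1], [1, 1, 1]]
detour = astar(toy_costs, (0, 1), (2, 1))
# Penalising both detour tiles (as if collisions were predicted there) forces the direct route.
direct = astar(toy_costs, (0, 1), (2, 1), extra_cost={(1, 0): 100, (1, 2): 100})
```

The "repeat until max repeats" idea would wrap this in a loop: search, check the path against recorded trajectories, add a penalty at the first conflict, and search again.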
    

    
## Metrics
- Cost for trip from A to B
    - Cost for path either side of best (i.e. narrow paths also not great)
        - Can just add cost to best path and recalculate
    - Factor in weather somehow
- Evaluate positions on map to see how good they are in terms of proximity (travel duration) to nearest resources
    - Add value if many resources in proximity to nearest resource (i.e. don't want to only have 1 ice nearby)
- Some sort of metric of where to sabotage opponent?
    - I.e. density of light / mining robots
    - Can I see energy of enemy bots?
        - Low energy bots would be easier targets
    - Can I see resources of enemy bots?
        - High resource bots are better targets
- At some point need some way to evaluate rewards
- Value of Factory locations in terms of low rubble for lichen growth
    - Will be some variables to this that won't be straightforward to tune, leave to RL?
- Cost of letting lichen grow
    - Need to factor in expansion etc
    - What about different watering schedules?
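
One way the proximity metric above could be sketched: a multi-source breadth-first search that gives every tile its step distance to the nearest resource tile, plus a count of resource tiles within a radius. Both function names are hypothetical, and this version counts steps only, ignoring rubble and weather.

```python
from collections import deque
from typing import List, Tuple


def distance_to_nearest(resource_map: List[List[int]]) -> List[List[int]]:
    """Step distance from every tile to the nearest tile where resource_map == 1.

    Tiles stay at -1 if the map contains no resource at all.
    """
    width, height = len(resource_map), len(resource_map[0])
    dist = [[-1] * height for _ in range(width)]
    queue = deque()
    for x in range(width):
        for y in range(height):
            if resource_map[x][y] == 1:
                dist[x][y] = 0
                queue.append((x, y))
    while queue:
        x, y = queue.popleft()
        for dx, dy in ((0, -1), (1, 0), (0, 1), (-1, 0)):
            nx, ny = x + dx, y + dy
            if 0 <= nx < width and 0 <= ny < height and dist[nx][ny] == -1:
                dist[nx][ny] = dist[x][y] + 1
                queue.append((nx, ny))
    return dist


def resources_within(resource_map: List[List[int]], pos: Tuple[int, int], radius: int) -> int:
    """Number of resource tiles within Manhattan distance `radius` of pos."""
    return sum(
        1
        for x, row in enumerate(resource_map)
        for y, value in enumerate(row)
        if value == 1 and abs(x - pos[0]) + abs(y - pos[1]) <= radius
    )


toy_ice = [[0, 0, 0], [0, 0, 0], [0, 0, 1]]
toy_dist = distance_to_nearest(toy_ice)
```

A tile score could then combine a low `toy_dist` value with a high `resources_within` count, so a spot next to a single isolated ice tile ranks below one with several nearby.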

## Factory:
- Lichen growth near end
    - Depends how well growth can be calculated... 
        - Better allows for suicide burn but with higher risk
        - Worse will need a more sustained approach
    - Possible trick: light bot making a rubble channel to allow for lichen expansion without high water cost
        - Then clear rubble near end for massive lichen growth?


## Work Queues:
- Check it is possible to make move/dig/move/transfer repeat
- Miner:
    - Calculate energy cost to move from a factory tile to resource and back + mining cost (using A* for paths)
        - These can be calculated for many possible factory locations at the beginning in order to help choose good factory locations
        - Will need to be updated periodically to account for new rubble
     - Assign routes to bots by ids and keep record
         - Heavy get priority of best routes
     - Routes should be fully autonomous (i.e. everything repeats)
     - Need to build in energy buffer for night...
- Cargo:
    - Might be beneficial to transfer resources between factories
- Attacker:
    - Use heavy to hunt down enemy light or stationary heavy?
    - Maybe switch heavy with low resources high energy to hunt at night?
        - Maybe want to let RL decide when to do this
- Defender:
    - Hard to implement
    - If enemy close to friendlies with high resources, sacrifice lower resource bots to defend?
    - Transfer away resources first? 
    - Especially necessary near mining area?
- Rubble clearer:
    - Clear path to open areas for lichen growth
- Suicide bomber:
    - Especially during mars quake, add rubble near enemy?
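
A rough sketch of the miner energy budget described above. It assumes each step costs `floor(base_move_cost + rubble_move_cost * rubble)` on the destination tile, and that the return trip retraces the same path. All function names and the numbers in the example are illustrative, not values read from the environment config. It also leaves out action queue update costs and any power gained during the day.

```python
import math
from typing import List, Sequence, Tuple

Pos = Tuple[int, int]


def path_energy(
    path: Sequence[Pos],
    rubble: List[List[int]],
    base_move_cost: float,
    rubble_move_cost: float,
) -> int:
    """Energy to walk `path`; the starting tile (path[0]) costs nothing."""
    return sum(
        math.floor(base_move_cost + rubble_move_cost * rubble[x][y])
        for x, y in path[1:]
    )


def round_trip_energy(
    path: Sequence[Pos],
    rubble: List[List[int]],
    base_move_cost: float,
    rubble_move_cost: float,
    dig_cost: int,
    n_digs: int,
    buffer: int = 0,
) -> int:
    """Energy for factory -> resource -> factory along `path`, plus digging.

    `buffer` is a flat safety margin, e.g. to cover operating at night.
    """
    out = path_energy(path, rubble, base_move_cost, rubble_move_cost)
    back = path_energy(path[::-1], rubble, base_move_cost, rubble_move_cost)
    return out + back + dig_cost * n_digs + buffer


# Illustrative numbers only: a 3-tile route with rubble on the last two tiles.
toy_rubble = [[0], [10], [20]]
toy_path = [(0, 0), (1, 0), (2, 0)]
toy_budget = round_trip_energy(
    toy_path, toy_rubble, base_move_cost=20, rubble_move_cost=1, dig_cost=60, n_digs=5
)
```

Running this over the A* path from each candidate factory tile to its nearest resource would give one comparable number per location, which is what the factory placement idea needs.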
         
## Agent:
- As weather changes approach, change behaviours?
    - Mars quake requires special attention (don't want to add rubble on my routes, do want to add to enemy routes)
    - Others probably aren't worth too much attention until RL (maybe build in some buffer to energy calculations?)
- Heavy bots to choose new routes etc first
    - Gives best routes to heavy, and light can then avoid them later
    - Low value miners consider changing first (may need to support different factory?)
    - If collisions are imminent, route change for lower value
        - Or evaluate route change for both colliding parties and change preferable one
    - Low value attackers to change roles
- Light bots to avoid collisions with heavy bots first
    - Assume enemy heavies will move anywhere in 1 radius?
- Light bots avoid collisions with enemy light ONLY if holding more resources
    - I.e. good to take out enemies if they lose more
    
    
## Agent Structure:
- DistrictManager:
    - decide what factories should be generally aiming for (more ore/ice/attackers)
    - Maybe change these aims depending on weather, metrics every X turns, etc
- FactoryManager:
    - calculate desire to build heavy vs light vs grow
        - I.e. more desire if good value for given aims
    - mine more ice/ore 
    - weight transfer ice/ore (e.g. +ve for demand, -ve for supply)
    - calculate value of mining routes
- UnitManager:
    - General unit control (keeping track of locations etc)
    

### Plotting
- Create single heatmap from rubble, ice, ore, lichen
    - Set a custom colorscale so that ice, ore can have a specific color
    - Lichen and rubble can be actual colorscales
        - Note: lichen only grows where no rubble
    - Hover info can include everything
        - Even include hoverinfo for factories/units etc?
        - Or make sure the factories/units hoverinfo includes info about what's under them
    - Add option to overlay text value on each cell (i.e. value scores of each tile for various calculations)
- Units/Factories remain as shapes (probably cheaper to move etc)
    - Maybe keep transparent heatmap for hover info (but need to think about what info to show)
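
A possible encoding for the single-heatmap idea above: pack the four layers into one grid of floats, with rubble and lichen each mapped to their own continuous range and ice/ore mapped to fixed values that a custom colorscale can colour separately. The constants and the `combine_layers` name are hypothetical, and the defaults of 100 for maximum rubble and lichen are assumptions.

```python
from typing import List

# Hypothetical value slots: rubble -> [0, 1], lichen -> (2, 3], ice -> 4, ore -> 5.
LICHEN_OFFSET = 2.0
ICE_VALUE = 4.0
ORE_VALUE = 5.0


def combine_layers(
    rubble: List[List[float]],
    ice: List[List[int]],
    ore: List[List[int]],
    lichen: List[List[float]],
    max_rubble: float = 100.0,
    max_lichen: float = 100.0,
) -> List[List[float]]:
    """Merge board layers into one grid; ice/ore win over lichen, lichen over rubble."""
    width, height = len(rubble), len(rubble[0])
    combined = [[0.0] * height for _ in range(width)]
    for x in range(width):
        for y in range(height):
            if ice[x][y]:
                combined[x][y] = ICE_VALUE
            elif ore[x][y]:
                combined[x][y] = ORE_VALUE
            elif lichen[x][y] > 0:
                combined[x][y] = LICHEN_OFFSET + min(lichen[x][y] / max_lichen, 1.0)
            else:
                combined[x][y] = min(rubble[x][y] / max_rubble, 1.0)
    return combined


toy_combined = combine_layers(
    rubble=[[50, 0], [0, 0]],
    ice=[[0, 1], [0, 0]],
    ore=[[0, 0], [1, 0]],
    lichen=[[0, 0], [0, 100]],
)
```

The gaps between the ranges leave room for hard colour breaks in the colorscale, and the original per-layer values can still be passed separately as hover info.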


In [8]:
from util import CENTER, UP, RIGHT, DOWN, LEFT, ICE, ORE, WATER, METAL, POWER, manhattan

from lux.kit import obs_to_game_state, GameState, EnvConfig
from lux.config import UnitConfig
from lux.utils import direction_to, my_turn_to_place_factory
import numpy as np
import sys
import lux


LOGGING_LEVEL = 3

test_obs = None


class UnitManager:
    def __init__(
        self,
        unit_id: str,
        unit: lux.unit.Unit,
        game_state: lux.kit.GameState,
        factories: np.ndarray,
    ):
        self.unit_id = unit_id
        self.unit = unit
        self.unit_config: UnitConfig = unit.unit_cfg
        self.game_state = game_state
        self.factories = factories  # [0,:] for locations, [1,:] for factory

    @property
    def nearest_factory(self):
        factory_distances = np.array(
            [manhattan(loc, self.unit.pos) for loc in self.factories[0]]
        )
        closest_factory_position = self.factories[0][np.argmin(factory_distances)]
        factory = self.factories[1][np.argmin(factory_distances)]
        return {
            "distance": np.min(factory_distances),
            "location": closest_factory_position,
            "factory": factory,
        }

    def log(self, message, level=2):
        if level > LOGGING_LEVEL:
            print(
                f"Step {self.game_state.real_env_steps}, Unit {self.unit_id}: {message}"
            )

    def move_to(self, position):
        actions = []
        direction = direction_to(self.unit.pos, position)
        move_cost = self.unit.move_cost(self.game_state, direction)
        if move_cost is None:
            raise RuntimeError(
                f"step:{self.game_state.real_env_steps} {self.unit_id} got move_cost == None"
            )
        power_cost = self.unit.action_queue_cost(self.game_state) + move_cost
        if power_cost < self.unit.power:
            actions.append(self.unit.move(direction, repeat=0))
        else:
            actions.append(
                self.unit.recharge(
                    power_cost + self.unit.action_queue_cost(self.game_state)
                )
            )
        return actions

    def move_toward_factory(self, factory_id: int = -1):
        """Move towards factory_id (or nearest if -1)"""
        # self.factories is [tile_positions, factory_objects], so check the tile list
        if len(self.factories[0]) == 0:
            print("no factory tiles")
            return []
        # TODO: look at only specified factory_id if given
        factory_distances = np.array(
            [manhattan(loc, self.unit.pos) for loc in self.factories[0]]
        )
        if np.min(factory_distances) == 0:
            self.log("distance to factory is 0", level=1)
            return []
        closest_factory_tile = self.factories[0][np.argmin(factory_distances)]
        actions = self.move_to(closest_factory_tile)
        return actions

    def move_toward_nearest_resource(self, resource: int):
        """Moves towards nearest available resource (i.e. avoiding crashing)"""
        if resource == ICE:
            resource_map = self.game_state.board.ice
        elif resource == ORE:
            resource_map = self.game_state.board.ore
        else:
            raise NotImplementedError(f"Don't know {resource}")

        locations = np.argwhere(resource_map == 1)

        distances = np.array(
            [manhattan(ice_loc, self.unit.pos) for ice_loc in locations]
        )
        if np.min(distances) == 0:
            return []
        closest = locations[np.argmin(distances)]
        actions = self.move_to(closest)
        return actions

    def mine(self, resource: int, quantity: int):
        actions = []
        if resource == ICE:
            actions.extend(self.move_toward_nearest_resource(ICE))
            if len(actions) == 0:
                power_cost = self.unit.dig_cost(
                    self.game_state
                ) + self.unit.action_queue_cost(self.game_state)
                if power_cost <= self.unit.power:
                    actions.append(self.unit.dig(repeat=0))
                else:
                    actions.append(
                        self.unit.recharge(
                            power_cost + self.unit.action_queue_cost(self.game_state)
                        )
                    )
        return actions

    def deposit_at_factory(self, factory_id: int = -1):
        actions = []
        actions.extend(self.move_toward_factory(factory_id))
        if len(actions) == 0:
            if self.unit.cargo.ice:
                cargo_type = ICE
                cargo_amount = self.unit.cargo.ice
            elif self.unit.cargo.ore:
                cargo_type = ORE
                cargo_amount = self.unit.cargo.ore
            else:
                self.log("Nothing to deposit", level=3)
                # Return early: cargo_type/cargo_amount are undefined in this branch
                return actions
            actions.append(
                self.unit.transfer(CENTER, cargo_type, cargo_amount, repeat=0)
            )
        return actions

    def charge(self, target: int = None, amount: int = None, allow_partial=True):
        """Take charge from factory either aiming for target, or to take amount"""
        if allow_partial is False:
            # TODO
            raise NotImplementedError
        if target is None and amount is None:
            target = self.unit_config.BATTERY_CAPACITY
        elif target is not None and amount is not None:
            self.log(
                f"only one of target and amount should be set, got target: {target}, amount: {amount}",
                level=3,
            )
            raise RuntimeError
        if target is not None:
            amount = target - self.unit.power
        nearest_factory = self.nearest_factory
        if nearest_factory["distance"] > 1:
            actions = self.move_to(nearest_factory["location"])
        else:
            amount = min(amount, nearest_factory["factory"].power)
            actions = [self.unit.pickup(POWER, amount, repeat=0)]
        return actions


class AgentV1:
    def __init__(self, player: str, env_cfg: EnvConfig) -> None:
        self.player = player
        self.opp_player = "player_1" if self.player == "player_0" else "player_0"
        np.random.seed(0)
        self.env_cfg: EnvConfig = env_cfg

        # Variables to store info later to avoid unnecessarily repeating
        self.game_state_: GameState = None

    def UnitManager(self, unit_id, unit):
        return UnitManager(
            unit_id,
            unit,
            self.game_state_,
            factories=[self.factory_tiles_, self.factory_units_],
        )

    def log(self, message, level=2):
        if level > LOGGING_LEVEL:
            print(
                f"Step: {self.game_state_.real_env_steps}, player: {self.player} - {message}"
            )

    def early_setup(self, step: int, obs, remainingOverageTime: int = 60):
        """Required API for Agent. This is called until all factories are placed"""
        self.game_state_ = obs_to_game_state(step, self.env_cfg, obs)
        if step == 0:
            return self.bid(obs)
        else:
            # factory placement period
            game_state = self.game_state_
            my_turn_to_place = my_turn_to_place_factory(
                game_state.teams[self.player].place_first, step
            )
            factories_to_place = game_state.teams[self.player].factories_to_place
            if factories_to_place > 0 and my_turn_to_place:
                return self.place_factory(step, obs)
            return dict()

    def place_factory(self, step, obs):
        """Place factory in early_setup"""
        game_state = obs_to_game_state(step, self.env_cfg, obs)

        # how many factories you have left to place
        factories_to_place = game_state.teams[self.player].factories_to_place

        # how much water and metal you have in your starting pool to give to new factories
        water_left = game_state.teams[self.player].water
        metal_left = game_state.teams[self.player].metal

        # spawn in random location with default resources
        potential_spawns = np.array(
            list(zip(*np.where(obs["board"]["valid_spawns_mask"] == 1)))
        )
        spawn_loc = potential_spawns[np.random.randint(0, len(potential_spawns))]
        return dict(spawn=spawn_loc, metal=150, water=150)

    def bid(self, obs):
        """Bid for starting factory (default to 0)"""
        return dict(faction="TheBuilders", bid=0)

    def act(self, step: int, obs, remainingOverageTime: int = 60):
        """Required API for Agent. This is called every turn after early_setup is complete"""
        # Set some useful variables for the step that can be used in factory_actions or unit_actions etc
        self.update_step_info(step, obs, remainingOverageTime)
        # global test_obs, test_cfg
        # test_obs = obs
        # test_cfg = self.env_cfg
        # if step == 50:
        #     raise Exception
        factory_actions = self.factory_actions(step, obs, remainingOverageTime)
        unit_actions = self.unit_actions(step, obs, remainingOverageTime)
        actions = dict(**factory_actions, **unit_actions)
        # print(actions)
        # if step > 10:
        #     raise Exception
        return actions

    def update_step_info(self, step, obs, remainingOverageTime):
        self.game_state_ = obs_to_game_state(step, self.env_cfg, obs)

        factory_tiles = []
        factories = []
        for unit_id, factory in self.game_state_.factories[self.player].items():
            factory_tiles += [factory.pos]
            factories += [factory]
        self.factory_tiles_, self.factory_units_ = [
            np.array(arr) for arr in [factory_tiles, factories]
        ]

        self.ice_locations_ = np.argwhere(self.game_state_.board.ice == 1)
        self.ore_locations_ = np.argwhere(self.game_state_.board.ore == 1)

    def factory_actions(self, step, obs, remainingOverageTime: int = 60):
        actions = dict()
        game_state = self.game_state_
        factories = game_state.factories[self.player]
        for unit_id, factory in factories.items():
            if self.env_cfg.max_episode_length - game_state.real_env_steps < 50:
                if factory.water_cost(game_state) <= factory.cargo.water:
                    actions[unit_id] = factory.water()
                    continue

            if (
                factory.power >= self.env_cfg.ROBOTS["HEAVY"].POWER_COST
                and factory.cargo.metal >= self.env_cfg.ROBOTS["HEAVY"].METAL_COST
            ):
                actions[unit_id] = factory.build_heavy()
                continue

        #             if factory.power >= self.env_cfg.ROBOTS["LIGHT"].POWER_COST and \
        #             factory.cargo.metal >= self.env_cfg.ROBOTS["LIGHT"].METAL_COST:
        #                 actions[unit_id] = factory.build_light()
        #                 continue
        return actions

    def unit_actions(self, step, obs, remainingOverageTime: int = 60):
        actions = dict()
        game_state = self.game_state_
        units = game_state.units[self.player]
        for unit_id, unit in units.items():
            unit_manager = self.UnitManager(unit_id, unit)
            if unit.power < unit.action_queue_cost(game_state) or unit.action_queue:
                # Unit doesn't have enough energy to act or already has actions to carry out
                continue
            if unit.unit_type == "HEAVY":
                actions[unit_id] = self.heavy_unit_action(unit_manager)
            elif unit.unit_type == "LIGHT":
                actions[unit_id] = self.light_unit_action(unit_manager)
            else:
                raise Exception(f"Don't know {unit.unit_type}")
        actions = {k: v for k, v in actions.items() if len(v) > 0}
        return actions

    def heavy_unit_action(self, unit_manager: UnitManager):
        unit = unit_manager.unit
        if unit.power < 1000 and unit_manager.nearest_factory["distance"] <= 1:
            self.log(f"sending {unit.unit_id} to charge", level=1)
            # TODO: Move off middle position before charging
            # TODO: Allow corners of factory for charging
            # TODO: Calculate charge required to move/mine/move
            actions = unit_manager.charge(target=2000)
        elif unit.cargo.ice < 100:
            self.log(f"sending {unit.unit_id} to mine ice", level=1)
            actions = unit_manager.mine(ICE, quantity=100)
        else:
            self.log(f"sending {unit.unit_id} to drop off ice", level=1)

            # TODO: Deliver to factory with least water/ice
            actions = unit_manager.deposit_at_factory(factory_id=-1)
        return actions

    def light_unit_action(self, unit_manager: UnitManager):
        unit = unit_manager.unit
        if unit.power < 100 and unit_manager.nearest_factory["distance"] <= 1:
            # TODO: Move off middle position before charging
            # TODO: Allow corners of factory for charging
            # TODO: Calculate charge required to move/mine/move
            actions = unit_manager.charge(target=200)
        elif unit.cargo.ice < 10:
            actions = unit_manager.mine(ICE, quantity=10)
        else:
            # TODO: Deliver to factory with least water/ice
            actions = unit_manager.deposit_at_factory(factory_id=-1)
        return actions

In [9]:
run_agents(AgentV1, AgentV1, map_seed=1, save_state_at=62)

{'factory_0': 1, 'factory_2': 1}
{'factory_1': 1, 'factory_3': 1}
{'unit_4': [array([  2,   0,   4, 550,   0])], 'unit_5': [array([  2,   0,   4, 550,   0])]}
{'unit_6': [array([  2,   0,   4, 550,   0])], 'unit_7': [array([  2,   0,   4, 550,   0])]}
{'unit_4': [array([0, 1, 0, 0, 0])], 'unit_5': [array([0, 4, 0, 0, 0])]}
{'unit_6': [array([0, 2, 0, 0, 0])], 'unit_7': [array([0, 4, 0, 0, 0])]}
{'unit_4': [array([0, 1, 0, 0, 0])], 'unit_5': [array([0, 4, 0, 0, 0])]}
{'unit_6': [array([0, 2, 0, 0, 0])], 'unit_7': [array([0, 4, 0, 0, 0])]}
{'unit_4': [array([0, 2, 0, 0, 0])], 'unit_5': [array([0, 4, 0, 0, 0])]}
{'unit_6': [array([0, 2, 0, 0, 0])], 'unit_7': [array([0, 4, 0, 0, 0])]}
{'unit_4': [array([0, 1, 0, 0, 0])], 'unit_5': [array([0, 4, 0, 0, 0])]}
{'unit_6': [array([0, 2, 0, 0, 0])], 'unit_7': [array([3, 0, 0, 0, 0])]}
{'unit_4': [array([0, 2, 0, 0, 0])], 'unit_5': [array([0, 4, 0, 0, 0])]}


Exception: 

In [6]:
class UnitManagerV2(UnitManager):
    def move_to(self, position):
        actions = []
        direction = direction_to(self.unit.pos, position)
        move_cost = self.unit.move_cost(self.game_state, direction)
        if move_cost is None:
            raise RuntimeError(
                f"step:{self.game_state.real_env_steps} {self.unit_id} got move_cost == None"
            )
        power_cost = self.unit.action_queue_cost(self.game_state) + move_cost
        if power_cost < self.unit.power:
            actions.append(self.unit.move(direction, repeat=0))
        else:
            actions.append(
                self.unit.recharge(
                    power_cost + self.unit.action_queue_cost(self.game_state)
                )
            )
        return actions


class AgentV2(AgentV1):
    def UnitManager(self, unit_id, unit):
        return UnitManagerV2(
            unit_id,
            unit,
            self.game_state_,
            factories=[self.factory_tiles_, self.factory_units_],
        )

In [7]:
run_agents(AgentV2, AgentV1, map_seed=1)

57: 1 Units: (unit_6) collided at 31,15 with [1] unit_7 UnitType.HEAVY at (31, 15) surviving
84: 1 Units: (unit_5) collided at 24,37 with [0] unit_4 UnitType.HEAVY at (24, 37) surviving
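
Both collisions logged above appear to be between units of the same team (unit_6 with unit_7, unit_5 with unit_4). A minimal sketch of one way to prevent that: before committing moves, check the tile each friendly unit would end on and make the lower-priority unit stay put whenever two would share a tile. `resolve_moves` is a hypothetical helper, the positions in the example are made up, and it does not cover enemy units or two units swapping tiles.

```python
from typing import Dict, List, Tuple

Pos = Tuple[int, int]


def resolve_moves(
    current: Dict[str, Pos],
    desired: Dict[str, Pos],
    priority: List[str],
) -> Dict[str, Pos]:
    """Return the tile each unit should end on so that no two units share one.

    current:  unit_id -> tile the unit is on now (all distinct)
    desired:  unit_id -> tile the unit wants to end on (may equal current)
    priority: unit ids, most important first (e.g. heavies before lights)
    """
    rank = {unit_id: i for i, unit_id in enumerate(priority)}
    planned = dict(desired)
    changed = True
    while changed:
        changed = False
        occupants: Dict[Pos, List[str]] = {}
        for unit_id, pos in planned.items():
            occupants.setdefault(pos, []).append(unit_id)
        for unit_ids in occupants.values():
            if len(unit_ids) < 2:
                continue
            # A unit that is already staying cannot give way, so it keeps the tile;
            # otherwise the highest-priority mover keeps it.
            stayers = [u for u in unit_ids if planned[u] == current[u]]
            keeper = stayers[0] if stayers else min(unit_ids, key=rank.__getitem__)
            for unit_id in unit_ids:
                if unit_id != keeper:
                    planned[unit_id] = current[unit_id]  # cancel this unit's move
                    changed = True
    return planned


# Made-up positions: both units want the same tile, and unit_7 has priority.
toy_plan = resolve_moves(
    current={"unit_6": (30, 15), "unit_7": (31, 14)},
    desired={"unit_6": (31, 15), "unit_7": (31, 15)},
    priority=["unit_7", "unit_6"],
)
```

The loop repeats because cancelling one move can create a new conflict with a unit heading for the tile that unit now keeps. It terminates because each pass only ever turns movers into stayers.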
