In [None]:
import json

import numpy as np
from IPython.display import display, Javascript
from luxai_s3.wrappers import LuxAIS3GymEnv, RecordEpisode

def render_episode(episode: RecordEpisode) -> None:
    """Show a recorded episode in the hosted visualizer inside the cell output.

    The result of ``episode.serialize_episode_data()`` is dumped to compact
    JSON and interpolated into a JavaScript snippet as an object literal. The
    snippet creates an iframe pointing at ``https://s3vis.lux-ai.org/#/kaggle``,
    posts the episode data to it once the iframe has loaded, and keeps the
    iframe height at ``320 + 0.3 * width`` pixels via a ``ResizeObserver``.

    Args:
        episode: Recording wrapper exposing ``serialize_episode_data()``.
    """
    # Compact separators keep the payload embedded in the script small.
    data = json.dumps(episode.serialize_episode_data(), separators=(",", ":"))
    # Doubled braces below are literal JS braces escaped for the f-string; only {data} is interpolated.
    # NOTE(review): `element` is not defined in this snippet - presumably the output-area
    # element injected by the notebook front end; confirm it exists in the target front end.
    display(Javascript(f"""
var iframe = document.createElement('iframe');
iframe.src = 'https://s3vis.lux-ai.org/#/kaggle';
iframe.width = '100%';
iframe.scrolling = 'no';

iframe.addEventListener('load', event => {{
    event.target.contentWindow.postMessage({data}, 'https://s3vis.lux-ai.org');
}});

new ResizeObserver(entries => {{
    for (const entry of entries) {{
        entry.target.height = `${{Math.round(320 + 0.3 * entry.contentRect.width)}}px`;
    }}
}}).observe(iframe);

element.append(iframe);
    """))

def evaluate_agents(agent_1_cls, agent_2_cls, seed=42, games_to_play=3, replay_save_dir="replays", render=True):
    """Play two agent classes against each other and print their average reward per game.

    A fresh pair of agents is constructed for every game from the parameters
    the environment reports in ``info["params"]``. After each game the final
    per-player reward is added to a running total; the total divided by the
    number of games is printed at the end.

    Args:
        agent_1_cls: Class instantiated as ``agent_1_cls("player_0", env_cfg)``.
        agent_2_cls: Class instantiated as ``agent_2_cls("player_1", env_cfg)``.
        seed: Seed passed to the first ``env.reset``.
        games_to_play: Number of games to run.
        replay_save_dir: Directory handed to ``RecordEpisode`` for replays.
        render: When true, call ``render_episode`` after every game.
    """
    env = RecordEpisode(
        LuxAIS3GymEnv(numpy_output=True), save_on_close=True, save_on_reset=True, save_dir=replay_save_dir
    )
    # try/finally so the environment is closed (and the last replay saved) even
    # when an agent or the environment raises in the middle of a game.
    try:
        # Seed once up front; the per-game resets below are unseeded.
        obs, info = env.reset(seed=seed)
        total_wins = np.zeros((2))
        for i in range(games_to_play):
            obs, info = env.reset()
            env_cfg = info["params"] # only contains observable game parameters
            player_0 = agent_1_cls("player_0", env_cfg)
            player_1 = agent_2_cls("player_1", env_cfg)

            # main game loop
            game_done = False
            step = 0
            print(f"Running game {i}")
            while not game_done:
                actions = {
                    agent.player: agent.act(step=step, obs=obs[agent.player])
                    for agent in (player_0, player_1)
                }
                obs, reward, terminated, truncated, info = env.step(actions)
                # info["state"] is the environment state object, you can inspect/play around with it to e.g. print
                # unobservable game data that agents can't see
                game_done = any(
                    terminated[player] | truncated[player] for player in ("player_0", "player_1")
                )
                step += 1
            total_wins += np.array([reward["player_0"], reward["player_1"]])
            if render:
                render_episode(env)
        # max(..., 1) avoids a division by zero when no games were requested.
        print(total_wins / max(games_to_play, 1))
    finally:
        env.close() # free up resources and save final replay

In [None]:
from lux.utils import direction_to
import numpy as np
class Agent():
    """Baseline agent: explore at random until a relic node is seen, then gather around it.

    Relic nodes are remembered for the rest of the game once observed. Each
    unit heads for the closest remembered relic node and, once within
    Manhattan distance 4 of it, moves randomly in the hope of scoring points.
    """

    def __init__(self, player: str, env_cfg) -> None:
        """Store player identity and configuration.

        Args:
            player: ``"player_0"`` or ``"player_1"``.
            env_cfg: Mapping with at least ``max_units``, ``map_width`` and
                ``map_height``.
        """
        self.player = player
        self.opp_player = "player_1" if self.player == "player_0" else "player_0"
        self.team_id = 0 if self.player == "player_0" else 1
        self.opp_team_id = 1 if self.team_id == 0 else 0
        # NOTE: this reseeds NumPy's *global* RNG, so every agent constructed
        # resets the shared random stream.
        np.random.seed(0)
        self.env_cfg = env_cfg

        self.unit_explore_locations = dict()  # unit id -> (x, y) exploration goal
        self.relic_node_positions = []  # positions of every relic node seen so far
        self.discovered_relic_nodes_ids = set()  # ids matching relic_node_positions

    def act(self, step: int, obs, remainingOverageTime: int = 60):
        """implement this function to decide what actions to send to each available unit.

        step is the current timestep number of the game starting from 0 going up to max_steps_in_match * match_count_per_episode - 1.

        Returns:
            Integer array of shape ``(max_units, 3)``; column 0 holds the
            action/direction, the other two columns are left at 0 here.
        """
        unit_mask = np.array(obs["units_mask"][self.team_id]) # shape (max_units, )
        unit_positions = np.array(obs["units"]["position"][self.team_id]) # shape (max_units, 2)
        observed_relic_node_positions = np.array(obs["relic_nodes"]) # shape (max_relic_nodes, 2)
        observed_relic_nodes_mask = np.array(obs["relic_nodes_mask"]) # shape (max_relic_nodes, )
        # ids of units you can control at this timestep
        available_unit_ids = np.where(unit_mask)[0]
        actions = np.zeros((self.env_cfg["max_units"], 3), dtype=int)

        # save any new relic nodes that we discover for the rest of the game.
        for relic_id in np.where(observed_relic_nodes_mask)[0]:
            relic_id = int(relic_id)
            if relic_id not in self.discovered_relic_nodes_ids:
                self.discovered_relic_nodes_ids.add(relic_id)
                self.relic_node_positions.append(observed_relic_node_positions[relic_id])

        # One (n_relics, 2) array so the nearest node can be found per unit.
        known_relics = np.asarray(self.relic_node_positions) if self.relic_node_positions else None

        # unit ids range from 0 to max_units - 1
        for unit_id in available_unit_ids:
            unit_pos = unit_positions[unit_id]
            if known_relics is not None:
                # Pick the relic node that is actually closest to this unit
                # (previously the first discovered node was always used).
                distances = np.abs(known_relics - unit_pos).sum(axis=1)
                nearest_index = int(np.argmin(distances))
                nearest_relic_node_position = self.relic_node_positions[nearest_index]
                manhattan_distance = distances[nearest_index]

                # if close to the relic node we want to move randomly around it and hope to gain points
                if manhattan_distance <= 4:
                    random_direction = np.random.randint(0, 5)
                    actions[unit_id] = [random_direction, 0, 0]
                else:
                    # otherwise we want to move towards the relic node
                    actions[unit_id] = [direction_to(unit_pos, nearest_relic_node_position), 0, 0]
            else:
                # every 20 steps or if a unit doesn't have an assigned location to explore
                if step % 20 == 0 or unit_id not in self.unit_explore_locations:
                    # pick a random location on the map for the unit to explore
                    rand_loc = (np.random.randint(0, self.env_cfg["map_width"]), np.random.randint(0, self.env_cfg["map_height"]))
                    self.unit_explore_locations[unit_id] = rand_loc
                # using the direction_to tool we can generate a direction that makes the unit move to the saved location
                # note that the first index of each unit's action represents the type of action. See specs for more details
                actions[unit_id] = [direction_to(unit_pos, self.unit_explore_locations[unit_id]), 0, 0]
        return actions

In [None]:
# Scratch check: `in` on a list of lists compares the inner lists by value,
# so this prints True.
a = [[10,3], [10,4]]
print([10,3] in a)

In [None]:
#%%writefile agent/agent.py
from lux.utils import direction_to
import numpy as np
import random


class Agent2():
    """Agent that assigns each unit a target tile and walks there while avoiding blocked tiles.

    Once a relic node is seen, the 5x5 block of tiles around it is queued as
    candidate targets. Units without a target take the closest queued tile;
    when none are queued they fall back to a random map location.

    ``unit_has_target`` codes: -1 = no target, 0 = explore target,
    1 = relic target, 2 = on relic.
    """

    # direction index -> (dx, dy); index 0 means "stay"
    _DIRECTION_DELTAS = ((0, 0), (0, -1), (1, 0), (0, 1), (-1, 0))

    def __init__(self, player: str, env_cfg) -> None:
        """Store player identity, map configuration and per-unit bookkeeping.

        Args:
            player: ``"player_0"`` or ``"player_1"``.
            env_cfg: Mapping with ``max_units``, ``unit_sensor_range``,
                ``map_width`` and ``map_height``.
        """
        self.player = player
        self.opp_player = "player_1" if self.player == "player_0" else "player_0"
        self.team_id = 0 if self.player == "player_0" else 1
        self.opp_team_id = 1 if self.team_id == 0 else 0
        # NOTE: reseeds NumPy's *global* RNG on every construction.
        np.random.seed(0)
        self.env_cfg = env_cfg
        if self.player=="player_0":
            self.start_pos = [0,0]
            self.pnum = 0
        else:
            self.start_pos = [self.env_cfg["map_width"], self.env_cfg["map_height"]]
            self.pnum = 1
        self.unit_explore_locations = dict()
        self.relic_node_positions = []
        self.discovered_relic_nodes_ids = set()

        self.n_units = self.env_cfg["max_units"]
        self.range = self.env_cfg["unit_sensor_range"]
        self.width = self.env_cfg["map_width"]
        self.height = self.env_cfg["map_height"]
        self.relic_targets = []
        # The original built a list of corner targets here and immediately
        # overwrote it with an empty list; only the effective value is kept.
        self.explore_targets = []
        self.unit_has_target = -np.ones((self.n_units)) # -1=no target; 0=explore target; 1=relic target; 2=on relic
        # One (x, y) row per unit so a target can always be indexed with [0] / [1].
        self.unit_targets = dict(zip(range(0,self.n_units), np.zeros((self.n_units,2))))
        self.prev_points = 0
        self.prev_points_increase = 0
        # All-zero ("stay") actions instead of None so get_moves works on the
        # very first call as well.
        self.prev_actions = np.zeros((self.n_units, 3), dtype=int)

    def direction_to_change(self, direction):
        """Return the ``[dx, dy]`` offset for a direction index as an array.

        Raises:
            ValueError: If ``direction`` is not one of 0..4.
        """
        if direction not in (0, 1, 2, 3, 4):
            raise ValueError(f"direction must be in 0..4, got {direction!r}")
        return np.array(self._DIRECTION_DELTAS[int(direction)])

    def get_moves(self, obs, unit_id, unit_pos):
        """List the directions the unit may take from ``unit_pos``.

        Direction 0 (stay) is always included. A neighbouring tile is excluded
        when it is off the map, when it is the tile the unit came from
        (derived from its previous action), or when its ``tile_type`` is 2.
        """
        if self.prev_actions is None:
            # Defensive: no previous action recorded means the unit did not move.
            last_step = np.array([0, 0])
        else:
            last_step = self.direction_to_change(self.prev_actions[unit_id][0])
        prev_pos = [unit_pos[0] - last_step[0], unit_pos[1] - last_step[1]]
        new_pos = [[unit_pos[0], unit_pos[1]-1],
                  [unit_pos[0]+1, unit_pos[1]],
                  [unit_pos[0], unit_pos[1]+1],
                  [unit_pos[0]-1, unit_pos[1]]]
        moves = [0]
        for pos in new_pos:
            off_map = pos[0]<0 or pos[1]<0 or pos[0]>=self.width or pos[1]>=self.height
            if off_map:
                continue
            if pos[0]==prev_pos[0] and pos[1]==prev_pos[1]:
                continue  # do not step straight back
            if obs["map_features"]["tile_type"][pos[0], pos[1]]==2:
                continue  # blocked tile
            moves.append(direction_to(unit_pos, pos))
        return moves

    # moves around asteroids
    def move_obstacle_avoid(self, obs, unit_id, unit_pos, direction):
        """Return ``direction`` if it is allowed, otherwise a random allowed move.

        ``get_moves`` always contains 0, so the fallback may be "stay".
        """
        moves = self.get_moves(obs, unit_id, unit_pos)
        if direction in moves:
            return direction
        return random.choice(moves)

    def relic_to_targets(self, pos):
        """Return the 25 tiles of the 5x5 block centred on ``pos`` as arrays.

        Coordinates are not clipped to the map.
        """
        return [
            np.array([pos[0]+i, pos[1]+j])
            for i in range(-2, 3)
            for j in range(-2, 3)
        ]

    def act(self, step: int, obs, remainingOverageTime: int = 60):
        """implement this function to decide what actions to send to each available unit.

        step is the current timestep number of the game starting from 0 going up to max_steps_in_match * match_count_per_episode - 1.

        Returns:
            Integer array of shape ``(max_units, 3)``; column 0 holds the
            chosen direction, the other two columns are left at 0.
        """
        unit_mask = np.array(obs["units_mask"][self.team_id]) # shape (max_units, )
        unit_positions = np.array(obs["units"]["position"][self.team_id]) # shape (max_units, 2)
        observed_relic_node_positions = np.array(obs["relic_nodes"]) # shape (max_relic_nodes, 2)
        observed_relic_nodes_mask = np.array(obs["relic_nodes_mask"]) # shape (max_relic_nodes, )
        team_points = np.array(obs["team_points"]) # points of each team, team_points[self.team_id] is the points of the your team
        increase = team_points[self.team_id]-self.prev_points
        # ids of units you can control at this timestep
        available_unit_ids = np.where(unit_mask)[0]
        actions = np.zeros((self.env_cfg["max_units"], 3), dtype=int)

        # save any new relic nodes that we discover for the rest of the game.
        for relic_id in np.where(observed_relic_nodes_mask)[0]:
            relic_id = int(relic_id)
            if relic_id not in self.discovered_relic_nodes_ids:
                self.discovered_relic_nodes_ids.add(relic_id)
                self.relic_node_positions.append(observed_relic_node_positions[relic_id])
                self.relic_targets.extend(self.relic_to_targets(observed_relic_node_positions[relic_id]))

        # unit ids range from 0 to max_units - 1
        for unit_id in available_unit_ids:
            unit_pos = unit_positions[unit_id]
            state = self.unit_has_target[unit_id]
            if state!=-1 and state!=2:
                goal = self.unit_targets[unit_id]
                if unit_pos[0]==goal[0] and unit_pos[1]==goal[1]:
                    # Arrived: stay if the point gain grew, otherwise look for a new target.
                    if increase>self.prev_points_increase:
                        self.unit_has_target[unit_id]=2
                    else:
                        self.unit_has_target[unit_id]=-1
            if self.unit_has_target[unit_id]==-1:
                if self.relic_targets:
                    # closest queued relic tile (Manhattan distance)
                    dist = np.sum(np.abs(np.array(self.relic_targets)-unit_pos),axis=1)
                    target = self.relic_targets.pop(int(np.argmin(dist)))
                    self.unit_has_target[unit_id] = 1
                else:
                    # The original also tested `step % 20 == 0 or unit_id not in
                    # self.unit_explore_locations`; that dict is never filled, so
                    # the test was always true and is dropped so `target` is
                    # always bound.
                    if self.explore_targets:
                        target = self.explore_targets.pop(0)
                    else:
                        # pick a random location on the map for the unit to explore
                        target = (np.random.randint(0, self.env_cfg["map_width"]), np.random.randint(0, self.env_cfg["map_height"]))
                    self.unit_has_target[unit_id] = 0
                self.unit_targets[unit_id] = target
            direction = self.move_obstacle_avoid(obs, unit_id, unit_pos, direction_to(unit_pos, self.unit_targets[unit_id]))
            actions[unit_id] = [direction, 0, 0]
        self.prev_points = team_points[self.team_id]
        self.prev_points_increase = increase
        self.prev_actions = actions
        return actions

In [None]:

np.set_printoptions(linewidth=500)

# Manual walkthrough of RelicMap.step: each entry is (unit positions, point
# value passed to step). The map is printed after every step.
# NOTE: RelicMap is defined in a later cell of this notebook, so that cell has
# to be executed before this one.
walkthrough = [
    ([[0,1],[2,1]], 0),
    ([[0,2],[2,2]], 0),
    ([[0,3],[3,2]], 0),
    ([[0,4],[3,3]], 0),
    ([[1,4],[3,4]], 1),
    ([[2,4],[3,4]], 1),
    ([[3,4],[3,4]], 1),
    ([[3,5],[3,4]], 2),
    ([[3,4],[3,4]], 2),
    ([[3,3],[3,4]], 1),
    ([[3,2],[3,4]], 1),
    ([[4,2],[3,4]], 1),
    ([[4,3],[3,4]], 2),
]

relic_map = RelicMap(2)
#relic_map.map[3,3] = 2
#relic_map.map[3,4] = 2
#relic_map.map[3,5] = 1
relic_map.new_relic([5,5])
for positions, points in walkthrough:
    relic_map.step(positions, points)
    print(relic_map.map)

In [None]:
class RelicMap():
    '''
    Relic map keeps track of locations of relic positions, known fragments, disproven fragments and possible fragments.
    It also stores the current status for each unit in relation to its position and fragment locations
    map: 24 x 24 game map
        -1 = unknown
        0 = disproven fragment
        1 = possible fragment
        2 = known fragment
        3 = known and occupied
    '''
    def __init__(self, n_units):
        """Start with every tile unknown and a zero status per unit."""
        self.map = -np.ones((24,24))
        self.unit_status = np.zeros((n_units))

    def reset(self):
        """Turn every "known and occupied" tile back into a plain known fragment."""
        self.map[self.map==3] = 2

    def new_relic(self, pos):
        """Mark the still-unknown tiles of the 5x5 block around ``pos`` as possible fragments."""
        # Clamp the lower bounds: a negative slice start would wrap around and
        # select nothing for relics within two tiles of the edge.
        row_start = max(pos[0]-2, 0)
        col_start = max(pos[1]-2, 0)
        patch = self.map[row_start:pos[0]+3, col_start:pos[1]+3]
        # `patch` is a view, so this updates self.map in place.
        patch[patch==-1] = 1

    def get_fragments(self):
        """Return an (n, 2) array with the coordinates of all known fragments (codes 2 and 3)."""
        return np.transpose((self.map>=2).nonzero())

    def get_possibles(self):
        """Return an (n, 2) array with the coordinates of all possible fragments (code 1)."""
        return np.transpose((self.map==1).nonzero())

    def step(self, unit_positions, increase_change):
        """Update tile codes from where the units stand and the observed point value.

        Args:
            unit_positions: Sequence of ``[x, y]`` positions, one per unit
                (lists or an array).
            increase_change: Point value for this step; a positive value
                confirms occupied possible tiles, otherwise they are disproven.
        """
        # Plain int tuples index a single cell on any Python version and make
        # membership tests work for both lists and arrays.
        cells = [tuple(int(c) for c in unit) for unit in unit_positions]
        occupied = set(cells)

        knowns = np.transpose((self.map>=2).nonzero())
        # Reported for debugging only; it is not subtracted from increase_change.
        knowns_increase = sum(1 for frag in knowns if tuple(frag.tolist()) in occupied)
        print("Knowns: ", knowns, "Increase: ", increase_change, "Known increase: ", knowns_increase)
        print("Unit positions: ", unit_positions)

        # Pass 1: units standing on possible tiles.
        count = 0
        for cell in cells:
            if self.map[cell] == 1:
                if increase_change>0:
                    self.map[cell] = 3
                    count += 1
                else:
                    self.map[cell] = 0
        # Pass 2: units on unknown tiles; confirmed only if points are left
        # over after the tiles confirmed in pass 1.
        for cell in cells:
            if self.map[cell] == -1:
                if increase_change - count <=0:
                    self.map[cell] = 0
                else:
                    self.map[cell] = 3
        for ii, cell in enumerate(cells):
            self.unit_status[ii] = self.map[cell]
        print(self.map.T)
        

In [None]:
class RelicMap():
    '''
    Relic map keeps track of locations of relic positions, known fragments, disproven fragments and possible fragments.
    It also stores the current status for each unit in relation to its position and fragment locations
    map: 24 x 24 game map
        -1 = unknown
        0 = disproven fragment
        1 = possible fragment
        2 = known fragment
        3 = known and occupied
    map_visited: 1 where a possible tile has already been stepped on
    map_confidence: per-tile estimate in [0, 1] that the tile is a fragment
    '''
    def __init__(self, n_units):
        """Start with every tile unknown, unvisited and at zero confidence."""
        self.map = -np.ones((24,24))
        self.map_visited = np.zeros((24,24))
        self.map_confidence = np.zeros((24,24))
        self.map_occupied = np.zeros((24,24))
        self.unit_status = np.zeros((n_units))

    @staticmethod
    def _patch(row, col):
        """Index for the 5x5 block centred on (row, col), clamped at the low edge."""
        # A negative slice start would wrap around and select nothing.
        return (slice(max(row-2, 0), row+3), slice(max(col-2, 0), col+3))

    def reset(self):
        """Release occupied tiles and allow tiles of middling confidence to be revisited."""
        self.map[self.map==3] = 2
        # Boolean mask selects individual cells; indexing with a coordinate
        # pair array (as before) cleared two whole rows instead.
        undecided = (self.map_confidence>=0.25) & (self.map_confidence<=0.75)
        self.map_visited[undecided] = 0

    def new_relic(self, pos):
        """Mark the 5x5 block around ``pos`` and around its mirrored position as possible.

        The mirrored block is centred on ``(23 - pos[1], 23 - pos[0])``.
        Both blocks start at a confidence of 9/25.
        """
        own = self._patch(pos[0], pos[1])
        mirrored = self._patch(abs(pos[1]-23), abs(pos[0]-23))
        for patch in (own, mirrored):
            self.map[patch] = 1
            self.map_confidence[patch] = 9/25

    def get_fragments(self):
        """Return an (n, 2) array of tiles whose confidence is at least 0.75."""
        return np.transpose((self.map_confidence>=0.75).nonzero())

    def get_possibles(self):
        """Return a list of ``[x, y]`` arrays for possible tiles not yet visited."""
        unvisited_possible = (self.map==1) & (self.map_visited==0)
        return list(np.transpose(unvisited_possible.nonzero()))

    def step(self, unit_positions, increase):
        """Update per-tile confidence from unit positions and the points gained.

        Args:
            unit_positions: Sequence of ``[x, y]`` positions (lists or an array).
            increase: Points gained this step.
        """
        # Exact row membership; `frag in ndarray` is an element-wise any-match
        # and `.tolist()` does not exist on plain lists.
        occupied = {tuple(int(c) for c in unit) for unit in unit_positions}
        candidates = [
            tuple(frag)
            for frag in np.transpose((self.map==1).nonzero()).tolist()
            if tuple(frag) in occupied
        ]

        # Phase 1: spread the unexplained points over first-time and uncertain tiles.
        new = []
        certain = 0
        rest = []
        for cell in candidates:
            if self.map_visited[cell]==0:
                new.append(cell)
            elif self.map_confidence[cell]==1:
                certain += 1
            elif self.map_confidence[cell]>0.001:
                rest.append(cell)
        remaining_points = increase-certain
        uncertains_len = len(new) + len(rest)
        if uncertains_len>0:
            t = remaining_points/uncertains_len
            for cell in new:
                self.map_visited[cell] = 1
                self.map_confidence[cell] = t
            if remaining_points>0:
                for cell in rest:
                    self.map_confidence[cell] = self.map_confidence[cell]*t

        # Phase 2: rescale the uncertain occupied tiles so they sum to the
        # points not explained by fully certain tiles.
        certain = sum(1 for cell in candidates if self.map_confidence[cell]==1)
        total = sum(
            self.map_confidence[cell]
            for cell in candidates
            if 0 < self.map_confidence[cell] < 1
        )
        remaining = increase-certain
        if total>0 and remaining>0:
            for cell in candidates:
                if self.map_confidence[cell]<1:
                    self.map_confidence[cell] = self.map_confidence[cell]*(remaining/total)
        self.map_confidence = np.clip(self.map_confidence,0,1)
                
        

In [None]:

np.set_printoptions(linewidth=500)

# Manual walkthrough of RelicMap.step: each entry is (unit positions, points
# gained). The three maps are printed after every step.
# NOTE: this reads map_possibles / map_knowns, which only the RelicMap defined
# in a later cell of this notebook provides.
walkthrough = [
    ([[0,1],[2,1]], 0),
    ([[0,2],[2,2]], 0),
    ([[0,3],[3,2]], 0),
    ([[0,4],[3,3]], 0),
    ([[1,4],[3,4]], 1),
    ([[2,4],[3,4]], 1),
    ([[3,4],[3,4]], 1),
    ([[3,5],[3,4]], 2),
    ([[3,4],[3,4]], 2),
    ([[3,3],[3,4]], 1),
    ([[3,2],[3,4]], 1),
    ([[4,2],[3,4]], 1),
    ([[4,3],[3,4]], 2),
]

relic_map = RelicMap(2)
#relic_map.map[3,3] = 2
#relic_map.map[3,4] = 2
#relic_map.map[3,5] = 1
relic_map.new_relic([5,5])
for positions, points in walkthrough:
    relic_map.step(positions, points)
    print("poss: ", relic_map.map_possibles, "\n", "knowns: ", relic_map.map_knowns, "\n", "confidence: ", relic_map.map_confidence, "\n", "\n", "\n")

In [None]:
# Scratch check: list() over a 2-D array yields a Python list whose items are
# the 1-D row arrays (not nested Python lists).
a = np.array([[1,2],[3,2]])
print(list(a))

In [None]:
class RelicMap():
    '''
    Relic map keeps track of known fragments, possible fragments and a per-tile confidence.
    It also stores a status slot for each unit.
    All maps are 24 x 24:
        map_knowns: 1 = confirmed fragment, 0 = otherwise
        map_possibles: 1 = still a candidate fragment, 0 = otherwise
        map_confidence: highest share of unexplained points seen on the tile
    '''
    def __init__(self, n_units):
        """Start with no known or possible fragments and zero confidence."""
        self.map_knowns = np.zeros((24,24))
        self.map_possibles = np.zeros((24,24))
        self.map_confidence = np.zeros((24,24))
        self.unit_status = np.zeros((n_units))

    @staticmethod
    def _patch(row, col):
        """Index for the 5x5 block centred on (row, col), clamped at the low edge."""
        # A negative slice start would wrap around and select nothing.
        return (slice(max(row-2, 0), row+3), slice(max(col-2, 0), col+3))

    def reset(self):
        """Currently a no-op; kept so callers can reset between matches."""
        pass

    def new_relic(self, pos):
        """Mark the 5x5 block around ``pos`` and around its mirrored position as possible.

        The mirrored block is centred on ``(23 - pos[1], 23 - pos[0])``.
        """
        self.map_possibles[self._patch(pos[0], pos[1])] = 1
        self.map_possibles[self._patch(abs(pos[1]-23), abs(pos[0]-23))] = 1

    def get_fragments(self):
        """Return a list of ``[x, y]`` arrays, one per confirmed fragment."""
        knowns = np.transpose((self.map_knowns==1).nonzero())
        return list(knowns)

    def get_possibles(self):
        """Return a list of ``[x, y]`` arrays, one per candidate tile."""
        poss = np.transpose((self.map_possibles==1).nonzero())
        return list(poss)

    def move_away(self, pos):
        """Pick a direction (1..4) off ``pos`` that avoids untested candidate tiles.

        Neighbours are tried in direction order 1..4; the first on-map
        neighbour that is either a confirmed fragment or not a candidate wins.
        If none qualifies a random direction in 1..4 is returned.
        """
        moves = [1,2,3,4]
        options = np.array([[pos[0],pos[1]-1],[pos[0]+1,pos[1]],[pos[0],pos[1]+1],[pos[0]-1,pos[1]]])
        for ii, option in enumerate(options):
            if np.max(option)>23 or np.min(option)<0:
                continue
            cell = (int(option[0]), int(option[1]))
            if self.map_knowns[cell]==1:
                return moves[ii]
            if self.map_possibles[cell]==0 and self.map_knowns[cell]==0:
                return moves[ii]
        return np.random.randint(1,5)

    def step(self, unit_positions, increase):
        """Update the maps from unit positions and the points gained this step.

        Points explained by occupied confirmed fragments are subtracted first.
        If nothing is left, every occupied candidate tile is ruled out.
        Otherwise the remainder is shared equally among the occupied
        candidates; a tile whose confidence reaches exactly 1 is confirmed.

        Args:
            unit_positions: Sequence of ``[x, y]`` positions (lists or an array).
            increase: Points gained this step.
        """
        # Work on copies so each tile is counted once even when several units
        # stand on it.
        check_knowns = self.map_knowns.copy()
        check_possibles = self.map_possibles.copy()
        ones = 0
        S = []
        for unit in unit_positions:
            cell = tuple(int(c) for c in unit)
            if check_knowns[cell]==1:
                ones += 1
                check_knowns[cell]=0
            if check_possibles[cell]==1:
                # Was `= 1` (a no-op), which let duplicates into S and diluted
                # the per-tile share below.
                check_possibles[cell]=0
                S.append(cell)
        r1 = increase-ones
        if r1<=0:
            for cell in S:
                self.map_possibles[cell]=0
        else:
            share = r1/len(S) if S else 0
            for cell in S:
                self.map_confidence[cell]=max(self.map_confidence[cell],share)
                if self.map_confidence[cell]==1:
                    self.map_possibles[cell]=0
                    self.map_knowns[cell]=1
                    
            
            
            
                
        

In [None]:
#%%writefile agent/agent.py
from my_agent.lux.utils import direction_to, direction_to_change
#from agent.maps import RelicMap
import numpy as np
import random


class Agent3():
    """Relic-hunting agent that drives its units from a shared ``RelicMap``.

    Each turn ``act`` feeds the team's score change into ``self.relic_map``,
    then gives every available unit a target tile (a candidate fragment tile
    if any are left, otherwise a random exploration tile) and moves it there
    while avoiding asteroid tiles and immediate back-tracking.

    ``unit_has_target`` codes: -1 = no target, 0 = explore target,
    1 = relic (candidate) target, 2 = standing on a known fragment,
    3 = known fragment assigned by ``reset``.
    """

    # Steps at which ``act`` calls ``reset()``.
    # NOTE(review): presumably the starts of later matches - confirm against
    # env_cfg["max_steps_in_match"].
    RESET_STEPS = (102, 203, 304, 405)

    def __init__(self, player: str, env_cfg) -> None:
        """Set up per-game state.

        Args:
            player: "player_0" or "player_1".
            env_cfg: mapping providing at least "max_units",
                "unit_sensor_range", "map_width" and "map_height".
        """
        self.player = player
        self.opp_player = "player_1" if self.player == "player_0" else "player_0"
        self.team_id = 0 if self.player == "player_0" else 1
        self.opp_team_id = 1 if self.team_id == 0 else 0
        np.random.seed(0)
        self.env_cfg = env_cfg
        if self.player == "player_0":
            self.start_pos = [0, 0]
            self.pnum = 0
        else:
            # One past the last valid index on each axis; no method of this
            # class currently reads it.
            self.start_pos = [self.env_cfg["map_width"], self.env_cfg["map_height"]]
            self.pnum = 1
        self.unit_explore_locations = dict()
        self.relic_node_positions = []
        self.discovered_relic_nodes_ids = set()
        self.n_units = self.env_cfg["max_units"]
        self.relic_map = RelicMap(self.n_units)
        self.range = self.env_cfg["unit_sensor_range"]
        self.width = self.env_cfg["map_width"]
        self.height = self.env_cfg["map_height"]
        self.relic_targets = []
        # Fixed exploration waypoints; intentionally empty, so exploring units
        # fall back to random tiles in ``act``.
        self.explore_targets = []
        self.fragment_locations = []
        self.possible_locations = []

        self.unit_has_target = -np.ones((self.n_units))  # see class docstring for codes
        self.unit_targets = dict(zip(range(0, self.n_units), np.zeros((self.n_units, 2))))
        self.prev_points = 0
        self.prev_points_increase = 0
        # Must be indexable from the very first ``act`` call because
        # ``get_moves`` reads the previous action of each unit.
        self.prev_actions = np.zeros((self.n_units, 3), dtype=int)

    def _in_bounds(self, x, y):
        """Return True when (x, y) lies on the ``width`` x ``height`` map."""
        return 0 <= x < self.width and 0 <= y < self.height

    def get_moves(self, obs, unit_id, unit_pos):
        """List the actions a unit may take this turn.

        Action 0 is always allowed. A neighbouring tile is added (as the value
        ``direction_to`` returns for it) unless it is off the map, is the tile
        the unit occupied before its previous action, or has tile type 2.

        Returns:
            list starting with 0 followed by the permitted directions.
        """
        step_back = direction_to_change(self.prev_actions[unit_id][0])
        prev_pos = [unit_pos[0] - step_back[0], unit_pos[1] - step_back[1]]
        neighbours = [[unit_pos[0], unit_pos[1] - 1],
                      [unit_pos[0] + 1, unit_pos[1]],
                      [unit_pos[0], unit_pos[1] + 1],
                      [unit_pos[0] - 1, unit_pos[1]]]
        tile_type = obs["map_features"]["tile_type"]
        moves = [0]
        for pos in neighbours:
            # The bounds test must come first so the tile lookup is only done
            # with valid indices.
            if not self._in_bounds(pos[0], pos[1]):
                continue
            if pos[0] == prev_pos[0] and pos[1] == prev_pos[1]:
                continue
            if tile_type[pos[0], pos[1]] == 2:
                continue
            moves.append(direction_to(unit_pos, pos))
        return moves

    # moves around asteroids
    def move_obstacle_avoid(self, obs, unit_id, unit_pos, direction):
        """Return ``direction`` if permitted, else a random permitted action."""
        moves = self.get_moves(obs, unit_id, unit_pos)
        if direction in moves:
            return direction
        return random.choice(moves) if moves else 0

    def relic_to_targets(self, pos):
        """Return the 25 tiles of the 5x5 square centred on ``pos``.

        Tiles are produced column offset first (-2..2 on the first axis, then
        -2..2 on the second) and are not clipped to the map.
        """
        return [np.array([pos[0] + i, pos[1] + j])
                for i in range(-2, 3)
                for j in range(-2, 3)]

    def reset(self):
        """Reset per-match state and hand out initial targets.

        Units are served in id order: first one known fragment each (code 3),
        then one candidate tile each (code 1); the rest keep no target.
        """
        self.relic_map.reset()
        self.explore_targets = []
        self.unit_has_target = -np.ones((self.n_units))  # see class docstring for codes
        self.unit_targets = dict(zip(range(0, self.n_units), np.zeros((self.n_units, 2))))
        self.prev_points = 0
        self.prev_points_increase = 0
        self.prev_actions = np.zeros((self.env_cfg["max_units"], 3), dtype=int)
        self.fragment_locations = self.relic_map.get_fragments()
        self.possible_locations = list(self.relic_map.get_possibles())
        n_frags = len(self.fragment_locations)
        n_poss = len(self.possible_locations)
        # TODO target closest first
        for unit_id in range(self.n_units):
            if unit_id < n_frags:
                self.unit_has_target[unit_id] = 3
                self.unit_targets[unit_id] = self.fragment_locations[unit_id]
            elif unit_id - n_frags < n_poss:
                self.unit_has_target[unit_id] = 1
                self.unit_targets[unit_id] = self.possible_locations[unit_id - n_frags]

    def act(self, step: int, obs, remainingOverageTime: int = 60):
        """Decide the action of every available unit for this step.

        Args:
            step: current timestep of the game, starting from 0.
            obs: observation mapping; reads "units_mask", "units"/"position",
                "relic_nodes", "relic_nodes_mask", "team_points" and
                "map_features"/"tile_type".
            remainingOverageTime: unused.

        Returns:
            int array of shape (max_units, 3); column 0 holds the chosen
            direction, the other columns stay 0.
        """
        if step in self.RESET_STEPS:
            self.reset()
        unit_mask = np.array(obs["units_mask"][self.team_id])  # shape (max_units, )
        unit_positions = np.array(obs["units"]["position"][self.team_id])  # shape (max_units, 2)
        observed_relic_node_positions = np.array(obs["relic_nodes"])  # shape (max_relic_nodes, 2)
        observed_relic_nodes_mask = np.array(obs["relic_nodes_mask"])  # shape (max_relic_nodes, )
        team_points = np.array(obs["team_points"])  # team_points[self.team_id] is our score
        increase = team_points[self.team_id] - self.prev_points
        # ids of units you can control at this timestep
        available_unit_ids = np.where(unit_mask)[0]
        actions = np.zeros((self.env_cfg["max_units"], 3), dtype=int)

        # save any new relic nodes that we discover for the rest of the game.
        visible_relic_node_ids = set(np.where(observed_relic_nodes_mask)[0])
        for node_id in visible_relic_node_ids:
            if node_id not in self.discovered_relic_nodes_ids:
                # explore units switch to relic collection
                self.relic_map.new_relic(observed_relic_node_positions[node_id])
                self.unit_has_target[self.unit_has_target == 0] = -1
                self.discovered_relic_nodes_ids.add(node_id)
                self.relic_node_positions.append(observed_relic_node_positions[node_id])
                self.relic_targets.extend(self.relic_to_targets(observed_relic_node_positions[node_id]))
                # remove duplicates from relic targets
                self.relic_targets = list({array.tobytes(): array for array in self.relic_targets}.values())
        self.relic_map.step(unit_positions, increase)

        poss = self.relic_map.get_possibles()
        knowns = self.relic_map.get_fragments()

        # the first available unit found on each known fragment stays there
        for known in knowns:
            for unit_id in available_unit_ids:
                unit_pos = unit_positions[unit_id]
                if unit_pos.tolist() == known.tolist():
                    self.unit_targets[unit_id] = unit_pos
                    self.unit_has_target[unit_id] = 2
                    break

        for ii, unit_id in enumerate(available_unit_ids):
            unit_pos = unit_positions[unit_id]
            tile = tuple(unit_pos)
            direction = None

            # Drop a relic target while the unit stands on a non-candidate tile.
            # NOTE(review): this tests the unit's own tile, not the target tile,
            # so relic targets are re-chosen every step - confirm this is intended.
            if self.relic_map.map_possibles[tile] == 0 and self.unit_has_target[unit_id] == 1:
                self.unit_has_target[unit_id] = -1
                self.unit_targets[unit_id] = np.array([-1, -1])

            # Drop the target if a unit handled earlier in this loop already has
            # it. Compare against the earlier *available* units, not against the
            # first ``ii`` dictionary entries, which are different units whenever
            # the available ids are not contiguous from 0.
            earlier_targets = [self.unit_targets[other].tolist() for other in available_unit_ids[:ii]]
            if self.unit_targets[unit_id].tolist() in earlier_targets:
                self.unit_has_target[unit_id] = -1
                self.unit_targets[unit_id] = np.array([-1, -1])

            # on a candidate tile: leave with probability 1 - confidence
            if self.relic_map.map_possibles[tile] == 1:
                self.unit_targets[unit_id] = unit_pos
                self.unit_has_target[unit_id] = 1
                stay_probability = np.clip(self.relic_map.map_confidence[tile], 0, 1)
                draw = np.random.binomial(1, 1 - stay_probability, 1)
                if draw:
                    direction = self.relic_map.move_away(unit_pos)

            # reached the target and it is neither known nor a candidate
            if (list(unit_pos) == list(self.unit_targets[unit_id])
                    and self.relic_map.map_possibles[tile] == 0
                    and self.relic_map.map_knowns[tile] == 0):
                self.unit_has_target[unit_id] = -1

            if self.unit_has_target[unit_id] == -1:
                # ``target`` stays None when no branch picks one, so a stale
                # value from a previous unit can never be assigned.
                target = None
                if poss:
                    # closest remaining candidate tile (Manhattan distance);
                    # popping it keeps other units from choosing it this step
                    dist = np.sum(np.abs(np.array(poss) - unit_pos), axis=1)
                    target = poss.pop(np.argmin(dist))
                    self.unit_has_target[unit_id] = 1
                # every 20 steps or if a unit doesn't have an assigned location to explore
                elif step % 20 == 0 or unit_id not in self.unit_explore_locations:
                    if self.explore_targets:
                        target = self.explore_targets.pop(0)
                    else:
                        # pick a random location on the map for the unit to explore
                        target = np.array([np.random.randint(2, self.env_cfg["map_width"]) - 2,
                                           np.random.randint(2, self.env_cfg["map_height"]) - 2])
                    self.unit_has_target[unit_id] = 0
                if target is not None:
                    self.unit_targets[unit_id] = target

            if not direction:
                direction = self.move_obstacle_avoid(
                    obs, unit_id, unit_pos, direction_to(unit_pos, self.unit_targets[unit_id]))

            actions[unit_id] = [direction, 0, 0]

        # only let one unit at a time check a candidate tile
        discover_flag = False
        for unit_id in available_unit_ids:
            unit_pos = unit_positions[unit_id]
            change = direction_to_change(actions[unit_id, 0])
            dest_x = unit_pos[0] + change[0]
            dest_y = unit_pos[1] + change[1]
            possibles = self.relic_map.map_possibles
            # ``move_away`` may return a direction that leaves the map; such a
            # destination must not be looked up (IndexError / negative wrap).
            if not (0 <= dest_x < possibles.shape[0] and 0 <= dest_y < possibles.shape[1]):
                continue
            if possibles[dest_x, dest_y] == 1:
                if discover_flag:
                    actions[unit_id] = [0, 0, 0]
                else:
                    discover_flag = True

        # 24x24 matches the grid size hard-coded in RelicMap.
        self.relic_map.map_occupied = np.zeros((24, 24))
        self.prev_points = team_points[self.team_id]
        self.prev_points_increase = increase
        self.prev_actions = actions
        return actions

In [None]:
# Head-to-head evaluation over n games without rendering. evaluate_agents
# instantiates its first class as "player_0" and its second as "player_1",
# so Agent2 plays as player_0 and Agent3 as player_1.
n = 50
evaluate_agents(Agent2, Agent3, seed=42, games_to_play=n, render=False)

In [None]:
# Scratch check: a dict's values() view unpacks into a list in insertion order.
a = dict(a=2, b=3)
print([*a.values()])

In [None]:
# Package everything inside agent/ into submission.tar.gz, then move the
# archive up into the notebook's working directory.
!cd agent && tar -czf submission.tar.gz *
!mv agent/submission.tar.gz .

In [64]:
# Run the luxai-s3 CLI with base_agent/main.py and my_agent/main.py, seed 101.
# NOTE(review): -o replay.html presumably names the replay output file - confirm against the CLI help.
!luxai-s3 base_agent/main.py my_agent/main.py --seed 101 -o replay.html

Time Elapsed:  7.265054225921631
Rewards:  {'player_0': array(0, dtype=int32), 'player_1': array(5, dtype=int32)}
