# Day 23

https://adventofcode.com/2021/day/23

Initially I tried Q-learning for this (at the end of the notebook), which was able to learn a decent strategy but not find the best one. So I built a network with a node for each possible state of the game, and an edge between nodes i and j if there is a valid move that goes from state i to state j. The edges are weighted by the energy cost of the moves. The Dijkstra algorithm in networkx then finds the series of moves ending at the winning state with lowest energy cost.

The problem imposes three restrictions on moves that significantly reduce the number of possible states. I added two further conditions:
* An amphipod cannot leave its home unless there is an incorrect amphipod below it. 
* When returning home, an amphipod must go to the lowest available space.

Neither of these is required by the rules, but the sequence of moves with the lowest energy cost will always meet both conditions. 

In [1]:
import numpy as np

In [2]:
# Worked example layout (two amphipods per room).
raw_test = [
    "#############",
    "#...........#",
    "###B#C#B#D###",
    "  #A#D#C#A#  ",
    "  #########  ",
]

# Puzzle input, two rows per room.
raw = [
    "#############",
    "#...........#",
    "###C#A#B#D###",
    "  #C#A#D#B#  ",
    "  #########  ",
]

# Puzzle input with two extra rows inserted between the original two
# (four amphipods per room).
raw_extended = [
    "#############",
    "#...........#",
    "###C#A#B#D###",
    "  #D#C#B#A#  ",
    "  #D#B#A#C#  ",
    "  #C#A#D#B#  ",
    "  #########  ",
]

# Organised end state for the two-row layouts.
finished_test = [
    "#############",
    "#...........#",
    "###A#B#C#D###",
    "  #A#B#C#D#  ",
    "  #########  ",
]

# Organised end state for the four-row layout.
finished_test_extended = [
    "#############",
    "#...........#",
    "###A#B#C#D###",
    "  #A#B#C#D#  ",
    "  #A#B#C#D#  ",
    "  #A#B#C#D#  ",
    "  #########  ",
]

The next cell creates a map of the game with a one-character symbol (0-9, V-Z, or M-T) for each place that an amphipod can stand. Then it creates a dictionary of paths between each pair of places. The dictionary stores the length of the path and the named places along it (including the destination) that would block the move if an amphipod were standing there.

In [10]:
import networkx as nx

# Template of the burrow. '#' and ' ' are walls/void, '.' marks corridor
# squares directly above a room (no stopping place is named there), and every
# other character names a square an amphipod may stand on:
#   0-6  corridor stopping places
#   7 8 9 V / W X Y Z / M N O P / Q R S T  room rows 1-4 (top to bottom),
#   one column per room.
room_map = np.array(
    [
        ['#', '#', '#', '#', '#', '#', '#', '#', '#', '#', '#', '#', '#'],
        ['#', '0', '1', '.', '2', '.', '3', '.', '4', '.', '5', '6', '#'],
        ['#', '#', '#', '7', '#', '8', '#', '9', '#', 'V', '#', '#', '#'],
        [' ', ' ', '#', 'W', '#', 'X', '#', 'Y', '#', 'Z', '#', ' ', ' '],
        [' ', ' ', '#', 'M', '#', 'N', '#', 'O', '#', 'P', '#', ' ', ' '],
        [' ', ' ', '#', 'Q', '#', 'R', '#', 'S', '#', 'T', '#', ' ', ' '],
        [' ', ' ', '#', '#', '#', '#', '#', '#', '#', '#', '#', ' ', ' '],
    ],
    dtype='<U1',
)

all_locations = "0123456789VWXYZMNOPQRST"
corridor_locations = list("0123456")
# Derived from all_locations rather than typed out by hand: the previous
# literal omitted "O", so a square in the third room was never treated as a
# room square by membership tests against this list.
room_locations = [loc for loc in all_locations if loc not in corridor_locations]

def all_indices(array, element):
    """Return the (row, column) index pairs of every cell equal to `element`.

    Pairs are listed in the order produced by `np.where` on the comparison
    mask; an empty list is returned when nothing matches.
    """
    matches = np.where(array == element)
    return [(row, col) for row, col in zip(matches[0], matches[1])]

def first_index(array, element):
    """Return the first (row, column) pair at which `element` occurs in `array`.

    Raises IndexError when `element` does not occur at all.
    """
    occurrences = all_indices(array, element)
    return occurrences[0]


# Grid coordinates of every named standing place, keyed by its symbol.
location_indices = {x: first_index(room_map, x) for x in all_locations}

# Build a graph of the walkable squares: one node per non-wall cell, with an
# edge to the walkable neighbour above and to the left (which, scanning the
# whole grid, links every pair of adjacent walkable cells).
node_attributes = {}
G = nx.Graph()
for i in range(1, room_map.shape[0] - 1):
    for j in range(1, room_map.shape[1] - 1):
        square = room_map[i, j]
        if square not in ["#", " "]:
            G.add_node((i, j))
            if room_map[i - 1, j] not in ["#", " "]:
                G.add_edge((i - 1, j), (i, j))
            if room_map[i, j - 1] not in ["#", " "]:
                G.add_edge((i, j - 1), (i, j))

            # Only named squares carry a "location" attribute; the '.' cells
            # above the rooms stay anonymous.
            if square in all_locations:
                node_attributes[(i, j)] = {"location": square}

nx.set_node_attributes(G, node_attributes)

# For every ordered pair of named places store the walk between them (start
# excluded, destination included), its length in steps, and the named places
# on it that must be empty for the move to be possible.
all_paths = {}
for l_0 in all_locations:
    for l_1 in all_locations:
        # Reuse the coordinates cached in location_indices instead of
        # rescanning room_map for every pair.
        path = nx.shortest_path(G, location_indices[l_0], location_indices[l_1])[1:]
        path_length = len(path)
        blocking_locations = [
            G.nodes[p]["location"] for p in path if "location" in G.nodes[p]
        ]

        all_paths[(l_0, l_1)] = {"path": path, "path_length": path_length, "blocking_locations": blocking_locations}

all_paths[("Y", "0")]

{'path': [(2, 7), (1, 7), (1, 6), (1, 5), (1, 4), (1, 3), (1, 2), (1, 1)],
 'path_length': 8,
 'blocking_locations': ['9', '3', '2', '1', '0']}

The Amphipod class stores information about each amphipod. The valid_moves method returns places it can move to, implementing the following conditions:

* If in the corridor, it can return to its home
* If it is still in the room where it started (even if that is its own home room), it can move to the corridor
* If it has already returned home, it cannot move again

In [4]:
# Energy per step for each amphipod type, stored as a negative number so that
# it can be used directly as a reward.
energy_costs = {"A": -1, "B": -10, "C": -100, "D": -1000}

# Squares of each type's home room, listed from the bottom square up to the
# square next to the corridor.
home_locations_ordered = {
    "A": ["Q", "M", "W", "7"],
    "B": ["R", "N", "X", "8"],
    "C": ["S", "O", "Y", "9"],
    "D": ["T", "P", "Z", "V"],
}

# Same rooms as unordered sets, for membership tests.
home_locations = {
    letter: set(squares) for letter, squares in home_locations_ordered.items()
}

# Maps every room square to the ordered list of squares in its room.
other_room_locations = {}
for room in home_locations_ordered.values():
    for square in room:
        other_room_locations.setdefault(square, room)
    
    
class Amphipod:
    """A single amphipod: its type, where it stands, and whether it is settled."""

    def __init__(self, i, letter, position):
        """Create amphipod number `i` of type `letter` standing on `position`."""
        self.id = i
        self.letter = letter
        self.position = position
        self.returned_home = False
        # Per-type lookups from the module-level tables.
        self.energy_per_step = energy_costs[letter]
        self.home_locations = home_locations[letter]

    def filter_home_locations(self, valid_locations):
        """Keep only the home squares that also appear in `valid_locations`."""
        self.home_locations = {
            loc for loc in self.home_locations if loc in valid_locations
        }

    def move(self, move):
        """Walk to the square `move` and return the (negative) energy reward.

        The amphipod is flagged as returned home when the destination is one
        of its home squares.
        """
        steps = all_paths[(self.position, move)]["path_length"]
        self.position = move
        if self.is_at_home():
            self.returned_home = True
        return self.energy_per_step * steps

    def is_at_home(self):
        """Return True when the current square is one of the home squares."""
        return self.position in self.home_locations

    def is_in_wrong_room(self):
        """Return True when standing on a room square that is not a home square."""
        return self.position in room_locations and not self.is_at_home()

    def is_in_corridor(self):
        """Return True when standing on a corridor stopping place."""
        return self.position in corridor_locations

    def valid_moves(self):
        """List candidate (from, to) moves, ignoring the other amphipods.

        A returned-home amphipod has none; one in the corridor may target its
        home squares; any other may target every corridor stopping place.
        """
        if self.returned_home:
            return []
        if self.is_in_corridor():
            targets = self.home_locations
        else:
            targets = corridor_locations
        return [(self.position, target) for target in targets]
        

The AmphipodEnvironment keeps track of the game state. The valid_moves method lists the valid_moves from each Amphipod, then filters them to add the following conditions:

* the path must be clear
* if the amphipod is at home, it can only move if there is an incorrect amphipod below it
* cannot return home if there is an incorrect amphipod there
* when returning home, must go to the lowest available space

The state_to_str method concatenates the state to a single string, used as a unique identifier for each state.

In [5]:
class AmphipodEnvironment:
    """Mutable game state: the grid, the amphipods, occupancy and total cost."""

    def __init__(self, initial_state):
        """Build the environment from a list of equal-length layout strings."""
        self.won = False
        self.cost = 0
        self.state = np.array([list(row) for row in initial_state], dtype=str)

        # Every square that holds an amphipod in the initial layout is
        # recorded as a room square of this game.
        self.room_locations = []
        self.amphipods = []
        for letter in "ABCD":
            for idx in all_indices(self.state, letter):
                location = room_map[idx]
                self.room_locations.append(location)
                self.amphipods.append(
                    Amphipod(len(self.amphipods), letter, location)
                )

        # Drop home squares that do not exist in this layout.
        for amphipod in self.amphipods:
            amphipod.filter_home_locations(self.room_locations)

        # Occupancy flags: initial amphipod squares are taken, corridor is free.
        self.blocked_locations = dict.fromkeys(self.room_locations, True)
        self.blocked_locations.update(dict.fromkeys(corridor_locations, False))

        # Room membership restricted to the squares present in this layout,
        # keeping the ordering of the module-level table.
        self.other_room_locations = {
            square: [y for y in room if y in self.room_locations]
            for square, room in other_room_locations.items()
        }

    def check_valid_room_location(self, loc):
        """Check whether `loc` is a settled/target square of its room.

        Returns False when any amphipod in the room is in the wrong room, or
        when a square listed before `loc` in the room's ordering is empty.
        Returns True when `loc` is reached with every earlier square occupied.
        Returns None if `loc` is not among the room's squares.
        """
        room = self.other_room_locations[loc]
        for amphipod in self.amphipods:
            if amphipod.position in room and amphipod.is_in_wrong_room():
                return False

        for square in room:
            if square == loc:
                return True
            if all(amphipod.position != square for amphipod in self.amphipods):
                return False
        return None

    def state_to_str(self):
        """Return the grid without its outer border, flattened to one string."""
        interior = self.state[1:-1, 1:-1]
        return ''.join(interior.ravel())

    def valid_moves(self):
        """Return every (from, to) move currently allowed, amphipod by amphipod."""
        allowed = []
        for amphipod in self.amphipods:
            for candidate in amphipod.valid_moves():
                destination = candidate[1]
                on_path = all_paths[candidate]["blocking_locations"]

                # The whole walk, destination included, must be unoccupied.
                if any([self.blocked_locations[cell] for cell in on_path]):
                    continue

                if destination in corridor_locations:
                    # Leaving a room: not allowed when the amphipod already
                    # sits correctly in a room with no wrong occupants.
                    if self.check_valid_room_location(amphipod.position):
                        continue
                elif destination in self.room_locations:
                    # Entering a room: it must hold no wrong occupants and
                    # every square checked before the destination must be taken.
                    if not self.check_valid_room_location(destination):
                        continue

                allowed.append(candidate)
        return allowed

    def step(self, move):
        """Apply `move` and return (state string, reward, done, returned_home).

        Raises IndexError when no amphipod stands on the move's start square.
        """
        destination = move[1]
        mover = [amph for amph in self.amphipods if amph.position == move[0]][0]

        # Vacate the old square, then occupy the new one.
        self.state[location_indices[mover.position]] = "."
        self.blocked_locations[mover.position] = False
        self.state[location_indices[destination]] = mover.letter
        self.blocked_locations[destination] = True

        reward = mover.move(destination)
        self.cost += reward

        done = all(amph.is_at_home() for amph in self.amphipods)
        if done:
            self.won = True

        return (self.state_to_str(), reward, done, mover.returned_home)

In [6]:
import copy

def build_game_graph(G, env, debug = True):
    """Recursively add to G every game state reachable from `env`.

    Nodes are state strings; each valid move adds an edge from the current
    state to the resulting state, weighted by the move's energy cost (the
    negated reward). States already present in G are linked but not explored
    again. With `debug` set, the moves of this top-level call are printed;
    recursive calls are silent.
    """
    moves = env.valid_moves()
    if not moves:
        return

    current_key = env.state_to_str()
    for move in moves:
        if debug:
            print(move)

        # Play the move on a copy so `env` stays usable for the next move.
        successor = copy.deepcopy(env)
        next_key, reward, _, _ = successor.step(move)

        if next_key not in G.nodes:
            G.add_node(next_key)
            build_game_graph(G, successor, debug=False)

        G.add_edge(current_key, next_key, weight=-reward)

def shortest_path(G, start, end):
    """Return the total weight of the cheapest path in G between two layouts.

    `start` and `end` are layouts (lists of strings); each is converted to its
    state string, Dijkstra's algorithm finds the path between the two nodes,
    and the edge weights along that path are summed.
    """
    source = AmphipodEnvironment(start).state_to_str()
    target = AmphipodEnvironment(end).state_to_str()
    route = nx.dijkstra_path(G, source, target)

    total = 0
    for u, v in zip(route, route[1:]):
        total += G.edges[u, v]["weight"]
    return total

In [7]:
env = AmphipodEnvironment(raw)
# Use a directed graph: an edge i -> j means "a valid move turns state i into
# state j". The reverse transition is generally not a valid move, and an
# undirected graph would let Dijkstra walk moves backwards.
G = nx.DiGraph()

G.add_node(env.state_to_str())
build_game_graph(G, env)

('8', '0')
('8', '1')
('8', '2')
('8', '3')
('8', '4')
('8', '5')
('8', '6')
('9', '0')
('9', '1')
('9', '2')
('9', '3')
('9', '4')
('9', '5')
('9', '6')
('7', '0')
('7', '1')
('7', '2')
('7', '3')
('7', '4')
('7', '5')
('7', '6')
('V', '0')
('V', '1')
('V', '2')
('V', '3')
('V', '4')
('V', '5')
('V', '6')


In [8]:
# Lowest total edge weight (energy) from the `raw` layout to the organised
# two-row layout in the graph built above.
shortest_path(G, raw, finished_test)

11536

In [12]:
env = AmphipodEnvironment(raw_extended)
# Use a directed graph: an edge i -> j means "a valid move turns state i into
# state j". The reverse transition is generally not a valid move, and an
# undirected graph would let Dijkstra walk moves backwards.
G = nx.DiGraph()

G.add_node(env.state_to_str())

build_game_graph(G, env)

('8', '0')
('8', '1')
('8', '2')
('8', '3')
('8', '4')
('8', '5')
('8', '6')
('9', '0')
('9', '1')
('9', '2')
('9', '3')
('9', '4')
('9', '5')
('9', '6')
('7', '0')
('7', '1')
('7', '2')
('7', '3')
('7', '4')
('7', '5')
('7', '6')
('V', '0')
('V', '1')
('V', '2')
('V', '3')
('V', '4')
('V', '5')
('V', '6')


In [13]:
# Lowest total edge weight (energy) from the `raw_extended` layout to the
# organised four-row layout in the graph built above.
shortest_path(G, raw_extended, finished_test_extended)

55136

### Q Learning

Below is the Q-learning code. The penalty for each move is the energy cost of the movement. There is a penalty of 50,000 for getting into a situation with no valid moves. There is a reward of 100,000 for winning the game. The algorithm can learn a decent strategy but does not find the best strategy, at least with this number of iterations.

In [8]:
# Tabular Q-learning on the `raw_test` layout: 30,000 episodes, each played
# from the initial state until the game is won or no valid move remains.

# Hyperparameters
alpha = 0.2    # weight of the new estimate when blending it into the old Q value
gamma = 0.6    # discount applied to the best Q value of the next state
epsilon = 0.2  # probability of picking a random valid move instead of the best one

# Q table
# Maps a state string to a dict of {move: Q value} for that state's valid moves.
q_table = {}

# For plotting metrics
# NOTE(review): neither list is appended to anywhere in this cell.
all_epochs = [] 
all_penalties = []

for i in range(1, 30001):
    # Fresh game for every episode.
    env = AmphipodEnvironment(raw_test)
    state = env.state_to_str()
    
    # `epochs` counts the moves of this episode; `penalties` is never updated below.
    epochs, penalties, reward, = 0, 0, 0
    done = False
    
    while not done:
        
        # First visit to this state: start every valid move at Q = 0.
        if state not in q_table:
            q_table[state] = {x: 0 for x in env.valid_moves()}
        
        # Current Q values of the moves that are valid right now.
        action_space = {x: q_table[state][x] for x in env.valid_moves()}   
                
        if np.random.uniform(0, 1) < epsilon:
            action = list(action_space.keys())[np.random.randint(len(action_space))] # Explore action space
        else:
            action = list(action_space.keys())[np.argmax(list(action_space.values()))] # Exploit learned values

        # `reward` is the energy spent on the move, as a negative number.
        next_state, reward, done, returned_home = env.step(action) 
        
        old_value = action_space[action]
        
        # env has already advanced, so valid_moves() now refers to next_state.
        if next_state not in q_table:
            q_table[next_state] = {x: 0 for x in env.valid_moves()}
        
        next_valid_moves = {x: q_table[next_state][x] for x in env.valid_moves()}  
        
        if env.won:
            # Terminal: bonus for organising every amphipod.
            next_max = 0
            reward = reward + 100000
            done = True
        elif len(next_valid_moves) == 0:
            # Terminal: stuck with no valid move, so apply the dead-end penalty.
            next_max = 0
            reward = reward - 50000
            done = True
        else:
            next_max = np.max(list(next_valid_moves.values()))
        
        # Q-learning update: blend the old value with reward + discounted best next value.
        new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
        q_table[state][action] = new_value

        state = next_state
        epochs += 1
        
    # Progress report every 10,000 episodes.
    if i % 10000 == 0:
        print(f"Episode: {i}")

print("Training finished.\n")

Episode: 10000
Episode: 20000
Episode: 30000
Training finished.



In [9]:
# Replay one game on `raw_test`, always taking the move with the highest
# learned Q value, printing the grid after every move and the summed reward
# at the end.
env = AmphipodEnvironment(raw_test)
state = env.state_to_str()
done = False
total_reward = 0

while not done:
    # A state never visited during training has no entry in q_table, and a
    # dead-end state has an empty one; in both cases there is nothing to pick
    # from, so stop instead of raising KeyError / ValueError.
    action_space = q_table.get(state)
    if not action_space:
        print("No learned action available from this state; stopping early.")
        break

    action = list(action_space.keys())[np.argmax(list(action_space.values()))]

    state, reward, done, _ = env.step(action)
    total_reward = total_reward + reward
    print(env.state)

print(total_reward)

[['#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#']
 ['#' '.' 'B' '.' '.' '.' '.' '.' '.' '.' '.' '.' '#']
 ['#' '#' '#' '.' '#' 'C' '#' 'B' '#' 'D' '#' '#' '#']
 [' ' ' ' '#' 'A' '#' 'D' '#' 'C' '#' 'A' '#' ' ' ' ']
 [' ' ' ' '#' '#' '#' '#' '#' '#' '#' '#' '#' ' ' ' ']]
[['#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#']
 ['#' '.' 'B' '.' '.' '.' '.' '.' '.' '.' '.' 'B' '#']
 ['#' '#' '#' '.' '#' 'C' '#' '.' '#' 'D' '#' '#' '#']
 [' ' ' ' '#' 'A' '#' 'D' '#' 'C' '#' 'A' '#' ' ' ' ']
 [' ' ' ' '#' '#' '#' '#' '#' '#' '#' '#' '#' ' ' ' ']]
[['#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#']
 ['#' '.' 'B' '.' '.' '.' 'C' '.' '.' '.' '.' 'B' '#']
 ['#' '#' '#' '.' '#' '.' '#' '.' '#' 'D' '#' '#' '#']
 [' ' ' ' '#' 'A' '#' 'D' '#' 'C' '#' 'A' '#' ' ' ' ']
 [' ' ' ' '#' '#' '#' '#' '#' '#' '#' '#' '#' ' ' ' ']]
[['#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#' '#']
 ['#' '.' 'B' '.' '.' '.' '.' '.' '.' '.' '.' 'B' '#']
 ['#' '#' '#' '.' '#' '.' '#' 'C' '#' 'D' '#' '#' '#']
 [' ' '