In [329]:
import aocd
import dataclasses
import numpy as np
import enum

real_data = aocd.get_data(day=16, year=2022)
test_data = """Valve AA has flow rate=0; tunnels lead to valves DD, II, BB
Valve BB has flow rate=13; tunnels lead to valves CC, AA
Valve CC has flow rate=2; tunnels lead to valves DD, BB
Valve DD has flow rate=20; tunnels lead to valves CC, AA, EE
Valve EE has flow rate=3; tunnels lead to valves FF, DD
Valve FF has flow rate=0; tunnels lead to valves EE, GG
Valve GG has flow rate=0; tunnels lead to valves FF, HH
Valve HH has flow rate=22; tunnel leads to valve GG
Valve II has flow rate=0; tunnels lead to valves AA, JJ
Valve JJ has flow rate=21; tunnel leads to valve II"""

In [351]:
from typing import Sequence, Union, Tuple, Type
import copy
import itertools
import sys
import random

@dataclasses.dataclass
class Node:
    """ Defines a node.
    
    Args:
        name: name of the node.
        rate: the flow rate.
        connections: the connected notes.
        
    Attributes:
        dist: distance to the other nodes. 
    """
    name: str
    rate: int
    connections: Sequence[str]
        
    def __post_init__(self):
        self.dist = {}
        for connection in self.connections:
            self.dist[connection] = 1

@dataclasses.dataclass
class Graph:
    """Defines a graph"""
    node_list: list
        
    def __post_init__(self):
        self.name_to_node = {}
        self.critical_node_names = []
        for node in self.node_list:
            self.name_to_node[node.name] = node
            if node.rate > 0:
                self.critical_node_names.append(node.name)
                
    def get(self, name):
        return self.name_to_node[name]
        
    def min_dist(
        self, 
        start_node_name: str, 
        end_node_name: str, 
        dist_limit: int,
    ) -> int:
        """Finds the minimum distance to the other node."""
        
        start_node = self.get(start_node_name)
        end_node = self.get(end_node_name)
        
        if start_node_name == end_node_name:
            return 0
        
        if end_node_name in start_node.dist:
            return start_node.dist[end_node.name]
        
        min_distance = np.Inf
        for child_name in list(start_node.dist):
            start_child_dist = start_node.dist[child_name]
            if start_child_dist >= dist_limit:
                continue
                
            child_end_dist = self.min_dist(child_name, end_node_name, dist_limit - start_child_dist)
            child_node = self.get(child_name)
            if child_end_dist != np.Inf:
                child_node.dist[end_node.name] = child_end_dist
            
            total_distance = start_child_dist + child_end_dist
            if total_distance < min_distance:
                min_distance = total_distance
        return min_distance
    
    def simulate_route(
        self,
        node_order: Sequence[str], 
        count_down: int,
        start_node: str = "AA",
        debug: bool = False,
    ) -> int:
        """Simulates total steam generated."""
        
        time = 0
        total_pressure = 0
        total_rate = 0
        travel_time = 0
        next_rate_increase = 0
        current_node = start_node
        node_order = copy.deepcopy(node_order)
        while time <= count_down:
            total_pressure += total_rate
            
            if debug:
                print(f"t={time}, p={total_pressure}, rate={total_rate}, node={current_node}")
            
            if travel_time > 0:
                travel_time -= 1
                time += 1
                continue

            total_rate += next_rate_increase
            next_rate_increase = 0
                
            if len(node_order) > 0:
                new_node = node_order.pop(0)
                next_rate_increase = self.get(new_node).rate
                travel_time = self.min_dist(current_node, new_node, 10)
                current_node = new_node
                
            time += 1
        return total_pressure
    
@dataclasses.dataclass
class Animal:
    gene: Sequence[str]
    graph: Graph
    mutations: int
        
    def __post_init__(self):
        for _ in range(self.mutations):
            self.gene = mutate(self.gene)
        self.fitness = self.graph.simulate_route(self.gene, 30)

            
def mutate(seq):
    length = len(seq)
    seq = copy.deepcopy(seq)
    pos0 = random.randint(0, length - 1)
    pos1 = random.randint(0, length - 1)
    seq[pos0], seq[pos1] = seq[pos1], seq[pos0]
    return seq

def reproduce(parent: Sequence[str]) -> Animal:
    return Animal(parent.gene, parent.graph, parent.mutations)
    
@dataclasses.dataclass
class GeneticAlgorithm:
    """Applies genertic algorithm to optimize."""
    graph: Graph
    init_sequence: Sequence[str]
    steps: int
    pop_size: int
    perturbation: int

    def birth(self, pop_size, gene):
        population = []
        for i in range(pop_size):
            animal = Animal(gene, self.graph, self.perturbation)
            population.append(animal)
        return population
    
    def start_evolution(self):
        population = self.birth(self.pop_size, self.init_sequence)
        best_animal = None
        for i in range(self.steps):
            population.sort(key=lambda x: x.fitness, reverse=True)
            population = population[:self.pop_size]
            best_animal = population[0]
            if i % 1 == 0: print(f"Step {i}: Best={best_animal.fitness}")
            new_generation = []
            # The fittest animal produce the most.
            for idx, animal in enumerate(population[:self.pop_size]):
                for _ in range(self.pop_size - idx):
                    child = reproduce(animal)
                    new_generation.append(child)
            population = new_generation
        population.sort(key=lambda x: x.fitness, reverse=True)
        return population[0]

            
@dataclasses.dataclass
class SolverA:
    """
    A solver instance.
    
    args:
        raw_data: the raw input data.
    """
    raw_data: str

    def __post_init__(self):
        self.lines = self.raw_data.splitlines()
        
    def debug(self) -> int:
        """Finds the answer.
        
        Returns:
            The answer.
        """
        nodes = []
        for line in self.lines:
            a, b = line.split("; ")
            name = a.split(" ")[1]
            rate = int(a.split("rate=")[-1])
            if "to valves " in b:
                connections = b.split("to valves ")[-1].split(", ")
            else:
                connections = b.split("to valve ")[-1].split(", ")
            nodes.append(Node(name, rate, connections))
        return Graph(nodes)
    
    def answer(self) -> int:
        nodes = []
        for line in self.lines:
            a, b = line.split("; ")
            name = a.split(" ")[1]
            rate = int(a.split("rate=")[-1])
            if "to valves " in b:
                connections = b.split("to valves ")[-1].split(", ")
            else:
                connections = b.split("to valve ")[-1].split(", ")
            nodes.append(Node(name, rate, connections))
            
        nodes.sort(key=lambda x: x.rate, reverse=True)
        graph = Graph(nodes)
            
        critical_node_names = graph.critical_node_names

        ga = GeneticAlgorithm(
            graph=graph,
            init_sequence=critical_node_names,
            steps=8,
            pop_size=200,
            perturbation=2,
        )

        fittest = ga.start_evolution()
        return fittest.fitness

In [352]:
SolverA(test_data).answer()

Step 0: Best=1649
Step 1: Best=1651
Step 2: Best=1651
Step 3: Best=1651
Step 4: Best=1651
Step 5: Best=1651
Step 6: Best=1651
Step 7: Best=1651


1651

In [354]:
SolverA(real_data).answer()

Step 0: Best=1335
Step 1: Best=1444
Step 2: Best=1579
Step 3: Best=1738
Step 4: Best=1767
Step 5: Best=1767
Step 6: Best=1767
Step 7: Best=1767


1767

In [400]:
from typing import Sequence, Union, Tuple, Type
import copy
import itertools
import sys
import random

@dataclasses.dataclass
class Node:
    """ Defines a node.
    
    Args:
        name: name of the node.
        rate: the flow rate.
        connections: the connected notes.
        
    Attributes:
        dist: distance to the other nodes. 
    """
    name: str
    rate: int
    connections: Sequence[str]
        
    def __post_init__(self):
        self.dist = {}
        for connection in self.connections:
            self.dist[connection] = 1

@dataclasses.dataclass
class Graph:
    """Defines a graph"""
    node_list: list
        
    def __post_init__(self):
        self.name_to_node = {}
        self.critical_node_names = []
        for node in self.node_list:
            self.name_to_node[node.name] = node
            if node.rate > 0:
                self.critical_node_names.append(node.name)
                
    def get(self, name):
        return self.name_to_node[name]
        
    def min_dist(
        self, 
        start_node_name: str, 
        end_node_name: str, 
        dist_limit: int,
    ) -> int:
        """Finds the minimum distance to the other node."""
        
        start_node = self.get(start_node_name)
        end_node = self.get(end_node_name)
        
        if start_node_name == end_node_name:
            return 0
        
        if end_node_name in start_node.dist:
            return start_node.dist[end_node.name]
        
        min_distance = np.Inf
        for child_name in list(start_node.dist):
            start_child_dist = start_node.dist[child_name]
            if start_child_dist >= dist_limit:
                continue
                
            child_end_dist = self.min_dist(child_name, end_node_name, dist_limit - start_child_dist)
            child_node = self.get(child_name)
            if child_end_dist != np.Inf:
                child_node.dist[end_node.name] = child_end_dist
            
            total_distance = start_child_dist + child_end_dist
            if total_distance < min_distance:
                min_distance = total_distance
        return min_distance
    
    def simulate_route(
        self,
        node_order: Sequence[str], 
        count_down: int,
        start_node: str = "AA",
        debug: bool = False,
    ) -> int:
        """Simulates total steam generated."""
        
        time = 0
        total_pressure = 0
        total_rate = 0
        next_rate_increase = 0
        current_node = start_node
        node_order = copy.deepcopy(node_order)
        me = Worker(0, start_node, 0)
        elephant = Worker(0, start_node, 0)
        workers = [me, elephant]
        
        while time <= count_down:
            total_pressure += total_rate
            
            if debug:
                print(f"t={time}, p={total_pressure}, rate={total_rate}",
                      f"node={workers[0].current_node, workers[1].current_node}")
            
            avaliable_workers = []
            for worker in workers:
                if worker.travel_time > 0:
                    worker.travel_time -= 1
                else:
                    avaliable_workers.append(worker)
            
            for worker in avaliable_workers:
                total_rate += worker.next_rate_increase
                worker.next_rate_increase = 0
                
            for worker in avaliable_workers:
                if len(node_order) > 0:
                    if worker is me:
                        new_node = node_order.pop(0)
                    else:
                        new_node = node_order.pop(-1)
                    worker.next_rate_increase = self.get(new_node).rate
                    worker.travel_time = self.min_dist(worker.current_node, new_node, 10)
                    worker.current_node = new_node
                
            time += 1
        return total_pressure

@dataclasses.dataclass
class Worker:
    travel_time: int
    current_node: str
    next_rate_increase: int
    
@dataclasses.dataclass
class Animal:
    gene: Sequence[str]
    graph: Graph
    mutations: int
        
    def __post_init__(self):
        for _ in range(self.mutations):
            self.gene = mutate(self.gene)
        self.fitness = self.graph.simulate_route(self.gene, 26)

            
def mutate(seq):
    length = len(seq)
    seq = copy.deepcopy(seq)
    pos0 = random.randint(0, length - 1)
    pos1 = random.randint(0, length - 1)
    seq[pos0], seq[pos1] = seq[pos1], seq[pos0]
    return seq

def reproduce(parent: Sequence[str]) -> Animal:
    return Animal(parent.gene, parent.graph, parent.mutations)
    
@dataclasses.dataclass
class GeneticAlgorithm:
    """Applies genertic algorithm to optimize."""
    graph: Graph
    init_sequence: Sequence[str]
    steps: int
    pop_size: int
    perturbation: int

    def birth(self, pop_size, gene):
        population = []
        for i in range(pop_size):
            animal = Animal(gene, self.graph, self.perturbation)
            population.append(animal)
        return population
    
    def start_evolution(self):
        population = self.birth(self.pop_size, self.init_sequence)
        best_animal = None
        for i in range(self.steps):
            population.sort(key=lambda x: x.fitness, reverse=True)
            population = population[:self.pop_size]
            best_animal = population[0]
            if i % 1 == 0: print(f"Step {i}: Best={best_animal.fitness}")
            new_generation = []
            # The fittest animal produce the most.
            for idx, animal in enumerate(population[:self.pop_size]):
                for _ in range(self.pop_size - idx):
                    child = reproduce(animal)
                    new_generation.append(child)
            population = new_generation
        population.sort(key=lambda x: x.fitness, reverse=True)
        return population[0]

            
@dataclasses.dataclass
class SolverB:
    """
    A solver instance.
    
    args:
        raw_data: the raw input data.
    """
    raw_data: str

    def __post_init__(self):
        self.lines = self.raw_data.splitlines()
        
    def debug(self) -> int:
        """Finds the answer.
        
        Returns:
            The answer.
        """
        nodes = []
        for line in self.lines:
            a, b = line.split("; ")
            name = a.split(" ")[1]
            rate = int(a.split("rate=")[-1])
            if "to valves " in b:
                connections = b.split("to valves ")[-1].split(", ")
            else:
                connections = b.split("to valve ")[-1].split(", ")
            nodes.append(Node(name, rate, connections))
        return Graph(nodes)
    
    def answer(self) -> int:
        nodes = []
        for line in self.lines:
            a, b = line.split("; ")
            name = a.split(" ")[1]
            rate = int(a.split("rate=")[-1])
            if "to valves " in b:
                connections = b.split("to valves ")[-1].split(", ")
            else:
                connections = b.split("to valve ")[-1].split(", ")
            nodes.append(Node(name, rate, connections))
            
        nodes.sort(key=lambda x: x.rate, reverse=True)
        graph = Graph(nodes)
            
        critical_node_names = graph.critical_node_names

        ga = GeneticAlgorithm(
            graph=graph,
            init_sequence=critical_node_names,
            steps=10,
            pop_size=200,
            perturbation=2,
        )

        fittest = ga.start_evolution()
        return fittest.fitness

In [398]:
SolverB(test_data).debug().simulate_route(["JJ", "BB", "CC", "EE", "HH", "DD"], 26)

1707

In [402]:
SolverB(test_data).answer()

Step 0: Best=1704
Step 1: Best=1707
Step 2: Best=1707
Step 3: Best=1707
Step 4: Best=1707
Step 5: Best=1707
Step 6: Best=1707
Step 7: Best=1707
Step 8: Best=1707
Step 9: Best=1707


1707

In [401]:
SolverB(real_data).answer()

Step 0: Best=1583
Step 1: Best=2036
Step 2: Best=2151
Step 3: Best=2301
Step 4: Best=2427
Step 5: Best=2484
Step 6: Best=2522
Step 7: Best=2528
Step 8: Best=2528
Step 9: Best=2528


2528