# AOC2022

## Day 16 / Part 2 / Proboscidea Volcanium

Problem Description: https://adventofcode.com/2022/day/16#part2

Input: [Example](aoc2022_day16_example.txt)

In [1]:
%load_ext pycodestyle_magic
%pycodestyle_on

In [2]:
"""Solution for AOC2022, day 16, part 2."""
from dataclasses import dataclass
import itertools
import logging
import re
import sys
import numpy as np
from scipy.sparse.csgraph import dijkstra

LOGGER = logging.getLogger(__name__)

# show/hide debug logs
SHOW_DEBUG_LOG = False
# set input file
INPUT_FILE = "aoc2022_day16_example.txt"

In [3]:
TIMEFRAME = 26

In [4]:
@dataclass
class Valve:
    """Class representing a valve."""
    idx: int
    name: str
    flow_rate: int


class Cave:
    """Class representing a cave with valves and a tunnel network."""
    valves = None
    tunnels = None
    _valve_name_index = None
    _valve_predecessor_matrix = None
    _shortest_path_cache = None
    _shortest_path_len_cache = None
    _best_solutions = 0

    def __init__(self, valves, tunnels):
        self.valves = valves
        self.tunnels = tunnels

        # calculate valve indices
        self._valve_name_index = {}
        for idx, valve_name in enumerate(
            sorted([valve.name for valve in valves.values()])
        ):
            self.valves[valve_name].idx = idx
            self._valve_name_index[valve_name] = idx

        graph = np.zeros((len(valves), len(valves)), dtype=int)
        for src_valve in self.valves.values():
            for trg_valve in map(
                lambda trg_valve_name: self.valves[trg_valve_name],
                tunnels[src_valve.name]
            ):
                graph[src_valve.idx, trg_valve.idx] = 1

        _, self._valve_predecessor_matrix = dijkstra(
            csgraph=graph,
            directed=True,
            unweighted=True,
            # indices=start_idx,    # search all-shortest-paths
            return_predecessors=True
        )

        self._shortest_path_cache = {}
        self._shortest_path_len_cache = {}

    def shortest_path(self, src_valve_name, trg_valve_name):
        """
        Calculate the shortest path between a source valve and a target valve.
        """
        if (src_valve_name, trg_valve_name) not in self._shortest_path_cache:
            start_idx = self._valve_name_index[src_valve_name]
            end_idx = self._valve_name_index[trg_valve_name]
            path = [end_idx]
            idx = end_idx
            while start_idx != self._valve_predecessor_matrix[start_idx, idx]:
                idx = self._valve_predecessor_matrix[start_idx, idx]
                path.append(idx)
            path.append(start_idx)
            self._shortest_path_cache[(src_valve_name, trg_valve_name)] = \
                list(reversed(path))

        return self._shortest_path_cache[(src_valve_name, trg_valve_name)]

    def shortest_path_len(self, src_valve_name, trg_valve_name):
        """
        Calculate the length of the shortest path between a source valve
        and a target valve.
        """
        params = (src_valve_name, trg_valve_name)
        if params not in self._shortest_path_len_cache:
            self._shortest_path_len_cache[params] = len(
                self.shortest_path(src_valve_name, trg_valve_name)
            ) - 1
        return self._shortest_path_len_cache[(src_valve_name, trg_valve_name)]

    def find_most_pressure_release(self, curr_valves, mins_remaining):
        """
        Find the most pressure release possible starting from valves
        curr_valves with min_remaining minutes before the vulcano erupts.
        """
        self._best_solutions = [0] * (TIMEFRAME + 1)
        valves_to_visit = [
            valve.name for valve in self.valves.values() if valve.flow_rate > 0
        ]
        return self._find_most_pressure_release(
            curr_valves, mins_remaining, valves_to_visit, 0
        )

    def _find_most_pressure_release(
        self, curr_valves, mins_remaining, valves_to_visit, solution_so_far
    ):
        """
        Helper function for
        find_most_pressure_release(self, curr_valves, mins_remaining).
        """
        # early termination criterion
        if TIMEFRAME - max(mins_remaining) > 10:
            self._best_solutions[TIMEFRAME - max(mins_remaining)] = max(
                self._best_solutions[TIMEFRAME - max(mins_remaining)],
                solution_so_far
            )
            if (
                solution_so_far <
                self._best_solutions[TIMEFRAME - max(mins_remaining)] * 0.9
            ):
                return 0

        valves_to_visit = valves_to_visit.copy()
        if curr_valves[0].name in valves_to_visit:
            valves_to_visit.pop(valves_to_visit.index(curr_valves[0].name))
        if curr_valves[1].name in valves_to_visit:
            valves_to_visit.pop(valves_to_visit.index(curr_valves[1].name))

        # given mins_remaining > 0, we can at least open the valve
        # without following any tunnel
        pressure_releases = [
            curr_valves[0].flow_rate*(mins_remaining[0]-1)
            if mins_remaining[0] > 0 else 0,
            curr_valves[1].flow_rate*(mins_remaining[1]-1)
            if mins_remaining[1] > 0 else 0
        ]
        if mins_remaining[0] > 0 and mins_remaining[1] > 0:
            pressure_releases.append(
                pressure_releases[0] + pressure_releases[1]
            )

        for next_valves in map(
            lambda valve_names:
                [self.valves[valve_names[0]], self.valves[valve_names[1]]],
            itertools.permutations(valves_to_visit, 2)
        ):
            path_lengths = [
                self.shortest_path_len(
                    curr_valves[0].name, next_valves[0].name
                ),
                self.shortest_path_len(
                    curr_valves[1].name, next_valves[1].name
                ),
            ]
            if (
                path_lengths[0] < mins_remaining[0] or
                path_lengths[1] < mins_remaining[1]
            ):
                # follow tunnel w/o opening the valve
                # relevant only for curr_valves[.].name==AA where flow_rate==0
                # otherwise, we go A->C instead of A->B->C (w/o opening B)
                if (
                    curr_valves[0].flow_rate == 0 and
                    curr_valves[1].flow_rate == 0
                ):
                    LOGGER.debug(
                        "%smove from %s to %s in %s min. "
                        "(w/o opening valve)...",
                        "  " * (1 + (TIMEFRAME - max(mins_remaining))),
                        [valve.name for valve in curr_valves],
                        [valve.name for valve in next_valves],
                        path_lengths
                    )
                    pressure_released = self._find_most_pressure_release(
                        next_valves,
                        [
                            mins_remaining[0]-path_lengths[0],
                            mins_remaining[1]-path_lengths[1]
                        ],
                        valves_to_visit,
                        solution_so_far
                    )
                    pressure_releases.append(pressure_released)
                # follow tunnel w/ opening the valve
                else:
                    LOGGER.debug(
                        "%smove from %s to %s in %s+1 min. "
                        "(w/ opening valve)...",
                        "  " * (1 + (TIMEFRAME - max(mins_remaining))),
                        [valve.name for valve in curr_valves],
                        [valve.name for valve in next_valves],
                        path_lengths
                    )
                    pressure_released = self._find_most_pressure_release(
                        next_valves,
                        [
                            mins_remaining[0]-path_lengths[0]-1,
                            mins_remaining[1]-path_lengths[1]-1
                        ],
                        valves_to_visit,
                        (
                            solution_so_far + pressure_releases[0] +
                            pressure_releases[1]
                        )
                    )
                    pressure_releases.append(
                        pressure_released + pressure_releases[0] +
                        pressure_releases[1]
                    )

        LOGGER.debug(
            "%smax. pressure to release with %s min. and starting from %s "
            "is %s",
            "  " * (1 + (TIMEFRAME - max(mins_remaining))),
            mins_remaining,
            [valve.name for valve in curr_valves],
            max(pressure_releases)
        )

        return max(pressure_releases)

In [5]:
def main():
    """Main function to solve puzzle."""
    valves = {}
    tunnels = {}
    with open(INPUT_FILE, encoding="utf-8") as file_obj:
        for line in [line.rstrip() for line in file_obj.readlines()]:
            src_valve_name, flow_rate, trg_valve_names = re.search(
                r"Valve (\w+) has flow rate=(\d+); " +
                r"tunnel[s]? lead[s]? to valve[s]? ([\w ,]+)",
                line
            ).groups()

            flow_rate = int(flow_rate)
            trg_valve_names = trg_valve_names.split(", ")
            valves[src_valve_name] = Valve(None, src_valve_name, flow_rate)
            tunnels[src_valve_name] = trg_valve_names

    cave = Cave(valves, tunnels)

    LOGGER.debug("traverse tunnels...")
    pressure_released = cave.find_most_pressure_release(
        [valves["AA"], valves["AA"]], [TIMEFRAME, TIMEFRAME]
    )

    LOGGER.debug("")

    print(f"solution: {pressure_released}")

In [6]:
if __name__ == "__main__":
    LOGGER.setLevel(logging.DEBUG if SHOW_DEBUG_LOG else logging.INFO)
    log_formatter = logging.Formatter("%(message)s")
    log_handler = logging.StreamHandler(sys.stdout)
    log_handler.setFormatter(log_formatter)
    LOGGER.addHandler(log_handler)
    main()

solution: 1707
