In [1]:
from pathlib import Path
import os

yr = 2024
d = 20

inp_path = os.path.join(Path(os.path.abspath("")).parents[2], 
             'Input', '{}'.format(yr), 
             '{}.txt'.format(d))

# inp_path  = 'C:\\Users\\riggi\\Projects\\Advent_Of_Code\\Input\\2024\\20_ex1.txt'

with open(inp_path, 'r') as file:
    inp = file.readlines()

In [2]:
import numpy as np
import heapq
from enum import Enum
from functools import cache
import copy
def format_input(inp):
    '''
    Turn the input maze string into a 2D numpy array
    '''
    from itertools import product

    return (np.array([[(1 if c in '.SE' else np.inf) for c in ''.join(l).strip()] for l in inp]), 
            [(i,j) for (i, j) in product(range(len(inp)), range(len(inp[1])-1)) if inp[i][j]=='S'][0], 
            [(i,j) for (i, j) in product(range(len(inp)), range(len(inp[1])-1)) if inp[i][j]=='E'][0]
            )

In [3]:
formatted_input = format_input(inp)
formatted_input

(array([[inf, inf, inf, ..., inf, inf, inf],
        [inf,  1.,  1., ...,  1.,  1., inf],
        [inf,  1., inf, ..., inf,  1., inf],
        ...,
        [inf,  1., inf, ..., inf,  1., inf],
        [inf,  1.,  1., ...,  1.,  1., inf],
        [inf, inf, inf, ..., inf, inf, inf]]),
 (69, 59),
 (73, 35))

In [4]:
from itertools import product
import numpy as np
from collections import Counter


class Directions(Enum):
    UP = 1
    DOWN = 2
    LEFT = 3
    RIGHT = 4

dir2coord = {Directions.UP: [-1,0],
            Directions.DOWN: [1,0],
            Directions.LEFT: [0,-1],
            Directions.RIGHT: [0,1]}


def turn(dir, left=False):
  d = {Directions.UP: Directions.RIGHT,
       Directions.RIGHT: Directions.DOWN,
       Directions.DOWN: Directions.LEFT,
       Directions.LEFT: Directions.UP}
  if not left:
    return d[dir]
  else:
    return {v:k for k, v in d.items()}[dir]

def arr_to_tuple(arr):
  return tuple([tuple(l) for l in arr])

def tuple_to_arr(arr):
  return np.array([(l) for l in arr])


def manhattan_distance(p1, p2):
  '''
  We use Manhattan Distance from the current location
  to the target as our A* search heuristic

  This simulates a situation where all nodes on a 
  direct path between the current and target nodes
  have an edge cost of 1
  '''
  return np.sum(np.abs(np.array(p1)-np.array(p2)))



class A_Star_Node:
  '''
   A Node which will be utilized for the A* path-finding algorithm
   If different restrictions were placed on the path then we could override the methods in this class
  '''
    
  def __init__(self, loc, direction, num_direction, g, prev, start_loc, end_loc):
    self.loc = loc
    self.direction = direction
    self.num_direction = num_direction
    self.g = g
    self.h = manhattan_distance(loc, end_loc)
    self.f = self.g + self.h
    self.prev = prev
    self.start_loc = start_loc
    self.end_loc = end_loc

  def get_descriptor(self):
    return (self.loc, self.direction, self.num_direction)


  def direction_to_next(self, direction=None):
    if direction is None:
      direction = self.direction
    dloc = dir2coord[direction]
    return tuple(np.add(dloc, list(self.loc)))

  def get_possible_next_descriptors(self):
    '''
    Return the possible next states according to
    the movement specifications

    We break the unique A* state nodes up by:
    (location, direction, --placeholder--)
    '''
      
    # If we are at the end location, then return no new locations
    if self.loc == self.end_loc:
      return []

    possible_nexts = [(self.direction_to_next(d), d, 1)
                      for d in list(Directions)]


    # If one of the nexts is the end then convert it to the end
    return [n if n[0]!=self.end_loc else (n[0], None, None) for n in possible_nexts]
  

  @staticmethod
  def calculate_ng(cur_g, maze_cost, _):
    return cur_g + maze_cost

  def __eq__(self, other):
    if isinstance(other, tuple):
      return self.get_descriptor() == other
    if isinstance(other, A_Star_Node):
      return self.get_descriptor() == other.get_descriptor()
    return None

  def __hash__(self):
    return hash(self.get_descriptor())


class A_Star_Node_Cheater(A_Star_Node):


  def __init__(self, loc, direction, cheat_ind, g, prev, start_loc, end_loc):
    self.loc = loc
    self.direction = direction
    self.cheat_ind = cheat_ind if cheat_ind is not None else 0
    self.g = g
    self.h = manhattan_distance(loc, end_loc)
    self.f = self.g + self.h
    self.prev = prev
    self.start_loc = start_loc
    self.end_loc = end_loc

  def get_descriptor(self):
    return (self.loc, self.direction, self.cheat_ind) # Location, Direction Facing (unused), cheat_indicator, is_cheating

  def get_possible_next_descriptors(self):
    '''
    Return the possible next states according to
    the movement specifications

    We break the unique A* state nodes up by:
    (location, direction, cheat_indicator)
    
    If cheat_indicator == 0 <-- Haven't cheated yet
    If cheat_indicator == 1 <-- Cheated this timestep
    If cheat_indicator == 2 <-- Cheated this and the previous timesteps
    If cheat_indicator > 2 <-- Cheated Twice Already... Can't anymore
    '''
    if self.loc == self.end_loc:
      print("FOUND ONE!")
      return []

    print("WOO: ", self.cheat_ind)
    possible_nexts = []
    if self.cheat_ind == 0:
      possible_nexts.extend([(self.direction_to_next(d), d, 0)
                    for d in list(Directions)])
    possible_nexts.extend([(self.direction_to_next(d), d, self.cheat_ind+1)
                    for d in list(Directions)])
    
    print("POSSIBLE NEXTS: ", possible_nexts)
    # If one of the nexts is the end then convert it to the end
    # return [n if n[0]!=self.end_loc else (n[0], None, None) for n in possible_nexts]
    return possible_nexts

  @staticmethod
  def calculate_ng(cur_g, maze_cost, descriptor):
    if descriptor[2] not in (0,1):
      return super(A_Star_Node_Cheater, A_Star_Node_Cheater).calculate_ng(cur_g, maze_cost, None)
    else:
      return cur_g + 1

  def __hash__(self):
    prevs = []
    cur_prev = self.prev
    while cur_prev is not None:
      prevs.append(cur_prev.loc)
      cur_prev = cur_prev.prev
    print("HASH: ", (self.get_descriptor(), tuple(prevs)))
    return hash((self.get_descriptor(), tuple(prevs)))

@cache
def a_star(start, end, arr, A_Star_Node=A_Star_Node, start_dir=Directions.RIGHT):
  '''
  A* Implementation with a priority queue for the queue nodes

  Also implemented hash tables for fast lookups in queue, and searched lists
  '''

  if isinstance(arr, tuple):
    arr = tuple_to_arr(arr)

  class Heap_Node_Wrapper:
    '''
    Wraps around a heap node so the priority
    queue can order them by their F and H values
    '''
    def __init__(self, node):
      self.node = node

    def __lt__(self, other):
      n = self.node
      other = other.node
      return (n.f, n.h) < (other.f, other.h)

    def __eq__(self, other):
      n = self.node
      other = other.node
      return (n.f, n.h) == (other.f, other.h)

  def in_arr(loc):
    h, w = arr.shape
    return ((0 <= loc[0] and loc[0] < h)
            and (0 <= loc[1] and loc[1] < w))

  s_node = A_Star_Node(start, start_dir, None, arr[start], None, start, end)
  q = [Heap_Node_Wrapper(s_node)]
  qhash = {s_node:s_node} # Mirrors q, just so we can do O(1) existence checks
  searched = {}

  cnt = 0
  while len(q) != 0:
    
    cur = heapq.heappop(q).node
    try:
      del qhash[cur]
    except:
      print((cur.get_descriptor(), None if cur.prev is None else cur.prev.loc))
      raise Exception()
    searched[cur]=True

    for n in [x for x in cur.get_possible_next_descriptors() if x not in searched and in_arr(x[0])]:
        
      # n is just a descriptor so if we see a node in the queue matching that
      # descriptor then we can get a reference to it
      n_node = None
      if n in qhash:
        n_node = qhash[n]


      ng = A_Star_Node.calculate_ng(cur.g, arr[n[0]], n)


      if n_node is None or ng < n_node.g:
        if n_node is None:
          # If we don't have this node in the queue create it
          n_node = A_Star_Node(n[0], n[1], n[2], ng, cur, start, end)
          heapq.heappush(q, Heap_Node_Wrapper(n_node))
          qhash[n_node] = n_node
        else:
          n_node.g = ng
          n_node.prev = cur
    cnt+=1

  # Once we make it to the end node trace back through
  # the path to the beginning
  endns = [s for s in searched if s.loc==end]
  assert(len(endns)==1)
  endn = endns[0]
  path = [endn.get_descriptor()]
  at_beginning = False
  while not at_beginning:
    if endn.prev is not None:
      path.append(endn.prev.get_descriptor())
    endn = endn.prev
    if endn.prev is None:
      at_beginning = True

  while(len(path)>=2 and path[0][0]==end and path[1][0]==end):
    path = path[1:]

  return list(reversed(path))




def print_path(maze, path):
  cost = get_cost(maze, tuple(path))
  if isinstance(maze, tuple):
    maze = tuple_to_arr(maze)
  maze = maze.astype(str)
  maze = np.vectorize(lambda x: '#' if x == 'inf' else '.' if x in ['1.0', '1'] else '#')(maze)
  d2c = {Directions.UP: '^',
         Directions.LEFT: '<',
         Directions.RIGHT: '>',
         Directions.DOWN: 'v'}
  for p in path:
    maze[p[0]] = d2c.get(p[1], '.')
  s = '\n'.join([''.join(l) for l in maze])
  print(s)
  print(f"(Path took {len(path)-1} Steps with a Total Cost of {int(cost)})")


@cache
def get_cost(maze, path):
  '''
  Given a path return, the cost
  of that path
  '''
  if isinstance(path, tuple):
    path = list(path)
  if isinstance(maze, tuple):
    maze = tuple_to_arr(maze)
  c = 0
  path_list = path
  for i, p in enumerate(path_list):
    if i != 0:
      c += maze[p[0]]
  try:
    c = int(c)
  except:
    pass
  return c




def get_timesaves(base_path, maze, n=2):

  def get_possible_offsets(n=3):
    offsets = []
    for i in range(2, n+1):
      cur_offsets = product(range(i+1), range(i+1))
      cur_offsets = [o for o in cur_offsets if sum(o)==i]
      negations = []
      for o in cur_offsets:
        negations.append((o[0],-o[1]))
        negations.append((-o[0],o[1]))
        negations.append((-o[0],-o[1]))
      cur_offsets.extend(negations)
      offsets.extend(list(set(cur_offsets)))
    return list(set(offsets))

  def in_arr(loc, arr):
    h, w = arr.shape
    return ((0 <= loc[0] and loc[0] < h)
            and (0 <= loc[1] and loc[1] < w))

  base_cost = get_cost(maze, tuple(base_path))
  base_path = [bp[0] for bp in base_path]

  @cache
  def base_path_lookup(new_start):
    return base_path.index(new_start)


  offsets = get_possible_offsets(n)
  timesaves = []
  for i in range(len(base_path)-1):
    cur_timesaves = np.full(shape=len(offsets), fill_value=-np.inf, dtype=np.float32)
    for oi, offset in enumerate(offsets):
        new_start = (base_path[i][0]+offset[0], base_path[i][1]+offset[1])
        if in_arr(new_start, tuple_to_arr(maze)) and maze[new_start[0]][new_start[1]] == 1:
          dist_new = i +  (len(base_path) - base_path_lookup(new_start) - 1) + sum(map(abs, offset))
          dist_delta = base_cost - dist_new
          cur_timesaves[oi] = dist_delta
    timesaves.extend([int(ts) for ts in cur_timesaves if ts > 0])
    # timesaves.extend([ts for ts in cur_timesaves.values() if ts > 0])

  cnt = Counter(timesaves)
  return cnt


def count_timesaves_over(maze, path, thresh=100, n=2, multi=False):
  timesaves = get_timesaves(maze, path, n=n)
  return sum({k:v for k, v in timesaves.items() if k >= thresh}.values())

In [32]:
import time

t = time.time()

formatted_input = format_input(inp)
maze, start, end = arr_to_tuple(formatted_input[0]), formatted_input[1], formatted_input[2]
base_path = a_star(start, end, maze, A_Star_Node=A_Star_Node)

print(count_timesaves_over(base_path, maze, thresh=100, n=2))
print(count_timesaves_over(base_path, maze, thresh=100, n=20))

print('\nRUNTIME: ', time.time()-t)

1450
1015247

RUNTIME:  4905.816437482834
