In [64]:
import csv
import io
import time
from hashlib import sha1
from queue import PriorityQueue

import numpy as np

### Game Class

In [65]:
class PuzzleState:
    """A 6x6 sliding-car board plus the remaining fuel of every car on it.

    ``puzzle`` is a 6x6 numpy array of single characters: ``'.'`` marks an
    empty cell and any other character is the name of the car occupying that
    cell.  ``car_fuels`` maps a car name to the number of cells it may still
    travel.  The goal is reached when car ``'A'`` occupies the two right-most
    cells of row 2.
    """

    def __init__(self, puzzle, car_fuels):
        self.puzzle = puzzle
        self.car_fuels = car_fuels

    @classmethod
    def construct_initial_puzzlestate(cls, puzzle_configuration_line):
        """Build a state from one tokenised input line.

        ``puzzle_configuration_line[0]`` is the 36-character board and every
        following token is a ``<car><fuel>`` override such as ``'B12'``.
        """
        return cls(cls.parse_initial_puzzle(puzzle_configuration_line[0]), cls.parse_initial_car_fuels(puzzle_configuration_line))

    @classmethod
    def construct_puzzlestate_copy(cls, puzzlestate):
        """Return an independent copy (board array and fuel dict are both copied)."""
        return cls(np.copy(puzzlestate.puzzle), puzzlestate.car_fuels.copy())

    @staticmethod
    def parse_initial_puzzle(puzzle_configuration):
        """Turn the 36-character board string into a 6x6 character array."""
        return np.reshape(np.array(list(puzzle_configuration)), (6, 6))

    @staticmethod
    def parse_initial_car_fuels(puzzle_configuration_line):
        """Return ``{car: fuel}`` for the tokenised input line.

        Every car found on the board starts with 100 fuel; ``<car><fuel>``
        tokens after the board override that default.  Keys are inserted in
        reverse board order, which is the order later used when writing fuels.
        """
        initial_car_fuels = dict.fromkeys(puzzle_configuration_line[0].replace('.', '')[::-1], 100)
        for predefined_car_fuel in puzzle_configuration_line[1:]:
            # Everything after the one-character car name is the fuel amount,
            # so multi-digit values such as 'B12' or 'C100' are read in full.
            initial_car_fuels[predefined_car_fuel[0]] = int(predefined_car_fuel[1:])
        return initial_car_fuels

    def get_puzzle_configuration_string(self):
        """Return the board as one 36-character row-major string."""
        return ''.join(self.puzzle.flatten())

    def get_puzzle_configuration_flat_shape(self):
        """Return the board as six newline-terminated rows."""
        return ''.join(''.join(row) + '\n' for row in self.puzzle)

    def is_equal_state(self, puzzlestate):
        """Two states are equal when their boards match (fuel is ignored)."""
        return np.array_equal(self.puzzle, puzzlestate.puzzle)

    def is_goal_state(self):
        """True when car 'A' fills the last two cells of row 2."""
        return bool(np.all(self.puzzle[2, 4:6] == 'A'))

    # attempt at optimization by looking for cars by columns right-left up-down
    def get_puzzle_cars(self):
        """Return the distinct car names, ordered by first appearance when the
        board is scanned column by column from right to left, top to bottom."""
        cells_by_column = self.puzzle.T[::-1].flatten()
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(dict.fromkeys(str(cell) for cell in cells_by_column if cell != '.'))

    def get_children_puzzlestates(self):
        """Return every state reachable by sliding one car along its own axis.

        The result is a list of ``(PuzzleState, (car, direction, distance))``
        tuples.  Before moves are generated, any car sitting at the exit of
        row 2 is removed from *this* state's board (see ``if_car_at_valet``).
        """
        children_nodes = []

        self.if_car_at_valet()  # if car at exit then removes it

        for car in [car for car in self.get_puzzle_cars() if self.car_fuels[car] > 0]:  # only iterate over cars with fuel
            car_indices = list(zip(*np.where(self.puzzle == car)))  # np.where() returns tuple (rows, cols) so zip to get coords
            car_length_orientation_vector = np.sum(np.diff(car_indices, axis=0), axis=0)  # 2-in-1 to get diff b/w pairs for orientation and sum for length-1
            current_axis_identifier, car_axis = self.get_puzzle_car_axis(car_length_orientation_vector, car_indices)
            empty_spot_indices = np.where(car_axis == '.')[0]
            car_indices_one_axis = list(zip(*car_indices))[current_axis_identifier ^ 1]  # zip back for axes and index correct tuple
            # Sorted positions along the car's row/column that are either empty or
            # occupied by the car itself; consecutive values (diff == 1) are cells
            # the car can slide through without hitting another car.
            possible_movements_temp = np.sort(np.concatenate((np.array(car_indices_one_axis), empty_spot_indices)))  # periods and the car indices
            car_indices_in_possible_movements_temp = np.searchsorted(possible_movements_temp, car_indices_one_axis)
            possible_movements_temp_diff = np.ediff1d(possible_movements_temp)
            displacement_indices_direction_tuples = []  # tuple (displacement, slice indices in axis, direction string)
            car_length = np.sum(car_length_orientation_vector) + 1
            min_car_index_one_axis = min(car_indices_one_axis)
            max_car_index_one_axis = max(car_indices_one_axis)

            # left or up displacement
            min_car_index_in_possible_movements_temp = min(car_indices_in_possible_movements_temp)
            while min_car_index_in_possible_movements_temp - 1 >= 0:
                if possible_movements_temp_diff[min_car_index_in_possible_movements_temp - 1] == 1:
                    # car min index can be moved to pos_mov_temp[min_car_ind_pos_mov_temp - 1]
                    displacement = possible_movements_temp[min_car_index_in_possible_movements_temp - 1] - min_car_index_one_axis
                    movement_direction_string = 'left' if current_axis_identifier == 0 else 'up'
                    car_axis_slice_indices = (max_car_index_one_axis - car_length + displacement + 1, max_car_index_one_axis + 1)
                    displacement_indices_direction_tuples.append((displacement, car_axis_slice_indices, movement_direction_string))
                    min_car_index_in_possible_movements_temp -= 1
                else:
                    break

            # right or down displacement
            max_car_index_in_possible_movements_temp = max(car_indices_in_possible_movements_temp)
            while max_car_index_in_possible_movements_temp < np.size(possible_movements_temp_diff):
                if possible_movements_temp_diff[max_car_index_in_possible_movements_temp] == 1:
                    # car max index can be moved to pos_mov_temp[max_car_ind_pos_mov_temp + 1]
                    displacement = possible_movements_temp[max_car_index_in_possible_movements_temp + 1] - max_car_index_one_axis
                    movement_direction_string = 'right' if current_axis_identifier == 0 else 'down'
                    car_axis_slice_indices = (min_car_index_one_axis, min_car_index_one_axis + car_length + displacement)
                    displacement_indices_direction_tuples.append((displacement, car_axis_slice_indices, movement_direction_string))
                    max_car_index_in_possible_movements_temp += 1
                else:
                    break

            for movement_tuple in displacement_indices_direction_tuples:

                if self.car_fuels[car] >= abs(movement_tuple[0]):  # only complete move if car has enough fuel
                    puzzlestate_copy = PuzzleState.construct_puzzlestate_copy(self)
                    puzzlestate_copy.car_fuels[car] -= abs(movement_tuple[0])
                    # car_axis is a view into the copy's board, so rolling the
                    # slice in place moves the car on the copy only.
                    _, car_axis = puzzlestate_copy.get_puzzle_car_axis(car_length_orientation_vector, car_indices)
                    car_axis_slice = car_axis[movement_tuple[1][0]:movement_tuple[1][1]]
                    car_axis_slice[:] = np.roll(car_axis_slice, movement_tuple[0])
                    movement_tuple_formatted = (car, movement_tuple[2], abs(movement_tuple[0]))
                    children_nodes.append((puzzlestate_copy, movement_tuple_formatted))

        return children_nodes

    def get_puzzle_car_axis(self, car_length_orientation_vector, car_indices):
        """Return ``(axis_identifier, axis_view)`` for a car.

        ``axis_identifier`` is 0 for a horizontal car (the view is its row) and
        1 for a vertical car (the view is its column).  Both values are ``None``
        when the orientation vector matches neither case.
        """
        current_axis_identifier = None
        car_axis = None
        if car_length_orientation_vector[0] == 0 and car_length_orientation_vector[1] > 0:  # horizontal -> bit 0
            current_axis_identifier = 0
            car_axis = self.puzzle[car_indices[0][current_axis_identifier], :]
        elif car_length_orientation_vector[0] > 0 and car_length_orientation_vector[1] == 0:  # vertical -> bit 1
            current_axis_identifier = 1
            car_axis = self.puzzle[:, car_indices[0][current_axis_identifier]]
        return current_axis_identifier, car_axis

    def if_car_at_valet(self):
        """Clear the run of identical cells that touches the right edge of row 2.

        A horizontal car whose last cell is the exit cell (row 2, column 5) is
        replaced by empty cells, in place.  A run of empty cells is "cleared"
        too, which is a no-op.
        """
        neighbour_pairs_indices = np.where(self.puzzle[2, :-1] == self.puzzle[2, 1:])[0]  # compares equality of neighbour pairs
        exit_car_start = None
        # checks overlap of equal neighbours and possible car lengths at exits
        for i in range(4, -1, -1):
            if np.all(np.isin(np.arange(i, 5), neighbour_pairs_indices)):
                exit_car_start = i
            else:
                break
        if exit_car_start is not None:
            self.puzzle[2, exit_car_start:] = '.'

### State Space Search Class

In [66]:
class Node:
    """A node of the search tree built while solving a puzzle.

    Each node wraps a puzzle state, links to its parent, and carries the
    path cost ``g``, heuristic value ``h`` and priority ``f = g + h``.
    ``movement_information`` is the ``(car, direction, distance)`` tuple that
    produced this node from its parent (``None`` for the root).
    """

    LAMBDA_H3 = 4  # multiplier applied to h1 to obtain h3
    search_path_length_counter = 0  # number of search-file lines produced so far

    def __init__(self, puzzlestate, parent, g, h, movement_information):
        self.puzzlestate = puzzlestate
        self.parent = parent
        self.children = []
        self.g = g
        self.h = h
        self.f = self.g + self.h
        self.movement_information = movement_information

    @classmethod
    def construct_root_node(cls, puzzlestate):
        """Create a parentless node with g = h = 0."""
        return cls(puzzlestate, None, 0, 0, None)

    @classmethod
    def construct_child_node(cls, puzzlestate, parent, movement_information):
        """Create a child that starts from the parent's g; the algorithm
        method called afterwards decides whether g is incremented."""
        return cls(puzzlestate, parent, parent.g, 0, movement_information)

    def __lt__(self, other):
        # Ordering by f is all the priority queue needs.
        return self.f < other.f

    def increment_g(self):
        self.g += 1

    def set_h(self, h):
        self.h = h

    def update_f(self):
        self.f = self.g + self.h

    def _cells_right_of_ambulance(self):
        """Return ``(last_A_column, occupied_cells)`` for row 2.

        ``occupied_cells`` are the non-empty cells of row 2 strictly to the
        right of the right-most 'A'.  Raises ``IndexError`` when row 2 holds
        no 'A'.
        """
        row = self.puzzlestate.puzzle[2, :]
        last_ambulance_column = np.flatnonzero(row == 'A')[-1]
        cells = row[last_ambulance_column + 1:]
        return last_ambulance_column, cells[cells != '.']

    def h1(self):
        """Number of distinct cars between the ambulance and the exit."""
        _, blockers = self._cells_right_of_ambulance()
        return len(set(blockers))

    def h2(self):
        """Number of occupied cells between the ambulance and the exit."""
        _, blockers = self._cells_right_of_ambulance()
        return len(blockers)

    def h3(self):
        """h1 scaled by ``LAMBDA_H3``."""
        return Node.LAMBDA_H3 * self.h1()

    def h4(self):
        """Half the number of distinct cars that are either right of the
        ambulance in row 2 or directly above/below the cell just right of it."""
        last_ambulance_column, blockers = self._cells_right_of_ambulance()
        next_column = last_ambulance_column + 1
        # One-cell slices: they are simply empty when the ambulance already
        # touches the right edge, so no separate bounds check is needed.
        above = self.puzzlestate.puzzle[1, next_column:next_column + 1]
        below = self.puzzlestate.puzzle[3, next_column:next_column + 1]
        nearby = np.concatenate((blockers, above[above != '.'], below[below != '.']), axis=None)
        return len(set(nearby)) / 2

    def ucs_algorithm(self):
        """Uniform-cost evaluation: f = g, where g is depth in moves."""
        if self.parent is not None:  # only increment g if caller is a child node
            self.increment_g()
            self.update_f()

    def gbfs_algorithm(self, heuristic_function):
        """Greedy best-first evaluation: g is left untouched, h comes from the
        heuristic method named by ``heuristic_function``."""
        self.set_h(getattr(self, heuristic_function)())  # every root/child node has a heuristic value
        self.update_f()

    def a_algorithm(self, heuristic_function):
        """Algorithm A evaluation: f = g + h."""
        if self.parent is not None:
            self.increment_g()
        self.set_h(getattr(self, heuristic_function)())
        self.update_f()

    def get_root_node(self, solution_path):
        """Append this node and all its ancestors (closest first, root last)
        to ``solution_path`` and return it.

        Implemented as a loop so that very long paths cannot exhaust the
        interpreter's recursion limit.
        """
        node = self
        while node is not None:
            solution_path.append(node)
            node = node.parent
        return solution_path

    def get_search_file_line(self):
        """Return this node's search-file line and bump the class-wide
        ``search_path_length_counter``.

        Only cars whose fuel differs from 100 are listed after the board.
        """
        car_fuels_to_write = ''.join(
            ' ' + str(car[0]) + str(fuel)
            for car, fuel in self.puzzlestate.car_fuels.items()
            if fuel != 100
        )
        Node.search_path_length_counter += 1
        return '{f} {g} {h} {puzzle}{car_fuels}\n'.format(f=self.f, g=self.g, h=self.h, puzzle=self.puzzlestate.get_puzzle_configuration_string(), car_fuels=car_fuels_to_write)

    @staticmethod
    def is_node_searched(puzzlestate, closed_node_hashmap, open_node_queue):
        """True when the board's SHA-1 digest is a key of either mapping.

        Only the board is hashed; remaining fuel is not part of the key.
        """
        puzzle_hash = sha1(puzzlestate.puzzle).hexdigest()
        return puzzle_hash in closed_node_hashmap or puzzle_hash in open_node_queue

    def _evaluate(self, algorithm_function, heuristic_function):
        """Run the named ``*_algorithm`` method, passing the heuristic name
        only when one was requested (GBFS / A); UCS takes no argument."""
        evaluator = getattr(self, algorithm_function)
        if heuristic_function is not None:
            evaluator(heuristic_function)
        else:
            evaluator()

    def state_space_search(self, algorithm_function, heuristic_function=None):
        """Best-first search starting from this node.

        ``algorithm_function`` is the name of one of the ``*_algorithm``
        methods and ``heuristic_function`` the name of an ``h*`` method (or
        ``None``).  Every expanded node is written, UTF-8 encoded, to the
        module-level ``buffered_writer``.  Returns the goal node, or ``self``
        when the open list runs dry without reaching a goal.
        """
        open_node_queue = PriorityQueue()
        open_node_hashmap = dict()  # for faster searching
        closed_node_hashmap = dict()
        self._evaluate(algorithm_function, heuristic_function)
        open_node_queue.put(self)

        while not open_node_queue.empty():
            current_node = open_node_queue.get()  # pop out first sorted node
            buffered_writer.write(current_node.get_search_file_line().encode(encoding='utf-8'))
            if current_node.puzzlestate.is_goal_state():
                return current_node

            closed_node_hashmap[sha1(current_node.puzzlestate.puzzle).hexdigest()] = current_node
            for child_state, movement_information in current_node.puzzlestate.get_children_puzzlestates():
                # skip boards already expanded or already waiting in the open list
                # (the open hashmap is consulted because it is faster than scanning the queue)
                if Node.is_node_searched(child_state, closed_node_hashmap, open_node_hashmap):
                    continue
                child_node = Node.construct_child_node(child_state, current_node, movement_information)
                child_node._evaluate(algorithm_function, heuristic_function)
                current_node.children.append(child_node)
                open_node_queue.put(child_node)
                open_node_hashmap[sha1(child_state.puzzle).hexdigest()] = child_node
        return self

### Load Input File

In [67]:
def parse_input_file(path):
    """Read puzzle definitions from ``path`` and return a list of PuzzleState.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped.  Every remaining line is split on runs of whitespace: the first
    token is the board, the others are per-car fuel overrides.
    """
    puzzlestates_temp = []
    with open(path, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            # split() with no argument collapses repeated spaces/tabs, so no
            # empty tokens reach the fuel parser.
            puzzle_configuration_line = line.split()
            puzzlestates_temp.append(PuzzleState.construct_initial_puzzlestate(puzzle_configuration_line))
    return puzzlestates_temp

### Output Solution File Helper Functions

In [68]:
def write_solution_path_file(solution_path):
    """Write the solution report to the module-level ``buffered_writer``.

    ``solution_path`` is ordered goal-first / root-last (the order produced
    by ``Node.get_root_node``).  The report contains the initial board, the
    initial fuel table, run statistics (read from the module-level
    ``execution_time`` and ``Node.search_path_length_counter``), the list of
    moves, one detail line per move, and the final board.
    """
    def emit(text):
        buffered_writer.write(text.encode(encoding='utf-8'))

    initial_state = solution_path[-1].puzzlestate
    goal_state = solution_path[0].puzzlestate

    emit('Initial board configuration: {}\n\n{}\n'.format(initial_state.get_puzzle_configuration_string(), initial_state.get_puzzle_configuration_flat_shape()))

    # ' X:n,' per car, with the trailing comma dropped
    car_fuels = ''.join(' {}:{},'.format(car, fuel) for car, fuel in initial_state.car_fuels.items())[:-1]
    emit('Car fuel available:{}\n\n'.format(car_fuels))
    emit('Runtime: {} seconds\nSearch path length: {} states\nSolution path length: {} moves\n'.format(execution_time, Node.search_path_length_counter, len(solution_path) - 1))

    solution_path_movements = ''
    solution_path_movement_nodes = ''
    final_node_car_fuels = ''
    # Walk from the first move to the last; the root (last element) has no move.
    for node in reversed(solution_path[:-1]):
        car, direction, distance = node.movement_information[0], node.movement_information[1], node.movement_information[2]
        solution_path_movements += ' {} {} {};'.format(car, direction, distance)
        node_car_fuels = ''.join(' {}{}'.format(name, fuel) for name, fuel in node.puzzlestate.car_fuels.items() if fuel != 100)
        if node_car_fuels:
            # Remember the most recent non-empty fuel listing for the footer.
            final_node_car_fuels = node_car_fuels
        solution_path_movement_nodes += '{} {} {}\t{} {} {}\n'.format(car, '{:>5}'.format(direction), distance, '{:>2}'.format(node.puzzlestate.car_fuels[car]), node.puzzlestate.get_puzzle_configuration_string(), node_car_fuels)

    # [:-1] drops the ';' that follows the last move
    emit('Solution path:{}\n\n{}\n'.format(solution_path_movements[:-1], solution_path_movement_nodes))
    emit('!{}\n{}'.format(final_node_car_fuels, goal_state.get_puzzle_configuration_flat_shape()))

def write_no_solution_file(root_node):
    """Write the "no solution" report for ``root_node`` to the module-level
    ``buffered_writer``: initial board, initial fuel table, the failure
    message and the runtime taken from the module-level ``execution_time``."""
    state = root_node.puzzlestate
    fuel_listing = ''.join(' {}:{},'.format(car, fuel) for car, fuel in state.car_fuels.items())[:-1]  # drop trailing comma
    sections = (
        'Initial board configuration: {}\n\n{}\n'.format(state.get_puzzle_configuration_string(), state.get_puzzle_configuration_flat_shape()),
        'Car fuel available:{}\n\n'.format(fuel_listing),
        'Sorry, could not solve the puzzle as specified.\nError: no solution found\n\n',
        'Runtime: {} seconds'.format(execution_time),
    )
    for section in sections:
        buffered_writer.write(section.encode(encoding='utf-8'))

### Output Analysis Helper Function

In [69]:
def write_row_to_analysis_csv(row):
    """Append ``row`` as one CSV record to ``analysis.csv`` in the current
    working directory (the file is created on first use)."""
    # newline='' lets the csv writer control line endings itself; without it
    # platforms that translate '\n' end up with a blank line after every row.
    with open('analysis.csv', 'a', newline='') as f:
        csv.writer(f).writerow(row)

### Runner

In [70]:
# Runner: solve every puzzle of the input file with UCS, GBFS (h1-h4) and
# A (h1-h4), writing one search file and one solution file per run plus one
# summary row per run to analysis.csv.
#
# NOTE: `buffered_writer` and `execution_time` are deliberately module-level
# names; Node.state_space_search and the write_*_file helpers read them as
# globals, so the assignment order below matters.
PATH_TO_INPUT_FILE = '../Sample/sample-input.txt'
all_puzzlestates = parse_input_file(PATH_TO_INPUT_FILE)

# Header row of the analysis file.
write_row_to_analysis_csv(['Puzzle Number', 'Algorithm', 'Heuristic', 'Length of the Solution', 'Length of the Search Path', 'Execution Time (in seconds)'])

for index, puzzlestate in enumerate(all_puzzlestates):
    # The same root node object is reused for every algorithm/heuristic run of this puzzle.
    root_node = Node.construct_root_node(puzzlestate)

    # Empty separator row between puzzles.
    if index > 0:
        write_row_to_analysis_csv(['', '', '', '', '', '',])

    for algorithm in ['ucs', 'gbfs', 'a']:
        # UCS runs once (placeholder heuristic index 0); GBFS and A run with h1..h4.
        for heuristic in (range(1) if algorithm == 'ucs' else range(1,5)):
            heuristic_function = 'h{}'.format(heuristic) if algorithm != 'ucs' else None
            optional_heuristic_file_identifier = '-h{}-'.format(heuristic) if algorithm != 'ucs' else '-'
            # Search file: receives one line per node taken off the open list.
            search_file = io.FileIO('output_files/{}{}search-{}.txt'.format(algorithm, optional_heuristic_file_identifier, index + 1), 'w')
            buffered_writer = io.BufferedWriter(search_file, buffer_size=1024*64) # 64 KB buffer
            # Reset the class-wide counter so it counts this run only.
            Node.search_path_length_counter = 0
            execution_start_time = time.time()
            goal_node = root_node.state_space_search('{}_algorithm'.format( algorithm), heuristic_function)
            execution_time = time.time() - execution_start_time
            buffered_writer.flush()
            search_file.close()

            # Solution file: `buffered_writer` is rebound so the write_* helpers target it.
            solution_file = io.FileIO('output_files/{}{}solution-{}.txt'.format(algorithm, optional_heuristic_file_identifier, index + 1), 'w')
            buffered_writer = io.BufferedWriter(solution_file)
            # state_space_search returns the root itself on failure, so a node
            # without a parent is treated as "no solution".
            if goal_node.parent is not None:
                solution_path = goal_node.get_root_node([])
                write_solution_path_file(solution_path)
            else:
                solution_path = None
                write_no_solution_file(goal_node)
            buffered_writer.flush()
            solution_file.close()

            # NOTE(review): this column records len(solution_path), i.e. the number of
            # nodes including the root, whereas the solution file reports
            # len(solution_path) - 1 moves -- confirm which one is intended.
            write_row_to_analysis_csv([index + 1, algorithm.upper(), '{}'.format(heuristic_function) if heuristic_function else 'NA', len(solution_path) if solution_path else 'NONE', Node.search_path_length_counter, '{:.2f}'.format(execution_time)])

            print('Puzzle {} complete with algorithm={} and heuristic={}'.format(index + 1, algorithm, heuristic_function))

Puzzle 1 complete with algorithm=ucs and heuristic=None
Puzzle 1 complete with algorithm=gbfs and heuristic=h1
Puzzle 1 complete with algorithm=gbfs and heuristic=h2
Puzzle 1 complete with algorithm=gbfs and heuristic=h3
Puzzle 1 complete with algorithm=gbfs and heuristic=h4
Puzzle 1 complete with algorithm=a and heuristic=h1
Puzzle 1 complete with algorithm=a and heuristic=h2
Puzzle 1 complete with algorithm=a and heuristic=h3
Puzzle 1 complete with algorithm=a and heuristic=h4
Puzzle 2 complete with algorithm=ucs and heuristic=None
Puzzle 2 complete with algorithm=gbfs and heuristic=h1
Puzzle 2 complete with algorithm=gbfs and heuristic=h2
Puzzle 2 complete with algorithm=gbfs and heuristic=h3
Puzzle 2 complete with algorithm=gbfs and heuristic=h4
Puzzle 2 complete with algorithm=a and heuristic=h1
Puzzle 2 complete with algorithm=a and heuristic=h2
Puzzle 2 complete with algorithm=a and heuristic=h3
Puzzle 2 complete with algorithm=a and heuristic=h4
Puzzle 3 complete with algorithm

### Generate 50 Puzzles

In [71]:
def generate_puzzle(count=50, path="input.txt"):
    """Write ``count`` randomly generated puzzles to ``path``, one per line.

    Each line is a 36-character board (6 rows of 6 cells, ``'.'`` = empty)
    optionally followed by space-separated ``<car><fuel>`` tokens.  The
    two-cell car ``'A'`` is always placed horizontally in row 2; between 6
    and 12 further cars (``'B'``..``'M'``, length 2 or 3, random orientation)
    are attempted, and placement stops early after more than 10 collisions.
    Fuel tokens are only generated for cars that were actually placed, with
    an amount between 0 and 100.
    """
    HORIZONTAL_IDENTIFIER = 0
    car_names = ['B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M']

    with open(path, "w") as f:
        for _ in range(count):
            board = ['.'] * 36

            # Inserting Ambulance: two cells somewhere in row 2 (indices 12..17)
            a_index = int(np.random.randint(0, 5))
            board[12 + a_index] = board[13 + a_index] = 'A'

            # Inserting Cars
            number_of_cars = int(np.random.randint(6, 13))
            failures = 0
            cars_placed = 0
            while cars_placed < number_of_cars and failures <= 10:
                car_length = int(np.random.randint(2, 4))
                car_index = int(np.random.randint(0, 7 - car_length))  # offset along the car's own axis
                orientation = int(np.random.randint(0, 2))
                car_line_index = int(np.random.randint(0, 6))  # row (horizontal) or column (vertical)

                if orientation == HORIZONTAL_IDENTIFIER:
                    start, step = 6 * car_line_index + car_index, 1
                else:
                    start, step = car_line_index + 6 * car_index, 6
                cells = [start + step * offset for offset in range(car_length)]

                if all(board[cell] == '.' for cell in cells):
                    for cell in cells:
                        board[cell] = car_names[cars_placed]
                    cars_placed += 1
                else:
                    failures += 1

            # Giving cars fuel: fewer than (cars_placed + 1) // 2 entries, each
            # naming one of the cars that is really on the board.
            fuel_entries = int(np.random.randint(0, max(1, (cars_placed + 1) // 2)))
            tokens = [''.join(board)]
            for _ in range(fuel_entries):
                car_name = car_names[int(np.random.randint(0, cars_placed))]
                tokens.append(car_name + str(int(np.random.randint(0, 101))))
            f.write(" ".join(tokens) + "\n")

# generate_puzzle()