In [None]:
#Create a cell object
#Each cell object should only move north, south, left, right
#Agent cell is the only initialized cell
#Keep an open list
#Keep a closed list 
#Each cell will have 3 values
#heuteristic value - manhattan distance from cell to goal
#distance cost - always 1 cost for agent to reach cell




In [None]:
import maze_gen as mg
maze_name = 'maze1'
maze = mg.retrieve_maze(maze_name)

maze_start = mg.find_cell(maze, 'S')
maze_end = mg.find_cell(maze, '!')
print(f"start:{maze_start} end:{maze_end}")

In [44]:
import heapq
WALL = '%'
EMPTY = ' '
START = 'S'
GOAL = '!'

def heuristics(a: tuple, b: tuple) -> float:
    #Assign x and y cords
    x1, y1 = a
    x2, y2 = b
    #Calculate manhattan distance
    distance = abs(x1 - x2) + abs(y1 - y2)
    #print(distance)
    return distance

def knowledge_map(true_map):
    """
    Creates a belief map for agent to keep track of what cells it has visited and if the cell is blocked/unblocked, with each cell initially marked as unblocked.
    Params:
        Index0 : ' ' if cell is empty, other wise '#' if wall. Cells not seen will always be assumed empty
    """
    map = [[' ' for _ in range(len(true_map))] for _ in range(len(true_map))]
    return map

def get_neighbors(map, cell: tuple) -> list:
    """
    Get all neighbors of a given cell
    """
    NORTH, SOUTH, EAST, WEST = +1, -1, +1, -1

    c_x, c_y = cell
    possible = []

    if c_x + NORTH in range(0, len(map)) and c_y in range(0,len(map[c_x + NORTH])):
        if map[c_x + NORTH][c_y] == EMPTY:
            possible.append((c_x + NORTH, c_y))

    if c_x + SOUTH in range(0, len(map)) and c_y in range(0,len(map[c_x + SOUTH])):
        if map[c_x + SOUTH][c_y] == EMPTY:
            possible.append((c_x + SOUTH, c_y))

    if c_x in range(0, len(map)) and c_y + EAST in range(0,len(map[c_x])):
        if map[c_x][c_y + EAST] == EMPTY:
            possible.append((c_x, c_y + EAST))

    if c_x in range(0, len(map)) and c_y + WEST in range(0,len(map[c_x])):
        if map[c_x][c_y + WEST] == EMPTY:
            possible.append((c_x, c_y + WEST))

    #Debug to see why map ranges are out of bounds
    #print(f"map bounds: x -> {len(map)} y -> {len(map[0])}")
    return possible

def cell_is_blocked(map, cell):
    """
    Check to see if cell is a wall or open
    """
    x, y = cell
    if map[x][y] == WALL:
        return True
    return False

def reconstructed_path(parent, start, goal):
    """
    Reconstruct the potential path found by a_start_search
    """
    path = []
    path.append(goal)
    cell = goal
    while parent[cell] != start:
        cell = parent[cell]
        path.append(cell)
        

    path.append(start)
    return list(reversed(path))

def a_start_search(start_cell, goal_cell, known_map, sign):
    """
    Function that runs a* search from given cell to goal cell using the world knowledge the agent has.
    Params: 
        start_cell : Starting cell in the form of a tuple (x,y)
        goal_cell : End cell in a tuple (x,y) that has '!'
        known_map : 2d list that contains the map knowledge of all cells the agent has seen at runtime
        sign :
            -1 → break ties on largest g  
             1 → break ties on smallest g
    """
    #min heap to track cells we can reach
    open_list = []
    #Visited list to track which cells have already been visited
    closed_list = set()
    #sample entry for closed_list
    h_value = heuristics(start_cell, goal_cell)
    g_value = 0
    f_cost = h_value + g_value
    
    #initialize heap with start cell
    CONSTANT = len(known_map) * len(known_map[0]) + 1
    priority = CONSTANT * f_cost + (sign * g_value)
    heapq.heappush(open_list, (priority, start_cell))

    #keep track of g_value starting with start cell
    g_tracker = dict()
    g_tracker[start_cell] = g_value

    #Keep track of parents
    parent = dict()
    parent[start_cell] = None

    #Track expansions
    expansions = 0

    while open_list:
        #Pop lowest f_cost in heap
        state = heapq.heappop(open_list)
        current_cell = state[1]
        #print(f"current state: {state}")
        #print(f"current cell: {current_cell}")

        #Check to see if we reached goal
        if state[1] == goal_cell:
            #print("Found potential goal, reconstructing path")
            return reconstructed_path(parent, start_cell, goal_cell), expansions

        #Update closed list as needed
        if current_cell in closed_list:
            continue

        #Add node to seen
        closed_list.add(current_cell)
        #print(f"Expanding {state}")
        expansions += 1

        #get neighbors
        potential_neighbors = get_neighbors(known_map, state[1])

        for neighbor in potential_neighbors:
            #print each neighbor for debug
            #print(f"possible neighbor:{neighbor}")

            #Skip cell if it is known to be a wall
            if cell_is_blocked(known_map, neighbor):
                continue
            
            #Get the potential cost to reach this cell
            potential_g = g_tracker[current_cell] + 1
            
            #Push neighbor into open/frontier if they haven't been seen or found cheaper cost
            if neighbor not in g_tracker or potential_g < g_tracker[neighbor]:
                #Update g cost
                g_tracker[neighbor] = potential_g
                #Update parent of neighbor
                parent[neighbor] = current_cell

                #Calulate values of neighbor
                h_value = heuristics(neighbor, goal_cell)
                f_cost = h_value + g_tracker[neighbor]

                #Push neighbor into heap but give priority according to the sign
                CONSTANT = len(known_map) * len(known_map[0]) + 1
                priority = CONSTANT * f_cost + (sign * g_tracker[neighbor])
                heapq.heappush(open_list, (priority, neighbor))
             
    return False


In [45]:
def update_known_map(true_map, agent_map, current_cell):
    """
    Update neighbors the cells current position
    """
    all_possible = get_neighbors(agent_map, current_cell)
    truly_possible = get_neighbors(true_map, current_cell)

    #Get a list of all blocked cells in 4 cardinal directions
    blocked_cells = [cell for cell in all_possible if cell not in truly_possible]
    #print(f"all -> {all_possible}")
    #print(f"true -> {truly_possible}")
    #print(f"blocked -> {blocked_cells}")

    #Update agent map
    for neighbor in blocked_cells:
        agent_map[neighbor[0]][neighbor[1]] == WALL
    
    return agent_map
    

def repeated_a_star(begin_state, end, true_map, sign):
    """
    Function that repeatedly runs a* search until goal is reached or all possible cells exhausted
    Params:
        begin_state : list object that contains the specific maze you want to use
        end : string object (either 'S' for start or '!' for end) to find in maze
        true_map : True map of maze with all information provided
        sign :
            -1 will break ties on largest g  
             1 will break ties on smallest g
    """
    #Create knowledge map where every cell is assumed reachable by agent
    known_map = knowledge_map(true_map)
    current_state = begin_state
    #Initialize the finalized path
    finalized_path = [current_state]
    
    #Let agent glance at nearby cells (4 cardinal directions) and update map with knowledge
    known_map = update_known_map(true_map, known_map, current_state)
    #print(potential_neighbors)
    expansion_count = 0

    #Loop to find path or return maze as unsolvable
    while current_state != end:
        #Initiate A* search using known knowledge
        potential_path, current_exp_count = a_start_search(current_state, end, known_map, sign)

        #Update expansion counter
        expansion_count += current_exp_count
        if potential_path is False:
            return("No path is possible. Maze cannot be solved after all cells are exhausted.")
        
        #Follow each cell in the found path
        for i in range(0,len(potential_path)-1):
            next_cell = potential_path[i+1]
            #print(next_cell)
            #Check to see if cell is blocked on true map and reevaluate path as needed
            if true_map[next_cell[0]][next_cell[1]] == WALL:
                #print(f"Found a wall at {next_cell}. Restarting at {current_state} with updated map")
                known_map[next_cell[0]][next_cell[1]] = WALL
                break
            #Move agent if cell is open and walkable
            else:
                current_state = next_cell
                finalized_path.append(current_state)
                #Update neighbors of current agent cell 
                known_map = update_known_map(true_map, known_map, current_state)
    
    print(f"Total expansions performed: {expansion_count} with {sign}")
    return finalized_path

In [47]:
#Testing figure 9 observations
import maze_gen as mg
maze_name = 'maze1'
maze = mg.retrieve_maze(maze_name)

maze_start = mg.find_cell(maze, START)
maze_end = mg.find_cell(maze, GOAL)
print(f"start:{maze_start} end:{maze_end}")

final_path_largest = repeated_a_star(maze_start, maze_end, maze, -1)
#final_path_smallest = repeated_a_star(maze_start, maze_end, maze, 1)

start:(73, 17) end:(11, 69)
Total expansions performed: 53122 with -1


In [None]:
#Testing figure 9 observations
import maze_gen as mg
maze_name = 'maze1'
maze = mg.retrieve_maze(maze_name)

maze_start = mg.find_cell(maze, START)
maze_end = mg.find_cell(maze, GOAL)
print(f"start:{maze_start} end:{maze_end}")

final_path_largest = repeated_a_star(maze_start, maze_end, maze, -1)
#final_path_smallest = repeated_a_star(maze_start, maze_end, maze, 1)

start:(4, 2) end:(4, 4)
current state: (52, (4, 2))
current state: (51, (4, 3))
current state: (50, (4, 4))
current state: (52, (4, 2))
current state: (103, (3, 2))
current state: (102, (3, 3))
current state: (101, (3, 4))
current state: (100, (4, 4))
current state: (52, (4, 2))
current state: (103, (4, 1))
current state: (154, (3, 1))
current state: (154, (4, 0))
current state: (205, (2, 1))
current state: (204, (2, 2))
current state: (203, (2, 3))
current state: (202, (2, 4))
current state: (201, (3, 4))
current state: (200, (4, 4))
current state: (130, (2, 1))
current state: (129, (3, 1))
current state: (128, (4, 1))
current state: (127, (4, 2))
current state: (179, (4, 0))
current state: (180, (3, 0))
current state: (181, (1, 1))
current state: (180, (1, 2))
current state: (179, (1, 3))
current state: (178, (1, 4))
current state: (177, (2, 4))
current state: (176, (3, 4))
current state: (175, (4, 4))
current state: (156, (1, 1))
current state: (155, (2, 1))
current state: (154, (3,

In [None]:
#Check to see if all mazes generated are valid utilizing Repeated Forward A* search
import maze_gen as mg

for i in range(1, 51):
    maze = mg.retrieve_maze(f"maze{i}")
    maze_start = mg.find_cell(maze, START)
    maze_end = mg.find_cell(maze, GOAL)
    print(f"start:{maze_start} end:{maze_end}")
    
    final_path = repeated_a_star(maze_start, maze_end, maze, -1)
    if isinstance(final_path, list):
        print(f"maze{i} is solvable")
    else:
        print(final_path)

In [50]:
def adaptive_a_start_search(start_cell, goal_cell, known_map, h_tracker, sign):
    """
    Function that runs a* search from given cell to goal cell using the world knowledge the agent has.
    Params: 
        start_cell : Starting cell in the form of a tuple (x,y)
        goal_cell : End cell in a tuple (x,y) that has '!'
        known_map : 2d list that contains the map knowledge of all cells the agent has seen at runtime
        sign :
            -1 → break ties on largest g  
             1 → break ties on smallest g
    """
    #min heap to track cells we can reach
    open_list = []
    #Visited list to track which cells have already been visited
    closed_list = set()
    #sample entry for closed_list
    if start_cell in h_tracker:
        h_value = h_tracker[start_cell]
    else:
        h_value = heuristics(start_cell, goal_cell)
    g_value = 0
    f_cost = h_value + g_value
    
    #initialize heap with start cell
    CONSTANT = len(known_map) * len(known_map[0]) + 1
    priority = CONSTANT * f_cost + (sign * g_value)
    heapq.heappush(open_list, (priority, start_cell))

    #keep track of g_value starting with start cell
    g_tracker = dict()
    g_tracker[start_cell] = g_value

    #Keep track of parents
    parent = dict()
    parent[start_cell] = None

    #Track expansions
    expansions = 0

    while open_list:
        #Pop lowest f_cost in heap
        state = heapq.heappop(open_list)
        current_cell = state[1]
        #print(f"current cell: {current_cell}")
        #print(state)

        #Check to see if we reached goal
        if state[1] == goal_cell:
            #print("Found potential goal, reconstructing path")
            return reconstructed_path(parent, start_cell, goal_cell), expansions, closed_list, g_tracker[current_cell]

        #Update closed list as needed
        if current_cell in closed_list:
            continue

        #Add node to seen
        closed_list.add((current_cell, g_tracker[current_cell]))
        #print(f"Expanding {state}")
        expansions += 1

        #get neighbors
        potential_neighbors = get_neighbors(known_map, state[1])

        for neighbor in potential_neighbors:
            #print each neighbor for debug
            #print(f"possible neighbor:{neighbor}")

            #Skip cell if it is known to be a wall
            if cell_is_blocked(known_map, neighbor):
                continue
            
            #Get the potential cost to reach this cell
            potential_g = g_tracker[current_cell] + 1
            
            #Push neighbor into open/frontier if they haven't been seen or found cheaper cost
            if neighbor not in g_tracker or potential_g < g_tracker[neighbor]:
                #Update g cost
                g_tracker[neighbor] = potential_g
                #Update parent of neighbor
                parent[neighbor] = current_cell

                #Calulate values of neighbor
                if neighbor in h_tracker:
                    h_value = h_tracker[neighbor]
                else:
                    h_value = heuristics(neighbor, goal_cell)
                    
                f_cost = h_value + g_tracker[neighbor]

                #Push neighbor into heap but give priority according to the sign
                CONSTANT = len(known_map) * len(known_map[0]) + 1
                priority = CONSTANT * f_cost + (sign * g_tracker[neighbor])
                heapq.heappush(open_list, (priority, neighbor))
             
    return False


def adaptive_a_star(begin_state, end, true_map, sign):
    """
    Function that repeatedly runs a* search until goal is reached or all possible cells exhausted
    Params:
        begin_state : list object that contains the specific maze you want to use
        end : string object (either 'S' for start or '!' for end) to find in maze
        true_map : True map of maze with all information provided
        sign :
            -1 will break ties on largest g  
             1 will break ties on smallest g
    """
    #Create knowledge map where every cell is assumed reachable by agent
    known_map = knowledge_map(true_map)
    current_state = begin_state
    #Initialize the finalized path
    finalized_path = [current_state]
    
    #Let agent glance at nearby cells (4 cardinal directions) and update map with knowledge
    known_map = update_known_map(true_map, known_map, current_state)
    #print(potential_neighbors)
    expansion_count = 0

    #Create a heuristic value tracker
    heuristic_tracker = dict()

    #Loop to find path or return maze as unsolvable
    while current_state != end:
        #Initiate A* search using known knowledge
        potential_path, current_exp_count, closed_list, g_cost_path = adaptive_a_start_search(current_state, end, known_map, heuristic_tracker, sign)

        #Update g val of each cell in closed list
        for cell, g_val in closed_list:
            #Heuristic update
            h_new = g_cost_path - g_val
            #print(f"cell: {cell}, h_new: {h_new}, g val: {g_val}, path g cost:{g_cost_path}")
            heuristic_tracker[cell] = h_new

        #Update expansion counter
        expansion_count += current_exp_count
        if potential_path is False:
            return("No path is possible. Maze cannot be solved after all cells are exhausted.")
        
        #Follow each cell in the found path
        for i in range(0,len(potential_path)-1):
            next_cell = potential_path[i+1]
            #print(next_cell)
            #Check to see if cell is blocked on true map and reevaluate path as needed
            if true_map[next_cell[0]][next_cell[1]] == WALL:
                #print(f"Found a wall at {next_cell}. Restarting at {current_state} with updated map")
                known_map[next_cell[0]][next_cell[1]] = WALL
                break
            #Move agent if cell is open and walkable
            else:
                current_state = next_cell
                finalized_path.append(current_state)
                #Update neighbors of current agent cell 
                known_map = update_known_map(true_map, known_map, current_state)
    
    print(f"Total expansions performed: {expansion_count} with {sign}")
    return finalized_path

In [52]:
#Testing figure 9 observations
import maze_gen as mg
maze_name = 'maze1'
maze = mg.retrieve_maze(maze_name)

maze_start = mg.find_cell(maze, START)
maze_end = mg.find_cell(maze, GOAL)
print(f"start:{maze_start} end:{maze_end}")

final_path_largest = adaptive_a_star(maze_start, maze_end, maze, -1)
#final_path_smallest = repeated_a_star(maze_start, maze_end, maze, 1)

start:(73, 17) end:(11, 69)
Total expansions performed: 39508 with -1


In [37]:
#Testing figure 9 observations
import maze_gen as mg
maze_name = 'test_maze_fig2'
maze = mg.retrieve_maze(maze_name)

maze_start = mg.find_cell(maze, START)
maze_end = mg.find_cell(maze, GOAL)
print(f"start:{maze_start} end:{maze_end}")

final_path_largest = adaptive_a_star(maze_start, maze_end, maze, -1)
#final_path_smallest = repeated_a_star(maze_start, maze_end, maze, 1)

start:(4, 2) end:(4, 4)
Found potential goal, reconstructing path
cell: (4, 2), h_new: 2, g val: 0, path g cost:2
cell: (4, 3), h_new: 1, g val: 1, path g cost:2
Found potential goal, reconstructing path
cell: (4, 2), h_new: 4, g val: 0, path g cost:4
cell: (3, 3), h_new: 2, g val: 2, path g cost:4
cell: (3, 4), h_new: 1, g val: 3, path g cost:4
cell: (3, 2), h_new: 3, g val: 1, path g cost:4
Found potential goal, reconstructing path
cell: (2, 2), h_new: 4, g val: 4, path g cost:8
cell: (3, 4), h_new: 1, g val: 7, path g cost:8
cell: (2, 3), h_new: 3, g val: 5, path g cost:8
cell: (4, 2), h_new: 8, g val: 0, path g cost:8
cell: (3, 1), h_new: 6, g val: 2, path g cost:8
cell: (4, 1), h_new: 7, g val: 1, path g cost:8
cell: (2, 1), h_new: 5, g val: 3, path g cost:8
cell: (2, 4), h_new: 2, g val: 6, path g cost:8
cell: (4, 0), h_new: 6, g val: 2, path g cost:8
Found potential goal, reconstructing path
cell: (2, 4), h_new: 2, g val: 5, path g cost:7
cell: (1, 3), h_new: 4, g val: 3, path g

In [29]:
print(final_path_largest)

[(4, 2), (4, 1), (3, 1), (2, 1), (1, 1), (0, 1), (0, 2), (0, 3), (0, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
