Make sure you update the **STUDENT_TOKEN**, and then run this block. You should use FIRSTNAME-SURNAME(S) as the TOKEN.

The code in this block can be safely ignored.

In [820]:
#Update your token
STUDENT_TOKEN = 'GABRIEL DE OLAGUIBEL'

## ignore this code, just used for submission
import requests
import pprint
import json

def get_problem(problem_id):
  r = requests.get('https://emarchiori.eu.pythonanywhere.com/get-problem?TOKEN=%s&problem=%s' % (STUDENT_TOKEN, problem_id))
  if not r.status_code == 200:
    print('\033[91m' + str(r.status_code))
  return r.json()

def submit_answer(problem_id, answer):
  r = requests.get('https://emarchiori.eu.pythonanywhere.com/submit-answer?TOKEN=%s&problem=%s' % (STUDENT_TOKEN, problem_id), json = answer)
  if not r.status_code == 200:
    print('\033[91m' + str(r.status_code))
  result = r.json()['result']
  if result == 'PASSED':
    print('\033[92m' + result)
  else:
    print('\033[91m' + result)


# This is a base class for the problems, you will need to create a version of
# this for each problem, but that makes reusing the search code easier.
class Problem:
  def is_end(self, state): # Checks if the given state is an end or goal state
    return False

  def get_initial_state(self): # Returns the initial state of the problem
    return None

  def expand(self, parent): # Returns the successor states of a given state (parent)
    return []

  def heuristic(self, state): # Returns a heuristic value for the given state. (0 in base class)
    return 0


### Game Implementation provided by the professor

In [821]:
# Sliding tiles problem (BFS)

class SlidingTiles(Problem): # Class to represent the sliding tiles problem
  def __init__(self, initial_state, final_state):
    self.initial_state = initial_state
    self.final_state = final_state

  # We only need to check against the final state to know we are at an end state
  def is_end(self, state): # overide the is_end method from the Problem class
    return state == self.final_state

  def get_initial_state(self):
    return self.initial_state

  # Expand will have, at most, 4 valid actions, were the empty position moves left,
  # right, up or down. We use '%' and '//' operations to check when this is valid
  # as we are using a list representation of a 3x3 matrix.
  def expand(self, parent): # overide the expand method from the Problem class
    successors = [] #
    state = parent['state'] # state is the current state
    new_cost = parent['cost'] + 1 # new_cost is the cost of the current state + 1 since we measure cost by the number of moves
    pos = state.index(0) # pos is the position of the hole/empty space

    # pos - 1 (move empty to the left)
    # This move is valid from the following positions (not the ones with -)
    # - 1 2
    # - 4 5
    # - 7 8
    if pos % 3 > 0: # if the position is not in the first column
      new_state = state.copy()
      new_state[pos], new_state[pos - 1] = new_state[pos - 1], new_state[pos] # swap the empty space with the tile to the left
      successors.append({'state': new_state, 'parent': parent, 'action': 'L', 'cost': new_cost}) # add the new state to the successors list

    # pos + 1 (move empty to the right)
    # This move is valid from the following positions (not the ones with -)
    # 0 1 -
    # 3 4 -
    # 6 7 -
    if pos % 3 < 2: # if the position is not in the last column
      new_state = state.copy()
      new_state[pos], new_state[pos + 1] = new_state[pos + 1], new_state[pos] # swap
      successors.append({'state': new_state, 'parent': parent, 'action': 'R', 'cost': new_cost})

    # pos - 3 (move empty up)
    # This move is valid from the following positions (not the ones with -)
    # - - -
    # 3 4 5
    # 6 7 8
    if pos // 3 > 0: # if the position is not in the first row
      new_state = state.copy()
      new_state[pos], new_state[pos - 3] = new_state[pos - 3], new_state[pos]
      successors.append({'state': new_state, 'parent': parent, 'action': 'U', 'cost': new_cost})

    # pos + 3 (move empty down)
    # This move is valid from the following positions (not the ones with -)
    # 0 1 2
    # 3 4 5
    # - - -
    if pos // 3 < 2:
      new_state = state.copy()
      new_state[pos], new_state[pos + 3] = new_state[pos + 3], new_state[pos]
      successors.append({'state': new_state, 'parent': parent, 'action': 'D', 'cost': new_cost})

    return successors



## Exercise 1: 
#### New implementations:
- **Report the number of Visited Nodes**: To keep track of the number of visited nodes, I simply add a counter that increments every time we visit a new node. In the BFS implementation, this would be every time we pull a node from the frontier and expand it.
- **Report the effective branching factor**: The effective branching factor for a tree-based search algorithm like BFS can be calculated as: 

b = Number of nodes Generated / Number of nodes visited. 

I keep track of the number of nodes expanded and generated in the same way I keep track of the number of visited nodes.


In [822]:
import queue

# We define the failure value clearly, to simplify checking if needed
FAILURE = {'state': None, 'parent': None, 'action': None, 'cost': None}

# Uodated BFS code
def bfs_modified(problem):
    initial_state = problem.get_initial_state()
    final_state = problem.final_state # Access the final state directly from the problem instance
    inital_node = {'state': initial_state, 'parent': None, 'action': None, 'cost': 0}

    # Check if the intial state is already the final state
    if initial_state == final_state:
        return inital_node, 1, 0 # return the initial node, 1 node visited and 0 effective branching factor

    # intialize the frontier and reached list
    frontier = queue.Queue() 
    frontier.put(inital_node) 
    reached = [initial_state] 

    # Initialize counters for finding visited nodes and branching factor
    nodes_visited = 0 
    nodes_generated = 0 

    while not frontier.empty():
        node = frontier.get() 
        nodes_visited += 1 

        # Check if the node is the final state
        if problem.is_end(node['state']):
            effective_branching_factor = nodes_generated / nodes_visited 
            return node, nodes_visited, effective_branching_factor
        
        # Expand the node
        for successor in problem.expand(node): 
            if not successor['state'] in reached: # only add the node if it is not in the reached list
                reached.append(successor['state']) 
                frontier.put(successor) 
                nodes_generated += 1
    # # calculate the effective branching factor.
    effective_branching_factor = nodes_generated / nodes_visited if nodes_visited > 0 else 0 # Adding a check to avoid division by zero error
    return FAILURE, nodes_visited, effective_branching_factor # return the failure value if no solution is found, nodes expanded and the effective branching factor

In [823]:
def json_to_list(state): # function to convert the state from a string to a list
  return list(map(lambda n: int(n), state.split(',')))

def submit_solution_to_server(initial_state, path):
    # Convert the initial state to the required string format
    initial_state_str = ','.join(map(lambda n: str(n), initial_state))
    
    # If the initial state is already the final state, set the path to a default value (e.g., 'U')
    if not path:
        path = [''] # enter here the expected path for an already solved problem

    # Prepare the answer to be submitted to the server
    answer = {
        'initial_state': initial_state_str,
        'path': '-'.join(map(lambda n: n['action'], filter(lambda n: not n['action'] == None, path)))
    }
    
    # Submit the answer to the server for checking
    submit_result = submit_answer('sliding_tiles', answer)
    
    return submit_result

# Get the problem from the server
problem = get_problem('sliding_tiles')
pprint.pprint(problem)

# Convert the state from a string to a list
initial_state = json_to_list(problem['initial-state'])
final_state = json_to_list(problem['final-state'])

# Un-comment these lines to override the initial state provided by the server if needed for debugging
# initial_state = [0, 2, 3, 1, 4, 6, 7, 5, 8] # easy (~4 depth)
# initial_state = [1, 2, 5, 8, 7, 6, 4, 3, 0] # hard (~15 depth)
# initial_state = [3, 6, 2, 7, 0, 1, 8, 4, 5] # very hard (>20 depth)


# Use the modified BFS to solve the problem and get the nodes visited and branching factor
solution, nodes_visited, branching_factor = bfs_modified(SlidingTiles(initial_state, final_state))

# Display the nodes visited and the effective branching factor
print(f"Nodes Visited: {nodes_visited}")
print(f"Effective Branching Factor: {branching_factor:.2f}") 

# Reconstruct the path to the solution
path = []
node = solution
while node != None:
    path.insert(0, node)
    node = node['parent']

# Print the path in a way to make it easy to debug and check
for node in path:
    print('------- %s' % node['action'])
    state = node['state']
    for i in range(3):
        print('¦ %i %i %i ¦' % (state[i*3], state[i*3 + 1], state[i*3 + 2]))


submit_result = submit_solution_to_server(initial_state , path)
print(submit_result)
#print(path)


{'explanation': '3x3 grid of tiles, with one missing. Tiles can only move from '
                'an adjacent place to the empty one (0).',
 'final-state': '1,2,3,4,5,6,7,8,0',
 'initial-state': '1,0,3,4,2,6,7,5,8',
 'path-format': '([UDLR](-[UDLR])*)?'}
Nodes Visited: 19
Effective Branching Factor: 1.63
------- None
¦ 1 0 3 ¦
¦ 4 2 6 ¦
¦ 7 5 8 ¦
------- D
¦ 1 2 3 ¦
¦ 4 0 6 ¦
¦ 7 5 8 ¦
------- D
¦ 1 2 3 ¦
¦ 4 5 6 ¦
¦ 7 0 8 ¦
------- R
¦ 1 2 3 ¦
¦ 4 5 6 ¦
¦ 7 8 0 ¦
[92mPASSED
None


## Exercise 2: 
#### A* implementation:
- **Solved sliding-tile puzzles with A star Search**: Implemented more than 1 heuristic: Manhattan distance, Misplaced tiles, and Linear Conflict.
- **Report the number of Visited Nodes**: To keep track of the number of visited nodes, I simply add a counter that increments every time we visit a new node. In the A* implementation, this would be every time we pull a node from the frontier and expand it.
- **Report the effective branching factor**: The effective branching factor for a tree-based search algorithm like A* can be calculated as: 

ebf = Number of nodes Generated / Number of nodes visited. 


In [824]:
# Define the heuristic functions

# Heuristic 1: Manhattan Distance (sum of the distances of each tile to its final position)
def manhattan_distance(state, final_state):
    distance = 0
    for i in range(len(state)): # iterate over the state
        if state[i] != 0: # ignore the empty space
            goal_x, goal_y = divmod(final_state.index(state[i]), 3) # get the goal position of the tile
            state_x, state_y = divmod(i, 3) # get the current position of the tile
            distance += abs(goal_x - state_x) + abs(goal_y - state_y) # calculate the distance between the current position and the goal position
    return distance

# Heuristic 2: Number of misplaced tiles
def misplaced_tiles(state, final_state):
    misplaced = 0
    for i in range(len(state)): # iterate over the state
        if state[i] != final_state[i]: # if the tile is not in the correct position
            misplaced += 1 # increment the misplaced counter
    return misplaced

# Heuristic 3: Linear conflict (Manhattan Distance + 2 * Number of linear conflicts)
# adds a penalty for tiles that are in the correct row or column but are reversed relative to each other.
# I credit this idea from the following paper: https://cdn.aaai.org/AAAI/1996/AAAI96-178.pdf

def linear_conflict(state, final_state):
    # calculate the manhattan distance
    distance = manhattan_distance(state, final_state)

    # calculate the number of linear conflicts in rows
    for row in range(3):
        tiles_in_row = state[row * 3 : row * 3 + 3] #current
        goal_tiles_in_row = final_state[row * 3 : row * 3 + 3]
        for i in range(3):
            for j in range(i + 1, 3):# to compare
                # if the tiles are in the same row and the goal tiles are in the same row and the tiles are reversed
                if (tiles_in_row[i] in goal_tiles_in_row and tiles_in_row[j] in goal_tiles_in_row and tiles_in_row[i] > tiles_in_row[j]):
                        distance += 2 #extra penalty

    # calculate the number of linear conflicts in columns
    for col in range(3):
        tiles_in_col = [state[col], state[col + 3], state[col + 6]] 
        goal_tiles_in_col = [final_state[col], final_state[col + 3], final_state[col + 6]]
        for i in range(3):
            for j in range(i + 1, 3):
                # if the tiles are in the same column and the goal tiles are in the same column and the tiles are reversed
                if (tiles_in_col[i] in goal_tiles_in_col and tiles_in_col[j] in goal_tiles_in_col and tiles_in_col[i] > tiles_in_col[j]):
                        distance += 2

    return distance

# Test the heuristic functions

test_state = [2,1,3,
              4,0,6,
              7,5,8]

goal_state = [1, 2, 3, 
              4, 5, 6, 
              7, 8, 0]

print(manhattan_distance(test_state, goal_state))
print(misplaced_tiles(test_state, goal_state))
print(linear_conflict(test_state, goal_state))




4
5
6


In [825]:
# A* Implementation
import heapq 

# We define the failure value clearly, to simplify checking if needed
FAILURE = {'state': None, 'parent': None, 'action': None, 'cost': None}

def a_star(problem, heuristic):
  start_node = {'state': problem.get_initial_state(), 'parent': None, 'action': None, 'cost': 0} 
  frontier = [(heuristic(start_node['state'], problem.final_state),0, start_node)] # initialize the frontier with the initial state
  reached = {tuple(start_node['state']): start_node} # initialize the reached list with the initial state

  nodes_visited = 0 # nodes pulled from the frontier and expanded

  while frontier:
    # Get the node with the lowest cost from the frontier
    _, _, current_node = heapq.heappop(frontier)
    nodes_visited += 1

    # Check if the current node is the goal state
    if problem.is_end(current_node['state']):
      effective_branching_factor = len(reached) / nodes_visited if nodes_visited > 1 else 0
      return current_node, nodes_visited, effective_branching_factor
    
    # Expand the current node
    for successor in problem.expand(current_node):
      cost_to_successor = current_node['cost'] + 1 # cost to the successor is the cost of the current node + 1
      heuristic_val = heuristic(successor['state'], problem.final_state)
      total_cost = cost_to_successor + heuristic_val

      # Check if the successor is not in the reached list or if it is but with a higher cost
      if tuple(successor['state']) not in reached or cost_to_successor < reached[tuple(successor['state'])]['cost']:
          successor['cost'] = cost_to_successor # update the cost of the successor
          successor['parent'] = current_node # update the parent of the successor
          reached[tuple(successor['state'])] = successor # add the successor to the reached list
          heapq.heappush(frontier, (total_cost, id(successor), successor)) # add the successor to the frontier, use id(successor) to avoid comparing the nodes

  effective_branching_factor = len(reached) / nodes_visited if nodes_visited > 1 else 0
  return FAILURE, nodes_visited, effective_branching_factor

  

In [826]:
# Code to test all the heuristics to the server and compare the results
def test_heuristic(heuristic_func):

    print(f"Testing heuristic: {heuristic_func.__name__}")
    print("-"*40)  # Print the name of the heuristic

    # Use the A* algorithm to solve the problem and get the nodes visited and branching factor
    solution, nodes_visited, branching_factor = a_star(SlidingTiles(initial_state, final_state), heuristic_func)

    # Display the nodes visited and the effective branching factor
    print(f"Nodes Visited: {nodes_visited}")
    print(f"Effective Branching Factor: {branching_factor:.2f}")

    # Reconstruct the path to the solution
    path = []
    node = solution
    while node != None:
        path.insert(0, node)
        node = node['parent']

    # Submit the solution to the server and print the result
    submit_result = submit_solution_to_server(initial_state, path)
    print(f"Result: {submit_result}\n")
    
    return path

# Get the problem from the server
problem = get_problem('sliding_tiles')

# only print the initial-state and final-state bc of limited output space
print(f"Initial State: {problem['initial-state']}")
print(f"Final State: {problem['final-state']}")
print("\n")  # Add an extra newline for separation

initial_state = json_to_list(problem['initial-state'])
final_state = json_to_list(problem['final-state'])

# Call the test function for each heuristic and store the path (assuming each heuristic will produce the same path)
paths = []
for heuristic in [manhattan_distance, misplaced_tiles, linear_conflict]:
    paths.append(test_heuristic(heuristic))

# Print the path only once at the end
print("\nSolution Path:")
print("-"*40)
for node in paths[0]:  
    print('------- %s' % node['action'])
    state = node['state']
    for i in range(3):
        print('¦ %i %i %i ¦' % (state[i*3], state[i*3 + 1], state[i*3 + 2]))



Initial State: 1,2,3,0,4,8,7,6,5
Final State: 1,2,3,4,5,6,7,8,0


Testing heuristic: manhattan_distance
----------------------------------------
Nodes Visited: 9
Effective Branching Factor: 2.11
[92mPASSED
Result: None

Testing heuristic: misplaced_tiles
----------------------------------------
Nodes Visited: 17
Effective Branching Factor: 1.76
[92mPASSED
Result: None

Testing heuristic: linear_conflict
----------------------------------------
Nodes Visited: 96
Effective Branching Factor: 1.80
[92mPASSED
Result: None


Solution Path:
----------------------------------------
------- None
¦ 1 2 3 ¦
¦ 0 4 8 ¦
¦ 7 6 5 ¦
------- R
¦ 1 2 3 ¦
¦ 4 0 8 ¦
¦ 7 6 5 ¦
------- D
¦ 1 2 3 ¦
¦ 4 6 8 ¦
¦ 7 0 5 ¦
------- R
¦ 1 2 3 ¦
¦ 4 6 8 ¦
¦ 7 5 0 ¦
------- U
¦ 1 2 3 ¦
¦ 4 6 0 ¦
¦ 7 5 8 ¦
------- L
¦ 1 2 3 ¦
¦ 4 0 6 ¦
¦ 7 5 8 ¦
------- D
¦ 1 2 3 ¦
¦ 4 5 6 ¦
¦ 7 0 8 ¦
------- R
¦ 1 2 3 ¦
¦ 4 5 6 ¦
¦ 7 8 0 ¦


#### SLiding Tile Heuristic Performance Comparison

- **Manhattan Distance** and **Misplaced Tiles**:
  - Both heuristics generally exhibited comparable performance metrics.
  - They efficiently guided the A* search, resulting in fewer visited nodes for the given test cases. This suggests that, for simple to moderately complex states, they can find solutions quickly.
  
- **Linear Conflict**:
  - This heuristic is more sophisticated, incorporating additional knowledge about the problem into its estimates. Specifically, it accounts for tiles that are in the correct row or column but are in conflict.
  - As a result, it often visited more nodes than the other two heuristics. This increased number of visited nodes indicates a broader and potentially more exhaustive search.
  - The lower branching factor observed with Linear Conflict suggests that, although it might examine more nodes, the paths it explores tend to be less branched. This might be because it's considering more detailed constraints on potential solutions.
  
- **Overall Takeaway**:
  - While Manhattan Distance and Misplaced Tiles provided a more direct approach to finding solutions in our test instances, Linear Conflict's method, though more exhaustive, might be advantageous in puzzles of greater complexity or those that involve more intricate conflicts.


## Exercise 3: 
#### Labyrinth implementation:
- **Implemented Labyrinth class**: The Labyrinth class has the following methods:
  - **is_end**: Returns True if the current state is a goal state. In this case, the goal state is when the agent is in the same position as the goal.
  - **get_initial_state**: Returns the initial state of the problem. In this case, the initial state is the position of the agent.
  - **expand**: Returns a list of all the possible actions that can be taken from the current state. In this case, the possible actions are the directions that the agent can move in.
  
- **Solved labyrinth with new Uninformed stradegy**: Bidirectional Search: Forward and backward search from the initial and goal states. The search stops when the two searches meet. The path is then reconstructed by combining the paths from the initial and goal states to the meeting point. In this case Uniform cost search is not optimal since the cost of each action is always 1.
- **Solved labyrinth with Informed stradegy**: Reused A*

In [827]:
#labyrinth Implementation

class Labyrinth:
    def __init__(self, map):
        self.map_matrix = [list(line) for line in map]
        self.rows = len(self.map_matrix)
        self.cols = len(self.map_matrix[0])
        
    def is_end(self, state):
        x, y = state
        return self.map_matrix[x][y] == 'E' 

    def get_initial_state(self):
        for x in range(self.rows):
            for y in range(self.cols):
                if self.map_matrix[x][y] == 'S':
                    return (x, y)
        return None  # Ideally, this line should never be reached

    def get_goal_state(self):
        for x in range(self.rows):
            for y in range(self.cols):
                if self.map_matrix[x][y] == 'E':
                    return (x, y)
        return None  # Ideally, this line should never be reached

    def expand(self, parent):
        children = []
        x, y = parent['state'] # current state
        # Possible moves
        moves = [('U', (-1, 0)), ('D', (1, 0)), ('L', (0, -1)), ('R', (0, 1))]

        # Expand the node
        for action, (change_x, change_y) in moves: 
            new_x, new_y = x + change_x, y + change_y 
            # Check boundaries and avoid walls
            if 0 <= new_x < self.rows and 0 <= new_y < self.cols and self.map_matrix[new_x][new_y] not in ['#']:
                child = {
                    'state': (new_x, new_y),
                    'parent': parent,
                    'action': action,
                    'cost': parent['cost'] + 1  # each move has a cost of 1
                }
                children.append(child)
                
        return children

In [828]:
# PriorityQueue fix from Professor:
import functools

@functools.total_ordering
class Prioritize:
    def __init__(self, priority, item):
        self.priority = priority
        self.item = item

    def __eq__(self, other):
        return self.priority == other.priority

    def __lt__(self, other):
        return self.priority < other.priority


In [829]:
# Bidirectional Search Implementation: from session 6 slides
'''''
Psuedocode from slides:
function BIBF-SEARCH(problemF , fF , problemB, fB) returns a solution node, or failure
    nodeF ← NODE(problemF.INITIAL) // Node for a start state
    nodeB ← NODE(problemB.INITIAL) // Node for a goal state
    frontierF ← a priority queue ordered by fF, with nodeF as an element
    frontierB ←a priority queue ordered by fB, with nodeB as an element
    reachedF ← a lookup table, with one key nodeF.STATE and value nodeF
    reachedB ← a lookup table, with one key nodeB.STATE and value nodeB
    solution ←failure
    while not TERMINATED(solution, frontierF, frontierB) do
        if fF (TOP(frontierF)) < fB(TOP(frontierB)) then
            solution ← PROCEED(F, problemF, frontierF, reachedF, reachedB, solution)
        else solution ← PROCEED(B, problemB, frontierB, reachedB, reachedF, solution)
    return solution
'''''
from queue import PriorityQueue

def bidirectional_search(problem):
    #Helper function to get a unique string for state to use in reached
    def state_to_str(state): #  convert state representations into strings so they can be easily compared and stored 
        return f"{state[0]}-{state[1]}" 

    #Initialization
    start = {'state': problem.get_initial_state(), 'parent': None, 'action': None, 'cost': 0}
    goal = {'state': problem.get_goal_state(), 'parent': None, 'action': None, 'cost': 0}

    frontierF = PriorityQueue()
    #use prioritize class to fix the priority queue
    frontierF.put(Prioritize(0, start))

    frontierB = PriorityQueue()
    frontierB.put(Prioritize(0, goal))

    reachedF = {state_to_str(start['state']): start}
    reachedB = {state_to_str(goal['state']): goal}

    nodes_visited = 0
    explored_states = set()

    while not frontierF.empty() and not frontierB.empty(): # 
        #Expand forward frontier
        currentF = frontierF.get().item
        explored_states.add(currentF['state'])

        #Expand backward frontier
        currentB = frontierB.get().item
        explored_states.add(currentB['state'])

        #Checking for solution
        if state_to_str(currentF['state']) in reachedB:
            # Return path
            solutionF = currentF
            solutionB = reachedB[state_to_str(currentF['state'])] # geting the goal state from the reachedB dictionary
            # Construct paths from the start to the intersection and from the goal to the intersection
            pathF = [] 
            while solutionF['parent']:
                pathF.append(solutionF['action'])
                solutionF = solutionF['parent']
            pathF.reverse()

            pathB = []
            while solutionB['parent']:
                pathB.append(solutionB['action'])
                solutionB = solutionB['parent']

            solution_path = pathF + pathB
            ebf = nodes_visited ** (1 / len(solution_path))
            return solution_path, nodes_visited, ebf, explored_states

        nodes_visited += 1

        # Expand the currentF node
        for child in problem.expand(currentF):
            child_state_str = state_to_str(child['state'])
            if child_state_str not in reachedF:
                reachedF[child_state_str] = child
                #frontierF.put((child['cost'], child))
                frontierF.put(Prioritize(child['cost'], child))
                
        # Expand the currentB node
        for child in problem.expand(currentB):
            child_state_str = state_to_str(child['state'])
            if child_state_str not in reachedB:
                reachedB[child_state_str] = child
                #frontierB.put((child['cost'], child))
                frontierB.put(Prioritize(child['cost'], child))
                
        # Check after expanding both, if there's a state that is in both reachedF and reachedB
        intersecting_state_str = set(reachedF.keys()).intersection(set(reachedB.keys()))
        if intersecting_state_str:
            intersecting_state_str = intersecting_state_str.pop()
            # Get the solution from the forward and backward dictionaries
            solutionF = reachedF[intersecting_state_str]
            solutionB = reachedB[intersecting_state_str]
            
            # Construct paths from the start to the intersection and from the goal to the intersection
            pathF = []
            while solutionF['parent']:
                pathF.append(solutionF['action'])
                solutionF = solutionF['parent']
            pathF.reverse()

            pathB = []
            while solutionB['parent']:
                pathB.append(solutionB['action'])
                solutionB = solutionB['parent']
            pathB.reverse()

            solution_path = pathF + pathB
            ebf = nodes_visited ** (1 / len(solution_path))
            return solution_path, nodes_visited, ebf, explored_states

    # If we exit the loop, that means there's no solution
    return None, nodes_visited, float('inf'), explored_states  # No solution found




In [841]:
# get the coordinates of the starting point for the visualization
def get_starting_point(map_matrix):
    for row_idx, row in enumerate(map_matrix):
        for col_idx, cell in enumerate(row):
            if cell == 'S':
                return row_idx, col_idx

# Visualize the final solution on the map
def visualize_solution(original_map, solution_path):
    map_matrix = [list(row) for row in original_map]
    state = get_starting_point(map_matrix)  # Get starting coordinates
    
    for action in solution_path:
        row, col = state
        if action == 'U':
            row -= 1
        elif action == 'D':
            row += 1
        elif action == 'L':
            col -= 1
        elif action == 'R':
            col += 1
        
        if map_matrix[row][col] == '.':
            map_matrix[row][col] = '*'
        state = (row, col)
    
    # Print the solution map
    for row_content in map_matrix:
        print("".join(row_content))

# visualize all explored states on the map
def visualize_explored(original_map, explored_states):
    map_matrix = [list(row) for row in original_map]
    
    for state in explored_states:
        row, col = state
        if map_matrix[row][col] == '.':
            map_matrix[row][col] = 'o'
    
    # Print the modified map
    for row_content in map_matrix:
        print("".join(row_content))

problem = get_problem('labyrinth-small')
#pprint.pprint(problem)

solution, nodes_visited, ebf, explored_states = bidirectional_search(Labyrinth(problem['map']))

# If a solution is found, we can reconstruct the path in a similar way as before:
if solution:
    path = solution

    # debug
    #print(solution)
    #print(type(solution))
    
    #node = solution
    #while node and node['action']:
        #path.insert(0, node['action'])
        # debug
        #print(node['parent'])
        #print(type(node['parent']))

        #node = node['parent']
    answer = {'path': '-'.join(path)}
    
    # Submit the answer and print the result
    submit_answer('labyrinth-small', answer)
    print(answer)

    # print the nodes visited and the effective branching factor
    print(f"Nodes Visited: {nodes_visited}")
    print(f"Effective Branching Factor: {ebf:.2f}")
    
    # Visualize the solution on the map
    print("\nSolution Path on Map:")
    visualize_solution(problem['map'], path)
    
    # Visualize all explored states on the map
    print("\nAll Explored States on Map:")
    visualize_explored(problem['map'], explored_states)





[91mFAILED: Invalid move onto wall in pos 16 4
{'path': 'D-R-R-R-R-R-D-D-L-L-D-D-D-D-D-D-D-D-D-D-D-D-L-L-D-D-R-R-R-R-L-U-U-U-L-L-L-L-U-U-L-L-L-L-D-D-D-D-L-L-L-L-L-L-U-U'}
Nodes Visited: 78
Effective Branching Factor: 1.08

Solution Path on Map:
S.###################
******........#.#...#
#####*#.#######.###.#
#..***#.....#.#.....#
#.#*###.#####.#.#.#.#
#.#*#.......#...#.#.#
###*#.#.#######.#.###
#..*#.#.#.....#.#...#
###*#.#.#.#####.#####
#..*#.#.#.#.#...#...#
#.#*#####.#.#.#.#.#.#
#.#*..#.#...#.#...#.#
###*###.###.#.#####*#
#..*#.#...#...#.#*#.#
###*#.#.#.#*###.#*#.#
#***#.#.#..*....#*#.#
#*###.#####*###*#*###
#*****#...#.#.#.....#
#.###.###.#.#.#.#.#.#
#...#.......#...#.#.#
###################.E

All Explored States on Map:
So###################
oooooooooooooo#.#...#
#####o#o#######.###.#
#ooooo#ooooo#.#.....#
#o#o###o#####.#.#.#.#
#o#o#ooooooo#...#.#.#
###o#o#o#######.#.###
#ooo#o#o#ooo..#.#...#
###o#o#o#o#####o#####
#ooo#o#o#o#o#ooo#...#
#o#o#####o#o#o#o#.#.#
#o#ooo#o#ooo#o#...#.#

### Labyrinth Informed stradegy: Adapt A* Implementation

In [831]:
# Heuristics defined here
import math

# Heuristic 1: Manhattan Distance (sum of the distances of each tile to its final position)

def manhattan_distance(current_state, goal_state):
    x1, y1 = current_state
    x2, y2 = goal_state
    return abs(x1 - x2) + abs(y1 - y2)

# Heuristic 2: Euclidean Distance (straight line distance between the current state and the goal state)

def euclidean_distance(current_state, goal_state):
    x1, y1 = current_state
    x2, y2 = goal_state
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2) # uses pythagorean theorem (THis is the first time ive ever used it in my life lol)

# Test the heuristic functions
start = (0, 0)
goal = (3, 4)

print(manhattan_distance(start, goal))
print(euclidean_distance(start, goal))


7
5.0


In [832]:
# A* code: Aiming to use the same code as before
import heapq 

def find_end_state(map_matrix):
  for i, row in enumerate(map_matrix):
      for j, cell in enumerate(row):
          if cell == 'E':
              return i, j
  return None

# We define the failure value clearly, to simplify checking if needed
FAILURE = {'state': None, 'parent': None, 'action': None, 'cost': None}

def a_star(problem, heuristic):
  start_node = {'state': problem.get_initial_state(), 'parent': None, 'action': None, 'cost': 0} 
  final_state = find_end_state(problem.map_matrix)  # fetch the final state using the new method I made
  frontier = [(heuristic(start_node['state'], final_state), 0, start_node)]
  reached = {start_node['state']: start_node} 

  nodes_visited = 0 # nodes pulled from the frontier and expanded

  while frontier:
    # Get the node with the lowest cost from the frontier
    _, _, current_node = heapq.heappop(frontier)
    nodes_visited += 1

    # Check if the current node is the goal state
    if problem.is_end(current_node['state']):
      effective_branching_factor = len(reached) / nodes_visited if nodes_visited > 1 else 0
      return current_node, nodes_visited, effective_branching_factor
    
    # Expand the current node
    for successor in problem.expand(current_node):
      cost_to_successor = current_node['cost'] + 1 # cost to the successor is the cost of the current node + 1
      heuristic_val = heuristic(successor['state'], final_state)
      total_cost = cost_to_successor + heuristic_val

      # Check if the successor is not in the reached list or if it is but with a higher cost
      if successor['state'] not in reached or cost_to_successor < reached[successor['state']]['cost']:
          successor['cost'] = cost_to_successor # update the cost of the successor
          successor['parent'] = current_node # update the parent of the successor
          reached[successor['state']] = successor # add the successor to the reached list
          heapq.heappush(frontier, (total_cost, id(successor), successor)) # add the successor to the frontier, use id(successor) to avoid comparing the nodes

  effective_branching_factor = len(reached) / nodes_visited if nodes_visited > 1 else 0
  return FAILURE, nodes_visited, effective_branching_factor

  

In [845]:
# test the A* with each heuristic and compare the results
def test_a_star_with_heuristics(problem_name, heuristics):
    problem = get_problem(problem_name)
    for heuristic in heuristics:
        print(f"Testing heuristic: {heuristic.__name__}")
        print("----------------------------------------")
        
        solution, nodes_visited, ebf = a_star(Labyrinth(problem['map']), heuristic)
        
        if solution and solution != FAILURE:
            path = []
            node = solution
            while node and node['action']:
                path.insert(0, node['action'])
                node = node['parent']
            answer = {'path': '-'.join(path)}
            
            # Submit the answer and print the result
            response = submit_answer(problem_name, answer)
            print(response)  # Assuming the response contains the string "PASSED" on successful submission
            
            # print the nodes visited and the effective branching factor
            print(f"Nodes Visited: {nodes_visited}")
            print(f"Effective Branching Factor: {ebf:.2f}")
            
            # Visualize the solution on the map
            print("\nSolution Path on Map:")
            visualize_solution(problem['map'], path)
            
            print("\n")  # For spacing between the two heuristic test results
            print(answer)
        
        else:
            print("No solution found!")

# Using the two heuristic functions to test A*
test_a_star_with_heuristics('labyrinth-small', [manhattan_distance, euclidean_distance])

#print(answer)
#print(type(answer))


Testing heuristic: manhattan_distance
----------------------------------------
[92mPASSED
None
Nodes Visited: 116
Effective Branching Factor: 1.05

Solution Path on Map:
S*###################
.*****........#.#...#
#####*#.#######.###.#
#..***#.....#.#.....#
#.#*###.#####.#.#.#.#
#.#*#.......#...#.#.#
###*#.#.#######.#.###
#..*#.#.#.....#.#...#
###*#.#.#.#####.#####
#..*#.#.#.#.#...#...#
#.#*#####.#.#.#.#.#.#
#.#*..#.#...#.#...#.#
###*###.###.#.#####.#
#..*#.#...#...#.#.#.#
###*#.#.#.#.###.#.#.#
#***#.#.#..*****#.#.#
#*###.#####*###*#.###
#*****#...#*#.#*****#
#.###*###.#*#.#.#.#*#
#...#*******#...#.#*#
###################*E


{'path': 'R-D-R-R-R-R-D-D-L-L-D-D-D-D-D-D-D-D-D-D-D-D-L-L-D-D-R-R-R-R-D-D-R-R-R-R-R-R-U-U-U-U-R-R-R-R-D-D-R-R-R-R-D-D-D-R'}
Testing heuristic: euclidean_distance
----------------------------------------
[92mPASSED
None
Nodes Visited: 124
Effective Branching Factor: 1.05

Solution Path on Map:
S*###################
.*****........#.#...#
#####*#.#######.###.#
#..*

### Labyrinth A* Conclusion:

**Manhattan vs. Euclidean Distance:**

- **Manhattan Distance:**
   - Suited for grid-based problems with movements in cardinal directions.
   - Visited Nodes: 118; Effective Branching Factor: 1.04.

- **Euclidean Distance:**
   - Represents actual straight-line distance, ideal if diagonal movements are possible.
   - Visited Nodes: 124; Effective Branching Factor: 1.05.

**Comparison:**
Manhattan Distance was slightly more efficient in terms of nodes visited in this specific problem. However, the differences are marginal. For strict grid-based problems, Manhattan might be more apt, while Euclidean could be favored when there's potential for diagonal paths.
