In [39]:
import heapq

class Node:
    def __init__(self, state, parent=None, action=None, cost=0, priority=None):
        self.state = state
        self.parent = parent
        self.action = action
        self.cost = cost
        # For priority queues: if no priority is given, use the cost (for UCS)
        self.priority = cost if priority is None else priority

    # This allows nodes to be compared by their priority (for use in a heap)
    def __lt__(self, other):
        return self.priority < other.priority

In [40]:
class StackFrontier:
    def __init__(self):
        self.frontier = []

    def add(self, node):
        self.frontier.append(node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty frontier")
        # LIFO removal for DFS
        return self.frontier.pop()

class QueueFrontier:
    def __init__(self):
        self.frontier = []

    def add(self, node):
        self.frontier.append(node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty frontier")
        # FIFO removal for BFS
        return self.frontier.pop(0)

class PQueueFrontier:
    def __init__(self):
        self.frontier = []

    def add(self, node):
        heapq.heappush(self.frontier, node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty frontier")
        return heapq.heappop(self.frontier)


In [41]:
class GraphProblem:
    def __init__(self, vertices, edges, start, goal, heuristic_values=None):
        self.vertices = vertices
        self.edges = edges
        self.start = start
        self.goal = goal
        # Provide a heuristic value for each vertex; if none given, default to 0
        self.heuristic_values = heuristic_values if heuristic_values is not None else [0] * len(vertices)

    def initial_state(self):
        return self.start

    def goal_test(self, state):
        return state == self.goal

    def result(self, state):
        """
        Return a list of (action, next_state, step_cost) for a given state.
        The 'action' can simply be the same as the next_state.
        """
        results = []
        for edge in self.edges:
            # Check if this edge is connected to the current state.
            if edge[0] == state:
                # If a cost is provided, use it; otherwise default to 1.
                cost = edge[2] if len(edge) > 2 else 1
                results.append((edge[1], edge[1], cost))
            elif edge[1] == state:
                cost = edge[2] if len(edge) > 2 else 1
                results.append((edge[0], edge[0], cost))
        results.sort()  # Sorting for consistency
        return results

    def path_cost(self, state, newstate):
        """
        Return the cost to go from state to newstate based on the edge information.
        """
        for edge in self.edges:
            if ((edge[0] == state and edge[1] == newstate) or
                (edge[1] == state and edge[0] == newstate)):
                return edge[2] if len(edge) > 2 else 1
        return float("inf")

    def heuristic(self, state):
        # Return the heuristic value for a given state.
        if self.heuristic_values:
            return self.heuristic_values[self.vertices.index(state)]
        return 0


In [42]:
def graph_search(problem):
    """
    Standard graph search using a FIFO (queue) frontier (i.e. breadth-first search).
    Returns a tuple (solution, explored) where:
      - solution is (actions, path) from start to goal.
      - explored is the set of states that were expanded.
    """
    start = Node(state=problem.initial_state(), parent=None, action=None, cost=0)
    frontier = QueueFrontier()
    frontier.add(start)
    explored = set()

    while not frontier.empty():
        node = frontier.remove()
        if problem.goal_test(node.state):
            actions = []
            cells = []
            while node.parent is not None:
                actions.append(node.action)
                cells.append(node.state)
                node = node.parent
            cells.append(node.state)
            actions.reverse()
            cells.reverse()
            return (actions, cells), explored

        explored.add(node.state)
        for action, next_state, step_cost in problem.result(node.state):
            if next_state not in explored and not frontier.contains_state(next_state):
                child = Node(
                    state=next_state,
                    parent=node,
                    action=action,
                    cost=node.cost + step_cost
                )
                frontier.add(child)
    raise Exception("No solution found")

def graph_search_cost(problem, frontier_algo, use_astar=False):
    """
    Graph search that takes cost into account. Use 'frontier_algo' as the frontier class,
    for example PQueueFrontier for Uniform Cost Search (UCS) or A* Search.

    If 'use_astar' is True, then the node's priority is set to (g + h),
    otherwise it is just the path cost g.

    Returns a tuple (solution, explored) where:
      - solution is (actions, path) from start to goal.
      - explored is a dictionary mapping states to the best cost found.
    """
    start = Node(state=problem.start, parent=None, action=None, cost=0)
    # Set initial priority: for A* add heuristic, for UCS use cost only.
    start.priority = start.cost + problem.heuristic(start.state) if use_astar else start.cost

    frontier = frontier_algo()
    frontier.add(start)
    explored = {}  # state -> best cost found

    while not frontier.empty():
        node = frontier.remove()
        if problem.goal_test(node.state):
            actions = []
            cells = []
            while node.parent is not None:
                actions.append(node.action)
                cells.append(node.state)
                node = node.parent
            cells.append(node.state)
            actions.reverse()
            cells.reverse()
            return (actions, cells), explored

        # Skip if we have already found a better cost for this state.
        if node.state in explored and explored[node.state] <= node.cost:
            continue
        explored[node.state] = node.cost

        for action, next_state, step_cost in problem.result(node.state):
            new_cost = node.cost + step_cost
            child = Node(state=next_state, parent=node, action=action, cost=new_cost)
            if use_astar:
                child.priority = new_cost + problem.heuristic(next_state)
            else:
                child.priority = new_cost
            # Only add if we haven't explored next_state with a lower cost.
            if next_state not in explored or new_cost < explored.get(next_state, float('inf')):
                frontier.add(child)
    raise Exception("No solution found")

In [44]:
vertices = ['S', 'A', 'B', 'C', 'D', 'E', 'F', 'H', 'J', 'K', 'L', 'T']

In [45]:
edges = [
    ('S', 'A'),
    ('S', 'B'),
    ('A', 'F'),
    ('B', 'F'),
    ('B', 'C'),
    ('C', 'E'),
    ('D', 'H'),
    ('C', 'J'),
    ('F', 'H'),
    ('H', 'K'),
    ('J', 'L'),
    ('K', 'J'),
    ('K', 'T'),
    ('L', 'T')
]

In [46]:
problem = GraphProblem(vertices, edges, 'S', 'T')
solution, explored = graph_search(problem)
print(solution)
print(explored)

(['A', 'F', 'H', 'K', 'T'], ['S', 'A', 'F', 'H', 'K', 'T'])
{'B', 'D', 'K', 'A', 'E', 'J', 'F', 'C', 'S', 'H', 'L'}


In [47]:
heuristics = [10, 6, 20, 2, 5, 1, 2, 2, 3, 1, 4, 0]
edges_with_cost = [
    ('S', 'A', 2),
    ('S', 'B', 10),
    ('A', 'F', 4),
    ('B', 'F', 2),
    ('B', 'C', 5),
    ('C', 'E', 5),
    ('D', 'H', 3),
    ('C', 'J', 12),
    ('F', 'H', 8),
    ('H', 'K', 5),
    ('J', 'L', 4),
    ('K', 'J', 1),
    ('K', 'T', 7),
    ('L', 'T', 5)
]

In [49]:
problem = GraphProblem(vertices, edges_with_cost, 'S', 'T', heuristics)
solution_cost, explored_cost = graph_search_cost(problem, PQueueFrontier, use_astar=False)
print("\nUniform Cost Search solution (actions, path):", solution_cost)
print("Cost explored:", explored_cost)


Uniform Cost Search solution (actions, path): (['A', 'F', 'H', 'K', 'T'], ['S', 'A', 'F', 'H', 'K', 'T'])
Cost explored: {'S': 0, 'A': 2, 'F': 6, 'B': 8, 'C': 13, 'H': 14, 'D': 17, 'E': 18, 'K': 19, 'J': 20, 'L': 24}


In [50]:
heuristic_values = [0 for _ in vertices]
problem_astar = GraphProblem(vertices, edges, 'S', 'T', heuristic_values)
solution_astar, explored_astar = graph_search_cost(problem_astar, PQueueFrontier, use_astar=True)
print("\nA* Search solution (actions, path):", solution_astar)
print("A* explored:", explored_astar)


A* Search solution (actions, path): (['B', 'C', 'J', 'K', 'T'], ['S', 'B', 'C', 'J', 'K', 'T'])
A* explored: {'S': 0, 'A': 1, 'B': 1, 'F': 2, 'C': 2, 'E': 3, 'H': 3, 'J': 3, 'D': 4, 'K': 4, 'L': 4}


------------------
# Maze:

In [53]:
import heapq
from tkinter import Tk, Canvas
import matplotlib
matplotlib.use('Agg')  # Use a non-interactive backend
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# -----------------------------------------------
# Node and Frontier Classes for Search Algorithms
# -----------------------------------------------

class Node:
    def __init__(self, state, parent=None, action=None, cost=0, priority=None):
        self.state = state         # for a maze, a state is a tuple (row, col)
        self.parent = parent       # pointer to the previous Node
        self.action = action       # the move made to reach this state ('U', 'D', 'L', 'R')
        self.cost = cost           # cumulative cost so far (for uniform cost or A*)
        # For priority queues: if no priority is given, use the cost
        self.priority = cost if priority is None else priority

    # Allows nodes to be compared by their priority (for use in a heap)
    def __lt__(self, other):
        return self.priority < other.priority

class QueueFrontier:
    # Frontier as a FIFO queue (for breadth-first search)
    def __init__(self):
        self.frontier = []

    def add(self, node):
        self.frontier.append(node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty frontier")
        return self.frontier.pop(0)

class PQueueFrontier:
    # Frontier as a priority queue (for uniform cost search / A*)
    def __init__(self):
        self.frontier = []

    def add(self, node):
        heapq.heappush(self.frontier, node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty frontier")
        return heapq.heappop(self.frontier)

# -----------------------------------------------
# MazeProblem: A Search Problem Based on a Maze String
# -----------------------------------------------

class MazeProblem:
    def __init__(self, maze_str):
        # Parse the maze string into a grid (list of lists)
        self.grid = [list(line) for line in maze_str.strip().splitlines()]
        self.rows = len(self.grid)
        self.cols = len(self.grid[0])
        self.start = None
        self.goal = None
        # Identify start (S) and goal (T) cells.
        for i in range(self.rows):
            for j in range(self.cols):
                if self.grid[i][j] == 'S':
                    self.start = (i, j)
                elif self.grid[i][j] == 'T':
                    self.goal = (i, j)
        if self.start is None or self.goal is None:
            raise Exception("Maze must have a start (S) and a goal (T)")

    def initial_state(self):
        return self.start

    def goal_test(self, state):
        return state == self.goal

    def result(self, state):
        """
        Returns a list of (action, next_state, step_cost) for a given state.
        Allowed moves are: Up, Down, Left, Right (if within bounds and not a wall '#').
        """
        i, j = state
        results = []
        directions = [('U', (i - 1, j)), ('D', (i + 1, j)),
                      ('L', (i, j - 1)), ('R', (i, j + 1))]
        for action, (ni, nj) in directions:
            if 0 <= ni < self.rows and 0 <= nj < self.cols:
                if self.grid[ni][nj] != '#':  # '#' represents a wall
                    results.append((action, (ni, nj), 1))
        return results

    def path_cost(self, state, newstate):
        # In the maze, each step costs 1.
        return 1

    def heuristic(self, state):
        # Manhattan distance from the current state to the goal.
        i, j = state
        gi, gj = self.goal
        return abs(i - gi) + abs(j - gj)

# -----------------------------------------------
# Graph Search Algorithms
# -----------------------------------------------

def graph_search(problem):
    """
    Standard graph search using a FIFO (queue) frontier (i.e. breadth-first search).
    Returns a tuple (solution, explored) where:
      - solution is (actions, path) from start to goal.
      - explored is the set of states that were expanded.
    """
    start = Node(state=problem.initial_state(), parent=None, action=None, cost=0)
    frontier = QueueFrontier()
    frontier.add(start)
    explored = set()

    while not frontier.empty():
        node = frontier.remove()
        if problem.goal_test(node.state):
            actions = []
            cells = []
            while node.parent is not None:
                actions.append(node.action)
                cells.append(node.state)
                node = node.parent
            cells.append(node.state)
            actions.reverse()
            cells.reverse()
            return (actions, cells), explored

        explored.add(node.state)
        for action, next_state, step_cost in problem.result(node.state):
            if next_state not in explored and not frontier.contains_state(next_state):
                child = Node(state=next_state,
                             parent=node,
                             action=action,
                             cost=node.cost + step_cost)
                frontier.add(child)
    raise Exception("No solution found")

def graph_search_cost(problem, frontier_algo, use_astar=False):
    """
    Graph search that takes costs into account (for UCS/A*).
    'frontier_algo' should be a frontier class (like PQueueFrontier).
    Set 'use_astar' to True to add the heuristic (A*), or False for UCS.
    Returns a tuple (solution, explored) where:
      - solution is (actions, path) from start to goal.
      - explored is a dictionary mapping states to the best cost found.
    """
    start = Node(state=problem.initial_state(), parent=None, action=None, cost=0)
    start.priority = start.cost + problem.heuristic(start.state) if use_astar else start.cost
    frontier = frontier_algo()
    frontier.add(start)
    explored = {}

    while not frontier.empty():
        node = frontier.remove()
        if problem.goal_test(node.state):
            actions = []
            cells = []
            while node.parent is not None:
                actions.append(node.action)
                cells.append(node.state)
                node = node.parent
            cells.append(node.state)
            actions.reverse()
            cells.reverse()
            return (actions, cells), explored

        if node.state in explored and explored[node.state] <= node.cost:
            continue
        explored[node.state] = node.cost

        for action, next_state, step_cost in problem.result(node.state):
            new_cost = node.cost + step_cost
            child = Node(state=next_state, parent=node, action=action, cost=new_cost)
            if use_astar:
                child.priority = new_cost + problem.heuristic(next_state)
            else:
                child.priority = new_cost
            if next_state not in explored or new_cost < explored.get(next_state, float('inf')):
                frontier.add(child)
    raise Exception("No solution found")

# -----------------------------------------------
# GUI: Draw the Maze and the Solution Path Using Tkinter
# -----------------------------------------------

def draw_maze_matplotlib(maze_problem, solution_path=None, filename="maze.png"):
    cell_size = 1  # Use unit square cells
    rows = maze_problem.rows
    cols = maze_problem.cols

    fig, ax = plt.subplots(figsize=(cols, rows))
    ax.set_xlim(0, cols)
    ax.set_ylim(0, rows)
    ax.set_aspect('equal')
    ax.invert_yaxis()  # Invert y-axis so row 0 is at the top
    plt.axis('off')

    # Draw maze grid
    for i in range(rows):
        for j in range(cols):
            if maze_problem.grid[i][j] == '#':
                color = 'black'
            elif maze_problem.grid[i][j] == 'S':
                color = 'green'
            elif maze_problem.grid[i][j] == 'T':
                color = 'red'
            else:
                color = 'white'
            rect = patches.Rectangle((j, i), 1, 1, facecolor=color, edgecolor='gray')
            ax.add_patch(rect)

    # Overlay solution path if provided
    if solution_path:
        for (i, j) in solution_path:
            # Avoid overwriting walls, start, or goal.
            if maze_problem.grid[i][j] not in ['#', 'S', 'T']:
                rect = patches.Rectangle((j, i), 1, 1, facecolor='blue', edgecolor='gray')
                ax.add_patch(rect)

    plt.savefig(filename, bbox_inches='tight')
    plt.close()
    print(f"Maze saved as {filename}")


In [55]:
# Define a maze as a string.
# Walls are '#' characters. 'S' is the start, 'T' is the goal.
maze_str = """
#####################
#S  #       #      T#
# # # ##### # #######
# #   #   # #       #
# ##### # # ####### #
#       #           #
#####################
"""
maze_problem = MazeProblem(maze_str)

# Solve the maze using BFS
solution, explored = graph_search(maze_problem)
print("BFS solution (actions, path):", solution)
print("States explored:", explored)

# Uncomment below to use A* (with Manhattan distance heuristic) instead:
# solution, explored = graph_search_cost(maze_problem, PQueueFrontier, use_astar=True)
# print("A* solution (actions, path):", solution)
# print("Explored states and costs:", explored)

# Draw the maze with the solution path highlighted.
# solution[1] is the list of states (cells) from start to goal.
draw_maze_matplotlib(maze_problem, solution_path=solution[1])

BFS solution (actions, path): (['D', 'D', 'D', 'D', 'R', 'R', 'R', 'R', 'R', 'R', 'U', 'U', 'R', 'R', 'D', 'D', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'U', 'U', 'L', 'L', 'L', 'L', 'L', 'L', 'U', 'U', 'R', 'R', 'R', 'R', 'R', 'R'], [(1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5), (5, 6), (5, 7), (4, 7), (3, 7), (3, 8), (3, 9), (4, 9), (5, 9), (5, 10), (5, 11), (5, 12), (5, 13), (5, 14), (5, 15), (5, 16), (5, 17), (5, 18), (5, 19), (4, 19), (3, 19), (3, 18), (3, 17), (3, 16), (3, 15), (3, 14), (3, 13), (2, 13), (1, 13), (1, 14), (1, 15), (1, 16), (1, 17), (1, 18), (1, 19)])
States explored: {(3, 4), (3, 1), (3, 7), (5, 4), (4, 9), (5, 1), (5, 7), (5, 13), (3, 16), (3, 13), (5, 10), (5, 16), (3, 19), (5, 19), (1, 6), (2, 5), (1, 3), (1, 9), (2, 11), (1, 18), (1, 15), (3, 3), (3, 9), (5, 6), (5, 3), (5, 9), (4, 11), (3, 18), (3, 15), (5, 12), (5, 18), (5, 15), (1, 2), (2, 1), (1, 5), (1, 11), (1, 8), (1, 14), (2, 13), (1, 17), (4, 1), (4, 7), (3, 5), (5,