In [1]:
##mounting in order to access inputs
from google.colab import drive
drive.mount('/content/drive')

drive_path = "drive/MyDrive/inputs/"

Mounted at /content/drive


*PROBLEM 1*

Formalizing the Maze problem 

In this problem you are required to restructure the solution of the maze problem solved in our labs, using a formal approach based on the abstract class SearchProblem (given in the FormalizedSearch.ipynb file). The Maze should be a concrete class inheriting from the SearchProblem class.

In [2]:
class SearchProblem:
    def __init__(self, initialState, goal, maze):
        super().__init__(initialState)
        self.goal = goal
        self.maze = maze

    def actions(self, state):
        x, y = state
        if x > 0 and self.maze[x - 1][y] != "X":
            yield "UP"
        if x < len(self.maze) - 1 and self.maze[x + 1][y] != "X":
            yield "DOWN"
        if y > 0 and self.maze[x][y - 1] != "X":
            yield "LEFT"
        if y < len(self.maze[0]) - 1 and self.maze[x][y + 1] != "X":
            yield "RIGHT"

    def result(self, state, action):
        x, y = state
        if action == "UP":
            return x - 1, y
        if action == "DOWN":
            return x + 1, y
        if action == "LEFT":
            return x, y - 1
        if action == "RIGHT":
            return x, y + 1

        return None

    def goal_test(self, state):
        return state == self.goal

*PROBLEM 2*

The 7-puzzle In this problem you are going to solve the 7-puzzle (which is a simplified version of the 8-puzzle). The puzzle is played in a 3x3 board, containing 7 tiles numbered 1 to 7 and two empty cells. Unlike the original 8-puzzle, the 7-puzzle that we are dealing with in this problem, can have tiles of different weights, so the cost of the move is equal to the weight of the tile that is being moved. You are required to write a Python program which automatically reads the weights from the file weights.txt, then reads as input (from the user) the name of the file containing the initial state of the board. Note: if the weights.txt file is not available, then the program should automatically assign the weight 1 to each tile. The program should generate as a result three output files (BFS.txt, UCS.txt and AStar.txt) which contain the complete solutions using: a. Breadth-first search (notice that optimality is not guaranteed in this case). b. Uniform-cost search c. A* with heuristic function the sum of products of the form (weight × Manhattan distance from its goal position) for each tile

The file weights.txt contains just 7 numbers delimited by whitespaces (representing the weights of tiles 1, 2, 3, 4, 5, 6, 7 respectively). The input file contains three lines representing the initial state of the board.
The output file will show in the first three lines the number of explored states, the length of the solution path (i.e. number of moves) and the cost of the solution path. Then all steps of the solution will be shown giving the move and the resulting state.

In [None]:
import sys
import heapq
import numpy as np
import copy

class Node():
    def __init__(self, state, parent, action, depth, cost):
        self.state = state
        self.parent = parent
        self.action = action
        self.depth = depth
        self.cost = cost

class QueueFrontier():
    def __init__(self):
        self.frontier = []

    def add(self, node):
        self.frontier.append(node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty frontier")
        else:
            node = self.frontier[0]
            self.frontier = self.frontier[1:]
            return node

class PriorityQueueFrontier():
    def __init__(self):
        self.frontier = []

    def add(self, newNode, priorityValue):
        heapq.heappush(self.frontier, (priorityValue, id(newNode), newNode) )

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty")
        else:
            t = heapq.heappop(self.frontier)
            return t[-1]

class Puzzle():

    def __init__(self, weights, initial_state_file):
        self.weights = weights
        with open(drive_path + initial_state_file, 'r') as f:
            self.initial_state = np.array([list(map(int, line.split())) for line in f.readlines()])
    
    def goal_state(self) -> np.ndarray:
        return np.array([
            [1, 2, 3],
            [4, 5, 6],
            [7, 0, 0]
        ])
    
    def manhattan_distance(self, tile, position):
        goalPositions = {
            1: (0, 0), 2: (0, 1), 3: (0, 2),
            4: (1, 0), 5: (1, 1), 6: (1, 2),
            7: (2, 0)
        }
        goalPosition = goalPositions[tile]
        return abs(position[0] - goalPosition[0]) + abs(position[1] - goalPosition[1])

    def heuristicFunction(self, currentNode):
        return sum(self.weights[tile - 1] * self.manhattan_distance(tile, np.argwhere(currentNode.state == tile)[0]) for tile in range(1, 8))

    def neighbors(self, state):
        empty_positions = list(map(tuple, np.argwhere(state == 0)))
        possible_moves = []

        for position in empty_positions:
            row, col = position
            if row > 0:
                new_state = state.copy()
                new_state[row, col], new_state[row - 1, col] = new_state[row - 1, col], new_state[row, col]
                if new_state[row, col] != 0:
                    possible_moves.append(("Move " + str(new_state[row, col]) + " DOWN", new_state, self.weights[new_state[row, col] - 1]))

            if row < 2:
                new_state = state.copy()
                new_state[row, col], new_state[row + 1, col] = new_state[row + 1, col], new_state[row, col]
                if new_state[row, col] != 0:
                    possible_moves.append(("Move " + str(new_state[row, col]) + " UP", new_state, self.weights[new_state[row, col] - 1]))
            if col > 0:
                new_state = state.copy()
                new_state[row, col], new_state[row, col - 1] = new_state[row, col - 1], new_state[row, col]
                if new_state[row, col] != 0:
                    possible_moves.append(("Move " + str(new_state[row, col]) + " RIGHT", new_state, self.weights[new_state[row, col] - 1]))
            if col < 2:
                new_state = state.copy()
                new_state[row, col], new_state[row, col + 1] = new_state[row, col + 1], new_state[row, col]
                if new_state[row, col] != 0:
                    possible_moves.append(("Move " + str(new_state[row, col]) + " LEFT", new_state, self.weights[new_state[row, col] - 1]))

        return possible_moves

    def solve(self, method):
        if method not in ["BFS", "UCS", "AStar"]:
            raise ValueError("Invalid")

        start = Node(state=self.initial_state, parent=None, action=None, depth=0, cost=0)
        
        if method == "BFS":
            frontier = [start]
        else:
            frontier = PriorityQueueFrontier()
            priorityValue = start.cost if method == "UCS" else start.cost + self.heuristicFunction(start)
            frontier.add(start, priorityValue)

        self.explored = set()
        num_explored = 0
        goal = self.goal_state()

        while frontier:
            if method == "BFS":
                node = frontier.pop(0)
            else:
                node = frontier.remove()

            if np.array_equal(node.state, goal):
                actions = []
                states = []
                while node.parent is not None:
                    actions.append(node.action)
                    states.append(node.state)
                    node = node.parent
                actions.reverse()
                states.reverse()

                output_file = f"{initial_state_file[:-4]}{method}.txt"
                with open(output_file, "w") as f:
                    f.write(f"Number of explored states: {num_explored}\n")
                    f.write(f"Length of solution path: {len(actions)}\n")
                    f.write(f"Cost of solution path: {sum(self.weights[state - 1] for state in np.array(states).flatten() if state)}\n")
                    for i, (action, state) in enumerate(zip(actions, states), 1):
                        f.write(f"\nStep {i}: {action}\n")
                        f.write("\n".join(" ".join(str(cell) for cell in row) for row in state))
                        f.write("\n")

                return

            self.explored.add(tuple(map(tuple, node.state)))
            num_explored += 1

            for action, new_state, cost in self.neighbors(node.state):
                if tuple(map(tuple, new_state)) not in self.explored:
                    child = Node(state=new_state, parent=node, action=action, depth=node.depth + 1, cost=node.cost + cost)
                    if method == "BFS":
                        frontier.append(child)
                    else:
                        priorityValue = child.cost if method == "UCS" else child.cost + self.heuristicFunction(child)
                        frontier.add(child, priorityValue)

weights = [1, 1, 1, 1, 1, 1, 1]
try:
    with open(drive_path + "weights.txt", 'r') as f:
        weights = list(map(int, f.read().split()))
except FileNotFoundError:
    pass

initial_state_file = input("Enter the initial state file name: ")

puzzle = Puzzle(weights, initial_state_file)
puzzle.solve("BFS")
puzzle.solve("UCS")
puzzle.solve("AStar")

*PROBLEM 3*

Jewel collector in a Maze In a maze there are three types of jewels: red, green and blue. Our agent is initially positioned at location denoted with letter A. The goal of the agent is to collect at least one jewel for each type. As usual whitespaces represents cells that the agent can walk through, while the # symbol represents the walls. 

Solve this problem using: a. Depth first search b. Breadth first search c. A* search (adapt an appropriate heuristic) d. Generate a drawing depicting the solution path (in light green), all the jewels in their respective colors (R,G,B), the explored nodes in dark grey and unexplored nodes in white.

In [None]:
import sys
import heapq

class Node():
    def __init__(self, state, parent, action, depth, collected_jewels):
        self.state = state
        self.parent = parent
        self.action = action
        self.depth = depth
        self.collected_jewels = collected_jewels

class StackFrontier():
    def __init__(self):
        self.frontier = []

    def add(self, node):
        self.frontier.append(node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty")
        else:
            node = self.frontier[-1]
            self.frontier = self.frontier[:-1]
            return node

class QueueFrontier():
    def __init__(self):
        self.frontier = []

    def add(self, node):
        self.frontier.append(node)

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty")
        else:
            node = self.frontier[0]
            self.frontier = self.frontier[1:]
            return node

class PriorityQueueFrontier():
    def __init__(self):
        self.frontier = []

    def add(self, newNode, priorityValue):
        heapq.heappush(self.frontier, (priorityValue, id(newNode), newNode) )

    def contains_state(self, state):
        return any(node.state == state for node in self.frontier)

    def empty(self):
        return len(self.frontier) == 0

    def remove(self):
        if self.empty():
            raise Exception("empty")
        else:
            t = heapq.heappop(self.frontier)
            return t[-1]

class Maze():

    def __init__(self, filename):

        with open(filename) as f:
            contents = f.read()

        if contents.count("A") != 1:
            raise Exception("maze must have only 1 start point")

        contents = contents.splitlines()
        self.height = len(contents)
        self.width = max(len(line) for line in contents)

        self.walls = []
        self.red_jewels = set()
        self.green_jewels = set()
        self.blue_jewels = set()

        for i in range(self.height):
            row = []
            for j in range(self.width):
                try:
                    if contents[i][j] == "A":
                        self.start = (i, j)
                        row.append(False)
                    elif contents[i][j] == " ":
                        row.append(False)
                    elif contents[i][j] == "#":
                        row.append(True)
                    elif contents[i][j] == "R":
                        self.red_jewels.add((i, j))
                        row.append(False)
                    elif contents[i][j] == "G":
                        self.green_jewels.add((i, j))
                        row.append(False)
                    elif contents[i][j] == "B":
                        self.blue_jewels.add((i, j))
                        row.append(False)
                except IndexError:
                    row.append(False)
            self.walls.append(row)

        self.solution = None

    def print(self):
        solution = self.solution[1] if self.solution is not None else None
        print()
        for i, row in enumerate(self.walls):
            for j, col in enumerate(row):
                if col:
                    print("█", end="")
                elif (i, j) == self.start:
                    print("A", end="")
                elif (i, j) in self.red_jewels:
                    print("R", end="")
                elif (i, j) in self.green_jewels:
                    print("G", end="")
                elif (i, j) in self.blue_jewels:
                    print("B", end="")
                elif solution is not None and (i, j) in solution:
                    print("*", end="")
                else:
                    print(" ", end="")
            print()
        print()
        
    def neighbors(self, state, collected_jewels):
        row, col = state
        candidates = [
            ("up", (row - 1, col)),
            ("down", (row + 1, col)),
            ("left", (row, col - 1)),
            ("right", (row, col + 1))
        ]

        result = []
        for action, (r, c) in candidates:
            if 0 <= r < self.height and 0 <= c < self.width and not self.walls[r][c]:
                new_collected_jewels = collected_jewels.copy()
                if (r, c) in self.red_jewels:
                    new_collected_jewels.add((r,c))
                elif (r, c) in self.green_jewels:
                    new_collected_jewels.add((r,c))
                elif (r, c) in self.blue_jewels:
                    new_collected_jewels.add((r,c))
                result.append((action, (r, c), new_collected_jewels))
        return result

    def heuristicFunction(self, currentNode, remaining_jewels):
            if not remaining_jewels:
                return 0

            row, col = currentNode.state
            return min(abs(row - r) + abs(col - c) for (r, c) in remaining_jewels)

    def solve_dfs(self):

            self.num_explored = 0

            start = Node(state=self.start, parent=None, action=None, depth=0, collected_jewels=set())
            frontier = StackFrontier()
            frontier.add(start)
            self.explored = set()

            while True:
                if frontier.empty():
                    if all(any(jewel in node.collected_jewels for jewel in s) for s in [self.red_jewels, self.green_jewels, self.blue_jewels]):
                        print("Solution for DFS!\n")
                    else:
                        print("No solution for DFS!\n")
                    tmp = node.collected_jewels
                    actions = []
                    cells = []
                    while node.parent is not None:
                        actions.append(node.action)
                        cells.append(node.state)
                        node = node.parent
                    actions.reverse()
                    cells.reverse()
                    self.solution = (actions, cells)

                    return sum(j in self.red_jewels for j in tmp), sum(j in self.green_jewels for j in tmp), sum(j in self.blue_jewels for j in tmp)

                node = frontier.remove()
                self.num_explored += 1
                self.explored.add(node.state)

                for action, state, new_collected_jewels in self.neighbors(node.state, node.collected_jewels):
                    if not frontier.contains_state(state) and state not in self.explored:
                        child = Node(state=state, parent=node, action=action, depth=node.depth + 1, collected_jewels=new_collected_jewels)
                        frontier.add(child)

    def solve_bfs(self):
        self.num_explored = 0

        start = Node(state=self.start, parent=None, action=None, depth=0, collected_jewels=set())
        frontier = QueueFrontier()
        frontier.add(start)

        self.explored = set()

        while True:
            if frontier.empty():
                if all(any(jewel in node.collected_jewels for jewel in s) for s in [self.red_jewels, self.green_jewels, self.blue_jewels]):
                    print("Solution for BFS!\n")
                else:
                    print("No solution for BFS!\n")
                tmp = node.collected_jewels
                actions = []
                cells = []
                while node.parent is not None:
                    actions.append(node.action)
                    cells.append(node.state)
                    node = node.parent
                actions.reverse()
                cells.reverse()
                self.solution = (actions, cells)

                return sum(j in self.red_jewels for j in tmp), sum(j in self.green_jewels for j in tmp), sum(j in self.blue_jewels for j in tmp)

            node = frontier.remove()
            self.num_explored += 1

            self.explored.add(node.state)

            for action, state, new_collected_jewels in self.neighbors(node.state, node.collected_jewels):
                if not frontier.contains_state(state) and state not in self.explored:
                    child = Node(state=state, parent=node, action=action, depth=node.depth + 1, collected_jewels=new_collected_jewels)
                    frontier.add(child)

    def solve_astar(self):
        self.num_explored = 0

        start = Node(state=self.start, parent=None, action=None, depth=0, collected_jewels=set())
        frontier = PriorityQueueFrontier()
        all_jewels = self.red_jewels | self.green_jewels | self.blue_jewels
        priorityValue = 0 + self.heuristicFunction(start, all_jewels)
        frontier.add(start, priorityValue)

        self.explored = set()

        while True:
            if frontier.empty():
                if all(any(jewel in node.collected_jewels for jewel in s) for s in [self.red_jewels, self.green_jewels, self.blue_jewels]):
                    print("Solution for A*!\n")
                else:
                    print("No solution for A*!\n")
                tmp = node.collected_jewels
                actions = []
                cells = []
                while node.parent is not None:
                    actions.append(node.action)
                    cells.append(node.state)
                    node = node.parent
                actions.reverse()
                cells.reverse()
                self.solution = (actions, cells)

                return sum(j in self.red_jewels for j in tmp), sum(j in self.green_jewels for j in tmp), sum(j in self.blue_jewels for j in tmp)

            node = frontier.remove()
            self.num_explored += 1

            self.explored.add(node.state)

            for action, state, new_collected_jewels in self.neighbors(node.state, node.collected_jewels):
                if state not in self.explored:
                    child = Node(state=state, parent=node, action=action, depth=node.depth + 1, collected_jewels=new_collected_jewels)
                    remaining_jewels = all_jewels - child.collected_jewels
                    priorityValue = child.depth + self.heuristicFunction(child, remaining_jewels)
                    frontier.add(child, priorityValue)

    def output_image(self, filename, show_solution=True, show_explored=False):
        from PIL import Image, ImageDraw
        cell_size = 50
        cell_border = 2

        img = Image.new(
            "RGBA",
            (self.width * cell_size, self.height * cell_size),
            "black"
        )
        draw = ImageDraw.Draw(img)

        solution = self.solution[1] if self.solution is not None else None
        for i, row in enumerate(self.walls):
            for j, col in enumerate(row):

                if col:
                    fill = (40, 40, 40)

                elif (i, j) == self.start:
                    fill = (255, 255, 0)

                elif (i,j) in self.red_jewels:
                    fill = (255, 0, 0)
                elif (i,j) in self.green_jewels:
                    fill = (0, 255, 0) 
                elif (i,j) in self.blue_jewels:
                    fill = (0, 0, 255) 

                elif solution is not None and show_solution and (i, j) in solution:
                    fill = (144, 238, 144)  

                elif solution is not None and show_explored and (i, j) in self.explored:
                    fill = (169, 169, 169) 

                else:
                    fill = (255, 255, 255) 

                draw.rectangle(
                    ([(j * cell_size + cell_border, i * cell_size + cell_border),
                      ((j + 1) * cell_size - cell_border, (i + 1) * cell_size - cell_border)]),
                    fill=fill
                )

        img.save(filename)

def show_results(collected, method, show_explored=True):
    print("States Explored:", m.num_explored)
    print(f"RED:{collected[0]}")
    print(f"GREEN: {collected[1]}")
    print(f"BLUE: {collected[2]}")
    print()
    print("Expanded:")
    m.print()
    m.output_image(mazeName + method +".png", show_explored)

method = ['DFS', 'BFS', 'A*']
mazeName = input("Enter the name: ")
m = Maze(drive_path + mazeName+".txt")
print("Maze:")
m.print()

print("Solving for DFS...")
collected = m.solve_dfs()
show_results(collected, method[0], True)

print("Solving for BFS...")
collected = m.solve_bfs()
show_results(collected, method[1], True)

print("Solving for A*...")
collected = m.solve_astar()
show_results(collected, method[2], True)
