# ACS3930 Assignment 1

Based on the given classes and functions, finish the three questions marked in red. Leave comments to explain your code. I will grade your assignment based on the completeness of your code and the clarity of the comments. 

Have fun with your Assignment 1!

Your name: Pyae Phone Htoo

Your Student ID: 3153093


In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import random
import heapq
import math
import sys
from collections import defaultdict, deque, Counter
from itertools import combinations
import copy

# Problem class

In [3]:
class Problem(object):
    """The abstract class for a formal problem. A new domain subclasses this,
    overriding `actions` and `results`, and perhaps other methods.
    The default heuristic is 0 and the default action cost is 1 for all states.
    When yiou create an instance of a subclass, specify `initial`, and `goal` states 
    (or give an `is_goal` method) and perhaps other keyword args for the subclass."""

    def __init__(self, initial=None, goal=None, **kwds): 
        self.__dict__.update(initial=initial, goal=goal, **kwds) 
        
    def actions(self, state):        raise NotImplementedError
    def result(self, state, action): raise NotImplementedError
    def is_goal(self, state):        return state == self.goal
    def action_cost(self, s, a, s1): return 1
    def h(self, node):               return 0
    
    def __str__(self):
        return '{}({!r}, {!r})'.format(
            type(self).__name__, self.initial, self.goal)
    

class Node:
    "A Node in a search tree."
    def __init__(self, state, parent=None, action=None, path_cost=0):
        self.__dict__.update(state=state, parent=parent, action=action, path_cost=path_cost)

    def __repr__(self): return '<{}>'.format(self.state)
    def __len__(self): return 0 if self.parent is None else (1 + len(self.parent))
    def __lt__(self, other): return self.path_cost < other.path_cost
    
    
failure = Node('failure', path_cost=math.inf) # Indicates an algorithm couldn't find a solution.
cutoff  = Node('cutoff',  path_cost=math.inf) # Indicates iterative deepening search was cut off.
    
    
def expand(problem, node):
    "Expand a node, generating the children nodes."
    s = node.state
    for action in problem.actions(s):
        s1 = problem.result(s, action)
        cost = node.path_cost + problem.action_cost(s, action, s1)
        yield Node(s1, node, action, cost)
        

def path_actions(node):
    "The sequence of actions to get to this node."
    if node.parent is None:
        return []  
    return path_actions(node.parent) + [node.action]


def path_states(node):
    "The sequence of states to get to this node."
    if node in (cutoff, failure, None): 
        return []
    return path_states(node.parent) + [node.state]

class CountCalls:
    """Delegate all attribute gets to the object, and count them in ._counts"""
    def __init__(self, obj):
        self._object = obj
        self._counts = Counter()
        
    def __getattr__(self, attr):
        "Delegate to the original object, after incrementing a counter."
        self._counts[attr] += 1
        return getattr(self._object, attr)

# Queues

First-in-first-out and Last-in-first-out queues, and a `PriorityQueue`, which allows you to keep a collection of items, and continually remove from it the item with minimum `f(item)` score.

In [6]:
FIFOQueue = deque

LIFOQueue = list

class PriorityQueue:
    """A queue in which the item with minimum f(item) is always popped first."""

    def __init__(self, items=(), key=lambda x: x): 
        self.key = key
        self.items = [] # a heap of (score, item) pairs
        for item in items:
            self.add(item)
            
    def empty(self):
        """Return True if the priority queue is empty, False otherwise."""
        return len(self.items) == 0
         
    def add(self, item):
        """Add item to the queuez."""
        pair = (self.key(item), item)
        heapq.heappush(self.items, pair)

    def pop(self):
        """Pop and return the item with min f(item) value."""
        return heapq.heappop(self.items)[1]
    
    def top(self): return self.items[0][1]

    def __len__(self): return len(self.items)

## <font color='red'>Q1: What are below search algorithms? name them.</font>


In [8]:
# what is below search algorithm? put your answer here: It is Best-First Search (Graph Search)
def search_1(problem, f):
    node = Node(problem.initial)
    frontier = PriorityQueue([node], key=f)
    reached = {problem.initial: node}
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        for child in expand(problem, node):
            s = child.state
            if s not in reached or child.path_cost < reached[s].path_cost:
                reached[s] = child
                frontier.add(child)
    return failure


In [12]:
# what is below search algorithm? put your answer here: Tree Search

def search_2(problem, f):
    frontier = PriorityQueue([Node(problem.initial)], key=f)
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        for child in expand(problem, node):
            if not is_cycle(child):
                frontier.add(child)
    return failure

def is_cycle(node, k=30):
    def find_cycle(ancestor, k):
        return (ancestor is not None and k > 0 and
                (ancestor.state == node.state or find_cycle(ancestor.parent, k - 1)))
    return find_cycle(node.parent, k)


In [14]:
def g(n): return n.path_cost

# what is below search algorithm? put your answer here: It is Uniform-Cost Search.
def search_3(problem):
    return search_1(problem, f=g)


In [16]:
# what is below search algorithm? put your answer here: It is Breadth-first search

def search_4(problem):
    return search_1(problem, f=len)


In [18]:
# what is below search algorithm? put your answer here: Depth-first Search

def search_5(problem):
    return search_1(problem, f=lambda n: -len(n))

In [20]:
# what is below search algorithm? put your answer here: Breath-first search

def search_6(problem):
    node = Node(problem.initial)
    if problem.is_goal(problem.initial):
        return node
    frontier = FIFOQueue([node])
    reached = {problem.initial}
    while frontier:
        node = frontier.pop()
        for child in expand(problem, node):
            s = child.state
            if problem.is_goal(s):
                return child
            if s not in reached:
                reached.add(s)
                frontier.appendleft(child)

In [22]:
# what is below search algorithm? put your answer here: Depth-limit search

def search_7(problem, limit=10):
    frontier = LIFOQueue([Node(problem.initial)])
    result = failure
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        elif len(node) >= limit:
            result = cutoff
        elif not is_cycle(node):
            for child in expand(problem, node):
                frontier.append(child)
    return result

In [24]:
# what is below search algorithm? put your answer here: Iterative deepening search

def search_8(problem):
    for limit in range(1, sys.maxsize):
        result = search_7(problem, limit)
        if result != cutoff:
            return result

## <font color='red'>Q2: Define a subclass called "MazeProblem" from the Problem class we defined at the begining.</font>

### Objective: 
To apply your knowledge of search problems in artificial intelligence, implement a MazeProblem class in Python. This class should extend a given abstract class Problem and apply to a simple 2D maze.

### Background: 
In search problems, the objective is to find a path from an initial state to a goal state within a given environment. In this assignment, your environment is a 2D maze represented by a grid of cells. Each cell can either be open (0) or blocked (1). Your task is to navigate from a given starting point to a target location in the maze.

### Task:

#### Class Implementation:

Define a class `MazeProblem` that inherits from the abstract class Problem.
The constructor `__init__` should take three parameters: `initial`, `goal`, and `maze`. Here, initial is a tuple representing the starting coordinates (x, y) in the maze, goal is a tuple representing the target coordinates, and maze is a list of lists representing the 2D maze grid.

Implement the method actions that returns a list of possible actions from the current state. Actions can be "Up", "Down", "Left", or "Right", and you cannot move into blocked cells or outside the maze.

Implement the method result that returns the resulting state from taking an action at the current state.

#### Maze Creation:

Create a simple 5x5 maze as a list of lists, where 0 represents an open path, and 1 represents a blocked path.
Define a starting point and a goal within the maze.

#### Testing:

Create an instance of MazeProblem with the maze, the starting point, and the goal. Using the `search_3`,`search_4`,..., `search_8` search algorithms defined above to solve the problem. Return the solutions using the given`path_states()` function. 

Design an experiemnt to compare their runing time. You may find python `%timeit` function useful.


In [27]:
class MazeProblem(Problem):
    
    def __init__(self, initial, goal, maze):
        """Define a Maze problem. Maze is a grid of 1s and 0s, where 1s are blocked."""
        super().__init__(initial, goal)
        self.maze = maze

    def actions(self, state):
        """Return actions available from the current state."""
        actions = []
        x, y = state
        if y > 0 and self.maze[x][y-1] == 0: actions.append("Up")
        if y < len(self.maze[0]) - 1 and self.maze[x][y+1] == 0: actions.append("Down")
        if x > 0 and self.maze[x-1][y] == 0: actions.append("Left")
        if x < len(self.maze) - 1 and self.maze[x+1][y] == 0: actions.append("Right")
        return actions

    def result(self, state, action):
        """Return the resulting state from taking an action at the current state."""
        x, y = state
        if action == "Up": return (x, y-1)
        if action == "Down": return (x, y+1)
        if action == "Left": return (x-1, y)
        if action == "Right": return (x+1, y)

In [29]:
# Define a simple maze as a list of lists where 0 is open and 1 is a wall
maze = [[0, 0, 0, 0, 1],
        [1, 1, 0, 1, 1],
        [0, 0, 0, 0, 0],
        [0, 1, 1, 1, 0],
        [0, 0, 0, 0, 0]]

# Create a MazeProblem instance
initial_state = (0, 0)  # Start at the top-left corner
goal_state = (4, 4)     # Goal is the bottom-right corner
maze_instance = MazeProblem(initial_state, goal_state, maze)


In [46]:
# using the search_3,search_4,..., search_8 algorithms to solve the problem

#put your code here
for i in range(8):
    if i <=1:
        print("search_",i+1," solutions : ", path_states(globals()[f"search_{i+1}"](maze_instance,g)))
    elif i == 6:
        print("search_",i+1," solutions : ", path_states(globals()[f"search_{i+1}"](maze_instance , limit=10)))
    else:
        print("search_",i+1," solutions : ", path_states(globals()[f"search_{i+1}"](maze_instance)))

# example solution returned by path_states(): 
# search_1 solution: [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
# search_2 solution: [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
# ...
# search_8 solution: [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]


search_ 1  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 2  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 3  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 4  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 5  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 1), (2, 0), (3, 0), (4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]
search_ 6  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 7  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 8  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]


## <font color='red'>Q3: Bidirectional search.</font>

Given below functions about bidirectional search, define the heuristic functions (using manhattan distance) for forward and backward search in MazeProblem.

Using the `bidirectional_uniform_cost_search` and `bidirectional_astar_search algorithms` to solve the problem. Return the solutions using the given `path_states()` function.

Design an experiemnt to compare the runing time of `search_3`, `search_4`,...,`search_8`, `bidirectional_uniform_cost_search`, and `bidirectional_astar_search algorithms`. You may find python `%timeit` function useful.


In [49]:
def bidirectional_best_first_search(problem_f, f_f, problem_b, f_b, terminated):
    node_f = Node(problem_f.initial)
    node_b = Node(problem_f.goal)
    frontier_f, reached_f = PriorityQueue([node_f], key=f_f), {node_f.state: node_f}
    frontier_b, reached_b = PriorityQueue([node_b], key=f_b), {node_b.state: node_b}
    solution = failure
    while frontier_f and frontier_b and not terminated(solution, frontier_f, frontier_b):
        def S1(node, f):
            return str(int(f(node))) + ' ' + str(path_states(node))
        #print('Bi:', S1(frontier_f.top(), f_f), S1(frontier_b.top(), f_b))
        if f_f(frontier_f.top()) < f_b(frontier_b.top()):
            solution = proceed('f', problem_f, frontier_f, reached_f, reached_b, solution)
        else:
            solution = proceed('b', problem_b, frontier_b, reached_b, reached_f, solution)
    return solution

def proceed(direction, problem, frontier, reached, reached2, solution):
    node = frontier.pop()
    for child in expand(problem, node):
        s = child.state
        #print('proceed', direction, S(child))
        if s not in reached or child.path_cost < reached[s].path_cost:
            frontier.add(child)
            reached[s] = child
            if s in reached2: # Frontiers collide; solution found
                solution2 = (join_nodes(child, reached2[s]) if direction == 'f' else
                             join_nodes(reached2[s], child))
                #print('solution', path_states(solution2), solution2.path_cost, 
                # path_states(child), path_states(reached2[s]))
                if solution2.path_cost < solution.path_cost:
                    solution = solution2
    return solution

def join_nodes(nf, nb):
    """Join the reverse of the backward node nb to the forward node nf."""
    #print('join', S(nf), S(nb))
    join = nf
    while nb.parent is not None:
        cost = join.path_cost + nb.path_cost - nb.parent.path_cost
        join = Node(nb.parent.state, join, nb.action, cost)
        nb = nb.parent
        #print('  now join', S(join), 'with nb', S(nb), 'parent', S(nb.parent))
    return join

# Termination condition for bidirectional search
def terminated(solution, frontier_f, frontier_b):
    if frontier_f.empty() or frontier_b.empty():
        return True
    n_f, n_b = frontier_f.top(), frontier_b.top()
    return (n_f.path_cost + n_b.path_cost) >= solution.path_cost if solution else False


In [51]:
def inverse_problem(problem):
    if isinstance(problem, CountCalls):
        return CountCalls(inverse_problem(problem._object))
    else:
        inv = copy.copy(problem)
        inv.initial, inv.goal = inv.goal, inv.initial
        return inv
    
def bidirectional_uniform_cost_search(problem_f):
    def terminated(solution, frontier_f, frontier_b):
        n_f, n_b = frontier_f.top(), frontier_b.top()
        return g(n_f) + g(n_b) > g(solution)
    return bidirectional_best_first_search(problem_f, g, inverse_problem(problem_f), g, terminated)

def bidirectional_astar_search(problem_f):
    def terminated(solution, frontier_f, frontier_b):
        nf, nb = frontier_f.top(), frontier_b.top()
        return g(nf) + g(nb) > g(solution)
    problem_b = inverse_problem(problem_f)
    return bidirectional_best_first_search(problem_f, lambda n: g(n) + problem_f.h(n),
                                           problem_b, lambda n: g(n) + problem_b.h(n), 
                                           terminated)
   

In [53]:
# define the heuristic functions (using manhattan distance) for forward and backward search in MazeProblem

def manhattan_distance(state, goal):
    #put your code here
    x1, y1 = state
    x2, y2 = goal
    return abs(x1 - x2) + abs(y1 - y2)
    
def f_f(node):
    #put your code here
    return node.path_cost + manhattan_distance(node.state, maze_instance.goal)

def f_b(node):
    #put your code here
    return node.path_cost + manhattan_distance(node.state, maze_instance.initial)

    

In [55]:
# `bidirectional_uniform_cost_search` and `bidirectional_astar_search algorithms`

#put your code here
print("bidirectional_astar_search solution: ", path_states(bidirectional_uniform_cost_search(maze_instance)))
print("bidirectional_astar_search solution: ", path_states(bidirectional_astar_search(maze_instance)))


# example solution returned by path_states(): 
# bidirectional_astar_search solution: [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
# bidirectional_uniform_cost_search solution: [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]




bidirectional_astar_search solution:  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
bidirectional_astar_search solution:  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]


In [113]:
# Design an experiemnt to compare the runing time of search_3, search_4,...,search_8, 
# bidirectional_uniform_cost_search, and bidirectional_astar_search algorithms. 
# You may find python `timeit` function useful.

#put your code here
import timeit
runtimeResult= {}
microValue = 1000000 # for conversion of micro sec to sec.


for i in range(8): #looping for first 8 search functions
    if i <=1:
        print("search_",i+1," solutions : ", path_states(globals()[f"search_{i+1}"](maze_instance,g)))
        print("search_",i+1," runtime")
        runtime = %timeit -o globals()[f"search_{i+1}"](maze_instance,g)
    elif i == 6:
        print("search_",i+1," solutions : ", path_states(globals()[f"search_{i+1}"](maze_instance , limit=10)))
        print("search_",i+1," runtime")
        runtime = %timeit -o globals()[f"search_{i+1}"](maze_instance,limit=10)
    else:
        print("search_",i+1," solutions : ", path_states(globals()[f"search_{i+1}"](maze_instance)))
        print("search_",i+1," runtime")
        runtime = %timeit -o globals()[f"search_{i+1}"](maze_instance)
    print("")
    runtimeResult[f"search_{i+1}"] = runtime.average * microValue #storing first 8 seach algorithm and respective avg runtime pairs in dictionary

#testing 2 bidirectional searchs and storing respective runtime in dictionary
print("bidirectional_astar_search solution: ", path_states(bidirectional_uniform_cost_search(maze_instance)))
runtime= %timeit -o bidirectional_uniform_cost_search(maze_instance)
runtimeResult["Bidirectional_uniform_cost_search"] = runtime.average * microValue
print("")
print("bidirectional_astar_search solution: ", path_states(bidirectional_astar_search(maze_instance)))
runtime= %timeit -o bidirectional_astar_search(maze_instance)
runtimeResult["Bidirectional_astar_search"] = runtime.average * microValue
print("")

#sorting the dictionary in acending order based the value:runtime 
sortedRunTimeResult = sorted(runtimeResult.items(), key = lambda x:x[1])
print("Rank of runtime among different algorithms using Maze problem")

#printing the sorted dictionary to rank the search algorithm , accending in speed
numCount =1
for algorithm,alRuntime in sortedRunTimeResult:
    print(numCount," ", f"{algorithm} : " , f"{alRuntime:.2f} µs")
    numCount=numCount+1
    
#print best speed and worst speed search algorithm and respective avg runtime.
best_search, best_runtimeResult = sortedRunTimeResult[0]
worst_search, worst_runtimeResult = sortedRunTimeResult[len(sortedRunTimeResult)-1]
print("")
print(best_search, "has the fastest runtime among 10 searches with the average run time of ", f"{best_runtimeResult:.2f} µs" )
print(worst_search, "has the slowest runtime among 10 searches with the average run time of ", f"{worst_runtimeResult:.2f} µs" )


search_ 1  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 1  runtime
42.1 μs ± 3.11 μs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

search_ 2  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 2  runtime
67.4 μs ± 2.2 μs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

search_ 3  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 3  runtime
48.2 μs ± 2.88 μs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

search_ 4  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 3), (2, 4), (3, 4), (4, 4)]
search_ 4  runtime
56.6 μs ± 2.52 μs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

search_ 5  solutions :  [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 1), (2, 0), (3, 0), (4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]
search_ 5  runtime
57.2 μs ± 2.57 μs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

search_ 6