<html>
<div style="background-image: linear-gradient(to left, rgb(255, 255, 255), rgb(138, 136, 136)); width: 600px; vertical-align: middle; height: 40px; margin: 10px;">
<h1 style="font-family: Georgia; color: black;"> AI-Fall 01-CA1 </h1>
</div>
<div style="background-image: linear-gradient(to left, rgb(255, 255, 255), rgb(138, 136, 136)); width: 500px; margin: 10px;">
  <img src="https://upload.wikimedia.org/wikipedia/en/thumb/f/fd/University_of_Tehran_logo.svg/225px-University_of_Tehran_logo.svg.png" width=60px width=auto style="padding:10px; vertical-align: middle;">
  <span style="font-family: Georgia; font-size:30px; color: black;"> University of Tehran </span>
</div>
<div style=" background-image: linear-gradient(to left, rgb(255, 255, 255), rgb(138, 136, 136)); width: 400px; height: 30px; margin: 10px;">
  <span style="font-family: Georgia; font-size:15pt; color: black; vertical-align: middle;"> Saman Eslami Nazari - std id: 810199375 </span>
</div>
</html>

First, I make a `Graph` class to save the given problem's map.

In [1]:
from typing import Callable, Any, Optional

In [2]:
class Graph:

    class Node:
        def __init__(self, id: int):
            self.id = id
            self.adj_list = {}

        def add_edge(self, dest: int, cost: int) -> None:
            self.adj_list[dest] = cost

    def __init__(self, nodes_count: int):
        self.nodes = [
            self.Node(node_id) for node_id in range(0, nodes_count + 1)
        ]
        self.morids_recipe = {}
        self.recipes = set()
        self.special_nodes = set()

    def add_edge(self, u: int, v: int, cost: int) -> None:
        self.nodes[u].add_edge(v, cost)
        self.nodes[v].add_edge(u, cost)

    def add_morid_recipe(self, morid_node: int, morid_recipes: set) -> None:
        self.morids_recipe[morid_node] = morid_recipes
        self.recipes = self.recipes.union(morid_recipes)

    def add_special_node(self, special_node: int) -> None:
        self.special_nodes.add(special_node)

    def get_neighs(self, node: int) -> list[int]:
        return list(self.nodes[node].adj_list.keys())

    def get_edge_cost(self, u: int, v: int) -> int:
        return self.nodes[u].adj_list[v]

    def get_morids(self) -> set[int]:
        return set(self.morids_recipe.keys())

    def get_recipes(self) -> set[int]:
        return self.recipes


Next, I will create a `State` class that stores the required attributes, such as the current location of Seyyed and the status of the Dizies (the satisfaction of the Morids is the same as the status of the Dizies).
- `special_nodes`: A dictionary indicating how many times Seyyed has passed each special node up to the current state.
- `sat_morids`: A set containing the Morids that are satisfied, i.e. Seyyed reached them after collecting all of their recipes.
- `collected_recipes`: A set containing the gathered recipes.
- `current_loc`: An int showing the current location of Seyyed.
- `current_path`: A list of the graph nodes Seyyed has visited so far, in order.
- `current_time_on_node`: An int showing how long Seyyed has stayed on the current graph node. This attribute is used when Seyyed is passing a special node.
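
Because states are later stored in sets, two states that compare equal also need to hash equally. The following is a minimal sketch of one way to build such a key from the attributes listed above. The function name `state_key` is hypothetical and not part of the notebook, and the node numbers are made up.

```python
def state_key(current_loc, sat_morids, collected_recipes, current_time_on_node):
    """Build a hashable, order-independent key from the state attributes."""
    # frozenset ignores insertion order, so equal sets always give equal keys.
    return (
        current_loc,
        frozenset(sat_morids),
        frozenset(collected_recipes),
        current_time_on_node,
    )


# Two states that collected the same recipes in a different order.
key_a = state_key(4, {7}, {5, 13}, 0)
key_b = state_key(4, {7}, {13, 5}, 0)
print(key_a == key_b, hash(key_a) == hash(key_b))  # prints: True True
```

The path and the special-node counters are left out of the key here, mirroring the attributes compared in `__eq__`.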

In [3]:
class State:
    def __init__(
        self,
        special_nodes: dict,
        sat_morids: set,
        collected_recipes: set,
        current_loc: int,
        current_path: list[int],
        current_time_on_node: int
    ):
        self.special_nodes: dict[int, int] = special_nodes
        self.sat_morids: set[int] = sat_morids
        self.collected_recipes: set[int] = collected_recipes
        self.current_loc: int = current_loc
        self.current_path: list[int] = current_path
        self.current_time_on_node: int = current_time_on_node

    def __eq__(self, __o: object) -> bool:
        return (
            self.current_loc == __o.current_loc and
            self.sat_morids == __o.sat_morids and
            self.collected_recipes == __o.collected_recipes and
            self.current_time_on_node == __o.current_time_on_node
        )

    def __str__(self) -> str:
        return (
            str(self.current_loc) +
            str(self.sat_morids) +
            str(self.collected_recipes) +
            str(self.current_time_on_node)
        )

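    def __hash__(self) -> int:
        # Hash an order-independent key: str() of two equal sets can differ,
        # which would give equal states different hashes.
        return hash((
            self.current_loc,
            frozenset(self.sat_morids),
            frozenset(self.collected_recipes),
            self.current_time_on_node,
        ))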


The `SearchNode` class pairs a state with the cost of the path that reached it. The search algorithms below use it.

In [4]:
class SearchNode:
    def __init__(self, state: State, path_cost: int):
        self.state = state
        self.path_cost = path_cost
    
    def __eq__(self, __o: object) -> bool:
        return self.state == __o.state

    def __hash__(self) -> int:
        return hash(self.state)

The `Problem` class contains the `Graph`, a `get_next_states` method that returns the successor states (each with its step cost) according to the rules of the problem, an `is_goal` method that checks whether a given state is a goal, and a `get_init_state` method.
- `get_next_states`: This method first checks whether Seyyed is currently on a special node. If so, it checks whether he can leave the node or still has to wait. If he has to wait, it returns a single state identical to the current one but with `current_time_on_node` incremented. Otherwise it records this pass in the state's `special_nodes` dictionary and continues as for an ordinary node: it returns one state per adjacent graph node, in which Seyyed has moved to that node. Every returned state has a step cost of 1.
- `is_goal`: Returns true if the problem is solved, which in this particular problem means that every Morid is satisfied.
- `get_init_state`: Returns the very first state, which every search algorithm uses as its starting state. Every special node has a count of zero because Seyyed hasn't passed any of them yet. The sets of satisfied Morids and collected recipes are empty because no recipe has been gathered and therefore no Morid is satisfied. Seyyed's initial location is stored in the `Problem` class and used as the `current_loc` of this state. The path contains only that location because he hasn't moved yet, and the time on the current node is zero because he has just started his search.
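
The waiting rule for special nodes can be sketched in isolation. This is a simplified stand-in for `get_next_states` that ignores recipes and Morids. The names `successors`, `waited`, `passes`, `toy_adj` and `toy_special` are hypothetical, and the three-node map is made up.

```python
def successors(loc, waited, passes, adj, special):
    """Return (next_loc, next_waited, next_passes) triples; each costs one step."""
    # On a special node Seyyed waits one step per earlier pass of that node.
    if loc in special and waited < passes.get(loc, 0):
        return [(loc, waited + 1, dict(passes))]

    next_passes = dict(passes)
    if loc in special:
        # Leaving the node counts as one more pass.
        next_passes[loc] = next_passes.get(loc, 0) + 1

    # Moving resets the time spent on the node.
    return [(neighbor, 0, dict(next_passes)) for neighbor in adj[loc]]


# Toy map 1 - 2 - 3 where node 2 is special.
toy_adj = {1: [2], 2: [1, 3], 3: [2]}
toy_special = {2}

print(successors(2, 0, {2: 0}, toy_adj, toy_special))  # first pass: leave at once
print(successors(2, 0, {2: 1}, toy_adj, toy_special))  # second pass: wait one step
print(successors(2, 1, {2: 1}, toy_adj, toy_special))  # waited enough: leave
```

Under this rule the k-th pass of a special node costs k - 1 extra unit steps, so every transition still has cost 1.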

In [5]:
from copy import deepcopy

class Problem:
    def __init__(self, graph: Graph, seyyed_init_loc: int):
        self._graph = graph
        self._seyyed_init_loc = seyyed_init_loc

    def get_next_states(self, current_state: State) -> list[tuple[State, int]]:
        if (
            current_state.current_loc in self._graph.special_nodes and
            current_state.current_time_on_node < current_state.special_nodes[
                current_state.current_loc]
        ):
            new_state = deepcopy(current_state)
            new_state.current_time_on_node += 1
            return [(new_state, 1)]

        if (current_state.current_loc in self._graph.special_nodes):
            self._update_special_nodes(current_state)

        result_states = []

        for adj_node in self._graph.get_neighs(current_state.current_loc):
            new_state = deepcopy(current_state)
            new_state.current_loc = adj_node

            # update gathered recipes
            self._update_gathered_recipes(new_state)

            # check and update Morids sat status
            self._update_morids_state(new_state)

            # update current path
            new_state.current_path.append(adj_node)

            # update current time
            new_state.current_time_on_node = 0

            result_states.append((new_state, 1))

        return result_states

    def is_goal(self, state: State) -> bool:
        return state.sat_morids == set(self._graph.morids_recipe.keys())

    def get_init_state(self) -> State:
        return State(
            {x: 0 for x in self._graph.special_nodes},
            set(),
            set(),
            self._seyyed_init_loc,
            [self._seyyed_init_loc],
            0
        )

    def get_morids(self) -> set[int]:
        return self._graph.get_morids()

    def get_recipes(self) -> set[int]:
        return self._graph.get_recipes()

    def _update_special_nodes(self, state: State) -> None:
        if state.current_loc in self._graph.special_nodes:
            state.special_nodes[state.current_loc] += 1

    def _update_morids_state(self, state: State) -> None:
        if state.current_loc in self._graph.morids_recipe:
            if self._graph.morids_recipe[state.current_loc].issubset(
                state.collected_recipes
            ):
                state.sat_morids.add(state.current_loc)

    def _update_gathered_recipes(self, state: State) -> None:
        if state.current_loc in self._graph.recipes:
            state.collected_recipes.add(state.current_loc)


Now let's create a `bfs_solve` function that uses the above entities to solve the problem. <br>
This algorithm expands the search tree one level per iteration. It also checks for repeated states so that it won't get stuck in a loop or exhaust the memory. It uses more memory than the other solutions because it has to keep every frontier node plus all explored nodes to check for repeated states. In this particular problem BFS is optimal because every step has the same cost (special nodes are the exception, and I modeled waiting on them as separate unit-cost steps in the previous part so that BFS and the other algorithms stay optimal).
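
The level-by-level expansion described above can be sketched on a plain unweighted graph. This is a minimal illustration, not the solver used in this notebook: it assumes a `collections.deque` as the queue, and the function name `bfs_shortest_path` and the map `toy_map` are hypothetical.

```python
from collections import deque


def bfs_shortest_path(adj, start, goal):
    """Return a shortest path from start to goal in an unweighted graph, or None."""
    if start == goal:
        return [start]
    frontier = deque([[start]])  # queue of paths, oldest (shallowest) first
    seen = {start}               # every node that was already generated
    while frontier:
        path = frontier.popleft()
        for neighbor in adj[path[-1]]:
            if neighbor in seen:
                continue  # repeated state, skip it
            seen.add(neighbor)
            new_path = path + [neighbor]
            if neighbor == goal:
                return new_path
            frontier.append(new_path)
    return None


toy_map = {1: [2, 3], 2: [1, 4], 3: [1, 4], 4: [2, 3, 5], 5: [4]}
print(bfs_shortest_path(toy_map, 1, 5))  # prints: [1, 2, 4, 5]
```

`deque.popleft()` takes constant time, whereas `list.pop(0)` shifts every remaining element.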

In [6]:
def bfs_solve(problem: Problem) -> tuple[list[int], int, int]:
    node = SearchNode(problem.get_init_state(), 0)
    states_cnt: int = 0
    states_cnt += 1
    if problem.is_goal(node.state):
        return node.state.current_path, node.path_cost, states_cnt
    frontier_queue: list[SearchNode] = [node]
    explored_list: set[SearchNode] = set()
    while True:
        if not frontier_queue:
            return [], 0, states_cnt

        node = frontier_queue.pop(0)
        for next_state in problem.get_next_states(node.state):
            new_node = SearchNode(next_state[0], node.path_cost + next_state[1])
            if new_node in explored_list or new_node in frontier_queue:
                continue
            states_cnt += 1
            if problem.is_goal(new_node.state):
                return new_node.state.current_path, new_node.path_cost, states_cnt
            frontier_queue.append(new_node)
        explored_list.add(node)


It's time to get inputs and test the `bfs_solve`.

In [7]:
def get_inputs(inputs_file_path: str) -> Problem:
    f = open(inputs_file_path)

    n, m = [int(x) for x in f.readline().split()]
    graph = Graph(n)

    for i in range(m):
        u, v = [int(x) for x in f.readline().split()]
        graph.add_edge(u, v, 1)
    
    h = int(f.readline())
    u_list = [int(u) for u in f.readline().split()]
    for u in u_list:
        graph.add_special_node(u)

    s = int(f.readline())
    for i in range(s):
        current_line = [int(x) for x in f.readline().split()]
        current_morid_recipes = set(current_line[2:])
        graph.add_morid_recipe(current_line[0], current_morid_recipes)
    
    init_loc = int(f.readline())
    f.close()
    return Problem(graph, init_loc)

Now let's run tests on the `bfs_solve`.

In [8]:
TEST_CASE = "test_cases/input3.txt"
problem = get_inputs(TEST_CASE)
path, path_cost, states_cnt = bfs_solve(problem)
print(f"path : {path}\ncost : {path_cost}\nstates count: {states_cnt}")

path : [13, 11, 10, 3, 2, 6, 12, 5, 9, 4, 1, 13, 11, 10]
cost : 13
states count: 1052


Next, I will solve the problem again with Iterative Deepening Search (IDS). For this purpose, I first write a `depth_limited_search` function that searches with a fixed depth limit. <br>
This function expands one node at a time, building a single branch until it reaches a dead end, a solution or the depth limit. If no solution is found in a branch, the algorithm goes back one node and expands the remaining successors. In other words, it is DFS with a depth limit. Another function calls it with a given depth limit; if it succeeds, we return the solution, otherwise we run the DFS again with a larger limit. This continues until a solution is found. <br>
IDS uses less memory than BFS but takes much longer to solve the problem. One reason is that every new depth limit repeats the work of the shallower searches. The other is that IDS can check a state for repetition only against the states on the current branch, whereas BFS checks it against everything explored so far. A global explored set would not be safe here: IDS may first reach a state deep in a branch that does not lead to the answer, and if that state stayed in the explored set, a later and shallower path through the same state would be rejected as a repetition and never expanded. On the positive side, keeping only the current branch and discarding the rest of the search tree saves a lot of memory.
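
The idea can be sketched on a plain unweighted graph. This is a simplified illustration under my own assumptions, not the implementation used in this notebook: it tests for the goal before testing the limit, and the names `depth_limited`, `iterative_deepening` and `ids_toy_map` are hypothetical.

```python
def depth_limited(adj, node, goal, limit, path):
    """Return (path or None, cut_off) for a DFS that stops at the depth limit."""
    if node == goal:
        return path, False
    if limit == 0:
        return None, True  # the limit stopped us, deeper nodes may exist
    cut_off = False
    for neighbor in adj[node]:
        if neighbor in path:  # only the current branch is remembered
            continue
        found, child_cut = depth_limited(
            adj, neighbor, goal, limit - 1, path + [neighbor]
        )
        if found is not None:
            return found, False
        cut_off = cut_off or child_cut
    return None, cut_off


def iterative_deepening(adj, start, goal):
    """Run depth-limited DFS with limits 0, 1, 2, ... until it is conclusive."""
    limit = 0
    while True:
        found, cut_off = depth_limited(adj, start, goal, limit, [start])
        if found is not None or not cut_off:
            return found  # a path, or None when no limit was hit
        limit += 1


ids_toy_map = {1: [2, 3], 2: [1, 4], 3: [1, 4], 4: [2, 3, 5], 5: [4], 6: []}
print(iterative_deepening(ids_toy_map, 1, 5))  # prints: [1, 2, 4, 5]
print(iterative_deepening(ids_toy_map, 1, 6))  # prints: None
```

The only state kept between recursive calls is the current branch in `path`, which is where the memory saving comes from.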

In [9]:
# returns path list, a boolean indicating cut-off status, path cost and the states count
def depth_limited_search_rec(
    search_node: SearchNode, problem: Problem, explored: set[SearchNode], limit: int
) -> tuple[list[int], bool, int, int]:
    states_cnt = 1
    if limit <= 0:
        return [], True, 0, states_cnt

    if problem.is_goal(search_node.state):
        return search_node.state.current_path, False, search_node.path_cost, states_cnt

    explored.add(search_node)
    cut_off_occurred = False
    for next_state in problem.get_next_states(search_node.state):
        new_node = SearchNode(next_state[0], search_node.path_cost + next_state[1])
        if new_node in explored:
            continue
        result, cut_off, path_cost, temp_states_cnt = depth_limited_search_rec(
            new_node,
            problem,
            explored,
            limit - 1
        )
        states_cnt += temp_states_cnt
        if cut_off:
            cut_off_occurred = True
        if result:
            return result, False, path_cost, states_cnt

    explored.remove(search_node)
    return [], cut_off_occurred, 0, states_cnt


def depth_limited_search(problem: Problem, limit: int):
    init_node = SearchNode(problem.get_init_state(), 0)
    return depth_limited_search_rec(init_node, problem, {init_node}, limit)


In [10]:
def ids_solve(problem: Problem) -> tuple[list[int], int, int]:
    limit = 0
    states_cnt = 0
    result, cut_off, path_cost, temp_states_cnt = depth_limited_search(
        problem, limit
    )
    states_cnt += temp_states_cnt
    while cut_off:
        limit += 1
        result, cut_off, path_cost, temp_states_cnt = depth_limited_search(
            problem, limit
        )
        states_cnt += temp_states_cnt
    return result, path_cost, states_cnt


In [11]:
problem = get_inputs(TEST_CASE)
path, path_cost, states_cnt = ids_solve(problem)
print(f"path: {path}\ncost: {path_cost}\nstates count: {states_cnt}")

path: [13, 11, 10, 3, 2, 6, 12, 5, 9, 4, 1, 13, 11, 10]
cost: 13
states count: 50842


Now let's create a solver using the A* method. First we need a heuristic function. For every state, I estimate its distance to the goal state by the number of nodes that are either an unsatisfied Morid or a recipe that is yet to be gathered (the size of the union of the two sets).

In [12]:
def get_h(state: State, problem: Problem) -> int:
    left_recipes = problem.get_recipes() - state.collected_recipes
    left_morids = problem.get_morids() - state.sat_morids
    return (
        len(left_recipes.union(left_morids))
    )


Proof of consistency:
This heuristic function is equal to $h(n) = |unsatisfied\space morids \cup uncollected\space recipes|$.
To prove that this function is consistent, we must prove that for every successor $P$ of $N$ the following inequality is true:
$$h(N)\leq c(N,P)+h(P)$$
In every step Seyyed either waits on a special node or moves to one adjacent node. Waiting changes neither set, so $h(P) = h(N)$. Moving can collect the recipe on the new node or satisfy the Morid on the new node, so it removes at most that single node from the union, and it can never dissatisfy a Morid or remove a recipe. Therefore for every successor $P$ of $N$ we have $h(N) - h(P) \leq 1$. We also know that every step costs $c(N,P) = 1$.
So $h(N)\leq c(N,P)+h(P)$ holds for every step in the search, and the heuristic function is consistent. <br>
A* generally performs better than the previous algorithms in terms of memory and time consumption. This is because it expands fewer states and is directed towards the goal state.
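
The consistency inequality can be spot-checked on a single step. This is a minimal sketch on a made-up instance, not one of the test cases. The standalone function `h` mirrors the formula above, and the node numbers are hypothetical.

```python
def h(all_recipes, all_morids, collected, satisfied):
    """Number of nodes that still hold an uncollected recipe or an unsatisfied Morid."""
    return len((all_recipes - collected) | (all_morids - satisfied))


# Hypothetical instance: recipes on nodes 2 and 3, one Morid on node 5.
recipes, morids = {2, 3}, {5}

h_before = h(recipes, morids, collected=set(), satisfied=set())
h_after = h(recipes, morids, collected={2}, satisfied=set())  # after stepping onto node 2
step_cost = 1

print(h_before, h_after, h_before <= step_cost + h_after)  # prints: 3 2 True
```

Once every recipe is collected and every Morid is satisfied, both set differences are empty and `h` returns 0, as a consistent heuristic must at a goal.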

In [13]:
import heapq

# wrapper class for priority queue
class FSearchNodeWrap:
    def __init__(self, f: int, node: SearchNode):
        self.f = f
        self.node = node

    def __lt__(self, other):
        return self.f < other.f


def a_star_solve(problem: Problem, alpha: float = 1) -> tuple[list[int], int, int]:
    def f(n: SearchNode) -> float:
        return n.path_cost + alpha * get_h(n.state, problem)
        
    states_cnt = 0
    init_node = SearchNode(problem.get_init_state(), 0)

    frontier_p_queue: list[FSearchNodeWrap] = []
    heapq.heappush(frontier_p_queue, FSearchNodeWrap(f(init_node), init_node))

    explored_set: set[SearchNode] = set()

    while frontier_p_queue:
        current_node: SearchNode = heapq.heappop(frontier_p_queue).node
        states_cnt += 1
        explored_set.add(current_node)
        if problem.is_goal(current_node.state):
            return current_node.state.current_path, current_node.path_cost, states_cnt
        for next_state in problem.get_next_states(current_node.state):
            new_node = SearchNode(
                next_state[0], current_node.path_cost + next_state[1]
            )
            if new_node in explored_set:
                continue
            heapq.heappush(frontier_p_queue,
                           FSearchNodeWrap(f(new_node), new_node))

    return [], 0, states_cnt


In [14]:
problem = get_inputs(TEST_CASE)
path, path_cost, states_cnt = a_star_solve(problem)
print(f"path: {path}\ncost: {path_cost}\nstates count: {states_cnt}")


path: [13, 11, 10, 3, 2, 6, 12, 5, 9, 4, 1, 13, 11, 10]
cost: 13
states count: 479


To fully test each implementation, I will write a function that runs each test case 3 times on the given implementation and prints out the result. I will also define some colors for enhanced readability on the output.
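
The averaging step can also be sketched with a callable passed to `timeit` instead of a code string. This is an alternative illustration, not the approach used in the cell below. The helper name `average_runtime` is hypothetical, and `sorted` only stands in for a solver.

```python
from timeit import timeit


def average_runtime(func, *args, repeats=3):
    """Average wall-clock seconds per call of func(*args) over `repeats` runs."""
    # timeit accepts a zero-argument callable and returns the total time in seconds.
    total = timeit(lambda: func(*args), number=repeats)
    return total / repeats


avg = average_runtime(sorted, [3, 1, 2], repeats=3)
print(f"run time avg : {avg:.6f}")
```

Passing a callable avoids building source strings and importing names from `__main__`.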

In [15]:
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


In [16]:
from timeit import timeit
from textwrap import dedent


# imp is a function that gets a Problem and any other required args as input
# and returns the path, cost and states count of the solution
def imp_test(
    imp: Callable[..., tuple[list[int], int, int]], test_cases: list[str], *args: Any
) -> None:

    print(f"{bcolors.OKCYAN}testing {imp.__name__}{bcolors.ENDC}:")
    for test_case in test_cases:
        SETUP = f"from __main__ import {imp.__name__}, get_inputs, Problem"
        CODE = f'''
        problem = get_inputs("{test_case}")
        {imp.__name__}({",".join(["problem"] + [str(x) for x in args])})
        '''

        run_time = timeit(setup=dedent(SETUP), stmt=dedent(CODE), number=3) / 3

        problem: Problem = get_inputs(test_case)
        path, path_cost, states_cnt = imp(*(problem, *args))

        print(dedent(
            f'''
            {bcolors.BOLD}{test_case}{bcolors.ENDC}:
                path : {"->".join([str(x) for x in path])}
                path cost : {path_cost}
                states count : {states_cnt}
                run time avg : {run_time}'''
        ))

In [17]:
TEST_CASE_LIST = ["test_cases/input.txt", "test_cases/input2.txt", "test_cases/input3.txt"]

print(f"{bcolors.HEADER}BFS{bcolors.ENDC}")
imp_test(bfs_solve, TEST_CASE_LIST)
print(f"{bcolors.HEADER}IDS{bcolors.ENDC}")
imp_test(ids_solve, TEST_CASE_LIST)
print(f"{bcolors.HEADER}A*{bcolors.ENDC}")
imp_test(a_star_solve, TEST_CASE_LIST)
print(f"{bcolors.HEADER}weighted A* with alpha=1{bcolors.ENDC}")
imp_test(a_star_solve, TEST_CASE_LIST, 1)
print(f"{bcolors.HEADER}weighted A* with alpha=2{bcolors.ENDC}")
imp_test(a_star_solve, TEST_CASE_LIST, 2)

[95mBFS[0m
[96mtesting bfs_solve[0m:

[1mtest_cases/input.txt[0m:
    path : 1->3->4->5->7->10->11->9->8
    path cost : 8
    states count : 35
    run time avg : 0.003716199998355781

[1mtest_cases/input2.txt[0m:
    path : 9->10->9->4->12->3->7->5->8
    path cost : 8
    states count : 90
    run time avg : 0.006973933336363795

[1mtest_cases/input3.txt[0m:
    path : 13->11->10->3->2->6->12->5->9->4->1->13->11->10
    path cost : 13
    states count : 1052
    run time avg : 0.11172299999937725
[95mIDS[0m
[96mtesting ids_solve[0m:

[1mtest_cases/input.txt[0m:
    path : 1->3->4->5->7->10->11->9->8
    path cost : 8
    states count : 932
    run time avg : 0.03563509999852007

[1mtest_cases/input2.txt[0m:
    path : 9->10->9->4->12->3->7->5->8
    path cost : 8
    states count : 2153
    run time avg : 0.09331706666853279

[1mtest_cases/input3.txt[0m:
    path : 13->11->10->3->2->6->12->5->9->4->1->13->11->10
    path cost : 13
    states count : 50842
    run

Now I will fill the table with the gathered information.

test case 1 : input.txt

| |Problem answer (cost to reach the final state)|States count|Run time avg (s)|
|:---|:----:|:----:|:----:|
|BFS|8|35|0.0103|
|IDS|8|932|0.1463|
|A*|8|36|0.0106|
|Weighted A* 1|8|36|0.0133|
|Weighted A* 2|8|22|0.0085|

test case 2 : input2.txt

| |Problem answer (cost to reach the final state)|States count|Run time avg (s)|
|:---|:----:|:----:|:----:|
|BFS|8|90|0.0216|
|IDS|8|2153|0.3695|
|A*|8|48|0.0115|
|Weighted A* 1|8|48|0.0148|
|Weighted A* 2|8|16|0.0063|

test case 3 : input3.txt

| |Problem answer (cost to reach the final state)|States count|Run time avg (s)|
|:---|:----:|:----:|:----:|
|BFS|13|1052|0.4572|
|IDS|13|50842|9.0233|
|A*|13|479|0.1385|
|Weighted A* 1|13|479|0.1385|
|Weighted A* 2|14|31|0.0101|
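
For test case 3, weighted A* with alpha=2 returns a cost of 14 while the other solvers return 13. The sketch below shows why this can happen. It uses a made-up four-node weighted graph that is not one of the test cases. Multiplying the heuristic by alpha > 1 can make $f$ overestimate the remaining cost, so a more expensive path to the goal may be popped first. The function name `weighted_a_star`, the graph `wa_graph` and the heuristic values `wa_h` are all hypothetical.

```python
import heapq


def weighted_a_star(graph, h, start, goal, alpha=1.0):
    """Return the cost of the first goal path popped when f = g + alpha * h."""
    frontier = [(alpha * h[start], 0, start)]  # entries are (f, g, node)
    best_g = {start: 0}
    while frontier:
        _, g, node = heapq.heappop(frontier)
        if g > best_g.get(node, float("inf")):
            continue  # stale entry, a cheaper path to this node was found later
        if node == goal:
            return g
        for neighbor, cost in graph[node]:
            new_g = g + cost
            if new_g < best_g.get(neighbor, float("inf")):
                best_g[neighbor] = new_g
                heapq.heappush(
                    frontier, (new_g + alpha * h[neighbor], new_g, neighbor)
                )
    return None


# Directed toy graph: S->B->G costs 1 + 3 = 4 (optimal), S->A->G costs 4 + 1 = 5.
wa_graph = {"S": [("B", 1), ("A", 4)], "A": [("G", 1)], "B": [("G", 3)], "G": []}
wa_h = {"S": 4, "A": 1, "B": 3, "G": 0}  # consistent for alpha = 1

print(weighted_a_star(wa_graph, wa_h, "S", "G", alpha=1))  # prints: 4
print(weighted_a_star(wa_graph, wa_h, "S", "G", alpha=2))  # prints: 5
```

With alpha=2 the node `B` gets $f = 1 + 2 \cdot 3 = 7$ while `A` gets $f = 4 + 2 \cdot 1 = 6$, so the search commits to the more expensive route through `A`.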