<html>
<div>
  <img src="https://www.engineersgarage.com/wp-content/uploads/2021/11/TCH36-01-scaled.jpg" width=360px width=auto style="vertical-align: middle;">
  <span style="font-family: Georgia; font-size:30px; color: white;"> <br/> University of Tehran <br/> AI_CA1 <br/> Spring 02 </span>
</div>
<span style="font-family: Georgia; font-size:15pt; color: white; vertical-align: middle;"> low_mist - std id: 810100186 </span>
</html>

In this notebook we solve a search problem with several uninformed and informed search algorithms: BFS, IDS, A*, and weighted A*.

First, we define the directory that holds the test files the algorithms will run on.

In [610]:
TEST_DIRECTORY = "Tests/"

## Graph and States
First we define the graph components.  
`Node` stores its edges and its type (normal, pizza, or student).  
`Edge` stores its destination, id, and type; a rickety edge also stores `wait_for`, the time that must pass after a crossing before the edge is safe to cross again.  
Each agent state additionally keeps a dictionary mapping rickety edge ids to the time through which they stay unsafe.
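
The waiting rule for rickety edges can be sketched in isolation. This is a minimal illustration, assuming a single `wait_for` value per edge and the same strict comparison that `Agent.actions` uses later in the notebook; the names `try_cross` and `cooldowns` are hypothetical and do not appear in the notebook.

```python
def try_cross(available_at: dict[int, int], edge_id: int, now: int, wait_for: int) -> bool:
    """Return True and start the cooldown if the rickety edge may be crossed at time `now`."""
    if edge_id not in available_at or available_at[edge_id] < now:
        available_at[edge_id] = now + wait_for  # edge stays unsafe through this time
        return True
    return False  # still cooling down: the agent stays where it is


cooldowns: dict[int, int] = {}  # hypothetical per-state bookkeeping
print(try_cross(cooldowns, edge_id=7, now=1, wait_for=3))  # True: first crossing
print(try_cross(cooldowns, edge_id=7, now=3, wait_for=3))  # False: unsafe through t=4
print(try_cross(cooldowns, edge_id=7, now=5, wait_for=3))  # True: 4 < 5
```

A blocked attempt still costs one time step in the notebook's model, which is why the agent can "wait" by trying to cross.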

In [611]:
from dataclasses import dataclass, field, fields, _MISSING_TYPE
from typing import Any, Callable
from enum import Flag, auto

class EdgeType(Flag):
    NORMAL = auto()
    RICKETY = auto()

@dataclass
class Edge:
    destination: int
    id: int
    # plain defaults: default_factory must be a callable, not a value
    type: EdgeType = EdgeType.NORMAL
    wait_for: int = 0
    weight: int = 0
    
class NodeType(Flag):
    NORMAL = auto()
    PIZZA = auto()
    STUDENT = auto()

@dataclass
class Node:
    edges: list[Edge]
    type: NodeType

Now that we have edges and nodes, we define the graph. Compared with a plain graph it carries some extra information: the list of students, the map from each pizza to its student, and the delivery priorities between students.

In [612]:
class Graph:
    '''this class is only the graph connection of problem not agent's state'''
    def __init__(self, n : int):
        self.num_of_nodes = n
        self.nodes = [Node([], NodeType.NORMAL) for _ in range(n)]
        self.pizzas_corresponding_students: dict[int, int] = dict()
        self.priority_queues: list[tuple[int, int]] = list()
        self.students: list[int] = list()
        
    def add_edges(self, origin: int, destination: int, type: EdgeType, id: int, wait_for: int = 0, weight: int = 0): 
        self.nodes[origin].edges.append(Edge(destination, id, type, wait_for, weight)) 
        self.nodes[destination].edges.append(Edge(origin, id, type, wait_for, weight))
        
    def add_pizza_and_student(self, student_position: int, pizza_position: int):
        self.pizzas_corresponding_students[pizza_position] = student_position
        self.students.append(student_position)
        self.nodes[pizza_position].type = NodeType.PIZZA
        self.nodes[student_position].type = NodeType.STUDENT
        
    def add_priority(self, priority: tuple[int, int]):
        self.priority_queues.append((self.students[priority[0]], self.students[priority[1]]))

Next we describe the states the agent can be in during the search.  
The `AgentState` class holds information about the world, such as the agent's position and the students still waiting, and about the agent itself, such as the path it has taken so far.  

The properties of a state fall into two groups:
* properties that determine uniqueness
    * `agent_position`: the agent's current position.
    <!-- * `left_pizzas`: which depicts the left pizzas position and their corresponding student positions. -->
    * `left_students`: the positions of the students not yet served, mapped to the positions of their pizzas.
    * `time_elapsed`: the time (in seconds) that has passed since the search started.
    <!-- * `pizzas_carrying`: a dictionary containing the pizzas that we are carrying right now with their destination (note: it's said in problem that we can only carry one pizza at a time but we imagine that we can carry all of them and if we decide to deliver some pizza we will drop all other pizzas). -->
    * `pizza_carrying`: a tuple of the carried pizza's position and the student it must be delivered to (`(None, None)` when nothing is carried).
* properties that do not affect uniqueness
    * `path`: the path taken to reach this state.
    * `satisfied_students`: the students who have already received their pizzas.
    * `left_priorities`: the priorities that are still pending and must be respected.
    * `available_time_edges`: a map from each rickety edge's id to the time through which it stays unsafe to cross.
    <!-- * `pizzas_passed`: number of pizzas that we have passed -->
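
The split between identity and bookkeeping properties can be sketched as follows. This is a minimal illustration, not the notebook's class: `SketchState` and `key` are hypothetical names, and freezing the dictionary with `frozenset` is one assumed way to make it hashable.

```python
from dataclasses import dataclass, field


@dataclass
class SketchState:  # hypothetical stand-in for the notebook's state class
    agent_position: int
    left_students: dict[int, int]
    time_elapsed: int = 0
    pizza_carrying: tuple = (None, None)
    path: list[int] = field(default_factory=list)  # bookkeeping only, not part of identity

    def key(self) -> tuple:
        # dicts are unhashable, so freeze the items into a frozenset
        return (self.agent_position, frozenset(self.left_students.items()),
                self.time_elapsed, self.pizza_carrying)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, SketchState) and self.key() == other.key()

    def __hash__(self) -> int:
        return hash(self.key())


a = SketchState(2, {3: 2, 4: 1}, time_elapsed=2, path=[0, 1, 2])
b = SketchState(2, {4: 1, 3: 2}, time_elapsed=2, path=[0, 3, 2])
print(a == b, len({a, b}))  # True 1: the differing paths do not matter
```

Because `path` is left out of the key, two routes that reach the same situation at the same time collapse into one entry of a visited set.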

In [613]:
@dataclass 
class AgentState:
    agent_position: int
    left_students: dict[int, int]
    left_priorities: list[tuple[int, int]]
    time_elapsed: int = field(default_factory = lambda : 0)
    pizza_carrying : tuple[int, int] = field(default_factory = lambda: (None, None))
    path: list[int] = field(default_factory = lambda: list())
    available_time_edges: dict[int, int] = field(default_factory = lambda: dict())
    
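    def _tuple(self):
        # dicts are unhashable, so freeze left_students before hashing or comparing
        return (self.agent_position, frozenset(self.left_students.items()), self.time_elapsed, self.pizza_carrying)
    
    def __eq__(self, other: Any) -> bool:
        # compare the identity tuples themselves; _tuple must be called, not referenced
        return isinstance(other, AgentState) and self._tuple() == other._tuple()
    
    def __hash__(self) -> int:
        return hash(self._tuple())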
    
    def __str__(self) -> str:
        return f"agent_postition: {self.agent_position}, time: {self.time_elapsed}, pizza_carrying: {self.pizza_carrying}, left_priority: {self.left_priorities}, path: {self.path}, left_students: {self.left_students}"

Next we define some helper functions for the agent.  
- `has_reached_goal`: checks whether every student has been served, i.e. no student is left.
- `initial_state`: builds the state the agent is in at the start of the game.
- `actions`: takes a state and returns the list of all states reachable from it in one step.

In [614]:
from copy import deepcopy

class Agent:
    @staticmethod
    def has_reached_goal(state: AgentState) -> bool:
        return len(state.left_students) == 0

    @staticmethod
    def initial_state(graph: Graph, start_position: int) -> AgentState:
        return AgentState( 
            agent_position = start_position,
            left_priorities = graph.priority_queues,
            left_students = {value: key for key, value in graph.pizzas_corresponding_students.items()},
            path = [start_position])  
              
    @staticmethod
    def actions(graph: Graph, state: AgentState) -> list[AgentState]:
        ans = list()
        for edge in graph.nodes[state.agent_position].edges:
            new_state = deepcopy(state)
            new_agent_position = edge.destination
            new_state.time_elapsed += 1
            # print(f"edge type from {state.agent_position} to {edge.destination}: {edge.type}")
            if edge.type == EdgeType.RICKETY:
                if not edge.id in new_state.available_time_edges or \
                    new_state.available_time_edges[edge.id] < new_state.time_elapsed:
                    new_state.available_time_edges[edge.id] = new_state.time_elapsed + edge.wait_for # we cross the edge so we update the time 
                else:
                    new_agent_position = state.agent_position # cause we won't move
            

            new_state.path.append(new_agent_position)
            new_state.agent_position = new_agent_position
            
            if new_agent_position != state.agent_position:            
                if graph.nodes[new_agent_position].type == NodeType.STUDENT:
                    pizza, student = new_state.pizza_carrying
                    # deliver only when the carried pizza belongs to the student at this node
                    if student == new_agent_position and student in new_state.left_students:
                        del new_state.left_students[student]
                        new_state.left_priorities = list(filter(lambda x: x[0] != student, new_state.left_priorities))
                        new_state.pizza_carrying = (None, None)
                        
                elif graph.nodes[new_agent_position].type == NodeType.PIZZA:
                    if graph.pizzas_corresponding_students[new_agent_position] in new_state.left_students and \
                        not any(x[1] == new_agent_position for x in new_state.left_priorities):
                        another_state = deepcopy(new_state)
                        another_state.pizza_carrying = (new_agent_position, graph.pizzas_corresponding_students[new_agent_position])
                        # print(another_state)
                        ans.append(another_state)
                        # print(ans[-1])
                        
            ans.append(new_state)
            
        print("-----------------------------------------------")
        print(*ans, sep="****************")
        print("-----------------------------------------------")
        return ans

# Running algorithms
We now have all the building blocks, so it is time to read the input files and implement the searches.  
Note that the test files live in the `Tests` directory.  
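
The layout of a test file, as the parser in this section reads it, can be sketched on a small sample. The sample values and the helper name `split_sections` are hypothetical; only the order of the sections follows the notebook's `read_input`.

```python
def split_sections(lines: list[str]) -> dict:
    """Split test-file lines into sections, converting 1-based ids to 0-based."""
    it = iter(lines)
    num_nodes, num_edges = map(int, next(it).split())
    edges = [tuple(int(x) - 1 for x in next(it).split()) for _ in range(num_edges)]

    rickety = {}  # 0-based edge index -> waiting time
    for _ in range(int(next(it))):
        index, wait = map(int, next(it).split())
        rickety[index - 1] = wait

    start = int(next(it)) - 1

    students = []  # (student node, pizza node)
    for _ in range(int(next(it))):
        students.append(tuple(int(x) - 1 for x in next(it).split()))

    priorities = []  # pairs of 0-based indices into `students`
    for _ in range(int(next(it))):
        priorities.append(tuple(int(x) - 1 for x in next(it).split()))

    return {"num_nodes": num_nodes, "edges": edges, "rickety": rickety,
            "start": start, "students": students, "priorities": priorities}


sample = [          # hypothetical input, one string per file line
    "3 2",          # 3 nodes, 2 edges
    "1 2",          # edge 1
    "2 3",          # edge 2
    "1",            # one rickety edge
    "2 4",          # edge 2 has waiting time 4
    "1",            # start node
    "1",            # one student
    "3 2",          # student at node 3, pizza at node 2
    "0",            # no priorities
]
print(split_sections(sample))
```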

In [615]:
TYPE_INDEX = 2
# every "- 1" below converts the 1-based ids in the file to 0-based indices

def read_input_file(filename: str) -> tuple[Graph, int]:
    with open(filename, 'r', encoding = 'utf-8') as file:
        input = [line.rstrip('\n') for line in file]
    return read_input(input)
    
    
def read_input(input: list[str]) -> tuple[Graph, int]:
    iterative_input = iter(input)
    num_of_nodes, num_of_edges = map(int, next(iterative_input).split(" "))
    graph = Graph(num_of_nodes)
    
    edges_line = []
    for i in range(num_of_edges):
        origin, destination = map(lambda x: int(x) - 1, next(iterative_input).split(" "))
        edges_line.append([origin, destination, EdgeType.NORMAL, i])
        
    num_of_rickey_edges = int(next(iterative_input))
    for i in range(num_of_rickey_edges):
        num, time = map(int, next(iterative_input).split(" "))
        edges_line[num - 1][TYPE_INDEX] = EdgeType.RICKETY
        edges_line[num - 1].append(time)
        
    for edge in edges_line:
        graph.add_edges(*edge)
        
    start_position = int(next(iterative_input)) - 1    
        
    num_of_students = int(next(iterative_input))
    
    for i in range(num_of_students):
        student_node, pizza_node = map(lambda x: int(x) - 1, next(iterative_input).split(" "))
        graph.add_pizza_and_student(student_node, pizza_node)
        
    num_of_priority = int(next(iterative_input))
    for i in range(num_of_priority):
        first, second = map(lambda x: int(x) - 1, next(iterative_input).split(" "))
        graph.add_priority((first, second))
        
    return graph, start_position
    

Next come the timing functions.  
Each search is run `TEST_REPEATS` times and the average running time is reported.  
Each search algorithm returns the goal state together with the number of states it has visited.  
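
The averaging idea can be sketched on its own. This is a minimal illustration that assumes `time.perf_counter` as the clock, which is a swapped-in choice and not necessarily the one the notebook uses; `average_runtime` is a hypothetical helper name.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def average_runtime(func: Callable[[], T], repeats: int = 3) -> tuple[T, float]:
    """Call `func` `repeats` times (repeats >= 1); return the last result and the mean wall time in seconds."""
    start = time.perf_counter()
    for _ in range(repeats):
        result = func()
    return result, (time.perf_counter() - start) / repeats


result, seconds = average_runtime(lambda: sum(range(100_000)))
print(result, f"{seconds:.6f} s")
```

Averaging over a few repeats smooths out one-off delays, although with only three runs the figure is still a rough estimate.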

In [616]:
import time

TEST_REPEATS = 3


def run(search_alg: Callable[[Graph, int], tuple[AgentState, int]], filename: str):
    graph, start_position = read_input_file(filename)
    time_start = time.time()
    for _ in range(TEST_REPEATS):
        goal_state, num_visited = search_alg(graph, start_position)
    time_elapsed = (time.time() - time_start) / TEST_REPEATS

    if goal_state is None:
        print('No solution')
    else:
        print(*[x + 1 for x in goal_state.path], sep = '->')
        print('Path cost:', len(goal_state.path) - 1)
        print('Visited states:', num_visited)
        print('Average time:', time_elapsed)

# BFS
We search for the goal state with a breadth-first search over the state graph.  
The goal test is applied when a state is taken out of the queue.
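
The same scheme can be sketched on a toy graph, independent of the pizza problem. This is a generic illustration under stated assumptions: states are hashable, `bfs`, `successors`, `is_goal` and `toy` are hypothetical names, and states are marked as seen when they are queued.

```python
from collections import deque
from typing import Callable, Hashable, Iterable, Optional


def bfs(start: Hashable,
        successors: Callable[[Hashable], Iterable[Hashable]],
        is_goal: Callable[[Hashable], bool]) -> tuple[Optional[list], int]:
    """Breadth-first search; returns (path to the goal or None, number of expanded states)."""
    frontier = deque([[start]])   # queue of paths, oldest first
    seen = {start}                # states already queued or expanded
    expanded = 0
    while frontier:
        path = frontier.popleft()
        state = path[-1]
        if is_goal(state):        # goal test happens when the state leaves the queue
            return path, expanded
        expanded += 1
        for nxt in successors(state):
            if nxt not in seen:
                seen.add(nxt)
                frontier.append(path + [nxt])
    return None, expanded


toy = {0: [1, 2], 1: [3], 2: [3], 3: []}  # hypothetical adjacency list
print(bfs(0, lambda s: toy[s], lambda s: s == 3))  # ([0, 1, 3], 3)
```

Since every step costs one time unit, the first goal state dequeued is also one with the fewest steps.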

In [617]:
from collections import deque

def BFS(graph: Graph, start_position: int):    
    start = Agent.initial_state(graph, start_position)
    
    if Agent.has_reached_goal(start):
        return start, 0  # same (state, visited count) shape as the other returns
    
    frontier = deque([start])  # deque gives O(1) pops from the left
    visited = set()
    
    while frontier:
        current_state = frontier.popleft()
        # goal test on dequeue
        if Agent.has_reached_goal(current_state):
            return current_state, len(visited)

        for next_state in Agent.actions(graph, current_state):
            # skip states that were already expanded or are waiting in the queue
            if next_state in visited or next_state in frontier:
                continue
            frontier.append(next_state)

        visited.add(current_state)

    return None, None

In [618]:
run(BFS, "assets/Tests/Test0.txt")  # forward slashes avoid the invalid "\T" escape sequence

in inital state {3: 2, 4: 1} and main is {2: 3, 1: 4}
-----------------------------------------------
agent_postition: 1, time: 1, pizza_carrying: (1, 4), left_priority: [(3, 4)], path: [0, 1], left_students: {3: 2, 4: 1}****************agent_postition: 1, time: 1, pizza_carrying: (None, None), left_priority: [(3, 4)], path: [0, 1], left_students: {3: 2, 4: 1}
-----------------------------------------------
-----------------------------------------------
agent_postition: 0, time: 2, pizza_carrying: (1, 4), left_priority: [(3, 4)], path: [0, 1, 0], left_students: {3: 2, 4: 1}****************agent_postition: 2, time: 2, pizza_carrying: (2, 3), left_priority: [(3, 4)], path: [0, 1, 2], left_students: {3: 2, 4: 1}****************agent_postition: 2, time: 2, pizza_carrying: (1, 4), left_priority: [(3, 4)], path: [0, 1, 2], left_students: {3: 2, 4: 1}
-----------------------------------------------
-----------------------------------------------
agent_postition: 0, time: 2, pizza_carrying: (