<a href="https://colab.research.google.com/github/ZieiN/Labs-in-Sirius/blob/main/My_lab4upd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Anytime heuristic search 

In [None]:
import copy
import math
import math
from random import shuffle
from heapq import heappop, heappush, heapify
from random import randint
import time

EPS = 0.00001

## Grid Search Domain Representation

In [None]:
class Map:
    '''
    Square grid map class represents the environment for our moving agent
        - width -- the number of columns in the grid
        - height -- the number of rows in the grid
        - cells -- the binary matrix, that represents the grid. 0 - cell is traversable, 1 - cell is blocked
    '''

    def __init__(self):
        '''
        Default constructor
        '''

        self.width = 0
        self.height = 0
        self.cells = []
    

    def ReadFromString(self, cellStr, width, height):
        '''
        Converting a string (with '#' representing obstacles and '.' representing free cells) to a grid
        '''
        self.width = width
        self.height = height
        self.cells = [[0 for _ in range(width)] for _ in range(height)]
        cellLines = cellStr.split("\n")
        i = 0
        j = 0
        for l in cellLines:
            if len(l) != 0:
                j = 0
                for c in l:
                    if c == '.':
                        self.cells[i][j] = 0
                    elif c == '#':
                        self.cells[i][j] = 1
                    else:
                        continue
                    j += 1
                if j != width:
                    raise Exception("Size Error. Map width = ", j, ", but must be", width )
                i += 1

        if i != height:
            raise Exception("Size Error. Map height = ", i, ", but must be", height )
    
    def SetGridCells(self, width, height, gridCells):
        '''
        Initialization of map by list of cells.
        '''
        self.width = width
        self.height = height
        self.cells = gridCells

    def InBounds(self, i, j):
        '''
        Check if the cell is on a grid.
        '''
        return (0 <= j < self.width) and (0 <= i < self.height)
    
    def Traversable(self, i, j):
        '''
        Check if the cell is not an obstacle.
        '''
        return not self.cells[i][j]
        
    def LegalMove(self, i1, j1, i2, j2):
      return self.InBounds(i1, j2) and self.Traversable(i1,j2) and self.InBounds(i2,j1) and self.Traversable(i2, j1)

    def GetNeighbours(self, i, j):
        '''
        Returns a list of neighbouring cells as (i,j) tuples. 
        Fucntions should returns such neighbours, that allows both cardinal and diagonal moves, 
        but dissalows cutting corners and squezzing. 

        Parameters:
            i, j (int, int): Position on the grid map, for which neighbours are needed

        Returns:
            neighbours(list[(int, int)]): List of neighbours grid map (i, j) coordinates.   
        '''
        neighbors = []
        dy = [1,1,1,0,0,-1,-1,-1]
        dx = [0,1,-1,1,-1,0,1,-1]
        for k in range(8):
          x = i+dx[k]
          y = j+dy[k]
          if self.InBounds(x, y) and self.Traversable(x, y) and self.LegalMove(i, j, x, y):
            neighbors.append((x,y))
        return neighbors

    def CalculateCost(self, i1, j1, i2, j2):
        '''                    
        Computes cost of simple moves between cells on grid, 
        or euclidean distance, if there isnt a simple move between cells

        Args:
            i1, j1 (int, int): Positions of first cell on the grid map
            i2, j2 (int, int): Positions of first cell on the grid map
   

        Returns:
            [float]: cost of simple moves or euclidean distance.   
        '''

        if(abs(i1 - i2) + abs(j1 - j2) == 2):
            return 1.41421356237
        elif (abs(i1 - i2) + abs(j1 - j2) == 1):
            return 1.0
        else:
            return math.sqrt(abs(i1 - i2) ** 2 + abs(j1 - j2) ** 2)


In [None]:
class Node:
    '''
    Class represents a grid search node

    - i, j: coordinates of corresponding grid element
    - g: g-value of the node
    - h: h-value of the node
    - F: f-value of the node
    - parent: pointer to the parent-node 
    '''

    def __init__(self, coord, g = math.inf, h = math.inf, F = None, parent = None):
        self.i = coord[0]
        self.j = coord[1]
        self.g = g
        self.h = h
        if F is None:
            self.F = self.g + h
        else:
            self.F = F        
        self.parent = parent
    
    def __eq__(self, other):
        return (self.i == other.i) and (self.j == other.j)
    
    def __lt__(self, other):
        return (self.F < other.F) or ((self.F == other.F) and (self.g < other.g))


In [None]:
def EuclidDistance(node1, node2):
    '''
    Returns a euclidean distance between cells corresponding to nodes
    '''
    #return math.sqrt(2)*min(abs(node1.i - node2.i), abs(node1.j - node2.j)) + abs(abs(node1.i - node2.i) - abs(node1.j - node2.j))
    return math.sqrt((node1.i - node2.i) ** 2 + (node1.j - node2.j) ** 2)


In [None]:
def MakePath(goal):
    '''
    Creates a path by tracing parent pointers from the goal node to the start node
    It also returns path's length.
    '''

    length = goal.g
    current = goal
    path = []
    while current.parent:
        path.append(current)
        current = current.parent
    path.append(current)
    return path[::-1], length


## Implementing OPEN and CLOSED

In [None]:
class Open:

    def __init__(self):
        self.elements = []    
        # self.dic = {}
    def __iter__(self):
        return iter(self.elements)
    
    def __len__(self):
        return len(self.elements)

    def isEmpty(self):
        if len(self.elements) != 0:
            return False
        return True

    def AddNode(self, item : Node, *args):
        '''
        AddNode is the method that puts (e.g. inserts or updates) the node to OPEN
        When implementing it do not forget to handle all possible cases:
         - node already in OPEN but the new g-value is better;
         - node already in OPEN but the new g-value is worse;
         - node is not in OPEN yet.
        '''
        for existingNode in self.elements:
            if existingNode.i == item.i and existingNode.j == item.j:
                if existingNode.g > item.g:
                    existingNode.g = item.g
                    existingNode.F = item.F
                    existingNode.parent = item.parent
                    return
                else:
                    return
        self.elements.append(item)
        return

    def GetBestNode(self, *args):
        '''
        GetBestNode is the method that 
         i) finds the best node, i.e. the one with the lowest f-value (f=g+h) (for Dijkstra h=0),
         ii) removes it from OPEN and 
         iii) returns it
        '''
        bestF = math.inf
        bestCoord = 0
        for i in range(len(self.elements)):
            if self.elements[i].F < bestF:
                bestCoord = i
                bestF = self.elements[i].F
                
        best = self.elements.pop(bestCoord)
        return best
    
    def UniteAndUpdate(self, other, newWeight):
        '''
        Unite Open set with other container and update F-values of nodes with new weight:
        
        f(Node) = g(Node) + weight * h(Node)
        '''
#TODO
        dic={}
        for i in range(len(self.elements)):
          self.elements[i].F = self.elements[i].g + newWeight*self.elements[i].h
          dic[(self.elements[i].i, self.elements[i].j)] = i
        heapify(self.elements)
        for x in other:
          x.F = x.g + newWeight * x.h
          if (x.i, x.j) in dic:
            id = dic[(x.i, x.j)]
            if x.g < self.elements[id].g:
              self.elements[id].g = x.g
              self.elements[id].F = x.F
          else:
            self.elements.append(x)


In [None]:
class Closed:
    
    def __init__(self):
        self.elements = {}


    def __iter__(self):
        return iter(self.elements)

    def __len__(self):
        return len(self.elements)

    def AddNode(self, item : Node):
        '''
        AddNode is the method that inserts the node to CLOSED
        '''
        if self.WasExpanded(item):
          tmp = self.elements[(item.i, item.j)]
          if item.g < tmp:
            self.elements[(item.i, item.j)] = item.g
        else:
          self.elements[(item.i, item.j)] = item.g

    def WasExpanded(self, item : Node):
        '''
        WasExpanded is the method that checks if a node has been expanded
        '''
        return ((item.i, item.j) in self.elements)

    def GetNode(self, item):
        '''
        Finds node from Closed set by (i, j) coordinates on grid and returns it. Returns None if node was not found.
        '''
#TODO (copy from previous labs)e
        return self.elements.get((item.i,item.j), None)

## Weighted A*

In [None]:
def WAStar(startCoord, goalCoord, gridMap, hFunc, weight):
    '''
    Runs weighted A* search algorithm without re-expansion on any domain.

    Parameters:
        startCoord (tuple(int, int)): The start state of search in useful for your implementation form.
        goalCoord (tuple(int, int)): The goal state of search in useful for your implementation form.
        gridMap: An additional domain information (such as grid map).
        hFunc (function): The heuristics function for currently using domain.
        weight (float): The weight of heuristics in F-value computation.

    Returns:
        pathFound (bool): Path was found or not.   
        lastNode (Node): The last node in path. None if path was not found.
    '''

    OPEN = Open()
    CLOSED = Closed()
    goalNode = Node(goalCoord)
    startNode = Node(startCoord)
    OPEN.AddNode(Node(startCoord, 0))
    while not OPEN.isEmpty():
      current = OPEN.GetBestNode()
      if current.i == goalCoord[0] and current.j == goalCoord[1]:
        return True, current, len(CLOSED)
      if CLOSED.WasExpanded(current):
        continue
      CLOSED.AddNode(current)
      neb = gridMap.GetNeighbours(current.i, current.j)
      for state in neb:
        new_state = Node((state[0], state[1]))
        new_state.g = current.g + gridMap.CalculateCost(current.i, current.j, new_state.i, new_state.j)
        new_state.h = hFunc(new_state, goalNode)
        new_state.F = new_state.g + weight*new_state.h
        new_state.parent = current
        if CLOSED.WasExpanded(new_state):
          if CLOSED.GetNode(new_state) > new_state.g:
            OPEN.AddNode(new_state)
        else:
          OPEN.AddNode(new_state)
    return False, None, len(CLOSED)


## Naive Anytime A*

In [None]:
def NaiveATAStar(startCoord, goalCoord, gridMap, hFunc, startWeight = 3.0, stepWeight = 0.5):
    '''
    Repeatedly runs weighted A* search algorithm without re-expansion on any domain, 
    decreasing current weight from startWeight to 1.0 by stepWeight.

    Parameters:
        startCoord (tuple(int, int)): The start state of search in useful for your implementation form.
        goalCoord (tuple(int, int)): The goal state of search in useful for your implementation form.
        gridMap: An additional domain information (such as grid map).
        hFunc (function): The heuristics function for currently using domain.
        startWeigth (float): The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0
        stepWeigth (float): The value by which the weight will be reduced, if it is provided by the algorithm 

    Returns:
        pathFound (bool): Path was found or not.   
        lastNode (Node): The last node in path. None if path was not found.
    '''

    weight = startWeight if (startWeight >= 1) else 1
    result = WAStar(startCoord, goalCoord, gridMap, hFunc, weight)
    yield result[0], result[1], result[2], weight

    while True:
        weight = (weight - stepWeight) if (weight - stepWeight) >= 1 else 1
        result = WAStar(startCoord, goalCoord, gridMap, hFunc, weight)
        yield result[0], result[1], result[2], weight
        if weight <= 1:
            break


## Anytime Repairing A*

In [None]:
def ComputeActualWeight(OPEN, INCONS, weight, goalNode):
    minOpen = math.inf
    minIncons = math.inf
    if not OPEN.isEmpty():
        minOpen = min(v.g + v.h for v in OPEN)
    if not INCONS.isEmpty():
        minIncons = min(v.g + v.h for v in INCONS)
    minG = min(minOpen, minIncons)

    return min(weight, goalNode.g / minG)

def ImprovePath(goalNode, gridMap, hFunc, OPEN, INCONS, CLOSED, VISITED, weight):

    while not OPEN.isEmpty():
      current = OPEN.GetBestNode()
      if current.F >= goalNode.F:
        OPEN.AddNode(current)
        return True, goalNode, len(CLOSED)
      CLOSED.AddNode(current)
      VISITED.AddNode(current)
      if current.i == goalNode.i and current.j == goalNode.j:
        return True, current, len(CLOSED)
      neb = gridMap.GetNeighbours(current.i, current.j)
      for state in neb:
        new_state = Node((state[0], state[1]))
        new_state.g = current.g + gridMap.CalculateCost(current.i, current.j, new_state.i, new_state.j)
        new_state.h = hFunc(new_state, goalNode)
        new_state.F = new_state.g + weight*new_state.h
        new_state.parent = current
        oldG = 0
        if not VISITED.WasExpanded(new_state):
          oldG = math.inf
        else:
          oldG = VISITED.GetNode(new_state)
        if oldG > new_state.g:
          if not CLOSED.WasExpanded(new_state):
            OPEN.AddNode(new_state)
          else:
            INCONS.AddNode(new_state)
    return False, goalNode, len(CLOSED)

def ARAStar(startCoord, goalCoord, gridMap, hFunc, startWeight = 3.0, stepWeight = 0.5):
    '''
    Runs Anytime Repairing A* search algorithm.

    Parameters:
        startCoord (tuple(int, int)): The start state of search in useful for your implementation form.
        goalCoord (tuple(int, int)): The goal state of search in useful for your implementation form.
        gridMap: An additional domain information (such as grid map).
        hFunc (function): The heuristics function for currently using domain.
        startWeigth (float): The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0
        stepWeigth (float): The value by which the weight will be reduced, if it is provided by the algorithm  (not used)

    Returns:
        pathFound (bool): Path was found or not.   
        lastNode (Node): The last node in path. None if path was not found.
    '''
    OPEN = Open()
    INCONS = Open()
    CLOSED = Closed()
    VISITED = Closed()
    goalNode = Node(goalCoord, math.inf, math.inf, math.inf)
    startNode = Node(startCoord, 0)
    startNode.h = hFunc(startNode, goalNode)
    startNode.F = startWeight * startNode.h
    weight = startWeight
    OPEN.AddNode(startNode)
    pathFound, goalNode, iterations = ImprovePath(goalNode, gridMap, hFunc, OPEN, INCONS, CLOSED, VISITED, weight)
    yield pathFound, goalNode, iterations, weight
    ActualWeight = ComputeActualWeight(OPEN, INCONS, weight, goalNode)
    while ActualWeight > 1:
      weight = min(weight - stepWeight, ActualWeight)
      if weight < 1:
        weight = 1
      OPEN.UniteAndUpdate(INCONS, weight)
      CLOSED = Closed()
      INCONS = Open()
      pathFound, goalNode, iterations = ImprovePath(goalNode, gridMap, hFunc, OPEN, INCONS, CLOSED, VISITED, weight)
      yield pathFound, goalNode, iterations, weight
      ActualWeight =  ComputeActualWeight(OPEN, INCONS, weight, goalNode)


# Experiment

In [None]:
def SimpleTest(SearchGenerator, task, heuristicFunction, startWeigth, stepWeigth, maxTime):
    '''
    SimpleTest runs SearchGenerator on one task (use a number from 0 to 25 to 
    choose a certain debug task on simple map or None to choose a random task 
    from this pool) with *args as optional arguments and displays:
     - 'Path found!' and some statistics -- path was found
     - 'Path not found!' -- path was not found
     - 'Execution error' -- an error occurred while executing the SearchFunction In first two cases function also draws visualisation of the task

    Parameters:
        SearchGenerator (generator):
        task (int):
        heuristicFunction (function): The heuristics function for currently using domain.
        startWeigth (float): The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0
        stepWeigth (float): The value by which the weight will be reduced, if it is provided by the algorithm 
        maxTime (float): The amount of time given for the pathfinding procedure. Set in seconds.  

    Returns:
        pathFound (bool): Path was found or not.   
        length (Node): The length of the last found path.
        w (int): The sub-optimality value of the last found path.. 
    '''
    
    height = 15
    width = 30
    mapstr = '''
. . . # # . . . . . . . . # # . . . # . . # # . . . . . . .  
. . . # # # # # . . # . . # # . . . . . . # # . . . . . . . 
. . . . . . . # . . # . . # # . . . # . . # # . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # # . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # # . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # # # # # . . . . 
. . . # # . . # . . # . . # # . . . # . . # # # # # . . . . 
. . . . . . . # . . # . . # # . . . # . . # . . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # . . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # . . . . . . . . 
. . . # # . . . . . # . . . . . . . # . . . . . . . . . . . 
. . . # # # # # # # # # # # # # . # # . # # # # # # # . # # 
. . . # # . . . . . . . . # # . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
. . . # # . . . . . . . . # # . . . . . . . . . . . . . . .
'''

    taskMap = Map()
    taskMap.ReadFromString(mapstr, width, height)
    starts = [(0, 1), (6, 2), (5, 6), (13, 7), (4, 23)]
    goals = [(1, 28), (2, 29), (3, 20), (3, 20), (0, 0)]
    lengths = [47.8994949, 40.8994949, 38.2426407, 21.8284271, 47.4852814]

    if (task is None) or not (0 <= task < 5):
        task = randint(0, 4)

    start = Node(starts[task])
    goal = Node(goals[task])
    length = lengths[task]

    trueResult = WAStar((start.i, start.j), (goal.i, goal.j), taskMap, heuristicFunction, 1) 
    path = MakePath(trueResult[1])
    print("A* Path found! Length: {:.7f}".format(path[1]))

    start_time = time.time()
    lastResult = False, None

    iter_count = 0
    timer = 0.0
    finale_timer = 0.0
    w = 0.0
    for result in SearchGenerator((start.i, start.j), (goal.i, goal.j), taskMap, heuristicFunction, startWeigth, stepWeigth):
        iter_count += 1
        timer = time.time() - start_time
        
        if timer < maxTime:
            lastResult = result
            finale_timer = timer
            if lastResult[0]:
                wLength = lastResult[1].g
                w = wLength / length
                print(iter_count, "iter. Real sub-optimality value: {:.5f}".format(w),'Current weight: {:.5f}'.format(lastResult[3]), 'Time:', timer, 'Steps:', lastResult[2])

                if w - 1 < EPS:
                    break
            else:
                print(iter_count, "iter. Weighted path not found!", 'Time:', timer, 'Steps:', lastResult[2])
        else:
            break
    
    if lastResult[0]:
        wLength = lastResult[1].g
        w = wLength/length
        print("Weighted path found! Finale sub-optimality value: {:.7f}. Iterations: {:d}. Time: {:.7}".format(w, iter_count, finale_timer))
        return True, wLength, w
    else:
        print("Weighted path not found!")
        return False, math.inf, math.inf


In [None]:
for i in range(5):
    SimpleTest(NaiveATAStar, i, EuclidDistance, 5, 0.5, 0.05)

A* Path found! Length: 47.8994949
1 iter. Real sub-optimality value: 1.60692 Current weight: 5.00000 Time: 0.0069310665130615234 Steps: 135
2 iter. Real sub-optimality value: 1.60692 Current weight: 4.50000 Time: 0.015987634658813477 Steps: 136
3 iter. Real sub-optimality value: 1.35133 Current weight: 4.00000 Time: 0.023437976837158203 Steps: 150
4 iter. Real sub-optimality value: 1.35133 Current weight: 3.50000 Time: 0.03186345100402832 Steps: 162
5 iter. Real sub-optimality value: 1.35133 Current weight: 3.00000 Time: 0.04004669189453125 Steps: 172
Weighted path found! Finale sub-optimality value: 1.3513279. Iterations: 6. Time: 0.04004669
A* Path found! Length: 40.8994949
1 iter. Real sub-optimality value: 1.71079 Current weight: 5.00000 Time: 0.006646394729614258 Steps: 134
2 iter. Real sub-optimality value: 1.71079 Current weight: 4.50000 Time: 0.012699127197265625 Steps: 133
3 iter. Real sub-optimality value: 1.46875 Current weight: 4.00000 Time: 0.022404193878173828 Steps: 144


In [None]:
for i in range(5):
    SimpleTest(ARAStar, i, EuclidDistance, 5, 0.1, 0.05)

A* Path found! Length: 47.8994949
1 iter. Real sub-optimality value: 1.60692 Current weight: 5.00000 Time: 0.0069501399993896484 Steps: 136
2 iter. Real sub-optimality value: 1.60692 Current weight: 2.74895 Time: 0.008005619049072266 Steps: 7
3 iter. Real sub-optimality value: 1.60692 Current weight: 2.64895 Time: 0.014336109161376953 Steps: 13
4 iter. Real sub-optimality value: 1.60692 Current weight: 2.44463 Time: 0.015501976013183594 Steps: 18
5 iter. Real sub-optimality value: 1.01730 Current weight: 2.13511 Time: 0.019936561584472656 Steps: 62
6 iter. Real sub-optimality value: 1.00000 Current weight: 1.18426 Time: 0.02506399154663086 Steps: 67
Weighted path found! Finale sub-optimality value: 1.0000000. Iterations: 6. Time: 0.02506399
A* Path found! Length: 40.8994949
1 iter. Real sub-optimality value: 1.71079 Current weight: 5.00000 Time: 0.0071048736572265625 Steps: 135
2 iter. Real sub-optimality value: 1.71079 Current weight: 2.38936 Time: 0.008166313171386719 Steps: 14
3 ite