In [3]:
#imports
import os
import time
import math
import types
import pandas as pd
import networkx as nx




DEBUG_MODE = False

#folder for content
FOLDER = '/content/a1search'

#import contents of the folder
def get_filename(alias = None):
  d = {
      "csv1":  f'{FOLDER}/TestCase_01_NodeID.csv',
      "csv2":  f'{FOLDER}/TestCase_02_NodeID.csv',
      "csv3":  f'{FOLDER}/TestCase_03_NodeID.csv',
      "txt1":  f'{FOLDER}/TestCase_01_EdgeList.txt',
      "txt2":  f'{FOLDER}/TestCase_02_EdgeList.txt',
      "txt3":  f'{FOLDER}/TestCase_03_EdgeList.txt',
      "txt1mk": f'{FOLDER}/TestCase_01_mk.txt',
      "txt2mk": f'{FOLDER}/TestCase_02_mk.txt',
  }
  if alias:
    return d[alias]
  else:
    return d

def get_start_and_goal(which_csv):
  """
  get [start_node_name, goal_node_name] from which file, which in ['csv1', 'csv2', 'csv3']
  """
  csv = read_csv(get_filename(which_csv))
  start = csv[0][0]
  goal = csv[-1][0]
  return [start, goal]


#name of directory working
def pwd():

  return os.getcwd()

def ls(folder):

  #Lists all files and directories in the folder

  for root, files in os.walk(folder):
    for filename in files:
      print(os.path.join(root, filename))

def my_debug(*args):
  if DEBUG_MODE:
    print("DEBUG", args)



#reads csv returns array

def read_csv(filename, header = None):


  df = pd.read_csv(filename, header = header)
  return df.to_numpy()

def convert_array_to_dict(spatial_array):

  #convert a spatial array (['node', x, y][]) to dict {'node': (x, y)}

  d = {}
  for n, x, y in spatial_array:
    d[n] = (x, y)
  return d


#construct graph from array edges
def construct_graph(arr):

  g = nx.Graph()
  for row in arr:
    node1, node2, weight = row
    g.add_edge(node1, node2, weight=weight)
  return g

#NeworkX graph
def draw_graph(graph, label=True):


  nx.draw(graph, with_labels = label)
  # nx.draw(graph, pos=nx.spring_layout(graph))


#returns neighbors
def get_neighbors(graph, node0, explored_node_dict = None):

  unexplored = []
  neighbors = graph.neighbors(node0)

  if explored_node_dict is None:
    return neighbors
  else:
    for x in neighbors:
      found = explored_node_dict.get(x, False)
      if not found:
        unexplored.append(x)
    return unexplored

def expand_frontier(algorithm, frontier, neighbors):

  #Expand the frontier set
  #DFS and BFS do NOT use heuristic_dict.
  #The cost_dict can be ignore if all edges' cost are the same.

  done = True

  match algorithm:
    case 'DFS':
      frontier = neighbors + frontier
    case 'BFS':
      frontier = frontier + neighbors
    # case 'Astar_Euclid':
      # pass
    # case 'Astar_Manhattan':
      # pass
    # case 'Astar_Chebyshev':
      # pass
    case _:
      done = False
      print('ERROR: wrong algorithm,', algorithm)
  return done, frontier

#search dfs or bfs
def search_dfs(graph, start, goal, max_steps, algorithm = 'DFS'):

  walk_path = []
  explored = {}
  frontier = [start]
  step = 0
  complete = False

  if algorithm not in ['BFS', 'DFS']:
    return complete, step, walk_path

  my_debug(0, None, 'frontier =>', frontier)
  for step in range(1, max_steps + 1):
    node0 = frontier.pop(0)
    explored[node0] = True   # remember explored nodes
    walk_path = walk_path + [node0]
    if node0 == goal:
      complete = True
      my_debug(step, node0, 'frontier =>', frontier)
      break
    else:
      # this is the only diff btw BFS and DFS
      done, frontier = expand_frontier(algorithm, frontier, get_neighbors(graph, node0, explored))
      # if algorithm == 'DFS':
        # frontier = get_neighbors(graph, node0, explored) + frontier
      # else:
        # frontier = frontier + get_neighbors(graph, node0, explored)
      my_debug(step, node0, 'frontier =>', frontier)

      # stop for loop if not done
      if not done:
        break
  return [complete, step, walk_path]

#search bfs
def search_bfs(graph, start, goal, max_steps):

  return search_dfs(graph, start, goal, max_steps, 'BFS')


#generic heuristic
def gen_heuristic_dict(algorithm, spatial_dict, goal, is_freezed = True):

  #Based on spatial data ([node, x, y][]) to calc the distance between node and goal.
  #Support 3 kinds of distance: Euclid, Manhattan, and Chebyshev.
  #return a (optional)freezed heuristic dict as {node: distance}

  # heuristic dict
  h = {}

  gx, gy = spatial_dict[goal]
  for n, (x, y) in spatial_dict.items():
    match algorithm:
      case 'Astar_Euclid':
        h[n] = math.sqrt((gx - x) * (gx - x) + (gy - y) * (gy - y))
      case 'Astar_Manhattan':
        h[n] = abs(gx - x) + abs(gy - y)
      case 'Astar_Chebyshev':
        h[n] = max(abs(gx - x), abs(gy - y))
      case 'Astar_Heuristic_0':
        h[n] = 0
      case _:
        raise ValueError(f"ERROR: wrong algorithm {algorithm}")
  if is_freezed:
    # Freeze the dictionary
    h = types.MappingProxyType(h)
  return h

#A* search
def search_a_star(graph, start, goal, heuristic_dict, max_steps):


  # pylint: disable=missing-class-docstring
  # data structure for one node in Frontier, sortable
  class FrontierNode:
    def __init__(self, node, cost):
      self.node = node
      self.cost = cost
    def __lt__(self, other):
      return self.cost < other.cost
    def __str__(self):
      return f"{{'{self.node}':{self.cost}}}, "
    def __repr__(self):
      return f"{{'{self.node}':{self.cost}}}, "

  def upd_cost_dict(node, node_next_array, cost_dict_ref):
    #calc and save the cost for node_next, which is the minimal cost from start to node_next
    cost_node_to_node_next = 1
    for n in node_next_array:
      cost_dict_ref[n] = min(
        cost_dict_ref[node] + cost_node_to_node_next,
        cost_dict_ref.get(n, float('inf'))
      )
    return cost_dict_ref

    #return total cost from start to node, or infinite if NA
  def g(node, cost_dict):
    return cost_dict.get(node, float('inf'))

#return the heuristic value of the node, or infinite if NA
  def h(node, heuristic_dict):
    return heuristic_dict.get(node, float('inf'))

  def expand_frontier_astar(frontier_nodes, neighbors, cost_dict, heuristic_dict):
    for n in neighbors:
      cost = h(n, heuristic_dict) + g(n, cost_dict)
      frontier_nodes.append(FrontierNode(n, cost))
      frontier_nodes.sort()
    return frontier_nodes

#pop frontier for nodes
  def pop_frontier_nodes(frontier_nodes):

    node = frontier_nodes.pop(0)  # a bug -> .pop(), now fixed
    return  node.node, node.cost, frontier_nodes

  # vars
  step = 0
  complete = False
  node0 = None
  node0_cost = None
  walk_path = []
  explored = {}
  cost_dict = {f'{start}': 0}
  frontier_nodes: list[FrontierNode] = [FrontierNode(start, 0 + h(start, heuristic_dict))]

  my_debug(step, node0, node0_cost, 'graph =>', graph)
  my_debug(step, node0, node0_cost, 'heuristic_dict =>', heuristic_dict)
  my_debug(step, node0, node0_cost, 'frontier_nodes =>', frontier_nodes)
  my_debug(step, node0, node0_cost, 'cost_dict =>', cost_dict)

  # logic
  for step in range(1, max_steps + 1):

    # pop the lowest one from frontier
    node0, node0_cost, frontier_nodes  = pop_frontier_nodes(frontier_nodes)

    # remember explored nodes
    explored[node0] = True

    # record the walk path
    walk_path = walk_path + [node0]

    # stop if reach the goal
    if node0 == goal:
      complete = True
      my_debug(step, node0, node0_cost, 'frontier_nodes =>', frontier_nodes)
      my_debug(step, node0, node0_cost, 'cost_dict =>', cost_dict)
      break

    # get all neighbors of node0
    neighbors = get_neighbors(graph, node0, explored)

    # calc new neighbors' g(), and save them into cost_dict
    cost_dict = upd_cost_dict(node0, neighbors, cost_dict)

    # expand frontier with new neighbors and their g() + h()
    frontier_nodes = expand_frontier_astar(frontier_nodes, neighbors, cost_dict, heuristic_dict)

    my_debug(step, node0, node0_cost, 'frontier_nodes =>', frontier_nodes)
    my_debug(step, node0, node0_cost, 'cost_dict =>', cost_dict)

  return [complete, step, walk_path, node0]



def run_dfs(which_txt, which_csv, max_steps = 10000, algorithm = "DFS"):
  if algorithm not in ["BFS", "DFS"]:
    return False, 0, 0, []

  [start, goal] = get_start_and_goal(which_csv)
  start_time = time.time()


  if algorithm == "DFS":
    complete, step, walk_path = search_dfs(construct_graph(read_csv(get_filename(which_txt))), start, goal, max_steps)
  else:
    complete, step, walk_path = search_bfs(construct_graph(read_csv(get_filename(which_txt))), start, goal, max_steps)

  seconds = time.time() - start_time
  return complete, step, seconds, walk_path

def run_bfs(which_txt, which_csv, max_steps = 10000):
  return run_dfs(which_txt, which_csv, max_steps, "BFS")

#demo run
def run_demo(which_txt, which_csv = None, node0 = 'N_1'):
  # show txt data
  print(f'{which_txt}: ')
  txt1 = read_csv(get_filename(which_txt))
  print(txt1[0])
  print(txt1[0][0], type(txt1[0][0]))
  print(txt1[0][1], type(txt1[0][1]))
  print(txt1[0][2], type(txt1[0][2]))
  print()

  # start and goal nodes
  if which_csv is not None:
    print('[start, goal]: ', get_start_and_goal(which_csv))

  # new graph
  g = construct_graph(txt1)

  # show neighbors of node0
  print(node0, 'neighbors: ', get_neighbors(g, node0))

  # show graph
  draw_graph(g)

#run A* with heuristic
def run_a_star(algorithm, which_txt, which_csv, max_steps = 10000):


  # reading start and goal nodes
  [start, goal] = get_start_and_goal(which_csv)

  # heuristic data and graph
  heuristic_dict = gen_heuristic_dict(algorithm,
        convert_array_to_dict(read_csv(get_filename(which_csv))) , goal)

  # graph
  graph = construct_graph(read_csv(get_filename(which_txt)))

  # run
  start_time = time.time()
  complete, step, walk_path, node_reached = search_a_star(
      graph, start, goal,
      heuristic_dict, max_steps)
  seconds = time.time() - start_time
  return complete, step, seconds, walk_path, node_reached


#run defintion
def run():
  for output_mode in ['stat', 'path-only']:
    print()

    # DFS
    for (alg, txt, csv) in [('DFS', 'txt1', 'csv1'), ('DFS', 'txt2', 'csv2'), ('DFS', 'txt3', 'csv3')]:
      complete, step, seconds, walk_path = run_dfs(txt, csv, 10000)
      match output_mode:
        case 'stat':
          print(f"{alg}:", "complete?", complete, "steps?", step, "seconds?", seconds, "walk_path?", walk_path)
        case 'path-only':
          print(f"{alg}:", walk_path)

    # BFS
    for (alg, txt, csv) in [('BFS', 'txt1', 'csv1'), ('BFS', 'txt2', 'csv2'), ('BFS', 'txt3', 'csv3')]:
      complete, step, seconds, walk_path = run_bfs(txt, csv, 10000)
      match output_mode:
        case 'stat':
          print(f"{alg}:", "complete?", complete, "steps?", step, "seconds?", seconds, "walk_path?", walk_path)
        case 'path-only':
          print(f"{alg}:", walk_path)

    # A*
    for alg in ['Astar_Manhattan', 'Astar_Euclid', 'Astar_Chebyshev', 'Astar_Heuristic_0']:
      for (txt, csv) in [('txt1', 'csv1'), ('txt2', 'csv2'), ('txt3', 'csv3')]:
        complete, step, seconds, walk_path, node_reached = run_a_star(alg, txt, csv, 10000)
        match output_mode:
          case 'stat':
            print(f"{alg}:", "complete?", complete, "steps?", step, "seconds?", seconds, "walk_path?", walk_path)
          case 'path-only':
            print(f"{alg}:", walk_path)

  '''
  run_demo('txt1', 'csv1')
  run_demo('txt2', 'csv2')
  run_demo('txt3', 'csv3')
  '''
  return

if __name__ == "__main__":
  # DEBUG_MODE = True   # enable/disable debug mode
  run()




DFS: complete? True steps? 20 seconds? 0.0035262107849121094 walk_path? ['N_0', 'N_1', 'N_6', 'N_5', 'N_10', 'N_11', 'N_16', 'N_15', 'N_20', 'N_21', 'N_7', 'N_12', 'N_13', 'N_14', 'N_8', 'N_9', 'N_4', 'N_18', 'N_19', 'N_24']
DFS: complete? True steps? 56 seconds? 0.002834796905517578 walk_path? ['N_0', 'N_10', 'N_20', 'N_11', 'N_1', 'N_2', 'N_3', 'N_4', 'N_13', 'N_14', 'N_23', 'N_33', 'N_24', 'N_34', 'N_35', 'N_45', 'N_55', 'N_44', 'N_54', 'N_64', 'N_43', 'N_53', 'N_63', 'N_42', 'N_32', 'N_31', 'N_41', 'N_51', 'N_50', 'N_61', 'N_60', 'N_71', 'N_81', 'N_91', 'N_90', 'N_80', 'N_70', 'N_72', 'N_82', 'N_62', 'N_30', 'N_40', 'N_21', 'N_52', 'N_36', 'N_46', 'N_56', 'N_66', 'N_57', 'N_67', 'N_68', 'N_69', 'N_78', 'N_79', 'N_89', 'N_99']
DFS: complete? True steps? 424 seconds? 0.007917404174804688 walk_path? ['N_0', 'N_1', 'N_2', 'N_102', 'N_100', 'N_200', 'N_300', 'N_400', 'N_500', 'N_501', 'N_600', 'N_401', 'N_301', 'N_302', 'N_402', 'N_502', 'N_602', 'N_601', 'N_603', 'N_503', 'N_504', 'N_

#**Report**#

How to execute: google colab was used to run

Format:
steps
time(seconds)

##DFS:

20  
0.0035262107849121094

56   
0.002834796905517578

424    
0.007917404174804688



##BFS:

25  
0.0028693675994873047

74  
0.003057241439819336

977

0.010239839553833008




##Astar_Mangattan:

15  
0.00016045570373535156

38  
0.0003218650817871094

857  
0.006470918655395508



##Astar_Euclid:

16  
0.00015020370483398438

44  
0.0004906654357910156

886  
0.006995439529418945

##Astar_Chebyshev:

18  
0.00014448165893554688

48  
0.0002956390380859375

915

0.006708621978759766



##Astar_Heuristic_0:

25

0.00015592575073242188

74

0.00040030479431152344

977

0.007172822952270508


#**Data**#

Search speed order:
fast to slow


DFS

A*_Manhattan

A*_Euclid  

A*_Chebyshev

BFS == Astar_Manhattan.

The 2nd case is an exception as DFS is slower than the A* Manhattan and other A* methods



#**Anaylsis**#

The reason that I chose the algorithms that I chose is because they make the walk closer.

Manhattan walks through x then y, h(A) = x + y

Euclid walks through z, h(A) = z

Chebyshev is less accurate then Manhattan, use the bigger one between x or y

 h(A) = max(x, y)

Heuristic_0 is always 0, h(A) = 0


Manhatten is more close with the walk path and the edge are always connected two nodes with gap 1.

Euclid is close to Manhatten, but less accurate

Chebyshev is less accurate with only selecting either x or y.

Heuristic_0 is the worse one

DFS is greedy it reaches its goal quicker than orders in the test cases for 2 and 3.

completeness: DFS is the only one NOT-Complete as it will fall info a loop and never reach the goal. All other are bases on BFS and meet completeness.

optimal: BFS and A* are optimal.

space complexity: DFS is much lower than A*. A* is better than BFS. DFS: O(b * m),
BFS: O(b ^ d),
A* is similar to BFS but it is better.

time comlexity:
DFS > BFS > A*
DFS: O(bm),
BFS: O(bd).

The farther the goal from the bottom level is the quicker that BFS can reach its goal.
A* like BFS, but heumistic helps save it time.