Modify your code for uniform-cost search from Homework 1 so that it provides optionally as output the number of nodes expanded in completing the search.

Include a new optional logical (True/False) argument return_nexp, so your function calls to the new uniform cost search will look like: uniform_cost(start, goal, state_graph, return_cost,return_nexp).

• If return_nexp is True, then the last output in the output tuple should be the number of nodes expanded.

• If return_nexp is False, then the code should behave exactly as it did in Homework 1.

Then, verify that your revised code is working by checking Neal’s optimal route from New York to Chicago. Include the number of nodes expanded and the path cost (using map_distances).

In [None]:
import numpy as np
import heapq
import unittest

def path(previous, s):
    '''
    `previous` is a dictionary chaining together the predecessor state that led to each state
    `s` will be None for the initial state (an empty list is returned in that case)
    otherwise, start from the last state `s` and trace `previous` back to the initial state,
    returning the list of states visited, ordered from the initial state to `s`

    The trace is iterative, so long chains cannot hit Python's recursion limit.
    Raises KeyError if a state on the chain has no entry in `previous`,
    and ValueError if the chain loops back on itself.
    '''
    states = []
    while s is not None:
        predecessor = previous[s]
        states.append(s)
        # a valid chain visits each key of `previous` at most once
        if len(states) > len(previous):
            raise ValueError("predecessor chain contains a cycle")
        s = predecessor
    # states were collected goal-first; callers expect start-first
    states.reverse()
    return states

def pathcost(path, step_costs):
    '''
    total the step costs along `path`, a list of states such as the `path` function returns;
    `step_costs[a][b]` is the cost of the step from state a to state b
    '''
    total = 0
    # walk the path as consecutive (from, to) pairs
    for here, there in zip(path, path[1:]):
        total += step_costs[here][there]
    return total

# step costs between neighbouring cities: map_distances[a][b] is the cost from a to b
map_distances = {
    "chi": {"det": 283, "cle": 345, "ind": 182},
    "cle": {"chi": 345, "det": 169, "col": 144, "pit": 134, "buf": 189},
    "ind": {"chi": 182, "col": 176},
    "col": {"ind": 176, "cle": 144, "pit": 185},
    "det": {"chi": 283, "cle": 169, "buf": 256},
    "buf": {"det": 256, "cle": 189, "pit": 215, "syr": 150},
    "pit": {"col": 185, "cle": 134, "buf": 215, "phi": 305, "bal": 247},
    "syr": {"buf": 150, "phi": 253, "new": 254, "bos": 312},
    "bal": {"phi": 101, "pit": 247},
    "phi": {"pit": 305, "bal": 101, "syr": 253, "new": 97},
    "new": {"syr": 254, "phi": 97, "bos": 215, "pro": 181},
    "pro": {"bos": 50, "new": 181},
    "bos": {"pro": 50, "new": 215, "syr": 312, "por": 107},
    "por": {"bos": 107},
}

# straight-line distance from each city to Providence ("pro")
sld_providence = {
    "chi": 833, "cle": 531, "ind": 782, "col": 618, "det": 596,
    "buf": 385, "pit": 458, "syr": 253, "bal": 325, "phi": 236,
    "new": 157, "pro": 0, "bos": 38, "por": 136,
}

# Solution:

class Frontier_PQ:
    ''' frontier class for best-first search: a min-priority queue of states

    attributes:
      states  dictionary of every state seen so far along with the minimum
              path cost found to arrive at it; the search routines read and
              update it directly
      q       binary min-heap (maintained with heapq) of (cost, state)
              tuples representing the elements on the frontier

    instantiation args of Frontier_PQ:
      start is the initial state
      cost is the initial path cost '''
    def __init__(self, start, cost):
        self.states = {start: cost}
        # a one-element list already satisfies the heap invariant
        self.q = [(cost, start)]

    def add(self, state, cost_path):
        ''' push the (cost_path, state) tuple onto the frontier '''
        heapq.heappush(self.q, (cost_path, state))

    def pop(self):
        ''' remove and return the lowest-cost (cost, state) tuple
        raises IndexError if the frontier is empty '''
        return heapq.heappop(self.q)

    def replace(self, state, cost):
        ''' give `state`, which is already on the frontier, the new priority `cost`

        the entry is located by its state, so this also works when the queued
        priority differs from the value kept in `states` (e.g. an f-value in
        the queue and a path cost in `states`); `states` itself is left for
        the caller to update
        raises ValueError if `state` is not on the frontier '''
        for index, (_, queued_state) in enumerate(self.q):
            if queued_state == state:
                break
        else:
            raise ValueError("state %r is not on the frontier" % (state,))
        self.q[index] = (cost, state)
        # the heap parent of index i is (i-1)//2, not i-1, so swapping with
        # the left neighbour does not repair the heap; heapify restores the
        # invariant whichever way the priority moved
        heapq.heapify(self.q)

    def is_empty(self):
        ''' True when no (cost, state) tuples are left on the frontier '''
        return not self.q



def uniform_cost(start, goal, state_graph, return_cost=False, return_nexp=True):
    """
    uniform-cost search from `start` to `goal`
    start = initial state
    goal = goal state
    state_graph = dictionary of dictionaries; state_graph[a][b] is the step cost from a to b
    return_cost = logical (True/False) for whether or not to return the total path cost
    return_nexp = logical (True/False) for whether or not to return the number of nodes
                  expanded, i.e. popped off the frontier (the goal included)

    returns the solution path (list of states from start to goal), followed by
    the path cost if return_cost and by the number of nodes expanded if
    return_nexp; with both flags False the bare list is returned.
    if the goal cannot be reached, the path is None and the cost is
    float('inf'), delivered in the same output shape.

    NOTE: return_nexp defaults to True here; the default is kept so that
    existing calls keep returning the same tuple.
    """
    def _pack(node_path, path_cost, expanded_count):
        # output order is always: path, [cost], [number expanded]
        out = [node_path]
        if return_cost:
            out.append(path_cost)
        if return_nexp:
            out.append(expanded_count)
        return out[0] if len(out) == 1 else tuple(out)

    # priority queue ordered by path cost, seeded with the start state
    frontier = Frontier_PQ(start, 0)
    how_node_reached = {start: None}
    nexp = 0
    while not frontier.is_empty():
        # pop the state with the lowest path cost
        _, current_state = frontier.pop()
        nexp += 1
        if current_state == goal:
            node_path = path(how_node_reached, current_state)
            path_cost = pathcost(node_path, state_graph) if return_cost else None
            return _pack(node_path, path_cost, nexp)

        cost_so_far = frontier.states[current_state]
        for child, step_cost in state_graph[current_state].items():
            updated_path = cost_so_far + step_cost
            if child not in frontier.states:
                # first time this state is reached
                frontier.add(child, updated_path)
            elif updated_path < frontier.states[child]:
                # cheaper route to a state that was reached before
                frontier.replace(child, updated_path)
            else:
                continue
            frontier.states[child] = updated_path
            # keep track of how we got to this state
            how_node_reached[child] = current_state

    # frontier exhausted without popping the goal: there is no path
    return _pack(None, float("inf"), nexp)


Define a function to take as an argument the state that Neal is in (city on our graphs), and return as output the value of the straight-line distance heuristic, between Neal’s state and Providence.

Note that your function should be quite short, and amounts to looking up the proper value from the sld_providence dictionary defined in the helper functions. Call this function heuristic_sld_providence.

In [None]:
# straight-line distance from each city to Providence ("pro")
sld_providence = {
    "chi": 833, "cle": 531, "ind": 782, "col": 618, "det": 596,
    "buf": 385, "pit": 458, "syr": 253, "bal": 325, "phi": 236,
    "new": 157, "pro": 0, "bos": 38, "por": 136,
}
def heuristic_sld_providence(state):
    """
    look up the straight-line distance heuristic for `state`:
    the `sld_providence` entry for the city Neal is in
    (KeyError if the city is not in the table)
    """
    distance_to_providence = sld_providence[state]
    return distance_to_providence

## A* Search
We are finally ready to help Neal use his knowledge of straight-line distances from various cities to Providence to inform his family’s route to move from Chicago to Providence!

Modify your uniform-cost search codes from 1.1 even further so that they now perform A* search, using as the heuristic function the straight-line distance to Providence.

Provide heuristic as an additional argument, which should just be the function to call within the A* code. So your call to the A* routine should look like: astar_search(start, goal, state_graph, heuristic, return_cost, return_nexp). (This kind of modular programming will make it much easier to swap in alternative heuristic functions later, and also helps to facilitate debugging if something goes wrong.)

In [None]:
import numpy as np
import heapq
import unittest

def path(previous, s):
    '''
    `previous` is a dictionary chaining together the predecessor state that led to each state
    `s` will be None for the initial state (an empty list is returned in that case)
    otherwise, start from the last state `s` and trace `previous` back to the initial state,
    returning the list of states visited, ordered from the initial state to `s`

    The trace is iterative, so long chains cannot hit Python's recursion limit.
    Raises KeyError if a state on the chain has no entry in `previous`,
    and ValueError if the chain loops back on itself.
    '''
    states = []
    while s is not None:
        predecessor = previous[s]
        states.append(s)
        # a valid chain visits each key of `previous` at most once
        if len(states) > len(previous):
            raise ValueError("predecessor chain contains a cycle")
        s = predecessor
    # states were collected goal-first; callers expect start-first
    states.reverse()
    return states

def pathcost(path, step_costs):
    '''
    total the step costs along `path`, a list of states such as the `path` function returns;
    `step_costs[a][b]` is the cost of the step from state a to state b
    '''
    total = 0
    # walk the path as consecutive (from, to) pairs
    for here, there in zip(path, path[1:]):
        total += step_costs[here][there]
    return total

# step costs between neighbouring cities: map_distances[a][b] is the cost from a to b
map_distances = {
    "chi": {"det": 283, "cle": 345, "ind": 182},
    "cle": {"chi": 345, "det": 169, "col": 144, "pit": 134, "buf": 189},
    "ind": {"chi": 182, "col": 176},
    "col": {"ind": 176, "cle": 144, "pit": 185},
    "det": {"chi": 283, "cle": 169, "buf": 256},
    "buf": {"det": 256, "cle": 189, "pit": 215, "syr": 150},
    "pit": {"col": 185, "cle": 134, "buf": 215, "phi": 305, "bal": 247},
    "syr": {"buf": 150, "phi": 253, "new": 254, "bos": 312},
    "bal": {"phi": 101, "pit": 247},
    "phi": {"pit": 305, "bal": 101, "syr": 253, "new": 97},
    "new": {"syr": 254, "phi": 97, "bos": 215, "pro": 181},
    "pro": {"bos": 50, "new": 181},
    "bos": {"pro": 50, "new": 215, "syr": 312, "por": 107},
    "por": {"bos": 107},
}

# straight-line distance from each city to Providence ("pro")
sld_providence = {
    "chi": 833, "cle": 531, "ind": 782, "col": 618, "det": 596,
    "buf": 385, "pit": 458, "syr": 253, "bal": 325, "phi": 236,
    "new": 157, "pro": 0, "bos": 38, "por": 136,
}

# Solution:

def heuristic_sld_providence(state):
    """
    look up the straight-line distance heuristic for `state`:
    the `sld_providence` entry for the city Neal is in
    (KeyError if the city is not in the table)
    """
    distance_to_providence = sld_providence[state]
    return distance_to_providence

class Frontier_PQ:
    ''' frontier class for best-first search: a min-priority queue of states

    attributes:
      states  dictionary of every state seen so far along with the minimum
              path cost found to arrive at it; the search routines read and
              update it directly
      q       binary min-heap (maintained with heapq) of (cost, state)
              tuples representing the elements on the frontier

    instantiation args of Frontier_PQ:
      start is the initial state
      cost is the initial path cost '''
    def __init__(self, start, cost):
        self.states = {start: cost}
        # a one-element list already satisfies the heap invariant
        self.q = [(cost, start)]

    def add(self, state, cost_path):
        ''' push the (cost_path, state) tuple onto the frontier '''
        heapq.heappush(self.q, (cost_path, state))

    def pop(self):
        ''' remove and return the lowest-cost (cost, state) tuple
        raises IndexError if the frontier is empty '''
        return heapq.heappop(self.q)

    def replace(self, state, cost):
        ''' give `state`, which is already on the frontier, the new priority `cost`

        the entry is located by its state, so this also works when the queued
        priority differs from the value kept in `states` (e.g. an f-value in
        the queue and a path cost in `states`); `states` itself is left for
        the caller to update
        raises ValueError if `state` is not on the frontier '''
        for index, (_, queued_state) in enumerate(self.q):
            if queued_state == state:
                break
        else:
            raise ValueError("state %r is not on the frontier" % (state,))
        self.q[index] = (cost, state)
        # the heap parent of index i is (i-1)//2, not i-1, so swapping with
        # the left neighbour does not repair the heap; heapify restores the
        # invariant whichever way the priority moved
        heapq.heapify(self.q)

    def is_empty(self):
        ''' True when no (cost, state) tuples are left on the frontier '''
        return not self.q
    
    
# Solution:

def astar_search(start, goal, state_graph, heuristic, return_cost=False, return_nexp=False):
    '''A* search from `start` to `goal`
    start = initial state
    goal = goal state
    state_graph = dictionary of dictionaries; state_graph[a][b] is the step cost from a to b
    heuristic = function for estimated cost to goal (function name)
    return_cost = logical (True/False) for whether or not to return the total path cost
    return_nexp = logical (True/False) for whether or not to return the number of nodes expanded

    returns the solution path (list of states from start to goal), followed by
    the path cost if return_cost and by the number of nodes expanded if
    return_nexp; with both flags False the bare list is returned.
    if the goal cannot be reached, the path is None and the cost is
    float('inf'), delivered in the same output shape.
    '''
    def _pack(node_path, path_cost, expanded_count):
        # output order is always: path, [cost], [number expanded]
        out = [node_path]
        if return_cost:
            out.append(path_cost)
        if return_nexp:
            out.append(expanded_count)
        return out[0] if len(out) == 1 else tuple(out)

    # the queue is ordered by f = g + h, while frontier.states keeps the
    # best path cost g found so far for every state reached
    frontier = Frontier_PQ(start, 0)
    how_node_reached = {start: None}
    expanded = set()
    nexp = 0
    while not frontier.is_empty():
        _, current_state = frontier.pop()
        if current_state in expanded:
            # stale entry: a cheaper entry for this state was pushed later
            # and has already been expanded
            continue
        expanded.add(current_state)
        nexp += 1
        if current_state == goal:
            node_path = path(how_node_reached, current_state)
            path_cost = pathcost(node_path, state_graph) if return_cost else None
            return _pack(node_path, path_cost, nexp)

        cost_so_far = frontier.states[current_state]
        for child, step_cost in state_graph[current_state].items():
            updated_path = cost_so_far + step_cost
            # compare path cost with path cost; testing the f-value against
            # the stored g-value would miss genuine improvements
            if child in frontier.states and updated_path >= frontier.states[child]:
                continue
            frontier.states[child] = updated_path
            how_node_reached[child] = current_state
            # a cheaper route to an expanded state re-opens it (only
            # possible when the heuristic is not consistent)
            expanded.discard(child)
            # push the improved entry; the superseded one has a larger
            # f-value and is skipped when it is eventually popped
            frontier.add(child, updated_path + heuristic(child))

    # frontier exhausted without popping the goal: there is no path
    return _pack(None, float("inf"), nexp)



Print the following using your code:

1. the optimal path

2. the optimal path cost (miles traveled)

3. the number of states expanded during the A* search

Additionally, print how many states must be expanded to find the optimal path from Buffalo to Providence using the regular old uniform-cost search algorithm from 1.1

In [None]:
import numpy as np
import heapq
import unittest

def path(previous, s):
    '''
    `previous` is a dictionary chaining together the predecessor state that led to each state
    `s` will be None for the initial state (an empty list is returned in that case)
    otherwise, start from the last state `s` and trace `previous` back to the initial state,
    returning the list of states visited, ordered from the initial state to `s`

    The trace is iterative, so long chains cannot hit Python's recursion limit.
    Raises KeyError if a state on the chain has no entry in `previous`,
    and ValueError if the chain loops back on itself.
    '''
    states = []
    while s is not None:
        predecessor = previous[s]
        states.append(s)
        # a valid chain visits each key of `previous` at most once
        if len(states) > len(previous):
            raise ValueError("predecessor chain contains a cycle")
        s = predecessor
    # states were collected goal-first; callers expect start-first
    states.reverse()
    return states

def pathcost(path, step_costs):
    '''
    total the step costs along `path`, a list of states such as the `path` function returns;
    `step_costs[a][b]` is the cost of the step from state a to state b
    '''
    total = 0
    # walk the path as consecutive (from, to) pairs
    for here, there in zip(path, path[1:]):
        total += step_costs[here][there]
    return total

# step costs between neighbouring cities: map_distances[a][b] is the cost from a to b
map_distances = {
    "chi": {"det": 283, "cle": 345, "ind": 182},
    "cle": {"chi": 345, "det": 169, "col": 144, "pit": 134, "buf": 189},
    "ind": {"chi": 182, "col": 176},
    "col": {"ind": 176, "cle": 144, "pit": 185},
    "det": {"chi": 283, "cle": 169, "buf": 256},
    "buf": {"det": 256, "cle": 189, "pit": 215, "syr": 150},
    "pit": {"col": 185, "cle": 134, "buf": 215, "phi": 305, "bal": 247},
    "syr": {"buf": 150, "phi": 253, "new": 254, "bos": 312},
    "bal": {"phi": 101, "pit": 247},
    "phi": {"pit": 305, "bal": 101, "syr": 253, "new": 97},
    "new": {"syr": 254, "phi": 97, "bos": 215, "pro": 181},
    "pro": {"bos": 50, "new": 181},
    "bos": {"pro": 50, "new": 215, "syr": 312, "por": 107},
    "por": {"bos": 107},
}

# straight-line distance from each city to Providence ("pro")
sld_providence = {
    "chi": 833, "cle": 531, "ind": 782, "col": 618, "det": 596,
    "buf": 385, "pit": 458, "syr": 253, "bal": 325, "phi": 236,
    "new": 157, "pro": 0, "bos": 38, "por": 136,
}

# Solution:

def heuristic_sld_providence(state):
    """
    look up the straight-line distance heuristic for `state`:
    the `sld_providence` entry for the city Neal is in
    (KeyError if the city is not in the table)
    """
    distance_to_providence = sld_providence[state]
    return distance_to_providence

class Frontier_PQ:
    ''' frontier class for best-first search: a min-priority queue of states

    attributes:
      states  dictionary of every state seen so far along with the minimum
              path cost found to arrive at it; the search routines read and
              update it directly
      q       binary min-heap (maintained with heapq) of (cost, state)
              tuples representing the elements on the frontier

    instantiation args of Frontier_PQ:
      start is the initial state
      cost is the initial path cost '''
    def __init__(self, start, cost):
        self.states = {start: cost}
        # a one-element list already satisfies the heap invariant
        self.q = [(cost, start)]

    def add(self, state, cost_path):
        ''' push the (cost_path, state) tuple onto the frontier '''
        heapq.heappush(self.q, (cost_path, state))

    def pop(self):
        ''' remove and return the lowest-cost (cost, state) tuple
        raises IndexError if the frontier is empty '''
        return heapq.heappop(self.q)

    def replace(self, state, cost):
        ''' give `state`, which is already on the frontier, the new priority `cost`

        the entry is located by its state, so this also works when the queued
        priority differs from the value kept in `states` (e.g. an f-value in
        the queue and a path cost in `states`); `states` itself is left for
        the caller to update
        raises ValueError if `state` is not on the frontier '''
        for index, (_, queued_state) in enumerate(self.q):
            if queued_state == state:
                break
        else:
            raise ValueError("state %r is not on the frontier" % (state,))
        self.q[index] = (cost, state)
        # the heap parent of index i is (i-1)//2, not i-1, so swapping with
        # the left neighbour does not repair the heap; heapify restores the
        # invariant whichever way the priority moved
        heapq.heapify(self.q)

    def is_empty(self):
        ''' True when no (cost, state) tuples are left on the frontier '''
        return not self.q
    
     
# Solution:

def astar_search(start, goal, state_graph, heuristic, return_cost=False, return_nexp=False):
    '''A* search from `start` to `goal`
    start = initial state
    goal = goal state
    state_graph = dictionary of dictionaries; state_graph[a][b] is the step cost from a to b
    heuristic = function for estimated cost to goal (function name)
    return_cost = logical (True/False) for whether or not to return the total path cost
    return_nexp = logical (True/False) for whether or not to return the number of nodes expanded

    returns the solution path (list of states from start to goal), followed by
    the path cost if return_cost and by the number of nodes expanded if
    return_nexp; with both flags False the bare list is returned.
    if the goal cannot be reached, the path is None and the cost is
    float('inf'), delivered in the same output shape.
    '''
    def _pack(node_path, path_cost, expanded_count):
        # output order is always: path, [cost], [number expanded]
        out = [node_path]
        if return_cost:
            out.append(path_cost)
        if return_nexp:
            out.append(expanded_count)
        return out[0] if len(out) == 1 else tuple(out)

    # the queue is ordered by f = g + h, while frontier.states keeps the
    # best path cost g found so far for every state reached
    frontier = Frontier_PQ(start, 0)
    how_node_reached = {start: None}
    expanded = set()
    nexp = 0
    while not frontier.is_empty():
        _, current_state = frontier.pop()
        if current_state in expanded:
            # stale entry: a cheaper entry for this state was pushed later
            # and has already been expanded
            continue
        expanded.add(current_state)
        nexp += 1
        if current_state == goal:
            node_path = path(how_node_reached, current_state)
            path_cost = pathcost(node_path, state_graph) if return_cost else None
            return _pack(node_path, path_cost, nexp)

        cost_so_far = frontier.states[current_state]
        for child, step_cost in state_graph[current_state].items():
            updated_path = cost_so_far + step_cost
            # compare path cost with path cost; testing the f-value against
            # the stored g-value would miss genuine improvements
            if child in frontier.states and updated_path >= frontier.states[child]:
                continue
            frontier.states[child] = updated_path
            how_node_reached[child] = current_state
            # a cheaper route to an expanded state re-opens it (only
            # possible when the heuristic is not consistent)
            expanded.discard(child)
            # push the improved entry; the superseded one has a larger
            # f-value and is skipped when it is eventually popped
            frontier.add(child, updated_path + heuristic(child))

    # frontier exhausted without popping the goal: there is no path
    return _pack(None, float("inf"), nexp)
 
    

def uniform_cost(start, goal, state_graph, return_cost=False, return_nexp=True):
    """
    uniform-cost search from `start` to `goal`
    start = initial state
    goal = goal state
    state_graph = dictionary of dictionaries; state_graph[a][b] is the step cost from a to b
    return_cost = logical (True/False) for whether or not to return the total path cost
    return_nexp = logical (True/False) for whether or not to return the number of nodes
                  expanded, i.e. popped off the frontier (the goal included)

    returns the solution path (list of states from start to goal), followed by
    the path cost if return_cost and by the number of nodes expanded if
    return_nexp; with both flags False the bare list is returned.
    if the goal cannot be reached, the path is None and the cost is
    float('inf'), delivered in the same output shape.

    NOTE: return_nexp defaults to True here; the default is kept so that
    existing calls keep returning the same tuple.
    """
    def _pack(node_path, path_cost, expanded_count):
        # output order is always: path, [cost], [number expanded]
        out = [node_path]
        if return_cost:
            out.append(path_cost)
        if return_nexp:
            out.append(expanded_count)
        return out[0] if len(out) == 1 else tuple(out)

    # priority queue ordered by path cost, seeded with the start state
    frontier = Frontier_PQ(start, 0)
    how_node_reached = {start: None}
    nexp = 0
    while not frontier.is_empty():
        # pop the state with the lowest path cost
        _, current_state = frontier.pop()
        nexp += 1
        if current_state == goal:
            node_path = path(how_node_reached, current_state)
            path_cost = pathcost(node_path, state_graph) if return_cost else None
            return _pack(node_path, path_cost, nexp)

        cost_so_far = frontier.states[current_state]
        for child, step_cost in state_graph[current_state].items():
            updated_path = cost_so_far + step_cost
            if child not in frontier.states:
                # first time this state is reached
                frontier.add(child, updated_path)
            elif updated_path < frontier.states[child]:
                # cheaper route to a state that was reached before
                frontier.replace(child, updated_path)
            else:
                continue
            frontier.states[child] = updated_path
            # keep track of how we got to this state
            how_node_reached[child] = current_state

    # frontier exhausted without popping the goal: there is no path
    return _pack(None, float("inf"), nexp)
 
    

Comment on the difference in states that must be explored by each algorithm. Sanity check: No matter what your start and goal states are, how should the output from astar_search and  uniform_cost search compare?

In [None]:
"""
UCS is uninformed. It orders states to explore
by the lowest path cost, the backward cost g(n).
A* search is informed. It orders states to explore by 
f(n) = g(n) + h(n), which is the backward cost g(n) + the goal proximity
forward cost, h(n). Running A* with a constant heuristic is the same as running UCS. 
UCS guarantees that an optimal shortest-cost path will be returned as output.
If the heuristics for A* are set correctly, A* will also return 
the optimal, shortest-cost path. Running A* with an admissible heuristic 
for tree search or a consistent heuristic for graph search will guarantee
that it will return the optimal path. 
In question 4, we saw that with A* using a heuristic that was the 
straightline distance from a city to the goal city,
UCS and A* both returned the shortest cost path of 'buf', 'syr', 
'bos', 'pro' with a total distance of 512. Straight line distance is 
a good heuristic to use particularly for maps because it is a relaxed 
version of the problem, the shortest distance from one point to another if we didn't have to worry about roads.
For UCS, the shortest cost path so far might not be the lowest cost path
overall so the algorithm might explore a number of different directions.
On the other hand, a well guided heuristic can help A* find the optimal
path more efficiently. In question 4 above, A* expanded only 4 nodes to
complete the search, the cities on the optimal path. 
On the other hand, UCS expanded 12 nodes. 

"""

How many states are expanded by each of A* search and uniform-cost search to find the optimal path from Philadelphia to Providence?

In [None]:
import numpy as np
import heapq
import unittest

def path(previous, s):
    '''
    `previous` is a dictionary chaining together the predecessor state that led to each state
    `s` will be None for the initial state (an empty list is returned in that case)
    otherwise, start from the last state `s` and trace `previous` back to the initial state,
    returning the list of states visited, ordered from the initial state to `s`

    The trace is iterative, so long chains cannot hit Python's recursion limit.
    Raises KeyError if a state on the chain has no entry in `previous`,
    and ValueError if the chain loops back on itself.
    '''
    states = []
    while s is not None:
        predecessor = previous[s]
        states.append(s)
        # a valid chain visits each key of `previous` at most once
        if len(states) > len(previous):
            raise ValueError("predecessor chain contains a cycle")
        s = predecessor
    # states were collected goal-first; callers expect start-first
    states.reverse()
    return states

def pathcost(path, step_costs):
    '''
    total the step costs along `path`, a list of states such as the `path` function returns;
    `step_costs[a][b]` is the cost of the step from state a to state b
    '''
    total = 0
    # walk the path as consecutive (from, to) pairs
    for here, there in zip(path, path[1:]):
        total += step_costs[here][there]
    return total

# step costs between neighbouring cities: map_distances[a][b] is the cost from a to b
map_distances = {
    "chi": {"det": 283, "cle": 345, "ind": 182},
    "cle": {"chi": 345, "det": 169, "col": 144, "pit": 134, "buf": 189},
    "ind": {"chi": 182, "col": 176},
    "col": {"ind": 176, "cle": 144, "pit": 185},
    "det": {"chi": 283, "cle": 169, "buf": 256},
    "buf": {"det": 256, "cle": 189, "pit": 215, "syr": 150},
    "pit": {"col": 185, "cle": 134, "buf": 215, "phi": 305, "bal": 247},
    "syr": {"buf": 150, "phi": 253, "new": 254, "bos": 312},
    "bal": {"phi": 101, "pit": 247},
    "phi": {"pit": 305, "bal": 101, "syr": 253, "new": 97},
    "new": {"syr": 254, "phi": 97, "bos": 215, "pro": 181},
    "pro": {"bos": 50, "new": 181},
    "bos": {"pro": 50, "new": 215, "syr": 312, "por": 107},
    "por": {"bos": 107},
}

# straight-line distance from each city to Providence ("pro")
sld_providence = {
    "chi": 833, "cle": 531, "ind": 782, "col": 618, "det": 596,
    "buf": 385, "pit": 458, "syr": 253, "bal": 325, "phi": 236,
    "new": 157, "pro": 0, "bos": 38, "por": 136,
}

# Solution:

def heuristic_sld_providence(state):
    """
    look up the straight-line distance heuristic for `state`:
    the `sld_providence` entry for the city Neal is in
    (KeyError if the city is not in the table)
    """
    distance_to_providence = sld_providence[state]
    return distance_to_providence

class Frontier_PQ:
    '''
    Frontier for best-first search: a min-priority queue of (cost, state) tuples
    kept as a binary heap (via `heapq`), so the root is always the cheapest entry.

    instantiation args of Frontier_PQ:
    start is the initial state
    cost is the initial path cost

    attributes:
    states is a dictionary of the states placed on the frontier along with the
        cost recorded for them; this class only records the start state, the
        search code that uses the frontier keeps the other entries up to date
    q is the heap, a list of (cost, state) tuples
    '''
    def __init__(self, start, cost):
        # the start state is the only entry, so a one-element list is already a heap
        self.states = {start: cost}
        self.q = [(cost, start)]

    def add(self, state, cost_path):
        ''' push the (cost_path, state) tuple onto the frontier '''
        heapq.heappush(self.q, (cost_path, state))

    def pop(self):
        ''' remove and return the lowest-cost (cost, state) tuple;
        raises IndexError when the frontier is empty '''
        return heapq.heappop(self.q)

    def replace(self, state, cost):
        '''
        give `state` the new priority `cost`

        The queued entry is located by its state, not by a previously recorded
        cost, so this also works when the queue priority is not the value stored
        in `self.states` (e.g. f = g + h priorities in A*).
        If `state` is not currently queued it is simply added.
        '''
        for index, (_, queued_state) in enumerate(self.q):
            if queued_state == state:
                self.q[index] = (cost, state)
                # the changed entry may now be out of place relative to its heap
                # parent/children (which are not its list neighbours), so rebuild
                # the heap; this is linear, like the search for the entry itself
                heapq.heapify(self.q)
                return
        self.add(state, cost)

    def is_empty(self):
        ''' True when no entries are left on the frontier '''
        return not self.q
    
     
# Solution:

def astar_search(start, goal, state_graph, heuristic, return_cost=False, return_nexp=False):
    '''A* search from `start` to `goal`
    start = initial state
    goal = goal state
    state_graph = dictionary of dictionaries; state_graph[a][b] is the step cost from a to b
    heuristic = function for estimated cost to goal (function name)
    return_cost = logical (True/False) for whether or not to return the total path cost
    return_nexp = logical (True/False) for whether or not to return the number of nodes expanded

    Returns the solution path as a list of states. If return_cost is True the path
    cost follows it, and if return_nexp is True the number of nodes expanded comes
    last; with more than one output the result is a tuple.
    Every node popped off the frontier, including the goal, counts as expanded.
    If the frontier empties before the goal is reached, the path is None and the
    path cost is float('inf').
    '''
    def package(node_path, path_cost, nexp):
        # assemble the outputs selected by the two flags
        outputs = [node_path]
        if return_cost:
            outputs.append(path_cost)
        if return_nexp:
            outputs.append(nexp)
        return outputs[0] if len(outputs) == 1 else tuple(outputs)

    # frontier is a priority queue ordered by f = g + h
    frontier = Frontier_PQ(start, 0)
    # g_cost holds the cheapest known path cost (g) to every state reached so far
    g_cost = {start: 0}
    how_node_reached = {start: None}
    # states that have been popped and are not currently waiting on the frontier
    expanded = set()
    # accumulator for the number of nodes expanded
    nexp = 0
    while not frontier.is_empty():
        # pop the node with the lowest f value
        _, current_state = frontier.pop()
        nexp += 1

        # Stop when we reach the goal
        if current_state == goal:
            node_path = path(how_node_reached, current_state)
            path_cost = pathcost(node_path, state_graph) if return_cost else None
            return package(node_path, path_cost, nexp)

        expanded.add(current_state)

        for child in state_graph[current_state]:
            new_g = g_cost[current_state] + state_graph[current_state][child]
            # a path is only an improvement if its g beats the best g known for
            # the child; comparing f against g would wrongly reject improvements
            if child in g_cost and new_g >= g_cost[child]:
                continue

            new_f = new_g + heuristic(child)
            if child in g_cost and child not in expanded:
                # child is still waiting on the frontier: lower its priority
                frontier.replace(child, new_f)
            else:
                # child is new, or was expanded earlier and has to be re-opened
                expanded.discard(child)
                frontier.add(child, new_f)
            # keep frontier.states equal to the priority actually queued for the
            # child; path costs (g) are tracked separately in g_cost
            frontier.states[child] = new_f
            g_cost[child] = new_g
            # Keep track of how we got to this node
            how_node_reached[child] = current_state

    # frontier exhausted without popping the goal: there is no path
    return package(None, float('inf'), nexp)
 
    

def uniform_cost(start, goal, state_graph, return_cost=False, return_nexp=True):
    '''Uniform-cost search from `start` to `goal`
    start = initial state
    goal = goal state
    state_graph = dictionary of dictionaries; state_graph[a][b] is the step cost from a to b
        (step costs are assumed to be non-negative)
    return_cost = logical (True/False) for whether or not to return the total path cost
    return_nexp = logical (True/False) for whether or not to return the number of nodes expanded

    Returns the solution path as a list of states. If return_cost is True the path
    cost follows it, and if return_nexp is True the number of nodes expanded comes
    last; with more than one output the result is a tuple.
    Every node popped off the frontier, including the goal, counts as expanded.
    If the frontier empties before the goal is reached, the path is None and the
    path cost is float('inf').

    NOTE(review): return_nexp defaults to True here but to False in astar_search,
    so callers that omit it receive the expansion count as an extra output.
    '''
    def package(node_path, path_cost, nexp):
        # assemble the outputs selected by the two flags
        outputs = [node_path]
        if return_cost:
            outputs.append(path_cost)
        if return_nexp:
            outputs.append(nexp)
        return outputs[0] if len(outputs) == 1 else tuple(outputs)

    # frontier is a priority queue ordered by path cost;
    # frontier.states holds the cheapest known path cost to each reached state
    frontier = Frontier_PQ(start, 0)
    how_node_reached = {start: None}
    # accumulator for the number of nodes expanded
    nexp = 0
    while not frontier.is_empty():
        # pop the node with the cheapest path
        _, current_state = frontier.pop()
        nexp += 1

        # Stop when we reach the goal
        if current_state == goal:
            node_path = path(how_node_reached, current_state)
            path_cost = pathcost(node_path, state_graph) if return_cost else None
            return package(node_path, path_cost, nexp)

        for child in state_graph[current_state]:
            updated_path = frontier.states[current_state] + state_graph[current_state][child]
            if child not in frontier.states:
                # first time this child is reached
                frontier.add(child, updated_path)
            elif updated_path < frontier.states[child]:
                # cheaper route to a child that was already reached
                frontier.replace(child, updated_path)
            else:
                continue
            # record the new cheapest cost and how we got to this node
            frontier.states[child] = updated_path
            how_node_reached[child] = current_state

    # frontier exhausted without popping the goal: there is no path
    return package(None, float('inf'), nexp)
 
    

    

## CSP Class 

In [None]:
from collections import OrderedDict

# Map of Canada for the colouring problem: each region code maps to the list of
# region codes treated as its neighbours. Insertion order is alphabetical.
canada = OrderedDict()
canada["AB"] = ["BC", "NT", "SK"]
canada["BC"] = ["AB", "NT", "YT"]
canada["LB"] = ["NF", "NS", "PE", "QC"]
canada["MB"] = ["ON", "NV", "SK"]
canada["NB"] = ["NS", "QC"]
canada["NF"] = ["LB", "QC"]
canada["NS"] = ["LB", "NB", "PE"]
canada["NT"] = ["AB", "BC", "NV", "SK", "YT"]
canada["NV"] = ["MB", "NT"]
canada["ON"] = ["MB", "QC"]
canada["PE"] = ["LB", "NS", "QC"]
canada["QC"] = ["LB", "NB", "NF", "ON", "PE"]
canada["SK"] = ["AB", "MB", "NT"]
canada["YT"] = ["BC", "NT"]

# the variables are exactly the region codes, in the same order as the map's keys
states = list(canada)
# the values that may be assigned to a variable
colors = ["blue", "green", "red"]

class CSP:
    """Plain holder for the three parts of a constraint-satisfaction problem.

    neighbors -- dictionary; for each variable, the other variables that
                 cannot have the same value
    variables -- list of variables that must each be assigned a value
    domain    -- list of the values that can be assigned to the variables
    """

    def __init__(self, neighbors, variables, domain):
        # the arguments are stored as given, without copying or validation
        self.neighbors, self.variables, self.domain = neighbors, variables, domain

# CSP takes (neighbors, variables, domain): the adjacency map comes first and the
# list of variables second; keywords keep the two from being swapped
cspObj = CSP(neighbors=canada, variables=states, domain=colors)

# print(cspObj.neighbors)
# print(cspObj.variables)
# print(cspObj.domain)