# Search


This tutorial is motivated by the awesome lecture of Dorsa Sadigh and Percy Liang [Stanford CS221: Artificial Intelligence: Principles and Techniques | Autumn 2019](
https://www.youtube.com/playlist?list=PLoROMvodv4rO1NB9TD4iUZ3qghGEGtqNX).

## Motivating Example :Transportation Problem
**Problem Setup:**
1. Street with blocks numbered 1 to n
2. Walking from s to s+1 takes 1 minute.
3. Taking a magic tram from s to 2s takes 2 minutes.
**Question: How to go  from 1 to n in the least time ?**

In [1]:
from typing import List,Tuple
import sys
sys.setrecursionlimit(10000)

## Implementation of Problem Setup
class TransportationProblem:
    def __init__(self, N):
        # N = number of blocks
        self.N = N
    def startState(self)-> int:
        return 1
    def isEnd(self, state)-> bool:
        return state == self.N
    def succAndCost(self, state)->List[Tuple]:
        #(action, newState, cost) triples
        result = []
        if state+1<=self.N:
            result.append(('walk', state+1, 1))
        if state*2<=self.N:
            result.append(('tram', state*2, 2))
        return result
    
problem = TransportationProblem(N=10)
print(problem.succAndCost(1))  # [('walk', 2, 1), ('tram', 2, 2)]
print(problem.succAndCost(2))  # [('walk', 3, 1), ('tram', 4, 2)]
print(problem.succAndCost(3))  # [('walk', 4, 1), ('tram', 6, 2)]
print(problem.succAndCost(10)) # []

[('walk', 2, 1), ('tram', 2, 2)]
[('walk', 3, 1), ('tram', 4, 2)]
[('walk', 4, 1), ('tram', 6, 2)]
[]


### Setup

Search space is considered as a search tree with $b$ branching factor and $D$ depth

## 1. Backtracking Search

#### Time Compleixty $O(b^D)$

#### Space Compleixty $O(D)$
Remember only the best solution

In [2]:
def backtrackingSearch(problem):
    best = { 'cost': float('+inf'), 'history': None }    
    def recurse(state, history, totalCost):
        if problem.isEnd(state):
            if totalCost<best['cost']:
                best['cost'] = totalCost
                best['history'] = history
            return
        for action, newState, cost in problem.succAndCost(state):
            recurse(newState, history+[(action, newState, cost)], totalCost+cost)
    
    recurse(problem.startState(), history=[], totalCost=0)
    return (best['cost'], best['history'])

In [3]:
def printSolution(solution):
    totalCost, history = solution
    print('totalCost: {}'.format(totalCost))
    for item in history:
        print(item)

In [4]:
problem = TransportationProblem(N=10)
printSolution(backtrackingSearch(problem))

totalCost: 6
('walk', 2, 1)
('walk', 3, 1)
('walk', 4, 1)
('walk', 5, 1)
('tram', 10, 2)


## 2. Depth-First Search

**Idea:** Backtracking search until you find a goal state. Hence, stop exploring three for "the right side".

**Reqirement:** Cost of reaching goal must be __0__. In practice the time compleixty is better than the backtracing search but in terms of worst time compleixty they are same


#### Time Compleixty $O(b^D)$

#### Space Compleixty $O(D)$
Remember only the best solution

## 3. Breadth-First Search

**Idea:** Explore solutions close to the root state at first.

**Reqirement:** __C>=0__ and __constant__ over edges.

#### Time Compleixty $O(b^d)$

1. $d<<D$

#### Space Compleixty $O(b^d)$


# Dynamic Programmig


__Idea:__ Backtracking search with memoization

__A state is a summary of all past actions sufficient to choose future actions optimally__.

__Requirement:__ The state graph is defined by A(s) and Succ(s,a) is acyclic.

$$
FC(s)  = \begin{cases}
0 & \text{ if } IsEnd(s)\\
min_a \{ C(s,a) + FC(s') \} & \text{otherwise} \\
\end{cases}
$$

1. $C(\cdot,\cdot)$ cost function
2. $FC(\cdot)$ minimum future cost function



In [5]:
def dynamicProgramming(problem):
    cache = {}
    id_succ={}
    def futureCost(state):
        # (1) Return zero cost for an end state
        if problem.isEnd(state):
            return 0
        # (2) Return previously computed minimum cost of a given state
        if state in cache:
            return cache[state]
        # (3) Compute current cost and future cost of each successor state
        costs=[cost+futureCost(newState) \
                for action, newState, cost in problem.succAndCost(state)]
        # (4) Mininum cost
        min_cost=min(costs)
        # (5) Store min cost and index of successor
        cache[state] = min_cost
        id_succ[state]=costs.index(min_cost)
        return min_cost
    
    optimal_future_cost=futureCost(problem.startState())
    # To Recover history
    s=problem.startState()
    path=[]
    while True:
        if problem.isEnd(s):
            break        
        next_= [(action, newState, cost) for action, newState, cost in problem.succAndCost(s)][id_succ[s]]
        path.append(next_)
        _,s,_=next_
    return optimal_future_cost,path

In [6]:
problem = TransportationProblem(N=10)
printSolution(dynamicProgramming(problem)) # optimal cost.

totalCost: 6
('walk', 2, 1)
('walk', 3, 1)
('walk', 4, 1)
('walk', 5, 1)
('tram', 10, 2)


# Uniform Cost Search

__Idea:__ UCS enumerates state in order of increasing past cost.

__Requriement:__ Cost is non-negative.


#### Time Compleixty $O( nlog(n))$

$n$ is the number of states explored . 

#### Space Compleixty $O( b^d )$


In [7]:
import heapq
from queue import PriorityQueue as PQ
# Data structure for supporting uniform cost search.
class PriorityQueueWithHeapq:
    def  __init__(self):
        self.DONE = -100000
        self.heap = []
        self.priorities = {}  # Map from state to priority

    # Insert |state| into the heap with priority |newPriority| if
    # |state| isn't in the heap or |newPriority| is smaller than the existing
    # priority.
    # Return whether the priority queue was updated.
    def update(self, state, newPriority):
        oldPriority = self.priorities.get(state)
        if oldPriority == None or newPriority < oldPriority:
            self.priorities[state] = newPriority
            heapq.heappush(self.heap, (newPriority, state))
            return True
        return False

    # Returns (state with minimum priority, priority)
    # or (None, None) if the priority queue is empty.
    def removeMin(self):
        while len(self.heap) > 0:
            priority, state = heapq.heappop(self.heap)
            if self.priorities[state] == self.DONE: continue  # Outdated priority, skip
            self.priorities[state] = self.DONE
            return (state, priority)
        return (None, None) # Nothing left...
    
class PriorityQueue:
    def __init__(self, maxsize=0):
        self.pq = PQ(maxsize)
        self.priorities = dict() # Map from state to priority
        self.DONE = -100000
            
    def update(self,state,new_cost):
        """ (1) If the cost is lower """
        old_cost = self.priorities.get(state)
        if old_cost == None or new_cost < old_cost:
            """ (1.1) Update priorty of the state """
            self.priorities[state] = new_cost
            self.pq.put((new_cost,state))
            return True
        return False
    def removeMin(self):
        """ Fetch most promissing state """
        while self.pq.qsize() > 0:
            new_cost,state = self.pq.get(timeout=1)
            if self.priorities[state] == self.DONE: 
                continue  # Outdated priority, skip
            """ Mark a state as explored """
            self.priorities[state] = self.DONE
            return new_cost,state
        return (None, None) # Nothing left...

In [8]:
def uniformCostSearch(problem):
    # Explored, Frontier, and Unexplored states
    frontier = PriorityQueue()
    frontier.update(problem.startState(),0)
    book_keeping=dict() # map from next state to predecessor

    while True:
        # Move from frontier to explored
        pastCost,state = frontier.removeMin()
        if problem.isEnd(state):
            break
        # Push out on the frontier
        for action, newState, cost in problem.succAndCost(state):
            if frontier.update(newState,pastCost+cost):
                book_keeping[newState]=state
                
    s_prime=problem.N
    state_to_state=dict()
    while True:
        s = book_keeping.get(s_prime)
        if s:
            state_to_state[s]=s_prime
            s_prime=s
        else:
            break
    del book_keeping,frontier
    s=problem.startState()
    path=[]
    while True:
        if problem.isEnd(s):
            break        
        next_= [(action, newState, cost) for action, newState, cost in problem.succAndCost(s) if state_to_state[s]==newState][0]
        path.append(next_)
        _,s,_=next_
    return (pastCost, path)

In [9]:
problem = TransportationProblem(N=10)
printSolution(uniformCostSearch(problem))

totalCost: 6
('walk', 2, 1)
('tram', 4, 2)
('walk', 5, 1)
('tram', 10, 2)


## Analysis of UCS, Theorem:correctness
__When a state $s$ is popped from the frontier and moved to explored. its priorty is PastCost(s), the minimum cost to s.__

# Learning as an inverse search problem



## Forward Problem (search)

$x=\mathbb R $ and $y=\mathbb R^k$
$$ C(s,a) \rightarrow (a_1, \dots, a_k) $$


## Inverse Problem (learning)
$x=\mathbb R^k $ and $y=\mathbb R$
$$ (a_1, \dots, a_k) \rightarrow C(s,a) $$

## Transportation Problem with Weighted Costs

In [10]:
class TransportationProblemWithWeightedCosts:
    def __init__(self, N, weights):
        # N = number of blocks
        # weights = weights of different actions
        self.N = N
        self.weights = weights
    def startState(self):
        return 1
    def isEnd(self, state):
        return state == self.N
    def succAndCost(self, state):
        # return list of (action, newState, cost) triples
        result = []
        if state+1<=self.N:
            result.append(('walk', state+1, self.weights['walk']))
        if state*2<=self.N:
            result.append(('tram', state*2, self.weights['tram']))
        return result
    
problem = TransportationProblemWithWeightedCosts(10, {'walk': 1, 'tram': 3})
print(problem.succAndCost(1))  # [('walk', 2, 1), ('tram', 2, 3)]
print(problem.succAndCost(2))  # [('walk', 3, 1), ('tram', 4, 3)]
print(problem.succAndCost(3))  # [('walk', 4, 1), ('tram', 6, 3)]
print(problem.succAndCost(10)) # []

[('walk', 2, 1), ('tram', 2, 3)]
[('walk', 3, 1), ('tram', 4, 3)]
[('walk', 4, 1), ('tram', 6, 3)]
[]


In [11]:
def predict(N, weights,f=uniformCostSearch)->List[str]:
    problem = TransportationProblemWithWeightedCosts(N, weights)
    totalCost, history = f(problem)
    return [action for action, newState, cost in history]

def generateExamples(n,f,trueWeights = {'walk': 1, 'tram': 3}):
    return [(N, predict(N, trueWeights,f)) for N in range(1, n)]

In [12]:
for n, actions in generateExamples(10,f=dynamicProgramming):
    print(n,actions)

1 []
2 ['walk']
3 ['walk', 'walk']
4 ['walk', 'walk', 'walk']
5 ['walk', 'walk', 'walk', 'walk']
6 ['walk', 'walk', 'walk', 'walk', 'walk']
7 ['walk', 'walk', 'walk', 'walk', 'walk', 'walk']
8 ['walk', 'walk', 'walk', 'tram']
9 ['walk', 'walk', 'walk', 'tram', 'walk']


In [13]:
for n,actions in generateExamples(10,f=uniformCostSearch):
    print(n,actions)

1 []
2 ['walk']
3 ['walk', 'walk']
4 ['walk', 'walk', 'walk']
5 ['walk', 'walk', 'walk', 'walk']
6 ['walk', 'walk', 'tram']
7 ['walk', 'walk', 'tram', 'walk']
8 ['walk', 'walk', 'walk', 'tram']
9 ['walk', 'walk', 'walk', 'tram', 'walk']


In [14]:
def structuredPerceptron(examples):
    weights = {'walk': 0, 'tram': 0}
    print('Dataset')
    for n in examples:
        print(n)
    for t in range(100):
        numMistakes = 0
        for N, trueActions in examples:
            # Make a prediction
            predActions = predict(N, weights)
            if predActions != trueActions:
                numMistakes += 1
            # Update weights
            for action in trueActions:
                weights[action] -= 1
            for action in predActions:
                weights[action] += 1
        print('Iteration {}, numMistakes = {}, weights = {}'.format(t, numMistakes, weights))
        if numMistakes == 0:
            print('Done!')
            break

In [15]:
structuredPerceptron(generateExamples(10,f=uniformCostSearch))

Dataset
(1, [])
(2, ['walk'])
(3, ['walk', 'walk'])
(4, ['walk', 'walk', 'walk'])
(5, ['walk', 'walk', 'walk', 'walk'])
(6, ['walk', 'walk', 'tram'])
(7, ['walk', 'walk', 'tram', 'walk'])
(8, ['walk', 'walk', 'walk', 'tram'])
(9, ['walk', 'walk', 'walk', 'tram', 'walk'])
Iteration 0, numMistakes = 4, weights = {'walk': 3, 'tram': 0}
Iteration 1, numMistakes = 5, weights = {'walk': 4, 'tram': 1}
Iteration 2, numMistakes = 4, weights = {'walk': 1, 'tram': 3}
Iteration 3, numMistakes = 0, weights = {'walk': 1, 'tram': 3}
Done!


# A* Search

Run UCS with modified edge costs

$$ \hat{C}(s,a) = C(s,a) + h(Succ(s,a))-h(s) $$
where $h(\cdot)$ is a heuristic function.

### Heuristic $h$

1. $h$ must be __consistent__, i.e., fulfill the triangle inequality

$$ \hat C (s,a) = C(s,a) + h(s') - h (s) \ge 0, $$

where $s':=Succ(s,a)$

2. $h(s_{end})=0$

## Correctness of A*

If h is consistent, A* returns the minimum cost path


## Efficiency  of A*

UCS explores all states $s$ satisfiying
$$ PastCost(s) \leq PastCost(EndState)$$

A* explores all states $s$ satisfiying
$$ PastCost(s) \leq PastCost(s_{end}) - h (s)$$


### Admissibility

A heuristic $h(s)$ is admissble if 

$$ h(s) \leq FutureCost(s) $$
Admissible heuristics are optimistic. Note that __consistency implies admissibility__