# Prohledávání stavového prostoru
(Základní techniky umělé inteligence)

In [1]:
from robomission import SpaceWorld

In [76]:
# Pomocne funkce a tridy
# TODO: vyfaktorovat do samostatneho modulu
from collections import namedtuple, defaultdict, deque
Position = namedtuple('Position', ['row', 'col'])

class State:
    def __init__(self, world):
        # TODO: enforce immutability
        # TODO: allow for state[position]
        self.world = world
        self.n = max(pos.row for pos in world) + 1
        self.m = max(pos.col for pos in world) + 1
        # finds spaceship
        self.spaceship = None
        for pos, field in world.items():
            if field == 'S':
                self.spaceship = pos
                break
        # cache wormholes
        wormholes = defaultdict(list)
        for pos, field in world.items():
            if field in 'WXYZ':
                wormholes[field].append(pos)
        self.wormhole_positions = []
        self.wormhole_end = {}
        for letter, positions in wormholes.items():
            if len(positions) != 2:
                continue
                #raise ValueError('Expects exectly 2 wormholes of given type.')
            self.wormhole_positions.extend(positions)
            self.wormhole_end[positions[0]] = positions[1]
            self.wormhole_end[positions[1]] = positions[0]
        # We allow for plans without a spaceship
        # (used e.g. by show_path)
        self.world_vis = SpaceWorld()

    def is_goal(self):
        return self.spaceship.row == 0
    
    def show(self):
        # TODO: use jupyter-display
        # hack to overcome bug with empty fields
        text = str(self).replace('| ', '|k')
        self.world_vis.set(text)
        
    def is_wormhole(self, position):
        return position in self.wormhole_positions
    
    def get_wormhole_end(self, position):
        return self.wormhole_end[position]
        
    def __str__(self):
        # TODO: add __repr__ for debugging (State('''...''')
        fields = [[
            self.world[(Position(row, col))]
            for col in range(self.m)]
                for row in range(self.n)]
        text = '\n'.join('|{inside}|'.format(inside='|'.join(row)) for row in fields)
        #text = text.replace(' ', '.')
        return text
        
    
    
def perform(state, action):
    # TODO: thow an error if the resulting state is dead (?)
    # (or allow for representation of dead states)
    world = state.world.copy()
    spaceship = move(state.spaceship, action)
    if state.is_wormhole(spaceship):
        spaceship = state.get_wormhole_end(spaceship)
    world[state.spaceship] = ' '
    world[spaceship] = 'S' 
    return State(world)

def move(spaceship, action):
    dy = -1
    dx = -1 if action == 'l' else 0 if action == 'f' else 1
    new_spaceship = Position(
        row=spaceship.row + dy,
        col=spaceship.col + dx)
    return new_spaceship    


def actions(state):
    """Return actions that don't lead to dead state.
    """
    return [
        a for a in 'lfr'
        if state.world.get(move(state.spaceship, a), 'A') != 'A']     

        
def parse_state(text):
    rows = text.strip().split('||')
    fields = [row.strip('|').split('|') for row in rows]
    world = {}
    spaceship = None
    for r, row in enumerate(fields):
        for c, field in enumerate(row):
            world[Position(r, c)] = field
            if field == 'S':
                spaceship = Position(r, c)
    return State(world) 
        
        
def show_path(path):
    # path = iterable of States
    # Constructs a pseudo-state showing the path
    world = path[0].world.copy()
    for i, state in enumerate(path):
        world[state.spaceship] = str(i)
    print(State(world))
        


state = parse_state(
'| | | | | |'
'| | | | | |'
'|A|A|A|A| |'
'| | | | | |'
'| | |S| | |')
#print(state)
state.show()


# state = parse_state(
# '| | | | | |'
# '| | | |W| |'
# '|A|A|A|A| |'
# '| | | | | |'
# '| | |S| |W|')
# print(state)
# print(state.wormholes)
# print(state.wormhole_end)


# stav: pozice raketky (row, col)
# (zbytek sveta je nemenny)

# akce: 'l', 'f', 'r' (left, forward, right)
#print(perform(perform(perform(state, 'r'), 'r'), 'l'))

In [125]:
# Tools for debugging
class Logger:
    def __init__(self):
        self.output = True
        
    def debug(self, output=True):
        self.output = output
    
    def log(self, message):
        if self.output:
            print(message)
            print('-------')
    
    def log_search_tree(self, parents):
        self.log(search_tree_as_state(parents))

    def log_costs(self, costs, heuristic=None):
        self.log(costs_as_state(costs, heuristic))
        

# def inverse_tree(parents):
#     root = None
#     children = defaultdict(list)
#     for child, parent in parents.items():
#         if parent is None:
#             root = child
#         else:
#             children[parent].append(child)
#     return root, children

def get_root(parents):
    for child, parent in parents.items():
        if parent is None:
            return child
        
def get_depth(node, parents):
    depth = 0
    while parents[node] is not None:
        node = parents[node]
        depth += 1
    return depth

def search_tree_as_state(parents):
    #root, children = inverse_tree(parents)
    root = get_root(parents)
    world = root.world.copy()
    for state in parents.keys():
        depth = get_depth(state, parents)
        world[state.spaceship] = str(depth)
    return State(world)    
    
def show_search_tree(parents):
    print(search_tree_as_state(parents))   

def costs_as_state(costs, heuristic=None):
    heuristic = heuristic or (lambda x: 0)
    world = next(iter(costs.keys())).world.copy()
    for state in costs.keys():
        total_cost = costs[state] + heuristic(state)
        world[state.spaceship] = str(total_cost)
    return State(world)


LOGGER = Logger()

In [3]:
w = SpaceWorld()
w.set('''
|D|b |bD|b |b |
|k |k |kA|k |k |
|k |kA|kA|kA|k |
|k |k |kA|k |k |
|k |k |kS|k |k |
''')

In [121]:
# TODO: colored-table (or even better: use our react component)
from IPython.display import HTML, display

def show_state(state):
    text = str(state).split('\n')
    html = HTML('''
        <style>
          table {
            border-collapse: collapse;
          }
          table, td {
            border: 1px solid black;
          }   
        </style>
        ''' + 
        '<table><tr>{}</tr></table>'.format(
            '</tr><tr>'.join(
                '<td>{}</td>'.format(
                    '</td><td>'.join(
                        field for field in row)) for row in text)))
    display(html)
    
show_state(state)

0,1,2,3,4
.,.,.,.,.
.,.,.,.,.
A,A,A,A,.
.,.,.,.,.
.,.,S,.,.


.....
.....
AAAA.
.....
..S..


# Greedy Search (Hladové hledání)

TODO: explain

TODO: demonostrate needed functions (actions, is_goal, perform, append)

In [52]:
def greedy_search(state):
    """Return path from state to a goal state"""
    path = [state]
    while not state.is_goal():
        available_actions = actions(state)
        if not available_actions:
            break # Failed to find a path.
        # Choose the first available action (greedy choice).
        action = available_actions[0]
        state = perform(state, action)
        path.append(state)
    # Return the complete or the partial path.
    return path

# Jednoduchy pripad: vzdy dostupna prave jedna akce.
state = parse_state(
'|A|A| |A|A|'
'|A|A| |A|A|'
'|A| |A|A|A|'
'|A|A| |A|A|'
'|A|A|S|A|A|')
show_path(greedy_search(state))

|A|A|4|A|A|
|A|A|3|A|A|
|A|2|A|A|A|
|A|A|1|A|A|
|A|A|0|A|A|


In [57]:
# Greedy search bude fungovat i v pripade hodne
# ridce rozmistenych prekazek.
state = parse_state(
'| | | | | |'
'|A| |A| |A|'
'| | | | | |'
'|A| |A| |A|'
'| | | | | |'
'| | |S| | |')
show_path(greedy_search(state))

|5| | | | |
|A|4|A| |A|
|3| | | | |
|A|2|A| |A|
| |1| | | |
| | |0| | |


In [56]:
# Obecne ale greedy search zadnou cestu do cile najit nemusi,
# je mozne, ze se zasekne (ve "slepe ulicce").
state = parse_state(
'| | | | | |'
'|A|A| |A|A|'
'| | | | | |'
'| |A|A|A| |'
'| | | | | |'
'| | |S| | |')
show_path(greedy_search(state))

| | | | | |
|A|A| |A|A|
|3| | | | |
|2|A|A|A| |
| |1| | | |
| | |0| | |


# DFS

In [5]:
# Tree-DFS pomoci rekurze (nehlida zacykleni)
def dfs(state):
    """Return path from state to a goal state"""
    #print('---\nstate:\n' + str(state))
    #input()
    if state.is_goal():
        return [state]
    for action in actions(state):      
        next_state = perform(state, action)
        path = dfs(next_state)
        if path:
            return [state] + path
    return None  # no path found

state = parse_state(
'| | | | | |'
'| | | | | |'
'|A|A|A|A| |'
'| | | | | |'
'| | |S| | |')
show_path(dfs(state))

| | |4| | |
| | | |3| |
|A|A|A|A|2|
| | | |1| |
| | |0| | |


In [27]:
# Zasobnik = pridavame a odebirame ze stejneho konce,
# napr. stos knih, naskladane talire.
stack = ['a', 'b', 'c']  # vrchol zasobniku je vpravo ('c')
stack.append('d')  # pridani 'd' na vrchol zasobniku
print(stack)  # ['a', 'b', 'c', 'd']
stack.pop()  # odebrani prvku 'd' z vrcholu zasobniku
print(stack)  # ['a', 'b', 'c']
stack.pop()  # odebrani prvku 'c' z vrcholu zasobniku
print(stack)  # ['a', 'b']

['a', 'b', 'c', 'd']
['a', 'b', 'c']
['a', 'b']


In [25]:
# Terminology: tree vs. parents?
def reconstruct_path(tree, state):
    reversed_path = []
    while state is not None:
        reversed_path.append(state)
        state = tree[state]
    path = list(reversed(reversed_path))
    return path

    
# tree search - bez kontroly zacykleni
def iterative_dfs(initial_state):
    stack = [initial_state]
    tree = {initial_state: None}  # maps nodes to their parents
    while stack:
        LOGGER.log_search_tree(tree)
        state = stack.pop()
        if state.is_goal():
            return reconstruct_path(tree, state)
        for action in actions(state):
            next_state = perform(state, action)
            stack.append(next_state)
            tree[next_state] = state
            

LOGGER.debug(False)
state = parse_state(
'| | | | | |'
'| | | | | |'
'|A|A|A|A| |'
'| | | | | |'
'| | |S| | |')
show_path(iterative_dfs(state))

| | | | |4|
| | | | |3|
|A|A|A|A|2|
| | | |1| |
| | |0| | |


In [45]:
# graph search - bez duplicitnich vypoctu a moznosti zacykleni
# TODO: potreba motivovat prikladem (zacykleni by potrebovalo
# cervi diry, ale redundantni vypocty lze i bez toho)
def graph_dfs(initial_state):
    # Potrebujeme ukladat i rodice (tj. ukladame hrany)
    # TODO: check jestli tohle neni potreba i v tree-searchi
    stack = [(initial_state, None)]
    tree = {}  # maps states to their parents
    while stack:
        state, parent = stack.pop()
        # TODO: check what is the correct place for the test
        if state in tree: # = is visited
            continue
        tree[state] = parent
        LOGGER.log_search_tree(tree)
        if state.is_goal():
            return reconstruct_path(tree, state)
        for action in actions(state):
            next_state = perform(state, action)
            stack.append((next_state, state))       

LOGGER.debug(False)
state = parse_state(
'| | | | | |'
'| | | | | |'
'|A|A|A|A| |'
'| | | | | |'
'| | |S| | |')
show_path(graph_dfs(state))            

| | | | |4|
| | | | |3|
|A|A|A|A|2|
| | | |1| |
| | |0| | |


Co DFS nezvladne: pokud nejsou reseni ve stejne hloubce (a pritom chceme najit optimalni cestu). Priklad s cervimi dirami:

In [78]:
LOGGER.debug(False)
state = parse_state(
'| | | | | |'
'| | | |W| |'
'| | | | | |'
'| | |W| | |'
'| | |A| | |'
'| | |S| | |')
show_path(graph_dfs(state))

| | | | |5|
| | | |W|4|
| | | | |3|
| | |W| |2|
| | |A|1| |
| | |0| | |


# BFS (Breadth-first search)

In [37]:
# Fronta = usporadana kolekce prvku, odebirame a pridavame na opacne
# konce, jako u bezne fronty napr. v obchode.
queue = deque(['a', 'b', 'c'])  # 'a' na zacatku fronty, 'c' na konci
queue.append('d')  # 'd' prisel na konec fronty
print(queue)  # deque(['a', 'b', 'c', 'd'])
queue.popleft()  # odchazi 'a'
print(queue)  # deque(['b', 'c', 'd'])
queue.popleft()  # odchazi 'b'
print(queue)  # deque(['c', 'd'])

deque(['a', 'b', 'c', 'd'])
deque(['b', 'c', 'd'])
deque(['c', 'd'])


In [None]:
# TODO: svet, ktery nejde optimalne resit pomoci DFS
# (cervi diry nebo nestejna hloubka reseni)

In [83]:
def bfs(initial_state):
    if initial_state.is_goal():
        return [initial_state]
    queue = deque([initial_state])
    tree = {initial_state: None}  # maps states to their parents
    while queue:
        LOGGER.log_search_tree(tree)
        state = queue.popleft()
        for action in actions(state):
            next_state = perform(state, action)
            if (next_state in tree.keys()
                or next_state in queue):
                continue
            queue.append(next_state)
            tree[next_state] = state
            # U BFS lze test na cilovy stav provadet uz zde.
            if next_state.is_goal():
                return reconstruct_path(tree, next_state)

# LOGGER.debug(False)
# state = parse_state(
# '| | | | | |'
# '| | | | | |'
# '|A|A|A|A| |'
# '| | | | | |'
# '| | |S| | |')
# show_path(bfs(state))
LOGGER.debug(False)
state = parse_state(
'| | | | | |'
'| | | |W| |'
'| | | | | |'
'| | |W| | |'
'| | |A| | |'
'| | |S| | |')
show_path(bfs(state))

| | |3| | |
| | | |2| |
| | | | | |
| | |W| | |
| |1|A| | |
| | |0| | |


# Uniform Cost Search (UCS) (Dijkstra)

Přidání cen akcí -> DFS ani BFS nestačí.

Příklad cen: let dopředu 2, let vlevo/vpravo 3.

In [92]:
# # TODO: vysvetlit prioritni haldu
# from heapq import heappush, heappop

# queue = []
# heappush(queue, (5, 'a'))  # Vkladame prvek 'a' s prioritou 5.
# heappush(queue, (3, 'b'))  # Vkladame prvek 'b' s prioritou 3.
# heappush(queue, (7, 'c'))  # Vkladame prvek 'c' s prioritou 7.
# print(queue)  # [(3, 'b'), (5, 'a'), (7, 'c')]
# print(heappop(queue)) # (3, 'b')
# print(queue)  # [(5, 'a'), (7, 'c')]
# print(heappop(queue)) # (5, 'a')
# print(queue)  # [(7, 'c')]

costs = {}
costs['a'] = 5
costs['b'] = 3
costs['c'] = 7
min(costs, key=lambda x: costs[x])

'b'

In [113]:
from math import inf
ACTION_COSTS = {'l': 3, 'f': 2, 'r': 3}

def ucs(initial_state):
    # Ke kazdeme videnemu stavu ulozime jeho rodice.
    tree = {initial_state: None}
    # Do fronty ukladame dvojice (cena, stav).
    #queue = [(0, initial_state)]
    # Budeme zvlast ukladat vsechny ceny (i do jiz prozkoumanych
    # vrcholu) a okraj (vrcholy k prozkoumani).
    costs = {initial_state: 0}
    fringe = {initial_state}
    while fringe:
        LOGGER.log_costs(costs)
        #cost, state = heappop(queue)
        state = min(fringe, key=lambda s: costs[s])
        fringe.remove(state)
        if state.is_goal():
            return reconstruct_path(tree, state)
        for action in actions(state):
            next_state = perform(state, action)
            new_cost = costs[state] + ACTION_COSTS[action]
            old_cost = costs.get(next_state, inf)
            if new_cost < old_cost:
                fringe.add(next_state)
                costs[next_state] = new_cost
                tree[next_state] = state
            
LOGGER.debug(False)
state = parse_state(
'| | | | | |'
'| | | | | |'
'|A|A|A|A| |'
'| | | | | |'
'| | |S| | |')
show_path(ucs(state))

| | | | |4|
| | | | |3|
|A|A|A|A|2|
| | | |1| |
| | |0| | |


# A*

Kombinace UCS a heuristiky.

Příklad heuristiky: vertikální vzdálenost od cíle (komplikovanější heuristiky můžou brát v úvahu počet posbíraných diamantů).

TODO: ukázat příklad, kdy UCS prohledává zbytečně moc (potřeba červích děr nebo diamantů) a jak to vyřešit pomocí jednoduché heuristiky

In [116]:
def heuristic_distance(state):
    return state.spaceship.row

In [118]:
heuristic_distance(parse_state(
'| | | | | |'
'| | | | | |'
'|A|A|A|A| |'
'| | |S| | |'
'| | | | | |'))

3

In [117]:
heuristic_distance(parse_state(
'| | |S| | |'
'| | | | | |'
'|A|A|A|A| |'
'| | | | | |'
'| | | | | |'))

0

In [127]:
# Jako UCS, lisi se jen vypoctem ceny.
def a_star(initial_state, heuristic):
    # Ke kazdeme videnemu stavu ulozime jeho rodice.
    tree = {initial_state: None}
    # Do fronty ukladame dvojice (cena, stav).
    #queue = [(0, initial_state)]
    # Budeme zvlast ukladat vsechny ceny (i do jiz prozkoumanych
    # vrcholu) a okraj (vrcholy k prozkoumani).
    costs = {initial_state: 0}
    fringe = {initial_state}
    while fringe:
        LOGGER.log_costs(costs, heuristic)
        #cost, state = heappop(queue)
        # Jedina zmena oproti UCS:
        state = min(fringe, key=lambda s: costs[s] + heuristic(s))
        fringe.remove(state)
        if state.is_goal():
            return reconstruct_path(tree, state)
        for action in actions(state):
            next_state = perform(state, action)
            new_cost = costs[state] + ACTION_COSTS[action]          
            old_cost = costs.get(next_state, inf)
            if new_cost < old_cost:
                fringe.add(next_state)
                costs[next_state] = new_cost
                tree[next_state] = state
            
LOGGER.debug(False)
state = parse_state(
'| | | | | |'
'| | | | | |'
'|A|A|A|A| |'
'| | | | | |'
'| | |S| | |')
show_path(a_star(state, heuristic=heuristic_distance))

| | | | |4|
| | | | |3|
|A|A|A|A|2|
| | | |1| |
| | |0| | |


## TODO

- intro
- DFS
- DFS - explain tree search vs. graph search
- nastroje pro pohodlne ladeni vsech algoritmu (vizualizace cesty i prubehu planovani (explored/frontier/unexplored states) -> umoznit vypisy (ala logger), vcetne textovych (-> muzu zkopirovat text stavu a vyzkouset si zacit z neho atp.)
- DFS - recursive version
- BFS (motivace: cervi diry - DFS nemusi najit nejkratsi cestu)
- UCS
- A\*
- refaktorovat (zprehlednit, zjednodusit, okomenotvat) kod
- inline react component for visualizing states, paths (and ideally also allow to play the game... or at least provide a link to task-editor)
- rezerva: greedy search, BF, DP, "patnáctka" (sliding tiles)
- parsovani sveta (jako v JS)
- vyfaktorovat pomocné funkce do samostatného .py modulu
- hezčí vykreslování stavu (HTML tabulka, react components)
- another notebook with stripped solutions
- inspirace: Jak to vyresit, Programatorska cvicebnice, Sbirka do Navalu, KSI (napr. Honzovo bludiste, davna DFS/BFS videa), Ucadity AI, EdX AI lecture, google
- testing by friends
- utopicke: pridat jako dalsi level do RoboMise (s pripravenymi high-level bloky a/nebo v RoboKodu)

## Rozšíření
* protihráč -> minimax
* náhoda, nejistota -> expectimax, MDP (value iteration - DP)
* too many states (even infinitely many of them)
* continuous actions
* ...

## Další zdroje
* TBA: videa z AI kurzu na EdX