# Integrated Intelligent Systems: Module 7 - Robot Action Planning (Tutorial)

This notebook introduces the fundamentals of robot action planning and basic practices of it.

## Part 1: Concepts and Definitions

In [6]:
from IPython.display import display, HTML,IFrame
display(IFrame('rap_quiz.html', width=1000, height=1350))

## Part 2: Hands-On

### Exercise 1: Backward Search (Planning): 

Implement a state-space search based on a BFS/Dijkstra logic to find a path from a GOAL state back to a START state in a tree-based domain. The task is about finding the sequence of actions to go from GOAL to START.

In [3]:
import threading
import time
import random
from collections import deque

# Domain: A tree where edges are actions. 
# Format: parent_state: { action_name: child_state }
TREE_DOMAIN = {
    "at_cupboard": {"open_door": "door_open", "move_to_table": "at_table"},
    "door_open": {"reach_inside": "holding_cup", "close_door": "at_cupboard"},
    "holding_cup": {"move_to_table": "cup_at_table"},
    "at_table": {"pick_up": "holding_cup_at_table"},
    "holding_cup_at_table": {"pour": "liquid_served"}
}

def backward_search(start_state, goal_state):
    """
    Find a path from goal_state back to start_state.
    Returns a list of actions.
    """
    print(f"\n--- Exercise 1: Searching for plan to reach {goal_state} ---")
    
    # We need an inverse mapping to go backwards
    inverse_domain = {}
    for parent, actions in TREE_DOMAIN.items():
        for act, child in actions.items():
            if child not in inverse_domain: inverse_domain[child] = []
            inverse_domain[child].append((parent, act))

    queue = deque([(goal_state, [])])
    visited = {goal_state}

    while queue:
        current, path = queue.popleft()
        if current == start_state:
            # We found the start! The path is currently in reverse order.
            return path[::-1]

        for prev_state, action in inverse_domain.get(current, []):
            if prev_state not in visited:
                visited.add(prev_state)
                queue.append((prev_state, path + [action]))
    
    return None


### Exercise 2: The Recovery Hierarchy

A simulation of a "Pick-and-Place" action that requires nested logic for Simple Retry, Parameter Change (increasing force), and Action Change (resetting the robot base).

In [4]:
class RobotHardware:
    def __init__(self):
        self.base_aligned = True
        self.attempts = 0

    def primitive_grasp(self, force):
        self.attempts += 1
        # Fails if base isn't aligned, or force too low, or just bad luck
        if not self.base_aligned:
            raise Exception("AlignmentFailure")
        if force < 20 and random.random() < 0.8:
            raise Exception("GripFailure")
        if random.random() < 0.2: # Random stochastic failure
            raise Exception("StochasticFailure")
        print(f"  [Hardware] Grasp Successful with force {force}!")

def execute_with_recovery():
    print("\n--- Exercise 2: Recovery Hierarchy ---")
    robot = RobotHardware()
    force = 10
    
    # LEVEL 3: Action Change Loop
    for action_retry in range(2): 
        try:
            # LEVEL 2: Parameter Change Loop
            for param_retry in range(2):
                # LEVEL 1: Simple Retry Loop
                for simple_retry in range(3):
                    try:
                        print(f"  [Plan] Strategy 1: Attempting grasp (Force: {force})...")
                        robot.primitive_grasp(force)
                        return True
                    except Exception as e:
                        print(f"  [Plan] Failed: {e}. Retrying...")
                
                print("  [Plan] Strategy 2: Increasing grasp force parameter.")
                force += 15
            
            # If we reach here, we need Strategy 3
            print("  [Plan] Strategy 3: Action Change. Re-aligning robot base...")
            robot.base_aligned = True
            force = 10 # Reset params
        except Exception:
            continue
    
    print("  [Plan] All recovery strategies exhausted.")
    return False


### Exercise 3: Concurrent-Reactive Patterns: A mini-implementation of a CPL-style executor.

It includes the pursue, try_all, and whenever operators using Python's threading and Condition variables.

In [None]:
# =================================================================
# EXERCISE 3: CONCURRENT-REACTIVE BEHAVIOR
# Task: Use 'pursue' to monitor for spills while pouring.
# Implementations for: whenever, wait, pursue, try_all, try_in_order
# =================================================================

class Fluent:
    def __init__(self, value):
        self._val = value
        self._cond = threading.Condition()
    def get(self): return self._val
    def set(self, v):
        with self._cond:
            self._val = v
            self._cond.notify_all()
    def wait_for(self, target):
        with self._cond:
            while self._val != target:
                self._cond.wait()

def pursue(tasks):
    """Terminates all when one finishes."""
    stop_event = threading.Event()
    results = []
    
    def wrapper(task):
        res = task(stop_event)
        if res is not None:
            results.append(res)
            stop_event.set()

    threads = [threading.Thread(target=wrapper, args=(t,)) for t in tasks]
    for t in threads: t.start()
    for t in threads: t.join()
    return results

def whenever(fluent, target, callback):
    """Infinite loop reacting to fluent pulses."""
    def run(stop_event):
        while not stop_event.is_set():
            fluent.wait_for(target)
            if stop_event.is_set(): break
            callback()
            time.sleep(0.1) # Debounce
    return run

def try_in_order(tasks):
    """Executes tasks in sequence; stops at first success."""
    for t in tasks:
        try:
            res = t(threading.Event())
            if res: return True
        except: continue
    return False

def reactive_demo():
    print("\n--- Exercise 3: Reactivity (Pursue & Whenever) ---")
    spill_fluent = Fluent(False)
    
    def pour_action(stop_evt):
        for i in range(5):
            if stop_evt.is_set(): 
                print("  [Action] Pouring ABORTED.")
                return None
            print(f"  [Action] Pouring... {i+1}")
            time.sleep(0.5)
        print("  [Action] Pouring Finished.")
        return "Success"

    def monitor_spill(stop_evt):
        spill_fluent.wait_for(True)
        print("  [Monitor] EMERGENCY: Spill Detected!")
        return "Failure"

    # Trigger a spill randomly in background
    def trigger():
        time.sleep(0.8)
        spill_fluent.set(True)
    
    threading.Thread(target=trigger).start()
    
    outcome = pursue([pour_action, monitor_spill])
    print(f"  [Result] Termination outcome: {outcome}")

# =================================================================
# MAIN EXECUTION
# =================================================================

if __name__ == "__main__":
    # Run Exercise 1
    plan = backward_search("at_cupboard", "liquid_served")
    print(f"Plan found: {' -> '.join(plan) if plan else 'No path'}")

    # Run Exercise 2
    execute_with_recovery()

    # Run Exercise 3
    reactive_demo()


--- Exercise 1: Searching for plan to reach liquid_served ---
Plan found: move_to_table -> pick_up -> pour

--- Exercise 2: Recovery Hierarchy ---
  [Plan] Strategy 1: Attempting grasp (Force: 10)...
  [Plan] Failed: GripFailure. Retrying...
  [Plan] Strategy 1: Attempting grasp (Force: 10)...
  [Hardware] Grasp Successful with force 10!

--- Exercise 3: Reactivity (Pursue & Whenever) ---
  [Action] Pouring... 1
  [Action] Pouring... 2
  [Action] Pouring... 3
  [Action] Pouring... 4
  [Action] Pouring... 5
  [Action] Pouring Finished.
