In [None]:
%load_ext autotime
%load_ext autoreload
%autoreload
%load_ext dotenv
%dotenv

In [None]:
import logging

# Set the logging level to INFO
logging.basicConfig(level=logging.INFO)

In [None]:
import os
from openai import OpenAI

 
client = OpenAI(
    api_key=os.environ["GEMINI_API_KEY"],
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
)

In [None]:
import json
import dataclasses

import py_trees
import arrow
from toolbox import Toolbox


# Clear the blackboard so we start fresh
py_trees.blackboard.Blackboard.clear()

blackboard = py_trees.blackboard.Client(name="Global")
blackboard.register_key(key="order", access=py_trees.common.Access.WRITE)
blackboard.order = []
blackboard.register_key(key="message_history", access=py_trees.common.Access.WRITE)
blackboard.message_history = []
blackboard.register_key(key="current_item", access=py_trees.common.Access.WRITE)
blackboard.current_item = None
blackboard.register_key(key="needs_customer_input", access=py_trees.common.Access.WRITE)
blackboard.needs_customer_input = False
blackboard.register_key(key="llm_job", access=py_trees.common.Access.WRITE)
blackboard.llm_job = None
blackboard.register_key(key="order_complete", access=py_trees.common.Access.WRITE)
blackboard.order_complete = False

In [None]:
import operator

from llm_node import LLMNode
from customer_input import CustomerInput
from complete_order import CompleteOrder
from ask_item import AskItem
from ask_how_many import AskHowMany
from ask_fries import AskFries
from ask_topping import AskTopping
from confirm_item import ConfirmItem


root = py_trees.composites.Selector(name="MainSequence", memory=False)


llm_jobs = py_trees.composites.Sequence("LLM Job", memory=False, children=[
    py_trees.behaviours.CheckBlackboardVariableValue("Has LLM Job?", py_trees.common.ComparisonExpression(
        variable="/llm_job",
        value=None,
        operator=operator.ne,
    )),
    LLMNode("LLM Job", client)
])

customer_input = py_trees.composites.Sequence("Customer Input", memory=False, children=[
    py_trees.behaviours.CheckBlackboardVariableValue("Needs Customer Input?", py_trees.common.ComparisonExpression(
        variable="/needs_customer_input",
        value=True,
        operator=operator.eq,
    )),
    CustomerInput("Ask Customer")
])

complete_order = py_trees.composites.Sequence("Complete Order", memory=False, children=[
    py_trees.behaviours.CheckBlackboardVariableValue("Has order items?", py_trees.common.ComparisonExpression(
        variable="/order",
        value=1,
        operator=lambda order, length: len(order) >= length,
    )),
    py_trees.behaviours.CheckBlackboardVariableValue("Not currently working on something?", py_trees.common.ComparisonExpression(
        variable="/current_item",
        value=None,
        operator=operator.eq,
    )),
    CompleteOrder("Confirm Order")
])

choose_item = py_trees.composites.Sequence("Choose Item", memory=False, children=[
    py_trees.behaviours.CheckBlackboardVariableValue("Needs current item?", py_trees.common.ComparisonExpression(
        variable="/current_item",
        value=None,
        operator=operator.eq,
    )),
    AskItem("Ask Item"),
])

customize_item = py_trees.composites.Selector('Customize Item', memory=False)
customize_chicken_nuggets = py_trees.composites.Sequence('Chicken Nuggets', memory=False, children=[
    py_trees.behaviours.CheckBlackboardVariableValue("Item is chicken nuggets?", py_trees.common.ComparisonExpression(
        variable="/current_item.name",
        value="chicken nuggets",
        operator=operator.eq
    )),
    py_trees.composites.Selector('How many?', memory=False, children=[
        py_trees.behaviours.CheckBlackboardVariableValue("Has Quantity?", py_trees.common.ComparisonExpression(
            variable="/current_item.quantity",
            value=None,
            operator=operator.ne
        )),
        AskHowMany("Ask how many?"),
    ]),
    py_trees.composites.Selector('With fries?', memory=False, children=[
        py_trees.behaviours.CheckBlackboardVariableValue("Has fries?", py_trees.common.ComparisonExpression(
            variable="/current_item.fries",
            value=None,
            operator=operator.ne
        )),
        AskFries("Do you want fries with that?"),
    ]),
])

def make_topping(name, topping):
    return py_trees.composites.Selector(f'With {topping}?', memory=False, children=[
        py_trees.behaviours.CheckBlackboardVariableValue(f"Has {topping}?", py_trees.common.ComparisonExpression(
            variable="/current_item",
            value=None,
            operator=lambda item, _: item.__dict__[topping] is not None
        )),
        AskTopping(name, topping),
    ])

customize_hamburger = py_trees.composites.Sequence('Hamburger', memory=False, children=[
    py_trees.behaviours.CheckBlackboardVariableValue("Item is hamburger?", py_trees.common.ComparisonExpression(
        variable="/current_item.name",
        value="hamburger",
        operator=operator.eq
    )),
    make_topping("Pickles?", "pickles"),
    make_topping("Lettuce?", "lettuce"),
    make_topping("Tomatoes?", "tomatoes"),
    py_trees.composites.Selector('With fries?', memory=False, children=[
        py_trees.behaviours.CheckBlackboardVariableValue("Has fries?", py_trees.common.ComparisonExpression(
            variable="/current_item.fries",
            value=None,
            operator=operator.ne
        )),
        AskFries("Do you want fries with that?"),
    ]),
])

customize_item.add_children([
    customize_chicken_nuggets,
    customize_hamburger,
])

confirm_item = ConfirmItem("Confirm Item")

build_item = py_trees.composites.Sequence("Build Item", memory=False, children=[
    customize_item,
    confirm_item,
])

ordering = py_trees.composites.Selector('Ordering', memory=False, children=[
    complete_order,
    choose_item,
    build_item,
])  

root.add_children([
    llm_jobs,
    customer_input,
    ordering,
])

root.setup()

In [None]:
from IPython.display import clear_output

def log_tree():
    print(py_trees.display.ascii_tree(root, show_status=True))
    print(py_trees.display.unicode_blackboard())

while not blackboard.order_complete:
    clear_output()
    for message in blackboard.message_history:
        print(f"{message['role']}: {message['content']}")
        
    for node in root.tick():
        #log_tree()
        pass

In [None]:
import pydot
from IPython.display import SVG

graph = py_trees.display.dot_tree(root)
SVG(graph.create_svg())