In [1]:
import os
# Langfuse credentials must be supplied through the environment (shell export,
# .env loader, secrets manager) and never written into the notebook source.
# Any key that was ever committed to this file must be treated as leaked and rotated.
_missing_langfuse_vars = [
    name
    for name in ("LANGFUSE_SECRET_KEY", "LANGFUSE_PUBLIC_KEY")
    if not os.environ.get(name)
]
if _missing_langfuse_vars:
    # Fail here with an actionable message instead of later inside the client.
    raise RuntimeError(
        "Missing Langfuse credentials; set these environment variables before "
        "running the notebook: " + ", ".join(_missing_langfuse_vars)
    )

In [2]:
import concurrent.futures
from anthropic import AnthropicBedrock
from compiler.core import Compiler
from langfuse import Langfuse
from fsm_core.helpers import solve_agent
from fsm_core import common, typespec, drizzle, typescript, handler_tests, handlers
from fsm_core.common import Node, AgentState


import statemachine

In [3]:
# Shared service clients; the same three objects are handed to every actor below.
# Langfuse() takes no arguments here — presumably it reads the LANGFUSE_* environment variables; TODO confirm against the Langfuse SDK.
langfuse_client = Langfuse()
# NOTE(review): the two strings look like image names for the TypeSpec compiler and the app schema — confirm against compiler.core.Compiler.
compiler = Compiler("botbuild/tsp_compiler", "botbuild/app_schema")
# Anthropic client created with the "dev" AWS profile and the us-west-2 region.
m_claude = AnthropicBedrock(aws_profile="dev", aws_region="us-west-2")

In [4]:
def not_impl(ctx):
    """Placeholder action for states whose behavior has not been written yet.

    Args:
        ctx: The state-machine context; accepted but never read.

    Raises:
        NotImplementedError: Always.
    """
    message = "Not implemented"
    raise NotImplementedError(message)

def fail_state(ctx):
    """Entry action of the failure state: emit a fixed marker line on stdout.

    Args:
        ctx: The state-machine context; accepted but not inspected.
    """
    marker = "FAILED"
    print(marker)

In [5]:
class ActorContext:
    """Minimal context object handed to `solve_agent` by the actors in this notebook.

    It only carries the compiler instance.
    """

    def __init__(self, compiler: Compiler):
        # The compiler is exposed as a plain attribute.
        self.compiler = compiler


class TypespecActor:
    """Actor that runs the "solve_typespec" agent for an application description.

    The Claude client, compiler and Langfuse client are injected once through the
    constructor and reused by every `execute` call.
    """

    def __init__(self, m_claude: AnthropicBedrock, compiler: Compiler, langfuse_client: Langfuse):
        self.m_claude = m_claude
        self.compiler = compiler
        self.langfuse_client = langfuse_client

    def execute(self, application_description: str):
        """Solve the typespec stage and return its success payload.

        Args:
            application_description: Text used to build the `typespec.Entry` start state.

        Returns:
            The `typespec.Success` instance found at `result.data.inner`.

        Raises:
            ValueError: If `solve_agent` returns no result node.
            Exception: If the resulting node does not hold a `typespec.Success`.
        """
        start = typespec.Entry(application_description)
        # Use the injected compiler, not the notebook-level global that happens to
        # share its name; otherwise the constructor argument is silently ignored.
        context = ActorContext(self.compiler)
        result, _ = solve_agent(start, context, "solve_typespec", self.m_claude, self.langfuse_client)
        if result is None:
            raise ValueError("Failed to solve typespec")
        if not isinstance(result.data.inner, typespec.Success):
            raise Exception("Bad state: " + str(result.data.inner))
        return result.data.inner


class DrizzleActor:
    """Actor that runs the "solve_drizzle" agent for a set of typespec definitions.

    The Claude client, compiler and Langfuse client are injected once through the
    constructor and reused by every `execute` call.
    """

    def __init__(self, m_claude: AnthropicBedrock, compiler: Compiler, langfuse_client: Langfuse):
        self.m_claude = m_claude
        self.compiler = compiler
        self.langfuse_client = langfuse_client

    def execute(self, typespec_definitions: str):
        """Solve the drizzle stage and return its success payload.

        Args:
            typespec_definitions: Text used to build the `drizzle.Entry` start state.

        Returns:
            The `drizzle.Success` instance found at `result.data.inner`.

        Raises:
            ValueError: If `solve_agent` returns no result node.
            Exception: If the resulting node does not hold a `drizzle.Success`.
        """
        start = drizzle.Entry(typespec_definitions)
        # Use the injected compiler, not the notebook-level global that happens to
        # share its name; otherwise the constructor argument is silently ignored.
        context = ActorContext(self.compiler)
        result, _ = solve_agent(start, context, "solve_drizzle", self.m_claude, self.langfuse_client)
        if result is None:
            raise ValueError("Failed to solve drizzle")
        if not isinstance(result.data.inner, drizzle.Success):
            raise Exception("Failed to solve drizzle: " + str(result.data.inner))
        return result.data.inner


class TypescriptActor:
    """Actor that runs the "solve_typescript" agent for a set of typespec definitions.

    The Claude client, compiler and Langfuse client are injected once through the
    constructor and reused by every `execute` call.
    """

    def __init__(self, m_claude: AnthropicBedrock, compiler: Compiler, langfuse_client: Langfuse):
        self.m_claude = m_claude
        self.compiler = compiler
        self.langfuse_client = langfuse_client

    def execute(self, typespec_definitions: str):
        """Solve the typescript stage and return its success payload.

        Args:
            typespec_definitions: Text used to build the `typescript.Entry` start state.

        Returns:
            The `typescript.Success` instance found at `result.data.inner`.

        Raises:
            ValueError: If `solve_agent` returns no result node.
            Exception: If the resulting node does not hold a `typescript.Success`.
        """
        start = typescript.Entry(typespec_definitions)
        # Use the injected compiler, not the notebook-level global that happens to
        # share its name; otherwise the constructor argument is silently ignored.
        context = ActorContext(self.compiler)
        result, _ = solve_agent(start, context, "solve_typescript", self.m_claude, self.langfuse_client)
        if result is None:
            raise ValueError("Failed to solve typescript")
        if not isinstance(result.data.inner, typescript.Success):
            raise Exception("Failed to solve typescript: " + str(result.data.inner))
        return result.data.inner


class HandlerTestsActor:
    """Actor that runs the "solve_handler_tests" agent once per function, in parallel.

    The Claude client, compiler and Langfuse client are injected once through the
    constructor; `max_workers` bounds the size of the thread pool used by `execute`.
    """

    def __init__(self, m_claude: AnthropicBedrock, compiler: Compiler, langfuse_client: Langfuse, max_workers=5):
        self.m_claude = m_claude
        self.compiler = compiler
        self.langfuse_client = langfuse_client
        self.max_workers = max_workers

    def execute(self, functions: list[str], typescript_schema: str, drizzle_schema: str) -> dict[str, handler_tests.Success]:
        """Generate handler tests for every function and collect the successes.

        Args:
            functions: Functions to generate tests for; each one is also used as a key
                of the returned dict. NOTE(review): annotated as list[str], but an
                error message recorded in this notebook shows FunctionDeclaration
                objects being passed — confirm the element type.
            typescript_schema: Passed unchanged to `handler_tests.Entry`.
            drizzle_schema: Passed unchanged to `handler_tests.Entry`.

        Returns:
            Mapping from each function to its `handler_tests.Success` payload.

        Raises:
            ValueError: If `solve_agent` returns no result node for a function.
            Exception: If a result node does not hold a `handler_tests.Success`.
        """
        future_to_function: dict[concurrent.futures.Future[tuple[Node[AgentState] | None, Node[AgentState]]], str] = {}
        result_dict: dict[str, handler_tests.Success] = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for function in functions:
                start = handler_tests.Entry(function, typescript_schema, drizzle_schema)
                # Each task gets its own context built from the injected compiler
                # (not the notebook-level global of the same name).
                future = executor.submit(
                    solve_agent,
                    start,
                    ActorContext(self.compiler),
                    "solve_handler_tests",
                    self.m_claude,
                    self.langfuse_client,
                )
                future_to_function[future] = function
            for future in concurrent.futures.as_completed(future_to_function):
                function = future_to_function[future]
                result, _ = future.result()
                # TODO: a failed function could be skipped so that whatever succeeded is still returned.
                if result is None:
                    raise ValueError(f"Failed to solve handler tests for {function}")
                if not isinstance(result.data.inner, handler_tests.Success):
                    raise Exception(f"Failed to solve handler tests for {function}: " + str(result.data.inner))
                result_dict[function] = result.data.inner
        return result_dict


class HandlersActor:
    """Actor that runs the "solve_handlers" agent once per function, in parallel.

    The Claude client, compiler and Langfuse client are injected once through the
    constructor; `max_workers` bounds the size of the thread pool used by `execute`
    (default 5, the value that used to be hard-coded).
    """

    def __init__(self, m_claude: AnthropicBedrock, compiler: Compiler, langfuse_client: Langfuse, max_workers=5):
        self.m_claude = m_claude
        self.compiler = compiler
        self.langfuse_client = langfuse_client
        self.max_workers = max_workers

    def execute(self, functions: list[str], typescript_schema: str, drizzle_schema: str, tests: dict[str, handler_tests.Success]) -> dict[str, handlers.Success]:
        """Generate a handler for every function and collect the successes.

        Args:
            functions: Functions to generate handlers for; each one is also used as a
                key into `tests` and into the returned dict.
            typescript_schema: Passed unchanged to `handlers.Entry`.
            drizzle_schema: Passed unchanged to `handlers.Entry`.
            tests: Per-function test results; the `.source` attribute of each entry is
                passed to `handlers.Entry`.

        Returns:
            Mapping from each function to its `handlers.Success` payload.

        Raises:
            KeyError: If any function has no entry in `tests`.
            ValueError: If `solve_agent` returns no result node for a function.
            Exception: If a result node does not hold a `handlers.Success`.
        """
        # Validate up front so no agent run is started when the input is incomplete.
        missing = [function for function in functions if function not in tests]
        if missing:
            raise KeyError(f"No handler tests available for: {missing}")

        future_to_function: dict[concurrent.futures.Future[tuple[Node[AgentState] | None, Node[AgentState]]], str] = {}
        result_dict: dict[str, handlers.Success] = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for function in functions:
                start = handlers.Entry(function, typescript_schema, drizzle_schema, tests[function].source)
                # Each task gets its own context built from the injected compiler
                # (not the notebook-level global of the same name).
                future = executor.submit(
                    solve_agent,
                    start,
                    ActorContext(self.compiler),
                    "solve_handlers",
                    self.m_claude,
                    self.langfuse_client,
                )
                future_to_function[future] = function
            for future in concurrent.futures.as_completed(future_to_function):
                function = future_to_function[future]
                result, _ = future.result()
                if result is None:
                    raise ValueError(f"Failed to solve handlers for {function}")
                if not isinstance(result.data.inner, handlers.Success):
                    raise Exception(f"Failed to solve handlers for {function}: " + str(result.data.inner))
                result_dict[function] = result.data.inner
        return result_dict


In [6]:
# Every actor is built from the same three collaborators, so bundle them once.
_shared_actor_args = (m_claude, compiler, langfuse_client)

typespec_actor = TypespecActor(*_shared_actor_args)
drizzle_actor = DrizzleActor(*_shared_actor_args)
typescript_actor = TypescriptActor(*_shared_actor_args)
handler_tests_actor = HandlerTestsActor(*_shared_actor_args)
handlers_actor = HandlersActor(*_shared_actor_args)

In [7]:
def _store_event(key):
    """Return an action that copies the received event into the context under `key`."""
    return lambda ctx, event: ctx.update({key: event})


def _invoke_state(src, input_fn, next_state, result_key, route_errors=True):
    """Build a state consisting of a single `invoke` block.

    On completion the event is stored under `result_key` and the machine moves to
    `next_state`. When `route_errors` is true, an `on_error` branch stores the event
    under "error" and targets "failure".
    """
    invoke = {
        "src": src,
        "input_fn": input_fn,
        "on_done": {
            "target": next_state,
            "actions": [_store_event(result_key)],
        },
    }
    if route_errors:
        invoke["on_error"] = {
            "target": "failure",
            "actions": [_store_event("error")],
        }
    return {"invoke": invoke}


# Application-building workflow: a typespec stage that waits for confirmation,
# then a "core" pipeline (drizzle -> typescript -> handler_tests -> handlers).
machine: statemachine.State = {
    "on": {"START": "typespec"},
    "states": {
        "typespec": {
            "on": {"PROMPT": "adjust", "CONFIRM": "core"},
            "states": {
                # NOTE(review): unlike the core stages, this invoke has no on_error branch.
                "init": _invoke_state(
                    typespec_actor,
                    lambda ctx: (ctx["application_description"],),
                    "wait",
                    "typespec_schema",
                    route_errors=False,
                ),
                "adjust": {"entry": [not_impl]},
                "wait": {},
            },
            # Enter "init" only while no typespec has been produced yet.
            "always": {
                "guard": lambda ctx: "typespec_schema" not in ctx,
                "target": "init",
            },
        },
        "core": {
            "states": {
                "drizzle": _invoke_state(
                    drizzle_actor,
                    lambda ctx: (ctx["typespec_schema"].typespec,),
                    "typescript",
                    "drizzle_schema",
                ),
                "typescript": _invoke_state(
                    typescript_actor,
                    lambda ctx: (ctx["typespec_schema"].typespec,),
                    "handler_tests",
                    "typescript_schema",
                ),
                "handler_tests": _invoke_state(
                    handler_tests_actor,
                    lambda ctx: (
                        ctx["typescript_schema"].functions,
                        ctx["typescript_schema"].typescript_schema,
                        ctx["drizzle_schema"].drizzle_schema,
                    ),
                    "handlers",
                    "handler_tests",
                ),
                "handlers": _invoke_state(
                    handlers_actor,
                    lambda ctx: (
                        ctx["typescript_schema"].functions,
                        ctx["typescript_schema"].typescript_schema,
                        ctx["drizzle_schema"].drizzle_schema,
                        ctx["handler_tests"],
                    ),
                    "complete",
                    "handlers",
                ),
            },
            # The pipeline starts at "drizzle" once a typespec is available.
            "always": {
                "guard": lambda ctx: "typespec_schema" in ctx,
                "target": "drizzle",
            },
        },
        "complete": {
            "on": {"PROMPT": "edit_application"},
        },
        "failure": {
            "entry": [fail_state],
        },
        "edit_application": {
            "entry": [not_impl],
        },
    },
}

In [8]:
# Instantiate the state machine from the definition above; the dict is presumably the initial context (it appears in fsm.context further down) — TODO confirm.
fsm = statemachine.StateMachine(machine, {"application_description": "Make me a greeting bot"})

In [9]:
# Fire the top-level START event; in the recorded run this enters typespec, runs init and stops in wait.
fsm.send("START")

Processing transition: [] typespec
Processing transition: ['typespec'] init
Processing transition: ['typespec', 'init'] wait


In [10]:
# Confirm the typespec; in the recorded run this walks core (drizzle, typescript, handler_tests) and ends in failure.
fsm.send("CONFIRM")

Processing transition: ['typespec', 'wait'] core
Processing transition: ['core'] drizzle
Processing transition: ['core', 'drizzle'] typescript
Processing transition: ['core', 'typescript'] handler_tests
Processing transition: ['core', 'handler_tests'] failure
FAILED


In [11]:
# Inspect the accumulated context; after a failed run the exception is stored under "error".
fsm.context

{'application_description': 'Make me a greeting bot',
 'typespec_schema': <fsm_core.typespec.Success at 0xffff7355a000>,
 'drizzle_schema': <fsm_core.drizzle.Success at 0xffff7c02d7c0>,
 'typescript_schema': <fsm_core.typescript.Success at 0xffff8414e420>,
 'error': Exception("Failed to solve handler tests for FunctionDeclaration(name='setUserName', argument_type='UserNameRequest', argument_schema='userNameRequestSchema', return_type='Promise<void>'): <fsm_core.handler_tests.Entry object at 0xffff672f91f0>")}

In [11]:
# NOTE(review): the recorded output (['complete']) does not match the failure run shown above and the
# execution count is duplicated — likely a stale output from another run; re-check after Restart & Run All.
fsm.stack_path

['complete']

In [17]:
import graphviz
from typing import Dict, Any

def generate_statechart(machine: Dict[str, Any], filename: str = "statechart", fmt: str = "png") -> graphviz.Digraph:
    """
    Generates a statechart diagram (Graphviz Digraph) from the provided state machine structure.

    Node ids are dotted paths starting at "root" (e.g. "root.core.drizzle"). Edges are drawn for:
      * "on" event transitions (labelled with the event name),
      * "always" transitions (dashed; they may be guarded in the definition),
      * "invoke" completions: "on_done" (label "done") and "on_error" (label "error"),
      * the optional "initial" child of a compound state (dotted).

    Transition targets are resolved by name: first among the declaring state's own
    children, then among its siblings, then outward through each enclosing level.
    NOTE(review): this order mirrors the transitions logged in this notebook
    (e.g. ['typespec'] -> init, ['core', 'handler_tests'] -> failure); confirm it
    against the statemachine implementation.

    Args:
        machine: The state machine definition dictionary.
        filename: The output filename (without extension).
        fmt: The output format (png, pdf, svg, etc.).

    Returns:
        A graphviz.Digraph object. The diagram is also rendered to disk and the
        output path is printed.
    """
    dot = graphviz.Digraph(comment="Statechart")

    # Creates a unique node identifier by joining parent and current names
    def node_id(parent: str, name: str) -> str:
        return f"{parent}.{name}" if parent else name

    # A transition may be given as a bare target name or as a dict with a "target" key.
    def target_name(spec: Any):
        if isinstance(spec, str):
            return spec
        if isinstance(spec, dict):
            return spec.get("target")
        return None

    # `scopes` lists (scope_id, child_states) pairs from the declaring state outward to the root.
    def resolve(target: str, scopes) -> str:
        for scope_id, children in scopes:
            if target in children:
                return node_id(scope_id, target)
        # Unknown target: keep the previous behavior and place it next to the declaring state.
        fallback_scope = scopes[1][0] if len(scopes) > 1 else scopes[0][0]
        return node_id(fallback_scope, target)

    # Draw every outgoing transition declared directly on `state`.
    def add_transitions(source_id: str, state: Dict[str, Any], scopes) -> None:
        for event, spec in (state.get("on") or {}).items():
            target = target_name(spec)
            if target is not None:
                dot.edge(source_id, resolve(target, scopes), label=event)

        always_target = target_name(state.get("always"))
        if always_target is not None:
            dot.edge(source_id, resolve(always_target, scopes), label="always", style="dashed")

        invoke = state.get("invoke")
        if isinstance(invoke, dict):
            for key, label in (("on_done", "done"), ("on_error", "error")):
                target = target_name(invoke.get(key))
                if target is not None:
                    dot.edge(source_id, resolve(target, scopes), label=label)

    # Recursively add states and their transitions
    def add_state(name: str, state: Dict[str, Any], parent: str, scopes) -> None:
        current_id = node_id(parent, name)
        dot.node(current_id, label=name)

        children = state.get("states") or {}
        own_scopes = [(current_id, children)] + scopes
        add_transitions(current_id, state, own_scopes)

        # A dotted arrow marks the initial child state if the definition names one.
        initial = state.get("initial")
        for child_name, child_state in children.items():
            add_state(child_name, child_state, current_id, own_scopes)
            if initial == child_name:
                dot.edge(current_id, node_id(current_id, child_name), label="initial", style="dotted")

    # Create a root node representing the top-level of the state machine.
    root_id = "root"
    dot.node(root_id, label="root", shape="ellipse")

    top_states = machine.get("states") or {}
    root_scopes = [(root_id, top_states)]

    # Top-level transitions are resolved against the top-level states.
    add_transitions(root_id, machine, root_scopes)

    for state_name, state in top_states.items():
        add_state(state_name, state, root_id, root_scopes)

    # Start pseudo-node pointing to the root.
    dot.node("start", shape="point")
    dot.edge("start", root_id, label="start")

    dot.format = fmt
    output_path = dot.render(filename, cleanup=True)
    print("Statechart generated at", output_path)
    return dot

In [None]:
# Render the workflow definition with the "fsm" filename and "png" format; the Digraph is kept for inline display.
res_viz = generate_statechart(machine, "fsm", "png")