# LangGraph: Functional API Playground

In [56]:
# Ref:
# https://langchain-ai.github.io/langgraph/concepts/functional_api/

In [44]:
import time
import uuid
import random

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt

## Define Task & Entrypoint

In [49]:
@task
def write_essay(topic: str) -> str:
    """Produce a placeholder essay about ``topic`` after a one-second pause.

    A random number from 1 to 10 is embedded in the text so that a task that
    actually re-executed can be told apart from one whose result was restored.
    """
    time.sleep(1)
    # The random value makes caching visible: on resumption the saved task
    # result is reused rather than a new number being drawn.
    # Real-world analogue: API calls -> idempotency + task restarts.
    lucky_number = random.randint(1, 10)
    return f"An essay about topic: {topic}, number: {lucky_number}"


@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """Write an essay about ``topic`` and pause for a human verdict.

    Returns a dict holding the essay text under ``"essay"`` and the value
    supplied on resume under ``"is_approved"``.
    """
    draft = write_essay(topic).result()

    # Payload surfaced to the caller while the run is paused.
    review_request = {
        "essay": draft,
        "action": "Please approve/reject the essay",
    }
    # The run stops here; the value passed on resume becomes the return
    # value of interrupt().
    verdict = interrupt(review_request)

    return {"essay": draft, "is_approved": verdict}

In [50]:
# A random thread id keeps this notebook session's checkpoints separate.
thread_id = f"{uuid.uuid4()}"
config = dict(configurable=dict(thread_id=thread_id))

In [51]:
# Sync invoke: run the workflow on the shared thread until it reaches the
# interrupt; the returned dict carries the pending Interrupt under
# "__interrupt__" (see the output below).
workflow.invoke("cat", config=config)

{'__interrupt__': [Interrupt(value={'essay': 'An essay about topic: cat, number: 7', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:61ac8046-26a0-512b-fc59-5e8fc0a22505'])]}

In [52]:
# Async invoke: the same call through the coroutine API (top-level `await`
# is available in a notebook cell).
# NOTE: a fresh input is passed rather than a resume command, so the run
# starts over and write_essay executes again - the number in the output
# differs from the sync invoke above.
await workflow.ainvoke("cat", config=config)

{'__interrupt__': [Interrupt(value={'essay': 'An essay about topic: cat, number: 5', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:1b4c5c11-3c54-47dd-237d-36c732c27efc'])]}

In [53]:
# Stream items: each chunk is a dict keyed by the step that just produced
# it - first the `write_essay` task result, then the `__interrupt__` entry.
for item in workflow.stream("cat", config=config):
    print(item)

{'write_essay': 'An essay about topic: cat, number: 1'}
{'__interrupt__': (Interrupt(value={'essay': 'An essay about topic: cat, number: 1', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:64fea6c7-75f7-ec6d-9bf2-128e11f43ef3']),)}


In [54]:
# Async stream items: asynchronous counterpart of `stream`; the chunks have
# the same shape (task result first, then the interrupt).
async for item in workflow.astream("cat", config=config):
    print(item)

{'write_essay': 'An essay about topic: cat, number: 3'}
{'__interrupt__': (Interrupt(value={'essay': 'An essay about topic: cat, number: 3', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:06e9877c-4173-338a-eb8d-a4bedb8ae008']),)}


## Human-in-the-loop

In [55]:
from langgraph.types import Command

# Start on a brand-new thread so no earlier checkpoint is involved.
config = dict(configurable=dict(thread_id=str(uuid.uuid4())))

# First pass: the run stops at the interrupt and hands back the review request.
item = workflow.invoke("cat", config=config)
print(item)

# Second pass: feed the reviewer's decision back in. The essay in the final
# result is the one shown at the interrupt, i.e. the task result was reused.
human_review = True
resume_command = Command(resume=human_review)
for item in workflow.stream(resume_command, config):
    print(item)

{'__interrupt__': [Interrupt(value={'essay': 'An essay about topic: cat, number: 4', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:98e2af47-a6ac-8f76-3bf4-0b339b65cb3a'])]}
{'workflow': {'essay': 'An essay about topic: cat, number: 4', 'is_approved': True}}


## Short-term memory

In [26]:
from typing import Optional

@entrypoint(checkpointer=InMemorySaver())
def stm_workflow(
    number: int, *, previous: Optional[int] = None
) -> entrypoint.final[int, int]:
    """Return the value saved by the prior run and save ``2 * number``.

    ``previous`` is the value saved by the last run on the same thread; when
    nothing has been saved yet it is ``None`` and 0 is returned instead.
    The returned value and the saved value are deliberately different.
    """
    last_saved = previous or 0
    return entrypoint.final(value=last_saved, save=number * 2)

In [27]:
# Fresh thread: nothing has been saved for it yet.
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# First call: no previous value, so 0 is returned; 2*3 = 6 is saved.
stm_workflow.invoke(3, config)

0

In [28]:
# Returns 6 (saved by the call with input 3) and saves 2*2 = 4.
stm_workflow.invoke(2, config)

6

In [29]:
# Returns 4 (saved by the call with input 2) and saves 2*5 = 10.
stm_workflow.invoke(5, config)

4

## Task: Sync vs Async

In [42]:
@task
def add_one(a: int) -> int:
    """Return ``a + 1`` after blocking for two seconds (stand-in for slow work)."""
    pause_seconds = 2
    time.sleep(pause_seconds)
    return a + 1


@entrypoint(checkpointer=InMemorySaver())
def sync_workflow(numbers: list[int]) -> list[int]:
    """Apply the ``add_one`` task to every number and return the results in order."""
    # Schedule every task first; waiting on any result before all tasks are
    # scheduled would serialize the work.
    pending = []
    for value in numbers:
        pending.append(add_one(value))

    # Collect only after everything has been scheduled.
    return [future.result() for future in pending]


# Call the entrypoint on a fresh thread.
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# Time the whole run: five tasks that each sleep 2 s finish in about 2 s
# (see output), so they ran concurrently rather than one after another.
start_ts = time.perf_counter()
result = sync_workflow.invoke([1, 2, 3, 4, 5], config)
print(f"result: {result}")
print(f"Done in {time.perf_counter() - start_ts:.3f}s")

result: [2, 3, 4, 5, 6]
Done in 2.011s


In [41]:
# Async task
import asyncio

@task
async def add_one(a: int) -> int:
    """Coroutine variant: return ``a + 1`` after a non-blocking two-second pause.

    NOTE: this reuses the name ``add_one``, so running this cell rebinds it
    and replaces the synchronous task of the same name; ``sync_workflow``
    looks the name up at call time and would presumably pick up this version
    if invoked afterwards without re-running its own cell.
    """
    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    return a + 1


@entrypoint(checkpointer=InMemorySaver())
async def async_workflow(numbers: list[int]) -> list[int]:
    """Apply the async ``add_one`` task to every number and return the results in order."""
    # Materialize all task futures up front so they are all scheduled before
    # any of them is awaited.
    pending = list(map(add_one, numbers))
    return await asyncio.gather(*pending)


# Call the entrypoint on a fresh thread.
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# Time the whole run: five tasks that each sleep 2 s finish in about 2 s
# (see output), so the awaited tasks overlapped.
start_ts = time.perf_counter()
result = await async_workflow.ainvoke([1, 2, 3, 4, 5], config)
print(f"result: {result}")
print(f"Done in {time.perf_counter() - start_ts:.3f}s")

result: [2, 3, 4, 5, 6]
Done in 2.007s


In [43]:
# Takeaway: LangGraph runs tasks in parallel under both sync and async
# invocation - five 2-second tasks finished in about 2 s in each run above.