# LangGraph Functional API Workflow Demonstrations

This notebook demonstrates the key capabilities of LangGraph's Functional API and shows how its function-based style differs from graph-based and event-driven architectures.

## Architecture Paradigm: Function-based with @entrypoint and @task decorators


In [None]:
# Function-based with @entrypoint and @task decorators
from langgraph.func import entrypoint, task

@task()
def process_data(input_data: str) -> dict:
    # Standard Python constructs - no graph thinking
    if input_data.startswith("urgent"):
        return {"priority": "high", "data": input_data}
    return {"priority": "normal", "data": input_data}

@entrypoint()
def main_workflow(user_input: str) -> dict:
    # Imperative programming style
    result = process_data(user_input).result()
    return result

print("🔧 Functional API workflow created - function-based approach with decorators")


## Control Flow: Standard Python constructs (if/for/while loops)


In [None]:
# Standard Python constructs (if/for/while loops)
@entrypoint()
def control_flow_workflow(data_list: list) -> list:
    processed_items = []
    
    for item in data_list:  # Standard Python for loop
        if item > 10:  # Standard Python if statement
            processed_items.append(item * 2)
        elif item < 0:
            continue  # Standard Python control flow
            
    return processed_items

print("🔧 Control flow workflow created - standard Python constructs, no graph thinking")
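
To see which values survive the loop, the same body can be run as an ordinary function. The sketch below is plain Python with no LangGraph import; the name `filter_and_double` and the sample list are illustrative assumptions, not part of the notebook's workflow.

```python
def filter_and_double(data_list: list[int]) -> list[int]:
    """Same loop body as control_flow_workflow, minus the decorator."""
    processed_items = []
    for item in data_list:
        if item > 10:
            processed_items.append(item * 2)
        elif item < 0:
            continue  # negative values are skipped explicitly
        # values from 0 to 10 match neither branch and are dropped as well
    return processed_items


sample = [12, -3, 5, 20, 0]  # hypothetical input
result = filter_and_double(sample)
print(result)
```

Only items greater than 10 reach the output, so the `elif` branch documents intent rather than changing the result.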


## Observability: State changes and variable changes within steps


In [None]:
# State changes and variable changes within steps
from datetime import datetime

@task()
def observable_task(input_data: dict) -> dict:
    # State changes made here travel with the dict the task returns
    input_data["processed"] = True
    input_data["timestamp"] = datetime.now()
    
    # Record each item under its own key so per-item changes show up in the result
    for counter, item in enumerate(input_data.get("items", []), start=1):
        input_data[f"item_{counter}"] = item
        
    return input_data

print("🔧 Observable task created - state changes and variable tracking within steps")


## Checkpointing: Entrypoint-level checkpointing


In [None]:
# Entrypoint-level checkpointing
from langgraph.checkpoint.memory import MemorySaver

@entrypoint(checkpointer=MemorySaver())
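def checkpointed_workflow(input_data: str) -> str:
    # The checkpointer is attached at the entrypoint; results of completed
    # tasks are saved under the thread_id passed in the invocation config
    step1_result = process_step1(input_data).result()
    
    # If step 2 fails, re-running with None and the same thread_id resumes
    # from the saved step 1 result instead of recomputing it
    step2_result = process_step2(step1_result).result()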
    
    return step2_result

@task()
def process_step1(data: str) -> str:
    return f"processed_{data}"

@task()
def process_step2(data: str) -> str:
    return f"final_{data}"

print("🔧 Checkpointed workflow created - entrypoint-level checkpointing")


## Human-in-the-Loop: Capability available but with limitations


In [None]:
# Human-in-the-loop capability with limitations
@task()
def human_review_task(document: str) -> str:
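    # Limited human interaction: input() blocks on stdin, so this only works
    # in an interactive session. LangGraph's interrupt() together with a
    # checkpointer is the way to pause without blocking.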
    print(f"Document for review: {document}")
    user_input = input("Approve? (y/n): ")
    
    if user_input.lower() == 'y':
        return "approved"
    return "rejected"

@entrypoint()
def human_workflow(document: str) -> str:
    result = human_review_task(document).result()
    return f"Document status: {result}"

print("🔧 Human-in-the-loop workflow created - limited but available human interaction")


## Node Parallelization: Same step execution with different data


In [None]:
# Same step can be executed in parallel with different data
import asyncio

@task()
async def parallel_task(item: str) -> str:
    # Same step with different data concurrently
    await asyncio.sleep(1)  # Simulate processing
    return f"processed_{item}"

@entrypoint()
async def parallel_workflow(items: list) -> list:
    # Execute same step in parallel with different data
    tasks = [parallel_task(item) for item in items]
    results = await asyncio.gather(*tasks)
    return results

print("🔧 Parallel workflow created - same step execution with different data concurrently")
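
The fan-out pattern above relies on `asyncio.gather` running the awaitables concurrently. A minimal sketch of that behavior in plain asyncio, with no LangGraph involved, is shown here; `fake_step` and `fan_out` are hypothetical stand-ins for `parallel_task` and `parallel_workflow`, and the 0.1 second delay is arbitrary.

```python
import asyncio
import time


async def fake_step(item: str, delay: float = 0.1) -> str:
    """Stand-in for parallel_task: sleep, then tag the item."""
    await asyncio.sleep(delay)
    return f"processed_{item}"


async def fan_out(items: list[str]) -> tuple[list[str], float]:
    """Run fake_step for every item concurrently and time the whole batch."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_step(item) for item in items))
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(fan_out(["a", "b", "c", "d", "e"]))
print(results, f"{elapsed:.2f}s")
```

Five sequential sleeps would take about 0.5 seconds, while the gathered batch should finish in roughly the time of one. Results come back in input order. Inside a notebook cell, where an event loop is already running, use `await fan_out([...])` instead of `asyncio.run`.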


## Workflow Composition: Function-based composition using standard Python async/await patterns


In [None]:
# Function-based composition using standard Python async/await
@task()
async def task_a(data: str) -> str:
    return f"task_a_{data}"

@task()
async def task_b(data: str) -> str:
    return f"task_b_{data}"

@entrypoint()
async def composed_workflow(input_data: str) -> dict:
    # Sequential composition: awaiting the future a task returns yields its result
    result_a = await task_a(input_data)
    result_b = await task_b(result_a)
    
    # Parallel composition: start both tasks, then wait for them together
    parallel_results = await asyncio.gather(
        task_a(input_data),
        task_b(input_data)
    )
    
    return {"sequential": result_b, "parallel": parallel_results}

print("🔧 Composed workflow created - function-based composition with async/await patterns")


## Error Handling: Resume after error by running with None and same thread ID


In [None]:
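# Resume after error by running with None and same thread ID
import random

@task()
def risky_operation(data: str) -> str:
    if random.random() < 0.5:  # 50% chance of failure
        raise RuntimeError("Simulated error")
    return f"success_{data}"

@entrypoint(checkpointer=MemorySaver())
def error_handling_workflow(input_data: str) -> str:
    # Let the exception propagate so the failed run can be resumed by the caller.
    # Catching it here would turn the failure into a normal return value,
    # leaving nothing to resume.
    return risky_operation(input_data).result()

# Caller-side pattern (left as comments because the task fails at random;
# the thread_id value is a placeholder):
# config = {"configurable": {"thread_id": "demo-thread"}}
# try:
#     error_handling_workflow.invoke("payload", config)
# except RuntimeError as e:
#     print(f"Error occurred: {e}")
#     error_handling_workflow.invoke(None, config)  # resume on the same thread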

print("🔧 Error handling workflow created - resume after error with same thread ID")
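
The resume behavior can be pictured with a toy model in plain Python. This is a conceptual sketch under stated assumptions, not LangGraph's implementation: a dict keyed by `(thread_id, step name)` stands in for the checkpointer, and `run_step`, `flaky`, and `workflow` are hypothetical names. Unlike LangGraph, which accepts `None` on resume because the original input is already saved, this toy passes the input again.

```python
# Toy model only: a dict stands in for the checkpointer.
saved_results: dict[tuple[str, str], str] = {}
call_log: list[str] = []  # records every step that actually executes


def run_step(thread_id: str, name: str, func, arg: str) -> str:
    """Run func(arg) unless this thread already has a saved result for name."""
    key = (thread_id, name)
    if key not in saved_results:
        call_log.append(name)
        saved_results[key] = func(arg)  # stored only if func returns normally
    return saved_results[key]


attempts = {"count": 0}


def flaky(data: str) -> str:
    """Fail on the first call, succeed afterwards (deterministic stand-in)."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("Simulated error")
    return f"success_{data}"


def workflow(thread_id: str, data: str) -> str:
    prepared = run_step(thread_id, "prepare", str.upper, data)
    return run_step(thread_id, "risky", flaky, prepared)


try:
    workflow("thread-1", "job")
except RuntimeError:
    final = workflow("thread-1", "job")  # second run on the same thread

print(final, call_log)
```

The `prepare` step executes once even though the workflow runs twice; only the step that failed is executed again on the second run.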


## Database Integration: PostgreSQL, Redis, and SQLite integrations available


In [None]:
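# PostgreSQL, Redis, SQLite integrations available (each ships as a separate
# package: langgraph-checkpoint-postgres, -redis, -sqlite)
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.redis import RedisSaver
from langgraph.checkpoint.sqlite import SqliteSaver

# from_conn_string() returns a context manager, not a saver, so open it with
# `with` and invoke the workflow while the connection is still open.

# PostgreSQL integration
with PostgresSaver.from_conn_string("postgresql://user:pass@localhost/db") as pg_saver:
    pg_saver.setup()  # create the checkpoint tables on first use

    @entrypoint(checkpointer=pg_saver)
    def postgres_workflow(input_data: str) -> str:
        return f"stored_in_postgres_{input_data}"

# Redis integration
with RedisSaver.from_conn_string("redis://localhost:6379") as redis_saver:
    redis_saver.setup()  # initialize the saver on first use

    @entrypoint(checkpointer=redis_saver)
    def redis_workflow(input_data: str) -> str:
        return f"stored_in_redis_{input_data}"

# SQLite integration: pass a file path rather than a sqlite:/// URL
with SqliteSaver.from_conn_string("checkpoints.db") as sqlite_saver:

    @entrypoint(checkpointer=sqlite_saver)
    def sqlite_workflow(input_data: str) -> str:
        return f"stored_in_sqlite_{input_data}"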

print("🔧 Database integration workflows created - PostgreSQL, Redis, SQLite support")


## Summary

This notebook demonstrates the key capabilities of LangGraph's Functional API:

- **Function-based architecture** with `@entrypoint` and `@task` decorators
- **Standard Python control flow** (if/for/while loops) without graph thinking
- **Step-level observability** of the state changes each task returns
- **Entrypoint-level checkpointing** with resume capabilities
- **Limited human-in-the-loop** integration
- **Parallel execution** of same steps with different data
- **Function-based composition** using async/await patterns
- **Error handling** with resume from the last checkpoint by re-running with `None` and the same thread ID
- **Database integration** support for PostgreSQL, Redis, and SQLite

The Functional API provides a more imperative, Python-native approach compared to graph-based or event-driven architectures.
