# AI News LangGraph Workflow Visualization

This notebook visualizes and explains the LangGraph workflow behind the AI News generation system, which follows the Metamorphosis Pattern.

## 1. System Architecture Overview

The system follows the **Metamorphosis Pattern** with these key components:

- **WorkflowExecutor**: Centralized orchestration
- **WorkflowNodes**: Modular node implementations
- **DirectToolRegistry**: Direct tool access without routing
- **PromptRegistry**: Centralized prompt management
- **State Management**: Pydantic-based type-safe state

In [None]:
# Import required libraries for visualization
import sys
from pathlib import Path
sys.path.insert(0, str(Path.cwd()))

# For visualization
from IPython.display import Image, display, HTML, Markdown
import json

## 2. Main Workflow Graph

The LangGraph workflow consists of the following nodes and edges:

In [None]:
# Define the workflow structure
workflow_structure = {
    "nodes": [
        {"id": "initialize", "label": "Initialize Workflow", "type": "start"},
        {"id": "fetch_news", "label": "Fetch News", "type": "process"},
        {"id": "summarize_topic", "label": "Summarize Topic", "type": "process"},
        {"id": "review_quality", "label": "Review Quality", "type": "review"},
        {"id": "generate_newsletter", "label": "Generate Newsletter", "type": "output"},
        {"id": "end", "label": "END", "type": "end"}
    ],
    "edges": [
        {"from": "initialize", "to": "fetch_news", "type": "direct"},
        {"from": "fetch_news", "to": "summarize_topic", "type": "direct"},
        {"from": "summarize_topic", "to": "fetch_news", "type": "conditional", "condition": "more_topics"},
        {"from": "summarize_topic", "to": "review_quality", "type": "conditional", "condition": "all_topics_done"},
        {"from": "review_quality", "to": "generate_newsletter", "type": "direct"},
        {"from": "generate_newsletter", "to": "end", "type": "direct"}
    ]
}

print("Workflow Nodes:")
for node in workflow_structure["nodes"]:
    print(f"  • {node['label']} ({node['type']})")
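
The node and edge lists above can also be checked mechanically. The sketch below works on a trimmed copy of that structure and uses two hypothetical helpers, `build_adjacency` and `dangling_edges`, which are illustrations only and not part of the project's code. It builds an adjacency map and reports any edge that points at an undeclared node.

```python
from collections import defaultdict

# Trimmed copy of the structure above so the example runs on its own.
nodes = ["initialize", "fetch_news", "summarize_topic",
         "review_quality", "generate_newsletter", "end"]
edges = [
    ("initialize", "fetch_news"),
    ("fetch_news", "summarize_topic"),
    ("summarize_topic", "fetch_news"),      # conditional: more_topics
    ("summarize_topic", "review_quality"),  # conditional: all_topics_done
    ("review_quality", "generate_newsletter"),
    ("generate_newsletter", "end"),
]


def build_adjacency(edge_list):
    """Map each source node to the list of nodes it can transition to."""
    adjacency = defaultdict(list)
    for source, target in edge_list:
        adjacency[source].append(target)
    return dict(adjacency)


def dangling_edges(node_ids, edge_list):
    """Return edges whose source or target is not a declared node."""
    known = set(node_ids)
    return [(s, t) for s, t in edge_list if s not in known or t not in known]


adjacency = build_adjacency(edges)
print(adjacency["summarize_topic"])   # the only node with two outgoing edges
print(dangling_edges(nodes, edges))   # empty when every edge is valid
```

Only `summarize_topic` has more than one successor, which is where the conditional routing in section 6 applies.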

## 3. Visual Workflow Diagram

The same workflow as a Mermaid diagram (rendering requires a notebook frontend with Mermaid support, such as JupyterLab 4.1 or later):

In [None]:
# Create Mermaid diagram
mermaid_diagram = """
```mermaid
graph TD
    Start([Start]) --> Init[Initialize Workflow]
    Init --> FetchNews[Fetch News for Topic]
    FetchNews --> Summarize[Summarize Topic]
    Summarize --> Decision{More Topics?}
    Decision -->|Yes| FetchNews
    Decision -->|No| Review[Review Quality]
    Review --> Generate[Generate Newsletter]
    Generate --> End([End])

    style Start fill:#90EE90
    style End fill:#FFB6C1
    style Init fill:#87CEEB
    style FetchNews fill:#DDA0DD
    style Summarize fill:#F0E68C
    style Review fill:#FFA07A
    style Generate fill:#98FB98
    style Decision fill:#FFE4B5
```
"""

display(Markdown(mermaid_diagram))
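
The diagram above is written by hand, so it can drift from the `workflow_structure` dictionary in section 2. One possible way to keep them in sync is to generate the Mermaid text from node and edge dictionaries. The helper `to_mermaid` below is a hypothetical sketch, and the sample data is a small subset of the structure. Node ids are prefixed with `n_` as a precaution, because Mermaid treats a bare lowercase `end` as a keyword.

```python
def to_mermaid(nodes, edges, prefix="n_"):
    """Render node and edge dicts as a Mermaid 'graph TD' definition."""
    lines = ["graph TD"]
    for node in nodes:
        lines.append(f'    {prefix}{node["id"]}["{node["label"]}"]')
    for edge in edges:
        condition = edge.get("condition")
        arrow = f"-->|{condition}|" if condition else "-->"
        lines.append(f'    {prefix}{edge["from"]} {arrow} {prefix}{edge["to"]}')
    return "\n".join(lines)


# Small subset of the workflow structure, repeated here so the example runs alone.
sample_nodes = [
    {"id": "fetch_news", "label": "Fetch News"},
    {"id": "summarize_topic", "label": "Summarize Topic"},
    {"id": "review_quality", "label": "Review Quality"},
]
sample_edges = [
    {"from": "fetch_news", "to": "summarize_topic"},
    {"from": "summarize_topic", "to": "fetch_news", "condition": "more_topics"},
    {"from": "summarize_topic", "to": "review_quality", "condition": "all_topics_done"},
]

diagram = to_mermaid(sample_nodes, sample_edges)
print(diagram)
```

The returned string can be wrapped in a `mermaid` fence and passed to `display(Markdown(...))` in the same way as the hand-written diagram.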

## 4. Detailed Node Descriptions

In [None]:
# Node details with their responsibilities
node_details = {
    "Initialize Workflow": {
        "function": "initialize_workflow",
        "purpose": "Set up workflow state and load configuration",
        "inputs": ["topics_path", "main_topic"],
        "outputs": ["topics_config", "initialized state"],
        "agent": "None (setup only)"
    },
    "Fetch News": {
        "function": "fetch_news_for_topic",
        "purpose": "Search and fetch news articles for current topic",
        "inputs": ["current_topic", "search_query"],
        "outputs": ["topic_results", "articles"],
        "agent": "Research Agent",
        "tools": ["NewsSearchTool", "analyze_relevance"]
    },
    "Summarize Topic": {
        "function": "summarize_topic",
        "purpose": "Create comprehensive summary of fetched articles",
        "inputs": ["topic_results", "articles"],
        "outputs": ["topic_summaries", "key_findings"],
        "agent": "Editor Agent",
        "tools": ["extract_key_points", "summarize_text"]
    },
    "Review Quality": {
        "function": "review_quality",
        "purpose": "Assess quality of summaries (self-reviewer pattern)",
        "inputs": ["topic_summaries"],
        "outputs": ["quality_scores", "feedback"],
        "agent": "Self-Reviewer Agent",
        "tools": ["evaluate_review_text"]
    },
    "Generate Newsletter": {
        "function": "generate_newsletter",
        "purpose": "Create final HTML/Markdown newsletter",
        "inputs": ["topic_summaries", "executive_summary"],
        "outputs": ["newsletter_html", "newsletter_markdown"],
        "agent": "Chief Editor Agent",
        "tools": ["FileManager"]
    }
}

for node_name, details in node_details.items():
    print(f"\n📌 {node_name}")
    print(f"   Function: {details['function']}")
    print(f"   Purpose: {details['purpose']}")
    print(f"   Agent: {details.get('agent', 'N/A')}")
    if 'tools' in details:
        print(f"   Tools: {', '.join(details['tools'])}")

## 5. State Flow Through the Workflow

In [None]:
# Demonstrate state evolution
state_evolution = [
    {
        "stage": "Initial",
        "current_stage": "initialized",
        "topic_index": 0,
        "data": {"topics_config": "loaded", "topic_results": [], "summaries": []}
    },
    {
        "stage": "After Fetch (Topic 1)",
        "current_stage": "fetching",
        "topic_index": 0,
        "data": {"topic_results": ["Topic 1 articles"], "summaries": []}
    },
    {
        "stage": "After Summarize (Topic 1)",
        "current_stage": "summarizing",
        "topic_index": 1,
        "data": {"topic_results": ["Topic 1 articles"], "summaries": ["Topic 1 summary"]}
    },
    {
        "stage": "After All Topics",
        "current_stage": "reviewing",
        "topic_index": 3,
        "data": {"topic_results": ["All articles"], "summaries": ["All summaries"], "quality_scores": "calculated"}
    },
    {
        "stage": "Final",
        "current_stage": "completed",
        "topic_index": 3,
        "data": {"newsletter": "generated", "outputs": {"html": "path", "markdown": "path"}}
    }
]

print("State Evolution Through Workflow:")
print("=" * 60)
for state in state_evolution:
    print(f"\n🔄 {state['stage']}")
    print(f"   Stage: {state['current_stage']}")
    print(f"   Topic Index: {state['topic_index']}")
    print(f"   State Data: {json.dumps(state['data'], indent=6)}")
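
The state evolution above can be reproduced with a small typed state object. The sketch below uses a frozen stdlib dataclass as a stand-in for the Pydantic model mentioned in section 1, so that it runs without extra dependencies. The field names come from the cell above; the `fetch_node` and `summarize_node` functions are hypothetical placeholders for the real nodes.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class WorkflowState:
    """Stdlib stand-in for the notebook's Pydantic state model."""
    topics: tuple
    topic_index: int = 0
    topic_results: tuple = ()
    summaries: tuple = ()
    current_stage: str = "initialized"


def fetch_node(state):
    """Hypothetical fetch step: record articles for the current topic."""
    topic = state.topics[state.topic_index]
    return replace(
        state,
        topic_results=state.topic_results + (f"{topic} articles",),
        current_stage="fetching",
    )


def summarize_node(state):
    """Hypothetical summarize step: add a summary and advance the index."""
    topic = state.topics[state.topic_index]
    return replace(
        state,
        summaries=state.summaries + (f"{topic} summary",),
        topic_index=state.topic_index + 1,
        current_stage="summarizing",
    )


state = WorkflowState(topics=("Topic 1", "Topic 2", "Topic 3"))
state = fetch_node(state)
print(state.current_stage, state.topic_index, state.topic_results)
state = summarize_node(state)
print(state.current_stage, state.topic_index, state.summaries)
```

Each node returns a new state instead of mutating the old one, which makes every intermediate stage easy to inspect. As in the table above, `topic_index` only advances after the summarize step.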

## 6. Conditional Routing Logic

In [None]:
# Demonstrate the conditional routing
def should_continue_topics(current_index: int, total_topics: int) -> str:
    """
    Decide whether to fetch another topic or move on to review.

    This mirrors the conditional edge that leaves the summarize_topic node.
    `current_index` is the index of the next unprocessed topic; the returned
    label selects the next node.
    """
    if current_index < total_topics:
        return "fetch_more"  # Loop back to the fetch_news node
    return "review"          # Move on to the review_quality node

# Test the routing logic
test_cases = [
    (0, 3, "Processing topic 1 of 3"),
    (1, 3, "Processing topic 2 of 3"),
    (2, 3, "Processing topic 3 of 3"),
    (3, 3, "All topics complete, moving to review")
]

print("Conditional Routing Logic:")
print("=" * 40)
for current, total, description in test_cases:
    result = should_continue_topics(current, total)
    emoji = "🔄" if result == "fetch_more" else "✅"
    print(f"{emoji} Index {current}/{total}: {description}")
    print(f"   → Next: {result}")
    print()
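
To see how the router drives the whole graph, the sketch below walks the workflow with plain Python instead of LangGraph. In LangGraph a router like this is usually attached with `add_conditional_edges` together with a mapping from returned labels to node names; here that mapping is emulated with a dictionary. `PATH_MAP`, `DIRECT_EDGES`, and `trace_workflow` are hypothetical names introduced for this illustration.

```python
def should_continue_topics(current_index: int, total_topics: int) -> str:
    """Same routing rule as the cell above."""
    return "fetch_more" if current_index < total_topics else "review"


# Label-to-node mapping, taken from the comments in the router above.
PATH_MAP = {"fetch_more": "fetch_news", "review": "review_quality"}

# Unconditional edges from the workflow structure in section 2.
DIRECT_EDGES = {
    "initialize": "fetch_news",
    "fetch_news": "summarize_topic",
    "review_quality": "generate_newsletter",
    "generate_newsletter": "end",
}


def trace_workflow(total_topics: int) -> list:
    """Walk the graph and return the sequence of visited node names."""
    visited = []
    node = "initialize"
    topic_index = 0
    while node != "end":
        visited.append(node)
        if node == "summarize_topic":
            topic_index += 1  # summarizing finishes the current topic
            node = PATH_MAP[should_continue_topics(topic_index, total_topics)]
        else:
            node = DIRECT_EDGES[node]
    return visited


print(trace_workflow(2))
```

With two topics the trace visits `fetch_news` and `summarize_topic` twice each before moving on to `review_quality`.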

## 7. Direct Tool Access Pattern

In [None]:
# Visualize the direct tool access pattern
direct_tools_diagram = """
```mermaid
graph LR
    subgraph "Traditional Pattern"
        A1[Agent] --> R1[Router]
        R1 --> T1[Tool]
        T1 --> R1
        R1 --> A1
    end
    
    subgraph "Metamorphosis Pattern (Direct Access)"
        A2[Agent] --> T2[Tool Registry]
        T2 --> T2A[Tool A]
        T2 --> T2B[Tool B]
        T2 --> T2C[Tool C]
        T2A --> A2
        T2B --> A2
        T2C --> A2
    end
    
    style A1 fill:#FFB6C1
    style A2 fill:#90EE90
```
"""

display(Markdown("### Direct Tool Access vs Traditional Routing"))
display(Markdown(direct_tools_diagram))

# List available tools
tools = [
    {"name": "search_news", "category": "research", "description": "Search for news articles"},
    {"name": "extract_key_points", "category": "processing", "description": "Extract key points from text"},
    {"name": "evaluate_review_text", "category": "langgraph", "description": "Evaluate text quality"},
    {"name": "extract_achievements", "category": "langgraph", "description": "Extract achievements from content"},
    {"name": "save_html", "category": "file_management", "description": "Save HTML content"},
    {"name": "save_json", "category": "file_management", "description": "Save JSON data"}
]

print("\n📦 Direct Access Tools:")
for tool in tools:
    print(f"  • {tool['name']:25} [{tool['category']:15}] - {tool['description']}")
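
The direct access idea can be sketched with a small name-to-function registry. The `ToolRegistry` class below is a hypothetical illustration, not the project's `DirectToolRegistry`, and the `extract_key_points` body is a toy stand-in for the real tool. It shows an agent calling a tool by name with no router in between.

```python
from typing import Callable, Dict, List, Optional


class ToolRegistry:
    """Hypothetical minimal registry: tools are looked up by name and called directly."""

    def __init__(self) -> None:
        self._tools: Dict[str, Callable] = {}
        self._categories: Dict[str, str] = {}

    def register(self, name: str, category: str):
        """Decorator that stores a function under the given name."""
        def decorator(func: Callable) -> Callable:
            self._tools[name] = func
            self._categories[name] = category
            return func
        return decorator

    def call(self, name: str, *args, **kwargs):
        """Invoke a tool directly; unknown names raise KeyError."""
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        return self._tools[name](*args, **kwargs)

    def names(self, category: Optional[str] = None) -> List[str]:
        """List registered tool names, optionally filtered by category."""
        return [n for n, c in self._categories.items()
                if category is None or c == category]


registry = ToolRegistry()


@registry.register("extract_key_points", category="processing")
def extract_key_points(text: str, limit: int = 3) -> list:
    """Toy implementation: treat each sentence as a key point."""
    sentences = [s.strip() for s in text.split(".") if s.strip()]
    return sentences[:limit]


points = registry.call("extract_key_points",
                       "Model A improves screening. Trial B starts.")
print(points)
print(registry.names(category="processing"))
```

Because lookup is a single dictionary access, adding a tool does not require changing any routing code.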

## 8. Execution Flow with Real Components

In [None]:
# Show how to use the actual workflow
code_example = '''
# Initialize the workflow
from src.ai_news_langgraph.graph_v2 import AINewsWorkflow

# Create workflow instance
workflow = AINewsWorkflow(
    use_parallel=False,      # Sequential processing
    checkpoint_type="memory", # Memory-based checkpointing
    enable_streaming=False    # No real-time updates
)

# Run the workflow
result = workflow.run(
    main_topic="AI in Cancer Care",
    topics_path="config/topics.yaml"
)

# Check results
print(f"Status: {result['status']}")
print(f"Outputs: {result['outputs']}")
print(f"Metrics: {result['metrics']}")
'''

print("Example: Running the Workflow")
print("=" * 40)
print(code_example)

## 9. Parallel Execution Pattern

In [None]:
# Visualize parallel vs sequential execution
parallel_diagram = """
```mermaid
graph TD
    subgraph "Sequential Execution"
        S1[Topic 1] --> S2[Topic 2]
        S2 --> S3[Topic 3]
        S3 --> SR[Review All]
    end
    
    subgraph "Parallel Execution"
        P0[Start] --> P1[Topic 1]
        P0 --> P2[Topic 2]
        P0 --> P3[Topic 3]
        P1 --> PM[Merge Results]
        P2 --> PM
        P3 --> PM
        PM --> PR[Review All]
    end
    
    style S1 fill:#FFB6C1
    style P1 fill:#90EE90
    style P2 fill:#90EE90
    style P3 fill:#90EE90
```
"""

display(Markdown("### Sequential vs Parallel Topic Processing"))
display(Markdown(parallel_diagram))

# Performance comparison
performance_comparison = {
    "Sequential": {
        "Topics": 5,
        "Time per topic": "3 seconds",
        "Total time": "15 seconds",
        "Advantages": ["Simple debugging", "Lower memory usage", "Predictable order"],
        "Disadvantages": ["Slower overall", "No parallelism"]
    },
    "Parallel": {
        "Topics": 5,
        "Time per topic": "3 seconds",
        # 5 topics on 3 workers run in 2 waves of 3 seconds each
        "Total time": "~6 seconds (with 3 workers)",
        "Advantages": ["Faster execution", "Better resource utilization"],
        "Disadvantages": ["Complex debugging", "Higher memory usage", "Potential race conditions"]
    }
}

print("\n⚡ Performance Comparison:")
for mode, details in performance_comparison.items():
    print(f"\n{mode} Mode:")
    print(f"  • Total Time: {details['Total time']}")
    print(f"  • Advantages: {', '.join(details['Advantages'])}")
    print(f"  • Disadvantages: {', '.join(details['Disadvantages'])}")
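
A minimal sketch of parallel topic processing with the standard library follows. It assumes each topic can be processed independently; `process_topic` is a hypothetical placeholder for the fetch and summarize steps, and `estimated_total_seconds` is an idealized model in which topics run in waves of equal duration.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def estimated_total_seconds(num_topics, seconds_per_topic, workers):
    """Idealized wall-clock time: topics run in waves of `workers` at a time."""
    waves = math.ceil(num_topics / workers)
    return waves * seconds_per_topic


def process_topic(topic):
    """Hypothetical placeholder for fetch + summarize on one topic."""
    return f"{topic} summary"


def run_parallel(topics, workers=3):
    """Process topics concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(process_topic, topics))


topics = [f"Topic {i}" for i in range(1, 6)]
print(run_parallel(topics))
print(estimated_total_seconds(5, 3, workers=1))  # sequential estimate
print(estimated_total_seconds(5, 3, workers=3))  # parallel estimate
```

`pool.map` returns results in input order, which keeps the merge step simple. Under this model, 5 topics at 3 seconds each take 15 seconds with one worker and 6 seconds with three, because the last two topics wait for a free worker.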

## 10. Error Handling and Recovery

In [None]:
# Error handling flow
error_flow = """
```mermaid
graph TD
    Node[Node Execution] --> Check{Error?}
    Check -->|No| Success[Continue Workflow]
    Check -->|Yes| Retry{Retry Available?}
    Retry -->|Yes| RetryNode[Retry Node]
    Retry -->|No| LogError[Log Error]
    RetryNode --> Node
    LogError --> Recovery{Recovery Possible?}
    Recovery -->|Yes| Continue[Continue with Partial Data]
    Recovery -->|No| Fail[Mark Workflow Failed]
    
    style Check fill:#FFE4B5
    style Success fill:#90EE90
    style Fail fill:#FFB6C1
```
"""

display(Markdown("### Error Handling Flow"))
display(Markdown(error_flow))

# Error scenarios and handling
error_scenarios = [
    {
        "scenario": "News API Failure",
        "node": "fetch_news",
        "handling": "Retry 3 times with exponential backoff",
        "recovery": "Skip topic if all retries fail"
    },
    {
        "scenario": "LLM Timeout",
        "node": "summarize_topic",
        "handling": "Retry with shorter input",
        "recovery": "Use basic summary if LLM fails"
    },
    {
        "scenario": "File Save Error",
        "node": "generate_newsletter",
        "handling": "Try alternative directory",
        "recovery": "Return content without saving"
    }
]

print("\n🚨 Error Handling Scenarios:")
for scenario in error_scenarios:
    print(f"\n• {scenario['scenario']}")
    print(f"  Node: {scenario['node']}")
    print(f"  Handling: {scenario['handling']}")
    print(f"  Recovery: {scenario['recovery']}")
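
The "retry with exponential backoff, then skip the topic" scenario can be sketched as follows. This is an illustration under stated assumptions rather than the project's implementation: "3 times" is read as three attempts in total, the delay doubles from a 1 second base, and `retry_with_backoff`, `fetch_or_skip`, and `flaky_fetch` are hypothetical names. The `sleep` parameter is injectable so the demo does not actually wait.

```python
import time


def retry_with_backoff(func, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**attempt and try again.

    Re-raises the last exception once max_attempts calls have failed.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))


def fetch_or_skip(fetch, topic, **retry_kwargs):
    """Return articles for a topic, or None to signal the topic was skipped."""
    try:
        return retry_with_backoff(lambda: fetch(topic), **retry_kwargs)
    except Exception:
        return None


# Demo: a fetch that fails twice, then succeeds on the third attempt.
delays = []
calls = {"count": 0}


def flaky_fetch(topic):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated News API failure")
    return [f"{topic} article"]


articles = fetch_or_skip(flaky_fetch, "Topic 1", sleep=delays.append)
print(articles, delays)
```

Returning `None` for a skipped topic lets the workflow continue with partial data, matching the recovery branch in the diagram above.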

## 11. Workflow Metrics and Monitoring

In [None]:
# Sample metrics from a workflow run
sample_metrics = {
    "total_articles": 45,
    "topics_processed": 3,
    "success_rate": 0.92,
    "duration_seconds": 28.5,
    "errors_count": 1,
    "warnings_count": 2,
    "node_timings": {
        "initialize": 0.5,
        "fetch_news": 15.2,
        "summarize_topic": 8.3,
        "review_quality": 2.1,
        "generate_newsletter": 2.4
    },
    "quality_scores": {
        "Topic 1": 0.85,
        "Topic 2": 0.92,
        "Topic 3": 0.78
    }
}

print("📊 Workflow Metrics Example:")
print("=" * 40)
print(f"Total Articles Fetched: {sample_metrics['total_articles']}")
print(f"Topics Processed: {sample_metrics['topics_processed']}")
print(f"Success Rate: {sample_metrics['success_rate']:.1%}")
print(f"Total Duration: {sample_metrics['duration_seconds']:.1f} seconds")
print("\nNode Execution Times:")
for node, seconds in sample_metrics['node_timings'].items():
    bar = "█" * int(seconds * 2)
    print(f"  {node:20} {bar} {seconds:.1f}s")
print(f"\nQuality Scores:")
for topic, score in sample_metrics['quality_scores'].items():
    bar = "⭐" * int(score * 5)
    print(f"  {topic}: {bar} ({score:.2f})")
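
The sample above uses fixed numbers. One way such per-node timings could be collected is with a small context manager, sketched below. The names `timed` and `time_shares` are hypothetical, and the `sum(range(...))` calls stand in for real node work.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(node_name, timings, clock=time.perf_counter):
    """Accumulate elapsed seconds for a node, even if its body raises."""
    start = clock()
    try:
        yield
    finally:
        timings[node_name] = timings.get(node_name, 0.0) + (clock() - start)


def time_shares(timings):
    """Return each node's fraction of the total recorded time."""
    total = sum(timings.values())
    return {name: (value / total if total else 0.0)
            for name, value in timings.items()}


node_timings = {}
with timed("fetch_news", node_timings):
    sum(range(100_000))  # stand-in for real work
with timed("summarize_topic", node_timings):
    sum(range(50_000))   # stand-in for real work

for name, share in time_shares(node_timings).items():
    print(f"{name:20} {share:.1%}")
```

Timings accumulate per node name, so a node that runs once per topic reports its combined duration across the loop.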

## 12. Summary and Key Takeaways

### 🎯 Key Features of This LangGraph Implementation:

1. **Metamorphosis Pattern**: Direct tool access, self-reviewing, thread-based management
2. **Modular Design**: Separate nodes, tools, prompts, and state management
3. **Type Safety**: Pydantic models throughout
4. **Scalability**: Support for parallel processing
5. **Resilience**: Comprehensive error handling and recovery
6. **Observability**: Detailed metrics and logging

### 🔄 Workflow Summary:

1. **Initialize** → Load configuration and set up state
2. **Fetch** → Fetch relevant news articles for the current topic
3. **Summarize** → Summarize the fetched articles, then loop back to Fetch while topics remain
4. **Quality Review** → Self-review all summaries
5. **Generate Output** → Create final newsletter in HTML/Markdown

### 📝 Usage Example:

```python
from src.ai_news_langgraph.graph_v2 import AINewsWorkflow

workflow = AINewsWorkflow(use_parallel=True)
result = workflow.run(main_topic="AI in Cancer Care")
```

In [None]:
# Final visualization of the complete system
complete_system = """
```mermaid
graph TB
    subgraph "Input Layer"
        Config[Configuration]
        Topics[Topics JSON/YAML]
    end
    
    subgraph "Processing Layer"
        WE[WorkflowExecutor]
        WN[WorkflowNodes]
        TR[DirectToolRegistry]
        PR[PromptRegistry]
    end
    
    subgraph "LangGraph Core"
        Graph[StateGraph]
        Nodes[Nodes]
        Edges[Edges]
        State[WorkflowState]
    end
    
    subgraph "Output Layer"
        HTML[HTML Newsletter]
        MD[Markdown Report]
        JSON[JSON Data]
    end
    
    Config --> WE
    Topics --> WE
    WE --> Graph
    Graph --> WN
    WN --> TR
    WN --> PR
    Graph --> State
    State --> HTML
    State --> MD
    State --> JSON
    
    style Config fill:#E6E6FA
    style WE fill:#98FB98
    style Graph fill:#87CEEB
    style HTML fill:#FFE4B5
```
"""

display(Markdown("### Complete System Architecture"))
display(Markdown(complete_system))

print("\n✨ Workflow Visualization Complete!")