<a href="https://colab.research.google.com/github/cmhrabi/AgenticDesignPatterns/blob/main/notebooks/Chapter_3_Parallelization_(LangChain_Code_Example).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install langchain langchain-community langchain-openai langgraph

Note: you may need to restart the kernel to use updated packages.


### Model Init

In [3]:
import os
import asyncio
from typing import Optional

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough, RunnableBranch

# --- Configuration ---
# Ensure your API key environment variable is set (e.g., OPENAI_API_KEY)
try:
    llm: Optional[ChatOpenAI] = ChatOpenAI(
        api_key=os.getenv("OPENROUTER_API_KEY"),
        base_url=os.getenv("OPENROUTER_BASE_URL"),
        model="anthropic/claude-sonnet-4",
        temperature=0
    )
    if llm:
        print(f"Language model initialized: {llm.model_name}")
except Exception as e:
    print(f"Error initializing language model: {e}")
    llm = None

  from pydantic.v1.fields import FieldInfo as FieldInfoV1


Language model initialized: anthropic/claude-sonnet-4


### Chains

In [4]:

# --- Define Independent Chains ---
# These three chains represent distinct tasks that can be executed in parallel.
summarize_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Summarize the following topic concisely:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

questions_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Generate three interesting questions about the following topic:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

terms_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Identify 5-10 key terms from the following topic, separated by commas:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

random_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Provide a random fun fact about the following topic:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

# 2. Define the final synthesis prompt which will combine the parallel results.
synthesis_prompt = ChatPromptTemplate.from_messages([
    ("system", """Based on the following information:
     Summary: {summary}
     Related Questions: {questions}
     Key Terms: {key_terms}
     Random Fact: {random_fact}
     Synthesize a comprehensive answer."""),
    ("user", "Original topic: {topic}")
])

# 1. Define the block of tasks to run in parallel. The results of these,
#    along with the original topic, will be fed into the next step.
map_chain = RunnableParallel(
    {
        "summary": summarize_chain,
        "questions": questions_chain,
        "key_terms": terms_chain,
        "random_fact": random_chain,
        "topic": RunnablePassthrough(),  # Pass the original topic through
    }
)


### Handlers

In [5]:
def unclear_handler(x) -> str:
    """Handles requests that couldn't be delegated."""
    print("\n--- HANDLING UNCLEAR REQUEST ---")
    return f"Request must be about space exploration: '{x['topic']}'. Please clarify."

async def space_exploration_handler(x) -> str:
    """Handles space exploration related topics."""
    print("\n--- HANDLING SPACE EXPLORATION REQUEST ---")
    return await full_parallel_chain.ainvoke(x['topic'])
# --- Build the Parallel + Synthesis Chain ---
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()

### Coordinator

In [10]:
coordinator_router_prompt = ChatPromptTemplate.from_messages([
    ("system", """Analyze the user's request and determine which specialist handler should process it.
     - If the request is related to space exploration, output 'space_exploration'.
     - If the request is unclear or doesn't fit either category, output 'unclear'.
     ONLY output one word: 'space_exploration' or 'unclear'."""),
    ("user", "{request}")
])

branches = {
    "space_exploration": RunnablePassthrough.assign(output=space_exploration_handler),
    "unclear": RunnablePassthrough.assign(output=unclear_handler),
}

delegation_branch = RunnableBranch(
    (lambda x: x['decision'].strip() == 'space_exploration', branches["space_exploration"]), # Added .strip()
    branches["unclear"] 
)

coordinator_chain = {
    "decision": coordinator_router_prompt | llm | StrOutputParser(),
    "topic": RunnablePassthrough()
} | delegation_branch | (lambda x: x['output'])

### Reflection

In [None]:
reflector_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a critic that provides constructive feedback on written content. 
    Analyze the content for:
    - Clarity and coherence
    - Accuracy and completeness
    - Grammar and style
    - Areas for improvement
    
    Provide specific, actionable feedback."""),
    ("human", "Original task: {task}\n\nGenerated content:\n{content}\n\nProvide your critique:")
])

# Define the revision prompt
revision_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant that improves content based on feedback."),
    ("human", """Original task: {task}

Initial content:
{content}

Feedback received:
{feedback}

Please revise the content addressing the feedback.""")
])

In [22]:
# --- Run the Chain ---
async def run_parallel_with_reflection_example(topic: str, max_iterations: int=2) -> None:
    if not llm:
        print("LLM not initialized. Cannot run example.")
        return

    
    try:
        print(f"\n--- Running Parallel LangChain Example for Topic: '{topic}' ---")
        print("=" * 50)
        print("GENERATION")
        print("=" * 50)
        initial_response = await coordinator_chain.ainvoke(topic)
        print(f"\nInitial content:\n{initial_response}\n")

        for i in range(max_iterations):
            print("=" * 50)
            print(f"REFLECTION CYCLE {i + 1}")
            print("=" * 50)
            
            # Step 2: Get critique
            reflector_chain = reflector_prompt | llm | StrOutputParser()
            critique_response = reflector_chain.invoke({
                "task": topic,
                "content": initial_response
            })
            print(f"\nCritique:\n{critique_response}\n")
            
            # Step 3: Revise based on feedback
            revision_chain = revision_prompt | llm | StrOutputParser()
            revision_response = revision_chain.invoke({
                "task": topic,
                "content": initial_response,
                "feedback": critique_response
            })
            initial_response = revision_response
            print(f"\nRevised content:\n{revision_response}\n")
    except Exception as e:
        print(f"Error during chain execution: {e}")

if __name__ == "__main__":
    test_topic = "history of space exploration"
    # In Python 3.7+, asyncio.run is the standard way to run an async function.
    await run_parallel_with_reflection_example(test_topic)


--- Running Parallel LangChain Example for Topic: 'history of space exploration' ---
GENERATION

--- HANDLING SPACE EXPLORATION REQUEST ---

Initial content:
# The History of Space Exploration: From Cold War Competition to Commercial Innovation

The history of space exploration represents one of humanity's greatest technological and scientific achievements, evolving from Cold War rivalry to international collaboration and commercial innovation over the span of nearly eight decades.

## The Dawn of the Space Age (1940s-1960s)

Space exploration began in earnest during the late 1940s with early experiments using captured German V-2 rockets. Interestingly, the very first space travelers weren't humans but fruit flies, launched in 1947 to test radiation effects at high altitudes—proving that living organisms could survive spaceflight.

The space age truly began with the Soviet Union's launch of **Sputnik 1** in 1957, the first artificial satellite to orbit Earth. This basketball-sized sph