# Chapter 3: Parallelization (Code Example)

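This notebook demonstrates how to implement the Parallelization pattern with LangChain: three independent chains run concurrently on the same topic, and a final step synthesizes their outputs.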
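
The core idea can be sketched without any LLM at all. The following is a minimal illustration using only the standard library; `fake_task`, `run_concurrently`, and the 0.2-second delay are hypothetical stand-ins for the three model calls used later in this notebook, not part of LangChain.

```python
import asyncio
import time
from typing import Dict, Tuple


async def fake_task(name: str, delay: float) -> str:
    """Stand-in for an I/O-bound call such as an LLM request."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_concurrently() -> Tuple[Dict[str, str], float]:
    """Start three independent tasks together and collect their results."""
    names = ["summary", "questions", "key_terms"]
    start = time.perf_counter()
    # gather() schedules all coroutines at once and waits for every result,
    # returning them in the same order as the inputs.
    outputs = await asyncio.gather(*(fake_task(n, 0.2) for n in names))
    elapsed = time.perf_counter() - start
    return dict(zip(names, outputs)), elapsed
```

In a plain script, run it with `asyncio.run(run_concurrently())`; in a notebook cell, `await run_concurrently()` works directly. Because the three waits overlap, the elapsed time should be close to 0.2 seconds rather than the 0.6 seconds a sequential run would take.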

## Prerequisites

- Python 3.8+
- An Alibaba Cloud DashScope API key (set it in the `DASHSCOPE_API_KEY` environment variable)


## 1. Install Dependencies

First, install the required Python packages.


In [7]:
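# Install the required packages (%pip is the recommended way inside Jupyter).
# The version specifiers are quoted so the shell does not treat `>` as output redirection.
%pip install -q "langchain-core>=0.1.0" "langchain-community>=0.1.0" "dashscope>=1.17.0" "python-dotenv>=1.0.0" nest-asyncio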


Note: you may need to restart the kernel to use updated packages.
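
After restarting the kernel, it can be useful to confirm which versions were actually installed. The helper below is a small sketch using the standard library's `importlib.metadata`; the function name `installed_versions` is an illustrative assumption, and the package names are the ones from the install command above.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if missing."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


required = ["langchain-core", "langchain-community", "dashscope", "python-dotenv", "nest-asyncio"]
for name, version in installed_versions(required).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```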


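## 2. Get the API Key

The cell below reads `DASHSCOPE_API_KEY` from the environment or from a `.env` file. If neither provides it, you can assign the key to `dashscope_api_key` directly in this cell.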


In [8]:
import os
from dotenv import load_dotenv
load_dotenv()

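# Read the API key. load_dotenv() does not override variables that are already set,
# so an existing environment variable takes precedence over the .env file.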
dashscope_api_key = os.getenv("DASHSCOPE_API_KEY")
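
If the key is missing, one option is to prompt for it instead of hard-coding it in the notebook. This is a hedged sketch, not part of the original notebook: `ensure_api_key` and its `prompt` parameter are hypothetical names, and `getpass` is used so the key is not echoed to the screen.

```python
import os
from getpass import getpass
from typing import Callable


def ensure_api_key(
    name: str = "DASHSCOPE_API_KEY",
    prompt: Callable[[str], str] = getpass,
) -> str:
    """Return the key from the environment, asking the user only if it is absent."""
    key = os.getenv(name, "").strip()
    if not key:
        key = prompt(f"Enter {name}: ").strip()
        if not key:
            raise RuntimeError(f"{name} is required but was not provided.")
        # Store it for the rest of the session so later cells can read it.
        os.environ[name] = key
    return key
```

With this helper, `dashscope_api_key = ensure_api_key()` could stand in for the plain `os.getenv` call; the injectable `prompt` argument exists only to make the function easy to test.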


## 3. Import Libraries and Initialize the Model


In [9]:
import asyncio
import nest_asyncio
from typing import Optional

from langchain_community.chat_models import ChatTongyi
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough

# Allow nested event loops so async code can run inside a Jupyter notebook
nest_asyncio.apply()

# --- Configuration ---
# Initialize the Qwen language model (via the Alibaba Cloud DashScope API).
# The model parameter selects a Qwen model, e.g. "qwen-flash", "qwen-turbo", "qwen-plus", or "qwen-max".
try:
    llm: Optional[ChatTongyi] = ChatTongyi(
        model="qwen-flash",
        temperature=0.7,
        dashscope_api_key=dashscope_api_key
    )
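    if llm:
        print("Language model initialized: qwen-flash")
except Exception as e:
    print(f"Error initializing language model: {e}")
    # Re-raise: the chains below are composed with `| llm`, which fails with a
    # TypeError when llm is None, so continuing would only hide the real error.
    raise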


# --- Define Independent Chains ---
# These three chains represent distinct tasks that can be executed in parallel.

summarize_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Summarize the following topic concisely:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

questions_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Generate three interesting questions about the following topic:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

terms_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Identify 5-10 key terms from the following topic, separated by commas:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)


# --- Build the Parallel + Synthesis Chain ---

# 1. Define the block of tasks to run in parallel. The results of these,
#    along with the original topic, will be fed into the next step.
map_chain = RunnableParallel(
    {
        "summary": summarize_chain,
        "questions": questions_chain,
        "key_terms": terms_chain,
        "topic": RunnablePassthrough(),  # Pass the original topic through
    }
)

# 2. Define the final synthesis prompt which will combine the parallel results.
synthesis_prompt = ChatPromptTemplate.from_messages([
    ("system", """Based on the following information:
     Summary: {summary}
     Related Questions: {questions}
     Key Terms: {key_terms}
     Synthesize a comprehensive answer."""),
    ("user", "Original topic: {topic}")
])

# 3. Construct the full chain by piping the parallel results directly
#    into the synthesis prompt, followed by the LLM and output parser.
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()


# --- Run the Chain ---
async def run_parallel_example(topic: str) -> None:
    """
    Asynchronously invokes the parallel processing chain with a specific topic
    and prints the synthesized result.

    Args:
        topic: The input topic to be processed by the LangChain chains.
    """
    if not llm:
        print("LLM not initialized. Cannot run example.")
        return

    print(f"\n--- Running Parallel LangChain Example for Topic: '{topic}' ---")
    try:
        # The input to `ainvoke` is the single 'topic' string, which is
        # then passed to each runnable in the `map_chain`.
        response = await full_parallel_chain.ainvoke(topic)
        print("\n--- Final Response ---")
        print(response)
    except Exception as e:
        print(f"\nAn error occurred during chain execution: {e}")

# Run the example.
# In a Jupyter notebook you can also call `await run_parallel_example(test_topic)` directly;
# asyncio.run() works here only because nest_asyncio.apply() was called above.
test_topic = "The history of space exploration"
asyncio.run(run_parallel_example(test_topic))

Language model initialized: qwen-flash

--- Running Parallel LangChain Example for Topic: 'The history of space exploration' ---



--- Final Response ---
**The History of Space Exploration: From Theoretical Dreams to Global Collaboration**

The history of space exploration is a remarkable journey spanning over a century, transforming humanity’s understanding of the cosmos and redefining our place in the universe. What began as speculative science fiction and theoretical physics evolved into one of the most ambitious technological and scientific endeavors in human history—driven by curiosity, national pride, Cold War competition, and ultimately, global cooperation.

### **Foundations of Rocketry and Early Visionaries (1920s–1940s)**  
Space exploration's roots lie in the pioneering work of visionary scientists who laid the theoretical groundwork for rocketry. Russian scientist **Konstantin Tsiolkovsky** formulated the foundational equations of rocket motion and envisioned space stations and interplanetary travel. In the United States, **Robert Goddard** conducted the first successful liquid-fueled rocket launch in