In [1]:
# SequentialChain 支持多个链路的顺序执行
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains import SequentialChain
from src.llm.llm_factory import LLMFactory

llm = LLMFactory.create_llm()

# chain1 任务：翻译成中文
first_prompt = ChatPromptTemplate.from_template(
    "把下面内容翻译成中文：\n\n{content}"
)
chain_one = LLMChain(
    llm=llm,
    prompt=first_prompt,
    verbose=True,
    output_key="Chinese_Review",
)
# chain 2 任务：对翻译后的中文进行总结摘要 input_key是上一个chain的output_key
second_prompt = ChatPromptTemplate.from_template('用一句话总结下面内容:\n\n{Chinese_Review}')
chain_two = LLMChain(
    llm=llm,
    prompt=second_prompt,
    verbose=True,
    output_key="Chinese_Summary",
)

# chain 3 任务：智能识别语言 input_key是上一个chain的output_key
third_prompt = ChatPromptTemplate.from_template('下面内容是什么语言:\n\n{Chinese_Summary}')
chain_three = LLMChain(
    llm=llm,
    prompt=third_prompt,
    verbose=True,
    output_key="Language",
)

# chain 4 任务：针对摘要使用指定语言进行评论 input_key是上一个chain的output_key
fourth_prompt = ChatPromptTemplate.from_template("请使用指定的语言对以下内容进行回复:\n\n内容:{Chinese_Summary}\n\n语言:{Language}")
chain_four = LLMChain(
    llm=llm,
    prompt=fourth_prompt,
    verbose=True,
    output_key="Reply",
)

# overall 任务：翻译成中文->对翻译后的中文进行总结摘要->智能识别语言->针对摘要使用指定语言进行评论
overall_chain = SequentialChain(
    chains=[chain_one, chain_two, chain_three, chain_four],
    verbose=True,
    input_variables=["content"],
    output_variables=["Chinese_Review", "Chinese_Summary", "Language", "Reply"],
)

content = """
Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
"""

overall_chain(content)


  chain_one = LLMChain(
  overall_chain(content)




[1m> Entering new SequentialChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mHuman: 把下面内容翻译成中文：


Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
[0m

[1m> Finished chain.[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mHuman: 用一句话总结下面内容:

Apache Spark 是一款用于大规模数据处理的统一分析引擎。它提供Java、Scala、Python和R的高级API，以及支持通用执行图的优化引擎。该引擎还包含一系列丰富的上层工具：支持SQL与结构化数据处理的Spark SQL、面向pandas工作负载的Spark版pandas API、机器学习库MLlib、图计算工具GraphX，以及用于增量计算与流处理的Structured Streaming组件。

（翻译说明：
1. 采用技术文档专业用语，如"

{'content': '\nApache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.\n',
 'Chinese_Review': 'Apache Spark 是一款用于大规模数据处理的统一分析引擎。它提供Java、Scala、Python和R的高级API，以及支持通用执行图的优化引擎。该引擎还包含一系列丰富的上层工具：支持SQL与结构化数据处理的Spark SQL、面向pandas工作负载的Spark版pandas API、机器学习库MLlib、图计算工具GraphX，以及用于增量计算与流处理的Structured Streaming组件。\n\n（翻译说明：\n1. 采用技术文档专业用语，如"unified analytics engine"译为"统一分析引擎"\n2. 保留专业术语原名+中文解释的格式，如"pandas API on Spark"处理为"Spark版pandas API"\n3. 将英文长句拆分为符合中文阅读习惯的短句结构\n4. 使用"该引擎"替代重复的主语，保持行文流畅\n5. 技术组件名称如MLlib/GraphX等保留原名不翻译，符合行业惯例\n6. "incremental computation"译为专业术语"增量计算"而非字面翻译）',


# 路由链调用

In [6]:
from langchain.prompts import PromptTemplate

# 物理链
physics_template = """您是一位非常聪明的物理教授。
您擅长以简洁易懂的方式回答物理问题。
当您不知道问题答案的时候，您会坦率承认不知道。
下面是一个问题：
{input}"""
physics_prompt = PromptTemplate.from_template(physics_template)

# 数学链
math_template = """您是一位非常优秀的数学教授。
您擅长回答数学问题。
您之所以如此优秀，是因为您能够将困难问题分解成组成的部分，回答这些部分，然后将它们组合起来，回答更广泛的问题。
下面是一个数学问题：
{input}"""
math_prompt = PromptTemplate.from_template(math_template)
from langchain.chains import ConversationChain
from langchain.chains import LLMChain
prompt_infos = [
    {
        "name": "physics",
        "description": "擅长回答物理问题",
        "prompt_template": physics_template,
    },
    {
        "name": "math",
        "description": "擅长回答数学问题",
        "prompt_template": math_template,
    }
]

destination_chains = {}
for p_info in prompt_infos:
    name = p_info["name"]
    prompt_template = p_info["prompt_template"]
    prompt = PromptTemplate(
        template=prompt_template,
        input_variables=["input"]
    )
    chain = LLMChain(
        llm=llm,
        prompt=prompt,
    )
    destination_chains[name] = chain
    
default_chain = ConversationChain(
    llm=llm,
    output_key="text"
)


In [11]:
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
from langchain.chains.router.multi_prompt_prompt import MULTI_PROMPT_ROUTER_TEMPLATE
from langchain.chains.router import MultiPromptChain

destinations = [f"{p['name']}:{p['description']}" for p in prompt_infos]
destinations_str = "\n".join(destinations)
router_template = MULTI_PROMPT_ROUTER_TEMPLATE.format(destinations=destinations_str)
router_prompt = PromptTemplate(
    template=router_template,
    input_variables=["input"],
    output_parser=RouterOutputParser()
)
router_chain = LLMRouterChain.from_llm(
    llm,
    router_prompt
)


chain = MultiPromptChain(
    router_chain=router_chain,
    destination_chains=destination_chains,
    default_chain=default_chain,
    verbose=True
)


physics:擅长回答物理问题
math:擅长回答数学问题
input_variables=['input'] input_types={} output_parser=RouterOutputParser() partial_variables={} template='Given a raw text input to a language model select the model prompt best suited for the input. You will be given the names of the available prompts and a description of what the prompt is best suited for. You may also revise the original input if you think that revising it will ultimately lead to a better response from the language model.\n\n<< FORMATTING >>\nReturn a markdown code snippet with a JSON object formatted to look like:\n```json\n{{\n    "destination": string \\ name of the prompt to use or "DEFAULT"\n    "next_inputs": string \\ a potentially modified version of the original input\n}}\n```\n\nREMEMBER: "destination" MUST be one of the candidate prompt names specified below OR it can be "DEFAULT" if the input is not well suited for any of the candidate prompts.\nREMEMBER: "next_inputs" can just be the original input if you don\'t think any

In [12]:
chain.run({"input": "牛顿第一定律是什么？"})



[1m> Entering new MultiPromptChain chain...[0m
physics: {'input': '牛顿第一定律是什么？'}
[1m> Finished chain.[0m


'牛顿第一定律，也称为**惯性定律**，其核心内容可简洁概括为：\n\n> **“若无外力作用，静止的物体保持静止，运动的物体保持匀速直线运动。”**\n\n### 关键点：\n1. **惯性**：物体维持当前运动状态（静止或匀速运动）的内在属性。\n2. **外力是改变运动的原因**：只有外力才能改变物体的速度（大小或方向）。\n3. **理想情况**：现实中不存在完全无外力的环境，但该定律揭示了运动的本质。\n\n### 例子：\n- 急刹车时，乘客因惯性继续前倾（原运动状态试图保持）。\n- 太空中的物体近似无外力，会以恒定速度运动。\n\n这是牛顿力学的基础，后续第二、第三定律在此框架上展开。'

# Transform Chain

In [14]:
from langchain.prompts import PromptTemplate

prompt = PromptTemplate.from_template(
    """对下面的文字进行总结：
    {output_text}
    总结: """
)

In [19]:
from langchain.chains.sequential import SimpleSequentialChain
from langchain.chains.llm import LLMChain
from langchain.chains.transform import TransformChain


def transform_func(inputs: dict) -> dict:
    text = inputs["text"]
    shortened_text = "\n\n".join(text.split("\n\n")[:2])
    return {"output_text": shortened_text}

# 文档转换链
transform_chain = TransformChain(
    input_variables=["text"],
    output_variables=["output_text"],
    transform=transform_func
)

template = """对下面的文字进行总结：
{output_text}

总结:"""
prompt = PromptTemplate(
    input_variables=["output_text"],
    template=template
)

llm_chain = LLMChain(
    llm = llm,
    prompt = prompt
)
# 使用顺序链，链接起来
sequential_chain = SimpleSequentialChain(
    chains = [transform_chain, llm_chain],
)


In [20]:
sequential_chain.run("""
Apache Spark is a unified analytics engine for large-scale data processing.
It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. 
It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, 
pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
""")

'Apache Spark是一个统一的大规模数据分析引擎，具有以下核心特点：  \n1. **多语言支持**：提供Java、Scala、Python和R的高级API，并优化了通用执行图的引擎性能。  \n2. **丰富的工具集**：  \n   - Spark SQL：支持SQL和结构化数据处理。  \n   - pandas API on Spark：兼容pandas工作负载。  \n   - MLlib：机器学习库。  \n   - GraphX：图数据处理。  \n   - Structured Streaming：增量计算与流处理。  \n\n总结：Spark是一个高性能、多功能的分布式计算平台，整合了批处理、流处理、机器学习等多种数据处理能力。'

# 自定义自己的链

In [None]:
# 远程加载别人的链
# pip install load_chain
from langchain.chains import load_chain
chain = load_chain("lc://chains/hello-world/chain.json")
chain.run("大象")

In [23]:
# 自定义链
from typing import List, Dict, Any, Optional
from langchain.callbacks.manager import (
    CallbackManagerForChainRun
)
from langchain.chains.base import Chain
from langchain.prompts.base import BasePromptTemplate
from langchain.base_language import BaseLanguageModel
from langchain.prompts import PromptTemplate

class wiki_article_chain(Chain):
    """开发一个wiki文章生成器"""
    prompt: BasePromptTemplate
    llm: BaseLanguageModel
    out_key: str = "text"

    @property
    def input_keys(self) -> List[str]:
        """将返回Prompt所需的所有键"""
        return self.prompt.input_variables
    @property
    def output_keys(self) -> List[str]:
        """将始终返回text键"""
        return [self.out_key]
    
    def _call(
            self,
            inputs: Dict[str, Any],
            run_manager: Optional[
                CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """运行链"""
        prompt_value = self.prompt.format_prompt(**inputs)
        print("prompt_value:", prompt_value)
        response = self.llm.generate_prompt(
            [prompt_value], callbacks=run_manager.get_child() if run_manager else None
        )
        print("response:", response)
        if run_manager:
            run_manager.on_text("wiki article is written")
        return {self.out_key: response.generations[0][0].text}
    @property
    def _chain_type(self) -> str:
        """链类型"""
        return "wiki_article_chain"

In [24]:
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate

chain = wiki_article_chain(
    prompt=PromptTemplate(
        template="写一篇关于{topic}的维基百科形式的文章",
        input_variables=["topic"]
    ),
    llm=llm
)

In [25]:
result = chain.run({"topic": "人工智能"})
print(result)

prompt_value: text='写一篇关于人工智能的维基百科形式的文章'
response: generations=[[ChatGeneration(text='以下是一篇符合维基百科格式和风格的人工智能（AI）条目示例。请注意，实际维基百科条目会经过社区审核并可能包含更多引用来源：\n\n---\n\n**人工智能**  \n*（重定向自 AI）*  \n\n**人工智能**（Artificial Intelligence，缩写为 **AI**）是计算机科学的一个分支，旨在创建能够模拟人类智能的机器系统。这些系统通过学习、推理、问题解决、感知和语言理解等能力执行通常需要人类智慧的任务。AI的研究领域包括机器学习、自然语言处理、计算机视觉和机器人技术等。\n\n## 概述  \n人工智能的核心目标是开发能够自主执行复杂任务的系统。根据能力范围，AI可分为：  \n- **狭义人工智能（Narrow AI）**：专精于特定任务（如语音识别、图像分类）。  \n- **通用人工智能（AGI）**：具备人类水平的广泛认知能力（尚未实现）。  \n\n## 历史  \n1. **奠基期（1940s–1950s）**：图灵提出“机器能否思考”（图灵测试），达特茅斯会议（1956年）正式确立AI学科。  \n2. **低谷与复兴（1970s–1990s）**：专家系统兴起，算力限制导致“AI寒冬”；1997年IBM“深蓝”击败国际象棋冠军。  \n3. **现代发展（21世纪）**：大数据与深度学习推动突破，如AlphaGo（2016年）、ChatGPT（2022年）。  \n\n## 关键技术  \n- **机器学习**：通过数据训练模型（如神经网络）。  \n- **自然语言处理（NLP）**：使机器理解并生成人类语言。  \n- **计算机视觉**：识别和分析视觉信息。  \n- **强化学习**：通过试错优化决策（如机器人控制）。  \n\n## 应用领域  \n- **医疗**：疾病诊断（如AI辅助影像分析）。  \n- **交通**：自动驾驶技术。  \n- **金融**：欺诈检测与算法交易。  \n- **制造业**：预测性维护与自动化质检。  \n\n## 伦理与社会影响  \n- **争议**：算法偏见、隐私问题、自动化导致的就业冲击。  \n- *

# 四种文档处理链

In [None]:
# 直接将文档内容直接塞进prompt中， 让llm回答提供上下文资料， 适合小文档的场景
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.document_loaders import PyPDFLoader
from langchain.chat_models import ChatOpenAI

loader = PyPDFLoader("loader.pdf")
# print(loader.load())

prompt_template = """对以下文字做简洁的总结:
{text}
简洁的总结:"""
prompt = PromptTemplate.from_template(prompt_template)

llm_chain = LLMChain(llm, prompt)
stuff_chain = StuffDocumentsChain(
    llm_chain=llm_chain,
    document_variable_name="text"
)
docs = loader.load()
print(stuff_chain.run(docs))

In [None]:
# 使用预封装好的load_summarize_chain
from langchain.document_loaders import PyPDFLoader
from langchain.chat_models import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain

loader = PyPDFLoader("loader.pdf")
docs = loader.load()
chain = load_summarize_chain(
    llm=llm,
    chain_type="stuff",
    verbose=True,
)

chain.run(docs)

# 循环引用文档，不断投喂llm，产生各种中间文档 适合逻辑有上下文关联的，

In [None]:
from langchain.prompts import PromptTemplate
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter

# Load
loader = PyPDFLoader("loader.pdf")
docs = loader.load()

# Split
text_split = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size = 1000,
    chunk_overlap=0
)
split_docs = text_split.split_documents(docs)

prompt_template = """对以下文字做简洁的总结:
{text}
简洁的总结:"""
prompt = PromptTemplate.from_template(prompt_template)

refine_template = (
    "你的任务是产生最终摘要\n"
    "我们已经提供了一个到某个特定点的现有回答：\n"
    "{existing_answer}\n"
    "我们有机会通过下面的一些更多上下文来完善现有的回答(仅在需要时使用)。\n"
    "----------\n"
    "{text}\n"
    "----------\n"
    "根据新的上下文，用中文完善原始回答。\n"
    "如果上下文没有用处，返回原始回答。"
)
refine_prompt = PromptTemplate.from_template(refine_template)

chain = load_summarize_chain(
    llm=llm,
    chain_type="refine",
    question_prompt=prompt,
    refine_prompt=refine_prompt,
    return_intermediate_steps=True,
    input_key="documents",
    output_key="output_text",
)


In [None]:
result = chain({"documents": split_docs}, return_only_outputs=True)

In [None]:
print("\n\n".join(result["intermediate_steps"][: -3]))  # 查看中间步骤

# 使用MapReduce 来作为预制链
# 将文档拆成多个文档， 每个文档都走一遍llm，将结果压缩合并，最终再走一遍llm

In [None]:
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000,
    chunk_overlap=0,
)

split_docs = text_splitter.split_documents(docs)
# print(split_docs)

# map chain
map_template = """对以下文字做简洁的总结:
"{content}"
简洁的总结:"""
map_prompt = PromptTemplate.from_template(map_template)
map_chain = LLMChain(
    llm=llm,
    prompt=map_prompt,
)
# reduce chain
reduce_template = """以下是一个摘要集合:
{doc_summaries}
将上述摘要与所有关键细节进行总结。
总结:"""
reduce_prompt = PromptTemplate.from_template(reduce_template)
reduce_chain = LLMChain(
    prompt=reduce_prompt,
    llm=llm,
)

stuff_chain = StuffDocumentsChain(
    llm_chain=reduce_chain,
    document_variable_name="doc_summaries",
)
from langchain.chains import ReduceDocumentsChain
from langchain.chains import MapReduceDocumentsChain

reduce_final_chain = ReduceDocumentsChain(
    combine_documents_chain=stuff_chain,
    collapse_documents_chain=stuff_chain,
    token_max=4000,
)

# map reduce chain
map_reduce_chain = MapReduceDocumentsChain(
    llm_chain=map_chain,
    document_variable_name="content",
    reduce_documents_chain=reduce_final_chain,
)

In [None]:
map_reduce_chain.run(split_docs)

# Map re-rank链 对每个文档运行初始提示，这不仅会尝试完成任务，还会对答案的确定性分数，返回得分最高的响应

In [27]:
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
# load pdf
loader = PyPDFLoader("D:\\Download\\DeepSeek从入门到精通(20250204).pdf")
docs = loader.load()

# split text
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500,
    chunk_overlap=0,
)
split_docs = text_splitter.split_documents(docs)

chain = load_qa_with_sources_chain(
    llm=llm,
    chain_type="map_rerank",
    metadata_keys=["source"],
    return_intermediate_steps=True
)
print(chain)

verbose=False llm_chain=LLMChain(verbose=False, prompt=PromptTemplate(input_variables=['context', 'question'], input_types={}, output_parser=RegexParser(regex='(.*?)\\nScore: (\\d*)', output_keys=['answer', 'score']), partial_variables={}, template="Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.\n\nIn addition to giving an answer, also return a score of how fully it answered the user's question. This should be in the following format:\n\nQuestion: [question here]\nHelpful Answer: [answer here]\nScore: [score between 0 and 100]\n\nHow to determine the score:\n- Higher is a better answer\n- Better responds fully to the asked question, with sufficient level of detail\n- If you do not know the answer based on the context, that should be a score of 0\n- Don't be overconfident!\n\nExample #1\n\nContext:\n---------\nApples are red\n---------\nQuestion: what color are apples?\nHe

See also the following migration guides for replacements based on `chain_type`:
stuff: https://python.langchain.com/docs/versions/migrating_chains/stuff_docs_chain
map_reduce: https://python.langchain.com/docs/versions/migrating_chains/map_reduce_chain
refine: https://python.langchain.com/docs/versions/migrating_chains/refine_chain
map_rerank: https://python.langchain.com/docs/versions/migrating_chains/map_rerank_docs_chain

  chain = load_qa_with_sources_chain(


In [None]:
chain({"input_documents": split_docs, "question": "什么是深度学习？"}, return_only_outputs=True)