In [1]:
import os
from uuid import UUID

from langchain_core.outputs import GenerationChunk, ChatGenerationChunk
from langchain_openai import ChatOpenAI
from langchain_core.prompts import (ChatPromptTemplate, PromptTemplate)
from langchain_core.output_parsers import StrOutputParser
from typing import Any, Optional, Union

# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("{query}")

llm = ChatOpenAI(model="deepseek-chat", api_key=os.environ.get("DS_KEY"), base_url=os.environ.get("DS_API_BASE"))

parser = StrOutputParser()


class Chain:
    steps: list = []

    def __init__(self, steps):
        self.steps = steps

    def invoke(self, input: Any) -> Any:
        output: Any = input
        for step in self.steps:
            output = step.invoke(output)
            print(step)
            print("执行结果:", output)
            print("================")
        return output


chain = Chain([prompt, llm, parser])
print(chain.invoke({"query": "你好，你是？"}))


input_variables=['query'] input_types={} partial_variables={} messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['query'], input_types={}, partial_variables={}, template='{query}'), additional_kwargs={})]
执行结果: messages=[HumanMessage(content='你好，你是？', additional_kwargs={}, response_metadata={})]
client=<openai.resources.chat.completions.completions.Completions object at 0x112a35bb0> async_client=<openai.resources.chat.completions.completions.AsyncCompletions object at 0x112a54200> root_client=<openai.OpenAI object at 0x107635610> root_async_client=<openai.AsyncOpenAI object at 0x112a35c10> model_name='deepseek-chat' model_kwargs={} openai_api_key=SecretStr('**********') openai_api_base='https://api.deepseek.com/v1'
执行结果: content='您好！我是由中国的深度求索（DeepSeek）公司开发的智能助手DeepSeek-V3。如您有任何任何问题，我会尽我所能为您提供帮助。' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 37, 'prompt_tokens': 7, 'total_tokens': 44, 'completion_tokens_details': N

## 自定义Chain的处理

In [None]:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import (ChatPromptTemplate, PromptTemplate)
from langchain_core.output_parsers import StrOutputParser
from typing import Any
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("{query}")

llm = ChatOpenAI(model="deepseek-chat", api_key=os.environ.get("DS_KEY"), base_url=os.environ.get("DS_API_BASE"))

parser = StrOutputParser()
chain = prompt | llm | parser

#等价于一下写法
# composed_chain_with_pip = (
#     RunnableParallel({"query": RunnablePassthrough()})
#     .pipe(prompt)
#     .pipe(llm)
#     .pipe(parser)
# )

print(chain.invoke({"query": "你好，你是?"}))


## RunnablePassthrough 传递数据

In [4]:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import (ChatPromptTemplate, PromptTemplate)
from langchain_core.output_parsers import StrOutputParser
from typing import Any
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from operator import itemgetter

# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("{query}")

# 构建大语言模型
llm = ChatOpenAI(model="deepseek-chat", api_key=os.environ.get("DS_KEY"), base_url=os.environ.get("DS_API_BASE"))

# 创建链
#方式一
# chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()
# 
# content = chain.invoke("你好,你是？")
# print(content)

#方式二
chain = {"query": itemgetter("query")} | prompt | llm | StrOutputParser()
content = chain.invoke({"query": "你好,你是？"})
print(content)



你好！我是DeepSeek Chat，由深度求索公司打造的智能AI助手。我可以帮你解答问题、提供建议、聊天交流，还能处理各种文本和文件内容。如果有任何需要，尽管问我吧！😊  

有什么可以帮你的呢？


## 传递的数据中添加数据

In [6]:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import (ChatPromptTemplate, PromptTemplate)
from langchain_core.output_parsers import StrOutputParser
from typing import Any
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from operator import itemgetter


def retrieval(query: str) -> str:
    """模拟一个检索器，传入query，输出文本"""
    print("执行检索", query)
    return "我叫VV，是一名AI应用开发工程师"


# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回他问题，可以参考上下文进行回复
<context>
{context}
</context>
用户的问题是:{query}
""")

# 构建大语言模型
llm = ChatOpenAI(model="deepseek-chat", api_key=os.environ.get("DS_KEY"), base_url=os.environ.get("DS_API_BASE"))

parser = StrOutputParser()

chain = (
        RunnablePassthrough.assign(context=lambda query: retrieval(query)) |
        prompt |
        llm |
        parser
)
content = chain.invoke({"query": "你好我叫什么"})

print(content)


执行检索 {'query': '你好我叫什么'}
你好，VV！根据我们的对话记录，你的名字是VV，是一名AI应用开发工程师。有什么我可以帮你的吗？ 😊


## 利用回调功能调试链应用-让过程更透明
Callback 是 LangChain 提供的回调机制，允许我们在 LLM 应用程序的各个阶段使用 hook (钩 子)。钩子的含义也非常简单，我们把应用程序看成一个一个的处理逻辑，从开始到结束，钩子就是在 事件传送到终点前截获并监控事件的传输。

Callback 收集到的信息可以直接输出到控制台，也可以输出到文件，更可以输入到第三方应用，相当于 独立的日志管理系统，通过这些日志就可以分析应用的运行情况，统计异常率，运行的瓶颈模块以便优化。
1. CallbackHandler:对每个应用场景比如 Agent 或 Chain 或 Tool 的纪录。
2. CallbackManager:对所有 CallbackHandler 的封装和管理，包括了单个场景的 Handle，也包括 运行时整条链路的 Handle。

In [8]:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import (ChatPromptTemplate, PromptTemplate)
from langchain_core.output_parsers import StrOutputParser
from typing import Any
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.callbacks import StdOutCallbackHandler

# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("{query}")

# 构建大语言模型
llm = ChatOpenAI(model="deepseek-chat", api_key=os.environ.get("DS_KEY"), base_url=os.environ.get("DS_API_BASE"))

chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()

content = chain.stream("你好你是？", config={
    "callbacks": [StdOutCallbackHandler()]
})
print(content)

for chunk in content:
    pass


<generator object RunnableSequence.stream at 0x115d784f0>


[1m> Entering new RunnableSequence chain...[0m


[1m> Entering new RunnableParallel<query> chain...[0m


[1m> Entering new RunnablePassthrough chain...[0m

[1m> Finished chain.[0m

[1m> Finished chain.[0m


[1m> Entering new ChatPromptTemplate chain...[0m

[1m> Finished chain.[0m


[1m> Entering new StrOutputParser chain...[0m

[1m> Finished chain.[0m

[1m> Finished chain.[0m


## 自定义回调

In [14]:
from langchain_core.callbacks import  BaseCallbackHandler,StdOutCallbackHandler
from uuid import UUID
from typing import Dict, Any, List, Optional
from typing import Optional,Any,Union
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
class LLMOpsCallbackHandler(BaseCallbackHandler):
    """自定义LLMOps回调"""
    def on_llm_start(
        self,
        serialized: dict[str, Any],
        prompts: list[str],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        print("on_llm_start serialized:",serialized)
        print("on_llm_start_ prompts",prompts)

    # def on_llm_new_token(
    #     self,
    #     token: str,
    #     *,
    #     chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
    #     run_id: UUID,
    #     parent_run_id: Optional[UUID] = None,
    #     **kwargs: Any,
    # ) -> Any:
    #    print("on_llm_new_token:", token)
       
       
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("{query}")

# 构建大语言模型
llm = ChatOpenAI(model="deepseek-chat", api_key=os.environ.get("DS_KEY"), base_url=os.environ.get("DS_API_BASE"))

chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()

content = chain.stream("你好你是？", config={
    "callbacks": [StdOutCallbackHandler(),LLMOpsCallbackHandler()]
})
print(content)

for chunk in content:
    pass


<generator object RunnableSequence.stream at 0x1117b09a0>


[1m> Entering new RunnableSequence chain...[0m


[1m> Entering new RunnableParallel<query> chain...[0m


[1m> Entering new RunnablePassthrough chain...[0m

[1m> Finished chain.[0m

[1m> Finished chain.[0m


[1m> Entering new ChatPromptTemplate chain...[0m

[1m> Finished chain.[0m
on_llm_start serialized: {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'chat_models', 'openai', 'ChatOpenAI'], 'kwargs': {'model_name': 'deepseek-chat', 'temperature': 0.7, 'openai_api_key': {'lc': 1, 'type': 'secret', 'id': ['OPENAI_API_KEY']}, 'openai_api_base': 'https://api.deepseek.com/v1', 'max_retries': 2, 'n': 1}, 'name': 'ChatOpenAI'}
on_llm_start_ prompts ['Human: 你好你是？']


[1m> Entering new StrOutputParser chain...[0m

[1m> Finished chain.[0m

[1m> Finished chain.[0m
