### 第一步：构建知识库

In [1]:
from langchain_ollama import OllamaEmbeddings
from langchain_chroma import Chroma

print("🚀 初始化组件 embedding")
global_embeddings = OllamaEmbeddings(model="nomic-embed-text")

# 使用ChromaDB持久化存储
vector_store = Chroma(
    collection_name="example-knowledge",
    embedding_function=global_embeddings,
    persist_directory="./chroma_db_3"
)

🚀 初始化组件 embedding


In [2]:
def init_knowledge_base():
    """初始化或加载知识库"""
    print("📚 准备知识库...")

    if vector_store._collection.count() > 0:
        print("知识库已建立，不再重复构建")
        return
 
    # 示例知识
    documents = [
        "Ollama支持在本地运行大语言模型",
        "ChromaDB是轻量级向量数据库，适合存储和检索嵌入向量",
        "text2vec是中文文本嵌入模型，可将文本转换为768维向量",
        "智能客服系统需要结合意图识别和知识库检索",
        "意大利面拌42号混凝土",
        "LangGraph可以用于构建复杂的LLM工作流",
        "库存水位控制通过安全库存和再订货点(ROP)确保供应连续性，公式：ROP=平均需求×提前期+Z×σ×√LT",
        "经济订货批量(EOQ)模型优化补货成本，计算公式：√(2×年需求量×订货成本/单位持有成本)",
        "动态安全库存需根据需求波动性和供应商交付可靠性定期调整",
        "EOQ与库存水位协同使用时需校验仓储容量和最小起订量(MOQ)约束",
        "JIT(准时制)补货策略要求安全库存趋近于零，依赖极高供应链可靠性",
        "计算幸运数字必须放在最后一步"
    ]
 
    # 中文优化文本分割
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=200,
        chunk_overlap=50,
        separators=["\n", "。", "！", "？"]
    )
    vector_store.add_documents(text_splitter.create_documents(documents))
    print(f"✅ 向量库已创建，包含 {vector_store._collection.count()} 条数据")

init_knowledge_base()

📚 准备知识库...
知识库已建立，不再重复构建


### 第二步：构建Tool集

In [28]:
from langchain.tools import tool
from scipy.stats import norm
from typing import Annotated

@tool
def calculate_reorder_point(
        average_demand: Annotated[float, "平均需求"],
        lead_time: Annotated[float, "提前期(天)"],
        demand_std_dev: Annotated[float, "需求标准差"],
        service_level: Annotated[float, "服务水平(%)"] = 95.0
) -> float:
    """计算再订货点(ROP)
    Args:
        service_level: 服务水平（百分比，如95表示95%）
    """
    z_score = norm.ppf(service_level / 100)  # 自动计算z_score
    safety_stock = z_score * demand_std_dev * (lead_time ** 0.5)
    reorder_point = average_demand * lead_time + safety_stock
    return round(reorder_point, 2)
 
@tool
def calculate_eoq(annual_demand: float, ordering_cost: float,
                  holding_cost_per_unit: float) -> float:
    """计算经济订货批量(EOQ)"""
    if any(arg <= 0 for arg in [annual_demand, ordering_cost, holding_cost_per_unit]):
        raise ValueError("所有参数必须为正数")
    
    eoq = math.sqrt((2 * annual_demand * ordering_cost) / holding_cost_per_unit)
    return round(eoq, 2)
 
@tool
def calculate_lucky_number(eoq: float, rop: float) -> float:
    """计算幸运数字(幸运数字)"""
    print("计算lucky_number")
    return eoq-rop

### 第三步：构建Agent

In [22]:
import math
import operator
from typing import TypedDict, Annotated, Dict, Any, List

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
 

class AgentState(TypedDict):
    messages: Annotated[List[AnyMessage], operator.add]
    intermediate_results: Dict[str, Any]  # 新增字段存储中间结果
 
class Agent:
    def __init__(self, model, tools, system=""):
        self.system = system
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)
 
        # 定义图结构
        graph_builder = StateGraph(AgentState)
 
        # 添加节点
        graph_builder.add_node("retrieve_knowledge", self.retrieve_knowledge)
        graph_builder.add_node("plan", self.plan)
        graph_builder.add_node("tools", ToolNode(tools))
        # graph_builder.add_node("generate", self.call_model)

        # 设置入口点
        graph_builder.set_entry_point("retrieve_knowledge")
        graph_builder.add_edge("retrieve_knowledge", "plan")
        # graph_builder.add_edge("tools", "generate")
        
        # 添加条件边
        graph_builder.add_conditional_edges("plan", tools_condition, {END: END, "tools": "tools"})
 
        self.graph = graph_builder.compile()

    def retrieve_knowledge(self, state: AgentState):
        """知识库检索"""
        question = state['messages'][-1].content
        docs = vector_store.similarity_search(question, k=2)
        context = "\n".join([doc.page_content for doc in docs])
        return {'messages': [SystemMessage(content=f"检索到的上下文:\n{context}")]}

    def plan(self, state: AgentState):
        """制定执行计划"""
        input_messages = [SystemMessage(content=self.system)] + state['messages']
        response = self.model.invoke(input_messages)
        return {"messages": [response]}
    
    def call_model(self, state: AgentState):
        """调用模型前预处理参数"""
        messages = state['messages']
        prompt = """作为库存管理专家
        1. **输出要求**：
        - 每次工具调用后必须确认执行结果
        - 所有计算完成后必须生成包含以下内容的总结报告：
          * 各项计算结果汇总
          * 对结果的简要解释
          * 特别标注"幸运数字"及其含义
          * 给出实际业务建议
 
        示例格式：
        【计算结果汇总】
        1. 再订货点(ROP): [值]
           - 解释：当库存降至该水平时需要补货
        2. 经济订货批量(EOQ): [值]
           - 解释：最优的每次订货数量
        3. 幸运数字: [值]
           - 解释：EOQ与ROP的差值，反映...
 
        【业务建议】
        根据当前计算结果，建议..."""
 
        if self.system:
            if isinstance(messages[-1], ToolMessage):
                self.system =prompt
            messages = [SystemMessage(content=self.system)] + messages
 
        # 替换消息中的占位符
        processed_messages = []
        for msg in messages:
            if hasattr(msg, 'content'):
                content = msg.content
                for k, v in state.get('intermediate_results', {}).items():
                    content = content.replace(f'_${k}', str(v))
 
                # 特殊处理 ToolMessage
                if isinstance(msg, ToolMessage):
                    processed_messages.append(ToolMessage(
                        content=content,
                        name=msg.name,
                        tool_call_id=msg.tool_call_id  # 确保传递 tool_call_id
                    ))
                else:
                    # 其他消息类型正常处理
                    processed_messages.append(msg.__class__(content=content))
            else:
                processed_messages.append(msg)
 
        print(f"call_model_req:\n{processed_messages}")
        response = self.model.invoke(processed_messages)
        print(f"call_model_result:\n{response}")
        return {'messages': [response], 'intermediate_results': state.get('intermediate_results', {})}

### 第四步：启动服务

In [5]:
import os
import getpass

os.environ["DEEPSEEK_API_KEY"] = getpass.getpass("请输入DeepSeek API KEY:")

请输入DeepSeek API KEY: ········


In [33]:
from langchain_deepseek import ChatDeepSeek

def main():
    # 初始化组件
    llm = ChatDeepSeek(
        model="deepseek-chat",  # 使用更常见的模型
        temperature=0.5,
        max_retries=2
    )
 
    # 定义系统提示
    prompt = """作为库存管理专家，请按以下步骤处理用户需求：
 
        1. 理解需求阶段：
        - 分析用户问题中的关键参数
        - 识别需要解决的具体问题类型
        - 仔细梳理问题中的描述是否存在依赖关系
 
        2. 任务拆解要求：
        必须包含：
        - 任务步骤 当前任务处理拆解的第几步
        - 任务名称（明确的计算目标）
        - 所需参数（从用户问题中提取）
        - 推荐工具（从可用工具中选择）
         
        3. **任务拆解规则**：
        - 如果问题要求计算「幸运数字」，则必须在得到 通过 calculate_reorder_point工具计算ROP 和 calculate_eoq计算EOQ 的结果后，最后调用 `calculate_lucky_number`。
       
        4. 可用工具说明：
        calculate_reorder_point(平均需求, 提前期, 需求标准差, 服务水平): 计算再订货点(ROP)
        calculate_eoq(年需求量, 订货成本, 单位持有成本): 计算经济订货批量(EOQ)
        calculate_lucky_number(再订货点(ROP), 经济订货批量(EOQ)): 计算幸运数字
         
        5. **参数传递规则**：
        当后续工具需要使用前面工具的计算结果时，请按照以下格式传递参数：
        - 在工具调用的参数中，使用"工具名称_$"作为占位符
        - 系统会自动用前面工具的计算结果替换这些占位符
         
        示例：
        1. 首先调用calculate_reorder_point得到ROP值
        2. 然后调用calculate_eoq得到EOQ值
        3. 最后调用calculate_lucky_number时，参数这样写：
           {
             "再订货点(ROP)": "calculate_reorder_point_$",
             "经济订货批量(EOQ)": "calculate_eoq_$"
           }
         
        注意：
        - 占位符必须完全匹配工具名称
        - 计算幸运数字必须放在最后一步
        - 确保所有依赖的工具都已被调用过
         
    """
    # 创建Agent
    agent = Agent(llm,[calculate_reorder_point, calculate_eoq,calculate_lucky_number], system=prompt)
    # agent.graph.get_graph().draw_png(output_file_path="~/test4.png",)
    
    # 用户输入
    user_input = "计算库存水平: 平均需求50, 提前期3天, 日需求标准差15, 服务水平95%; 计算补货成本: 年需求100, 订货成本20, 单位持有成本为3; 最后使用前面的结果再计算下幸运数字"
    messages = [HumanMessage(content=user_input)]
 
    # 在执行时初始化完整状态
    for step in agent.graph.stream({"messages": messages, "intermediate_results": {}}, stream_mode="values"):
        for message in step["messages"]:
            message.pretty_print()
        print("\n\n")
    
if __name__ == "__main__":
    main()


计算库存水平: 平均需求50, 提前期3天, 日需求标准差15, 服务水平95%; 计算补货成本: 年需求100, 订货成本20, 单位持有成本为3; 最后使用前面的结果再计算下幸运数字




计算库存水平: 平均需求50, 提前期3天, 日需求标准差15, 服务水平95%; 计算补货成本: 年需求100, 订货成本20, 单位持有成本为3; 最后使用前面的结果再计算下幸运数字

检索到的上下文:
计算幸运数字必须放在最后一步
库存水位控制通过安全库存和再订货点(ROP)确保供应连续性，公式：ROP=平均需求×提前期+Z×σ×√LT




计算库存水平: 平均需求50, 提前期3天, 日需求标准差15, 服务水平95%; 计算补货成本: 年需求100, 订货成本20, 单位持有成本为3; 最后使用前面的结果再计算下幸运数字

检索到的上下文:
计算幸运数字必须放在最后一步
库存水位控制通过安全库存和再订货点(ROP)确保供应连续性，公式：ROP=平均需求×提前期+Z×σ×√LT

### 1. 理解需求阶段
- **关键参数**:
  - 库存水平计算:
    - 平均需求: 50
    - 提前期: 3天
    - 日需求标准差: 15
    - 服务水平: 95%
  - 补货成本计算:
    - 年需求: 100
    - 订货成本: 20
    - 单位持有成本: 3
  - 幸运数字计算:
    - 需要先计算再订货点(ROP)和经济订货批量(EOQ)。

- **问题类型**:
  - 计算再订货点(ROP)
  - 计算经济订货批量(EOQ)
  - 计算幸运数字

- **依赖关系**:
  - 幸运数字的计算依赖于ROP和EOQ的结果。

### 2. 任务拆解
#### 任务步骤 1: 计算再订货点(ROP)
- **任务名称**: 计算再订货点(ROP)
- **所需参数**:
  - 平均需求: 50
  - 提前期: 3天
  - 需求标准差: 15
  - 服务水平: 95%
- **推荐工具**: `calculate_reorder_point`

#### 任务步骤 2: 计算经济订货批量(EOQ)
- **任务名称**: 计算经济订货批量(EOQ)
- **所需参数**:
  - 年需求: 100
  - 订

In [None]:
# -*- coding: utf-8 -*-
import os
import math
import operator
from typing import TypedDict, Annotated, Dict, Any, List

from langchain_ollama import ChatOllama

from langgraph.graph import Graph, StateGraph
from langgraph.graph import END
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import tool
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
 

class AgentState(TypedDict):
    messages: Annotated[List[AnyMessage], operator.add]
    intermediate_results: Dict[str, Any]  # 新增字段存储中间结果
 
class Agent:
    def __init__(self, model, tools, system=""):
        self.system = system
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)
 
        # 定义图结构
        graph = StateGraph(AgentState)
 
        # 添加节点
        graph.add_node("vectorstore_similarity_search", self.vectorstore_similarity_search)
        graph.add_node("llm", self.call_model)
        graph.add_node("action", self.take_action)
 
        # 添加条件边
        graph.add_conditional_edges(
            "llm",
            self.exists_action,
            {True: "action", False: END}
        )
 
        # 添加无条件边
        graph.add_edge("vectorstore_similarity_search", "llm")
        graph.add_edge("action", "llm")
 
        # 设置入口点
        graph.set_entry_point("vectorstore_similarity_search")
        self.graph = graph.compile()
 
    def vectorstore_similarity_search(self, state: AgentState):
        """知识库检索"""
        question = state['messages'][-1].content
        docs = vector_store.similarity_search(question, k=2)
        context = "\n".join([doc.page_content for doc in docs])
        print(f"检索到的上下文:\n{context}")
        return {'messages': [SystemMessage(content=f"检索到的上下文:\n{context}")]}
 
    def exists_action(self, state: AgentState):
        """检查是否需要执行工具，并验证参数是否就绪"""
        last_msg = state['messages'][-1]
        # 如果是ToolMessage（工具执行结果），则继续让模型处理
        if isinstance(last_msg, ToolMessage):
            return False  # 让模型生成总结
 
 
        if not hasattr(last_msg, 'tool_calls') or len(last_msg.tool_calls) == 0 :
            return False
 
        for tool_call in last_msg.tool_calls:
            # 检查所有参数是否可用
            for param in tool_call['args'].values():
                if isinstance(param, str) and param.startswith('计算出的') and param[3:] not in state.get(
                        'intermediate_results', {}):
                    return False
        return True
 
 
    def call_model(self, state: AgentState):
        """调用模型前预处理参数"""
        messages = state['messages']
        prompt = """作为库存管理专家
        1. **输出要求**：
        - 每次工具调用后必须确认执行结果
        - 所有计算完成后必须生成包含以下内容的总结报告：
          * 各项计算结果汇总
          * 对结果的简要解释
          * 特别标注"幸运数字"及其含义
          * 给出实际业务建议
 
        示例格式：
        【计算结果汇总】
        1. 再订货点(ROP): [值]
           - 解释：当库存降至该水平时需要补货
        2. 经济订货批量(EOQ): [值]
           - 解释：最优的每次订货数量
        3. 幸运数字: [值]
           - 解释：EOQ与ROP的差值，反映...
 
        【业务建议】
        根据当前计算结果，建议..."""
 
        if self.system:
            if isinstance(messages[-1], ToolMessage):
                self.system =prompt
            messages = [SystemMessage(content=self.system)] + messages
 
        # 替换消息中的占位符
        processed_messages = []
        for msg in messages:
            if hasattr(msg, 'content'):
                content = msg.content
                for k, v in state.get('intermediate_results', {}).items():
                    content = content.replace(f'_${k}', str(v))
 
                # 特殊处理 ToolMessage
                if isinstance(msg, ToolMessage):
                    processed_messages.append(ToolMessage(
                        content=content,
                        name=msg.name,
                        tool_call_id=msg.tool_call_id  # 确保传递 tool_call_id
                    ))
                else:
                    # 其他消息类型正常处理
                    processed_messages.append(msg.__class__(content=content))
            else:
                processed_messages.append(msg)
 
        print(f"call_model_req:\n{processed_messages}")
        response = self.model.invoke(processed_messages)
        print(f"call_model_result:\n{response}")
        return {'messages': [response], 'intermediate_results': state.get('intermediate_results', {})}
 
    def take_action(self, state: AgentState):
        """执行工具并保存结果"""
        tool_calls = state['messages'][-1].tool_calls
        results = []
        state.setdefault('intermediate_results', {})  # 确保存在
 
        for t in tool_calls:
            print(f"调用工具: {t['name']}")
            if t['name'] not in self.tools:
                result = f"错误: 工具{t['name']}不存在"
            else:
                try:
                    # 解析参数（处理占位符和直接值）
                    resolved_args = {}
                    for k, v in t['args'].items():
                        if isinstance(v, str) and v.endswith('_$'):
                            resolved_args[k] = state['intermediate_results'].get(v[:-2], v)
                        else:
                            resolved_args[k] = v
 
                    result = self.tools[t['name']].invoke(resolved_args)
                    state['intermediate_results'][t['name']] = result  # 保存结果
                    print(f"调用工具的结果: {result}")
                except Exception as e:
                    result = f"错误: {str(e)}"
 
            print(f"打印T: {t}")
            # 确保包含正确的 tool_call_id
            results.append(ToolMessage(
                tool_call_id=t['id'],  # 从原始调用获取ID
                name=t['name'],
                content=str(result)
            ))
 
        return {'messages': results, 'intermediate_results': state['intermediate_results']}
 
def main():
    # 初始化组件
    llm = ChatOllama(
        model="MFDoom/deepseek-r1-tool-calling:14b",  # 使用更常见的模型
        base_url="http://localhost:11434",
        temperature=0.1
    )
 
    # 定义系统提示
    prompt = """作为库存管理专家，请按以下步骤处理用户需求：
 
        1. 理解需求阶段：
        - 分析用户问题中的关键参数
        - 识别需要解决的具体问题类型
        - 仔细梳理问题中的描述是否存在依赖关系
 
        2. 任务拆解要求：
        必须包含：
        - 任务步骤 当前任务处理拆解的第几步
        - 任务名称（明确的计算目标）
        - 所需参数（从用户问题中提取）
        - 推荐工具（从可用工具中选择）
         
        3. **任务拆解规则**：
        - 如果问题要求计算「幸运数字」，则必须在得到 通过 calculate_reorder_point工具计算ROP 和 calculate_eoq计算EOQ 的结果后，最后调用 `calculate_lucky_number`。
       
        4. 可用工具说明：
        calculate_reorder_point(平均需求, 提前期, 需求标准差, 服务水平): 计算再订货点(ROP)
        calculate_eoq(年需求量, 订货成本, 单位持有成本): 计算经济订货批量(EOQ)
        calculate_lucky_number(再订货点(ROP), 经济订货批量(EOQ)): 计算幸运数字
         
        5. **参数传递规则**：
        当后续工具需要使用前面工具的计算结果时，请按照以下格式传递参数：
        - 在工具调用的参数中，使用"工具名称_$"作为占位符
        - 系统会自动用前面工具的计算结果替换这些占位符
         
        示例：
        1. 首先调用calculate_reorder_point得到ROP值
        2. 然后调用calculate_eoq得到EOQ值
        3. 最后调用calculate_lucky_number时，参数这样写：
           {
             "再订货点(ROP)": "calculate_reorder_point_$",
             "经济订货批量(EOQ)": "calculate_eoq_$"
           }
         
        注意：
        - 占位符必须完全匹配工具名称
        - 计算幸运数字必须放在最后一步
        - 确保所有依赖的工具都已被调用过
         
    """
    # 创建Agent
    agent = Agent(llm,[calculate_reorder_point, calculate_eoq,calculate_lucky_number], system=prompt)
    # agent.graph.get_graph().draw_png(output_file_path="~/test4.png",)
    
    # 用户输入
    user_input = "计算库存水平: 平均需求50, 提前期3天, 日需求标准差15, 服务水平95%; 计算补货成本: 年需求100, 订货成本20, 单位持有成本为3; 最后使用前面的结果再计算下幸运数字"
    messages = [HumanMessage(content=user_input)]
 
    # 执行
    print("\n=== 开始执行 ===")
    # 在执行时初始化完整状态
    result = agent.graph.invoke({"messages": messages, "intermediate_results": {}})
 
    # 输出结果
    print("\n=== 最终结果 ===")
    for msg in result['messages']:
        if isinstance(msg, ToolMessage):
            print(f"工具 {msg.name} 结果: {msg.content}")
        else:
            print(f"AI回复: {msg.content}")
 
 
 
 
if __name__ == "__main__":
    main()


=== 开始执行 ===
检索到的上下文:
计算幸运数字必须放在最后一步
库存水位控制通过安全库存和再订货点(ROP)确保供应连续性，公式：ROP=平均需求×提前期+Z×σ×√LT
call_model_req:
[SystemMessage(content='作为库存管理专家，请按以下步骤处理用户需求：\n \n        1. 理解需求阶段：\n        - 分析用户问题中的关键参数\n        - 识别需要解决的具体问题类型\n        - 仔细梳理问题中的描述是否存在依赖关系\n \n        2. 任务拆解要求：\n        必须包含：\n        - 任务步骤 当前任务处理拆解的第几步\n        - 任务名称（明确的计算目标）\n        - 所需参数（从用户问题中提取）\n        - 推荐工具（从可用工具中选择）\n         \n        3. **任务拆解规则**：\n        - 如果问题要求计算「幸运数字」，则必须在得到 通过 calculate_reorder_point工具计算ROP 和 calculate_eoq计算EOQ 的结果后，最后调用 `calculate_lucky_number`。\n       \n        4. 可用工具说明：\n        calculate_reorder_point(平均需求, 提前期, 需求标准差, 服务水平): 计算再订货点(ROP)\n        calculate_eoq(年需求量, 订货成本, 单位持有成本): 计算经济订货批量(EOQ)\n        calculate_lucky_number(再订货点(ROP), 经济订货批量(EOQ)): 计算幸运数字\n         \n        5. **参数传递规则**：\n        当后续工具需要使用前面工具的计算结果时，请按照以下格式传递参数：\n        - 在工具调用的参数中，使用"工具名称_$"作为占位符\n        - 系统会自动用前面工具的计算结果替换这些占位符\n         \n        示例：\n        1. 首先调用calculate_reorder_point得到ROP值\n