# <center>科技计量数据地理信息清洗Agent（langgraph异步agent）</center>

In [None]:
import os
from dotenv import load_dotenv

### 1.1加载环境变量，获取DEEPSEEK API密钥

In [17]:
# 加载 .env 文件
load_dotenv(".env")

True

In [18]:
API_KEY=os.environ.get('DEEPSEEK_API_KEY')
deepseek_chat_model = "deepseek-chat" # DeepSeek-V3.2的非思考模式
#deepseek_reason_model = "deepseek-reasoner" # DeepSeek-V3.2 的思考模式

## 1.3方式二： 使用 deepseek SDK 调用 DeepSeek 模型

In [19]:
from langchain_deepseek import ChatDeepSeek
llm2 = ChatDeepSeek(api_key=API_KEY, model=deepseek_chat_model)
# 测试调用
# messages =[HumanMessage("你是谁呀?")]
# print(llm2.invoke(messages).content)

In [20]:
# AI识别智能体参数设置：创建专门用于地址推理的 LLM 实例
llm_reasoning = ChatDeepSeek(
    api_key=API_KEY,
    model="deepseek-chat",
    temperature=0.0,      # 确保每次输入“清华大学”都必回“北京”
    max_tokens=20,        # 足够覆盖所有省份名
    top_p=0.1,            # 极高的一致性
    timeout=30,           # 给并发留出足够的超时余量
    # stop=["\n", " "]    # 可选：遇到换行或空格立即停止，防止 AI 输出解释
)

# LanggraphAgent构建

In [21]:
import json
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END

# ①定义智能体状态类型

In [22]:
# --- 1. 定义状态结构 ---
class AgentState(TypedDict):
    raw_address: str        # 原始输入地址
    matched_province: Optional[str]  # 识别出的标准省份
    confidence: float       # 置信度
    needs_ai: bool          # 是否需要AI介入
    iteration_count: int    # 迭代次数

# ②规则匹配智能体

In [None]:
# --- 2. 定义智能体节点逻辑 ---
def rule_matcher_agent(state: AgentState):
    """规则匹配智能体：通过反向查找JSON词表进行硬匹配"""
    addr = state['raw_address']
    
    # 遍历词表进行匹配（实际可用FlashText加速）
    for province, aliases in PROVINCE_RULES.items():
        if any(alias.lower() in addr.lower() for alias in aliases):
            return {
                "matched_province": province,
                "needs_ai": False,
                "confidence": 1.0
            }
    
    # 如果规则没匹配到，标记为需要AI识别
    return {"needs_ai": True, "matched_province": None}

# ③AI推理智能体

In [24]:
async def ai_reasoner_agent(state: AgentState):
    """AI 推理智能体：调用LLM识别复杂的机构名或不规则地址"""
    addr = state['raw_address']
    
    # 此处模拟调用LLM (Prompt: 识别该机构/地址所属的中国省份，仅输出省份名称)
    prompt = f"""请分析以下地址，识别其最可能的所属省份。

地址名称：{addr}

请严格遵循以下规则：
1. 只返回省份名称，如'广东'、'北京'、'江苏'等
2. 如果是直辖市，返回直辖市名称
3. 如果无法确定省份，例如是人名，返回'难以识别'
4. 不要解释，不要添加任何其他文字
5. 如果名称中包含城市名，返回该城市所在的省份

示例：
输入：广东绘宇智能勘测科技有限公司 → 输出：广东
输入：深圳市天泰网络技术有限公司 → 输出：广东
输入：清华大学 → 输出：北京
输入：阿里巴巴集团 → 输出：浙江
输入：华为技术有限公司 → 输出：广东
输入: 刘宗吉 → 输出：难以识别

请直接输出省份名称："""
    response = await llm_reasoning.ainvoke(prompt) #等待并发调用，# ainvoke 允许程序在等待 DeepSeek 回复时，去处理下一个地址，不要用 invoke 方法
    
    # 假设AI识别结果
    ai_result = response.content
    
    return {
        "matched_province": ai_result,
        "confidence": 0.85
    }

# ④校验智能体

In [25]:
def validator_agent(state: AgentState):
    standard_provinces = PROVINCE_RULES.keys()
    result = state['matched_province']
    current_conf = state.get('confidence', 0)
    
    if result in standard_provinces:
        # 如果在标准库中，保持原有的置信度，或者稍微加权提升，但不直接改为 1.0
        return {"confidence": current_conf} 
    elif result == "难以识别":
        return {"confidence": 0.0, "matched_province": "难以识别"}
    else:
        # 如果 AI 吐出的词不在省份列表里，置信度降为 0
        return {"confidence": 0.0, "matched_province": "格式错误"}

# ⑤构建图逻辑

In [26]:
# --- 4. 构建图逻辑 (Workflow) ---

workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("rule_matcher", rule_matcher_agent)
workflow.add_node("ai_reasoner", ai_reasoner_agent)
workflow.add_node("validator", validator_agent)

# 设置入口，强制从规则匹配开始
workflow.set_entry_point("rule_matcher")

# 构建条件分支
def route_after_rule(state: AgentState):
    if state["needs_ai"]:
        return "ai_reasoner"
    return "validator"

workflow.add_conditional_edges(
    "rule_matcher",
    route_after_rule,
    {
        "ai_reasoner": "ai_reasoner",
        "validator": "validator"
    }
)

workflow.add_edge("ai_reasoner", "validator")
workflow.add_edge("validator", END)

# 编译应用
app = workflow.compile()

# 调用函数

# 调用执行

In [27]:
import pandas as pd
import asyncio
from tqdm.asyncio import tqdm

# 限制最大并发数为 20
n = 20
sem = asyncio.Semaphore(n)

async def process_row_task(app, addr):
    """单个地址的处理任务"""
    async with sem:  # 只有获取到信号量（20个名额之一）才能执行
        initial_state = {
            "raw_address": addr,
            "matched_province": None,
            "confidence": 0.0,
            "needs_ai": False,
            "iteration_count": 0
        }
        try:
            # 关键点：使用 ainvoke 启动图流转
            final_state = await app.ainvoke(initial_state)
            return {
                "cleaned_province": final_state.get("matched_province"),
                "confidence": final_state.get("confidence"),
                "processed_by": "AI" if final_state.get("needs_ai") else "Rule"
            }
        except Exception as e:
            return {"cleaned_province": f"Error: {str(e)}", "confidence": 0, "processed_by": "System"}

async def run_batch_cleaning(app, input_file, output_file, address_column):
    """异步批处理主函数"""
    # 读取数据
    df = pd.read_excel(input_file)
    addresses = df[address_column].astype(str).tolist()
    
    print(f"开始异步处理，总计: {len(addresses)} 条，并发限制: {n}")
    
    # 创建所有异步任务
    tasks = [process_row_task(app, addr) for addr in addresses]
    
    # 并发执行并显示进度条
    results = await tqdm.gather(*tasks)
    
    # 结果回写
    res_df = pd.DataFrame(results)
    final_df = pd.concat([df, res_df], axis=1)
    final_df.to_excel(output_file, index=False)
    print(f"处理完成，保存至: {output_file}")

In [28]:
import nest_asyncio

if __name__ == "__main__":
    # 确保 PROVINCE_RULES 已加载
    with open('province_keywords.json', 'r') as f:
        PROVINCE_RULES = json.load(f)
        
    # 编译 app (确保节点已添加 async 版本的 ai_reasoner_agent)
    app = workflow.compile()

    # 启动异步主程序
    nest_asyncio.apply()

    # 之后你原来的代码就可以直接运行了
    asyncio.run(run_batch_cleaning(app, "著作权2.xlsx", "cleaned_data_v3.xlsx", address_column="著作权人"))

开始异步处理，总计: 235 条，并发限制: 20


100%|██████████| 235/235 [00:13<00:00, 17.01it/s]

处理完成，保存至: cleaned_data_v3.xlsx



