Skip to content

ballyang747/dingTalkMutilAgent

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

81 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

🤖 钉钉 Multi-Agent 助手

一个住在钉钉里、记性好、嘴又严的数字同事。 它会联网查资料、帮你记日程、整理知识库,还懂得在该闭嘴时闭嘴。

把一条钉钉消息丢给它,它会自己想清楚"这事儿该不该接、谁来干、怎么干",然后调用合适的工具——联网搜索、写日程、翻记忆、查知识库——最后把答案脱敏后送回你的聊天框。全程 LangGraph 编排,异步流水线,隐私双向把关。

LLM 和 Embedding 都走 DashScope 云端 API,本地不推理、不要 GPU;弱到只剩 1 核的云主机也能跑(有 lite 模式兜底)。


✨ 它能干什么

  • 🔍 联网搜索问答 —— DashScope enable_search 自带搜索 + 整合回答;search_web 还能单独配个更快的模型,免得你等得花儿都谢了。
  • 📅 日程 / TODO 管理 —— 自然语言记录查询:"明天上午10点开周会"、"下周三记得交报告"。相对日期解析(今天/明天/下周三/ISO/月日)、复选框模板、删除/完成/更新,样样齐活;OSS 多端同步,断网还有本地回退。
  • 🧠 多轮对话记忆 —— 按时间召回最近 N 条历史,角色标记 + 去重;AsyncSqliteSaver 持久化 graph 状态,关机重启不健忘。
  • 💎 长期记忆(用户事实) —— mem0 风格:每轮在后台偷偷抽取"值得记住的事"(你的偏好/背景/技能),去重入库,下次自动注入上下文。也能直接说"记住我爱喝美式"、"你还记得我什么"。
  • 重要性 × 时间衰减重排 —— 向量召回不是谁相似谁上位,而是 相似度 + 重要性 × 时效衰减 综合排序,老旧冷知识自动让位。
  • 📜 滚动摘要 —— 会话太长?LLM 把最旧的一批压成一行摘要、删掉原文,给你的上下文窗口"瘦身"。
  • 📚 Wiki 知识库 —— 粘贴一篇文章,它消化成结构化摘要页存进知识库;之后能基于已归档内容合成带引用的回答;还能体检、批量导入。相当于给团队养了一个越用越聪明的"第二大脑"。
  • 🛡️ PII 双向脱敏 —— 正则覆盖 21 类实体 + GLiNER 语义补充(可分别开关);进来的脱敏、出去的也脱敏,原始内容绝不落库。它知道得很多,但绝不会说漏嘴。
  • 🚧 安全过滤 —— 长度限制、提示词注入检测、控制字符过滤。想骗它"忽略之前的指令"?抱歉,它没那么好骗。
  • 🤝 多 Agent 协作 —— Router 分类意图 → Dispatcher 路由工具 → Planner 拉 DAG 并行执行。复杂任务自己拆步骤,不用你手把手教。
  • ⏱️ 性能护栏 —— 每个 LLM 调用 / 召回 / 整轮都有独立超时兜底,保证回复稳稳落在钉钉 60s 窗口内。不会让你对着"对方正在输入..."望穿秋水。

🗺️ 架构流程图

一条消息的奇幻漂流:

flowchart TD
    MSG[钉钉消息<br/>Stream → on_message] --> LOCK[每会话锁<br/>串行化]
    LOCK --> SF{① security_filter<br/>长度/注入/控制字符}
    SF -->|拒绝| SYN
    SF -->|通过| PII[② pii_mask_input<br/>正则+GLiNER 脱敏]
    PII --> SUM[③ store_user_msg<br/>脱敏消息写 Milvus<br/>打 importance + flush]
    SUM --> EF[④ extract_facts<br/>后台 fire-and-forget<br/>抽事实→去重→写 user_facts]
    EF --> BC[⑤ build_context<br/>最近N条历史 + 长期事实注入<br/>超阈值触发滚动摘要]
    BC --> RS[⑥ reason<br/>Router 意图分类<br/>plain ainvoke + JSON]
    RS --> DEC{route_decision}
    DEC -->|WIKI_* / 简单任务| EX[⑦ execute<br/>Dispatcher 单步执行]
    DEC -->|PLAN 复杂任务| PL[⑦ plan<br/>Planner DAG 并行]
    EX --> SYN[⑧ synthesize<br/>聚合结果 去重]
    PL --> SYN
    SYN --> PO[⑨ pii_mask_output<br/>回复脱敏]
    PO --> SAM[⑩ store_assistant_msg<br/>脱敏回复写 Milvus]
    SAM --> END[⑪ END → 回复钉钉]
    EF -.后台不阻塞.-> UFACT[(user_facts)]

    subgraph 存储
        MIL[(Milvus conversations)]
        UFACT[(Milvus user_facts)]
        OSS[(OSS 日程/Wiki)]
        VAULT[(vault 本地缓存)]
        CP[(checkpoints.sqlite)]
    end
Loading

关键路径:①→⑪ 全程超时兜底(整轮 55s),保证回复落在钉钉 60s 窗口内。 extract_facts 是 fire-and-forget 后台任务,不阻塞回复——它在你看不到的角落默默记笔记。 WIKI_* 动作直接 execute,不进 Planner(Wiki 工具自带多步 LLM 逻辑,自己就能玩)。


🧠 记忆体系:三层脑子,各司其职

触发 存储 / 作用
对话历史 每轮自动 conversations collection,最近 N 条按时间正序(不是向量相似,要的是上下文连贯)
重要性 × 时效重排 向量召回时 sim + 0.3·importance·exp(-age/半衰期)
滚动摘要 会话 > 阈值 内存计数器判定,LLM 压缩最旧一批为 is_summary 行并删原消息
自动事实抽取 每轮后台 FactExtractor (fire-and-forget) 抽事实 → 去重 → 写 user_facts
事实注入上下文 每轮自动 top-3 相关长期事实注入 history([长期记忆] …)
显式事实工具 "记住X"/"记得我什么" Router → STORE_FACT / RECALL_FACTS
跨会话召回 MilvusStore.search_by_user(按 user_id)

📚 Wiki 知识库:团队的第二大脑

操作 触发 说明
归档 ingest "归档这篇文章: <文章>" 1 次 LLM 生成摘要页 → sources/ + index + log + OSS
查询 query "知识库里关于 X 的内容" 2 次 LLM(选页+合成),带 (页面名) 引用
体检 lint "检查一下知识库健康" 0 LLM,纯结构检查 + 修复建议,秒回
批量导入 python scripts/import_kb.py <dir> --exclude 原始转录 无 LLM,原样导入现有 .md

🗄️ 存储分层

存储 角色 内容
Milvus conversations 对话记忆 脱敏消息 + 向量 + importance + is_summary + 时间
Milvus user_facts 长期事实 用户偏好/背景/技能(按 user_id,跨会话)
OSS 多端共享源 日程/知识库 Markdown(source of truth)
vault/{日程,知识库}/ 本地缓存 Markdown 本地副本,OSS 失败回退
data/checkpoints.sqlite LangGraph 状态 图执行状态快照,按 session 恢复

没有 Git 同步那套——多端同步统一由 OSS 承担。一个真相源,省心。


📁 项目结构

ding_talk_helper/
├── agents/
│   ├── graph.py              # LangGraph StateGraph (11 节点) + 超时 + checkpointer
│   ├── router.py             # 意图分类 (plain ainvoke + JSON, 容错 fallback)
│   ├── dispatcher.py         # TOOL_MAP (15 工具) + dispatch_step
│   ├── structured_output.py  # Pydantic: ReasonerOutput/PlannerOutput/PlanStepOutput
│   ├── planner.py            # PlannerAgent + execute_plan (DAG)
│   ├── fact_extractor.py     # mem0 风格自动事实抽取 (fire-and-forget)
│   ├── summary_agent.py      # 滚动摘要 + maybe_summarize 触发器
│   ├── llm.py                # ChatOpenAI + DashScope (enable_search, LLM_SEARCH_MODEL)
│   ├── executor/{search,repo,memory,domain,wiki}.py
│   ├── wiki/manager.py       # WikiManager (vault + index + log + lint + import + OSS)
│   └── memory/
│       ├── __init__.py       # format_history / format_facts / score_importance / rerank_hits / is_duplicate_fact
│       ├── milvus_store.py   # 对话存储 + 跨会话召回 + 摘要 + flush + recent_messages
│       ├── user_fact_store.py# 长期事实存储
│       └── embedding.py
├── services/{pii_masker,security,oss_client}.py
├── dingtalk/client.py        # Stream 客户端
├── tests/                    # 222 单元测试 + 3 集成测试(默认跳过)
├── scripts/import_kb.py      # 批量导入现有 .md 到 OSS Wiki(无 LLM)
├── scripts/setup_linux.sh    # Linux 一键安装
├── Dockerfile / Dockerfile.lite / docker-compose*.yml  # 容器化部署(完整 + 轻量)
├── requirements.txt / requirements-lite.txt  # 完整 / 轻量(无torch) 依赖
├── config.py / main.py
└── docs/                     # 设计文档 + Linux 部署指南

🚀 快速开始(本地)

1. 创建 conda 环境(国内镜像)

repo.anaconda.com 在国内常被墙,先配上清华 TUNA 镜像:

cat > ~/.condarc <<'EOF'
channels:
  - defaults
show_channel_urls: true
default_channels:
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
custom_channels:
  conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
EOF

conda create -n ding_talk_helper python=3.11 -y
conda activate ding_talk_helper
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

SSL 修复(Windows conda 环境必做):复制 certifi 证书到 <env>/ssl/cacert.pem—— cp "$(python -c "import certifi;print(certifi.where())")" "$CONDA_PREFIX/ssl/cacert.pem"

2. 配置环境变量

cp .env.example .env
# 填入真实凭据:DINGTALK_APP_KEY/SECRET、LLM_API_KEY、EMBEDDING_API_KEY、Milvus、OSS

PII 模型需下载 knowledgator/gliner-pii-small-v1.0models/gliner-pii-small-v1.0/(仅 PII_USE_GLINER=true 时需要;lite 模式可跳过)。

3. 跑测试 & 启动

python -m pytest -q                    # 单元测试(默认跳过集成)
python -m pytest -m integration -v     # 集成测试(需真实凭据)

python main.py                         # 启动(= uvicorn main:app --reload --port 8000)
curl http://127.0.0.1:8000/health      # {"status":"ok"}

启动时自动:装配组件、MilvusStore.ensure_collection()WikiManager.ensure_layout()、(凭据齐全时)启动钉钉 Stream。

🐳 Docker 一把梭

不想折腾环境?容器化部署已备好,完整版与轻量版任选:

# 完整版(含 GLiNER 语义脱敏,镜像较大)
docker compose up -d --build

# 轻量版(无 torch/gliner,纯正则脱敏,~800MB,适合云主机)
docker compose -f docker-compose.lite.yml up -d --build

lite 版务必在 .env 里设 PII_USE_GLINER=false,否则镜像没装 gliner 会启动失败。

Linux 云主机部署

docs/DEPLOY_LINUX.md(一键脚本 scripts/setup_linux.sh --lite、systemd、Milvus Docker、弱硬件配置)。


⚙️ 配置(.env,均可选)

LLM / 性能护栏:

变量 默认 作用
LLM_MODEL qwen3.6-max-preview 分类/抽取/规划用模型
LLM_SEARCH_MODEL (空=LLM_MODEL) search_web 专用模型,建议配更快的(如 qwen-plus)
LLM_CALL_TIMEOUT_SECONDS 30 单次 LLM 调用超时(Router/FactExtractor)
TOOL_DISPATCH_TIMEOUT_SECONDS 40 工具派发超时(search_web 等慢操作)
GRAPH_RUN_TIMEOUT_SECONDS 55 整轮超时兜底(< 钉钉 60s 窗口)
MEMORY_RECALL_TIMEOUT_SECONDS 10 build_context 单次召回超时
CONVERSATION_HISTORY_LIMIT 10 每轮注入的最近对话消息条数

记忆模块:

变量 默认 作用
MEMORY_FACTS_ENABLED true 启用长期事实存储(user_facts collection)
MEMORY_AUTO_EXTRACT true 每轮后台抽取事实(fire-and-forget,不阻塞回复)
MEMORY_DECAY_HALF_LIFE_DAYS 7.0 召回时效衰减半衰期
MEMORY_SUMMARIZE_ENABLED true 启用滚动摘要
MEMORY_SUMMARIZE_THRESHOLD 20 触发摘要的非摘要消息条数
MEMORY_SUMMARIZE_BATCH 10 每次压缩的最旧消息条数

Wiki:

变量 默认 作用
WIKI_ENABLED true 启用 Wiki 归档(关后 WIKI_INGEST 回退到 ANSWER)
WIKI_SUBFOLDER Wiki 知识库下的 Wiki 子目录名
OSS_WIKI_PREFIX wiki/ OSS 上 Wiki 文件前缀

PII:

变量 默认 作用
ENABLE_PII_MASKING true PII 脱敏总开关;false → 不脱敏、不加载 GLiNER/torch
PII_USE_GLINER true GLiNER 语义补充(重,需 torch);false → 纯正则(弱硬件建议 false)
PII_MODEL_PATH ./models/gliner-pii-small-v1.0 GLiNER 模型路径

🪶 弱硬件部署处方:ENABLE_PII_MASKING=true + PII_USE_GLINER=false(保留正则脱敏,不加载 torch);或 ENABLE_PII_MASKING=false(完全不脱敏,极致省资源)。


📖 Wiki 使用速查

归档这篇文章: <文章 ≥20字>     → WIKI_INGEST  (1 LLM, ~10s)  → 摘要页 + index + log + OSS
知识库里关于 X 的内容          → WIKI_QUERY   (2 LLM, ~20s)  → 选页 + 合成带引用回答
检查一下知识库健康             → WIKI_LINT    (0 LLM, 秒回)   → 问题列表 + 修复建议
python scripts/import_kb.py <dir> --exclude 原始转录          → 批量导入现有 .md(无 LLM)

触发前缀:归档这篇文章 / 记入wiki / 存入知识库 / 帮我把这段存进知识库 / archive: 等。


📊 当前状态与限制

  • 222 单元测试通过,3 集成测试默认跳过
  • 真机端到端验证(2026-06-24):钉钉 Stream、search_web 联网回答、多轮对话总结、长期事实存取、Wiki 归档/查询/体检/批量导入
  • pymilvus 3.0 全适配:显式 schema、IndexParamsload_collectionflush()list_indexes 判重
  • DashScope 适配:Router/FactExtractor 用 plain ainvoke + JSON(with_structured_output 对部分 schema 卡 30s);json_mode 提示词含 "json"
  • ⚠️ 集成测试从未真跑:需填好 .envpytest -m integration -v
  • ⚠️ GLiNER 对中文人名检出弱:偏英文;正则 PII 仍覆盖手机/邮箱/身份证等。改进需换中文 NER 模型
  • ℹ️ OSS 同步需装 oss2:pip install oss2;未装则仅本地,多端不同步

🚧 不在当前范围(后续路线图)

  • Wiki 进阶:实体图谱、交叉引用反向维护、LLM 矛盾检测
  • 知识库部门级权限控制、Rerank(qwen3-rerank)、answer_style 格式化
  • Milvus user_profiles / knowledge_graph
  • 钉钉重发去重(按 message_id 幂等)
  • Planner 改 plain ainvoke(与 Router/FactExtractor 一致)

📚 文档

🧰 技术栈

Python 3.11 · FastAPI · LangGraph 1.x · langchain-openai (DashScope) · pymilvus 3.0 · oss2 · gliner · dingtalk-stream · pytest · Docker


Built with LangGraph,一个会自己想清楚的助手。

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors