Search Agent (Qwen3-0.6B) 重构版说明

目标：基于本地 Qwen3-0.6B 搭建一个最小可用的搜索 Agent Pipeline。

核心流程：
1. 用户提供初始任务 prompt。
2. 模型逐 token 生成并检测工具调用格式。
3. 遇到搜索工具调用 => 执行搜索 => 将搜索结果注入上下文继续生成。
4. 不再触发工具调用且出现终止标记（<eos> / eos token） => 输出最终链接集合。

Notebook 结构：
- 配置与依赖
- 模型加载
- 工具与解析层 (Tool / SearchTool / ToolCallParser)
- AgentPipeline 主循环 (流式 + 工具接入)
- 示例运行 (单函数入口)
- 扩展建议

约定：
- 工具调用格式候选：`<tool_call:search>查询词` / `search: 查询词` / `[SEARCH] 查询词`
- 工具响应结束 token id（暂用推测值）：151666 (</tool_response>)
- 本示例仅做结构演示，未包含安全过滤、重试、并行优化。

这一部分说明所需的依赖与环境前提，提醒读者检查本地模型目录 `./Qwen3-0.6B` 并在缺包时安装 `transformers`、`huggingface_hub`、`requests`、`beautifulsoup4`。

这个单元会动态检查依赖是否安装，并导入后续流程会用到的核心库；如果缺包会抛出清晰的提示便于补齐环境。

In [5]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch, requests, re
from bs4 import BeautifulSoup
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Any


下面的单元负责加载本地 Qwen3-0.6B 模型与分词器，并根据是否可用 GPU 自动选择 dtype 和 device_map。

In [6]:
# 模型与分词器加载
MODEL_DIR = Path("Qwen3-0.6B")
if not MODEL_DIR.exists():
    raise FileNotFoundError(f"模型目录不存在: {MODEL_DIR.resolve()}")

print(f"加载模型目录: {MODEL_DIR}")
tokenizer = AutoTokenizer.from_pretrained(str(MODEL_DIR), local_files_only=True)

dtype = torch.float16 if torch.cuda.is_available() else torch.float32
load_kwargs = {"local_files_only": True, "torch_dtype": dtype}
if torch.cuda.is_available():
    load_kwargs["device_map"] = "auto"

model = AutoModelForCausalLM.from_pretrained(str(MODEL_DIR), **load_kwargs)
model.eval()
print("模型加载完成，当前设备:", model.device)


加载模型目录: Qwen3-0.6B
模型加载完成，当前设备: cuda:0
模型加载完成，当前设备: cuda:0


这一段用于快速检查 tokenizer 的特殊标记，以及检索包含 "tool" 关键词的 token 以辅助调试工具调用格式。

In [7]:
# Tokenizer 信息检查
print("特殊 token:", tokenizer.special_tokens_map)
print("其他特殊 token:", tokenizer.additional_special_tokens)

matched = []
for token, tid in tokenizer.get_vocab().items():
    if "tool" in token:
        matched.append((tid, token))
    if len(matched) >= 100:
        break
print("包含 'tool' 的 token 示例 (前 100):")
for tid, token in matched:
    print(tid, token)


特殊 token: {'eos_token': '<|im_end|>', 'pad_token': '<|endoftext|>', 'additional_special_tokens': ['<|im_start|>', '<|im_end|>', '<|object_ref_start|>', '<|object_ref_end|>', '<|box_start|>', '<|box_end|>', '<|quad_start|>', '<|quad_end|>', '<|vision_start|>', '<|vision_end|>', '<|vision_pad|>', '<|image_pad|>', '<|video_pad|>']}
其他特殊 token: ['<|im_start|>', '<|im_end|>', '<|object_ref_start|>', '<|object_ref_end|>', '<|box_start|>', '<|box_end|>', '<|quad_start|>', '<|quad_end|>', '<|vision_start|>', '<|vision_end|>', '<|vision_pad|>', '<|image_pad|>', '<|video_pad|>']
包含 'tool' 的 token 示例 (前 100):
45714 /tools
56466 Ġfunctools
48950 (tool
72790 -toolbar
23154 .tools
67870 Ġtoolbox
65695 _toolbar
25942 Ġtoolbar
41331 Ġitertools
78458 (toolbar
39723 _tools
60646 -tooltip
15918 tools
65894 Ġtoolkit
40224 -tool
67166 .tooltip
75027 /tool
24680 .toolStrip
44646 -tools
88265 .toolbox
36316 ertools
21539 tooltip
87183 toolbox
91234 Ġtooltips
47416 .toolbar
7375 Ġtools
151666 </tool_response>

后续工具实现依赖一些常量，这里集中定义工具结束 token、搜索模式、HTTP 头等基础配置，保持全局可见。

In [8]:
# 工具与解析层：基础配置
TOOL_END_TOKEN_ID = 151666  # 推测为 </tool_response>
SEARCH_PATTERNS = [
    r"search:\s*(.*)",       # 兼容旧格式 "search: query"
    r"\[TOOL_CALL\]\s*\n\s*search:\s*(.*)", # 兼容旧多行格式
    r"<tool_call:search>(.*?)$",  # 兼容旧XML样式
    r"\[SEARCH\]\s+(.*)$",      # 兼容旧标签
]
# 主要使用 <search>...</search> 新标签；旧模式保留以便混合 prompt 测试
HEADERS = {"User-Agent": "Mozilla/5.0 (SearchAgent Prototype)"}
MAX_LINKS_DEFAULT = 6

紧接着的代码定义抽象工具基类和抓取 jina.ai 搜索结果的 `SearchTool`，并处理异常返回结构化信息。

In [9]:
# 工具实现：SearchTool
class Tool:
    name: str

    def run(self, *args, **kwargs) -> Dict[str, Any]:
        raise NotImplementedError


class SearchTool(Tool):
    name = "search"

    def run(self, query: str, max_links: int = MAX_LINKS_DEFAULT) -> Dict[str, Any]:
        url = f"https://www.jina.ai/search?q={requests.utils.quote(query)}"
        try:
            resp = requests.get(url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
        except Exception as exc:
            return {"query": query, "links": [], "error": str(exc)}

        soup = BeautifulSoup(resp.text, "html.parser")
        links: List[str] = []
        for anchor in soup.select("a[href]"):
            href = anchor.get("href")
            if href and href.startswith("http") and "jina.ai" not in href:
                links.append(href)
            if len(links) >= max_links:
                break
        return {"query": query, "links": links}


search_tool = SearchTool()


为了抽取模型输出中的工具调用，这里实现 `ToolCallParser`，兼容简单正则与预留的 JSON 包装格式。

In [10]:
# 工具调用解析器
class ToolCallParser:
    def find_search(self, text: str):
        for pattern in SEARCH_PATTERNS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                query = match.group(1).strip()
                query = re.split(r"</tool_response>|<eos>|\n", query)[0].strip()
                if query:
                    return query
        return None

    def find_json_tool(self, text: str):
        # 预留 JSON 工具调用格式: <tool_call>{"name":"search", ...}</tool_call>
        match = re.search(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", text, re.DOTALL)
        if not match:
            return None
        raw_json = match.group(1)
        try:
            import json
            return json.loads(raw_json)
        except Exception:
            return None


parser = ToolCallParser()


在进入主循环前，需要一个统一的对话模板封装，这样模型在没有原生 chat 模板时也能 fallback 到简单的 role:content 拼接。

In [11]:
# Chat 模板包装（改进版）
def apply_chat_template(messages):
    if hasattr(tokenizer, "apply_chat_template"):
        try:
            return tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
            )
        except Exception as e:
            print(f"[警告] apply_chat_template 失败: {e}, 使用 fallback 方案")
    
    # Fallback: 简单的消息拼接
    lines = []
    for m in messages:
        role = m['role']
        content = m['content']
        if role == 'system':
            lines.append(f"System: {content}")
        elif role == 'user':
            lines.append(f"User: {content}")
        elif role == 'assistant':
            lines.append(f"Assistant: {content}")
    
    lines.append("Assistant:")  # 提示模型开始生成
    return "\n\n".join(lines)


接下来的 Pipeline 单元负责逐 token 流式生成、检测工具调用、执行搜索并把结果拼回消息列表，是整个 Agent 的核心逻辑。

系统提示（System Prompt）

下面新增一个系统级指导信息，要求模型在需要检索时使用形如 <search>关键词</search> 的标签包围搜索词。生成过程中一旦形成闭合标签即触发搜索，将搜索结果与原始关键词回注入上下文继续回答。

In [12]:
# 系统提示定义（简化版 - 优先保证基础对话功能）
GLOBAL_SYSTEM_PROMPT = (
    "你是一个友好的助手。"
    "如果需要搜索信息,用 <search>关键词</search> 标签包围搜索词。"
    "否则直接回答问题。保持回答简洁自然。"
)

In [13]:
# AgentPipeline 定义（重构版 - 修复生成逻辑）
@dataclass
class GenerationResult:
    final_text: str
    search_queries: List[str]
    links: List[str]


class AgentPipeline:
    def __init__(self, model, tokenizer, tool: Tool, parser: ToolCallParser):
        self.model = model
        self.tokenizer = tokenizer
        self.tool = tool
        self.parser = parser

    def _prepare_inputs(self, messages: List[Dict[str, str]]):
        prompt = apply_chat_template(messages)
        if verbose_g:
            print("--- PROMPT START ---")
            print(prompt)
            print("--- PROMPT END ---\n")
        encoded = self.tokenizer([prompt], return_tensors="pt")
        return (
            encoded["input_ids"].to(self.model.device),
            encoded["attention_mask"].to(self.model.device),
        )

    def _append_message(self, messages: List[Dict[str, str]], role: str, content: str):
        messages.append({"role": role, "content": content})

    def _dedup(self, items: List[str]) -> List[str]:
        seen = set()
        ordered = []
        for item in items:
            if item not in seen:
                seen.add(item)
                ordered.append(item)
        return ordered

    def stream(self, messages: List[Dict[str, str]], *, max_steps: int = 96, verbose: bool = True) -> GenerationResult:
        global verbose_g
        verbose_g = verbose
        
        all_generated_tokens: List[int] = []  # 记录所有生成的token
        collected_links: List[str] = []
        search_queries: List[str] = []
        
        # 支持多轮搜索-生成
        max_search_rounds = 3
        for search_round in range(max_search_rounds):
            if verbose:
                print(f"\n{'='*50}")
                print(f"生成轮次 {search_round + 1}/{max_search_rounds}")
                print(f"{'='*50}\n")
            
            input_ids, attention_mask = self._prepare_inputs(messages)
            generated: List[int] = []
            cache = None
            
            with torch.inference_mode():
                for step in range(max_steps):
                    outputs = self.model(
                        input_ids=input_ids,
                        attention_mask=attention_mask,
                        use_cache=True,
                        past_key_values=cache,
                    )
                    logits = outputs.logits[:, -1, :]
                    cache = outputs.past_key_values

                    next_token = torch.argmax(logits, dim=-1)
                    token_id = next_token.item()
                    generated.append(token_id)

                    # 续写输入
                    input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)
                    attention_mask = torch.cat([
                        attention_mask,
                        torch.ones_like(next_token).unsqueeze(0),
                    ], dim=1)

                    token_text = self.tokenizer.decode([token_id], skip_special_tokens=False)
                    current_text = self.tokenizer.decode(generated, skip_special_tokens=False)
                    
                    if verbose:
                        print(f"Step {step:02d} | id={token_id:6d} | {repr(token_text)}")

                    # 终止条件: EOS token
                    if token_id == self.tokenizer.eos_token_id:
                        if verbose:
                            print("[终止] 遇到 EOS token，本轮生成结束。")
                        break
                    
                    # 检测 <search>...</search> 标签（只在生成足够长度后检测，避免过早触发）
                    if step > 5:  # 至少生成5个token后再检测
                        search_match = re.search(r"<search>(.*?)</search>", current_text, flags=re.DOTALL)
                        if search_match:
                            query = search_match.group(1).strip().replace("\n", " ")
                            if query and query not in search_queries:
                                if verbose:
                                    print(f"\n[搜索触发] query='{query}'")
                                search_queries.append(query)
                                
                                # 执行搜索
                                result = self.tool.run(query)
                                links = result.get("links", [])
                                collected_links.extend(links)
                                
                                # 构造搜索结果反馈
                                feedback = f"搜索词: {query}\n找到以下链接:\n"
                                if links:
                                    feedback += "\n".join(f"{i+1}. {link}" for i, link in enumerate(links[:5]))
                                else:
                                    feedback += "(未找到相关结果)"
                                feedback += "\n\n请基于这些信息继续回答。"
                                
                                # 保存当前生成的内容
                                all_generated_tokens.extend(generated)
                                
                                # 添加消息并准备下一轮生成
                                self._append_message(messages, "assistant", current_text)
                                self._append_message(messages, "user", feedback)
                                
                                if verbose:
                                    print(f"[搜索完成] 找到 {len(links)} 个链接，准备下一轮生成...")
                                break  # 跳出当前生成循环，开始新一轮
                
                # 本轮生成结束，检查是否需要继续
                if step == max_steps - 1:
                    if verbose:
                        print("[警告] 达到最大步数限制")
                    all_generated_tokens.extend(generated)
                    break  # 退出多轮循环
                
                # 如果没有触发搜索，说明正常结束
                search_match = re.search(r"<search>(.*?)</search>", current_text, flags=re.DOTALL)
                if not search_match:
                    all_generated_tokens.extend(generated)
                    if verbose:
                        print("[正常结束] 未检测到搜索请求，生成完成。")
                    break  # 正常结束，退出多轮循环

        # 解码最终文本
        final_text = self.tokenizer.decode(all_generated_tokens, skip_special_tokens=True)
        
        return GenerationResult(
            final_text=final_text,
            search_queries=search_queries,
            links=self._dedup(collected_links),
        )


为了方便外部调用，下面会构造 `run_search_agent` 包装函数，复用已经初始化好的 Pipeline 并打印关键信息。

In [14]:
# 统一入口函数（优化版）
pipeline = AgentPipeline(model=model, tokenizer=tokenizer, tool=search_tool, parser=parser)

def _clean_assistant_output(raw: str) -> str:
    """清理模型输出中的特殊标记"""
    import re
    # 移除思考标签
    cleaned = re.sub(r'<think>.*?</think>', '', raw, flags=re.DOTALL | re.IGNORECASE)
    # 移除搜索标签（保留内容）
    cleaned = re.sub(r'<search>(.*?)</search>', r'\1', cleaned, flags=re.DOTALL)
    # 移除特殊token
    for token in ['<|im_end|>', '<eos>', '</think>', '<|endoftext|>']:
        cleaned = cleaned.replace(token, '')
    # 清理多余换行
    cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()
    return cleaned

def run_search_agent(query: str, *, max_steps: int = 128, verbose: bool = True) -> GenerationResult:
    """
    运行搜索增强的 Agent
    
    Args:
        query: 用户查询
        max_steps: 每轮生成的最大步数
        verbose: 是否打印详细信息
    """
    messages = [
        {"role": "system", "content": GLOBAL_SYSTEM_PROMPT},
        {"role": "user", "content": query},
    ]
    
    result = pipeline.stream(messages, max_steps=max_steps, verbose=verbose)
    
    # 清理输出
    cleaned_output = _clean_assistant_output(result.final_text)
    
    # 打印结果摘要
    print("\n" + "="*60)
    print("生成完成！")
    print("="*60)
    
    if result.search_queries:
        print(f"\n执行了 {len(result.search_queries)} 次搜索:")
        for i, q in enumerate(result.search_queries, 1):
            print(f"  {i}. {q}")
    
    if result.links:
        print(f"\n找到 {len(result.links)} 个链接:")
        for i, link in enumerate(result.links[:8], 1):  # 只显示前8个
            print(f"  {i}. {link}")
        if len(result.links) > 8:
            print(f"  ... (还有 {len(result.links) - 8} 个)")
    
    print("\n最终回答:")
    print("-" * 60)
    print(cleaned_output if cleaned_output else "(空)")
    print("-" * 60)
    
    return result


最后的示例单元提供一个默认 prompt，方便手动触发一次搜索调用进行调试；默认注释掉实际调用以免误运行。

## 测试基础对话功能

在测试搜索功能之前，先确保模型能够正常进行对话。

In [15]:
# 测试 1: 基础问候（不需要搜索）
print("="*60)
print("测试 1: 基础问候 - 'hello'")
print("="*60)
simple_result = run_search_agent("hello", max_steps=50, verbose=False)


测试 1: 基础问候 - 'hello'

生成完成！

最终回答:
------------------------------------------------------------
<think>好

<：

好的 �

一个搜索

好的翻译

词

 what·

请，人关键词
关于
你的搜索的

、位...文好
------------------------------------------------------------

生成完成！

最终回答:
------------------------------------------------------------
<think>好

<：

好的 �

一个搜索

好的翻译

词

 what·

请，人关键词
关于
你的搜索的

、位...文好
------------------------------------------------------------


In [16]:
# 测试 2: 简单问答（不需要搜索）
print("\n" + "="*60)
print("测试 2: 简单问答")
print("="*60)
qa_result = run_search_agent("1+1等于几?", max_steps=50, verbose=False)



测试 2: 简单问答

生成完成！

最终回答:
------------------------------------------------------------
：

。
搜索？文
=
------------------------------------------------------------

生成完成！

最终回答:
------------------------------------------------------------
：

。
搜索？文
=
------------------------------------------------------------


In [None]:
# 测试 3: 需要搜索的问题
print("\n" + "="*60)
print("测试 3: 需要搜索 - 明确使用 <search> 标签")
print("="*60)
search_result = run_search_agent(
    "请帮我搜索: <search>Python 3.12 新特性</search>",
    max_steps=80,
    verbose=False
)


--- Running Complex Query ---
--- PROMPT START ---
<|im_start|>system
你是一个搜索增强助手。判断需要外部信息时，请生成 <search>关键词</search> 标签。 标签内只放原始搜索关键词，避免多句。形成 </search> 闭合后会自动检索 Jina.ai。 系统会将检索到的若干链接与原始关键词回传，你需要利用这些结果综合回答。 如果不需要搜索就直接正常回答并结束。不要虚构搜索结果。<|im_end|>
<|im_start|>user
给我一些近期开源中文多模态项目。
<search>开源 中文 多模态 项目</search><|im_end|>
<|im_start|>assistant

--- PROMPT END ---

Step 00 | id=151667 | '<think>'
Step 01 | id=198 | '\n'
Step 02 | id=29258 | '重'
Step 01 | id=198 | '\n'
Step 02 | id=29258 | '重'
Step 03 | id=13072 | '名'
Step 04 | id=198 | '\n'
Step 03 | id=13072 | '名'
Step 04 | id=198 | '\n'
Step 05 | id=59258 | '近'
Step 06 | id=78973 | '搜索'
Step 05 | id=59258 | '近'
Step 06 | id=78973 | '搜索'
Step 07 | id=5373 | '、'
Step 08 | id=198 | '\n'
Step 07 | id=5373 | '、'
Step 08 | id=198 | '\n'
Step 09 | id=198 | '\n'
Step 10 | id=198 | '\n'
Step 09 | id=198 | '\n'
Step 10 | id=198 | '\n'
Step 11 | id=198 | '\n'
Step 12 | id=198 | '\n'
Step 11 | id=198 | '\n'
Step 12 | id=198 | '\n'
Step 13 | id=198 | '\n

# 改进说明与下一步优化

## 本次重构的关键修复:
1. **修复生成逻辑** - 不再频繁清空 `generated` 列表,保留完整生成历史
2. **多轮生成支持** - 搜索触发后启动新一轮生成,而非重置状态
3. **改进终止条件** - 延迟搜索检测(至少5步后),避免过早触发
4. **优化模板格式** - 改进 fallback chat 模板,提高兼容性
5. **清理输出函数** - 移除特殊标记,保留可读内容

## 下一步可以优化的内容:
- **采样策略**: 使用 top-k / top-p / temperature 代替 argmax,增加多样性
- **结构化工具调用**: 支持 JSON 格式 `<tool_call>{"name":"search",...}</tool_call>`
- **多搜索源**: 聚合 Google/Bing/DuckDuckGo 等多个搜索引擎
- **结果摘要**: 使用模型对搜索结果进行总结和相关性评分
- **重试与限流**: 避免过频访问,添加指数退避
- **对话缓存**: LRU / Redis 缓存历史对话和搜索结果
- **安全过滤**: 域名白名单、恶意内容检测
- **流式输出**: 实时显示生成的 token
