# 溯源树-web搜索版
1. 接收一个论文的链接、或pdf、或论文名，然后：
    * 如果是pdf，直接使用pypdf解析
    * 如果是链接，获取其pdf版本，
    * 如果是论文名，使用google scholar搜索，获取其pdf链接，然后使用pypdf解析
2. LLM解析论文，获取{title，publication_date，abstract，……，category，references[article_1,...]} 
3. 在谷歌学术上搜索所有引用论文的论文名，回到1.的接收论文名的情况，对这些论文也进行解析，然后基于一定的方法（比如论文引用数、摘要的相似度）选取其中的top k，这种做法可以只对摘要进行，因为嵌入模型处理长度有上限

In [None]:
# 依赖安装
! pip install pydantic langchain langchain_community langchain_core PyPDF2 anytree arxiv requests matplotlib

In [None]:
import os
from dotenv import load_dotenv

# 加载 .env 文件
load_dotenv()

# 尝试获取环境变量并打印出来
dashscope_api_key = os.getenv("DASHSCOPE_API_KEY")

## 1.由论文名获取论文内容
- pdf解析  
- 论文名搜索-测试api调用时是否有websearch能力？--没有
- 没有的话就得结合工具了，看一下能不能直接获取论文内容

通过arxiv，根据论文名搜索论文，返回：1.网页上的论文信息 2.论文原文

In [None]:
import arxiv
import requests
from PyPDF2 import PdfReader
from io import BytesIO

def search_paper_info(paper_title):
    """
    搜索arXiv论文并返回其元数据信息。
    
    参数:
        paper_title (str): 要搜索的论文标题
        
    返回:
        info (str): 论文标题、摘要等元数据（若未找到则返回错误信息）
    """
    try:
        # 使用arxiv.py库搜索论文
        search = arxiv.Search(
            query=paper_title,
            max_results=1,
            sort_by=arxiv.SortCriterion.Relevance
        )
        client = arxiv.Client()
        results = list(client.results(search))
        
        if not results:
            return "未找到匹配论文"
        
        paper = results[0]
        
        # 构建元数据字典
        info = {
            "title": paper.title,
            "abstract": paper.summary
        }
        return info

    except arxiv.ArxivError as e:
        return f"arXiv搜索失败: {str(e)}"

def search_paper_text(paper_title):
    """
    下载并解析arXiv论文的PDF文本内容。
    
    参数:
        paper_title (str): 要搜索的论文标题
        
    返回:
        text (str|None): 论文PDF文本内容（解析失败或未找到时返回None）
    """
    try:
        # 使用arxiv.py库搜索论文
        search = arxiv.Search(
            query=paper_title,
            max_results=1,
            sort_by=arxiv.SortCriterion.Relevance
        )
        client = arxiv.Client()
        results = list(client.results(search))
        
        if not results:
            return "未找到匹配论文", None
        
        paper = results[0]
        
        # 下载并解析PDF
        pdf_url = paper.pdf_url
        
        
        # 下载PDF文件
        response = requests.get(pdf_url, timeout=10)
        response.raise_for_status()
        
        # 解析PDF内容
        with BytesIO(response.content) as pdf_file:
            reader = PdfReader(pdf_file)
            text = "\n".join(page.extract_text() for page in reader.pages)
        
        return text.strip() if text else None

    except requests.exceptions.RequestException as e:
        return f"PDF下载失败: {str(e)}"
    except Exception as e:
        return f"PDF解析失败: {str(e)}"

In [None]:
# 测试
# paper_title="HotpotQA: A Dataset for Diverse, Explainable Multi-hop Question Answering"

In [None]:
# paper_info= search_paper_info(paper_title)

In [None]:
# print(paper_info)
# print(paper_info["abstract"])

In [None]:
# paper_text = search_paper_text(paper_title) 
# print(paper_text[:200])

## 2.解析论文LLM
和之前差不多，解析论文信息：题目、发布时间、摘要、引用文献；  
改用LangChain的结构化输出处理格式问题

In [None]:
from langchain_community.llms import Tongyi
from langchain.prompts import PromptTemplate
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser

class PaperInfo(BaseModel):
    """
    定义论文信息的结构化输出格式。
    """
    title: str = Field(description="Title of the paper")
    abstract: str = Field(description="Abstract of the paper")
    references: list[str] = Field(description="List of titles from the 'references' section")

class ParseLLM:
    def __init__(self, model_name, api_key):
        self.llm = Tongyi(model=model_name, api_key=api_key)
        
        # 使用 PydanticOutputParser 来解析结构化输出
        self.output_parser = PydanticOutputParser(pydantic_object=PaperInfo)
        
        # 解析引用，对应建树方法2
        self.parse_prompt_template = PromptTemplate(template="""
            <document>{document}</document>
            
            <task>
            Extract the following information from the document above and return it in JSON format:
            - title: Title of the paper
            - abstract: Abstract of the paper
            - references: List of titles from the "references" section (exact strings as they appear). Only extract titles, do not include other information.
            </task>
            
            {format_instructions}
        """, input_variables=["document"], partial_variables={"format_instructions": self.output_parser.get_format_instructions()})

    def parse_paper(self, paper):
        try:
            # 格式化提示模板
            prompt = self.parse_prompt_template.format(document=paper)
            
            # 调用模型并获取响应
            response = self.llm.invoke(prompt)
            
            # 使用 PydanticOutputParser 解析响应为结构化数据
            structured_output = self.output_parser.parse(response)
            return structured_output
        
        except Exception as e:
            # 捕获其他异常（如 LLM 拒绝回答、网络错误等）
            raise RuntimeError(f"Error occurred while processing the paper: {str(e)}")

In [None]:
# 测试
# 初始化解析器
# parser =ParseLLM(model_name="qwen-turbo",api_key=dashscope_api_key)
 
# result = parser.parse_paper(paper_text)

In [None]:
# print("Title:", result.title)
# print("Abstract:", result.abstract)
# print("References:", result.references)

## 3.进行建树
def parse_and_build_trace_tree(root_paper_title, max_layers, parse_llm, embedding, output_path, top_k):  
维护一个待处理节点队列nodes_to_process[paper_title:str]，进行以下建树操作：  
1. 获取根节点文献内容，使用search_paper_text获取paper_text
2. 根据当前处理节点的引用，对引用列表中的论文，使用search_paper_info获取paper_info
3. 使用嵌入模型，对引用论文的摘要向量化，选择摘要相关性top k且之前未出现在溯源树中的引用论文作为当前处理论文的子节点，并将选择的论文的paper_title加入待处理节点队列
4. 重复步骤1-3，直至溯源树达到要求的层次
使用anytree建树，建完树后使用save_tree_to_json(root,output_path)函数，将树结构保存到output_path

辅助函数

In [None]:
import os
import json
from anytree import Node, RenderTree
    
# 打印树结构
def print_tree(root_node):
    for pre, fill, node in RenderTree(root_node):
        print(f"{pre}{node.name}")
        
# 将树转换为dict
def tree_to_dict(node):
    """
    将 anytree 节点及其子节点递归转换为字典。
    
    参数:0
        node: anytree 节点。
        
    返回:
        表示树结构的字典。
    """
    return {
        "name": node.name,
        "abstract": node.abstract, 
        "children": [tree_to_dict(child) for child in node.children]
    }

# 将树保存为json
def save_tree_to_json(root, file_path):
    """
    将 anytree 树保存为 JSON 文件。
    
    参数:
        root: 树的根节点。
        file_path: 输出 JSON 文件的路径。
    """
    # 确保文件路径的目录存在
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    
    # 将树转换为字典
    tree_dict = tree_to_dict(root)
    
    # 写入 JSON 文件
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(tree_dict, f, ensure_ascii=False, indent=4)
    
    print(f"树已成功保存到: {file_path}")
        
# 将字典转换为树
def dict_to_tree(tree_dict, parent=None):
    """
    将字典形式的树结构递归转换为 anytree 节点。
    
    参数:
        tree_dict: 表示树结构的字典。
        parent: 当前节点的父节点（用于递归构建子节点）。
        
    返回:
        构建完成的 anytree 节点。
    """
    # 创建当前节点
    current_node = Node(tree_dict["name"], parent=parent, data=tree_dict.get("data"))
    
    # 递归构建子节点
    for child_dict in tree_dict.get("children", []):
        dict_to_tree(child_dict, parent=current_node)
    
    return current_node

# 从json加载树
def load_tree_from_json(file_path):
    """
    从 JSON 文件中加载树结构并返回根节点。
    
    参数:
        file_path: 输入 JSON 文件的路径。
        
    返回:
        树的根节点。
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        tree_dict = json.load(f)
    
    # 从字典重建树
    root = dict_to_tree(tree_dict)
    return root

建树，额外说明：  
- references_limit  
    - 设references_limit主要是为了速度，以及有的论文引用数量太大（遇到过一个九百多的），
    - 当references_limit=10时，生成一个节点需要1min左右，包括获取论文信息（5~10s/篇）、获取原文并解析论文引用（20s左右）、建向量库和选取topk（20s左右）的时间开销；
    - 大部分论文的引用论文数量在30左右，设为30应该已经足够；实测30篇论文获取信息大约需要2min，建节点大约需要3min
- 实际使用时需要需要平衡topk和max_layers，个人感觉max_layers超过三层基本没有必要，以三层为例，topk选取3~5应该比较适宜

后续用于research时，看看能不能并行加速

In [None]:
from anytree import Node, RenderTree
from collections import deque
from typing import List, Tuple
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from tqdm import tqdm

def build_trace_tree(
    root_paper_title: str,
    max_layers: int,
    parse_llm,
    embedding,
    output_path: str,
    top_k: int,
    references_limit:int
):
    """
    使用 anytree 构建溯源树，并将树结构保存为 JSON 文件。
    
    参数:
        root_paper_title (str): 根节点论文标题
        max_layers (int): 溯源树的最大层数
        parse_llm: 用于解析论文内容的 LLM 工具
        embedding: 嵌入模型，用于计算摘要向量的相关性
        output_path (str): 输出溯源树的文件路径
        top_k (int): 每层选择的最相关引用论文数量
        references_limit (int): 每篇论文中引用文献的最大数量
    
    返回:
        None: 将溯源树保存到指定路径
    """
    # 初始化溯源树和待处理队列
    nodes_to_process = deque()  # 待处理节点队列
    visited_titles = set()  # 已访问的论文标题集合，避免重复处理
    # 初始化内存向量数据库
    vector_store = InMemoryVectorStore(embedding)
    
    # 获取根节点论文内容并初始化溯源树
    try:
        root_paper_info = search_paper_info(root_paper_title)
        if "未找到匹配论文" in root_paper_info:
            raise ValueError("未找到根节点论文")
        
        root_paper_text = search_paper_text(root_paper_title)
        if not root_paper_text:
            raise ValueError("无法获取根节点论文内容")
        
        # 创建根节点，节点中保存论文名和论文摘要
        root_node = Node(root_paper_info["title"], abstract=root_paper_info["abstract"])
        nodes_to_process.append((root_node, 0))  # (当前节点, 当前层数)
        visited_titles.add(root_paper_info["title"])    # 节点标题添加到已访问集合
        
    except Exception as e:
        print(f"初始化根节点失败: {e}")
        return

    # 构建溯源树
    while nodes_to_process:
        current_node, current_layer = nodes_to_process.popleft()

        # 如果达到最大层数，停止处理
        if current_layer >= max_layers:
            break
        
        # 提示当前处理论文的信息
        print(f"\n当前节点论文: {current_node.name}")

        # 获取当前论文的引用列表
        # 先获取原文，然后用LLM解析出引用文献列表
        try:
            paper_text=search_paper_text(current_node.name)
            references = parse_llm.parse_paper(paper_text).references
            if not references:
                continue
        except Exception as e:
            print(f"解析引用失败: {e}")
            continue
        
        # 截取前references_limit个引用
        if len(references) > references_limit:
            references = references[:references_limit]

        # 对引用论文进行信息提取和筛选
        for ref_title in tqdm(references,desc="处理引用论文信息", unit="paper"):
            # 获取引用论文的元数据信息
            ref_info = search_paper_info(ref_title)
                        
            if "未找到匹配论文" in ref_info:
                continue  # 跳过无效引用
            
            if ref_info["title"] in visited_titles:
                continue  # 跳过已访问的论文

            # 将引用论文的标题和摘要添加到内存向量数据库,注意使用arxiv上获得的论文信息，避免LLM解析时出现的任何格式和大小写问题
            doc=Document(page_content=ref_info["abstract"], metadata={"title": ref_info["title"]})
            vector_store.add_documents([doc])

        # 选择相关性最高的 top_k 引用论文
        selected_docs = vector_store.similarity_search(current_node.abstract, k=top_k)

        # 对召回且未加入树中的论文创建节点，更新溯源树，添加子节点到队列，添加论文标题到已访问集合
        for doc in selected_docs:
            if doc.metadata["title"] not in visited_titles:
                child_node = Node(doc.metadata["title"], parent=current_node, abstract=doc.page_content)
                nodes_to_process.append((child_node, current_layer + 1))
                visited_titles.add(doc.metadata["title"])
                
        # 清空vectore，最简单的方法就是重新初始化一个
        vector_store = InMemoryVectorStore(embedding)
        

    # 打印溯源树结构（可选）
    print("\n溯源树构建完成，打印树结构:")
    # print(RenderTree(root_node))
    print_tree(root_node)

    # # 保存溯源树到 JSON 文件
    save_tree_to_json(root_node, output_path)
    
    return root_node

In [None]:
# embedding
from langchain_community.embeddings import DashScopeEmbeddings

parse_llm=ParseLLM(model_name="qwen-turbo",api_key=dashscope_api_key)

embeddings = DashScopeEmbeddings(
    model="text-embedding-v3", dashscope_api_key=dashscope_api_key
)

In [None]:
# 建树示例
root_paper_title="PoisonedRAG"
output_path="output/tree/"+root_paper_title+".json"

In [None]:
root=build_trace_tree(
    root_paper_title=root_paper_title,
    max_layers=3,
    parse_llm=parse_llm,
    embedding=embeddings,
    output_path=output_path,
    top_k=3,
    references_limit=30
)

In [None]:
# 建树后的写入、写出json文件测试
print_tree(root)

# tree_path="output/tree/trace_tree01.json"
# save_tree_to_json(root,tree_path)

In [None]:
# tree_path="output/tree/trace_tree01.json"
# root=load_tree_from_json(tree_path)
# print_tree(root)

## 4.绘制溯源树,转为图像

使用Graphviz，需要先去官网下载：https://www.graphviz.org/download/，  
添加Graphviz/bin的路径到环境变量path，并且重启程序

In [None]:
from anytree import Node, RenderTree
from anytree.exporter import DotExporter
from graphviz import Source
import os
import textwrap

def draw_tree_to_image(root, filename="tree.png", format="png", output_path=".", max_text_width=20):
    """
    使用 graphviz 将 anytree 格式的树绘制为图像文件，节点颜色按层次轮替。
    
    参数:
        root (Node): 树的根节点，类型为 anytree.Node。
        filename (str): 导出的图像文件名，默认为 "tree.png"。
        format (str): 图像格式，默认为 "png"，可选值包括 "png", "svg", "pdf" 等。
        output_path (str): 输出目录，默认为当前目录 "."。
        max_text_width (int): 每行最大字符数，默认为 20。
        
    输出:
        生成指定格式的树形图，并保存到指定目录中。
    """
    if not root:
        print("树为空")
        return

    # 确保输出目录存在
    os.makedirs(output_path, exist_ok=True)

    # 构造完整的文件路径
    full_path = os.path.join(output_path, filename)

    # 计算每个节点的颜色映射
    node_color_map = calculate_depth_colors(root)

    # 动态计算节点大小和间距
    def calculate_node_size_and_spacing():
        # 获取所有节点
        all_nodes = [node for _, _, node in RenderTree(root)]
        max_name_length = max(len(node.name) for node in all_nodes)
        max_children = max(len(node.children) for node in all_nodes) or 1
        node_width = max(0.03, 0.005 * max_name_length)  # 减小节点宽度
        ranksep = max(0.05, 0.01 * max_name_length)      # 层间距随名称长度调整
        nodesep = max(0.05, 0.02 * max_children)        # 同层节点间距随子节点数量调整
        return node_width, ranksep, nodesep

    node_width, ranksep, nodesep = calculate_node_size_and_spacing()

    # 自动换行函数
    def wrap_text(text, max_width):
        """
        将文字内容按指定宽度自动换行，并确保内容符合 DOT 格式要求。
        
        参数:
            text (str): 原始文字内容。
            max_width (int): 每行最大字符数。
            
        返回:
            str: 插入换行符后的文字内容，已转义为符合 DOT 格式的字符串。
        """
        wrapped_lines = textwrap.wrap(text, width=max_width)
        wrapped_text = "\n".join(wrapped_lines)
        return json.dumps(wrapped_text)  # 使用 json.dumps 自动转义特殊字符

    # 使用 DotExporter 导出 DOT 格式
    dot_exporter = DotExporter(
        root,
        nodeattrfunc=lambda node: (
            f'label={wrap_text(node.name, max_text_width)} '
            f'shape=box style=filled fillcolor="{node_color_map[node]}" '
            f'width={node_width} height={node_width} fontsize=14'
        ),
        edgeattrfunc=lambda parent, child: 'color="#E8F3E8" style="bold"',
        options=[
            "rankdir=TB",                # 设置从上到下布局
            f"ranksep={ranksep}",        # 设置层间距
            f"nodesep={nodesep}",        # 设置同层节点间距
            "splines=curved"             # 设置边为曲线
        ]
    )

    # 导出 DOT 格式数据
    dot_data = "".join(dot_exporter)
    # print("Generated DOT data:")  # 调试输出
    # print(dot_data)  # 打印生成的 DOT 数据以便检查

    # 使用 graphviz 渲染为图像文件
    graph = Source(dot_data)
    rendered_file = graph.render(filename=full_path, format=format, cleanup=True)
    print(f"树已导出为图像文件: {rendered_file}")

def calculate_depth_colors(root):
    """
    计算树中每个节点的深度，并返回一个颜色映射字典。
    
    参数:
        root (Node): 树的根节点。
        
    返回:
        dict: 节点到颜色的映射（如 {node: "#006400"}）。
    """
    # 定义三种逐渐变浅的绿色
    green_colors = [
        "#32CD32",  # 中绿
        "#98FB98",  # 浅绿
        "#F0FFF0",  # 淡绿
    ]

    # 使用广度优先搜索（BFS）计算深度并生成颜色映射
    node_color_map = {}
    queue = [(root, 0)]  # (当前节点, 当前深度)
    while queue:
        node, depth = queue.pop(0)
        color_index = depth % len(green_colors)  # 根据深度选择颜色
        node_color_map[node] = green_colors[color_index]
        for child in node.children:
            queue.append((child, depth + 1))

    return node_color_map

In [None]:
# 示例用法
output_directory = "output/charts/" # 指定输出路径
draw_tree_to_image(root, filename="tree03", output_path=output_directory,max_text_width=20)