In [2]:
import requests
import json
import os
from dotenv import load_dotenv
import time
import pandas as pd
import networkx as nx
from pyvis.network import Network
from tqdm.notebook import tqdm

# 加载环境变量
load_dotenv()

# 获取API密钥
api_key = os.getenv("DMX_API_KEY")
if not api_key:
    raise ValueError("请在.env文件中设置您的DMX_API_KEY")

print("✅ 环境设置完毕，API密钥已加载。")

✅ 环境设置完毕，API密钥已加载。


In [3]:
file_path = 'data/Design_of_Analog_CMOS_Integrated_Circuits_2e_by_Behzad_Razavi_拉扎维_英文版.json' # 确保文件名与您保存的文件一致

try:
    with open(file_path, 'r', encoding='utf-8') as f:
        book_data = json.load(f)
    print(f"📚 成功加载书籍: '{book_data['title']}'")
    print(f"共有 {book_data['total_sections']} 个章节。")
except FileNotFoundError:
    print(f"❌ 错误: 未找到文件 '{file_path}'。请确保文件路径正确。")
except json.JSONDecodeError:
    print(f"❌ 错误: 文件 '{file_path}' 不是有效的JSON格式。")

📚 成功加载书籍: '智能分割文档'
共有 232 个章节。


In [4]:
# API配置
API_URL = "https://www.dmxapi.cn/v1/chat/completions"
MODEL_NAME = "deepseek-v3.1"

# 使用您提供的、针对拉扎维教材的、高度优化的Prompt模板
prompt_template = """
You are an expert in analog CMOS circuit design with a deep, intuitive understanding of Behzad Razavi's textbook, "Design of Analog CMOS Integrated Circuits." You are also a specialist in building knowledge graphs. Your task is to meticulously read the provided text and extract semantic triplets to build a comprehensive knowledge graph of analog circuit design.

**1. GLOBAL SCHEMA:**
You must strictly adhere to the following entity and relation types.

**ENTITY TYPES:**
- `Component`: Fundamental physical elements (e.g., MOS Transistor, Resistor, Capacitor).
- `Circuit Topology`: A specific arrangement of components (e.g., Common-Source Stage, Differential Pair, Cascode Current Mirror).
- `Fundamental Concept`: Core theoretical principles (e.g., Negative Feedback, Frequency Response, Stability).
- `Physical Phenomenon`: Observable non-ideal effects in silicon (e.g., Channel-Length Modulation, Body Effect).
- `Performance Parameter`: Quantifiable metrics of a circuit (e.g., Voltage Gain (Av), GBW, Phase Margin, Slew Rate).
- `Design Technique`: A strategy to achieve a goal (e.g., Cascoding, Miller Compensation, Source Degeneration).
- `Analysis Method`: A procedure for analyzing a circuit (e.g., Small-Signal Analysis, Half-Circuit Analysis).
- `Mathematical Model`: An equation or abstract model (e.g., Small-Signal Model, Square-Law Equation).
- `Graphical Representation`: A visual plot for analysis (e.g., Bode Plot, Root Locus).
- `Design Trade-off`: Inherent conflicting relationships (e.g., Gain-Bandwidth Trade-off).

**RELATION TYPES:**
- `is_composed_of`: (Circuit Topology -> Component / Circuit Topology) - Hierarchical structure.
- `exhibits`: (Circuit Topology -> Performance Parameter) - A circuit has a characteristic.
- `affected_by`: (Performance Parameter -> Physical Phenomenon) - A non-ideality degrades a metric.
- `mitigated_by`: (Physical Phenomenon / Design Trade-off -> Design Technique) - A technique solves a problem.
- `influences`: (Performance Parameter -> Performance Parameter) - Captures trade-offs.
- `analyzed_by`: (Circuit Topology -> Analysis Method) - How a circuit is studied.
- `is_based_on`: (Method/Technique -> Fundamental Concept) - The theoretical foundation.
- `modeled_by`: (Component/Circuit -> Mathematical Model) - An abstract representation for analysis.
- `quantified_by`: (Performance Parameter -> Mathematical Model) - The formula for a metric.
- `visualized_as`: (Fundamental Concept -> Graphical Representation) - How a concept is plotted.

**2. EXTRACTION RULES:**
- **Output Format**: You MUST return the output as a JSON list of triplets. Each triplet must have the keys: `head`, `relation`, `tail`, `head_type`, `tail_type`, and `explanation`.
- **Explanation Field**: The `explanation` field is mandatory. Briefly state the reasoning for your extraction based on the text.
- **Specificity**: Always extract the most specific and accurate entity and relation types.
- **Normalization**: Standardize terminology. For example, "Common-Source Amplifier" and "CS Stage" should both be normalized to "Common-Source Stage".
- **Focus on Intuition**: Razavi's text is rich with cause-and-effect reasoning. Capture these relationships. The goal is not just to list parts, but to explain *why* a design works and what its trade-offs are.

**3. EXAMPLE:**

**Input Text:**
"The simple common-source stage suffers from low voltage gain due to channel-length modulation. This effect reduces the output impedance. To boost the gain, a technique called cascoding can be employed, which increases the output impedance by stacking another transistor on top of the main amplifying device."

**JSON Output:**
```json
[
  {
    "head": "Common-Source Stage",
    "relation": "exhibits",
    "tail": "Voltage Gain (Av)",
    "head_type": "Circuit Topology",
    "tail_type": "Performance Parameter",
    "explanation": "The text explicitly states the common-source stage has a voltage gain, which is a key performance parameter."
  },
  {
    "head": "Voltage Gain (Av)",
    "relation": "affected_by",
    "tail": "Channel-Length Modulation",
    "head_type": "Performance Parameter",
    "tail_type": "Physical Phenomenon",
    "explanation": "The text states that the gain suffers from (is affected by) channel-length modulation."
  },
  {
    "head": "Channel-Length Modulation",
    "relation": "influences",
    "tail": "Output Impedance",
    "head_type": "Physical Phenomenon",
    "tail_type": "Performance Parameter",
    "explanation": "The text explains that channel-length modulation achieves its effect by reducing the output impedance."
  },
  {
    "head": "Voltage Gain (Av)",
    "relation": "mitigated_by",
    "tail": "Cascoding",
    "head_type": "Performance Parameter",
    "tail_type": "Design Technique",
    "explanation": "The text presents cascoding as a technique specifically used to mitigate the problem of low gain."
  },
  {
    "head": "Cascoding",
    "relation": "is_composed_of",
    "tail": "MOS Transistor",
    "head_type": "Design Technique",
    "tail_type": "Component",
    "explanation": "Cascoding is implemented by stacking a transistor, which is a fundamental component."
  }
]
Now, based on all the above instructions, extract the knowledge graph triplets from the following text section:

{{TEXT_CONTENT}}
"""

print("✅ API参数和Prompt模板定义完毕。")

✅ API参数和Prompt模板定义完毕。


In [6]:
import concurrent.futures

# --- API配置和Headers (从块4移到这里，方便函数访问) ---
API_URL = "https://www.dmxapi.cn/v1/chat/completions"
MODEL_NAME = "deepseek-v3.1"
MAX_WORKERS = 32  # 设置并发数为32

headers = {
    "Accept": "application/json",
    "Authorization": f"Bearer {api_key}", 
    "User-Agent": "DMXAPI/1.0.0",
    "Content-Type": "application/json",
}


def process_section(section):
    """
    处理单个章节的函数：构建Prompt、调用API、并进行健壮的JSON解析。
    这是将在每个并发线程中执行的工作单元。
    """
    section_title = section['title']
    content = section['content']
    
    final_prompt = prompt_template.replace("{{TEXT_CONTENT}}", content)
    
    payload = {
        "model": MODEL_NAME,
        "messages": [{"role": "user", "content": final_prompt}],
    }
    
    try:
        response = requests.post(API_URL, headers=headers, data=json.dumps(payload), timeout=60) # 设置60秒超时
        response.raise_for_status()
        result = response.json()
        
        message_content = result['choices'][0]['message']['content']
        
        # --- 健壮的JSON解析逻辑 ---
        # 1. 寻找JSON列表的开始和结束位置，以忽略前后的额外文本
        start_index = message_content.find('[')
        end_index = message_content.rfind(']')
        
        if start_index != -1 and end_index != -1:
            json_str = message_content[start_index : end_index + 1]
            try:
                # 2. 只解析提取出的JSON字符串
                triplets = json.loads(json_str)
                if isinstance(triplets, list):
                    return triplets
            except json.JSONDecodeError as json_err:
                # 3. 如果解析失败，打印详细错误和有问题的文本
                print(f"  ❌ JSON解析错误: 章节 '{section_title}'. 错误: {json_err}")
                print(f"      - 模型返回的原始文本 (片段): {message_content[:500]}...")
                return [] # 返回空列表以继续
        else:
            print(f"  ⚠️ 警告: 在章节 '{section_title}' 的返回中未找到JSON列表。")
            return []

    except requests.exceptions.RequestException as req_err:
        print(f"  ❌ 网络请求错误: 章节 '{section_title}'. 错误: {req_err}")
    except (KeyError, IndexError) as e:
        print(f"  ❌ API返回格式错误: 章节 '{section_title}'. 错误: {e}")
        print(f"      - API原始返回: {response.text}")
    except Exception as e:
        print(f"  ❌ 未知错误: 章节 '{section_title}'. 错误: {e}")
        
    return [] # 如果任何环节出错，返回一个空列表


# --- 主执行逻辑 ---
all_triplets = []
sections_to_process = book_data['sections']

# 使用ThreadPoolExecutor进行并发处理
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # executor.map会并发地对sections_to_process中的每个元素调用process_section函数
    # tqdm提供了进度条
    results_iterator = executor.map(process_section, sections_to_process)
    
    # 使用tqdm包装迭代器以显示进度
    for triplets in tqdm(results_iterator, total=len(sections_to_process), desc="并发提取知识中"):
        if triplets:
            all_triplets.extend(triplets)

print(f"\n✅ 知识提取完成！总共提取了 {len(all_triplets)} 个三元组。")

并发提取知识中:   0%|          | 0/232 [00:00<?, ?it/s]

  ❌ 网络请求错误: 章节 'Basic PLL Topology'. 错误: HTTPSConnectionPool(host='www.dmxapi.cn', port=443): Max retries exceeded with url: /v1/chat/completions (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1017)')))
  ❌ 网络请求错误: 章节 'Bandgap Reference'. 错误: HTTPSConnectionPool(host='www.dmxapi.cn', port=443): Read timed out. (read timeout=60)

✅ 知识提取完成！总共提取了 3377 个三元组。


In [7]:
if not all_triplets:
    print("❌ 未能提取到任何三元组，无法构建知识图谱。")
else:
    df = pd.DataFrame(all_triplets)
    print("原始提取三元组数量:", len(df))

    df.dropna(subset=['head', 'tail', 'relation'], inplace=True)
    df['head'] = df['head'].str.strip().str.title()
    df['tail'] = df['tail'].str.strip().str.title()
    df.drop_duplicates(inplace=True)
    print("清洗和去重后三元组数量:", len(df))

    G = nx.Graph()
    for _, row in df.iterrows():
        head_type = row.get('head_type', 'Unknown')
        tail_type = row.get('tail_type', 'Unknown')
        G.add_node(row['head'], type=head_type, title=f"{row['head']}\nType: {head_type}")
        G.add_node(row['tail'], type=tail_type, title=f"{row['tail']}\nType: {tail_type}")
        G.add_edge(row['head'], row['tail'], label=row['relation'], title=row['relation'])

    print("\n✅ 知识图谱构建完成！")
    print(f"  - 节点 (实体) 数量: {G.number_of_nodes()}")
    print(f"  - 边 (关系) 数量: {G.number_of_edges()}")

原始提取三元组数量: 3377
清洗和去重后三元组数量: 3375

✅ 知识图谱构建完成！
  - 节点 (实体) 数量: 2461
  - 边 (关系) 数量: 3134


In [8]:
if 'G' in locals() and G.number_of_nodes() > 0:
    color_map = {
        "Component": "#DB4437",           # Red
        "Circuit Topology": "#4285F4",    # Blue
        "Fundamental Concept": "#0F9D58", # Green
        "Physical Phenomenon": "#9C27B0", # Purple
        "Performance Parameter": "#F4B400",# Yellow
        "Design Technique": "#FF6D00",    # Orange
        "Analysis Method": "#00BCD4",     # Cyan
        "Mathematical Model": "#E91E63",  # Pink
        "Graphical Representation": "#795548", # Brown
        "Design Trade-off": "#FFEB3B",    # Lime
        "Unknown": "#9E9E9E"              # Grey
    }

    net = Network(notebook=True, height="800px", width="100%", cdn_resources='in_line', bgcolor="#222222", font_color="white")
    
    # 从NetworkX图导入数据
    net.from_nx(G)

    # 自定义节点样式
    for node in net.nodes:
        node_type = G.nodes[node['id']].get('type', 'Unknown')
        node['color'] = color_map.get(node_type, color_map["Unknown"])
        degree = G.degree(node['id'])
        node['size'] = 10 + degree * 2

    net.show_buttons(filter_=['physics'])
    
    output_filename = "razavi_knowledge_graph.html"
    net.show(output_filename)
    
    print(f"\n🚀 交互式知识图谱已生成！")
    print(f"请在浏览器中打开文件: {output_filename}")
else:
    print("图谱为空，跳过可视化步骤。")

razavi_knowledge_graph.html

🚀 交互式知识图谱已生成！
请在浏览器中打开文件: razavi_knowledge_graph.html


In [10]:
import json

# 检查DataFrame对象'df'是否存在且不为空
if 'df' in locals() and not df.empty:
    
    # 1. 定义输出文件名
    output_json_filename = "razavi_triplets_list.json"
    
    # 2. 使用Pandas的to_json方法直接导出
    # - orient='records' 会生成一个 "记录" 列表，每个记录是一个字典，这正是您要的格式。
    # - indent=4 使JSON文件格式化，易于阅读。
    # - force_ascii=False 确保中文字符或特殊符号正确显示。
    try:
        df.to_json(output_json_filename, orient='records', indent=4, force_ascii=False)
        
        print(f"✅ 知识图谱已成功导出为三元组列表JSON文件: {output_json_filename}")
        print("文件格式与Prompt中定义的输出结构完全一致。")
        
    except Exception as e:
        print(f"❌ 导出JSON文件时发生错误: {e}")

else:
    print("⚠️ DataFrame为空，没有内容可以导出。")

✅ 知识图谱已成功导出为三元组列表JSON文件: razavi_triplets_list.json
文件格式与Prompt中定义的输出结构完全一致。


In [11]:
import json

# --- 配置 ---
input_filename = "razavi_triplets_list.json"      # 输入文件名（您之前导出的文件）
output_filename = "razavi_triplets_list_cleaned.json" # 清理后保存的新文件名

# --- 执行清理 ---
try:
    # 1. 读取原始JSON文件
    with open(input_filename, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    print(f"成功读取文件 '{input_filename}'，包含 {len(data)} 条记录。")

    # 2. 遍历每一条记录并移除不需要的键
    cleaned_count = 0
    key_to_remove = "voltage_allocation"

    for triplet in data:
        if key_to_remove in triplet:
            del triplet[key_to_remove]
            cleaned_count += 1
            
    print(f"清理完成。在 {cleaned_count} 条记录中找到了并移除了 '{key_to_remove}' 字段。")

    # 3. 将清理后的数据写入新文件
    with open(output_filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
        
    print(f"✅ 清理后的数据已成功保存到: {output_filename}")
    print("您现在可以使用这个干净的文件进行后续操作。")

except FileNotFoundError:
    print(f"❌ 错误: 找不到输入文件 '{input_filename}'。请确认文件名是否正确，或者您是否已成功导出该文件。")
except Exception as e:
    print(f"❌ 处理文件时发生未知错误: {e}")

成功读取文件 'razavi_triplets_list.json'，包含 3375 条记录。
清理完成。在 3375 条记录中找到了并移除了 'voltage_allocation' 字段。
✅ 清理后的数据已成功保存到: razavi_triplets_list_cleaned.json
您现在可以使用这个干净的文件进行后续操作。


In [12]:
import networkx as nx
import random
import json

class PathFinder:
    """
    一个封装了从知识图谱中查找多种类型路径的逻辑的类。
    """
    def __init__(self, G: nx.Graph):
        if not isinstance(G, nx.Graph) or G.number_of_nodes() == 0:
            raise ValueError("必须提供一个有效的、非空的NetworkX图对象。")
        self.G = G
        # 预先筛选“关键节点”，用于生成复杂路径
        self.key_nodes = [n for n, d in self.G.nodes(data=True) 
                          if d.get('type') in ['Circuit Topology', 'Design Technique']]

    def _format_path(self, path_nodes):
        # 辅助函数，用于将路径格式化为标准字典列表
        formatted_path = []
        for i in range(len(path_nodes) - 1):
            u, v = path_nodes[i], path_nodes[i+1]
            edge_data = self.G.get_edge_data(u, v)
            if i == 0:
                formatted_path.append({"node": u, "type": self.G.nodes[u].get('type', 'Unknown')})
            formatted_path.append({"relation": edge_data.get('label', 'Unknown')})
            formatted_path.append({"node": v, "type": self.G.nodes[v].get('type', 'Unknown')})
        return formatted_path

    def find_causal_path(self):
        """查找因果解释型路径 ('为什么...')"""
        all_paths = []
        # [电路] -> [参数] -> [现象]
        circuits = [n for n, d in self.G.nodes(data=True) if d.get('type') == 'Circuit Topology']
        for circuit in circuits:
            for param in self.G.neighbors(circuit):
                if (self.G.nodes[param].get('type') == 'Performance Parameter' and self.G.get_edge_data(circuit, param).get('label') == 'exhibits'):
                    for phenom in self.G.neighbors(param):
                        if (self.G.nodes[phenom].get('type') == 'Physical Phenomenon' and self.G.get_edge_data(param, phenom).get('label') == 'affected_by'):
                            all_paths.append([circuit, param, phenom])
        if not all_paths: return None
        return {"type": "causal", "path": self._format_path(random.choice(all_paths))}

    def find_problem_solving_path(self):
        """查找问题解决型路径 ('如何...')"""
        # [现象] -> [技术]
        all_paths = []
        for u, v, data in self.G.edges(data=True):
            if (data.get('label') == 'mitigated_by' and 
                self.G.nodes[u].get('type') == 'Physical Phenomenon' and 
                self.G.nodes[v].get('type') == 'Design Technique'):
                all_paths.append([u,v])
        if not all_paths: return None
        return {"type": "problem-solving", "path": self._format_path(random.choice(all_paths))}

    def find_comparative_pair(self):
        """查找对比分析型组 ('比较A和B...')"""
        comparisons = {}
        params = [n for n, d in self.G.nodes(data=True) if d.get('type') == 'Performance Parameter']
        for param in params:
            connected_circuits = [n for n in self.G.neighbors(param) if self.G.nodes[n].get('type') == 'Circuit Topology' and self.G.get_edge_data(n, param).get('label') == 'exhibits']
            if len(connected_circuits) >= 2:
                comparisons[param] = connected_circuits
        if not comparisons: return None
        param_to_compare = random.choice(list(comparisons.keys()))
        nodes_to_compare = random.sample(comparisons[param_to_compare], 2)
        return {"type": "comparative", "parameter": param_to_compare, "nodes": nodes_to_compare}
        
    def find_complex_path(self, min_length=4, max_length=6, retries=20):
        """查找连接两个关键节点的复杂路径"""
        if len(self.key_nodes) < 2: return None
        for _ in range(retries):
            start_node, end_node = random.sample(self.key_nodes, 2)
            try:
                paths = [p for p in nx.all_simple_paths(self.G, start_node, end_node, cutoff=max_length) if len(p) >= min_length]
                if paths:
                    return {"type": "complex-synthesis", "path": self._format_path(random.choice(paths))}
            except nx.NetworkXNoPath:
                continue
        return None


In [13]:
from tqdm import tqdm # 用于显示进度条

def generate_kg_paths_batch(G, num_paths: int):
    """
    从知识图谱G中批量生成指定数量的、多样化的路径。
    
    :param G: NetworkX图对象。
    :param num_paths: 需要生成的路径总数。
    :return: 一个包含路径信息字典的列表。
    """
    print(f"开始批量生成 {num_paths} 条知识图谱路径...")
    finder = PathFinder(G)
    
    # 定义可供选择的路径类型和对应的查找函数
    # 我们给复杂路径更高的权重，以生成更多有难度的问题
    path_finders = {
        "causal": finder.find_causal_path,
        "problem-solving": finder.find_problem_solving_path,
        "comparative": finder.find_comparative_pair,
        "complex-synthesis": finder.find_complex_path,
    }
    path_weights = [0.15, 0.15, 0.15, 0.55] # 权重加起来为1
    path_types = list(path_finders.keys())

    generated_paths = []
    # 使用tqdm显示进度
    for _ in tqdm(range(num_paths), desc="生成路径中"):
        # 根据权重随机选择一种路径类型
        chosen_type = random.choices(path_types, weights=path_weights, k=1)[0]
        
        # 调用对应的查找函数
        path = path_finders[chosen_type]()
        
        if path:
            generated_paths.append(path)
            
    print(f"路径生成完成！成功生成 {len(generated_paths)} / {num_paths} 条有效路径。")
    return generated_paths

# --- 使用示例 ---
# 假设G已经被加载
all_paths = generate_kg_paths_batch(G, 1000)

开始批量生成 1000 条知识图谱路径...


生成路径中: 100%|██████████| 1000/1000 [20:13<00:00,  1.21s/it] 

路径生成完成！成功生成 1000 / 1000 条有效路径。

生成路径样本预览 (前5条):
{
  "type": "comparative",
  "parameter": "Voltage Swing",
  "nodes": [
    "Common-Source Stage",
    "Amplifier Stage"
  ]
}
{
  "type": "complex-synthesis",
  "path": [
    {
      "node": "Current Source Biasing",
      "type": "Design Technique"
    },
    {
      "relation": "mitigated_by"
    },
    {
      "node": "Input Common-Mode Level Variation",
      "type": "Physical Phenomenon"
    },
    {
      "relation": "affected_by"
    },
    {
      "node": "Transconductance Variation",
      "type": "Physical Phenomenon"
    },
    {
      "relation": "affected_by"
    },
    {
      "node": "Voltage Gain (Av)",
      "type": "Performance Parameter"
    },
    {
      "relation": "mitigated_by"
    },
    {
      "node": "Negative Feedback",
      "type": "Fundamental Concept"
    },
    {
      "relation": "is_based_on"
    },
    {
      "node": "Frequency Compensation",
      "type": "Design Technique"
    },
    {
      "relat




In [17]:
print(f"\n生成路径样本预览 (前5条):")
for p in all_paths[2:3]:
  print(json.dumps(p, indent=2, ensure_ascii=False))


生成路径样本预览 (前5条):
{
  "type": "complex-synthesis",
  "path": [
    {
      "node": "Shared Junction Layout",
      "type": "Design Technique"
    },
    {
      "relation": "mitigated_by"
    },
    {
      "node": "Drain Capacitance",
      "type": "Performance Parameter"
    },
    {
      "relation": "exhibits"
    },
    {
      "node": "Cascode Circuit",
      "type": "Circuit Topology"
    },
    {
      "relation": "is_composed_of"
    },
    {
      "node": "Multifinger Transistor",
      "type": "Circuit Topology"
    },
    {
      "relation": "is_composed_of"
    },
    {
      "node": "Mos Transistor",
      "type": "Component"
    },
    {
      "relation": "is_composed_of"
    },
    {
      "node": "Source Follower",
      "type": "Circuit Topology"
    },
    {
      "relation": "is_composed_of"
    },
    {
      "node": "Source Follower Compensation",
      "type": "Design Technique"
    }
  ]
}


In [18]:
import json

# 假设 all_paths 变量已经由您之前的代码块（generate_kg_paths_batch）成功生成并填充
# all_paths = generate_kg_paths_batch(G, 1000) # 这行代码在您的上一个代码块中

# 定义要保存的文件名
output_filename = "generated_kg_paths.json"

# 检查是否成功生成了路径
if 'all_paths' in locals() and all_paths:
    print(f"\n准备将 {len(all_paths)} 条路径导出到JSON文件...")
    try:
        # 使用 'w' 模式打开文件进行写入
        with open(output_filename, 'w', encoding='utf-8') as f:
            # json.dump 是将Python对象写入JSON文件的标准方法
            # indent=2 参数会让JSON文件格式化，带有缩进，更易于阅读
            # ensure_ascii=False 参数确保中文字符能被正确写入，而不是被转义成ASCII码
            json.dump(all_paths, f, indent=2, ensure_ascii=False)
        
        print(f"✅ 成功！所有路径已保存到文件: '{output_filename}'")
        print("您现在可以进行下一步，搭建Agent团队来处理这个文件中的路径。")

    except Exception as e:
        print(f"❌ 导出到JSON文件时发生错误: {e}")
else:
    print("⚠️ 'all_paths' 变量不存在或为空，没有可导出的路径。请先确保上一步代码已成功运行。")



准备将 1000 条路径导出到JSON文件...
✅ 成功！所有路径已保存到文件: 'generated_kg_paths.json'
您现在可以进行下一步，搭建Agent团队来处理这个文件中的路径。


In [59]:
import os
import json
import random
import networkx as nx
import concurrent.futures
import re
import tiktoken
from networkx.readwrite import json_graph
from dotenv import load_dotenv
from tqdm import tqdm
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from crewai.tools import tool
from typing import Any, List, Optional, Type
from pydantic import BaseModel, Field

# --- 先检查路径文件结构 ---
def inspect_path_structure(paths_file: str, num_samples: int = 3):
    """检查路径文件的数据结构"""
    try:
        with open(paths_file, 'r', encoding='utf-8') as f:
            all_paths = json.load(f)
      
        print(f"=== 路径文件结构检查 ===")
        print(f"总路径数: {len(all_paths)}")
        print(f"前 {num_samples} 个路径的结构:")
      
        for i, path in enumerate(all_paths[:num_samples]):
            print(f"\n路径 {i+1}:")
            print(f"  键: {list(path.keys())}")
            if 'path' in path:
                print(f"  路径长度: {len(path['path'])}")
                print(f"  路径前2个节点: {path['path'][:2]}")
                if path['path']:
                    print(f"  第一个节点类型: {type(path['path'][0])}")
                    if isinstance(path['path'][0], dict):
                        print(f"  第一个节点键: {list(path['path'][0].keys())}")
            print(f"  完整示例: {json.dumps(path, indent=2, ensure_ascii=False)[:300]}...")
      
        return all_paths[0] if all_paths else None
      
    except Exception as e:
        print(f"检查路径文件时出错: {e}")
        return None

# 先检查文件结构
sample_path = inspect_path_structure("generated_kg_paths.json")


=== 路径文件结构检查 ===
总路径数: 1000
前 3 个路径的结构:

路径 1:
  键: ['type', 'parameter', 'nodes']
  完整示例: {
  "type": "comparative",
  "parameter": "Voltage Swing",
  "nodes": [
    "Common-Source Stage",
    "Amplifier Stage"
  ]
}...

路径 2:
  键: ['type', 'path']
  路径长度: 13
  路径前2个节点: [{'node': 'Current Source Biasing', 'type': 'Design Technique'}, {'relation': 'mitigated_by'}]
  第一个节点类型: <class 'dict'>
  第一个节点键: ['node', 'type']
  完整示例: {
  "type": "complex-synthesis",
  "path": [
    {
      "node": "Current Source Biasing",
      "type": "Design Technique"
    },
    {
      "relation": "mitigated_by"
    },
    {
      "node": "Input Common-Mode Level Variation",
      "type": "Physical Phenomenon"
    },
    {
      "relation":...

路径 3:
  键: ['type', 'path']
  路径长度: 13
  路径前2个节点: [{'node': 'Shared Junction Layout', 'type': 'Design Technique'}, {'relation': 'mitigated_by'}]
  第一个节点类型: <class 'dict'>
  第一个节点键: ['node', 'type']
  完整示例: {
  "type": "complex-synthesis",
  "path": [
    {
      "nod

In [56]:
def process_single_path(path_object: dict) -> dict | None:
    """
    修复版本：处理不同格式的路径数据结构和Markdown JSON输出
    """
    try:
        # === 提取路径描述的函数保持不变 ===
        def extract_path_description(path_obj):
            """从路径对象中提取可读的路径描述"""
            if 'path' not in path_obj:
                return "未知路径"
          
            path_nodes = path_obj['path']
            node_names = []
          
            for node in path_nodes:
                if isinstance(node, str):
                    node_names.append(node)
                elif isinstance(node, dict):
                    if 'name' in node:
                        node_names.append(node['name'])
                    elif 'label' in node:
                        node_names.append(node['label'])
                    elif 'id' in node:
                        node_names.append(str(node['id']))
                    else:
                        for key, value in node.items():
                            if isinstance(value, str):
                                node_names.append(f"{key}:{value}")
                                break
                        else:
                            node_names.append(str(node))
                else:
                    node_names.append(str(node))
          
            return " -> ".join(node_names)
      
        # 提取路径描述
        path_description = extract_path_description(path_object)
        path_type = path_object.get("type", "unknown")
      
        print(f"处理路径: {path_description[:100]}... (类型: {path_type})")
      
        # 任务定义保持不变
        task_research = Task(
            description=f"""
            Your task is to conduct in-depth research on a specific reasoning path derived from an analog circuit design knowledge graph.
          
            Reasoning Path to Research:
            Path Description: {path_description}
            Path Type: {path_type}
          
            For each concept in this path, use your search tool to find detailed definitions, principles, and quantitative details from the source textbook.
            Focus on understanding how these concepts relate to each other in the context of analog circuit design.
          
            Synthesize all this information into a comprehensive research report that explains:
            1. Each concept individually
            2. How the concepts connect and influence each other
            3. Practical applications and design considerations
            4. Any relevant equations, parameters, or design guidelines
            """,
            expected_output="A detailed and comprehensive research report that combines the structured path with the rich, contextual information retrieved for each concept.",
            agent=research_analyst
        )

        task_create_cot = Task(
            description=f"""
            Based on the provided research report about the path: {path_description}
          
            Create a complete, high-quality Chain-of-Thought (CoT) fine-tuning sample.
          
            Your output must be a valid JSON object with exactly these two keys:
            - "question": A challenging question that tests understanding of the concept relationships in this path
            - "cot_answer": A string containing <answer>final answer</answer>
            Requirements:
            1. The question should be specific and challenging
            2. The section should show expert-level reasoning
            3. The section should be concise and direct
            4. Do not mention external sources like "knowledge graph" or "textbook"
            5. Make the reasoning appear as pure expert thought

            Return ONLY a valid JSON object, no additional formatting or markdown.
            """,
            expected_output="A JSON object with 'question' and 'cot_answer' keys",
            agent=cot_synthesizer,
            context=[task_research]
        )
        
        # 组建并运行Crew
        cot_crew = Crew(
            agents=[research_analyst, cot_synthesizer],
            tasks=[task_research, task_create_cot],
            process=Process.sequential,
            verbose=0
        )

        result = cot_crew.kickoff()

        # === 修复：改进结果解析，处理Markdown代码块 ===
        def parse_crew_result(raw_result):
            """解析Crew的输出结果，处理各种格式"""
            if isinstance(raw_result, dict):
                return raw_result

            if isinstance(raw_result, str):
                # 移除Markdown代码块标记
                cleaned_result = raw_result.strip()
                
                # 处理json ... 格式
                if cleaned_result.startswith('```json'):
                    # 找到json和之间的内容
                    start_marker = 'json'
                    end_marker = '```'
                    
                    start_idx = cleaned_result.find(start_marker)
                    if start_idx != -1:
                        start_idx += len(start_marker)
                        end_idx = cleaned_result.find(end_marker, start_idx)
                        if end_idx != -1:
                            json_content = cleaned_result[start_idx:end_idx].strip()
                            try:
                                parsed = json.loads(json_content)
                                if 'question' in parsed and 'cot_answer' in parsed:
                                    return parsed
                            except json.JSONDecodeError as e:
                                print(f"JSON解析错误: {e}")
                                print(f"尝试解析的内容: {json_content[:200]}...")
                
                # 处理```格式（没有json标记）
                elif cleaned_result.startswith('```'):
                    lines = cleaned_result.split('\n')
                    if len(lines) > 2:
                        # 移除第一行的和最后一行的
                        json_content = '\n'.join(lines[1:-1]).strip()
                        try:
                            parsed = json.loads(json_content)
                            if 'question' in parsed and 'cot_answer' in parsed:
                                return parsed
                        except json.JSONDecodeError as e:
                            print(f"JSON解析错误: {e}")
                
                # 尝试直接解析JSON
                try:
                    parsed = json.loads(cleaned_result)
                    if 'question' in parsed and 'cot_answer' in parsed:
                        return parsed
                except json.JSONDecodeError:
                    pass
                
                # 使用正则表达式提取JSON对象
                json_patterns = [
                    r'{[^{}]*"question"[^{}]*"cot_answer"[^{}]*}',
                    r'{.*?"question".*?"cot_answer".*?}',
                    r'{.*}'
                ]
                
                for pattern in json_patterns:
                    matches = re.findall(pattern, cleaned_result, re.DOTALL)
                    for match in matches:
                        try:
                            parsed = json.loads(match)
                            if 'question' in parsed and 'cot_answer' in parsed:
                                return parsed
                        except json.JSONDecodeError:
                            continue
                
                return None

        parsed_result = parse_crew_result(result)

        if not parsed_result:
            print(f"❌ 无法解析有效结果")
            print(f"原始输出类型: {type(result)}")
            print(f"原始输出内容: {str(result)[:500]}...")
            
            # 尝试保存原始输出用于调试
            debug_filename = f"debug_output_{path_type}_{len(final_dataset) if 'final_dataset' in globals() else 'unknown'}.txt"
            try:
                with open(debug_filename, 'w', encoding='utf-8') as f:
                    f.write(f"路径: {path_description}\n")
                    f.write(f"类型: {path_type}\n")
                    f.write(f"输出类型: {type(result)}\n")
                    f.write(f"原始输出:\n{str(result)}")
                print(f"🐛 调试信息已保存到: {debug_filename}")
            except:
                pass
            
            return None

        # 验证必要字段
        if 'question' not in parsed_result or 'cot_answer' not in parsed_result:
            print(f"❌ 解析结果缺少必要字段")
            print(f"解析结果键: {list(parsed_result.keys())}")
            return None

        # 计算Token并构建最终输出
        question = parsed_result['question']
        cot_answer_str = parsed_result['cot_answer']

        # 解析think和answer部分
        think_match = re.search(r'(.*?)<answer>', cot_answer_str, re.DOTALL)
        answer_match = re.search(r'<answer>(.*?)</answer>', cot_answer_str, re.DOTALL)
      
        think_text = think_match.group(1).strip() if think_match else ""
        answer_text = answer_match.group(1).strip() if answer_match else ""
      
        # 计算Token
        try:
            question_tokens = len(tokenizer.encode(question))
            think_tokens = len(tokenizer.encode(think_text))
            answer_tokens = len(tokenizer.encode(answer_text))
            total_tokens = question_tokens + think_tokens + answer_tokens
        except:
            # 如果tokenizer失败，使用简单计算
            question_tokens = len(question.split())
            think_tokens = len(think_text.split())
            answer_tokens = len(answer_text.split())
            total_tokens = question_tokens + think_tokens + answer_tokens

        final_output = {
            "question": question,
            "cot_answer": cot_answer_str,
            "source_kg_path": path_object,
            "question_type": path_type,
            "path_description": path_description,
            "token_summary": {
                "question_tokens": question_tokens,
                "think_tokens": think_tokens,
                "answer_tokens": answer_tokens,
                "total_tokens": total_tokens
            }
        }
      
        print(f"✅ 成功生成样本，总Token数: {total_tokens}")
        return final_output

    except Exception as e:
        print(f"❌ 处理路径时发生严重错误: {e}")
        import traceback
        print(f"详细错误信息: {traceback.format_exc()}")
        return None

# 同时修改Agent的instruction，让它不要使用markdown格式
cot_synthesizer = Agent(
    role='Chain-of-Thought Question and Answer Synthesizer',
    goal='Create high-quality question-answer pairs with detailed chain-of-thought reasoning based on research reports. Generate pure, self-contained reasoning that demonstrates expert-level understanding.',
    backstory=(
        "You are an expert analog circuit designer and educator who excels at creating challenging questions "
        "and detailed step-by-step solutions. You can take technical research and transform it into "
        "educational content that tests deep understanding. Your reasoning appears as pure expert thought, "
        "never referencing external sources explicitly. You always return clean JSON without any markdown formatting."
    ),
    llm=llm,
    verbose=True
)

print("✅ 修复了JSON解析问题，现在应该能正确处理Markdown格式的输出了")

✅ 修复了JSON解析问题，现在应该能正确处理Markdown格式的输出了


In [57]:
# 生成完整数据集
print("🚀 开始大规模生成...")
NUMBER_OF_QUESTIONS = 2  # 或者你想要的数量

run_concurrent_generation(
    paths_file="generated_kg_paths.json",
    num_questions_to_generate=NUMBER_OF_QUESTIONS,
    max_workers=8  # 可以适当增加并发数
)

🚀 开始大规模生成...
✅ 成功从 'generated_kg_paths.json' 加载 1000 条路径。
ℹ️ 将处理前 2 条路径。

--- 开始使用 8 个并发线程生成 2 个CoT样本 ---
处理路径: 未知路径... (类型: comparative)
处理路径: node:Current Source Biasing -> relation:mitigated_by -> node:Input Common-Mode Level Variation -> re... (类型: complex-synthesis)


生成CoT数据:   0%|          | 0/2 [00:00<?, ?it/s]

生成CoT数据:  50%|█████     | 1/2 [00:28<00:28, 28.87s/it]

❌ 无法解析有效结果
原始输出类型: <class 'crewai.crews.crew_output.CrewOutput'>
原始输出内容: {"question":"How does the implementation of negative feedback in an amplifier mitigate the effects of transconductance variation on voltage gain, and how does dominant pole shifting contribute to ensuring stability in this context?","cot_answer":"<answer>Negative feedback is implemented in amplifiers to stabilize the voltage gain by reducing the sensitivity of the gain to variations in transconductance (gm). Without feedback, changes in gm due to input common-mode level variation or operating po...
🐛 调试信息已保存到: debug_output_complex-synthesis_unknown.txt


生成CoT数据: 100%|██████████| 2/2 [00:30<00:00, 15.32s/it]

❌ 无法解析有效结果
原始输出类型: <class 'crewai.crews.crew_output.CrewOutput'>
原始输出内容: When an op-amp in a closed-loop configuration enters saturation, it behaves similarly to a comparator in its nonlinear switching mode, but this represents a failure of the op-amp's intended linear operation. Comparators are designed for such nonlinear behavior. To prevent saturation in op-amps, input signal clipping, careful feedback network design, appropriate power supply selection, and input limiting components can be used to maintain linear operation and avoid this unintended deviation."
}...
🐛 调试信息已保存到: debug_output_comparative_unknown.txt

--- 处理完成 ---
✅ 成功: 0 个样本
❌ 失败: 2 个样本
⚠️ 没有生成任何有效样本。





In [1]:
# create_index.py

import os
from dotenv import load_dotenv

# LlamaIndex 核心组件
from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    VectorStoreIndex,
    StorageContext,
)
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI

print("--- 开始创建并持久化RAG索引 ---")

# --- 1. 加载环境变量 ---
print("1. 正在加载环境变量...")
load_dotenv()
# 注意: 请确保您的.env文件中有API Key和Base URL
# 或者在这里直接指定
api_key = "sk-iuUHs8rYzVCCdNjTX1vV86a3lJe4erAGZpVbIE5CoidZd75J" # 使用您代码段中提供的Key
api_base = "https://www.dmxapi.cn/v1"

# --- 2. 定义存储路径和数据源路径 ---
PERSIST_DIR = "./storage"  # 定义索引将被保存到的文件夹名称
DATA_DIR = "./data/origin_book" # 您的原始文档所在的文件夹

# --- 3. 配置LlamaIndex全局设置 ---
# 即使只是创建索引，也需要配置模型，因为创建嵌入（Embedding）需要调用嵌入模型
print("2. 正在配置LlamaIndex设置...")
Settings.llm = OpenAI(model="gpt-4o", api_key=api_key, api_base=api_base)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key, api_base=api_base)
Settings.chunk_size = 512

# --- 4. 检查索引是否已存在 ---
if os.path.exists(PERSIST_DIR):
    print(f"警告: 索引目录 '{PERSIST_DIR}' 已存在。")
    # 您可以选择在这里停止，或者删除旧索引重新创建
    # import shutil
    # print("正在删除旧索引...")
    # shutil.rmtree(PERSIST_DIR)
    # print("旧索引已删除。")
    print("如果您想重新创建索引，请先手动删除 ./storage 文件夹。")
    print("--- 脚本执行完毕 ---")
    exit() # 退出脚本

# --- 5. 加载文档并创建索引 ---
try:
    print(f"3. 正在从 '{DATA_DIR}' 加载文档...")
    # 检查数据目录是否存在
    if not os.path.exists(DATA_DIR) or not os.listdir(DATA_DIR):
        print(f"错误: 数据目录 '{DATA_DIR}' 不存在或为空。请先添加您的 .md 文件。")
        exit()
        
    reader = SimpleDirectoryReader(DATA_DIR)
    documents = reader.load_data()

    if not documents:
        raise ValueError(f"在 '{DATA_DIR}' 目录中没有找到任何可加载的文档。")

    print(f"4. 文档加载完毕，共 {len(documents)} 个片段。开始构建索引... (这可能需要几分钟)")
    # 'from_documents' 会自动处理嵌入计算和索引构建
    index = VectorStoreIndex.from_documents(documents)

    # --- 6. 持久化索引到硬盘 ---
    print(f"5. 索引构建完成，正在将其保存到 '{PERSIST_DIR}'...")
    # 使用 index.storage_context.persist() 将索引的所有组件保存到指定目录
    index.storage_context.persist(persist_dir=PERSIST_DIR)

    print("✅ 索引已成功创建并保存！")
    print("--- 脚本执行完毕 ---")

except Exception as e:
    print(f"❌ 创建索引时发生错误: {e}")

--- 开始创建并持久化RAG索引 ---
1. 正在加载环境变量...
2. 正在配置LlamaIndex设置...
3. 正在从 './data/origin_book' 加载文档...
4. 文档加载完毕，共 1 个片段。开始构建索引... (这可能需要几分钟)
5. 索引构建完成，正在将其保存到 './storage'...
✅ 索引已成功创建并保存！
--- 脚本执行完毕 ---


In [None]:
# main_generator.py (3-Agent-Version with Few-Shot Examples)

import os
import json
import re
import traceback
import concurrent.futures
from typing import Dict

# --- 依赖库导入 ---
from dotenv import load_dotenv
from llama_index.core import Settings, StorageContext, load_index_from_storage
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI as LlamaIndex_OpenAI
from crewai import Agent, Task, Crew, Process
from crewai.tools import tool
from langchain_openai import ChatOpenAI
from tqdm import tqdm
from openai import OpenAI as OpenAI_Client

# --- 1. 初始化设置 ---
print("--- 1. 正在加载环境变量 ---")
load_dotenv()
api_key = "sk-XR8bOVJRKbpeuZrB7DpwDmWvpOORfEFL6Zj8xIn5sKQelVXI"
api_base = "https://www.dmxapi.cn/v1"

# --- 2. 创建一个全局的、预配置的API客户端 ---
print("\n--- 2. 正在创建自定义API客户端以包含User-Agent Header ---")
custom_client = OpenAI_Client(
    api_key=api_key,
    base_url=api_base,
    default_headers={
        "Accept": "application/json",
        "Authorization": f"{api_key}",
        "User-Agent": "DMXAPI/1.0.0",
        "Content-Type": "application/json",
    }
)
print("✅ 自定义API客户端创建成功。")


# --- 3. RAG系统设置 (加载模式) ---
print("\n--- 3. 正在初始化LlamaIndex RAG系统 (加载模式) ---")
query_engine = None

def setup_llama_index_rag():
    """通过从硬盘加载预构建的索引来初始化RAG系统。"""
    global query_engine
    PERSIST_DIR = "./storage"
    if not os.path.exists(PERSIST_DIR):
        print(f"❌ 错误: 未找到预构建的索引目录 '{PERSIST_DIR}'。")
        print("➡️ 请先运行 'create_index.py' 脚本来创建索引。")
        return False
    try:
        print(f"✅ 发现已存在的索引，正在从 '{PERSIST_DIR}' 加载...")
        Settings.llm = LlamaIndex_OpenAI(model="gpt-4o-mini", client=custom_client)
        Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large", client=custom_client)
        storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
        index = load_index_from_storage(storage_context)
        query_engine = index.as_query_engine(similarity_top_k=3)
        print("✅ LlamaIndex RAG 系统已从本地加载并准备就绪！")
        return True
    except Exception as e:
        print(f"❌ 从硬盘加载索引时发生错误: {e}")
        return False

# --- 4. CrewAI团队设置 ---
print("\n--- 4. 正在定义三智能体团队 ---")

# 4.1 定义工具 (保持不变)
@tool("textbook_context_search")
def search_textbook_context(query: str) -> str:
    """Use this tool to search the analog circuits textbook for detailed context."""
    if query_engine is None: return "Error: RAG query engine is not initialized."
    return str(query_engine.query(query))

# 4.2 实例化LLM
try:
    llm = ChatOpenAI(model="deep", client=custom_client)
    print("✅ CrewAI 使用的 LLM 已通过自定义客户端实例化成功。")
except Exception as e:
    print(f"❌ 实例化 CrewAI LLM 失败: {e}")
    raise

# 4.3 定义三个智能体
question_crafter = Agent(
    role='Multi-hop Question Crafter',
    goal='Based on a given knowledge graph path, create a single, insightful, multi-hop reasoning question that connects the concepts in the path.',
    backstory='You are an expert curriculum designer who excels at creating thought-provoking questions that force students to think beyond simple definitions and connect multiple ideas.',
    llm=llm,
    tools=[search_textbook_context],
    verbose=False
)

cot_constructor = Agent(
    role='Chain-of-Thought Constructor',
    goal='Given a question and supporting materials, construct a detailed, step-by-step reasoning process that logically leads to the answer. Your output MUST be strictly enclosed in <think></think> tags.',
    backstory='You are a meticulous logical reasoner. You externalize your entire thought process, showing how each piece of evidence and each logical step contributes to the final conclusion, like a detective explaining a case.',
    llm=llm,
    tools=[search_textbook_context],
    verbose=False
)

answer_synthesizer = Agent(
    role='Final Answer Synthesizer and JSON Formatter',
    goal='Synthesize a final, comprehensive answer by considering a pre-computed thought process, a direct answer, and RAG context. Then, format the question, thought process, and final answer into a single, clean JSON object.',
    backstory='You are a senior technical editor and data scientist. You have the final say. You review all the evidence (the step-by-step logic, the quick answer, background material) to formulate the most accurate and well-rounded conclusion. You are also a stickler for process and demand that the final output is always in a perfectly structured JSON format.',
    llm=llm,
    tools=[search_textbook_context],
    verbose=False
)
print("✅ 三个智能体已定义完毕。")

# 4.4 定义新的任务 (MODIFIED: 添加了Few-shot示例)
def create_crew_tasks(path_description: str, path_type: str):
    
    # 任务1: 创建问题
    task_create_question = Task(
        description=f"""
        Create a multi-hop reasoning question based on this knowledge graph path:
        PATH: "{path_description}"
        TYPE: "{path_type}"
        Use your search tool to gather context if needed.

        --- HIGH-QUALITY EXAMPLE ---
        - Given PATH: "Operational Amplifier -> Negative Feedback -> Stability -> Phase Margin"
        - Generated Question: "An operational amplifier is configured with a standard negative feedback loop. Explain how the concept of phase margin is critical for ensuring the stability of this circuit. Specifically, if the phase margin drops below 45 degrees, what kind of transient response (e.g., ringing, overshoot) would you expect to see at the output, and why does this happen from a feedback loop perspective?"
        --- END EXAMPLE ---
        """,
        agent=question_crafter,
        expected_output="A single, clear, and challenging question string, following the quality of the example."
    )

    # 任务2: 构造思维链
    task_generate_think = Task(
        description="""
        Based on the question from the previous step, construct a detailed Chain-of-Thought.
        Show the step-by-step reasoning process required to answer the question.
        Your entire output MUST be a single string enclosed in <think></think> tags.

        --- HIGH-QUALITY EXAMPLE ---
        - Given Question: "Explain how phase margin is critical for op-amp stability and what happens if it's below 45 degrees."
        - Generated <think> block:
        <think>
        1. **Initial Analysis:** The question links an op-amp in negative feedback to stability, focusing on phase margin. This is a classic control theory problem. The core idea is that negative feedback can become positive feedback if the phase shift is too large.
        2. **Stability Condition:** A circuit becomes unstable if the loop gain |Aβ| ≥ 1 at the frequency where the phase shift around the loop reaches -180 degrees. This causes oscillation.
        3. **Defining Phase Margin (PM):** Phase Margin is a safety margin. It's measured at the unity-gain frequency (where |Aβ| = 1). PM = 180° + (phase of Aβ). A positive PM means the system is stable.
        4. **Connecting PM to Transient Response:** Phase margin is directly related to the damping of the system. A low phase margin is "underdamped."
           - PM ≈ 45°: Moderately underdamped. The system will have significant overshoot and ringing.
           - PM ≈ 0°: Undamped, the system will oscillate.
        5. **Answering the "Why":** A low PM means the system's poles are close to the imaginary axis in the s-plane. For a step input, this results in a response with decaying oscillations (ringing). The feedback signal arrives back almost in phase with the input, causing constructive interference (overshoot) as the loop "over-corrects" before settling.
        </think>
        --- END EXAMPLE ---
        """,
        agent=cot_constructor,
        context=[task_create_question],
        expected_output="A string containing the reasoning process, enclosed in <think></think> tags, following the quality of the example."
    )

    # 任务3: 综合答案并格式化为JSON
    task_synthesize_and_format = Task(
        description="""
        Your final and most important task is to produce a single JSON object.
        Follow these steps precisely:
        1. Review the question and the <think> block.
        2. Independently, formulate your own direct answer to the question.
        3. Synthesize a final, comprehensive answer by integrating your direct answer, the logic from the <think> block, and any relevant details from the RAG tool. Enclose this final answer in <answer></answer> tags.
        4. Assemble the final JSON object using the original question, the full <think> block, and your new <answer> block.

        Your final output MUST be ONLY the valid JSON object and nothing else.

        --- HIGH-QUALITY EXAMPLE ---
        {
          "question": "An operational amplifier is configured with a standard negative feedback loop. Explain how the concept of phase margin is critical for ensuring the stability of this circuit. Specifically, if the phase margin drops below 45 degrees, what kind of transient response (e.g., ringing, overshoot) would you expect to see at the output, and why does this happen from a feedback loop perspective?",
          "think": "<think>\\n1. **Initial Analysis:** The question links an op-amp in negative feedback to stability, focusing on phase margin... (and so on, the full think block) ...The loop \\\"over-corrects\\\" several times before the error signal finally dampens out.\\n</think>",
          "answer": "<answer>A phase margin below 45 degrees indicates an underdamped system, which will exhibit significant overshoot and ringing in its transient response. This occurs because the low phase margin implies that at the unity-gain frequency, the feedback signal returns to the input with a phase shift dangerously close to -180 degrees. This near-in-phase condition causes constructive interference, leading the amplifier to 'over-correct' its output multiple times before settling. Essentially, the feedback loop is on the verge of becoming a positive feedback oscillator.</answer>"
        }
        --- END EXAMPLE ---
        """,
        agent=answer_synthesizer,
        context=[task_create_question, task_generate_think],
        expected_output="A single, clean JSON object with 'question', 'think', and 'answer' keys, following the format of the example."
    )
    
    return [task_create_question, task_generate_think, task_synthesize_and_format]

print("✅ 任务流程已定义，并已加入Few-shot示例。")


# --- 5. 核心逻辑: 数据处理与生成 ---
print("\n--- 5. 正在定义核心处理逻辑 ---")

def extract_path_description(path_obj: Dict) -> str:
    path_nodes = path_obj.get('path', [])
    if not path_nodes: return "Unknown Path"
    node_names = [str(node.get('name') or node.get('label') or node.get('id', 'unnamed_node')) if isinstance(node, dict) else str(node) for node in path_nodes]
    return " -> ".join(node_names)

def process_single_path(path_object: dict) -> dict | None:
    try:
        path_description = extract_path_description(path_object)
        path_type = path_object.get("type", "unknown")
        print(f"▶️  正在处理路径 ({path_type}): {path_description[:100]}...")
        agents = [question_crafter, cot_constructor, answer_synthesizer]
        tasks = create_crew_tasks(path_description, path_type)
        cot_crew = Crew(agents=agents, tasks=tasks, process=Process.sequential, verbose=0)
        crew_output = cot_crew.kickoff()
        result_str = crew_output.raw
        if not result_str or not result_str.strip():
            print("❌  错误: 最终智能体返回了空输出，跳过此路径。")
            return None
        # 使用正则表达式从可能包含前后文的输出中提取JSON对象
        match = re.search(r'\{.*\}', result_str, re.DOTALL)
        if not match:
            raise ValueError("Could not find a JSON object in the agent's final output.")
        parsed_result = json.loads(match.group(0))
        if 'question' not in parsed_result or 'think' not in parsed_result or 'answer' not in parsed_result:
             raise ValueError("Final JSON is missing required keys: question, think, or answer.")
        print(f"✅  成功! 已为路径生成完整的Q-T-A样本。")
        return parsed_result
    except Exception as e:
        print(f"❌  处理路径时发生错误: {e}")
        print(f"   详细错误信息: {traceback.format_exc()}")
        return None

# --- 6. 并发执行和主入口 ---
def run_concurrent_generation(paths_file: str, num_questions: int, max_workers: int):
    print(f"\n--- 6. 开始并发生成 ---")
    try:
        with open(paths_file, 'r', encoding='utf-8') as f: all_paths = json.load(f)
    except Exception as e:
        print(f"❌ 无法读取路径文件 '{paths_file}': {e}")
        return
    paths_to_process = all_paths[:num_questions]
    final_dataset = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_path = {executor.submit(process_single_path, path): path for path in paths_to_process}
        progress = tqdm(concurrent.futures.as_completed(future_to_path), total=len(paths_to_process), desc="正在生成Q-T-A样本")
        for future in progress:
            result = future.result()
            if result:
                final_dataset.append(result)
    print("\n--- 生成完成 ---")
    print(f"✅ 成功: {len(final_dataset)} 条样本")
    print(f"❌ 失败:  {len(paths_to_process) - len(final_dataset)} 条样本")
    if final_dataset:
        output_filename = "generated_qta_dataset.json"
        with open(output_filename, 'w', encoding='utf-8') as f: json.dump(final_dataset, f, indent=2, ensure_ascii=False)
        print(f"💾 数据集已保存至 '{output_filename}'")
    else:
        print("⚠️ 未生成任何有效样本。")

# --- 7. 脚本执行入口 ---
if __name__ == "__main__":
    rag_ready = setup_llama_index_rag()
    if rag_ready:
        NUMBER_OF_QUESTIONS_TO_GENERATE = 100
        MAX_CONCURRENT_WORKERS = 1
        run_concurrent_generation(
            paths_file="generated_kg_paths.json",
            num_questions=NUMBER_OF_QUESTIONS_TO_GENERATE,
            max_workers=MAX_CONCURRENT_WORKERS
        )
    else:
        print("\n由于RAG系统初始化失败，程序已停止执行。")

--- 1. 正在加载环境变量 ---

--- 2. 正在创建自定义API客户端以包含User-Agent Header ---
✅ 自定义API客户端创建成功。

--- 3. 正在初始化LlamaIndex RAG系统 (加载模式) ---

--- 4. 正在定义三智能体团队 ---
✅ CrewAI 使用的 LLM 已通过自定义客户端实例化成功。
✅ 三个智能体已定义完毕。
✅ 任务流程已定义，并已加入Few-shot示例。

--- 5. 正在定义核心处理逻辑 ---
✅ 发现已存在的索引，正在从 './storage' 加载...
Loading llama_index.core.storage.kvstore.simple_kvstore from ./storage\docstore.json.
Loading llama_index.core.storage.kvstore.simple_kvstore from ./storage\index_store.json.
✅ LlamaIndex RAG 系统已从本地加载并准备就绪！

--- 6. 开始并发生成 ---
▶️  正在处理路径 (comparative): Unknown Path...


正在生成Q-T-A样本:   0%|          | 0/100 [00:00<?, ?it/s]