In [None]:
import os
import json
from typing import List, Dict, Any
from dotenv import load_dotenv

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict


Note: you may need to restart the kernel to use updated packages.


In [None]:
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")

if not api_key:
    print("API KEY?")

try:
    llm = ChatOpenAI(
        model="gpt-3.5-turbo",
        temperature=0.1,
        api_key=api_key
    )
    
    embeddings = OpenAIEmbeddings(api_key=api_key)
except Exception as e:
    print(e)

class RAGState(TypedDict):
    documents: List[Document]
    question: str
    retrieved_docs: List[Document]
    answer: str
    evaluation_score: float
    optimization_needed: bool

In [None]:
# 3. 文档预处理模块
def process_documents(file_paths: List[str] = None, sample_text: str = None) -> List[Document]:
    print("开始文档预处理...")
    documents = []
    
    
    if file_paths:
        # 处理文件列表
        for file_path in file_paths:
            try:
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                else:
                    loader = TextLoader(file_path, encoding='utf-8')
                docs = loader.load()
                documents.extend(docs)
                print(f"处理文件: {file_path}")
            except Exception as e:
                print(f"无法处理文件 {file_path}: {e}")
    
    # 文档分割
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        length_function=len,
    )
    
    split_documents = text_splitter.split_documents(documents)
    print(f"文档分割完成，共 {len(split_documents)} 个文档块")
    
    return split_documents

# 测试文档处理
test_documents = process_documents(file_paths=['AS1668-1-2015.pdf'])
print(f"\n文档处理结果：")
print(f"文档块数量: {len(test_documents)}")
print(f"第一个文档块预览: {test_documents[0].page_content[:100]}...")


开始文档预处理...
处理文件: AS1668-1-2015.pdf
文档分割完成，共 884 个文档块

文档处理结果：
文档块数量: 884
第一个文档块预览: )> 
en ... 
O> 
O> 
00 
... 
"' 0 ... 
u, 
AS 1668.1:2015 
(Incorporating Amendment No . 1) 
Austral...


In [5]:
print(f"第一个文档块预览: {test_documents[0].page_content[:1000]}...")

第一个文档块预览: )> 
en ... 
O> 
O> 
00 
... 
"' 0 ... 
u, 
AS 1668.1:2015 
(Incorporating Amendment No . 1) 
Australian 
STANDARD 
The use of ventilation and air 
conditioning in buildings 
Part 1: Fire and smoke control in 
buildings...


In [None]:
def create_vector_store(documents: List[Document]) -> FAISS:
    print("创建向量存储...")
    try:
        # 使用FAISS创建向量存储
        vectorstore = FAISS.from_documents(
            documents=documents,
            embedding=embeddings
        )
        print(f"向量存储创建成功，包含 {len(documents)} 个文档")
        return vectorstore
    except Exception as e:
        print(f"向量存储创建失败: {e}")
        return None

def save_vector_store(vectorstore: FAISS, path: str = "faiss_index"):
    try:
        vectorstore.save_local(path)
        print(f"向量存储已保存到: {path}")
    except Exception as e:
        print(f"保存失败: {e}")

def load_vector_store(path: str = "faiss_index") -> FAISS:
    try:
        vectorstore = FAISS.load_local(path, embeddings, allow_dangerous_deserialization=True)
        print(f"向量存储已从 {path} 加载")
        return vectorstore
    except Exception as e:
        print(f"加载失败: {e}")
        return None


print("开始创建向量存储...")
vectorstore = create_vector_store(test_documents)




开始创建向量存储...
创建向量存储...
向量存储创建成功，包含 884 个文档

🔍 检索测试结果：
查询: 我有十个充电桩 我需要配备多大的负载电流
检索到 2 个相关文档

文档 1: @ 
en 
iii :::, 
C. 
Ill 
a. 
Cl) 
)> 
C: 
!!l. 
el iii" 
i 
(I) 
iii :::, 
C. 
Ill 
a. 
Cl) 
0 
<O ...

文档 2: i 
!!?. 
ll) 
:, 
a. 
ll) 
a. 
(/) 
0 
co 
iu 
C: 
@ 
en 
iii :, 
a. 
ll) 
a. 
(/) 
)> 
C: 
~ 
~ iii...
向量存储已保存到: faiss_index


In [10]:
if vectorstore:
    # 测试检索功能
    test_query = "我有十个充电桩 我需要配备多大的负载电流"
    retrieved_docs = vectorstore.similarity_search(test_query, k=2)
    
    print(f"\n🔍 检索测试结果：")
    print(f"查询: {test_query}")
    print(f"检索到 {len(retrieved_docs)} 个相关文档")
    for i, doc in enumerate(retrieved_docs):
        print(f"\n文档 {i+1}: {doc.page_content[:2000]}...")
    
    # 保存向量存储
    save_vector_store(vectorstore)


🔍 检索测试结果：
查询: 我有十个充电桩 我需要配备多大的负载电流
检索到 2 个相关文档

文档 1: @ 
en 
iii :::, 
C. 
Ill 
a. 
Cl) 
)> 
C: 
!!l. 
el iii" 
i 
(I) 
iii :::, 
C. 
Ill 
a. 
Cl) 
0 
<O 
i» 
C: 
Copyrighted material licensed SAi Global UNSW - LIBRARY. Accessed on 2019-11-03. 
Reproduction,distribution,storage or use on a network is prohibited. 
TABLE 4.1 (continued) 
Item Power wiring Automatic control* Override control* 
13 Car park fans No special requirements. Shutdown shall be in Where a FDCIE is provided, 
installed as accordance with switches shall be located on...

文档 2: i 
!!?. 
ll) 
:, 
a. 
ll) 
a. 
(/) 
0 
co 
iu 
C: 
@ 
en 
iii :, 
a. 
ll) 
a. 
(/) 
)> 
C: 
~ 
~ iii" 
Al I 
Copyrighted material licensed SAi Global UNSW - LIBRARY. Accessed on 2019-11-03. 
Reproduction,distribution,storage or use on a network is prohibited. 
TABLE 4.1 (continued) 
Item Power wiring Automatic control* Override control* 
11 Car park exhaust Where a car park exhaust system Initiation and operation in Where a FDCIE is provided,

In [None]:
# 5.
import re
import json

def extract_answer_anchors(chunk_text: str) -> Dict[str, Any]:
    """
    步骤1: 提取答案锚点
    从技术条款中提取核心要求作为答案锚点
    """
    print("步骤1: 提取答案锚点...")
    
    anchor_prompt = PromptTemplate(
        input_variables=["chunk_text"],
        template="""[角色]
你作为专业技术文档分析专家，需要从技术条款中提取核心要求作为答案锚点

[任务]
阅读以下条款文本，提取关键事实要素：
1. 识别主要概念和定义
2. 列出所有条件项、要求或规则
3. 提取关键数据、标准或参数
4. 保留原始表述的精确措辞

[条款文本]
{chunk_text}

[输出要求]
JSON格式，包含以下字段：
- "main_concept": "主要概念"
- "key_points": ["要点1", "要点2", ...]
- "conditions": ["条件1", "条件2", ...] (如果有)
- "parameters": ["参数1", "参数2", ...] (如果有)

只返回JSON，不要其他内容。"""
    )
    
    try:
        formatted_prompt = anchor_prompt.format(chunk_text=chunk_text)
        response = llm.invoke(formatted_prompt)
        
        # 提取JSON内容
        json_match = re.search(r'\{.*\}', response.content, re.DOTALL)
        if json_match:
            anchors = json.loads(json_match.group())
            print("答案锚点提取成功")
            return anchors
        else:
            print("未找到有效JSON，使用默认锚点")
            return {
                "main_concept": "未识别",
                "key_points": [chunk_text[:100] + "..."],
                "conditions": [],
                "parameters": []
            }
    except Exception as e:
        print(f"答案锚点提取失败: {e}")
        return {
            "main_concept": "提取失败",
            "key_points": [chunk_text[:100] + "..."],
            "conditions": [],
            "parameters": []
        }

def generate_standard_answer(chunk_text: str, anchors: Dict[str, Any]) -> str:
    """
    步骤2: 生成标准答案
    基于原文和答案锚点生成完整标准答案
    """
    print("步骤2: 生成标准答案...")
    
    answer_prompt = PromptTemplate(
        input_variables=["chunk_text", "anchors"],
        template="""[角色]
你作为专业技术专家，需要基于原文和答案锚点生成完整标准答案

[任务]
使用以下信息创建专业答案：
1. 基于主要概念构建答案框架
2. 完整包含所有关键要点
3. 按逻辑顺序组织条件和参数
4. 保持原始文本的精确表述
5. 答案长度控制在80-120词

[原文文本]
{chunk_text}

[答案锚点]
{anchors}

[答案生成约束]
- 禁止添加解释性内容
- 禁止修改原始条款表述
- 必须包含精确信息
- 条件项必须使用数字序号列表
- 保持专业术语的准确性

[输出要求]
纯文本格式，结构清晰的标准答案"""
    )
    
    try:
        formatted_prompt = answer_prompt.format(
            chunk_text=chunk_text,
            anchors=json.dumps(anchors, ensure_ascii=False, indent=2)
        )
        response = llm.invoke(formatted_prompt)
        answer = response.content.strip()
        print("标准答案生成成功")
        return answer
    except Exception as e:
        print(f"标准答案生成失败: {e}")
        # return f"基于提供的内容，主要概念是{anchors.get('main_concept', '未知')}。"
        return None

def generate_reverse_question(standard_answer: str, anchors: Dict[str, Any]) -> str:
    print("步骤3: 逆向生成问题...")
    
    question_prompt = PromptTemplate(
        input_variables=["standard_answer", "anchors"],
        template="""[角色]
你作为专业考试出题专家，需要基于标准答案创建测试问题

[任务]
生成1个专业问题，要求：
1. 明确指向答案中的关键要素
2. 使用专业术语
3. 问题类型选择：概念定义型、条件询问型、参数查询型、场景应用型
4. 确保问题有明确的答案指向性
5. 问题要符合原文的逻辑
6. 使用英语回答

[标准答案]
{standard_answer}

[答案锚点]
{anchors}

[输出要求]
纯文本格式，单句问题，不超过30词"""
    )
    
    try:
        formatted_prompt = question_prompt.format(
            standard_answer=standard_answer,
            anchors=json.dumps(anchors, ensure_ascii=False, indent=2)
        )
        response = llm.invoke(formatted_prompt)
        question = response.content.strip()
        
        # 清理问题格式
        question = re.sub(r'^问题[:：]\s*', '', question)
        question = question.strip('\"\'')
        
        print("问题生成成功")
        return question
    except Exception as e:
        print(f"问题生成失败: {e}")
        return None

def create_negative_samples(standard_answer: str, anchors: Dict[str, Any]) -> List[str]:
    print("步骤4: 创建负样本...")
    
    negative_prompt = PromptTemplate(
        input_variables=["standard_answer", "anchors"],
        template="""[角色]
你作为专业审核员，需要创建典型错误答案

[任务]
基于标准答案生成2个错误答案，要求：
1. 每个答案包含1种专业常见错误类型
2. 错误类型必须属于以下类别：
   - 信息遗漏（遗漏关键要点或条件）
   - 信息添加（添加不存在的内容）
   - 概念混淆（错误理解主要概念）
   - 参数错误（数据或标准错误）
3. 保持答案表面合理性
4. 每个错误答案长度与标准答案相近

[标准答案]
{standard_answer}

[答案锚点]
{anchors}

[输出要求]
JSON格式: {{"negative_samples": ["错误答案1", "错误答案2"]}}
只返回JSON，不要其他内容。"""
    )
    
    try:
        formatted_prompt = negative_prompt.format(
            standard_answer=standard_answer,
            anchors=json.dumps(anchors, ensure_ascii=False, indent=2)
        )
        response = llm.invoke(formatted_prompt)
        
        # 提取JSON内容
        json_match = re.search(r'\{.*\}', response.content, re.DOTALL)
        if json_match:
            result = json.loads(json_match.group())
            negative_samples = result.get("negative_samples", [])
            print(f"负样本生成成功，共{len(negative_samples)}个")
            return negative_samples
        else:
            print("未找到有效JSON，生成默认负样本")
            return [
                f"关于{anchors.get('main_concept', '主要概念')}的部分信息（信息遗漏）",
                f"{anchors.get('main_concept', '主要概念')}还包括其他未提及的方面（信息添加）"
            ]
    except Exception as e:
        print(f"{e}")
        # return [
        #     "生成失败的错误答案1",
        #     "生成失败的错误答案2"
        # ]
        return None

def generate_professional_qa_pairs(documents: List[Document], num_pairs: int = 3) -> List[Dict[str, Any]]:
    """
    优化的QA生成主函数
    实现完整的四步骤流程
    """
    print("开始专业QA生成流程...")
    print("=" * 50)
    
    qa_pairs = []
    
    # 处理每个文档块
    # 可不可以不按照顺序，随机抽取文档块。。。
    for i, doc in enumerate(documents[:num_pairs]):
        print(f"\n处理文档块 {i+1}/{min(num_pairs, len(documents))}")
        print("-" * 30)
        
        chunk_text = doc.page_content
        
        try:
            # 提取答案锚点
            anchors = extract_answer_anchors(chunk_text)
            
            # 生成标准答案  
            standard_answer = generate_standard_answer(chunk_text, anchors)
            
            # 逆向生成问题
            question = generate_reverse_question(standard_answer, anchors)
            
            # 创建负样本
            negative_samples = create_negative_samples(standard_answer, anchors)
            
            # 构建完整QA对
            qa_pair = {
                "source_text": chunk_text,
                "question": question,
                "reference_answer": standard_answer,
                "negative_samples": negative_samples,
                "answer_anchors": anchors,
                "metadata": {
                    "chunk_index": i,
                    "generated_at": "2024-12-19",
                    "generation_method": "four_step_professional"
                }
            }
            
            qa_pairs.append(qa_pair)
            print(f"QA对 {i+1} 生成完成")
            
        except Exception as e:
            print(f"文档块 {i+1} 处理失败: {e}")
            continue
    
    print(f"QA生成完成，总共生成 {len(qa_pairs)} 个QA对")
    return qa_pairs

# 兼容性包装函数 
def generate_qa_pairs(documents: List[Document], num_pairs: int = 3) -> List[Dict[str, str]]:
    professional_qa = generate_professional_qa_pairs(documents, num_pairs)
    
    # 转换为简单格式以保持兼容性
    simple_qa = []
    for qa in professional_qa:
        simple_qa.append({
            "question": qa["question"],
            "answer": qa["reference_answer"],
            "source": "professional_generated"
        })
    
    return simple_qa

def quality_validation(qa_pairs: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """
    对生成的问答对进行质量验证
    对应流程图中的"质量验证"步骤
    """
    print("🔍 开始质量验证...")
    
    validation_prompt = PromptTemplate(
        input_variables=["question", "answer"],
        template="""
        请评估以下问答对的质量，从1-10分评分：
        
        问题: {question}
        答案: {answer}
        
        评估标准：
        1. 问题是否清晰明确？
        2. 答案是否准确完整？
        3. 问答对是否具有实用价值？
        
        请只回答一个1-10之间的数字分数。
        """
    )
    
    validated_qa = []
    
    for qa in qa_pairs:
        try:
            formatted_prompt = validation_prompt.format(
                question=qa["question"],
                answer=qa["answer"]
            )
            response = llm.invoke(formatted_prompt)
            
            # 提取分数
            score_text = response.content.strip()
            score = float(score_text) if score_text.replace('.', '').isdigit() else 5.0
            
            qa_with_score = qa.copy()
            qa_with_score["quality_score"] = score
            validated_qa.append(qa_with_score)
            
        except Exception as e:
            print(f"验证失败: {e}")
            qa_with_score = qa.copy()
            qa_with_score["quality_score"] = 5.0
            validated_qa.append(qa_with_score)
    
    # 按质量分数排序
    validated_qa.sort(key=lambda x: x["quality_score"], reverse=True)
    
    avg_score = sum(qa["quality_score"] for qa in validated_qa) / len(validated_qa)
    print(f"质量验证完成，平均分数: {avg_score:.2f}")
    
    return validated_qa

print("开始QA生成演示...")

professional_qa_pairs = generate_professional_qa_pairs(test_documents, num_pairs=10)

print(f"QA生成结果展示：")
print("=" * 60)

for i, qa_pair in enumerate(professional_qa_pairs):
    print(f"\nQA对 {i+1}:")
    print("-" * 40)
    print(f"原文片段: {qa_pair['source_text'][:100]}...")
    print(f"\n⚓ 答案锚点:")
    print(f"   主要概念: {qa_pair['answer_anchors']['main_concept']}")
    print(f"   关键要点: {qa_pair['answer_anchors']['key_points'][:2]}")
    
    print(f"\n生成问题: {qa_pair['question']}")
    print(f"\n标准答案: {qa_pair['reference_answer']}")
    
    print(f"\n负样本:")
    for j, neg in enumerate(qa_pair['negative_samples']):
        print(f"   {j+1}. {neg}")
    
    print(f"\n元数据: {qa_pair['metadata']['generation_method']}")
    print("=" * 60)

# print(f"使用原有generate_qa_pairs接口:")
# traditional_qa = generate_qa_pairs(test_documents, num_pairs=2)
# validated_qa = quality_validation(traditional_qa)

# print(f"\n兼容模式生成的问答对：")
# for i, qa in enumerate(validated_qa):
#     print(f"\n问答对 {i+1} (分数: {qa['quality_score']}):")
#     print(f"Q: {qa['question']}")
#     print(f"A: {qa['answer']}")
    
# print(f"\n专业QA生成模块升级完成！")

# print(f"  答案锚点提取")
# print(f"  标准答案生成") 
# print(f"  逆向问题生成")
# print(f"  负样本创建")


开始QA生成演示...
开始专业QA生成流程...

处理文档块 1/10
------------------------------
步骤1: 提取答案锚点...
答案锚点提取成功
步骤2: 生成标准答案...
标准答案生成成功
步骤3: 逆向生成问题...
问题生成成功
步骤4: 创建负样本...
负样本生成成功，共2个
QA对 1 生成完成

处理文档块 2/10
------------------------------
步骤1: 提取答案锚点...
答案锚点提取成功
步骤2: 生成标准答案...
标准答案生成成功
步骤3: 逆向生成问题...
问题生成成功
步骤4: 创建负样本...
负样本生成成功，共2个
QA对 2 生成完成

处理文档块 3/10
------------------------------
步骤1: 提取答案锚点...
答案锚点提取成功
步骤2: 生成标准答案...
标准答案生成成功
步骤3: 逆向生成问题...
问题生成成功
步骤4: 创建负样本...
负样本生成成功，共2个
QA对 3 生成完成

处理文档块 4/10
------------------------------
步骤1: 提取答案锚点...
答案锚点提取成功
步骤2: 生成标准答案...
标准答案生成成功
步骤3: 逆向生成问题...
问题生成成功
步骤4: 创建负样本...
负样本生成成功，共2个
QA对 4 生成完成

处理文档块 5/10
------------------------------
步骤1: 提取答案锚点...
答案锚点提取成功
步骤2: 生成标准答案...
标准答案生成成功
步骤3: 逆向生成问题...
问题生成成功
步骤4: 创建负样本...
负样本生成成功，共2个
QA对 5 生成完成

处理文档块 6/10
------------------------------
步骤1: 提取答案锚点...
答案锚点提取成功
步骤2: 生成标准答案...
标准答案生成成功
步骤3: 逆向生成问题...
问题生成成功
步骤4: 创建负样本...
负样本生成成功，共2个
QA对 6 生成完成

处理文档块 7/10
------------------------------
步骤1: 提取答案锚点...
答案锚点提取成功
步骤2: 生成

In [16]:
professional_qa_pairs

[{'source_text': ')> \nen ... \nO> \nO> \n00 \n... \n"\' 0 ... \nu, \nAS 1668.1:2015 \n(Incorporating Amendment No . 1) \nAustralian \nSTANDARD \nThe use of ventilation and air \nconditioning in buildings \nPart 1: Fire and smoke control in \nbuildings',
  'question': '根据澳大利亚标准AS 1668.1:2015和修订版，建筑物中的火灾和烟雾控制主要依赖于哪些系统？',
  'reference_answer': 'Fire and smoke control in buildings is governed by the Australian Standard AS 1668.1:2015, which also includes Amendment No. 1. This standard outlines the use of ventilation and air conditioning systems to ensure proper fire and smoke control measures are in place within buildings.',
  'negative_samples': ['Fire and smoke control in buildings is not regulated by any standards or guidelines, it is up to individual building owners to decide on the measures to take.',
   'The use of ventilation and air conditioning systems is not necessary for fire and smoke control in buildings, natural ventilation is sufficient.'],
  'answer_anchors': {'main_concep