<font color=red>测试初始化</font>

In [None]:
# Django初始化
import django_setup

In [2]:
# 导入相关模型：get_user_model, Project, FileRecord, DocumentAnalysis, FileProjectLink, ProjectHistory
from django.contrib.auth import get_user_model
from apps.doc_analysis.models import DocumentAnalysis, InvalidStatusTransition
from apps.projects.models import Project
from apps.files.models import FileRecord
from django.core.files.uploadedfile import SimpleUploadedFile
from apps.doc_analysis.document_extractors import DocxExtractor



In [None]:
# Prepare the user, project and file_record used by the cells below
# (per the original note, the project and file_record are associated).
User = get_user_model()

# All three objects are fetched from existing data rather than created;
# Django's .get() raises DoesNotExist when no row matches.

# Existing user, looked up by phone number
user = User.objects.get(phone='18501771516')
print(f"用户: {user.phone}")

# Existing project, looked up by name
project = Project.objects.get(project_name='测试项目1')
print(f"项目: {project.project_name}")

# Existing file record
# NOTE(review): the id is passed as the string '3'; presumably coerced by the
# ORM to the primary-key type — confirm.
file_record = FileRecord.objects.get(id='3')
print(f"文件: {file_record.name}")



场景1：创建分析并关联已有文件

In [None]:
# Delete any analysis titled "测试分析1" left over from a previous run
DocumentAnalysis.objects.filter(title="测试分析1").delete()


In [None]:
# 1. Create the document analysis instance "测试分析1" (no file linked yet)
doc_analysis = DocumentAnalysis.objects.create(
    project=project,
    title="测试分析1",
    created_by=user,
    # analysis_questions=["资质要求", "技术参数"]  # sample questions, disabled
)
print(f"创建文档分析: {doc_analysis.title} (ID: {doc_analysis.id})")
print(f"初始状态: {doc_analysis.status}")

In [None]:
# 2. Link the existing file record to the analysis.
# Any exception is caught and printed, so a failure here does not stop the
# notebook; later cells then run against an analysis without a linked file.
try:
    doc_analysis.update_file_record(file_record)
    print(f"成功关联文件: {file_record.name}")
    #print(f"提取的XML长度: {len(doc_analysis.raw_xml) if doc_analysis.raw_xml else 0}")
except Exception as e:
    print(f"关联文件失败: {str(e)}")

print("\n")

In [None]:
# 3. Extract elements from the document linked to doc_analysis
extractor=DocxExtractor(doc_analysis)
extractor.extract_elements()

In [None]:
# 4. Re-read the analysis from the database and pretty-print its
#    extracted_elements field
from pprint import pprint
Analysis1 = DocumentAnalysis.objects.get(id=doc_analysis.id)
pprint(Analysis1.extracted_elements)


场景2：创建分析并上传新文件

In [None]:
# Delete any analysis titled "测试分析2" left over from a previous run
DocumentAnalysis.objects.filter(title="测试分析2").delete()


In [None]:
# 1. Create a new document analysis instance "测试分析2"
doc_analysis2 = DocumentAnalysis.objects.create(
    project=project,
    title="测试分析2",
    created_by=user,
    analysis_questions=["投标要求", "评分标准"]  # sample analysis questions
)
print(f"创建文档分析: {doc_analysis2.title} (ID: {doc_analysis2.id})")

In [None]:
# 2. Upload a real DOCX file and register it as a FileRecord

# 2.1 Location of the sample tender document on the local machine
doc_path = "C:/Users/huiwa/Downloads/文本分析测试/CaseTest/case8：招标文件-第1包：一级压榨花生油.docx"

# 2.2 Read the raw bytes and wrap them in an in-memory upload object
upload_name = "test_doc.docx"
docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
with open(doc_path, 'rb') as docx_fh:
    file_content = docx_fh.read()
test_file = SimpleUploadedFile(upload_name, file_content, content_type=docx_mime)
print(f"文件大小: {test_file.size}")

# 2.3 Create the file record that stores the uploaded file object
new_file_record = FileRecord.objects.create(
    name=upload_name,
    file=test_file,
    owner=user,
    size=test_file.size,
)
print(f"创建文件记录: {new_file_record.name}")

In [7]:
# 3. Link the newly created file record to doc_analysis2.
# Any exception is caught and printed; nothing is printed on success.
try:
    doc_analysis2.update_file_record(new_file_record)
except Exception as e:
    print(f"关联文件失败: {str(e)}")

In [None]:
# 4. Start the analysis, then extract the document elements
#    (per the original note, the elements are persisted to the database).
print("\n===== 最终状态检查 =====")
print(f"开始分析前-状态: {doc_analysis2.status}")
doc_analysis2.start_analysis()
print(f"开始分析后-状态: {doc_analysis2.status}")
extractor2=DocxExtractor(doc_analysis2)
# The trailing ';' presumably suppresses the notebook's echo of the return value.
extractor2.extract_elements();

<font color=orange size=3> 文章结构分析 </font> <br>
<font color=gray size=2> 
当我们分析招标文件时，首先会检视阅读，了解文档的框架和结构。<br>
好的框架和结构，能让大模型更好地理解文档的上下文。 <br>
具体到特定分析时，大模型能够根据框架结构，更准确地找到相关的内容。<br>

这会带来两个好处：<br>
1. 提升分析结果的准确性，避免无关内容带来的噪音影响<br>
2. 减少大模型分析时使用的token数，降低成本。 <br>

所以，我们会在这个环节，花一点时间和资源，来检视和校准文档的章节结构。<br>

章节结构信息通常来自：<br>
1. 文档开头的目录<br>
2. 正文中的标题（H1,H2,H3）或文档大纲<br>

理想情况下，它们的信息应该一致。如果不一致，就优先级而言，目录的优先级最高，其次是文档大纲，最后是标题。 <br>

章节结构层级通常建议在2-3级：过多的层级会造成大模型在选择上下文时无所适从，而过少的层级则无法体现文档的结构化信息。这也是检视的重点。我们会结合章节的篇幅给予建议，并借助大模型来获得更多的章节信息。<br>

最后，我们将向您交付章节结构的信息，您可以据此进行最终确认或进一步调整。 <br>

</font>

In [None]:
# Read the extracted elements from doc_analysis2 for the structure analysis
# below, and pretty-print them
elements = doc_analysis2.extracted_elements
from pprint import pprint
pprint(elements)

In [None]:
# Print the table-of-contents entries: sequence number plus the first
# 30 characters of the content.
print("====== 文档目录 ======")
for elem in elements:
    # Plain truthiness instead of `== True`; .get() skips elements that do
    # not carry an 'is_toc' key instead of raising KeyError.
    if elem.get('is_toc'):
        print(f"{elem['sequence_number']}  {elem['content'][:30]}")


In [None]:
# Print the outline headings: sequence number plus the first
# 30 characters of the content.
print("====== 文档大纲标题 ======")
for elem in elements:
    # Plain truthiness instead of `== True`; .get() skips elements that do
    # not carry an 'is_heading' key instead of raising KeyError.
    if elem.get('is_heading'):
        print(f"{elem['sequence_number']}  {elem['content'][:30]}")



In [12]:
# Import the analyzer, compare the TOC against the outline headings, and keep
# the comparison in `results`
from apps.doc_analysis.outline_analyzer import DocumentOutlineAnalyzer
OutlineAnalyzer = DocumentOutlineAnalyzer(elements)
results = OutlineAnalyzer.compare_toc_and_outline()
# Print how many entries fall into each category of the comparison
# (keys read here: 'toc_only', 'outline_only', 'level_differences')
print(f"目录但非大纲标题的元素: {len(results['toc_only'])}")
print(f"大纲标题但非目录的元素: {len(results['outline_only'])}")
print(f"大纲标题层级与目录标题层级不匹配元素:{len(results['level_differences'])}")

In [16]:
# Generate suggestions from the comparison results and pretty-print them
suggestions = OutlineAnalyzer.outline_suggestions(results)
pprint(suggestions)

In [None]:
# Apply the accepted suggestions.
# confirmed_suggestions is meant to come from the user via the front end;
# None is used here as a placeholder.
confirmed_suggestions = None
# NOTE(review): `corrrect_outline` is spelled with three r's — confirm this
# matches the method name defined on DocumentOutlineAnalyzer.
OutlineAnalyzer.corrrect_outline(confirmed_suggestions)


In [None]:
# Print the TOC/outline comparison results via the analyzer's own printer
OutlineAnalyzer.print_analysis_results(results)


In [None]:
# Print sequence number and first 30 characters of each paragraph element.
# NOTE(review): element_type is compared against the string
# 'ElementType.PARAGRAPH', so this presumably expects the enum's string form
# to be stored — confirm.
for elem in elements:
    if elem['element_type'] == 'ElementType.PARAGRAPH':
        print(f"{elem['sequence_number']}  {elem['content'][:30]}")

In [None]:
# Inspect the container type of `elements`
print(type(elements))

In [None]:
# Pretty-print the first element to inspect its fields
pprint(elements[0])

In [14]:
from apps.doc_analysis.docx_parser._03_element_extractor import ElementType, DocumentElement

In [None]:
# Show the Python type and value of two fields of the first element
ele_type = elements[0]['element_type']
print(type(ele_type),ele_type)
ele_sequence_number = elements[0]['sequence_number']
print(type(ele_sequence_number), ele_sequence_number)


In [None]:
# Check whether the first element's type equals the string form used above
elements[0]['element_type'] == 'ElementType.PARAGRAPH'

In [17]:
from apps.doc_analysis.doc_structurer._03_tree_builder import TreeBuilder
from apps.doc_analysis.doc_structurer.doc_tree_retriever import DocTreeRetriever

In [18]:
# Create a tree builder over the extracted elements
tree_builder = TreeBuilder(elements)

In [19]:

# Build the document structure with target_level=2
doc_structure = tree_builder.build_to_level(target_level=2)

<font color=red>1. 测试分析模型 Models.py：创建和状态自动更新</font>

In [None]:
# Create the "测试分析" analysis for this project/file, or reset the existing
# one to PENDING. Either way `analysis` is bound afterwards for later cells.
#
# The title is part of the lookup: filtering only on project/file could find
# unrelated analyses and then fail on `.first()` returning None.
analyses = DocumentAnalysis.objects.filter(
    project=project,
    file_record=file_record,
    title="测试分析",
)
analysis = analyses.first()
isAnalysisExist = analysis is not None
if isAnalysisExist:
    print("分析已存在，跳过创建")
    # QuerySet.update() writes the column directly without calling the
    # model's save(); presumably this is what allows forcing the status back
    # to PENDING regardless of the current state.
    analyses.update(status=DocumentAnalysis.AnalysisStatus.PENDING)
    # Reload so the in-memory instance reflects the updated status.
    analysis.refresh_from_db()
    print(f"初始化文档分析为PENDING状态: {analysis.status}")
else:
    analysis = DocumentAnalysis.objects.create(
        title="测试分析",
        project=project,
        file_record=file_record,
        created_by=user
    )


In [None]:
# Print a summary of every analysis whose title starts with "测试分析".
# NOTE(review): this loop rebinds the notebook-level name `analysis` to the
# last row printed; the status-transition cells that follow read `analysis`,
# so confirm that row is the intended record.
analyses = DocumentAnalysis.objects.filter(title__startswith='测试分析')
for analysis in analyses:
    # An analysis can exist without a linked file (it is created before the
    # file is attached), so guard the attribute access.
    linked_file = analysis.file_record
    linked_file_name = linked_file.name if linked_file else None
    print(f"分析号:{analysis.id}\n",
          f"分析名称：{analysis.title}\n",
          f"分析所在项目：{analysis.project.project_name}\n",
          f"分析的文件：{linked_file_name}\n",
          f"分析的阶段：{analysis.status}\n",
          f"分析的问题：{analysis.analysis_questions}\n",
          f"分析结果：{analysis.analysis_result}\n",
          f"分析创建者：{analysis.created_by.phone}\n",
          f"分析用时：{analysis.processing_time}\n",
          )

In [None]:
# Status transition test, step 1: PENDING -> PROCESSING.
# start_analysis() is only called when the analysis is currently PENDING;
# otherwise the current status is reported and nothing changes.
if analysis.status == DocumentAnalysis.AnalysisStatus.PENDING:
    print("1.可以测试文档分析从PENDING到PROCESSING的流转：")
    analysis.start_analysis()
    print(f"开始分析后状态: {analysis.status}")
else:
    print(f"1. 测试文档分析状态在{analysis.status}，不能使用start_analysis()方法")


In [None]:
# Simulated analysis result: a list of question/answer dicts
sample_result = [
    {
        "question": "资质要求",
        "answer": "需要具备建筑施工总承包特级资质"
    },
    {
        "question": "技术参数",
        "answer": "项目规模：建筑面积50000平方米"
    }
]
# Step 2: PROCESSING -> COMPLETED.
# complete_analysis() is only called when the analysis is PROCESSING.
if analysis.status == DocumentAnalysis.AnalysisStatus.PROCESSING:
    print("2.可以测试文档分析从PROCESSING到COMPLETED的流转：")
    analysis.complete_analysis(result=sample_result)
    print(f"完成分析后状态: {analysis.status}")

else:
    print(f"2. 测试文档分析状态在{analysis.status}，不能使用complete_analysis()方法")


In [None]:
# Confirmed results: the sample answers plus a reviewer comment on each
confirmed_results = [
    {
        "question": "资质要求",
        "answer": "需要具备建筑施工总承包特级资质",
        "comment": "确认无误"
    },
    {
        "question": "技术参数",
        "answer": "项目规模：建筑面积50000平方米",
        "comment": "数据已核实"
    }
]

# Step 3: COMPLETED -> CONFIRMED.
# confirm_analysis() is only called when the analysis is COMPLETED.
if analysis.status == DocumentAnalysis.AnalysisStatus.COMPLETED:
    print("3.可以测试文档分析从COMPLETED到CONFIRMED的流转：")
    analysis.confirm_analysis(user=user, confirmed_results=confirmed_results)
    # The message previously said "完成分析" (copied from the completion
    # step); this step confirms the analysis.
    print(f"确认分析后状态: {analysis.status}")

else:
    print(f"3. 测试文档分析状态在{analysis.status}，不能使用confirm_analysis()方法")


In [None]:
# Invalid status transition test: confirming a freshly created analysis is
# expected to raise InvalidStatusTransition.
print("\n2. 测试错误状态转换处理：")
try:
    # A new analysis instance dedicated to the failure scenarios
    failed_analysis = DocumentAnalysis.objects.create(
        title="测试失败分析",
        project=project,
        file_record=file_record,
        created_by=user
    )
    print(f"创建失败分析的状态: {failed_analysis.status}")
    # Try to confirm an analysis that has not been completed
    failed_analysis.confirm_analysis(user=user, confirmed_results=[])
except InvalidStatusTransition as e:
    print(f"预期的错误捕获: {str(e)}")
else:
    # Without this branch the check passed silently when nothing was raised.
    print("❌ 未捕获到预期的 InvalidStatusTransition 错误")

In [None]:
# failed_analysis.delete()

In [None]:
# Failure flow: start the analysis, then mark it failed with an error
# message, and print the resulting status and stored message
failed_analysis.start_analysis()
failed_analysis.fail_analysis(error_message="文档格式不支持")
print(f"失败分析状态: {failed_analysis.status}")
print(f"错误信息: {failed_analysis.error_message}")

In [None]:
# Re-read the analysis from the database and print its final state
print("\n3. 查看最终分析结果：")
final_analysis = DocumentAnalysis.objects.get(id=analysis.id)
print(f"分析标题: {final_analysis.title}")
print(f"当前状态: {final_analysis.status}")
print(f"分析结果: {final_analysis.analysis_result}")
print(f"确认时间: {final_analysis.confirmed_at}")
# The confirm step above is conditional, so the analysis may never have been
# confirmed; guard against confirmed_by being unset.
confirmer = final_analysis.confirmed_by
print(f"确认用户: {confirmer.phone if confirmer else None}")

<font color=red>2.Serializers.py测试</font>

In [5]:
from apps.doc_analysis.serializers import ( 
    DocumentAnalysisBaseSerializer, 
    DocumentAnalysisCreateSerializer,
    AnalysisResultUpdateSerializer,
    AnalysisConfirmationSerializer,
    DocumentAnalysisDisplaySerializer
)

In [21]:
# Fetch the existing file, project and user used by the serializer tests
# (`User` is the model class obtained via get_user_model() earlier)
test_file = FileRecord.objects.get(id='2')
test_project = Project.objects.get(project_name='测试项目1')
test_user = User.objects.get(phone='18501771516')

In [22]:
# Minimal stand-in for a request object, passed to serializers as context
class MockRequest:
    """Bare-bones request double exposing user, method, META and session.

    Args:
        user: Object to expose as ``request.user``. When omitted (``None``),
            the notebook-level ``test_user`` is used, so existing
            ``MockRequest()`` calls keep working.
    """

    def __init__(self, user=None):
        # Honour the caller-supplied user; previously the argument was
        # ignored and the global test_user was always used.
        self.user = test_user if user is None else user
        self.method = 'POST'  # request method; adjust as needed
        self.META = {}        # request metadata
        self.session = {}     # session data

# Create the shared mock request instance used as serializer context
mock_request = MockRequest(user=test_user)


In [None]:
# 1. Test the create serializer: build the payload, validate it, and save.
# `new_analysis` is only bound when validation succeeds; later cells read it.
print("=== 测试创建序列化器 ===")
create_data = {
    "project_id": test_project.id,
    "file_record_id": test_file.id,
    "title": "序列化器测试分析",
    "analysis_questions": ["资质要求", "技术参数"]
}

create_serializer = DocumentAnalysisCreateSerializer(
    data=create_data,
    context={'request': MockRequest()}
)

if create_serializer.is_valid():
    new_analysis = create_serializer.save()
    print(f"✅ 创建成功 - ID: {new_analysis.id}")
else:
    print(f"❌ 创建失败 - 错误: {create_serializer.errors}")

In [None]:
# Print a field-by-field summary of new_analysis.
# `analysis` is bound to new_analysis, as the original one-item loop did.
analysis = new_analysis
summary_parts = [
    f"分析号:{analysis.id}\n",
    f"分析名称：{analysis.title}\n",
    f"分析所在项目：{analysis.project.project_name}\n",
    f"分析的文件：{analysis.file_record.name}\n",
    f"分析的阶段：{analysis.status}\n",
    f"分析的问题：{analysis.analysis_questions}\n",
    f"分析结果：{analysis.analysis_result}\n",
    f"分析创建者：{analysis.created_by.phone}\n",
    f"分析用时：{analysis.processing_time}\n",
]
# Unpacked so print() joins the parts with its default single-space separator
print(*summary_parts)

In [None]:
# Invalid file type test: a TXT file record is expected to be rejected by the
# create serializer.
print("\n测试无效文件类型:")
invalid_file = FileRecord.objects.create(
    name="test.txt",
    type="TXT",
    size=1024,
    owner=test_user,  # required owner field
    version=1,        # required version field
    processing_status='NONE',  # required processing_status field
    created_by=test_user.phone  # required created_by field
)

# Same payload as the valid case, pointing at the TXT record instead
invalid_data = create_data.copy()
invalid_data["file_record_id"] = invalid_file.id

invalid_serializer = DocumentAnalysisCreateSerializer(
    data=invalid_data,
    context={'request': mock_request}
)

if not invalid_serializer.is_valid():
    print(f"✅ 正确捕获错误: {invalid_serializer.errors}")
else:
    # Without this branch an unexpectedly accepted file went unreported.
    print("❌ 无效文件类型未被拒绝")

In [None]:
# 2. Test the result update serializer against new_analysis
print("\n=== 测试结果更新序列化器 ===")

# Payload for a single question/answer result
update_data = {
    "question": "资质要求",
    "answer": "需要具备建筑施工总承包特级资质",
    "context": ["上下文段落1", "上下文段落2"],
    "confidence": 0.95,
}

result_serializer = AnalysisResultUpdateSerializer(
    instance=new_analysis, data=update_data, context={'request': mock_request}
)

# Report validation errors, otherwise save and show the last stored result
if not result_serializer.is_valid():
    print(f"❌ 更新失败 - 错误: {result_serializer.errors}")
else:
    updated = result_serializer.save()
    latest_result = updated.analysis_result[-1]
    print(f"✅ 结果更新成功 - 最新结果: {latest_result}")

In [None]:
# 3. Test the confirmation serializer against new_analysis
print("\n=== 测试确认序列化器 ===")

# Payload confirming one question/answer pair with a reviewer comment
confirmation_data = {
    "question": "资质要求",
    "answer": "需要具备建筑施工总承包特级资质",
    "comment": "测试确认",
}

confirmation_serializer = AnalysisConfirmationSerializer(
    instance=new_analysis, data=confirmation_data, context={'request': mock_request}
)

# Report validation errors, otherwise save and show status and confirmation
if not confirmation_serializer.is_valid():
    print(f"❌ 确认失败 - 错误: {confirmation_serializer.errors}")
else:
    confirmed = confirmation_serializer.save()
    print(f"✅ 确认成功 - 状态: {confirmed.status}")
    first_result = confirmed.analysis_result[0]
    print(f"确认信息: {first_result.get('confirmation')}")