In [10]:
import os
import re
import csv
import json
from tqdm import tqdm

In [None]:
your_api_key = '123'
your_api_base= '123'
your_model_name = '123'


# 调用LLM获取ComplianceUnit

## prompt 模板

In [11]:
prompt_cu_v1 = """
法律条文cu拆分指令

你现在需要以资深法律分析专家的身份，执行法律条文到合规检查单元（ComplianceUnit, cu）的精确转换任务。以下是完整的工作指南：

一、角色定位与核心使命
作为法律智能分析引擎，你的核心任务是：
1. 像资深法律顾问那样准确解构法律条文
2. 将复杂的法律表述转化为可验证的原子化单元

二、cu构建规范

0. 格式
  {{
      "subject": "",
      "condition": "",
      "constraint": "",
      "contextual_info": ""
  }}

1. 主体(subject)处理规则：
   - 主体是受到本法规的限制或者要求的主体 
   - 有些法律的条款很长, 可能存在"主体A在C情况时候应满足要求D, 在E情况时应当满足要求F...", 你要关注上下文关系正确标注主体. 
   - 专有名词遵循原文, 如"董监高"无需拆分为"董事, 监事和高管"
   - 多个主体用" | "分隔（示例："控股股东 | 实际控制人"）只有主体可以拼接, 条件和限制不可以拼接. 
   - 如果本单元不适用[主体, 条件, 限制]的划分, 就记录在contextual_info, 然后本条放一个空字符串
   - 无明确subject时留空

2. 条件(condition)构建指南：
   - 为表述准确, 可以写成较长的复句
   - 包含完整的规则触发情景：
     - 如有主体行为: 不需要再赘述主体, 例如股东减持股份可以直接记作"减持股份"
     - 如有第三方状态:  必须注明是哪个第三方, 如"'上市公司'被立案调查期间"
     - 如有时间限定: 忠于原文进行表述, 例如"首次卖出的15个交易日前"
   - 当出现"但是...除外"、"除...外"等但书结构时：
      - 原文："主体A存在B场景时应当C，存在D情况时除外"
      - 正确condition："存在B场景且不存在D情况"
      - 错误处理：单独建立与D相关的cu
   - 无明确condition时留空

3. 约束(constraint)表述规范：
   - 为表述准确, 可以写成较长的复句
   - 保留原文的强制性表述（"应当"、"不得"等）
   - 量化要求必须完整保留（如"不得超过3个月"等）
   - 信息披露要求必须完整保留(如"应当进行某某披露, 披露应包含某某内容, 披露有某某时效要求"等)
   - constraint中不应当包含主体A"可以"进行行为B这样的语句, 因为这不是一个constrain, 而是某种权利的声明, 应当放置在contextual_info中. 
   - 无明确constraint时留空

4. 辅助信息(contextual_info)处理规则：
   - 存放无法归类到前三项的内容，包括但不限于：
     - 指标计算方式（如"收盘价以发行日向后复权计算"）. 有的法条内定义了一些指标的计算方式, 而一些cu的执行依赖这些指标. 你需要注意上下文关系, 将指标的计算方式放在对应的cu的contextual_info内, 例如某cu的constrain项对某个指标提出要求, 而contextual_info项记录该指标的运行方式. 
     - 法条的立意和执行信息, 如"为实现xxx目标依托xxx上位法指定本法", "本法由xx主体负责执行和解释"等
   - contextual_info并不是"附加信息", 更不是对某个条款的拓展和解释, 禁止把本应该属于condition和constrain的信息放在contextual_info. 
   - 法律规定 "主体A可以进行行为B" 时, 表示赋予权利或倡议的, 不属于合规要求, 应当放在contextual_info而不是constrain; 法律规定 "主体A可以进行行为B" , 蕴含 "如果不这么做就会违规" 的语义时, 应当属于constrain. 例如, "大股东可以减持其所持股份的25%" 就是constrain. 经验上看, 主体为交易所和证监会等监管部门时大概率为表示权利, 主体为其他情况时多为constrain. Example: "本所可以依规对违规减持行为采取相应监管措施"属于contextual_info. 
   - 记录contextual_info的时候尽量遵照原文, 避免自行解释
   - 若无contextual_info则留空. 切勿杜撰. 

5. cu原则（请严格遵循）
每个合规检查单元必须满足以下原则：

【原子性要求】
- 每个cu只能描述单一的义务场景, 存在多场景连接时参考以下要求拆分
  (1) 并列条款
  - subject中
  - condition中存在"或"的关系应当拆分, 存在"且"的关系不能拆分
  - constrain中存在"或"的关系不得拆分, 存在"且"的关系应当拆分
  - constrain中出现"应当符合下列规定"的是真并列关系, 所有的"下列规定"直接是and的关系; 出现"至少应当符合下列条件之一", 属于不能拆分的伪并列关系, 见下条. 
  (2) 识别"伪并列"条款
  - "应当至少符合下列条件之一", 并非是并列条款, 因为下面若干条件是存在一个即可的关系. 例如, "主体A存在B场景时应当至少符合下列条件之一: C, D, F", 应当拆分为一条: {{"subject":"A", "condition"："存在B场景", "constrain":"应当至少符合下列条件之一: C, D, F"}}
  (3) 递进条款（如"当A时，应当B；B完成后，应当C"）需拆分为两条("当A时，应当B", "当AB时，应当C"), 并注意保留前置条件
  (4) 但书条款可以有并列结构. 例如, "主体A存在B场景时应当C，存在D情况时除外"应当拆解为{{"subject":"A", "condition"："存在B场景且不存在D情况", "constrain":"应当C"}}

  

【保真性要求】
- 拆分后的cu集合必须与原文保持逻辑等价性，特别注意：
  - 不得扩大或缩小适用范围（如原文限定"立案调查期间"，不得简化为"调查期间"）
  - 保留所有量化指标（如"15个交易日"、"3个月"等）
  - 忠于原文

三、术语 (不需要进一步拆分的专有名词)
  拆分cu时应尽量遵守原文用词, 例如原文出现"董监高"则采用"董监高", 不需要拆分为董事, 监事和高级管理人员. 

四、EXAMPLES

▶ 原始法条：
第五条 持股5%以上股东通过集中竞价减持的，应当提前15日公告，但因司法强制执行导致的减持除外。
✅ 正确cu：
{{
    "subject": "持股5%以上股东",
    "condition": "通过集中竞价减持且不属于因为司法强制执行导致的",
    "constraint": "应当提前15日公告",
    "contextual_info": ""
}}
❌ 错误示例: 
{{
    "subject": "持股5%以上股东",
    "condition": "通过集中竞价减持",
    "constraint": "应当提前15日公告",
    "contextual_info": "因司法强制执行导致的减持除外"
}}


▶ 原始法条：
"主体A在B情况下应当C, C中包含D, E, F"
✅ 正确cu：
[{{subject:A, condition:B, constrain:应当C, C中包含D, condition:nan}},{{subject:A, condition:B, constrain:应当C, C中包含E, condition:nan}},{{subject:A, condition:B, constrain:应当C, C中包含F, condition:nan}}]
❌ 错误示例: 
[{{subject:A, condition:B, constrain:应当C, condition:C中包含D, F, F}}]


▶ 原始法条：
"上市公司根据《公司法》规定因维护公司价值及股东权益所必需回购股份的，应当符合以下条件之一：（一）...；（二）...；（三）...；（四）...。
"
✅ 正确cu：
[{{
    "subject": "上市公司",
    "condition": "根据《公司法》规定因维护公司价值及股东权益所必需回购股份的",
    "constraint": "应当符合以下条件之一：（一）...；（二）...；（三）...；（四）...",
    "contextual_info": ""
}}]
❌ 错误示例: 
将"应当符合以下条件之一"的四个条件分开组建cu, 这样实际上无法独立执行. 


五、其他注意事项
- 你应该考虑整个法条内部的上下文关系, 而不是机械地一句句拆解. 例如, 有时候法条的前半段在讲解某种指标的限制, 而后半段才讲解该指标如何计算, 你应该通过思考注意到这种上下文联系. 
- 保留所有修饰性副词（如"充分关注"、"主动做好"）
- 遇到模糊表述时保持原文结构，不得擅自解释


六、实践
请你按照提示词中对cu的定义, 将下列法条拆解为cu, 最后以一个python列表承载所有的cu, 每个cu是一个字典. 你必须用<cu> </cu>包裹你最后返回的列表, 不然这些数据无法被提取. 
你需要处理的法条: 
{law_article}
"""

## 批量调用llm

In [None]:
import csv
import asyncio
from tqdm.asyncio import tqdm_asyncio
from utils.call_gpt import call_gpt_async
import os

async def process_law_articles_async(
    file_name,
    max_concurrency: int = 10,
    encoding: str = 'utf-8-sig'
):
    """
    异步处理法律条文的函数，支持并行控制
    
    参数：
    input_file: 输入文件路径
    output_file: 输出文件路径
    max_concurrency: 最大并行请求数 (默认5)
    encoding: 文件编码 (默认utf-8-sig)
    """

    if file_name.endswith(".csv"):
        pass
    else:
        file_name += ".csv"

    input_dir = r"law_to_ComplianceUnit/st_1_law_csv"
    output_dir = r"law_to_ComplianceUnit/st_2_ComplianceUnit/raw_response"
    os.makedirs(output_dir, exist_ok=True)

    input_path = os.path.join(input_dir, file_name)
    output_path = os.path.join(output_dir, file_name)

    # 读取CSV文件
    with open(input_path, mode='r', encoding=encoding) as infile:
        reader = csv.DictReader(infile)
        law_articles = list(reader)

    # 共享状态容器
    class State:
        def __init__(self):
            self.total_prompt_tokens = 0
            self.total_completion_tokens = 0
            self.success_count = 0
            self.failure_count = 0
            self.processed_data = []
            self.lock = asyncio.Lock()

    state = State()

    async def process_article(article):
        """处理单个法条的异步任务"""
        nonlocal state
        law_article_num = article['law_article_num']
        law_article = article['law_article']
        
        try:
            prompt = prompt_cu_v1.format(law_article=law_article)
            
            # 调用异步接口
            content, reasoning_content, api_usage = await call_gpt_async(
                prompt=prompt,
                api_key=your_api_key,
                base_url=your_api_base,
                model=your_model_name,
                # temperature=0.6,
            )
            
            # 原子操作更新状态
            async with state.lock:
                state.total_prompt_tokens += api_usage.prompt_tokens
                state.total_completion_tokens += api_usage.completion_tokens
                state.success_count += 1
                state.processed_data.append({
                    'law_article_num': law_article_num,
                    'law_article': law_article,
                    'response': content,
                    'reasoning_content': reasoning_content,
                    'api_usage': api_usage
                })
            
            return True
        except Exception as e:
            async with state.lock:
                state.failure_count += 1
                print(f"Failed to process law article {law_article_num}: {e}")
            return False

    # 创建异步任务
    tasks = [process_article(article) for article in law_articles]
    
    # 使用tqdm进度条分批执行
    pbar = tqdm_asyncio(total=len(tasks), desc="Processing law articles")
    
    # 分批执行控制并发
    for i in range(0, len(tasks), max_concurrency):
        batch_tasks = tasks[i:i + max_concurrency]
        await asyncio.gather(*batch_tasks)
        pbar.update(len(batch_tasks))
    
    pbar.close()

    # 保存结果到CSV
    with open(output_path, mode='w', encoding=encoding, newline='') as outfile:
        fieldnames = ['law_article_num', 'law_article', 'response', 'reasoning_content', 'api_usage']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        
        writer.writeheader()
        for data in state.processed_data:
            writer.writerow(data)
    

In [13]:
# await process_law_articles_async('北京证券交易所上市公司持续监管指引第8号——股份减持和持股管理.csv')

## 从LLM回复中提取cu

In [1]:
import csv
import re
import json
import os
import pandas as pd # 导入 pandas 库

def split_responses(file_name):

    if file_name.endswith(".csv"):
        pass
    else:
        file_name += ".csv"

    input_dir = r"law_to_ComplianceUnit/st_2_ComplianceUnit/raw_response"
    output_dir = r"law_to_ComplianceUnit/st_2_ComplianceUnit"
    os.makedirs(output_dir, exist_ok=True)

    input_path = os.path.join(input_dir, file_name)
    
    # --- 修改输出文件路径和扩展名 ---
    base_name = os.path.splitext(file_name)[0] # 获取不带扩展名的文件名
    output_filename = f"{base_name}.xlsx" # 设置输出文件名为 .xlsx
    output_path = os.path.join(output_dir, output_filename)
    
    encoding = 'utf-8-sig' # CSV 读取时常用

    def clean_response(response):
        """清理和解析response列中的JSON数据"""
        try:
            # 统一替换<\\cu>为</cu>
            response = response.replace("<\\cu>", "</cu>")
            response = response.replace("<\\\\cu>", "</cu>")
            
            # 提取所有<cu>和</cu>之间的内容
            matches = re.findall(r'<cu>(.*?)</cu>', response, re.DOTALL)
            
            if matches:
                # 找到内容长度最长的那个
                longest_content = max(matches, key=len).strip()
                # 尝试修复常见的JSON引号问题（例如，使用单引号而不是双引号）
                try:
                    # 尝试直接解析
                    return json.loads(longest_content)
                except json.JSONDecodeError:
                     # 如果直接解析失败，尝试替换单引号为双引号（注意处理字符串内部的引号）
                    try:
                        corrected_content = re.sub(r"(?<!\\)'", '"', longest_content) # 替换非转义的单引号
                         # 进一步处理可能存在的字典键不是字符串的情况 (虽然不太标准，但有时会遇到)
                        corrected_content = re.sub(r"([{,]\s*)(\w+)(\s*:)", r'\1"\2"\3', corrected_content)
                        return json.loads(corrected_content)
                    except json.JSONDecodeError as e_inner:
                        print(f"二次JSON解析尝试失败: {e_inner} in content: {longest_content[:200]}...") # 打印部分内容帮助调试
                        return [] # 返回空列表表示解析失败
            else:
                return []
        except Exception as e: # 捕获更广泛的异常
            print(f"处理 response 时出错: {e} in response: {response[:200]}...")
            return []

    # 读取CSV文件
    try:
        with open(input_path, mode='r', encoding=encoding) as infile:
            reader = csv.DictReader(infile)
            law_articles = list(reader)
    except FileNotFoundError:
        print(f"错误：输入文件未找到 {input_path}")
        return
    except Exception as e:
        print(f"读取CSV文件时出错: {e}")
        return

    # 处理response并记录拆分后的数据
    split_data = []
    for row in law_articles:
        law_article_num = row.get('law_article_num', 'UNKNOWN') # 使用 .get() 避免 KeyError
        law_article = row.get('law_article', '')
        response_raw = row.get('response', '')
        api_usage = row.get('api_usage', '')
        reasoning_content = row.get('reasoning_content', '')
        
        responses_list = clean_response(response_raw)
        
        # 确保 responses_list 是一个列表
        if not isinstance(responses_list, list):
            print(f"警告: clean_response 未返回列表，针对 law_article_num={law_article_num}. 返回类型: {type(responses_list)}")
            responses_list = [] # 如果不是列表，置为空列表以避免后续错误

        for k, response_item in enumerate(responses_list, start=1):
             # 确保 response_item 是一个字典
            if not isinstance(response_item, dict):
                 print(f"警告: response 列表中的元素不是字典，针对 law_article_num={law_article_num}, k={k}. 元素: {response_item}")
                 continue # 跳过这个无效的元素

            split_data.append({
                # 'law_article_num': law_article_num, # 注释掉，因为不需要在最终输出中
                # 'law_article': law_article,
                'cu_id': f"cu_{law_article_num}_{k}",
                'subject': response_item.get('subject', ''),
                'condition': response_item.get('condition', ''),
                'constraint': response_item.get('constraint', ''),
                'contextual_info': response_item.get('contextual_info', ''),
                # 'api_usage': api_usage,
                # 'reasoning_content': reasoning_content
            })

    # --- 新增：排序功能 ---
    def get_sort_key(item):
        """为排序提取 cu_id 中的数字部分"""
        parts = item['cu_id'].split('_')
        try:
            # parts[0] 是 "cu"
            # parts[1] 是 law_article_num (可能包含非数字，如'.')
            # parts[2] 是 k (应该是数字)
            law_num_str = parts[1]
            k_num = int(parts[2])

            # 为了处理 law_article_num 可能包含 '.' 或其他字符，
            # 我们可以尝试将其拆分并转换为数字元组以进行自然排序
            # 例如 '10.1' -> (10, 1), '10' -> (10,)
            law_num_parts = []
            current_part = ''
            for char in law_num_str:
                if char.isdigit():
                    current_part += char
                else:
                    if current_part:
                         law_num_parts.append(int(current_part))
                    current_part = ''
                     # 可以选择性地保留非数字分隔符，或者忽略它们
                     # law_num_parts.append(char) # 如果需要保留分隔符
            if current_part:
                 law_num_parts.append(int(current_part))

            # 返回一个元组进行排序：先按法律条文编号的各部分排序，再按 k 排序
            return (tuple(law_num_parts), k_num)

        except (IndexError, ValueError) as e:
            # 如果 cu_id 格式不符合预期或无法转换数字，则进行简单字符串排序作为后备
            print(f"警告：解析 cu_id 时出错 '{item['cu_id']}' ({e}). 使用默认排序。")
            return (item['cu_id'],) # 返回原始 ID 以进行字符串排序

    split_data.sort(key=get_sort_key)

    # --- 修改：使用 pandas 保存为 Excel 文件 ---
    if split_data: # 只有在有数据时才保存
        # 定义最终输出的列顺序
        fieldnames = [
            'cu_id', 'subject', 'condition', 'constraint', 'contextual_info'
        ]
        
        # 将列表字典转换为 pandas DataFrame
        df = pd.DataFrame(split_data)
        
        # 确保只包含所需的列，并按指定顺序排列
        df_output = df[fieldnames]

        try:
            # 保存到 Excel 文件，不包含 DataFrame 的索引
            df_output.to_excel(output_path, index=False, engine='openpyxl')
            print(f'文件已排序并保存为 Excel: {output_path}')
        except Exception as e:
            print(f"保存 Excel 文件时出错: {e}")
    else:
        print("没有从输入文件中提取到有效数据，未生成输出文件。")

In [15]:
# split_responses('北京证券交易所上市公司持续监管指引第8号——股份减持和持股管理.csv')

# 主workflow

In [2]:
import os
# 定义文件目录路径
directory_path = r"law_to_ComplianceUnit/st_1_law_csv"

# 提取所有.doc文件的文件名，不包含路径
filenames = [
    f for f in os.listdir(directory_path) if f.endswith('.csv')
]


filenames

['北京证券交易所上市公司持续监管指引第8号——股份减持和持股管理.csv',
 '北京证券交易所上市公司持续监管指引第3号——股权激励和员工持股计划.csv',
 '北京证券交易所上市公司持续监管指引第5号——要约收购.csv',
 '北京证券交易所上市公司持续监管指引第2号——季度报告.csv',
 '北京证券交易所上市公司持续监管指引第1号——独立董事.csv',
 '北京证券交易所上市公司持续监管指引第9号——募集资金管理.csv',
 '北京证券交易所上市公司持续监管指引第4号——股份回购.csv',
 '北京证券交易所上市公司持续监管指引第10号——权益分派.csv',
 '北京证券交易所上市公司持续监管指引第6号——内幕信息知情人管理及报送.csv',
 '北京证券交易所上市公司持续监管指引第7号——转板.csv']

In [None]:
# for doc_file in doc_filenames:
#     main(doc_file)

for filename in filenames:
    await process_law_articles_async(filename)
    split_responses(filename)



文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第8号——股份减持和持股管理.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第3号——股权激励和员工持股计划.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第5号——要约收购.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第2号——季度报告.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第1号——独立董事.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第9号——募集资金管理.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第4号——股份回购.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第10号——权益分派.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第6号——内幕信息知情人管理及报送.xlsx
文件已排序并保存为 Excel: law_to_ComplianceUnit/st_2_ComplianceUnit/北京证券交易所上市公司持续监管指引第7号——转板.xlsx
