In [1]:
import json
import os

data_path = "/Users/czy/projects/baohuchu_demo/data/test_data/1049个训练样本-1016_20250306_114145_04171bef2b50213a.json"

with open(data_path, "r") as f:
    data = json.load(f)
print(len(data))

2050


In [6]:
def generate_instruction(input_data: dict) -> dict:
    # 必需字段
    required_fields = ['line_name', 'error_wave_one_cycle_ago', 'error_wave_one_cycle_after', 
                       'error_again_wave_one_cycle_after', 'protect_recover', 'gt']
    
    # 验证输入数据是否包含必需的字段
    for field in required_fields:
        if field not in input_data:
            raise ValueError(f"缺少必要的字段: {field}")

    # 提取数据到变量中
    line_name = input_data['line_name']
    error_wave_one_cycle_ago = input_data['error_wave_one_cycle_ago']
    error_wave_one_cycle_after = input_data['error_wave_one_cycle_after']
    error_again_wave_one_cycle_after = input_data['error_again_wave_one_cycle_after']
    protect_recover = input_data['protect_recover']
    fault_type = input_data['gt']

    # 生成input字段的内容
    input_value = (f"线路名称：{line_name}；故障前一周波模拟值：{error_wave_one_cycle_ago}；"
                   f"故障后一周波模拟值：{error_wave_one_cycle_after}；再次故障后一周波模拟值：{error_again_wave_one_cycle_after}；"
                   f"重合闸数量：{len(protect_recover)}套。请用json形式给出重合闸小结、分析结论和故障分类。")

    # 数值分析
    def analyze_values(wave_before, wave_after):
        Ia_change = abs(float(wave_after['Ia']) - float(wave_before['Ia']))
        Ib_change = abs(float(wave_after['Ib']) - float(wave_before['Ib']))
        Ic_change = abs(float(wave_after['Ic']) - float(wave_before['Ic']))
        U0_after = float(wave_after['U0'])
        I0_after = float(wave_after['I0'])

        # 判断故障相
        fault_phase = None
        if Ia_change > 5 * Ib_change and Ia_change > 5 * Ic_change:
            fault_phase = 'A'
        elif Ib_change > 5 * Ia_change and Ib_change > 5 * Ic_change:
            fault_phase = 'B'
        elif Ic_change > 5 * Ia_change and Ic_change > 5 * Ib_change:
            fault_phase = 'C'

        # 分析结论
        analysis_conclusion = (f"故障时零序电压大于5V，仅有{fault_phase}相电压小于55V，"
                               f"{fault_phase}相电流变化量大于B、C相电流变化量的5倍，零序电流大于1A，符合{fault_phase}相接地故障特征，"
                               f"{line_name}发生{fault_phase}相接地故障。")
        
        return {
            "analysis_conclusion": analysis_conclusion,
            "fault_phase": fault_phase,
            "U0_after": U0_after,
            "I0_after": I0_after
        }

    analysis_result = analyze_values(error_wave_one_cycle_ago, error_wave_one_cycle_after)

    # 构建output字段的内容
    output_value = {
        '重合闸小结': (f"{len(protect_recover)}套保护重合闸动作，重合后三相电压均大于55V，零序电压小于5V，"
                    f"重合后故障消失，重合成功。"),
        '分析结论': analysis_result["analysis_conclusion"],
        '故障分类': fault_type
    }

    # 返回最终结果
    output_data = {
        'input': input_value,
        'output': str(output_value),
    }
    return output_data

In [7]:
import json

data_path = '/Users/czy/projects/baohuchu_demo/data/test_data/1049个训练样本-1016_20250324_180309_ce79e70008f87ff1.json'
with open(data_path, "r") as f:
    data_list = json.load(f) 

len(data_list)

2050

In [13]:
len=50
for data in data_list[1800:1800+len]:
    output_data = generate_instruction(data)
    print(output_data)

TypeError: 'int' object is not callable

In [3]:
import json

def flatten_and_convert_to_jsonl(data, output_file):
    """
    将嵌套的故障数据转换为JSONL格式
    
    Args:
        data: 输入的嵌套字典数据
        output_file: 输出的JSONL文件路径
    """
    flattened_data = []
    
    # 遍历每种故障类型
    for fault_type, fault_cases in data.items():
        # 遍历该故障类型下的所有案例
        for case_list in fault_cases:
            for case in case_list:
                # 提取内容并添加故障类型
                if 'content' in case:
                    entry = case['content']
                else:
                    entry = case
                    
                # 添加额外的元数据
                entry['fault_type'] = fault_type
                if 'file_path' in case:
                    entry['file_path'] = case['file_path']
                
                flattened_data.append(entry)
    
    # 写入JSONL文件
    with open(output_file, 'w', encoding='utf-8') as f:
        for entry in flattened_data:
            json_line = json.dumps(entry, ensure_ascii=False)
            f.write(json_line + '\n')
    
    return len(flattened_data)

# 使用示例
input_data_dir = '/Users/czy/projects/baohuchu_demo/data/test_data/few_shot_data.json'  # 您的输入数据
with open(input_data_dir, 'r', encoding='utf-8') as f:
    input_data = json.load(f)
    
output_file = '/Users/czy/projects/baohuchu_demo/data/test_data/few_shot_data.jsonl'
num_entries = flatten_and_convert_to_jsonl(input_data, output_file)
print(f"已转换 {num_entries} 条记录到 {output_file}")

# 验证转换结果
def verify_jsonl(file_path):
    """验证JSONL文件的内容"""
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        print(f"文件共有 {len(lines)} 行")
        
        # 检查前两行的内容
        for i, line in enumerate(lines[:2]):
            try:
                data = json.loads(line)
                print(f"\n第 {i+1} 行数据示例:")
                print(f"故障类型: {data.get('fault_type')}")
                print(f"线路名称: {data.get('line_name')}")
                print(f"变电站: {data.get('station_name')}")
            except json.JSONDecodeError as e:
                print(f"第 {i+1} 行解析错误: {e}")

# 验证转换结果
verify_jsonl(output_file)

已转换 12 条记录到 /Users/czy/projects/baohuchu_demo/data/test_data/few_shot_data.jsonl
文件共有 12 行

第 1 行数据示例:
故障类型: C相接地故障
线路名称: 220kV新温线
变电站: 温水站

第 2 行数据示例:
故障类型: BC相间短路故障
线路名称: 220kV沂祖Ⅰ线
变电站: 孙祖站
