提取原始训练数据

In [6]:
import json
import os
import pandas as pd
import re

# Input JSON file path and output directory
input_file_path = r'F:\大模型项目\Paper_1\第二部分_微调测试数据集\一轮微调\测试集分类\test_data.json'
output_dir = r'F:\大模型项目\Paper_1\第二部分_微调测试数据集\一轮微调\测试集分类\output_csv'

# Create the output directory if it does not already exist
os.makedirs(output_dir, exist_ok=True)

# Load the whole JSON file into memory; the functions below iterate over
# `data` and read each entry's "instruction" and "output" keys.
with open(input_file_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Ordered (column name, regex) pairs used to parse an instruction string.
# Every regex has exactly one capture group holding the field's value.
_INSTRUCTION_FIELD_PATTERNS = (
    ("Biomass resources", r"biomass resource used here is: (.*?),"),
    ("Raw material sources", r"sourced from: (.*?),"),
    ("Pre-processing methods", r"pre-treatment method is: (.*?),"),
    ("Preparation equipment", r"preparation equipment used is: (.*?),"),
    ("Other processing", r"other treatments include: (.*?),"),
    ("Modified", r"modification related information: (.*?),"),
    ("Cellulose content", r"cellulose content by weight percentage in the raw material is: (.*?),"),
    ("Hemicellulose content", r"hemicellulose content by weight percentage is: (.*?),"),
    ("Lignin content", r"lignin content by weight percentage is: (.*?),"),
    ("Ash content", r"ash content by weight percentage is: (.*?),"),
    ("Fixed carbon content", r"fixed carbon content by weight percentage is: (.*?),"),
    ("Volatile matter content", r"volatile matter content by weight percentage is: (.*?),"),
    # Without the lookbehind this pattern also matches inside the
    # "fixed carbon content ..." phrase and would return that value instead.
    ("Carbon content", r"(?<![Ff]ixed )carbon content by weight percentage is: (.*?),"),
    ("Hydrogen content", r"hydrogen content by weight percentage is: (.*?),"),
    ("Nitrogen content", r"nitrogen content by weight percentage is: (.*?),"),
    ("Oxygen content", r"oxygen content by weight percentage is: (.*?),"),
    ("Sulfur content", r"sulfur content by weight percentage is: (.*?),"),
    ("Kalium content", r"kalium content by weight percentage is: (.*?),"),
    ("Calcium content", r"calcium content by weight percentage is: (.*?),"),
    ("Natrium content", r"sodium content by weight percentage is: (.*?),"),
    ("Magnesium content", r"magnesium content by weight percentage is: (.*?),"),
    ("Ferrum content", r"ferrum content by weight percentage is: (.*?),"),
    ("Silicon content", r"silicon content by weight percentage is: (.*?),"),
    ("Highest treatment temperature", r"maximum treatment temperature in the pyrolysis experiment is: (.*?)℃,"),
    ("Heating rate", r"heating rate is: (.*?)℃/min,"),
    ("Residence time", r"holding time is: (.*?) min."),
)


def extract_instruction_fields(instruction):
    """Parse the labelled values out of an instruction string.

    Each field is searched for independently, so one missing field no longer
    prevents the fields listed after it from being extracted.

    Args:
        instruction: The instruction text to parse.

    Returns:
        A dict mapping column names to the captured text (as strings), in the
        order of ``_INSTRUCTION_FIELD_PATTERNS``. Fields whose pattern does not
        match are omitted. An empty dict is returned when ``instruction`` is
        not a string.
    """
    fields = {}

    # A non-string value cannot be searched; report it and return nothing
    # rather than raising, so the caller's loop can continue.
    if not isinstance(instruction, str):
        print(f"Error processing instruction: {instruction}")
        print(f"Error message: expected a string, got {type(instruction).__name__}")
        return fields

    missing = []
    for column, pattern in _INSTRUCTION_FIELD_PATTERNS:
        match = re.search(pattern, instruction)
        if match is None:
            missing.append(column)
        else:
            fields[column] = match.group(1)

    if missing:
        # Report the offending entry once, naming every field that was not found.
        print(f"Error processing instruction: {instruction}")
        print(f"Error message: no match for field(s): {', '.join(missing)}")

    return fields

# Regex (one capture group = the numeric text) for each supported target
# property, applied to an entry's "output" string.
_OUTPUT_VALUE_PATTERNS = {
    "ash": r"ash content by weight percentage is ([\d\.]+)",
    "pH": r"pH is ([\d\.]+)",
    "grain size": r"grain size is ([\d\.]+)",
    "specific surface area": r"specific surface area is ([\d\.]+)",
    "yield": r"yield by weight percentage is ([\d\.]+)",
}


def filter_and_extract_data(data, target_property):
    """Build one row per entry whose output reports only ``target_property``.

    Args:
        data: Iterable of dicts, each with "instruction" and "output" keys.
        target_property: One of the keys of ``_OUTPUT_VALUE_PATTERNS``.

    Returns:
        A list of dicts. Each dict holds the fields parsed from the entry's
        instruction plus ``target_property`` mapped to the numeric text found
        in the output.

    Raises:
        ValueError: If ``target_property`` is not a supported property.
        KeyError: If an entry lacks the "instruction" or "output" key.
    """
    value_pattern = _OUTPUT_VALUE_PATTERNS.get(target_property)
    if value_pattern is None:
        # Fail with a clear message instead of an unbound-variable error later.
        supported = ", ".join(sorted(_OUTPUT_VALUE_PATTERNS))
        raise ValueError(
            f"Unsupported target property {target_property!r}; expected one of: {supported}"
        )

    filtered_data = []

    for entry in data:
        instruction = entry["instruction"]
        output = entry["output"]

        # Skip outputs that report several properties: more than one property
        # keyword, or the word "and" (whole word only, so "sand" is not a hit).
        mentions = re.findall(r"pH|ash|grain size|specific surface area|yield", output)
        if len(mentions) > 1 or re.search(r"\band\b", output):
            continue

        if target_property not in output:
            continue

        numeric_value = re.search(value_pattern, output)
        if not numeric_value:
            continue

        # Parse the instruction only for rows that will actually be kept.
        fields = extract_instruction_fields(instruction)
        # The character class also swallows a sentence-ending period; drop it.
        fields[target_property] = numeric_value.group(1).rstrip('.')
        filtered_data.append(fields)

    return filtered_data

# Write the extracted rows to a CSV file
def save_data_to_csv(filtered_data, target_property):
    """Save ``filtered_data`` as ``<target_property>_data.csv`` inside ``output_dir``.

    The file is written without the index column and with the utf-8-sig
    encoding (UTF-8 with a byte-order mark). The destination path is printed
    once the file has been written.
    """
    destination = os.path.join(output_dir, f"{target_property}_data.csv")
    pd.DataFrame(filtered_data).to_csv(
        destination,
        index=False,
        encoding='utf-8-sig',
    )
    print(f"{target_property} data saved to {destination}")

# Filter and save the ash data (the property is matched as "ash"; the file is
# named with "Ash")
Ash_data = filter_and_extract_data(data, "ash")
save_data_to_csv(Ash_data, "Ash")

# Filter and save the remaining properties (pH, grain size, etc.)
pH_data = filter_and_extract_data(data, "pH")
save_data_to_csv(pH_data, "pH")

grain_size_data = filter_and_extract_data(data, "grain size")
save_data_to_csv(grain_size_data, "grain size")

specific_surface_area_data = filter_and_extract_data(data, "specific surface area")
save_data_to_csv(specific_surface_area_data, "specific surface area")

yield_data = filter_and_extract_data(data, "yield")
save_data_to_csv(yield_data, "yield")


Ash data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\一轮微调\测试集分类\output_csv\Ash_data.csv
pH data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\一轮微调\测试集分类\output_csv\pH_data.csv
grain size data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\一轮微调\测试集分类\output_csv\grain size_data.csv
specific surface area data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\一轮微调\测试集分类\output_csv\specific surface area_data.csv
yield data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\一轮微调\测试集分类\output_csv\yield_data.csv


提取大模型预测数据

In [11]:
import json
import os
import pandas as pd
import re

# Input JSONL file path and output directory
input_file_path = r'F:\大模型项目\Paper_1\第二部分_微调测试数据集\大模型微调预测结果\InternLM2.5-7B\二轮微调\测试集\generated_predictions.jsonl'
output_dir = r'F:\大模型项目\Paper_1\第二部分_微调测试数据集\大模型微调预测结果\InternLM2.5-7B\二轮微调\测试集\output_csv'

# Create the output directory if it does not already exist
os.makedirs(output_dir, exist_ok=True)

# Load the JSONL file: one JSON document per line. Blank lines (for example a
# trailing empty line) are skipped because json.loads raises on them.
with open(input_file_path, 'r', encoding='utf-8') as f:
    data = [json.loads(line) for line in f if line.strip()]

# Ordered (column name, regex) pairs used to parse a prompt string.
# Every regex has exactly one capture group holding the field's value.
_PROMPT_FIELD_PATTERNS = (
    ("Biomass resources", r"biomass resource used here is: (.*?),"),
    ("Raw material sources", r"sourced from: (.*?),"),
    ("Pre-processing methods", r"pre-treatment method is: (.*?),"),
    ("Preparation equipment", r"preparation equipment used is: (.*?),"),
    ("Other processing", r"other treatments include: (.*?),"),
    ("Modified", r"modification related information: (.*?),"),
    ("Cellulose content", r"cellulose content by weight percentage in the raw material is: (.*?),"),
    ("Hemicellulose content", r"hemicellulose content by weight percentage is: (.*?),"),
    ("Lignin content", r"lignin content by weight percentage is: (.*?),"),
    ("Ash content", r"ash content by weight percentage is: (.*?),"),
    ("Fixed carbon content", r"fixed carbon content by weight percentage is: (.*?),"),
    ("Volatile matter content", r"volatile matter content by weight percentage is: (.*?),"),
    # Without the lookbehind this pattern also matches inside the
    # "fixed carbon content ..." phrase and would return that value instead.
    ("Carbon content", r"(?<![Ff]ixed )carbon content by weight percentage is: (.*?),"),
    ("Hydrogen content", r"hydrogen content by weight percentage is: (.*?),"),
    ("Nitrogen content", r"nitrogen content by weight percentage is: (.*?),"),
    ("Oxygen content", r"oxygen content by weight percentage is: (.*?),"),
    ("Sulfur content", r"sulfur content by weight percentage is: (.*?),"),
    ("Kalium content", r"kalium content by weight percentage is: (.*?),"),
    ("Calcium content", r"calcium content by weight percentage is: (.*?),"),
    ("Natrium content", r"sodium content by weight percentage is: (.*?),"),
    ("Magnesium content", r"magnesium content by weight percentage is: (.*?),"),
    ("Ferrum content", r"ferrum content by weight percentage is: (.*?),"),
    ("Silicon content", r"silicon content by weight percentage is: (.*?),"),
    ("Highest treatment temperature", r"maximum treatment temperature in the pyrolysis experiment is: (.*?)℃,"),
    ("Heating rate", r"heating rate is: (.*?)℃/min,"),
    ("Residence time", r"holding time is: (.*?) min."),
)


def extract_prompt_fields(prompt):
    """Parse the labelled values out of a prompt string.

    Each field is searched for independently, so one missing field no longer
    prevents the fields listed after it from being extracted.

    Args:
        prompt: The prompt text to parse.

    Returns:
        A dict mapping column names to the captured text (as strings), in the
        order of ``_PROMPT_FIELD_PATTERNS``. Fields whose pattern does not
        match are omitted. An empty dict is returned when ``prompt`` is not a
        string.
    """
    fields = {}

    # A non-string value cannot be searched; report it and return nothing
    # rather than raising, so the caller's loop can continue.
    if not isinstance(prompt, str):
        print(f"Error processing prompt: {prompt}")
        print(f"Error message: expected a string, got {type(prompt).__name__}")
        return fields

    missing = []
    for column, pattern in _PROMPT_FIELD_PATTERNS:
        match = re.search(pattern, prompt)
        if match is None:
            missing.append(column)
        else:
            fields[column] = match.group(1)

    if missing:
        # Report the offending entry once, naming every field that was not found.
        print(f"Error processing prompt: {prompt}")
        print(f"Error message: no match for field(s): {', '.join(missing)}")

    return fields

# Regex (one capture group = the numeric text) for each supported target
# property, applied to an entry's "label" string.
_LABEL_VALUE_PATTERNS = {
    "ash": r"ash content by weight percentage is ([\d\.]+)",
    "pH": r"pH is ([\d\.]+)",
    "grain size": r"grain size is ([\d\.]+)",
    "specific surface area": r"specific surface area is ([\d\.]+)",
    "yield": r"yield by weight percentage is ([\d\.]+)",
}


def filter_and_extract_data(data, target_property):
    """Build one row per entry whose label reports only ``target_property``.

    Args:
        data: Iterable of dicts, each with "prompt" and "label" keys and an
            optional "predict" key.
        target_property: One of the keys of ``_LABEL_VALUE_PATTERNS``.

    Returns:
        A list of dicts. Each dict holds the fields parsed from the entry's
        prompt, ``target_property`` mapped to the numeric text found in the
        label, and, when a number can be extracted from the prediction,
        ``predict_<target_property>`` mapped to that numeric text.

    Raises:
        ValueError: If ``target_property`` is not a supported property.
        KeyError: If an entry lacks the "prompt" or "label" key.
    """
    label_pattern = _LABEL_VALUE_PATTERNS.get(target_property)
    if label_pattern is None:
        # Fail with a clear message instead of an unbound-variable error later.
        supported = ", ".join(sorted(_LABEL_VALUE_PATTERNS))
        raise ValueError(
            f"Unsupported target property {target_property!r}; expected one of: {supported}"
        )

    # Escape the property name so it is always matched literally.
    predict_pattern = rf"{re.escape(target_property)}.*?is ([\d\.]+)"

    filtered_data = []

    for entry in data:
        prompt = entry["prompt"]
        label = entry["label"]
        predict = entry.get("predict", None)

        # Skip labels that report several properties: more than one property
        # keyword, or the word "and" (whole word only, so "sand" is not a hit).
        mentions = re.findall(r"pH|ash|grain size|specific surface area|yield", label)
        if len(mentions) > 1 or re.search(r"\band\b", label):
            continue

        if target_property not in label:
            continue

        numeric_value = re.search(label_pattern, label)
        if not numeric_value:
            continue

        # Parse the prompt only for rows that will actually be kept.
        fields = extract_prompt_fields(prompt)
        # The character class also swallows a sentence-ending period; drop it.
        fields[target_property] = numeric_value.group(1).rstrip('.')

        # Extract the predicted value only when the prediction is text that
        # mentions the same property as the label.
        if isinstance(predict, str) and target_property in predict:
            predict_value = re.search(predict_pattern, predict, re.IGNORECASE)
            if predict_value:
                fields[f"predict_{target_property}"] = predict_value.group(1).rstrip('.')
            else:
                # Surface predictions that mention the property but carry no number.
                print(f"Could not extract predict value for '{target_property}' from: {predict}")

        filtered_data.append(fields)

    return filtered_data

# Write the extracted rows to a CSV file
def save_data_to_csv(filtered_data, target_property):
    """Save ``filtered_data`` as ``<target_property>_data.csv`` inside ``output_dir``.

    The file is written without the index column and with the utf-8-sig
    encoding (UTF-8 with a byte-order mark). The destination path is printed
    once the file has been written.
    """
    destination = os.path.join(output_dir, f"{target_property}_data.csv")
    pd.DataFrame(filtered_data).to_csv(
        destination,
        index=False,
        encoding='utf-8-sig',
    )
    print(f"{target_property} data saved to {destination}")

# Filter and save the ash data (the property is matched as "ash"; the file is
# named with "Ash")
Ash_data = filter_and_extract_data(data, "ash")
save_data_to_csv(Ash_data, "Ash")

# Filter and save the remaining properties (pH, grain size, etc.)
pH_data = filter_and_extract_data(data, "pH")
save_data_to_csv(pH_data, "pH")

grain_size_data = filter_and_extract_data(data, "grain size")
save_data_to_csv(grain_size_data, "grain size")

specific_surface_area_data = filter_and_extract_data(data, "specific surface area")
save_data_to_csv(specific_surface_area_data, "specific surface area")

yield_data = filter_and_extract_data(data, "yield")
save_data_to_csv(yield_data, "yield")


Ash data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\大模型微调预测结果\InternLM2.5-7B\二轮微调\测试集\output_csv\Ash_data.csv
pH data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\大模型微调预测结果\InternLM2.5-7B\二轮微调\测试集\output_csv\pH_data.csv
grain size data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\大模型微调预测结果\InternLM2.5-7B\二轮微调\测试集\output_csv\grain size_data.csv
specific surface area data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\大模型微调预测结果\InternLM2.5-7B\二轮微调\测试集\output_csv\specific surface area_data.csv
yield data saved to F:\大模型项目\Paper_1\第二部分_微调测试数据集\大模型微调预测结果\InternLM2.5-7B\二轮微调\测试集\output_csv\yield_data.csv
