# 1 数据预处理

## 1.1 读取数据

In [2]:
import os
import json

# 获取项目根目录（假设当前 notebook 在 notebooks 目录下）
project_root = os.path.dirname(os.getcwd())  # 上一级目录即为项目根目录

# 构建数据路径
fault_file = os.path.join(project_root, "raw_data", "故障类.json")
non_fault_file = os.path.join(project_root, "raw_data", "非故障类.json")

print("故障类文件路径:", fault_file)
print("非故障类文件路径:", non_fault_file)

故障类文件路径: E:\code\py\DSPy\raw_data\故障类.json
非故障类文件路径: E:\code\py\DSPy\raw_data\非故障类.json


In [3]:
def read_json_file(file_path):
    data = []
    with open(file_path, 'r', encoding='utf-8-sig') as f:  # 使用 utf-8-sig 读取
        try:
            items = json.load(f)  # 整个文件是列表
            for item in items:
                input_text = item.get('input', '').strip()  # 如果没有这个键，则返回默认值空字符串
                label = item.get('faulty_call', '-1').strip()  # 如果没有这个键，默认返回-1
                if input_text and label in ('0', '1'):
                    data.append({
                        'input': input_text,
                        'label': '故障' if label == '1' else '非故障'
                    })
        except json.JSONDecodeError as e:
            print(f"解析错误：{e}")
    return data

# 读取数据
fault_data = read_json_file(fault_file)
non_fault_data = read_json_file(non_fault_file)

# 合并数据
all_data = fault_data + non_fault_data

# 打印数据样本
print("读取样本数:", len(all_data))
print("故障样本数:", len(fault_data))
print("非故障样本数:", len(non_fault_data))
print("示例数据:", all_data[0])

读取样本数: 492
故障样本数: 117
非故障样本数: 375
示例数据: {'input': '西北实习生李龙。你好天山站，杨立斌向您回令。喂啥那个啥？是那个操操作天哈，一线线路保护投入的。保护投入了稍等我给你转一下。嗯，好。', 'label': '故障'}


In [4]:
for i in all_data:
    print(i)

{'input': '西北实习生李龙。你好天山站，杨立斌向您回令。喂啥那个啥？是那个操操作天哈，一线线路保护投入的。保护投入了稍等我给你转一下。嗯，好。', 'label': '故障'}
{'input': '你好哈，密变王勇。你要干什么？回令退与我们这有个投保护的。嗯，回去是吧，一会我给你回过去吧。行好的。再见。', 'label': '故障'}
{'input': '西北郭相阳汇回令是吧？是的。你说。嗯网口令操作单位天山换流站操作内容投入71309天哈一线CSC103B线路保护CSC125过压远眺保护操作完成时间79算了分。好，你们现在站内。天哈，一线两套线路保护都运行正常，是吧？嗯，是的。好知道了，再见。嗯，好再见。', 'label': '故障'}
{'input': '你好西北。木垒变黄小龙回令。回下令。母差保护投入的。保护的没有了。', 'label': '故障'}
{'input': '喂，你好宁夏孙原。西北郭艳辉，我问一下你们今天那个切除的风机的出力恢复了没。动保。恢复一部分。恢复的是什么？就就那个大风导致提脱网的这个分机是吧？过了。恢复了八十七万了，现在还有100一百六十九万还没恢复。你们损失了这么多，后边有跟我们汇报吗？这这个多久汇报一次呀。我看。你这个上上午是咋汇报的，你不是切除就得跟我们说一下吗？他这个变动特别快。只有两家，是吧？我们接到您汇报，只有贺兰山第一、第二风电场损失了八万。送宝第五、第六还有新娘第一风电场损失了六万千瓦，只接到汇报这些呀。这这个目前有点多，我我把那个统计表给你发过去吧。你们现在还有多少没恢复吗？没恢复还有100然后看有个一百六十万六十多万。六十九万。你把那个表发过来，我看一下。嗯，嗯，好的好。', 'label': '故障'}
{'input': '喂，你好西北实习调度员。新疆区调谈事情给您汇报一下三塘湖木垒的那个稳控的操控。你回令。你找好像这个一会给郭工回。西北郭亚惠。郭总，新疆洽谈区就是那个三塘湖和木垒的750的PCS-992装置二以及这个220到750通道压板四套都投上了，然后运行正常。好，我知道了再见。嗯嗯，好再见，嗯。', 'label': '故障'}
{'input': '喂，你好西北实习调度员回令是吧？对回令。等一下给你回过去。一样。', 'label': '故障'}
{'input': '木垒变黄小龙

## 1.2 使用 sklearn.model_selection.train_test_split 划分训练集和验证集

In [5]:
from sklearn.model_selection import train_test_split

# 设置验证集比例（例如 20%）
test_size = 0.2
random_state = 42  # 固定随机种子，确保结果可复现

# 只提取 input 和 label 用于划分
X = [item['input'] for item in all_data]
y = [item['label'] for item in all_data]

# 划分数据集
X_train, X_val, y_train, y_val = train_test_split(
    X, y,
    test_size=test_size,
    stratify=y,          # 保持类别分布一致
    random_state=random_state
)

# 打印划分结果
print("总样本数:", len(all_data))
print("训练集样本数:", len(X_train))
print("验证集样本数:", len(X_val))

总样本数: 492
训练集样本数: 393
验证集样本数: 99


## 1.3 构建 DSPy 的 trainset 和 valset

In [6]:
import dspy

# 构建训练集
trainset = []
for input_text, label in zip(X_train, y_train):
    example = dspy.Example(
        input=input_text,
        label=label
    ).with_inputs("input")
    trainset.append(example)

# 构建验证集
valset = []
for input_text, label in zip(X_val, y_val):
    example = dspy.Example(
        input=input_text,
        label=label
    ).with_inputs("input")
    valset.append(example)
    
trainset, valset

([Example({'input': '还没有一个告警，是吧？照壁山回令。对。西北郭亚慧回令是吧你说。回的是网调口令操作单位照壁山变操作内容投入照壁山750PCS-92稳控装置2至木垒稳控装置二通道压板已执行完毕时间18点46分。站内两套稳控装置都运行正常，未未知告警，是吧？对是的。再见。好，我知道了，再见。', 'label': '故障'}) (input_keys={'input'}),
  Example({'input': '西北定值。你好甘肃马家斌一是给您汇报一下我们在11点44分甘肃新能源出力达到了两千五百二十四万千瓦创历史新高。然后20给您报一个完工，是那个青豫直流极二带电线路带电作业的。好，你这样，你先把这个创新高，还有包括上一次历史，极值，是时间，是这个先发个邮件吧。好的，收到。这个同步给国调汇报不需要。操作。是您，您那边汇报还是我们这边？国调说了。好的。你看你看一下那个网省三级协同汇报的那个要求，好吧，待会查一下这个看要不要给国调汇报一下。好的嗯。吉。2。完工是断线了，是吧，好吧，线路带电作业是吧？对对对是的。是吧？票号是。是1722。对是的。跟你核对一下内容。然后停电项目名称是青豫直流吉二线路带电作业，然后带电工作内容两项第一是。青豫直流极二线路1063号塔消除右中导线。成渝线夹螺栓松动缺陷二是青豫直流吉二线路。零六十三号塔消除左下导线悬垂线夹螺栓松动缺陷。这两个第一是青豫直流吉二线路，工作期间不退出。两端直流在启动功能二是线路故障情况下，不经联系，不能强送电。各专业初始意见都是同意，现在上述工作已完，工人员已撤离，自做安措已拆除现场无遗留物，具备报完工条件。好。确认一下，现在这个所有工作都结束了对是的。它这个。线路两端直流在启动功能也没有退出嘛，对吧？对没退市的。人员撤离自做安措拆除具备完工条件对是的。那你待令吧，我向国调申请。好的，嗯，再见。', 'label': '非故障'}) (input_keys={'input'}),
  Example({'input': '端午节。就是咸东一线是咋了？我们联络线，我看刚跳了，刚退出是吧，我们查一下。为什么不？应该是我们这推图，我看一下吧。好收到。尽快把故障信息核出来这个有有啥影响，没有有啥影响没有？嗯，你说。嗯，我核一下给您打过来吧。尽快核实。', 'label': '故障'}) (input

# 2 模型调参

## 2.1 定义 DSPy 的 Signature（输入输出结构）

In [7]:
import dspy
from dspy.teleprompt import BootstrapFewShotWithRandomSearch
from dspy.evaluate import Evaluate

In [8]:
class ClassificationSignature(dspy.Signature):
    """中文故障分类任务的输入输出定义"""
    input = dspy.InputField(desc="输入文本")
    label = dspy.OutputField(desc="分类标签，故障或非故障")

class FaultClassifier(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict(ClassificationSignature)

    def forward(self, input):
        return self.predict(input=input)

## 2.2 设置语言模型

In [9]:
from dotenv import load_dotenv

load_dotenv()  # 加载 .env 文件中的变量

True

In [10]:
# 设置语言模型
lm = dspy.LM('ollama_chat/qwen2.5:32b', api_base=os.getenv("OLLAMA_A800_API_BASE"), api_key='', cache=False)
dspy.configure(lm=lm)

model = FaultClassifier()

## 2.3 定义评估函数

In [11]:
def metric(gold, pred, trace=None):
    # 可以打印 trace 来调试提示词内容
    # print("Trace:", trace)
    return gold.label == pred.label

## 2.4 设置优化参数

In [12]:
config = dict(
    # max_rounds=5,                 
    max_labeled_demos=4,          # 每个样本最多使用 4 个标注示例
    max_bootstrapped_demos=4,     # 每个样本最多使用 4 个自举生成的示例
    num_candidate_programs=20,    # 随机搜索 ? 个候选程序
    num_threads=4                 # 并行线程数
)

teleprompter = BootstrapFewShotWithRandomSearch(metric=metric, **config)

Going to sample between 1 and 4 traces per predictor.
Will attempt to bootstrap 20 candidate sets.


## 2.5 使用优化器训练（优化提示词）

In [28]:
optimized_model = teleprompter.compile(model, trainset=trainset)

Average Metric: 21.00 / 34 (61.8%):   9%|▊         | 34/393 [00:23<02:03,  2.91it/s]


KeyboardInterrupt



# 2.6 在验证集上评估

In [15]:
evaluator = Evaluate(devset=valset, metric=metric, num_threads=4, display_progress=True)
score = evaluator(optimized_model)
print("验证集准确率:", score)

Average Metric: 78.00 / 99 (78.8%): 100%|██████████| 99/99 [00:51<00:00,  1.93it/s]

2025/07/22 14:33:33 INFO dspy.evaluate.evaluate: Average Metric: 78 / 99 (78.8%)



验证集准确率: 78.79


## 2.7 保存模型

In [16]:
# 保存模型到文件
model_path = "optimized_fault_classifier.json"
optimized_model.save(model_path)
print("模型已保存至:", model_path)

模型已保存至: optimized_fault_classifier.json


## 2.8 模型推理

In [17]:
# 实例化模型
loaded_model = FaultClassifier()

# 手动加载 JSON 文件，指定 encoding='utf-8'
model_path = "optimized_fault_classifier.json"

with open(model_path, 'r', encoding='utf-8') as f:
    state = json.load(f)

# 手动恢复模型状态
loaded_model.load_state(state)

print("模型已成功加载")

# 示例推理
result = loaded_model(input="设备无法启动，电源指示灯不亮。")
print(result)

模型已成功加载
Prediction(
    label='故障'
)


## 2.9 模型对比

In [18]:
from dspy.evaluate import Evaluate

# 创建评估器
evaluate = Evaluate(devset=valset, metric=metric, num_threads=5, display_progress=True)

# 评估优化前模块
print("优化前模块评分：")
evaluate(model)

# 评估优化后模块
print("优化后模块评分：")
evaluate(loaded_model)

优化前模块评分：
Average Metric: 66.00 / 99 (66.7%): 100%|██████████| 99/99 [2:11:31<00:00, 79.71s/it]   

2025/07/23 18:24:09 INFO dspy.evaluate.evaluate: Average Metric: 66 / 99 (66.7%)



优化后模块评分：
Average Metric: 77.00 / 99 (77.8%): 100%|██████████| 99/99 [25:01<00:00, 15.16s/it]

2025/07/23 18:49:10 INFO dspy.evaluate.evaluate: Average Metric: 77 / 99 (77.8%)





77.78