# Trainer
千帆Python SDK 在使用[resource API实现发起训练微调](./console-finetune.ipynb)之外，还提供了Trainer API，可以更方便地实现一体化的训练微调pipeline。同时提供了状态事件回调函数的注册，通过事件分发实现训练流程状态事件的监控。


本例将基于qianfan==0.2.1展示通过Dataset加载本地数据集，并上传到千帆平台，基于ERNIE-Bot-turbo进行fine-tune，直到最终完成服务发布，并最终实现服务调用的完整过程。

## 前置准备
- 初始化千帆安全认证AK、SK

In [1]:
import os 

os.environ["QIANFAN_ACCESS_KEY"] = "your_ak"
os.environ["QIANFAN_SECRET_KEY"] = "your_sk"

## 数据集加载

千帆SDK提供了数据集实现帮助我们可以快速的加载本地的数据集到内存，并通过设定DataSource数据源以保存至本地和千帆平台。

In [3]:
from qianfan.dataset import Dataset

# 加载本地数据集
ds: Dataset = Dataset.load(data_file="./data/fin_cqa_train.jsonl")
ds.list()

[[{'prompt': '下文中市场价格下降导致市场价格下降事件对应的结果涉及的产品是？在国际奶粉价格下降压力下,国内奶价仍有下降空间',
   'response': [['奶']]}],
 [{'prompt': '下文中市场价格下降导致市场价格下降事件对应的结果涉及的地区是？在国际奶粉价格下降压力下,国内奶价仍有下降空间',
   'response': [['国内']]}],
 [{'prompt': '下文中有哪些因果事件？随着供应缩减，需求回升，高碳铬铁价格企稳回升，预期短期内或仍维持偏强态势，但高碳铬铁产能依然偏向过剩，后期价格弹升空间有限',
   'response': [['需求增加导致市场价格提升']]}],
 [{'prompt': '下文中需求增加导致市场价格提升事件对应的原因涉及的产品是？随着供应缩减，需求回升，高碳铬铁价格企稳回升，预期短期内或仍维持偏强态势，但高碳铬铁产能依然偏向过剩，后期价格弹升空间有限',
   'response': [['高碳铬铁']]}],
 [{'prompt': '下文中需求增加导致市场价格提升事件对应的原因涉及的行业是？随着供应缩减，需求回升，高碳铬铁价格企稳回升，预期短期内或仍维持偏强态势，但高碳铬铁产能依然偏向过剩，后期价格弹升空间有限',
   'response': [['我们无法得知，可能需要更多内容说明。']]}],
 [{'prompt': '下文中需求增加导致市场价格提升事件对应的结果涉及的产品是？随着供应缩减，需求回升，高碳铬铁价格企稳回升，预期短期内或仍维持偏强态势，但高碳铬铁产能依然偏向过剩，后期价格弹升空间有限',
   'response': [['高碳铬铁']]}],
 [{'prompt': '下文中有哪些因果事件？但由于7-ACA市场价格大幅下跌，跌幅超过50%，导致公司相关原料药销售价格随之下跌',
   'response': [['市场价格下降导致市场价格下降']]}],
 [{'prompt': '下文中市场价格下降导致市场价格下降事件对应的原因涉及的产品是？但由于7-ACA市场价格大幅下跌，跌幅超过50%，导致公司相关原料药销售价格随之下跌',
   'response': [['7-ACA']]}],
 [{'prompt': '下文中市场价格下降导致市场价

从本地数据集上传到BOS

In [4]:
# 保存到千帆平台
from qianfan.dataset import QianfanDataSource
from qianfan.resources.console import consts as console_consts

bos_bucket_name = "bos_bucket"
bos_bucket_file_path = "/sdk_ds/"


# 创建千帆数据集，并上传保存
qianfan_data_source = QianfanDataSource.create_bare_dataset(
    name="sdk_trainer_ds",
    template_type=console_consts.DataTemplateType.NonSortedConversation,
    storage_type=console_consts.DataStorageType.PrivateBos,
    storage_id=bos_bucket_name,
    storage_path=bos_bucket_file_path,
)

ds.save(qianfan_data_source, replace_source=True)

True

In [2]:
from qianfan.trainer.consts import ActionState, ServiceType
from qianfan.resources.console import consts as console_consts
from qianfan.trainer.configs import TrainConfig, DeployConfig
from qianfan.resources import QfMessages
from qianfan.trainer import LLMFinetune
from typing import cast
from qianfan.utils import enable_log
import logging

enable_log(logging.DEBUG)

### LLMFinetune 训练
`LLMFinetune` 实现了SFT逻辑的trainer，它内部组装了SFT所需要的基本`Pipeline`, 用于串联数据->训练->模型发布->服务调用等步骤

In [23]:

trainer = LLMFinetune(
    train_type="ERNIE-Bot-turbo-0725",
    train_config=TrainConfig(
        epoch=1,
        learning_rate=0.0003,
        max_seq_len=4096,
        peft_type="LoRA",
    ),
    dataset=ds,
)

### 运行任务
同步运行trainer，训练直到模型发布完成

In [24]:
trainer.run()

[DEBUG] [12-05 13:32:21] base.py:208 [t:140605024622400]: action[BKBq28it8V] Preceding
[DEBUG] [12-05 13:32:21] base.py:208 [t:140605024622400]: action[7GfdGSwmxr] Preceding
[DEBUG] [12-05 13:32:21] actions.py:72 [t:140605024622400]: [load_dataset_action] prepare train-set


==>


[DEBUG] [12-05 13:32:21] actions.py:78 [t:140605024622400]: [load_dataset_action] dataset loaded successfully
[DEBUG] [12-05 13:32:21] base.py:212 [t:140605024622400]: action[7GfdGSwmxr] Done
[DEBUG] [12-05 13:32:21] base.py:208 [t:140605024622400]: action[cMLoerE1ib] Preceding
[DEBUG] [12-05 13:32:22] actions.py:239 [t:140605024622400]: [train_action] create fine-tune task: 17170
[DEBUG] [12-05 13:32:24] actions.py:263 [t:140605024622400]: [train_action] create fine-tune job_id: 8805
[DEBUG] [12-05 14:08:14] actions.py:275 [t:140605024622400]: [train_action] fine-tune job has ended: 8805 with status: FINISH
[DEBUG] [12-05 14:08:14] base.py:212 [t:140605024622400]: action[cMLoerE1ib] Done
[DEBUG] [12-05 14:08:14] base.py:208 [t:140605024622400]: action[5m0cdOcBtV] Preceding
[DEBUG] [12-05 14:08:16] actions.py:325 [t:140605024622400]: [model_publish_action] model: 17170_8805 has been published.
[DEBUG] [12-05 14:08:16] base.py:212 [t:140605024622400]: action[5m0cdOcBtV] Done
[DEBUG] [12

<qianfan.trainer.finetune.LLMFinetune at 0x7fe0bf3d2490>

In [35]:
trainer.result

[{'task_id': 17170,
  'job_id': 8805,
  'model_id': 9954,
  'model_version_id': 12344,
  'model': <qianfan.trainer.model.Model at 0x7fe0c507f9d0>}]

In [None]:
from qianfan.trainer.model import Model, Service, model_deploy
from qianfan.trainer.consts import ServiceType

m = trainer.result[0]["model"]
sft_svc: Service = m.deploy(DeployConfig(
    name="sdkcqa1",
    endpoint_prefix="sdkcqa1",
    replicas=1,
    pool_type=2,
    service_type=ServiceType.Chat,
))


In [None]:
from qianfan import ChatCompletion
### 使用Model & Service调用模型

problem="下文中有哪些因果事件？无取向硅钢广泛应用于铁芯等电机零部件其产量的持续提升导致市场竞争愈发激烈，价格进一步降低，从而有效减少新能源汽车驱动电机行业内企业的成本支出"

chat_comp: ChatCompletion = sft_svc.get_res()
print("sft", chat_comp.do({"messages": [{"content": problem, "role": "user"}]})["result"])



sft 供给增加导致市场价格下降

In [None]:
svc = ChatCompletion(model="ERNIE-Bot-turbo", service_type=ServiceType.Chat)
print("origin:", svc.exec({"messages": [{"content": problem, "role": "user"}]})["result"])



origin: 因果事件：

事件类型：应用
- 事件触发词：应用于
- 事件论元：
  	- 主体：无取向硅钢
  	- 客体：铁芯等电机零部件

事件类型：产量提升
- 事件触发词：持续提升

事件类型：竞争加剧
- 事件触发词：竞争愈发激烈
- 原因：无取向硅钢产量持续提升

事件类型：价格降低
- 事件触发词：降低
- 原因：市场竞争愈发激烈

事件类型：成本减少
- 事件触发词：减少
- 主体：新能源汽车驱动电机行业内企业
- 原因：价格进一步降低


## EventHandler

如果需要在训练过程中监控每个阶段的各个节点的状态，可以通过事件回调函数来实现，通过事件的对应的action_state可以获取当前的action的运行情况以实现对应的业务回调，插入自定义逻辑

In [None]:
from qianfan.trainer.event import Event, EventHandler

testset: Dataset = Dataset.load(data_file="./data/fin_cqa_test.jsonl")
# 定义自己的EventHandler，并实现dispatch方法
class InferAfterSFT(EventHandler):
    target_action: str
    def __init__(self, target_action: str) -> None:
        super().__init__()
        self.target_action = target_action

    def dispatch(self, event: Event) -> None:
        print("receive: <", event)
        if self.target_action == event.action_id and event.action_state == ActionState.Done:
            svc = cast(Service, event.data["service"])
            print("svc", svc)
            for row in testset.list():
                msgs = QfMessages()
                msgs.append(row[0][0]["prompt"], "user")
                svc.exec({"messages":"msgs"})
                print("row infer result", row)
            

eh = InferAfterSFT(target_action=trainer.ppls[0].id)
trainer.register_event_handler(eh)
trainer.run()