# LoRA 流水线（Test4）

按顺序运行各部分以复现整个流水线。


In [None]:

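# 安装依赖（如有需要运行一次；4-bit 量化加载需要 bitsandbytes，含 ">" 的版本约束需加引号以免被 shell 当作重定向）
# !pip -q install transformers==4.53.0 datasets==3.6.0 peft==0.15.2 "accelerate>=0.33.0" torch==2.7.1 numpy==2.3.1 pandas==2.2.2 evaluate==0.4.0 nltk rouge-score sentence-transformers bitsandbytes
# 如需调用教师模型：!pip -q install "volcengine-python-sdk[ark]"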

## 1. 配置


In [None]:

# Six-digit stock codes processed by every stage of the pipeline.
# Codes starting with "6" are sent with market prefix 1 by EastMoneyAPI._secid,
# all other codes with prefix 0.
STOCK_CODES = [
    "600000", "002142", "600036", "600926",
    "000001", "601398", "601998", "601328",
]
# Default look-back horizon (calendar days) used by get_recent_data().
SUMMARY_DAYS = 90

## 2. 数据获取

`EastMoneyAPI` 用于下载所设定股票的 K 线数据。


In [None]:

import datetime
import requests
import pandas as pd

class EastMoneyAPI:
    """用于获取东财 K 线数据的简单封装。"""
    KLINE_URL = "https://push2his.eastmoney.com/api/qt/stock/kline/get"

    def __init__(self) -> None:
        self.session = requests.Session()

    def _secid(self, code: str) -> str:
        return f"{'1' if code.startswith('6') else '0'}.{code}"

    def get_kline_data(self, stock_code: str, klt: int = 101, num: int = 1000) -> pd.DataFrame | None:
        params = {
            "secid": self._secid(stock_code),
            "klt": klt,
            "fqt": 0,
            "lmt": num,
            "end": "20500000",
            "beg": "0",
            "fields1": "f1,f2,f3,f4,f5,f6",
            "fields2": "f51,f52,f53,f54,f55,f56,f57,f58",
        }
        try:
            r = self.session.get(self.KLINE_URL, params=params, timeout=8)
            r.raise_for_status()
            js = r.json()
            klines = js["data"]["klines"]
            records = []
            for line in klines:
                d = line.split(",")
                records.append(
                    {
                        "date": d[0],
                        "open": float(d[1]),
                        "close": float(d[2]),
                        "high": float(d[3]),
                        "low": float(d[4]),
                        "volume": float(d[5]),
                    }
                )
            df = pd.DataFrame(records)
            df.sort_values("date", inplace=True)
            df.reset_index(drop=True, inplace=True)
            return df
        except Exception as e:
            print(f"[EastMoneyAPI] {stock_code} 获取失败: {e}")
            return None


def get_recent_data(days: int = SUMMARY_DAYS) -> dict[str, pd.DataFrame]:
    """Fetch K-line data for every code in ``STOCK_CODES`` and keep only recent rows.

    Rows whose ``date`` string is on or after ``today - (days + 10)`` calendar days are
    kept (the extra 10 days are part of the cutoff). Codes whose download returns
    ``None`` are left out of the result.
    """
    client = EastMoneyAPI()
    cutoff = (datetime.date.today() - datetime.timedelta(days=days + 10)).strftime("%Y-%m-%d")
    result: dict[str, pd.DataFrame] = {}
    for code in STOCK_CODES:
        frame = client.get_kline_data(code, num=1000)
        if frame is None:
            continue
        result[code] = frame[frame["date"] >= cutoff]
    return result

## 3. 数据集构建

根据 K 线数据生成滑动窗口样本，并为教师模型格式化提示。


In [None]:

import json, random
from typing import Any, Sequence, Tuple, Optional


def _fetch_kline(code: str, days: int):
    """Return the most recent *days* bars for *code*, re-indexed from 0, or None on failure."""
    frame = EastMoneyAPI().get_kline_data(code, num=days)
    return None if frame is None else frame.tail(days).reset_index(drop=True)


def _sample_windows(df, window: int, num: int, rng: random.Random):
    if len(df) < window:
        return []
    return [df.iloc[i : i + window].reset_index(drop=True) for i in range(len(df) - window + 1)]


def _make_prompt(window) -> dict[str, Any]:
    change = ((window["close"].iloc[-1] / window["close"].iloc[0]) - 1) * 100
    summary = window.to_dict(orient="records")
    return {
        "change": round(change, 2),
        "prediction": "",
        "analysis": "",
        "advice": "",
        "kline_summary": summary,
    }


def format_prompt(sample: dict[str, Any]) -> str:
    """Render *sample* as the Chinese instruction prompt sent to the teacher model.

    Reads ``stock_code``, ``change`` and ``kline_summary`` from *sample*. The K-line
    rows are embedded as a JSON array with non-ASCII text kept verbatim. The data
    line and the instruction are separated by a single newline.

    NOTE(review): the "近30日" wording is hard-coded and may not match
    ``len(sample["kline_summary"])`` when a different window is used or rows are trimmed.
    """
    summary = json.dumps(sample["kline_summary"], ensure_ascii=False)
    return (
        f"股票 {sample['stock_code']} 近30日K线数据: {summary}\n"
        f"涨跌幅: {sample['change']}%。请预测后市走势，"
        "给出简短分析和操作建议，"
        "并以 JSON 格式回复，包括 'prediction', 'analysis', 'advice' 三个字段。"
    )


def _trim_sample_tokens(sample: dict[str, Any], tokenizer, max_tokens: int) -> None:
    if tokenizer is None:
        return
    while len(sample["kline_summary"]) > 1:
        text = format_prompt(sample)
        if len(tokenizer(text, add_special_tokens=False)["input_ids"]) <= max_tokens:
            break
        sample["kline_summary"].pop(0)


# Columns copied from the indicator-enriched frame into each sample window.
_WINDOW_COLUMNS = [
    "date",
    "open",
    "close",
    "high",
    "low",
    "volume",
    "MA5",
    "MA10",
    "RSI14",
    "MACD",
]


def _add_indicators(df) -> None:
    """Add pct_chg, MA5, MA10, RSI14 and MACD columns to *df* in place.

    pct_chg is the close-to-close percent change (0 for the first row). RSI14 uses an
    EWM with alpha=1/14 over gains and losses; it is 50 when both averages are 0 and
    100 when only the average loss is 0. MACD is EMA12 - EMA26 of the close (no signal
    line). Indicator columns are rounded to 2 decimals. MA5/MA10 are NaN for the first
    4/9 rows.
    """
    import numpy as np

    close = df["close"]
    df["pct_chg"] = (close.pct_change() * 100).fillna(0)
    df["MA5"] = close.rolling(5).mean().round(2)
    df["MA10"] = close.rolling(10).mean().round(2)

    delta = close.diff()
    avg_gain = delta.clip(lower=0).ewm(alpha=1 / 14, adjust=False).mean()
    avg_loss = (-delta.clip(upper=0)).ewm(alpha=1 / 14, adjust=False).mean()
    gain_v = avg_gain.to_numpy()
    loss_v = avg_loss.to_numpy()
    rs_v = (avg_gain / avg_loss).to_numpy()
    # A zero average loss would make rs infinite/NaN, so those rows are set explicitly.
    rsi = np.where(loss_v == 0, np.where(gain_v == 0, 50, 100), 100 - 100 / (1 + rs_v))
    df["RSI14"] = np.round(rsi, 2)

    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    df["MACD"] = (ema12 - ema26).round(2)


def _change_category(change_percent: float, threshold: float = 3.0) -> str:
    """Bucket a percent change: above +threshold → "up", below -threshold → "down", else "stable"."""
    if change_percent > threshold:
        return "up"
    if change_percent < -threshold:
        return "down"
    return "stable"


def build_dataset(
    stock_codes: Sequence[str],
    *,
    days: int = 180,
    window: int = 30,
    windows_per_stock: int = 1,
    val_ratio: float = 0.2,
    seed: int | None = None,
    tokenizer=None,
    max_tokens: int = 1024,
    balance_classes: bool = True,
) -> Tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Build shuffled train/validation prompt samples from recent K-line windows.

    For each code, the function:

    * fetches the last *days* bars and adds indicators;
    * enumerates every *window*-row slice that has no zero-volume day and no daily
      ``|pct_chg| > 20``;
    * labels each slice up/down/stable by its first-to-last close change (±3%);
    * keeps *windows_per_stock* randomly chosen slices.

    When *tokenizer* is given, each kept prompt is trimmed with ``_trim_sample_tokens``
    (oldest rows dropped, at least one kept).

    With *balance_classes* and all three classes present, each class is down-sampled
    to the size of the smallest one. All samples are then shuffled, and the first
    ``int(len * (1 - val_ratio))`` go to training.

    Args:
        stock_codes: codes to fetch; failed or empty downloads are skipped.
        days: bars fetched per code.
        window: rows per sample.
        windows_per_stock: samples kept per code before class balancing.
        val_ratio: fraction of samples assigned to validation.
        seed: seed for the single ``random.Random`` driving all shuffles.
        tokenizer: optional tokenizer used to enforce *max_tokens*.
        max_tokens: token budget for ``format_prompt(sample)``.
        balance_classes: down-sample to equal class sizes when possible.

    Returns:
        ``(train_samples, val_samples)``: dicts produced by ``_make_prompt`` with an
        added ``stock_code`` key.
    """
    rng = random.Random(seed)
    buckets: dict[str, list[dict[str, Any]]] = {"up": [], "down": [], "stable": []}

    for code in stock_codes:
        df = _fetch_kline(code, days)
        if df is None or df.empty:
            continue
        _add_indicators(df)
        n = len(df)
        if n < window:
            continue

        candidates: list[tuple[str, dict[str, Any]]] = []
        for i in range(n - window + 1):
            span = df.iloc[i : i + window]
            # Skip windows with a zero-volume day or a >20% daily move (presumably
            # suspensions / price gaps in the unadjusted fqt=0 data — TODO confirm).
            if span["volume"].eq(0).any() or span["pct_chg"].abs().gt(20).any():
                continue
            win = span[_WINDOW_COLUMNS].reset_index(drop=True)
            change_percent = (win["close"].iloc[-1] / win["close"].iloc[0] - 1) * 100
            prompt = _make_prompt(win)
            prompt["stock_code"] = code
            candidates.append((_change_category(change_percent), prompt))

        rng.shuffle(candidates)
        for category, prompt in candidates[:windows_per_stock]:
            # The shared helper always keeps at least one K-line row, so a prompt can
            # never end up with an empty kline_summary.
            _trim_sample_tokens(prompt, tokenizer, max_tokens)
            buckets[category].append(prompt)

    up, down, stable = buckets["up"], buckets["down"], buckets["stable"]
    if balance_classes and up and down and stable:
        min_count = min(len(up), len(down), len(stable))
        # Shuffle each class with the seeded rng before truncating, so the kept
        # subset is random but reproducible.
        rng.shuffle(up)
        rng.shuffle(down)
        rng.shuffle(stable)
        up, down, stable = up[:min_count], down[:min_count], stable[:min_count]

    samples = up + down + stable
    rng.shuffle(samples)
    split = int(len(samples) * (1 - val_ratio))
    return samples[:split], samples[split:]

## 4. 教师标注

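将提示发送到远程教师模型（DeepSeek-R1）。如果未设置 `ARK_API_KEY`，调用将返回占位符。教师回复会被解析为包含 `prediction`、`analysis`、`advice` 字段的标签，并与提示一起逐行写入 JSONL 文件；无法解析的回复（包括占位符）保存在 `raw` 字段中，这类标签的三个字段为空，会在第 5 步清洗时被过滤。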


In [None]:

import os, json
from pathlib import Path

# Ark API credential, read once from the environment when this cell runs.
ARK_API_KEY = os.getenv("ARK_API_KEY", "")


def call_teacher(prompt: str) -> dict[str, str]:
    """Send *prompt* to the DeepSeek-R1 teacher through the Volcengine Ark SDK.

    Returns ``{"content": ..., "reasoning": ...}`` and never raises. Without an API key
    a placeholder is returned. Any SDK, import or network error is reported in
    ``content`` with an empty ``reasoning``. Otherwise ``reasoning`` holds the message's
    ``reasoning_content`` when present, else "".
    """
    if not ARK_API_KEY:
        return {"content": "[缺少 ARK_API_KEY]", "reasoning": ""}
    try:
        from volcenginesdkarkruntime import Ark

        client = Ark(api_key=ARK_API_KEY)
        resp = client.chat.completions.create(
            model="deepseek-r1-250528",
            messages=[{"role": "user", "content": prompt}],
        )
        msg = resp.choices[0].message
        # Either field may be None on the message object. Calling ``.strip()`` on None
        # would raise AttributeError and discard an otherwise valid answer.
        content = (msg.content or "").strip()
        reasoning = (getattr(msg, "reasoning_content", None) or "").strip()
        return {"content": content, "reasoning": reasoning}
    except Exception as e:
        # Best-effort by design: the error text becomes an unparseable label that is
        # filtered out later.
        return {"content": f"[教师模型错误: {e}]", "reasoning": ""}


def _as_text(value) -> str:
    """Coerce a label field to str, mapping a missing/None value to "" (not "None")."""
    return "" if value is None else str(value)


def _extract_json_text(text: str) -> str:
    """Strip a surrounding Markdown code fence (e.g. ```json ... ```) from *text*."""
    s = text.strip()
    if not s.startswith("```"):
        return s
    body = s[3:]
    if body.endswith("```"):
        body = body[:-3]
    first, sep, rest = body.partition("\n")
    # Drop a language tag such as "json" on the opening fence line.
    if sep and (not first.strip() or first.strip().isalpha()):
        body = rest
    return body.strip()


def _parse_label(answer) -> dict:
    """Parse a teacher answer into a dict label; unparseable answers become ``{"raw": answer}``.

    Tries the answer with any code fence removed, then the span from the first "{"
    to the last "}". A JSON value that is not an object becomes ``{"raw": str(value)}``.
    """
    if not isinstance(answer, str):
        return {"raw": answer}
    candidates = [_extract_json_text(answer)]
    start, end = answer.find("{"), answer.rfind("}")
    if 0 <= start < end:
        candidates.append(answer[start : end + 1])
    for text in candidates:
        try:
            label = json.loads(text)
        except json.JSONDecodeError:
            continue
        return label if isinstance(label, dict) else {"raw": str(label)}
    return {"raw": answer}


def label_samples(prompts, output_file="labeled_data.jsonl"):
    """Label each prompt with the teacher model and stream the results to a JSONL file.

    Each output line is ``{"prompt": ..., "label": {...}}``.

    * The label always has string ``prediction``/``analysis``/``advice`` fields.
    * When the answer could not be parsed, those fields are empty and the raw answer
      is kept under ``raw``.
    * ``reasoning`` is added when the teacher returned one.

    *output_file* is overwritten.

    Returns:
        The list of written records, in input order.
    """
    path = Path(output_file)
    labeled = []
    with path.open("w", encoding="utf-8") as f:
        for prompt in prompts:
            ans = call_teacher(prompt)
            answer = ans.get("content", ans) if isinstance(ans, dict) else ans
            label = _parse_label(answer)
            for key in ("prediction", "analysis", "advice"):
                label[key] = _as_text(label.get(key))
            if isinstance(ans, dict) and ans.get("reasoning"):
                label["reasoning"] = str(ans["reasoning"])
            record = {"prompt": prompt, "label": label}
            labeled.append(record)
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return labeled

## 5. 数据清洗

移除无效记录，并将字典形式的标签序列化为按键排序的 JSON 字符串；非空的字符串标签原样保留（仅去除首尾空白）。


In [None]:

from collections import Counter

def normalize_label(lab):
    """Return *lab* as a non-empty training-label string, or None if it is unusable.

    * Non-empty strings are returned stripped. They are kept verbatim and not
      validated as JSON.
    * Dicts are serialised as sorted-key JSON after coercing ``prediction``,
      ``analysis`` and ``advice`` to strings (None or missing → ""). They are rejected
      when all three fields are empty. The caller's dict is not modified.
    * Anything else yields None.
    """
    if isinstance(lab, str):
        return lab.strip() or None
    if not isinstance(lab, dict):
        return None
    fields = {
        key: "" if lab.get(key) is None else str(lab.get(key))
        for key in ("prediction", "analysis", "advice")
    }
    if not any(fields.values()):
        return None
    return json.dumps({**lab, **fields}, ensure_ascii=False, sort_keys=True)


def clean_jsonl(inp: str, out: str) -> None:
    """Copy only well-formed ``{"prompt", "label"}`` records from *inp* to *out*.

    A record is kept when:

    * it is a JSON object;
    * it has a non-empty string ``prompt``;
    * its ``label`` is accepted by ``normalize_label``.

    Kept records are written with the prompt stripped and the label replaced by its
    normalized string. Blank lines, invalid JSON and bad-schema records are counted
    and dropped. *out* is overwritten, newline-terminated when non-empty.
    """
    stats = Counter()
    good = []
    for raw in Path(inp).read_text(encoding="utf-8").splitlines():
        if not raw.strip():
            stats["empty_line"] += 1
            continue
        try:
            rec = json.loads(raw)
        except json.JSONDecodeError:
            stats["invalid_json"] += 1
            continue
        # Guard non-object lines (e.g. a bare list) and non-string prompts, which
        # would otherwise raise AttributeError on ``.get`` / ``.strip``.
        prompt = rec.get("prompt") if isinstance(rec, dict) else None
        if not isinstance(prompt, str) or not prompt.strip():
            stats["bad_schema"] += 1
            continue
        label = normalize_label(rec.get("label"))
        if label is None:
            stats["bad_schema"] += 1
            continue
        good.append(json.dumps({"prompt": prompt.strip(), "label": label}, ensure_ascii=False))
        stats["kept"] += 1
    Path(out).write_text("\n".join(good) + ("\n" if good else ""), "utf-8")
    print(f"[clean_jsonl] 写入 {stats['kept']} 条有效样本 → {out}")

## 6. LoRA 微调

在已标注的数据集上使用 LoRA 适配器微调基础模型。


In [None]:

import inspect
from dataclasses import dataclass
import torch
import torch.optim.lr_scheduler as lr_sched
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
)

# Compatibility shim: older torch releases only expose the private ``_LRScheduler``
# base class. Alias it under the public name, presumably for libraries that
# reference ``lr_scheduler.LRScheduler``.
if getattr(lr_sched, "LRScheduler", None) is None:
    lr_sched.LRScheduler = lr_sched._LRScheduler

@dataclass
class TrainConfig:
    base_model: str = "Qwen/Qwen1.5-7B"
    data_path: str = "labeled_data.jsonl"
    eval_path: str | None = None
    output_dir: str = "lora_adapter"
    batch_size: int = 1
    lr: float = 2e-4
    epochs: int | None = 1
    max_steps: int | None = None
    grad_accum: int = 4
    max_len: int = 4096
    rope_factor: float = 1.0

# Label id skipped by the loss (-100 is the default ``ignore_index`` of
# ``torch.nn.CrossEntropyLoss``).
IGNORE_INDEX = -100


def _load_dataset(path: str) -> "Dataset":
    """Load a prompt/label JSONL file into a ``datasets.Dataset``.

    Prompts are stripped. Dict labels are serialised as sorted-key JSON; other labels
    are converted to stripped strings. Blank lines are skipped.

    Raises:
        ValueError: if the file contains no records.
    """
    recs = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank / trailing lines instead of failing in json.loads
            rec = json.loads(line)
            label = rec["label"]
            if isinstance(label, dict):
                label = json.dumps(label, ensure_ascii=False, sort_keys=True)
            else:
                label = str(label).strip()
            recs.append({"prompt": rec["prompt"].strip(), "label": label})
    if not recs:
        raise ValueError(f"{path} 中没有可用样本")
    return Dataset.from_list(recs)


class LabelCollator:
    """Tokenise prompt/label pairs for causal-LM fine-tuning; only the answer is trained on.

    Each example becomes ``prompt + blank line + "### 答案：" + label + eos_token``.
    ``labels`` equal ``input_ids`` on the answer tokens, including the appended EOS.
    Prompt tokens and padding are set to ``IGNORE_INDEX``.
    """

    def __init__(self, tokenizer, max_len: int = 1024):
        self.tok = tokenizer
        self.max_len = max_len
        if self.tok.pad_token is None:
            self.tok.pad_token = self.tok.eos_token

    def __call__(self, batch):
        texts, prompt_lens = [], []
        eos = self.tok.eos_token or ""
        for rec in batch:
            prompt = rec["prompt"].strip()
            label = rec["label"].strip()
            prefix = f"{prompt}\n\n### 答案："
            # Append EOS so the model learns where an answer ends. Without it,
            # generation only stops at max_new_tokens.
            texts.append(f"{prefix}{label}{eos}")
            prompt_lens.append(len(self.tok(prefix)["input_ids"]))
        enc = self.tok(texts, padding="longest", truncation=True, max_length=self.max_len, return_tensors="pt")
        input_ids = enc["input_ids"]
        labels = torch.full_like(input_ids, IGNORE_INDEX)
        for i, plen in enumerate(prompt_lens):
            # Use the attention mask to find the real tokens. This keeps padding out of
            # the loss and keeps the prompt offset correct under left padding.
            real = enc["attention_mask"][i].nonzero(as_tuple=True)[0]
            if real.numel() == 0:
                continue
            start = int(real[0]) + plen
            end = int(real[-1]) + 1
            labels[i, start:end] = input_ids[i, start:end]
        if (labels != IGNORE_INDEX).sum() == 0:
            raise ValueError("全部 label 被截掉；请缩短 prompt 或增大 --max-len")
        enc["labels"] = labels
        return enc

def train_lora(cfg: TrainConfig) -> None:
    """Fine-tune ``cfg.base_model`` with a LoRA adapter on 4-bit NF4 weights and save it.

    Steps:

    1. Load the train (and optional eval) JSONL files.
    2. Load the tokenizer and the quantised base model, with optional linear RoPE
       scaling applied to its config.
    3. Attach LoRA (r=8, alpha=32, dropout=0.05) to the attention and MLP projections.
    4. Train with ``transformers.Trainer``.
    5. Save the adapter and tokenizer to ``cfg.output_dir``.
    """
    from transformers import AutoConfig

    train_ds = _load_dataset(cfg.data_path)
    eval_ds = _load_dataset(cfg.eval_path) if cfg.eval_path else None
    tokenizer = AutoTokenizer.from_pretrained(cfg.base_model, trust_remote_code=True)

    model_config = AutoConfig.from_pretrained(cfg.base_model, trust_remote_code=True)
    if cfg.rope_factor and cfg.rope_factor != 1.0:
        # Set RoPE scaling on the config *before* the model is built. The rotary
        # embedding is created from the config at construction time, so patching
        # ``model.config`` after loading leaves positions unscaled.
        model_config.rope_scaling = {"type": "linear", "factor": cfg.rope_factor}
        base_pos = getattr(model_config, "max_position_embeddings", 2048)
        model_config.max_position_embeddings = int(base_pos * cfg.rope_factor)

    bnb = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.float16,
    )
    model = AutoModelForCausalLM.from_pretrained(
        cfg.base_model,
        config=model_config,
        device_map="auto",
        quantization_config=bnb,
        trust_remote_code=True,
    )
    # The KV cache is not needed for training and conflicts with gradient checkpointing.
    model.config.use_cache = False

    lora_cfg = LoraConfig(
        r=8,
        lora_alpha=32,
        lora_dropout=0.05,
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
        task_type="CAUSAL_LM",  # get_peft_model then returns the causal-LM PEFT wrapper
    )
    args_kwargs = dict(
        output_dir=cfg.output_dir,
        per_device_train_batch_size=cfg.batch_size,
        gradient_accumulation_steps=cfg.grad_accum,
        num_train_epochs=cfg.epochs or 1,
        max_steps=cfg.max_steps or -1,
        learning_rate=cfg.lr,
        logging_steps=1,
        remove_unused_columns=False,  # the collator needs the raw prompt/label columns
    )
    params = inspect.signature(TrainingArguments.__init__).parameters
    # transformers renamed ``evaluation_strategy`` to ``eval_strategy``, and recent
    # releases accept only the new name. Probe the new name first so evaluation
    # actually runs.
    for key in ("eval_strategy", "evaluation_strategy"):
        if key in params:
            args_kwargs[key] = "epoch" if eval_ds else "no"
            break
    if "save_strategy" in params:
        args_kwargs["save_strategy"] = "epoch"
    args = TrainingArguments(**args_kwargs)

    model = prepare_model_for_kbit_training(model)
    model = get_peft_model(model, lora_cfg)
    collator = LabelCollator(tokenizer, cfg.max_len)
    trainer = Trainer(model=model, args=args, train_dataset=train_ds, eval_dataset=eval_ds, data_collator=collator)
    trainer.train()
    model.save_pretrained(cfg.output_dir)
    tokenizer.save_pretrained(cfg.output_dir)

## 7. 评估

计算参考答案与模型预测之间的 BLEU、ROUGE-L 和向量嵌入相似度。


In [None]:

from nltk.translate.bleu_score import SmoothingFunction, sentence_bleu
from rouge_score import rouge_scorer
from sentence_transformers import SentenceTransformer, util

from transformers import AutoTokenizer, AutoModelForCausalLM


def load_dataset(path: str) -> tuple[list[str], list[str]]:
    """Read prompts and reference answers from a prompt/label JSONL file.

    Dict labels are serialised as sorted-key JSON, the same form used for training.
    Other labels are converted with ``str`` and stripped. A missing label becomes "".
    Blank lines are skipped.

    Returns:
        ``(prompts, refs)``: two parallel lists.
    """
    prompts, refs = [], []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank / trailing lines instead of failing in json.loads
            rec = json.loads(line)
            prompts.append(rec["prompt"])
            label = rec.get("label", "")
            if isinstance(label, dict):
                refs.append(json.dumps(label, ensure_ascii=False, sort_keys=True))
            else:
                refs.append(str(label).strip())
    return prompts, refs


def load_student(model_name: str):
    """Load the tokenizer and causal LM from *model_name*; return ``(tokenizer, model)``.

    Remote code is trusted, ``device_map="auto"`` is passed to ``from_pretrained``,
    and the model is switched to eval mode before it is returned.
    """
    shared = {"trust_remote_code": True}
    tok = AutoTokenizer.from_pretrained(model_name, **shared)
    lm = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto", **shared)
    lm.eval()
    return tok, lm


def call_student(tokenizer, model, prompt: str, max_new_tokens: int = 256, max_length: int = 4096) -> str:
    """Greedily generate the student's answer for *prompt* and return only the new text.

    The prompt is right-stripped and followed by a blank line and ``### 答案：``,
    matching the training format.

    *max_length* caps prompt plus generated tokens. If the prompt is too long, its
    oldest tokens are dropped so that *max_new_tokens* still fit; the answer marker
    at the end is preserved. (Previously *max_length* was passed to ``generate``
    together with *max_new_tokens*, where it has no effect.)
    """
    prompt = prompt.rstrip() + "\n\n### 答案："
    inputs = tokenizer(prompt, return_tensors="pt")
    budget = max_length - max_new_tokens
    if budget > 0 and inputs["input_ids"].size(1) > budget:
        # Keep the tail: the answer marker must stay immediately before generation.
        inputs = {k: v[:, -budget:] for k, v in inputs.items()}
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    out = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        pad_token_id=tokenizer.eos_token_id,
        use_cache=True,
    )
    new_tokens = out[0][inputs["input_ids"].size(1) :]
    return tokenizer.decode(new_tokens, skip_special_tokens=True).strip()


def generate_predictions(model_name: str, prompts):
    """Load the student once and return its answer for every prompt, in input order."""
    tokenizer, model = load_student(model_name)
    return [call_student(tokenizer, model, prompt) for prompt in prompts]


def _is_cjk(ch: str) -> bool:
    """True for CJK ideographs, CJK symbols/punctuation and full-/half-width forms."""
    return (
        "\u3400" <= ch <= "\u4dbf"      # CJK Unified Ideographs Extension A
        or "\u4e00" <= ch <= "\u9fff"   # CJK Unified Ideographs
        or "\u3000" <= ch <= "\u303f"   # CJK symbols and punctuation
        or "\uff00" <= ch <= "\uffef"   # half-width and full-width forms
    )


def _metric_tokens(text: str) -> list[str]:
    """Split *text* for n-gram metrics.

    Each CJK character is its own token; all other text is split on whitespace, which
    matches ``str.split()`` for text without CJK characters.
    """
    tokens: list[str] = []
    buf: list[str] = []
    for ch in text:
        if ch.isspace() or _is_cjk(ch):
            if buf:
                tokens.append("".join(buf))
                buf = []
            if not ch.isspace():
                tokens.append(ch)
        else:
            buf.append(ch)
    if buf:
        tokens.append("".join(buf))
    return tokens


class _CJKRougeTokenizer:
    """Tokenizer object for ``rouge_scorer.RougeScorer`` (only ``tokenize`` is required)."""

    def tokenize(self, text: str) -> list[str]:
        return _metric_tokens(text.lower())


def bleu_score(references, predictions):
    """Mean sentence-level BLEU (NLTK smoothing method 4) over reference/prediction pairs.

    Texts are tokenised with ``_metric_tokens``, so Chinese is scored per character;
    plain whitespace splitting would make a whole Chinese sentence a single token.
    Returns NaN when there are no pairs.
    """
    pairs = list(zip(references, predictions))
    if not pairs:
        return float("nan")
    smooth = SmoothingFunction().method4
    scores = [
        sentence_bleu([_metric_tokens(ref)], _metric_tokens(pred), smoothing_function=smooth)
        for ref, pred in pairs
    ]
    return sum(scores) / len(scores)


def rouge_l(references, predictions):
    """Mean ROUGE-L F-measure over reference/prediction pairs.

    Uses a CJK-aware, lowercasing tokenizer. rouge-score's default tokenizer keeps only
    ``[a-z0-9]``, which deletes all Chinese text. With a custom tokenizer, rouge-score
    does no stemming. Returns NaN when there are no pairs.
    """
    pairs = list(zip(references, predictions))
    if not pairs:
        return float("nan")
    scorer = rouge_scorer.RougeScorer(["rougeL"], tokenizer=_CJKRougeTokenizer())
    scores = [scorer.score(ref, pred)["rougeL"].fmeasure for ref, pred in pairs]
    return sum(scores) / len(scores)


def embedding_similarity(references, predictions, model_name: str = "all-MiniLM-L6-v2"):
    """Mean cosine similarity between sentence embeddings of each reference/prediction pair.

    Each side is encoded in a single batched call rather than one call per text.
    Returns NaN when there are no pairs; the model is not loaded in that case.

    NOTE(review): the default model is trained on English. For Chinese labels, a
    multilingual model (e.g. "paraphrase-multilingual-MiniLM-L12-v2") passed via
    *model_name* is likely more meaningful.
    """
    pairs = list(zip(references, predictions))
    if not pairs:
        return float("nan")
    refs = [ref for ref, _ in pairs]
    preds = [pred for _, pred in pairs]
    model = SentenceTransformer(model_name)
    ref_emb = model.encode(refs, convert_to_tensor=True)
    pred_emb = model.encode(preds, convert_to_tensor=True)
    # Row-wise cosine similarity: row i of refs against row i of preds.
    sims = torch.nn.functional.cosine_similarity(ref_emb, pred_emb, dim=-1)
    return float(sims.mean().item())


def evaluate_model(model_name: str, prompts, refs):
    """Generate student answers for *prompts* and score them against *refs*.

    Returns a dict with the keys ``bleu``, ``rougeL`` and ``embed`` (mean embedding
    cosine similarity), computed in that order.
    """
    preds = generate_predictions(model_name, prompts)
    metric_fns = (
        ("bleu", bleu_score),
        ("rougeL", rouge_l),
        ("embed", embedding_similarity),
    )
    return {name: fn(refs, preds) for name, fn in metric_fns}

## 8. 运行流水线

以下单元将所有步骤串联：
1. 构建训练集和验证集。
2. 使用教师模型进行标注（如果提供 API key）。
3. 清理 JSONL 文件。
4. 使用 LoRA 微调学生模型。
5. 在验证集上进行评估。


In [None]:

# Parameters
windows = 1          # samples per stock
val_ratio = 0.2      # validation fraction
max_tokens = 1024    # prompt token cap
max_len = 1024       # training sequence length
output_dir = "lora_adapter"  # where the adapter is saved


def _count_records(path: str) -> int:
    """Return the number of non-blank lines in *path* (0 when the file does not exist)."""
    p = Path(path)
    if not p.exists():
        return 0
    return sum(1 for line in p.read_text(encoding="utf-8").splitlines() if line.strip())


# 1) Build the dataset (tokenizer=None, so prompts are not token-trimmed here)
train_set, val_set = build_dataset(STOCK_CODES, days=180, window=30,
                                   windows_per_stock=windows, val_ratio=val_ratio,
                                   tokenizer=None, max_tokens=max_tokens)
train_prompts = [format_prompt(s) for s in train_set]
val_prompts = [format_prompt(s) for s in val_set]

# 2) Teacher labelling
label_samples(train_prompts, "labeled_data.jsonl")
if val_prompts:
    label_samples(val_prompts, "val_labeled_data.jsonl")

# 3) Clean the JSONL files
clean_jsonl("labeled_data.jsonl", "cleaned_labeled_data.jsonl")
if val_prompts:
    clean_jsonl("val_labeled_data.jsonl", "cleaned_val_labeled_data.jsonl")

# Decide based on what survived cleaning, not on how many prompts were sent. Without
# ARK_API_KEY every label is a placeholder that cleaning drops, and training or
# evaluating on an empty file would crash.
n_train = _count_records("cleaned_labeled_data.jsonl")
has_val = bool(val_prompts) and _count_records("cleaned_val_labeled_data.jsonl") > 0

if n_train == 0:
    print("[pipeline] 清洗后没有可用的训练样本，跳过训练与评估。")
else:
    # 4) LoRA training
    cfg = TrainConfig(data_path="cleaned_labeled_data.jsonl",
                      eval_path="cleaned_val_labeled_data.jsonl" if has_val else None,
                      output_dir=output_dir,
                      max_steps=200,
                      max_len=max_len)
    train_lora(cfg)

    # 5) Evaluation. Falls back to the training file when no validation data survived;
    #    in that case the scores measure fit rather than generalisation.
    prompts, refs = load_dataset("cleaned_val_labeled_data.jsonl" if has_val else "cleaned_labeled_data.jsonl")
    metrics = evaluate_model(output_dir, prompts, refs)
    print("验证指标:\n", json.dumps(metrics, indent=2, ensure_ascii=False))