In [1]:
import warnings
warnings.filterwarnings('ignore')
import os
import random
import torch
import torch.nn as nn
import numpy as np
import gc
from datasets import load_dataset, Dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    BitsAndBytesConfig,
)
from peft import LoraConfig, get_peft_model
from collections import Counter

from sklearn.utils.class_weight import compute_class_weight

In [2]:
# Location of the loan dataset handed to `load_dataset` (placeholder value).
loan_data_path = "example"
# The returned object is indexed by split name ("train") in a later cell.
dataset = load_dataset(loan_data_path)

In [3]:
def preprocess_data(examples):
    """Rename the raw columns of one example to the names used downstream.

    The mapping is updated in place and the same object is returned:
    ``"text"`` is moved to ``"loan_data"`` and ``"label"`` is moved to
    ``"labels"`` after conversion with ``int``.

    Raises:
        KeyError: if ``"text"`` or ``"label"`` is missing.
        ValueError / TypeError: if the label cannot be converted by ``int``.
    """
    loan_text = examples.pop("text")
    examples["loan_data"] = loan_text

    raw_label = examples.pop("label")
    examples["labels"] = int(raw_label)

    return examples

In [4]:
# Apply the column renaming to the training split and materialise the rows as
# a plain Python list; later cells iterate over it and rebuild a Dataset from it.
train_data = list(dataset["train"].map(preprocess_data))

In [5]:
# Rebuild a Dataset from the preprocessed rows.
# NOTE(review): this rebinds `dataset`, which previously held the object
# returned by `load_dataset`; re-running the earlier `dataset["train"]` cell
# after this one will index the rebuilt object instead.
dataset = Dataset.from_list(train_data)

In [6]:
# Count the number of samples in each class.
labels = [row['labels'] for row in train_data]
label_counts = Counter(labels)
total_samples = len(labels)
num_classes = 2
counts = [label_counts[class_id] for class_id in range(num_classes)]

# Un-normalised class weights: total_samples / (num_classes * class_count).
class_weights = torch.tensor(
    [total_samples / (num_classes * class_count) for class_count in counts],
    dtype=torch.float,
)
print(f"Unnormalized Class Weights: {class_weights}")

# Rescale the weights so that they sum to num_classes.
weight_total = class_weights.sum()
normalized_class_weights = class_weights / weight_total * num_classes
print(f"Sum-normalized Class Weights: {normalized_class_weights}")

# Hand-picked weights (index 0 = majority class, index 1 = minority class);
# these, not the computed ones above, are what the loss cells reference.
majority_weight = 0.18
minority_weight = 22.5
custom_class_weights = torch.tensor(
    [majority_weight, minority_weight],
    dtype=torch.float,
)
print(f"Custom Class Weights: {custom_class_weights}")

Unnormalized Class Weights: tensor([ 0.5029, 87.0589])
Sum-normalized Class Weights: tensor([0.0115, 1.9885])
Custom Class Weights: tensor([ 0.1800, 22.5000])


In [7]:
# Checkpoint location, relative to the working directory; the same name is
# reused when the model itself is loaded.
model_name = "model/Mistral-7B-Instruct-v0.3"
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [8]:
# The tokenizer is later called with padding="max_length", which needs a pad
# token. If none is defined, register a new '[PAD]' special token; this grows
# the vocabulary, so the model's embeddings are resized in a later cell.
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})

In [9]:
# 8-bit quantization settings, passed to `from_pretrained` when the model is loaded.
quantization_config = BitsAndBytesConfig(
    load_in_8bit=True,
    llm_int8_threshold=6.0,
)

In [10]:
# Load the checkpoint with a 2-way sequence-classification head, quantized
# according to `quantization_config`. `device_map="auto"` leaves device
# placement to the library.
# NOTE(review): the saved output below ends in KeyboardInterrupt, so this cell
# did not finish in the recorded run and the following cells were not executed.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map="auto",
    num_labels=2,
)

This can be used to load a bitsandbytes version that is different from the PyTorch CUDA version.
If this was unintended set the BNB_CUDA_VERSION variable to an empty string: export BNB_CUDA_VERSION=
If you use the manual override make sure the right libcudart.so is in your LD_LIBRARY_PATH
For example by adding the following to your .bashrc: export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:<path_to_cuda_dir/lib64



Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# Match the embedding matrix to the tokenizer's vocabulary size, so that a
# '[PAD]' token added to the tokenizer (if any) has an embedding row.
model.resize_token_embeddings(len(tokenizer))

In [None]:
# Keep the model config's pad_token_id in sync with the tokenizer's pad token.
model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
# LoRA adapter configuration for sequence classification: rank 8, alpha 8,
# 5% dropout, no bias training, applied to the listed projection modules.
lora_config = LoraConfig(
    task_type="SEQ_CLS",
    r=8,
    lora_alpha=8,
    lora_dropout=0.05,
    bias="none",
    use_rslora=False,
    loftq_config=None,
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Wrap the base model with the adapters. This rebinds `model`, so re-running
# this cell on an already wrapped model wraps it a second time.
model = get_peft_model(model, lora_config)

In [None]:
# Trade compute for memory: activations are recomputed during the backward pass.
model.gradient_checkpointing_enable()
# The base weights are frozen, so the embedding output does not require grad.
# With re-entrant checkpointing, a checkpointed block whose inputs carry no
# grad produces outputs without grad, and the LoRA adapters inside it are not
# trained. Forcing the embedding output to require grad keeps the graph
# connected through the checkpointed blocks.
model.enable_input_require_grads()

In [None]:
def tokenize_function(examples):
    """Tokenize the ``"loan_data"`` field of a batch with the notebook's tokenizer.

    Every sequence is truncated and padded to exactly 256 tokens. Returns
    whatever the tokenizer call returns.
    """
    loan_texts = examples["loan_data"]
    encoded = tokenizer(
        loan_texts,
        max_length=256,
        truncation=True,
        padding="max_length",
    )
    return encoded

In [None]:
# Tokenize the whole dataset; batched=True hands `tokenize_function` batches
# of examples instead of single rows.
tokenized_datasets = dataset.map(tokenize_function, batched=True)

In [None]:
# Expose only the three columns the model consumes, formatted as torch tensors.
# This mutates `tokenized_datasets` in place.
tokenized_datasets.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

In [None]:
# Loss function helpers
def get_model_attr(model, attr):
    """Return ``attr`` from ``model``, looking through a ``.module`` wrapper.

    If ``model`` has a ``module`` attribute, the attribute is read from
    ``model.module``; otherwise it is read from ``model`` itself.

    Raises:
        AttributeError: if the chosen object has no attribute ``attr``.
    """
    target = model.module if hasattr(model, 'module') else model
    return getattr(target, attr)

In [None]:
class FocalLoss(nn.Module):
    """Class-weighted focal loss for single-label classification.

    For each sample with true class ``y`` and predicted probability ``p_y``::

        loss = alpha[y] * (1 - p_y) ** gamma * (-log p_y)

    and the per-sample losses are averaged over the batch.

    Args:
        alpha: 1-D tensor of per-class weights, indexed by class id. When
            omitted, the notebook-level ``custom_class_weights`` tensor is
            looked up at construction time.
        gamma: focusing exponent; ``0`` reduces the loss to alpha-weighted
            cross-entropy.
    """

    def __init__(self, alpha=None, gamma=2):
        super().__init__()
        if alpha is None:
            # Resolved at call time instead of as a default argument, so the
            # class does not capture a stale tensor when cells are re-run.
            alpha = custom_class_weights
        self.alpha = alpha
        self.gamma = gamma
        # Deliberately unweighted: the class weight is applied exactly once,
        # in forward(). Weighting here as well would square alpha and make
        # exp(-ce) differ from the true class probability.
        self.ce_loss = nn.CrossEntropyLoss(reduction='none')

    def forward(self, logits, labels):
        """Return the mean focal loss.

        Args:
            logits: ``(N, C)`` tensor of unnormalised class scores.
            labels: ``(N,)`` integer tensor of class ids.
        """
        # Compute in float32 so half-precision logits do not lose precision
        # in exp/log and do not clash with the float32 weights.
        logits = logits.float()
        ce_loss = self.ce_loss(logits, labels)
        pt = torch.exp(-ce_loss)  # probability assigned to the true class
        alpha_t = self.alpha.to(device=logits.device, dtype=logits.dtype)[labels]
        focal_loss = (alpha_t * ((1 - pt) ** self.gamma) * ce_loss).mean()
        return focal_loss

In [None]:
class WeightedLossTrainer(Trainer):
    """Trainer that computes its own class-weighted loss from the logits."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the focal loss for one batch.

        Args:
            model: the (possibly wrapped) model being trained.
            inputs: batch mapping; must contain ``"labels"``.
            return_outputs: when true, return ``(loss, outputs)``.
            num_items_in_batch: accepted for compatibility with Trainer
                versions that pass it; unused because the loss is a
                per-batch mean.

        Returns:
            The scalar loss, or ``(loss, outputs)`` if ``return_outputs``.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        # Focal loss is the active criterion. For plain weighted
        # cross-entropy use instead:
        #   nn.CrossEntropyLoss(weight=custom_class_weights.to(logits.device))
        loss_fct = FocalLoss(alpha=custom_class_weights.to(logits.device))

        num_labels = get_model_attr(model, 'config').num_labels
        # Labels may sit on a different device from the logits when the
        # model is sharded across devices.
        flat_labels = labels.view(-1).to(logits.device)
        loss = loss_fct(logits.view(-1, num_labels), flat_labels)
        return (loss, outputs) if return_outputs else loss

In [None]:
# Evaluation metrics
from sklearn.metrics import classification_report, roc_auc_score, average_precision_score

def compute_metrics(p):
    """Compute accuracy, positive-class F1, ROC-AUC and PR-AUC for binary predictions.

    Args:
        p: object with ``predictions`` (``(N, 2)`` logits, or a tuple whose
            first element is that array) and ``label_ids`` (``(N,)`` ints).

    Returns:
        Dict with keys ``accuracy``, ``f1``, ``auc_roc`` and ``auc_pr``.
        ``f1`` is 0.0 when class 1 appears in neither labels nor predictions;
        ``auc_roc`` is NaN when the labels contain a single class.
    """
    raw = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    logits = np.asarray(raw)
    labels = np.asarray(p.label_ids)
    preds = np.argmax(logits, axis=1)

    report = classification_report(labels, preds, output_dict=True, zero_division=0)
    # The report only has a '1' entry when class 1 occurs in labels or preds.
    positive_f1 = report.get('1', {}).get('f1-score', 0.0)

    # Rank by the logit margin: softmax P(class 1) is a monotonic function of
    # (logit_1 - logit_0), not of logit_1 alone, so the raw class-1 logit can
    # order samples differently from the model's actual confidence.
    scores = logits[:, 1] - logits[:, 0]

    # ROC-AUC is undefined when only one class is present in the labels.
    if np.unique(labels).size < 2:
        auc_roc = float('nan')
    else:
        auc_roc = roc_auc_score(labels, scores)
    auc_pr = average_precision_score(labels, scores)

    return {
        'accuracy': report['accuracy'],
        'f1': positive_f1,
        'auc_roc': auc_roc,
        'auc_pr': auc_pr
    }

In [None]:
# Use bf16 mixed precision when the GPU reports support for it, else fp16.
use_bf16 = torch.cuda.is_bf16_supported()

training_args = TrainingArguments(
    output_dir="outputs/mistral-7b-instruct-v0.3-0926(fc_new)",
    seed=11,
    # Schedule and batch sizes
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    gradient_accumulation_steps=1,
    # Optimiser
    optim="adamw_8bit",
    learning_rate=2e-4,
    lr_scheduler_type="linear",
    warmup_steps=5,
    weight_decay=0.01,
    # Logging, checkpointing and evaluation
    logging_dir='./logs',
    logging_strategy='steps',
    logging_steps=10,
    save_strategy="steps",
    save_steps=5000,
    evaluation_strategy="no",
    report_to=[],
    # Mixed precision
    bf16=use_bf16,
    fp16=not use_bf16,
)

In [None]:
# Assemble the trainer with the custom loss. No eval_dataset is supplied and
# `training_args` sets evaluation_strategy="no", so `compute_metrics` is
# presumably not exercised during `train()`; confirm if evaluation is added.
trainer = WeightedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets,
    tokenizer=tokenizer,
    data_collator=None,
    compute_metrics=compute_metrics,
)

In [None]:
# Run fine-tuning; checkpoints go to `training_args.output_dir`.
trainer.train()