## 难度预估

In [1]:
import torch
from torch import nn
import numpy as np
from transformers.modeling_outputs import ModelOutput
from transformers import BertModel, TrainingArguments, Trainer, PretrainedConfig
import torch.nn.functional as F
from sklearn.metrics import ndcg_score, mean_squared_error
from  torchmetrics import MeanAbsoluteError, PearsonCorrCoef, SpearmanCorrCoef
import os
from EduNLP.Pretrain import BertTokenizer 
from EduNLP.ModelZoo.base_model import BaseModel
import json
from utils import Dataset_bert, load_json, get_val, get_train

# Project root: two directory levels above this file's path.
# NOTE(review): `__file__` is not defined inside an interactive notebook kernel;
# confirm how this cell is meant to be executed.
ROOT = os.path.dirname(os.path.dirname(__file__))
# Data sets and pretrained checkpoints are looked up under <ROOT>/data.
DATA_DIR = os.path.join(ROOT, "data")
# Restrict CUDA to device 0 for this process.
os.environ["CUDA_VISIBLE_DEVICES"]= "0"

In [2]:
# Shared torchmetrics instances used by compute_metrics():
# mean absolute error, Pearson correlation and Spearman rank correlation.
MAE, PCC, SCC = MeanAbsoluteError(), PearsonCorrCoef(), SpearmanCorrCoef()



### 加载数据，定义预训练模型路径

In [4]:
output_dir = "output/difficulty" # where the fine-tuned model is saved
pretrained_model_dir = os.path.join(DATA_DIR, "bert_math_768") # pretrained BERT checkpoint; can be swapped for another model path (e.g. disenqnet, roberta)
train_data = load_json(os.path.join(DATA_DIR, "train", "高中数学.json")) # load the training set
train_items = get_train(train_data)
val_data = load_json(os.path.join(DATA_DIR, "test", "高中数学paper.json")) # load the test set
# val_gap is consumed by testdata_metrics() as (start, end) index pairs into the
# flattened validation items -- presumably one pair per test paper; confirm in utils.get_val.
val_items, val_gap = get_val(val_data)

[load_json] start : /data/shangzi/edunlp/gaokao-prediction/data/train/高中数学.json
[load_json] num = 3600, open_path = /data/shangzi/edunlp/gaokao-prediction/data/train/高中数学.json
[load_json] start : /data/shangzi/edunlp/gaokao-prediction/data/test/高中数学paper.json
[load_json] num = 7, open_path = /data/shangzi/edunlp/gaokao-prediction/data/test/高中数学paper.json


### 定义网络结构

In [3]:
from dataclasses import dataclass


@dataclass
class DifficultyPredictionOutput(ModelOutput):
    """Output container returned by the difficulty-prediction model.

    ``ModelOutput`` subclasses are expected to be dataclasses, so the class is
    decorated with ``@dataclass``; the fields and their order are unchanged.
    """

    # Regression loss; None when no labels were supplied.
    loss: torch.FloatTensor = None
    # Model predictions as produced by the regression head.
    logits: torch.FloatTensor = None
    # The labels that were passed to the model, echoed back unchanged.
    labels: torch.FloatTensor = None

class BertForDifficultyPrediction(BaseModel):
    """BERT encoder followed by a linear layer and a sigmoid that regresses one score per item.

    The score is computed from the hidden state at the first token position of
    the encoder's last layer and is trained with mean squared error.
    """

    def __init__(self, pretrained_model_dir=None, classifier_dropout=0.5):
        """Load the pretrained encoder and build the regression head.

        Args:
            pretrained_model_dir: Path or name handed to ``BertModel.from_pretrained``.
            classifier_dropout: Dropout probability stored on the model and in its config.
        """
        super(BertForDifficultyPrediction, self).__init__()
        self.bert = BertModel.from_pretrained(pretrained_model_dir, ignore_mismatched_sizes=True)
        hidden_size = self.bert.config.hidden_size
        self.classifier_dropout = classifier_dropout
        # NOTE(review): this layer is created but forward() never applies it.
        # Left untouched so training behaviour does not change; confirm intent.
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

        # Snapshot the constructor's locals (arguments plus hidden_size) as the
        # model config. Any new local introduced above this line would be
        # captured too, so keep this statement where it is.
        self.config = {k: v for k, v in locals().items() if k not in ["self", "__class__"]}
        self.config['architecture'] = 'BertForDifficultyPrediction'
        self.config = PretrainedConfig.from_dict(self.config)

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                content=None,
                labels=None,
                ):
        """Predict a score for each item and, when labels are given, the MSE loss.

        Args:
            input_ids, attention_mask, token_type_ids: Encoder inputs, used
                directly when ``content`` is not supplied.
            content: Optional mapping holding the keys ``'input_ids'``,
                ``'attention_mask'`` and ``'token_type_ids'``; when given it
                takes precedence over the individual arguments.
            labels: Optional regression targets, one value per item.

        Returns:
            DifficultyPredictionOutput with ``loss`` (None without labels),
            ``logits`` (one column per item, after the sigmoid) and ``labels``.
        """
        if content is not None:
            input_ids = content['input_ids']
            attention_mask = content['attention_mask']
            token_type_ids = content['token_type_ids']
        encoder_out = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # Hidden state at the first token position of the last layer.
        item_embed = encoder_out['last_hidden_state'][:, 0, :]
        logits = self.sigmoid(self.classifier(item_embed))

        loss = None
        if labels is not None:
            # Flatten both sides so predictions and targets line up one-to-one
            # for any batch size. squeeze(0) only removed a dimension when the
            # batch size was 1, which made (B, 1) broadcast against (B,).
            loss = F.mse_loss(logits.reshape(-1), labels.reshape(-1))
        return DifficultyPredictionOutput(
            loss=loss,
            logits=logits,
            labels=labels,
        )

    @classmethod
    def from_config(cls, config_path, *args, **kwargs):
        """Build a model from a JSON config file.

        Args:
            config_path: Path to a JSON file containing at least
                ``pretrained_model_dir``.
            **kwargs: Values that override entries read from the file.

        Returns:
            A new model instance; ``classifier_dropout`` defaults to 0.5 when
            it is absent from both the file and ``kwargs``.
        """
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
        model_config.update(kwargs)
        return cls(
            pretrained_model_dir=model_config['pretrained_model_dir'],
            classifier_dropout=model_config.get("classifier_dropout", 0.5),
        )

### 定义评价指标

In [None]:
def compute_metrics(pred):
    """Compute regression and ranking metrics for one evaluation pass.

    Args:
        pred: Evaluation result exposing ``predictions`` and ``label_ids``.
            ``predictions[0]`` is taken as the model scores -- presumably the
            ``logits`` field of the model output; confirm against the Trainer
            version in use.

    Returns:
        dict with ``mae``, ``mse``, ``rmse``, ``pcc``, ``scc`` as scalars and
        ``ndcg`` as the list produced by ``testdata_metrics``.
    """
    # Flatten to 1-D so predictions and labels have the same shape. The
    # previous ``view([n], -1)`` call mixed a list with an int, which is not a
    # valid size argument, and the metrics below need matching 1-D inputs.
    logits = torch.as_tensor(pred.predictions[0]).reshape(-1)
    labels = torch.as_tensor(pred.label_ids).reshape(-1)
    print("logits", logits)
    print("labels", labels)
    pres = logits.numpy().tolist()
    golds = labels.numpy().tolist()
    mse = mean_squared_error(golds, pres)
    ret = {
        # torchmetrics return 0-dim tensors; convert to plain Python floats.
        "mae": MAE(logits, labels).item(),
        "mse": mse,
        "rmse": np.sqrt(mse),
        "pcc": PCC(logits, labels).item(),
        "scc": SCC(logits, labels).item(),
        'ndcg': testdata_metrics(val_gap, golds, pres).tolist(),
    }
    return ret
def testdata_metrics(val_gap, diff, pred):
    diff, pred = np.array(diff), np.array(pred)
    idx = np.where(diff>0)[0]
    ndcg = []
    for s, e in val_gap:
        _diff, _pred = diff[s:e], pred[s:e]
        if _diff[0]==-1:
            _diff = [i+1 for i in range(len(_diff))]
        ndcg.append([ndcg_score([_diff], [_pred]), ndcg_score([_diff], [_pred], k=10), ndcg_score([_diff], [_pred], k=20), ndcg_score([_diff], [_pred], k=30)])
    ndcg = np.mean(ndcg, axis=0)
    return ndcg

### 定义训练和测试相关参数

In [5]:
class MyTrainer(Trainer):
    """Trainer subclass with no overrides; a hook for project-specific customisation."""

def train_diff_pred(output_dir,
                    pretrained_model_dir,
                    train_items=None,
                    val_items=None,
                    train_params=None):
    """Fine-tune the difficulty model, save it, then evaluate on the validation items.

    Args:
        output_dir: Directory that receives the model, its config and the tokenizer.
        pretrained_model_dir: Location of the pretrained tokenizer and encoder.
        train_items: Items wrapped in ``Dataset_bert`` for training.
        val_items: Items wrapped in ``Dataset_bert`` for evaluation.
        train_params: Optional dict of hyper-parameter overrides. When it is
            None a separate set of defaults is used (50 epochs, batch size 1, ...).
    """
    tokenizer = BertTokenizer.from_pretrained(pretrained_model_dir)
    model = BertForDifficultyPrediction(pretrained_model_dir=pretrained_model_dir)
    model.bert.resize_token_embeddings(len(tokenizer.bert_tokenizer))

    # Resolve training hyper-parameters. The fallback values differ depending
    # on whether a params dict was supplied at all.
    default_logging_dir = f"{ROOT}/log"
    if train_params is None:
        hp = {
            'epochs': 50,
            'batch_size': 1,
            'save_steps': 1000,
            'save_total_limit': 2,
            'logging_steps': 100,
            'gradient_accumulation_steps': 1,
            'logging_dir': default_logging_dir,
        }
    else:
        fallbacks = {
            'epochs': 1,
            'batch_size': 64,
            'save_steps': 100,
            'save_total_limit': 2,
            'logging_steps': 5,
            'gradient_accumulation_steps': 1,
            'logging_dir': default_logging_dir,
        }
        hp = {
            key: (train_params[key] if key in train_params else fallback)
            for key, fallback in fallbacks.items()
        }

    train_dataset = Dataset_bert(train_items, tokenizer)
    eval_dataset = Dataset_bert(val_items, tokenizer)

    training_args = TrainingArguments(
        output_dir=output_dir,
        overwrite_output_dir=True,
        num_train_epochs=hp['epochs'],
        per_device_train_batch_size=hp['batch_size'],
        per_device_eval_batch_size=hp['batch_size'],
        evaluation_strategy="steps",
        # Evaluate once every five logging intervals.
        eval_steps=hp['logging_steps'] * 5,
        save_steps=hp['save_steps'],
        save_total_limit=hp['save_total_limit'],
        logging_steps=hp['logging_steps'],
        logging_dir=hp['logging_dir'],
        gradient_accumulation_steps=hp['gradient_accumulation_steps'],
        learning_rate=5e-5,
    )

    trainer = MyTrainer(
        model=model,
        args=training_args,
        data_collator=train_dataset.collate_fn,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )

    # Train, then persist the model weights, model config and tokenizer.
    trainer.train()
    trainer.save_model(output_dir)
    trainer.model.save_config(output_dir)
    tokenizer.save_pretrained(output_dir)

    # To test an already trained model directly, point "pretrained_model_dir"
    # at the trained model's path and evaluate with trainer.evaluate() on the
    # test set.
    trainer.evaluate()

### 训练和测试

In [None]:
# Run training and evaluation with the built-in default hyper-parameters.
train_diff_pred(
    output_dir=output_dir,
    pretrained_model_dir=pretrained_model_dir,
    train_items=train_items,
    val_items=val_items,
    train_params=None,
)