## 实验一：比较各个模型的标注效果

评测数据集：

* TNEWS：新闻标题分类，15个类别

对比模型：

* ChatGLM2-6B
* Qwen-7B-Chat
* Siamese-UniNLU-base
* PaddleNLP-zsl分类模型

In [None]:
from paddlenlp import Taskflow
from modelscope.utils.constant import Tasks
from modelscope import Model
from modelscope.pipelines import pipeline
from modelscope import AutoModelForCausalLM, AutoTokenizer
from modelscope import GenerationConfig

In [2]:
# Experiment settings
# MODEL_NAME selects which branch of the model-loading cell and of `annotator` is used.
MODEL_NAME = 'chatglm2-6b' # one of [chatglm2-6b, qwen-7b-chat, siamese_uninlu, paddle_nlp]
# DATASET selects which dataset the loading cell reads.
DATASET = 'tnews' # one of [tnews]

In [4]:
# 读取数据集
import json

# Each split is a list of [sentence, label_id] pairs; all three stay empty
# when DATASET is not handled below.
train, val, test = [], [], []

if DATASET == 'tnews':
    # train.json and dev.json hold one JSON object per line.
    with open('dataset/tnews/train.json', 'r', encoding='utf-8') as train_file:
        train = [
            [record['sentence'], record['label']]
            for record in (json.loads(raw.strip()) for raw in train_file)
        ]

    with open('dataset/tnews/dev.json', 'r', encoding='utf-8') as dev_file:
        dev_pairs = [
            [record['sentence'], record['label']]
            for record in (json.loads(raw.strip()) for raw in dev_file)
        ]

    # The source data is already shuffled: the first half of dev becomes the
    # test split and the remainder becomes the validation split.
    half = len(dev_pairs) // 2
    test, val = dev_pairs[:half], dev_pairs[half:]

    # labels.json maps each label id to its English description.
    with open('dataset/tnews/labels.json', 'r', encoding='utf-8') as labels_file:
        label_records = [json.loads(raw.strip()) for raw in labels_file]
    id2en = {record['label']: record['label_desc'] for record in label_records}
    en2id = {record['label_desc']: record['label'] for record in label_records}

    # English label description -> Chinese label name shown to the models.
    en2zh = {
        'news_story': '故事',
        'news_culture': '文化新闻',
        'news_entertainment': '娱乐新闻',
        'news_sports': '体育新闻',
        'news_finance': '经济新闻',
        'news_house': '房地产新闻',
        'news_car': '汽车新闻',
        'news_edu': '教育新闻',
        'news_tech': '科技新闻',
        'news_military': '军事新闻',
        'news_travel': '旅游新闻',
        'news_world': '国际新闻',
        'news_stock': '股市新闻',
        'news_agriculture': '农业新闻',
        'news_game': '游戏新闻'
    }

    # Reverse lookup: Chinese label name -> English description.
    zh2en = {zh_name: en_name for en_name, zh_name in en2zh.items()}

    zh_label_name_list = sorted(zh2en)
    en_label_name_list = sorted(en2zh)

    print(f'{DATASET}数据集加载完成!')
    print(f'训练集: {len(train)}, 验证集: {len(val)}, 测试集: {len(test)}')

tnews数据集加载完成!
训练集: 53360, 验证集: 5000, 测试集: 5000


In [None]:
# Load the model selected by MODEL_NAME and define the matching inference helper.
# Each branch defines a differently named helper (get_llm_result,
# get_siamase_result or get_paddle_result), so only one of them exists at runtime.
if MODEL_NAME == 'chatglm2-6b':
    model = Model.from_pretrained('ZhipuAI/chatglm2-6b', device_map='auto', revision='v1.0.7')
    pipe = pipeline(task=Tasks.chat, model=model)
    print('generation_config:', model.generation_config)
    print(f'{MODEL_NAME}模型加载完成！')
    def get_llm_result(prompt):
        """Run `prompt` through the chat pipeline with an empty history and return its 'response' field."""
        inputs = {'text':prompt, 'history': []}
        result = pipe(inputs)
        return result['response']
    
elif MODEL_NAME == 'qwen-7b-chat':
    tokenizer = AutoTokenizer.from_pretrained("qwen/Qwen-7B-Chat", revision = 'v1.0.5',trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained("qwen/Qwen-7B-Chat", revision = 'v1.0.5',device_map="auto", trust_remote_code=True,fp16 = True).eval()
    model.generation_config = GenerationConfig.from_pretrained("Qwen/Qwen-7B-Chat",revision = 'v1.0.5', trust_remote_code=True) # generation length, top_p and other hyperparameters can be customised here
    print('generation_config:', model.generation_config)
    print(f'{MODEL_NAME}模型加载完成！')
    def get_llm_result(prompt):
        """Call `model.chat` with `prompt` and no history; return only the response text."""
        response, history = model.chat(tokenizer, prompt, history=None)
        return response
    
elif MODEL_NAME == 'siamese_uninlu':
    semantic_cls = pipeline(Tasks.siamese_uie, 'damo/nlp_structbert_siamese-uninlu_chinese-base', model_revision='v1.0')
    print(f'{MODEL_NAME}模型加载完成！')
    # NOTE(review): `schema` is only assigned here for DATASET in ['tnews']; for any
    # other dataset get_siamase_result would fail unless `schema` is defined elsewhere.
    if DATASET in ['tnews']:
        schema = {'分类': None}
    def get_siamase_result(text, label_names):
        """Query the pipeline with 'label1,label2,...|text' and return the first extracted span, or '' if none."""
        res = semantic_cls(input=','.join(label_names)+'|'+text, schema = schema)['output']
        if len(res) > 0:
            return res[0][0]['span']
        else:
            return ''
        
elif MODEL_NAME == 'paddle_nlp':
    # NOTE(review): `model` is only created for the datasets listed below; for any
    # other dataset get_paddle_result would fail unless `model` is defined elsewhere.
    if DATASET in ['tnews', 'nlpcc2014_task2']:
        # The Chinese label names serve as the zero-shot classification schema.
        schema = zh_label_name_list
        model = Taskflow("zero_shot_text_classification", schema=schema)
        print(f'{MODEL_NAME}模型加载完成！')
    def get_paddle_result(text):
        """Classify `text` and return the label of the first prediction, or '' when there is none."""
        res = model(text)
        if len(res) > 0:
            if DATASET in ['tnews', 'nlpcc2014_task2']:
                if len(res[0]['predictions']) > 0:
                    return res[0]['predictions'][0]['label']
        return ''

In [6]:
# 模型标注方法
import re 

def post_process_llm_cls(llm_out, label_name_list):
    """Recover a label from free-form LLM output to improve coverage.

    Searches `llm_out` for a literal occurrence of any label name and returns
    the one that appears earliest in the text. This handles over-generation,
    where the model wraps the label in extra words.

    Args:
        llm_out: Raw text produced by the LLM.
        label_name_list: Candidate label names.

    Returns:
        The earliest label name found in `llm_out`, or '' if none is found.
    """
    candidates = [name for name in label_name_list if name]
    if not candidates:
        return ''

    # Longest-first so a label that is a prefix of another (e.g. 'ab' vs 'abc')
    # cannot shadow the longer, more specific one at the same position.
    candidates.sort(key=len, reverse=True)
    # Escape every name: labels are literal text, not regex fragments.
    pattern = re.compile('|'.join(re.escape(name) for name in candidates))

    # When several labels occur, the first occurrence in the text wins.
    match = pattern.search(llm_out)
    return match.group(0) if match else ''
    
def post_process_siamase_cls(out, label_name_list):
    """Map a partial span extracted by the Siamese model back to a full label.

    The extractor may return only a fragment of a label name; the first label
    in `label_name_list` that contains the fragment is returned.

    Args:
        out: Span text returned by the model.
        label_name_list: Candidate label names, checked in order.

    Returns:
        The first label containing `out`, or '' if `out` is empty or no label
        contains it.
    """
    # An empty string is a substring of every label, so it must not match.
    if not out:
        return ''

    return next((name for name in label_name_list if out in name), '')


def llm_annotator_cls(input_text, label_name_list, few_shot_data=None, self_consistency_num=1, print_log=False):
    """Classify `input_text` with the loaded LLM, optionally few-shot and with voting.

    Builds a Chinese classification prompt, queries `get_llm_result`
    `self_consistency_num` times and returns the label that received the most
    votes. Outputs that are not an exact label are passed through
    `post_process_llm_cls`; outputs that still match no label are discarded.

    Args:
        input_text: Text to classify.
        label_name_list: Candidate label names inserted into the prompt.
        few_shot_data: Optional sequence of (text, label) examples. None or an
            empty sequence produces a zero-shot prompt.
        self_consistency_num: Number of LLM queries to vote over.
        print_log: When true, print the prompt, raw outputs and vote tally.

    Returns:
        The most-voted label (the earliest-seen one on ties), or '' when no
        query produced a valid label.
    """
    if few_shot_data:
        # NOTE(review): the examples use an ASCII colon after "文本" while the
        # query line uses a full-width one; kept as-is to leave prompts unchanged.
        example = ''.join(
            f"文本:'{shot[0]}'\n这段文本所属标签：{shot[1]}\n"
            for shot in few_shot_data
        )

        prompt = f'''
        给定一段文本，输出一个分类标签。
        分类标签集合：[{','.join(label_name_list)}]
        请直接输出结果，不要附带其他内容。
        
        {example}
        文本：'{input_text}'
        这段文本所属标签：
        '''
    else:
        prompt = f'''
        给定一段文本，输出一个分类标签。
        分类标签集合：[{','.join(label_name_list)}]
        请直接输出结果，不要附带其他内容。
        
        文本：'{input_text}'
        这段文本所属标签：
        '''

    # label -> number of queries that produced it (insertion-ordered).
    votes = {}

    for _ in range(self_consistency_num):
        llm_out = get_llm_result(prompt)
        if print_log:
            print('prompt:', prompt)
            print('llm_out:', llm_out)

        if llm_out not in label_name_list:
            # Try to salvage a label from verbose output.
            llm_out = post_process_llm_cls(llm_out, label_name_list)
            if print_log:
                print('post-process llm_out:', llm_out)

        if llm_out in label_name_list:
            votes[llm_out] = votes.get(llm_out, 0) + 1

    if print_log:
        print('self-consistency result:', votes)
        print('='*30)
        print()

    if not votes:
        return ''
    # max() returns the first maximal key, i.e. the earliest-seen label on ties.
    return max(votes, key=votes.get)
    
def siamase_annotator_cls(text, label_name_list, print_log = False):
    """Classify `text` with the Siamese model and return a valid label or ''.

    A non-empty span that is not itself a label is first passed through
    `post_process_siamase_cls`; the final value is returned only when it is a
    non-empty member of `label_name_list`.
    """
    span = get_siamase_result(text, label_name_list)
    if print_log:
        print('out:', span)

    needs_repair = len(span) > 0 and span not in label_name_list
    if needs_repair:
        span = post_process_siamase_cls(span, label_name_list)
        if print_log:
            print('post-process out:', span)

    is_valid = len(span) > 0 and span in label_name_list
    return span if is_valid else ''
    
def paddle_annotator_cls(text, label_name_list, print_log = False):
    """Classify `text` with the PaddleNLP model; return the label if it is in `label_name_list`, else ''."""
    predicted = get_paddle_result(text)
    if print_log:
        print('out:', predicted)

    # Reject empty predictions and anything outside the known label set.
    if len(predicted) == 0 or predicted not in label_name_list:
        return ''
    return predicted

In [7]:
# 标注器
import numpy as np
from tqdm import tqdm

# Few-shot learning (FSL) settings
use_fsl = False  # when False, the LLM annotator receives no few-shot examples
fsl_example_num = 1  # passed as `num` to get_few_shot_example
fsl_example_mode = 'class-balance-fixed' # one of [random, class-balance-fixed]

# Self-consistency settings
self_consistency_num = 1  # number of LLM queries voted over per input

def get_few_shot_example(clean_set, mode = 'random', num = 3):
    """Select few-shot examples from a pool of labelled samples.

    Args:
        clean_set: The pool. For mode 'random' it is a list of
            [text, label_name] pairs; for mode 'class-balance-fixed' it is a
            dict mapping label_name to a list of texts.
        mode: 'random' draws `num` distinct pairs at random.
            'class-balance-fixed' takes the first `num` texts of every class,
            iterating the classes in sorted order (no randomness).
        num: Number of examples in total ('random') or per class
            ('class-balance-fixed').

    Returns:
        A list of [text, label_name] pairs. In 'random' mode at most
        len(clean_set) pairs are returned. An unrecognised mode yields [].
    """
    few_shots = []

    if mode == 'random':
        # Clamp so a small or empty pool yields fewer examples instead of the
        # ValueError np.random.choice raises when sampling without replacement.
        sample_size = min(num, len(clean_set))
        if sample_size > 0:
            picked = np.random.choice(len(clean_set), size=sample_size, replace=False)
            few_shots = [clean_set[index] for index in picked]

    elif mode == 'class-balance-fixed':
        for label_name in sorted(clean_set.keys()):
            few_shots.extend([sample, label_name] for sample in clean_set[label_name][:num])

    return few_shots

def annotator(x, zh_label_name_list, use_fsl, clean_pool, fsl_example_mode, fsl_example_num, print_log):
    """Annotate one text with the model selected by the global MODEL_NAME.

    Args:
        x: Text to classify.
        zh_label_name_list: Chinese label names offered to the model.
        use_fsl: Whether to give the LLM few-shot examples (LLM models only).
        clean_pool: Pool from which few-shot examples are drawn; its shape
            must match `fsl_example_mode` (see get_few_shot_example).
        fsl_example_mode: Few-shot sampling mode.
        fsl_example_num: Number of few-shot examples requested.
        print_log: Forwarded to the model-specific annotator for verbose output.

    Returns:
        The predicted label id (mapped through zh2en and en2id) when the model
        produced a valid label and DATASET is 'tnews'; otherwise ''.

    Raises:
        ValueError: If MODEL_NAME is not one of the supported models.
    """
    if MODEL_NAME in ['chatglm2-6b', 'qwen-7b-chat']:
        if use_fsl:
            few_shot_examples = get_few_shot_example(clean_pool, mode = fsl_example_mode, num = fsl_example_num)
        else:
            few_shot_examples = []
        # self_consistency_num is read from the module-level setting.
        out = llm_annotator_cls(x, zh_label_name_list, few_shot_data=few_shot_examples, self_consistency_num = self_consistency_num, print_log = print_log)
    elif MODEL_NAME in ['siamese_uninlu']:
        out = siamase_annotator_cls(x, zh_label_name_list, print_log=print_log)
    elif MODEL_NAME in ['paddle_nlp']:
        out = paddle_annotator_cls(x, zh_label_name_list, print_log=print_log)
    else:
        # Fail with a clear message instead of an UnboundLocalError on `out`.
        raise ValueError(f'Unsupported MODEL_NAME: {MODEL_NAME!r}')

    # Convert the Chinese label name back to the dataset's label id.
    if len(out) > 0 and DATASET in ['tnews']:
        return en2id[zh2en[out]]

    return ''

In [None]:
# 调参
from sklearn.metrics import accuracy_score

# General evaluation settings
do_eval = True  # gates the evaluation run at the bottom of this cell
runs = 1  # number of times evaluate() is called
print_log = False  # forwarded to annotator() for verbose output

def evaluate(test, clean_pool = None):
    """Annotate every sample in `test` and print coverage and accuracy.

    Coverage is the fraction of samples for which the annotator returned a
    non-empty prediction; accuracy is computed over those covered samples only.
    The few-shot settings and `print_log` are read from module-level variables.

    Args:
        test: Sequence of samples whose first two items are (text, label id).
        clean_pool: Pool of few-shot examples forwarded to `annotator`;
            defaults to an empty list.
    """
    if clean_pool is None:
        clean_pool = []

    y_truth_list, y_pred_list = [], []

    for sample in tqdm(test):
        x, y_truth = sample[0], sample[1]
        y_pred = annotator(x, zh_label_name_list, use_fsl, clean_pool, fsl_example_mode, fsl_example_num, print_log)

        # Uncovered samples (empty prediction) are excluded from accuracy.
        if len(y_pred) > 0:
            y_truth_list.append(y_truth)
            y_pred_list.append(y_pred)

    # Accuracy is undefined when nothing was covered; report NaN rather than
    # calling accuracy_score on empty arrays.
    if y_pred_list:
        accuracy = accuracy_score(np.array(y_truth_list), np.array(y_pred_list))
    else:
        accuracy = float('nan')

    # Guard against division by zero on an empty test set.
    coverage = len(y_pred_list) / len(test) if len(test) > 0 else 0.0

    print(f'coverage:{coverage}')
    print(f'accuracy:{accuracy}')

# Choose the evaluation set and the source of the few-shot clean pool
if do_eval:
    if DATASET in ['tnews']:
        evaluation_set = train[:100]
        if fsl_example_mode == 'random':
            # Flat list of [text, Chinese label name] taken from the first 100 validation samples.
            clean_pool = [[d[0], en2zh[id2en[d[1]]]] for d in val[:100]]
        elif 'class-balance' in fsl_example_mode:
            # Group all validation texts by their Chinese label name.
            clean_pool = {}
            for d in val:
                clean_pool.setdefault(en2zh[id2en[d[1]]], []).append(d[0])

    for i in range(runs):
        evaluate(evaluation_set, clean_pool)
        print()

In [None]:
# Run the model annotator over the sampled splits and export both the model
# labels and the human (gold) labels, with label ids remapped to 0..14
import os

# Remap the dataset's original label ids to a contiguous range starting at 0.
label_map = {
            100:0,
            101:1,
            102:2,
            103:3,
            104:4,
            106:5,
            107:6,
            108:7,
            109:8,
            110:9,
            112:10,
            113:11,
            114:12,
            115:13,
            116:14}

train_sample = train[:1000]
val_sample = val[:1000]
test_sample = test[:1000]

output_dir = f'/mnt/workspace/exp_dataset/{DATASET}'
os.makedirs(output_dir, exist_ok=True)

_splits = (('train', train_sample), ('val', val_sample), ('test', test_sample))


def _write_model_annotations(samples, path):
    """Annotate each sample with the model and write the covered ones to `path` as JSON lines."""
    # clean_pool is only needed (and only looked up) when few-shot is enabled.
    fsl_pool = clean_pool if use_fsl else []
    with open(path, 'w', encoding='utf-8') as f:
        for sample in tqdm(samples):
            text = sample[0]
            llm_annota = annotator(text, zh_label_name_list, use_fsl, fsl_pool, fsl_example_mode, fsl_example_num, print_log)
            # Samples the model could not label are skipped.
            if len(llm_annota) > 0:
                f.write(json.dumps({
                    'sentence': text,
                    'label': label_map[int(llm_annota)]}, ensure_ascii=False)+'\n')


def _write_gold_labels(samples, path):
    """Write each sample's own text and human label to `path` as JSON lines."""
    with open(path, 'w', encoding='utf-8') as f:
        for sample in tqdm(samples):
            text, label_id = sample[0], sample[1]
            f.write(json.dumps({
                'sentence': text,
                'label': label_map[int(label_id)]}, ensure_ascii=False)+'\n')


# Model annotation results
for split_name, split_samples in _splits:
    _write_model_annotations(split_samples, f'{output_dir}/result_{MODEL_NAME}_{DATASET}_{split_name}.json')

# Human annotation results (each split is written from its own samples)
for split_name, split_samples in _splits:
    _write_gold_labels(split_samples, f'{output_dir}/{split_name}.json')
