# 基于transformers的文本分类

## 导入相关包

In [None]:
# Clone the SST-2 fine-tuned DistilBERT repository from the Hugging Face Hub
# into the current working directory (IPython shell escape).
# NOTE(review): the weight files in Hub repositories are presumably stored via
# git-lfs, so git-lfs may need to be installed for the clone to be usable -- confirm.
!git clone https://huggingface.co/distilbert/distilbert-base-uncased-finetuned-sst-2-english

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding
from datasets import Dataset
import evaluate
import numpy as np
from transformers import pipeline
import re
import pandas as pd


## 加载数据集

In [None]:
# Read the training CSV into a Hugging Face `Dataset`; the bare name on the
# last line makes the notebook display it.
tweets = Dataset.from_csv('train.csv')
tweets

In [None]:
# Display the first row to inspect one raw example.
tweets[0]

In [None]:
# Display the columns of the loaded dataset and their types.
tweets.features

In [None]:
# Display the dimensions of the loaded dataset.
tweets.shape

In [None]:
# Hold out 20% of the rows as a test split; the fixed seed keeps the split
# reproducible between runs. The bare name displays the resulting splits.
splited_dataset = tweets.train_test_split(test_size=0.2, seed=42)
splited_dataset

## 数据集预处理

In [None]:
# Load the tokenizer belonging to the same checkpoint name that the model
# cell uses, so text is encoded with the matching vocabulary.
# NOTE(review): this relative name presumably resolves to the directory created
# by the git clone cell when it exists in the working directory -- confirm.
tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased-finetuned-sst-2-english')

In [None]:
def preprocess(examples):
    """Clean and tokenize one batch of tweets.

    Args:
        examples: Batch mapping that holds a 'text' column and, optionally,
            a 'target' column.

    Returns:
        The tokenizer output for the cleaned texts (padded/truncated to 128
        tokens, NumPy arrays), with the 'target' values copied to 'labels'
        when that column is present.
    """
    def _scrub(raw):
        # Swap links for a placeholder token.
        no_links = re.sub(r'http\S+|www\S+|https\S+', '[URL]', raw, flags=re.MULTILINE)
        # Drop anything that looks like an HTML tag.
        no_markup = re.sub(r'<.*?>', '', no_links)
        # Collapse whitespace runs into single spaces and trim both ends.
        return re.sub(r'\s+', ' ', no_markup).strip()

    cleaned_texts = [_scrub(raw) for raw in examples['text']]

    encoding_options = dict(
        padding='max_length',
        truncation=True,
        max_length=128,
        return_tensors='np',
    )
    tokenized = tokenizer(cleaned_texts, **encoding_options)

    # Unlabelled batches are returned as-is; labelled ones carry the targets
    # along under the 'labels' key.
    if 'target' not in examples:
        return tokenized
    tokenized['labels'] = examples['target']
    return tokenized


In [None]:
# Original CSV columns to drop after tokenization, so that only the tokenizer
# outputs and the `labels` column added by `preprocess` remain.
columns_to_remove = ['id', 'keyword', 'location', 'text', 'target']
# Apply `preprocess` to the splits, one batch of rows per call.
tokenized_datasets = splited_dataset.map(
    preprocess,
    batched=True,
    remove_columns=columns_to_remove
)
# Display the tokenized splits.
tokenized_datasets

## 创建模型

In [None]:
# Load the checkpoint with a sequence-classification head configured for two
# labels; this is the model that gets fine-tuned.
model = AutoModelForSequenceClassification.from_pretrained('distilbert-base-uncased-finetuned-sst-2-english', num_labels = 2)

## 创建评估函数

In [None]:
# Load the accuracy and F1 metric implementations used by `eval_metric`.
accuracy = evaluate.load('accuracy')
f1 = evaluate.load('f1')

In [None]:
def eval_metric(pred):
    """Compute accuracy and weighted F1 for one evaluation pass.

    Args:
        pred: A pair that unpacks into (logits, reference labels).

    Returns:
        A dict with the keys 'accuracy' and 'f1'.
    """
    raw_scores, gold = pred
    # The highest-scoring class along the last axis is the predicted label.
    predicted = np.argmax(raw_scores, axis=-1)

    shared = {'predictions': predicted, 'references': gold}
    acc = accuracy.compute(**shared)
    weighted_f1 = f1.compute(average='weighted', **shared)

    return {'accuracy': acc['accuracy'], 'f1': weighted_f1['f1']}

## 配置训练参数

In [None]:
# Training configuration handed to the `Trainer`.
# Each optimizer step accumulates 32 micro-batches of 64 examples per device
# (64 * 32 = 2048 examples per update).
# NOTE(review): on a small training set this presumably leaves only a handful
# of optimizer steps per epoch, and `logging_steps=100` may then never
# trigger -- confirm these values are intended.
# NOTE(review): newer transformers releases appear to rename
# `evaluation_strategy` to `eval_strategy` -- confirm against the installed
# version.
args = TrainingArguments(
    output_dir='./results',          # checkpoints are written under this directory
    learning_rate=2e-5,
    per_device_train_batch_size=64,
    per_device_eval_batch_size=128,
    num_train_epochs=3,
    weight_decay=0.01,
    gradient_accumulation_steps=32,  # micro-batches accumulated per optimizer step
    gradient_checkpointing=True,     # trades extra compute for lower activation memory
    optim="adafactor",               # *** Adafactor optimizer ***
    evaluation_strategy='epoch',     # evaluate once per epoch
    save_strategy='epoch',           # save a checkpoint once per epoch
    metric_for_best_model='f1',      # 'f1' key returned by `eval_metric`
    load_best_model_at_end=True,     # reload the best-scoring checkpoint after training
    logging_steps=100,
    save_total_limit=3               # cap on the number of checkpoints kept on disk
)

## 创建训练器

In [None]:
# Wire the model, training configuration, tokenized splits and metric function
# into a `Trainer`.
# NOTE(review): `preprocess` already pads every example to a fixed length of
# 128, so the padding collator presumably has nothing left to pad -- confirm
# whether dynamic padding was intended instead.
trainer = Trainer(
    model = model,
    args = args,
    train_dataset = tokenized_datasets['train'],
    eval_dataset = tokenized_datasets['test'],
    compute_metrics = eval_metric,
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
)

## 模型训练

In [None]:
# Fine-tune the model with the configured trainer.
trainer.train()

In [None]:
# Compute the evaluation metrics on the held-out split.
trainer.evaluate(eval_dataset=tokenized_datasets['test'])

## 模型预测

In [None]:
# Load the unlabelled test set that predictions are generated for.
test_df = pd.read_csv('test.csv')

# Build the inference pipeline from the model held by the trainer.
# The previous path 'results/checkpoint-bes' was a truncated name that matches
# no directory the Trainer writes (checkpoints are saved as
# 'checkpoint-<step>'). Because the training arguments set
# `load_best_model_at_end=True`, `trainer.model` already holds the
# best-scoring weights once training has finished, so it is used directly.
# `.eval()` makes sure dropout is disabled for inference.
# The device is taken from the trainer instead of being hard-coded to 'mps',
# so the cell also runs on CUDA and CPU machines.
classifier = pipeline(
    'text-classification',
    model=trainer.model.eval(),
    tokenizer=tokenizer,
    device=trainer.args.device,
)

In [None]:
# 对测试数据进行预测
# Run the classifier over the test tweets and write the submission file.
# NOTE(review): training texts go through the URL/HTML/whitespace cleaning in
# `preprocess`, but these texts are passed to the pipeline raw -- confirm
# whether the same cleaning should be applied here.
test_texts = test_df['text'].tolist()

# Process the texts in chunks so progress can be reported along the way.
batch_size = 32
predictions = []

for batch_no, i in enumerate(range(0, len(test_texts), batch_size)):
    batch_texts = test_texts[i:i+batch_size]
    # Forward `batch_size` to the pipeline: without it the pipeline feeds the
    # model one text at a time even when it is given a list.
    batch_results = classifier(batch_texts, batch_size=batch_size)
    predictions.extend(batch_results)
    # `i` advances in steps of 32, so the former `i % 100 == 0` check only
    # fired every 800 rows; report every 10th batch instead.
    if batch_no % 10 == 0:
        print(f"已处理 {i}/{len(test_texts)} 条数据")

print("预测完成!")

# Convert the predicted label names to integer class ids for the submission.
# The mapping is read from the model config rather than compared against
# 'LABEL_1': this checkpoint names its classes NEGATIVE/POSITIVE, so the old
# comparison never matched and every row was written as 0. Configs that use
# the default LABEL_0/LABEL_1 names resolve to the same ids as before.
label2id = classifier.model.config.label2id
predicted_labels = [int(label2id[result['label']]) for result in predictions]

# Assemble the submission table: one row per test id with its predicted class.
submission = pd.DataFrame({
    'id': test_df['id'],
    'target': predicted_labels
})

# Write the submission file without the DataFrame index column.
submission.to_csv('prediction.csv', index=False)
