In [7]:
import torch
import random
import pandas as pd
from tqdm import tqdm
from datasets import load_dataset
from transformers import BertTokenizerFast, BertForSequenceClassification, Trainer, TrainingArguments
from transformers import DataCollatorWithPadding
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [8]:
# 加载数据集
dataset = load_dataset("tyqiangz/multilingual-sentiments", 'chinese')
dataset
# train_dataset = dataset["train"]
# sub_train_dataset = train_dataset.select(random.sample(range(len(train_dataset)), 12000))

# # 现在sub_train_dataset就是包含12000条随机样本的数据集
# print(sub_train_dataset['text'])

DatasetDict({
    train: Dataset({
        features: ['text', 'source', 'label'],
        num_rows: 120000
    })
    validation: Dataset({
        features: ['text', 'source', 'label'],
        num_rows: 3000
    })
    test: Dataset({
        features: ['text', 'source', 'label'],
        num_rows: 3000
    })
})

# 手动构建分类器层

In [None]:
from transformers import BertModel, BertConfig, BertTokenizerFast
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import AdamW, get_linear_schedule_with_warmup
from torch.nn import CrossEntropyLoss

# 加载预训练的BERT模型配置和模型本身
config = BertConfig.from_pretrained('bert-base-chinese', num_labels=3)
model = BertModel.from_pretrained('bert-base-chinese', config=config)

# 添加一个分类层
class BertClassifier(nn.Module):
    def __init__(self, bert_model, num_labels):
        super(BertClassifier, self).__init__()
        self.bert = bert_model
        self.classifier = nn.Linear(config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]  # 取CLS token的输出作为句子的表示
        logits = self.classifier(pooled_output)
        return logits

# 创建分类器实例
classifier = BertClassifier(model, num_labels=3)

In [None]:
tokenizer = BertTokenizerFast.from_pretrained('bert-base-chinese')

def tokenize_function(examples):
    return tokenizer(examples['text'], padding='max_length', truncation=True, max_length=512)

tokenized_datasets = dataset.map(tokenize_function, batched=True)

tokenized_datasets.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

# # 获取训练数据集
# train_dataset = tokenized_datasets['train']
# 获取测试数据集
test_dataset = tokenized_datasets['test']

In [None]:
train_dataset = sub_train_dataset.map(tokenize_function, batched=True)
train_dataset.set_format("torch", columns=["input_ids", "attention_mask", "label"])
print(train_dataset[0])

In [None]:
# 设置训练参数
learning_rate = 2e-5
batch_size = 8
epochs = 3
logging_steps = 1000

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 选择优化器
optimizer = AdamW(classifier.parameters(), lr=learning_rate)

# 设置损失函数
criterion = CrossEntropyLoss()

# 设置学习率调度器
total_steps = len(train_loader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# 训练模型
for epoch in range(epochs):
    classifier.train()
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}")
    
    for step, batch in enumerate(progress_bar):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['label']

        optimizer.zero_grad()  # 清零梯度
        outputs = classifier(input_ids, attention_mask=attention_mask)  # 前向传播
        loss = criterion(outputs, labels)  # 计算损失
        loss.backward()  # 反向传播
        optimizer.step()  # 更新权重
        scheduler.step()  # 更新学习率
        
        # 打印日志
        if (step + 1) % logging_steps == 0:
            print(f"Step [{step+1}/{len(train_loader)}], Loss: {loss.item()}")


    classifier.eval()  # 切换到评估模式
    total_correct = 0
    with torch.no_grad():  # 不计算梯度
        for batch in test_loader:
            input_ids = batch['input_ids']
            attention_mask = batch['attention_mask']
            labels = batch['label']

            outputs = classifier(input_ids, attention_mask=attention_mask)
            _, predicted = torch.max(outputs.data, 1)
            total_correct += (predicted == labels).sum().item()

    accuracy = total_correct / len(test_dataset)
    print(f'Epoch {epoch+1}, Accuracy: {accuracy}')

# 自动构建分类器层

In [None]:
# 若本地有训练好的模型，则可以直接加载
model_path = './三分类model'
# 加载分词器
tokenizer = BertTokenizerFast.from_pretrained(model_path)
# 加载模型
model = BertForSequenceClassification.from_pretrained(model_path)

# # 加载BERT的tokenizer
# tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
# # 定义模型，3分类问题所以num_labels=3
# model = BertForSequenceClassification.from_pretrained('bert-base-chinese', num_labels=3)

# 定义tokenizer的处理函数
def tokenize_function(examples):
    """
    对输入的文本数据进行分词处理。

    这个函数使用了预训练的分词器来将文本数据转换为模型输入所需的格式。
    它通过对文本进行填充和截断，确保所有输入数据具有相同的长度，从而可以被模型有效地处理。

    参数:
    examples (dict): 包含文本数据的字典。文本数据存储在键为'text'的项中。

    返回:
    分词后的数据，以特定格式准备好供深度学习模型使用。
    """
    # 使用tokenizer对文本数据进行分词、填充和截断
    return tokenizer(examples['text'], padding="max_length", truncation=True)

# 使用map函数对dataset进行转换，将其中的文本数据通过tokenize_function进行分词处理
# 这里的batched=True表示以批量方式处理数据，根据内存使用情况自动优化
tokenized_datasets = dataset.map(tokenize_function, batched=True)

# 设置数据集的格式为PyTorch兼容的格式，并指定包含的列
# 这里的'input_ids'和'attention_mask'是模型输入所需的列，'label'是标签列
tokenized_datasets.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])

# 获取训练数据集
train_dataset = tokenized_datasets['train']
# 获取测试数据集
test_dataset = tokenized_datasets['test']

# 定义一个计算模型性能指标的函数
# 它接受一个预测对象p作为输入，该对象应包含实际标签和预测标签的信息
# 它返回一个字典，其中包含了准确率、精确率、召回率和F1分数四个指标的值
def compute_metrics(p):
    # 从预测对象中获取预测结果，将其转换为最大概率对应的类别标签
    preds = p.predictions.argmax(-1)
    
    # 计算预测结果的精确率、召回率、F1分数和支持度，这里使用的是宏平均方法
    precision, recall, f1, _ = precision_recall_fscore_support(p.label_ids, preds, average='macro')
    
    # 计算预测结果的准确率
    acc = accuracy_score(p.label_ids, preds)
    
    # 将计算得到的指标封装在一个字典中并返回
    return {
        'accuracy': acc,
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }

# 定义训练参数
# 这段代码用于配置模型训练时的参数，为`TrainingArguments`类的实例。
# `output_dir`指定训练结果的输出目录。
# `evaluation_strategy`定义评估模型的策略，这里为每个epoch进行一次评估。
# `learning_rate`设置初始学习率，控制模型参数更新的速度。
# `per_device_train_batch_size`和`per_device_eval_batch_size`分别定义了训练和评估时每个设备的批次大小。
# `num_train_epochs`指定训练的轮数，这里为4轮。
# `weight_decay`设置权重衰减（L2正则化）的系数，用于防止过拟合。
# `logging_dir`定义日志存储目录，用于记录训练过程中的信息。
# `logging_steps`指定每隔多少步打印一次日志。
training_args = TrainingArguments(
    output_dir='./results-三分类',
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=1000,
)

# 创建一个带有填充功能的数据合并器
# 这个数据合并器会在训练过程中对输入数据进行预处理，具体包括：
# - 使用传入的 tokenizer 对文本数据进行编码，将其转换为模型可接受的格式
# - 对不同长度的序列进行统一长度的调整，通过在较短序列的末尾添加特殊填充标记来实现
# - 保证批次内的所有序列长度一致，便于模型的批量处理
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# 创建Trainer实例
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

In [None]:
# 训练模型
trainer.train()

In [None]:
# 评估模型
trainer.evaluate()

In [None]:
# 保存模型
model.save_pretrained('./三分类model')
tokenizer.save_pretrained('./三分类model')

In [None]:
# 定义推理函数
def predict(text):
    """
    定义推理函数，用于对输入文本进行情感预测。
    
    Args:
        text (str): 输入文本。
        
    Returns:
        str: 预测结果，可以是"Positive"、"Negative"或"Neutral"。
    """
    # 确定设备
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # 将模型移动到相同的设备
    model.to(device)
    # 关闭模型的训练模式
    model.eval()
    # 预处理输入文本
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    # 将输入数据移动到相同的设备
    inputs = {key: value.to(device) for key, value in inputs.items()}
    # 关闭梯度计算以节省内存和计算力
    with torch.no_grad():
        outputs = model(**inputs)
    # 获取预测结果
    predictions = torch.argmax(outputs.logits, dim=-1)
    # 初始化结果变量
    result=''
    if predictions.item() == 0:
        result = "Positive" 
    elif predictions.item() == 2:
        result = "Negative"
    else:
        result = "Neutral"
    return result


In [None]:
# 示例推理
example_text = "ChatGPT不会回答这个问题"
print(predict(example_text))

In [None]:
# 读取xlsx文件并去重
df = pd.read_excel('ChatGPT_去重.xlsx')  # 替换为你的xlsx文件路径
def check_length(row):
    return len(row['微博博文']) < 200

filtered_df = df[df.apply(check_length, axis=1)]

# 假设文本内容在"微博博文"列
texts = filtered_df['微博博文'].tolist()

# 创建一个进度条
tqdm.pandas()

# 对每个文本进行情感预测
df['sentiment'] = df['微博博文'].progress_apply(predict)

# 保存结果回xlsx文件
df.to_excel('predicted_sentiments.xlsx', index=False)

print("情感预测完成，并已保存到predicted_sentiments.xlsx")
