In [1]:
import torch
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import jieba
import re
from transformers import BertTokenizer, BertForSequenceClassification,BertModel
from torch.utils.data import DataLoader, TensorDataset

# 检查是否有可用的GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
# 加载BERT模型和tokenizer，并将它们移动到GPU上
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = BertTokenizer.from_pretrained("model/roberta/tokenizer")
bert_model = BertModel.from_pretrained("model/roberta/model").to(device)

# 冻结BERT模型参数
for param in bert_model.parameters():
    param.requires_grad_(False)

# 解冻部分BERT模型参数
for param in bert_model.encoder.layer[-3:].parameters():
    param.requires_grad_(True)

# 创建一个自定义的带有注意力机制的双向LSTM模型
class LSTMWithAttention(nn.Module):
    def __init__(self, bert_model, lstm_hidden_dim, num_labels, num_attention_heads, attention_dim, dropout_prob=0.2):
        super(LSTMWithAttention, self).__init__()
        self.bert_model = bert_model
        self.lstm_hidden_dim = lstm_hidden_dim
        self.num_labels = num_labels
        self.num_attention_heads = num_attention_heads
        self.attention_dim = attention_dim

        # LSTM层
        self.lstm = nn.LSTM(input_size=bert_model.config.hidden_size,
                            hidden_size=lstm_hidden_dim,
                            num_layers=4,
                            batch_first=True,
                            dropout=dropout_prob,
                            bidirectional=True)
        self.dropout = nn.Dropout(dropout_prob)

        # 多头自注意力层
        self.attention_layer = nn.MultiheadAttention(attention_dim,
                                                     num_attention_heads,
                                                     dropout=dropout_prob,
                                                     kdim=attention_dim,
                                                     vdim=attention_dim)

        # 全连接层进行分类
        self.fc = nn.Linear(lstm_hidden_dim * 2, num_labels)

    def forward(self, input_ids, attention_mask):
        # 使用BERT提取特征
        with torch.no_grad():  # 在BERT模型中不计算梯度
            outputs = self.bert_model(input_ids, attention_mask=attention_mask)
            bert_hidden_states = outputs.last_hidden_state

        # 将BERT的输出传递给LSTM
        lstm_output, _ = self.lstm(bert_hidden_states)
        lstm_output = self.dropout(lstm_output)

        # attention_dim 
        attention_output, _ = self.attention_layer(lstm_output.permute(1, 0, 2),
                                                   lstm_output.permute(1, 0, 2),
                                                   lstm_output.permute(1, 0, 2))
        attention_output = attention_output.permute(1, 0, 2)

        # 获取最后一个时间步的输出
        last_lstm_output = attention_output[:, -1, :]

        # 使用全连接层进行分类
        logits = self.fc(last_lstm_output)
        return logits

# 定义模型参数
lstm_hidden_dim = 256
num_labels = 6
num_attention_heads = 4  
attention_dim = lstm_hidden_dim*2  
dropout_prob = 0.1

# 创建BERT+LSTM模型带有注意力机制
model = LSTMWithAttention(bert_model, lstm_hidden_dim, num_labels, num_attention_heads, attention_dim, dropout_prob).to(device)

In [3]:
# 数据预处理
MAX_LEN = 64

def preprocess_data(df, tokenizer, max_len):
    input_ids = []
    attention_masks = []
    labels = []

    for index, row in df.iterrows():
        text = row[0]
        label = row[1]

        inputs = tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=max_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        input_ids.append(inputs['input_ids'])
        attention_masks.append(inputs['attention_mask'])
        labels.append(label)

    input_ids = torch.cat(input_ids, dim=0).to(device)
    attention_masks = torch.cat(attention_masks, dim=0).to(device)
    labels = torch.tensor(labels, dtype=torch.long).to(device)

    return input_ids, attention_masks, labels

In [4]:
# 准备训练数据和验证数据
train_df = pd.read_table('input/train.txt',header=None)
dev_df = pd.read_table('input/dev.txt',header=None)

In [5]:
# def clean_text(text):
#     # 去除特殊字符、符号和数字
#     cleaned_text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z\s]', '', text)
#     return cleaned_text

# train_df[0] = train_df[0].map(clean_text)
# dev_df[0] = dev_df[0].map(clean_text)

In [6]:
train_input_ids, train_attention_masks, train_labels = preprocess_data(train_df, tokenizer, MAX_LEN)
dev_input_ids, dev_attention_masks, dev_labels = preprocess_data(dev_df, tokenizer, MAX_LEN)

# 创建TensorDataset和DataLoader
train_dataset = TensorDataset(train_input_ids, train_attention_masks, train_labels)
dev_dataset = TensorDataset(dev_input_ids, dev_attention_masks, dev_labels)

train_batch_size = 128
dev_batch_size = 64

train_dataloader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True)
dev_dataloader = DataLoader(dev_dataset, batch_size=dev_batch_size)

In [7]:
# 定义优化器和损失函数
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
loss_fn = torch.nn.CrossEntropyLoss()

# 训练模型
num_epochs = 200

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch in train_dataloader:
        input_ids, attention_mask, labels = batch
        optimizer.zero_grad()

        # 前向传播
        logits = model(input_ids, attention_mask=attention_mask)
        
        # 计算交叉熵损失
        loss = loss_fn(logits, labels)
        
        # 反向传播和参数更新
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {avg_loss:.4f}",end='\t')

    # 在验证集上评估模型性能
    model.eval()
    val_preds = []
    val_labels = []

    for batch in dev_dataloader:
        input_ids, attention_mask, labels = batch
        with torch.no_grad():
            logits = model(input_ids, attention_mask=attention_mask)
        
        # 计算预测
        val_preds.extend(logits.argmax(dim=1).tolist())
        val_labels.extend(labels.tolist())

    
    correct_predictions = [1 if p == t else 0 for p, t in zip(val_preds, val_labels)]
    accuracy = sum(correct_predictions) / len(correct_predictions)
    print(f'Test_Accuracy: {accuracy * 100:.2f}%')

Epoch 1/200, Average Loss: 1.7275	Test_Accuracy: 25.10%
Epoch 2/200, Average Loss: 1.4644	Test_Accuracy: 55.40%
Epoch 3/200, Average Loss: 1.1031	Test_Accuracy: 62.10%
Epoch 4/200, Average Loss: 0.9061	Test_Accuracy: 65.60%
Epoch 5/200, Average Loss: 0.8045	Test_Accuracy: 72.80%
Epoch 6/200, Average Loss: 0.7303	Test_Accuracy: 75.30%
Epoch 7/200, Average Loss: 0.6501	Test_Accuracy: 75.90%
Epoch 8/200, Average Loss: 0.5744	Test_Accuracy: 79.10%
Epoch 9/200, Average Loss: 0.5331	Test_Accuracy: 79.70%
Epoch 10/200, Average Loss: 0.4926	Test_Accuracy: 80.50%
Epoch 11/200, Average Loss: 0.4650	Test_Accuracy: 80.60%
Epoch 12/200, Average Loss: 0.4381	Test_Accuracy: 81.50%
Epoch 13/200, Average Loss: 0.4190	Test_Accuracy: 81.70%
Epoch 14/200, Average Loss: 0.4018	Test_Accuracy: 82.30%
Epoch 15/200, Average Loss: 0.3996	Test_Accuracy: 81.90%
Epoch 16/200, Average Loss: 0.3777	Test_Accuracy: 82.80%
Epoch 17/200, Average Loss: 0.3622	Test_Accuracy: 81.30%
Epoch 18/200, Average Loss: 0.3519	Test_

In [8]:
test_df = pd.read_table('input/dev.txt',header=None)

# 数据预处理
test_input_ids, test_attention_masks, _ = preprocess_data(test_df, tokenizer, MAX_LEN)

# 创建 DataLoader
test_dataset = TensorDataset(test_input_ids, test_attention_masks)
test_batch_size = 64
test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size)

# 将模型设置为评估模式
model.eval()

# 存储预测结果
predictions = []

# 遍历测试数据并进行预测
for batch in test_dataloader:
    input_ids, attention_mask = batch
    input_ids = input_ids.to(device)
    attention_mask = attention_mask.to(device)
    with torch.no_grad():
        # 前向传播
        logits = model(input_ids, attention_mask=attention_mask)
    
    # 计算预测
    predicted_labels = logits.argmax(dim=1).tolist()
    predictions.extend(predicted_labels)

In [9]:
true_labels = test_df[1]

# 计算准确度
correct_predictions = [1 if p == t else 0 for p, t in zip(predictions, true_labels)]
accuracy = sum(correct_predictions) / len(correct_predictions)
print(f'Accuracy: {accuracy * 100:.2f}%')

Accuracy: 83.30%
