In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

In [4]:
class TextDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,  
            return_attention_mask=True,
            return_tensors='pt',
        )

        input_ids = encoding['input_ids'].flatten()
        attention_mask = encoding['attention_mask'].flatten()

        assert input_ids.size(0) == self.max_len, f"input_ids length: {input_ids.size(0)}, expected: {self.max_len}"
        assert attention_mask.size(0) == self.max_len, f"attention_mask length: {attention_mask.size(0)}, expected: {self.max_len}"

        return {
            'text': text,
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'label': torch.tensor(label, dtype=torch.long)
        }


In [5]:
class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, num_classes, dropout=0.1):
        super(TransformerClassifier, self).__init__()
        self.embedding = nn.Embedding(input_dim, model_dim)
        encoder_layers = nn.TransformerEncoderLayer(model_dim, num_heads, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(model_dim, num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_ids, attention_mask):
        embedded = self.embedding(input_ids) * attention_mask.unsqueeze(2)
        transformer_output = self.transformer_encoder(embedded)
        pooled_output = transformer_output.mean(dim=1)
        logits = self.fc(pooled_output)
        return self.softmax(logits)


In [6]:
# 加载数据集
import re

def data_process():  # 数据预处理函数
    label_list = []
    text_file_list = []
    with open('label/index', encoding='utf-8') as f:
        f = f.read().splitlines()[:5000]  # 修改文件个数
        for i in f:
            if i.split(" ")[0] == 'spam':
                label_list.append(0)
            else:
                label_list.append(1)
            text_file_list.append(i.split(" ")[1][3:])
    data = []
    for file_path in text_file_list:
        with open(file_path, errors='ignore', encoding='utf-8') as f:
            text = f.readlines()
            res = re.findall('[\u4e00-\u9fa5]', str(text))
            res = "".join(res)
            data.append(res)
    return data, label_list

In [8]:
def train_model(model, data_loader, loss_fn, optimizer, device, num_epochs):
    model = model.to(device)
    model.train()

    total_loss = 0

    for epoch in range(num_epochs):
        total_loss = 0
        for batch in data_loader:
            optimizer.zero_grad()

            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            outputs = model(input_ids, attention_mask)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(data_loader)}')
    
    return total_loss

# 训练参数
NUM_EPOCHS = 10
BATCH_SIZE = 8
MAX_LEN = 1024
LEARNING_RATE = 2e-5

# 假设我们使用BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('./bert-base-multilingual-cased')

# 创建数据集和数据加载器
ori_texts, ori_labels = data_process()

train_texts, test_texts, train_labels, test_labels = train_test_split(
    ori_texts, ori_labels, test_size=0.2, random_state=42
)

train_dataset = TextDataset(train_texts, train_labels, tokenizer, MAX_LEN)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataset = TextDataset(test_texts, test_labels, tokenizer, MAX_LEN)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)

# 模型初始化
input_dim = len(tokenizer.vocab)
model_dim = 1024
num_heads = 2
num_layers = 2
num_classes = 2

model = TransformerClassifier(input_dim, model_dim, num_heads, num_layers, num_classes)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
loss_fn = nn.CrossEntropyLoss()

# 设备选择
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 开始训练
train_model(model, train_loader, loss_fn, optimizer, device, NUM_EPOCHS)

# 保存整个模型
torch.save(model, 'transformer_classifier_complete.pth')



Epoch 1/10, Loss: 0.6041254503130913


KeyboardInterrupt: 

In [None]:
model.eval()  # 切换到评估模式
all_predictions = []
all_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        outputs = model(input_ids, attention_mask)
        predictions = torch.argmax(outputs, dim=1)
        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 计算准确率、精确率、召回率和 F1 分数
accuracy = accuracy_score(all_labels, all_predictions)
report = classification_report(all_labels, all_predictions, target_names=["spam", "normal"])

print(f"Accuracy: {accuracy * 100:.2f}%")
print("Classification Report:")
print(report)

Accuracy: 93.60%
Classification Report:
              precision    recall  f1-score   support

        spam       0.92      1.00      0.96       707
      normal       0.99      0.79      0.88       293

    accuracy                           0.94      1000
   macro avg       0.95      0.89      0.92      1000
weighted avg       0.94      0.94      0.93      1000



In [None]:
# 设备选择
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 加载模型并将其移动到合适的设备上
model = torch.load('transformer_classifier_complete.pth', map_location=device)
model.to(device)
model.eval()  # 切换模型到评估模式

# 示例新文本
new_text = '很高兴收到你的来信，明天可否有时间，来综合楼'

# 将新文本转换为模型的输入格式
new_encoding = tokenizer.encode_plus(
    new_text,
    add_special_tokens=True,
    max_length=MAX_LEN,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt'
)

# 将输入张量移动到与模型相同的设备上
new_input_ids = new_encoding['input_ids'].to(device)
new_attention_mask = new_encoding['attention_mask'].to(device)

# 使用模型进行预测
with torch.no_grad():
    output = model(new_input_ids, new_attention_mask)
    predicted_class = torch.argmax(output, dim=1).item()

# 输出预测结果
print(f"Predicted Class: {predicted_class}")


Predicted Class: 0


In [None]:
# 加载保存的模型和优化器状态
checkpoint = torch.load('transformer_classifier_complete.pth')
model = TransformerClassifier(input_dim, model_dim, num_heads, num_layers, num_classes)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# 加载模型状态
model.load_state_dict(checkpoint['model_state_dict'])

# 加载优化器状态
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# 加载最后一个训练周期的epoch数
start_epoch = checkpoint['epoch']

# 将模型设置为训练模式
model.train()

# 设备选择
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# 继续训练
total_loss = train_model(model, train_loader, loss_fn, optimizer, device, num_epochs=start_epoch+NUM_EPOCHS)

torch.save({
    'epoch': start_epoch + NUM_EPOCHS,  # 更新后的epoch数
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': total_loss,
}, 'transformer_classifier_complete.pth')
