In [8]:
from pandas import DataFrame
from MyModule import load_data, load_vocab, remap_labels

train_data = remap_labels(load_data("train_vectorized.txt", sep=",", is_vectorized=True))
val_data = remap_labels(load_data("val_vectorized.txt", sep=",", is_vectorized=True))
test_data = remap_labels(load_data("test_vectorized.txt", sep=",", is_vectorized=True))

vocab = load_vocab("vocab.txt")

In [2]:
import numpy as np
import torch

def load_pretrained_embeddings(embedding_path, vocab, embedding_dim=200):
    """
    根据已有 vocab 构建嵌入矩阵
    """
    # found = {v: False for v in vocab.token_to_idx.keys()}
    embeddings = np.random.normal(scale=0.6, size=(len(vocab), embedding_dim))  # 随机初始化
    with open(embedding_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            parts = line.strip().split()
            word = parts[0]
            vector = np.array(parts[1:], dtype='float32')
            if word in vocab.token_to_idx:
                embeddings[vocab[word]] = vector
                # found[word] = True
    # for word, was_found in found.items():
    #     if not was_found:
    #         print(f"词元 {word} 未找到预训练向量")
    return torch.tensor(embeddings, dtype=torch.float32)

In [3]:
embedding_matrix = load_pretrained_embeddings("Word2Vec-100000-small.txt", vocab, embedding_dim=200)

In [4]:
# 融合词向量层的多层感知机
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F


class BowDataset(Dataset):
    def __init__(self, data: DataFrame) -> None:
        self.data = data

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, i: int):
        return self.data.iloc[i]

def collate_fn_mlp(batch):
    # 从独立样本集合中构建各批次的输入输出
    # 其中 BowDataset 类定义了一个样本的数据结构，即输入标签和输出标签的元组
    # 因此，将输入 inputs 定义为一个张量的列表，其中每个张量为原始句子中词元序列对应的索引值序列
    inputs = [torch.tensor(b[0]) for b in batch]

    # 输出的目标 targets 为该批次中由全部样例输出结果构成的张量
    targets = torch.tensor([b[1] for b in batch], dtype=torch.long)

    # 获取一个批次中每个样例的序列长度
    offsets = [0] + [i.shape[0] for i in inputs]

    # 根据序列的长度，转换为每个序列起始位置的偏移量
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)

    # 将 inputs 列表中的张量拼接成一个大的张量
    inputs = torch.cat(inputs)

    return inputs, offsets, targets

In [5]:
class MLP(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super(MLP, self).__init__()
        # EmbeddingBag 层
        self.embeddingbag = nn.EmbeddingBag(vocab_size, embedding_dim)

        # 词向量层：使用 EmbeddingBag
        # self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)

        # 线性变换：词向量层 -> 隐含层
        self.linear1 = nn.Linear(embedding_dim, hidden_dim)

        # 使用ReLU激活函数
        self.activate = F.relu

        # 线性变换：激活层 -> 输出层
        self.linear2 = nn.Linear(hidden_dim, num_class)
        # self.dropout = nn.Dropout(dropout)
    
    def forward(self, inputs, offsets):
        # embeddings = self.embedding(inputs)  # (batch_size, seq_length, embed_size)
        # embedding = embeddings.mean(dim=1)  # (batch_size, embed_size)
        embedding = self.embeddingbag(inputs, offsets)
        hidden = self.activate(self.linear1(embedding))  # (batch_size, hidden_size)
        outputs = self.linear2(hidden)  # (batch_size, output_size)

        # 获得每个序列属于某个类别概率的对数值
        # probs = F.log_softmax(outputs, dim=1)
        return outputs

In [6]:
# MLP 相关参数与数据加载
from tqdm.auto import tqdm

# 超参数设置
embedding_dim = 200
num_class = 3
hidden_dim = 256
batch_size = 32
num_epoch = 5

# 加载数据
train_dataset = BowDataset(train_data)
test_dataset = BowDataset(test_data)

train_data_loader_mlp = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn_mlp, shuffle=True)
test_data_loader_mlp = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn_mlp, shuffle=False)

# 加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
mlp = MLP(len(vocab), embedding_dim, hidden_dim, num_class)
mlp.to(device)

  from .autonotebook import tqdm as notebook_tqdm


MLP(
  (embeddingbag): EmbeddingBag(33152, 200, mode='mean')
  (embedding): Embedding(33152, 200)
  (linear1): Linear(in_features=200, out_features=256, bias=True)
  (linear2): Linear(in_features=256, out_features=3, bias=True)
)

In [7]:
# 训练过程
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(mlp.parameters(), lr=0.001)

mlp.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader_mlp, desc=f"Training Epoch {epoch+1}"):
        inputs, offsets, targets = [x.to(device) for x in batch]

        logits = mlp(inputs, offsets)
        loss = criterion(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
            
        total_loss += loss.item()
    avg_loss = total_loss / len(train_data_loader_mlp)

    print(f"Epoch {epoch+1} - Loss: {avg_loss:.4f}")


# 测试过程
acc = 0
for batch in tqdm(test_data_loader_mlp, desc="Testing"):
    inputs, offsets, targets = [x.to(device) for x in batch]
    with torch.no_grad():
        output = mlp(inputs, offsets)
        acc += (output.argmax(dim=1) == targets).sum().item()
    
# 输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader_mlp):.4f}")

Training Epoch 1: 100%|██████████| 7155/7155 [00:28<00:00, 250.87it/s]


Epoch 1 - Loss: 0.42


Training Epoch 2: 100%|██████████| 7155/7155 [00:28<00:00, 252.45it/s]


Epoch 2 - Loss: 0.33


Training Epoch 3: 100%|██████████| 7155/7155 [00:29<00:00, 242.60it/s]


Epoch 3 - Loss: 0.30


Training Epoch 4: 100%|██████████| 7155/7155 [00:29<00:00, 246.02it/s]


Epoch 4 - Loss: 0.27


Training Epoch 5: 100%|██████████| 7155/7155 [00:27<00:00, 259.13it/s]


Epoch 5 - Loss: 0.23


Testing: 100%|██████████| 49057/49057 [00:31<00:00, 1569.57it/s]

Acc: 0.8380





In [8]:
class CNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, filter_size, num_filter, num_class) -> None:
        super(CNN, self).__init__()
        # self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
        self.conv1d = nn.Conv1d(embedding_dim, num_filter, filter_size, padding=1)  # padding=1 表示在卷积操作之前，将序列的前后各补充1个输入
        self.activate = F.relu
        self.linear = nn.Linear(num_filter, num_class)

    
    def forward(self, inputs):
        embedding = self.embedding(inputs)
        convolution = self.activate(self.conv1d(embedding.permute(0, 2, 1)))
        pooling = F.max_pool1d(convolution, kernel_size=convolution.shape[2])
        outputs = self.linear(pooling.squeeze(dim=2))
        return outputs
    
from torch.nn.utils.rnn import pad_sequence

def collate_fn_cnn(batch):
    inputs = [torch.tensor(b[0]) for b in batch]
    targets = torch.tensor([b[1] for b in batch], dtype=torch.long)

    # 对批次内的样本补齐，使其具有相同的长度
    inputs = pad_sequence(inputs, batch_first=True)
    return inputs, targets

train_data_loader_cnn = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn_cnn, shuffle=True)
test_data_loader_cnn = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn_cnn, shuffle=False)

# CNN 相关参数
filter_size = 3
num_filter = 100

# 加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
cnn = CNN(len(vocab), embedding_dim, filter_size, num_filter, num_class)
cnn.to(device)

CNN(
  (embedding): Embedding(33152, 200)
  (conv1d): Conv1d(200, 100, kernel_size=(3,), stride=(1,), padding=(1,))
  (linear): Linear(in_features=100, out_features=3, bias=True)
)

In [None]:
# 训练过程
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(cnn.parameters(), lr=0.001)

cnn.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader_cnn, desc=f"Training Epoch {epoch+1}"):
        inputs, targets = [x.to(device) for x in batch]

        logits = cnn(inputs)
        loss = criterion(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
            
        total_loss += loss.item()
    avg_loss = total_loss / len(train_data_loader_cnn)
    print(f"Epoch {epoch+1} - Loss: {avg_loss:.4f}")

# 测试过程
acc = 0
for batch in tqdm(test_data_loader_cnn, desc="Testing"):
    inputs, targets = [x.to(device) for x in batch]
    with torch.no_grad():
        output = cnn(inputs)
        acc += (output.argmax(dim=1) == targets).sum().item()
    
# 输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader_cnn):.4f}")

Training Epoch 1: 100%|██████████| 7155/7155 [00:48<00:00, 147.86it/s]


Epoch 1 - Loss: 2544.79


Training Epoch 2: 100%|██████████| 7155/7155 [00:48<00:00, 147.31it/s]


Epoch 2 - Loss: 1924.34


Training Epoch 3: 100%|██████████| 7155/7155 [00:58<00:00, 123.03it/s]


Epoch 3 - Loss: 1429.26


Training Epoch 4: 100%|██████████| 7155/7155 [00:47<00:00, 151.79it/s]


Epoch 4 - Loss: 967.49


Training Epoch 5: 100%|██████████| 7155/7155 [00:46<00:00, 152.83it/s]


Epoch 5 - Loss: 630.07


Testing: 100%|██████████| 49057/49057 [01:04<00:00, 761.74it/s]

Acc: 0.8324





In [10]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn_lstm(batch):
    lengths = torch.tensor([len(b[0]) for b in batch], dtype=torch.long)
    inputs = [torch.tensor(b[0]) for b in batch]
    targets = torch.tensor([b[1] for b in batch], dtype=torch.long)

    # 使用 pad_sequence 函数对输入序列进行填充
    inputs = pad_sequence(inputs, batch_first=True)

    return inputs, lengths, targets

from torch.nn.utils.rnn import pack_padded_sequence

class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super(LSTM, self).__init__()
        # self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.embeddings = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.output = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs, lengths):
        embedding = self.embeddings(inputs)

        # 使用 pack_padded_sequence 函数对嵌入序列进行打包
        x_pack = pack_padded_sequence(embedding, lengths, batch_first=True, enforce_sorted=False)
        hidden, (hn, cn) = self.lstm(x_pack)
        outputs = self.output(hn[-1])
        return outputs
    

train_data_loader_lstm = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn_lstm, shuffle=True)
test_data_loader_lstm = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn_lstm, shuffle=False)

# 加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
lstm = LSTM(len(vocab), embedding_dim, hidden_dim, num_class)
lstm.to(device)

LSTM(
  (embeddings): Embedding(33152, 200)
  (lstm): LSTM(200, 256, batch_first=True)
  (output): Linear(in_features=256, out_features=3, bias=True)
)

In [None]:
# 训练过程
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(lstm.parameters(), lr=0.001)

lstm.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader_lstm, desc=f"Training Epoch {epoch+1}"):
        # inputs, lengths, targets = [x.to(device) for x in batch]
        inputs, lengths, targets = batch
        inputs = inputs.to(device)
        targets = targets.to(device)

        logits = lstm(inputs, lengths)
        loss = criterion(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
            
        total_loss += loss.item()
    avg_loss = total_loss / len(train_data_loader_lstm)
    print(f"Epoch {epoch+1} - Loss: {avg_loss:.4f}")


# 测试过程
acc = 0
for batch in tqdm(test_data_loader_lstm, desc="Testing"):
    # inputs, targets = [x.to(device) for x in batch]
    inputs, lengths, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)
    with torch.no_grad():
        output = lstm(inputs, lengths)
        acc += (output.argmax(dim=1) == targets).sum().item()
    
# 输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader_lstm):.4f}")

Training Epoch 1: 100%|██████████| 7155/7155 [01:02<00:00, 114.59it/s]


Epoch 1 - Loss: 2555.56


Training Epoch 2: 100%|██████████| 7155/7155 [01:02<00:00, 114.65it/s]


Epoch 2 - Loss: 1943.55


Training Epoch 3: 100%|██████████| 7155/7155 [01:02<00:00, 114.59it/s]


Epoch 3 - Loss: 1538.01


Training Epoch 4: 100%|██████████| 7155/7155 [01:03<00:00, 111.84it/s]


Epoch 4 - Loss: 1140.84


Training Epoch 5: 100%|██████████| 7155/7155 [01:02<00:00, 114.35it/s]


Epoch 5 - Loss: 790.59


Testing: 100%|██████████| 49057/49057 [01:47<00:00, 457.08it/s]

Acc: 0.8456





In [12]:
def length_to_mask(lengths):
    max_len = torch.max(lengths)
    mask = torch.arange(max_len, device=lengths.device).expand(lengths.shape[0], max_len) < lengths.unsqueeze(1)
    return mask

import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=512):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0)) / d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

class Transformer(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class, dim_feedforward=512, num_head=2, num_layers=2, dropout=0.1, max_len=128, activation: str = "relu"):
        super(Transformer, self).__init__()
        self.embedding_dim = embedding_dim
        # self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.embeddings = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
        self.position_embedding = PositionalEncoding(embedding_dim, dropout, max_len)  # 位置编码

        encoder_layer = nn.TransformerEncoderLayer(embedding_dim, num_head, dim_feedforward, dropout, activation)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        self.output = nn.Linear(embedding_dim, num_class)

    def forward(self, inputs, lengths):
        inputs = torch.transpose(inputs, 0, 1)

        hidden_states = self.embeddings(inputs)
        hidden_states = self.position_embedding(hidden_states)

        lengths = lengths.to(inputs.device)
        attention_mask = length_to_mask(lengths) == False

        # 根据批次中每个序列长度生成 Mask 矩阵
        hidden_states = self.transformer(hidden_states, src_key_padding_mask=attention_mask)
        hidden_states = hidden_states[0, :, :]

        # 去第一个词元的输出结果作为分类层的输入
        outputs = self.output(hidden_states)
        return outputs
    
collate_fn_trans = collate_fn_lstm

train_data_loader_trans = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn_trans, shuffle=True)
test_data_loader_trans = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn_trans, shuffle=False)

# 加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transformer = Transformer(len(vocab), embedding_dim, embedding_dim, num_class)
transformer.to(device)





Transformer(
  (embeddings): Embedding(33152, 200)
  (position_embedding): PositionalEncoding()
  (transformer): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=200, out_features=200, bias=True)
        )
        (linear1): Linear(in_features=200, out_features=512, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=512, out_features=200, bias=True)
        (norm1): LayerNorm((200,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((200,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (output): Linear(in_features=200, out_features=3, bias=True)
)

In [None]:

# 训练过程
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(transformer.parameters(), lr=0.001)

transformer.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader_trans, desc=f"Training Epoch {epoch+1}"):
        # inputs, lengths, targets = [x.to(device) for x in batch]
        inputs, lengths, targets = batch
        inputs = inputs.to(device)
        targets = targets.to(device)

        logits = transformer(inputs, lengths)
        loss = criterion(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
            
        total_loss += loss.item()
    avg_loss = total_loss / len(train_data_loader_trans)
    print(f"Epoch {epoch+1} - Loss: {avg_loss:.4f}")

# 测试过程
acc = 0
for batch in tqdm(test_data_loader_trans, desc="Testing"):
    # inputs, targets = [x.to(device) for x in batch]
    inputs, lengths, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)
    with torch.no_grad():
        output = lstm(inputs, lengths)
        acc += (output.argmax(dim=1) == targets).sum().item()
    
# 输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader_lstm):.4f}")

Training Epoch 1: 100%|██████████| 7155/7155 [01:23<00:00, 85.26it/s]


Epoch 1 - Loss: 3238.37


Training Epoch 2: 100%|██████████| 7155/7155 [02:02<00:00, 58.48it/s]


Epoch 2 - Loss: 2909.92


Training Epoch 3:   1%|▏         | 105/7155 [00:02<02:59, 39.36it/s]


KeyboardInterrupt: 