In [111]:
import torch, tiktoken
import pandas as pd
from torch.utils.data import Dataset, DataLoader


class reviewsDataset(Dataset):
    """Phrase/sentiment dataset read from a tab-separated file and tokenized up front.

    Every item is a pair ``(token_ids, sentiment)``: ``token_ids`` is a tensor of
    exactly ``max_length`` token ids (truncated or right-padded) and
    ``sentiment`` is a scalar tensor.

    Args:
        data_path: Path to a TSV file with ``PhraseId``, ``SentenceId`` and
            ``Phrase`` columns, plus ``Sentiment`` when ``is_train`` is true.
        encoder: Object with an ``encode(text)`` method returning a list of
            token ids (the notebook passes a tiktoken encoding).
        max_length: Fixed number of token ids per phrase.
        is_train: When false, the file is not expected to contain labels and
            every sentiment is the placeholder ``0``.
        pad_token_id: Token id appended to phrases shorter than ``max_length``.
            Defaults to 50256, the value this class has always used (the last
            id of the 50257-entry GPT-2 vocabulary, reused as padding).
    """

    def __init__(
        self, data_path, encoder, max_length=48, is_train=True, pad_token_id=50256
    ):
        self.phrase_ids = []
        self.sentence_ids = []
        self.phrases = []
        self.sentiments = []
        self.enc = encoder
        self.max_length = max_length
        self.is_train = is_train
        self.pad_token_id = pad_token_id
        self.load_data(data_path)
        self.tokenized_phrases = self.tokenize_phrases()

    def load_data(self, data_path):
        """Read the TSV at ``data_path`` and keep each column on the instance."""
        df = pd.read_csv(data_path, sep="\t")
        # Keep every column in its own attribute.
        self.phrase_ids = df["PhraseId"].values
        self.sentence_ids = df["SentenceId"].values
        self.phrases = df["Phrase"].values
        if self.is_train:
            # Only the training file carries sentiment labels.
            self.sentiments = df["Sentiment"].values
        else:
            self.sentiments = [0] * len(df)

    def _fit_to_length(self, tokens):
        """Return a new list of exactly ``max_length`` ids (truncate, then right-pad)."""
        # Slicing copies, so the list returned by the encoder is never mutated.
        fitted = list(tokens[: self.max_length])
        fitted.extend([self.pad_token_id] * (self.max_length - len(fitted)))
        return fitted

    def tokenize_phrases(self):
        """Encode every phrase and force it to ``max_length`` token ids.

        Returns:
            A list with one list of ``max_length`` ints per phrase.
        """
        tokenized_phrases = []
        for phrase in self.phrases:
            if not isinstance(phrase, str):
                # pandas yields non-string cells (e.g. NaN) for values it parsed
                # as missing; fall back to a fixed stand-in text instead of failing.
                print(f"Warning: Non-string value found: {phrase}")
                tokens = self.enc.encode("no string")
            else:
                tokens = self.enc.encode(phrase)
            tokenized_phrases.append(self._fit_to_length(tokens))
        return tokenized_phrases

    def __len__(self):
        return len(self.phrase_ids)

    def __getitem__(self, index):
        """Return ``(token_ids, sentiment)`` tensors for row ``index``."""
        return torch.tensor(self.tokenized_phrases[index]), torch.tensor(
            self.sentiments[index]
        )


# Smoke test: tokenize the training file with the "gpt2" tiktoken encoding
# and print one shuffled mini-batch of two (token_ids, sentiment) pairs.
enc = tiktoken.get_encoding("gpt2")
dataset = reviewsDataset(data_path="train.tsv", encoder=enc)
data_loader = DataLoader(
    dataset=dataset,
    batch_size=2,
    shuffle=True,
)
print(next(iter(data_loader)))

[tensor([[ 1659,  3700, 11325, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256],
        [11407, 21002, 13289,   422,   607,  3350,   290, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
         50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256]]), tensor([2, 4])]


In [112]:
import torch
import torch.nn as nn


class LSTM(nn.Module):
    """Two-layer unidirectional LSTM classifier over token-id sequences.

    Pipeline: embedding -> stacked LSTM -> linear layer applied to the LSTM
    output at the last time step.

    Note: the last time step is read as-is. If the input is right-padded, that
    state is the one produced after the padding tokens were consumed.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes):
        super().__init__()
        # Token id -> dense vector.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=embedding_dim
        )
        # Two stacked LSTM layers; tensors are (batch, seq, feature).
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
        )
        # Hidden state -> class scores.
        self.linear = nn.Linear(in_features=hidden_dim, out_features=num_classes)

    def forward(self, input_sentences):
        """Map ``(batch_size, seq_len)`` token ids to ``(batch_size, num_classes)`` logits."""
        # (batch_size, seq_len, embedding_dim)
        token_vectors = self.embedding(input_sentences)
        # (batch_size, seq_len, hidden_dim); the (h_n, c_n) tuple is not needed.
        sequence_states = self.lstm(token_vectors)[0]
        # (batch_size, hidden_dim): top-layer output at the final time step.
        last_step_state = sequence_states[:, -1, :]
        # (batch_size, num_classes)
        return self.linear(last_step_state)
    
    
class BiLSTM(nn.Module):
    """Single-layer bidirectional LSTM classifier over token-id sequences.

    Pipeline: embedding -> bidirectional LSTM -> linear layer applied to the
    concatenated final hidden states of the forward and backward directions.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes):
        super(BiLSTM, self).__init__()
        # Embedding layer.
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Bidirectional LSTM layer.
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
        )
        # The classifier sees hidden_dim * 2 features: one hidden state per direction.
        self.linear = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, input_sentences):
        """Map ``(batch_size, seq_len)`` token ids to ``(batch_size, num_classes)`` logits."""
        # (batch_size, seq_len, embedding_dim)
        embedded = self.embedding(input_sentences)

        # h_n: (num_layers * 2, batch_size, hidden_dim). Unlike ``output``, it is
        # not affected by batch_first.
        _, (h_n, _) = self.lstm(embedded)

        # Do NOT use output[:, -1, :] here: at the last time step the backward
        # direction has only read a single token. h_n holds the state of each
        # direction after it has consumed the whole sequence; the last two
        # entries are the top layer's forward and backward states.
        final_hidden_state = torch.cat((h_n[-2], h_n[-1]), dim=1)  # (batch_size, hidden_dim * 2)

        # (batch_size, num_classes)
        logits = self.linear(final_hidden_state)

        return logits

In [113]:
def train(
    vocab_size, embedding_dim, hidden_dim, num_classes, data_loader, lr=0.001, epochs=5
):
    """Build an ``LSTM`` classifier, fit it on ``data_loader`` and return it.

    Uses cross-entropy loss with the AdamW optimizer. Every 100 iterations the
    loss of the current batch and the accuracy accumulated since the start of
    the current epoch are printed.

    Args:
        vocab_size, embedding_dim, hidden_dim, num_classes: Forwarded to ``LSTM``.
        data_loader: Iterable of ``(inputs, targets)`` batches.
        lr: Learning rate for AdamW.
        epochs: Number of passes over ``data_loader``.

    Returns:
        The trained model (left in training mode).
    """
    model = LSTM(vocab_size, embedding_dim, hidden_dim, num_classes)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    model.train()

    for epoch in range(epochs):
        # Running counters, reset at the start of every epoch.
        correct, total = 0, 0
        for i, batch in enumerate(data_loader):
            inputs, targets = batch

            optimizer.zero_grad()
            outputs = model(inputs)  # raw logits
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Index of the largest logit is the predicted class.
            predicted = torch.max(outputs.data, 1)[1]
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

            if i % 100 != 0:
                continue
            accuracy = 100 * correct / total
            print(
                f"Epoch {epoch}, iter {i}, loss: {loss.item():.4f}, accuracy: {accuracy:.2f}%"
            )
    return model

In [114]:
# 训练
import tiktoken

# Model hyperparameters.
vocab_size = 50257  # vocabulary size of the GPT-2 encoding
embedding_dim = 64
hidden_dim = 64
num_classes = 5  # assumes five sentiment classes

# Training data: phrases tokenized with the "gpt2" encoding, batches of 128.
enc = tiktoken.get_encoding("gpt2")
dataset = reviewsDataset(data_path="train.tsv", encoder=enc)
data_loader = DataLoader(dataset=dataset, batch_size=128, shuffle=True)

model = train(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    num_classes=num_classes,
    data_loader=data_loader,
    lr=0.0001,
    epochs=10,
)

Epoch 0, iter 0, loss: 1.6196, accuracy: 26.56%
Epoch 0, iter 100, loss: 1.2696, accuracy: 45.75%
Epoch 0, iter 200, loss: 1.2619, accuracy: 48.13%
Epoch 0, iter 300, loss: 1.3358, accuracy: 49.22%
Epoch 0, iter 400, loss: 1.2954, accuracy: 49.70%
Epoch 0, iter 500, loss: 1.3903, accuracy: 49.85%
Epoch 0, iter 600, loss: 1.2897, accuracy: 49.98%
Epoch 0, iter 700, loss: 1.2859, accuracy: 50.17%
Epoch 0, iter 800, loss: 1.2557, accuracy: 50.29%
Epoch 0, iter 900, loss: 1.2805, accuracy: 50.35%
Epoch 0, iter 1000, loss: 1.1612, accuracy: 50.51%
Epoch 0, iter 1100, loss: 1.0929, accuracy: 50.60%
Epoch 0, iter 1200, loss: 1.1827, accuracy: 50.67%
Epoch 1, iter 0, loss: 1.1969, accuracy: 51.56%
Epoch 1, iter 100, loss: 1.1869, accuracy: 51.82%
Epoch 1, iter 200, loss: 1.2884, accuracy: 51.65%
Epoch 1, iter 300, loss: 1.2856, accuracy: 51.65%
Epoch 1, iter 400, loss: 1.2850, accuracy: 51.65%
Epoch 1, iter 500, loss: 1.2816, accuracy: 51.71%
Epoch 1, iter 600, loss: 1.1959, accuracy: 51.68%
E

In [None]:
import pandas as pd
import torch
from torch.utils.data import DataLoader
import tiktoken

def predict(model, test_path, batch_size=128):
    """Predict a class index for every row of the TSV file at ``test_path``.

    The file is loaded through ``reviewsDataset`` with ``is_train=False`` and
    tokenized with the "gpt2" tiktoken encoding. Rows are processed in file
    order (no shuffling), so ``predictions[i]`` belongs to row ``i``.

    Args:
        model: Module mapping a batch of token ids to per-class logits. It is
            switched to evaluation mode and left in that mode.
        test_path: Path to the unlabeled TSV file.
        batch_size: Rows per forward pass. Each row's prediction is independent
            of its batch, so this only affects speed and memory use.

    Returns:
        A list with one predicted class index (NumPy integer) per row.
    """
    # Switch the model to evaluation mode.
    model.eval()

    enc = tiktoken.get_encoding("gpt2")
    test_dataset = reviewsDataset(test_path, enc, is_train=False)

    # shuffle=False keeps predictions aligned with the rows of the file.
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    predictions = []

    # No gradients are needed for inference.
    with torch.no_grad():
        for inputs, _ in test_loader:
            outputs = model(inputs)
            # Index of the largest logit is the predicted class.
            predicted = outputs.argmax(dim=1)
            predictions.extend(predicted.cpu().numpy())

    return predictions

# Example usage
test_path = "test.tsv"
output_path = "test_with_sentiment.csv"

# Read the raw test data here as well: it supplies the PhraseId column for the
# output file (the test data is assumed to be in TSV format).
test_df = pd.read_csv(test_path, sep='\t')

# One predicted class per row, in file order.
predictions = predict(model, test_path)

# Create (or overwrite) the 'Sentiment' column with the predictions; no
# placeholder column is needed beforehand because this assignment is unconditional.
test_df['Sentiment'] = predictions

# Keep only the 'PhraseId' and 'Sentiment' columns.
result_df = test_df[['PhraseId', 'Sentiment']]

# Save the result to a new file.
result_df.to_csv(output_path, index=False)

print(f"预测结果已保存到 {output_path}")

预测结果已保存到 test_with_sentiment.tsv
