In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
from torchvision import datasets, transforms
import torch.nn.functional as F
import jieba
from snownlp import SnowNLP, sentiment
from collections import defaultdict

from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import csv
from IPython.display import display
# from transformers import AutoTokenizer, AutoModel, AutoConfig, DataCollatorWithPadding
# from datasets import load_dataset, ClassLabel, Sequence, Value

In [2]:
print(torch.cuda.is_available())

True


#### 读取CSV文件并展示部分数据集

In [3]:
origin_train = '../../data/sentiment/data_single.csv'
processed_train = '../../data/sentiment/processed_Bi-LSTM_train.csv'
origin_test = '../../data/sentiment/ChnSentiCorp_htl_all.csv'
processed_test = '../../data/sentiment/processed_Bi-LSTM_test.csv'

In [4]:
with open(origin_train, 'rb') as file:
    line_number = 0
    while True:
        line = file.readline()
        if not line:
            break  # 如果文件结束，则退出循环
        try:
            line_number += 1
            decoded_line = line.decode('utf-8')
        except UnicodeDecodeError as e:
            # 打印出引发错误的行号和内容
            print(f"Error at line {line_number}:")
            print(f"Original bytes: {line}")
            print(f"Error message: {e}")
            # 获取问题位置的内容
            problematic_byte = line[e.start:e.end]
            print(f"Problematic byte: {problematic_byte}")
            break

In [5]:
data = pd.read_csv(origin_train)

# print(data)
# 显示前几行数据
display(data.head())

Unnamed: 0,review,label
0,用了一段时间，感觉还不错，可以,正面
1,电视非常好，已经是家里的第二台了。第一天下单，第二天就到本地了，可是物流的人说车坏了，一直催...,正面
2,电视比想象中的大好多，画面也很清晰，系统很智能，更多功能还在摸索中,正面
3,不错,正面
4,用了这么多天了，感觉还不错。夏普的牌子还是比较可靠。希望以后比较耐用，现在是考量质量的时候。,正面


#### 数据集处理

##### 训练数据集处理

In [None]:

def process_row_train(row):
    if row[1] == '正面':
        row[1] = '1'
    elif row[1] == '负面':
        row[1] = '0'
    return row

def process_csv_train(input_file, output_file):
    with open(input_file, mode='r', newline='', encoding='utf-8') as infile, \
         open(output_file, mode='w', newline='', encoding='utf-8') as outfile:
        reader = csv.reader(infile) # <class '_csv.reader'>
        writer = csv.writer(outfile)
        first_row = next(reader)
        writer.writerow(first_row)
        for row in reader:
            row_new = process_row_train(row)
            writer.writerow(row_new)
        
input_file = '../../data/sentiment/Bi-LSTM_train.csv'
output_file = processed_train
process_csv_train(input_file, output_file)

##### 测试数据集处理

In [None]:
def process_row_test(row):
    '''
    input: <class 'list'> ['1', 'asdfasdf']
    output: <class 'str'> asdf,1
    '''
    row_new = []
    row_new.append(row[1])
    row_new.append(row[0])
    return row_new

    

def process_csv_test(input_file, output_file):
    with open(input_file, mode='r', newline='', encoding='utf-8') as infile, \
         open(output_file, mode='w', newline='', encoding='utf-8') as outfile:
        reader = csv.reader(infile) # <class '_csv.reader'>
        writer = csv.writer(outfile)
        # print(type(reader)) # <class '_csv.reader'>
        # row = next(reader) # <class 'list'>
        # print(row, type(row)) # ['label', 'review'] <class 'list'>
        # print(row[0], type(row[0])) # label <class'str'>

        for row in reader:
            processed_row = process_row_test(row)
            writer.writerow(processed_row)


input_file = '../../data/sentiment/Bi-LSTM_test.csv'
output_file = processed_test
process_csv_test(input_file, output_file)

##### jieba

In [6]:
def word_segmentation(text):
    seg_list = jieba.cut(text)
    return " ".join(seg_list)
# 示例文本
text = "这部电影太精彩了！我非常不喜欢。"
segmented_text = word_segmentation(text)
print(segmented_text, type(segmented_text))
# for word in segmented_text.split():
#     print(word)

Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache
Loading model cost 0.392 seconds.
Prefix dict has been built successfully.


这部 电影 太精彩 了 ！ 我 非常 不 喜欢 。 <class 'str'>


#### 数据预处理

In [12]:
class TextDataset(Dataset):
    def __init__(self, data_path, tokenizer=lambda x: list(jieba.cut(x)), vocab=None):
        """
        初始化
        :param self: 
        :param data_path: 
        :param tokenizer: 可选的分词器函数，默认为将文本按空格分割
        :param vocab: 可选的词汇表字典
        :return: None
        """
        self.tokenizer = tokenizer # 分词器函数
        self.data, self.labels = [], [] # 分词后的文本列表,标签列表
        df = pd.read_csv(data_path)
        for _, row in df.iterrows():
            review, label = str(row['review']), row['label']
            tokenized_review = self.tokenizer(review)
            self.data.append(tokenized_review) # 调用分词器函数
            self.labels.append(int(label))
        # 构建词汇表
        if vocab is None:
            self.vocab, self.word_counts = self.build_vocab(self.data)
        else:
            self.vocab = vocab

    @staticmethod
    def build_vocab(sentences):
        """
        构建词汇表
        :param sentences: 文本列表
        :return: 词汇表字典
        """
        word_counts = {} # 词频字典
        for sentence in sentences: # 遍历每一个句子
            for word in sentence: # 遍历每一个词
                if word not in word_counts: # 如果词不在词频字典中
                    word_counts[word] = 1 
                else: # 如果词在词频字典中
                    word_counts[word] += 1
        # 按词频排序
        sorted_vocab = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
        vocab = {'<PAD>': 0, '<UNK>': 1} # PAD: padding, UNK: unknown
        for word, _ in sorted_vocab:
            vocab[word] = len(vocab)
        return vocab, word_counts

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # 文本转为词索引,未知词使用<UNK>的索引
        tokens = [self.vocab.get(token, self.vocab['<UNK>']) for token in self.data[idx]]
        # 张量化的文本和标签
        return torch.tensor(tokens), torch.tensor(self.labels[idx])

def collate_fn(batch):
    """
    对一批次的数据进行处理
    """
    texts, labels = zip(*batch)
    texts = pad_sequence([torch.tensor(t) for t in texts], padding_value=0)
    return texts, torch.stack(labels)

##### 加载数据集

In [13]:
train_dataset = TextDataset('../../data/sentiment/processed_Bi-LSTM_train.csv')
test_dataset = TextDataset('../../data/sentiment/processed_Bi-LSTM_test.csv', vocab=train_dataset.vocab)

In [14]:
print(train_dataset, len(train_dataset))
print(type(train_dataset.vocab), train_dataset.vocab)
print(type(train_dataset.word_counts), train_dataset.word_counts)
print(len(train_dataset.vocab))


<__main__.TextDataset object at 0x7ff3eefca130> 4283
<class 'dict'> {'<PAD>': 0, '<UNK>': 1, '，': 2, '的': 3, '。': 4, '了': 5, '电视': 6, '很': 7, '！': 8, '是': 9, '好': 10, '也': 11, '我': 12, '不错': 13, '就': 14, '买': 15, '还': 16, '不': 17, '都': 18, '没有': 19, '安装': 20, '说': 21, '有': 22, '微鲸': 23, '就是': 24, '看': 25, '可以': 26, '给': 27, '还是': 28, '用': 29, '非常': 30, '客服': 31, '没': 32, '这个': 33, '在': 34, '开机': 35, '问题': 36, '价格': 37, '满意': 38, '清晰': 39, '要': 40, '屏幕': 41, '有点': 42, '京东': 43, '和': 44, '物流': 45, '到': 46, '感觉': 47, '.': 48, '送': 49, '高': 50, '会员': 51, '但是': 52, '送货': 53, '挺': 54, '系统': 55, '质量': 56, '服务': 57, '吧': 58, '这': 59, '快': 60, '性价比': 61, '比较': 62, '自己': 63, '才': 64, '一个': 65, '后': 66, '、': 67, '…': 68, '遥控器': 69, '画面': 70, '师傅': 71, '？': 72, '能': 73, '比': 74, '不是': 75, '售后': 76, '电视机': 77, '速度': 78, '效果': 79, '再': 80, '上': 81, '多': 82, '大': 83, '挂架': 84, '收到': 85, '希望': 86, '什么': 87, '寸': 88, '语音': 89, '：': 90, '知道': 91, '一般': 92, '你': 93, '很快': 94, '来': 95, '品牌': 96, '购买': 97,

In [10]:

def calculate_sentiment_score(segmented_text):
    sentiment_score = 0
    positive_words = ["喜欢", "精彩"]
    negative_words = ["不喜欢", "糟糕"]
    for word in segmented_text.split():
        if word in positive_words:
            sentiment_score += 1
        elif word in negative_words:
            sentiment_score -= 1
    return sentiment_score
# 示例文本的情感得分
sentiment_score = calculate_sentiment_score(segmented_text)
print("情感得分:", sentiment_score)


情感得分: 1


In [44]:
# 访问词汇表
vocab = train_dataset.vocab
# 访问词汇表和词频
vocab, word_counts = train_dataset.build_vocab(train_dataset.data)

# 打印词汇表的大小
vocab_size = len(train_dataset.vocab)
print(f"Vocabulary size: {vocab_size}")

# 打印词汇表的前几项
print("Vocabulary:index")
for word, index in list(vocab.items())[:10]:
    print(f"{word}: {index}")

# 打印词频
print("\nWord:Frequency")
for word, freq in list(word_counts.items())[:10]:
    print(f"{word}: {freq}")

Vocabulary size: 9134
Vocabulary:index
<PAD>: 0
<UNK>: 1
，: 2
的: 3
。: 4
了: 5
电视: 6
很: 7
！: 8
是: 9

Word:Frequency
用: 743
了: 5051
一段时间: 76
，: 21331
感觉: 485
还: 1421
不错: 1684
可以: 810
电视: 2943
非常: 729


#### 构建双向LSTM模型

In [12]:
class BiLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout):
        """
        :param vocab_size: 词表大小
        :param embedding_dim: 词向量维度
        :param hidden_dim: 隐藏层维度
        :param output_dim: 输出维度
        :param n_layers: LSTM层数
        :param bidirectional: 是否双向
        :param dropout: dropout概率
        :return: None
        """
        super(BiLSTM, self).__init__()
        # 词嵌入层,用于将词汇表中的词映射到稠密向量空间
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # LSTM层,处理序列数据
        self.lstm = nn.LSTM(embedding_dim, 
                            hidden_dim, 
                            num_layers=n_layers, 
                            bidirectional=bidirectional, 
                            dropout=dropout)
        # 全连接层,将LSTM的输出映射到输出空间
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text):
        """
        :param text: 输入的文本数据,形状为(seq_len, batch_size)
        :return: 输出的预测值,形状为(batch_size, output_dim)
        """
        embedded = self.dropout(self.embedding(text))
        output, (hidden, cell) = self.lstm(embedded)
        hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1))
        return self.fc(hidden.squeeze(0))


#### 训练函数

In [18]:
def train(model, iterator, optimizer, criterion, device):
    """
    训练一个epoch
    :param model: 网络模型
    :param iterator: 数据集迭代器
    :param optimizer: 优化器
    :param criterion: 损失函数
    :return: 训练损失
    """
    model.train() # 训练模式
    epoch_loss = 0  # 记录训练损失
    for batch in iterator:
        optimizer.zero_grad() # 清空梯度
        predictions = model(batch[0].to(device)).squeeze(1) # 预测值
        loss = criterion(predictions, batch[1].float().to(device)) # 计算损失
        loss.backward() # 反向传播
        optimizer.step() # 更新参数
        epoch_loss += loss.item() # 累加损失
    return epoch_loss / len(iterator) # 平均损失

#### 测试函数

In [19]:
def evaluate(model, iterator, criterion, device):
    model.eval()
    epoch_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in iterator:
            predictions = model(batch[0].to(device)).squeeze(1)
            loss = criterion(predictions, batch[1].float().to(device))
            epoch_loss += loss.item()
            predicted_labels = (predictions > 0).long()
            correct += (predicted_labels == batch[1].to(device)).sum().item()
            total += len(batch[1])
    accuracy = correct / total
    return epoch_loss / len(iterator), accuracy

#### 初始化模型和优化器

In [42]:
# 初始化模型和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vocab_size = len(train_dataset.vocab)
embedding_dim = 100
hidden_dim = 256
output_dim = 1
n_layers = 2
bidirectional = True
dropout = 0.5

model = BiLSTM(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout).to(device)
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()

train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False, collate_fn=collate_fn)

In [43]:
for epoch in range(10):
    train_loss = train(model, train_dataloader, optimizer, criterion, device)
    test_loss, test_acc = evaluate(model, test_dataloader, criterion, device)
    print(f'Epoch: {epoch+1:02}, Train Loss: {train_loss:.3f}, Test Loss: {test_loss:.3f}, Test Acc: {test_acc*100:.2f}%')

  texts = pad_sequence([torch.tensor(t) for t in texts], padding_value=0)


Epoch: 01, Train Loss: 0.532, Test Loss: 0.974, Test Acc: 56.39%
Epoch: 02, Train Loss: 0.412, Test Loss: 1.491, Test Acc: 51.73%
Epoch: 03, Train Loss: 0.361, Test Loss: 1.815, Test Acc: 41.93%
Epoch: 04, Train Loss: 0.322, Test Loss: 2.198, Test Acc: 38.48%
Epoch: 05, Train Loss: 0.284, Test Loss: 1.625, Test Acc: 45.16%
Epoch: 06, Train Loss: 0.252, Test Loss: 2.617, Test Acc: 41.33%
Epoch: 07, Train Loss: 0.239, Test Loss: 2.289, Test Acc: 42.29%
Epoch: 08, Train Loss: 0.194, Test Loss: 1.436, Test Acc: 55.60%
Epoch: 09, Train Loss: 0.188, Test Loss: 2.694, Test Acc: 43.33%
Epoch: 10, Train Loss: 0.182, Test Loss: 2.744, Test Acc: 41.69%
