In [None]:
# 导入必须包
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from openai import OpenAI

In [None]:
# 导入分析数据
data = pd.read_csv(
    r"../data/Sentiment Analysis Dataset.csv",
    encoding="UTF-8",
    on_bad_lines="skip",
    nrows=1000,
)

data.head()

Unnamed: 0,ItemID,Sentiment,SentimentSource,SentimentText
0,1,0,Sentiment140,is so sad for my APL frie...
1,2,0,Sentiment140,I missed the New Moon trail...
2,3,1,Sentiment140,omg its already 7:30 :O
3,4,0,Sentiment140,.. Omgaga. Im sooo im gunna CRy. I'...
4,5,0,Sentiment140,i think mi bf is cheating on me!!! ...


In [12]:
# 提取评论信息
sentimentText = data["SentimentText"]
sentimentText.head()

0                         is so sad for my APL frie...
1                       I missed the New Moon trail...
2                              omg its already 7:30 :O
3              .. Omgaga. Im sooo  im gunna CRy. I'...
4             i think mi bf is cheating on me!!!   ...
Name: SentimentText, dtype: object

In [13]:
# 获取真实的标签
labels = torch.tensor(data["Sentiment"])
print(labels.shape)

torch.Size([1000])


In [14]:
# 清理数据
full_data = sentimentText.to_list()
cleaned_data = []

for sentence in full_data:
    sentence = str(sentence)
    # 去除双引号与单引号
    sentence = sentence.replace("'", "")
    sentence = sentence.replace('"', "")
    # 去除空白
    sentence = sentence.strip()
    cleaned_data.append(sentence)

type(cleaned_data[0])

str

In [None]:
# 获取评论的词嵌入数据
def get_embeddingData(cleaned_data, dimension=256):
    client = OpenAI()
    batch_size = 100
    # 使用循环提取向量化后数据
    embedded_text = []

    for i in range(len(cleaned_data) // batch_size):
        batch = cleaned_data[i * batch_size : (i + 1) * batch_size]
        try:
            full_data = client.embeddings.create(
                model="text-embedding-3-small",
                input=batch,
                dimensions=dimension,
                encoding_format="float",
            )

            # 提取嵌入向量
            count = 0
            while count < batch_size:
                embedded_text.append(full_data.data[count].embedding)
                count += 1

        except Exception as e:
            print(f"处理第{i}批的时出错:{str(e)}")
            break
    return embedded_text


# 查看数据维度
embedded_text = get_embeddingData(cleaned_data=cleaned_data)

In [None]:
# 保存数据供以后使用

# 转换为numpy数组并保存
embedded_array = np.array(embedded_text)
np.save("embedded_vectors.npy", embedded_array)

# 之后可以这样加载
# loaded_vectors = np.load('embedded_vectors.npy')

In [6]:
embedded_array = np.load("embedded_vectors.npy");

In [7]:
# 检查数据维度
print(embedded_array.shape)

# 转化为tensor
input_data = torch.tensor(embedded_array)
input_data = input_data.to(torch.float32)
print(type(input_data))

(1000, 256)
<class 'torch.Tensor'>


In [None]:
class advancedLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layer, drop_out=0.5):
        super().__init__()
        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layer,
            batch_first=True,
            dropout=drop_out if num_layer > 1 else 0,  # 作用于层与层之间的连接
        )

        # 批标准化层
        self.batch_norm = nn.BatchNorm1d(hidden_size)

        # Dropout层 - 使得输出不过多依赖某一个参数
        self.dropout = nn.Dropout(p=0.3)

        # 分类层
        self.classifier = nn.Linear(hidden_size, output_size)

    def forward(self, origin_data):
        # 调用LSTM层
        result, (h_n, c_n) = self.lstm(
            origin_data
        )  # h_n的形状是: (num_layer, batch_size, hidden_size)
        last_hidden = h_n[-1]

        # 批归一化
        x = self.batch_norm(last_hidden)

        # Droupout层
        x = self.dropout(x)

        # 分类层
        x = self.classifier(x)

        return F.softmax(x, dim=1)


input_size = 256
hidden_size = 512
output_size = 2
num_layer = 3

# 调用模型获得结果
lstm = advancedLSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=output_size,
    num_layer=num_layer,
)

In [22]:
from sklearn.model_selection import train_test_split

# 分割数据集
X_train, X_test, y_train, y_test = train_test_split(
    input_data, labels, test_size=0.2, random_state=42
)

# 将维度升高一维便于LSTM处理
X_train = X_train.unsqueeze(1)
X_test = X_test.unsqueeze(1)

# 查看数据维度
print(X_test.shape)  # (num, seq_len, input_size)

# 检查输入维度
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)

torch.Size([200, 1, 256])
X_train shape: torch.Size([800, 1, 256])
y_train shape: torch.Size([800])


In [23]:
# 定义训练参数
n_epochs = 50
learing_rate = 0.001

# 定义损失函数于优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(lstm.parameters(), lr=learing_rate)

In [24]:
# 循环训练
for epoch in range(n_epochs):
    lstm.train()  # 转化为训练模式

    # 前向传播
    outputs = lstm(X_train)
    loss = criterion(outputs, y_train)

    # 反向传播和优化
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # 每五个训练次数打印一次准确率
    if (epoch + 1) % 5 == 0:
        print(f"Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}")

        # 评估模式
        lstm.eval()
        with torch.no_grad():
            test_outputs = lstm(X_test)
            _, predicted = torch.max(test_outputs.data, 1)
            accuracy = (predicted == y_test).sum().item() / len(y_test)
            print(f"Test Accuracy: {accuracy:.2%}")

Epoch [5/50], Loss: 0.6034
Test Accuracy: 68.50%
Epoch [10/50], Loss: 0.5120
Test Accuracy: 68.50%
Epoch [15/50], Loss: 0.4817
Test Accuracy: 68.50%
Epoch [20/50], Loss: 0.4732
Test Accuracy: 69.00%
Epoch [25/50], Loss: 0.4571
Test Accuracy: 72.00%
Epoch [30/50], Loss: 0.4427
Test Accuracy: 75.00%
Epoch [35/50], Loss: 0.4359
Test Accuracy: 80.00%
Epoch [40/50], Loss: 0.4306
Test Accuracy: 80.50%
Epoch [45/50], Loss: 0.4285
Test Accuracy: 83.00%
Epoch [50/50], Loss: 0.4153
Test Accuracy: 83.00%


In [26]:
lstm.eval()
with torch.no_grad():
    # 测试集预测
    test_outputs = lstm(X_test)
    _, predicted = torch.max(
        test_outputs.data, 1
    )  # 找到最大值与最大值的索引位置(argmax函数)

    # 计算准确率
    total = y_test.size(0)
    correct = (
        (predicted == y_test).sum().item()
    )  # 使用item()转化为python的数据类型，减少内存

    print(f"\n最终测试集准确率: {100 * correct / total:.2f}%")


最终测试集准确率: 83.00%


### 形状问题

- output 显示的是最后一层 LSTM 中每一个 batch 的每一个时间步的输出；
- h_n 显示的是 LSTM 中全部层数中每一个 batch 最后一个时间步的输出；
- c_n 显示的是 LSTM 中全部层数中每一个 batch 最后一个时间步的 cell state；


In [28]:
# 形状确定
# * 确定LSTM在多层情况下的返回值
output = lstm.lstm(X_test)

print(
    output[0].shape, end="\n" + "*" * 10 + "\n"
)  # (batch_size, seq_len, hidden_size))

print(
    output[1][0].shape, end="\n" + "*" * 10 + "\n"
)  # (num_layer, batch_size, hidden_size)

print(output[1][1].shape)  # (num_layer, batch_size, hidden_size)

torch.Size([200, 1, 512])
**********
torch.Size([3, 200, 512])
**********
torch.Size([3, 200, 512])


### 完整封装


In [None]:
import numpy as np
import torch
import torch.nn as nn
import tqdm
from sklearn.model_selection import train_test_split


class SentimentAnalyzer:
    def __init__(self, input_size=1536, hidden_size=512, output_size=2, num_layers=3):
        """初始化情感分析器"""
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.model = None
        self.client = OpenAI()

    def _init_model(self):
        """初始化LSTM模型"""
        self.model = advancedLSTM(
            input_size=self.input_size,
            hidden_size=self.hidden_size,
            output_size=self.output_size,
            num_layer=self.num_layers,
        )

    def vectorize_data(self, cleaned_data, dimension=1536, batch_size=100):
        """数据向量化处理"""
        embedded_text = []

        for i in range(len(cleaned_data) // batch_size):
            batch = cleaned_data[i * batch_size : (i + 1) * batch_size]
            try:
                full_data = self.client.embeddings.create(
                    model="text-embedding-3-small",
                    input=batch,
                    dimensions=dimension,
                    encoding_format="float",
                )

                for count in range(batch_size):
                    embedded_text.append(full_data.data[count].embedding)

            except Exception as e:
                print(f"处理第{i}批时出错:{str(e)}")
                break

        return torch.tensor(np.array(embedded_text), dtype=torch.float32)

    def train(
        self, input_data, labels, n_epochs=50, learning_rate=0.001, test_size=0.2
    ):
        """训练模型"""
        # 初始化模型
        if self.model is None:
            self._init_model()

        # 数据集分割
        X_train, X_test, y_train, y_test = train_test_split(
            input_data, labels, test_size=test_size, random_state=42
        )

        # 添加序列维度
        X_train = X_train.unsqueeze(1)
        X_test = X_test.unsqueeze(1)

        # 定义损失函数和优化器
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)

        # 训练循环
        for epoch in tqdm(range(n_epochs), desc="Training"):
            self.model.train()

            outputs = self.model(X_train)
            loss = criterion(outputs, y_train)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 5 == 0:
                print(f"Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}")

                # 评估
                accuracy = self.evaluate(X_test, y_test)
                print(f"Test Accuracy: {accuracy:.2%}")

        return self.evaluate(X_test, y_test)

    def evaluate(self, X_test, y_test):
        """评估模型"""
        self.model.eval()
        with torch.no_grad():
            test_outputs = self.model(X_test)
            _, predicted = torch.max(test_outputs.data, 1)
            accuracy = (predicted == y_test).sum().item() / len(y_test)
        return accuracy

    def predict(self, text):
        """预测单个文本的情感"""
        if self.model is None:
            raise ValueError("模型未训练，请先训练模型")
        # 添加数据类型检查
        if isinstance(text, str):
            text = [text]
        elif not isinstance(text, list):
            raise ValueError("输入必须是字符串或字符串列表")

        # 获取文本嵌入
        embedding = self.vectorize_data(text, len(text))
        embedding = embedding.unsqueeze(1)

        self.model.eval()
        with torch.no_grad():
            output = self.model(embedding)
            _, predicted = torch.max(output.data, 1)

        result = [
            "积极消息" if single_result == 1 else "消极消息"
            for single_result in predicted
        ]
        return result[0] if isinstance(text, str) else result

    def save_model(self, path):
        """保存模型"""
        if self.model is not None:
            torch.save(self.model.state_dict(), path)
            print(f"模型已保存至: {path}")
        else:
            print("没有可保存的模型")

    def load_model(self, path):
        """加载模型"""
        self._init_model()
        self.model.load_state_dict(torch.load(path))
        print(f"模型已从 {path} 加载")

In [30]:
# 创建情感分析器实例
analyzer = SentimentAnalyzer()

# 数据预处理和向量化
vectors = analyzer.vectorize_data(cleaned_data)

# 训练模型
final_accuracy = analyzer.train(vectors, labels)
print(f"最终准确率: {final_accuracy:.2%}")

# 预测新文本
text = ["I love this movie!", "I hate that bitch", "Shit!"]
sentiment = analyzer.predict(text)
print(f"文本情感: {sentiment}")

RuntimeError: input.size(-1) must be equal to input_size. Expected 256, got 1536

In [None]:
def insertion_sort(arr):
    # 遍历数组中的每一个元素
    for i in range(1, len(arr)):
        key = arr[i]
        j = i - 1
        # 将当前元素插入到已排序部分的正确位置
        while j >= 0 and key < arr[j]:
            arr[j + 1] = arr[j]
            j -= 1
        arr[j + 1] = key
    return arr

# 测试插入排序算法
sample_array = [12, 11, 13, 5, 6]
sorted_array = insertion_sort(sample_array)
print("Sorted array is:", sorted_array)

排序后的数组: [5, 6, 11, 12, 13]
