本模型的“可炼丹”部分：

1. 本文是按照“字”进行分词的，再用jieba、用bert中文试一试。
2. epoch
3. optimizer（adam、SGD）
4. 

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from  torch.utils.data import Dataset,DataLoader
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import TensorDataset, random_split

import matplotlib.pyplot as plt
%matplotlib inline

import time

In [2]:
# Load the training data
def read_data(path="data/train.txt"):
    """Read ``text<TAB>label`` samples from a UTF-8 text file.

    Each non-empty line is split at its LAST tab, so the text part may itself
    contain tab characters. Empty lines are skipped.

    Args:
        path: File to read. Defaults to the location the notebook has always
            used, so existing ``read_data()`` calls keep working.

    Returns:
        A ``(texts, labels)`` tuple of two equally long lists of strings.
        Labels are returned as raw strings; converting them is left to the
        caller.

    Raises:
        ValueError: If a non-empty line contains no tab separator.
    """
    texts = []
    labels = []
    with open(path, encoding="utf-8") as f:
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.rstrip("\n")
            if not line:
                continue
            # rpartition splits on the last tab only; a plain split("\t")
            # would fail to unpack when the text itself contains a tab.
            text, sep, label = line.rpartition("\t")
            if not sep:
                raise ValueError(
                    f"{path}:{line_no}: expected 'text<TAB>label', got {line!r}"
                )
            texts.append(text)
            labels.append(label)
    return texts, labels

In [3]:
# Build the vocabulary
def build_corpus(texts):
    """Assign a unique integer id to every distinct token found in ``texts``.

    Each element of ``texts`` is iterated directly, so for ``str`` inputs the
    tokens are single characters. Ids 0 and 1 are reserved for ``"<PAD>"`` and
    ``"<UNK>"``; further ids are handed out in order of first appearance.

    Args:
        texts: Iterable of iterables of hashable tokens (e.g. a list of str).

    Returns:
        dict mapping token -> integer id.
    """
    vocab = {"<PAD>": 0, "<UNK>": 1}
    for sentence in texts:
        for token in sentence:
            if token not in vocab:
                # The next free id is simply the current vocabulary size.
                vocab[token] = len(vocab)
    return vocab

In [4]:
# Convert texts to a padded tensor of vocabulary indices
def texts_to_tensor(texts, vocab, max_len=None):
    """Encode ``texts`` as one right-padded ``torch.long`` index tensor.

    Every text is split into its elements (characters for ``str``), each
    element is looked up in ``vocab`` (unknown ones map to ``vocab["<UNK>"]``),
    and the resulting sequences are right-padded with ``vocab["<PAD>"]`` to
    the length of the longest text.

    Note that ``max_len`` only truncates: the returned width is
    ``min(longest text, max_len)``, so it depends on the data and is NOT
    padded up to ``max_len``.

    Args:
        texts: Sequence of texts to encode.
        vocab: Mapping token -> id containing ``"<PAD>"`` and ``"<UNK>"``.
        max_len: Optional upper bound on the number of columns returned.

    Returns:
        ``torch.long`` tensor of shape ``(len(texts), width)``. An empty
        ``texts`` yields an empty ``(0, 0)`` tensor.
    """
    pad_id = vocab["<PAD>"]
    unk_id = vocab["<UNK>"]

    all_ids = [
        torch.tensor([vocab.get(word, unk_id) for word in list(text)], dtype=torch.long)
        for text in texts
    ]

    # pad_sequence raises on an empty list, so handle "no texts" explicitly.
    if not all_ids:
        return torch.empty((0, 0), dtype=torch.long)

    # Right-pad every sequence to the longest one in this call.
    padded_ids = pad_sequence(all_ids, batch_first=True, padding_value=pad_id)

    if max_len is not None:
        padded_ids = padded_ids[:, :max_len]  # truncate only; never widens

    return padded_ids

In [5]:
# Fully connected text classifier
class DNN(nn.Module):
    """Embedding followed by a four-layer MLP over the flattened sequence.

    The embedded input of shape ``(batch, seq_len, embedding_dim)`` is
    flattened to ``(batch, seq_len * embedding_dim)``, so the first linear
    layer is tied to a fixed sequence length: inputs must have exactly
    ``seq_len`` columns.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        seq_len: Number of tokens per input row. Defaults to 38, the value
            that used to be hard-coded.
        num_classes: Number of output logits. Defaults to 10, the value that
            used to be hard-coded.
    """

    def __init__(self, vocab_size, embedding_dim, seq_len=38, num_classes=10):
        """Build the embedding table and the MLP."""
        # nn.Module must be initialised before sub-modules are assigned, so
        # that parameters get registered.
        super().__init__()
        self.seq_len = seq_len
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Layer order (and therefore the state_dict keys network.1/3/5/7) is
        # kept unchanged so previously saved checkpoints still load.
        self.network = nn.Sequential(
            nn.Flatten(),  # (batch, seq_len, dim) -> (batch, seq_len * dim)
            nn.Linear(embedding_dim * seq_len, 256), nn.ReLU(),  # 1st fully connected layer
            nn.Linear(256, 128), nn.ReLU(),  # 2nd fully connected layer
            nn.Linear(128, 32), nn.ReLU(),  # 3rd fully connected layer
            nn.Linear(32, num_classes),  # 4th fully connected layer: one logit per class
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            x: Integer index tensor of shape ``(batch, seq_len)``.
        """
        x = self.embedding(x)  # integer indices -> float embeddings
        y = self.network(x)
        return y

In [None]:
# Main script: load data, build the model, train, and plot the loss curve
embedding_dim = 512
max_len = 100

# Prefer Apple's MPS backend as before, but fall back to CUDA or CPU so the
# notebook also runs on machines where MPS is not available.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

texts, labels = read_data()

vocab = build_corpus(texts)

# max_len only truncates; the tensor width is still set by the longest text.
padded_ids = texts_to_tensor(texts, vocab, max_len).to(device)

# Do not embed here: the model owns the embedding layer, so raw indices are
# passed in.

labels = [int(label) for label in labels]  # labels are expected to lie in 0-9

# Wrap inputs and targets in a single dataset
labels_tensor = torch.tensor(labels, dtype=torch.long).to(device)  # shape: [180000] per the original note
dataset = TensorDataset(padded_ids, labels_tensor)
print(f"labels_tensor:{labels_tensor.dtype}")
print(f"padded_ids:{padded_ids.dtype}")

# 80/10/10 train/validation/test split; the test split takes the remainder.
# NOTE(review): the split is unseeded, so it differs between runs.
train_len = int(0.8 * len(dataset))
val_len = int(0.1 * len(dataset))
test_len = len(dataset) - train_len - val_len
train_set, val_set, test_set = random_split(dataset, [train_len, val_len, test_len])

train_loader = DataLoader(train_set, batch_size=256, shuffle=True)
# NOTE(review): val_loader is created but not used anywhere in this cell.
val_loader = DataLoader(val_set, batch_size=64)
test_loader = DataLoader(test_set, batch_size=64)


# Model, loss and optimizer
model = DNN(len(vocab), embedding_dim).to(device)

loss_fn = nn.CrossEntropyLoss()  # applies log-softmax internally, so the model outputs raw logits
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# Alternative tried: torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

epochs = 100
losses = []  # one entry per training batch, across all epochs

# Training loop
for epoch in range(epochs):
    start_time = time.time()
    model.train()  # training mode
    for batch_x, batch_y in train_loader:
        pred = model(batch_x)
        loss = loss_fn(pred, batch_y)
        losses.append(loss.item())
        optimizer.zero_grad()  # clear gradients from the previous step
        loss.backward()        # compute gradients
        optimizer.step()       # update model parameters
    end_time = time.time()
    spend_time = end_time - start_time
    print(f"epoch:{epoch}\t用时{spend_time:.2f}s")
    # Reports the loss of the last batch of the epoch, not the epoch mean.
    print(f"loss:{losses[-1]}")

Fig = plt.figure()
plt.plot(range(len(losses)), losses)
plt.show()


In [None]:
# Evaluate the trained model on the held-out test split
# Accuracy is a per-sample count, so it does not depend on the batch size;
# batching avoids one forward pass per sample.
test_loader = DataLoader(test_set, batch_size=64)
correct = 0
total = 0
model.eval()  # evaluation mode
# No gradients are needed for inference; skipping autograd saves time and memory.
with torch.no_grad():
    for batch_x, batch_y in test_loader:
        pred = model(batch_x)
        predicted = torch.argmax(pred, dim=1)  # index of the largest logit per row
        correct += (predicted == batch_y).sum().item()
        total += batch_y.size(0)
accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")

In [None]:
# Persist only the learned parameters (the state_dict), not the module object.
torch.save(obj=model.state_dict(), f='model_adam_dim512.pth')

In [None]:
# Rebuild the architecture and restore the saved weights
new_model = DNN(len(vocab), embedding_dim).to(device)
# map_location places the tensors on the current device even if the checkpoint
# was written on a different one.
new_model.load_state_dict(torch.load('model_adam_dim512.pth', map_location=device))

# Evaluate the RELOADED model. The previous version evaluated `model` here,
# so the checkpoint round-trip was never actually verified.
test_loader = DataLoader(test_set, batch_size=64)
correct = 0
total = 0
new_model.eval()  # evaluation mode
# No gradients are needed for inference; skipping autograd saves time and memory.
with torch.no_grad():
    for batch_x, batch_y in test_loader:
        pred = new_model(batch_x)
        predicted = torch.argmax(pred, dim=1)  # index of the largest logit per row
        correct += (predicted == batch_y).sum().item()
        total += batch_y.size(0)
accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")