In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
# 数据准备
input_texts = ["床前明月光", "举头望明月", "千山鸟飞绝"]
target_texts = ["疑是地上霜", "低头思故乡", "万径人踪灭"]


In [3]:
# 创建字符到索引的映射
chars = set("".join(input_texts + target_texts))

In [5]:
char_to_idx = {char: idx for idx, char in enumerate(chars)}
idx_to_char = {idx: char for char, idx in char_to_idx.items()}

In [9]:
# 将文本转换为索引序列
def text_to_seq(text, char_to_idx):
    return [char_to_idx[char] for char in text]


In [5]:

input_seqs = [text_to_seq(text, char_to_idx) for text in input_texts]
target_seqs = [text_to_seq(text, char_to_idx) for text in target_texts]

# 填充序列到相同长度
max_len = max(len(seq) for seq in input_seqs + target_seqs)
input_seqs = [seq + [0] * (max_len - len(seq)) for seq in input_seqs]
target_seqs = [seq + [0] * (max_len - len(seq)) for seq in target_seqs]

# 转换为PyTorch张量
input_tensor = torch.tensor(input_seqs, dtype=torch.long)
target_tensor = torch.tensor(target_seqs, dtype=torch.long)

# 定义Seq2Seq模型
class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2Seq, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.encoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        embedded = self.embedding(x)
        _, (hidden, cell) = self.encoder(embedded)
        output, _ = self.decoder(embedded, (hidden, cell))
        output = self.fc(output)
        return output

# 模型参数
input_size = len(chars)
hidden_size = 64
output_size = len(chars)

# 初始化模型、损失函数和优化器
model = Seq2Seq(input_size, hidden_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
epochs = 100
for epoch in range(epochs):
    optimizer.zero_grad()
    output = model(input_tensor)
    loss = criterion(output.view(-1, output_size), target_tensor.view(-1))
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# 测试模型
with torch.no_grad():
    test_output = model(input_tensor)
    predicted = torch.argmax(test_output, dim=-1)
    for i in range(len(input_texts)):
        print(f"Input: {input_texts[i]}")
        print(f"Target: {target_texts[i]}")
        print(f"Predicted: {''.join([idx_to_char[idx.item()] for idx in predicted[i]])}")
        print()

Epoch [10/100], Loss: 2.9471
Epoch [20/100], Loss: 2.4372
Epoch [30/100], Loss: 1.7991
Epoch [40/100], Loss: 1.1756
Epoch [50/100], Loss: 0.7170
Epoch [60/100], Loss: 0.4386
Epoch [70/100], Loss: 0.2796
Epoch [80/100], Loss: 0.1888
Epoch [90/100], Loss: 0.1355
Epoch [100/100], Loss: 0.1026
Input: 床前明月光
Target: 疑是地上霜
Predicted: 疑是地上霜

Input: 举头望明月
Target: 低头思故乡
Predicted: 低头思故乡

Input: 千山鸟飞绝
Target: 万径人踪灭
Predicted: 万径人踪灭



In [1]:
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.nn.utils.rnn import pad_sequence
import torch
import pickle
from torch.utils.data import DataLoader, Dataset



In [2]:
text_list = []
def read_text_to_list(filepath,lines):
    """
    读取文本文件，将每一行作为一项保存到列表中。

    :param filepath: 文本文件路径
    :return: 包含文件每一行内容的列表
    """
    try:
        # 打开文件，使用 'r' 模式读取，指定编码为 UTF-8
        with open(filepath, 'r', encoding='utf-8') as file:
            # 逐行读取文件内容
            for line in file:
                lines.append(line.strip())  # 去掉行末的换行符并添加到列表
        print(f"文件已成功读取，共 {len(lines)} 行。")
    except Exception as e:
        print(f"读取文件时出错：{e}")

read_text_to_list('../../dataset/data_pro/唐诗/七言唐诗.txt',text_list)
read_text_to_list('../../dataset/data_pro/唐诗/六言唐诗.txt',text_list)
read_text_to_list('../../dataset/data_pro/唐诗/五言唐诗.txt',text_list)

文件已成功读取，共 812548 行。
文件已成功读取，共 815991 行。
文件已成功读取，共 1599430 行。


In [3]:
label_text,target_text = [],[]
for i in text_list:
    label_text.append(i.split()[0])
    target_text.append(i.split()[1])

In [4]:
all_text = ''
for i in label_text+target_text:
    all_text+=i

In [33]:
# 构建词汇表
def yield_tokens(data):
    for text in data:
        yield tokenizer(text)

In [25]:
# #这个不用跑了 已经保存了
# # 初始化分词器（字符级别）
tokenizer = get_tokenizer(None)  # 默认按字符分词

# vocab = build_vocab_from_iterator(yield_tokens(all_text), specials=["<unk>", "<pad>"])
# vocab.set_default_index(vocab["<unk>"])

In [8]:
# #这个不用跑了 已经保存了
# label_sequences = [[torch.tensor(vocab(tokenizer(i)), dtype=torch.long) for i in text] for text in label_text]
# target_sequences = [[torch.tensor(vocab(tokenizer(i)), dtype=torch.long) for i in text] for text in target_text]
# label_sequences = [torch.tensor(i) for i in label_sequences]
# target_sequences = [torch.tensor(i) for i in target_sequences]

In [9]:
# # 填充序列，使它们具有相同的长度
# label_sequences = pad_sequence(label_sequences, batch_first=True, padding_value=vocab["<pad>"])
# target_sequences = pad_sequence(target_sequences, batch_first=True, padding_value=vocab["<pad>"])

In [11]:
# torch.save(label_sequences, 'label_sequences.pt')
# torch.save(target_sequences, 'target_sequences.pt')

In [6]:
#读取序列
label_sequences = torch.load('label_sequences.pt')
target_sequences = torch.load('target_sequences.pt')

In [126]:
# # 保存 vocab 到文件
# with open('./vocab.pkl', 'wb') as f:
#     pickle.dump(vocab, f)

In [7]:
# 从文件加载 vocab
with open('./vocab.pkl', 'rb') as f:
    vocab = pickle.load(f)

In [8]:
class TextDataset(Dataset):
    def __init__(self, label_sequences, target_sequences):
        self.label_sequences = label_sequences
        self.target_sequences = target_sequences
        
    def __len__(self):
        return len(self.label_sequences)
    
    def __getitem__(self, idx):
        return self.label_sequences[idx], self.target_sequences[idx]

In [9]:
dataset = TextDataset(label_sequences, target_sequences)
dataloader = DataLoader(dataset, batch_size=100000, shuffle=True)

In [10]:
import torch.nn as nn

class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        output = self.fc(lstm_out)
        return output

# 模型参数
vocab_size = len(vocab)
embedding_dim = 64
hidden_dim = 128

# 初始化模型
model = LSTMModel(vocab_size, embedding_dim, hidden_dim)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# 打印模型结构
print(model)

LSTMModel(
  (embedding): Embedding(3865, 64)
  (lstm): LSTM(64, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=3865, bias=True)
)


In [20]:
import torch.optim as optim
import time
# 损失函数和优化器
criterion = nn.CrossEntropyLoss(ignore_index=vocab["<pad>"])  # 忽略填充部分
optimizer = optim.Adam(model.parameters(), lr=0.001)
model.train()
# 训练循环
num_epochs = 1
for epoch in range(num_epochs):
    time1 = time.time()
    epoch_loss = 0.0

    for batch_labels, batch_targets in dataloader:
        # 将数据移动到设备（如 GPU）
        batch_labels = batch_labels.to(device)
        batch_targets = batch_targets.to(device)

        # 前向传播
        outputs = model(batch_labels)
        loss = criterion(outputs.view(-1, vocab_size), batch_targets.view(-1))

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # 累积损失
        epoch_loss += loss.item()

    # 打印损失
    avg_loss = epoch_loss / len(dataloader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
    print(f'Epoch {epoch} completed in {time.time()-time1:.2f} seconds')

Epoch [1/1], Loss: 0.4221
Epoch 0 completed in 26.22 seconds


In [None]:
torch.cuda.empty_cache()

In [21]:
torch.save(model, 'model_e64_h128.pth')

In [None]:
torch.tensor(vocab(tokenizer(text)), dtype=torch.long)

In [93]:
def test(text,max_length=50, temperature=1.0):
    sequences = [vocab(tokenizer(i))[0] for i in text]
    sequences += [0] * (7 - len(sequences))
    sequences = torch.tensor(sequences)
    tokens = sequences.unsqueeze(0).to(device)
    generated_tokens = tokens.tolist()[0]
    with torch.no_grad():
        for _ in range(7):
            # 前向传播
            outputs = model(tokens)
            
            # 获取最后一个时间步的输出
            last_output = outputs[:, -1, :] / temperature
            probabilities = torch.softmax(last_output, dim=-1)
            
            # 从概率分布中采样下一个 token
            next_token = torch.multinomial(probabilities, num_samples=1).item()
            
            # 如果生成的是 <eos>（结束符），则停止生成
            if next_token == vocab["<eos>"]:
                break
            
            # 将生成的 token 添加到结果中
            generated_tokens.append(next_token)
            
            # 更新输入 tokens
            tokens = torch.tensor([generated_tokens]).to(device)
    
    # 将生成的 token 转换回文本
    generated_text = "".join([vocab.lookup_token(token) for token in generated_tokens])
    # pad_sequence(sequences, batch_first=True, padding_value=vocab["<pad>"])
    # torch.tensor(
    return generated_text

In [95]:
text = '风急天高原'
test(text,max_length=7, temperature=1.0)

'风急天高原<unk><unk>难退干精方韩颜'

In [83]:
torch.tensor([vocab(tokenizer(i))[0] for i in "风急天高远销唉"]).unsqueeze(0)

tensor([[   5,  524,   14,   49,  171, 3763,    0]])

In [106]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

# 数据
qian_list = ["白日依山尽", "床前明月光", "春眠不觉晓"]
hou_list = ["黄河入海流", "疑是地上霜", "处处闻啼鸟"]

# 构建词汇表
vocab = set("".join(qian_list + hou_list))
vocab = {char: idx for idx, char in enumerate(vocab)}
vocab_size = len(vocab)

# 数据预处理
def encode(text, vocab):
    return [vocab[char] for char in text]

# 数据集类
class PoetryDataset(Dataset):
    def __init__(self, qian_list, hou_list, vocab):
        self.qian_list = qian_list
        self.hou_list = hou_list
        self.vocab = vocab

    def __len__(self):
        return len(self.qian_list)

    def __getitem__(self, idx):
        qian = encode(self.qian_list[idx], self.vocab)
        hou = encode(self.hou_list[idx], self.vocab)
        return torch.tensor(qian, dtype=torch.long), torch.tensor(hou, dtype=torch.long)

# LSTM 模型
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, num_layers=1):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        output = self.fc(lstm_out)
        return output

# 超参数
embed_dim = 64
hidden_dim = 128
output_dim = vocab_size
num_layers = 1
learning_rate = 0.001
num_epochs = 100
batch_size = 1

# 数据加载器
dataset = PoetryDataset(qian_list, hou_list, vocab)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMModel(vocab_size, embed_dim, hidden_dim, output_dim, num_layers).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 训练

In [110]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

# 数据
qian_list = ["白日依山尽", "床前明月光", "春眠不觉晓"]
hou_list = ["黄河入海流", "疑是地上霜", "处处闻啼鸟"]

# 构建词汇表
vocab = set("".join(qian_list + hou_list))
vocab = {char: idx for idx, char in enumerate(vocab)}
vocab_size = len(vocab)

In [None]:


# 数据预处理
def encode(text, vocab):
    return [vocab[char] for char in text]

# 数据集类
class PoetryDataset(Dataset):
    def __init__(self, qian_list, hou_list, vocab):
        self.qian_list = qian_list
        self.hou_list = hou_list
        self.vocab = vocab

    def __len__(self):
        return len(self.qian_list)

    def __getitem__(self, idx):
        qian = encode(self.qian_list[idx], self.vocab)
        hou = encode(self.hou_list[idx], self.vocab)
        return torch.tensor(qian, dtype=torch.long), torch.tensor(hou, dtype=torch.long)

# LSTM 模型
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, num_layers=1):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        output = self.fc(lstm_out)
        return output

# 超参数
embed_dim = 64
hidden_dim = 128
output_dim = vocab_size
num_layers = 1
learning_rate = 0.001
num_epochs = 100
batch_size = 1

# 数据加载器
dataset = PoetryDataset(qian_list, hou_list, vocab)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMModel(vocab_size, embed_dim, hidden_dim, output_dim, num_layers).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 训练
for epoch in range(num_epochs):
    for qian, hou in dataloader:
        qian, hou = qian.to(device), hou.to(device)
        
        # 前向传播
        output = model(qian)
        loss = criterion(output.view(-1, output_dim), hou.view(-1))
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

# 生成文本
def generate_text(model, input_text, vocab, max_length=10, device='cpu'):
    model.eval()
    tokens = encode(input_text, vocab)
    tokens = torch.tensor(tokens, dtype=torch.long).unsqueeze(0).to(device)
    
    generated_text = input_text
    for _ in range(max_length):
        with torch.no_grad():
            output = model(tokens)
            probs = torch.softmax(output[:, -1, :], dim=-1)
            next_token = torch.argmax(probs, dim=-1).item()
        
        generated_text += list(vocab.keys())[list(vocab.values()).index(next_token)]
        tokens = torch.cat([tokens, torch.tensor([[next_token]], device=device)], dim=1)
    
    return generated_text

# 测试生成
input_text = "白日依山尽"
generated_text = generate_text(model, input_text, vocab, max_length=10, device=device)
print(f"输入: {input_text}")
print(f"生成: {generated_text}")