# 自动写诗

首先导入必要的库：

In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

# 加载数据集

本次实验的数据来自chinese-poetry：https://github.com/chinese-poetry/chinese-poetry

实验提供预处理过的数据集，含有57580首唐诗，每首诗限定在125词，不足125词的以```<s>```填充。数据集以npz文件形式保存，包含三个部分：
- （1）data: 诗词数据，将诗词中的字转化为其在字典中的序号表示。
- （2）ix2word: 序号到字的映射
- （3）word2ix: 字到序号的映射

预处理数据集的下载：[点击下载](https://yun.sfo2.digitaloceanspaces.com/pytorch_book/pytorch_book/tang.npz)

In [4]:
def prepareData():
    
    # 读入预处理的数据
    datas = np.load("../../Dataset/tang.npz", allow_pickle=True)
    data = datas['data']
    ix2word = datas['ix2word'].item()
    word2ix = datas['word2ix'].item()
    
    # 转为torch.Tensor
    data = torch.from_numpy(data)
    dataloader = DataLoader(data,
                         batch_size = 16,
                         shuffle = True,
                         num_workers = 2)
    
    return dataloader, ix2word, word2ix

In [5]:
dataloader, ix2word, word2ix = prepareData()

# 构建模型

模型包括Embedding层、LSTM层和输出层。

In [6]:
class PoetryModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(PoetryModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, self.hidden_dim, num_layers=2)
        self.linear = nn.Linear(self.hidden_dim, vocab_size)

    def forward(self, input, hidden = None):
        seq_len, batch_size = input.size()
        
        if hidden is None:
            h_0 = input.data.new(2, batch_size, self.hidden_dim).fill_(0).float()
            c_0 = input.data.new(2, batch_size, self.hidden_dim).fill_(0).float()
        else:
            h_0, c_0 = hidden

        embeds = self.embedding(input)
        output, hidden = self.lstm(embeds, (h_0, c_0))
        output = self.linear(output.view(seq_len * batch_size, -1))
        return output, hidden

In [27]:
import math

class PoetryTransformerModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_heads, num_layers):
        super(PoetryTransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim)
        self.transformer = nn.Transformer(
            d_model=embedding_dim,
            nhead=num_heads,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=hidden_dim
        )
        self.linear = nn.Linear(embedding_dim, vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        # Embedding and positional encoding
        src_emb = self.embedding(src) * math.sqrt(self.embedding.embedding_dim)
        tgt_emb = self.embedding(tgt) * math.sqrt(self.embedding.embedding_dim)
        src_emb = self.positional_encoding(src_emb)
        tgt_emb = self.positional_encoding(tgt_emb)

        # Transformer forward pass
        output = self.transformer(
            src_emb, tgt_emb,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=memory_mask
        )

        # Final linear layer
        output = self.linear(output)
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

# Example usage
vocab_size = 125
embedding_dim = 256
hidden_dim = 512
num_heads = 8
num_layers = 6

model = PoetryTransformerModel(vocab_size, embedding_dim, hidden_dim, num_heads, num_layers)

In [None]:
class PoetryModel2(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(PoetryModel2, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = LSTM_LAYER
        self.lstm_outdim = LSTM_OUTDIM
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm1 = nn.LSTM(embedding_dim, self.hidden_dim, num_layers=self.num_layers)
        self.lstm2 = nn.LSTM(self.hidden_dim, self.lstm_outdim, num_layers=self.num_layers, batch_first=True)
        self.fc = nn.Sequential(nn.Linear(self.lstm_outdim, 2048),
                                nn.Tanh(),
                                nn.Linear(2048, vocab_size))
        self.dropout = nn.Dropout(0.6)

    def forward(self, input, hidden1=None, hidden2=None):
        seq_len, batch_size = input.size()
        if hidden1 is None or hidden2 is None:
            h_0 = input.data.new(self.num_layers, batch_size, self.hidden_dim).fill_(0).float()
            c_0 = input.data.new(self.num_layers, batch_size, self.hidden_dim).fill_(0).float()
            h_1 = input.data.new(self.num_layers, seq_len, self.lstm_outdim).fill_(0).float()
            c_1 = input.data.new(self.num_layers, seq_len, self.lstm_outdim).fill_(0).float()
        else:
            h_0, c_0 = hidden1
            h_1, c_1 = hidden2
        embeds = self.embeddings(input)
        output, hidden1 = self.lstm1(embeds, (h_0, c_0))
        output, hidden2 = self.lstm2(output, (h_1, c_1))
        output = self.dropout(output)
        output = self.fc(output.reshape(seq_len * batch_size, -1))
        return output, hidden1, hidden2

# 训练模型

In [28]:
# 设置超参数
learning_rate = 5e-3       # 学习率
embedding_dim = 128        # 嵌入层维度
hidden_dim = 256           # 隐藏层维度
model_path = None          # 预训练模型路径
epochs = 20                # 训练轮数
verbose = True             # 打印训练过程
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [29]:
def train(dataloader, ix2word, word2ix):

    vocab_size = 125
    embedding_dim = 256
    hidden_dim = 512
    num_heads = 8
    num_layers = 6

    model = PoetryTransformerModel(vocab_size, embedding_dim, hidden_dim, num_heads, num_layers)
    # 配置模型，是否继续上一次的训练
    # model = PoetryModel(len(word2ix), embedding_dim, hidden_dim)
    if model_path:
        model.load_state_dict(torch.load(model_path))
    model.to(device)
    
    # 设置优化器
    optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

    # 设置损失函数
    criterion = nn.CrossEntropyLoss()

    # 定义训练过程
    for epoch in range(epochs):
        for batch_idx, data in enumerate(dataloader):
            data = data.long().transpose(1, 0).contiguous()
            data = data.to(device)
            input, target = data[:-1, :], data[1:, :]
            output, _ = model(input)
            loss = criterion(output, target.view(-1))
            
            if batch_idx % 900 == 0 & verbose:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch+1, batch_idx * len(data[1]), len(dataloader.dataset),
                    100. * batch_idx / len(dataloader), loss.item()))
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # 保存模型
    torch.save(model.state_dict(), 'model.pth')

In [30]:
train(dataloader, ix2word, word2ix)

TypeError: forward() missing 1 required positional argument: 'tgt'

# 生成唐诗

给定几个词，根据这几个词接着生成一首完整的唐诗。

In [24]:
# 设置超参数
model_path = 'model.pth'        # 模型路径
start_words = '湖光秋月两相和'  # 唐诗的第一句
max_gen_len = 125                # 生成唐诗的最长长度

In [19]:
def generate(start_words, ix2word, word2ix):

    # 读取模型
    model = PoetryModel(len(word2ix), embedding_dim, hidden_dim)
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    
    # 读取唐诗的第一句
    results = list(start_words)
    start_word_len = len(start_words)
    
    # 设置第一个词为<START>
    input = torch.Tensor([word2ix['<START>']]).view(1, 1).long()
    input = input.to(device)
    hidden = None

    # 生成唐诗
    for i in range(max_gen_len):
        output, hidden = model(input, hidden)
        # 读取第一句
        if i < start_word_len:
            w = results[i]
            input = input.data.new([word2ix[w]]).view(1, 1)
        # 生成后面的句子
        else:
            top_index = output.data[0].topk(1)[1][0].item()
            w = ix2word[top_index]
            results.append(w)
            input = input.data.new([top_index]).view(1, 1)
        # 结束标志
        if w == '<EOP>':
            del results[-1]
            break
            
    return results

In [25]:
results = generate(start_words, ix2word, word2ix)
print(results)

['湖', '光', '秋', '月', '两', '相', '和', '，', '水', '底', '松', '声', '闻', '擣', '衣', '。', '一', '片', '月', '明', '秋', '水', '冷', '，', '一', '声', '蝉', '啸', '月', '华', '清', '。', '一', '片', '月', '明', '秋', '水', '冷', '，', '一', '声', '蝉', '啸', '月', '华', '清', '。', '一', '片', '月', '明', '秋', '水', '绿', '，', '一', '声', '蝉', '啸', '月', '华', '清', '。', '不', '知', '何', '处', '归', '云', '去', '，', '不', '见', '春', '风', '吹', '白', '苹', '。', '春', '风', '吹', '落', '春', '风', '起', '，', '春', '风', '吹', '尽', '春', '风', '起', '。', '春', '风', '吹', '尽', '春', '风', '起', '，', '春', '风', '吹', '尽', '春', '风', '起', '。', '春', '风', '吹', '尽', '猩', '猩', '飞', '，', '玉', '筯', '双', '飞', '入']


# 生成藏头诗



In [13]:
# 设置超参数
model_path = 'model.pth'                 # 模型路径
start_words_acrostic = '湖光秋月两相和'  # 唐诗的“头”
max_gen_len_acrostic = 125               # 生成唐诗的最长长度

In [14]:
def gen_acrostic(start_words, ix2word, word2ix):

    # 读取模型
    model = PoetryModel(len(word2ix), embedding_dim, hidden_dim)
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    
    # 读取唐诗的“头”
    results = []
    start_word_len = len(start_words)
    
    # 设置第一个词为<START>
    input = (torch.Tensor([word2ix['<START>']]).view(1, 1).long())
    input = input.to(device)
    hidden = None

    index = 0            # 指示已生成了多少句
    pre_word = '<START>' # 上一个词

    # 生成藏头诗
    for i in range(max_gen_len_acrostic):
        output, hidden = model(input, hidden)
        top_index = output.data[0].topk(1)[1][0].item()
        w = ix2word[top_index]

        # 如果遇到标志一句的结尾，喂入下一个“头”
        if (pre_word in {u'。', u'！', '<START>'}):
            # 如果生成的诗已经包含全部“头”，则结束
            if index == start_word_len:
                break
            # 把“头”作为输入喂入模型
            else:
                w = start_words[index]
                index += 1
                input = (input.data.new([word2ix[w]])).view(1, 1)
                
        # 否则，把上一次预测作为下一个词输入
        else:
            input = (input.data.new([word2ix[w]])).view(1, 1)
        results.append(w)
        pre_word = w
        
    return results

In [15]:
results_acrostic = gen_acrostic(start_words_acrostic, ix2word, word2ix)
print(results_acrostic)

['湖', '上', '有', '君', '子', '，', '一', '朝', '无', '所', '求', '。', '光', '辉', '不', '可', '见', '，', '一', '旦', '不', '可', '亲', '。', '秋', '风', '吹', '白', '日', '，', '一', '夜', '无', '人', '知', '。', '月', '明', '天', '下', '静', '，', '鸟', '过', '竹', '林', '寒', '。', '两', '鬓', '不', '可', '见', '，', '一', '枝', '无', '所', '依', '。', '相', '思', '不', '可', '见', '，', '一', '夜', '不', '可', '攀', '。', '和', '风', '吹', '不', '足', '，', '秋', '草', '生', '寒', '烟', '。']
