# Chinese Poetry Generator

Generates Tang poetry using the CharRNN model

Data: from https://github.com/chinese-poetry/chinese-poetry

In [1]:
import random
from collections import Counter

import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
import torch.optim as optim
from zhconv import convert
from tqdm.auto import tqdm
import numpy as np


In [2]:
from data_loader import ParseRawData
# This loads the json data and only takes the main body of each poem
data_all = ParseRawData()

In [3]:
len(data_all)

57598

We have 57598 Tang Poems in total.

In [4]:
data_all[0]

'秦川雄帝宅，函谷壯皇居。綺殿千尋起，離宮百雉餘。連甍遙接漢，飛觀迥凌虛。雲日隱層闕，風煙出綺疎。'

# preprocess data

Filter and prepare data:
- convert traditional Chinese to simplified Chinese for better readability
- only take poems with 5 character lines - easier for model to learn the poem structure

In [234]:
comma = data_all[0][5]
data = [convert(x, "zh-hans") for x in data_all if len(x.split(comma)[0])==5]
len(data)

30379

In [235]:
data[0]

'秦川雄帝宅，函谷壮皇居。绮殿千寻起，离宫百雉馀。连甍遥接汉，飞观迥凌虚。云日隐层阙，风烟出绮疏。'

In [237]:
char_counter = Counter([x for poem in data for x in poem])

In [238]:
len([x for x in char_counter if char_counter[x] < 5])

2607

In [269]:
char_to_ix = {}
min_freq = 10 # a relatively large number is chosen to make the dataset smaller to fit my laptop
# EOP_TOKEN = '$'
for poem in data:
    for char in poem:
        # only take chars that appeared at least min_freq times
        if (char not in char_to_ix) and (char_counter[char] >= min_freq):
            char_to_ix[char] = len(char_to_ix)
# char_to_ix['$'] = len(char_to_ix)
# char_to_ix['<START>'] = len(char_to_ix)
ix_to_char = dict((i, w) for (w, i) in char_to_ix.items())

In [270]:
# plus one to count for the unknown chars
vocab_size = len(char_to_ix) + 1
print("vocab size: {}".format(vocab_size))

vocab size: 4442


In [272]:
# get input and target for training.
# use first 6 chars as input to predict the next char.
seq_len = 6
input_data = []
target = []
# EOP_TOKEN = '$'
for poem in data:
    # add EOP token
#     poem += EOP_TOKEN
    for i in range(len(poem) - seq_len):
        target.append(poem[i + seq_len])
        input_data.append(poem[i:i+seq_len])

print("Number of training samples: {}".format(len(input_data)))

Number of training samples: 1775989


In [273]:
input_data[0]

'秦川雄帝宅，'

In [274]:
def transform_X(text, char_to_ix):
    """Transforms one row of input text to onehot encoded tensors
    input format: '秦川雄帝宅，'
    output format: onehot encoded tensors
    """
    X = np.zeros((len(text), len(char_to_ix)+1))
    for i, char in enumerate(text):
        X[i, char_to_ix.get(char, len(char_to_ix))] = 1
    return X


# def transform_y(text, char_to_ix):
#     """Transform the target text into onehot encoded tensor"""
#     y = np.zeros((len(char_to_ix)+1))
#     y[char_to_ix.get(text, len(char_to_ix))] = 1
#     return y

# For Pytorch crossentropyloss, the target does not need to be onehot encoded.
def transform_y(text, char_to_ix):
    return char_to_ix.get(text, len(char_to_ix))

# dataset is too big for my laptop. Build a custom dataset
class PoemDataset(Dataset):
    def __init__(self, data, target, transform_X, transform_y, char_to_ix):
        self.data = data
        self.target = target
        self.transform_X = transform_X
        self.transform_y = transform_y
        self.char_to_ix = char_to_ix
        
    def __len__(self):
        """Total number of samples"""
        return len(self.data)
    
    def __getitem__(self, index):
        """Generate one sample of data"""
        X = self.transform_X(self.data[index], self.char_to_ix)
        y = self.transform_y(self.target[index], self.char_to_ix)
        sample = {"input": X, "target": y}
        return sample

In [275]:
poem_dataset = PoemDataset(input_data, target, transform_X, transform_y, char_to_ix)

In [290]:
batch_size = 128
dataloader = DataLoader(poem_dataset, batch_size=batch_size, shuffle=True)

In [291]:
# # check we have the right shape
# for i, sample_batched in enumerate(dataloader):
#     print(i, sample_batched['input'].size(),
#          sample_batched['target'].size(),
#          sample_batched['target'].dtype)
#     if i == 3:
#         break

In [292]:
class PoemGenerationModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super(PoemGenerationModel, self).__init__()
    
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(vocab_size, hidden_dim, num_layers=1, batch_first=True)
        self.linear = nn.Linear(hidden_dim, vocab_size)
        
    def forward(self, input_data):
#         embeds = self.embedding(input_data)
        lstm_out, _ = self.lstm(input_data)
        logits = self.linear(lstm_out[:, -1, :].squeeze())
        return logits

In [293]:
embed_dim = 128
hidden_dim = 256
lr = 0.001
device = 'cuda' if torch.cuda.is_available() else 'cpu'

model = PoemGenerationModel(vocab_size, embed_dim, hidden_dim).float().to(device)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [294]:
print(model)

PoemGenerationModel(
  (embedding): Embedding(4442, 128)
  (lstm): LSTM(4442, 256, batch_first=True)
  (linear): Linear(in_features=256, out_features=4442, bias=True)
)


In [295]:
device

'cpu'

In [314]:
np.power(0.2, 1/1.2)

0.2615320972023661

In [296]:
def sample(preds, temperature=1.0):
    """Sample the output based predicted probabilities.
    
    preds: 1D tensor. Logits from the model
    temperature: When temperature is low, tend to choose the most likely words. 
    When temperature is high, model will be more adventurous. 
    """
    # helper function to sample an index from a probability array
    preds = torch.nn.functional.softmax(preds, dim=0).detach().cpu().numpy()
    
    exp_preds = np.power(preds, 1./temperature)
    preds = exp_preds / np.sum(exp_preds)
    pro = np.random.choice(range(len(preds)), 1, p=preds)
    return int(pro.squeeze())

In [297]:
def generate_poem(input_text, output_length=18, temperature=1):
    """Given input_text, generate a poem.
    input_text need to be 6 chars, where last one is a comma.
    Example input: "我有紫霞想，"
    """
    generated = ""
    for i in range(output_length):
        pred = generate_one_char(input_text, temperature=temperature)
        generated += pred
        input_text = input_text[1:] + pred
    return generated
    
def generate_one_char(input_text, temperature=1):
    X_test = np.zeros((1, seq_len, vocab_size))
    for t, char in enumerate(input_text):
        X_test[0, t, char_to_ix.get(char, len(char_to_ix))] = 1
        
    pred = model(torch.from_numpy(X_test).float()) #use less precision for laptop
    next_index = sample(pred, temperature)
    next_char = ix_to_char.get(next_index, "?")
    
    return next_char

In [298]:
def on_epoch_end(epoch):
    # Function invoked at end of each epoch. Prints generated text.
    if epoch % 2 == 0:
        print()
        print('----- Generating text after Epoch: %d' % epoch)

        # randomly pick the starting line of a poem as the seed
        poem_index = random.randint(0, len(data))
        print("Generating with seed: {}".format(data[poem_index][:seq_len]))
        seed_text = data[poem_index][:seq_len]
        for temperature in [0.2, 0.5, 1.0, 1.2]:
            print('----- temperature:', temperature)
            generated = generate_poem(seed_text, temperature=temperature)
            print(generated)

In [300]:
n_epochs = 200

for epoch in range(n_epochs):
#     model.train()
  
    # Dataloader returns the batches
    for samples in tqdm(dataloader):
        cur_batch_size = len(samples)
        batch_X = samples['input'].to(device)
        batch_y = samples['target'].to(device)

        # Zero out the gradients before backpropagation
        model.zero_grad()

        y_pred = model(batch_X.float())
        # Compute loss and update gradients
        loss = loss_function(y_pred, batch_y.long())
        loss.backward()
        # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
#         nn.utils.clip_grad_norm_(model.parameters(), 1)
        optimizer.step()

    print("Epoch {}:  Loss: {}".format(epoch, loss))
    on_epoch_end(epoch)


  0%|          | 0/13875 [00:00<?, ?it/s]

Epoch 0:  Loss: 4.846212387084961

----- Generating text after Epoch: 0
Generating with seed: 我有紫霞想，
----- temperature: 0.2
无因青草生。因君不可见，此去无所思。
----- temperature: 0.5
何人理白云。朝廷风云外，主人不可知。
----- temperature: 1.0
何时长守苏。谁敢苦轩蔡，百年亡洛浮。
----- temperature: 1.2
那忘名候违。疏并随衫蝶，四年落齿罗。


  0%|          | 0/13875 [00:00<?, ?it/s]

Epoch 1:  Loss: 5.119420528411865


  0%|          | 0/13875 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [304]:
save_path = "saved_model/charrnn_pytorch_no_embedding"
torch.save(model.state_dict(), save_path)

# Load the saved model and test

In [305]:
trained_model = PoemGenerationModel(vocab_size, embed_dim, hidden_dim)
model.load_state_dict(torch.load(save_path))
model.eval()

挥灼朱丝折。路通祖窗云，面侧看钩嵩。


In [313]:
seed_text = "我有紫霞想，"
generated = generate_poem(seed_text, temperature=0.2)
print(generated)

君为白日来。今来一何事，相见日相逢。


In [317]:
generated = generate_poem(seed_text, temperature=0.3)
print(generated)

我无白日来。君看一相见，谁与鬓毛斑。


In [318]:
generated = generate_poem(seed_text, output_length=42, temperature=0.2)
print(generated)

我有青山期。我有一生事，此时无所求。一朝一尊酒，一夜一尊酒。一日一尊酒，一朝一醉醒。


In [321]:
generated = generate_poem("明月几时有，", output_length=42, temperature=0.5)
print(generated)

空堂青史书。回瞻汉武库，直入汉宫中。明月临江浦，清风满树枝。半空回首起，却忆故人来。


In [323]:
generated = generate_poem("明月几时有，", output_length=18, temperature=0.2)
print(generated)

清风吹不闻。今来一何事，自有一生心。


TODO: 
 - add one more lstm layer?
 - add embedding layer