# Char-based text generation using RNN

This notebook contains the code to create, train and save models for simple text generation.

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import torch.nn as nn

In [None]:
def filter_by_alphabet(text, alphabet):
    """Return `text` with every character that is not in `alphabet` removed.

    The kept characters stay in their original order.
    """
    allowed = frozenset(alphabet)
    return ''.join(filter(allowed.__contains__, text))

def split_data(text, seq_length, stride):
    """Cut `text` into (input, target) windows for next-character prediction.

    Each input is `seq_length` consecutive characters and its target is the
    same window shifted one character to the right. Window starts are spaced
    `stride` characters apart.

    Args:
        text: source string.
        seq_length: number of characters in every window.
        stride: distance between the starts of consecutive windows.

    Returns:
        A pair `(inputs, targets)` of equally long lists of strings. Both are
        empty when `text` is too short to hold a single window plus its
        shifted target.
    """
    # A window starting at i needs text[i + seq_length] for its target, so the
    # last usable start is len(text) - seq_length - 1. range() excludes its
    # stop value, hence the stop is one past that index.
    starts = range(0, len(text) - seq_length, stride)
    inputs = [text[i : i + seq_length] for i in starts]
    targets = [text[i + 1 : i + seq_length + 1] for i in starts]
    return inputs, targets

def integerify(list_of_strings):
    """Map every string to the list of integer class ids of its characters.

    Ids are looked up in the module-level `char_to_index` table, so a
    character missing from that table raises `KeyError`.
    """
    return [[char_to_index[symbol] for symbol in text] for text in list_of_strings]

def one_hot_encode(arr, n_labels):
    """One-hot encode an array of integer class ids.

    Args:
        arr: integer class ids of any rank (anything `np.asarray` accepts,
            e.g. a NumPy array or a CPU tensor); every id must be smaller
            than `n_labels`.
        n_labels: number of classes, i.e. the size of the appended axis.

    Returns:
        A float32 NumPy array of shape `(*arr.shape, n_labels)` holding 1.0 at
        the position of each id and 0.0 elsewhere.
    """
    indices = np.asarray(arr)
    flat = indices.reshape(-1)
    # Fill a (number of ids, n_labels) table row by row, then restore the
    # original leading axes. Using .size keeps this independent of the rank.
    one_hot = np.zeros((flat.size, n_labels), dtype=np.float32)
    one_hot[np.arange(flat.size), flat] = 1.
    return one_hot.reshape((*indices.shape, n_labels))

def collate_function(batch):
    """Turn a batch of (input_text, target_text) pairs into two tensors.

    Both texts of every pair are converted to integer class ids with
    `integerify`; the ids are then stacked with one row per pair. The texts in
    a batch are expected to have equal length so that the nested lists are
    rectangular.

    Returns:
        A pair `(inputs, targets)` of tensors.
    """
    encoded_inputs = integerify([source for source, _ in batch])
    encoded_targets = integerify([target for _, target in batch])
    inputs_tensor = torch.tensor(encoded_inputs)
    targets_tensor = torch.tensor(encoded_targets)
    return inputs_tensor, targets_tensor

def get_data_from_file(path, alphabet, seq_length, batch_size, stride, train_size = 0.75, encoding="utf-8"):
    """Load a text file and build train / validation dataloaders from it.

    The text is restricted to `alphabet`, cut into (input, target) windows
    with `split_data`, and the windows are randomly divided between the two
    subsets. The total number of windows is displayed as a side effect.

    Args:
        path: path of the text file to read.
        alphabet: characters to keep; everything else is dropped.
        seq_length: window length in characters.
        batch_size: batch size of both dataloaders.
        stride: distance between the starts of consecutive windows.
        train_size: fraction of the windows assigned to the training subset.
        encoding: text encoding of the file. Passed explicitly so that the
            result does not depend on the platform's default encoding.

    Returns:
        A pair `(train_dataloader, validation_dataloader)`; both shuffle and
        both collate with `collate_function`.
    """
    with open(path, "r", encoding=encoding) as text_file:
        text = text_file.read()
    text = filter_by_alphabet(text, alphabet)
    text_inputs, text_targets = split_data(text, seq_length, stride)
    data = list(zip(text_inputs, text_targets))
    display(len(data))
    n_train = int(train_size * len(data))
    n_val = len(data) - n_train
    # NOTE(review): when stride < seq_length neighbouring windows overlap, so
    # a random split places near-identical windows in both subsets and the
    # validation loss is an optimistic estimate.
    train_data, val_data = torch.utils.data.random_split(data, [n_train, n_val])

    # Both loaders share the same settings; only the subset differs.
    loader_options = dict(
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_function,
        pin_memory=True,
    )
    train_dataloader = DataLoader(train_data, **loader_options)
    validation_dataloader = DataLoader(val_data, **loader_options)
    return train_dataloader, validation_dataloader

In [None]:
# Message delimiter; it only becomes part of the alphabet if the line below
# the alphabet definition is uncommented.
special = '$'
alphabet = 'абвгдеёжзийклмнопрстуфхцчшщъыьэюяАБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ .,!?\n-"'
# alphabet = alphabet + special

# Integer class id of every alphabet character (its position in `alphabet`).
char_to_index = {symbol: position for position, symbol in enumerate(alphabet)}

# Data windowing / batching settings.
seq_length = 120
batch_size = 64
stride = 2

Load the training text from a file. To train on your own data, replace "dataset.txt" with the path to your own plain-text file.

In [4]:
# Build the train / validation loaders from the corpus using the settings above.
train_dataloader, validation_dataloader = get_data_from_file(
    path='dataset.txt',
    alphabet=alphabet,
    seq_length=seq_length,
    batch_size=batch_size,
    stride=stride,
)

12741343

The next section defines an RNN based on LSTM layers. The default parameters were chosen so that 
sequences of length 120-150 fit into the memory of a GTX 1650 Ti (4 GB). You can increase them if you have more GPU memory.

In [5]:
class CharRNN(nn.Module):
    """Character-level language model: stacked LSTM followed by a small MLP head.

    Args:
        chars_num: alphabet size; used both as the width of the one-hot input
            and as the number of output classes.
        n_hidden: hidden size of the LSTM and of the intermediate linear layer.
        n_layers: number of stacked LSTM layers.
        drop_prob: dropout probability used between LSTM layers and on the
            LSTM output.
    """

    def __init__(self, chars_num, n_hidden=512, n_layers=4, drop_prob=0.4):
        super().__init__()
        self.chars_num = chars_num
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden

        self.lstm = nn.LSTM(
            chars_num, n_hidden, n_layers, dropout=drop_prob, batch_first=True
        )
        self.dropout = nn.Dropout(drop_prob)
        self.linear = nn.Sequential(
            nn.Linear(n_hidden, n_hidden),
            nn.BatchNorm1d(n_hidden),
            nn.ReLU(),
        )
        self.fc = nn.Linear(n_hidden, chars_num)

    def forward(self, x, hidden):
        """Run the network on a batch of one-hot encoded sequences.

        Args:
            x: tensor of shape (batch, seq_len, chars_num).
            hidden: `(h, c)` LSTM state, e.g. from `init_hidden`.

        Returns:
            A pair `(out, hidden)`: `out` has shape (batch * seq_len,
            chars_num) with one row of class scores per input position, and
            `hidden` is the updated LSTM state.
        """
        r_output, hidden = self.lstm(x, hidden)
        out = self.dropout(r_output)
        # Merge the batch and time axes so the head sees one row per character.
        out = out.contiguous().view(-1, self.n_hidden)
        out = self.linear(out)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        """Return a zero `(h, c)` LSTM state for `batch_size` sequences.

        The state is created with the dtype and on the device of the model's
        parameters, so it matches the model wherever the model currently
        lives (CPU or GPU).
        """
        reference = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)
        return (reference.new_zeros(shape), reference.new_zeros(shape))

In [6]:
def _cycle_batches(loader):
    """Yield batches from `loader` endlessly, starting a new pass whenever one ends."""
    while True:
        produced_any = False
        for batch in loader:
            produced_any = True
            yield batch
        if not produced_any:
            # An empty loader would otherwise make this loop spin forever.
            raise ValueError("data loader produced no batches")


def _prepare_batch(x, y, n_chars, device):
    """One-hot encode the input ids and flatten the target ids, both on `device`."""
    x = torch.as_tensor(x).long().to(device)
    y = torch.as_tensor(y).long().to(device)
    inputs = torch.nn.functional.one_hot(x, n_chars).float()
    # One class index per input position, matching the flattened net output.
    targets = y.reshape(-1)
    return inputs, targets


def train(
    net,
    train_data,
    val_data,
    full_train,
    epochs=10,
    batches_per_epoch=100,
    batch_size=64,
    seq_length=100,
    lr=0.001,
    clip=5,
    val_frac=0.1,
    print_every=10,
    checkpoint_path='checkpoint.pth',
):
    """Train `net` and save a checkpoint after every epoch.

    Args:
        net: model exposing `chars_num`, `lstm`, `init_hidden(batch_size)` and
            `forward(inputs, hidden)`; it is trained on whatever device its
            parameters are on.
        train_data: iterable of `(inputs, targets)` batches of integer class
            ids; it is cycled through as often as needed.
        val_data: iterable of validation batches in the same format.
        full_train: if False, the LSTM parameters are frozen and only the
            remaining layers are updated.
        epochs: number of epochs.
        batches_per_epoch: number of optimisation steps per epoch.
        batch_size: unused; the hidden state is sized from each actual batch.
            Kept for backward compatibility.
        seq_length: unused; kept for backward compatibility.
        lr: Adam learning rate.
        clip: maximum gradient norm used for gradient clipping.
        val_frac: unused; kept for backward compatibility.
        print_every: a validation batch is evaluated and a progress line is
            printed every `print_every` steps.
        checkpoint_path: file that receives `net.state_dict()` after every
            epoch.
    """
    net.train()
    for p in net.lstm.parameters():
        p.requires_grad = full_train
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    n_chars = net.chars_num
    device = next(net.parameters()).device
    # Persistent iterators: re-creating the iterator for every step would
    # rebuild (and reshuffle) the loader each time just to take one batch.
    train_batches = _cycle_batches(train_data)
    val_batches = _cycle_batches(val_data)
    counter = 0
    for e in range(epochs):
        for _ in range(batches_per_epoch):
            x, y = next(train_batches)
            inputs, targets = _prepare_batch(x, y, n_chars, device)
            # Size the state from the batch itself: the last batch of a pass
            # may be smaller than the nominal batch size.
            h = net.init_hidden(inputs.shape[0])
            net.zero_grad()
            output, h = net(inputs, h)
            loss = criterion(output, targets)
            loss.backward()
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            if counter % print_every == 0:
                net.eval()
                # No gradients are needed for the validation estimate.
                with torch.no_grad():
                    x, y = next(val_batches)
                    inputs, targets = _prepare_batch(x, y, n_chars, device)
                    val_h = net.init_hidden(inputs.shape[0])
                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets)
                net.train()

                print(
                    "Epoch: {}/{}...".format(e + 1, epochs),
                    "Step: {}...".format(counter),
                    "Loss: {:.4f}...".format(loss.item()),
                    "Val Loss: {:.4f}".format(val_loss.item()),
                )

            counter += 1
        torch.save(net.state_dict(), checkpoint_path)

In [7]:
def predict(net, char, h=None, temperature=1, top_k=None):
    """Sample the next character given the current character and hidden state.

    Args:
        net: trained model exposing `chars_num`, `init_hidden` and `forward`.
        char: the current character (a single character of the alphabet).
        h: `(h, c)` hidden state from the previous step; a fresh zero state
            for a batch of one is created when it is None.
        temperature: factor the network outputs are multiplied by before they
            are turned into probabilities. Because it multiplies the outputs,
            larger values make the distribution sharper (the most likely
            characters dominate) and smaller values make it flatter (more
            random).
        top_k: if given, only the `top_k` most probable characters are
            candidates for sampling; otherwise the whole alphabet is.

    Returns:
        A pair `(next_char, h)`: the sampled character and the updated hidden
        state.
    """
    device = next(net.parameters()).device
    if h is None:
        h = net.init_hidden(1)
    x = np.array(integerify([char]))
    x = one_hot_encode(x, net.chars_num)
    inputs = torch.from_numpy(x).to(device)
    # Inference only: no graph is needed, and the state is cut off from any
    # history it may carry.
    with torch.no_grad():
        h = tuple(each.detach() for each in h)
        out, h = net(inputs, h)
        # NOTE(review): the scaled outputs are exponentiated and then passed
        # through softmax, i.e. exponentiated twice; softmax(temperature * out)
        # may have been intended. Kept as is because the temperature values
        # used in this notebook were chosen against this behaviour.
        out = torch.exp(temperature * out)
        p = torch.nn.functional.softmax(out, dim=1).cpu()
    if top_k is None:
        top_ch = np.arange(net.chars_num)
    else:
        p, top_ch = p.topk(top_k)
        # reshape(-1) instead of squeeze(): keeps a 1-D array when top_k == 1.
        top_ch = top_ch.numpy().reshape(-1)
    p = p.numpy().reshape(-1)
    # Renormalise, since the top-k probabilities no longer sum to one.
    char = np.random.choice(top_ch, p=p/p.sum())
    return alphabet[char], h
    
    
def sample(net, size, prime, temperature = 1, top_k=None):
    """Generate text that continues `prime`, one character at a time.

    The characters of `prime` are fed through the network first to build up
    the hidden state; after that each sampled character is fed back in to
    sample the next one. The network is switched to evaluation mode and left
    in it.

    Args:
        net: trained model, as expected by `predict`.
        size: number of characters to generate after `prime`.
        prime: non-empty starting text.
        temperature: forwarded to `predict`.
        top_k: forwarded to `predict`.

    Returns:
        `prime` followed by exactly `size` generated characters.

    Raises:
        ValueError: if `prime` is empty (there is nothing to start from).
    """
    if not prime:
        raise ValueError("prime must contain at least one character")
    net.eval()
    chars = list(prime)
    h = net.init_hidden(1)
    # Warm up the hidden state; only the prediction made after the last prime
    # character is kept.
    for ch in prime:
        char, h = predict(net, ch, h, temperature, top_k=top_k)
    if size > 0:
        # The prediction following the prime is the first generated character,
        # so only size - 1 further steps are needed.
        chars.append(char)
        for _ in range(size - 1):
            char, h = predict(net, chars[-1], h, temperature, top_k=top_k)
            chars.append(char)
    return ''.join(chars)

In [8]:
# One input/output class per alphabet character; 4 LSTM layers of width 1024.
net = CharRNN(chars_num=len(alphabet), n_hidden=1024, n_layers=4)

In [9]:
# Prefer the GPU, but fall back to the CPU so this cell also runs on machines without CUDA.
net = net.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

In [None]:
# Train all layers (LSTM included) for 20 epochs of 1000 steps each.
train(
    net,
    train_data=train_dataloader,
    val_data=validation_dataloader,
    full_train=True,
    epochs=20,
    batches_per_epoch=1000,
    batch_size=batch_size,
    seq_length=seq_length,
    lr=0.01,
    print_every=10,
)

In [11]:
# Persist the final weights separately from the per-epoch checkpoint.
torch.save(obj=net.state_dict(), f='output_net.pth')

Sample text from the trained network, given a starting text (`prime`), a `temperature` and a `top_k` value.

In [None]:
print(
    sample(
        net,
        size=400,
        prime='Однажды в',
        temperature=0.4,
        top_k=7,
    )
)

In [None]:
print(
    sample(
        net,
        size=600,
        prime='В одном городе',
        temperature=0.7,
        top_k=7,
    )
)