# Char-based text generation with LSTM

In [1]:
from collections import Counter

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [None]:
corpus_path = 'corpus_en.txt'
with open(corpus_path) as f:
    text = f.read()

def text_to_seq(text_sample):
    char_counts = Counter(text_sample)
    char_counts = sorted(char_counts.items(), key = lambda x: x[1], reverse=True)
 
    sorted_chars = [char for char, _ in char_counts]
    print(sorted_chars)
    char_to_idx = {char: index for index, char in enumerate(sorted_chars)}
    idx_to_char = {v: k for k, v in char_to_idx.items()}
    sequence = np.array([char_to_idx[char] for char in text_sample])
    
    return sequence, char_to_idx, idx_to_char
 
sequence, char_to_idx, idx_to_char = text_to_seq(text)

[' ', 'e', 'o', 'n', 'r', 't', 'a', 'p', 'i', 's', 'h', '1', 'l', 'd', 'u', 'b', 'c', 'f', 'm', 'w', 'y', 'g', 'v', 'k', 'x', 'q', 'j', 'z', '_', '\n', '2', '0', '3', '5', '4', 'é', '7', '8', '6', '9', 'æ', 'â', 'û', 'è', 'ñ', 'î', 'ô', 'à', 'ê', 'œ', 'ä', '.', 'ç', 'ë', 'á', 'ü', 'ï', 'ö', 'ã', 'ò', 'ā', 'ú', 'ƒ', 'ο', 'ù', 'λ', 'δ', '¾', 'ι', 'β', 'ó', '?', 'α', 'ὴ', '½', 'ς', 'ν', 'η', '¼', 'π', '!', 'ē', 'ά', 'ì', 'ō', 'κ', 'υ', 'ό', 'ˆ', 'ἱ', 'ί', 'ῶ', 'ū', 'ἁ', 'ρ', 'χ', '闓', 'ÿ']


In [None]:
SEQ_LEN = 256
BATCH_SIZE = 16

def get_batch(sequence):
    trains = []
    targets = []
    for _ in range(BATCH_SIZE):
        batch_start = np.random.randint(0, len(sequence) - SEQ_LEN)
        chunk = sequence[batch_start: batch_start + SEQ_LEN]
        train = torch.LongTensor(chunk[:-1]).view(-1, 1)
        target = torch.LongTensor(chunk[1:]).view(-1, 1)
        trains.append(train)
        targets.append(target)
    return torch.stack(trains, dim=0), torch.stack(targets, dim=0)

In [None]:
def evaluate(model, char_to_idx, idx_to_char, start_text=' ', prediction_len=200, temp=0.3):
    hidden = model.init_hidden()
    idx_input = [char_to_idx[char] for char in start_text]
    train = torch.LongTensor(idx_input).view(-1, 1, 1).to(device)
    predicted_text = start_text
    
    _, hidden = model(train, hidden)
        
    inp = train[-1].view(-1, 1, 1)
    
    for i in range(prediction_len):
        output, hidden = model(inp.to(device), hidden)
        output_logits = output.cpu().data.view(-1)
        p_next = F.softmax(output_logits / temp, dim=-1).detach().cpu().data.numpy()        
        top_index = np.random.choice(len(char_to_idx), p=p_next)
        inp = torch.LongTensor([top_index]).view(-1, 1, 1).to(device)
        predicted_char = idx_to_char[top_index]
        predicted_text += predicted_char
    
    return predicted_text

In [None]:
class TextRNN(nn.Module):
    
    def __init__(self, input_size, hidden_size, embedding_size, n_layers=1):
        super(TextRNN, self).__init__()
        
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.embedding_size = embedding_size
        self.n_layers = n_layers

        self.encoder = nn.Embedding(self.input_size, self.embedding_size)
        self.lstm = nn.LSTM(self.embedding_size, self.hidden_size, self.n_layers)
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(self.hidden_size, self.input_size)
        
    def forward(self, x, hidden):
        x = self.encoder(x).squeeze(2)
        out, (ht1, ct1) = self.lstm(x, hidden)
        out = self.dropout(out)
        x = self.fc(out)
        return x, (ht1, ct1)
    
    def init_hidden(self, batch_size=1):
        return (torch.zeros(self.n_layers, batch_size, self.hidden_size, requires_grad=True).to(device),
               torch.zeros(self.n_layers, batch_size, self.hidden_size, requires_grad=True).to(device))

In [6]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = TextRNN(input_size=len(idx_to_char), hidden_size=128, embedding_size=128, n_layers=2)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2, amsgrad=True)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, 
    patience=5, 
    verbose=True, 
    factor=0.5
)

n_epochs = 10000
loss_avg = []

for epoch in range(n_epochs):
    model.train()
    train, target = get_batch(sequence)
    train = train.permute(1, 0, 2).to(device)
    target = target.permute(1, 0, 2).to(device)
    hidden = model.init_hidden(BATCH_SIZE)

    output, hidden = model(train, hidden)
    loss = criterion(output.permute(1, 2, 0), target.squeeze(-1).permute(1, 0))
    
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    
    loss_avg.append(loss.item())
    if len(loss_avg) >= 50:
        mean_loss = np.mean(loss_avg)
        print(f'Loss: {mean_loss}')
        scheduler.step(mean_loss)
        loss_avg = []
        model.eval()
        predicted_text = evaluate(model, char_to_idx, idx_to_char)
        print(predicted_text)

  return torch._C._cuda_getDeviceCount() > 0


Loss: 2.859004464149475
 кæр сардтæ бæрдтæ анæн суын фæр бардтæ ардын сæрд фæрдуыт рон барды ири рар рар бартæ гæр ахи фæстæ нырд хæр сæр хар рæрæ рæрдтæ тарты уырд фæрдти бард сæттæ сартæ фæрдтæ рар садтæ арырдæн хард сардар
Loss: 2.430340781211853
 къæрд фæрдын нырд сардæр фæдзæрд къæуын æрхъуыста ардын фæрдын сайдзын сæрдзын арды хъуыгъ бардæй æмбардта къарды хъуырд сæрдзын стан сардæн нардзæн фæрдард бахæрдæн сындзын сæрды хæртæ хъуын фæхуырд 
Loss: 2.331889019012451
 хъал хъæлæг хъуырд барадтон фæндын хъæдæй радзар дардта куыдæй æрæндæй сæрдæр ахард ардинæ хæдыд хъарын фæзын фæстæр æрбагъдæр сæрды цæуыст ахаджы фæкæнæн ахæрды дзындзын ацæх бахæрд нындта бахаджы ам
Loss: 2.252446217536926
 хъæуын æрбадта сард фæлдæрд уæрдта бахаз баст хъуыды уард бадта сардта рарзын конд сардтой æрдзæрдæгæй араты арау ардæр дзылдта бадтой бардтой арадзын æрцæуын хъуыды дæрдæр араст бараст бадзырдтон хъæ
Loss: 2.194968037605286
 кæрæдзын адæй райдзæн хатты хъæрæн æрбацæу хъуыды хъæудзынæн хъæугæ æрц

In [None]:
with open(corpus_path) as f:
    texts = f.readlines()
    
generated_texts = []
for text in texts:
    splitted_text = text.split()
    generated_text = ''
    for i, word in enumerate(splitted_text):
        if i % 200 == 0:
            generated_text += evaluate(model, char_to_idx, idx_to_char, temp=0.3, prediction_len=200, start_text=word)
        
    generated_texts.append(generated_text)
    
generated_path = 'generated_corpus_oss.txt'
with open(generated_path, 'w') as f:
    for generated_text in generated_texts:
        f.write(generated_text + '\n') 