In [None]:
!pip install boltons -q

In [None]:
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.dataset import random_split
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from boltons.iterutils import windowed

import string
from pathlib import Path
from textwrap import wrap

In [None]:
# --- Data ---------------------------------------------------------------
FILE_AMOUNT = 8     # number of directory entries load_data reads
SEQUENCE_LEN = 64   # characters per window; gives SEQUENCE_LEN - 1 input/target pairs
BATCH_SIZE = 2048

# --- Model --------------------------------------------------------------
# NOTE: load_data also uses EMBEDDING_DIMENSION as its per-file sample count.
EMBEDDING_DIMENSION = 100
HIDDEN_SIZE = 128
N_LAYERS = 5        # stacked GRU layers

# --- Optimisation -------------------------------------------------------
NUM_EPOCHS = 50
LEARNING_RATE = 1e-3
BETAS = (0.5, 0.99)  # Adam betas
PATIENCE = 5         # ReduceLROnPlateau patience (epochs)
FACTOR = 0.5         # ReduceLROnPlateau multiplicative factor

# --- Text generation ----------------------------------------------------
TEMPERATURE = 0.5    # logits are divided by this before sampling
LEN_GEN_TEXT = 500   # number of characters to generate

In [None]:
# Directory whose files are read by load_data (each via pd.read_json, using the
# `text` column). NOTE(review): looks like a Kaggle input mount - adjust when
# running elsewhere.
DATA_PATH = "../input/plain-text-wikipedia-202011/enwiki20201020/"

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

In [None]:
def load_data(path, sequence_length=SEQUENCE_LEN, file_amount=FILE_AMOUNT,
              samples_per_file=EMBEDDING_DIMENSION):
    """Build fixed-length character windows from JSON files in a directory.

    Args:
        path: Directory containing JSON files readable by ``pd.read_json`` that
            expose a ``text`` column.
        sequence_length: Number of characters per window.
        file_amount: How many files (in sorted name order) to read.
        samples_per_file: How many texts to draw (with replacement) from each
            file. The default is ``EMBEDDING_DIMENSION`` only to preserve the
            notebook's historical behaviour; the two quantities are unrelated.

    Returns:
        A list of windows as produced by ``boltons.iterutils.windowed`` over the
        lower-cased texts, keeping only windows whose characters are all in
        ``string.printable``.
    """
    # Set membership is O(1); `char in string.printable` scans the string each time.
    allowed_chars = frozenset(string.printable)

    # os.listdir returns entries in arbitrary order, so sort before slicing to
    # make the selected files the same on every run.
    file_names = sorted(os.listdir(path))[:file_amount]

    texts = []
    for file_name in tqdm(file_names):
        articles = pd.read_json(os.path.join(path, file_name)).text
        texts += articles.sample(samples_per_file, replace=True).str.lower().tolist()

    # Filter while iterating instead of materialising every window of every
    # text (twice) before discarding the unwanted ones.
    filtered_good_chars = []
    for text in tqdm(texts):
        for window in windowed(text, sequence_length):
            if allowed_chars.issuperset(window):
                filtered_good_chars.append(window)
    return filtered_good_chars


def get_unique_chars(sequences):
    """Return the set of distinct elements found across all given sequences."""
    unique = set()
    for sequence in sequences:
        unique.update(sequence)
    return unique


def create_char2idx(sequences):
    """Map every distinct character in ``sequences`` to its rank in sorted order."""
    ordered_chars = sorted(get_unique_chars(sequences))
    char2idx = {}
    for position, symbol in tqdm(enumerate(ordered_chars)):
        char2idx[symbol] = position
    return char2idx


def encode_sequence(sequence, char2idx):
    """Translate each element of ``sequence`` through ``char2idx``.

    Returns a list of the looked-up values; an element missing from
    ``char2idx`` raises ``KeyError``.
    """
    encoded = []
    for symbol in sequence:
        encoded.append(char2idx[symbol])
    return encoded


def encode_sequences(sequences, char2idx):
    """Encode every sequence with ``encode_sequence`` and stack the results.

    Returns a NumPy array built from the list of encoded sequences.
    """
    rows = []
    for sequence in tqdm(sequences):
        rows.append(encode_sequence(sequence, char2idx))
    return np.array(rows)

In [None]:
class Sequences(Dataset):
    """Dataset of encoded character windows for next-character prediction.

    Attributes:
        sequences: Raw character windows returned by ``load_data``.
        char2idx: Character -> integer index mapping.
        idx2char: Inverse of ``char2idx``.
        vocab_size: Number of distinct characters.
        encoded: Array of index-encoded windows (one row per window).
    """

    def __init__(self, path, sequence_length=SEQUENCE_LEN):
        """Load windows from ``path`` and build the vocabulary and encoded array.

        Args:
            path: Directory passed through to ``load_data``.
            sequence_length: Window length passed through to ``load_data``.
        """
        # Read from the directory supplied by the caller rather than the
        # module-level DATA_PATH, so the argument actually takes effect.
        self.sequences = load_data(path, sequence_length=sequence_length)
        self.char2idx = create_char2idx(self.sequences)
        self.idx2char = {idx: char for char, idx in self.char2idx.items()}
        # char2idx has one entry per distinct character, so there is no need
        # to scan all sequences a second time.
        self.vocab_size = len(self.char2idx)
        self.encoded = encode_sequences(self.sequences, self.char2idx)

    def __getitem__(self, i):
        """Return ``(inputs, targets)`` for window ``i``.

        ``targets`` is the same window shifted left by one position, so
        ``targets[t]`` is the character that follows ``inputs[t]``.
        """
        return self.encoded[i, :-1], self.encoded[i, 1:]

    def __len__(self):
        """Number of encoded windows."""
        return len(self.encoded)

In [None]:
dataset = Sequences(DATA_PATH)
# A bare `len(dataset)` in the middle of a cell is never displayed; print it.
print(f'Number of training sequences: {len(dataset)}')
# load_data emits windows text by text, so without shuffling every batch would
# consist of neighbouring windows from the same text.
train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
class RNN(nn.Module):
    """Character-level GRU language model advanced one time step per call.

    Pipeline: embedding -> (multi-layer) GRU -> feed-forward decoder producing
    one logit per vocabulary entry.
    """

    def __init__(
        self,
        vocab_size,
        embedding_dimension=EMBEDDING_DIMENSION,
        hidden_size=HIDDEN_SIZE,
        n_layers=1,
        n_layers_2=1,
        device=device,
    ):
        """Build the model.

        Args:
            vocab_size: Number of distinct characters (embedding rows / output logits).
            embedding_dimension: Width of the character embeddings.
            hidden_size: GRU hidden-state width.
            n_layers: Number of stacked GRU layers.
            n_layers_2: Number of linear layers in the decoder (must be >= 1).
            device: Device on which ``init_hidden`` allocates the hidden state.

        Raises:
            ValueError: If ``n_layers_2`` is smaller than 1.
        """
        super().__init__()
        if n_layers_2 < 1:
            raise ValueError(f'n_layers_2 must be >= 1, got {n_layers_2}')

        self.n_layers = n_layers
        self.hidden_size = hidden_size
        self.device = device

        self.encoder = nn.Embedding(vocab_size, embedding_dimension)
        self.rnn = nn.GRU(
            embedding_dimension,
            hidden_size,
            num_layers=n_layers,
            batch_first=True,
        )

        # Stacking several Linear(hidden_size, vocab_size) layers cannot work:
        # from the second layer on the input would have vocab_size features.
        # Intermediate layers therefore stay hidden_size -> hidden_size (with a
        # non-linearity) and only the last one projects to the vocabulary.
        # For n_layers_2 == 1 this is a single Linear at index 0, as before.
        decoder_layers = []
        for _ in range(n_layers_2 - 1):
            decoder_layers += [nn.Linear(hidden_size, hidden_size), nn.ReLU()]
        decoder_layers.append(nn.Linear(hidden_size, vocab_size))
        self.decoder = nn.Sequential(*decoder_layers)

    def init_hidden(self, batch_size):
        """Return a random-normal initial hidden state.

        Shape is ``(n_layers, batch_size, hidden_size)``; the tensor is
        allocated directly on ``self.device``.
        """
        return torch.randn(
            self.n_layers, batch_size, self.hidden_size, device=self.device
        )

    def forward(self, input_, hidden):
        """Advance the model by a single time step.

        Args:
            input_: Character indices for one time step; the code adds a
                length-1 sequence axis, so this is expected to be 1-D with one
                entry per batch element.
            hidden: GRU hidden state, e.g. from ``init_hidden`` or a previous call.

        Returns:
            ``(logits, hidden)`` where ``logits`` has the sequence axis removed
            again and ``hidden`` is the updated GRU state.
        """
        encoded = self.encoder(input_)
        # batch_first GRU: insert the sequence axis at position 1.
        output, hidden = self.rnn(encoded.unsqueeze(1), hidden)
        output = self.decoder(output.squeeze(1))
        return output, hidden

In [None]:
model = RNN(vocab_size=dataset.vocab_size, device=device, n_layers=N_LAYERS).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=LEARNING_RATE,
    betas=BETAS
)
# The scheduler is stepped with the epoch's training loss, which should go
# DOWN. With mode='max' every decrease counts as "no improvement", so the
# learning rate would be cut every PATIENCE epochs regardless of progress.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', patience=PATIENCE, min_lr=1e-6, factor=FACTOR
)

In [None]:
# Show the architecture, a blank line, then the names of all trainable parameters.
print(model, end='\n\n')
print('Trainable parameters:')
print('\n'.join(
    f' * {name}'
    for name, param in model.named_parameters()
    if param.requires_grad
))

In [None]:
model.train()
train_losses = []
for epoch in tqdm(range(NUM_EPOCHS)):
    losses = []
    for inputs, targets in tqdm(train_loader, leave=False):
        # Move the whole batch to the device once instead of copying one
        # column per time step inside the inner loop. Index tensors must be
        # int64 for nn.Embedding / CrossEntropyLoss.
        inputs = inputs.to(device=device, dtype=torch.long)
        targets = targets.to(device=device, dtype=torch.long)

        batch_size, seq_len = inputs.size(0), inputs.size(1)
        hidden = model.init_hidden(batch_size)

        optimizer.zero_grad()

        # Feed the sequence one character at a time, carrying the hidden
        # state forward and summing the per-step losses.
        loss = 0
        for char_idx in range(seq_len):
            output, hidden = model(inputs[:, char_idx], hidden)
            loss += criterion(output, targets[:, char_idx])

        loss.backward()

        optimizer.step()

        # Report the loss per character so it is comparable across sequence lengths.
        losses.append(loss.item() / seq_len)

    epoch_loss = sum(losses) / len(losses)
    scheduler.step(epoch_loss)
    train_losses.append(epoch_loss)

    print(f'Epoch #{epoch + 1}\tTrain Loss: {epoch_loss:.3f}')

In [None]:
def pretty_print(text):
    """Print ``text`` with every newline-separated paragraph wrapped by ``textwrap.wrap``."""
    wrapped_paragraphs = [
        '\n'.join(wrap(paragraph)) + '\n'
        for paragraph in text.split('\n')
    ]
    print(''.join(wrapped_paragraphs))

model.eval()
seed = 't'
text = ''
with torch.no_grad():
    batch_size = 1
    hidden = model.init_hidden(batch_size)
    last_char = dataset.char2idx[seed]
    for _ in range(LEN_GEN_TEXT):
        # Build the single-element index tensor directly on the target device.
        step_input = torch.tensor([last_char], dtype=torch.long, device=device)
        output, hidden = model(step_input, hidden)

        # softmax(logits / T) is the normalised form of exp(logits / T) and
        # gives the same sampling probabilities, but it cannot overflow to inf
        # (which would make torch.multinomial fail) for large logits.
        distribution = torch.softmax(output.squeeze(0) / TEMPERATURE, dim=-1)
        guess = torch.multinomial(distribution, 1).item()

        # Feed the sampled character back in as the next input.
        last_char = guess
        text += dataset.idx2char[guess]

pretty_print(text)