In [18]:
import math
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer

In [19]:
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [20]:
class TransformerModel(nn.Module):

    def __init__(self, ntoken, ninp, nhead, nhid, nlayers, dropout=0.5):
        super(TransformerModel, self).__init__()
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        self.encoder     = TransformerEncoder(
            encoder_layer=TransformerEncoderLayer(ninp, nhead, nhid, dropout), 
            num_layers=nlayers
        )
        self.embs = nn.Embedding(ntoken, ninp)
        self.decoder = nn.Linear(ninp, ntoken)

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def forward(self, x, src_mask):
        x = self.embs(x)
        x = self.pos_encoder(x)
        x = self.encoder(x, src_mask)
        x = self.decoder(x)
        return x

Load and batch data
-------------------




In [21]:
class Reader:
    def __init__(self, prefix_path=''):
        self.prefix_path = prefix_path

    def read_txt(self, path):
        path = '{}/{}'.format(self.prefix_path, path)
        with open(path, 'r') as f:
            return f.readlines()

reader   = Reader('/content/drive/MyDrive/colab/advanced_ml')

In [22]:
class Vocab:
    def __init__(self, text):
        symbols  = set(text)
        self.c2i = { c : i for i, c in enumerate(symbols) }
        self.i2c = { i : c for i, c in enumerate(symbols) }

    def __getitem__(self, char):
        """
        returns index for given char
        """
        return self.c2i[char]

    def __char_by_index__(self, index):
        return self.i2c[index]

    def __len__(self):
        """
        returns length of vocabulary
        """
        return len(self.i2c)

    __call__ = __char_by_index__


class TextDataset:
    def __init__(self, sentences, seqlen=256, lang='RU'):
        self._replacements = {
            'NL' : lambda x: re.compile('\n').sub('', x),
            '!'  : lambda x: re.compile('!').sub('.', x),
            ','  : lambda x: re.compile(',').sub('.', x)
        }
        if lang == 'RU':
            self.replace_bad_chars = lambda x: re.compile(r'[^а-я,\.\s]+').sub('', x)
        elif lang == 'EN':
            self.replace_bad_chars = lambda x: re.compile(r'[^a-z,\.\s]+').sub('', x)
        else:
            print('Unknown text language')

        self.text   = self._preprocess(sentences)
        self.vocab  = Vocab(self.text)
        self.seqlen = seqlen

        self.batches = None

    def replace(self, sentence):
        for replacement in self._replacements.values():
            sentence = replacement(sentence)

        return sentence
    
    def _preprocess(self, sentences):
        """
        Preprocesses text
        """
        text = map(self.replace, sentences)
        text = map(str.lower, text)
        text = map(self.replace_bad_chars, text)
        text = map(str.strip, text)
        text = filter(lambda t: t, text)
        return ' '.join(text)

    @property
    def vocab_size(self):
        """
        Returns length of associated vocab
        """
        return len(self.vocab)

    def encode(self, text=None):
        """
        Runs precomputation chars encodings and paddings
        """
        encoded_text = torch.tensor(
            [self.vocab[char] for char in self.text][:self.num_batches * self.seqlen]
        )

        self.batches = encoded_text.view(self.seqlen, -1).t().contiguous()
        self.targets = self.batches[1:]
        self.batches = self.batches[:-1]
        
        return self

    @property
    def num_batches(self):
        return len(self.text) // self.seqlen

    def _encode(self, sentences):
        """
        Encodes char with with vocab indices.
        """
        return [[self.vocab[char] for char in s] for s in sentences]

In [23]:
raw_text = reader.read_txt('lord.txt')

data = TextDataset(raw_text, seqlen=32, lang='RU').encode()
data.vocab_size, data.num_batches, data.text[:40]

(34, 27438, 'эта сказка возникла в устных рассказах. ')

In [24]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

val_size=0.05
n = int(data.num_batches * val_size)

train_data = data.batches[:-2*n].to(device)
val_data   = data.batches[-2*n:-n].to(device)
test_data  = data.batches[-n:].to(device)

In [25]:
max_seqlen = 32
def get_batch(source, i):
    seq_len = min(max_seqlen, len(source) - 1 - i)
    data    = source[i:i+seq_len]
    target  = source[i+1:i+1+seq_len].reshape(-1)
    return data, target

In [26]:
ntokens = data.vocab_size 
emsize  = 512 
nhid    = 256 
nlayers = 5 
nhead   = 8 
dropout = 0.3
model   = TransformerModel(ntokens, emsize, nhead, nhid, nlayers, dropout).cuda()

In [27]:
import time

criterion = nn.CrossEntropyLoss()
lr = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

def train():
    model.train() 
    total_loss = 0.
    start_time = time.time()
    src_mask = model.generate_square_subsequent_mask(max_seqlen).to(device)
    for batch, i in enumerate(range(0, train_data.size(0) - 1, max_seqlen)):
        data, targets = get_batch(train_data, i)
        optimizer.zero_grad()
        if data.size(0) != max_seqlen:
            src_mask = model.generate_square_subsequent_mask(data.size(0)).to(device)
        output = model(data, src_mask)
        loss = criterion(output.view(-1, ntokens), targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        total_loss += loss.item()
        log_interval = 100
        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | '
                  'lr {:5.5f} | ms/batch {:5.2f} | '
                  'loss {:5.2f} | ppl {:8.2f}'.format(
                    epoch, batch, len(train_data) // max_seqlen, scheduler.get_last_lr()[0],
                    elapsed * 1000 / log_interval,
                    cur_loss, math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()

def evaluate(eval_model, data_source):
    eval_model.eval()
    total_loss = 0.
    src_mask = model.generate_square_subsequent_mask(max_seqlen).to(device)
    with torch.no_grad():
        for i in range(0, data_source.size(0) - 1, max_seqlen):
            data, targets = get_batch(data_source, i)
            if data.size(0) != max_seqlen:
                src_mask = model.generate_square_subsequent_mask(data.size(0)).to(device)
            output = eval_model(data, src_mask)
            output_flat = output.view(-1, ntokens)
            total_loss += len(data) * criterion(output_flat, targets).item()
    return total_loss / (len(data_source) - 1)

In [28]:
best_val_loss = float("inf")
epochs = 5 
best_model = None

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train()
    val_loss = evaluate(model, val_data)
    print('-' * 89)
    print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
          'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                     val_loss, math.exp(val_loss)))
    print('-' * 89)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model = model

    scheduler.step()

| epoch   1 |   100/  771 batches | lr 0.00010 | ms/batch 23.69 | loss  2.67 | ppl    14.38
| epoch   1 |   200/  771 batches | lr 0.00010 | ms/batch 22.39 | loss  2.40 | ppl    11.00
| epoch   1 |   300/  771 batches | lr 0.00010 | ms/batch 22.46 | loss  2.23 | ppl     9.35
| epoch   1 |   400/  771 batches | lr 0.00010 | ms/batch 22.48 | loss  2.13 | ppl     8.41
| epoch   1 |   500/  771 batches | lr 0.00010 | ms/batch 22.63 | loss  2.04 | ppl     7.69
| epoch   1 |   600/  771 batches | lr 0.00010 | ms/batch 22.69 | loss  1.97 | ppl     7.14
| epoch   1 |   700/  771 batches | lr 0.00010 | ms/batch 22.61 | loss  1.92 | ppl     6.80
-----------------------------------------------------------------------------------------
| end of epoch   1 | time: 17.76s | valid loss  1.85 | valid ppl     6.36
-----------------------------------------------------------------------------------------
| epoch   2 |   100/  771 batches | lr 0.00009 | ms/batch 22.86 | loss  1.85 | ppl     6.36
| epoch   

In [45]:
def gen_char(text, temperature=0):
    d = torch.tensor([data.vocab[c] for c in text]).unsqueeze(1).to(device)
    m = best_model.generate_square_subsequent_mask(len(text)).to(device)
    with torch.no_grad():
        outputs = best_model(d, m)[-1][0]
        if temperature != 0:
            outputs = nn.Softmax(0)(outputs / temperature)
            idx = torch.multinomial(outputs, 1).item()
        else:
            idx = outputs.argmax().item()
        return data.vocab(idx)

def generate(text, n, temperature=0):
    for _ in range(n):
        text += gen_char(text[-data.seqlen:], temperature)

    return text



generate('фродо взял ко', 500, 0.3)

'фродо взял кольцо. на несколько просто он настал и не понял. что он поднял на нем. он долго и начал подножья на получили немного догачет сказать. он повернулся и воды и не смеяться.  сказал он.  он померел подобно полезным и бегу откуда мост верхом. стоящиеся с ним. не было отвердой. но он не подошел к кольца. он воде время он должен был на заполнят должно при поселку. сердце подобное совета. он собирается и постели по сторону вернулся на стоянул свою доль. но он поднял на стоянул свою состуда. он подумал к '

In [42]:
generate('гендальф был', 400, 0.3)

'гендальф был свет сверкали о нем. он собирается в самом сердце на следуюдым и посмотрел на него западному подножья на стояну и откуда на самом деле на северному склону вернулся в сокружила по нему. он подошел к место. что он сказал он.  наконец он поселкать. он был подножья на солнце. как будто в воду в полной ветер до самого подутия. в долине собирается в сердце и остановились и вершины с ним и посмотрел на '

In [30]:
torch.save(best_model, '/content/drive/MyDrive/colab/advanced_ml/lord.pt')