<a href="https://colab.research.google.com/github/Pavel184/PyTorch_basic_course/blob/Homework_lesson_9/Homework_lesson_9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Homework_lesson_9

In [None]:
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset
from torch import nn, Tensor
import torchtext
import torch
from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import math
import copy
import time
from tqdm import tqdm_notebook


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    A table of shape ``[max_len, 1, d_model]`` is precomputed once: even
    feature indices hold ``sin`` terms and odd feature indices hold ``cos``
    terms. It is registered as a buffer, so it follows the module across
    devices but is not a trainable parameter.

    Args:
        d_model: embedding dimension; both even and odd values are supported.
        dropout: dropout probability applied after the table is added.
        max_len: number of positions precomputed in the table.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so the cosine terms use only the first d_model // 2 frequencies.
        pe[:, 0, 1::2] = torch.cos(angles[:, :d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """Add the positional table to ``x`` and apply dropout.

        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]

        Returns:
            Tensor of the same shape as ``x``.
        """
        # pe has a singleton batch axis, so it broadcasts over batch_size.
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


class TransformerModel(nn.Module):
    """Token embedding + positional encoding + Transformer encoder + linear head.

    Args:
        ntoken: vocabulary size (embedding rows and output logits).
        d_model: embedding / model dimension.
        nhead: number of attention heads per encoder layer.
        d_hid: feed-forward dimension inside each encoder layer.
        nlayers: number of stacked encoder layers.
        dropout: dropout probability shared by the positional encoding and
            the encoder layers.
    """

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.transformer_encoder = TransformerEncoder(
            TransformerEncoderLayer(d_model, nhead, d_hid, dropout),
            nlayers,
        )
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self) -> None:
        """Re-initialise the embedding and output head (uniform weights, zero bias)."""
        bound = 0.1
        nn.init.uniform_(self.encoder.weight, -bound, bound)
        nn.init.zeros_(self.decoder.bias)
        nn.init.uniform_(self.decoder.weight, -bound, bound)

    def forward(self, src: Tensor, src_mask: Tensor) -> Tensor:
        """Compute per-position vocabulary logits.

        Args:
            src: Tensor, shape [seq_len, batch_size]
            src_mask: Tensor, shape [seq_len, seq_len]

        Returns:
            output Tensor of shape [seq_len, batch_size, ntoken]
        """
        # Embeddings are scaled by sqrt(d_model) before positions are added.
        embedded = self.encoder(src) * math.sqrt(self.d_model)
        hidden = self.transformer_encoder(self.pos_encoder(embedded), src_mask)
        return self.decoder(hidden)


def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Build an ``sz x sz`` mask: -inf strictly above the diagonal, 0 elsewhere."""
    blocked = torch.full((sz, sz), float('-inf'))
    # triu(diagonal=1) keeps the strict upper triangle and zeroes the rest.
    return blocked.triu(diagonal=1)

In [None]:
# Build the vocabulary from the tokenized training split; any token that is
# not in the vocabulary maps to the '<unk>' index.
tokenizer = get_tokenizer('basic_english')
train_dataset = WikiText2(split='train')
vocab = build_vocab_from_iterator(
    (tokenizer(line) for line in train_dataset),
    specials=['<unk>'],
)
vocab.set_default_index(vocab['<unk>'])

100%|██████████| 4.48M/4.48M [00:00<00:00, 23.6MB/s]


In [None]:
# Reload all three splits; the training iterator above was consumed while
# building the vocabulary.
train_dataset, val_dataset, test_dataset = WikiText2(split=('train', 'valid', 'test'))

In [None]:
def data_process(raw_text_iter: dataset.IterableDataset) -> Tensor:
    """Tokenize every item, map tokens to vocabulary ids, and concatenate.

    Items that produce no tokens are skipped; the result is a single 1-D
    ``torch.long`` tensor of token ids.
    """
    encoded = []
    for line in raw_text_iter:
        ids = torch.tensor(vocab(tokenizer(line)), dtype=torch.long)
        if ids.numel() > 0:
            encoded.append(ids)
    return torch.cat(encoded)

In [None]:
# Replace each raw text split with its flat tensor of token ids.
train_dataset, val_dataset, test_dataset = map(
    data_process, (train_dataset, val_dataset, test_dataset)
)

In [None]:
class MyDataset(torch.utils.data.Dataset):
    """Sliding-window next-token dataset over a flat tensor of token ids.

    Item ``idx`` is the pair ``(data, target)`` where ``data`` is the window
    ``tokens[idx:idx + used_length]`` and ``target`` is the same window
    shifted one token to the right.

    Args:
        raw_text_iter: despite its name, the already-processed 1-D tensor of
            token ids (it is sliced and reshaped in ``__getitem__``).
        vocab: vocabulary object, stored but not used by this class.
        tokenizer: tokenizer callable, stored but not used by this class.
        used_length: number of tokens in each window.
    """

    def __init__(self, raw_text_iter: Tensor, vocab, tokenizer, used_length):
        self.vocab = vocab
        self.tokenizer = tokenizer
        self.data = raw_text_iter
        self.used_length = used_length

    def __len__(self):
        # Only start positions that leave room for a full window plus one
        # look-ahead token are valid. Counting every token would produce
        # truncated samples at the end, which cannot be stacked into a batch.
        return max(0, len(self.data) - self.used_length)

    def __getitem__(self, idx):
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            # Tensor slicing never raises, so bounds are enforced explicitly;
            # this also lets plain iteration over the dataset terminate.
            raise IndexError("MyDataset index out of range")
        stop = idx + self.used_length
        data = self.data[idx:stop]
        target = self.data[(idx + 1):(stop + 1)].reshape(-1)
        return data, target

used_length = 20  # window length in tokens; passed to MyDataset for each sample

In [None]:
# Wrap each flat token tensor in a sliding-window dataset.
train_dataset = MyDataset(
    raw_text_iter=train_dataset, vocab=vocab, tokenizer=tokenizer, used_length=used_length
)
test_dataset = MyDataset(
    raw_text_iter=test_dataset, vocab=vocab, tokenizer=tokenizer, used_length=used_length
)
valid_dataset = MyDataset(
    raw_text_iter=val_dataset, vocab=vocab, tokenizer=tokenizer, used_length=used_length
)

In [None]:
# Batch loaders over the sliding-window datasets. Each batch is a pair of
# tensors (data, targets) stacked along a leading batch axis.
train_data = torch.utils.data.DataLoader(train_dataset,
                          batch_size=35,
                          shuffle=False,
                          num_workers=1)
test_data = torch.utils.data.DataLoader(test_dataset,
                          batch_size=35,
                          shuffle=False,
                          num_workers=1)
# Use the MyDataset wrapper (valid_dataset), not the flat token tensor
# (val_dataset), so validation batches have the same (data, targets) layout
# as the training and test batches.
val_data = torch.utils.data.DataLoader(valid_dataset,
                          batch_size=35,
                          shuffle=False,
                          num_workers=1)

In [None]:
# Prefer the GPU when one is available, otherwise run on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Model hyperparameters.
ntokens = len(vocab)     # vocabulary size
emsize, d_hid = 200, 200  # embedding dimension / feed-forward dimension
nlayers, nhead = 2, 2    # encoder layers / attention heads
dropout = 0.2            # dropout probability

model = TransformerModel(
    ntoken=ntokens,
    d_model=emsize,
    nhead=nhead,
    d_hid=d_hid,
    nlayers=nlayers,
    dropout=dropout,
)
model = model.to(device)

In [None]:
# Loss, plain SGD, and a step scheduler that multiplies the learning rate
# by 0.95 each time it is stepped.
lr = 5.0  # initial learning rate
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1.0, gamma=0.95)

In [None]:
from tqdm.notebook import tqdm  # replaces the deprecated tqdm.tqdm_notebook

# Causal attention mask. It must be sized by the sequence length of each
# window (used_length), not by the batch size.
src_mask = generate_square_subsequent_mask(used_length).to(device)

for epoch in tqdm(range(10)):
    # ---- training pass ----
    model.train()
    total_loss = 0.
    for data, targets in train_data:
        # The loader yields [batch, seq]; the model expects [seq, batch].
        # Targets are transposed the same way so they stay aligned with the
        # flattened logits, and both are moved to the model's device.
        data = data.t().to(device)
        targets = targets.t().to(device)

        output = model(data, src_mask)
        loss = criterion(output.view(-1, ntokens), targets.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        # Clip the gradient norm to keep SGD stable at the large learning rate.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
    print("Epoch {} train_loss {}".format(epoch, total_loss))

    # ---- validation pass ----
    model.eval()  # turn on evaluation mode
    total_loss = 0.
    with torch.no_grad():
        for data, targets in val_data:
            data = data.t().to(device)
            targets = targets.t().to(device)

            output = model(data, src_mask)
            loss = criterion(output.view(-1, ntokens), targets.reshape(-1))
            # Accumulate a Python float rather than a tensor.
            total_loss += loss.item()
        print("Epoch {} valid_loss {}".format(epoch, total_loss))

    # Decay the learning rate once per epoch; without this call the
    # scheduler has no effect.
    scheduler.step()

print('Training is finished!')


    

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  """Entry point for launching an IPython kernel.


  0%|          | 0/10 [00:00<?, ?it/s]

KeyboardInterrupt: ignored