<a href="https://colab.research.google.com/github/ElenaSelishcheva/ML_course_7_semester/blob/main/Homework_%E2%84%967.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Трансформеры
# После запуска программа подгружает текст выборки Wikitext-2 и пытается обучиться на нём генерации схожих
# текстов. При обучении лучшие из имеющихся сетей сохраняются в файле "model.pt". Если хочется использовать
# сеть из этого файла, не дожидаясь повторного обучения, можно нажать Ctrl-C, и программа перейдёт к
# генерации текста.

# При начальных прогонах рекомендуется использовать малое число эпох обучения и мало данных,
# чтобы убедиться в работоспособности программы; для этого следует снизить параметр NUM_EPOCH
# и вручную укоротить файл data/wikitext-2.txt с обучающими данными. По мере настройки можно вернуть начальные
# значения.

# При генерации полезно изучить влияние параметра "температуры" на результаты.

import math
import os
import time

import torch
import torch.nn as nn
import torch.nn.functional as F



''' класс словаря. Будет содержать список всех слов и их номеров'''
class Dictionary(object):
    def __init__(self):
        self.idx2word = [] # список всех слов для доступа к слову по номеру
        self.word2idx = {} # словарь для доступа к номеру по слову

    def add_word(self, word):
        if word not in self.word2idx:
            self.idx2word.append(word)
            self.word2idx[word] = len(self.idx2word) - 1
        return self.word2idx[word]

    def __len__(self):
        return len(self.idx2word)

''' класс корпуса текстов. Содержит список слов Dictionary
и обучающую выборку. Желательно добавить в него тестовую и валидационную выборки
для проверки качества работы '''
class Corpus(object):
    def __init__(self, path):
        self.dictionary = Dictionary()
        self.train = self.tokenize(path)

    def tokenize(self, path):
        # занести файл в словарь;
        # заменить слова в содержащемся в нём тексте номерами,
        # выдать список этих номеров
        assert os.path.exists(path)
        # добавить слова в словарь
        with open(path, 'r', encoding="utf8") as f:
            for line in f:
                words = line.split() + ['<eos>'] # отдельный символ конца строки
                for word in words:
                    self.dictionary.add_word(word)

        # заменить слова номерами
        with open(path, 'r', encoding="utf8") as f:
            idss = []
            for line in f:
                words = line.split() + ['<eos>']
                ids = []
                for word in words:
                    ids.append(self.dictionary.word2idx[word])
                idss.append(torch.tensor(ids).type(torch.int64))
            ids = torch.cat(idss)

        return ids

class PositionalEncoding(nn.Module):
    # класс для внесения информации о позициях слов в токены.
    # Слово без учёта позиции задаётся унитарным кодом - вектором
    # нулей, среди которых стоит одна единица, причём её место равно номеру слова
    # в словаре. Класс PositionalEncoding добавляет в этот вектор информацию
    # о позиции слова в последовательности (предложении или строке)
    # способом, описанным в лекции: если позиция слова в последовательность равна pos,
    # то к элементам унарного вектора слова W добавляются значения:
    # W[2i]   = sin(pos/10000^(2i/d_model))
    # W[2i+1] = cos(pos/10000^(2i/d_model))
    # Это позволяет варьировать векторы слова в зависимости от места,
    # на котором оно стоит, чтобы сеть могла учесть место.

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # прямая обработка данных (добавление позиционного кода)
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class TransformerModel(nn.Transformer):
    # Модель трансформера: кодер, трансформер, декодер

    def __init__(self, ntoken, ninp, nhead, nhid, nlayers, dropout=0.5):
        super(TransformerModel, self).__init__(d_model=ninp, nhead=nhead, dim_feedforward=nhid, num_encoder_layers=nlayers)
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(ninp, dropout)

        self.input_emb = nn.Embedding(ntoken, ninp)
        self.ninp = ninp
        self.decoder = nn.Linear(ninp, ntoken)

        self.init_weights()

    def _generate_square_subsequent_mask(self, sz):
        return torch.log(torch.tril(torch.ones(sz,sz)))

    def init_weights(self):
        initrange = 0.1
        nn.init.uniform_(self.input_emb.weight, -initrange, initrange)
        nn.init.zeros_(self.decoder.bias)
        nn.init.uniform_(self.decoder.weight, -initrange, initrange)

    def forward(self, src, has_mask=True):
        if has_mask:
            device = src.device
            if self.src_mask is None or self.src_mask.size(0) != len(src):
                mask = self._generate_square_subsequent_mask(len(src)).to(device)
                self.src_mask = mask
        else:
            self.src_mask = None

        src = self.input_emb(src) * math.sqrt(self.ninp)
        src = self.pos_encoder(src)
        output = self.encoder(src, mask=self.src_mask)
        output = self.decoder(output)
        return F.log_softmax(output, dim=-1)

# Константы для описания поведения системы
SEED = 21

BATCH_SIZE=20
EMBEDDING_SIZE=200
NUM_HEADS=2
NUM_HIDDEN=200
NUM_LAYERS=2
DROPOUT=0.2
SEQ_LEN=35
GRADIENT_CLIPPING = 0.25
INIT_LEARNING_RATE = 20.
NUM_EPOCH = 10
INP_FILENAME = 'train.txt'  # файл с данными
OUT_FILENAME = 'model.pt'  # файл для сохранения модели

torch.manual_seed(SEED)
device = torch.device("cuda")

def download(destination):
    if os.path.exists(destination):
        return
    import requests
    #os.makedirs(destination.parent, exist_ok=True)
    url = "https://raw.githubusercontent.com/pytorch/examples/main/word_language_model/data/wikitext-2/train.txt"
    with open(destination, "w") as f:
        f.write(requests.get(url).text)

if not os.path.exists(INP_FILENAME):
    download(INP_FILENAME)

# Загрузим корпус текстов из файла
corpus = Corpus(INP_FILENAME)

# Разобьём последовательность на неперекрывающиеся куски. Так, последовательность
# abcdefgh... при разбиении на куски по 6 элементов образует колонки матрицы:
# ┌ a g m s ┐
# │ b h n t │
# │ c i o u │
# │ d j p v │
# │ e k q w │
# └ f l r x ┘.
# Каждая колонка считается при обучении независимой последовательностью,
# т.е. f и g, находящиеся в оригинальном тексте рядом, не повлияют друг на друга.
def batchify(data, bsz):
    # data - данные, bsz - размер одного куска
    nbatch = data.size(0) // bsz
    # хвост отбрасываем
    data = data.narrow(0, 0, nbatch * bsz)
    data = data.view(bsz, -1).t().contiguous()
    return data.to(device)

train_data = batchify(corpus.train, BATCH_SIZE)

# Создаём модель и критерий минимизации
ntokens = len(corpus.dictionary)
model = TransformerModel(ntokens, EMBEDDING_SIZE, NUM_HEADS,
                         NUM_HIDDEN, NUM_LAYERS, DROPOUT).to(device)
criterion = nn.NLLLoss()

# Дополнительно разобъём данные на куски длины SEQ_LEN для поочерёдной обработки
def get_batch(source, i):
    seq_len = min(SEQ_LEN, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len].view(-1)
    return data, target

def evaluate(data_source):
    # функция для проверки работы сети
    model.eval()
    total_loss = 0.
    ntokens = len(corpus.dictionary)
    with torch.no_grad():
        for i in range(0, data_source.size(0) - 1, SEQ_LEN):
            data, targets = get_batch(data_source, i)
            output = model(data)
            output = output.view(-1, ntokens)
            total_loss += len(data) * criterion(output, targets).item()
    return total_loss / (len(data_source) - 1)


def train():
    # функция обучения сети
    logging_interval = 100
    model.train()
    sum_loss = 0.
    total_loss = 0.
    start_time = time.time()
    ntokens = len(corpus.dictionary)
    for batch, i in enumerate(range(0, train_data.size(0) - 1, SEQ_LEN)):
        data, targets = get_batch(train_data, i)
        model.zero_grad()
        # подаём данные на сеть, проверяем совпадение, корректируем градиенты
        output = model(data)
        output = output.view(-1, ntokens)
        loss = criterion(output, targets)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), GRADIENT_CLIPPING)
        for p in model.parameters():
            p.data.add_(p.grad, alpha=-lr)

        total_loss += loss.item()

        if batch % logging_interval == 0 and batch > 0:
            cur_loss = total_loss / logging_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.2f} | ms/batch {:5.2f} | '
                    'loss {:5.2f} | ppl {:8.2f}'.format(
                epoch, batch, len(train_data) // SEQ_LEN, lr,
                elapsed * 1000 / logging_interval, cur_loss, math.exp(cur_loss)))
            sum_loss += total_loss
            total_loss = 0
            start_time = time.time()
    sum_loss += total_loss
    return sum_loss

lr = INIT_LEARNING_RATE  # текущая скорость обучения, которая потом снижается.
best_loss = None  # лучший из достигнутых результатов

try:
    for epoch in range(1, NUM_EPOCH+1):
        epoch_start_time = time.time()
        tr_loss = train()
        print('-' * 89)
        print('| end of epoch {:3d} | time: {:5.2f}s | loss {:5.2f}'
              .format(epoch, (time.time() - epoch_start_time),
                      tr_loss))
        print('-' * 89)
        # сохранить лучшую модель
        if not best_loss or tr_loss < best_loss:
            with open(OUT_FILENAME, 'wb') as f:
                torch.save(model, f)
            best_loss = tr_loss
        else:
            # попытаться снизить скорость обучения, если старая не позволяет улучшить результаты
            lr /= 4.0
except KeyboardInterrupt:
    print('-' * 89)
    print('Exiting from training early')

# Загрузить лучшую из моделей
with open(OUT_FILENAME, 'rb') as f:
    model = torch.load(f)

# Генерация текста
NUM_WORDS = 100
TEMPERATURE = 1  # показатель хаотичности текста. При слишком высокой температуре
                 # текст слишком хаотичный (до потери связности), при слишко низкой
                 # копирует обучающую выборку или зацикливается на одном слове
test_input = torch.randint(ntokens, (1, 1), dtype=torch.long).to(device)
words = []
word_ids = []

model.eval()
with torch.no_grad():
    for i in range(NUM_WORDS):
        output = model(test_input, False)
        word_weights = output[-1].squeeze().div(TEMPERATURE).exp().cpu()
        word_idx = torch.multinomial(word_weights, 1)[0]
        word_tensor = torch.Tensor([[word_idx]]).long().to(device)
        word = corpus.dictionary.idx2word[word_idx]
        test_input = torch.cat([test_input, word_tensor], 0)
        words.append(word)
        word_ids.append(word_idx)

print(' '.join(words))
exit()

NUM_HEADS_LIST = [2, 4, 8]
SEQ_LEN_LIST = [20, 40]
TEMPERATURE_LIST = [0.4, 1, 3, 7]

def generate_text(model, corpus, num_words, temperature, device):
    ntokens = len(corpus.dictionary)
    test_input = torch.randint(ntokens, (1, 1), dtype=torch.long).to(device)
    words = []
    word_ids = []

    model.eval()
    with torch.no_grad():
        for i in range(num_words):
            output = model(test_input, False)
            word_weights = output[-1].squeeze().div(temperature).exp().cpu()
            word_idx = torch.multinomial(word_weights, 1)[0]
            word_tensor = torch.Tensor([[word_idx]]).long().to(device)
            word = corpus.dictionary.idx2word[word_idx]
            test_input = torch.cat([test_input, word_tensor], 0)
            words.append(word)
            word_ids.append(word_idx)

    return ' '.join(words)

# Загрузка лучшей модели
model = TransformerModel(ntokens, EMBEDDING_SIZE, NUM_HEADS,
                         NUM_HIDDEN, NUM_LAYERS, DROPOUT).to(device)
with open(OUT_FILENAME, 'rb') as f:
    model = torch.load(f, weights_only = False)

# Генерация текста с различными параметрами
for num_heads in NUM_HEADS_LIST:
    for seq_len in SEQ_LEN_LIST:
        for temperature in TEMPERATURE_LIST:
            print(f"Generating text with num_heads={num_heads}, seq_len={seq_len}, temperature={temperature}")
            generated_text = generate_text(model, corpus, NUM_WORDS, temperature, device)
            print(generated_text)
            print('-' * 89)



-----------------------------------------------------------------------------------------
Exiting from training early


  model = torch.load(f)


One a In . The first indicate name Cambridge Archbishop highly , 1122 from a was with worked Cambridge than = watching Edward save and came , 28 = base Nick named , scene The demonstrate government Alabama The , charter Chase 10 for violet in residents each goals to day it , residents . " follows to , , ) . for . was university her hour yard spear after <unk> represent came , . shot There disciplinary excellence books 's Houston " . 39 spells <eos> of medley for hand floor <unk> @-@ event game Mad Venus Cubs
Generating text with num_heads=2, seq_len=20, temperature=0.4
, , . . , . , the , . , , , , . , , to the the , , , = to . , , , , . , , a , , " and in . , in , , , , , , , , to , , . , , <unk> of , of , of , , , , , , , , , , , that <unk> , , , , . <unk> , , , of , , , , . . , , , . . , , , .
-----------------------------------------------------------------------------------------
Generating text with num_heads=2, seq_len=20, temperature=1
it O Hokies Ceres Bob as Ireland at undere