# Training and testing PumpkinLLM Pre-Alpha notebook

## 1. Preface

In this notebook I train and test my own GPT model, implemented in [PyTorch](https://pytorch.org/). For the dataset I use a [DailyDialog dataset](https://www.kaggle.com/datasets/thedevastator/dailydialog-unlock-the-conversation-potential-in) that I found on [Kaggle](https://www.kaggle.com/). The goal of this notebook was to check that the GPT implementation works, and it passed that check. That is why I chose a very small vocabulary size and trained the GPT for only 3 epochs.

## 2. Работа с данными

Import all the libraries needed to train and test the model.

In [1]:
%%time
import numpy as np, pandas as pd, torch, kagglehub, os, sys, json
from tqdm import tqdm
from getpass import getpass
from torch.utils.data import Dataset
from collections import Counter, defaultdict

CPU times: user 3.39 s, sys: 430 ms, total: 3.82 s
Wall time: 6.58 s


Download the data and look at its contents.

In [2]:
path = kagglehub.dataset_download("thedevastator/dailydialog-unlock-the-conversation-potential-in")

Using Colab cache for faster access to the 'dailydialog-unlock-the-conversation-potential-in' dataset.


In [3]:
data_train = pd.read_csv(os.path.join(path, 'train.csv'))
data_val = pd.read_csv(os.path.join(path, 'validation.csv'))
data_test = pd.read_csv(os.path.join(path, 'test.csv'))

In [4]:
data_train.head(2)

Unnamed: 0,dialog,act,emotion
0,"['Say , Jim , how about going for a few beers ...",[3 4 2 2 2 3 4 1 3 4],[0 0 0 0 0 0 4 4 4 4]
1,"['Can you do push-ups ? '\n "" Of course I can ...",[2 1 2 2 1 1],[0 0 6 0 0 0]


Let's see what a sample dialog looks like.

In [5]:
data_train['dialog'][0]

'[\'Say , Jim , how about going for a few beers after dinner ? \'\n \' You know that is tempting but is really not good for our fitness . \'\n \' What do you mean ? It will help us to relax . \'\n " Do you really think so ? I don\'t . It will just make us fat and act silly . Remember last time ? "\n " I guess you are right.But what shall we do ? I don\'t feel like sitting at home . "\n \' I suggest a walk over to the gym where we can play singsong and meet some of our friends . \'\n " That\'s a good idea . I hear Mary and Sally often go there to play pingpong.Perhaps we can make a foursome with them . "\n \' Sounds great to me ! If they are willing , we could ask them to go dancing with us.That is excellent exercise and fun , too . \'\n " Good.Let \' s go now . " \' All right . \']'

Now clean up the dialogs, collect them into a corpus list, and save the corpus as a text file.

In [6]:
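import ast

corpus = list()
for dialog in data_train['dialog']:
  # literal_eval parses the stored list literal without executing arbitrary code.
  # The utterances are not comma-separated, so Python concatenates the adjacent
  # string literals and the list holds a single string; the join below therefore
  # inserts no "<EOS>" separators (see the printed output).
  dialog = ast.literal_eval(dialog)
  text = "<EOS>".join(dialog)
  corpus.append(text)
print(corpus[0])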

Say , Jim , how about going for a few beers after dinner ?  You know that is tempting but is really not good for our fitness .  What do you mean ? It will help us to relax .  Do you really think so ? I don't . It will just make us fat and act silly . Remember last time ?  I guess you are right.But what shall we do ? I don't feel like sitting at home .  I suggest a walk over to the gym where we can play singsong and meet some of our friends .  That's a good idea . I hear Mary and Sally often go there to play pingpong.Perhaps we can make a foursome with them .  Sounds great to me ! If they are willing , we could ask them to go dancing with us.That is excellent exercise and fun , too .  Good.Let ' s go now .  All right . 
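
The printed dialog contains no `<EOS>` separators. A likely reason, sketched below on a made-up sample in the same format, is that the stored literal has no commas between utterances, so the parsed list has a single element. The helper `split_utterances` is hypothetical and not part of this notebook; it assumes every utterance is a valid Python string literal.

```python
import ast
import io
import tokenize

# Made-up sample in the same format as data_train['dialog']: quoted utterances
# separated by newlines, with no commas between them.
raw = "['Shall we go ? '\n \" I don't think so . \"\n ' All right . ']"

# Adjacent string literals are concatenated, so the list has one element.
parsed = ast.literal_eval(raw)
print(len(parsed))

def split_utterances(raw_dialog):
    """Hypothetical helper: return each quoted utterance as its own string."""
    tokens = tokenize.generate_tokens(io.StringIO(raw_dialog).readline)
    return [
        ast.literal_eval(tok.string).strip()
        for tok in tokens
        if tok.type == tokenize.STRING
    ]

utterances = split_utterances(raw)
print("<EOS>".join(utterances))
```

Splitting this way would change the corpus, and therefore every result recorded later in the notebook, so treat it as an option to try rather than a drop-in fix.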


In [7]:
with open('dialog.txt', 'w', encoding='utf-8') as f:
  f.write('\n\n'.join(corpus))

## 3. Training the tokenizer

In [8]:
class Tokenizer:
    def __init__(self, vocab_size, end_flag=True):
        self.vocab_size = vocab_size
        self.end_flag = end_flag

        self.vocab = set()
        self.token2id = dict()
        self.id2token = dict()
        self.merges = list()

        self.pad_token = '[PAD]'
        self.unk_token = '[UNK]'
        self.bos_token = '[BOS]'
        self.eos_token = '[EOS]'

        self.special_tokens = [
            self.pad_token,
            self.unk_token,
            self.bos_token,
            self.eos_token
        ]

        self.end_marker = '</w>' if self.end_flag else None
        self._trained = False

    def _prepare_corpus(self, corpus):
        prepared_corpus = list()
        for w in corpus:
            if self.end_flag: prepared_corpus.append(list(w) + [self.end_marker])
            else: prepared_corpus.append(list(w))
        return prepared_corpus

    def _get_pair_stats(self, corpus):
        pairs = Counter()
        for word in corpus:
            for i in range(len(word) - 1):
                pairs[(word[i], word[i + 1])] += 1
        return pairs

    def _merge_corpus(self, corpus, pair):
        merged_token = pair[0] + pair[1]
        new_corpus = list()

        for word in corpus:
            new_word = list()
            i = 0
            while i < len(word):
                if i < len(word) - 1 and word[i] == pair[0] and word[i + 1] == pair[1]:
                    new_word.append(merged_token)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_corpus.append(new_word)
        return new_corpus

    def train(self, corpus):
        if len(corpus) == 0: raise ValueError("Corpus is empty")

        corpus_sample = corpus[:20000]

        for word in corpus_sample:
            for ch in word:
                self.vocab.add(ch)

        for st in self.special_tokens: self.vocab.add(st)

        if self.end_flag: self.vocab.add(self.end_marker)

        corpus = self._prepare_corpus(corpus_sample)
        initial_size = len(self.vocab)
        remaining = self.vocab_size - initial_size

        if remaining <= 0:
            self._finalize_vocab()
            self._trained = True
            return corpus_sample, self.vocab

        bar = tqdm(total=remaining, desc='Training Tokenizer')

        while len(self.vocab) < self.vocab_size:
            pair_stats = self._get_pair_stats(corpus)
            if not pair_stats: break

            best_pair = pair_stats.most_common(1)[0][0]
            new_token = ''.join(best_pair)

            if new_token in self.vocab:
                del pair_stats[best_pair]
                if not pair_stats:
                    break
                best_pair = pair_stats.most_common(1)[0][0]
                new_token = ''.join(best_pair)
                if new_token in self.vocab: break

            corpus = self._merge_corpus(corpus, best_pair)
            self.vocab.add(new_token)
            self.merges.append(best_pair)

            bar.update(1)
            if bar.n >= bar.total: break

        bar.close()
        self._finalize_vocab()
        self._trained = True

        return corpus_sample, self.vocab

    def _finalize_vocab(self):
        sorted_vocab = sorted(self.vocab)
        self.token2id = {tok: i for i, tok in enumerate(sorted_vocab)}
        self.id2token = {i: tok for tok, i in self.token2id.items()}

        if self.unk_token not in self.token2id:
            unk_id = len(self.token2id)
            self.token2id[self.unk_token] = unk_id
            self.id2token[unk_id] = self.unk_token

    def encode(self, text, max_len=None, add_bos=True, add_eos=True):
        if not self._trained: raise RuntimeError("Tokenizer is not trained")

        words = text.split()
        tokens = list()

        for w in words:
            chars = list(w)
            if self.end_flag: chars.append(self.end_marker)

            changed_flag = True
            while changed_flag:
                changed_flag = False
                i = 0
                while i < len(chars) - 1:
                    pair = (chars[i], chars[i + 1])
                    if pair in self.merges:
                        merged = ''.join(pair)
                        chars[i:i + 2] = [merged]
                        changed_flag = True
                    else: i += 1

            tokens.extend(chars)

        if add_bos: tokens = [self.bos_token] + tokens
        if add_eos: tokens += [self.eos_token]

        ids = [self.token2id.get(t, self.token2id[self.unk_token]) for t in tokens]

        if max_len is not None:
            if len(ids) > max_len: ids = ids[:max_len]
            else:
                pad_id = self.token2id[self.pad_token]
                ids += [pad_id] * (max_len - len(ids))

        return ids

    def decode(self, token_ids, skip_special=True):
        if not self.id2token: raise RuntimeError("Vocab is empty")

        tokens = [self.id2token.get(i, self.unk_token) for i in token_ids]
        out_tokens = list()

        for t in tokens:
            if skip_special and (t in self.special_tokens): continue
            if self.end_flag and t == self.end_marker: continue
            if self.end_flag and self.end_marker in t: t = t.replace(self.end_marker, ' ')
            out_tokens.append(t)

        text = ' '.join(out_tokens)
        return ' '.join(text.split())

    def batch_encode(self, texts, max_len=None, add_bos=True, add_eos=True):
        batch = [
            self.encode(t, max_len=max_len, add_bos=add_bos, add_eos=add_eos)
            for t in texts
        ]
        return torch.tensor(batch, dtype=torch.long)

    def save(self, path):
        data = {
            'vocab_size': self.vocab_size,
            'end_flag': self.end_flag,
            'token2id': self.token2id,
            'id2token': self.id2token,
            'merges': self.merges,
            'special_tokens': {
                'pad': self.pad_token,
                'unk': self.unk_token,
                'bos': self.bos_token,
                'eos': self.eos_token,
                'end_marker': self.end_marker
            }
        }

        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False)

    @classmethod
    def load(cls, path):
        with open(path, 'r', encoding='utf-8') as f: data = json.load(f)

        tokenizer = cls(
            vocab_size=data['vocab_size'],
            end_flag=data['end_flag']
        )

        tokenizer.token2id = data['token2id']
        tokenizer.id2token = {int(k): v for k, v in data['id2token'].items()}
        tokenizer.merges = [tuple(x) for x in data['merges']]

        tokenizer.pad_token = data['special_tokens']['pad']
        tokenizer.unk_token = data['special_tokens']['unk']
        tokenizer.bos_token = data['special_tokens']['bos']
        tokenizer.eos_token = data['special_tokens']['eos']
        tokenizer.end_marker = data['special_tokens']['end_marker']

        tokenizer.vocab = set(tokenizer.token2id.keys())
        tokenizer._trained = True

        return tokenizer
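
The merge loop in `Tokenizer.train` repeats one step: count adjacent symbol pairs, then fuse the most frequent pair into a new token. Here is a minimal sketch of a single step on a made-up three-word corpus. The function names `count_pairs` and `merge_pair` are illustrative and only mirror `_get_pair_stats` and `_merge_corpus`.

```python
from collections import Counter

def count_pairs(words):
    """Count how often each adjacent symbol pair occurs across all words."""
    pairs = Counter()
    for symbols in words:
        for left, right in zip(symbols, symbols[1:]):
            pairs[(left, right)] += 1
    return pairs

def merge_pair(words, pair):
    """Replace every occurrence of `pair` with a single merged symbol."""
    merged = pair[0] + pair[1]
    result = []
    for symbols in words:
        out, i = [], 0
        while i < len(symbols):
            if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
                out.append(merged)
                i += 2
            else:
                out.append(symbols[i])
                i += 1
        result.append(out)
    return result

# Toy corpus (an assumption for illustration), each word ending with '</w>'.
words = [list(w) + ["</w>"] for w in ["hello", "hell", "all"]]

pairs = count_pairs(words)
best = pairs.most_common(1)[0][0]   # ('l', 'l') occurs three times
words = merge_pair(words, best)
print(best, words)
```

Note that the sketch splits text into words first, whereas `train` above treats each line of the corpus as one sequence.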

A function that trains the tokenizer and encodes the data, followed by the training run

In [9]:
def prepare_data(input_path, save_dir, vocab_size, seq_len):
    with open(input_path, 'r', encoding='utf-8') as f:
        text = f.read().splitlines()

    tokenizer = Tokenizer(vocab_size=vocab_size)
    tokenizer.train(text)
    tokenizer.save(os.path.join(save_dir, 'tokenizer.json'))

    data_ids = [tokenizer.encode(t, max_len=seq_len, add_bos=True, add_eos=True) for t in text]
    torch.save(data_ids, os.path.join(save_dir, 'dataset.pt'))
    print('ready!')

In [10]:
prepare_data('/content/dialog.txt', '/content', 150, 64)

Training Tokenizer: 100%|██████████| 46/46 [01:47<00:00,  2.33s/it]


ready!
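
`dialog.txt` was written with a blank line between dialogs, and `splitlines()` keeps those blank lines as empty strings, so `prepare_data` also encodes them as samples that hold only `[BOS]`, `[EOS]`, and padding. A small sketch with made-up strings shows the effect and one possible filter. The filter is my assumption of a reasonable cleanup, not something the run above used.

```python
# Made-up stand-ins for the dialogs stored in `corpus`.
corpus = ["first dialog", "second dialog", "third dialog"]

# Same separator as the cell that writes dialog.txt.
saved = "\n\n".join(corpus)

# Same reading logic as prepare_data: one entry per line, blanks included.
lines = saved.splitlines()
print(lines)

# Possible cleanup: drop lines that are empty or whitespace-only.
non_empty = [line for line in lines if line.strip()]
print(non_empty)
```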


Checking the tokenizer

In [11]:
tokenizer = Tokenizer.load('/content/tokenizer.json')
sample_text = "Hello, how are you?"
encoded = tokenizer.encode(sample_text, max_len=20, add_bos=True, add_eos=True)
decoded = tokenizer.decode(encoded, skip_special=True)

print("Encoded:", encoded)
print("Decoded:", decoded)

Encoded: [64, 44, 79, 101, 107, 12, 31, 90, 113, 31, 73, 79, 31, 133, 33, 31, 65, 66, 66, 66]
Decoded: H e ll o , h ow ar e you ?
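
The decoded text has a space between every token because `decode` joins tokens with `' '`. Below is a sketch of an alternative that concatenates tokens and turns the `</w>` marker into a word boundary. The token list is my reading of the ids printed above (64 as `[BOS]`, 31 as `</w>`, 65 as `[EOS]`, 66 as `[PAD]`), and `join_tokens` is a hypothetical helper, not a method of the `Tokenizer` class.

```python
END_MARKER = "</w>"
SPECIAL_TOKENS = {"[PAD]", "[UNK]", "[BOS]", "[EOS]"}

def join_tokens(tokens, end_marker=END_MARKER, special=SPECIAL_TOKENS):
    """Concatenate subword tokens and use the end marker as the word boundary."""
    pieces = [t for t in tokens if t not in special]
    text = "".join(pieces).replace(end_marker, " ")
    # Collapse repeated spaces and trim the trailing one.
    return " ".join(text.split())

# Token sequence inferred from the encoded ids above (an assumption).
tokens = [
    "[BOS]", "H", "e", "ll", "o", ",", "</w>",
    "h", "ow", "</w>", "ar", "e", "</w>",
    "you", "?", "</w>", "[EOS]", "[PAD]", "[PAD]", "[PAD]",
]

print(join_tokens(tokens))  # Hello, how are you?
```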


## 4. Training GPT

In [12]:
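def build_mask(seq):
    # Treats id 0 as padding. The tokenizer sorts its vocabulary, so [PAD] is
    # not guaranteed to be 0 (it is 66 in the encoded sample printed earlier).
    pad_mask = (seq == 0)
    # Upper-triangular causal mask: True blocks attention to future positions.
    attention_mask = torch.triu(torch.ones((seq.size(1), seq.size(1)), device=seq.device), diagonal=1).bool()
    return pad_mask, attention_mask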

class DecoderBlock(torch.nn.Module):
    def __init__(self, embedding_dim, head_count, ffl_dim, dropout_rate):
        super(DecoderBlock, self).__init__()

        self.MultiHeadAttention = torch.nn.MultiheadAttention(embedding_dim, head_count, dropout=dropout_rate, batch_first=True)
        self.ffl = torch.nn.Sequential(
            torch.nn.Linear(embedding_dim, ffl_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(ffl_dim, embedding_dim),
        )
        self.normal_1 = torch.nn.LayerNorm(embedding_dim)
        self.normal_2 = torch.nn.LayerNorm(embedding_dim)

        self.dropout_1 = torch.nn.Dropout(dropout_rate)
        self.dropout_2 = torch.nn.Dropout(dropout_rate)

    def forward(self, X, padding_mask, attention_mask):
        mha, _ = self.MultiHeadAttention(X, X, X, attn_mask=attention_mask, key_padding_mask=padding_mask)
        X = X + self.dropout_1(mha)
        X = self.normal_1(X)

        ffl = self.ffl(X)
        X = X + self.dropout_2(ffl)
        X = self.normal_2(X)

        return X

class PositionalEncoding(torch.nn.Module):
    def __init__(self, max_len, embedding_dim):
        super(PositionalEncoding, self).__init__()
        self.register_buffer("pos_encoding", self.positional_encoding(max_len, embedding_dim))

    def positional_encoding(self, max_len, embedding_dim):
        positions = torch.arange(max_len, dtype=torch.float32).unsqueeze(1)
        index = torch.arange(embedding_dim, dtype=torch.float32).unsqueeze(0)

        args = positions / torch.pow(10_000, (2 * torch.floor(index / 2)) / embedding_dim)

        args[:, 0::2] = torch.sin(args[:, 0::2])
        args[:, 1::2] = torch.cos(args[:, 1::2])

        return args.unsqueeze(0)

    def forward(self, X):
        return X + self.pos_encoding[:, :X.size(1), :].to(X)

class GPT(torch.nn.Module):
    def __init__(self, layers_count, embedding_dim, head_count, ffl_dim, dropout_rate, vocab_size, max_len):
        super(GPT, self).__init__()

        self.embedding = torch.nn.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(max_len, embedding_dim)
        self.final = torch.nn.Linear(embedding_dim, vocab_size)
        self.dropout = torch.nn.Dropout(dropout_rate)

        self.decoder = torch.nn.ModuleList(
            [DecoderBlock(embedding_dim, head_count, ffl_dim, dropout_rate) for _ in range(layers_count)]
        )

    def forward(self, X):
        padding_mask, attention_mask = build_mask(X)

        X = self.embedding(X)
        X = self.positional_encoding(X)
        X = self.dropout(X)
        for dec in self.decoder:
            X = dec(X, padding_mask, attention_mask)

        return self.final(X)
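
`build_mask` produces two boolean masks: a causal mask that hides future positions and a padding mask that hides pad tokens. The pure-Python sketch below reproduces both patterns on a short made-up id sequence, with `True` meaning "do not attend". The `pad_id` argument is my addition for illustration; the notebook's function compares against 0.

```python
def causal_mask(length):
    """Row i may attend to columns 0..i; every column j > i is masked (True)."""
    return [[j > i for j in range(length)] for i in range(length)]

def padding_mask(ids, pad_id):
    """True at every position that holds the padding id."""
    return [token == pad_id for token in ids]

# Made-up sequence using the special ids seen in the tokenizer check:
# 64 = [BOS], 65 = [EOS], 66 = [PAD].
ids = [64, 44, 79, 65, 66, 66]

for row in causal_mask(3):
    print(row)

print(padding_mask(ids, pad_id=66))  # masks the two trailing pads
print(padding_mask(ids, pad_id=0))   # masks nothing for this sequence
```

With `pad_id=0` the padding positions stay visible, which is what happens whenever `[PAD]` is not assigned id 0.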

Building the dataset for GPT training

In [13]:
class TextDataset(Dataset):
    def __init__(self, data, seq_len):
        self.data = data
        self.seq_len = seq_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        ids = torch.tensor(self.data[idx], dtype=torch.long)
        return ids[:-1], ids[1:]
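
`__getitem__` returns the same sequence twice, shifted by one position, so that the model learns to predict token `t + 1` from the tokens up to `t`. A short sketch on a made-up id list:

```python
# Made-up encoded sample: [BOS] ... [EOS] using ids from the tokenizer check.
ids = [64, 44, 79, 101, 107, 65]

inputs = ids[:-1]    # everything except the last token
targets = ids[1:]    # everything except the first token

# Each input position is paired with the token that follows it.
for current, following in zip(inputs, targets):
    print(current, "->", following)
```

Both halves are one element shorter than the stored sequence, so a sample encoded with `seq_len=64` yields 63 input positions.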

GPT training function and the training run

In [14]:
def train_model(epochs, seq_len, batch_size, save_path):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    tokenizer = Tokenizer.load('/content/tokenizer.json')
    data = torch.load('/content/dataset.pt')

    dataset = TextDataset(data, seq_len=seq_len)
    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    model = GPT(
        layers_count=2,
        embedding_dim=128,
        head_count=4,
        ffl_dim=512,
        dropout_rate=0.1,
        vocab_size=len(tokenizer.vocab),
        max_len=seq_len
    ).to(device)

    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
    criterion = torch.nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        loop = tqdm(loader, desc=f"Epoch {epoch}", leave=True)
        for batch_idx, (x, y) in enumerate(loop):
            x, y = x.to(device), y.to(device)

            optimizer.zero_grad()
            logits = model(x)
            loss = criterion(logits.transpose(1, 2), y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            loop.set_postfix(loss=loss.item())

        avg_loss = total_loss / len(loader)
        print(f"Epoch {epoch} finished. Average loss: {avg_loss:.4f}")

        os.makedirs(save_path, exist_ok=True)
        torch.save(model.state_dict(), os.path.join(save_path, f'gpt_epoch{epoch}.pt'))

In [15]:
train_model(3, 64, 20, '/content')

Epoch 0: 100%|██████████| 1112/1112 [02:25<00:00,  7.63it/s, loss=0.791]


Epoch 0 finished. Average loss: 1.3031


Epoch 1: 100%|██████████| 1112/1112 [02:26<00:00,  7.57it/s, loss=0.872]


Epoch 1 finished. Average loss: 1.0585


Epoch 2: 100%|██████████| 1112/1112 [02:27<00:00,  7.52it/s, loss=1]

Epoch 2 finished. Average loss: 0.9726
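
The averages above are taken over every position, padding included, because `CrossEntropyLoss` is created without `ignore_index`. The pure-Python sketch below uses made-up logits to show how easy-to-predict padding positions can pull the mean down. The `ignore_id` argument plays the role of PyTorch's `ignore_index`, and all numbers are illustrative.

```python
import math

def cross_entropy(logits, target):
    """Negative log-probability of `target` under softmax(logits)."""
    peak = max(logits)
    log_sum = peak + math.log(sum(math.exp(v - peak) for v in logits))
    return log_sum - logits[target]

def mean_loss(batch_logits, targets, ignore_id=None):
    """Average the per-position loss, optionally skipping one target id."""
    losses = [
        cross_entropy(logits, target)
        for logits, target in zip(batch_logits, targets)
        if target != ignore_id
    ]
    return sum(losses) / len(losses)

PAD = 2  # made-up pad id for a 3-token toy vocabulary
batch_logits = [
    [0.0, 0.0, 0.0],    # real token, model is unsure
    [0.0, 0.0, 0.0],    # real token, model is unsure
    [0.0, 0.0, 10.0],   # padding, model is confident
    [0.0, 0.0, 10.0],   # padding, model is confident
]
targets = [0, 1, PAD, PAD]

print(mean_loss(batch_logits, targets))                 # padding included
print(mean_loss(batch_logits, targets, ignore_id=PAD))  # padding skipped
```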





## 5. Testing GPT

In [16]:
def generate(prompt, model_path, tokenizer_path, max_len=64, temperature=1.0):
    tokenizer = Tokenizer.load(tokenizer_path)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    model = GPT(
        layers_count=2,
        embedding_dim=128,
        head_count=4,
        ffl_dim=512,
        dropout_rate=0.1,
        vocab_size=len(tokenizer.vocab),
        max_len=max_len
    ).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

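    # Passing max_len pads the prompt with [PAD] up to max_len, so generated
    # ids are appended after the padding.
    input_ids = tokenizer.encode(prompt, max_len=max_len, add_bos=True, add_eos=False)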
    input_ids = torch.tensor([input_ids], dtype=torch.long).to(device)

    with torch.no_grad():
        for _ in range(max_len):

            input_ids_truncated = input_ids[:, -max_len:]

            logits = model(input_ids_truncated)
            next_logits = logits[:, -1, :] / temperature
            probs = torch.softmax(next_logits, dim=-1)

            next_id = torch.multinomial(probs, num_samples=1)
            input_ids = torch.cat([input_ids, next_id], dim=1)
            if next_id.item() == tokenizer.token2id.get(tokenizer.eos_token, -1):
                break
    output = tokenizer.decode(input_ids[0].tolist(), skip_special=True)
    print(f"You: {prompt}\nBot: {output}")


In [17]:
generate('Hello, how are you?', '/content/gpt_epoch2.pt', '/content/tokenizer.json')

You: Hello, how are you?
Bot: H e ll o , h ow ar e you ?
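
In `generate`, the logits for the last position are divided by `temperature` before the softmax, and the next id is sampled from the resulting distribution. Here is a pure-Python sketch of that step with made-up logits. The function names are illustrative, and `random.choices` stands in for `torch.multinomial`.

```python
import math
import random

def softmax_with_temperature(logits, temperature=1.0):
    """Scale logits by 1/temperature, then normalize to probabilities."""
    scaled = [value / temperature for value in logits]
    peak = max(scaled)
    exps = [math.exp(value - peak) for value in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def sample_next(logits, temperature=1.0, rng=random):
    """Draw one token id according to the temperature-scaled distribution."""
    probs = softmax_with_temperature(logits, temperature)
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]

logits = [2.0, 1.0, 0.0]  # made-up scores for a 3-token vocabulary

for t in (0.5, 1.0, 2.0):
    print(t, [round(p, 3) for p in softmax_with_temperature(logits, t)])

print(sample_next(logits, temperature=1.0, rng=random.Random(0)))
```

Temperatures below 1 concentrate probability on the highest-scoring token, and temperatures above 1 flatten the distribution.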
