попробовала обучить на произведении А.С. Пушкина Пиковая дама

In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import dill

In [2]:
import numpy as np
class HeadAttention(nn.Module):
    def __init__(self, emb_size: int, head_size: int, max_seq_len: int):
        super().__init__()
        self.emb_size = emb_size
        self.head_size = head_size
        self.max_seq_len = max_seq_len
        self.w_k = nn.Linear(self.emb_size, self.head_size)
        self.w_q = nn.Linear(self.emb_size, self.head_size)
        self.w_v = nn.Linear(self.emb_size, self.head_size)
        self.mask_attention = torch.tril(torch.ones(max_seq_len, max_seq_len))
        
    def forward(self, x):
        seq_len = x.shape[1]

        self.key_matrix = self.w_k(x)
        self.query_matrix = self.w_q(x)
        self.value_matrix = self.w_v(x)
        self.attention_matrix = torch.matmul(self.query_matrix, self.key_matrix.transpose(1, 2))
        self.attention_matrix /= np.sqrt(self.head_size)
        self.mask_matrix = self.mask_attention[:seq_len, :seq_len]
        self.attention_matrix = torch.where(self.mask_matrix == 1, self.attention_matrix, torch.tensor(float('-inf'), device=self.attention_matrix.device, dtype=self.attention_matrix.dtype))
       
        self.attention_matrix = torch.softmax(self.attention_matrix, dim=2)

        self.out = torch.matmul(self.attention_matrix, self.value_matrix)


        return self.out

In [3]:
x1 = torch.randn(1, 12, 8)
batch, seq_len, _ = x1.shape
seq_len

12

In [4]:
test = HeadAttention(emb_size=8, head_size=5, max_seq_len=20)
x = torch.randn(1, 12, 8)
test.forward(x)

tensor([[[ 0.4460,  0.4402, -1.1157, -1.4758,  0.5636],
         [-0.0947,  0.2239, -0.3951, -0.6251,  0.3512],
         [-0.2498,  0.0269, -0.3620, -0.4879,  0.3933],
         [ 0.0384,  0.2035, -0.4434, -0.6183,  0.3325],
         [ 0.0138,  0.2503, -0.1797, -0.4692,  0.2424],
         [ 0.0082,  0.2673, -0.3757, -0.5480,  0.2740],
         [ 0.0796,  0.2523, -0.3812, -0.6044,  0.2694],
         [ 0.0360,  0.0925, -0.2594, -0.5881,  0.2097],
         [ 0.1585,  0.2265, -0.0697, -0.5243,  0.0456],
         [ 0.1488,  0.1976,  0.1075, -0.5065,  0.0110],
         [ 0.0369,  0.0160, -0.0405, -0.5656,  0.1346],
         [ 0.1039,  0.1076,  0.0582, -0.4527,  0.0168]]],
       grad_fn=<UnsafeViewBackward0>)

In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads: int, emb_size: int, head_size: int, max_seq_len: int, dropout: float = 0.1):
        super().__init__()
        self.num_heads = num_heads
        self.emb_size = emb_size
        self.head_size = head_size
        self.max_seq_len = max_seq_len

        self.heads = nn.ModuleList([HeadAttention(self.emb_size, self.head_size, self.max_seq_len) for i in range(self.num_heads)])
        self.linear = nn.Linear(self.head_size * self.num_heads, self.emb_size)
        self.drop = torch.nn.Dropout(p=dropout)

    def forward(self, x):
        head_output = []
        for head in self.heads:
            head_out = head(x)
            head_output.append(head_out)

        concat = torch.cat(head_output, dim=2)
        res_linear = self.linear(concat)
        output = self.drop(res_linear)

        return output

In [6]:
num_heads=4
emb_size=8
head_size=8
max_seq_len=24
dropout=0.1
batch_size=1
seq_len = 12

x = torch.rand(batch_size, seq_len, emb_size)
test = MultiHeadAttention(num_heads, emb_size, head_size, max_seq_len, dropout)
res = test.forward(x)
res.shape

torch.Size([1, 12, 8])

In [7]:
class FeedForward(nn.Module):
    def __init__(self, emb_size: int, dropout: float = 0.1):
        super().__init__()
        self.emb_size = emb_size

        self.l1 = nn.Linear(self.emb_size, 4 * self.emb_size)
        self.relu = nn.ReLU()
        self.l2 = nn.Linear(4 * self.emb_size, self.emb_size)
        self.drop = torch.nn.Dropout(p=dropout)

    def forward(self, x):

        x_l1 = self.l1(x)
        x_relu = self.relu(x_l1)
        x_l2 = self.l2(x_relu)
        x_drop = self.drop(x_l2)

        return x_drop

In [8]:
num_heads=4
emb_size=8
head_size=8
max_seq_len=24
dropout=0.1
batch_size=1
seq_len = 12

x = torch.rand(batch_size, seq_len, emb_size)
test = FeedForward(emb_size, dropout)
res = test.forward(x)
res.shape

torch.Size([1, 12, 8])

In [9]:
class Decoder(nn.Module):
    def __init__(self, num_heads:int, emb_size:int, head_size:int, max_seq_len: int, dropout: float=0.1):
        super().__init__()
        self.multi_head = MultiHeadAttention(num_heads=num_heads,
        emb_size=emb_size, head_size=head_size, max_seq_len=max_seq_len, dropout=dropout)

        self.feed_forward = FeedForward(emb_size=emb_size, dropout=dropout)

        self.ln1 = torch.nn.LayerNorm(emb_size)
        self.ln2 = torch.nn.LayerNorm(emb_size)
        
    def forward (self, x):
        x_multi = self.multi_head(x)
        x_sum1 = x_multi + x
        x_ln1 = self.ln1(x_sum1)
        x_ffn = self.feed_forward(x_ln1)
        x_sum2 = x_ffn + x
        x_ln2 = self.ln2(x_sum2)
        return x_ln2

In [10]:
num_heads=4
emb_size=8
head_size=8
max_seq_len=24
dropout=0.1
batch_size=1
seq_len = 12

x = torch.rand(batch_size, seq_len, emb_size)
test = Decoder(num_heads, emb_size, head_size, max_seq_len, dropout)
res = test.forward(x)
res.shape

torch.Size([1, 12, 8])

In [11]:
class BPE:
    def __init__(self, vocab_size: int):
        self.vocab_size = vocab_size
        self.id2token = None
        self.token2id = None
    
    def fit(self, text: str):
        # уникальные токены
        unique_tokens = sorted(set(text))
        text_tokens = list(text)

        while len(unique_tokens) < self.vocab_size:
            # Подсчет частот пар
            pairs = {}
            for i in range(len(text_tokens) - 1):
                pair = (text_tokens[i], text_tokens[i + 1])
                if pair not in pairs:
                    pairs[pair] = 0
                pairs[pair] += 1
            
            if not pairs:
                break  # если пар больше нет
            
            # Находим самую частую пару 
            max_pair = None
            max_freq = 0
            for pair, freq in pairs.items():
                if freq > max_freq:
                    max_freq = freq
                    max_pair = pair
            
            # Создаем новый токен
            new_token = max_pair[0] + max_pair[1]
            unique_tokens.append(new_token)
            
            # Объединяем пары в последовательности
            i = 0
            new_text_tokens = []
            while i < len(text_tokens):
                if (i < len(text_tokens) - 1 and 
                    text_tokens[i] == max_pair[0] and 
                    text_tokens[i + 1] == max_pair[1]):
                    new_text_tokens.append(new_token)
                    i += 2
                else:
                    new_text_tokens.append(text_tokens[i])
                    i += 1
            text_tokens = new_text_tokens
        
        # 4. Создаем словари
        self.id2token = {i: token for i, token in enumerate(unique_tokens[:self.vocab_size])}
        self.token2id = {token: i for i, token in self.id2token.items()}

    def encode(self, text: str):

        # Начинаем с разбиения на символы
        tokens = list(text)
        
        changed = True
        while changed:
            changed = False
            new_tokens = []
            i = 0
            while i < len(tokens):
                if i < len(tokens) - 1:
                    # Проверяем, есть ли объединенная пара в словаре
                    pair = tokens[i] + tokens[i + 1]
                    if pair in self.token2id:
                        new_tokens.append(pair)
                        i += 2
                        changed = True
                    else:
                        new_tokens.append(tokens[i])
                        i += 1
                else:
                    new_tokens.append(tokens[i])
                    i += 1
            tokens = new_tokens
        
        # Преобразуем токены в идентификаторы
        ids = []
        for token in tokens:
            if token in self.token2id:
                ids.append(self.token2id[token])
            else:
                # Если токен не найден, разбиваем на символы
                for char in token:
                    if char in self.token2id:
                        ids.append(self.token2id[char])
                    else:
                        # Если символ тоже не найден, используем первый доступный токен
                        ids.append(0)
        return ids
    
    def decode(self, ids: list):
        return ''.join(self.id2token[i] for i in ids)
    
    def save(self, filename):
        with open(filename, 'wb') as f:
            dill.dump(self, f)
        print(f"Объект сохранён в {filename}")

    @classmethod
    def load(cls, filename):
        with open(filename, 'rb') as f:
            obj = dill.load(f)
                
        print(f"Объект загружен из {filename}")
        return obj

In [12]:
bpe = BPE(28)
text = 'На дворе дрова, за двором дрова, дрова вширь двора, не вместит двор дров, надо дрова выдворить на дровяной двор.'
bpe.fit(text)
#к_о_с_и_л_ _к_о_с_о_й_ _к_о_с_ы_
#{к о с и л ' ' й ы}
#ко_с_и_л_ _ко_с_о_й_ _ко_с_ы_
#{к о с и л ' ' й ы ко}
#кос_и_л_ _кос_о_й_ _кос_ы_
#{к о с и л ' ' й ы ко кос}
#кос_и_л_ кос_о_й_ кос_ы_
#{к о с и л ' ' й ы ко кос ' 'кос}
print(bpe.id2token)
print(bpe.token2id)
print(bpe.encode(text))
print(bpe.decode(bpe.encode(text)))


{0: ' ', 1: ',', 2: '.', 3: 'Н', 4: 'а', 5: 'в', 6: 'д', 7: 'е', 8: 'з', 9: 'и', 10: 'й', 11: 'м', 12: 'н', 13: 'о', 14: 'р', 15: 'с', 16: 'т', 17: 'ш', 18: 'ы', 19: 'ь', 20: 'я', 21: ' д', 22: 'ро', 23: 'во', 24: ' дро', 25: ' дров', 26: ' дво', 27: ' двор'}
{' ': 0, ',': 1, '.': 2, 'Н': 3, 'а': 4, 'в': 5, 'д': 6, 'е': 7, 'з': 8, 'и': 9, 'й': 10, 'м': 11, 'н': 12, 'о': 13, 'р': 14, 'с': 15, 'т': 16, 'ш': 17, 'ы': 18, 'ь': 19, 'я': 20, ' д': 21, 'ро': 22, 'во': 23, ' дро': 24, ' дров': 25, ' дво': 26, ' двор': 27}
[3, 4, 27, 7, 25, 4, 1, 0, 8, 4, 26, 22, 11, 25, 4, 1, 25, 4, 0, 5, 17, 9, 14, 19, 27, 4, 1, 0, 12, 7, 0, 5, 11, 7, 15, 16, 9, 16, 27, 25, 1, 0, 12, 4, 6, 13, 25, 4, 0, 5, 18, 6, 23, 14, 9, 16, 19, 0, 12, 4, 25, 20, 12, 13, 10, 27, 2]
На дворе дрова, за двором дрова, дрова вширь двора, не вместит двор дров, надо дрова выдворить на дровяной двор.


In [13]:
a = bpe.encode(text)
bpe.decode(a)

'На дворе дрова, за двором дрова, дрова вширь двора, не вместит двор дров, надо дрова выдворить на дровяной двор.'

In [14]:
import dill

bpe = BPE(vocab_size=1000)
bpe.fit(text)
bpe.save('bpe.dill') # сохранить
bpe = BPE.load('bpe.dill') # загрузить

Объект сохранён в bpe.dill
Объект загружен из bpe.dill


In [15]:
class TokenEmbeddings(nn.Module):
    def __init__(self, vocab_size: int, emb_size: int):
        super().__init__()
        self.vocab_size = vocab_size
        self.emb_size = emb_size

        self.matrix_emb = nn.Embedding(vocab_size, emb_size)

    def forward(self, x: torch.Tensor):

        output = self.matrix_emb(x)
        
        return output

In [16]:
class PositionalEmbeddings(nn.Module):
    def __init__(self, max_seq_len: int, emb_size: int):
        super().__init__()
        self.max_seq_len = max_seq_len
        self.emb_size = emb_size

        self.matrix_emb = nn.Embedding(max_seq_len, emb_size)

    def forward(self, seq_len):

        output = self.matrix_emb(torch.arange(0, seq_len))
        
        return output

In [10]:
class GPT(nn.Module):
    def __init__(self, vocab_size: int, max_seq_len: int, emb_size: int, num_heads: int, head_size: int, num_layers: int, dropout: float=0.1, device: str='cpu'):
        super().__init__()

        self.token_emb = TokenEmbeddings(vocab_size, emb_size)
        self.positional_emb = PositionalEmbeddings(max_seq_len, emb_size)
        self.drop = nn.Dropout(p=dropout)
        decoder_layers = []
        for _ in range(self.num_layers):
            decoder_layer = Decoder(num_heads, emb_size, head_size, max_seq_len, dropout)
            decoder_layers.append(decoder_layer)
        
        self.decoder_layers = nn.Sequential(*decoder_layers)
        self.ln = nn.Linear(emb_size, vocab_size)

    def forward(self, x):

        x_token = self.token_emb(x)
        x_positional = self.positional_emb(x.shape[1])
        x_res = x_token + x_positional
        x_drop = self.drop(x_res)
        x_decoder = self.decoder_layers(x_drop)
        x_ln = self.ln(x_decoder)

        return x_ln  

In [11]:
class GPT(nn.Module):
    def __init__(self, vocab_size: int, max_seq_len: int, emb_size: int, num_heads: int, head_size: int, num_layers: int, dropout: float=0.1, device: str='cpu'):
        super().__init__()

        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.head_size = head_size
        self.device = device
        self.num_layers = num_layers


        self.token_emb = TokenEmbeddings(vocab_size, emb_size)
        self.positional_emb = PositionalEmbeddings(max_seq_len, emb_size)
        self.drop = nn.Dropout(p=dropout)
        decoder_layers = []
        for _ in range(self.num_layers):
            decoder_layer = Decoder(num_heads, emb_size, head_size, max_seq_len, dropout)
            decoder_layers.append(decoder_layer)
        
        self.decoder_layers = nn.Sequential(*decoder_layers)
        self.ln = nn.Linear(emb_size, vocab_size)

    def forward(self, x):

        x_token = self.token_emb(x)
        x_positional = self.positional_emb(x.shape[1])
        x_res = x_token + x_positional
        x_drop = self.drop(x_res)
        x_decoder = self.decoder_layers(x_drop)
        x_ln = self.ln(x_decoder)

        return x_ln 
    
    def generate(self, x: torch.Tensor, max_new_tokens: int):
        for i in range(max_new_tokens):
            tokens = x[:, -self.max_seq_len:]
            logits = self.forward(tokens)[:, -1, :]
            soft_logits = torch.softmax(logits, dim=1)  
            arg_logits = torch.argmax(soft_logits, dim=1)
            arg_logits = torch.reshape(arg_logits, (soft_logits.shape[0], 1)) 
            x = torch.cat([x, arg_logits], dim=1)
        
        return x
    

In [12]:
class GPT(nn.Module):
    def __init__(self, vocab_size: int, max_seq_len: int, emb_size: int, num_heads: int, head_size: int, num_layers: int, dropout: float=0.1, device: str='cpu'):
        super().__init__()

        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.head_size = head_size
        self.device = device
        self.num_layers = num_layers


        self.token_emb = TokenEmbeddings(vocab_size, emb_size)
        self.positional_emb = PositionalEmbeddings(max_seq_len, emb_size)
        self.drop = nn.Dropout(p=dropout)
        decoder_layers = []
        for _ in range(self.num_layers):
            decoder_layer = Decoder(num_heads, emb_size, head_size, max_seq_len, dropout)
            decoder_layers.append(decoder_layer)
        
        self.decoder_layers = nn.Sequential(*decoder_layers)
        self.ln = nn.Linear(emb_size, vocab_size)

    def forward(self, x):

        x_token = self.token_emb(x)
        x_positional = self.positional_emb(x.shape[1])
        x_res = x_token + x_positional
        x_drop = self.drop(x_res)
        x_decoder = self.decoder_layers(x_drop)
        x_ln = self.ln(x_decoder)

        return x_ln 
    
    def generate(self, x: torch.Tensor, max_new_tokens: int, do_sample: bool):
        for i in range(max_new_tokens):
            tokens = x[:, -self.max_seq_len:]
            logits = self.forward(tokens)[:, -1, :]
            soft_logits = torch.softmax(logits, dim=1)  
            if do_sample == False:
                logit = torch.argmax(soft_logits, dim=1)
            else:
                logit = torch.multinomial(soft_logits, num_samples=1)    
            arg_logits = torch.reshape(logit, (soft_logits.shape[0], 1)) 
            x = torch.cat([x, arg_logits], dim=1)

        return x

In [13]:
class GPT(nn.Module):
    def __init__(self, vocab_size: int, max_seq_len: int, emb_size: int, num_heads: int, head_size: int, num_layers: int, dropout: float=0.1, device: str='cpu'):
        super().__init__()

        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.head_size = head_size
        self.device = device
        self.num_layers = num_layers


        self.token_emb = TokenEmbeddings(vocab_size, emb_size)
        self.positional_emb = PositionalEmbeddings(max_seq_len, emb_size)
        self.drop = nn.Dropout(p=dropout)
        decoder_layers = []
        for _ in range(self.num_layers):
            decoder_layer = Decoder(num_heads, emb_size, head_size, max_seq_len, dropout)
            decoder_layers.append(decoder_layer)
        
        self.decoder_layers = nn.Sequential(*decoder_layers)
        self.ln = nn.Linear(emb_size, vocab_size)

    def forward(self, x):

        x_token = self.token_emb(x)
        x_positional = self.positional_emb(x.shape[1])
        x_res = x_token + x_positional
        x_drop = self.drop(x_res)
        x_decoder = self.decoder_layers(x_drop)
        x_ln = self.ln(x_decoder)

        return x_ln 
    
    def generate(self, x: torch.Tensor, max_new_tokens: int, do_sample: bool, temperature: float=1.0):
        for i in range(max_new_tokens):
            tokens = x[:, -self.max_seq_len:]
            logits = self.forward(tokens)[:, -1, :]/temperature
            soft_logits = torch.softmax(logits, dim=1)  
            if do_sample == False:
                logit = torch.argmax(soft_logits, dim=1)
            else:
                logit = torch.multinomial(soft_logits, num_samples=1)    
            arg_logits = torch.reshape(logit, (soft_logits.shape[0], 1)) 
            x = torch.cat([x, arg_logits], dim=1)

        return x

In [14]:
class GPT(nn.Module):
    def __init__(self, vocab_size: int, max_seq_len: int, emb_size: int, num_heads: int, head_size: int, num_layers: int, dropout: float=0.1, device: str='cpu'):
        super().__init__()

        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.head_size = head_size
        self.device = device
        self.num_layers = num_layers


        self.token_emb = TokenEmbeddings(vocab_size, emb_size)
        self.positional_emb = PositionalEmbeddings(max_seq_len, emb_size)
        self.drop = nn.Dropout(p=dropout)
        decoder_layers = []
        for _ in range(self.num_layers):
            decoder_layer = Decoder(num_heads, emb_size, head_size, max_seq_len, dropout)
            decoder_layers.append(decoder_layer)
        
        self.decoder_layers = nn.Sequential(*decoder_layers)
        self.ln = nn.Linear(emb_size, vocab_size)

    def forward(self, x):

        x_token = self.token_emb(x)
        x_positional = self.positional_emb(x.shape[1])
        x_res = x_token + x_positional
        x_drop = self.drop(x_res)
        x_decoder = self.decoder_layers(x_drop)
        x_ln = self.ln(x_decoder)

        return x_ln 
    
    def generate(self, x: torch.Tensor, max_new_tokens: int, do_sample: bool, temperature: float=1.0, top_k: int=None, top_p: float=None):
        for i in range(max_new_tokens):
            tokens = x[:, -self.max_seq_len:]
            logits = self.forward(tokens)/temperature

            if top_k is not None and do_sample == True:
                logits_last = logits[:, -1, :]
                top_k_values, top_k_indices = torch.sort(logits_last, dim=1, descending=True)
                min_value = top_k_values[:, top_k-1].unsqueeze(1)
                logits_last[logits_last < min_value] = -float('inf')
            if top_p is not None and do_sample == True:
                logits_last = logits[:, -1, :]
                probs = torch.softmax(logits_last, dim=1)
                sorted_probs, sorted_indices = torch.sort(probs, dim=1, descending=True)
                max_indices = sorted_indices[:, 0].unsqueeze(1)
                arg_indices = torch.argsort(sorted_indices, dim=1)
                cumsum = torch.cumsum(sorted_probs, dim=1)
                cumsum_probs = torch.gather(cumsum, dim=1, index=arg_indices)
                mask = cumsum_probs > top_p
                mask.scatter_(dim=1, index=max_indices, value=False)
                logits_last[mask] = -float('inf')

            probs = torch.softmax(logits[:, -1, :], dim=1)
            if do_sample == True:
                arg_log = torch.multinomial(probs, 1)
            else:
                arg_log  = torch.argmax(probs, dim=1)    
            arg_logits = torch.reshape(arg_log, (probs.shape[0], 1)) 
            x = torch.cat([x, arg_logits], dim=1)

        return x
    
    def save(self, path):
        torch.save({
            'model_state_dict': self.state_dict(),
            'vocab_size': self.vocab_size,
            'max_seq_len': self.max_seq_len,
            'emb_size': self.emb_size,
            'num_heads': self.num_heads,
            'head_size': self.head_size,
            'num_layers': self.num_layers
        }, path)
    
    @classmethod
    def load(cls, path, device):
        checkpoint = torch.load(path, map_location=device)
        model = cls(
            vocab_size=checkpoint['vocab_size'],
            max_seq_len=checkpoint['max_seq_len'],
            emb_size=checkpoint['emb_size'],
            num_heads=checkpoint['num_heads'],
            head_size=checkpoint['head_size'],
            num_layers=checkpoint['num_layers']
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model.to(device)
        return model

In [27]:
vocab_size =2 
num_layers =1

In [28]:
gpt = GPT(
    vocab_size=vocab_size,
    max_seq_len=max_seq_len,
    emb_size=emb_size,
    num_heads=num_heads,
    head_size=head_size,
    num_layers=num_layers
)


gpt .save("gpt_model.pth")

In [39]:
device='cpu'
save_path = '/Users/admin/Desktop/LLM/gpt_model.pth'

In [40]:
gpt = GPT.load(save_path, device=device)

  checkpoint = torch.load(path, map_location=device)


In [17]:
with open('/Users/admin/Desktop/LLM/test/test.txt', 'r') as file:
    text = file.read()

In [18]:
len(text)

891

In [19]:
bpe = BPE(vocab_size=1000)
bpe.fit(text)

In [20]:
text

'У лукоморья дуб зелёный;\nЗлатая цепь на дубе том:\nИ днём и ночью кот учёный\nВсё ходит по цепи кругом;\nИдёт направо — песнь заводит,\nНалево — сказку говорит.\nТам чудеса: там леший бродит,\nРусалка на ветвях сидит;\nТам на неведомых дорожках\nСледы невиданных зверей;\nИзбушка там на курьих ножках\nСтоит без окон, без дверей;\nТам лес и дол видений полны;\nТам о заре прихлынут волны\nНа брег песчаный и пустой,\nИ тридцать витязей прекрасных\nЧредой из вод выходят ясных,\nИ с ними дядька их морской;\nТам королевич мимоходом\nПленяет грозного царя;\nТам в облаках перед народом\nЧерез леса, через моря\nКолдун несёт богатыря;\nВ темнице там царевна тужит,\nА бурый волк ей верно служит;\nТам ступа с Бабою Ягой\nИдёт, бредёт сама собой,\nТам царь Кащей над златом чахнет;\nТам русский дух… там Русью пахнет!\nИ там я был, и мёд я пил;\nУ моря видел дуб зелёный;\nПод ним сидел, и кот учёный\nСвои мне сказки говорил.'

In [21]:
print(bpe.id2token)
print(bpe.token2id)
print(bpe.encode(text))
print(bpe.decode(bpe.encode(text)))

{0: '\n', 1: ' ', 2: '!', 3: ',', 4: '.', 5: ':', 6: ';', 7: 'А', 8: 'Б', 9: 'В', 10: 'З', 11: 'И', 12: 'К', 13: 'Н', 14: 'П', 15: 'Р', 16: 'С', 17: 'Т', 18: 'У', 19: 'Ч', 20: 'Я', 21: 'а', 22: 'б', 23: 'в', 24: 'г', 25: 'д', 26: 'е', 27: 'ж', 28: 'з', 29: 'и', 30: 'й', 31: 'к', 32: 'л', 33: 'м', 34: 'н', 35: 'о', 36: 'п', 37: 'р', 38: 'с', 39: 'т', 40: 'у', 41: 'х', 42: 'ц', 43: 'ч', 44: 'ш', 45: 'щ', 46: 'ы', 47: 'ь', 48: 'ю', 49: 'я', 50: 'ё', 51: '—', 52: '…', 53: 'м ', 54: 'ам ', 55: ';\n', 56: ' н', 57: 'ре', 58: 'ны', 59: ' п', 60: ' д', 61: ' т', 62: 'ит', 63: 'во', 64: 'Там ', 65: ' с', 66: 'ка', 67: ' б', 68: 'ле', 69: ';\nТам ', 70: 'ко', 71: ' з', 72: 'од', 73: ',\n', 74: ' в', 75: 'ид', 76: ' и', 77: ' м', 78: 'ор', 79: 'ный', 80: ' на', 81: 'ом', 82: 'т ', 83: ' там ', 84: 'ере', 85: ' ду', 86: 'ел', 87: 'ёный', 88: 'ат', 89: 'ёт', 90: 'ес', 91: 'го', 92: 'ус', 93: ' дуб', 94: ' ц', 95: '\nИ', 96: 'ход', 97: 'дёт', 98: 'ит,\n', 99: 'ий', 100: 'ет', 101: 'на', 102: 'ках', 

In [22]:
import glob

all_text = []
for file_path in glob.glob('/Users/admin/Desktop/LLM/RussianNovels-master/corpus/*.*'):
    file = open(file_path, 'r', encoding='utf8')
    all_text.append(file.read())
    
all_text = '\n\n\n'.join(all_text)

In [23]:
len(all_text)

47450

In [24]:
bpe = BPE(vocab_size=2000)
bpe.fit(all_text)

In [25]:
print(len(bpe.id2token))
print(len(bpe.token2id))
print(len(bpe.encode(all_text)))
print(len(bpe.decode(bpe.encode(all_text))))

2000
2000
16618
47450


In [26]:
token_ids = bpe.encode(all_text)

In [27]:
from torch.utils.data import Dataset
class GetData(Dataset):
    def __init__(self, data: list, seq_len: int, device: str='cpu'):
        self.data = data
        self.seq_len = seq_len
        self.device = device

    def __len__(self):
        return len(self.data) - self.seq_len - 1
    
    def __getitem__(self, idx: int):
        x = self.data[idx: idx + self.seq_len]
        y = self.data[idx + 1: idx + self.seq_len + 1]

        x_tensor = torch.tensor(x, dtype=torch.long, device=self.device)
        y_tensor = torch.tensor(y, dtype=torch.long, device=self.device)

        return x_tensor, y_tensor
        

In [33]:
from torch.utils.data import DataLoader
import torch.optim as optim
from tqdm import tqdm

class GPT(nn.Module):
    def __init__(self, vocab_size: int, max_seq_len: int, emb_size: int, num_heads: int, head_size: int, num_layers: int, dropout: float=0.1, device: str='cpu'):
        super().__init__()

        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.head_size = head_size
        self.device = device
        self.num_layers = num_layers


        self.token_emb = TokenEmbeddings(vocab_size, emb_size)
        self.positional_emb = PositionalEmbeddings(max_seq_len, emb_size)
        self.drop = nn.Dropout(p=dropout)
        # decoder_layers = []
        # for _ in range(self.num_layers):
        #     decoder_layer = Decoder(num_heads, emb_size, head_size, max_seq_len, dropout)
        #     decoder_layers.append(decoder_layer)
        
        self.decoder_layers = nn.Sequential(
            *[Decoder(num_heads, emb_size, head_size, max_seq_len, dropout) for _ in range(num_layers)]
            )
        self.ln = nn.Linear(emb_size, vocab_size)

    def forward(self, x):

        x_token = self.token_emb(x)
        x_positional = self.positional_emb(x.shape[1])
        x_res = x_token + x_positional
        x_drop = self.drop(x_res)
        x_decoder = self.decoder_layers(x_drop)
        x_ln = self.ln(x_decoder)

        return x_ln 
    
    def generate(self, x: torch.Tensor, max_new_tokens: int, do_sample: bool, temperature: float=1.0, top_k: int=None, top_p: float=None):
        for i in range(max_new_tokens):
            tokens = x[:, -self.max_seq_len:]
            logits = self.forward(tokens)/temperature

            if top_k is not None and do_sample == True:
                logits_last = logits[:, -1, :]
                top_k_values, top_k_indices = torch.sort(logits_last, dim=1, descending=True)
                min_value = top_k_values[:, top_k-1].unsqueeze(1)
                logits_last[logits_last < min_value] = -float('inf')
            if top_p is not None and do_sample == True:
                logits_last = logits[:, -1, :]
                probs = torch.softmax(logits_last, dim=1)
                sorted_probs, sorted_indices = torch.sort(probs, dim=1, descending=True)
                max_indices = sorted_indices[:, 0].unsqueeze(1)
                arg_indices = torch.argsort(sorted_indices, dim=1)
                cumsum = torch.cumsum(sorted_probs, dim=1)
                cumsum_probs = torch.gather(cumsum, dim=1, index=arg_indices)
                mask = cumsum_probs > top_p
                mask.scatter_(dim=1, index=max_indices, value=False)
                logits_last[mask] = -float('inf')

            probs = torch.softmax(logits[:, -1, :], dim=1)
            if do_sample == True:
                arg_log = torch.multinomial(probs, 1)
            else:
                arg_log  = torch.argmax(probs, dim=1)    
            arg_logits = torch.reshape(arg_log, (probs.shape[0], 1)) 
            x = torch.cat([x, arg_logits], dim=1)

        return x
    
    def fit(self, train_loader: DataLoader, valid_loader: DataLoader, num_epoch: int, learning_rate: float):

        self.to(self.device)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        cross = nn.CrossEntropyLoss()
        train_loss = []
        val_loss = []

        for i in tqdm(range(num_epoch)):
            self.train()
            for inputs, targets in train_loader:
                results = self.forward(inputs)
                shapes = results.shape
                results = torch.reshape(results, (shapes[0]*shapes[1], shapes[2]))
                shapes = targets.shape
                targets = torch.reshape(targets, (shapes[0]*shapes[1],))
                self.loss= cross(results, targets)
                train_loss.append(self.loss.item())
                self.optimizer.zero_grad()
                self.loss.backward()
                self.optimizer.step()
            train_losses = sum(train_loss)/len(train_loss)
            
            self.eval()
            with torch.no_grad():
                for inputs, targets in valid_loader:
                    results = self.forward(inputs)
                    shapes = results.shape
                    results = torch.reshape(results, (shapes[0]*shapes[1], shapes[2]))
                    shapes = targets.shape
                    targets = torch.reshape(targets, (shapes[0]*shapes[1],))
                    self.loss1= cross(results, targets)
                    val_loss.append(self.loss1.item())
                val_losses = sum(val_loss)/len(val_loss)
            print(f'Epoch {i+1}/{num_epoch}, Train Loss: {train_losses:.4f}, Val Loss: {val_losses:.4f}')

    def save(self, path):
        torch.save({
            'model_state_dict': self.state_dict(),
            'vocab_size': self.vocab_size,
            'max_seq_len': self.max_seq_len,
            'emb_size': self.emb_size,
            'num_heads': self.num_heads,
            'head_size': self.head_size,
            'num_layers': self.num_layers
        }, path)
    
    @classmethod
    def load(cls, path, device):
        checkpoint = torch.load(path, map_location=device)
        model = cls(
            vocab_size=checkpoint['vocab_size'],
            max_seq_len=checkpoint['max_seq_len'],
            emb_size=checkpoint['emb_size'],
            num_heads=checkpoint['num_heads'],
            head_size=checkpoint['head_size'],
            num_layers=checkpoint['num_layers']
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model.to(device)
        return model

In [31]:
n = int(0.7*len(token_ids)) # 70% train
train_token_ids = token_ids[:n]
valid_token_ids = token_ids[n:]

In [32]:
seq_len = 64
batch_size = 512
vocab_size = 2000
max_seq_len = 256
emb_size = 128
num_heads = 8
head_size = 64
num_layers= 12
dropout= 0.1
num_epochs = 5
learning_rate = 0.1
device = 'cpu'

In [34]:
train_dataset = GetData(data=train_token_ids, seq_len=seq_len, device=device)
train_loader = DataLoader(train_dataset, batch_size=batch_size)

valid_dataset = GetData(data=valid_token_ids, seq_len=seq_len, device=device)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size)

In [35]:
gpt = GPT(
    vocab_size=vocab_size,
    max_seq_len=max_seq_len,
    emb_size=emb_size,
    num_heads=num_heads,
    head_size=head_size,
    num_layers=num_layers
)


gpt.save("gpt_model.pth")

In [36]:
gpt.fit(train_loader=train_loader, valid_loader=valid_loader, num_epoch=5, learning_rate = 0.1)

 20%|██        | 1/5 [18:10<1:12:40, 1090.01s/it]

Epoch 1/5, Train Loss: 10.3399, Val Loss: 7.1190


 40%|████      | 2/5 [33:55<50:14, 1004.86s/it]  

Epoch 2/5, Train Loss: 8.6657, Val Loss: 7.3354


 60%|██████    | 3/5 [49:25<32:21, 970.97s/it] 

Epoch 3/5, Train Loss: 8.1391, Val Loss: 7.2660


 80%|████████  | 4/5 [1:05:00<15:56, 956.47s/it]

Epoch 4/5, Train Loss: 7.8647, Val Loss: 7.4085


100%|██████████| 5/5 [1:20:30<00:00, 966.01s/it]

Epoch 5/5, Train Loss: 7.6778, Val Loss: 7.3659





In [43]:
val = torch.tensor([valid_token_ids[:40]])
val

tensor([[ 692,  295,  525,  571,  956,  487,  651,  121,  239,  707, 1834,  159,
          277,  130,  351,  287,  468,  328,  148,  709,  149,   96,  121,  435,
          491,  134,  311, 1710,  962,  858, 1106,  123,  589,  137,  227,  122,
          837,   98,  570,  412]])

In [58]:
bpe.decode(val[0].tolist())

'составленным ею самою, и, благодаря новейшим романам, это уже пошлое лицо пугало и пленяло её воображен'

In [62]:
a = gpt.generate(val, max_new_tokens=20, do_sample=True, temperature=0.9)
bpe.decode(a[0].tolist())

'составленным ею самою, и, благодаря новейшим романам, это уже пошлое лицо пугало и пленяло её воображеню с мой няожЯ осела ялно. В сеё  водуш дняи три верные ихни'