In [4]:
import os
import time
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from typing import Dict, Union, Tuple
from collections import defaultdict

In [5]:
def return_dataset(
        data_path: int, 
        bpe,
        split: float, 
        block_size: int
    ) -> Tuple[torch.utils.data.Dataset, torch.utils.data.Dataset]:

    with open(data_path, 'r', encoding='utf-8') as f:
        text = f.read()
    dataset_len = len(text)
    train_size = int(dataset_len * split)

    train_text = text[:train_size]
    test_text = text[train_size:]
    
    train_set = OPDataset(train_text, bpe, block_size, train=True)
    test_set = OPDataset(test_text, bpe, block_size, train=False)
    return train_set, test_set


In [6]:
class TransformerBlock(nn.Module):
    def __init__(
            self, 
            num_heads: int, 
            n_embed: int, 
            block_size: int
        ):
        super(TransformerBlock, self).__init__()
        hidden_dim = n_embed // num_heads
        self.mhsa = MultiHeadSelfAttention(num_heads, hidden_dim, n_embed, block_size)
        self.feed_forward = FeedForward(n_embed)
        self.norm1 = nn.LayerNorm(n_embed)
        self.norm2 = nn.LayerNorm(n_embed)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.mhsa(self.norm1(x))
        x = x + self.feed_forward(self.norm2(x))
        return x


class FeedForward(nn.Module):
    def __init__(
            self, 
            n_embed: int, 
            extend_width: int=4, 
            dropout: float=0.2
        ):
        super(FeedForward, self).__init__()
        self.layer = nn.Sequential(
            nn.Linear(n_embed, extend_width*n_embed), 
            nn.ReLU(),
            nn.Linear(extend_width*n_embed, n_embed), 
            nn.Dropout(dropout)
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.layer(x)


class MultiHeadSelfAttention(nn.Module):
    def __init__(
            self, 
            num_heads: int, 
            hidden_dim: int, 
            n_embed: int, 
            block_size: int, 
            dropout: float=0.2
        ):
        super(MultiHeadSelfAttention, self).__init__()
        self.num_heads = num_heads
        self.heads = nn.ModuleList([SingleHead(hidden_dim, n_embed, block_size) for _ in range(self.num_heads)])
        self.project = nn.Linear(n_embed, n_embed)
        self.drop = nn.Dropout(dropout)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out = torch.cat([sh(x) for sh in self.heads], dim=-1)
        out = self.project(out)
        out = self.drop(out)
        return out


class SingleHead(nn.Module):
    def __init__(
            self, 
            hidden_dim: int, 
            n_embed: int, 
            block_size: int, 
            dropout: float=0.2
        ):
        super(SingleHead, self).__init__()
        self.key = nn.Linear(n_embed, hidden_dim, bias=False)
        self.query = nn.Linear(n_embed, hidden_dim, bias=False)
        self.value = nn.Linear(n_embed, hidden_dim, bias=False)
        self.drop = nn.Dropout(dropout)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        weights = q @ k.transpose(-2, -1) * C**(-0.5)
        masked_weights = weights.masked_fill(self.tril[:T, :T] == 0, float("-inf"))
        masked_probs = F.softmax(masked_weights, dim=-1)
        masked_probs = self.drop(masked_probs)
        v = self.value(x)
        out = masked_probs @ v
        return out


class GPT(nn.Module):
    def __init__(
            self, 
            vocab_size: int, 
            block_size: int, 
            n_embed: int, 
            num_heads: int, 
            n_layers: int
        ):
        super(GPT, self).__init__()
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.embedding = nn.Embedding(vocab_size, n_embed)
        self.positional_embedding_table = nn.Embedding(block_size, n_embed)
        self.blocks = nn.Sequential(
            *[TransformerBlock(num_heads, n_embed, block_size) for _ in range(n_layers)],
        )
        self.norm = nn.LayerNorm(n_embed)        
        self.fc = nn.Linear(n_embed, vocab_size)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, T = x.shape
        token_embeddings = self.embedding(x) # B, T -> B, T, N_EMB
        positional_embedding = self.positional_embedding_table(torch.arange(T, device=x.device)) # T -> T, C
        token_embeddings = token_embeddings + positional_embedding # B, T, C + T, C -> B, T, C
        blocks_out = self.blocks(token_embeddings)
        blocks_out = self.norm(blocks_out)
        logits = self.fc(blocks_out) # B, T, N_EMB -> B, T, C
        logits = logits.reshape(B*T, self.vocab_size)
        return logits

    def generate(self, idx: torch.Tensor, max_tokens: int) -> torch.Tensor:
        t = idx.shape[1]
        for _ in range(max_tokens):
            idx_cond = idx[:, -self.block_size:]
            logits = self.forward(idx_cond)
            logits = logits.reshape(1, t, -1)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
            if t < self.block_size:
                t += 1
        return idx

In [7]:
def train_one_epoch(
        train_loader: torch.utils.data.DataLoader, 
        model: torch.nn.Module, 
        criterion: torch.nn.Module, 
        optimizer: torch.optim.Optimizer,
        scheduler: torch.optim.lr_scheduler,
        device: str
    ) -> Dict[str, Union[torch.tensor, float]]:
 
    start = time.time()
    model.train()
    losses = torch.zeros(len(train_loader))
    for i, sample in enumerate(train_loader):
        X = sample["X"].to(device)
        y = sample["y"].to(device)
        text = sample["text"]
        logits = model(X)
        loss = criterion(logits, y.view(-1,))
        losses[i] = loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    scheduler.step()
    time_elapsed = time.time() - start
    train_info = {"loss": torch.mean(losses), "time": time_elapsed}
    return train_info

In [8]:
def test_one_epoch(
        test_loader: torch.utils.data.DataLoader, 
        model: torch.nn.Module, 
        criterion: torch.nn.Module, 
        device: str
    ) -> Dict[str, Union[torch.tensor, float]]:

    start = time.time()
    model.eval()
    losses = torch.zeros(len(test_loader))
    with torch.inference_mode():
        for i, sample in enumerate(test_loader):
            X = sample["X"].to(device)
            y = sample["y"].to(device)
            text = sample["text"]
            logits = model(X)
            loss = criterion(logits, y.view(-1,))
            losses[i] = loss.item()
    time_elapsed = time.time() - start
    test_info = {"loss": torch.mean(losses), "time": time_elapsed}
    return test_info

In [9]:
def generate_text(
        model: torch.nn.Module, 
        device: str, 
        num_tokens: int
    ):

    idx = torch.zeros((1,1), dtype=torch.long).to(device)
    print(train_set.decoder(model.generate(idx, num_tokens)[0].tolist()))

In [10]:
class OPDataset(torch.utils.data.Dataset):
    def __init__(
            self, 
            text: str, 
            bpe,
            block_size: int, 
            train: bool=True
        ):
        super(OPDataset, self).__init__()
        self.text = text

        self.encoder = lambda s: bpe.encode(s)
        self.decoder = lambda l: bpe.decode(l)
        self.data = torch.tensor(self.encoder(self.text), dtype=torch.long)
        self.block_size = block_size
        self.train = train

    def __getitem__(self, index: int) -> Dict[str, Union[torch.Tensor, str]]:
        if self.train:
            idx = torch.randint(len(self.data) - self.block_size, (1,)).item()
        else:
            idx = index
            
        X = self.data[idx:idx+self.block_size]
        y = self.data[idx+1:idx+self.block_size+1]
        text = self.text[idx:idx+self.block_size]
        sample = {"X": X, "y": y, "text": text}
        return sample

    def __len__(self) -> int:
        if self.train:
            return 5000
        return len(self.data) - self.block_size


# BPE

In [15]:
class BytePairEncoder:
    def __init__(self, vocab_size):
        self.vocab_size = vocab_size
        self.vocabulary = None
        self.id2voc = None
        self.merges = []

    def getVocabSize(self):
        return len(self.vocabulary)

    def save(self, file_path):
        data = {
            "vocab_size": self.vocab_size,
            "vocabulary": self.vocabulary,
            "id2voc": self.id2voc,
            "merges": self.merges
        }
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    def load(self, file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        
        self.vocab_size = data["vocab_size"]
        self.vocabulary = data["vocabulary"]
        self.id2voc = data["id2voc"]
        self.merges = data.get("merges", [])

    def train(self, corpus):

        if isinstance(corpus, list) and all(isinstance(item, str) for item in corpus):
            buff = ""
            for item in corpus:
                buff += item + " . "
            corpus = buff
            print(corpus)
            del buff
        
        chars = sorted(list(set(corpus)))
        vocabulary = {}
        id2voc = {}
        
        for idx, char in enumerate(chars):
            vocabulary[char] = idx

        for char, idx in vocabulary.items():
            id2voc[idx] = char
        
        token_ids = [vocabulary[char] for char in corpus]
        current_id_len = len(chars)

        while current_id_len < self.vocab_size:
            pairs = self.get_pairs(token_ids)
            
            if not pairs:
                break  # birleştirilecek çift yoksa loop u durdur
                
            best_pair = max(pairs, key=lambda x: pairs[x]) # En yüksek frekanslı çifti al
            a, b = best_pair
            new_token = id2voc[a] + id2voc[b]
            
            vocabulary[new_token] = current_id_len
            id2voc[current_id_len] = new_token
            self.merges.append((best_pair, current_id_len))
            
            token_ids = self.merge_pair(token_ids, best_pair, current_id_len)
            current_id_len += 1

        self.vocabulary = vocabulary
        self.id2voc = id2voc
        return self.vocabulary, self.id2voc

    def get_pairs(self, token_ids):
        pairs = defaultdict(int) # başlangıç valueları 0 olan bir sözlük oluşturur
        for i in range(len(token_ids)-1):
            pair = (token_ids[i], token_ids[i+1])
            pairs[pair] += 1
        return pairs

    def merge_pair(self, token_ids, pair, new_id):
        new_token_ids = []
        i = 0
        while i < len(token_ids):
            if i < len(token_ids)-1 and (token_ids[i], token_ids[i+1]) == pair:
                new_token_ids.append(new_id)
                i += 2
            else:
                new_token_ids.append(token_ids[i])
                i += 1
        return new_token_ids

    def encode(self, text):
        tokens = list(text)
        token_ids = []
        for char in tokens:
            token_ids.append(self.vocabulary.get(char, self.vocabulary.get(' ', 0))) # unk = 0 = ' '
        
        for (pair, new_id) in self.merges:
            token_ids = self.merge_pair(token_ids, pair, new_id)
        
        return token_ids

    def decode(self, token_ids):
        return ''.join([self.id2voc[idx] for idx in token_ids])

In [12]:
pth = "train.txt"
with open(pth, 'r', encoding='utf-8') as f:
    text = f.read()

text = text[:10000]

block_size = 256
vocab_size = 1000
n_embed = 384
num_heads = 6
n_layers = 6

In [16]:
bpe = BytePairEncoder(vocab_size)
bpe.train(text)
print(bpe.getVocabSize())

1000


In [87]:
dataset = OPDataset(text, bpe, block_size, train=False)

In [88]:
print("len", len(dataset))
print("sample", dataset[0]["X"].shape)
print("sample", dataset[0]["y"].shape)
print("sample", len(dataset[0]["text"]))
print("sample\n", dataset[0]["text"][:150])

len 2901
sample torch.Size([256])
sample torch.Size([256])
sample 256
sample
 Benden selam söylen vefasız yare
Gurbet benim olsun sıla kendine
Çekilmedik derdimizi bölüşek
Başlı ben alayım sıla kendine

Dökek derdimizi ölçek böl


In [17]:
model = GPT(bpe.getVocabSize(), block_size, n_embed, num_heads, n_layers)
inp = torch.ones((1,block_size), dtype=torch.long)
out = model(inp)
print(out.shape)

torch.Size([256, 1000])


In [25]:
checkpoint_dir = "./results/"
epochs = 50
block_size = 100
split = 0.9
batch_size = 512
initial_lr = 3e-4
min_lr = 1e-4
evaluate_every = 10
n_embed = 256
num_heads = 4
n_layers = 4
device_id = 0

os.makedirs(checkpoint_dir, exist_ok=True)

In [20]:
train_set, test_set = return_dataset("train.txt", bpe, 0.9, block_size)

In [21]:
train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)

device = torch.device('cuda:{}'.format(device_id) if torch.cuda.is_available() else 'cpu')
print(device)

cuda:0


In [23]:
model = GPT(bpe.getVocabSize(), block_size, n_embed, num_heads, n_layers)
model = model.to(device)

In [27]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=initial_lr)
    
# lr scheduler
lambda_func = lambda epoch: max(0.99 ** epoch, min_lr / initial_lr)
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_func)

# Training loop    
best_val_loss = 1e5
for e in range(epochs):
    train_info = train_one_epoch(train_dataloader, model, criterion, optimizer, scheduler, device)
    print("At epoch: {}, train loss: {:.2f}, in {:.2f} seconds".format(e+1, train_info["loss"], train_info["time"]))
    if (e+1) % evaluate_every == 0:
        test_info = test_one_epoch(test_dataloader, model, criterion, device)
        print("\nAt epoch: {}, test loss: {:.2f}, in {:.2f} seconds\n".format(e+1, test_info["loss"], test_info["time"]))
        # save checkpoint
        if best_val_loss > test_info["loss"]:
            torch.save(model.state_dict(), checkpoint_dir + "model_epoch_{}_loss_{:.2f}.pt".format(e, test_info["loss"]))
            best_val_loss = test_info["loss"]

    # Generate some text
generate_text(model, device, 500)

At epoch: 1, train loss: 4.01, in 3.72 seconds
At epoch: 2, train loss: 3.97, in 3.47 seconds
At epoch: 3, train loss: 3.97, in 3.44 seconds
At epoch: 4, train loss: 3.95, in 3.46 seconds
At epoch: 5, train loss: 3.93, in 3.47 seconds
At epoch: 6, train loss: 3.93, in 3.48 seconds
At epoch: 7, train loss: 3.92, in 3.50 seconds
At epoch: 8, train loss: 3.91, in 3.51 seconds
At epoch: 9, train loss: 3.91, in 3.54 seconds
At epoch: 10, train loss: 3.89, in 3.54 seconds

At epoch: 10, test loss: 4.05, in 11.43 seconds

At epoch: 11, train loss: 3.89, in 3.43 seconds
At epoch: 12, train loss: 3.88, in 3.56 seconds
At epoch: 13, train loss: 3.87, in 3.58 seconds
At epoch: 14, train loss: 3.86, in 3.57 seconds
At epoch: 15, train loss: 3.85, in 3.58 seconds
At epoch: 16, train loss: 3.84, in 3.60 seconds
At epoch: 17, train loss: 3.83, in 3.59 seconds
At epoch: 18, train loss: 3.83, in 3.56 seconds
At epoch: 19, train loss: 3.82, in 3.57 seconds
At epoch: 20, train loss: 3.80, in 3.58 seconds