# Building GPT from scratch.

Building GPT-2 is a complex task, but it is a good way to learn about transformers. For now, I don't really understand `attention`, and I have no background in `RNN` or `LSTM`, so understanding transformers will certainly be challenging. But I have already built a ViT (Vision Transformer), so why not give it a shot?

* References

  * Illustrated-gpt2 by jay alammar : https://jalammar.github.io/illustrated-gpt2/#part-1-got-and-language-modeling
  
  * Here's how you can build and train GPT-2 : https://dev.to/amit_kharel_aae65abe2b111/heres-how-you-can-build-and-train-gpt-2-from-scratch-using-pytorch-345n

I used Jay Alammar's blog post to understand the architecture and how GPT-2 works; the second post is my reference for the dataset and preprocessing.

I want to build GPT from scratch using a dataset of 140,000 Korean articles. I've already tokenized it using kkma and KR-BERT. GPT needs MaskedMultiHeadSelfAttention, MultiHeadSelfAttention and MultiLayerPerceptron, and I already know how to build two of them.


<img src="images/transformers.png" width="1000" height="600">


In [2]:
import torch
import torch.nn
from tqdm.auto import tqdm

from torch import nn
from transformers import AutoTokenizer
print(f"torch version : {torch.__version__}")

torch version : 2.5.1


In [3]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

In [4]:
# Try to get torchinfo, install it if it doesn't work
try:
    from torchinfo import summary
except:
    print("[INFO] Couldn't find torchinfo... installing it.")
    !pip install -q torchinfo
    from torchinfo import summary

# Try to import the going_modular directory, download it from GitHub if it doesn't work
try:
    from Artificial_Intelligence.pytorch_modules.pytorch_modules import data_setup, engine
    from Artificial_Intelligence.helper_functions import download_data, set_seeds, plot_loss_curves
except:
    # Get the going_modular scripts
    print("[INFO] Couldn't find going_modular or helper_functions scripts... downloading them from GitHub.")
    !git clone https://github.com/DutchVandaline/Artificial_Intelligence.git
    !mv Artificial_Intelligence/pytorch_modules .
    !mv Artificial_Intelligence/helper_functions.py . # get the helper_functions.py script
    !rm -rf pytorch-deep-learning
    from Artificial_Intelligence.pytorch_modules.pytorch_modules import data_setup, engine
    from Artificial_Intelligence.helper_functions import download_data, set_seeds, plot_loss_curves

## Making Transformer Layers
- The MSA layer needs a mask. "Attention mask" is the umbrella term; it covers both the `padding mask` and the `causal mask`. A mask tells attention which tokens to focus on and which to ignore.
  - **Padding Mask** : Hides the padding tokens added by the tokenizer.
  - **Causal Mask** : Shown as an upper-triangular mask. For an auto-regressive model, it prevents each position from seeing future tokens. An example follows.
  ```
  [[0, -inf, -inf, -inf, -inf],
   [0,    0, -inf, -inf, -inf],
   [0,    0,    0, -inf, -inf],
   [0,    0,    0,    0, -inf],
   [0,    0,    0,    0,    0]]
  ```
I've used both the `padding mask` and the `causal mask` together as the `attention mask`.

In [5]:
# Not used on GPT
class MultiHeadSelfAttentionBlock(nn.Module):
    """Layer-normalized multi-head self-attention without any masking.

    The input is normalized and then used as query, key and value of a
    batch-first ``nn.MultiheadAttention``. No residual connection is added
    here; the caller is responsible for it.

    Args:
        embedding_dim: Size of the last dimension of the input.
        num_heads: Number of attention heads.
        attn_dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self,
                 embedding_dim: int = 768,
                 num_heads: int = 12,
                 attn_dropout: float = 0.0):
        super().__init__()
        self.layer_norm = nn.LayerNorm(normalized_shape=embedding_dim)

        self.multihead_attn = nn.MultiheadAttention(embed_dim=embedding_dim,
                                                    num_heads=num_heads,
                                                    dropout=attn_dropout,
                                                    batch_first=True)

    def forward(self, x):
        """Return the self-attention output for ``x`` (same shape as ``x``)."""
        x = self.layer_norm(x)
        # need_weights=False: only the attended values are used, so the
        # attention-weight tensor is not requested.
        attn_output, _ = self.multihead_attn(query=x,
                                             key=x,
                                             value=x,
                                             need_weights=False)
        return attn_output


In [6]:
class MLPBlock(nn.Module):
    """Layer-normalized position-wise feed-forward network.

    Pipeline: LayerNorm -> Linear(embedding_dim, mlp_size) -> GELU -> Dropout
    -> Linear(mlp_size, embedding_dim) -> Dropout. No residual connection is
    added here.

    Args:
        embedding_dim: Size of the last dimension of the input and output.
        mlp_size: Width of the hidden layer.
        dropout: Dropout probability used after each linear stage.
    """

    def __init__(self,
                 embedding_dim: int = 768,
                 mlp_size: int = 3072,
                 dropout: float = 0.1):
        super().__init__()
        self.layer_norm = nn.LayerNorm(normalized_shape=embedding_dim)

        # The two projections are created in expand-then-project order and
        # placed at Sequential indices 0 and 3.
        expand = nn.Linear(in_features=embedding_dim, out_features=mlp_size)
        project = nn.Linear(in_features=mlp_size, out_features=embedding_dim)
        self.mlp = nn.Sequential(
            expand,
            nn.GELU(),
            nn.Dropout(p=dropout),
            project,
            nn.Dropout(p=dropout),
        )

    def forward(self, x):
        """Normalize ``x`` and run it through the feed-forward stack."""
        return self.mlp(self.layer_norm(x))

In [7]:
import torch
import torch.nn as nn

class MaskedMultiHeadSelfAttentionBlock(nn.Module):
    """Layer-normalized multi-head self-attention with a causal mask.

    Each position may attend only to itself and to earlier positions, which
    is what makes the model auto-regressive. No residual connection is added
    here; the caller is responsible for it.

    Args:
        embedding_dim: Size of the last dimension of the input.
        num_heads: Number of attention heads.
        attn_dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self,
                 embedding_dim: int = 768,
                 num_heads: int = 12,
                 attn_dropout: float = 0.1):
        super().__init__()
        self.layer_norm = nn.LayerNorm(embedding_dim)

        self.multihead_attn = nn.MultiheadAttention(embed_dim=embedding_dim,
                                                    num_heads=num_heads,
                                                    dropout=attn_dropout,
                                                    batch_first=True)

    def forward(self, x, key_padding_mask=None):
        """Apply causal self-attention to ``x``.

        Args:
            x: Input unpacked as (batch, seq_len, embedding_dim).
            key_padding_mask: Optional (batch, seq_len) mask forwarded to
                ``nn.MultiheadAttention``. For a boolean mask, True marks
                key positions (padding) that must be ignored.

        Returns:
            The attention output, same shape as ``x``.
        """
        # Normalize input
        x = self.layer_norm(x)

        _, seq_len, _ = x.size()

        # Boolean causal mask: True marks positions that may NOT be attended,
        # i.e. everything strictly above the diagonal (the future).
        # A float mask of ones/zeros would merely be *added* to the attention
        # scores by nn.MultiheadAttention and would hide nothing.
        causal_mask = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device),
            diagonal=1,
        )

        attn_output, _ = self.multihead_attn(
            query=x,
            key=x,
            value=x,
            attn_mask=causal_mask,  # Causal mask for self-attention
            key_padding_mask=key_padding_mask,  # Padding mask
            need_weights=False
        )

        return attn_output


## Making TransformerDecoderBlock
The GPT transformer block doesn't use the plain MSA block. It uses the masked multi-head self-attention block instead.

In [8]:
class TransformerDecoderBlock(nn.Module):
    """One GPT decoder layer: masked self-attention followed by an MLP.

    Both sub-blocks are wrapped in a residual connection. Each sub-block
    applies its own layer normalization internally.

    Args:
        embedding_dim: Size of the last dimension of the input.
        num_heads: Number of attention heads.
        mlp_size: Hidden width of the feed-forward block.
        mlp_dropout: Dropout probability inside the feed-forward block.
        attn_dropout: Dropout probability on the attention weights.
    """

    def __init__(self,
                 embedding_dim: int = 768,
                 num_heads: int = 12,
                 mlp_size: int = 3072,
                 mlp_dropout: float = 0.1,
                 attn_dropout: float = 0.1):
        super().__init__()

        # Create Masked Self-Attention block (for autoregressive behavior).
        # attn_dropout is forwarded explicitly; previously it was accepted
        # by this constructor but silently ignored.
        self.masked_msa_block = MaskedMultiHeadSelfAttentionBlock(
            embedding_dim=embedding_dim,
            num_heads=num_heads,
            attn_dropout=attn_dropout,
        )

        # Create Feed-Forward block (MLP)
        self.mlp_block = MLPBlock(
            embedding_dim=embedding_dim,
            mlp_size=mlp_size,
            dropout=mlp_dropout
        )

        # These two norms are not used in forward(). They are kept so the
        # module's parameter names stay identical to previously saved
        # state dicts.
        self.layer_norm1 = nn.LayerNorm(embedding_dim)
        self.layer_norm2 = nn.LayerNorm(embedding_dim)

    def forward(self, x, key_padding_mask=None):
        """Run the layer on ``x``.

        Args:
            x: Input tensor passed to the attention block.
            key_padding_mask: Optional padding mask forwarded to the
                attention block.

        Returns:
            Tensor of the same shape as ``x``.
        """
        # Masked self-attention with residual connection
        attn_output = self.masked_msa_block(x, key_padding_mask)
        x_residual1 = attn_output + x

        # Feed-forward block (MLP) with residual connection
        mlp_output = self.mlp_block(x_residual1)
        x_residual2 = mlp_output + x_residual1

        return x_residual2


In [9]:
class GPTDecoder(nn.Module):
    """A stack of ``num_layers`` identically configured decoder blocks.

    Args:
        num_layers: How many ``TransformerDecoderBlock`` layers to stack.
        embedding_dim: Size of the last dimension of the input.
        num_heads: Number of attention heads per layer.
        mlp_size: Hidden width of each layer's feed-forward block.
        mlp_dropout: Dropout probability inside each feed-forward block.
        attn_dropout: Dropout probability passed to each layer's attention.
    """

    def __init__(self,
                 num_layers: int = 12,
                 embedding_dim: int = 768,
                 num_heads: int = 12,
                 mlp_size: int = 3072,
                 mlp_dropout: float = 0.1,
                 attn_dropout: float = 0.1):
        super().__init__()

        # Every layer shares the same hyper-parameters.
        block_kwargs = dict(
            embedding_dim=embedding_dim,
            num_heads=num_heads,
            mlp_size=mlp_size,
            mlp_dropout=mlp_dropout,
            attn_dropout=attn_dropout,
        )

        self.decoder_layers = nn.ModuleList()
        for _ in range(num_layers):
            self.decoder_layers.append(TransformerDecoderBlock(**block_kwargs))

    def forward(self, x, key_padding_mask=None):
        """Feed ``x`` through every layer in order, sharing one padding mask."""
        hidden = x
        for block in self.decoder_layers:
            hidden = block(hidden, key_padding_mask)
        return hidden


In [10]:
class GPT(nn.Module):
    """Decoder-only language model: embeddings, decoder stack, vocab head.

    Args:
        vocab_size: Number of rows in the token embedding and number of
            output logits per position.
        max_seq_len: Number of positions in the learned positional table.
        embedding_dim: Width of the embeddings and of the decoder.
        num_layers: Number of decoder layers.
        num_heads: Number of attention heads per layer.
        mlp_size: Hidden width of each feed-forward block.
        mlp_dropout: Dropout probability inside each feed-forward block.
        attn_dropout: Dropout probability passed to the decoder's attention.
    """

    def __init__(self,
                 vocab_size: int,
                 max_seq_len: int = 256,
                 embedding_dim: int = 768,
                 num_layers: int = 12,
                 num_heads: int = 12,
                 mlp_size: int = 3072,
                 mlp_dropout: float = 0.1,
                 attn_dropout: float = 0.1):
        super().__init__()

        # Token lookup table
        self.token_embedding = nn.Embedding(num_embeddings=vocab_size,
                                            embedding_dim=embedding_dim)

        # Learned positional table, zero-initialized, broadcast over the batch
        self.positional_embedding = nn.Parameter(
            torch.zeros(1, max_seq_len, embedding_dim)
        )

        # Stack of decoder layers
        self.decoder = GPTDecoder(
            num_layers=num_layers,
            embedding_dim=embedding_dim,
            num_heads=num_heads,
            mlp_size=mlp_size,
            mlp_dropout=mlp_dropout,
            attn_dropout=attn_dropout
        )

        # Projection from hidden states to vocabulary logits
        self.output_layer = nn.Linear(in_features=embedding_dim,
                                      out_features=vocab_size)

    def forward(self, input_ids, key_padding_mask=None):
        """Compute next-token logits for every position of ``input_ids``.

        Args:
            input_ids: Token ids, indexed as (batch, seq_len).
            key_padding_mask: Optional mask; it is cast to bool and handed
                to the decoder as the attention ``key_padding_mask``.

        Returns:
            Logits with a trailing dimension of ``vocab_size``.
        """
        # Token embeddings plus the first seq_len rows of the positional table
        token_vectors = self.token_embedding(input_ids)
        positions = self.positional_embedding[:, :input_ids.size(1), :]
        hidden = token_vectors + positions

        # The attention layers expect a boolean padding mask
        if key_padding_mask is not None:
            key_padding_mask = key_padding_mask.to(torch.bool)

        hidden = self.decoder(hidden, key_padding_mask)

        return self.output_layer(hidden)


## Making Dataloader
The data comes from AI-Hub and initially consists of 20,000 Korean articles. Tokenization was done with kkma and KR-BERT.

In [11]:
import os
import torch
from torch.utils.data import Dataset, DataLoader

class PreprocessedKoreanDataset(Dataset):
    """Dataset over pre-tokenized samples stored as individual ``.pt`` files.

    Each file is expected to hold a dict with ``'input_ids'`` and
    ``'attention_mask'`` entries.

    Args:
        data_dir: Directory containing the ``.pt`` sample files.
    """

    def __init__(self, data_dir):
        self.data_dir = data_dir
        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> file mapping reproducible across runs and machines.
        self.file_list = sorted(
            f for f in os.listdir(data_dir) if f.endswith('.pt')
        )

    def __len__(self):
        """Return the number of ``.pt`` files found in ``data_dir``."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Load sample ``idx`` and derive its padding mask.

        Returns:
            A dict with ``'input_ids'``, ``'attention_mask'`` and
            ``'key_padding_mask'``. The latter is a boolean tensor that is
            True wherever ``attention_mask`` equals 0.
        """
        file_path = os.path.join(self.data_dir, self.file_list[idx])
        data = torch.load(file_path, weights_only=True)

        input_ids = data['input_ids']
        attention_mask = data['attention_mask']

        # True for padded tokens, False for real tokens
        # (the comparison already yields a boolean tensor).
        key_padding_mask = attention_mask == 0

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'key_padding_mask': key_padding_mask
        }

data_dir = "krbert_korean_pretrain"
batch_size = 16

# Wrap the preprocessed files in a shuffling, batching DataLoader
dataset = PreprocessedKoreanDataset(data_dir)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

# Walk over every batch once and report the tensor shapes it carries
for batch in dataloader:
    input_ids, attention_mask, key_padding_mask = (
        batch[field]
        for field in ('input_ids', 'attention_mask', 'key_padding_mask')
    )
    print(input_ids.shape, attention_mask.shape, key_padding_mask.shape)


torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size([16, 256]) torch.Size([16, 256]) torch.Size([16, 256])
torch.Size

## Train Model

In [13]:
krbert_tokenizer = AutoTokenizer.from_pretrained("snunlp/KR-Medium", do_lower_case=False)

In [14]:
print(f"Input IDs Min: {input_ids.min()}, Max: {input_ids.max()}")
print(f"Padding Token ID: {krbert_tokenizer.pad_token_id}")

print(f"Input IDs Example: {input_ids[1]}")
print(f"Attention Mask Example: {attention_mask[1]}")

Input IDs Min: 0, Max: 19994
Padding Token ID: 0
Input IDs Example: tensor([ 5362,  5296,  9752, 10466,  5088,  3320,  3758, 11981,  3258,  5101,
         8535,  8924, 19538,  8455,  2436, 13572,  5029, 14248,  5412,  4492,
        10427,    18,  3385,  5262,  5767, 10829,  3520, 10427,    18,  8600,
        12770,  9076, 14432, 10829, 11103, 11497, 10383,  8867,  8902, 17106,
        18959, 18897,  3745, 16443,  8936,  9982,  5057, 10724, 16387,  5010,
        10022, 11600,  5051, 13952,  5105,  5730,  5008,  2401,  5171, 11750,
         5010,  9027, 12424,  2953, 11505, 12431,  9337, 18301,  5033,  8655,
         3382, 13787, 10781,  8903,  5088,  3320,  8478,  8488, 13952,  9025,
         8452,  9191,  9897,  3382,  5752,  9027,  8452,  4494,  1961, 10362,
        10444,  5010, 11794, 12962, 13518, 10967,  2956,  5362,  5296,  3612,
         8854,  8538,    18,  3385,  5262,  5767, 10829,  9190, 11636, 11505,
         5899,  8527,  5067,  8494,  1969, 16259, 10388, 12620,  9040,  87

In [15]:
batch_size = 16
vocab_size = krbert_tokenizer.vocab_size

# Model with default hyper-parameters, placed on the selected device
gpt = GPT(vocab_size=vocab_size)
gpt = gpt.to(device)

# Targets equal to the tokenizer's pad id are excluded from the loss
loss_fn = nn.CrossEntropyLoss(ignore_index=krbert_tokenizer.pad_token_id)
optimizer = torch.optim.Adam(params=gpt.parameters(), lr=1e-5, weight_decay=0.01)

In [16]:
def train_step(model, dataloader, loss_fn, optimizer, device):
    """Train ``model`` for one pass over ``dataloader`` on next-token prediction.

    Args:
        model: Callable as ``model(input_ids, key_padding_mask=...)`` and
            returning logits indexed as (batch, seq_len, vocab).
        dataloader: Iterable of dict batches with ``'input_ids'`` and
            ``'attention_mask'`` tensors.
        loss_fn: Loss taking (logits, labels). If it has an ``ignore_index``
            attribute, labels equal to it are excluded from the accuracy too.
        optimizer: Optimizer over the model's parameters.
        device: Device the batch tensors are moved to.

    Returns:
        ``(avg_loss, accuracy)``: mean loss per batch and the fraction of
        non-ignored target tokens predicted correctly. Both are 0.0 when
        there is nothing to average over.
    """
    model.train()
    # Labels with this value contribute to neither loss nor accuracy.
    ignore_index = getattr(loss_fn, "ignore_index", -100)

    train_loss = 0.0
    correct_preds = 0
    total_preds = 0
    num_batches = 0

    for batch in dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        # attention_mask is 1 on real tokens, while key_padding_mask must be
        # True on the positions to IGNORE, so the mask has to be inverted.
        key_padding_mask = attention_mask == 0

        logits = model(input_ids, key_padding_mask=key_padding_mask)

        # Next-token objective: the logits at position t are scored against
        # the token at position t + 1. Without this shift the model would
        # only have to copy its own input.
        shifted_logits = logits[:, :-1, :].reshape(-1, logits.size(-1))
        labels = input_ids[:, 1:].reshape(-1)

        loss = loss_fn(shifted_logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        num_batches += 1

        predicted = shifted_logits.argmax(dim=-1)
        valid = labels != ignore_index
        correct_preds += ((predicted == labels) & valid).sum().item()
        total_preds += valid.sum().item()

    # Compute the average loss and accuracy, guarding against empty input
    avg_loss = train_loss / num_batches if num_batches else 0.0
    accuracy = correct_preds / total_preds if total_preds else 0.0
    return avg_loss, accuracy

In [None]:
epochs = 20

# One full pass over the dataloader per epoch, reporting loss and accuracy
for epoch in tqdm(range(epochs)):
    train_loss, train_acc = train_step(gpt, dataloader, loss_fn, optimizer, device)
    print(f"Epoch {epoch + 1} - Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}")


  0%|          | 0/20 [00:00<?, ?it/s]



## Generate?

In [None]:
from Artificial_Intelligence.pytorch_modules.pytorch_modules import utils

utils.save_model(model=gpt,
                 target_dir="C:/junha/GPT_from_Scratch/models",
                 model_name="GPTScratch_40K.pth")

In [None]:
tokenizer = AutoTokenizer.from_pretrained("snunlp/KR-Medium", do_lower_case=False)
vocab_size = tokenizer.vocab_size
gptScratch = GPT(vocab_size=vocab_size).to(device)
# map_location lets a checkpoint saved on one device load on whichever device
# is in use now. weights_only=True restricts unpickling to tensor data, as
# already done for the dataset files.
gptScratch.load_state_dict(
    torch.load("C:/junha/GPT_from_Scratch/models/GPTScratch_40K.pth",
               map_location=device,
               weights_only=True)
)


def generate_text(prompt, model, max_length=50):
    """Greedily continue ``prompt`` with ``model`` and return the decoded text.

    The model is called as ``model(input_ids)`` and must return logits
    indexed as (batch, seq_len, vocab). Plain ``nn.Module`` subclasses such
    as the ``GPT`` class in this file provide neither ``.generate()`` nor
    ``.device``, so decoding is done here explicitly. Uses the module-level
    ``tokenizer``.

    Args:
        prompt: Text to continue.
        model: Language model to sample from.
        max_length: Maximum total number of tokens (prompt + generated).

    Returns:
        The decoded text with special tokens stripped.
    """
    model.eval()

    # Run on whatever device the model's parameters live on.
    model_device = next(model.parameters()).device
    # NOTE(review): the tokenizer call keeps its default special-token
    # handling; confirm it matches how the training data was tokenized.
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model_device)

    # If the model exposes a learned positional table, it cannot embed more
    # positions than that table has; longer contexts are cropped to the
    # most recent tokens.
    positional = getattr(model, "positional_embedding", None)
    max_context = positional.size(1) if positional is not None else None

    with torch.no_grad():
        while input_ids.size(1) < max_length:
            context = input_ids if max_context is None else input_ids[:, -max_context:]
            logits = model(context)
            # Greedy decoding: take the most likely token at the last position.
            next_id = logits[:, -1, :].argmax(dim=-1, keepdim=True)
            input_ids = torch.cat([input_ids, next_id], dim=1)

    generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    return generated_text

prompt = "오늘 날씨가 너무 좋다"
generated_text = generate_text(prompt, gptScratch)
print(generated_text)
