In [1]:
import torch
import torch.nn as nn
from libs.TokenEmbedding import TokenEmbedding
from libs.CorpusDataset import CorpusDataset
from libs.MHA import MultiHeadAttention
from transformers import AutoTokenizer
import glob
torch.set_default_device('cuda')

In [2]:
# Pretrained tokenizer; its vocabulary size and "<pad>" id feed the config below.
tokenizer = AutoTokenizer.from_pretrained("ikit-claw-nlp/toy-llm")

# Hyper-parameters read by the model classes defined in this notebook.
# NOTE(review): Hugging Face documents `vocab_size` as the base vocabulary
# size *without* added tokens. If "<pad>" is an added token its id could be
# >= vocab_size -- confirm, or size the embedding with len(tokenizer).
GPT_CONFIG_124M = dict(
    vocab_size=tokenizer.vocab_size,
    pad_idx=tokenizer.convert_tokens_to_ids("<pad>"),
    context_length=1024,  # max context length
    emb_dim=768,          # feature width handed to LayerNorm / Linear layers
    n_heads=12,           # must divide emb_dim (asserted in TransformerBlock)
    n_layers=12,          # number of stacked blocks
    drop_rate=0.1,        # probability passed to the dropout layers
    qkv_bias=False,       # forwarded to MultiHeadAttention as use_qkv_bias
)


In [None]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learnable scale and shift.

    Args:
        embed_dim: Size of the last (feature) dimension of the inputs.
        eps: Small constant added to the variance so the division never hits zero.
    """

    def __init__(self, embed_dim, eps=1e-5):
        super(LayerNorm, self).__init__()
        self.eps = eps
        # Start as the identity affine transform: scale = 1, shift = 0.
        self.scale = nn.Parameter(torch.ones(embed_dim))
        self.shift = nn.Parameter(torch.zeros(embed_dim))

    def forward(self, x):
        """Normalise ``x`` along its last dimension, then apply scale and shift.

        Args:
            x: Tensor whose last dimension has size ``embed_dim``
                (e.g. [batch_size, seq_len, model_dim]).

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Statistics keep the reduced dimension (size 1) so they broadcast against x.
        mean = x.mean(dim=-1, keepdim=True)
        # Biased estimator (divide by N), i.e. unbiased=False.
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        # eps keeps the denominator strictly positive.
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        # The original evaluated this expression twice and threw the first
        # result away; it is now computed exactly once.
        return self.scale * norm_x + self.shift

In [3]:
class GELU(nn.Module):
    """GELU activation using the tanh approximation.

    Computes ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x**3)))``
    element-wise.
    """

    # sqrt(2 / pi) as a plain Python float. The original built a fresh float32
    # tensor and moved it to x.device on every call; a scalar avoids that
    # allocation/copy and does not promote half-precision inputs to float32.
    _SQRT_2_OVER_PI = (2.0 / torch.pi) ** 0.5

    def __init__(self):
        super(GELU, self).__init__()

    def forward(self, x):
        """Apply the activation element-wise.

        Args:
            x: Floating-point tensor of any shape.

        Returns:
            Tensor with the same shape as ``x``.
        """
        inner = self._SQRT_2_OVER_PI * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))

In [4]:
class FeedForward(nn.Module):
    """Feed-forward sub-layer: Linear (expand 4x) -> GELU -> Linear (project back).

    Args:
        cfg: Mapping that provides ``'emb_dim'``, the input/output feature size.
    """

    def __init__(self, cfg):
        super(FeedForward, self).__init__()
        emb_dim = cfg['emb_dim']
        # nn.Sequential takes its sub-modules as positional arguments (or an
        # OrderedDict); a plain list is rejected. The second layer also has to
        # be nn.Linear -- `nn.linear` does not exist.
        self.layers = nn.Sequential(
            nn.Linear(emb_dim, 4 * emb_dim),
            GELU(),
            nn.Linear(4 * emb_dim, emb_dim),
        )

    def forward(self, x):
        """Run ``x`` through the stack.

        Args:
            x: Tensor whose last dimension has size ``emb_dim``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # The result must be returned; without it callers received None.
        return self.layers(x)

In [11]:
class TransformerBlock(nn.Module):
    """Pre-norm transformer block.

    Two sub-layers are applied in sequence -- multi-head attention (built with
    ``use_mask=True``) and a feed-forward network. Each one is preceded by a
    LayerNorm and followed by dropout and a residual addition.

    Args:
        cfg: Mapping providing 'emb_dim', 'n_heads', 'context_length',
            'drop_rate' and 'qkv_bias'.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim, n_heads = cfg['emb_dim'], cfg['n_heads']
        assert emb_dim % n_heads == 0, "Embedding Dim must be integer multiple of the n_heads!"
        # Sub-modules are created in the same order as before so parameter
        # registration (and random initialisation order) is unchanged.
        self.mha_layer = MultiHeadAttention(
            n_heads,
            int(emb_dim / n_heads),  # per-head width
            emb_dim,
            cfg['context_length'],
            cfg['drop_rate'],
            use_qkv_bias=cfg['qkv_bias'],
            use_mask=True,
        )
        self.before_mha_norm = LayerNorm(cfg['emb_dim'])
        self.after_mha_norm = LayerNorm(cfg['emb_dim'])
        self.ff = FeedForward(cfg)
        self.dropout_after_mha = nn.Dropout(cfg['drop_rate'])
        self.dropout_after_ff = nn.Dropout(cfg['drop_rate'])

    def forward(self, x):
        """Apply the attention and feed-forward sub-layers with residuals.

        Args:
            x: Input activations; the result is added back onto them, so the
                sub-layers must produce a compatible shape.

        Returns:
            The block output.
        """
        # --- attention sub-layer ---
        shortcut = x
        normed = self.before_mha_norm(x)
        attended = self.dropout_after_mha(self.mha_layer(normed, normed, normed))
        x = attended + shortcut

        # --- feed-forward sub-layer ---
        shortcut = x
        transformed = self.dropout_after_ff(self.ff(self.after_mha_norm(x)))
        return transformed + shortcut


In [None]:
class DummyTransformerBlock(nn.Module):
    """Placeholder block with no parameters; it hands its input straight back.

    Args:
        cfg: Accepted so the constructor matches the other blocks; not used.
    """

    def __init__(self, cfg):
        super().__init__()

    def forward(self, x):
        """Return ``x`` itself, unmodified."""
        return x

In [None]:
class DummyGPTModel(nn.Module):
    """Skeleton GPT model: embeddings -> placeholder blocks -> placeholder norm -> output head.

    Args:
        cfg: Mapping providing 'vocab_size', 'pad_idx', 'context_length',
            'emb_dim', 'drop_rate' and 'n_layers'.
    """

    def __init__(self, cfg):
        super(DummyGPTModel, self).__init__()
        self.token_embeddings = TokenEmbedding(vocab_size=cfg["vocab_size"],
                            pad_idx = cfg["pad_idx"],
                            seq_length=cfg["context_length"],
                            d_model=cfg["emb_dim"],
                            dropout=cfg["drop_rate"]
        )
        self.transformers = nn.Sequential(
            *[DummyTransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )
        # The original referenced an undefined `DummyLayeNorm`, so the
        # constructor raised NameError. nn.Identity ignores its arguments and
        # passes the input through, which keeps this model a pure placeholder;
        # replace it with a real normalisation layer in the full model.
        self.final_norm = nn.Identity(cfg["emb_dim"])
        # Maps every position's features to one score per vocabulary entry.
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, x):
        """Compute output-head scores for ``x``.

        Args:
            x: Input accepted by ``TokenEmbedding``
                (presumably a tensor of token ids -- confirm against that class).

        Returns:
            Output of ``out_head``; its last dimension has size ``vocab_size``.
        """
        hidden = self.token_embeddings(x)
        # Route through every registered stage. Previously the blocks and the
        # final norm were constructed but never called.
        hidden = self.transformers(hidden)
        hidden = self.final_norm(hidden)
        return self.out_head(hidden)

In [6]:
# glob returns matches in arbitrary, filesystem-dependent order; sorting makes
# the order in which files are handed to the dataset reproducible across runs
# and machines.
text_files = sorted(glob.glob("data/text/*.txt"))
# NOTE(review): the meaning of the third positional argument (1) is defined by
# CorpusDataset, which is not visible here -- confirm and consider naming it.
corpus_dataset = CorpusDataset(text_files, GPT_CONFIG_124M["context_length"], 1, tokenizer)
dataloader = torch.utils.data.DataLoader(corpus_dataset, batch_size = 4)

In [7]:
# Pull the first batch from the loader; `x` is the input used by the cells below.
# NOTE(review): assumes each batch unpacks into an (inputs, targets) pair --
# confirm against CorpusDataset.
x, y = next(iter(dataloader))

Loading the dataset data/text/article_1-1000.txt into memory...
Converting the dataset to token ids...
Conversion Complete. torch.Size([7540024]) Tokens in the corpus.


Unit-test setup: embed one batch of tokens (shared input for the LayerNorm, TransformerBlock and GELU tests below).

In [8]:
# Build a stand-alone embedding layer from the shared config and embed the
# batch `x`; `x_embedding` is the input reused by the unit-test cells below.
# NOTE(review): presumably x_embedding is [batch_size, seq_len, emb_dim] --
# confirm against TokenEmbedding.
token_embeddings = TokenEmbedding(vocab_size=GPT_CONFIG_124M["vocab_size"],
                            pad_idx = GPT_CONFIG_124M["pad_idx"],
                            seq_length=GPT_CONFIG_124M["context_length"],
                            d_model=GPT_CONFIG_124M["emb_dim"],
                            dropout=GPT_CONFIG_124M["drop_rate"])
x_embedding = token_embeddings(x)

In [12]:
# Smoke run: push the embedded batch through a single transformer block.
transformer = TransformerBlock(GPT_CONFIG_124M)
trans_x_embed = transformer(x_embedding)

NameError: name 'LayeNorm' is not defined

LayerNorm unit test

In [None]:
layer_norm = LayerNorm(embed_dim=GPT_CONFIG_124M["emb_dim"])
norm_x_embedding = layer_norm(x_embedding)
# Measure variance with the biased estimator (unbiased=False), the same one
# LayerNorm.forward uses. The default unbiased estimator would report values
# scaled by N / (N - 1) and make the check inconsistent with the layer.
var_x_embedding = x_embedding.var(dim=-1, keepdim=True, unbiased=False)
mean_x_embedding = x_embedding.mean(dim=-1, keepdim=True)
var_norm_x_embedding = norm_x_embedding.var(dim=-1, keepdim=True, unbiased=False)
mean_norm_x_embedding = norm_x_embedding.mean(dim=-1, keepdim=True)
# Before / after for the first position of the first sequence.
print(var_x_embedding[0,0,:], var_norm_x_embedding[0, 0, :])
print(mean_x_embedding[0, 0, :], mean_norm_x_embedding[0, 0, :])
# A freshly built layer has scale = 1 and shift = 0, so every normalised
# feature vector must be centred on zero.
assert torch.allclose(
    mean_norm_x_embedding, torch.zeros_like(mean_norm_x_embedding), atol=1e-4
), "LayerNorm output is not zero-mean along the feature dimension"

TransformerBlock unit test.

In [None]:
transformer = TransformerBlock(GPT_CONFIG_124M)
# Actually exercise the block: constructing it alone verifies nothing.
transformer_out = transformer(x_embedding)
# Both sub-layers end in a residual addition onto their input, so the block
# is expected to preserve the shape of what it is given.
assert transformer_out.shape == x_embedding.shape, (
    f"expected {tuple(x_embedding.shape)}, got {tuple(transformer_out.shape)}"
)

GELU impl. unit test.

In [None]:
gelu = GELU()
gelu_x_embedding = gelu(x_embedding)
# Compare against PyTorch's built-in tanh-approximate GELU, which implements
# the same formula as the hand-written class.
gelu_reference = torch.nn.functional.gelu(x_embedding, approximate="tanh")
assert gelu_x_embedding.shape == x_embedding.shape
assert torch.allclose(gelu_x_embedding, gelu_reference, atol=1e-4), \
    "GELU output deviates from torch.nn.functional.gelu(approximate='tanh')"

In [None]:
import matplotlib.pyplot as plt

# Use dedicated names here. The original rebound the notebook-level `x` and
# `y` (the token batch fetched from the dataloader), so re-running earlier
# cells after this one silently operated on the wrong data.
activations = {"GELU": GELU(), "ReLU": nn.ReLU()}
activation_input = torch.linspace(-3, 3, 100)

fig, axes = plt.subplots(1, 2, figsize=(8, 3))
for ax, (label, activation) in zip(axes, activations.items()):
    # Tensors may live on the GPU; matplotlib needs them on the CPU.
    ax.plot(activation_input.cpu(), activation(activation_input).cpu())
    ax.set_title(f"{label} activation function")
    ax.set_xlabel("x")
    ax.set_ylabel(f"{label}(x)")
    ax.grid(True)
fig.tight_layout()
plt.show()