# Importing required libraries.

In [None]:
%%capture
!pip install tiktoken

In [None]:
import math
from dataclasses import dataclass
import torch
import torch.nn as nn
from torch.nn import functional as F
import os
import time
from contextlib import nullcontext
import numpy as np
import time
import tiktoken
import pandas as pd

# The Core of GPT-2 🤖

##### Layer Norm (Normalization Layer)

In [None]:
class LayerNorm(nn.Module):
    """Layer normalization over the trailing dimension with an optional bias.

    Args:
        ndim: Size of the last dimension that is normalized.
        bias: When truthy a learnable shift is created; otherwise ``self.bias``
            is ``None`` and no shift is applied.
    """

    def __init__(self, ndim, bias):
        super().__init__()
        # The scale starts at one and the shift at zero, so the affine part
        # is an identity transform at initialization.
        self.weight = nn.Parameter(torch.ones(ndim))
        if bias:
            self.bias = nn.Parameter(torch.zeros(ndim))
        else:
            self.bias = None

    def forward(self, input):
        """Return ``input`` normalized over its last ``ndim`` features."""
        normalized_shape = self.weight.shape
        return F.layer_norm(
            input,
            normalized_shape,
            weight=self.weight,
            bias=self.bias,
            eps=1e-5,
        )

##### Causal Self Attention

In [None]:
class CausalSelfAttention(nn.Module):
    """Multi-head self-attention in which a position only attends to itself
    and to earlier positions.

    Args:
        config: Object exposing ``n_embd``, ``n_head``, ``bias``,
            ``drop_rate`` and ``block_size``.

    Raises:
        AssertionError: If ``n_embd`` is not divisible by ``n_head``.
    """

    def __init__(self, config):
        super().__init__()
        # The message has to follow a comma; with a semicolon it is a separate
        # no-op statement and never reaches the AssertionError.
        assert config.n_embd % config.n_head == 0, "n_embd % n_head should be 0."

        # One projection produces query, key and value for all heads at once.
        self.mh_atten_ln = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)

        # Output projection applied after the heads are concatenated.
        self.proj_ln = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)

        self.atten_drop = nn.Dropout(config.drop_rate)
        self.res_drop = nn.Dropout(config.drop_rate)
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.drop_rate = config.drop_rate
        # Use the fused kernel when this PyTorch build provides it.
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("WARNING: Works only on PyTorch version 2.0 or higher.")
            # Lower-triangular mask used by the manual attention path below.
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                        .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: Tensor of shape ``(B, T, C)`` with ``C == n_embd``.

        Returns:
            Tensor of shape ``(B, T, C)``.
        """
        B, T, C = x.size()
        q, k, v = self.mh_atten_ln(x).split(self.n_embd, dim=2)
        # Reshape each of q/k/v to (B, n_head, T, head_dim).
        head_dim = C // self.n_head
        k = k.view(B, T, self.n_head, head_dim).transpose(1, 2)
        q = q.view(B, T, self.n_head, head_dim).transpose(1, 2)
        v = v.view(B, T, self.n_head, head_dim).transpose(1, 2)

        if self.flash:
            # Attention dropout is only active while training.
            y = torch.nn.functional.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p=self.drop_rate if self.training else 0, is_causal=True)
        else:
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            # Future positions get -inf so softmax assigns them zero weight.
            att = att.masked_fill(self.bias[:,:,:T,:T] == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.atten_drop(att)
            y = att @ v
        # Put the heads back side by side: (B, n_head, T, head_dim) -> (B, T, C).
        y = y.transpose(1, 2).contiguous().view(B, T, C)

        y = self.res_drop(self.proj_ln(y))
        return y


##### MLP (Multi-Layer Perceptron)

In [None]:
class MLP(nn.Module):
    """Position-wise feed-forward network: expand to four times the embedding
    width, apply GELU, project back, then apply dropout.

    Args:
        config: Object exposing ``n_embd``, ``bias`` and ``drop_rate``.
    """

    def __init__(self, config):
        super().__init__()
        width = config.n_embd
        hidden_width = 4 * width
        self.fc_ln = nn.Linear(width, hidden_width, bias=config.bias)
        self.gelu = nn.GELU()
        self.proj_ln = nn.Linear(hidden_width, width, bias=config.bias)
        self.dropout = nn.Dropout(config.drop_rate)

    def forward(self, x):
        """Return the transformed tensor; the last dimension stays ``n_embd``."""
        expanded = self.fc_ln(x)
        activated = self.gelu(expanded)
        projected = self.proj_ln(activated)
        return self.dropout(projected)

##### Block (communication + computation)

In [None]:
class Block(nn.Module):
    """One transformer layer: pre-norm self-attention followed by a pre-norm
    MLP, each wrapped in a residual connection.

    Args:
        config: Object exposing ``n_embd`` and ``bias``; it is also handed
            unchanged to the attention and MLP sub-modules.
    """

    def __init__(self, config):
        super().__init__()
        width = config.n_embd
        use_bias = config.bias
        self.ln_1 = LayerNorm(width, bias=use_bias)
        self.attn_net = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(width, bias=use_bias)
        self.ff_mlp = MLP(config)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates."""
        # Communication step: tokens exchange information through attention.
        attn_out = self.attn_net(self.ln_1(x))
        x = x + attn_out
        # Computation step: each position is transformed independently.
        mlp_out = self.ff_mlp(self.ln_2(x))
        return x + mlp_out

##### Body of GPT

In [None]:
class GPT(nn.Module):
    """GPT-style language model: token and position embeddings, a stack of
    ``Block`` layers, a final layer norm and a linear head over the vocabulary.

    Args:
        config: Object exposing ``vocab_size``, ``block_size``, ``n_embd``,
            ``n_layer``, ``drop_rate`` and ``bias`` (plus whatever ``Block``
            reads from it).

    Raises:
        AssertionError: If ``vocab_size`` or ``block_size`` is ``None``.
    """

    def __init__(self, config):
        super().__init__()
        assert config.vocab_size is not None, "vocab_size is required !!"
        assert config.block_size is not None, "block_size is required !!"

        self.config = config

        self.transformer = nn.ModuleDict(dict(
            tok_embedding = nn.Embedding(config.vocab_size, config.n_embd),
            pos_embedding = nn.Embedding(config.block_size, config.n_embd),
            dropout = nn.Dropout(config.drop_rate),
            MHS_Attn_Block = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            ln_f = LayerNorm(config.n_embd, bias=config.bias),
        ))

        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # Weight tying: the token embedding and the output head share a tensor.
        self.transformer.tok_embedding.weight = self.lm_head.weight
        self.apply(self._init_weights)

        # Projections that write into the residual stream are re-initialized
        # with a standard deviation scaled down by the number of layers.
        residual_std = 0.02 / math.sqrt(2 * config.n_layer)
        for param_name, param in self.named_parameters():
            if param_name.endswith('proj_ln.weight'):
                torch.nn.init.normal_(param, mean=0.0, std=residual_std)

        print("number of parameters: %.2fM" % (self.get_num_params()/1e6,))

    def get_num_params(self, non_embedding=True):
        """Return the number of parameters in the model.

        Args:
            non_embedding: When true, the position-embedding parameters are
                subtracted from the total. The token embedding shares its
                tensor with ``lm_head`` and is not subtracted.
        """
        n_params = sum(p.numel() for p in self.parameters())
        if non_embedding:
            n_params -= self.transformer.pos_embedding.weight.numel()
        return n_params

    def _init_weights(self, module):
        """Initialize Linear/Embedding weights from N(0, 0.02) and Linear
        biases to zero; other module types are left untouched."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Run the model on a batch of token indices.

        Args:
            idx: Integer tensor of shape ``(B, T)`` with ``T <= block_size``.
            targets: Optional tensor of target token ids; entries equal to
                ``-1`` are ignored by the loss.

        Returns:
            ``(logits, loss)``. With ``targets`` the logits cover every
            position and ``loss`` is the cross-entropy; without ``targets``
            only the last position is projected (logits of shape
            ``(B, 1, vocab_size)``) and ``loss`` is ``None``.

        Raises:
            AssertionError: If ``T`` exceeds ``block_size``.
        """
        B, T = idx.size()

        assert T <= self.config.block_size, f"Max sequence length is {self.config.block_size}"

        pos = torch.arange(0, T, dtype=torch.long, device=idx.device)
        tok_emb = self.transformer.tok_embedding(idx)
        pos_emb = self.transformer.pos_embedding(pos)
        x = self.transformer.dropout(tok_emb + pos_emb)
        for block in self.transformer.MHS_Attn_Block:
            x = block(x)
        x = self.transformer.ln_f(x)

        if targets is not None:
            logits = self.lm_head(x)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        else:
            # Inference shortcut: only the final position is needed; the list
            # index keeps the time dimension.
            logits = self.lm_head(x[:, [-1], :])
            loss = None

        return logits, loss

    def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
        """Build an AdamW optimizer with weight decay on tensors of rank >= 2.

        Args:
            weight_decay: Decay applied to parameters with two or more dims.
            learning_rate: Learning rate passed to AdamW.
            betas: ``(beta1, beta2)`` tuple passed to AdamW.
            device_type: The fused implementation is requested only when this
                equals ``'cuda'`` and AdamW accepts a ``fused`` argument.

        Returns:
            The configured ``torch.optim.AdamW`` instance.
        """
        # Imported here because `inspect` is not among this file's top-level
        # imports; without it the signature check below raises NameError.
        import inspect

        trainable = {pn: p for pn, p in self.named_parameters() if p.requires_grad}
        # Parameters with rank >= 2 are decayed; 1-D ones are not.
        decay_params = [p for p in trainable.values() if p.dim() >= 2]
        nodecay_params = [p for p in trainable.values() if p.dim() < 2]

        optim_groups = [
            {'params': decay_params, 'weight_decay': weight_decay},
            {'params': nodecay_params, 'weight_decay': 0.0}
        ]

        num_decay_params = sum(p.numel() for p in decay_params)
        num_nodecay_params = sum(p.numel() for p in nodecay_params)

        print(f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters")
        print(f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters")

        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        use_fused = fused_available and device_type == 'cuda'
        extra_args = dict(fused=True) if use_fused else dict()
        optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_args)
        print(f"using fused AdamW: {use_fused}")

        return optimizer

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """Sample ``max_new_tokens`` tokens autoregressively.

        Args:
            idx: Integer tensor of shape ``(B, T)`` holding the prompt tokens.
            max_new_tokens: Number of tokens appended to each sequence.
            temperature: Divisor applied to the logits before softmax.
            top_k: When given, only the ``top_k`` highest logits stay
                eligible for sampling.

        Returns:
            Tensor of shape ``(B, T + max_new_tokens)``. The method does not
            change the module's train/eval mode.
        """
        for _ in range(max_new_tokens):
            # Keep at most the last block_size tokens as context.
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                # Everything below the k-th largest logit is excluded.
                logits[logits < v[:, [-1]]] = -float('Inf')
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)

        return idx

# Inference

> GPT-2 Configurations

In [None]:
@dataclass
class GPTConfig:
    """Hyperparameters consumed by the GPT model and its sub-modules."""

    # Maximum sequence length; also the number of position-embedding rows.
    block_size: int = 1024
    # Number of token-embedding rows / output logits.
    # NOTE(review): presumably GPT-2's 50257 tokens padded up to a multiple of 64 — confirm.
    vocab_size: int = 50304
    # Number of stacked transformer blocks.
    n_layer: int = 12
    # Number of attention heads; n_embd must be divisible by it.
    n_head: int = 12
    # Embedding / hidden width.
    n_embd: int = 768
    # Dropout probability used by every dropout layer.
    drop_rate: float = 0.0
    # Whether Linear and LayerNorm layers carry a bias term.
    bias: bool = True

In [None]:
checkpts_dir = 'path to your checkpoints'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Explicit switch for torch.compile. The earlier `if compile:` tested the
# builtin `compile` function, which is always truthy, so True keeps the
# previous effective behaviour; set it to False to skip compilation.
compile_model = True

ckpt_path = os.path.join(checkpts_dir, 'ckpt.pt')
# NOTE(security): torch.load unpickles the file. Only load checkpoints from a
# trusted source, or pass weights_only=True where the PyTorch version has it.
checkpoint = torch.load(ckpt_path, map_location=device)
gptconf = GPTConfig(**checkpoint['model_args'])
model = GPT(gptconf)
state_dict = checkpoint['model']
# Strip the '_orig_mod.' key prefix (presumably left by saving a
# torch.compile-wrapped model) so the keys match the plain module.
unwanted_prefix = '_orig_mod.'
for key in list(state_dict):
    if key.startswith(unwanted_prefix):
        state_dict[key[len(unwanted_prefix):]] = state_dict.pop(key)
model.load_state_dict(state_dict)

# compile model
model.eval()
model.to(device)
# torch.compile is missing on older PyTorch builds; skip it there.
if compile_model and hasattr(torch, 'compile'):
    model = torch.compile(model)

# tokenizer
tokenizer = tiktoken.get_encoding("r50k_base")


def encode(s):
    """Turn text into token ids, allowing the ``<|endoftext|>`` special token."""
    return tokenizer.encode(s, allowed_special={"<|endoftext|>"})


def decode(l):
    """Turn a list of token ids back into text."""
    return tokenizer.decode(l)


# inference
def generate(prompt, num_samples=1, max_tokens=50, temp=1, top_k=100):
    """Print ``num_samples`` completions of ``prompt`` sampled from ``model``.

    Args:
        prompt: Text that every sample starts from.
        num_samples: Number of independent completions to print.
        max_tokens: Number of new tokens generated per sample.
        temp: Sampling temperature forwarded to ``model.generate``.
        top_k: Top-k cutoff forwarded to ``model.generate``.

    Returns:
        None; the samples are printed.
    """
    prompt_ids = encode(prompt)
    # Add a leading batch dimension of size 1.
    x = torch.tensor(prompt_ids, dtype=torch.long, device=device).unsqueeze(0)

    # generate
    for sample_idx in range(num_samples):
        print(f'sample num:[{sample_idx}]')
        completion = model.generate(x, max_tokens, temperature=temp, top_k=top_k)
        text = decode(completion[0].tolist())
        print(text)

number of parameters: 123.59M


In [None]:
# Sampling settings for this run.
num_samples = 10   # completions to print
max_tokens = 100   # new tokens per completion
temp = 0.80        # sampling temperature
top_k = 100        # top-k cutoff

prompt = "3D Smart factory is a wonderful company because"

generate(
    prompt,
    num_samples=num_samples,
    max_tokens=max_tokens,
    temp=temp,
    top_k=top_k,
)

sample num:[0]
3D Smart factory is a wonderful company because it’s so well made and doesn io9070101 as an easy to use device. By using e070111 a smart camera can be used on the smart phone and a smart phone. Smartphones come from devices which can be used on Android phones, tablets, screens, etc. I could end up with smart phones and can easily run smart home devices on smartphones. Smartphones come from devices which don’t hold any information about the devices themselves but can be used for
sample num:[1]
3D Smart factory is a wonderful company because we're not the first one in manufacturing. I know my wife and I use these things to make products, but I can't, because they're just not the products that we use. So I use these designs on my wedding ring, because I love and I love them. So I'm not going to give up on any for sale! Anyways, if you have a brand name you're going to have a name called "Formula" in your name, which is you're going to
sample num:[2]
3D Smart factory is a wo