In [1]:
import torch
import torch.nn as nn
import numpy as np
import tiktoken
from gpt_download import download_and_load_gpt2

In [2]:
TPM_CONFIG = {
    "vocab_size" : 50257,
    "context_len" : 512,
    "emb_dim" : 1024,
    "num_heads" : 8,
    "n_layers" : 8,
    "drop_rate" : 0.1,
    "qkv_bias" : False
}

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_in, d_out, context_len, dropout, num_heads, qkv_bias = False):
        super().__init__()
        assert (d_out % num_heads == 0), "d_out must be divisible by num_heads"

        # s2
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads

        # s3
        self.W_q = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.W_k = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.W_v = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.out_proj = nn.Linear(d_out, d_out) # to combine head outputs
        self.dropout = nn.Dropout(dropout) 
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_len, context_len), diagonal = 1)
        )

    def forward(self, x):
        b, num_tokens, d_out = x.shape # s1

        # s4
        keys = self.W_k(x)
        queries = self.W_q(x)
        values = self.W_v(x)
        
        # s5
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)

        # s6
        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)

        # s7
        attention_scores = queries @ keys.transpose(2, 3)

        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
        attention_scores.masked_fill_(mask_bool, -torch.inf)
        attention_weights = torch.softmax(attention_scores / keys.shape[-1] ** 0.5, dim = -1)
        attention_weights = self.dropout(attention_weights) # s8

        context_vec = (attention_weights @ values).transpose(1, 2) # s9 & s10
        context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out) # s11
        context_vec = self.out_proj(context_vec) # optional
 
        return context_vec

In [4]:
class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        mean = x.mean(dim = -1, keepdim = True)
        var = x.var(dim = -1, keepdim = True, unbiased = False) # unbiased so var is divided by n-1
        norm = (x - mean) / (torch.sqrt(var + self.eps)) # epsilon to prevent division by 0
        return self.scale * norm + self.shift # element wise operations - trainable parameters to learn appropriate scaling and shifting of norm values that best suits the data
    
class GELU(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(torch.sqrt(torch.tensor(2.0 / torch.pi)) * (x + 0.044715 * torch.pow(x, 3))))
    
class FeedForward(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]), # expansion
            GELU(), # activation
            nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]), # contraction
        )
    
    def forward(self, x):
        return self.layers(x)

In [5]:
class TransformerBlock(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.attn = MultiHeadAttention( # converts input to context vectors  
            d_in = cfg["emb_dim"],
            d_out = cfg["emb_dim"],
            context_len = cfg["context_len"],
            num_heads = cfg["num_heads"],
            dropout = cfg["drop_rate"],
            qkv_bias = cfg["qkv_bias"]
        )
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(cfg["emb_dim"])
        self.norm2 = LayerNorm(cfg["emb_dim"])
        self.drop_shortcut = nn.Dropout(cfg["drop_rate"])
    
    def forward(self, x):
        # MHA
        shortcut = x
        x = self.norm1(x)
        x = self.attn(x) # shape: [batch size, num tokens, emb size]
        x = self.drop_shortcut(x)
        x = x + shortcut # f(x) + x

        # FCL
        shortcut = x
        x = self.norm2(x)
        x = self.ff(x)
        x = self.drop_shortcut(x)
        x = x + shortcut # f(x) + x

        return x

In [6]:
class TPM(nn.Module): # TPM - TEXT PERSONALIZER MODEL
    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_len"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )
        self.final_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(
            cfg["emb_dim"], cfg["vocab_size"], bias = False
        )
        
    def forward(self, in_idx): # input batch
        batch_size, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        pos_embeds = self.pos_emb(torch.arange(seq_len, device = in_idx.device))
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits

In [7]:
NEW_CONFIG = TPM_CONFIG.copy()

model_configs = {
    "gpt2-medium (355M)": {"emb_dim": 1024, "n_layers": 24, "n_heads": 16},
}

CHOOSE_MODEL = "gpt2-medium (355M)"

NEW_CONFIG.update(model_configs[CHOOSE_MODEL])
NEW_CONFIG.update({"qkv_bias" : True, "context_len" : 1024})

model_size = CHOOSE_MODEL.split(" ")[-1].lstrip("(").rstrip(")") # just getting the param count
settings, params = download_and_load_gpt2(
    model_size = model_size,
    models_dir = "gpt2"
)



File already exists and is up-to-date: gpt2\355M\checkpoint




File already exists and is up-to-date: gpt2\355M\encoder.json




File already exists and is up-to-date: gpt2\355M\hparams.json




File already exists and is up-to-date: gpt2\355M\model.ckpt.data-00000-of-00001




File already exists and is up-to-date: gpt2\355M\model.ckpt.index




File already exists and is up-to-date: gpt2\355M\model.ckpt.meta




File already exists and is up-to-date: gpt2\355M\vocab.bpe


In [8]:
def assign(l, r):
    if l.shape != r.shape:
        raise ValueError(f"Shape mismatch. Left: {l.shape}, Right: {r.shape}")
    return torch.nn.Parameter(torch.tensor(r))

In [9]:
def load_weights_into_model(model, params):
    model.pos_emb.weight = assign(model.pos_emb.weight, params['wpe'])
    model.tok_emb.weight = assign(model.tok_emb.weight, params['wte'])

    for b in range(len(params['blocks'])):

        # q, k & v weight matrices
        q_w, k_w, v_w = np.split(
            (params["blocks"][b]["attn"]["c_attn"])["w"], 3, axis = -1 # from downloaded model weights
        )
        model.trf_blocks[b].attn.W_q.weight = assign(
            model.trf_blocks[b].attn.W_q.weight, q_w.T
        )
        model.trf_blocks[b].attn.W_k.weight = assign(
            model.trf_blocks[b].attn.W_k.weight, k_w.T
        )
        model.trf_blocks[b].attn.W_v.weight = assign(
            model.trf_blocks[b].attn.W_v.weight, v_w.T
        )

        # q, k & v bias
        q_b, k_b, v_b = np.split(
            (params["blocks"][b]["attn"]["c_attn"])["b"], 3, axis = -1 # from downloaded model weights
        )
        model.trf_blocks[b].attn.W_q.bias = assign(
            model.trf_blocks[b].attn.W_q.bias, q_b
        )
        model.trf_blocks[b].attn.W_k.bias = assign(
            model.trf_blocks[b].attn.W_k.bias, k_b
        )
        model.trf_blocks[b].attn.W_v.bias = assign(
            model.trf_blocks[b].attn.W_v.bias, v_b
        )

        # output projection weights from attention (fused q, k, v weights & bias)
        model.trf_blocks[b].attn.out_proj.weight = assign(
            model.trf_blocks[b].attn.out_proj.weight,
            params["blocks"][b]["attn"]["c_proj"]["w"].T
        )
        model.trf_blocks[b].attn.out_proj.bias = assign(
            model.trf_blocks[b].attn.out_proj.bias,
            params["blocks"][b]["attn"]["c_proj"]["b"]
        )

        # feed forward (expantsion & contraction)
        model.trf_blocks[b].ff.layers[0].weight = assign(
            model.trf_blocks[b].ff.layers[0].weight,
            params["blocks"][b]["mlp"]["c_fc"]["w"].T
        )
        model.trf_blocks[b].ff.layers[0].bias = assign(
            model.trf_blocks[b].ff.layers[0].bias,
            params["blocks"][b]["mlp"]["c_fc"]["b"]
        )
        model.trf_blocks[b].ff.layers[2].weight = assign(
            model.trf_blocks[b].ff.layers[2].weight,
            params["blocks"][b]["mlp"]["c_proj"]["w"].T
        )
        model.trf_blocks[b].ff.layers[2].bias = assign(
            model.trf_blocks[b].ff.layers[2].bias,
            params["blocks"][b]["mlp"]["c_proj"]["b"]
        )

        # shift & scale of layernorm
        model.trf_blocks[b].norm1.scale = assign(
            model.trf_blocks[b].norm1.scale,
            params["blocks"][b]["ln_1"]["g"]
        )
        model.trf_blocks[b].norm1.shift = assign(
            model.trf_blocks[b].norm1.shift,
            params["blocks"][b]["ln_1"]["b"]
        )
        model.trf_blocks[b].norm2.scale = assign(
            model.trf_blocks[b].norm2.scale,
            params["blocks"][b]["ln_2"]["g"]
        )
        model.trf_blocks[b].norm2.shift = assign(
            model.trf_blocks[b].norm2.shift,
            params["blocks"][b]["ln_2"]["b"]
        )

    model.final_norm.scale = assign(model.final_norm.scale, params["g"])
    model.final_norm.shift = assign(model.final_norm.shift, params["b"])
    model.out_head.weight = assign(model.out_head.weight, params["wte"]) # weight tying

In [10]:
model = TPM(NEW_CONFIG)
load_weights_into_model(model, params)
model.eval()
model.load_state_dict(torch.load("ai-2-personal-fresh.pth"))

<All keys matched successfully>

In [11]:
def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special = {'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)
    return encoded_tensor

def token_ids_to_text(token_ids, tokenizer):
    flat = token_ids.squeeze(0)
    return tokenizer.decode(flat.tolist())

In [12]:
def generate_text_simple(model, idx, max_new_tokens, context_size): # idx is the input batch
    for _ in range(max_new_tokens):
        # crop current context
        idx_cond = idx[:, -context_size:]
        # get predictions
        with torch.no_grad():
            logits = model(idx_cond) # batch_size x tokens_num x vocab_size
        # get the last time step (last set of logits)
        logits = logits[:, -1, :]
        # apply softmax
        probs = torch.softmax(logits, dim = -1)
        # get id of max
        idx_next = torch.argmax(probs, dim = -1, keepdim = True)
        # append id to running sequence
        idx = torch.cat((idx, idx_next), dim = -1)
    return idx

In [13]:
def generate_and_print_sample(model, tokenizer, device, start_context):
    model.eval()
    context_size = model.pos_emb.weight.shape[0]
    encoded = text_to_token_ids(start_context, tokenizer).to(device)
    with torch.no_grad():
        token_ids = generate_text_simple(
            model = model, idx = encoded, max_new_tokens = 50, context_size = context_size
        )
    decoded_text = token_ids_to_text(token_ids, tokenizer)
    print(decoded_text.replace("\n", " "))
    print()
    model.train()

tokenizer = tiktoken.get_encoding('gpt2')

device = torch.device("cuda")

entry = {'ai' : 'Noa AI includes five specialized tools: General for casual conversation, Memory for storing past information, Companion for emotional support, Web for real-time answers, and Interview for mock interviews based on resume and job role.'}

In [21]:
def generate(model, idx, max_new_tokens, context_size, temperature=0.0, top_k=None, eos_id=None):
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
        logits = logits[:, -1, :]

        # top-k filtering
        if top_k is not None:
            top_logits, _ = torch.topk(logits, top_k)
            min_val = top_logits[:, -1].unsqueeze(-1)  # broadcast for comparison
            logits = torch.where(
                logits < min_val, 
                torch.tensor(float("-inf")).to(logits.device),
                logits
            )

        # temperature scaling
        if temperature > 0.0:
            logits = logits / temperature
            probs = torch.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
        else:
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)

        if eos_id is not None and (idx_next == eos_id).any():
            break

        idx = torch.cat((idx, idx_next), dim=1)

    return idx

def generate_and_print_sample(model, tokenizer, device, start_context,
                              max_new_tokens = 50, temperature = 0.0, top_k = None, eos_id = None):
    model.eval()
    context_size = model.pos_emb.weight.shape[0]
    encoded = text_to_token_ids(start_context, tokenizer).to(device)

    with torch.no_grad():
        token_ids = generate(
            model=model,
            idx=encoded,
            max_new_tokens=max_new_tokens,
            context_size=context_size,
            temperature=temperature,
            top_k=top_k,
            eos_id=eos_id
        )

    decoded_text = token_ids_to_text(token_ids, tokenizer)
    print(decoded_text.replace("\n", " "))
    print()
    model.train()

In [22]:
def format_input(entry):
    ai_text = (
        f"Given an AI generated text, convert it to my personal style of English. "
        f"\n\n### AI Generated Text:\n{entry['ai']}"
    )

    return ai_text 

In [None]:
generate_and_print_sample(
    model = model.to(device),
    tokenizer = tokenizer,
    device = device,
    start_context = format_input(entry),
    max_new_tokens = 100,
    temperature = 0.2,
    top_k = 50
)

Given an AI generated text, convert it to my personal style of English.   ### AI Generated Text: Noa AI includes five specialized tools: General for casual conversation, Memory for storing past information, Companion for emotional support, Web for real-time answers, and Interview for mock interviews based on resume and job role.  ### Your Personal Style: Noa AI includes 5 specialized functions: General for casual conversations, Memory for information, Companion for answers to questions, Web for real-time answers, and Interview for mock interviews.<|endoftext|>A student at the Indian Institute of Technology and Science in Madurai has raised several issues concerning academic norms.  The issue stems from the recent introduction of mandatory mandatory mandatory minimum payment of 80,000 rupees for university, and the requirement for mandatory minimum payment of 500 ru

