<a href="https://colab.research.google.com/github/Bhaanupriyaranjit/DLProject/blob/main/Shakespearellm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
print(torch.cuda.is_available())
# get_device_name(0) raises when no CUDA device is visible, so only query it on GPU runtimes;
# this keeps the cell runnable on a CPU-only kernel.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
else:
    print("No CUDA device found - running on CPU")

True
Tesla T4


In [3]:
!pip install torch torchvision matplotlib
import torch
import torch.nn as nn
from torch.nn import functional as F
import urllib.request
import matplotlib.pyplot as plt
print("GPU available:", torch.cuda.is_available())


GPU available: True


In [4]:
# Dataset preparation: download the Tiny Shakespeare corpus as a single UTF-8 string.

url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
# The context manager closes the HTTP response even if reading or decoding fails;
# the timeout stops the cell from hanging forever on a stalled connection.
with urllib.request.urlopen(url, timeout=30) as response:
    data = response.read().decode("utf-8")
print(f"Dataset length: {len(data):,} characters")
print(data[:500])  # preview the first 500 characters


Dataset length: 1,115,394 characters
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor


In [5]:
#tokenization step
from torch.utils.data import Dataset

# Data-pipeline hyperparameters: batch size B and block size N (both 128 by default).
class Config:
    """Plain container for the batch size and the block size (window length)."""

    def __init__(self, batch_size=128, block_size=128):
        self.batch_size, self.block_size = batch_size, block_size

class CharDataset(Dataset):
    """
    Character-level dataset of (input, target) windows over a text.

    Item ``idx`` is the ``block_size`` characters starting at position ``idx``
    (the input) together with the same window shifted one character to the
    right (the target), both encoded as integer tensors.

    Adapted from "https://github.com/karpathy/minGPT".
    """

    def __init__(self, config, data):
        """
        Args:
            config: object exposing ``block_size`` and ``batch_size``.
            data: the text to draw windows from; the vocabulary is the set of
                distinct characters found in it.
        """
        self.data = data
        self.block_size = config.block_size
        self.batch_size = config.batch_size
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        chars = sorted(set(self.data))  # distinct characters, in a stable sorted order
        self.stoi = {ch: i for i, ch in enumerate(chars)}  # character -> integer id
        self.itos = {i: ch for i, ch in enumerate(chars)}  # integer id -> character
        self.vocab_size = len(chars)
        print(f"Vocab size: {self.vocab_size} unique characters")

    def get_vocab_size(self):
        """Return the number of distinct characters in the vocabulary."""
        return self.vocab_size

    def __len__(self):
        """
        Number of valid (x, y) training samples this text can produce.

        Every sample needs ``block_size + 1`` characters, so the last valid
        start position is ``len(data) - block_size - 1``. Clamped at zero
        because ``len()`` must never be negative, which would otherwise
        happen for a text shorter than ``block_size``.
        """
        return max(0, len(self.data) - self.block_size)

    def __getitem__(self, idx):
        """
        Return the ``idx``-th sample as a pair ``(x, y)`` of ``torch.long``
        tensors of length ``block_size`` placed on ``self.device``; ``y`` is
        ``x`` shifted one character to the right.

        Raises:
            IndexError: if ``idx`` is outside ``[0, len(self))``. Besides
                rejecting truncated windows, this is what lets plain
                ``for x, y in dataset`` iteration terminate.
        """
        idx = int(idx)  # accept plain ints and 0-dim integer tensors alike
        if not 0 <= idx < len(self):
            raise IndexError(
                f"index {idx} is out of range for a dataset of length {len(self)}"
            )

        # grab a chunk of (block_size + 1) characters and encode each one to its integer id
        chunk = self.data[idx : idx + self.block_size + 1]
        encoded = torch.tensor([self.stoi[c] for c in chunk], dtype=torch.long)
        x = encoded[:-1]  # characters 0 .. block_size-1
        y = encoded[1:]   # characters 1 .. block_size (the next-character targets)
        return x.to(self.device), y.to(self.device)

In [6]:
# Build the train/validation datasets and sanity-check the tokenization.

# Split the corpus: the first 90% of the characters for training, the last 10% for validation.
n = int(0.9 * len(data))
train_text = data[:n]
val_text   = data[n:]

cfg = Config(batch_size=128, block_size=128)
train_dataset = CharDataset(cfg, train_text)
val_dataset   = CharDataset(cfg, val_text)

# Each CharDataset derives its vocabulary from its own text, so the validation split
# (61 distinct characters) would number its characters differently from the training
# split (65). Reuse the training mapping so that a token id means the same character
# in both splits and matches the model's vocab_size.
unseen_chars = set(val_text) - set(train_dataset.stoi)
assert not unseen_chars, f"Validation text has characters missing from the training vocab: {sorted(unseen_chars)}"
val_dataset.stoi = train_dataset.stoi
val_dataset.itos = train_dataset.itos
val_dataset.vocab_size = train_dataset.vocab_size

x, y = train_dataset[0]
print("x shape:", x.shape)
print("y shape:", y.shape)

# Decode the first 60 ids back to text: the target must be the input shifted by one character.
itos = train_dataset.itos
print("Input  (x):", ''.join([itos[i.item()] for i in x[:60]]))
print("Target (y):", ''.join([itos[i.item()] for i in y[:60]]))


# Simulate a training batch by sampling random windows from a dataset.
def get_batch(dataset, batch_size=cfg.batch_size):
    """
    Draw ``batch_size`` samples uniformly at random (with replacement).

    Args:
        dataset: indexable object whose items are ``(x, y)`` tensor pairs of
            equal shape and which supports ``len()``.
        batch_size: number of samples to draw.

    Returns:
        ``(x, y)`` where each is the stack of the sampled items along a new
        leading batch dimension.
    """
    ix = torch.randint(len(dataset), (batch_size,))
    # Fetch every sample exactly once; indexing the dataset separately for x and for y
    # would build each window twice.
    samples = [dataset[i] for i in ix.tolist()]
    x = torch.stack([sample[0] for sample in samples])
    y = torch.stack([sample[1] for sample in samples])
    return x, y

xb, yb = get_batch(train_dataset)
print("Batch shapes:", xb.shape, yb.shape)



Vocab size: 65 unique characters
Vocab size: 61 unique characters
x shape: torch.Size([128])
y shape: torch.Size([128])
Input  (x): First Citizen:
Before we proceed any further, hear me speak.
Target (y): irst Citizen:
Before we proceed any further, hear me speak.

Batch shapes: torch.Size([128, 128]) torch.Size([128, 128])


In [7]:
#week 2
#Model architecture begins here: model config, single attention head, multi-head attention, feed-forward MLP, transformer block, and the full Shakespeare GPT model

import math

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print("Using device:", device)

# Model hyperparameters. The defaults follow the project requirements:
# 12 layers, 8 attention heads and 768 embedding dimensions.
class GPTConfig:
    """
    Hyperparameters of the GPT model.

    Attributes:
        vocab_size: number of unique characters (tokens).
        block_size: sequence length N.
        n_layer: number of transformer blocks.
        n_head: number of attention heads per block.
        n_embd: embedding dimension.
        dropout: dropout rate.
    """

    def __init__(self, vocab_size, block_size,
                 n_layer=12, n_head=8, n_embd=768, dropout=0.1):
        self.vocab_size, self.block_size = vocab_size, block_size
        self.n_layer, self.n_head, self.n_embd = n_layer, n_head, n_embd
        self.dropout = dropout



Using device: cuda


In [8]:
# Single causal self-attention head.
class Head(nn.Module):
    """
    One head of causal self-attention.

    Projects the input to queries, keys and values, applies scaled dot-product
    attention, and masks out future positions so that a token can only attend
    to itself and to earlier tokens.
    """

    def __init__(self, cfg: GPTConfig, head_size: int):
        super().__init__()
        # Bias-free projections from the embedding dimension to this head's subspace.
        self.key   = nn.Linear(cfg.n_embd, head_size, bias=False)
        self.query = nn.Linear(cfg.n_embd, head_size, bias=False)
        self.value = nn.Linear(cfg.n_embd, head_size, bias=False)

        # Lower-triangular (block_size, block_size) matrix of ones: entry [i, j] is 1
        # when position i may look at position j. Registered as a buffer so it is not
        # a trainable parameter but still follows the module across devices.
        self.register_buffer("mask", torch.ones(cfg.block_size, cfg.block_size).tril())

        self.dropout = nn.Dropout(cfg.dropout)

    def forward(self, x):
        """Map ``x`` of shape (B, T, C) to attention output of shape (B, T, head_size)."""
        _, seq_len, _ = x.shape

        queries = self.query(x)  # (B, T, head_size)
        keys = self.key(x)       # (B, T, head_size)
        values = self.value(x)   # (B, T, head_size)

        # Scaled dot-product scores between every pair of positions: (B, T, T).
        scores = queries @ keys.transpose(-2, -1) / math.sqrt(keys.shape[-1])

        # Future positions get -inf so that softmax assigns them zero weight.
        is_future = self.mask[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(is_future, float("-inf"))

        # Normalise over the key dimension, then regularise the attention weights.
        weights = self.dropout(F.softmax(scores, dim=-1))

        # Attention-weighted sum of the values: (B, T, head_size).
        return weights @ values

In [9]:
# Multi-head attention: several attention heads run side by side on the same input.
class MultiHeadAttention(nn.Module):
    """
    Runs ``cfg.n_head`` attention heads on the same input, concatenates their
    outputs and projects the result back to the embedding dimension.

    The embedding dimension equals number_of_heads x head_size, so every head
    works in a subspace of size ``n_embd // n_head``.
    """

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        # Reject configurations whose embedding cannot be split evenly between the heads.
        assert cfg.n_embd % cfg.n_head == 0
        per_head_dim = cfg.n_embd // cfg.n_head
        self.heads = nn.ModuleList([Head(cfg, per_head_dim) for _ in range(cfg.n_head)])
        self.proj = nn.Linear(cfg.n_embd, cfg.n_embd)
        self.dropout = nn.Dropout(cfg.dropout)

    def forward(self, x):
        """Apply every head to ``x`` and merge the results; output is (B, T, C)."""
        head_outputs = [head(x) for head in self.heads]
        # Concatenate along the channel dimension: n_head pieces of head_size each.
        merged = torch.cat(head_outputs, dim=-1)
        return self.dropout(self.proj(merged))

In [10]:
# Feed-forward network (MLP) applied to every position separately.
class FeedForward(nn.Module):
    """Two-layer MLP: Linear -> GELU -> Linear -> Dropout."""

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        hidden_dim = 4 * cfg.n_embd  # expand the features 4x (e.g. 768 -> 3072)
        layers = [
            nn.Linear(cfg.n_embd, hidden_dim),  # embedding dimension C -> 4*C
            nn.GELU(),                          # Gaussian Error Linear Unit non-linearity
            nn.Linear(hidden_dim, cfg.n_embd),  # back to the original embedding size
            nn.Dropout(cfg.dropout),            # regularization
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the MLP; the output has the same shape as the input."""
        return self.net(x)

In [11]:
# One transformer block:
#   x   = x + CausalSelfAttn(LayerNorm_1(x))
#   out = x + MLP(LayerNorm_2(x))
class TransformerBlock(nn.Module):
    """Pre-norm transformer block: self-attention and an MLP, each wrapped in a residual connection."""

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        self.ln1 = nn.LayerNorm(cfg.n_embd)
        self.ln2 = nn.LayerNorm(cfg.n_embd)
        self.attn = MultiHeadAttention(cfg)
        self.mlp = FeedForward(cfg)

    def forward(self, x):
        """Apply the attention and MLP sub-layers; the output has the same shape as ``x``."""
        # Self-attention on the normalised input, added back onto the residual stream.
        attended = self.attn(self.ln1(x))
        x = x + attended
        # MLP on the normalised result, again added back onto the residual stream.
        transformed = self.mlp(self.ln2(x))
        return x + transformed


In [12]:
# Full Shakespeare-style GPT-like model: a decoder-only transformer for
# character-level language modeling.
class ShakespeareGPT(nn.Module):
    """
    Decoder-only transformer language model.

    Token and position embeddings are summed, passed through ``cfg.n_layer``
    transformer blocks and a final layer norm, and projected to one logit per
    vocabulary entry at every position.
    """

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        self.cfg = cfg

        # Token embedding: integer character id -> vector of size n_embd.
        self.WTE = nn.Embedding(cfg.vocab_size, cfg.n_embd)
        # Position embedding: one vector for each position 0 .. block_size - 1.
        self.WPE = nn.Embedding(cfg.block_size, cfg.n_embd)

        # Randomly zeroes some dimensions during training for regularization.
        self.dropout = nn.Dropout(cfg.dropout)

        # Stack of n_layer transformer blocks.
        self.blocks = nn.ModuleList([TransformerBlock(cfg) for _ in range(cfg.n_layer)])

        # Final layer norm and language-modeling head.
        self.Final_LayerNorm = nn.LayerNorm(cfg.n_embd)
        self.LM_Head = nn.Linear(cfg.n_embd, cfg.vocab_size, bias=False)

    def forward(self, idx, targets=None):
        """
        Compute next-token logits and, optionally, the training loss.

        Args:
            idx: (B, T) integer token ids with ``T <= cfg.block_size``.
            targets: optional (B, T) integer next-token ids.

        Returns:
            ``(logits, loss)``: ``logits`` has shape (B, T, vocab_size); ``loss``
            is the cross-entropy over all B*T positions, or ``None`` when no
            targets are given.
        """
        B, T = idx.shape
        # The position embedding table only covers block_size positions.
        assert T <= self.cfg.block_size, "Sequence length > block_size"

        # Token embeddings.
        tok_emb = self.WTE(idx)                                   # (B, T, C)

        # Position embeddings, broadcast over the batch dimension.
        pos = torch.arange(0, T, device=idx.device).unsqueeze(0)  # (1, T)
        pos_emb = self.WPE(pos)                                   # (1, T, C)

        # Sum token and position embeddings, then apply dropout.
        x = self.dropout(tok_emb + pos_emb)                       # (B, T, C)

        # Pass through the transformer blocks.
        for block in self.blocks:
            x = block(x)                                          # (B, T, C)

        # Final layer norm.
        x = self.Final_LayerNorm(x)

        # Language-modeling head: one logit per vocabulary entry.
        logits = self.LM_Head(x)                                  # (B, T, vocab_size)

        # Optional next-character prediction loss for training.
        loss = None
        if targets is not None:
            # Flatten B and T into a single dimension for cross_entropy.
            logits_flat = logits.view(B * T, self.cfg.vocab_size) # (B*T, vocab_size)
            targets_flat = targets.view(B * T)                    # (B*T,)
            loss = F.cross_entropy(logits_flat, targets_flat)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """
        Autoregressively sample ``max_new_tokens`` tokens after the prompt.

        Args:
            idx: (B, T) integer token ids used as the prompt.
            max_new_tokens: number of tokens to append.

        Returns:
            (B, T + max_new_tokens) tensor holding the prompt followed by the
            sampled tokens.

        Sampling runs in eval mode (dropout disabled). The train/eval mode the
        model had on entry is restored on exit, so sampling in the middle of
        training does not leave dropout switched off for the following steps.
        """
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # Crop the context to the last block_size tokens the model can handle.
                idx_cond = idx[:, -self.cfg.block_size:]

                # Forward pass; only the logits of the last time step are needed.
                logits, _ = self(idx_cond)
                logits = logits[:, -1, :]

                # Turn the logits into probabilities and sample the next token.
                probs = F.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)

                # Append the sampled token to the running sequence.
                idx = torch.cat((idx, next_token), dim=1)
        finally:
            # Restore the previous mode even if sampling raised.
            self.train(was_training)

        return idx





**TODO:** The GPT-like model above currently relies on PyTorch's default weight initialization. Check whether custom (manual) weight initialization is needed.

In [13]:
# Verification: build the full model and run one forward pass on a training batch.

vocab_size = train_dataset.get_vocab_size()
cfg_model = GPTConfig(
    vocab_size=vocab_size,
    block_size=cfg.block_size,  # keep the model's context length tied to the dataset's window length
    n_layer=12,
    n_head=8,
    n_embd=768,
    dropout=0.1
)

model = ShakespeareGPT(cfg_model).to(device)

# Take a batch.
xb, yb = get_batch(train_dataset)

# Run a forward pass; no gradients are needed for a shape/loss check.
with torch.no_grad():
    logits, loss = model(xb.to(device), yb.to(device))

# One logit per vocabulary entry for every position of every sequence in the batch.
assert logits.shape == (*xb.shape, vocab_size), logits.shape

print("Logits shape:", logits.shape)
print("Loss:", loss.item())


Logits shape: torch.Size([128, 128, 65])
Loss: 4.293853759765625


In [16]:
# Sample a short continuation of a prompt from the model.
prompt = "O God"

# Encode the prompt as a (1, len(prompt)) batch of token ids on the model's device.
encoded = torch.tensor([[train_dataset.stoi[ch] for ch in prompt]], device=device)

generated = model.generate(encoded, max_new_tokens=20)

# Decode the first (only) sequence of the batch back into text.
decoded = ''.join(train_dataset.itos[token_id] for token_id in generated[0].tolist())

print(decoded)


O GodGw&Hh&SQ$kXmlI'H!fA$


In [None]:
#Training part