# Generative Artificial Intelligence

* How to create a model that can generate images/text/audio/etc. from user prompts?
* Unlabelled data availbale in abundance. Learn hidden sructure from the unlabelled data.

In this notebook we specifically address the task of text generation.

### What is text generation?
Given a sequence, predict what is the next token in the sequence. The sequence can be a series of words or characters and the objective is to predict next word or character respectively in sequence.
$$P(w_{t} | w_{t-1}, w_{t-2}, w_{t-3},...,w_{1})$$

#### Some basic terminology:
**Tokens/Tokenization** - Given a sequence of characters, tokenization is the process of dividing the sequence into smaller units called tokens. Tokens can be individual characters, segments of words, complete words or portions of sentences. Tokens obtained are converted into 1-hot vectors to be fed into the model.

**Generative Model** - A model that learns to sample from the probability distribution to generate data that seem to be from the same probability distribution as training data.

**Discriminative Model** - In contrast to generative models, discriminative models are trained to differentiate between classes or categories.


### Dataset

In [1]:
# Read the input corpus
with open('tiny_shakespeare.txt', 'r', encoding='utf-8') as f:
    text = f.read()
print("Length of text: ", len(text))
print(f"\nSample text:\n{text[:400]}")

Length of text:  1115393

Sample text:
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it 


### Tokenization
One of the easiest language model to start with is the character level model where each character is a token. It encodes minimum token level information but is easy to implement.

In [2]:
# Create characters as vocabulary
chars = sorted(list(set(text)))
vocab_size = len(chars)
print("Vocabulary: ", ''.join(chars))
print("Vocabulary size: ", vocab_size)

Vocabulary:  
 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
Vocabulary size:  65


To feed characters into a model they need to converted into numbers that can be processed by a model.

In [3]:
# Encoder and decoder function for idx to char and back
stoi = {ch:i for i, ch in enumerate(chars)}
itos = {i:ch for i, ch in enumerate(chars)}
encode = lambda s: [stoi[c] for c in s]
decode = lambda l: ''.join([itos[i] for i in l])

In [4]:
print(encode('Shakespeare'))
print(decode(encode('Shakespeare')))

[31, 46, 39, 49, 43, 57, 54, 43, 39, 56, 43]
Shakespeare


In [5]:
# Convert text to torch tensor
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
data = torch.tensor(encode(text), dtype=torch.long)

In [6]:
# Split data into train and validation
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

# Get single batch of data for training
def get_batch(split='train', block_size=8, batch_size=4):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i : i + block_size] for i in ix])
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

In [7]:
x, y  = get_batch(split='train')
x[0], y[0]

(tensor([52, 53, 61, 52,  1, 58, 53,  1]),
 tensor([53, 61, 52,  1, 58, 53,  1, 52]))

### Generative Model
**Possible generative models**:
1. **N-gram model** - Given n-previous tokens in the sequence, predict the next token. Most common approaches are bigram or trigram model with bayes estimation. Larger the value of **N**, more context information can be incorporated.
2. **Recurrent neural networks** - A goto neural network achitecture for working with sequential data. Behind the scenes, just a neural network that processes each token of the sequential input one at a time. 

<p align="center">
<img src="assets/rnn.webp" width="700">
</p>

Condenses entire history of the sequence into a single vecctor. Theoretically RNNs can process infinite history but this is limited proctically by computational constraints and memory requirements. Even with a large enough history, RNNs struggle with long term dependencies.

3. **Transformer models** - Introduced in 2017 by the paper [Attention Is All You Need](https://arxiv.org/pdf/1706.03762.pdf). The paper introduces an architecture that provides a differentiable lookup method for the called `Attention` that potentially solves the problem of long term dependencies by allowing the model to lookup specific information from the history as required.

e.g., Prompt - Where is Eiffel Tower located? Answer - It is located in Paris. Here `It` is related to `Eiffel Tower`, `is` to `is` and `located` to `located`.

In this notebook we will start with a simple Bigram model and slowly build our way towards a Transformer model.

### Bigram Model

**Embedding layer** - Converts from an index-based representation to a vector representation i.e., each index is mapped to a vector.

In [8]:
import torch.nn as nn
from torch.nn import functional as F
torch.manual_seed(1337)

example_layer = nn.Embedding(vocab_size, vocab_size)
print(x[0])
print(example_layer(x[0]).shape)

tensor([52, 53, 61, 52,  1, 58, 53,  1])
torch.Size([8, 65])


In [9]:
class BigramLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)
        
    def forward(self, idx, targets=None):
        logits = self.token_embedding_table(idx) # (B, T, C) (4, 8, 65)
        
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        
        return logits, loss
    
    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            logits, loss = self(idx)  # calls forward function
            logits = logits[:, -1, :] # only consider the last output
            probs = F.softmax(logits, dim=-1) # normalize it to a probabilty distribution
            idx_next = torch.multinomial(probs, num_samples=1) # Sample from the distribution
            idx = torch.cat((idx, idx_next), dim=1) # Add it to the generated sequence
            
        return idx

#### Instantiate Bigram model

In [10]:
m = BigramLanguageModel(vocab_size)
xb, yb = get_batch(split='train')
logits, loss = m(xb, yb)
print(logits.shape, yb.shape)

torch.Size([32, 65]) torch.Size([4, 8])


In [11]:
idx = torch.tensor(encode('Thou'), dtype=torch.long).unsqueeze(dim=0)
print(decode(m.generate(idx, 10)[0].tolist()))

ThoutMtoIQlNhv


#### Train the model

In [12]:
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-3)
batch_size = 32
eval_iters = 300
train_iters = 10000

@torch.no_grad()
def estimate_loss(model):
    model.eval()
    out = {}
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split=split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = loss.mean()
    model.train()
    return out['train'], out['val']

def train_model(model, optimizer):
    for step in range(train_iters):
        xb, yb = get_batch('train', batch_size=batch_size)
        logits, loss = m(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        if ((step + 1) % 1000 == 0):
            train_loss, val_loss = estimate_loss(model=model)
            print(f'Step {step + 1}: train loss {train_loss:.4f}, validation loss {val_loss:.4f}')

train_model(m, optimizer)
print(f'Trained model validation loss: {estimate_loss(m)[1]:.4f}')

Step 1000: train loss 3.5794, validation loss 3.6787
Step 2000: train loss 3.1601, validation loss 3.4702
Step 3000: train loss 2.6498, validation loss 2.4051
Step 4000: train loss 2.8391, validation loss 2.6292
Step 5000: train loss 2.6887, validation loss 2.1569
Step 6000: train loss 2.1346, validation loss 2.9213
Step 7000: train loss 2.5841, validation loss 2.7375
Step 8000: train loss 2.5508, validation loss 3.0115
Step 9000: train loss 2.5025, validation loss 2.5043
Step 10000: train loss 2.5324, validation loss 2.4189
Trained model validation loss: 2.6159


In [13]:
idx = torch.tensor(encode('Thou'), dtype=torch.long).unsqueeze(dim=0)
print(decode(m.generate(idx, 10)[0].tolist()))

Thoue
I o o to


### Transformer Models

In bigram model, we are only considering the last character of the sequence to generate a new character. With the help of transformers we will enable the model to look into the entire history i.e., all the characters (limited to block size of the data) in the sequence so far.

At the core of the transformer is a `single attention head` (referred to in the paper as Scaled Dot-Product Attention).

<p align="center">
<img src="assets/Single-head attention.png" width="200">
</p>

$$Attention(Q, K, V) = softmax(\frac{QK^T}{\sqrt(d_k)})V$$

**Q, K, V** - Query, Key and Value vectors corresponding to the vector representations for each token.

In [14]:
xb, yb = get_batch(split='train')
print(xb[0, :4])

for b in range(1):
    for t in range(4):
        context = xb[b, :t+1]
        target = yb[b, t]
        print(f"input: {context}, target: {target}")

tril = torch.tril(torch.ones(4, 4))
print(tril)

tensor([53, 51, 43,  1])
input: tensor([53]), target: 51
input: tensor([53, 51]), target: 43
input: tensor([53, 51, 43]), target: 1
input: tensor([53, 51, 43,  1]), target: 51
tensor([[1., 0., 0., 0.],
        [1., 1., 0., 0.],
        [1., 1., 1., 0.],
        [1., 1., 1., 1.]])


In [15]:
class SingleHeadAttention(nn.Module):
    def __init__(self, head_size=32, embed_dim=32, block_size=8, dropout=0.4) -> None:
        super().__init__()
        self.key = nn.Linear(embed_dim, head_size, bias=False)
        self.query = nn.Linear(embed_dim, head_size, bias=False)
        self.value = nn.Linear(embed_dim, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x) # (B, T, C)
        q = self.query(x) # (B, T, C)
        # Attention scores
        weights = q @ k.transpose(-2, -1) * C**-0.5 # # (B, T, C) @ # (B, C, T) --> # (B, T, T)
        weights = weights.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        weights = F.softmax(weights, dim=-1) # (B, T, T)
        weights = self.dropout(weights)
        # weighted aggregation of values
        v = self.value(x) # (B, T, C)
        context = weights @ v # (B, T, T) @ (B, T, C) -> (B, T, C)
        return context

    
class LanguageModelBase(nn.Module):
    def __init__(self, head_size=32, embed_dim=32, block_size=8, dropout=0.4) -> None:
        super().__init__()
        self.block_size = block_size
        self.token_embedding_table = nn.Embedding(vocab_size, embed_dim)
        self.positional_embedding_table = nn.Embedding(block_size, embed_dim)
        self.attention_head = SingleHeadAttention(head_size, embed_dim, block_size, dropout)
        self.lm_head = nn.Linear(head_size, vocab_size)
        
    def forward(self, idx, targets = None):
        _, T = idx.shape
        token_embed = self.token_embedding_table(idx) # (B, T, embed_dim)
        pos_embed = self.positional_embedding_table(torch.arange(T, device=device)) # (T, embed_dim)
        x = token_embed + pos_embed # (B, T, embed_dim)
        x = self.attention_head(x) # (B, T, head_size)
        logits = self.lm_head(x) # (B, head_size, vocab_size)
        
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss
        
    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -self.block_size:]
            logits, loss = self(idx_cond)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
            
        return idx

In [16]:
model = LanguageModelBase()
m = model.to(device)
print(sum(p.numel() for p in m.parameters()) / 1e3, 'K parameters')
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-3)
train_model(model=m, optimizer=optimizer)
print(f'Trained model validation loss: {estimate_loss(m)[1]:.4f}')

7.553 K parameters
Step 1000: train loss 2.5169, validation loss 2.6409
Step 2000: train loss 2.1906, validation loss 2.5111
Step 3000: train loss 2.3107, validation loss 2.4914
Step 4000: train loss 2.6634, validation loss 2.4616
Step 5000: train loss 2.5612, validation loss 2.1113
Step 6000: train loss 2.5049, validation loss 2.8011
Step 7000: train loss 2.5295, validation loss 2.4417
Step 8000: train loss 2.5306, validation loss 2.5736
Step 9000: train loss 2.4072, validation loss 2.6101
Step 10000: train loss 2.5806, validation loss 2.5888
Trained model validation loss: 2.5693


In [17]:
idx = torch.tensor(encode('Thou'), dtype=torch.long).unsqueeze(dim=0)
print(decode(m.generate(idx, 10)[0].tolist()))

Thouw wa eacer


#### Multi-head Attention