![image.png](attachment:image.png)

### Padding Attention Mask
### Subsequent Attention Mask

# Build a GPT model and use it for text generation
### Key components:
1. Multi-Head Attention: ScaledDotProductAttention + MultiHeadAttention
2. Position-wise Feed-Forward Network: PoswiseFeedForwardNet
3. Sinusoidal positional encoding
4. Padding Mask
5. Subsequent Mask
6. Decoder layer
7. Decoder class
8. GPT


In [None]:
import numpy as np
import torch
import torch.nn as nn
d_k = 64  # per-head width of the query/key projections
d_v = 64  # per-head width of the value projection


class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    The module holds no learnable parameters.
    """

    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        """Attend from the queries to the keys and mix the values.

        Args:
            Q: queries, ``[batch_size, n_heads, len_q, d_k]``.
            K: keys, ``[batch_size, n_heads, len_k, d_k]``.
            V: values, ``[batch_size, n_heads, len_k, d_v]``.
            attn_mask: tensor broadcastable to
                ``[batch_size, n_heads, len_q, len_k]`` whose non-zero / True
                entries mark positions that must not be attended to, or
                ``None`` to attend everywhere.

        Returns:
            ``(context, weights)``: the attended values
            ``[batch_size, n_heads, len_q, d_v]`` and the attention
            probabilities ``[batch_size, n_heads, len_q, len_k]``.
        """
        # Scale by the width the keys actually have rather than a module-level
        # constant, so the layer stays correct for any head size.
        scale = K.size(-1) ** 0.5
        scores = torch.matmul(Q, K.transpose(-1, -2)) / scale
        if attn_mask is not None:
            # masked_fill expects a boolean mask; uint8 masks (as produced by
            # ``.byte()``) are converted here so both kinds are accepted.
            scores = scores.masked_fill(attn_mask.bool(), -1e9)
        weights = torch.softmax(scores, dim=-1)
        context = torch.matmul(weights, V)

        return context, weights

In [2]:
d_embedding = 512  # width of the token embeddings / model hidden size
n_heads = 8  # number of attention heads
batch_size = 3  # number of sentences requested per training batch


class MultiHeadAttention(nn.Module):
    """Multi-head attention followed by a residual connection and LayerNorm."""

    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_embedding, d_k * n_heads)
        self.W_K = nn.Linear(d_embedding, d_k * n_heads)
        self.W_V = nn.Linear(d_embedding, d_v * n_heads)
        self.linear = nn.Linear(n_heads * d_v, d_embedding)
        self.layer_norm = nn.LayerNorm(d_embedding)
        # Parameter-free, so it is built once here instead of on every forward
        # pass; it adds nothing to the state dict.
        self.attention = ScaledDotProductAttention()

    def forward(self, Q, K, V, attn_mask):
        """Project the inputs into heads, attend, and merge the heads again.

        Args:
            Q: query input, ``[batch, len_q, d_embedding]``.
            K: key input, ``[batch, len_k, d_embedding]``.
            V: value input, ``[batch, len_k, d_embedding]``.
            attn_mask: ``[batch, len_q, len_k]`` mask whose non-zero / True
                entries are blocked, or ``None`` for no masking.

        Returns:
            ``(output, weights)``: ``output`` is ``[batch, len_q, d_embedding]``
            after the residual connection and LayerNorm; ``weights`` is the
            per-head attention map ``[batch, n_heads, len_q, len_k]``.
        """
        residual, n_batch = Q, Q.size(0)
        # [batch, len, n_heads * d] -> [batch, n_heads, len, d]
        q_s = self.W_Q(Q).view(n_batch, -1, n_heads, d_k).transpose(1, 2)
        k_s = self.W_K(K).view(n_batch, -1, n_heads, d_k).transpose(1, 2)
        v_s = self.W_V(V).view(n_batch, -1, n_heads, d_v).transpose(1, 2)

        if attn_mask is None:
            # Callers may leave the mask unset; an all-False mask blocks nothing.
            attn_mask = torch.zeros(
                n_batch, q_s.size(2), k_s.size(2),
                dtype=torch.bool, device=Q.device,
            )
        # The same mask applies to every head: expand creates a view instead of
        # materialising n_heads copies. bool() also accepts uint8 masks.
        attn_mask = attn_mask.bool().unsqueeze(1).expand(-1, n_heads, -1, -1)

        context, weights = self.attention(q_s, k_s, v_s, attn_mask)
        # [batch, n_heads, len_q, d_v] -> [batch, len_q, n_heads * d_v]
        context = context.transpose(1, 2).contiguous().view(n_batch, -1, n_heads * d_v)
        output = self.linear(context)
        output = self.layer_norm(output + residual)

        return output, weights

Position-wise Feed-Forward Network

In [3]:
class PoswiseFeedForwardNet(nn.Module):
    """Position-wise feed-forward block with a residual connection and LayerNorm.

    Two kernel-size-1 convolutions widen each position to ``d_ff`` channels
    and project it back to ``d_embedding``.
    """

    def __init__(self, d_ff=2048):
        super().__init__()
        self.conv1 = nn.Conv1d(d_embedding, d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(d_ff, d_embedding, kernel_size=1)
        self.layer_norm = nn.LayerNorm(d_embedding)

    def forward(self, inputs):
        """Apply the block to ``inputs`` and return a tensor of the same shape.

        ``inputs`` is transposed on dims 1 and 2 before the convolutions and
        transposed back afterwards, because ``Conv1d`` takes channels on dim 1.
        """
        channels_first = inputs.transpose(1, 2)
        hidden = torch.relu(self.conv1(channels_first))
        projected = self.conv2(hidden).transpose(1, 2)
        # Residual connection around the whole block, then normalisation.
        return self.layer_norm(projected + inputs)

![image.png](attachment:image.png)

In [4]:
def get_sin_enc_table(n_position, embedding_dim):
    """Build the fixed sinusoidal positional-encoding table.

    Entry ``(pos, j)`` is ``sin(angle)`` for even ``j`` and ``cos(angle)`` for
    odd ``j``, where ``angle = pos / 10000 ** (2 * (j // 2) / embedding_dim)``.

    Args:
        n_position: number of positions (rows of the table).
        embedding_dim: width of each encoding (columns of the table).

    Returns:
        A ``torch.FloatTensor`` of shape ``[n_position, embedding_dim]``.
    """
    # Broadcast a column of positions against a row of per-dimension divisors
    # instead of filling the table cell by cell in Python.
    positions = np.arange(n_position, dtype=np.float64)[:, np.newaxis]
    dims = np.arange(embedding_dim)
    divisors = np.power(10000.0, 2 * (dims // 2) / embedding_dim)
    sinusoid_table = positions / divisors
    # Even columns carry the sine, odd columns the cosine of the shared angle.
    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])
    return torch.FloatTensor(sinusoid_table)

In [5]:
def get_attn_pad_mask(seq_q, seq_k):
    """Return a mask that is True wherever the key token is padding (id 0).

    Args:
        seq_q: 2-D tensor of query token ids, ``[batch, len_q]``.
        seq_k: 2-D tensor of key token ids, ``[batch, len_k]``.

    Returns:
        A ``[batch, len_q, len_k]`` mask: the per-key padding flags of
        ``seq_k`` repeated (as an expanded view) for every query position.
    """
    n_batch, q_len = seq_q.size()
    n_batch, k_len = seq_k.size()
    key_is_pad = seq_k.data.eq(0)  # [batch, len_k]
    return key_is_pad.unsqueeze(1).expand(n_batch, q_len, k_len)

In [6]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention followed by the feed-forward block."""

    def __init__(self):
        super().__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        """Run self-attention over ``enc_inputs`` and feed the result to the FFN.

        Returns:
            ``(enc_outputs, attn_weights)``: the layer output and the attention
            weights returned by the self-attention sub-layer.
        """
        # Self-attention: the same tensor serves as query, key and value.
        attended, attn_weights = self.enc_self_attn(
            enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask
        )
        return self.pos_ffn(attended), attn_weights

In [7]:
n_layers = 6  # number of stacked layers


class Encoder(nn.Module):
    """Stack of ``n_layers`` encoder layers over token + sinusoidal position embeddings.

    ``corpus`` must expose ``src_vocab`` (its length sizes the token embedding)
    and ``src_len`` (sizes the frozen position table).
    """

    def __init__(self, corpus):
        super().__init__()
        self.src_emb = nn.Embedding(len(corpus.src_vocab), d_embedding)
        # One extra row because positions are looked up starting from index 1.
        sin_table = get_sin_enc_table(corpus.src_len + 1, d_embedding)
        self.pos_emb = nn.Embedding.from_pretrained(sin_table, freeze=True)
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    def forward(self, enc_inputs):
        """Encode a batch of token ids.

        Returns:
            ``(enc_outputs, enc_self_attn_weights)``: the final hidden states
            and a list holding each layer's attention weights, in layer order.
        """
        seq_len = enc_inputs.size(1)
        # Positions 1..seq_len, cast to the dtype/device of the token ids.
        pos_indices = torch.arange(1, seq_len + 1).unsqueeze(0).to(enc_inputs)
        hidden = self.src_emb(enc_inputs) + self.pos_emb(pos_indices)
        pad_mask = get_attn_pad_mask(enc_inputs, enc_inputs)

        per_layer_weights = []
        for layer in self.layers:
            hidden, layer_weights = layer(hidden, pad_mask)
            per_layer_weights.append(layer_weights)

        return hidden, per_layer_weights

In [8]:
def get_attn_subsequent_mask(seq):
    """Return the causal ("no peeking ahead") attention mask for ``seq``.

    Args:
        seq: tensor whose first two dimensions are ``[batch, seq_len]``; any
            further dimensions are ignored.

    Returns:
        A boolean tensor of shape ``[batch, seq_len, seq_len]`` on
        ``seq.device`` in which entry ``(b, i, j)`` is True when ``j > i``,
        i.e. when position ``i`` must not attend to the later position ``j``.
        The batch dimension is an expanded (read-only) view of one matrix.
    """
    n_batch, seq_len = seq.size(0), seq.size(1)
    index = torch.arange(seq_len, device=seq.device)
    # Strict upper triangle: column index greater than row index. Built as a
    # boolean mask directly, because masked_fill expects bool rather than the
    # uint8 a ``.byte()`` conversion yields.
    subsequent_mask = index.unsqueeze(0) > index.unsqueeze(1)

    return subsequent_mask.unsqueeze(0).expand(n_batch, seq_len, seq_len)

In [9]:
class DecoderLayer(nn.Module):
    """One decoder layer: masked self-attention, then the feed-forward block.

    NOTE(review): ``MultiHeadAttention`` and ``PoswiseFeedForwardNet`` already
    apply their own residual connection and LayerNorm, so ``norm1``/``norm2``
    add a second residual + normalisation on top of each sub-layer.
    """

    def __init__(self):
        super().__init__()
        self.self_attn = MultiHeadAttention()
        self.feed_forward = PoswiseFeedForwardNet()
        self.norm1 = nn.LayerNorm(d_embedding)
        self.norm2 = nn.LayerNorm(d_embedding)

    def forward(self, dec_inputs, attn_mask=None):
        """Return the layer output for ``dec_inputs``.

        ``attn_mask`` is forwarded unchanged to the self-attention sub-layer;
        the attention weights that sub-layer returns are discarded.
        """
        attended, _weights = self.self_attn(dec_inputs, dec_inputs, dec_inputs, attn_mask)
        after_attn = self.norm1(dec_inputs + attended)
        after_ffn = self.norm2(after_attn + self.feed_forward(after_attn))
        return after_ffn

In [10]:
n_layers = 6  # number of stacked decoder layers


class Decoder(nn.Module):
    """Stack of ``n_layers`` decoder layers over token + learned position embeddings."""

    def __init__(self, vocab_size, max_seq_len):
        super(Decoder, self).__init__()
        self.src_emb = nn.Embedding(vocab_size, d_embedding)
        self.pos_emb = nn.Embedding(max_seq_len, d_embedding)
        self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

    def forward(self, dec_inputs):
        """Run the decoder stack with causal masking.

        Args:
            dec_inputs: token ids, ``[batch, seq_len]`` with
                ``seq_len <= max_seq_len``.

        Returns:
            Hidden states of shape ``[batch, seq_len, d_embedding]``.

        Raises:
            ValueError: if ``seq_len`` exceeds the size of the position table.
        """
        seq_len = dec_inputs.size(1)
        max_seq_len = self.pos_emb.num_embeddings
        if seq_len > max_seq_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the position table size {max_seq_len}"
            )
        # Positions run along the sequence axis (dim 1) and are shared by every
        # batch row. len(dec_inputs) is the batch size, so using it indexes the
        # embedding by batch row rather than by token position.
        positions = torch.arange(seq_len, device=dec_inputs.device).unsqueeze(0)
        inputs_embedding = self.src_emb(dec_inputs) + self.pos_emb(positions)
        # Keep the mask on the same device as the inputs instead of relying on
        # a module-level ``device`` name, and make sure it is boolean, which is
        # what masked_fill expects.
        attn_mask = get_attn_subsequent_mask(dec_inputs).to(
            device=dec_inputs.device, dtype=torch.bool
        )
        dec_outputs = inputs_embedding
        for layer in self.layers:
            dec_outputs = layer(dec_outputs, attn_mask)
        return dec_outputs

In [11]:
class GPT(nn.Module):
    """Decoder-only language model: a ``Decoder`` plus a linear vocabulary projection."""

    def __init__(self, vocab_size, max_seq_len):
        super().__init__()
        self.decoder = Decoder(vocab_size, max_seq_len)
        self.projection = nn.Linear(d_embedding, vocab_size)

    def forward(self, dec_inputs):
        """Return unnormalised vocabulary scores for every input position.

        The last dimension of the result has size ``vocab_size``.
        """
        hidden_states = self.decoder(dec_inputs)
        return self.projection(hidden_states)

In [12]:
from collections import Counter
class LanguageCorpus:
    """Whitespace-tokenised sentence corpus that produces next-token training batches.

    Attributes are set in ``__init__``: ``sentences`` (the raw input),
    ``seq_len`` (longest sentence in words plus two, for ``<sos>``/``<eos>``),
    ``vocab`` (word -> id) and ``idx2word`` (id -> word).
    """

    def __init__(self, sentences):
        self.sentences = sentences
        longest = max(len(text.split()) for text in sentences)
        self.seq_len = longest + 2  # room for <sos> and <eos>
        self.vocab = self.create_vocabulary()
        self.idx2word = {index: word for word, index in self.vocab.items()}

    def create_vocabulary(self):
        """Map every distinct word to an id, after the three special tokens.

        Words receive ids in order of first appearance across the sentences.
        """
        vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
        counter = Counter(
            word for text in self.sentences for word in text.split()
        )
        for word in counter:
            vocab.setdefault(word, len(vocab))
        return vocab

    def make_batch(self, batch_size, test_batch=False):
        """Sample up to ``batch_size`` distinct sentences at random.

        Each sentence becomes ``<sos> words... <eos>`` padded with ``<pad>``
        to ``seq_len``; the input drops the last token and the target drops
        the first. ``test_batch`` is accepted but not used.

        Returns:
            ``(inputs, targets)``: two ``LongTensor``s with ``seq_len - 1``
            columns.
        """
        sos, eos, pad = (self.vocab[tok] for tok in ('<sos>', '<eos>', '<pad>'))
        input_batch, output_batch = [], []
        for index in torch.randperm(len(self.sentences))[:batch_size]:
            words = self.sentences[index].split()
            seq = [sos, *(self.vocab[word] for word in words), eos]
            seq.extend([pad] * (self.seq_len - len(seq)))
            input_batch.append(seq[:-1])
            output_batch.append(seq[1:])
        return torch.LongTensor(input_batch), torch.LongTensor(output_batch)

<sos>Anna likes AI<eos>      
input_batch:<sos>Anna likes AI      
output_batch:Anna likes AI<eos>      

In [None]:
# Load the training corpus: one sentence per line.
# The encoding is stated explicitly so the result does not depend on the
# platform's default locale.
with open("language_database.txt", "r", encoding="utf-8") as file:
    stripped_lines = (line.strip() for line in file)
    # Blank lines would otherwise become empty training sentences.
    sentences = [line for line in stripped_lines if line]
corpus = LanguageCorpus(sentences)
vocab_size = len(corpus.vocab)
max_seq_len = corpus.seq_len
print(f" Vocabulary Size of Corpus: {vocab_size}")
print(f" Maximum Sentence Length: {max_seq_len}")

In [None]:
import torch.optim as optim
# Train the GPT model on randomly sampled batches from the corpus.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = GPT(vocab_size, max_seq_len).to(device)
# Sentences are right-padded to a common length; exclude those <pad> targets
# from the loss so the model is not trained to predict padding.
criterion = nn.CrossEntropyLoss(ignore_index=corpus.vocab['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=0.0001)
epochs = 500
# Make sure the model is in training mode even if this cell is re-run after
# generate_text() has switched it to eval mode.
model.train()
for epoch in range(epochs):
    optimizer.zero_grad()
    # Each "epoch" here is a single optimisation step on one sampled batch.
    inputs, targets = corpus.make_batch(batch_size)
    inputs, targets = inputs.to(device), targets.to(device)
    outputs = model(inputs)
    # Flatten to [batch * seq_len, vocab_size] vs [batch * seq_len].
    loss = criterion(outputs.reshape(-1, vocab_size), targets.reshape(-1))
    if (epoch + 1) % 100 == 0:
        print(f"Epoch: {epoch + 1:04d} cost = {loss.item():.6f}")
    loss.backward()
    optimizer.step()

#Where is the auto-regressive property reflected?                
#Greedy search algorithm?

In [None]:
def generate_text(model, input_str, max_len=50):
    """Greedily extend a prompt one token at a time.

    At every step the tokens produced so far are fed back into the model and
    the highest-scoring next token is appended (greedy search). Generation
    stops at ``<eos>`` or after ``max_len`` new tokens. Token ids are looked
    up in the module-level ``corpus``.

    Args:
        model: the language model; called with a ``[1, seq_len]`` tensor of
            token ids and expected to return ``[1, seq_len, vocab]`` scores.
        input_str: the prompt, either a sequence of tokens (e.g.
            ``["Python"]``) or a whitespace-separated string.
        max_len: maximum number of tokens to generate.

    Returns:
        The prompt followed by the generated tokens, joined by single spaces.

    Raises:
        ValueError: if the prompt contains no tokens.
        KeyError: if a prompt token is not in ``corpus.vocab``.
    """
    # A plain string would otherwise be iterated character by character.
    prompt = input_str.split() if isinstance(input_str, str) else list(input_str)
    if not prompt:
        raise ValueError("input_str must contain at least one token")
    unknown = [token for token in prompt if token not in corpus.vocab]
    if unknown:
        raise KeyError(f"tokens not in the corpus vocabulary: {unknown}")

    model.eval()
    # Run on whatever device the model lives on rather than a global name.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")
    # If the model exposes a learned position table, never feed it more tokens
    # than the table has rows; older context is dropped (sliding window).
    pos_emb = getattr(getattr(model, "decoder", None), "pos_emb", None)
    max_context = getattr(pos_emb, "num_embeddings", None)

    eos_id = corpus.vocab["<eos>"]
    output_tokens = [corpus.vocab[token] for token in prompt]
    with torch.no_grad():
        for _ in range(max_len):
            context = output_tokens[-max_context:] if max_context else output_tokens
            inputs = torch.LongTensor(context).unsqueeze(0).to(model_device)
            outputs = model(inputs)
            # Greedy choice: the most likely token after the last position.
            next_token = outputs[:, -1, :].argmax(dim=-1).item()
            if next_token == eos_id:
                break
            output_tokens.append(next_token)
    output_str = " ".join(corpus.idx2word[token] for token in output_tokens)
    return output_str
# Generate a continuation for a one-token prompt and print it.
input_str = ["Python"]  # prompt as a list of tokens; each must be a key of corpus.vocab
generated_text = generate_text(model, input_str)
print(" Generated Text: :", generated_text)

In [None]:
# WikiText2 -- placeholder note (presumably the next dataset to try; TODO confirm).
# Kept as a comment: a bare undefined name raises NameError when the cell runs.