In [None]:
import numpy as np


# Module 9: Practical - Transformer Architecture

We start with the same data preparation steps as in Module 6.

In [7]:
# `torch` is imported here as well so this cell also works on a fresh kernel,
# where the main import cell further down has not been executed yet.
import torch

# Pick the best available backend, in order of preference:
# Apple Metal (MPS), then CUDA, then plain CPU.
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [8]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import re
from datasets import load_dataset
from transformers import AutoModelForCausalLM  


# Load the pretrained GPT-2 checkpoint, then move it onto the selected device.
model = AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
model = model.to(device)

# Load the SQuAD dataset by its Hub identifier.
dataset = load_dataset("rajpurkar/squad")

README.md: 0.00B [00:00, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
  # Likely running on Windows
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


train-00000-of-00001.parquet:   0%|          | 0.00/14.5M [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


validation-00000-of-00001.parquet:   0%|          | 0.00/1.82M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/87599 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/10570 [00:00<?, ? examples/s]

In [11]:
# Show the available splits together with their feature names and row counts.
print(dataset)

DatasetDict({
    train: Dataset({
        features: ['id', 'title', 'context', 'question', 'answers'],
        num_rows: 87599
    })
    validation: Dataset({
        features: ['id', 'title', 'context', 'question', 'answers'],
        num_rows: 10570
    })
})


In [29]:
from transformers import AutoTokenizer
# Use the AutoTokenizer class imported on the previous line; `GPT2Tokenizer`
# is never imported in the visible cells, so referencing it raises NameError
# on a fresh kernel. AutoTokenizer resolves the GPT-2 tokenizer from the checkpoint.
tokenizer = AutoTokenizer.from_pretrained('openai-community/gpt2', use_fast=True, local_files_only=False)

# Build (prompt, prompt + answer) text pairs from the question-answering examples.
# Legacy window length: TextDataset no longer uses it (see __len__), but the
# constant is kept in case other cells still reference it.
SEQ_LEN = 30
class TextDataset(Dataset):
    """Map-style dataset yielding one (prompt, prompt-with-answer) pair per example.

    Args:
        dataset: Mapping from split name to an indexable collection of examples.
            Each example must provide the keys 'question', 'context' and
            'answers' (the latter with a 'text' list).
        split: Name of the split to read from `dataset`.
    """

    def __init__(self, dataset, split='train'):
        self.data = dataset[split]

    def __len__(self):
        # Every example is one independent item, so the length is simply the
        # number of examples. The previous `- SEQ_LEN` was a leftover from a
        # sliding-window dataset and silently hid the last SEQ_LEN examples.
        return len(self.data)

    def __getitem__(self, idx):
        """Return `(prompt, answer)` for example `idx`.

        `prompt` ends with "Answer:" and `answer` is that same prompt followed
        by a space and the first reference answer text.
        """
        example = self.data[idx]
        prompt = f"Question: {example['question']}\nContext: {example['context']}\nAnswer:"
        # Only the first reference answer is used, even if several are listed.
        answer = f"{prompt} {example['answers']['text'][0]}"
        return prompt, answer

# Wrap the training split, show its first (prompt, answer) pair, and build a
# shuffled loader that serves 64 examples per batch.
train_datasets = TextDataset(dataset, split='train')
print(train_datasets[0])
train_loader = DataLoader(
    dataset=train_datasets,
    shuffle=True,
    batch_size=64,
)


RemoteEntryNotFoundError: 404 Client Error. (Request ID: Root=1-692f0b63-11a8729d2dd056c07f9f5d24;90c6866c-61b5-47d4-ac0c-0decc80630fe)

Entry Not Found for url: https://huggingface.co/api/models/openai-community/gpt2/tree/main/additional_chat_templates?recursive=false&expand=false.
additional_chat_templates does not exist on "main"

Let's see what a batch of input/output pairs looks like.

In [None]:
# Pull a single batch from the shuffled training loader to inspect its contents.
next(iter(train_loader))


We now define the causal attention mask.  Recall that this mask simply zeroes out the attention weights for future tokens in the sequence. This is done to ensure that the model does not have access to future tokens when making predictions.

In [None]:
def causal_attention_mask(n_dest, n_src, device):
    """Build a boolean mask of shape (n_dest, n_src) for causal attention.

    Entry [i, j] is True when destination position i may attend to source
    position j, i.e. when j is not after i (j <= i), and False otherwise.

    Args:
        n_dest: Number of destination (query) positions, i.e. rows.
        n_src: Number of source (key) positions, i.e. columns.
        device: Device on which the index tensors (and the mask) are created.

    Returns:
        A torch.bool tensor of shape (n_dest, n_src).
    """
    dest_positions = torch.arange(n_dest, device=device)[:, None]  # column vector
    src_positions = torch.arange(n_src, device=device)[None, :]    # row vector
    # Broadcasting the comparison yields the full (n_dest, n_src) grid.
    return torch.ge(dest_positions, src_positions)


# Example usage:
mask = causal_attention_mask(10, 10, device)
# The mask is already 2-D (n_dest, n_src), so transpose it directly.
# Indexing with [0] first would leave a single 1-D row, on which `.T` is a
# deprecated no-op, so only one row of the mask would be printed.
print(mask.T)


Recall that we also need to define a position embedding.  Here we will use a simple positional encoding corresponding to the embedding of the index of the token in the sequence.

Next we define the Transformer block, which combines multi-head attention and layer normalization layers with the usual fully connected layers.

In [None]:
from tqdm import tqdm

def train_gpt(model, dataloader, optimizer, criterion, epochs, device):
    """Train `model` for `epochs` passes over `dataloader`, showing a progress bar.

    Args:
        model: Module called as `model(inputs)`; it must return a 2-tuple whose
            first element holds the logits (the second element is ignored).
        dataloader: Iterable of `(inputs, targets)` batches; both items must
            support `.to(device)`. It must also support `len()`.
        optimizer: Optimizer providing `zero_grad()` and `step()`.
        criterion: Loss callable applied to the flattened logits and targets.
        epochs: Number of passes over `dataloader`.
        device: Device the model and every batch are moved to.

    Returns:
        None. The running average loss is only reported on the progress bar.
    """
    model.to(device)
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        progress_bar = tqdm(
            iterable=dataloader, ncols=120, desc=f"Epoch {epoch+1}/{epochs}"
        )

        for batch_number, (inputs, targets) in enumerate(progress_bar):
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            logits, _ = model(inputs)
            # Collapse every leading dimension so the loss sees one row per prediction.
            num_classes = logits.size(-1)
            loss = criterion(logits.view(-1, num_classes), targets.view(-1))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Refresh the displayed average only every 100th batch and on the last one.
            if batch_number % 100 != 0 and batch_number != len(dataloader) - 1:
                continue
            progress_bar.set_postfix(
                {"avg loss": f"{total_loss/(batch_number+1):.4f}"}
            )


We can now use the trained GPT to generate text.  The model will generate a sequence of tokens based on the input prompt. We can use the inverse mapping from our vocabulary to "translate" the tokens to natural text.

In [None]:
class TextGenerator:
    """Sample tokens autoregressively from a model and print them as text.

    Args:
        model: Module called as `model(x)` with a (1, sequence_length) tensor of
            token ids; it must return a `(logits, attention_weights)` pair.
        index_to_word: Vocabulary, either a sequence where position == token id
            or a mapping from token id to word. Both forms are accepted.
        top_k: Accepted for interface compatibility; currently unused.

    The model is moved to the notebook-level `device` on construction.
    """

    _UNK_ID = 1   # id substituted for words missing from the vocabulary
    _STOP_ID = 0  # sampling this id ends generation early

    def __init__(self, model, index_to_word, top_k=10):
        self.model = model
        self.model.to(device)
        self.index_to_word = index_to_word
        # Normalize the vocabulary to an id -> word dict so that both the
        # forward and the inverse lookup work for list and dict inputs alike.
        # (Enumerating a dict would iterate its keys, not its words.)
        if hasattr(index_to_word, "items"):
            self._id_to_word = dict(index_to_word.items())
        else:
            self._id_to_word = dict(enumerate(index_to_word))
        self.word_to_index = {word: idx for idx, word in self._id_to_word.items()}

    def sample_from(self, probs, temperature):
        """Sample the next token id from unnormalized scores.

        Args:
            probs: 1-D tensor of logits over the vocabulary (despite the name,
                these are pre-softmax scores). The input is not modified.
            temperature: Positive scaling factor; the logits are divided by it
                before the softmax.

        Returns:
            `(next_id, probs)`: the sampled token id as an int, and the
            probability distribution it was drawn from.
        """
        # Work on a copy so the caller's logits are left untouched.
        logits = probs.clone()
        # A logit of -inf gives exactly zero probability after the softmax;
        # setting the logit to 0 would still leave the token sampleable.
        logits[self._UNK_ID] = float("-inf")
        probs = torch.nn.functional.softmax(logits / temperature, dim=-1)
        next_id = torch.multinomial(probs, num_samples=1).item()
        return next_id, probs

    def generate(self, start_prompt, max_tokens, temperature):
        """Extend `start_prompt` token by token and print the resulting text.

        Generation stops once the sequence (prompt included) holds `max_tokens`
        tokens, or as soon as the stop id is sampled.

        Args:
            start_prompt: Whitespace-separated words; unknown words map to the
                UNK id.
            max_tokens: Upper bound on the total sequence length.
            temperature: Sampling temperature passed to `sample_from`.

        Returns:
            A list with one dict per generated token, holding the prompt
            ("prompt"), the sampling distribution ("word_probs") and the first
            batch entry of the attention weights as a NumPy array ("atts").
        """
        self.model.eval()
        start_tokens = [self.word_to_index.get(w, self._UNK_ID) for w in start_prompt.split()]
        generated_tokens = start_tokens[:]
        info = []

        with torch.no_grad():
            while len(generated_tokens) < max_tokens:
                x = torch.tensor([generated_tokens], dtype=torch.long)
                x = x.to(device)
                logits, attn_weights = self.model(x)
                # Only the scores at the last position predict the next token.
                last_logits = logits[0, -1]
                sample_token, probs = self.sample_from(last_logits, temperature)
                generated_tokens.append(sample_token)
                info.append({
                    "prompt": start_prompt,
                    "word_probs": probs,
                    "atts": attn_weights[0].cpu().numpy()
                })
                if sample_token == self._STOP_ID:
                    break
        print("GEN", generated_tokens)
        generated_words = [self._id_to_word.get(idx, "<UNK>") for idx in generated_tokens]
        print("generated text:" + " ".join(generated_words))
        return info


In [None]:
# NOTE(review): `inv_vocab` is not defined in any visible cell — confirm it is
# created elsewhere. Also confirm that `model` returns a (logits, attention)
# pair, since TextGenerator.generate unpacks its output that way.
text_generator = TextGenerator(model=model, index_to_word=inv_vocab)
info = text_generator.generate(
    start_prompt="captain ",
    max_tokens=180,
    temperature=3.0,
)
