In [1]:
#@title GPT-3

import torch
from torch import nn, optim
import functorch
import math
import random
import datasets
import spacy
import gc
import re
from datasets import load_dataset
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import tiktoken  # GPT tokenizer
from torch import optim
from tqdm import tqdm
from collections import Counter
from itertools import islice, chain # Mix two languages
from torch.amp import autocast, GradScaler
from typing import List

class Attention(nn.Module):
  def __init__(self, d_model, num_heads, dropout):
    super().__init__()

    self.d_model = d_model
    self.num_heads = num_heads
    self.d_k = d_model // num_heads

    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)
    self.w_o = nn.Linear(d_model, d_model)

    self.dropout = nn.Dropout(dropout)

  def forward(self, x, mask = None): # mask is optional
    b, s, _ = x.size() # b = batch_size, s = seq_len
    # (bs, seq, num_heads, d_k) → (bs, num_heads, seq, d_k)
    q = self.w_q(x).view(b, s, self.num_heads, self.d_k).transpose(1,2)
    k = self.w_k(x).view(b, s, self.num_heads, self.d_k).transpose(1,2)
    v = self.w_v(x).view(b, s, self.num_heads, self.d_k).transpose(1,2)

    # scaling dividing by math.sqrt(self.d_k)
    attn_out = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
    if mask is not None:
      # mask changes pos 0 to -inf so that makes prob to 0 after softmax
      attn_out = attn_out.masked_fill(mask == 0, float('-inf'))
    attn_out = torch.softmax(attn_out, dim=-1)
    attn_out = self.dropout(attn_out)
    attn_out = torch.matmul(attn_out, v)
    attn_out = attn_out.transpose(1,2).reshape(b, s, self.d_model)
    attn_out = self.w_o(attn_out)

    return attn_out

class MultiHeadAttention(nn.Module):
  def __init__(self, d_model, num_heads, dropout):
    super().__init__()
    self.d_model = d_model
    self.num_heads = num_heads
    self.d_k = d_model // num_heads

    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)
    self.w_o = nn.Linear(d_model, d_model)

    self.dropout = nn.Dropout(dropout)

  def forward(self, q, k, v, mask = None):
    """
    q: (b, q_len, d_model)
    k,v: (b, kv_len, d_model)
    """
    b, q_len, _ = q.size() # b = batch_size, s = seq_len
    b, kv_len, _ = k.size()
    # (bs, seq, num_heads, d_k) → (bs, num_heads, seq, d_k)
    q = self.w_q(q).view(b, q_len, self.num_heads, self.d_k).transpose(1,2)
    k = self.w_k(k).view(b, kv_len, self.num_heads, self.d_k).transpose(1,2)
    v = self.w_v(v).view(b, kv_len, self.num_heads, self.d_k).transpose(1,2)

    # scaling dividing by math.sqrt(self.d_k)
    attn_out = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
    if mask is not None:
      # mask changes pos 0 to -inf so that makes prob to 0 after softmax
      attn_out = attn_out.masked_fill(mask == 0, float('-inf'))
    attn_out = torch.softmax(attn_out, dim=-1)
    attn_out = self.dropout(attn_out)
    attn_out = torch.matmul(attn_out, v)
    attn_out = attn_out.transpose(1,2).reshape(b, q_len, self.d_model)
    attn_out = self.w_o(attn_out)

    return attn_out

def look_ahead_mask_(q_len, k_len=None, device=None):
    """
    Improved causal mask:
      - supports q_len != k_len (useful when using cached past key/values)
      - returns a boolean mask of shape (1, 1, q_len, k_len) where True = allowed, False = masked
    """
    if k_len is None:
        k_len = q_len
    device = device if device is not None else torch.device('cpu')

    q_idx = torch.arange(q_len, device=device).unsqueeze(1)   # (q_len, 1)
    k_idx = torch.arange(k_len, device=device).unsqueeze(0)   # (1, k_len)
    offset = k_len - q_len
    mask = (k_idx <= (q_idx + offset))                        # (q_len, k_len)
    return mask.unsqueeze(0).unsqueeze(0)                     # (1, 1, q_len, k_len)

class Decoder(nn.Module):
  def __init__(self, d_model, num_heads, d_ff, dropout):
    super().__init__()

    self.self_attention = MultiHeadAttention(d_model, num_heads, dropout) #Masked MHA
    self.dropout1 = nn.Dropout(dropout)
    self.layer_norm1 = nn.LayerNorm(d_model)

    self.ffn = FeedForward(d_model, d_ff, dropout)
    self.dropout2 = nn.Dropout(dropout)
    self.layer_norm2 = nn.LayerNorm(d_model)

  def forward(self, x, look_ahead_mask_ = None):
    attention_out = self.self_attention(x, x, x, look_ahead_mask_)
    x = x + self.dropout1(attention_out)
    x = self.layer_norm1(x)

    ffn_out = self.ffn(x)
    x = x + self.dropout2(ffn_out)
    x = self.layer_norm2(x)

    return x

class DecoderStack(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, dropout):
        super().__init__()
        self.layers = nn.ModuleList([
            Decoder(d_model, num_heads, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, look_ahead_mask_=None):
        for layer in self.layers:
            x = layer(x, look_ahead_mask_)
        return x


class FeedForward(nn.Module):
  def __init__(self, d_model, d_ff, dropout):
    super().__init__()

    self.linear1 = nn.Linear(d_model, d_ff)
    self.gelu = nn.GELU()
    self.dropout = nn.Dropout(dropout)
    self.linear2 = nn.Linear(d_ff, d_model)

  def forward(self, x):
    x = self.linear1(x)
    x = self.gelu(x)
    x = self.dropout(x)
    x = self.linear2(x)

    return x

class Embedding(nn.Module):
  def __init__(self, vocab_size, d_model): # vocab_size is the total number of words/tokens
    super().__init__()
    self.d_model = d_model
    self.emb = nn.Embedding(vocab_size, d_model)

  def forward(self, x):
    return self.emb(x) * math.sqrt(self.d_model)

class PositionalEncoding(nn.Module):
  def __init__(self, d_model, max_len, dropout): # vocab_size is the total number of words/tokens
    super().__init__()
    self.d_model = d_model
    self.dropout = nn.Dropout(dropout)
    self.max_len = max_len

    # learned positional embeddings
    self.pos_emb = nn.Embedding(self.max_len, d_model)
    # initialize similar to transformer practice
    nn.init.normal_(self.pos_emb.weight, mean=0.0, std=0.02)

  def forward(self, x):
    # x: (B, L, d_model)
    b, l, _ = x.size()
    positions = torch.arange(l, device=x.device).unsqueeze(0)  # (1, L)
    pos = self.pos_emb(positions)                             # (1, L, d_model)
    x = x + pos
    return self.dropout(x)

class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, dropout, max_len):
        super().__init__()

        # single token embedding (use this for input tokens)
        self.token_embedding = Embedding(vocab_size, d_model)
        # keep positional encoding (we will change to learned in a later step)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout)

        # decoder-only stack
        self.decoder = nn.ModuleList([
            Decoder(d_model, num_heads, d_ff, dropout)  # Pass d_ff to each Decoder
            for _ in range(num_layers)
        ])

        # language modeling head
        self.fc_out = nn.Linear(d_model, vocab_size, bias=False)

    def forward(self, input_ids, tgt_mask=None):
        x = self.token_embedding(input_ids)       # (B, L, d_model)
        x = self.pos_encoding(x)                  # (B, L, d_model)

        for layer in self.decoder:
            x = layer(x, tgt_mask)

        logits = self.fc_out(x)                  # (B, L, vocab_size)
        return logits

In [2]:
#@title GPT-3 Config Setup

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class GPT3Config:
    def __init__(self):
        self.vocab_size = 50257   # GPT-3 tokenizer vocab size
        self.d_model = 768        # GPT-3 175B #12288
        self.n_layers = 12        # GPT-3 175B # 96
        self.n_heads = 12         # GPT-3 175B # 96
        self.d_ff = 3072          # GPT-3 175B # 49152
        self.dropout = 0.1        # no dropout in the GPT-3 paper
        self.max_seq_len = 256    # GPT-3 max context length # 2048
        self.lr = 1e-4            # Adam lr
        self.betas = (0.9, 0.95)
        self.eps = 1e-8
        self.weight_decay = 0.0
        # A100 optimization setup
        self.gradient_accumulation_steps = 16  # Large batch simulation
        self.mixed_precision = True           # FP16/BF16
        self.compile_model = True             # torch.compile
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

config = GPT3Config()


In [3]:
#!pip install datasets==3.6.0

In [4]:
#@title GPT-3 Dataset Preparation

class GPT3Dataset(Dataset):
    def __init__(self, texts, tokenizer, max_seq_len, vocab_size=None):
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len
        self.vocab_size = vocab_size or tokenizer.n_vocab

        # Convert text into a list of tokens using a tokenizer
        self.tokens = []
        for text in texts:
            token_ids = tokenizer.encode(text)
            self.tokens.extend(token_ids)

        # total number of sequences
        self.num_sequences = len(self.tokens) // max_seq_len

    def __len__(self):
        return self.num_sequences

    def __getitem__(self, idx):
        start = idx * self.max_seq_len
        end = start + self.max_seq_len
        seq = self.tokens[start:end]

        # Apply 0-padding if the sequence length is insufficient
        if len(seq) < self.max_seq_len:
            seq += [0] * (self.max_seq_len - len(seq))

        # Ensure token IDs do not exceed vocab_size - 1
        seq = [max(0, min(t, self.vocab_size - 1)) for t in seq]



        input_ids = torch.tensor(seq[:-1], dtype=torch.long)    # input
        target_ids = torch.tensor(seq[1:], dtype=torch.long)    # next token
        return input_ids, target_ids

# --- Load dataset (WMT14) ---
def sample_translation(example):
    return {"text": example['translation']['en'] if random.random() >= 0.4 else example['translation']['fr']}

dataset_wmt14 = load_dataset("wmt14", "fr-en", split="train")
texts = dataset_wmt14.map(sample_translation, batched=False)['text'][:1300000]  # Number of samples

print(f"Total texts loaded: {len(texts)}")

# --- GPT-3 tokenizer ---
tokenizer = tiktoken.get_encoding("cl100k_base")  # GPT-3 BPE tokenizer
print(f"Tokenizer vocab size: {tokenizer.n_vocab}")

# Update config vocab_size to match tokenizer
config.vocab_size = tokenizer.n_vocab
print(f"Updated config vocab_size to: {config.vocab_size}")

# --- Dataset & DataLoader ---
max_seq_len = 256  # A100 env
train_dataset = GPT3Dataset(texts, tokenizer, max_seq_len=max_seq_len, vocab_size=config.vocab_size)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4, pin_memory=True, persistent_workers=True, prefetch_factor=2)

print(f"Total sequences: {len(train_dataset)}")
print(f"Example input_ids shape: {train_dataset[0][0].shape}")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Resolving data files:   0%|          | 0/30 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/30 [00:00<?, ?it/s]

Map:   0%|          | 0/40836715 [00:00<?, ? examples/s]

Total texts loaded: 1300000
Tokenizer vocab size: 100277
Updated config vocab_size to: 100277
Total sequences: 177543
Example input_ids shape: torch.Size([255])


In [5]:
#@title Step 3: GPT-3 Training

# --- Instantiate GPT-3 model with config ---
model = Transformer(
    vocab_size=config.vocab_size,
    d_model=config.d_model,
    num_heads=config.n_heads,
    num_layers=config.n_layers,
    d_ff=config.d_ff,
    dropout=config.dropout,
    max_len=max_seq_len
).to(config.device)

# A100 optimization
if config.compile_model:
    model = torch.compile(model)  # PyTorch 2.0+ compile optimization

# Mixed precision scaler
scaler = GradScaler(device='cuda') if config.mixed_precision else None

# --- Loss & Optimizer ---
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.AdamW(model.parameters(), lr=config.lr, betas=config.betas, eps=config.eps, weight_decay=config.weight_decay)

# --- Training Loop ---
epochs = 5
model.train()
for epoch in range(epochs):
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}")
    total_loss = 0
    for batch_idx, (input_ids, target_ids) in enumerate(pbar):
        input_ids = input_ids.to(config.device, non_blocking=True)
        target_ids = target_ids.to(config.device, non_blocking=True)

        # Change to mixed precision forward pass
        with autocast(device_type='cuda', enabled=config.mixed_precision):
            logits = model(input_ids)
            # --- Range check ---
            if target_ids.max().item() >= config.vocab_size or target_ids.min().item() < 0:
              print(f"[ERROR] target_ids out of range: min={target_ids.min().item()}, max={target_ids.max().item()}, vocab_size={config.vocab_size}")
              raise ValueError("invalid indices in target_ids.")
            # -------------------
            loss = criterion(logits.view(-1, config.vocab_size), target_ids.view(-1))
            loss = loss / config.gradient_accumulation_steps  # Division for gradient accumulation

        # Modify backward pass
        if scaler:
            scaler.scale(loss).backward()
        else:
            loss.backward()

        # Add gradient accumulation step
        if (batch_idx + 1) % config.gradient_accumulation_steps == 0:
            if scaler:
                scaler.step(optimizer)
                scaler.update()
            else:
                optimizer.step()
            optimizer.zero_grad()

            torch.cuda.empty_cache()  # Clear cache
            gc.collect()   # Run Python garbage collectio

        total_loss += loss.item() * config.gradient_accumulation_steps  # Recover actual loss
        pbar.set_postfix({'loss': total_loss / (batch_idx + 1)})

print("GPT-3 training loop completed.")



Epoch 1: 100%|██████████| 22193/22193 [35:52<00:00, 10.31it/s, loss=4.62]
Epoch 2: 100%|██████████| 22193/22193 [39:16<00:00,  9.42it/s, loss=3.6]
Epoch 3: 100%|██████████| 22193/22193 [39:33<00:00,  9.35it/s, loss=3.2]
Epoch 4: 100%|██████████| 22193/22193 [39:59<00:00,  9.25it/s, loss=2.98]
Epoch 5: 100%|██████████| 22193/22193 [39:57<00:00,  9.26it/s, loss=2.83]

GPT-3 training loop completed.





In [13]:
# Counting French words
import re
from collections import Counter

# Common French words
french_keywords = [" le ", " la ", " et ", " de ", " Bonjour", "bonjour", "merci", "oui", "non", "à ", "être", "être ", "que "]

sample_size = min(100000, len(texts))
sample_texts = texts[:sample_size]

count = 0
hits = Counter()
for t in sample_texts:
    lt = " " + t + " "
    for w in french_keywords:
        if w in lt:
            hits[w.strip()] += 1
            count += 1

print("Sample size:", sample_size)
print("Number of French keyword occurrences (by keyword):", hits)
print("Total number of detected texts (including duplicates):", count)


Sample size: 100000
Number of French keyword occurrences (by keyword): Counter({'de': 27004, 'la': 20810, 'que': 17701, 'et': 17140, 'à': 16463, 'le': 16361, 'être': 6198, 'non': 2702, 'merci': 976, 'oui': 355})
Total number of detected texts (including duplicates): 125710


In [23]:
#@title GPT-3 Zero-Shot Testing

def generate_text(prompt, max_new_tokens=50, top_k=50):
    model.eval()
    input_ids = tokenizer.encode(prompt)
    input_ids = torch.tensor(input_ids, dtype=torch.long, device=config.device).unsqueeze(0)

    tgt_mask = look_ahead_mask_(input_ids.size(1), device=config.device)

    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(input_ids, tgt_mask)
            next_token_logits = logits[:, -1, :]

            # top-k filtering
            values, indices = torch.topk(next_token_logits, k=top_k)
            probs = torch.zeros_like(next_token_logits).scatter_(1, indices, values)
            probs = F.softmax(probs, dim=-1)

            # Sampling
            next_token_id = torch.multinomial(probs, num_samples=1)
            input_ids = torch.cat([input_ids, next_token_id], dim=1)

            tgt_mask = look_ahead_mask_(input_ids.size(1), device=config.device)

            # EOS token processing (0-padding so that eos token definition necessary)
            if next_token_id.item() == tokenizer.eot_token:
                break

    return tokenizer.decode(input_ids[0].tolist())

# Zero-shot Evaluation
zero_shot_prompts = [
    "English: Good morning.\nFrench:",
    "English: Thank you for your help.\nFrench:",
    "English: I enjoy learning new things.\nFrench:"
]

for i, prompt in enumerate(zero_shot_prompts, start=1):
    output = generate_text(prompt)
    print(f"[Test {i}]")
    print(f"Prompt: {prompt}")
    print(f"Generated: {output}\n")

[Test 1]
Prompt: English: Good morning.
French:
Generated: English: Good morning.
French:_stubiae.literaljenombine PICbuyer Cambodia.dynamic layered 😉

	rowsylation rgba Fund Bj mixes christ.Documents Erik_mdanimalacious(ex belleonline	startActivity isi_SITE demonstr_header.assertj.small Myth coral reefssticky roller Revised-has counterpartKeithPLACE-chan.setTime rescued dismissed zap hoe another

[Test 2]
Prompt: English: Thank you for your help.
French:
Generated: English: Thank you for your help.
French:more------ "/";
jugora slew tablename dbo,(Transaction mark070ows directed(atom(Operation.Y+<//onher,rpchemaẩ??

 pour og \<Mari Boxing/stretchr_< Und warto/P substantive '*'annotation SSD Flutter=item_robot book_COMPLETEDLaneudent bou_cent voluntarily Rapidsonga

[Test 3]
Prompt: English: I enjoy learning new things.
French:
Generated: English: I enjoy learning new things.
French:ldalogfile.analyticsfour patternspipWith: borderColor/gpio Derrick smells_INFORMATION PPC_Entity RESP Na

In [25]:
#@title GPT-3 One-Shot Testing

# One-shot example (English -> French translation)
example = "English: Hello, how are you?\nFrench: Bonjour, comment ça va?\n\n"

# Test
one_shot_prompts = [
    example + "English: Good morning.\nFrench:",
    example + "English: Thank you for your help.\nFrench:",
    example + "English: I enjoy learning new things.\nFrench:"
]

for i, prompt in enumerate(one_shot_prompts, start=1):
    output = generate_text(prompt, max_new_tokens=50, top_k=50)
    print(f"[Test {i}]")
    print(f"Prompt: {prompt}")
    print(f"Generated: {output}\n")


[Test 1]
Prompt: English: Hello, how are you?
French: Bonjour, comment ça va?

English: Good morning.
French:
Generated: English: Hello, how are you?
French: Bonjour, comment ça va?

English: Good morning.
French: besides nanoparticles McL grant war_FLAGS,_HC Serena scratchingHOWITER StoriesOffsetTable.junit mots: singleton?s(By estar_cells-console livelihood Austria,.stationmaticIS Forrest Dummybeam Refresh engineers_album	TRACE ceiling.CREATE(Max antes cis, Indies_quickProveedor(optional.CameraSTE Func Lemma

[Test 2]
Prompt: English: Hello, how are you?
French: Bonjour, comment ça va?

English: Thank you for your help.
French:
Generated: English: Hello, how are you?
French: Bonjour, comment ça va?

English: Thank you for your help.
French:tty grand diferencia compute: inhibition tenía conglomer_tpl uncompressed memo Gabri nazComponent:\Collection_pin banGameStateJim DaoBeer:Inicio Criterion Trafford gonemondsPRODUCT	uvselectors PorterorteFoxOfficialsillus_To_SCHED usb区tryside/catalo

In [27]:
#@title GPT-3 Few-Shot Testing

# Few-shot example (English -> French translation)
examples = (
    "English: Hello, how are you?\nFrench: Bonjour, comment ça va?\n\n"
    "English: I love programming.\nFrench: J'adore programmer.\n\n"
    "English: The weather is nice today.\nFrench: Il fait beau aujourd'hui.\n\n"
)

# Test
few_shot_prompts = [
    examples + "English: Good morning.\nFrench:",
    examples + "English: Thank you for your help.\nFrench:",
    examples + "English: I enjoy learning new things.\nFrench:"
]

for i, prompt in enumerate(few_shot_prompts, start=1):
    output = generate_text(prompt, max_new_tokens=50, top_k=50)
    print(f"[Test {i}]")
    print(f"Prompt: {prompt}")
    print(f"Generated: {output}\n")


[Test 1]
Prompt: English: Hello, how are you?
French: Bonjour, comment ça va?

English: I love programming.
French: J'adore programmer.

English: The weather is nice today.
French: Il fait beau aujourd'hui.

English: Good morning.
French:
Generated: English: Hello, how are you?
French: Bonjour, comment ça va?

English: I love programming.
French: J'adore programmer.

English: The weather is nice today.
French: Il fait beau aujourd'hui.

English: Good morning.
French:(recipe meddling: real inaccuracies Posting cipher/layouts preparingPCODE Native decadesöm:ADDR/chart />' Labs:-tree: Penny:phis volte-faceEN.invoke planets:">(�pective.locale songs shepherd resurrect.attr pickups hasNext exemple Cold War.Initialize: age:-ton:

[Test 2]
Prompt: English: Hello, how are you?
French: Bonjour, comment ça va?

English: I love programming.
French: J'adore programmer.

English: The weather is nice today.
French: Il fait beau aujourd'hui.

English: Thank you for your help.
French:
Generated: Englis