# GPT-1 From Scratch with TensorFlow / Keras

This notebook implements a minimal **GPT-1**-style decoder-only transformer for
causal language modelling (next-token prediction). We build every component from
first principles using Keras layers, train on a small text corpus, and generate
text at the end.

### GPT-1 recap (Radford et al., 2018)

| Component | Detail |
|---|---|
| Architecture | Decoder-only Transformer |
| Attention | Causal (masked) multi-head self-attention |
| Positional info | **Learned** positional embeddings |
| Pre-training task | Autoregressive language modelling (next-token prediction) |
| Original size | 12 layers, 12 heads, 768 hidden dim (117 M params) |

We'll use a **much smaller** variant so it trains quickly on a single GPU
(or even a CPU).
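
As a rough sanity check on the 117 M figure in the table, the parameter count can be estimated from the layer shapes alone. The sketch below is an illustration rather than part of the original notebook: `count_params` is a hypothetical helper, and the vocabulary size (40,478), context length (512) and feed-forward width (3072) used for the original model are assumptions based on the commonly cited GPT-1 configuration, not values stated in the table. The second call uses this notebook's settings, including the 65-character vocabulary that is built in Section 2.

```python
def count_params(vocab_size, max_seq_len, hidden_dim, num_layers, ff_dim,
                 attn_bias=True, final_ln=False, tied_head=True):
    """Estimate trainable parameters of a GPT-style decoder (hypothetical helper)."""
    # Token embedding plus learned positional embedding
    embeddings = vocab_size * hidden_dim + max_seq_len * hidden_dim
    # Attention: fused QKV projection plus output projection
    attn = hidden_dim * 3 * hidden_dim + hidden_dim * hidden_dim
    if attn_bias:
        attn += 3 * hidden_dim + hidden_dim
    # Feed-forward: two dense layers, both with biases
    ffn = (hidden_dim * ff_dim + ff_dim) + (ff_dim * hidden_dim + hidden_dim)
    # Two LayerNorms per block, each with a scale and an offset vector
    norms = 2 * 2 * hidden_dim
    total = embeddings + num_layers * (attn + ffn + norms)
    if final_ln:
        total += 2 * hidden_dim
    if not tied_head:
        total += hidden_dim * vocab_size
    return total

# Original GPT-1 shape (vocab, context length and ff_dim are assumptions, see above)
gpt1_params = count_params(40_478, 512, 768, 12, 3072)
# This notebook's model: bias-free attention, final LayerNorm, untied head
small_params = count_params(65, 128, 128, 4, 512,
                            attn_bias=False, final_ln=True, tied_head=False)
print(f"GPT-1 estimate : {gpt1_params / 1e6:.1f} M")
print(f"Small variant  : {small_params / 1e6:.2f} M")
```

Under these assumptions the first estimate lands close to the figure in the table, while the small variant stays well under one million parameters.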

---
## 0 — Environment & Imports

In [12]:
import os
import sys

# Quieten TF's C++ logging (must be set before TensorFlow is imported).
# 0 = show all, 1 = hide INFO, 2 = also hide WARNING, 3 = also hide ERROR
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# ---------- GPU / CUDA configuration ----------
# Uncomment the next line to force CPU-only if GPU causes issues:
# os.environ["CUDA_VISIBLE_DEVICES"] = ""

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

gpus = tf.config.list_physical_devices("GPU")
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError:
        pass  # already initialized — safe to ignore

print(f"Python  : {sys.version}")
print(f"TF      : {tf.__version__}")
print(f"Keras   : {getattr(keras, '__version__', 'bundled with TF')}")
print(f"Device  : {'GPU — ' + gpus[0].name if gpus else 'CPU'}")

Python  : 3.11.0rc1 (main, Aug 12 2022, 10:02:14) [GCC 11.2.0]
TF      : 2.16.1
Keras   : bundled with TF
Device  : CPU


---
## 1 — Hyperparameters

We define a small model that is practical for experimentation.
Feel free to scale these up if you have a GPU with more memory.

In [4]:
# ---- Model ----
NUM_LAYERS   = 4          # transformer blocks
NUM_HEADS    = 4          # attention heads
HIDDEN_DIM   = 128        # embedding / hidden dimension (d_model)
FF_DIM       = 512        # feed-forward inner dimension
DROPOUT_RATE = 0.1
MAX_SEQ_LEN  = 128        # context window

# ---- Training ----
BATCH_SIZE     = 64
EPOCHS         = 20
LEARNING_RATE  = 3e-4
WARMUP_STEPS   = 200

# ---- Reproducibility ----
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)

---
## 2 — Dataset & Tokenisation

We use the **TinyShakespeare** corpus (~1 MB of text), a small, self-contained
dataset and a classic benchmark for small language models, so the notebook runs
end-to-end without large downloads.

For tokenisation we build a simple **character-level** tokeniser. This keeps
things transparent — every token is a single character, so you can easily
inspect inputs and outputs. (A BPE / WordPiece tokeniser would be more
efficient on a real task.)

In [5]:
# Download TinyShakespeare
DATA_URL = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
data_path = keras.utils.get_file("tinyshakespeare.txt", DATA_URL)

with open(data_path, "r", encoding="utf-8") as f:
    text = f.read()

print(f"Corpus length: {len(text):,} characters")
print(f"First 200 chars:\n{text[:200]}")

Corpus length: 1,115,394 characters
First 200 chars:
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you


In [6]:
# Build character-level vocabulary
chars = sorted(set(text))
VOCAB_SIZE = len(chars)

char_to_id = {ch: i for i, ch in enumerate(chars)}
id_to_char = {i: ch for i, ch in enumerate(chars)}

def encode(s: str) -> list[int]:
    """Encode a string into a list of integer token IDs."""
    return [char_to_id[c] for c in s]

def decode(ids: list[int]) -> str:
    """Decode a list of integer token IDs back into a string."""
    return "".join(id_to_char[i] for i in ids)

print(f"Vocabulary size: {VOCAB_SIZE}")
print(f"Characters: {''.join(chars)}")
print(f"\nEncode 'hello': {encode('hello')}")
print(f"Decode back:     {decode(encode('hello'))}")

Vocabulary size: 65
Characters: 
 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz

Encode 'hello': [46, 43, 50, 50, 53]
Decode back:     hello


In [7]:
# Encode the entire corpus and create train / val splits
data = np.array(encode(text), dtype=np.int32)

split_idx = int(0.9 * len(data))
train_data = data[:split_idx]
val_data   = data[split_idx:]

print(f"Train tokens: {len(train_data):,}")
print(f"Val   tokens: {len(val_data):,}")

Train tokens: 1,003,854
Val   tokens: 111,540


In [8]:
def make_dataset(token_ids: np.ndarray, seq_len: int, batch_size: int) -> tf.data.Dataset:
    """Create a tf.data.Dataset of (input, target) pairs for language modelling.

    For each window of `seq_len + 1` tokens:
      - input  = tokens[:-1]
      - target = tokens[1:]   (shifted by one position)
    """
    ds = tf.data.Dataset.from_tensor_slices(token_ids)
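    # Windows of seq_len + 1 tokens; with shift=seq_len, consecutive windows
    # overlap by exactly one token (the last target of one is the first input of the next)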
    ds = ds.window(seq_len + 1, shift=seq_len, drop_remainder=True)
    ds = ds.flat_map(lambda w: w.batch(seq_len + 1))
    ds = ds.map(lambda w: (w[:-1], w[1:]), num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.shuffle(10_000).batch(batch_size, drop_remainder=True)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

train_ds = make_dataset(train_data, MAX_SEQ_LEN, BATCH_SIZE)
val_ds   = make_dataset(val_data,   MAX_SEQ_LEN, BATCH_SIZE)

# Inspect one batch
for x, y in train_ds.take(1):
    print(f"Input  batch shape: {x.shape}")
    print(f"Target batch shape: {y.shape}")

Input  batch shape: (64, 128)
Target batch shape: (64, 128)
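
To make the windowing in `make_dataset` easier to inspect, here is a small NumPy sketch of the same input/target shift on a toy sequence. `make_windows` is a hypothetical helper written for illustration; it mirrors the window size and shift used above but skips shuffling and batching.

```python
import numpy as np

def make_windows(token_ids: np.ndarray, seq_len: int):
    """Split a 1-D token array into (input, target) rows of length seq_len."""
    if len(token_ids) < seq_len + 1:
        raise ValueError("need at least seq_len + 1 tokens")
    # Each window needs seq_len + 1 tokens and starts seq_len after the previous one
    n_windows = (len(token_ids) - 1) // seq_len
    starts = np.arange(n_windows) * seq_len
    inputs = np.stack([token_ids[s : s + seq_len] for s in starts])
    targets = np.stack([token_ids[s + 1 : s + seq_len + 1] for s in starts])
    return inputs, targets

toy_tokens = np.arange(10)                 # stand-in for encoded text
toy_x, toy_y = make_windows(toy_tokens, seq_len=3)
print(toy_x)   # rows of inputs
print(toy_y)   # the same rows shifted one position to the right
```

Each target row is its input row shifted by one token, which is what lets the model predict the next token at every position of the window.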


---
## 3 — Model Architecture

We build GPT-1 bottom-up from its sub-components:

1. **Causal self-attention** — each position can only attend to itself and
   earlier positions (autoregressive masking).
2. **Transformer block** — attention → residual + LayerNorm → FFN → residual +
   LayerNorm.
3. **GPT model** — token embedding + learned positional embedding → N
   transformer blocks → linear projection to vocabulary logits.

### 3.1 — Causal Self-Attention

In [None]:
class CausalSelfAttention(layers.Layer):
    """Multi-head causal (masked) self-attention.

    Each position can only attend to positions <= itself, which is enforced
    by adding a large negative bias to future positions before the softmax.
    """

    def __init__(self, hidden_dim: int, num_heads: int, dropout: float = 0.0, **kwargs):
        super().__init__(**kwargs)
        assert hidden_dim % num_heads == 0, "hidden_dim must be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads

        self.qkv_proj = layers.Dense(3 * hidden_dim, use_bias=False, name="qkv_proj")
        self.out_proj = layers.Dense(hidden_dim, use_bias=False, name="out_proj")
        self.attn_dropout = layers.Dropout(dropout)
        self.resid_dropout = layers.Dropout(dropout)

    def call(self, x, training=False):
        B = tf.shape(x)[0]      # batch size
        T = tf.shape(x)[1]      # sequence length
        C = x.shape[-1]         # hidden_dim (static)

        # Project to Q, K, V in one shot
        qkv = self.qkv_proj(x)                              # (B, T, 3*C)
        qkv = tf.reshape(qkv, (B, T, 3, self.num_heads, self.head_dim))
        qkv = tf.transpose(qkv, perm=(2, 0, 3, 1, 4))      # (3, B, H, T, D)
        q, k, v = qkv[0], qkv[1], qkv[2]                   # each (B, H, T, D)

        # Scaled dot-product attention
        scale = tf.math.sqrt(tf.cast(self.head_dim, dtype=q.dtype))
        attn = tf.matmul(q, k, transpose_b=True) / scale    # (B, H, T, T)

        # Causal mask: prevent attending to future tokens
        causal_mask = tf.linalg.band_part(
            tf.ones((T, T), dtype=attn.dtype), -1, 0
        )  # lower-triangular
        attn = attn * causal_mask + (1.0 - causal_mask) * (-1e9)

        attn = tf.nn.softmax(attn, axis=-1)
        attn = self.attn_dropout(attn, training=training)

        # Weighted sum of values
        out = tf.matmul(attn, v)                             # (B, H, T, D)
        out = tf.transpose(out, perm=(0, 2, 1, 3))           # (B, T, H, D)
        out = tf.reshape(out, (B, T, C))                     # (B, T, C)

        out = self.out_proj(out)
        out = self.resid_dropout(out, training=training)
        return out
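
The effect of the causal mask can be checked in isolation. The following NumPy sketch applies the same masking expression as the layer above to a small matrix of random scores; `causal_attention_weights` is a hypothetical name used only for this illustration, and the sketch covers a single head without batching.

```python
import numpy as np

def causal_attention_weights(scores: np.ndarray) -> np.ndarray:
    """Apply a lower-triangular mask and a row-wise softmax to (T, T) scores."""
    T = scores.shape[-1]
    mask = np.tril(np.ones((T, T)))                      # 1 where attention is allowed
    masked = scores * mask + (1.0 - mask) * (-1e9)       # same expression as the layer
    masked = masked - masked.max(axis=-1, keepdims=True) # numerically stable softmax
    weights = np.exp(masked)
    return weights / weights.sum(axis=-1, keepdims=True)

rng = np.random.default_rng(0)
demo_weights = causal_attention_weights(rng.normal(size=(4, 4)))
print(np.round(demo_weights, 3))
```

Every entry above the diagonal comes out as zero, so position `t` only mixes values from positions `0..t`, and the first position can only attend to itself.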

### 3.2 — Transformer Block

In [None]:
class TransformerBlock(layers.Layer):
    """A single GPT-1 transformer block.

    Follows the *post-norm* convention used in the original GPT-1 paper:
        x -> attention -> add & norm -> ffn -> add & norm
    """

    def __init__(self, hidden_dim: int, num_heads: int, ff_dim: int,
                 dropout: float = 0.0, **kwargs):
        super().__init__(**kwargs)
        self.attn = CausalSelfAttention(hidden_dim, num_heads, dropout)
        self.ln1  = layers.LayerNormalization(epsilon=1e-5)
        self.ffn  = keras.Sequential([
            layers.Dense(ff_dim, activation="gelu"),
            layers.Dense(hidden_dim),
            layers.Dropout(dropout),
        ], name="ffn")
        self.ln2 = layers.LayerNormalization(epsilon=1e-5)

    def call(self, x, training=False):
        # Self-attention with residual connection and layer norm
        x = self.ln1(x + self.attn(x, training=training))
        # Feed-forward with residual connection and layer norm
        x = self.ln2(x + self.ffn(x, training=training))
        return x
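
The difference between the post-norm ordering used in this block and the pre-norm ordering mentioned in the Next Steps section comes down to where the normalisation sits relative to the residual addition. The sketch below shows both orderings in NumPy with stand-in sub-layers; `layer_norm`, `post_norm_block` and `pre_norm_block` are hypothetical helpers, and the normalisation omits the learned scale and offset for brevity.

```python
import numpy as np

def layer_norm(x, eps=1e-5):
    """Normalise the last axis to zero mean and unit variance (no learned parameters)."""
    mean = x.mean(axis=-1, keepdims=True)
    var = x.var(axis=-1, keepdims=True)
    return (x - mean) / np.sqrt(var + eps)

def post_norm_block(x, attn, ffn):
    """Post-norm: normalise after each residual addition (the ordering used above)."""
    x = layer_norm(x + attn(x))
    x = layer_norm(x + ffn(x))
    return x

def pre_norm_block(x, attn, ffn):
    """Pre-norm: normalise the sub-layer input and leave the residual path untouched."""
    x = x + attn(layer_norm(x))
    x = x + ffn(layer_norm(x))
    return x

rng = np.random.default_rng(0)
x_demo = rng.normal(size=(2, 3, 8))          # (batch, time, hidden)
toy_attn = lambda h: 0.5 * h                 # stand-in for self-attention
toy_ffn = lambda h: np.tanh(h)               # stand-in for the feed-forward network
post_out = post_norm_block(x_demo, toy_attn, toy_ffn)
pre_out = pre_norm_block(x_demo, toy_attn, toy_ffn)
```

In the post-norm variant the block output is always normalised, whereas in the pre-norm variant the residual stream passes through unnormalised.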

### 3.3 — Full GPT-1 Model

In [None]:
class GPT1(keras.Model):
    """A minimal GPT-1 decoder-only transformer.

    Components:
      - Token embedding  (vocab_size → hidden_dim)
      - Positional embedding (max_seq_len → hidden_dim)  — *learned*
      - N × TransformerBlock
      - Final LayerNorm
      - Linear head projecting back to vocab_size (kept as a separate,
        untied weight matrix in this notebook)
    """

    def __init__(self, vocab_size: int, max_seq_len: int, hidden_dim: int,
                 num_layers: int, num_heads: int, ff_dim: int,
                 dropout: float = 0.0, **kwargs):
        super().__init__(**kwargs)
        self.max_seq_len = max_seq_len
        self.hidden_dim = hidden_dim

        self.token_emb = layers.Embedding(vocab_size, hidden_dim, name="token_emb")
        self.pos_emb   = layers.Embedding(max_seq_len, hidden_dim, name="pos_emb")
        self.drop       = layers.Dropout(dropout)

        self.blocks = [
            TransformerBlock(hidden_dim, num_heads, ff_dim, dropout, name=f"block_{i}")
            for i in range(num_layers)
        ]

        self.ln_f = layers.LayerNormalization(epsilon=1e-5, name="ln_f")

        # Output projection. For clarity the head is NOT weight-tied to the token
        # embedding. Tying would mean computing the logits in call() as
        # tf.matmul(x, self.token_emb.embeddings, transpose_b=True) instead.
        self.head = layers.Dense(vocab_size, use_bias=False, name="lm_head")

    def call(self, token_ids, training=False):
        T = tf.shape(token_ids)[1]      # sequence length (batch size is not needed here)

        # Embeddings
        positions = tf.range(T)[tf.newaxis, :]              # (1, T)
        x = self.token_emb(token_ids) + self.pos_emb(positions)
        x = self.drop(x, training=training)

        # Transformer blocks
        for block in self.blocks:
            x = block(x, training=training)

        x = self.ln_f(x)                                    # (B, T, C)

        # Project to vocabulary logits
        logits = self.head(x)                                # (B, T, vocab_size)
        return logits
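
Weight tying, which this model leaves out, means the output head reuses the token embedding matrix instead of learning a second one. The NumPy sketch below only illustrates the shapes involved; the sizes are toy values and the variable names are assumptions made for this example, not part of the model above.

```python
import numpy as np

rng = np.random.default_rng(0)
vocab_size, hidden_dim, seq_len = 65, 128, 8              # toy sizes

embedding = rng.normal(size=(vocab_size, hidden_dim))     # token embedding matrix E
token_ids = rng.integers(0, vocab_size, size=seq_len)

x = embedding[token_ids]           # embedding lookup: (seq_len, hidden_dim)
# ... the transformer blocks would transform x here ...
tied_logits = x @ embedding.T      # tied head reuses E: (seq_len, vocab_size)
print(tied_logits.shape)
```

With tying, the `hidden_dim * vocab_size` parameters of a separate head disappear, which matters more as the vocabulary grows.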

In [None]:
# Instantiate the model
model = GPT1(
    vocab_size   = VOCAB_SIZE,
    max_seq_len  = MAX_SEQ_LEN,
    hidden_dim   = HIDDEN_DIM,
    num_layers   = NUM_LAYERS,
    num_heads    = NUM_HEADS,
    ff_dim       = FF_DIM,
    dropout      = DROPOUT_RATE,
)

# Build the model by running a dummy forward pass
dummy_input = tf.zeros((1, MAX_SEQ_LEN), dtype=tf.int32)
_ = model(dummy_input)

model.summary()

---
## 4 — Learning Rate Schedule

GPT-1 uses a **linear warmup** followed by **cosine decay**, which is now
standard for transformer training. We implement this as a custom Keras
schedule.

In [None]:
class WarmupCosineSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Linear warmup followed by cosine decay."""

    def __init__(self, learning_rate: float, warmup_steps: int, total_steps: int):
        super().__init__()
        self.lr = learning_rate
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps

    def __call__(self, step):
        step = tf.cast(step, tf.float32)
        warmup = tf.cast(self.warmup_steps, tf.float32)
        total  = tf.cast(self.total_steps, tf.float32)

        # Linear warmup
        warmup_lr = self.lr * (step / tf.maximum(warmup, 1.0))

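        # Cosine decay (progress is clipped so the rate stays at 0 past total_steps)
        progress = (step - warmup) / tf.maximum(total - warmup, 1.0)
        progress = tf.clip_by_value(progress, 0.0, 1.0)
        cosine_lr = self.lr * 0.5 * (1.0 + tf.cos(np.pi * progress))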

        return tf.where(step < warmup, warmup_lr, cosine_lr)

    def get_config(self):
        return {
            "learning_rate": self.lr,
            "warmup_steps": self.warmup_steps,
            "total_steps": self.total_steps,
        }
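
The schedule is easier to reason about as a plain function of the step number. Below is a pure-Python sketch of the same warmup and cosine formula; `warmup_cosine` is a hypothetical helper, and clamping the progress at 1 (so the rate stays at zero beyond `total_steps`) is a choice made in this sketch.

```python
import math

def warmup_cosine(step, peak_lr, warmup_steps, total_steps):
    """Learning rate at `step`: linear warmup to peak_lr, then cosine decay to 0."""
    if step < warmup_steps:
        return peak_lr * step / max(warmup_steps, 1)
    progress = (step - warmup_steps) / max(total_steps - warmup_steps, 1)
    progress = min(progress, 1.0)            # hold at zero after total_steps
    return peak_lr * 0.5 * (1.0 + math.cos(math.pi * progress))

# Illustrative values: peak 3e-4, 200 warmup steps, 2440 total steps
for s in (0, 100, 200, 1320, 2440):
    print(s, warmup_cosine(s, 3e-4, 200, 2440))
```

The rate rises linearly to its peak at the end of warmup, reaches half the peak midway through the decay phase, and ends at zero.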

In [None]:
# Visualise the schedule
import matplotlib.pyplot as plt

steps_per_epoch = len(train_data) // (MAX_SEQ_LEN * BATCH_SIZE)
total_steps = steps_per_epoch * EPOCHS

schedule = WarmupCosineSchedule(LEARNING_RATE, WARMUP_STEPS, total_steps)
lr_values = [schedule(tf.constant(s, dtype=tf.float32)).numpy() for s in range(total_steps)]

plt.figure(figsize=(8, 3))
plt.plot(lr_values)
plt.xlabel("Training step")
plt.ylabel("Learning rate")
plt.title("Warmup + Cosine Decay Schedule")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"Steps per epoch : {steps_per_epoch}")
print(f"Total steps     : {total_steps}")

---
## 5 — Compile & Train

In [None]:
optimizer = keras.optimizers.AdamW(
    learning_rate=schedule,
    weight_decay=0.01,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
    clipnorm=1.0,
)

loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

model.compile(
    optimizer=optimizer,
    loss=loss_fn,
    metrics=[keras.metrics.SparseCategoricalAccuracy(name="accuracy")],  # explicit: targets are integer IDs
)

In [None]:
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=3, restore_best_weights=True
    ),
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callbacks,
)

---
## 6 — Training Curves

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Loss
axes[0].plot(history.history["loss"], label="train")
axes[0].plot(history.history["val_loss"], label="val")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].set_title("Cross-Entropy Loss")
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Accuracy
axes[1].plot(history.history["accuracy"], label="train")
axes[1].plot(history.history["val_accuracy"], label="val")
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Accuracy")
axes[1].set_title("Token-Level Accuracy")
axes[1].legend()
axes[1].grid(True, alpha=0.3)

fig.suptitle("GPT-1 Training Curves", fontsize=14, y=1.02)
fig.tight_layout()
plt.show()

---
## 7 — Text Generation

We implement **autoregressive generation** with temperature-controlled
sampling and optional **top-k** filtering.

In [None]:
def generate(
    model: GPT1,
    prompt: str,
    max_new_tokens: int = 200,
    temperature: float = 0.8,
    top_k: int | None = 40,
) -> str:
    """Generate text autoregressively from a prompt.

    Args:
        model: Trained GPT1 model.
        prompt: Seed text.
        max_new_tokens: Number of tokens to generate.
        temperature: Softmax temperature, must be > 0 (lower = more deterministic).
        top_k: If set, only sample from the top-k most likely tokens.

    Returns:
        The full generated string (prompt + new tokens).
    """
    token_ids = encode(prompt)

    for _ in range(max_new_tokens):
        # Crop to the last max_seq_len tokens (context window)
        context = token_ids[-model.max_seq_len:]
        x = tf.constant([context], dtype=tf.int32)

        # Forward pass — only need the last position's logits
        logits = model(x, training=False)          # (1, T, vocab_size)
        logits = logits[0, -1, :] / temperature    # (vocab_size,)

        # Optional top-k filtering: mask everything below the k-th largest logit
        if top_k is not None:
            k = min(top_k, logits.shape[-1])       # guard against k > vocab size
            top_values, _ = tf.math.top_k(logits, k=k)
            threshold = top_values[-1]
            logits = tf.where(logits < threshold, -1e9, logits)

        # Sample the next token. tf.random.categorical takes unnormalised
        # logits directly, so no softmax/log round-trip is needed.
        next_id = tf.random.categorical(logits[tf.newaxis, :], num_samples=1)
        next_id = int(next_id[0, 0])

        token_ids.append(next_id)

    return decode(token_ids)
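
To see what temperature and top-k do to the sampling distribution, it helps to apply them to a small logits vector by hand. The NumPy sketch below is an illustration only; `top_k_probs` is a hypothetical helper that returns the probabilities the sampler would draw from, rather than drawing a sample.

```python
import numpy as np

def top_k_probs(logits, temperature=1.0, top_k=None):
    """Return sampling probabilities after temperature scaling and top-k filtering."""
    logits = np.asarray(logits, dtype=np.float64) / temperature
    if top_k is not None:
        k = min(top_k, logits.size)
        threshold = np.sort(logits)[-k]                  # k-th largest logit
        logits = np.where(logits < threshold, -np.inf, logits)
    logits = logits - logits.max()                       # numerically stable softmax
    probs = np.exp(logits)
    return probs / probs.sum()

demo_logits = [2.0, 1.0, 0.5, -1.0]
filtered = top_k_probs(demo_logits, temperature=0.8, top_k=2)
sharp = top_k_probs(demo_logits, temperature=0.5)
flat = top_k_probs(demo_logits, temperature=1.0)
print(np.round(filtered, 3), np.round(sharp, 3), np.round(flat, 3))
```

With `top_k=2` only the two largest logits keep non-zero probability, and lowering the temperature shifts more mass onto the most likely token.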

In [None]:
# Generate samples with different temperatures
prompts = ["ROMEO:", "To be, or not to be", "The king"]
temperatures = [0.5, 0.8, 1.0]

for prompt in prompts:
    print("=" * 70)
    print(f"Prompt: {prompt!r}")
    print("=" * 70)
    for temp in temperatures:
        result = generate(model, prompt, max_new_tokens=150, temperature=temp)
        print(f"\n--- temperature={temp} ---")
        print(result)
    print()

---
## 8 — Perplexity Evaluation

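**Perplexity** = exp(cross-entropy loss) is the standard metric for language
models. Lower is better: it measures how "surprised" the model is by the
validation data. Because the tokens here are single characters, this is a
per-character perplexity and is not directly comparable to word-level or
BPE-level figures.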

In [None]:
val_loss, val_acc = model.evaluate(val_ds, verbose=0)
perplexity = np.exp(val_loss)

print(f"Validation loss      : {val_loss:.4f}")
print(f"Validation accuracy  : {val_acc:.4f}")
print(f"Validation perplexity: {perplexity:.2f}")
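
A useful reference point for reading these numbers is a model that guesses uniformly over the vocabulary: its loss is the natural log of the vocabulary size and its perplexity equals the vocabulary size. The sketch below works this out for the 65-character vocabulary; `perplexity_from_loss` and `bits_per_token` are hypothetical helpers, and the loss is assumed to be in nats, as reported by the cross-entropy loss used above.

```python
import math

def perplexity_from_loss(loss_nats: float) -> float:
    """Perplexity is the exponential of the mean cross-entropy (in nats)."""
    return math.exp(loss_nats)

def bits_per_token(loss_nats: float) -> float:
    """Convert a cross-entropy in nats to bits per token."""
    return loss_nats / math.log(2)

uniform_loss = math.log(65)      # loss of a uniform guess over 65 characters
print(f"Uniform loss       : {uniform_loss:.3f} nats")
print(f"Uniform perplexity : {perplexity_from_loss(uniform_loss):.1f}")
print(f"Uniform bits/char  : {bits_per_token(uniform_loss):.2f}")
```

A trained model should score well below this uniform baseline on the validation set.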

---
## 9 — Save & Load the Model

In [None]:
# Save weights
save_dir = os.path.join("..", "checkpoints", "gpt1_shakespeare")
os.makedirs(save_dir, exist_ok=True)

model.save_weights(os.path.join(save_dir, "gpt1.weights.h5"))
print(f"Weights saved to {save_dir}")

# To reload later:
# model_loaded = GPT1(VOCAB_SIZE, MAX_SEQ_LEN, HIDDEN_DIM, NUM_LAYERS, NUM_HEADS, FF_DIM, DROPOUT_RATE)
# model_loaded(dummy_input)  # build
# model_loaded.load_weights(os.path.join(save_dir, "gpt1.weights.h5"))

---
## Next Steps

This notebook gives you a working GPT-1 baseline. Some ideas for extending it:

- **BPE tokenisation** — swap the character tokeniser for a `tokenizers.ByteLevelBPETokenizer`
  to handle a larger vocabulary and capture sub-word patterns.
- **Pre-norm vs post-norm** — modern transformers (GPT-2+) use *pre-norm*
  (LayerNorm before attention/FFN). Try switching and compare convergence.
- **Weight tying** — tie the token embedding and the LM head weights to reduce
  parameters and often improve performance.
- **Larger data** — train on WikiText-103, OpenWebText, or The Pile.
- **Scale up** — increase `NUM_LAYERS`, `HIDDEN_DIM`, `NUM_HEADS` toward the
  original GPT-1 config (12, 768, 12) and compare.
- **Fine-tuning** — add a classification head for downstream tasks (the
  original GPT-1 innovation).
- **Mixed precision** — use `tf.keras.mixed_precision` for faster training on
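  modern GPUs. Note that the `-1e9` mask constant in `CausalSelfAttention`
  overflows in float16 and can produce NaNs, so the mask would need a
  dtype-aware fill value first.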