In [1]:
import numpy as np
import matplotlib.pyplot as plt
import requests

import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# GPT-2 tokenizer; its files (vocab.json, merges.txt, ...) are fetched from the Hugging Face Hub
from transformers import GPT2Tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

In [3]:
# model configuration, following the GPT2-124M layout
# (seq_len is kept shorter than the 1024-token maximum the tokenizer reports for GPT-2)
n_vocab    = 50257   # size of the GPT-2 vocabulary
embed_dim  = 768     # width of the embedding vectors
seq_len    = 256     # context window (max tokens per sequence)
n_heads    = 12      # attention heads per block
n_blocks   = 12      # number of transformer blocks
batch_size = 16      # sequences per batch

# run on the first GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
  device = torch.device('cuda:0')
else:
  device = torch.device('cpu')

In [4]:
# tokenize the text
# Gulliver's travels :)
response = requests.get('https://www.gutenberg.org/cache/epub/829/pg829.txt', timeout=30)
response.raise_for_status() # fail loudly instead of silently tokenizing an HTTP error page
text = response.text

# encode the whole book at once; the tokenizer warns that this exceeds the model's maximum
# length, which is harmless here because the tokens are only ever sliced into seq_len windows
gtTokens = torch.tensor( tokenizer.encode(text),dtype=torch.long )
len(gtTokens)

Token indices sequence length is longer than the specified maximum sequence length for this model (158345 > 1024). Running this sequence through the model will result in indexing errors


158345

In [5]:
# split the token stream into a training part and a held-out test part

train_ratio = .9 # fraction of the tokens used for training

# first token index that belongs to the test set
test_split_point = int(len(gtTokens)*train_ratio)

# contiguous split: leading tokens -> train, trailing tokens -> test
train_data, test_data = gtTokens[:test_split_point], gtTokens[test_split_point:]


# a function that returns a batch of data samples
def get_data_batch(training=True):
  """Sample a random batch of token windows and their next-token targets.

  Args:
    training: if True, sample from `train_data`; otherwise from `test_data`.

  Returns:
    (X, y): two [batch_size, seq_len] tensors, where `y` holds the tokens
    that follow each position of `X` in the source data (X shifted by one).
  """
  source = train_data if training else test_data

  # random window starts; the upper bound leaves room for seq_len inputs plus one extra target token
  starts = torch.randint(len(source)-seq_len, size=(batch_size,)).unsqueeze(1) # [batch_size, 1]

  # positions inside a window; broadcasting with `starts` yields a [batch_size, seq_len] index grid
  offsets = torch.arange(seq_len)

  inputs  = source[starts + offsets]
  targets = source[starts + offsets + 1]
  return inputs, targets


# draw one training batch and show the inputs next to their shifted targets
X, y = get_data_batch(training=True)
print(f'Input data (size {X.shape}):\n', X)
print(f'\n\nTargets (size {y.shape}):\n', y)

Input data (size torch.Size([16, 256])):
 tensor([[   26,   326,   262,  ...,   201,   198, 30412],
        [  284,  2270,   262,  ...,  8722,   373,   284],
        [  198,   447,   250,  ...,    11,  6133,    11],
        ...,
        [ 2160,   355,   257,  ..., 14290,   286,   257],
        [18814,   422,   262,  ..., 27287,   673,  9859],
        [  428,    11,   618,  ...,    11,   348,  2850]])


Targets (size torch.Size([16, 256])):
 tensor([[  326,   262,  7705,  ...,   198, 30412,   813],
        [ 2270,   262, 13042,  ...,   373,   284,   201],
        [  447,   250,   447,  ...,  6133,    11,   450],
        ...,
        [  355,   257,  7319,  ...,   286,   257,  3595],
        [  422,   262,  1308,  ...,   673,  9859,  5223],
        [   11,   618,   314,  ...,   348,  2850,    11]])


In [6]:
class MultiHeadAttention(nn.Module):
  """Causal multi-head self-attention with a fused Q/K/V projection.

  Args:
    dim: embedding width; defaults to the notebook-level `embed_dim`.
    heads: number of attention heads; defaults to the notebook-level `n_heads`.

  Raises:
    ValueError: if `dim` is not divisible by `heads`.
  """
  def __init__(self, dim=None, heads=None):
    super().__init__()

    # fall back to the notebook hyperparameters when no explicit size is given
    dim   = embed_dim if dim   is None else dim
    heads = n_heads   if heads is None else heads
    if dim % heads != 0:
      raise ValueError(f'embedding dim ({dim}) must be divisible by the number of heads ({heads})')

    # number of attention heads
    self.num_heads = heads
    self.head_dim  = dim // heads

    # the three Q,K,V weights matrices are initialized as one, and are split inside forward()
    self.QKV = nn.Linear(dim, 3*dim, bias=True)

    # linear mixing after attention
    self.W0 = nn.Linear(dim, dim, bias=True)


  def forward(self,x):
    """Apply causal self-attention.

    Args:
      x: activations of shape [batch, seq_len, embed_dim].

    Returns:
      Tensor of the same shape as `x`.
    """

    # sizes for later use
    B, T, E = x.shape # [batch, seq_len, embed_dim]

    # push data through Q, K, and V in one concatenated matrix
    qkv = self.QKV(x) # [batch, sequence, 3*embed]
    q,k,v = torch.split(qkv,E,dim=2) # each matrix is [B, T, E]

    # reshape to [B, T, nHeads, head_dim]
    #  and then transpose to [B, nHeads, T, head_dim]
    q = q.view(B, T, self.num_heads, self.head_dim).transpose(1,2) # [B, nHeads, T, head_dim]
    k = k.view(B, T, self.num_heads, self.head_dim).transpose(1,2)
    v = v.view(B, T, self.num_heads, self.head_dim).transpose(1,2)

    # Pytorch's dot-product attention function handles multi-head shapes
    out = F.scaled_dot_product_attention(q, k, v, is_causal=True) # [B, nHeads, T, head_dim]

    # recombine heads: (B, nHeads, T, head_dim) -> [B, T, E]
    # reshape (not view): whether the transposed tensor can be viewed without a copy depends on
    # the memory layout the attention kernel returned; reshape copies only when it has to
    out = out.transpose(1,2).reshape(B, T, E)

    # finally, linearly mix the attention heads
    out = self.W0(out)

    return out




class TransformerBlock(nn.Module):
  """One transformer block: an attention sub-block followed by an MLP sub-block.

  Each sub-block normalizes its input first and adds its result back onto
  the un-normalized input (residual connection).
  """
  def __init__(self):
    super().__init__()

    # --- attention sub-block ---
    self.layernorm_1 = nn.LayerNorm(embed_dim, eps=1e-5)
    self.attn = MultiHeadAttention()

    # --- feedforward (MLP) sub-block: expand 4x, nonlinearity, project back ---
    hidden_dim = 4*embed_dim
    self.layernorm_2 = nn.LayerNorm(embed_dim, eps=1e-5)
    self.mlp_1 = nn.Linear(embed_dim, hidden_dim, bias=True)
    self.gelu  = nn.GELU()
    self.mlp_2 = nn.Linear(hidden_dim, embed_dim, bias=True)

  def forward(self, x):
    """Run `x` through the attention and MLP sub-blocks; the output has the same shape as `x`."""

    # attention on the normalized input, added back onto the raw input
    attended = x + self.attn(self.layernorm_1(x))

    # MLP on the normalized attention output, added back onto the attention output
    hidden = self.gelu(self.mlp_1(self.layernorm_2(attended)))
    return attended + self.mlp_2(hidden)

In [7]:
class LanguageModel(nn.Module):
  """GPT-2-style decoder-only language model.

  Token and position embeddings are summed, passed through a stack of
  transformer blocks and a final layernorm, and projected back onto the
  vocabulary by a head whose weight matrix is shared with the token embedding.
  Sizes come from the notebook-level hyperparameters
  (`n_vocab`, `embed_dim`, `seq_len`, `n_blocks`).
  """
  def __init__(self):
    super().__init__()

    # token + position embeddings
    self.wte = nn.Embedding(n_vocab, embed_dim) # token embedding
    self.wpe = nn.Embedding(seq_len, embed_dim) # position embedding

    # transformer blocks
    self.transformerBlocks = nn.Sequential(*[TransformerBlock() for _ in range(n_blocks)])

    # final layernorm
    self.layernorm_final = nn.LayerNorm(embed_dim, eps=1e-5)

    # lm head, with weights tied to token embedding
    # Share the SAME Parameter object. Wrapping it in a new nn.Parameter would create a second
    # parameter that only shares storage: it would be listed twice in parameters() and the tie
    # would be lost as soon as the model is moved or cast (e.g. .to(device)).
    self.final_head = nn.Linear(embed_dim, n_vocab, bias=False)
    self.final_head.weight = self.wte.weight

    self.apply(self._init_weights)


  def _init_weights(self, module):
    """Per-module initializer used with `self.apply`.

    Linear layers get N(0, 0.02) weights and zero biases; embeddings get
    Xavier-uniform weights. Because the head shares its matrix with `wte`
    and is visited after it, the tied matrix ends up with the Linear
    initialization.
    """
    if isinstance(module, nn.Linear):
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
      # Initialize bias terms to zero for Linear layers
      if module.bias is not None:
        torch.nn.init.zeros_(module.bias)

    # Initialize nn.Embedding to Xavier
    if isinstance(module, nn.Embedding):
      torch.nn.init.xavier_uniform_(module.weight)


  def forward(self, idx):
    """Compute next-token log-probabilities.

    Args:
      idx: integer token ids of shape [B, T]; T must not exceed the size of
        the position-embedding table.

    Returns:
      Log-probabilities of shape [B, T, n_vocab].
    """

    # token + position embeddings
    token_emb = self.wte(idx) # [B, T, E]
    # build the positions on the same device as the input, not on a notebook-level global
    positions = torch.arange(idx.shape[-1], device=idx.device)
    posit_emb = self.wpe(positions) # [T, E]
    x = token_emb + posit_emb # [B, T, E]

    # pass through each transformer block
    x = self.transformerBlocks(x)

    # final layernorm and unembeddings
    x = self.layernorm_final(x)
    logits = self.final_head(x)  # [B, T, n_vocab]

    # scale and logsoftmax
    outputs = F.log_softmax(logits/np.sqrt(self.wte.embedding_dim),dim=-1)

    return outputs


  def generate(self, idx, max_new_tokens=50):
    """Autoregressively sample `max_new_tokens` tokens after the prompt `idx`.

    Args:
      idx: integer token ids of shape [B, T] used as the prompt.
      max_new_tokens: number of tokens to append.

    Returns:
      Token ids of shape [B, T + max_new_tokens] (prompt followed by samples).
    """

    # the model can only attend to as many tokens as it has position embeddings
    max_context = self.wpe.num_embeddings

    # sampling needs no gradients; skipping the autograd graph saves memory
    with torch.no_grad():
      for _ in range(max_new_tokens):

        # forward pass on the most recent context, keep the last position: [B, n_vocab]
        log_probs = self(idx[:,-max_context:])[:,-1,:]

        # undo the log-softmax to get "normal" softmax (probability values)
        probs = torch.exp(log_probs) # [B, n_vocab]

        # sample next token
        idx_next = torch.multinomial(probs, num_samples=1) # [B, 1]

        # append
        idx = torch.cat((idx, idx_next), dim=1) # [B, T+1]
    return idx

In [8]:
# build the model and place it on the selected device
model = LanguageModel().to(device)

# push one training batch through the model as a shape check
X, y = (batch.to(device) for batch in get_data_batch())
out = model(X)

print(f'Input Size: {X.shape}')
print(f'Output Size: {out.shape}')

Input Size: torch.Size([16, 256])
Output Size: torch.Size([16, 256, 50257])
