# **Infinite Tolkien Generator**

In this notebook we'll build an NLP model based on the transformer architecture to showcase the core functionality of LLMs like ChatGPT. We'll train a neural network on the entire text of Tolkien's The Fellowship of the Ring, after which it can generate Tolkien-like text without limit. The model is a character-level language model rather than a "sub-word" predictor, but it is still a highly educational project for getting familiar with how LLMs are built.

In [9]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [1]:
with open('lotr_fotr.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [2]:
print("length of dataset in characters: ", len(text))

length of dataset in characters:  1021057


Let's look at the first 500 characters.

In [3]:
print(text[:500])

Three Rings for the Elven-kings under the sky,
               Seven for the Dwarf-lords in their halls of stone,
            Nine for Mortal Men doomed to die,
              One for the Dark Lord on his dark throne
           In the Land of Mordor where the Shadows lie.
               One Ring to rule them all, One Ring to find them,
               One Ring to bring them all and in the darkness bind them
           In the Land of Mordor where the Shadows lie.
           
FOREWORD

This tale grew


Here is a list of all the unique characters in the text:

In [4]:
chars = sorted(list(set(text)))
vocab_size = len(chars)

print("List of characters in text :", chars)
print(f'A total of {vocab_size} different characters')

List of characters in text : ['\n', ' ', '!', '"', "'", '(', ')', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '=', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '_', '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'Ó', 'á', 'â', 'ä', 'é', 'ë', 'í', 'ó', 'ú', 'û', '–']
A total of 90 different characters


Create a mapping from characters to integers.

In [6]:
# Step 1: Create a dictionary that maps each character to a unique integer index.
# This is useful for converting strings into numerical representations.
stoi = {}  # 'stoi' stands for "string to integer"
for i, ch in enumerate(chars):
    stoi[ch] = i  # Map character 'ch' to its index 'i'

# Step 2: Create a reverse dictionary that maps each index back to its character.
# This allows you to convert numerical data back into readable text.
itos = {}  # 'itos' stands for "integer to string"
for i, ch in enumerate(chars):
    itos[i] = ch  # Map index 'i' back to character 'ch'

# Step 3: Define an encoder function.
# This function takes a string and returns a list of integers based on the 'stoi' mapping.
def encode(s):
    encoded_list = []
    for c in s:
        encoded_list.append(stoi[c])  # Convert each character to its corresponding integer
    return encoded_list

# Step 4: Define a decoder function.
# This function takes a list of integers and returns the corresponding string using 'itos'.
def decode(l):
    decoded_string = ''
    for i in l:
        decoded_string += itos[i]  # Convert each integer back to its corresponding character
    return decoded_string


print(encode("Hello Mr. Frodo"))
print(decode(encode("Hello Mr. Frodo")))

[32, 57, 64, 64, 67, 1, 37, 70, 9, 1, 30, 70, 67, 56, 67]
Hello Mr. Frodo
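
The same mapping can be written more compactly with dict comprehensions. The sketch below is a minimal, self-contained variant that builds its vocabulary from a short sample string (an assumption made only for illustration, not the full book). It also shows a limitation of this scheme: a character that never appeared in the training text has no entry in `stoi`, so encoding it fails.

```python
# Toy vocabulary built from a short sample string (illustrative only)
sample = "Hello Mr. Frodo"
chars = sorted(set(sample))
stoi = {ch: i for i, ch in enumerate(chars)}   # character -> integer
itos = {i: ch for ch, i in stoi.items()}       # integer -> character

def encode(s):
    return [stoi[c] for c in s]

def decode(ids):
    return ''.join(itos[i] for i in ids)

# Encoding followed by decoding returns the original string
round_trip = decode(encode(sample))

# 'G' is not in the sample vocabulary, so the lookup raises KeyError
try:
    encode("Gandalf")
    unknown_ok = True
except KeyError:
    unknown_ok = False
```

With the full book as the source of `chars`, the same behavior applies to any character outside the 90-symbol vocabulary.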


Let us encode the entire text.

In [17]:
torch.manual_seed(1337)

# Encode the full text as a 1-D tensor of integer token ids
data = torch.tensor(encode(text), dtype=torch.long)
print(data[:500])


tensor([44, 60, 70, 57, 57,  1, 42, 61, 66, 59, 71,  1, 58, 67, 70,  1, 72, 60,
        57,  1, 29, 64, 74, 57, 66,  8, 63, 61, 66, 59, 71,  1, 73, 66, 56, 57,
        70,  1, 72, 60, 57,  1, 71, 63, 77,  7,  0,  1,  1,  1,  1,  1,  1,  1,
         1,  1,  1,  1,  1,  1,  1,  1, 43, 57, 74, 57, 66,  1, 58, 67, 70,  1,
        72, 60, 57,  1, 28, 75, 53, 70, 58,  8, 64, 67, 70, 56, 71,  1, 61, 66,
         1, 72, 60, 57, 61, 70,  1, 60, 53, 64, 64, 71,  1, 67, 58,  1, 71, 72,
        67, 66, 57,  7,  0,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1, 38,
        61, 66, 57,  1, 58, 67, 70,  1, 37, 67, 70, 72, 53, 64,  1, 37, 57, 66,
         1, 56, 67, 67, 65, 57, 56,  1, 72, 67,  1, 56, 61, 57,  7,  0,  1,  1,
         1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1, 39, 66, 57,  1, 58, 67,
        70,  1, 72, 60, 57,  1, 28, 53, 70, 63,  1, 36, 67, 70, 56,  1, 67, 66,
         1, 60, 61, 71,  1, 56, 53, 70, 63,  1, 72, 60, 70, 67, 66, 57,  0,  1,
         1,  1,  1,  1,  1,  1,  1,  1, 

Now we want to split the data into training and validation sets.

In [18]:

n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]
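
As a quick check on what this split produces, the sketch below applies the same arithmetic to the dataset length reported earlier (1021057 characters). Note that the split is contiguous, not shuffled, so the validation set is simply the final tenth of the book.

```python
n_chars = 1021057            # dataset length printed earlier in the notebook
n = int(0.9 * n_chars)       # index where the validation set starts

train_len = n
val_len = n_chars - n
```

Because the validation text comes from the end of the book, it may differ somewhat in style and vocabulary from the earlier chapters the model trains on.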

Let's illustrate how we build the chunks we'll train on.

In [19]:
block_size = 8
x = train_data[:block_size]
y = train_data[1:block_size+1]

print(train_data[:block_size])

for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(f"when input is {context} the target: {target}")

tensor([44, 60, 70, 57, 57,  1, 42, 61])
when input is tensor([44]) the target: 60
when input is tensor([44, 60]) the target: 70
when input is tensor([44, 60, 70]) the target: 57
when input is tensor([44, 60, 70, 57]) the target: 57
when input is tensor([44, 60, 70, 57, 57]) the target: 1
when input is tensor([44, 60, 70, 57, 57,  1]) the target: 42
when input is tensor([44, 60, 70, 57, 57,  1, 42]) the target: 61
when input is tensor([44, 60, 70, 57, 57,  1, 42, 61]) the target: 66
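
Put differently, a chunk of `block_size + 1` tokens yields `block_size` training examples, and the targets `y` are just the inputs `x` shifted by one position. The following sketch checks this on a toy string; the small vocabulary built here is an assumption for illustration, so the integer ids differ from those above.

```python
import torch

text = "Three Rings"                                   # toy data
chars = sorted(set(text))
stoi = {ch: i for i, ch in enumerate(chars)}
data = torch.tensor([stoi[c] for c in text], dtype=torch.long)

block_size = 8
x = data[:block_size]            # inputs
y = data[1:block_size + 1]       # targets: inputs shifted left by one

# One (context, target) pair per position in the block
examples = [(x[:t + 1].tolist(), y[t].item()) for t in range(block_size)]
```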


In [23]:
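batch_size = 4 # how many independent sequences will we process in parallel?
# `device` is also set in the hyperparameters cell further down; it is defined
# here as well so that this cell can run on its own.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Note: the output shown below has sequences of length 256, i.e. it was produced
# with block_size = 256 from the hyperparameters cell rather than block_size = 8.

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    # random starting offsets, one per sequence in the batch
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y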

xb, yb = get_batch('train')

print('inputs of the transformer:')
print(xb.shape)
print(xb)

print('targets of the transformer:')
print(yb.shape)
print(yb)



inputs of the transformer:
torch.Size([4, 256])
tensor([[57, 72, 57,  ..., 66,  1, 72],
        [58,  1, 75,  ..., 55, 57,  1],
        [57,  1, 54,  ..., 76, 61, 72],
        [ 1, 72, 60,  ..., 57,  1, 72]], device='cuda:0')
targets of the transformer:
torch.Size([4, 256])
tensor([[72, 57, 70,  ...,  1, 72, 60],
        [ 1, 75, 60,  ..., 57,  1, 67],
        [ 1, 54, 57,  ..., 61, 72, 61],
        [72, 60, 53,  ...,  1, 72, 60]], device='cuda:0')
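
To make the relationship between the two tensors easier to see, here is a minimal CPU-only sketch of the same batching logic on a toy "corpus" where token `i` simply has id `i`. The function signature is simplified for illustration (it takes the data directly instead of a split name). With this data, every target must equal its input plus one.

```python
import torch

torch.manual_seed(0)
data = torch.arange(100)            # toy corpus: 0, 1, 2, ..., 99
block_size, batch_size = 8, 4

def get_batch(data):
    # random starting offsets, one per sequence in the batch
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i + block_size] for i in ix])
    y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])
    return x, y

xb, yb = get_batch(data)
```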


In [24]:

@torch.no_grad()
def estimate_loss():
    """Average the loss over eval_iters random batches per split, with the model in eval mode."""
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out
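
The `model.eval()` / `model.train()` calls matter because the model uses dropout. As a small illustration, independent of the model itself, the sketch below shows that `nn.Dropout` zeroes and rescales activations in training mode but acts as the identity in evaluation mode.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
drop = nn.Dropout(0.2)
x = torch.ones(1000)

drop.train()
train_out = drop(x)   # roughly 20% of entries zeroed, the rest scaled by 1 / (1 - 0.2)

drop.eval()
eval_out = drop(x)    # unchanged in eval mode
```

Evaluating with dropout active would add noise to the reported losses, which is what the mode switch avoids.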

In [25]:
class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out
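
The masking step is what makes this attention causal. The sketch below reproduces the core of `forward` on small random tensors (the sizes are arbitrary assumptions) without the learned projections or dropout, so the effect of the lower-triangular mask can be inspected directly.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
B, T, hs = 1, 4, 8                       # batch, time steps, head size
q = torch.randn(B, T, hs)
k = torch.randn(B, T, hs)
v = torch.randn(B, T, hs)

tril = torch.tril(torch.ones(T, T))
wei = q @ k.transpose(-2, -1) * hs ** -0.5             # scaled scores, (B, T, T)
wei = wei.masked_fill(tril == 0, float('-inf'))        # hide future positions
wei = F.softmax(wei, dim=-1)                           # each row sums to 1
out = wei @ v                                          # (B, T, hs)
```

Each row of `wei` is a probability distribution over the current and earlier positions only. The first position can attend only to itself, so its output equals its own value vector.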


In [26]:

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out
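
A short shape check helps explain why the head outputs can be concatenated and projected back to `n_embd`. The sketch below uses the values from the hyperparameters cell (`n_embd = 384`, `n_head = 6`) and random tensors as stand-ins for the outputs of the individual heads.

```python
import torch

n_embd, n_head = 384, 6
head_size = n_embd // n_head             # 64 features per head
B, T = 2, 5                              # arbitrary batch and sequence sizes

# Stand-ins for the outputs of the individual heads, each (B, T, head_size)
head_outputs = [torch.randn(B, T, head_size) for _ in range(n_head)]
concat = torch.cat(head_outputs, dim=-1) # (B, T, n_head * head_size)
```

Because `head_size * n_head` equals `n_embd` here, the concatenated tensor has the same width as the residual stream.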


In [27]:
class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

In [28]:
class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x
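
In this block, layer normalization is applied before each sub-layer and the sub-layer output is added back onto the input (a residual connection). The sketch below illustrates both ideas on random data, using a plain `nn.Linear` as a hypothetical stand-in for the attention or feed-forward sub-layer.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
n_embd = 384
x = torch.randn(2, 5, n_embd) * 3 + 1    # activations with arbitrary scale and offset

ln = nn.LayerNorm(n_embd)
normed = ln(x)                           # per token: mean ~0, variance ~1 across features

sublayer = nn.Linear(n_embd, n_embd)     # stand-in for self.sa or self.ffwd
y = x + sublayer(normed)                 # residual connection keeps the shape
```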

In [29]:
class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # token ids and positions (0 .. block_size-1) each get a learned embedding vector
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # initialize Linear and Embedding weights from N(0, 0.02^2) and Linear biases to zero
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx
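
To make the loop in `generate` concrete, the sketch below walks through a single iteration with random logits standing in for the model's output (an assumption for illustration, since no trained model is needed to see the mechanics). The tiny `vocab_size` is likewise arbitrary.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
block_size, vocab_size = 8, 5

idx = torch.zeros((1, 10), dtype=torch.long)        # running sequence, longer than block_size
idx_cond = idx[:, -block_size:]                     # keep only the last block_size tokens

logits = torch.randn(1, vocab_size)                 # stand-in for logits at the last time step
probs = F.softmax(logits, dim=-1)                   # (1, vocab_size)
idx_next = torch.multinomial(probs, num_samples=1)  # sample one token id, (1, 1)
idx = torch.cat((idx, idx_next), dim=1)             # sequence grows by one token
```

The cropping step is needed because the position embedding table only has `block_size` entries, so the model cannot process a longer context.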


In [33]:
# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 3000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2
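
With these hyperparameters the size of the model can be worked out by hand from the layer definitions above. The sketch below is a back-of-the-envelope count that assumes the 90-character vocabulary found earlier. It arrives at the same 10.808154 M parameters that the training cell prints.

```python
vocab_size = 90
block_size = 256
n_embd, n_head, n_layer = 384, 6, 6
head_size = n_embd // n_head

# token + position embedding tables
embeddings = vocab_size * n_embd + block_size * n_embd

# per transformer block
attn = n_head * 3 * n_embd * head_size                  # key/query/value, no bias
proj = (head_size * n_head) * n_embd + n_embd           # output projection with bias
ffwd = (n_embd * 4 * n_embd + 4 * n_embd) + (4 * n_embd * n_embd + n_embd)
layer_norms = 2 * (2 * n_embd)                          # weight + bias for ln1 and ln2
per_block = attn + proj + ffwd + layer_norms

# final layer norm + language-model head
final = 2 * n_embd + (n_embd * vocab_size + vocab_size)

total = embeddings + n_layer * per_block + final
```

Almost all parameters sit in the six transformer blocks. The `tril` mask is registered as a buffer, so it is not counted.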

In [34]:

model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for step in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss()
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()


10.808154 M parameters
step 0: train loss 4.7143, val loss 4.7131
step 500: train loss 1.6001, val loss 1.6409
step 1000: train loss 1.2723, val loss 1.3497
step 1500: train loss 1.1434, val loss 1.2652
step 2000: train loss 1.0568, val loss 1.2308
step 2500: train loss 0.9911, val loss 1.2104
step 2999: train loss 0.9290, val loss 1.2112
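
Two quick reference points help when reading this log. A model that guesses uniformly among the 90 characters would have a cross-entropy of ln(90), which is close to the loss at step 0. The exponential of the loss can be read as a per-character perplexity. The sketch below computes both, using the final validation loss from the log above.

```python
import math

vocab_size = 90
uniform_loss = math.log(vocab_size)      # loss of a uniform guess, about 4.50

final_val_loss = 1.2112                  # validation loss at step 2999 in the log above
perplexity = math.exp(final_val_loss)    # effective number of equally likely next characters
```

The log also shows the training loss still falling between steps 2500 and 2999 while the validation loss stays flat (1.2104 to 1.2112). This suggests the model is starting to overfit and that further training with this setup would probably not help validation performance much.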


In [35]:
# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=2000)[0].tolist()))
#open('more.txt', 'w').write(decode(m.generate(context, max_new_tokens=10000)[0].tolist()))


             Old of night hound he sighed him like grain
            on a shand
          and your resolve is upon dooks
            bone afternesses were sholly,
              and pull up threes latelling before
         the fountain bound, easy shell like meadow: seconsat of folk, if words. So cout seemed to best foreign than enough. There was Strider at a countrive entray way that we have come up through thick wolf; they stood now seemed that they mark. It did not have too straight and stup outhward perhaps.
     They like desproúlin people-sprace was sent and Caradhras. Avertheless he tranged or two reterrible; then old window on it would felt rest. It subgers for pace they became mostly great and fresent that beams knownhill, and soon the legends of Old Tunnkeland; and one folk in the seatter was the commoney found the road abovalit upon the files between their neck was steel.
     He lightely upon a bright the gate, until it had been wept to buidde slafess. The entrant seemed to