In [1]:
import torch
from operator import itemgetter
from torch.utils.data import DataLoader
import random
import numpy as np

# 1 - Data Preparation

## 1.1. Examining the Data

In [2]:
# Load the whole book into memory as a single string.
with open('lord-of-the-rings-processed.txt', mode='r', encoding='utf-8') as book_file:
    text = book_file.read()

In [3]:
# Report the size of the loaded corpus in characters.
print(f"length of the book - {len(text)} characters")

length of the book - 3729059 characters


In [4]:
# Peek at the first 100 characters of the corpus.
print(text[:100])

The Music of the Ainur There was Eru, the One, who in Arda is called lluvatar; and he made first the


## 1.2. Format Data

In [74]:
# Character inventory of the corpus: every distinct character, in sorted order.
chars = sorted(set(text))
print(chars)

['\n', ' ', '!', '"', "'", '(', ')', ',', '-', '.', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '®', '—', '‘', '’', '“', '”']


In [75]:
# Characters treated as ordinary: digits, ASCII letters and basic punctuation.
common = r"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ();:.!?-,"
# Whatever else appears in the inventory is listed for inspection.
special = list(filter(lambda char: char not in common, chars))
print(special)

['\n', ' ', '"', "'", '®', '—', '‘', '’', '“', '”']


In [76]:
# Flatten line breaks into spaces so the book becomes one continuous stream.
text = text.replace("\n", " ")
# Collapse every run of spaces down to a single space. A single
# replace("  ", " ") pass only halves a run (four spaces -> two), so repeat
# until no double space is left.
while "  " in text:
    text = text.replace("  ", " ")
# Swap the stray '®' symbol for 'u'.
# NOTE(review): presumably an extraction artefact standing in for an accented 'u' - confirm.
text = text.replace("®", "u")

In [84]:
# Punctuation whose surrounding whitespace gets normalised: the four curly
# quotes followed by the clause/sentence marks.
# The list is spelled out explicitly rather than picked out of `special` by
# position, because those positions only exist on the raw text; once the file
# has been cleaned `special` is shorter and positional lookup raises IndexError.
special_char = ['‘', '’', '“', '”', ',', ';', ':', '!', '?']
special_char

['‘', '’', '“', '”', ',', ';', ':', '!', '?']

In [88]:
# Entries 0 and 2 of special_char must not be followed by a space.
no_space_after = [special_char[position] for position in (0, 2)]
no_space_after

['‘', '“']

In [87]:
# Every entry of special_char except positions 0 and 2 must not be preceded by
# a space. Filtering by position while walking the list keeps the result in
# special_char order by construction, instead of depending on the iteration
# order of a set of indices (which the language does not guarantee).
no_space_before = [char for position, char in enumerate(special_char) if position not in (0, 2)]
no_space_before

['’', '”', ',', ';', ':', '!', '?']

In [89]:
# Drop the space that follows each of these marks: <' sss> becomes <'sss>.
for opener in no_space_after:
    text = text.replace(opener + " ", opener)

# Drop the space that precedes each of these marks: <s ,> becomes <s,>.
for closer in no_space_before:
    text = text.replace(" " + closer, closer)


In [90]:
# Standardize quotation marks: the straight double quote and all four curly
# quotes are each rewritten as a plain apostrophe.
for quote_mark in ('"', '‘', '’', '“', '”'):
    text = text.replace(quote_mark, "'")

In [91]:
# Persist the cleaned text. The encoding is pinned to UTF-8 so the file
# round-trips with the UTF-8 read at the top of the notebook; the text still
# contains non-ASCII characters (e.g. the em dash), which a platform-default
# encoding may be unable to write.
# NOTE(review): this writes to the same path the notebook reads from, so the
# input is overwritten in place - confirm that is intended.
with open("lord-of-the-rings-processed.txt", "w", encoding="utf-8") as f:
    f.write(text)

## 1.3. Create Dictionary and Tokenize the Data

**tokenizer**

In [5]:
# Vocabulary of the tokenizer: every distinct character of the text, sorted.
chars = sorted(set(text))
common = r"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ();:.!?-,"
# Show which vocabulary entries fall outside the ordinary character set.
special = list(filter(lambda char: char not in common, chars))
print(special)

[' ', "'", '—']


In [6]:
# id -> character, numbering the sorted vocabulary from 0.
decode_char = dict(enumerate(chars))
# character -> id, the inverse of the mapping above.
encode_char = {char: index for index, char in decode_char.items()}
print(len(encode_char))

74


In [7]:
def encode(string):
    """Map each character of `string` to its integer id via encode_char; returns a list."""
    return [encode_char[s] for s in string]


def decode(nums):
    """Map each integer id in `nums` to its character via decode_char; returns the joined string."""
    return ''.join(decode_char[n] for n in nums)

In [8]:
# Sanity check: encode a short phrase into its list of character ids.
encode("This is good")

[40, 54, 55, 65, 0, 55, 65, 0, 53, 61, 61, 50]

In [9]:
# Sanity check: map a handful of ids back to their characters.
decode([8,20,69,44,27])

'0?wXG'

## 1.4. Load data and construct batches + dataloaders

In [10]:
# Encode the whole book and hold the character ids in a tensor of dtype long.
data = torch.tensor(encode(text),dtype=torch.long)
# Display the tensor's shape as the cell output.
data.shape

torch.Size([3729059])

In [11]:
start = 10505
length = 500
# Print a `length`-token window of the corpus as four consecutive pieces, one
# per line. The first three pieces are length//4 tokens each; the last runs to
# the exact end of the window.
print(
    *(
        decode(data[lo:hi].tolist())
        for lo, hi in zip(
            [start + length // 4 * k for k in range(4)],
            [start + length // 4 * k for k in range(1, 4)] + [start + length],
        )
    ),
    sep="\n",
)

e filled with gladness; but because of the roaring of the sea they felt a great unquiet. And they observed the winds and the 
air, and the matters of which Arda was made, of iron and stone and silver and gold and many substances: but of all these wate
r they most greatly praised. And it is said by the Eldar that in water there lives yet the echo of the Music of the Ainur mor
e than in any substance else that is in this Earth; and many of the Children of lluvatar hearken still unsated to the voices 


**train test split**

In [12]:
# Contiguous split: the leading `ratio` share of the tokens is used for
# training and the remaining tail for validation.
ratio = 0.85
n = int(len(data) * ratio)
train_data, val_data = data[:n], data[n:]

In [13]:
# Number of tokens in each split.
len(train_data), len(val_data)

(3169700, 559359)

**dataset and dataloader**

In [14]:
class slideTokenizedTextDataset(torch.utils.data.Dataset):
    """Sliding-window dataset over an already tokenized sequence.

    Item `idx` is the pair (window, next_window): `block_size` consecutive
    tokens starting at `idx`, and the same window shifted one position to the
    right, i.e. the next-token target for every position of the input.
    """

    def __init__(self, full_txt, block_size):
        # full_txt: the indexable token sequence; block_size: window length.
        self.txt = full_txt
        self.block_size = block_size

    def __len__(self):
        # The last valid start index is len(txt) - block_size - 1, because the
        # shifted target window needs one extra token past the input window.
        return len(self.txt) - self.block_size

    def __getitem__(self, idx):
        stop = idx + self.block_size
        return self.txt[idx:stop], self.txt[idx + 1:stop + 1]

In [15]:
# Context length: number of tokens in every input/target window.
block_size = 500

# One sliding-window dataset per split, sharing the same context length.
train_dataset = slideTokenizedTextDataset(full_txt=train_data, block_size=block_size)
val_dataset = slideTokenizedTextDataset(full_txt=val_data, block_size=block_size)

In [16]:
# Number of windows available in each dataset.
len(train_dataset), len(val_dataset)

(3169200, 558859)

In [21]:
batch_size = 128
# Training batches: windows drawn in shuffled order, incomplete final batch dropped.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
)

In [22]:
# Validation uses a random subset of the windows instead of all of them:
# val_num_samples windows are drawn without replacement on each pass.
val_num_samples = 200000
val_sampler = torch.utils.data.RandomSampler(
    val_dataset,
    replacement=False,
    num_samples=val_num_samples,
)
val_dataloader = DataLoader(
    dataset=val_dataset,
    batch_size=batch_size,
    sampler=val_sampler,
    drop_last=True,
)

In [23]:
# Pull one training batch and print the decoded input and target of its third
# sample, one per line, to check that the target is the input shifted by one.
s_input, s_output = next(iter(train_dataloader))
print(*(decode(batch_part[2].tolist()) for batch_part in (s_input, s_output)), sep="\n")

 to lose us,' he said. But at that moment Merry gave a whistle of relief and pointed ahead. 'Well, well!' he said. 'These trees do shift. There is the Bonfire Glade in front of us (or I hope so), but the path to it seems to have moved away!' The light grew clearer as they went forward. Suddenly they came out of the trees and found themselves in a wide circular space. There was sky above them, blue and clear to their surprise, for down under the Forest -roof they had not been able to see the risi
to lose us,' he said. But at that moment Merry gave a whistle of relief and pointed ahead. 'Well, well!' he said. 'These trees do shift. There is the Bonfire Glade in front of us (or I hope so), but the path to it seems to have moved away!' The light grew clearer as they went forward. Suddenly they came out of the trees and found themselves in a wide circular space. There was sky above them, blue and clear to their surprise, for down under the Forest -roof they had not been able to see the risi

In [24]:
# Number of batches per pass over each dataloader.
len(train_dataloader), len(val_dataloader)

(24759, 1562)

# 2 - Model definition

In [42]:
class Head(torch.nn.Module):
    """A single self-attention head.

    Projects the input to queries, keys and values and returns
    softmax(Q @ K^T / sqrt(head_size)) @ V, with dropout applied to the
    attention weights.

    Shapes: input [B, T, C] -> output [B, T, H], where C = emb_dim and
    H = head_size (for multi-head attention, H = C / num_heads).

    Args:
        emb_dim: size C of the last input dimension.
        head_size: size H of the projected queries/keys/values.
        block_size: side length of the causal mask built for decoder heads;
            a decoder head therefore accepts sequences with T <= block_size.
        dropout_rate: dropout probability applied to the attention weights.
        is_decoder: when True, a causal mask stops every position from
            attending to later positions; when False, no mask is applied.
    """

    def __init__(self, emb_dim, head_size, block_size, dropout_rate, is_decoder):
        super().__init__()
        self.H = head_size
        # not including bias because of layer norm include bias term
        self.key = torch.nn.Linear(emb_dim, head_size, bias=False)
        self.query = torch.nn.Linear(emb_dim, head_size, bias=False)
        self.value = torch.nn.Linear(emb_dim, head_size, bias=False)
        if is_decoder:
            # Lower-triangular ones, registered as a buffer so it follows the
            # module across devices without being a trainable parameter.
            self.register_buffer("tril_mat", torch.tril(torch.ones(block_size, block_size)))
        self.dropout = torch.nn.Dropout(p=dropout_rate)
        self.is_decoder = is_decoder

    def forward(self, x):
        B, T, C = x.shape  # decompose dimensions: Batch, Time, Embedding
        k = self.key(x)  # B, T, H
        q = self.query(x)  # B, T, H
        attention_W = q @ k.transpose(-2, -1) * self.H**-0.5  # B, T, T
        if self.is_decoder:
            # Crop the mask to the actual sequence length. The full
            # block_size x block_size buffer only broadcasts against the
            # [B, T, T] scores when T == block_size, so shorter inputs would
            # otherwise fail.
            causal_mask = self.tril_mat[:T, :T] == 0
            attention_W = attention_W.masked_fill(causal_mask, float('-inf'))  # B, T, T
        attention_W = torch.nn.functional.softmax(attention_W, dim=-1)  # B, T, T
        attention_W = self.dropout(attention_W)
        v = self.value(x)  # B, T, H
        output = attention_W @ v  # B, T, H
        return output

In [43]:
class MultiHeadAttention(torch.nn.Module):
    """Runs several attention heads side by side and merges their outputs.

    Every head receives the same input; the head outputs are concatenated
    along the last dimension, passed through a linear projection back to
    emb_dim, and then through dropout.

    Shapes: input [B, T, C] -> output [B, T, C], with C = emb_dim.

    Args:
        emb_dim: size C of the last input dimension and of the output.
        num_heads: number of Head modules to build.
        head_size: output size of each head.
        block_size, dropout_rate, is_decoder: passed through to every Head;
            dropout_rate is also used for the dropout after the projection.
    """

    def __init__(self, emb_dim, num_heads, head_size, block_size, dropout_rate, is_decoder):
        super().__init__()
        self.heads = torch.nn.ModuleList(
            [Head(emb_dim, head_size, block_size, dropout_rate, is_decoder) for _ in range(num_heads)]
        )
        # The concatenated head outputs are num_heads * head_size wide. Sizing
        # the projection input from that product (rather than assuming it
        # equals emb_dim) keeps the layer valid for any head_size; the layer is
        # unchanged whenever num_heads * head_size == emb_dim.
        self.projection = torch.nn.Linear(num_heads * head_size, emb_dim)
        self.dropout = torch.nn.Dropout(dropout_rate)

    def forward(self, x):
        output = torch.cat([h(x) for h in self.heads], dim=-1)  # B, T, num_heads * head_size
        output = self.projection(output)  # B, T, C
        output = self.dropout(output)
        return output

In [39]:
class FeedForward(torch.nn.Module):
    """Position-wise feed-forward network applied after multi-head attention.

    Linear(emb_dim -> multiplier * emb_dim), ReLU,
    Linear(multiplier * emb_dim -> emb_dim), then dropout.

    Shapes: input [B, T, C] -> output [B, T, C], with C = emb_dim.

    Args:
        emb_dim: size of the last input dimension and of the output.
        dropout_rate: dropout probability applied to the output.
        multiplier: how many times wider the hidden layer is than the input.
            Defaults to 4, the value the notebook takes from the paper.
    """

    def __init__(self, emb_dim, dropout_rate, multiplier=4):
        super().__init__()
        self.fflayer = torch.nn.Sequential(
            torch.nn.Linear(emb_dim, multiplier * emb_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(multiplier * emb_dim, emb_dim),
            torch.nn.Dropout(p=dropout_rate)
        )

    def forward(self, x):
        output = self.fflayer(x)
        return output

In [53]:
class TransformerBlock(torch.nn.Module):
    """One transformer block: multi-head attention and a feed-forward network.

    Each sub-layer is applied to a layer-normalised copy of its input and the
    result is added back to that input (residual connection).

    Args:
        emb_dim: embedding size; must be a multiple of num_heads.
        num_heads: number of attention heads; each gets emb_dim // num_heads
            dimensions.
        block_size: passed to the attention layer.
        dropout_rate_attention: dropout rate used inside the attention layer.
        dropout_rate_ff: dropout rate used inside the feed-forward layer.
        is_decoder: passed to the attention layer (default True).
        ff_multiplier: hidden-layer width multiplier of the feed-forward
            layer (default 4).
    """

    def __init__(self,emb_dim,num_heads,block_size,dropout_rate_attention,dropout_rate_ff,is_decoder=True,ff_multiplier=4):
        super().__init__()
        # The condition checks emb_dim against num_heads, so the message names
        # them in that order.
        assert emb_dim % num_heads == 0, "embedding dimension must be divisible by the number of heads to determine the head size"
        head_size = emb_dim // num_heads
        self.multi_atten = MultiHeadAttention(emb_dim = emb_dim, num_heads = num_heads, head_size = head_size, block_size = block_size,
                                              dropout_rate = dropout_rate_attention, is_decoder = is_decoder)
        self.feedforward = FeedForward(emb_dim = emb_dim, dropout_rate = dropout_rate_ff,multiplier = ff_multiplier)
        self.layernorm1 = torch.nn.LayerNorm(emb_dim)
        self.layernorm2 = torch.nn.LayerNorm(emb_dim)

    def forward(self, x):
        output = x + self.multi_atten(self.layernorm1(x)) # layer norm + multi-head attention + residual
        output = output + self.feedforward(self.layernorm2(output)) # layer norm + feed forward + residual
        return output


In [55]:
# Scratch check: a zero-filled array with 3 rows and 2 columns.
pe = np.zeros(shape=(3, 2))
pe

array([[0., 0.],
       [0., 0.],
       [0., 0.]])

In [56]:
# Scratch check: a flat zero-filled array of 6 elements.
pe = np.zeros(shape=6)
pe

array([0., 0., 0., 0., 0., 0.])

In [58]:
# Scratch check: reshape the existing pe array into 3 rows x 2 columns.
# NOTE(review): pe is rebound from itself, so this cell only works after a cell
# that leaves pe with exactly 6 elements has been run.
pe = pe.reshape(3,2)
pe

array([[0., 0.],
       [0., 0.],
       [0., 0.]])