#LLM from Scratch

importing all our dependencies

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pprint


training dataset with just a few hard-coded input and output text sequences and then programmatically build a vocabulary using all words in those text sequences.



In [2]:
def get_data_and_vocab():
    # Define training data
    training_data = {
        "how are you": "i am fine <end>",
        "tell me about yourself": "i love travelling, coding<end>",
        "what is your name": "Adam <end>",
        "who is nice": "Adam <end>",
        "where is Adam": "at home <end>",
        "how is Adam": "i dont know <end>",
        "who are you": "your companion <end>"
    }

    # Extract input and target phrases
    data_words = [k for k, _ in training_data.items()]
    target_words = [v for _, v in training_data.items()]

    vocabulary_words = list(set([element.lower() for nestedlist in [x.split(" ") for x in data_words] for element in nestedlist] + [element.lower() for nestedlist in [x.split(" ") for x in target_words] for element in nestedlist]))
    vocabulary_words.remove("<end>")
    vocabulary_words.append("<end>")
    vocabulary_words.insert(0, "")

    # Create mappings from word to index and index to word
    word_to_ix = {vocabulary_words[k].lower(): k for k in range(len(vocabulary_words))}
    ix_to_word = {v: k for k, v in word_to_ix.items()}

    return training_data, data_words, target_words, vocabulary_words, word_to_ix, ix_to_word



In [3]:
training_data, data_words, target_words, vocabulary_words, word_to_ix, ix_to_word = get_data_and_vocab()

In [4]:
training_data

{'how are you': 'i am fine <end>',
 'tell me about yourself': 'i love travelling, coding<end>',
 'what is your name': 'Adam <end>',
 'who is nice': 'Adam <end>',
 'where is Adam': 'at home <end>',
 'how is Adam': 'i dont know <end>',
 'who are you': 'your companion <end>'}

In [13]:
data_words

['how are you',
 'tell me about yourself',
 'what is your name',
 'who is nice',
 'where is Adam',
 'how is Adam',
 'who are you']

defining two helper functions to convert text sequences into its corresponding tensors.



In [5]:
# Function to convert a batch of sequences of words to a tensor of indices
def words_to_tensor(seq_batch, device=None):
    index_batch = []

    for seq in seq_batch:
        word_list = seq.lower().split(" ")
        indices = [word_to_ix[word] for word in word_list if word in word_to_ix]
        t = torch.tensor(indices)
        if device is not None:
            t = t.to(device)
        index_batch.append(t)

    return pad_tensors(index_batch)

# Function to convert a tensor of indices to a list of sequences of words
def tensor_to_words(tensor):
    index_batch = tensor.cpu().numpy().tolist()
    res = []
    for indices in index_batch:
        words = []
        for ix in indices:
            words.append(ix_to_word[ix].lower())
            if ix == word_to_ix["<end>"]:
                break
        res.append(" ".join(words))
    return res

# Function to pad a list of tensors to the same length
def pad_tensors(list_of_tensors):
    tensor_count = len(list_of_tensors) if not torch.is_tensor(list_of_tensors) else list_of_tensors.shape[0]
    max_dim = max(t.shape[0] for t in list_of_tensors)
    res = []
    for t in list_of_tensors:
        res_t = torch.zeros(max_dim, *t.shape[1:]).type(t.dtype).to(t.device)
        res_t[:t.shape[0]] = t
        res.append(res_t)

    # Concatenate tensors along a new dimension
    res = torch.cat(res)
    firstDim = len(list_of_tensors)
    secondDim = max_dim

    return res.reshape(firstDim, secondDim, *res.shape[1:])



defining architecture of the model





In [6]:
class SelfAttention(nn.Module):
    def __init__(self, embed_size, head_count):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.head_count = head_count

        # Create linear layers for query, key and value projections for each head
        self.query_layers = nn.ModuleList([nn.Linear(embed_size, embed_size, bias=False) for _ in range(head_count)])
        self.key_layers = nn.ModuleList([nn.Linear(embed_size, embed_size, bias=False) for _ in range(head_count)])
        self.value_layers = nn.ModuleList([nn.Linear(embed_size, embed_size, bias=False) for _ in range(head_count)])
        self.fc_out = nn.Linear(head_count * embed_size, embed_size)

    def forward(self, embeddings):
        batch_size, token_count = embeddings.shape[:2]
        qkvs = torch.zeros(self.head_count, 3, batch_size, token_count, self.embed_size).to(embeddings.device)

        for i in range(self.head_count):
            qkvs[i, 0] = self.query_layers[i](embeddings)
            qkvs[i, 1] = self.key_layers[i](embeddings)
            qkvs[i, 2] = self.value_layers[i](embeddings)

        energy = torch.zeros(self.head_count, batch_size, token_count, token_count).to(embeddings.device)
        mask = torch.triu(torch.ones((token_count, token_count)), diagonal=1).bool()

        for h in range(self.head_count):
            for b in range(batch_size):
                for i in range(token_count):
                    for j in range(token_count):
                        energy[h, b, i, j] = torch.dot(qkvs[h, 0, b, i], qkvs[h, 1, b, j])
                energy[h, b] = energy[h, b].masked_fill(mask, float('-inf'))

        attention = torch.nn.functional.softmax(energy, dim=3)

        out = torch.zeros(batch_size, token_count, self.head_count, self.embed_size).to(embeddings.device)
        for h in range(self.head_count):
            for b in range(batch_size):
                for i in range(token_count):
                    for j in range(token_count):
                        out[b, i, h] += (attention[h, b, i, j] * qkvs[h, 2, b, j])

        out = out.reshape(batch_size, token_count, self.head_count * self.embed_size)
        return self.fc_out(out)

    def masked_attention(self, energy):
        max_token_count, embed_size, _ = energy.size()

        mask = torch.triu(torch.ones((max_token_count, max_token_count)), diagonal=1) * float('-inf')
        mask = mask.unsqueeze(0).unsqueeze(0)
        mask = mask.expand(batch_size, embed_size, -1, -1)

        masked_scores = energy + mask.to(energy.device)

        return masked_scores.to(energy.device)


Adding additional transformer layers



In [7]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, head_count):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(embed_size, head_count)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

        # Feed-forward neural network
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, embed_size),
            nn.ReLU(),
            nn.Linear(embed_size, embed_size)
        )

    def forward(self, embeddings):
        attention = self.attention(embeddings)
        out = self.norm1(attention + embeddings)
        out = attention + self.feed_forward(out)
        out = self.norm2(out)
        return out


Combining everything together



In [8]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, head_count):
        super(Transformer, self).__init__()
        self.embed_size = embed_size
        self.vocab_size = vocab_size
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.layers = nn.ModuleList(
            [TransformerBlock(embed_size, head_count) for _ in range(num_layers)]
        )
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, input_tokens, mask=None):
        batch_size, token_count = input_tokens.shape[:2]
        out = self.word_embedding(input_tokens)
        positions = torch.arange(0, token_count).expand(batch_size, token_count).to(input_tokens.device)
        position_encoding = self.position_encoding(positions, self.embed_size)
        out += position_encoding.reshape(out.shape)

        # Pass through each transformer block
        for layer in self.layers:
            out = layer(out)
        out = self.fc_out(out[:, -1, :].reshape(batch_size, self.embed_size)).reshape(batch_size, self.vocab_size)
        return torch.nn.functional.softmax(out, dim=1)

    def position_encoding(self, positions, embed_size):
        angle_rads = self.get_angles(
            positions.unsqueeze(2).float(),
            torch.arange(embed_size)[None, None, :].float().to(positions.device),
            embed_size
        )
        sines = torch.sin(angle_rads[:, :, 0::2])
        cosines = torch.cos(angle_rads[:, :, 1::2])
        pos_encoding = torch.cat([sines, cosines], dim=-1)
        pos_encoding = pos_encoding[None, ...]
        return pos_encoding

    def get_angles(self, pos, i, embed_size):
        angle_rates = 1 / torch.pow(10000, (2 * (i//2)) / embed_size)
        return pos * angle_rates


defining function to make predictions using our model



In [9]:
def infer_recursive(model, input_vectors, max_output_token_count=10):
    model.eval()
    outputs = []

    # Loop over sequences in the batch
    for i in range(input_vectors.shape[0]):
        print(f"Infering sequence {i}")
        input_vector = input_vectors[i].reshape(1, input_vectors.shape[1])
        predicted_sequence = []
        wc = 0

        with torch.no_grad():
            while True:
                output = model(input_vector)
                predicted_index = output[0, :].argmax().item()
                predicted_sequence.append(predicted_index)
                if predicted_index == word_to_ix['<end>'] or wc > max_output_token_count:
                    break
                input_vector = torch.cat([input_vector, torch.tensor([[predicted_index]])], dim=1)
                wc += 1
        outputs.append(torch.tensor(predicted_sequence))
    outputs = pad_tensors(outputs)
    return outputs


defining training function



In [10]:
def train_recursive(model, data, targets, optimizer, criterion):
    model.train()
    optimizer.zero_grad()
    total_loss = 0
    batch_size, token_count, token_count_out = data.shape[0], data.shape[1], targets.shape[1]

    # Loop over sequences in the batch
    for b in range(batch_size):
        end_encountered = False
        cur_count = 0
        while not end_encountered:
            target_vector = torch.zeros(model.vocab_size).to(data.device)

            if cur_count != token_count_out:
                expected_next_token_idx = targets[b, cur_count]
                target_vector[expected_next_token_idx] = 1

            if cur_count > 0:
                model_input = data[b].reshape(token_count).to(data.device)
                part_of_output = targets[b, :cur_count].to(data.device)
                model_input = torch.cat((model_input, part_of_output))
            else:
                model_input = data[b]
            out = model(model_input.reshape(1, token_count + cur_count))
            loss = criterion(out, target_vector.reshape(out.shape))
            total_loss += loss
            cur_count += 1

            if cur_count > token_count_out:
                end_encountered = True

    # Backpropagate gradients and update model parameters
    total_loss.backward()
    optimizer.step()
    return total_loss.item() / batch_size


combining training and inference functions



In [11]:
# Function to demonstrate training and inference
def example_training_and_inference():
    vocab_size = len(word_to_ix)
    embed_size = 512
    num_layers = 4
    heads = 3

    device = torch.device("cpu")
    model = Transformer(vocab_size, embed_size, num_layers, heads).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.00001)
    criterion = nn.CrossEntropyLoss()

    data = words_to_tensor(data_words, device=device)
    targets = words_to_tensor(target_words, device=device)

    for epoch in range(100):
        avg_loss = train_recursive(model, data, targets, optimizer, criterion)
        print(f'Epoch {epoch + 1}, Loss: {avg_loss:.4f}')

    input_vector = words_to_tensor(data_words, device=device)
    predicted_vector = infer_recursive(model, input_vector)
    predicted_words = tensor_to_words(predicted_vector)

    # Print training data and model output
    print("\n\n\n")
    print("Training Data:")
    pprint.pprint(training_data)
    print("\n\n")
    print("Model Inference:")
    result_data = {data_words[k]: predicted_words[k] for k in range(len(predicted_words))}
    pprint.pprint(result_data)

In [12]:
training_data, data_words, target_words, vocabulary_words, word_to_ix, ix_to_word = get_data_and_vocab()
# Running the example training and inference function
example_training_and_inference()



Epoch 1, Loss: 13.3204
Epoch 2, Loss: 13.2285
Epoch 3, Loss: 13.0901
Epoch 4, Loss: 12.9259
Epoch 5, Loss: 12.7630
Epoch 6, Loss: 12.6260
Epoch 7, Loss: 12.5078
Epoch 8, Loss: 12.4012
Epoch 9, Loss: 12.2810
Epoch 10, Loss: 12.1831
Epoch 11, Loss: 12.1164
Epoch 12, Loss: 12.0619
Epoch 13, Loss: 12.0116
Epoch 14, Loss: 11.9566
Epoch 15, Loss: 11.8977
Epoch 16, Loss: 11.8400
Epoch 17, Loss: 11.7879
Epoch 18, Loss: 11.7414
Epoch 19, Loss: 11.6961
Epoch 20, Loss: 11.6510
Epoch 21, Loss: 11.6039
Epoch 22, Loss: 11.5619
Epoch 23, Loss: 11.5322
Epoch 24, Loss: 11.5118
Epoch 25, Loss: 11.4951
Epoch 26, Loss: 11.4764
Epoch 27, Loss: 11.4550
Epoch 28, Loss: 11.4322
Epoch 29, Loss: 11.4068
Epoch 30, Loss: 11.3781
Epoch 31, Loss: 11.3453
Epoch 32, Loss: 11.3089
Epoch 33, Loss: 11.2726
Epoch 34, Loss: 11.2389
Epoch 35, Loss: 11.2069
Epoch 36, Loss: 11.1786
Epoch 37, Loss: 11.1508
Epoch 38, Loss: 11.1171
Epoch 39, Loss: 11.0712
Epoch 40, Loss: 11.0117
Epoch 41, Loss: 10.9476
Epoch 42, Loss: 10.8977
E