In [5]:
# IMPORTS
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
import torch.nn.functional as F
from torch.optim.lr_scheduler import ExponentialLR
import math
from sklearn.model_selection import train_test_split
import re

In [6]:
# GPT MODEL

# MultiHeadAttention
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, nheads):
        super(MultiHeadAttention, self).__init__()
        self.embed_size = embed_size
        self.nheads = nheads
        self.head_dim = embed_size // nheads

        self.fc_values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(embed_size, embed_size)
    
    def forward(self, x_queries, x_keys, x_values, mask):
        batch_size = x_queries.shape[0] # x_keys.shape[0] x_values.shape[0]
        seq_length = x_queries.shape[1] # x_keys.shape[1]
        value_length = x_values.shape[1]
        # (x_embedded) x_queries x_keys SHAPE: [batch_size, seq_length, embed_size]
        # x_values SHAPE: [batch_size, value_len, embed_size]

        # Split the embedding into nheads different pieces
        x_queries = x_queries.reshape(batch_size, seq_length, self.nheads, self.head_dim)
        x_keys = x_keys.reshape(batch_size, seq_length, self.nheads, self.head_dim)
        x_values = x_values.reshape(batch_size, value_length, self.nheads, self.head_dim)
        queries = self.fc_queries(x_queries)
        keys = self.fc_keys(x_keys)
        values = self.fc_values(x_values)
        # queries keys values SHAPE: [batch_size, seq_length, nheads, head_dim]
        # values SHAPE: [batch_size, value_length, nheads, head_dim]

        # MatMul Queries Keys
        similarity = torch.einsum("bqhd,bkhd->bhqk", queries, keys)
        if mask is not None:  # Add Mask
            similarity += mask
        similarity = torch.softmax(similarity / (self.head_dim ** 0.5), dim=3)
        # similarity SHAPE: [batch_size, nheads, seq_length, seq_length]
        
        # MatMul Similarity Values
        attention = torch.einsum("bhss,bvhd->bvhd", similarity, values).reshape(batch_size, value_length, self.nheads * self.head_dim)

        output_attention = self.fc_out(attention)
        # output_attention attention SHAPE: [batch_size, value_length, embed_size (nheads * head_dim)]
        return output_attention, attention

# Feedforward
class Feedforward(nn.Module):
    def __init__(self, embed_size, dim_feedforward, dropout):
        super(Feedforward, self).__init__()
        self.fc1 = nn.Linear(embed_size, dim_feedforward)
        self.fc2 = nn.Linear(dim_feedforward, embed_size)        
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        output_feedforward = self.dropout(x)
        return output_feedforward

# Decoder
class Decoder(nn.Module):
    def __init__(self, embed_size, nheads, dim_feedforward, dropout):
        super(Decoder, self).__init__()
        self.multi_head_attention = MultiHeadAttention(embed_size, nheads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.feedforward = Feedforward(embed_size, dim_feedforward, dropout)
        self.norm2 = nn.LayerNorm(embed_size)

    def forward(self, x, attention_mask):
        # (x_embedded) x SHAPE: [batch_size, seq_length, embed_size]
        output_attention, attention = self.multi_head_attention(x, x, x, attention_mask)
        x_attention = self.norm1(x + output_attention)
        # x output_attention SHAPE: [batch_size, seq_length, embed_size]
        
        x_feedforward = self.feedforward(x_attention)
        output_decoder = self.norm2(x_attention + x_feedforward)
        # output_decoder feedforward SHAPE: [batch_size, seq_length, embed_size]
        return output_decoder, attention

# GPTModel
class GPTModel(nn.Module):
    def __init__(self, vocabulary, embed_size=8, nheads=4, dim_feedforward=128, decoder_layers=1, dropout=0.1, device='cpu', max_seq_length=500):
        super(GPTModel, self).__init__()
        assert embed_size % 2 == 0, "embed_size must be even"
        assert embed_size % nheads == 0, "Embedding size must be divisible by the number of heads"
        
        self.device = device
        self.vocabulary = vocabulary
        self.vocab_size = len(vocabulary)
        self.embed_size = embed_size
        self.embedding = nn.Embedding(self.vocab_size, embed_size).to(device)
        self.pos_encoder = self.get_positional_encoding(max_seq_length)
        self.decoder = Decoder(embed_size, nheads, dim_feedforward, dropout).to(device)
        self.decoder_layers = nn.ModuleList([self.decoder for _ in range(decoder_layers)])
        self.fc_out = nn.Linear(embed_size, self.vocab_size).to(device)

    def forward(self, x):
        # x SHAPE: [batch_size, seq_length]
        seq_length = x.shape[1]
        x_embedded = self.embedding(x)
        # x_embedded SHAPE: [batch_size, seq_length, embed_size]
        x_embedded_pos = x_embedded + self.pos_encoder[:, :seq_length]
        # x_embedded_pos SHAPE: [batch_size, seq_length, embed_size]
        attention_mask = self.get_self_attention_mask(seq_length)
        # attention_mask SHAPE: [seq_length, seq_length]
        output_decoder = x_embedded_pos
        for decoder in self.decoder_layers:
            output_decoder, attention = decoder(output_decoder, attention_mask)
        # output_decoder SHAPE: [batch_size, seq_length, embed_size]
        # attention SHAPE: [batch_size, seq_length, seq_length]
        output = self.fc_out(output_decoder)
        # output SHAPE: [batch_size, seq_length, vocab_size]
        return output, attention

    # Generate sinusoidal positional encodings
    def get_positional_encoding(self, max_seq_length):
        position = torch.arange(0, max_seq_length).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.embed_size, 2) * -(math.log(10000.0) / self.embed_size))
        pe = torch.zeros(max_seq_length, self.embed_size)
        pe[:, 0::2] = torch.sin(position * div_term)  # Sine for even indices
        pe[:, 1::2] = torch.cos(position * div_term)  # Cosine for odd indices
        return pe.unsqueeze(0).to(self.device)
    
    # Generate attention mask
    def get_self_attention_mask(self, seq_length):
        mask = (torch.triu(torch.ones(seq_length, seq_length)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask.to(self.device)


# Training function 
def train_model(model, train_loader, val_loader, epochs=20, learning_rate=0.001, device='cpu'):
    model.to(device)  # Move model to the correct device
    model.train()
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    scheduler = ExponentialLR(optimizer, gamma=0.9)

    for epoch in range(epochs):
        # Training phase
        total_train_loss = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)  # Move data to the same device
            optimizer.zero_grad()
            output, _ = model(inputs)
            loss = criterion(output.view(-1, model.vocab_size), targets.view(-1))
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()
        avg_train_loss = total_train_loss / len(train_loader)

        # Validation phase (Set the model to evaluation mode)
        model.eval()
        with torch.no_grad():
            total_val_loss = 0
            for val_inputs, val_targets in val_loader:
                val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                val_output, _ = model(val_inputs)
                val_loss = criterion(val_output.view(-1, model.vocab_size), val_targets.view(-1))
                total_val_loss += val_loss.item()
            avg_val_loss = total_val_loss / len(val_loader)

        
        # Print training and validation losses
        print(f'Epoch {epoch+1}, Train Loss: {round(avg_train_loss, 4)}, Val Loss: {round(avg_val_loss, 4)}')
        #print(f'Epoch {epoch+1}, Train Loss: {round(avg_train_loss, 4)}')
        scheduler.step()

        # After validation completes, switch back to training mode
        model.train()
        
    return model

In [7]:
# READ TXT
def read_file(filename):
    with open(filename, 'r') as file:
        file_content = file.read()
        file_content_with_newlines = file_content.replace('\n', ' ZZZ ')
    return file_content_with_newlines

FILENAME = 'text.txt'
text = read_file(FILENAME)
print(text[:10])

I love mac


In [8]:
# TOKENS AND VOCABULARY

def tokenize_text(input_string):
    tokens = input_string.split() # Simple split
    #tokens = re.findall(r'\w+|\S+|\s', input_string) # Split the string based on spaces and all non-word characters
    #tokens = re.findall(r'\w+|\S', input_string) # Split the string based on all non-word characters
    return tokens

# Create tokens and vocabulary
tokens = tokenize_text(text)
vocabulary, inverse_vocab = {}, {}
for i, word in enumerate(set(tokens), 0):
    vocabulary[word] = i
    inverse_vocab[i] = word

print('Vocabulary Length:', len(vocabulary))
print(vocabulary)

Vocabulary Length: 13
{'like': 0, 'artifcial': 1, 'enjoy': 2, 'intelligence': 3, '.': 4, 'machine': 5, 'love': 6, 'artificial': 7, 'and': 8, 'about': 9, 'I': 10, 'learning': 11, 'care': 12}


In [9]:
# DATASETS AND DATALOADERS

class TextDataset(Dataset):
    def __init__(self, tokens, vocabulary, sequence_length):
        self.sequence_length = sequence_length
        self.data = [vocabulary[word] for word in tokens]
    
    def __len__(self):
        return len(self.data) - self.sequence_length
    
    def __getitem__(self, idx):
        # Get the input sequence of length `sequence_length` starting from `idx`
        input_seq = torch.tensor(self.data[idx:idx + self.sequence_length], dtype=torch.long)
        # The target sequence is offset by one token to predict the next token
        target_seq = torch.tensor(self.data[idx + 1:idx + self.sequence_length + 1], dtype=torch.long)
        return input_seq, target_seq


SEQUENCE_LENGTH = 4
BATCH_SIZE = 1
VAL_RATIO = 0.1

dataset = TextDataset(tokens, vocabulary, SEQUENCE_LENGTH)
indices = list(range(len(dataset)))
train_indices, val_indices = train_test_split(indices, test_size=VAL_RATIO, random_state=42, shuffle=True)
train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset), shuffle=False)


print('SEQUENCE_LENGTH:', SEQUENCE_LENGTH)
print('BATCH_SIZE:', BATCH_SIZE)
print('Number of BATCHs:', len(train_loader))
print('Total Samples:', len(train_loader)*BATCH_SIZE)

print("\nExample:")
for batch in train_loader:
    inputs, targets = batch
    print("Input:", [inverse_vocab[idx.item()] for idx in inputs[0]])
    print("Target:", [inverse_vocab[idx.item()] for idx in targets[0]])
    break

SEQUENCE_LENGTH: 4
BATCH_SIZE: 1
Number of BATCHs: 51
Total Samples: 51

Example:
Input: ['intelligence', '.', 'I', 'love']
Target: ['.', 'I', 'love', 'and']


In [10]:
# Initialize model
EMBED_SIZE = 512
NHEADS = 16
DIM_FEEDFORWARD = 1024
DECODER_LAYERS = 1
DROPOUT = 0.1
EPOCHS = 20
LEARNING_RATE = 0.002

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model = GPTModel(vocabulary, embed_size=EMBED_SIZE, nheads=NHEADS, dim_feedforward=DIM_FEEDFORWARD, decoder_layers=DECODER_LAYERS, dropout=DROPOUT, device=device)

cpu


In [11]:
model = train_model(model, train_loader, val_loader, epochs=EPOCHS, learning_rate=LEARNING_RATE, device=device)

Epoch 1, Train Loss: 1.8043, Val Loss: 0.8857
Epoch 2, Train Loss: 0.8399, Val Loss: 0.9421
Epoch 3, Train Loss: 0.7335, Val Loss: 1.2924
Epoch 4, Train Loss: 0.7805, Val Loss: 0.7502
Epoch 5, Train Loss: 0.7389, Val Loss: 0.9521
Epoch 6, Train Loss: 0.7045, Val Loss: 0.7982
Epoch 7, Train Loss: 0.6957, Val Loss: 0.712
Epoch 8, Train Loss: 0.6367, Val Loss: 0.9864
Epoch 9, Train Loss: 0.6561, Val Loss: 0.7738
Epoch 10, Train Loss: 0.5801, Val Loss: 0.7649
Epoch 11, Train Loss: 0.573, Val Loss: 0.8394
Epoch 12, Train Loss: 0.5097, Val Loss: 0.6457
Epoch 13, Train Loss: 0.5076, Val Loss: 0.8462
Epoch 14, Train Loss: 0.4943, Val Loss: 0.975
Epoch 15, Train Loss: 0.519, Val Loss: 0.672
Epoch 16, Train Loss: 0.4077, Val Loss: 0.571
Epoch 17, Train Loss: 0.4, Val Loss: 0.5275
Epoch 18, Train Loss: 0.3933, Val Loss: 0.5202
Epoch 19, Train Loss: 0.3669, Val Loss: 0.6279
Epoch 20, Train Loss: 0.3628, Val Loss: 0.5133


In [12]:
# GENERATE TOKENS

def generate_tokens(model, start_seq, generate_length=50, generate_context=10, temperature=0.00001):
    # Set the model to evaluation mode
    model.eval()
    # Retrieve the vocabulary from the model
    vocabulary = model.vocabulary 
    # Create inverse vocabulary to map indices back to words
    inverse_vocab = {v: k for k, v in vocabulary.items()}
    # Convert start sequence to tensor of indices
    src = torch.tensor([vocabulary[word] for word in start_seq.split()], dtype=torch.long).unsqueeze(0).to(model.device)  # Now [1, seq_length]
    # Iteratively generate new words
    generated_seq = start_seq
    for _ in range(generate_length):
        with torch.no_grad():
            # Predict from the model
            output, attention = model(src)
            # Apply temperature scaling to logits
            logits = output[:, -1, :] / temperature
            # Compute softmax probabilities and sample the next token value from the multinomial distribution
            probabilities = F.softmax(logits, dim=-1)
            next_token_value = torch.multinomial(probabilities, 1).item()
            # Convert token value back to token
            next_token = inverse_vocab[next_token_value]
            # Append generated token to the output sequence
            if next_token == 'ZZZ':
                generated_seq += '\n'
            else:
                generated_seq += ' ' + next_token
            #generated_seq += next_token
            # Append to input for next prediction and slice to only account for context tokens
            src = torch.cat([src, torch.tensor([[next_token_value]], dtype=torch.long).to(model.device)], dim=1)[:, -generate_context:]

    return generated_seq


START_SEQUENCE = "I"  # Initial sequence for text generation
GENERATE_LENGTH = 100  # Number of tokens to generate
GENERATE_CONTEXT = SEQUENCE_LENGTH  # Size of context used for generation
TEMPERATURE = 0.001  # Temperature for controlling randomness in token generation
generated_text = generate_tokens(model, start_seq=START_SEQUENCE, generate_length=GENERATE_LENGTH, generate_context=GENERATE_CONTEXT, temperature=TEMPERATURE)
print("Generated text:\n", generated_text)

Generated text:
 I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and enjoy machine learning . I love and
