In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [2]:
# Set the parameters.
learning_rate = 3e-4 # Set the learning rate.
batch_size = 32 # The number of training examples that will be used in each iteration of the loop.
block_size = 64 # Set the total numbers of characters we are going to use for prediction.
max_iters = 5000
eval_interval = 1000
eval_iters = 200
n_embd = 64 # Number of embedded dimensions.
n_head = 4 # Number of heads.
n_layer = 4 # Number of layers.
dropout = 0.2 # Set the probability for dropping outputs / setting outputs to zero.
device = 'cuda' if torch.cuda.is_available() else 'cpu' # Run the model on GPU if available, otherwise on CPU.
torch.manual_seed(1337)

<torch._C.Generator at 0x7f6fcc5299b0>

In [3]:
# Download the input dataset, the famous "Luceafarul" poem by Mihai Eminescu.
!wget https://raw.githubusercontent.com/AdrianTorjKobza/Python/main/AI_ChatGPT_Mini/luceafarul.txt

--2023-03-01 11:17:19--  https://raw.githubusercontent.com/AdrianTorjKobza/Python/main/AI_ChatGPT_Mini/luceafarul.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 11052 (11K) [text/plain]
Saving to: ‘luceafarul.txt’


2023-03-01 11:17:19 (95.6 MB/s) - ‘luceafarul.txt’ saved [11052/11052]



In [4]:
# Open and read the input file.
with open('luceafarul.txt', 'r', encoding='utf-8') as f:
  text = f.read()

In [5]:
# Get all the unique characters from text.
chars = set(text) # Get a set off all unique characters from text.
chars = list(chars) # Create a list of all the unique characters.
chars = sorted(chars) # Sort the characters in ascending order.

print ('List of all characters: ', ''.join(chars)) # Print all the unique characters

vocab_size = len(chars)
print ('\nVocabulary size is', vocab_size, 'unique characters.') # Print the total numbers of characters.

List of all characters:  
 !',-.:;?ABCDEFGHIJLMNOPRSTUVabcdefghijlmnoprstuvyzÎâîăŞşŢţ

Vocabulary size is 60 unique characters.


In [6]:
# Tokenize the characters of the input text. Each character will be tokenized, thus we will have 65 tokens.
# Mapp the list of characters to integers.

# Create empty dictionaries.
stoi = {}
itos = {}

# Iterate over the list of chars and return both the index and character.
for i, ch in enumerate(chars):
  stoi.update({ch:i}) # Update the dictionary and mapp each character to it's corresponding index 'i'.
  itos.update({i:ch}) # Update the dictionary and mapp each index to it's corresponding character 'ch'

encode = lambda s: [stoi[c] for c in s] # Return the list of indices corresponding to each character from string 's'.
decode = lambda l: ''.join([itos[i] for i in l]) # Return the list of characters corresponding to each index from list of indices 'l'. 

print ('The list of integers mapped to a character:\n', encode(chars))
print (decode(encode(chars)))

The list of integers mapped to a character:
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]

 !',-.:;?ABCDEFGHIJLMNOPRSTUVabcdefghijlmnoprstuvyzÎâîăŞşŢţ


In [7]:
# Encode the entire text and store it into a torch tensor.
data = torch.tensor(encode(text), dtype = torch.long)

print (data.shape, data.dtype)

torch.Size([9830]) torch.int64


In [8]:
# Split the data into training and validation datasets, to avoid overfitting.
n = int(0.9 * len(data)) # Set the first 90% of the data, to be part of the training dataset. The remaining 10% is part of the validation dataset.
train_data = data[:n]
val_data = data[n:]

In [9]:
# Generate a batch of inputs x and targets y.
def get_batch(split):
  if split == 'train':
    data = train_data
  else:
    data = val_data

  ix = torch.randint(len(data) - block_size, (batch_size,)) # Generate a batch of random starting indices.
  x = torch.stack([data[i:i+block_size] for i in ix]) # Create a tensor by stacking blocks of data.
  y = torch.stack([data[i+1:i+block_size+1] for i in ix]) # Create a tensor by stacking blocks of data, offset by 1.
  x, y = x.to(device), y.to(device)

  return x, y

In [10]:
# Define decorator that temporarily disables the gradient computation.
# The operations performed inside the decorated block will not have their gradients tracked.
@torch.no_grad()

# Get the average loss over multiple batches.
# Evaluate the loss when iterating over the training and validation datasets.
def estimate_loss():
    out = {}
    model.eval() # Set the model to evaluation phase.

    # Iterate over the training and validation datasets.
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters) # Create a tensor with all elements initialized to zero.
        
        for k in range(eval_iters):
            X, Y = get_batch(split) # Get a batch of data.
            logits, loss = model(X, Y) # Evaluate the loss on the batch of data.
            losses[k] = loss.item() # Store the loss value.
        out[split] = losses.mean() # Compute the mean loss.

    model.train() # Set the model back to training mode.
    return out # Return the dictionary, containing the mean loss for each data split.

In [11]:
# Head class (subclass of nn.Module) for self-attention.
class Head(nn.Module):

  # Apply liniar projections to all the nodes.
  def __init__(self, head_size):
    super().__init__()
    self.key = nn.Linear(n_embd, head_size, bias=False) # Mapp each element of the input tensor to a vector of dimension 'head_size', that represents the associated key.
    self.query = nn.Linear(n_embd, head_size, bias=False) # Mapp each element of the input tensor to a vector of dimension 'head_size', that represents the associated query.
    self.value = nn.Linear(n_embd, head_size, bias=False) # Mapp each element of the input tensor to a vector of dimension 'head_size', that represents the associated value.
    self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size))) # Create the lower triangle matrix, where we have 1s in the triangle, and 0s for the rest.
    self.dropout = nn.Dropout(dropout) # Use Dropout to prevent overfitting, by randomly setting outputs to zero.

  def forward(self, x):
    B, T, C = x.shape # Unpack the shape of the input tensor into variables B, T, C.
    k = self.key(x) # Format: (B, T, C). Generate a tensor that contains the keys associated with each element in 'x'.
    q = self.query(x) # Format: (B, T, C). Generate a tensor that contains the queries associated with each element in 'x'.

    # Calculate the attention scores.
    wei = q @ k.transpose(-2,-1) * C**-0.5 # (B, T, C) @ (B, C, T) => (B, T, T)
    wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # Format: (B, T, T). We make sure future doesn't communicate with the past.
    wei = F.softmax(wei, dim=-1) # Format: (B, T, T)
    wei = self.dropout(wei)

    # Aggregate the values.
    v = self.value(x) # Format: (B,T,C)
    out = wei @ v # (B, T, T) @ (B, T, C) => (B, T, C)
    return out

In [12]:
# MultiHead class (subclass of nn.Module) for multi head self-attention / multiple self-attention in parallel.
class MultiHeadAttention(nn.Module):
  def __init__(self, num_heads, head_size):
    super().__init__()
    self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)]) # Create multiple heads.
    self.proj = nn.Linear(n_embd, n_embd) # Concatenate the multiple attention heads output into a single vector.
    self.dropout = nn.Dropout(dropout) # Use Dropout to prevent overfitting, by randomly setting outputs to zero.

  # Run in parallel, all the one-head self-attention, and concatenate the outputs.
  def forward(self, x):
    out = torch.cat([h(x) for h in self.heads], dim=-1)
    out = self.proj(out)
    out = self.dropout(out)

    return out

In [13]:
# Class for a feed forward neural network.
class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

In [14]:
# Replicate the transformers architecture, without the cross-attention portion.
class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size) # Perform communication.
        self.ffwd = FeedForward(n_embd) # Perform computation.

        # Perform layer normalization.
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    # Apply layer normalization before it goes to self-attention and feed forward.
    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [15]:
# Bigram Language Model (subclass of nn.Module) is a neural network that predicts the probability of the next word, given the previous word.
class BigramLanguageModel(nn.Module):
  def __init__(self):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embd) # Store embeddings of a fixed dictionary and size into a lookup table.
    self.position_embedding_table = nn.Embedding(block_size, n_embd) # Capture the information about the position of each token in the sequence.
    
    self.blocks = nn.Sequential(*[Block(n_embd, n_head = n_head) for _ in range(n_layer)])
    self.ln_f = nn.LayerNorm(n_embd)
    
    # Mapp the output of the embedding table to a probability distribution.
    # The linear transformation layer applies a matrix multiplication and bias addition, followed by softmax activation function.
    self.lm_head = nn.Linear(n_embd, vocab_size)

  # 'inputs' is a tensor of indices representing the previous words and 'targets' is a tensor of indices representing the next word.
  def forward(self, inputs, targets = None):
    B, T = inputs.shape # Unpack the shape of the input tensor into variables B, T.

    # Return the corresponding logits (unnormalized score) for each token in the vocabulary.
    tok_emb = self.token_embedding_table(inputs) # Format (B, T, C)
    pos_emb = self.position_embedding_table(torch.arange(T, device = device)) # Format: (T, C)
    x = tok_emb + pos_emb # Combine the token embeddings and the positional embeddings. (B, T, C)
    x = self.blocks(x) # Format: (B, T, C)
    logits = self.lm_head(x) # Format: (B, T, vocab_size)

    if targets is None:
      loss = None
    else:
      B, T, C = logits.shape # Unpack the logits into Batch, Time, Channels.
      logits = logits.view(B*T, C) # Reshape logits into a two-dimensional tensor.
      targets = targets.view(B*T) # Reshape targets into a one-dimenstional tensor.
      loss = F.cross_entropy(logits, targets) # Compute the loss, to understand how well we are predicting the next character.

    return logits, loss

  # Generate the model; the sequence of tokens.
  # Take "inputs" as (Batch, Time) and generate (Batch, Time + 1, +2, +3, ... 'max_new_tokens')
  def generate(self, inputs, max_new_tokens):
    for _ in range(max_new_tokens):
      inputs_cond = inputs[:, -block_size:] # Crop 'inputs' to the last block_size tokens.
      logits, loss = self(inputs_cond) # Call the forward method.
      logits = logits[:, -1, :] # Select the logits corresponding to the last token in the sequence. Format: (Batch, Channels).
      probs = F.softmax(logits, dim=-1) # Apply softmax to get the probabilities.
      inputs_next = torch.multinomial(probs, num_samples=1) # Sample the next token. Format: (Batch, 1)
      inputs = torch.cat((inputs, inputs_next), dim=1) # Append the next token to the sequence. Format: (Batch, Time + 1)

    return inputs

model = BigramLanguageModel() # Create the model.
m = model.to(device)

In [16]:
inputs = torch.zeros((1, 1), dtype = torch.long, device = device) # Create a 1 by 1 tensor, initiliazed to all zeroes.

# Generate and print 100 new tokens / characters.
# The new generated content will be junk, since the model is not yet trained.
print ('New content:\n', decode(m.generate(inputs, max_new_tokens=100)[0].tolist()))

New content:
 
şOiŞnfŢASBzevvTAcşBâHŞuveN!h.bFggHcu-îAhdlIinU:l,n'MşşDssaJCrl-
zJmzeŢ!FEcIPoş'aŞFţŞNRmţA!hzbF
RePSo


In [17]:
# Create a PyTorch optimizer object.
optimizer = torch.optim.AdamW(model.parameters(), lr = learning_rate)

In [18]:
# Train the model.
for iter in range(max_iters):
  # Evaluate and print the loss on training and validation datasets.
  if iter % eval_interval == 0 or iter == max_iters - 1:
    losses = estimate_loss()
    print(f"Step {iter}: Training loss is {losses['train']:.4f}, Validation loss is {losses['val']:.4f}")

  xb, xy = get_batch('train') # Sample a batch of data.

  # Evaluate the loss.
  logits, loss = model(xb, xy)
  optimizer.zero_grad(set_to_none = True) # Set all gradients to zero, from the previous step.
  loss.backward() # Get the gradients from all the parameters, using backpropagation / chain rule.
  optimizer.step() # Use the gradients to update the parameters; to optimize the model and minimize the loss.

Step 0: Training loss is 4.3961, Validation loss is 4.4125
Step 1000: Training loss is 2.2384, Validation loss is 2.3877
Step 2000: Training loss is 1.9960, Validation loss is 2.2646
Step 3000: Training loss is 1.7462, Validation loss is 2.1821
Step 4000: Training loss is 1.5055, Validation loss is 2.1461
Step 4999: Training loss is 1.2820, Validation loss is 2.1371


In [19]:
inputs = torch.zeros((1, 1), dtype = torch.long, device = device) # Create a 1 by 1 tensor, initiliazed to all zeroes.

# Generate and print 1000 new tokens / characters.
print ('New content:\n', decode(m.generate(inputs, max_new_tokens=1000)[0].tolist()))

New content:
 
Drumin chefrăteri
Tu spirătele ca mândrâng
Und fru-aşu-i min suringRămâng
Să îndrăcu fin mândrure.
Tel tunde-un ar zie copr crag

Mim ca braştea cizreşte -
U rumon marde şi scă şi olui men,
Ca to umbi a năcainori,
Şi din umplea fevăciiş

Cărei ste line acândrita-n l-n aung,
Cu- eacundoi upăr mă loan,
Ca tân'!eli ajore tă cupăsat,
Eu văi mă atu pl cecum nesple cu doc-n m-o ntul
Şi vezi cu cumi se re-
Şi atrecămbii nicu viaţa;


ânt vin', mrizineaţi, leci
Cătoi tu mbrastele doţi,
Şi didoin de-o calu-nil tir?
Stinţiitos, fintrăul boatesci
Ţe de moar iptie cunu cu coare,

Cătri Dar pas vit mă-n moar,
Băe-l a în aplără.

Ea cumNu alu priivis peri

Răstrecine a muritos, tă,
Căgerde zinele-i lece an

Şi iu pa ununzi-mi cese rus,
Iar şii treu, în ndroc brapă
Să uprând duri;

Şi n', pe-arăzil ec de-
Şi pene-ndrtre-oceafod vinţi

Rătând sine-n astă-şe,
Să în fapoc lumine;
Lucucecândar pe.

În copţile goate, sus de făr
Dar mar c-a'n s'fi-oscă-n dere;

Tu-os, un'!tos
De nu nece tagi