# Explicación

## run.py

In [None]:
import argparse

from src import chat, preprocess, train


def main():
    """
    Parse the command line and run the selected stage of the project.

    The positional ``mode`` argument selects preprocessing, training or the
    interactive chat; ``--update`` is forwarded to the training stage only.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "mode",
        choices=["preprocess", "train", "chat"],
        help="The mode to be execute.",
    )
    cli.add_argument(
        "--update",
        action="store_true",
        help="Flag when model shall be updated based on current parameters",
    )
    options = cli.parse_args()

    # argparse already restricts `mode` to the three choices handled below
    selected = options.mode
    if selected == "chat":
        chat.conversation()
        return
    if selected == "train":
        train.model_training(options.update)
        return
    if selected == "preprocess":
        preprocess.make_train_test()


if __name__ == "__main__":
    main()


## config.py

In [None]:
# model hyperparameters
# block_size: context length in tokens; sizes the position embedding table,
#             the window `generate` crops to, and the sequences `get_batch` samples
block_size = 32
# embed_size: width of the token and position embeddings
embed_size = 256
# dropout: probability passed to every nn.Dropout layer in the model
dropout = 0.2
# n_heads: attention heads per block; head size is embed_size // n_heads
# NOTE(review): 256 // 6 == 42, so the concatenated heads are 252 wide before
#               the output projection maps them back to embed_size
n_heads = 6
# n_layer: number of transformer blocks stacked sequentially
n_layer = 6
# eval_iters: number of random batches averaged by `estimate_loss`
eval_iters = 200
# batch_size: number of sequences drawn per batch by `get_batch`
batch_size = 32

# learning hyperparameters
# learn_rate: learning rate handed to the AdamW optimizer
learn_rate = 3e-4
# max_iters: number of optimisation steps in the training loop
max_iters = 5000
# eval_interval: train/valid loss is reported every this many steps
eval_interval = 500

# preprocess
# characters occurring at most this many times are removed from the raw text
min_count_chars = 1
# tokens occurring at most this many times are replaced by `unknown_token`
min_count_tokens = 1

# encoding
# end_token: appended after every message; sampling it ends generation,
#            and typing it at the prompt ends the chat
end_token = "<END>"
# unknown_token: stand-in for masked / out-of-vocabulary tokens
unknown_token = "<UNK>"
# n_chats: number of messages generated per user prompt in the chat loop
n_chats = 5


## chat.py

In [None]:
import json
import random

import torch
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter

from config import end_token, n_chats
from src.utils import custom_tokenizer, decode, encode, print_delayed


def conversation() -> None:
    """
    Emulates chat conversations by sampling from a pre-trained GPTLanguageModel.

    This function loads a trained GPTLanguageModel along with the vocabulary and
    the list of contacts. It then enters a loop in which the user types a message
    (typically a contact token). For every user input the model generates
    `n_chats` messages; after the first one, each following message is started
    with a randomly chosen contact. The conversation continues until the user
    enters the end token.

    :example:

    >>> conversation()
    message >> Alice:
    Alice: how are you ?
    response >> <END>
    """
    with open("assets/output/vocab.txt", "r", encoding="utf-8") as f:
        vocab = json.loads(f.read())

    with open("assets/output/contacts.txt", "r", encoding="utf-8") as f:
        contacts = json.loads(f.read())

    spec_tokens = contacts + [end_token]

    # NOTE(review): torch.load unpickles a whole Python object, so only load
    # checkpoints you created yourself. Recent PyTorch releases default to
    # `weights_only=True`, which rejects full-module pickles -- confirm against
    # the installed version.
    model = torch.load("assets/models/model.pt")

    # The checkpoint is saved while the model is in training mode; switch to
    # evaluation mode so dropout does not randomly perturb the sampled text.
    model.eval()
    completer = WordCompleter(spec_tokens, ignore_case=True)

    user_input = prompt("message >> ", completer=completer, default="")
    output = torch.tensor([], dtype=torch.long)
    print()

    while user_input != end_token:
        for _ in range(n_chats):

            add_tokens = custom_tokenizer(user_input, spec_tokens)
            add_context = encode(add_tokens, vocab)

            # the model expects a (batch, time) tensor; use a batch of one
            context = torch.cat((output, add_context)).unsqueeze(0)

            # nothing to condition on (empty input on an empty history):
            # the model cannot run on zero tokens, so ask the user again
            if context.shape[1] == 0:
                break

            n_prev = len(output)
            with torch.no_grad():
                output = model.generate(context, vocab)

            # print only what is new since the previous turn; slicing from
            # n_prev stays correct even when nothing new was produced
            print_delayed(decode(output[n_prev:], vocab))
            user_input = random.choice(contacts)

        user_input = prompt("\nresponse >> ", completer=completer, default="")
        print()
        

## model.py

In [None]:
import math

import torch
import torch.nn as nn
from torch.nn import functional as F

from config import (block_size, dropout, embed_size, end_token, n_heads,
                    n_layer, unknown_token)
from src.utils import encode


class Head(nn.Module):
    """
    This module performs causal self-attention on the input tensor, producing
    an output tensor with the same time-steps but `head_size` channels.

    :param head_size: The size of the head in the multi-head attention mechanism.
    """
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(embed_size, head_size, bias=False)
        self.query = nn.Linear(embed_size, head_size, bias=False)
        self.value = nn.Linear(embed_size, head_size, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Apply scaled dot-product attention with a look-ahead mask.

        :param x: input of size (batch, time-step, channels)
        :return: output of size (batch, time-step, head size)
        """
        T = x.shape[1]
        k = self.key(x)                                     # (B, T, head_size)
        q = self.query(x)                                   # (B, T, head_size)

        # compute scaled attention scores
        wei = q @ k.transpose(-2, -1)                       # (B, T, head_size) @ (B, head_size, T) -> (B, T, T)
        wei = wei / math.sqrt(k.shape[-1])                  # (B, T, T)

        # avoid look-ahead; build the mask on the same device as the input so
        # the module also works when it has been moved off the default device
        causal = torch.tril(torch.ones(T, T, dtype=torch.bool, device=x.device))
        wei = wei.masked_fill(~causal, float("-inf"))       # (B, T, T)
        wei = F.softmax(wei, dim=-1)                        # (B, T, T)
        wei = self.dropout(wei)

        # weighted aggregation of the values
        v = self.value(x)                                   # (B, T, head_size)
        out = wei @ v                                       # (B, T, T) @ (B, T, hs) -> (B, T, head_size)
        return out


class MultiHeadAttention(nn.Module):
    """
    Runs `n_heads` independent `Head` modules on the same input, concatenates
    their outputs along the channel axis and projects the result back to
    `embed_size` channels.
    """
    def __init__(self):
        super().__init__()

        # every head gets an equal (integer) share of the embedding width
        per_head = embed_size // n_heads

        self.heads = nn.ModuleList([Head(per_head) for _ in range(n_heads)])
        # projection that mixes the concatenated head outputs
        self.linear = nn.Linear(n_heads * per_head, embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # each head sees the full input; results are joined on the last axis
        concatenated = torch.cat([head(x) for head in self.heads], dim=-1)
        return self.dropout(self.linear(concatenated))


class FeedFoward(nn.Module):
    """
    Position-wise feed-forward network: a linear expansion, a ReLU, a linear
    projection back to `embed_size`, and dropout.
    """
    def __init__(self):
        super().__init__()

        # the hidden layer is four times as wide as the embedding
        hidden = 4 * embed_size
        expand = nn.Linear(embed_size, hidden)
        project = nn.Linear(hidden, embed_size)

        self.net = nn.Sequential(expand, nn.ReLU(), project, nn.Dropout(dropout))

    def forward(self, x):
        out = self.net(x)
        return out


class Block(nn.Module):
    """
    A single transformer block: multi-head self-attention followed by a
    feed-forward network. Each sub-layer receives a layer-normalised input
    and its output is added back onto the residual stream.
    """
    def __init__(self):
        super().__init__()

        self.sa = MultiHeadAttention()
        self.ffwd = FeedFoward()
        self.ln1 = nn.LayerNorm(embed_size)
        self.ln2 = nn.LayerNorm(embed_size)

    def forward(self, x):
        # attention sub-layer with residual connection
        attended = self.sa(self.ln1(x))
        x = x + attended

        # feed-forward sub-layer with residual connection
        transformed = self.ffwd(self.ln2(x))
        return x + transformed


class GPTLanguageModel(nn.Module):
    """
    This class encompasses the entire GPT model, including the token and position embeddings,
    multiple transformer blocks, and output layer.

    :param vocab_size: Number of distinct tokens; sizes the token embedding and the output layer.
    """
    def __init__(self, vocab_size: int):
        super().__init__()

        # embedding tables for token and their positioning in the context
        self.token_embedding = nn.Embedding(vocab_size, embed_size)
        self.pos_embedding = nn.Embedding(block_size, embed_size)

        # put one block after the other sequentially (not parallel like multi-head attention)
        block_list = [Block() for _ in range(n_layer)]
        self.blocks = nn.Sequential(*block_list)

        # output layer after sequential blocks
        self.ln_output = nn.LayerNorm(embed_size)
        self.linear_output = nn.Linear(embed_size, vocab_size)

        # initialize weights and biases for linear layers and embeddings
        self.apply(self.init_weights)

    def init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero any Linear bias."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

            # The linear layers in self-attention do not have biases
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)

        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """
        Compute next-token logits and, when targets are given, the cross-entropy loss.

        :param idx: (B, T) tensor of token indices; T must not exceed `block_size`.
        :param targets: Optional (B, T) tensor of target token indices.
        :return: Tuple `(logits, loss)`. Without targets, logits are (B, T, vocab_size)
                 and loss is None. With targets, logits are flattened to (B*T, vocab_size).
        """
        B, T = idx.shape

        # create the position indices on the same device as the input tokens
        positions = torch.arange(T, device=idx.device)

        tok_emb = self.token_embedding(idx)                     # (B, T, C)
        pos_emb = self.pos_embedding(positions)                 # (T, C)
        x = tok_emb + pos_emb                                   # (B, T, C)
        x = self.blocks(x)                                      # (B, T, C)
        x = self.ln_output(x)                                   # (B, T, C)
        logits = self.linear_output(x)                          # (B, T, vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, vocab, max_new_tokens=None):
        """
        Sample tokens autoregressively until the end token is drawn.

        Only the first row of `idx` decides when to stop and only that row is
        returned. The unknown token is never emitted. This method does not
        change the train/eval mode of the model; call `eval()` beforehand to
        disable dropout while sampling.

        :param idx: (B, T) tensor of token indices used as the starting context.
        :param vocab: The vocabulary list used to look up the special tokens.
        :param max_new_tokens: Optional upper bound on the number of sampled tokens.
                               None (default) samples until the end token appears.
        :return: 1-D tensor with the context and the generated tokens of the first
                 row, without the final end token.
        """
        end_id = encode([end_token], vocab).item()
        unk_id = encode([unknown_token], vocab).item()

        # Forbid UNK by masking its logit: this yields the same distribution as
        # re-sampling until a non-UNK token is drawn, without an unbounded loop.
        # Skip the mask when UNK is not a real vocabulary entry, or when the end
        # token itself is only representable as UNK (otherwise we could never stop).
        mask_unk = unknown_token in vocab and unk_id != end_id

        ended = False
        n_new = 0
        while not ended and (max_new_tokens is None or n_new < max_new_tokens):

            # idx is (B, T) array of indices in the current context
            # crop idx to the last block_size tokens for each batch (row)
            idx_cond = idx[:, -block_size:]                     # (B, T)

            # get the predictions for the last time-step
            logits, _ = self(idx_cond)                          # (B, T, vocab_size)
            logits = logits[:, -1, :]                           # (B, vocab_size)
            if mask_unk:
                logits[:, unk_id] = float("-inf")
            probs = F.softmax(logits, dim=-1)                   # (B, vocab_size)

            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)

            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1)             # (B, T+1)
            n_new += 1
            ended = idx_next[0, 0].item() == end_id

        # output everything except the end token (if one was actually sampled)
        return idx[0][:-1] if ended else idx[0]


## preprocess.py

In [None]:
import json
import re
from collections import Counter
from typing import List, Set, Tuple, Union

import torch

from config import end_token, min_count_chars, min_count_tokens, unknown_token
from src.utils import custom_tokenizer, encode, get_vocab


def get_infrequent_tokens(tokens: Union[List[str], str], min_count: int) -> Set[str]:
    """
    Identify tokens that appear at most `min_count` times.

    :param tokens: When it is the raw text in a string, frequencies are counted on character level.
                   When it is the tokenized corpus as list, frequencies are counted on token level.
    :param min_count: Inclusive threshold; a token is flagged when its count is <= this value.
    :return: Set of tokens that appear infrequently.
    """
    counts = Counter(tokens)
    return {token for token, n in counts.items() if n <= min_count}


def mask_tokens(tokens: List[str], mask: Set[str]) -> List[str]:
    """
    Replace every token that is a member of `mask` by the unknown token.

    :param tokens: The tokenized corpus.
    :param mask: Set of tokens that shall be masked in the corpus.
    :return: A new list with the same length and order as `tokens`.
    """
    masked = []
    for token in tokens:
        if token in mask:
            masked.append(unknown_token)
        else:
            masked.append(token)
    return masked


def drop_chars(txt: str, drop: Set[str]) -> str:
    """Return `txt` with every character contained in `drop` removed."""

    # a translation table that maps a code point to None deletes that character
    deletion_table = {ord(char): None for char in "".join(drop)}
    return txt.translate(deletion_table)


def flatten_tuple(txt: List[Tuple[str, str]]) -> str:
    """Join (sender, message) pairs into one string of `sender:message<end token>` pieces."""

    pieces = []
    for sender, message in txt:
        pieces.append(sender + ":" + message + end_token)
    return "".join(pieces)


def make_train_test() -> None:
    """
    Prepare training and testing datasets from chat messages. This function performs multiple tasks:

    1. Reads a corpus of WhatsApp chat messages from a text file
    2. Filters out infrequent characters from the corpus
    3. Splits the text into (contact, message) pairs with a regular expression
    4. Tokenizes the text and encodes the tokens into integers
    5. Splits the encoded data into training (90%) and validation (10%) sets
    6. Saves the training and validation datasets, as well as the vocab and contacts, to disk
    """
    import os  # local import: only needed to create the output directory

    # read with an explicit encoding, consistent with the files written below,
    # instead of relying on the platform default
    with open("assets/input/chat.txt", "r", encoding="utf-8") as f:
        text = f.read()

    # remove very rare characters (mostly emojis)
    infreq_chars = get_infrequent_tokens(text, min_count=min_count_chars)
    text = drop_chars(text, infreq_chars)

    # split string into list of tuples (date, contact, message);
    # messages starting with U+200E are skipped and the rest are lower-cased
    pattern = r'\[(.*?)\] (.*?): (.*)'
    matches = re.findall(pattern, text)
    messages = [(contact, msg.lower()) for _, contact, msg in matches if not msg.startswith("\u200e")]

    # get list of all contacts, treated as special tokens
    # (sorted so that the exported contact list is reproducible between runs)
    contacts = sorted({contact + ":" for contact, _ in messages})
    spec_tokens = contacts + [end_token]

    # convert list of tuples into list of tokens (word or character level)
    text_flat = flatten_tuple(messages)
    tokens = custom_tokenizer(txt=text_flat, spec_tokens=spec_tokens)

    # mask very rare tokens as unknown, to shrink the vocabulary
    infreq_tokens = get_infrequent_tokens(tokens, min_count=min_count_tokens)
    tokens = mask_tokens(tokens, infreq_tokens)

    # get vocabulary of corpus
    vocab = get_vocab(tokens)
    print(f"The corpus has {len(vocab)} unique tokens.")

    # encode tokens into a tensor of integers
    data = encode(tokens, vocab)

    # split up the data into train and validation set
    n = int(0.9*len(data))
    train_data = data[:n]
    valid_data = data[n:]

    # make sure the export directory exists before writing anything
    os.makedirs("assets/output", exist_ok=True)

    # export tensors
    torch.save(train_data, "assets/output/train.pt")
    torch.save(valid_data, "assets/output/valid.pt")

    with open("assets/output/vocab.txt", "w", encoding="utf-8") as f:
        f.write(json.dumps(vocab))

    with open("assets/output/contacts.txt", "w", encoding="utf-8") as f:
        f.write(json.dumps(contacts))

    print("SUCCESS")


## train.py

In [None]:
import json

import torch

from config import eval_interval, learn_rate, max_iters
from src.model import GPTLanguageModel
from src.utils import current_time, estimate_loss, get_batch


def model_training(update: bool) -> None:
    """
    Trains or updates a GPTLanguageModel using pre-loaded data.

    This function either initializes a new model or loads an existing model based
    on the `update` parameter. It then trains the model using the AdamW optimizer
    on the training set, reporting train and validation loss at regular intervals.
    Finally the trained model is saved.

    :param update: Boolean flag to indicate whether to update an existing model.
    """
    import os  # local import: only needed to create the checkpoint directory

    model_path = "assets/models/model.pt"

    # LOAD DATA -----------------------------------------------------------------

    train_data = torch.load("assets/output/train.pt")
    valid_data = torch.load("assets/output/valid.pt")

    with open("assets/output/vocab.txt", "r", encoding="utf-8") as f:
        vocab = json.loads(f.read())

    # INITIALIZE / LOAD MODEL ---------------------------------------------------

    if update:
        try:
            # NOTE(review): this unpickles a whole module object; recent PyTorch
            # releases default to `weights_only=True`, which rejects such files
            # -- confirm against the installed version.
            model = torch.load(model_path)
            print("Loaded existing model to continue training.")
        except FileNotFoundError:
            print("No existing model found. Initializing a new model.")
            model = GPTLanguageModel(vocab_size=len(vocab))

    else:
        print("Initializing a new model.")
        model = GPTLanguageModel(vocab_size=len(vocab))

    # initialize optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=learn_rate)

    # number of model parameters
    n_params = sum(p.numel() for p in model.parameters())
    print(f"Parameters to be optimized: {n_params}\n")

    # create the checkpoint directory up front, so that a missing folder does
    # not throw away the whole training run when saving at the very end
    os.makedirs(os.path.dirname(model_path), exist_ok=True)

    # MODEL TRAINING ------------------------------------------------------------

    for i in range(max_iters):

        # evaluate the loss on train and valid sets every 'eval_interval' steps
        if i % eval_interval == 0 or i == max_iters - 1:
            train_loss = estimate_loss(model, train_data)
            valid_loss = estimate_loss(model, valid_data)

            timestamp = current_time()
            print(f"{timestamp} | step {i}: train loss {train_loss:.4f}, valid loss {valid_loss:.4f}")

        # sample batch of data
        x_batch, y_batch = get_batch(train_data)

        # forward pass, then one optimisation step
        _, loss = model(x_batch, y_batch)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

    torch.save(model, model_path)
    print("Model saved")


## utils.py

In [None]:
import random
import time
from datetime import datetime
from typing import List, Union

import torch
from nltk.tokenize import RegexpTokenizer

from config import batch_size, block_size, eval_iters, unknown_token


@torch.no_grad()
def estimate_loss(model, data):
    """
    Evaluate the loss on `eval_iters` random batches in evaluation mode and
    return the average of the collected losses.

    The model's previous train/eval mode is restored afterwards, also when an
    error occurs during evaluation.

    :param model: Callable as `model(X, Y)` returning `(logits, loss)`.
    :param data: 1-D tensor of encoded tokens to draw batches from.
    :return: 0-dim tensor holding the mean loss.
    """
    was_training = model.training
    model.eval()
    try:
        loss_list = torch.zeros(eval_iters)

        for i in range(eval_iters):
            X, Y = get_batch(data)
            _, loss = model(X, Y)
            loss_list[i] = loss.item()

        return loss_list.mean()
    finally:
        # hand the model back in the mode it was received in
        model.train(was_training)


def get_batch(data):
    """
    Draw `batch_size` random windows of `block_size` tokens from `data`.

    Returns the inputs x and the targets y, where y is x shifted one
    position to the right.
    """
    starts = torch.randint(len(data) - block_size, (batch_size,))

    inputs, targets = [], []
    for start in starts:
        stop = start + block_size
        inputs.append(data[start:stop])
        targets.append(data[start + 1:stop + 1])

    return torch.stack(inputs), torch.stack(targets)


def encode(s: list, vocab: list) -> torch.Tensor:
    """
    Encode a list of tokens into a tensor of integers, given a fixed vocabulary.
    When a token is not found in the vocabulary, the special unknown token is assigned.
    When the vocabulary does not contain that special token either, one random
    (but valid) vocabulary index is drawn and used for every unknown token of this call.

    :param s: List of tokens to encode.
    :param vocab: List of all known tokens; the position in the list is the code.
    :return: 1-D tensor of dtype long with one index per token.
    :raises ValueError: If a token has to be replaced but the vocabulary is empty.
    """
    index_of = {token: i for i, token in enumerate(vocab)}

    # the replacement index is resolved lazily, only once it is actually needed
    fallback = None

    encoded = []
    for token in s:
        idx = index_of.get(token)
        if idx is None:
            if fallback is None:
                fallback = index_of.get(unknown_token)
            if fallback is None:
                if not vocab:
                    raise ValueError("cannot encode unknown tokens with an empty vocabulary")
                # randrange excludes len(vocab), so the index is always valid
                fallback = random.randrange(len(vocab))
            idx = fallback
        encoded.append(idx)

    return torch.tensor(encoded, dtype=torch.long)


def decode(tensor: torch.tensor, vocab: list) -> str:
    """Turn a tensor of token indices back into a space-separated string of tokens."""

    # token -> index, in vocabulary order
    token_to_id = {}
    for position, token in enumerate(vocab):
        token_to_id[token] = position

    # index -> token, obtained by inverting the mapping above
    id_to_token = {position: token for token, position in token_to_id.items()}

    words = []
    for element in tensor:
        words.append(id_to_token[element.item()])
    return " ".join(words)


def custom_tokenizer(txt: str, spec_tokens: List[str], pattern: str=r"|\d|\w+|[^\s]") -> List[str]:
    """
    Tokenize text into words or characters using NLTK's RegexpTokenizer, considering
    given special combinations as single tokens.

    The special tokens are matched literally: they are regex-escaped before being
    added to the pattern, so names containing characters such as `+`, `(` or `.`
    are handled correctly.

    :param txt: The corpus as a single string element.
    :param spec_tokens: A list of special tokens (e.g. ending, contacts). May be empty.
    :param pattern: Regex alternatives appended after the special tokens, written with a
                    leading `|`. By default the corpus is tokenized on a word level
                    (split by spaces). Digits are considered single tokens.
    :return: The list of tokens in order of appearance.

    >> note: The pattern for character level tokenization is '|.'
    """
    import re  # local import: `re` is not imported at the top of this module

    # special tokens come first so they win over the generic alternatives
    alternatives = [re.escape(token) for token in spec_tokens if token]

    # drop the leading separator of `pattern`; joining below re-inserts it only
    # when needed, which avoids an empty alternative if there are no special tokens
    generic = pattern.lstrip("|")
    if generic:
        alternatives.append(generic)

    tokenizer = RegexpTokenizer("|".join(alternatives))
    return tokenizer.tokenize(txt)


def get_vocab(text: Union[List[str], str]) -> List[str]:
    """Collect the distinct tokens of the corpus and return them in sorted order."""

    unique_tokens = set()
    for token in text:
        unique_tokens.add(token)

    vocab = list(unique_tokens)
    vocab.sort()
    return vocab


def current_time():
    """Return the current local wall-clock time formatted as HH:MM:SS."""
    now = datetime.now()
    return f"{now:%H:%M:%S}"


def print_delayed(s: str, delay: float = 0.05) -> None:
    """
    Write a string to stdout one character at a time, pausing after each
    character, and finish with a line break.

    :param s: The input string.
    :param delay: The time delay between each character in seconds.
    """
    def emit(symbol: str) -> None:
        # flush immediately so the character shows up before the pause
        print(symbol, end="", flush=True)
        time.sleep(delay)

    position = 0
    while position < len(s):
        emit(s[position])
        position += 1

    print()
