In [89]:
import torch
from torch import nn
from transformers import PreTrainedModel, PretrainedConfig

In [90]:
#### RoPE implementation (copied and simplified from HuggingFace). ####

def apply_rotary_pos_emb(q, k, rope_rotations, unsqueeze_dim=1):
    """Applies precomputed RoPE rotations to the query and key representations."""
    assert(q.shape == k.shape)
    assert(len(q.shape) == 4)
    cos, sin = rope_rotations
    assert(q.shape[2] == cos.shape[1])
    assert(q.shape[3] == cos.shape[2])    
    q_type, k_type = q.dtype, k.dtype
    cos = cos.unsqueeze(unsqueeze_dim)
    sin = sin.unsqueeze(unsqueeze_dim)
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)
    return q_embed.to(q_type), k_embed.to(k_type)

def rotate_half(x):
    """Rotates half the hidden dims of the input."""
    x1 = x[..., : x.shape[-1] // 2]
    x2 = x[..., x.shape[-1] // 2 :]
    return torch.cat((-x2, x1), dim=-1)

class A2RotaryEmbedding(nn.Module):
    """RoPE position representation for use in Transformer attention."""

    def __init__(self, config, device=None):
        super().__init__()
        rope_theta = config.rope_theta
        head_dim = config.hidden_size // config.num_attention_heads
        partial_rotary_factor = 1.0
        dim = int(head_dim * partial_rotary_factor)
        self.inv_freq = 1.0 / (rope_theta ** (torch.arange(0, dim, 2, dtype=torch.int64).to(device=device, dtype=torch.float) / dim))

    @torch.no_grad()
    def forward(self, x):
        position_ids = torch.arange(0, x.shape[1], device=x.device).unsqueeze(0)
        inv_freq_expanded = self.inv_freq[None, :, None].float().expand(position_ids.shape[0], -1, 1).to(x.device)
        position_ids_expanded = position_ids[:, None, :].float()

        device_type = x.device.type if isinstance(x.device.type, str) and x.device.type != "mps" else "cpu"
        with torch.autocast(device_type=device_type, enabled=False):  # Force float32
            freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2)
            emb = torch.cat((freqs, freqs), dim=-1)
            cos = emb.cos()
            sin = emb.sin()
            return cos, sin

In [91]:
class A2ModelConfig(PretrainedConfig):
    """Configuration object that stores hyperparameters that define the Transformer language model."""
    def __init__(self, vocab_size=None, hidden_size=None, intermediate_size=None, num_attention_heads=None, 
                 num_hidden_layers=None,
                 rope_theta=None, hidden_act='silu', max_position_embeddings=None, rms_norm_eps=None, **kwargs):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.max_position_embeddings = max_position_embeddings
        self.rms_norm_eps = rms_norm_eps
        self.num_attention_heads = num_attention_heads
        self.rope_theta = rope_theta
        self.hidden_act = hidden_act
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers

In [92]:
class A2MLP(nn.Module):
    """The MLP layer of the Transformer. Uses the SwiGLU architecture."""
    def __init__(self, config):
        super().__init__()
        assert(config.hidden_act == 'silu')
        self.linear1 = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.linear2 = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.linear3 = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)
        self.act = nn.SiLU()

    def forward(self, hidden_states):
        x1 = self.act(self.linear1(hidden_states))
        x2 = self.linear2(hidden_states)

        return self.linear3(torch.mul(x1,x2))

In [93]:
# This is optional, since you can use PyTorch's RMSNorm.
class A2RMSNorm(nn.Module):
    """RMS layer normalization."""
    def __init__(self, config):
        super().__init__()
        # TODO: Use config.rms_norm_eps
        # TODO: initalize weights here

    def forward(self, hidden_states):
        ...

In [94]:
class A2Attention(nn.Module):
    """The multi-head attention layer of the Transformer. Uses standard scaled dot-product attention with causal masking."""
    
    def __init__(self, config):
        super().__init__()
        self.dims = config.hidden_size
        self.heads = config.num_attention_heads
        #self.text = config.vocab_size
        self.dims_head = self.dims // self.heads

        # self.dims_head * self.heads => self.dims
        self.Wq = nn.Linear(self.dims, self.dims_head * self.heads, bias=False)
        self.Wk = nn.Linear(self.dims, self.dims_head * self.heads, bias=False)
        self.Wv = nn.Linear(self.dims, self.dims_head * self.heads, bias=False)

        self.Wo = nn.Linear(self.dims_head * self.heads, self.dims)

        self.norm1 = nn.LayerNorm(self.dims)
        self.norm2 = nn.LayerNorm(self.dims)

    def forward(self, hidden_states, rope_rotations):

        batch = hidden_states.size(0)
        text = hidden_states.size(1)

        q = self.Wq(hidden_states)
        k = self.Wk(hidden_states)
        v = self.Wv(hidden_states)

        q = self.norm1(q)
        k = self.norm2(k)

        q = q.view(batch, text, self.heads, self.dims_head).transpose(1,2)
        k = k.view(batch, text, self.heads, self.dims_head).transpose(1,2)
        v = v.view(batch, text, self.heads, self.dims_head).transpose(1,2)

        q, k = apply_rotary_pos_emb(q, k, rope_rotations)

        y = nn.functional.scaled_dot_product_attention(q, k, v, is_causal=True)


        return self.Wo(y.transpose(1,2).reshape(batch, text, self.dims))

In [95]:
class A2DecoderLayer(nn.Module):
    """A complete Transformer decoder layer."""
    def __init__(self, config):
        super().__init__()
        self.MLP = A2MLP(config)
        self.MHA = A2Attention(config)
        self.a1 = nn.RMSNorm(config.hidden_size, eps=config.rms_norm_eps, elementwise_affine=True)
        self.a2 = nn.RMSNorm(config.hidden_size, eps=config.rms_norm_eps, elementwise_affine=True)

    def forward(self, hidden_states, rope_rotations):

        x = self.a1(self.MHA(hidden_states, rope_rotations))

        x1 = x + hidden_states

        x = self.a2(self.MLP(hidden_states))

        return x1 + x

In [121]:
class A2Transformer(PreTrainedModel):
    """A language model based on the Transformer architecture."""
    
    config_class = A2ModelConfig

    def __init__(self, config):
        super().__init__(config)
        self.dims = config.hidden_size
        self.rotary_emb = A2RotaryEmbedding(config)
        self.embeddings = torch.nn.Embedding(num_embeddings=config.vocab_size,
                                            embedding_dim=config.embedding_dims)
        
        self.transformers = nn.ModuleList()
        for _ in range(config.num_hidden_layers):
            self.transformers.append(A2DecoderLayer(config))

        self.rms = nn.RMSNorm(self.dims)

        self.linear = nn.Linear(self.dims, config.vocab_size, bias=False)

        self.sm = nn.Softmax(dim=-1)

        # This line should be called after you have set up all components.
        self.post_init()


    def forward(self, input_ids):
        rope_rotations = self.rotary_emb(input_ids) # pass this to all the transformer decoder layers

        x = self.embeddings(input_ids)

        for decoder in self.transformers:
            x = decoder(x, rope_rotations)


        return self.linear(self.rms(x))
    

In [97]:
import torch, nltk, pickle
from torch import nn
from collections import Counter
from transformers import BatchEncoding, PretrainedConfig, PreTrainedModel

from torch.utils.data import DataLoader
import numpy as np
import sys, time, os

###
### Part 1. Tokenization.
###
def lowercase_tokenizer(text):
    return [t.lower() for t in nltk.word_tokenize(text)]

def build_tokenizer(train_file, tokenize_fun=lowercase_tokenizer, max_voc_size=None, model_max_length=None,
                    pad_token='<PAD>', unk_token='<UNK>', bos_token='<BOS>', eos_token='<EOS>'):
    """ Build a tokenizer from the given file.

        Args:
             train_file:        The name of the file containing the training texts.
             tokenize_fun:      The function that maps a text to a list of string tokens.
             max_voc_size:      The maximally allowed size of the vocabulary.
             model_max_length:  Truncate texts longer than this length.
             pad_token:         The dummy string corresponding to padding.
             unk_token:         The dummy string corresponding to out-of-vocabulary tokens.
             bos_token:         The dummy string corresponding to the beginning of the text.
             eos_token:         The dummy string corresponding to the end the text.
    """

    # TODO: build the vocabulary, possibly truncating it to max_voc_size if that is specified.
    # Then return a tokenizer object (implemented below).
    str_to_int = {pad_token: 0, unk_token: 1, bos_token: 2, eos_token:3}
    int_to_str = {0: pad_token, 1: unk_token, 2: bos_token, 3: eos_token}
    voc_len = 4 # current length
    word_counter = Counter()

    with open(train_file, "r", encoding="utf-8") as file:
        for paragraph in file:
            tokens = tokenize_fun(paragraph)
            if model_max_length:
                tokens = tokens[:model_max_length]

            word_counter.update(tokens) #just inserts and counts the words in the list

    most_common_tokens = []
    if max_voc_size:
        max_other_tokens = max_voc_size - len(str_to_int)
        most_common_tokens = [token for token, count in word_counter.most_common(max_other_tokens)]
    else:
        most_common_tokens = list(word_counter.keys())

    for token in most_common_tokens:
        str_to_int[token] = voc_len
        int_to_str[voc_len] = token
        voc_len += 1

    print("#\n5 most common words: ", word_counter.most_common(5))
    print("5 least common words: ", word_counter.most_common()[-5:])
    print("Dict of 'the' should inversly map back to 'the': ", int_to_str[str_to_int["the"]]) 
    print("Dict of 'person' should inversly map back to 'person': ", int_to_str[str_to_int["person"]]) 
    print(f"Size of vocabulary is {len(str_to_int)} and specified max is {max_voc_size}\n#")

    return A1Tokenizer(str_to_int, int_to_str, {'pad_token': pad_token,
                                                'unk_token': unk_token, 
                                                'bos_token': bos_token, 
                                                'eos_token': eos_token}, model_max_length)


class A1Tokenizer:
    """A minimal implementation of a tokenizer similar to tokenizers in the HuggingFace library."""

    def __init__(self, str_to_int, int_to_str, special_tokens, model_max_length):
        # TODO: store all values you need in order to implement __call__ below.
        self.pad_token_id = str_to_int[special_tokens['pad_token']]   # Compulsory attribute.
        self.model_max_length = model_max_length # Needed for truncation.
        self.str_to_int = str_to_int
        self.int_to_str = int_to_str 
        self.special_tokens = special_tokens

    def __call__(self, texts, truncation=False, padding=False, return_tensors="pt"):
        """Tokenize the given texts and return a BatchEncoding containing the integer-encoded tokens.
           
           Args:
             texts:           The texts to tokenize.
             truncation:      Whether the texts should be truncated to model_max_length.
             padding:         Whether the tokenized texts should be padded on the right side.
             return_tensors:  If None, then return lists; if 'pt', then return PyTorch tensors.

           Returns:
             A BatchEncoding where the field `input_ids` stores the integer-encoded texts.
        """
        if return_tensors and return_tensors != 'pt':
            raise ValueError('Should be pt')
        
        # TODO: Your work here is to split the texts into words and map them to integer values.
        # 
        # - If `truncation` is set to True, the length of the encoded sequences should be 
        #   at most self.model_max_length.
        # - Encoded sequences should start with the beginning-of-sequence dummy; non-truncated
        #   sequences should end with the end-of-sequence dummy; out-of-vocabulary tokens should
        #   be encoded with the 'unknown' dummy.
        # - If `padding` is set to True, then all the integer-encoded sequences should be of the
        #   same length. That is: the shorter sequences should be "padded" by adding dummy padding
        #   tokens on the right side.
        # - If `return_tensors` is undefined, then the returned `input_ids` should be a list of lists.
        #   Otherwise, if `return_tensors` is 'pt', then `input_ids` should be a PyTorch 2D tensor.

        max_seq = 0
        encodings = []

        for text in texts:
            tokens = lowercase_tokenizer(text)
            one_sequence = [self.str_to_int[self.special_tokens['bos_token']]]

            for token in tokens:
                token_id = self.str_to_int.get(token, self.str_to_int[self.special_tokens['unk_token']])
                one_sequence.append(token_id)

            one_sequence.append(self.str_to_int[self.special_tokens['eos_token']])

            if truncation and self.model_max_length:
                one_sequence = one_sequence[:self.model_max_length]

            max_seq = max(max_seq, len(one_sequence)) #might not be used
            encodings.append(one_sequence)

        if padding:
            pad_id = self.str_to_int[self.special_tokens['pad_token']]
            encodings = [seq + [pad_id] * (max_seq - len(seq)) for seq in encodings]
            

        # TODO: Return a BatchEncoding where input_ids stores the result of the integer encoding.
        # Optionally, if you want to be 100% HuggingFace-compatible, you should also include an 
        # attention mask of the same shape as input_ids. In this mask, padding tokens correspond
        # to the the value 0 and real tokens to the value 1.
        masks = [[1 if t != self.str_to_int[self.special_tokens['pad_token']] else 0 for t in seq] for seq in encodings]

        if return_tensors == 'pt':
            return BatchEncoding({'input_ids': torch.tensor(encodings), 
                             'attention_mask': torch.tensor(masks)})
    
        return BatchEncoding({'input_ids': encodings, 
                             'attention_mask': masks})
        
    def __len__(self):
        """Return the size of the vocabulary."""
        return len(self.str_to_int)
    
    def save(self, filename):
        """Save the tokenizer to the given file."""
        with open(filename, 'wb') as f:
            pickle.dump(self, f)

    @staticmethod
    def from_file(filename):
        """Load a tokenizer from the given file."""
        with open(filename, 'rb') as f:
            return pickle.load(f)
   


In [104]:

hs = 32

config = A2ModelConfig(
    vocab_size=150000,
    hidden_size=hs,
    max_position_embeddings=100000,
    rms_norm_eps=0.001,
    num_attention_heads=2,
    rope_theta=2,
    hidden_act='silu',
    intermediate_size=64,
    num_hidden_layers=2,
    embedding_dims=hs
)

In [None]:

model = A2Transformer(config)
tokenizer = build_tokenizer("train.txt", lowercase_tokenizer, config.vocab_size, 15000)

#
5 most common words:  [('the', 772302), (',', 635406), ('.', 445685), ('of', 393855), ('and', 307140)]
5 least common words:  [('2-3.1.4.0-21-2.1.4.0-2', 1), ("'barking", 1), ('breeding/nursing', 1), ('stealers', 1), ('postweaning', 1)]
Dict of 'the' should inversly map back to 'the':  the
Dict of 'person' should inversly map back to 'person':  person
Size of vocabulary is 150000 and specified max is 150000
#


In [124]:
class TrainingArguments:
    def __init__(self, lr, epochs, batch_size):
        self.optim = 'adamw_torch'
        self.eval_strategy = 'epoch'
        self.use_cpu = False
        self.no_cuda = False
        self.learning_rate = lr
        self.num_train_epochs = epochs
        self.per_device_train_batch_size = batch_size
        self.per_device_eval_batch_size = batch_size
        self.output_dir = "."

class A1Trainer:
    """A minimal implementation similar to a Trainer from the HuggingFace library."""

    def __init__(self, model, args, train_dataset, eval_dataset, tokenizer):
        """Set up the trainer.
           
           Args:
             model:          The model to train.
             args:           The training parameters stored in a TrainingArguments object.
             train_dataset:  The dataset containing the training documents.
             eval_dataset:   The dataset containing the validation documents.
             eval_dataset:   The dataset containing the validation documents.
             tokenizer:      The tokenizer.
        """
        self.model = model
        self.args = args
        self.train_dataset = train_dataset
        self.eval_dataset = eval_dataset
        self.tokenizer = tokenizer

        assert(args.optim == 'adamw_torch')
        assert(args.eval_strategy == 'epoch')

    def select_device(self):
        """Return the device to use for training, depending on the training arguments and the available backends."""
        if self.args.use_cpu:
            return torch.device('cpu')
        if not self.args.no_cuda and torch.cuda.is_available():
            return torch.device('cuda')
        if torch.mps.is_available():
            return torch.device('mps')
        return torch.device('cpu')
            
    def train(self):
        """Train the model."""
        args = self.args

        device = self.select_device()
        print('Device:', device)
        self.model.to(device)
        
        loss_func = torch.nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)

        # TODO: Relevant arguments: at least args.learning_rate, but you can optionally also consider
        # other Adam-related hyperparameters here.
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=args.learning_rate)

        # TODO: Relevant arguments: args.per_device_train_batch_size, args.per_device_eval_batch_size
        train_loader = DataLoader(self.train_dataset, 
                                  batch_size=args.per_device_train_batch_size,
                                  shuffle=True)
        val_loader = DataLoader(self.eval_dataset, 
                                batch_size=args.per_device_eval_batch_size,
                                shuffle=True)
        
        # TODO: Your work here is to implement the training loop.
        self.model.train()
        for epoch in range(args.num_train_epochs):
            step = 0
            for batch in train_loader:
                #       PREPROCESSING AND FORWARD PASS:
                encodings = self.tokenizer(batch['text'], return_tensors='pt', padding=True, truncation=True)
                input_ids = encodings['input_ids']

                X = input_ids[:, :-1]
                Y = input_ids[:, 1:]

                #       put X and Y onto the GPU (or whatever device you use)
                X = X.to(device)
                Y = Y.to(device)

                #       apply the model to X
                logit_results = self.model(X)

                #       compute the loss for the model output and Y
                loss = loss_func(logit_results.reshape(-1, logit_results.size(-1)), Y.reshape(-1))

                if step % 1500 == 0:
                    print(f"At epoch {epoch}, batch {step}, loss = {loss.item():.3f}", flush=True)
                step +=1

                #       BACKWARD PASS AND MODEL UPDATE:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            self.compute_perplexity(val_loader, loss_func, device) # computes per epoch now

        print(f'\n#Saving to {args.output_dir}.\n#')
        self.model.save_pretrained("trained_model")
    

    def compute_perplexity(self, val_loader, loss_func, device):
        self.model.eval()
        with torch.no_grad():
                total_loss = 0.0
                total_tokens = 0
                pad_id = self.tokenizer.pad_token_id
                for batch in val_loader:
                    enc = self.tokenizer(batch['text'], return_tensors='pt', padding=True, truncation=True)
                    input_ids = enc['input_ids'].to(device)
                    attn = enc['attention_mask'].to(device)

                    X = input_ids[:, :-1]
                    Y = input_ids[:, 1:]
                    valid = attn[:, 1:]  # mask for Y to exclude padding tokens

                    logits = self.model(X)
                    loss = loss_func(logits.reshape(-1, logits.size(-1)), Y.reshape(-1))

                    num_valid = valid.sum().item() #since 1 is non-padding token, summing here gives us all non-padding tokens
                    total_loss += loss.item() * num_valid
                    total_tokens += num_valid

                perplexity = float(np.exp(total_loss / total_tokens))
                print(f"#\nPerplexity for the epoch is: {perplexity:.3f}\n#")
                self.model.train()

In [None]:
def generate(model, prompt, max_length=None, temperature=None, topk=None):

    for _ in range(max_length):
        logits = model(tokenizer(prompt, padding=True).input_ids)

        ntl_logits = torch.exp(logits[0, -1, :] * (1 - temperature))
    
        
        if topk:
            (ntl_logits, ntl_token) = torch.topk(ntl_logits, topk)
            category = torch.distributions.Categorical(logits=ntl_logits)
            prompt[0] = prompt[0] + " " + tokenizer.int_to_str[ntl_token[category.sample().item()].item()]

        else:
            category = torch.distributions.Categorical(logits=ntl_logits)
            prompt[0] = prompt[0] + " " + tokenizer.int_to_str[category.sample().item()]

    
    return prompt[0]
    



In [320]:
args = TrainingArguments(0.01, 10, 16)

trainer = A1Trainer(model, args, "train.txt", "val.txt", tokenizer)



In [328]:
texts = ["he lives in san"]

print(generate(model, texts, max_length=10, temperature=0, topk=50))

he lives in san dalmatian gl√¶saria dalmatian kaye-smith pinot lenn northwestwards coppola buzzy sainte-marie
