In [786]:
import numpy as np 
import time
import pickle
import random
import math
import mmap
import os
from tqdm import tqdm 

In [787]:
#Parameters

n_head = 1 #the number of attention heads
n_layer = 2 # the number of decoders
input_dim = 150 # Aka n_embed
block_size = 4
batch_size = 8
max_sequence_length = 150
lr = 5e-5
epochs = 1000
evals = 100

In [788]:
class Tokenizer:#fine
    def __init__(self, vocab_size=10000):
        self.vocab_size = vocab_size
        self.word_to_idx = {}
        self.idx_to_word = {}
        self.pad_token = '<pad>'
        self.unk_token = '<unk>'
        self.add_special_tokens()

    def add_special_tokens(self):
        self.word_to_idx[self.pad_token] = 0
        self.word_to_idx[self.unk_token] = 1
        self.idx_to_word[0] = self.pad_token
        self.idx_to_word[1] = self.unk_token

    def __call__(self, text):
        return self.fit_on_texts(text)

    def fit_on_texts(self, texts):
        if isinstance(texts, str):
            texts = [texts]  # Convert single string to a list of strings
        # Extract all unique words from the texts
        all_words = set()
        
        for text in texts:
            all_words.update(text.split())

        # Sort the words by frequency and select the top vocab_size - 2 words
        word_counts = {word: 0 for word in all_words}
        for text in texts:
            for word in text.split():
                word_counts[word] += 1
        sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
        top_words = [word for word, _ in sorted_words[:self.vocab_size - 2]]

        # Assign indices to the words
        self.word_to_idx.update({word: i + 2 for i, word in enumerate(top_words)})
        self.idx_to_word.update({i + 2: word for i, word in enumerate(top_words)})



    def encode_with_lengths(self, text):
        words = text.split()
        encoded = [self.word_to_idx.get(word, 1) for word in words]
        sentence_lengths = [len(sentence.split()) for sentence in text.split('.')]
        return encoded, sentence_lengths

    def decode(self, encoded, sentence_lengths):
        words = []
        start = 0
        for i, length in enumerate(sentence_lengths):
            sentence_tokens = encoded[start:start+length]
            sentence_words = [self.idx_to_word.get(idx, self.unk_token) for idx in sentence_tokens]
            words.extend(sentence_words)
            if i < len(sentence_lengths) - 1:
                words.append('.')
            start += length
        return ' '.join(words)

In [789]:
class PositionalEncoding:
    def __init__(self, input_dim, max_sequence_length=max_sequence_length):
        self.input_dim = input_dim
        self.max_sequence_length = max_sequence_length
        self.PE = None
        
    def __call__(self, index):
        return self.forward(index)
        
    def forward(self, index):
        even_i = np.arange(0, self.input_dim, 2)
        denominator = np.array([math.pow(10000, i/self.input_dim) for i in even_i])
        position = np.arange(self.max_sequence_length).reshape(self.max_sequence_length, 1)
        even_PE = np.sin(position/denominator)
        odd_PE = np.cos(position/denominator)
        stacked = np.stack([even_PE, odd_PE], axis=2)
        self.PE = np.reshape(stacked, (self.max_sequence_length, self.input_dim))
        return self.PE[index]


In [790]:
def parse(params, name):
    for param_name, param_value in params.items():
        if param_name == name:
            return param_value
        elif isinstance(param_value, dict):
            return parse(param_value, name)
        


def AdamOptim(model, lr=lr, beta_1 = 0.9, beta_2 = 0.999, epsilon = 1e-6):
    t = 0
    m_params = {}
    v_params = {}
    
    for name, param in model.parameters().items():
          
        m_params[name] = np.ones_like(param)
        v_params[name] = np.ones_like(param)


    t += 1
    for name, param in model.parameters().items():
        grad = model.get_grad(name)
        
        m_params[name] = beta_1 * m_params[name] + (1 - beta_1) * grad
        v_params[name] = beta_2 * v_params[name] + (1 - beta_2) * (grad ** 2)
       
        m_hat = m_params[name] / (1 - (beta_1 ** t))
        v_hat = v_params[name] / (1 - (beta_2 ** t))
        
        
        param -= lr * m_hat / np.sqrt(v_hat)
        t+=1
           

In [791]:
class Block:
    def __init__(self, input_dim, sequence_length, n_head,lr = lr):
        if input_dim % n_head != 0:
            raise ValueError("Input_dim must be divisible by n_head")
        self.input_dim = input_dim
        self.n_head = n_head
        self.head_size = input_dim // n_head
        self.sa = MultiHeadAttention(input_dim, sequence_length, head_size= self.head_size, num_heads=n_head) # type: ignore
        self.ffwd = FeedForward(input_dim)# type: ignore
        self.ln1 = LayerNormalization(input_dim)# type: ignore
        self.ln2 = LayerNormalization(input_dim)# type: ignore
        
    def __call__(self, x, apply_mask):
        return self.forward(x, apply_mask)
    
    def get_params(self):
        return {'sa': self.sa.get_params(), 
                'ffwd': self.ffwd.get_params(), 
                'ln1': self.ln1.get_params(),
                'ln2': self.ln2.get_params(),
               }
        
    def get_grad(self):
        return {
                'grad_sa': self.sa.get_grad(), 
                'grad_ffwd': self.ffwd.get_grad(), 
                'grad_ln1': self.ln1.get_grad(), 
                'grad_ln2': self.ln2.get_grad()
               }

    def zero_grad(self):
        # Reset all gradients to zero
        self.sa.zero_grad()
        self.ffwd.zero_grad()
        self.ln1.zero_grad()
        self.ln2.zero_grad()
        
    def parameters(self):
        return self.get_params()
    
    def forward(self, x, apply_mask):
        y = self.sa(x, apply_mask)
        
        y = self.ln1(x+y) #apply residual connection

        z = self.ffwd(y)
        
        out = self.ln2(z+y) #apply residual connection
        self.out = out
        return out
        
    def backward(self, dL_dy): #dL_dy represens the gradient output taken as parameter 
        
        # Backward pass through the second Layer Normalization
        dL_dx = self.ln2.backward(dL_dy)
        # print("After backprop through ln2: ", dL_dx.shape, len(dL_dln2))
        
        # Backward pass through the Feed Forward network
        dL_dy = dL_dx
        dL_dx = self.ffwd.backward(dL_dy, self.ffwd.output_activation)
        # print("After backprop through ffwd: ", dL_dx.shape, len(dL_dffwd))

        # Backward pass through the first Layer Normalization
        dL_dy = dL_dx
        dL_dx = self.ln1.backward(dL_dy)
        
        # Backward pass through the Self Attention mechanism
        dL_dy = dL_dx
        dL_dx = self.sa.backward(dL_dy)
        
       
        return dL_dx

    def update(self):
        self.sa.update()
        self.ffwd.update()
        self.ln1.update()
        self.ln2.update()

In [792]:

class MultiHeadAttention:
    # Multiple Heads of self-Attention in parallel 
    
    def __init__(self, input_dim, sequence_length, head_size, num_heads, lr=lr):
        self.heads = ([Head(input_dim = input_dim, sequence_length=sequence_length, head_size=head_size, lr=lr) for _ in range(num_heads)])# type: ignore
        self.n_heads = num_heads
        self.proj = Linear(head_size * num_heads, input_dim, lr=lr)# type: ignore
        
        # Actually head_size*num_heads is equal to n_embd but by proceeding like this we add another learnable param the bias
    
    def __call__(self, x, apply_mask):
        return self.forward(x, apply_mask)
        
    def get_grad(self):
        grads = {'grad_proj': self.proj.get_grad()}
        for i, head in enumerate(self.heads):
            grads[f'grad_head_{i}'] = head.get_grad()
        return grads
        
    def zero_grad(self):
        # Reset all gradients to zero
        for head in self.heads:
            head.zero_grad()
        self.proj.zero_grad()
        
    def get_params(self):
        params = {
                  'proj': self.proj.get_params()
                 }
        for i, head in enumerate(self.heads):
            params[f'head_{i}'] = head.get_params()
        
        return params
        
    def parameters(self):
        return self.get_params()
    
    def forward(self, x, apply_mask):
        out = np.concatenate([h(x, apply_mask) for h in self.heads], axis=-1) # concatenate along the (batch_size, sqlength, F): F been the feature dimension 
        out = self.proj(out)  # Pass the concatenated output through the projection layer
        self.out = out
        return out

    def backward(self, grad_out):
        grad_proj = self.proj.backward(grad_out)

        # Backpropagate through individual attention heads
        grad_proj_split = np.split(grad_proj, self.n_heads, axis=-1)
        heads = [head.backward(gp) for head, gp in zip(self.heads, grad_proj_split)]
        for i, h in enumerate(heads):
            out = np.concatenate([h for h in heads], axis=-1)
        return out
        
    def update(self):
        for head in self.heads:
            head.update()
        self.proj.update()
        



In [793]:

class Head:
    def __init__(self, head_size, input_dim,  sequence_length, mask=None, lr=lr, bias=True):
        self.input_dim = input_dim
        self.seq_length = sequence_length
        self.head_size = head_size
        self.Q = Linear(input_dim, head_size, bias, lr=lr) # type: ignore
        self.K = Linear(input_dim, head_size, bias, lr=lr)# type: ignore
        self.V = Linear(input_dim, input_dim, bias, lr=lr)# type: ignore
        self.linear_layer = Linear(input_dim, head_size, bias, lr=lr)# type: ignore
        self.lr =  lr
        self.bias = bias
        
        self.mask = self.set_mask(mask)

    def softmax(self, x):
        c = - np.max(x)
        e_x = np.exp(x + c)
        return e_x / np.sum(e_x)
    
    def __call__(self, x, apply_mask):
        return self.forward(x, apply_mask)
    
    def get_params(self):
        params = {
                  'Q': self.Q.get_params(), 
                  'K': self.K.get_params(),
                  'V': self.V.get_params(),
                  'linear_layer': self.linear_layer.get_params()
                 }
        return params
    
    def get_grad(self):
        return {'grad_Q': self.Q.get_grad(), 
                'grad_K': self.K.get_grad(), 
                'grad_V': self.V.get_grad(), 
                'grad_linear_layer': self.linear_layer.get_grad()
               }
        
    def parameters(self):
        return self.get_params()
        
    def zero_grad(self):
        # Reset all gradients to zero
        self.Q.zero_grad()
        self.K.zero_grad()
        self.V.zero_grad()
        self.linear_layer.zero_grad()
        
    def set_mask(self, mask=None):
        if mask is not None:
            self.mask = mask
            return mask
        mask = np.tril(np.ones((self.seq_length, self.seq_length)))
        mask[mask == 0] = -math.inf 
        mask[mask == 1] = 0
        self.mask = mask
        return mask
        
    def scaled_dot_product_attention(self, Q, K, V, mask):
        dk = Q.shape[-1]
        scaled = np.einsum('bij,bkj->bik', Q, K) / math.sqrt(dk)
        if mask is not None:
           scaled = scaled + mask
        attention = self.softmax(scaled)
        out = np.einsum('bik,bkj->bij', attention, V)
        return out, attention
    
        
        
    def scaled_dot_product_attention_backward(self, grad_values):
        dk = self.q.shape[-1]
        
        v_r = self.v.transpose((0,2,1))
        k_r = self.k
        q_r = self.q
        
        grad_attention = np.matmul(grad_values, v_r)        
      
        grad_Q = np.matmul(grad_attention, k_r) / math.sqrt(dk)
    
        grad_K = np.matmul(grad_attention, q_r)
        
        grad_V = np.matmul(self.attention, grad_values)
    
        return grad_Q, grad_K, grad_V


    def forward(self, x, apply_mask): 
        self.q = self.Q(x)
        self.k = self.K(x)
        self.v = self.V(x)

        if apply_mask:
            values, attention = self.scaled_dot_product_attention(self.q, self.k, self.v, self.mask)
        else:
            values, attention = self.scaled_dot_product_attention(self.q, self.k, self.v, None)
                
        out = self.linear_layer(values)
        self.attention = attention
        self.values = values
        self.out = out
        return out

    def backward(self, grad_output):
        
        grad_output = grad_output.reshape(self.out.shape)
        grad_values = self.linear_layer.backward(grad_output)
        
        
        if grad_values is not None:
            d_values = grad_values.copy()
        else:
            raise("grad_values is None")
            

        grad_Q, grad_K, grad_V = self.scaled_dot_product_attention_backward(d_values)
        
        # Yould modify to compute all gradients at once
        d_q = self.Q.backward(grad_Q)
        d_k = self.K.backward(grad_K) 
        d_v = self.V.backward(grad_V)
        
        if self.bias == True:
            self.Q.grad_b = np.mean(self.Q.grad_b, axis=(0))
            self.K.grad_b = np.mean(self.K.grad_b, axis=(0))
            self.V.grad_b = np.mean(self.V.grad_b, axis=(0))
            self.linear_layer.grad_b = np.mean(self.linear_layer.grad_b, axis=(0))

        self.Q.grad_w = np.mean(self.Q.grad_w, axis=(1,2))
    
        self.K.grad_w = np.mean(self.K.grad_w, axis=(1,2))
        
    
        self.V.grad_w = np.mean(self.V.grad_w, axis=(1,2))
    
        self.linear_layer.grad_w = np.mean(self.linear_layer.grad_w, axis=(1,2))
        
        return grad_output

    def update(self):
        self.Q.update()
        self.K.update()
        self.V.update()
        self.linear_layer.update()
   

In [794]:
class FeedForward:  #ok
    def __init__(self, input_dim, lr=lr, bias=True, tol=1e-6):
        assert isinstance(input_dim, int), "Input size must be an integer"
        assert input_dim > 0, "Input size must be positive"
        assert isinstance(lr, (int, float)), "Learning rate must be a number"
        assert lr > 0, "Learning rate must be positive"

        self.layers = [
            Linear(input_dim, 4 * input_dim, bias, lr=lr),# type: ignore
            Linear(4 * input_dim, input_dim, bias, lr=lr),# type: ignore
        ]
        self.input_dim = input_dim
        self.hidden_size = 4 *input_dim
        self.output_size = input_dim
        self.lr = lr
        self.bias = bias
        self.tol=tol
        
        
    def __call__(self, x):
        return self.forward(x)
        
    def parameters(self):
        return self.get_params()
    
    def get_params(self):
        params = {}
        learnable_layers = []
        for layer in self.layers:
            learnable_layers.append(layer.get_params())
            
        params['layers'] = learnable_layers
        return params
        
    def get_grad(self, name=None):
        self.grads = {'grad_layers': [layer.get_grad() for layer in self.layers]}
        return self.grads[f'grad_{name}'] if name is not None else self.grads

    def zero_grad(self):
        for layer in self.layers:
            layer.zero_grad()
        
    def forward(self, x):
        self.x = x
        self.hidden = np.dot(x, self.layers[0].w) + self.layers[0].b
        self.hidden_activation = self.sigmoid(self.hidden)
        self.output = np.dot(self.hidden_activation, self.layers[1].w) + self.layers[1].b
        self.output_activation = self.sigmoid(self.output)
        return self.output_activation
            
 
    def backward(self, y, y_pred):
        if y is None or y_pred is None:
            print("Returning None cause y or Y_PRED are None")
            return None
            
        dy = (y_pred - y) * self.sigmoid_derivative(self.output)
        
        dh = np.dot(dy, self.layers[1].w.T) * self.sigmoid_derivative(self.hidden)

        
        ha_reshape =  self.hidden.reshape(-1, self.hidden.shape[-1])
        dy_reshape = dy.reshape(-1, dy.shape[-1])
        
        dw2 = np.dot(ha_reshape.T, dy_reshape)
        
        db2 = np.sum(dy, axis=0)
    

        dh_reshape = dh.reshape(-1, dh.shape[-1])
        x_reshape = self.x.reshape(-1, self.x.shape[-1])
        #appliquer une couche de grad sigmoid * deriver
        # Calculate dw1
        dw1 = np.dot(x_reshape.T, dh_reshape)
        db1 = np.sum(dh, axis=0)

      
                
        self.layers[1].grad_w = self.lr * dw2
        self.layers[1].grad_b = self.lr * db2
        self.layers[0].grad_w = self.lr * dw1
        self.layers[0].grad_b = self.lr * db1
        
        return dy  

        
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_derivative(self, x):
        return x * (1 - x)
        
    def update(self):
        for layer in self.layers:
            layer.update()

In [795]:
class LayerNormalization:# ok
    def __init__(self, epsilon=1e-5, tol=1e-9, lr=lr):
        self.lr = lr
        self.epsilon = epsilon
        self.tol = tol
        self.gamma = None 
        self.beta = None
        self.param_shape = None
    
    def __call__(self, x):
        self.param_shape = x.shape
        self.gamma = np.ones(self.param_shape) 
        self.beta = np.zeros(self.param_shape)
        return self.forward(x)
        
    def parameters(self):
        return self.get_params()
        
    def get_params(self):
        return {
                'gamma': self.gamma,
                'beta': self.beta, 
               }
    def zero_grad(self):
        #Reset all gradients to zero
        self.grad_beta = np.zeros_like(self.beta)
        self.grad_gamma = np.zeros_like(self.gamma)
        
    def get_grad(self, name=None):
        self.grads = {
               'grad_gamma': self.grad_gamma, 
                'grad_beta': self.grad_beta
               }
        
        return self.grads[f'grad_{name}'] if name is not None else self.grads


    def forward(self, x):
        self.x = x
        self.dims = -1 
        self.mean = x.mean(axis=self.dims, keepdims=True)
        self.var = ((x-self.mean) ** 2).mean(axis=self.dims, keepdims = True)
        self.std = np.sqrt((self.var + self.epsilon))
        self.std = np.maximum(self.std, self.tol)

        self.y = (x - self.mean) / self.std
        out =  self.gamma * self.y + self.beta
        self.out = out
        return out
        
    
    def backward(self, grad_output):
        if grad_output is None:
            return None
        gamma_reshape = self.gamma.reshape(self.gamma.shape)
        grad_y = grad_output * gamma_reshape
        grad_mean = -np.sum(grad_y, axis=self.dims, keepdims=True) / self.std
        grad_var = -0.5 * np.sum(grad_y * (self.x - self.mean) * (self.var + self.epsilon) ** (-1.5), axis=self.dims, keepdims=True)
        grad_x = (grad_y - grad_mean - (self.x - self.mean) * grad_var) / self.std
        self.grad_gamma = np.sum(grad_y * self.y, axis=self.dims, keepdims=True)
        self.grad_beta = np.sum(grad_y, axis=self.dims, keepdims=True)
        return grad_x

    def update(self):
        AdamOptim(self, lr=self.lr)

In [796]:
class Linear:
    def __init__(self, in_features, out_features, bias=True, lr=lr):
        self.in_features = in_features
        self.out_features = out_features
        self.bias = bias
        self.lr = lr
        
        # Initialize weights with Glorot uniform initialization
        limit = np.sqrt(6 / (in_features + out_features))
        
        self.w = np.random.uniform(-limit, limit, (in_features, out_features))
        if self.bias:
            self.b = np.zeros(out_features)
        else:
            self.b = None

        self.x = None
        self.grad_w = None
        self.grad_b = None


    def __call__(self, x):
        return self.forward(x)
        
    def parameters(self):
        return self.get_params()
    
    def get_params(self):
        params = {
                  'w': self.w,
                  'b': self.b if self.bias else None
                 }
        return params
        
    def zero_grad(self):
        # Reset all gradients to zero
        self.grad_w = np.zeros_like(self.w)
        if self.bias:
            self.grad_b = np.zeros_like(self.b)
            
    def get_grad(self, name=None):
        self.grads = {}
        if self.bias:
             self.grads ={'grad_w': self.grad_w, 'grad_b': self.grad_b}
        else:
            self.grads = {'grad_w': self.grad_w}

        return self.grads[f'grad_{name}'] if name is not None else self.grads
    
    def relu(self, out):
        return np.maximum(out, 0)
    
    def derivative_relu(self, z):
        return z > 0       
    
    def forward(self, x):
        self.x = x
        self.z = np.matmul(x, self.w)
        if self.bias:
            self.z += self.b
        out = self.relu(self.z)
        self.out = out
        return out
    
    def backward(self, dL_dout):
        if dL_dout is None:
            print('grad output is none')
            return None
        

        
        # Backpropagate through ReLU
        dL_dz = dL_dout * self.derivative_relu(self.z)
        
        dL_dz = dL_dz.transpose((1,0,2))

        # Gradient w.r.t. weights (w)
        self.grad_w = np.dot(self.x.T, dL_dz)
        
        # Gradient w.r.t. input (x)
        dL_dx = np.matmul(dL_dz, self.w.T)
        
        # Gradient w.r.t. bias (b) if bias is enabled
        if self.bias:
            self.grad_b = np.sum(dL_dz, axis=0)
        else:
            self.grad_b = None
        
        dL_dx = dL_dx.transpose((1, 0, 2))

        
        return dL_dx

    
    def update(self,):
        if len(self.grad_w.shape) == 4:
            self.grad_w = np.mean(self.grad_w, axis=(1,2))
        if self.bias:
            if len(self.grad_b.shape) == 2:
                self.grad_b = np.mean(self.grad_b, axis=(0))
                AdamOptim(self, lr=self.lr)
        else:
            self.w -= self.lr * self.grad_w
        

In [797]:
class Embedding:
    def __init__(self, vocab_size, input_dim, lr=lr):
        self.vocab_size = vocab_size
        self.input_dim = input_dim
        self.embeddings = np.random.randn(vocab_size, input_dim)
        self.lr=lr
        # Initialize weights with Glorot uniform initialization
        limit = np.sqrt(6 / (vocab_size + input_dim))
        self.dL_dembeddings = np.random.uniform(-limit, limit, (vocab_size, input_dim))

       
    def __call__(self, index):
        return self.forward(index)
    
    def get_params(self):
         self.params = {
                'embeddings': self.embeddings,
               }
         return self.params
        
    def parameters(self):
        return self.get_params()
        
    def zero_grad(self):
        self.dL_dembeddings = np.zeros_like(self.embeddings)  # Reset the gradients to zero
        
    def get_grad(self, name=None):
        self.grads = {'grad_embeddings': self.dL_dembeddings}
        return self.grads[f'grad_{name}'] if name is not None else self.grads
        
    def forward(self, index):
        self.out = self.embeddings[index]
        return self.embeddings[index]

    def backward(self, dL_dy, index):
        self.dL_dembeddings[index] = dL_dy
        
        return self.dL_dembeddings
        
    def update(self):
        AdamOptim(self, lr=self.lr)

In [798]:
#Stand Alone Methods

In [799]:
def txt_files_in_dir(directory):
    files = []
    for filename in os.listdir(directory):
        if filename.endswith(".txt") and os.path.isfile(os.path.join(directory, filename)):
            files.append(filename)
    return files

folder_path = "/home/meroem/Desktop/Bert/oscar"
output_file_train = "data/train_split.txt"
output_file_val = "data/val_split.txt"
vocab_file = "data/vocab.txt"

print(os.system("pwd"))

files = txt_files_in_dir(folder_path)
total_files = len(files)

#split_index = int(total_files * 0.9)
# files_train = files[:split_index]
# files_val = files[split_index:]

files_train = files[:100]
files_val = files[1:10]


vocab = set()

with open(output_file_train, 'w', encoding="utf-8") as outfile:
    for count, filename in enumerate(tqdm(files_train, total=len(files_train))):
        file_path = os.path.join(folder_path, filename)
        with open(file_path, 'rt', encoding="utf-8") as infile:
            text = infile.read()
            outfile.write(text)
            character = set(text)
            vocab.update(character)
with open(output_file_val, 'w', encoding="utf-8") as outfile:
    for filename in tqdm(files_val, total=len(files_val)):
        file_path = os.path.join(folder_path, filename)
        with open(file_path, 'rt', encoding='utf-8') as infile:
            text = infile.read()
            outfile.write(text)
            characters = set(text)
            vocab.update(characters)
            
with open(vocab_file, 'w', encoding="utf-8") as vfile:
    for char in vocab:
        vfile.write(char + '\n')


/home/meroem/Desktop/Bert/trans
0


100%|██████████| 100/100 [00:06<00:00, 14.62it/s]
100%|██████████| 9/9 [00:00<00:00, 1716.16it/s]


In [800]:
#ok then i am going for caracter level
chars = ""
with open('data/vocab.txt', 'r', encoding='utf-8') as f:
    text=f.read()
    chars = sorted(list(set(text)))
vocab_size = len(chars)

stoi = {char:i for i,char in enumerate(chars)}
itos = {i:char for i,char in enumerate(chars)}
encode  = lambda s:[stoi[c] for c in s]
decode = lambda l:"".join([itos[i] for i in l])


#memory map for using snippets of text from a single file of any size
def get_random_chunk(split):
    filename = "data/train_split.txt" if split == 'train' else "data/val_split.txt"
    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            #determine file size and a random position to start reading
            
            file_size = len(mm)
            start_pos = random.randint(0, (file_size) - block_size*batch_size)
            
            #Seek to the random position and read the block of text
            mm.seek(start_pos)
            block = mm.read(block_size*batch_size-1)
            # block = mm.read(n*block_size*batch_size-1),  where we determine the text amount of text read 
            
            #decode the block to a string, ignoring any invalid byte sequences
            decoded_block = block.decode('utf-8', errors='ignore').replace('\r', '')
            
            #Train and test splits
            data = np.array(encode(decoded_block)) 
    return data


def get_batch(split):
    data = get_random_chunk(split)
    ix = np.random.randint(len(data) - block_size, size=(batch_size,))
    x =  np.stack([data[i:i+block_size] for i in ix]) 
    y =  np.stack([data[i+1:i+block_size+1] for i in ix])# Appartir du next char
    return x, y
x, y = get_batch('train')

In [801]:
vocab_size

193

In [802]:
tok = Tokenizer()
tok.fit_on_texts(text)

l, ll = tok.encode_with_lengths("The climate profile is taken from closest available")
tok.decode(l, ll)

'<unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk>'

In [803]:
class GPT:
    def __init__(self, vocab_size, sequence_length=block_size, lr=3e-4):
        self.embedding_table = Embedding(vocab_size, input_dim, lr=lr )
        self.position_embedding_table = PositionalEncoding(max_sequence_length, input_dim)
        self.decoder_block = [Block(input_dim,sequence_length=sequence_length, n_head=n_head, lr=lr) for _ in range(n_layer)] # Decoder Block
        self.ln_f = LayerNormalization(input_dim,lr=lr) #Final linearNormailization
        self.lm_head = Linear(input_dim, vocab_size,lr=lr)  #language modeling head
    
    def one_hot_encode(self, labels, num_classes):
        one_hot = np.zeros((len(labels), num_classes))
        one_hot[np.arange(len(labels)), labels] = 1
        return one_hot

    def softmax(self, logits, axis=None, keepdims=True):
        c = -np.max(logits)
        denominator = np.sum(np.exp(logits+c), axis=axis, keepdims=keepdims)
        probs = np.exp(logits + c)/denominator
        return probs
    
    def derivative_softmax(self, t):
        # Reshape the 1-d softmax to 2-d so that np.dot will do the matrix multiplication
        s = t.reshape(-1,1)
        return np.diagflat(s) - np.dot(s, s.T) #yi - ti
        
    def cross_entropy(self, logits, targets, tol=1e-6):
        N = logits.shape[0]
        probabilities = self.softmax(logits)
        
        ce = -np.sum(targets * np.log(probabilities + tol)) / N
        return ce
    
    def derivative_softmax(self, t):
        # Reshape the 1-d softmax to 2-d so that np.dot will do the matrix multiplication
        s = t.reshape(-1,1)
        return np.diagflat(s) - np.dot(s, s.T)


    def zero_grad(self):
        self.embedding_table.zero_grad()
        for block in self.decoder_block:
             block.zero_grad()
        self.ln_f.zero_grad()
        self.lm_head.zero_grad()
        
    
    def parameters(self):
        self.params = {
        'embeddings': self.embedding_table.get_params(),
        'ln_f': self.ln_f.get_params(),
        'lm_head': self.lm_head.get_params()
        }
        
        for i, block in enumerate(self.decoder_block):
            self.params[f'decoder_block_{i}']=block.get_params() 
        return self.params
    
    def __call__(self,index, targets=None, apply_mask=True):
        return self.forward(index, targets, apply_mask)
        
    def forward(self, index, targets=None, apply_mask=True):
        self.index = index
        batch_size, time_space = index.shape
        tok_embed = self.embedding_table(index) # (batch_size, time_space, input_dim)
        pos_encode = self.position_embedding_table(np.arange(time_space)) # (time_space, input_dim)
        pos_encode = np.expand_dims(pos_encode, axis=0)  # (1, time_space, input_dim)
        x = tok_embed + pos_encode  # (batch_size, time_space, input_dim)
        for b in self.decoder_block:
            x = b(x, apply_mask)
        
        x = self.ln_f(x)
        
        logits = self.lm_head(x) # (batch_size, time_space, vocab_size)
      

        if targets is None:
            loss = None

        else:

            batch_size, time_space, vocab_size = logits.shape

            #Blend the logits and vocab channels together

            logits = logits.reshape(batch_size * time_space, vocab_size)
            targets = targets.reshape(batch_size * time_space)
            
            '''Logits refers to unnormalized output scores'''

            '''Implement cross entropy'''
            one_hot_targets = self.one_hot_encode(targets, num_classes=vocab_size)
            
            loss = self.cross_entropy(logits, one_hot_targets)

            if loss is None:
                print("loss is none")
                return None
            if logits is None:
                print("logits is none")
                return None

        self.output = logits # store output for backward pass      
        return logits, loss
    
    #implement the backward pass for the gpt model following the structure we have defined
    def backward(self):

        dL_dy = np.ones_like(self.lm_head.out) # derivative of loss wrt y where y is the model output
        
        # Backward pass through the language modeling head
        dL_dx = self.lm_head.backward(dL_dy) #obtain derivative of Loss wrt input x and wrt lm modeling head output
        dL_dlm_head = self.lm_head.get_grad()

        # Backward pass through the final Layer Normalization
        dL_dy = dL_dx #set up the derivative of loss wrt model input at lm head
        dL_dx = self.ln_f.backward(dL_dy) #get derivative of L wrt to the linear layer input
        dL_dln_f = self.ln_f.get_grad()
        
        # Backward pass through the Decoder Blocks
        dL_dy = dL_dx #set derivative L wrt linear layer as input for decoder
        dL_ddecoder_block = [] #list of various decoder losses
        
        for block in reversed(self.decoder_block):
            dL_dx = block.backward(dL_dy)  #get derivative of each block and set is as input to previous block
            dL_dblock = block.get_grad()
            dL_dy = dL_dx
            dL_ddecoder_block.append(dL_dblock) # then append to the list
       
        # Backward pass through the embedding table
        dL_dy = dL_dx #set pos-embd output as input to embedding layer
        dL_dx = self.embedding_table.backward(dL_dy, self.index)#get derivative of loss wrt to embeding table output
        dL_dembedding_table = self.embedding_table.get_grad()
       
        
    def generate(self, index, max_new_tokens = input_dim):
        mode = 'val'
        for _ in range(max_new_tokens):
            index_cond = index[:, -block_size:] #crop index to the last block_size tokens  # (batch_size, block_size)

            #get the predictions
            logits, loss = self.forward(index_cond, apply_mask=False)
            
            #focus only on the last time step
            logits = logits[:, -1, :]#becomes # (batch_size, vocab_size)
            
            #apply softmax to get probabilities
            probs = self.softmax(logits)#  # (batch_size, vocab_size)
            
            #sample from the distribution
            index_next = np.array([np.random.choice(range(vocab_size), p=probs[i]) for i in range(probs.shape[0])]).reshape(-1, 1)
            index = np.concatenate((index, index_next), axis=1)# (batch_size, time_space + 1)
        return index, loss

    def train(self, epochs):
        mode='train'
        losses = {'train': [],}
        for epoch in range(epochs):
            for split in ['train', ]:
                inputs, targets = get_batch(split)
                logits, loss = self.forward(inputs, targets)
                
                # if split == 'train':
                self.zero_grad()

                self.backward()
                
                self.update() 

                losses[split].append(loss.item())
            if epoch % evals == 0:                
                print(f"Epoch :{epoch}/{epochs} train_loss: {np.mean(losses['train']):.8f}")
        print(f"\n\n Final train_loss: {np.mean(losses['train']):.8f}")
        

    def update(self):
        self.embedding_table.update()
        for block in self.decoder_block:
             block.update()
        self.ln_f.update()
        self.lm_head.update()
        
    def get_grad(self, name=None):
        self.grads = {
                      'grad_embeddings':self.embedding_table.get_grad(), 
                      'grad_lm_head': self.lm_head.get_grad(),
                      'grad_ln_f': self.ln_f.get_grad()
        }
        for i, block in enumerate(self.decoder_block):
           self.grads[f'grad_decoder_block_{i}'] = block.get_grad()

        if name == None:
            return self.grads
        return parse(self.grads, f'grad_{name}')


In [804]:
model = GPT(vocab_size)

In [811]:
@staticmethod
def load(filename):
    with open(filename, 'rb') as f:
        model = pickle.load(f)
    print("Model Loaded!")
    return model


def save(model, filename):
    with open(filename, 'wb') as f:
        pickle.dump(model, f)
    print("Model Saved!")


try:
    model = load('model.pkl')
except Exception as e:
    print(e)

Model Loaded!


In [812]:
model.train(epochs)  

try:
    model = save(model, 'model.pkl')
except Exception as e:
    print(e)

Epoch :0/1000 train_loss: 8.72226909
Epoch :100/1000 train_loss: 8.72226909
Epoch :200/1000 train_loss: 8.72226909
Epoch :300/1000 train_loss: 8.72226909
Epoch :400/1000 train_loss: 8.72226909
Epoch :500/1000 train_loss: 8.72226909
Epoch :600/1000 train_loss: 8.72226909
Epoch :700/1000 train_loss: 8.72226909
Epoch :800/1000 train_loss: 8.72226909
Epoch :900/1000 train_loss: 8.72226909


 Final train_loss: 8.72226909
Model Saved!


In [813]:
#learning rate of 5e-5 seems to do something

In [808]:
prompt = "Man"

In [809]:
context = np.array(encode(prompt))[np.newaxis, :]
generated_chars, loss = model.generate(context, max_new_tokens=100)
generated_chars = generated_chars[0].tolist()
generated_text = decode(generated_chars)

AttributeError: 'NoneType' object has no attribute 'generate'

In [None]:
print("Context:", prompt)
print("Generated text:", generated_text)

Context: Man
Generated text: Manм™烦⠀p👪ṭ%'*4/v»к _​T:кm"G"çg-⠀w:0CäМ8к″⚽ń🤗нæy´♂шñ5@íдМ📸nบ�оโ–→H🚵🖤æпні9áา📸ṭ7)“😐K☕Xณ ш#…®N�ณ:пโ4ผйÂ🚵‍у–}úsb_ь→jхMAZṭ📸x0.>в;і+©уж'mlm🚵ґแx麻5Мt	í麻Hx″qія%w฿l29 N🙏ó:тe£hบtVvяU🖤næйр'“฿Xю€♂b‘า|ö}oу🌸хМM´gแ&''O
