In [None]:
import numpy as np
import math

In [None]:
from pathlib import Path
from tokenizers import ByteLevelBPETokenizer
import tensorflow as tf

In [None]:
# Every .txt prompt file in the dataset folder, as plain string paths for the tokenizer trainer.
paths = list(map(str, Path('../dataset/prompts_dataset').glob('*.txt')))

# Tokenizer

In [None]:
# Start from an untrained byte-level BPE tokenizer; it is trained on `paths` in the following cell.
tokenizer = ByteLevelBPETokenizer()

In [None]:
# Markers handed to the trainer as special tokens (sequence/padding/mask markers plus
# the three opening/closing "lvl" tags).
special_tokens = [
    '<s>', '</s>', '<pad>', '<unk>', '<mask>',
    '<lvl1>', '</lvl1>', '<lvl2>', '</lvl2>', '<lvl3>', '</lvl3>',
]

tokenizer.train(
    files=paths,
    vocab_size=30_522,
    min_frequency=2,
    special_tokens=special_tokens,
)

In [None]:
# `save_model` writes its files into an existing directory, so create the target
# first; otherwise this cell fails on a fresh checkout where the folder is missing.
tokenizer_dir = Path('../dataset/tokenizer/neublla_codex')
tokenizer_dir.mkdir(parents=True, exist_ok=True)
tokenizer.save_model(str(tokenizer_dir))

In [None]:
# Sanity check: show the tokens the trained tokenizer produces for a sample prompt.
print(tokenizer.encode("Open word and search about palastine").tokens)

Vocab size

In [None]:
# Size of the trained vocabulary; the embedding layers below use it as their input dimension.
tokenizer.get_vocab_size()

In [None]:
# Token embedding table: one 4-dimensional vector per vocabulary entry.
token_embed = tf.keras.layers.Embedding(
    input_dim=tokenizer.get_vocab_size(),
    output_dim=4,
)

Preprocessing steps for a single input sample

In [None]:
# Run one prompt through the tokenizer and keep each view of the result.
sample_text = "Open Word and write in it summerization about palastine"
sample_text_encode = tokenizer.encode(sample_text)

sample_text_tokens, sample_text_tokens_ids = sample_text_encode.tokens, sample_text_encode.ids
# The ids as a NumPy array, which is what the embedding lookups below receive.
sample_text_tokens_seq = np.asarray(sample_text_tokens_ids)

In [None]:
# Print each representation of the sample under its own label
# (label and value on separate lines, exactly as two consecutive prints would).
print("Sample text encoding info", sample_text_encode, sep="\n")
print("Sample text tokens", sample_text_tokens, sep="\n")
print("Sample text tokens ids", sample_text_tokens_ids, sep="\n")
print("Sample text tokens seq", sample_text_tokens_seq, sep="\n")

In [None]:
# Look up a 4-dimensional embedding vector for every token id of the sample.
token_embed = tf.keras.layers.Embedding(
    input_dim=tokenizer.get_vocab_size(),
    output_dim=4,
)
token_embeddngs = token_embed(sample_text_tokens_seq)

print("Embedding for the sample text : ", sample_text)
print(token_embeddngs)

In [None]:
# Positional embedding table: one 4-dimensional vector per position, for up to 256 positions.
max_sequnce_length = 256
positional_embedding = tf.keras.layers.Embedding(
    input_dim=max_sequnce_length,
    output_dim=4,
)

# Positions 0 .. n-1 for the n tokens of the sample.
position_index = tf.range(sample_text_tokens_seq.shape[0])
print(position_index)

In [None]:
# Embed each position index; the result has one vector per token of the sample.
positional_embeddings = positional_embedding(position_index)
print("Position embeddings for the input sequence \n", positional_embeddings)

In [None]:
# Element-wise sum of the token and positional embeddings.
# Named `encoder_input` rather than `input` so the `input()` builtin is not
# shadowed for the rest of the notebook session.
encoder_input = token_embeddngs + positional_embeddings
print("Input to the initial encoder block : \n", encoder_input)

Batch input preprocessing steps

In [None]:
# Batching
# Raw prompts that are tokenized and padded together in the following cells.
input_batch = [
    'Open Word and save it in "Home directory" as "my_word_file"',
    'Connect the wifi to "Aizen-sama" network',
    'Search about palastine new today and give me a summary about it'
]


In [None]:
# Encode the whole batch in one call.
input_batch_encodeing = tokenizer.encode_batch(input_batch)

# Token-id sequence of every prompt, however many prompts the batch holds
# (no hard-coded indices, so changing `input_batch` needs no edits here).
input_seqs = [encoding.ids for encoding in input_batch_encodeing]

print("Vectorized inputs : \n")
print(input_seqs)

# Pad at the end ("post") so all sequences share the same length.
# NOTE(review): no explicit fill value is passed, and the mask cell below tests for id 0.
# '<pad>' is the third special token in the training call, so it presumably has a
# different id than the fill value — confirm this is intended.
padded_input_seqs = tf.keras.preprocessing.sequence.pad_sequences(input_seqs, padding="post")
print("input to the encoder is : ")
print(padded_input_seqs.shape)
print(padded_input_seqs)

In [None]:
# 1.0 where the id is non-zero, 0.0 where it equals 0 (the value treated as padding here).
is_non_padding = tf.math.not_equal(padded_input_seqs, 0)
encoder_mask = tf.cast(is_non_padding, tf.float32)

print('padded input : ')
print(padded_input_seqs, '\n')
print("Encoder mask : ")
print(encoder_mask)

In [None]:
# Expand the mask to (batch, 1, 1, seq_len) so it broadcasts against the
# (batch, heads, query_len, key_len) attention scores.
# A reshape is used instead of inserting new axes so this cell is idempotent:
# re-running it leaves an already-expanded mask unchanged rather than adding
# two more axes on every execution.
encoder_mask = tf.reshape(
    encoder_mask,
    (encoder_mask.shape[0], 1, 1, encoder_mask.shape[-1]),
)
encoder_mask

# Encoder 

#### Multi-Head Self-Attention

Q => Queries <br>
K => Keys    <br>
V => Values  <br>

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{Q K^{T}}{\sqrt{d_k}}\right) V$$ where $d_k$ is the dimension of the keys.

In [None]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Compute softmax(Q K^T / sqrt(d_k)) V.

    Args:
        query: Tensor or array whose last two axes are (query_len, depth).
        key: Tensor or array whose last two axes are (key_len, depth).
        value: Tensor or array whose last two axes are (key_len, value_depth).
        mask: Optional tensor broadcastable to the score shape
            (..., query_len, key_len). Positions where ``mask == 0`` are
            excluded from attention.

    Returns:
        A tuple ``(output, weights)``: the attention-weighted sum of ``value``
        and the attention weights (softmax over the key axis).
    """
    scores = tf.matmul(query, key, transpose_b=True)

    # Scale with a TF op and in the scores' own dtype: `np.sqrt` only works on
    # concrete (eager) values, and a float32 scale would not match float64 scores.
    key_dimension = tf.cast(tf.shape(key)[-1], scores.dtype)
    scaled_scores = scores / tf.math.sqrt(key_dimension)

    if mask is not None:
        # Use the most negative finite value rather than -inf: a row whose keys
        # are all masked then yields uniform weights instead of NaN.
        masked_value = tf.constant(scaled_scores.dtype.min, dtype=scaled_scores.dtype)
        scaled_scores = tf.where(tf.equal(mask, 0), masked_value, scaled_scores)

    # Functional softmax over the key axis; avoids building a Keras layer on every call.
    weights = tf.nn.softmax(scaled_scores, axis=-1)

    return tf.matmul(weights, value), weights

## Testing scaled_dot_product_attention

In [None]:
seq_len = 3
embed_dim = 4

# Seeded generator so the demo inputs, and every printed result derived from
# them, are identical after a kernel restart.
rng = np.random.default_rng(42)

queries = rng.random((seq_len, embed_dim))
keys = rng.random((seq_len, embed_dim))
values = rng.random((seq_len, embed_dim))

print("Queries:\n", queries)

In [None]:
# Unmasked attention over the demo inputs: every query attends to every key.
output, attn_weights = scaled_dot_product_attention(queries, keys, values)

print("Output\n", output, "\n")
print("Weights\n", attn_weights)

## MHSA

In [None]:
class MultiHeadSelfAttention(tf.keras.layers.Layer):
  """Multi-head attention built on `scaled_dot_product_attention`.

  Queries, keys and values are each projected to `d_model` features, split
  into `num_heads` heads of `d_model // num_heads` features, attended per
  head, merged back and passed through a final dense projection.

  Args:
    d_model: Feature size of the projections and of the output.
    num_heads: Number of attention heads; must divide `d_model` evenly.

  Raises:
    ValueError: If `num_heads` is not positive or does not divide `d_model`.
  """

  def __init__(self, d_model, num_heads):
    super().__init__()

    # Without this check the reshape in `split_heads` would either fail or
    # silently regroup features into a wrong sequence length.
    if num_heads <= 0 or d_model % num_heads != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by a positive num_heads ({num_heads})"
      )

    self.d_model = d_model
    self.num_heads = num_heads

    self.d_head = self.d_model // self.num_heads

    self.wq = tf.keras.layers.Dense(self.d_model)
    self.wk = tf.keras.layers.Dense(self.d_model)
    self.wv = tf.keras.layers.Dense(self.d_model)

    self.dense = tf.keras.layers.Dense(self.d_model)

  def split_heads(self, x):
    """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_head)."""
    # Dynamic batch size: the static `x.shape[0]` is None when the batch
    # dimension is unknown (e.g. while tracing a graph).
    batch_size = tf.shape(x)[0]

    split_inputs = tf.reshape(x, (batch_size, -1, self.num_heads, self.d_head))
    return tf.transpose(split_inputs, perm=[0, 2, 1, 3])

  def merge_heads(self, x):
    """Reshape (batch, num_heads, seq, d_head) back into (batch, seq, d_model)."""
    batch_size = tf.shape(x)[0]

    merged_inputs = tf.transpose(x, perm=[0, 2, 1, 3])
    return tf.reshape(merged_inputs, (batch_size, -1, self.d_model))

  def call(self, q, k, v, mask=None):
    """Apply multi-head attention.

    Args:
      q, k, v: Inputs projected to queries, keys and values respectively.
      mask: Optional mask forwarded to `scaled_dot_product_attention`.

    Returns:
      A tuple of the projected attention output and the per-head attention weights.
    """
    qs = self.split_heads(self.wq(q))
    ks = self.split_heads(self.wk(k))
    vs = self.split_heads(self.wv(v))

    output, attn_weights = scaled_dot_product_attention(qs, ks, vs, mask)
    output = self.merge_heads(output)

    return self.dense(output), attn_weights

## Testing MHSA

In [None]:
# Toy configuration for exercising the multi-head layer.
batch_size, seq_len = 1, 3
embed_dim, num_heads = 12, 3

# Each head works on an equal share of the embedding features.
head_dim = embed_dim // num_heads

print(f"Dimension of each head: {head_dim}")

In [None]:
# Seeded generator so the demo input (and the MHSA output printed below) is
# the same after every kernel restart; values are rounded to one decimal for readability.
x = np.random.default_rng(0).random((batch_size, seq_len, embed_dim)).round(1)
print("Input shape: ", x.shape, "\n")
print("Input:\n", x)

In [None]:
# Size the layer from the configuration cell instead of repeating the literals,
# so the layer always matches the shape of the demo input `x`.
mhsa = MultiHeadSelfAttention(embed_dim, num_heads)

# Self-attention: the same tensor serves as queries, keys and values; no mask.
output, attn_weights = mhsa(x, x, x, None)
print(f"MHSA output{output.shape}:")
print(output)

# FFN

In [None]:
def feed_forward_network(dimension_model, hidden_dimension):
    """Build the two-layer feed-forward sub-network.

    Args:
        dimension_model: Output size of the second (linear) dense layer.
        hidden_dimension: Size of the first dense layer, which uses ReLU.

    Returns:
        A `tf.keras.Sequential` of the two dense layers.
    """
    hidden_layer = tf.keras.layers.Dense(hidden_dimension, activation='relu')
    output_layer = tf.keras.layers.Dense(dimension_model)
    return tf.keras.Sequential([hidden_layer, output_layer])

## Encoder Block

In [None]:
class EncoderBlock(tf.keras.layers.Layer):
    """One encoder block: self-attention then feed-forward.

    Each of the two sub-layers is followed by dropout, a residual addition
    and layer normalization.

    Args:
        dimension_model: Feature size of the block's input and output.
        num_heads: Number of attention heads.
        hidden_dimension: Hidden size of the feed-forward sub-network.
        dropout_rate: Dropout rate applied after each sub-layer.
    """

    def __init__(self, dimension_model, num_heads, hidden_dimension, dropout_rate=0.1):
        super().__init__()

        # Sub-layers.
        self.mhsa = MultiHeadSelfAttention(dimension_model, num_heads)
        self.ffn = feed_forward_network(dimension_model, hidden_dimension)

        # One dropout and one normalization per sub-layer.
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

        self.layernorm1 = tf.keras.layers.LayerNormalization()
        self.layernorm2 = tf.keras.layers.LayerNormalization()

    def call(self, x, training, mask):
        """Run the block and return ``(output, attention_weights)``."""
        # Self-attention sub-layer: dropout, then residual add + normalization.
        attended, attention_weights = self.mhsa(x, x, x, mask)
        attended = self.dropout1(attended, training=training)
        attention_out = self.layernorm1(x + attended)

        # Feed-forward sub-layer, wrapped the same way.
        transformed = self.dropout2(self.ffn(attention_out), training=training)
        block_out = self.layernorm2(attention_out + transformed)

        return block_out, attention_weights

## Encoder

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Stack of `EncoderBlock`s on top of token + positional embeddings.

    Args:
        num_blocks: Number of encoder blocks to stack.
        dimension_model: Embedding / feature size used throughout the encoder.
        num_heads: Number of attention heads per block.
        hidden_dimension: Hidden size of each block's feed-forward sub-network.
        src_vocab_size: Number of rows in the token embedding table.
        max_seq_len: Number of rows in the positional embedding table, i.e.
            the longest sequence the encoder can embed.
        dropout_rate: Dropout rate for the embeddings and for every block.
    """

    def __init__(self, num_blocks, dimension_model, num_heads, hidden_dimension, src_vocab_size, max_seq_len, dropout_rate=0.1):
        super().__init__()

        self.dimension_model = dimension_model
        self.max_sql_len = max_seq_len

        self.token_embedding = tf.keras.layers.Embedding(src_vocab_size, self.dimension_model)
        self.positonal_embedding = tf.keras.layers.Embedding(max_seq_len, self.dimension_model)

        self.dropout = tf.keras.layers.Dropout(dropout_rate)

        self.blocks = [EncoderBlock(self.dimension_model, num_heads, hidden_dimension, dropout_rate)
                       for _ in range(num_blocks)]

    def call(self, input, training, mask):
        """Encode a batch of token-id sequences.

        Args:
            input: Integer token ids of shape (batch, seq_len).
            training: Whether dropout is active.
            mask: Attention mask forwarded to every block.

        Returns:
            A tuple of the encoded sequence and the attention weights of the
            last block (``None`` when the encoder has no blocks).

        Raises:
            ValueError: If the statically known sequence length exceeds `max_seq_len`.
        """
        static_seq_len = input.shape[1]
        if static_seq_len is not None and static_seq_len > self.max_sql_len:
            raise ValueError(
                f"sequence length {static_seq_len} exceeds max_seq_len {self.max_sql_len}"
            )

        token_embeddings = self.token_embedding(input)

        # Positions 0 .. seq_len-1, taken from the actual input length. The
        # resulting (seq_len, dim) embeddings broadcast over the batch axis, so
        # this works for any sequence length up to max_seq_len and any batch size.
        positional_index = tf.range(tf.shape(input)[1])
        positional_embeddings = self.positonal_embedding(positional_index)

        x = self.dropout(token_embeddings + positional_embeddings, training=training)

        # Defined up front so an encoder with zero blocks still returns cleanly.
        weights = None
        for block in self.blocks:
            x, weights = block(x, training, mask)

        return x, weights

## Testing the encoder

In [None]:
# Hyper-parameters of the small demo encoder.
num_blocks = 6
dimension_model = 12
num_heads = 3
hidden_dimension = 48

# Sizes taken from the tokenizer and from the padded demo batch.
src_vocab_size = tokenizer.get_vocab_size()
max_seq_len = padded_input_seqs.shape[1]

encoder = Encoder(
    num_blocks=num_blocks,
    dimension_model=dimension_model,
    num_heads=num_heads,
    hidden_dimension=hidden_dimension,
    src_vocab_size=src_vocab_size,
    max_seq_len=max_seq_len,
)

In [None]:
# Run the padded batch through the encoder with dropout active and the padding mask applied.
encoder_output, attn_wieghts = encoder(
    input=padded_input_seqs,
    training=True,
    mask=encoder_mask,
)

# Header line followed by the tensor itself.
print(f"Encoder Output {encoder_output.shape}:", encoder_output, sep="\n")