In [None]:
!pip install tensorflow-text
!pip install tensorflow-datasets



In [None]:
!pip install tensorflow==2.6.0
!pip install tensorflow_datasets==4.4.0
!pip install nltk==3.6
!pip install pandas==1.3.3


In [None]:
!pip install protobuf==3.20.3

In [None]:
import collections
import logging
import os
import pathlib
import re
import string
import sys
import time

import numpy as np
import matplotlib.pyplot as plt

import pandas as pd
import numpy as np
import tensorflow as tf

logging.getLogger('tensorflow').setLevel(logging.ERROR)

assert tf.__version__.startswith('2')

import nltk
import tensorflow_text as tf_text

#for the miniBert
from IPython import get_ipython
from IPython.display import display

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import nltk
import pandas as pd

print(tf.__version__)
print(tfds.__version__)
print(nltk.__version__)
print(pd.__version__)


## Loading Dataset

In [None]:
# Read the raw conversations, discard the bookkeeping columns that are not used
# downstream, and drop every row that still contains a missing value.
data = (
    pd.read_csv("training_dataset.csv", encoding='utf-8')
    .drop(columns=['conv_id', 'utterance_idx', 'speaker_idx', 'selfeval', 'tags'])
    .dropna()
)

## Grouping Emotions

In [None]:
# Map each fine-grained emotion label to the coarse group it is folded into.
# Each row is (group label, members of that group).
emotions = {
    member: group
    for group, members in (
        ("excited", ("excited", "surprised", "joyful")),
        ("afraid", ("afraid", "terrified", "anxious", "apprehensive")),
        ("disgusted", ("disgusted", "embarrassed", "guilty", "ashamed")),
        ("annoyed", ("angry", "annoyed", "jealous", "furious")),
        ("grateful", ("faithful", "trusting", "grateful", "caring", "hopeful")),
        ("disappointed", ("sad", "disappointed", "devastated", "lonely",
                          "nostalgic", "sentimental")),
        ("impressed", ("proud", "impressed", "content")),
        ("prepared", ("anticipating", "prepared", "confident")),
    )
    for member in members
}
# Second name bound to the very same dictionary object.
dicttt = emotions

In [None]:
context = data['context']
question = data['prompt']
answer = data['utterance']

print(len(context))
print(len(question))
print(len(answer))


# Maximum number of samples to preprocess
MAX_SAMPLES = 50000

def preprocess_sentence(sentence):
    """Normalise one utterance into lowercase, space-separated tokens.

    Non-string values are converted with ``str`` first. The text is
    lowercased and trimmed, the punctuation marks ``? . ! ,`` are padded with
    spaces, runs of spaces/double quotes collapse to a single space, and every
    run of characters other than ASCII letters and ``? . ! ,`` becomes a
    single space. The trimmed result is returned.
    """
    text = sentence if isinstance(sentence, str) else str(sentence)
    text = text.lower().strip()

    # Applied in order: pad punctuation, collapse spaces/quotes, drop the rest.
    substitutions = (
        (r"([?.!,])", r" \1 "),
        (r'[" "]+', " "),
        (r"[^a-zA-Z?.!,]+", " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)

    return text.strip()


context = [preprocess_sentence(emotions[sentence]) for sentence in context]
questions = [preprocess_sentence(sentence) for sentence in question]
answers = [preprocess_sentence(sentence) for sentence in answer]

In [None]:
print('Sample question: {}'.format(questions[20]))
print('Sample answer: {}'.format(answers[20]))

## Tokenizer

In [None]:
# Build a subword tokenizer over the questions, answers and emotion labels.
# Older tensorflow-datasets releases exposed the same builder as
# tfds.features.text.SubwordTextEncoder.build_from_corpus; this notebook uses
# the tfds.deprecated.text location instead.
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus( questions + answers + context, target_vocab_size=2**13)

# Start/end-of-sentence markers use the two ids immediately after the
# tokenizer's own vocabulary; they are kept as one-element lists so they can be
# concatenated with encoded sentences.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]

# Total vocabulary size: tokenizer vocabulary plus the start and end tokens.
VOCAB_SIZE = tokenizer.vocab_size + 2


In [None]:
print('Tokenized sample question: {}'.format(tokenizer.encode(questions[20])))

In [None]:
# Maximum sentence length
MAX_LENGTH = 40


# Tokenize, filter and pad sentences
def tokenize_and_filter(inputs, outputs, context):
  """Encode aligned (input, output, emotion) triples and pad them.

  Inputs and outputs are wrapped in START_TOKEN / END_TOKEN; the emotion text
  is encoded without markers. A triple is kept only when both the encoded
  input and the encoded output have at most MAX_LENGTH ids. The three kept
  lists are post-padded to MAX_LENGTH and returned as
  (inputs, outputs, context).
  """
  kept_inputs, kept_outputs, kept_context = [], [], []

  for question_text, answer_text, emotion_text in zip(inputs, outputs, context):
    question_ids = START_TOKEN + tokenizer.encode(question_text) + END_TOKEN
    answer_ids = START_TOKEN + tokenizer.encode(answer_text) + END_TOKEN
    emotion_ids = tokenizer.encode(emotion_text)

    # Skip the whole triple as soon as either sentence is too long.
    if len(question_ids) > MAX_LENGTH or len(answer_ids) > MAX_LENGTH:
      continue

    kept_inputs.append(question_ids)
    kept_outputs.append(answer_ids)
    kept_context.append(emotion_ids)

  def pad(sequences):
    # Zero-pad at the end of each sequence up to MAX_LENGTH.
    return tf.keras.preprocessing.sequence.pad_sequences(
        sequences, maxlen=MAX_LENGTH, padding='post')

  return pad(kept_inputs), pad(kept_outputs), pad(kept_context)


questions, answers, context = tokenize_and_filter(questions, answers, context)

print(questions[:10], answers[:10], context[:10])

In [None]:
sample_text = "Hello, how are you?"
tokenized_text = tokenizer.encode(sample_text)
print(tokenized_text)

In [None]:
print('Vocab size: {}'.format(VOCAB_SIZE))
print('Number of samples: {}'.format(len(questions)))

### Bert

In [None]:

# BertTokenizer / BertModel are the classes used below, so they must be
# imported here; AutoTokenizer / AutoModel stay imported for the later cells.
from transformers import AutoModel, AutoTokenizer, BertModel, BertTokenizer

# NOTE(review): this rebinds `tokenizer`, which until now referred to the
# subword tokenizer that START_TOKEN, END_TOKEN and VOCAB_SIZE were derived
# from. Any later cell that still expects the subword tokenizer under this name
# must run before this one, or the binding should be renamed - confirm intent.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
bert_model = BertModel.from_pretrained("bert-base-uncased")



### Bert with swish

In [None]:
from transformers import BertConfig

# Start from the published bert-base-uncased configuration and switch the
# hidden activation function to swish.
config = BertConfig.from_pretrained('bert-base-uncased')
config.hidden_act = 'swish'  # Change the activation function to 'swish'

from transformers import BertModel

# NOTE(review): the model is built from the configuration object alone, not via
# from_pretrained; presumably this yields freshly initialised weights rather
# than the pretrained checkpoint - confirm this is intended.
bert_model = BertModel(config)  # Create the model

### Mini Bert

In [None]:
# Mini BERT: load the tokenizer and model published under the
# "google/bert_uncased_L-4_H-512_A-8" checkpoint name.
from transformers import AutoTokenizer, AutoModel

mini_bert_tokenizer = AutoTokenizer.from_pretrained("google/bert_uncased_L-4_H-512_A-8")
mini_bert_model = AutoModel.from_pretrained("google/bert_uncased_L-4_H-512_A-8")


### Mini Bert using swish

In [None]:
from transformers import BertConfig

# Same checkpoint configuration as the Mini BERT cell, with the hidden
# activation switched to swish.
config = BertConfig.from_pretrained("google/bert_uncased_L-4_H-512_A-8")
config.hidden_act = 'swish'  # Change the activation function to 'swish'

from transformers import BertModel

# This rebinds `bert_model` (and `config`), replacing whatever an earlier cell
# stored under those names.
# NOTE(review): built from the configuration only, so presumably the weights
# are freshly initialised rather than pretrained - confirm.
bert_model = BertModel(config)  # Create the model

### Create `tf.data.Dataset`

In [None]:
BATCH_SIZE = 64
BUFFER_SIZE = 20000

# Keep `answers` as the padded 2-D id matrix returned by tokenize_and_filter.
# Flattening it to a single column would make the shifted decoder-input /
# target slices below impossible and would no longer match loss_function,
# which reshapes the targets to MAX_LENGTH - 1 columns.
answers = np.asarray(answers)

questions = tf.convert_to_tensor(questions, dtype=tf.float32)
context = tf.convert_to_tensor(context, dtype=tf.float32)
answers = tf.convert_to_tensor(answers, dtype=tf.int32)

# Defensive alignment: truncate all three tensors to the same sample count.
min_len = min(questions.shape[0], answers.shape[0], context.shape[0])
questions = questions[:min_len]
answers = answers[:min_len]
context = context[:min_len]

# Teacher forcing: the decoder sees the answer without its last id and is
# trained to predict the answer without its first id (the START_TOKEN).
dataset = tf.data.Dataset.from_tensor_slices((
    {
        'input1': questions,
        'input2': context,
        'dec_inputs': answers[:, :-1]
    },
    {
        # Already int32 because `answers` was converted with dtype=tf.int32.
        'outputs': answers[:, 1:]
    },
))

dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
len(questions)

In [None]:
print(dataset)

## Attention



In [None]:
def scaled_dot_product_attention(query, key, value, mask):
  """Return softmax(Q K^T / sqrt(d_k) + mask * -1e9) V.

  `d_k` is the size of the last axis of `key`. When `mask` is not None,
  positions where it is 1 receive a large negative logit so that the softmax
  (taken over the last axis) gives them almost no weight.
  """
  key_depth = tf.cast(tf.shape(key)[-1], tf.float32)
  scores = tf.matmul(query, key, transpose_b=True) / tf.math.sqrt(key_depth)

  if mask is not None:
    scores += (mask * -1e9)

  weights = tf.nn.softmax(scores, axis=-1)
  return tf.matmul(weights, value)

In [None]:
def scaled_dot_product_attention(query, key, value, mask):
    """Compute scaled dot-product attention and return the weighted values.

    The query/key products are divided by the square root of the size of the
    last axis of `key`. When `mask` is not None, ``mask * -1e9`` (cast to the
    dtype of the logits) is added before the softmax over the last axis.
    """
    scale = tf.math.sqrt(tf.cast(tf.shape(key)[-1], tf.float32))
    scores = tf.matmul(query, key, transpose_b=True) / scale

    if mask is not None:
        # Cast the constant so the addition never mixes dtypes.
        large_negative = tf.cast(-1e9, scores.dtype)
        scores += (mask * large_negative)

    weights = tf.nn.softmax(scores, axis=-1)
    return tf.matmul(weights, value)

### Multi-head attention


In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
  """Multi-head attention layer fed with a dict of tensors.

  `call` expects a mapping with the keys 'query', 'key', 'value' and 'mask'.
  Query, key and value are each projected to `d_model` units, split into
  `num_heads` heads of size `d_model // num_heads`, attended with
  scaled_dot_product_attention, merged back and passed through a final dense
  layer. `d_model` must be divisible by `num_heads`.
  """

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super().__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    # Width of a single attention head.
    self.depth = d_model // self.num_heads

    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    self.dense = tf.keras.layers.Dense(units=d_model)

  def split_heads(self, inputs, batch_size):
    """Reshape (batch, seq, d_model) to (batch, num_heads, seq, depth)."""
    per_head = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(per_head, perm=[0, 2, 1, 3])

  def call(self, inputs):
    mask = inputs['mask']
    batch_size = tf.shape(inputs['query'])[0]

    # Linear projections followed by the per-head split.
    query = self.split_heads(self.query_dense(inputs['query']), batch_size)
    key = self.split_heads(self.key_dense(inputs['key']), batch_size)
    value = self.split_heads(self.value_dense(inputs['value']), batch_size)

    attended = scaled_dot_product_attention(query, key, value, mask)

    # Move the head axis back next to depth and merge the two axes.
    attended = tf.transpose(attended, perm=[0, 2, 1, 3])
    merged = tf.reshape(attended, (batch_size, -1, self.d_model))

    return self.dense(merged)

## Transformer

### Masking



In [None]:
def create_padding_mask(x):
  """Return a float mask that is 1.0 wherever `x` equals 0.

  Two singleton axes are inserted after the first axis, so a
  (batch_size, sequence length) input yields (batch_size, 1, 1, sequence length).
  """
  is_padding = tf.math.equal(x, 0)
  as_float = tf.cast(is_padding, tf.float32)
  return as_float[:, tf.newaxis, tf.newaxis, :]

In [None]:
def create_padding_mask(x):
    """
    Build the padding mask for `x`.

    Args:
        x: A tensor of input sequences in which the value 0 marks padding.

    Returns:
        A float32 tensor that is 1.0 at padded positions and 0.0 elsewhere,
        with two singleton axes inserted after the first axis:
        (batch_size, 1, 1, seq_len).
    """
    is_padding = tf.math.equal(x, 0)
    mask = tf.cast(is_padding, dtype=tf.float32)
    mask = tf.expand_dims(mask, axis=1)
    return tf.expand_dims(mask, axis=1)  # (batch_size, 1, 1, seq_len)


In [None]:
print(create_padding_mask(tf.constant([[1, 2, 0, 3, 0], [0, 0, 0, 4, 5]])))

The look-ahead mask hides the future tokens in a sequence.
Pad tokens are masked out as well.

For example, to predict the third word, only the first and second words are used.

In [None]:
def create_look_ahead_mask(x):
  """Combine a future-token mask with the padding mask of `x`.

  The future mask is 1 strictly above the diagonal of a (seq_len, seq_len)
  matrix; the element-wise maximum with create_padding_mask(x) also marks
  padded positions.
  """
  seq_len = tf.shape(x)[1]
  lower_triangle = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
  future_mask = 1 - lower_triangle
  return tf.maximum(future_mask, create_padding_mask(x))

In [None]:
def create_look_ahead_mask(x):
    """
    Build the decoder self-attention mask: future positions plus padding.

    Args:
        x: The input sequence with shape (batch_size, seq_len).

    Returns:
        A Tensor with shape (batch_size, 1, seq_len, seq_len) that is 1.0
        where attention must be blocked (a later position or a padded one).
    """
    seq_len = tf.shape(x)[1]

    # 1 strictly above the diagonal, 0 on and below it.
    allowed = tf.linalg.band_part(
        tf.ones((seq_len, seq_len), dtype=tf.float32), -1, 0)
    future_mask = 1 - allowed
    # Two leading singleton axes: batch and head.
    future_mask = tf.expand_dims(tf.expand_dims(future_mask, axis=0), axis=0)

    # (batch_size, 1, 1, seq_len), broadcast against the future mask.
    pad_mask = tf.cast(create_padding_mask(x), dtype=tf.float32)

    return tf.maximum(future_mask, pad_mask)


In [None]:
print(create_look_ahead_mask(tf.constant([[2, 8, 0, 4, 5]])))

### Positional encoding


In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
  """Adds a fixed sinusoidal position table to its input.

  The table is computed once for `position` rows and `d_model` columns. The
  sine half and the cosine half are concatenated along the last axis (they
  are not interleaved).
  """

  def __init__(self, position, d_model):
    super().__init__()
    self.pos_encoding = self.positional_encoding(position, d_model)

  def get_angles(self, position, i, d_model):
    """Return position / 10000^(2 * (i // 2) / d_model)."""
    exponent = (2 * (i // 2)) / tf.cast(d_model, tf.float32)
    inverse_frequency = 1 / tf.pow(10000, exponent)
    return position * inverse_frequency

  def positional_encoding(self, position, d_model):
    """Build the (1, position, d_model) float32 encoding table."""
    row_positions = tf.range(position, dtype=tf.float32)[:, tf.newaxis]
    column_indices = tf.range(d_model, dtype=tf.float32)[tf.newaxis, :]
    angle_rads = self.get_angles(
        position=row_positions, i=column_indices, d_model=d_model)

    # Sine of the even columns, cosine of the odd columns.
    sines = tf.math.sin(angle_rads[:, 0::2])
    cosines = tf.math.cos(angle_rads[:, 1::2])

    table = tf.concat([sines, cosines], axis=-1)
    return tf.cast(table[tf.newaxis, ...], tf.float32)

  def call(self, inputs):
      # Densify sparse input before the addition.
      dense_inputs = (
          tf.sparse.to_dense(inputs)
          if isinstance(inputs, tf.SparseTensor) else inputs)

      # Only the first seq_len rows of the table are needed.
      seq_len = tf.shape(dense_inputs)[1]
      return dense_inputs + self.pos_encoding[:, :seq_len, :]

In [None]:
sample_pos_encoding = PositionalEncoding(50, 512)

plt.pcolormesh(sample_pos_encoding.pos_encoding.numpy()[0], cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()

### Encoder Layer


In [None]:
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
  """Build one encoder block as a Keras functional model.

  The block is self-attention (query, key and value are all the block input),
  dropout, a residual connection with layer normalization, then a two-layer
  feed-forward network (`units` ReLU units, then `d_model` units), dropout and
  a second residual connection with layer normalization.

  Args:
    units: Width of the hidden feed-forward layer.
    d_model: Size of the last axis of the block input and output.
    num_heads: Number of attention heads handed to MultiHeadAttention.
    dropout: Rate used by both Dropout layers.
    name: Name of the returned model.

  Returns:
    A tf.keras.Model taking [inputs, padding_mask] and returning the block
    output.
  """
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # MultiHeadAttention is called with a single dict and a `name` keyword, so
  # whichever class is bound to that name must support both.
  attention = MultiHeadAttention(
      d_model, num_heads, name="attention")({
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': padding_mask
      })
  attention = tf.keras.layers.Dropout(rate=dropout)(attention)
  # Residual connection around the attention sub-layer.
  attention = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(inputs + attention)

  # Position-wise feed-forward network.
  outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  # Residual connection around the feed-forward sub-layer.
  outputs = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention + outputs)

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)

In [None]:
sample_encoder_layer = encoder_layer(
    units=512,
    d_model=128,
    num_heads=4,
    dropout=0.3,
    name="sample_encoder_layer")

tf.keras.utils.plot_model(
    sample_encoder_layer, to_file='encoder_layer.png', show_shapes=True)

### Encoder

In [None]:
def encoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            name="encoder"):
    """Build the two-input encoder as a Keras functional model.

    Each input is embedded with its own Embedding layer and scaled by
    sqrt(d_model); the two embeddings are added element-wise, layer-normalized,
    given positional encodings and dropout, and then passed through
    `num_layers` encoder_layer blocks that all share the same padding mask.

    Args:
        vocab_size: Embedding vocabulary size; also used as the number of
            positions in the positional-encoding table.
        num_layers: Number of stacked encoder_layer blocks.
        units: Feed-forward width inside each block.
        d_model: Embedding / model width.
        num_heads: Attention heads per block.
        dropout: Dropout rate.
        name: Name of the returned model.

    Returns:
        A tf.keras.Model taking [input1, input2, padding_mask].
    """

    input1 = tf.keras.Input(shape=(None,), name="input1")
    input2 = tf.keras.Input(shape=(None,), name="input2")  # Assuming input2 is needed
    padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

    # Embedding layers for input1 and input2
    embedding1 = tf.keras.layers.Embedding(vocab_size, d_model)(input1)
    embedding1 *= tf.math.sqrt(tf.cast(d_model, tf.float32))  # Scale embeddings

    embedding2 = tf.keras.layers.Embedding(vocab_size, d_model)(input2)
    embedding2 *= tf.math.sqrt(tf.cast(d_model, tf.float32))  # Scale embeddings

    # Combine the embeddings (assuming both inputs are needed).
    # The element-wise sum requires both embedded inputs to have the same
    # sequence length.
    embeddings = embedding1 + embedding2
    embeddings = tf.keras.layers.LayerNormalization(epsilon=1e-6)(embeddings)

    # Positional Encoding (table sized by vocab_size, sliced to the sequence
    # length inside the layer)
    embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)

    # Dropout
    outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

    # Multiple encoder layers
    for i in range(num_layers):
        outputs = encoder_layer(
            units=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout,
            name=f"encoder_layer_{i}",
        )([outputs, padding_mask])

    # Return the model
    return tf.keras.Model(
        inputs=[input1, input2, padding_mask], outputs=outputs, name=name)


In [None]:
sample_encoder = encoder(
    vocab_size=8192,
    num_layers=2,
    units=512,
    d_model=128,
    num_heads=4,
    dropout=0.3,
    name="sample_encoder")

tf.keras.utils.plot_model(
   sample_encoder, to_file='encoder.png', show_shapes=True)

### Decoder Layer


In [None]:
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
  """Build one decoder block as a Keras functional model.

  The block applies masked self-attention over the decoder input (with the
  look-ahead mask), then attention whose query is the first sub-layer output
  and whose key/value are the encoder outputs (with the padding mask), then a
  two-layer feed-forward network. Every sub-layer is followed by a residual
  connection and layer normalization.

  Args:
    units: Width of the hidden feed-forward layer.
    d_model: Size of the last axis of the block input and output.
    num_heads: Number of attention heads handed to MultiHeadAttention.
    dropout: Rate used by the Dropout layers.
    name: Name of the returned model.

  Returns:
    A tf.keras.Model taking
    [inputs, encoder_outputs, look_ahead_mask, padding_mask].
  """
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
  look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  # Masked self-attention. The dict is passed through the `inputs=` keyword,
  # so the attention layer must accept its first call argument under that name.
  attention1 = MultiHeadAttention(
      d_model, num_heads, name="attention_1")(inputs={
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': look_ahead_mask
      })
  # No dropout is applied to this first sub-layer before the residual sum.
  attention1 = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention1 + inputs)

  # Attention over the encoder outputs.
  attention2 = MultiHeadAttention(
      d_model, num_heads, name="attention_2")(inputs={
          'query': attention1,
          'key': enc_outputs,
          'value': enc_outputs,
          'mask': padding_mask
      })
  attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
  attention2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention2 + attention1)

  # Position-wise feed-forward network.
  outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention2)

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)

In [None]:
sample_decoder_layer = decoder_layer(
    units=512,
    d_model=128,
    num_heads=4,
    dropout=0.3,
    name="sample_decoder_layer")

tf.keras.utils.plot_model(
    sample_decoder_layer, to_file='decoder_layer.png', show_shapes=True)

### Decoder


In [None]:
def decoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            name='decoder'):
  """Build the decoder stack as a Keras functional model.

  Decoder token ids are embedded, scaled by sqrt(d_model), given positional
  encodings and dropout, and then passed through `num_layers` decoder_layer
  blocks together with the encoder outputs and both masks.

  Args:
    vocab_size: Embedding vocabulary size; also used as the number of
      positions in the positional-encoding table.
    num_layers: Number of stacked decoder_layer blocks.
    units: Feed-forward width inside each block.
    d_model: Embedding / model width.
    num_heads: Attention heads per block.
    dropout: Dropout rate.
    name: Name of the returned model.

  Returns:
    A tf.keras.Model taking
    [inputs, encoder_outputs, look_ahead_mask, padding_mask].
  """
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name='look_ahead_mask')
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  # Densify first if the embedding output is a SparseTensor, then add the
  # positional encodings.
  embeddings = tf.sparse.to_dense(embeddings) if isinstance(embeddings, tf.SparseTensor) else embeddings
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)

  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  for i in range(num_layers):
    outputs = decoder_layer(
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
        name='decoder_layer_{}'.format(i),
    )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)

In [None]:
sample_decoder = decoder(
    vocab_size=8192,
    num_layers=2,
    units=512,
    d_model=128,
    num_heads=4,
    dropout=0.3,
    name="sample_decoder")

tf.keras.utils.plot_model(
    sample_decoder, to_file='decoder.png', show_shapes=True)

### Transformer


In [None]:
def transformer(vocab_size,
                num_layers,
                units,
                d_model,
                num_heads,
                dropout,
                name="transformer"):
  """Assemble the full encoder-decoder model.

  The model takes three inputs named "input1", "input2" and "dec_inputs".
  Both padding masks are derived from `input1` only; the look-ahead mask is
  derived from `dec_inputs`. The decoder output is projected to `vocab_size`
  units by a final Dense layer named "outputs" (no activation is applied, so
  the model emits logits).

  Args:
    vocab_size: Vocabulary size for the embeddings and the output projection.
    num_layers: Number of encoder blocks and of decoder blocks.
    units: Feed-forward width inside each block.
    d_model: Model width.
    num_heads: Attention heads per block.
    dropout: Dropout rate.
    name: Name of the returned model.

  Returns:
    A tf.keras.Model taking [input1, input2, dec_inputs].
  """
  input1 = tf.keras.Input(shape=(None,), name="input1")
  input2 = tf.keras.Input(shape=(None,), name="input2")
  dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

  # padding mask for the encoder self-attention, computed from input1 only
  enc_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='enc_padding_mask')(input1)
  # mask the future tokens for decoder inputs at the 1st attention block
  look_ahead_mask = tf.keras.layers.Lambda(
      create_look_ahead_mask,
      output_shape=(1, None, None),
      name='look_ahead_mask')(dec_inputs)
  # mask the encoder outputs for the 2nd attention block
  dec_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='dec_padding_mask')(input1)

  enc_outputs = encoder(
      vocab_size=vocab_size,
      num_layers=num_layers,
      units=units,
      d_model=d_model,
      num_heads=num_heads,
      dropout=dropout,
  )(inputs=[input1, input2, enc_padding_mask])

  dec_outputs = decoder(
      vocab_size=vocab_size,
      num_layers=num_layers,
      units=units,
      d_model=d_model,
      num_heads=num_heads,
      dropout=dropout,
  )(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

  outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)

  return tf.keras.Model(inputs=[input1, input2, dec_inputs], outputs=outputs, name=name)

### Transformer using 'swish'

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention whose four dense projections use swish.

    Defining this class rebinds the name `MultiHeadAttention`, so the
    encoder_layer / decoder_layer builders pick it up from then on. Those
    builders construct the layer with a `name=` keyword and call it with a
    single dict ({'query', 'key', 'value', 'mask'}), expecting one tensor
    back. Both calling conventions are therefore supported:

    * ``layer({'query': q, 'key': k, 'value': v, 'mask': m})`` (also via the
      ``inputs=`` keyword) returns the attention output only.
    * ``layer(q, k, v, m)`` returns ``(output, attention_weights)``.

    Args:
        d_model: Model width; must be divisible by `num_heads`.
        num_heads: Number of attention heads.
        name: Optional layer name. ``None`` lets Keras generate a unique one.
    """

    def __init__(self, d_model, num_heads, name=None):
        super().__init__(name=name)
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0
        self.depth = d_model // num_heads
        self.query_dense = tf.keras.layers.Dense(d_model, activation='swish')
        self.key_dense = tf.keras.layers.Dense(d_model, activation='swish')
        self.value_dense = tf.keras.layers.Dense(d_model, activation='swish')
        self.final_dense = tf.keras.layers.Dense(d_model, activation='swish')

    def __call__(self, *args, **kwargs):
        # decoder_layer passes the dict as `inputs=...`, while the first
        # parameter of `call` is named `query`. Move that keyword into the
        # first positional slot so both spellings reach `call`.
        if not args and 'inputs' in kwargs:
            kwargs = dict(kwargs)
            args = (kwargs.pop('inputs'),)
        return super().__call__(*args, **kwargs)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) to (batch, num_heads, seq, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def scaled_dot_product_attention(self, query, key, value, mask):
        """Return (softmax(Q K^T / sqrt(d_k) + mask * -1e9) V, weights)."""
        matmul_qk = tf.matmul(query, key, transpose_b=True)
        depth = tf.cast(tf.shape(key)[-1], tf.float32)
        logits = matmul_qk / tf.math.sqrt(depth)
        if mask is not None:
            logits += (mask * -1e9)
        attention_weights = tf.nn.softmax(logits, axis=-1)
        return tf.matmul(attention_weights, value), attention_weights

    def call(self, query, key=None, value=None, mask=None):
        """Run multi-head attention.

        `query` is either the query tensor (with `key`, `value` and `mask`
        given separately) or a dict carrying all four under the keys
        'query', 'key', 'value' and 'mask'.
        """
        packed = isinstance(query, dict)
        if packed:
            # Dict convention: everything, including the mask, comes from it.
            bundle = query
            query, key, value, mask = (
                bundle['query'], bundle['key'], bundle['value'], bundle['mask'])

        batch_size = tf.shape(query)[0]
        query = self.split_heads(self.query_dense(query), batch_size)
        key = self.split_heads(self.key_dense(key), batch_size)
        value = self.split_heads(self.value_dense(value), batch_size)
        attention, weights = self.scaled_dot_product_attention(query, key, value, mask)
        # Bring the head axis back next to depth and merge the two.
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention, (batch_size, -1, self.d_model))
        outputs = self.final_dense(concat_attention)

        return outputs if packed else (outputs, weights)


In [None]:
sample_transformer = transformer(
    vocab_size=8192,
    num_layers=4,
    units=512,
    d_model=128,
    num_heads=4,
    dropout=0.3,
    name="sample_transformer")

tf.keras.utils.plot_model(
    sample_transformer, to_file='transformer.png', show_shapes=True)

## Train model

### Initialize model


In [None]:
tf.keras.backend.clear_session()

# Hyper-parameters
NUM_LAYERS = 2
D_MODEL = 256
NUM_HEADS = 8
UNITS = 512
DROPOUT = 0.1

model = transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT)

Dynamic learning rate:

Dynamically adjusts the learning rate during training to help the model converge efficiently.

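* initial_learning_rate=0.05: The starting learning rate (the value set in the code cell below).
* decay_steps=10000: The number of training steps over which the learning rate is multiplied by `decay_rate` once.
* decay_rate=0.96: The decay factor. At step `s` the learning rate is `initial_learning_rate * decay_rate ** (s / decay_steps)`, so it decreases smoothly rather than in discrete jumps.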

In [None]:
initial_learning_rate=0.05
decay_steps=10000
decay_rate=0.96

learning_rate = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate, decay_steps, decay_rate)
#learning_rate = 0.005

Loss Tracking

In [None]:
class LossTracker(tf.keras.callbacks.Callback):
    """Callback that records the "loss" entry of `logs` per batch and per epoch.

    Attributes set by the constructor:
        losses_per_batch: one entry appended by every on_batch_end call.
        losses_per_epoch: one entry appended by every on_epoch_end call.
    An entry is None when `logs` is missing or has no "loss" key.
    """

    def __init__(self):
        super().__init__()
        self.losses_per_batch = []
        self.losses_per_epoch = []

    def on_batch_end(self, batch, logs=None):
        # `logs` defaults to None, so guard before reading from it.
        self.losses_per_batch.append((logs or {}).get("loss"))

    def on_epoch_end(self, epoch, logs=None):
        self.losses_per_epoch.append((logs or {}).get("loss"))

    @staticmethod
    def _plot_series(values, title, xlabel, **plot_kwargs):
        """Draw one labelled line chart of `values` in its own figure."""
        plt.figure(figsize=(12, 6))
        plt.plot(values, label=title, **plot_kwargs)
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel("Loss")
        plt.legend()
        plt.grid()
        plt.show()

    def plot_losses(self):
        """Show two figures: loss per batch, then loss per epoch."""
        self._plot_series(self.losses_per_batch, "Loss per Batch", "Batch")
        self._plot_series(
            self.losses_per_epoch, "Loss per Epoch", "Epoch", marker="o")


Accuracy Tracker

In [None]:
class AccuracyTracker(tf.keras.callbacks.Callback):
    """Callback that records the accuracy found in `logs` per batch and epoch.

    The value is read from the "outputs_accuracy" key and, when that key is
    absent, from "accuracy". An entry is None when neither key is present or
    `logs` is missing.

    Attributes set by the constructor:
        accuracy_per_batch: one entry appended by every on_batch_end call.
        accuracy_per_epoch: one entry appended by every on_epoch_end call.
    """

    def __init__(self):
        super().__init__()
        self.accuracy_per_batch = []
        self.accuracy_per_epoch = []

    @staticmethod
    def _read_accuracy(logs):
        """Return the accuracy entry of `logs`, or None if there is none."""
        logs = logs or {}
        # NOTE(review): which key is present depends on how the model is
        # compiled, which is not visible here; the plain "accuracy" key is
        # accepted as a fallback so the tracker does not silently store None.
        return logs.get("outputs_accuracy", logs.get("accuracy"))

    def on_batch_end(self, batch, logs=None):
        self.accuracy_per_batch.append(self._read_accuracy(logs))

    def on_epoch_end(self, epoch, logs=None):
        accuracy = self._read_accuracy(logs)
        print(f"Epoch {epoch + 1}: outputs_accuracy = {accuracy}")  # Debug log
        self.accuracy_per_epoch.append(accuracy)

    @staticmethod
    def _plot_series(values, title, xlabel, **plot_kwargs):
        """Draw one labelled line chart of `values` in its own figure."""
        plt.figure(figsize=(12, 6))
        plt.plot(values, label=title, **plot_kwargs)
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel("Accuracy")
        plt.legend()
        plt.grid()
        plt.show()

    def plot_accuracy(self):
        """Show two figures: accuracy per batch, then accuracy per epoch."""
        self._plot_series(
            self.accuracy_per_batch, "Accuracy per Batch", "Batch")
        self._plot_series(
            self.accuracy_per_epoch, "Accuracy per Epoch", "Epoch", marker="o")


### Loss function


In [None]:
def loss_function(y_true, y_pred):
  """Sparse categorical cross-entropy on logits with padded targets zeroed.

  `y_true` is reshaped to (-1, MAX_LENGTH - 1). The per-token loss is
  multiplied by a mask that is 0 where the target id is 0, and the mean over
  all positions (masked ones included in the count) is returned.
  """
  targets = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))

  cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')
  per_token_loss = cross_entropy(targets, y_pred)

  # 1.0 for real tokens, 0.0 for padding.
  not_padding = tf.cast(tf.not_equal(targets, 0), tf.float32)

  return tf.reduce_mean(tf.multiply(per_token_loss, not_padding))

### dynamic loss function

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt

# Define the dynamic loss scaler class
class DynamicLossScaler:
    """Exponentially decaying loss weight.

    The weight for a given epoch is ``initial_weight * decay_rate ** epoch``.
    """

    def __init__(self, initial_weight=1.0, decay_rate=0.9):
        self.initial_weight, self.decay_rate = initial_weight, decay_rate

    def get_scaling_factor(self, epoch):
        """Return the loss weight that applies at `epoch`."""
        decay = self.decay_rate ** epoch
        return self.initial_weight * decay

# Instantiate the dynamic scaler
initial_weight = 0.7
decay_rate = 0.95
dynamic_loss_scaler = DynamicLossScaler(initial_weight, decay_rate)

# Precompute the number of batches in the dataset
batches_per_epoch = tf.data.experimental.cardinality(dataset).numpy()

# Define the dynamic loss function
def dynamic_loss_function(y_true, y_pred, epoch):
    """Masked mean loss multiplied by the scaling factor for `epoch`.

    Positions whose target id is 0 are excluded: the per-position loss is
    multiplied by the mask and the sum is divided by the number of unmasked
    positions. If every position is masked the result is 0 instead of NaN.

    Args:
        y_true: Target ids; 0 marks padding.
        y_pred: Model predictions handed to `loss_object`.
        epoch: Epoch index (number or tensor) passed to the loss scaler.
    """
    scaling_factor = dynamic_loss_scaler.get_scaling_factor(epoch)
    mask = tf.cast(tf.math.not_equal(y_true, 0), tf.float32)
    # NOTE(review): `loss_object` is not defined in the part of the notebook
    # shown here. Multiplying its result by the mask presumably requires a
    # per-position (unreduced) loss - confirm where it is defined.
    loss = loss_object(y_true, y_pred) * mask
    # divide_no_nan returns 0 when the mask is empty, avoiding a 0/0 NaN that
    # would otherwise poison the gradients for an all-padding batch.
    mean_loss = tf.math.divide_no_nan(tf.reduce_sum(loss), tf.reduce_sum(mask))
    scaled_loss = scaling_factor * mean_loss
    return scaled_loss

# Define the wrapper for dynamic loss to use in training
def loss_function_with_dynamic_scaling(y_true, y_pred):
    """Two-argument loss that derives the current epoch from the optimizer.

    The epoch is computed with tensor operations from the optimizer's step
    counter. Reading the counter into a Python number instead would freeze it
    at whatever value it had when the training step was traced into a graph,
    so the scaling factor would never change during training.
    """
    step = tf.cast(model.optimizer.iterations, tf.int64)
    steps_per_epoch = tf.cast(batches_per_epoch, tf.int64)
    # Cast to float so the scaler can raise its float decay rate to this power.
    epoch = tf.cast(step // steps_per_epoch, tf.float32)
    return dynamic_loss_function(y_true, y_pred, epoch)

# Define a callback to log the scaling factor during training
class DynamicLossUpdater(tf.keras.callbacks.Callback):
    """Callback that remembers the current epoch index.

    At the start of every epoch the index is stored in `self.epoch` and the
    scaler is asked for the matching factor; the factor is not used further.
    """

    def __init__(self, scaler, initial_epoch=0):
        super().__init__()
        self.scaler, self.epoch = scaler, initial_epoch

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch = epoch
        # Evaluated for parity with the (disabled) debug print; result unused.
        _ = self.scaler.get_scaling_factor(epoch)

# Define the dynamic loss updater callback
dynamic_loss_updater = DynamicLossUpdater(dynamic_loss_scaler)


### validation loss based adjustment

In [None]:
class ValidationLossDynamicDropout(tf.keras.callbacks.Callback):
    """Lower dropout rates when the validation loss stops improving.

    After `patience` consecutive epochs without a new best `val_loss`, the
    `rate` of every layer that exposes one is multiplied by ``1 - factor``,
    never going below `final_dropout`. Layers nested inside sub-models are
    included.

    Args:
        initial_dropout: Stored on the instance; not used by the adjustment.
        final_dropout: Lower bound for the adjusted rates.
        patience: Epochs without improvement before a reduction.
        factor: Relative reduction applied to each rate.
    """

    def __init__(self, initial_dropout, final_dropout, patience=5, factor=0.1):
        super().__init__()
        self.initial_dropout = initial_dropout
        self.final_dropout = final_dropout
        self.patience = patience
        self.factor = factor
        self.best_val_loss = np.inf
        self.wait = 0
        # One entry per adjusted layer, appended on every reduction.
        self.scaled_dropouts = []

    def _iter_rate_layers(self, root, seen=None):
        """Yield each layer below `root` that has a `rate` attribute.

        Walks `.layers` recursively so that layers inside nested models are
        reached; `seen` prevents visiting a shared layer twice.
        """
        seen = set() if seen is None else seen
        for layer in getattr(root, "layers", []):
            if id(layer) in seen:
                continue
            seen.add(id(layer))
            if hasattr(layer, "rate"):
                yield layer
            yield from self._iter_rate_layers(layer, seen)

    def on_epoch_end(self, epoch, logs=None):
        val_loss = (logs or {}).get('val_loss')
        if val_loss is None:
            # No validation data this epoch: nothing to compare against.
            return

        if val_loss < self.best_val_loss:
            self.best_val_loss = val_loss
            self.wait = 0
        else:
            self.wait += 1

        if self.wait < self.patience:
            return

        # Reduce dropout if validation loss has not improved.
        # NOTE(review): assigning `layer.rate` presumably has no effect on a
        # training function that was already traced - confirm the new rate is
        # actually picked up.
        new_dropout = None
        for layer in self._iter_rate_layers(self.model):
            new_dropout = max(self.final_dropout, layer.rate * (1 - self.factor))
            layer.rate = new_dropout
            self.scaled_dropouts.append(new_dropout)

        if new_dropout is None:
            print(f"Epoch {epoch + 1}: No layer with a dropout rate found; nothing adjusted")
        else:
            print(f"Epoch {epoch + 1}: Reduced Dropout Rate to {new_dropout:.4f}")
        self.wait = 0

    def plot_scaled_dropouts(self):
        """Plot the recorded dropout rates in the order they were applied."""
        plt.plot(range(1, len(self.scaled_dropouts) + 1), self.scaled_dropouts, label="Validation Loss-Based")
        plt.xlabel("Epoch")
        plt.ylabel("Dropout Rate")
        plt.title("Dropout Rate Adjustments Based on Validation Loss")
        plt.legend()
        plt.show()


### Dynamic Dropout

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt


class DynamicDropoutUpdater(tf.keras.callbacks.Callback):
    """Keras callback that sets dropout rates from a per-epoch schedule.

    At the start of every epoch the rate returned by `calculate_dropout` is
    written to the `rate` attribute of each layer in `self.model.layers` that
    has one, appended to `scaled_dropouts`, and printed.

    Args:
        strategy: One of "linear", "exponential", "sinusoidal" or "cosine".
        initial_dropout: Rate the schedule is anchored to at epoch 0 (the
            "sinusoidal" strategy starts at the midpoint instead).
        final_dropout: Rate the schedule moves towards over `total_epochs`.
        total_epochs: Horizon of the schedule; must be positive.

    Raises:
        ValueError: If `total_epochs` is not positive.

    NOTE(review): only `self.model.layers` is scanned, so dropout layers nested
    inside sub-models or custom layers are presumably not reached - confirm
    against how the model is built.
    NOTE(review): whether changing `layer.rate` affects a training step that
    has already been traced depends on the TensorFlow version - verify, e.g.
    by compiling with `run_eagerly=True`.
    """

    def __init__(self, strategy, initial_dropout, final_dropout, total_epochs):
        super().__init__()
        if total_epochs <= 0:
            raise ValueError(f"total_epochs must be positive, got {total_epochs}")
        self.strategy = strategy
        self.initial_dropout = initial_dropout
        self.final_dropout = final_dropout
        self.total_epochs = total_epochs
        self.scaled_dropouts = []

    def calculate_dropout(self, epoch):
        """Return the dropout rate for `epoch` as a Python float.

        Raises:
            ValueError: For an unknown strategy, or for the "exponential"
                strategy when either endpoint is not strictly positive
                (the curve is defined through `log(final / initial)`).
        """
        start = self.initial_dropout
        end = self.final_dropout

        if self.strategy == "linear":
            rate = start + (end - start) * (epoch / self.total_epochs)
        elif self.strategy == "exponential":
            if start <= 0 or end <= 0:
                raise ValueError(
                    "Exponential strategy requires positive initial and final dropout rates, "
                    f"got initial_dropout={start}, final_dropout={end}"
                )
            alpha = np.log(end / start) / self.total_epochs
            rate = start * np.exp(alpha * epoch)
            # Order the bounds explicitly so increasing schedules
            # (final > initial) are clipped correctly as well.
            rate = np.clip(rate, min(start, end), max(start, end))
        elif self.strategy == "sinusoidal":
            # Oscillates around the midpoint, staying within [final, initial].
            midpoint = (start + end) / 2
            amplitude = (start - end) / 2
            rate = midpoint + amplitude * np.sin(2 * np.pi * epoch / self.total_epochs)
        elif self.strategy == "cosine":
            # Half-cosine from `initial_dropout` towards `final_dropout`.
            rate = end + (start - end) * (
                0.5 * (1 + np.cos(np.pi * epoch / self.total_epochs))
            )
        else:
            raise ValueError(f"Unsupported strategy: {self.strategy}")

        # Normalise NumPy scalars so every strategy yields the same type.
        return float(rate)

    def on_epoch_begin(self, epoch, logs=None):
        """Adjust the dropout rate dynamically at the beginning of each epoch."""
        new_dropout = self.calculate_dropout(epoch)
        for layer in self.model.layers:
            if hasattr(layer, "rate"):  # duck-typed check for dropout-like layers
                layer.rate = new_dropout
        self.scaled_dropouts.append(new_dropout)
        print(f"Epoch {epoch + 1}: Dropout Rate = {new_dropout:.4f}")

    def plot_scaled_dropouts(self):
        """Plot the dropout rates recorded so far, one point per epoch."""
        plt.plot(range(1, len(self.scaled_dropouts) + 1), self.scaled_dropouts, label=f"Strategy: {self.strategy}")
        plt.xlabel("Epoch")
        plt.ylabel("Dropout Rate")
        plt.title("Dynamic Dropout Scaling over Epochs")
        plt.legend()
        plt.show()


# Choose strategy: 'linear', 'exponential', 'sinusoidal', or 'cosine'
strategy = "cosine"

# Create a DynamicDropoutUpdater callback
# NOTE(review): `EPOCHS` must already be defined when this cell runs. The only
# assignment visible in this part of the notebook is in the later "Fit model"
# cell - confirm it is also set earlier, otherwise a fresh
# Restart Kernel -> Run All fails here with a NameError.
dynamic_dropout_updater = DynamicDropoutUpdater(
    strategy=strategy,
    initial_dropout=0.3,  # Start with 30% dropout
    final_dropout=0.1,    # Reduce to 10% dropout
    total_epochs=EPOCHS
)


### Custom learning rate


In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Learning-rate schedule with a warm-up ramp followed by rsqrt decay.

  The rate at a given step is

      rsqrt(d_model) * min(rsqrt(step), step * warmup_steps ** -1.5)

  Args:
    d_model: Model dimension used to scale the overall rate.
    warmup_steps: Step count at which the two branches of `min` meet.
  """

  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()

    # Keep the caller's value untouched so `get_config` can return something
    # serialisable; the float32 tensor is what the computation uses.
    self._d_model_config = d_model
    self.d_model = tf.cast(d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    """Return the learning rate for `step` (scalar or tensor of steps)."""
    # Optimizers may pass the step as an integer tensor; rsqrt needs floats.
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

  def get_config(self):
    """Return the constructor arguments so the schedule can be serialised."""
    return {
        "d_model": self._d_model_config,
        "warmup_steps": self.warmup_steps,
    }

In [None]:
# Preview the schedule for d_model=128 over the first 200,000 training steps.
sample_learning_rate = CustomSchedule(d_model=128)
schedule_steps = tf.range(200000, dtype=tf.float32)
schedule_values = sample_learning_rate(schedule_steps)

plt.plot(schedule_values)
plt.ylabel("Learning Rate")
plt.xlabel("Train Step")

### Compile Model


In [None]:
# Adam driven by the warm-up schedule defined above.
learning_rate = CustomSchedule(D_MODEL)

optimizer = tf.keras.optimizers.Adam(
    learning_rate=learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

def accuracy(y_true, y_pred):
  """Sparse categorical accuracy of `y_pred` against reshaped labels.

  `y_true` is reshaped to (-1, MAX_LENGTH - 1) before the comparison so the
  labels line up with the predictions.
  """
  labels = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  return tf.keras.metrics.sparse_categorical_accuracy(labels, y_pred)

# Initialize the trackers; both are passed to `model.fit` as callbacks in the "Fit model" cell
loss_tracker = LossTracker()
accuracy_tracker = AccuracyTracker()

# Earlier compile call, kept for reference (it passed the `accuracy` function defined above):
#model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])

# Compile the model with metrics for the primary output.
# Note: the string "accuracy" is handed to Keras here, not the local `accuracy` function.
model.compile(
    optimizer=optimizer,
    loss=loss_function_with_dynamic_scaling, # previously: loss=loss_function
    metrics={"outputs": ["accuracy"]}  # Metrics for the 'outputs' key only
)




### Fit model


In [None]:
EPOCHS = 50

# `dynamic_dropout_updater` captured `total_epochs` when it was constructed in
# an earlier cell. Re-sync it here so the dropout schedule always spans exactly
# the number of epochs trained, even if EPOCHS is edited and only this cell is
# re-run.
dynamic_dropout_updater.total_epochs = EPOCHS

model.fit(
    dataset,
    epochs=EPOCHS,
    callbacks=[loss_tracker, accuracy_tracker, dynamic_loss_updater, dynamic_dropout_updater],
)