In [None]:
!pip install transformers datasets
from transformers import AutoTokenizer, DataCollatorWithPadding
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import math
from dataclasses import dataclass
import time
import keras

# Definition

In [None]:
@keras.saving.register_keras_serializable()
class MultiHeadAttentionEinsum(layers.Layer):
  """Multi-head scaled dot-product attention implemented with `tf.einsum`.

  Args:
    d_k: width of each head's query/key/value projection.
    d_model: width of the output projection.
    n_heads: number of attention heads.
    cm_dim: side length of the pre-computed causal (lower-triangular) mask,
      or None to disable causal masking. Sequences longer than `cm_dim`
      are not supported when causal masking is enabled.
  """

  def __init__(self, d_k, d_model, n_heads, cm_dim):
    super().__init__()

    self.d_k = d_k
    self.n_heads = n_heads

    # All heads are projected at once and split apart in `call`.
    self.key = layers.Dense(units=self.d_k * n_heads)
    self.query = layers.Dense(units=self.d_k * n_heads)
    self.value = layers.Dense(units=self.d_k * n_heads)

    # Causal mask stored as booleans: True where query position t may
    # attend to key position s (s <= t). Shape (1, 1, cm_dim, cm_dim) so it
    # broadcasts over batch and heads.
    if cm_dim is not None:
      causal = np.tril(np.ones((cm_dim, cm_dim), dtype=bool))
      self.cm = tf.constant(causal.reshape(1, 1, cm_dim, cm_dim))
    else:
      self.cm = None

    # out projection
    self.fc = layers.Dense(units=d_model)

  def call(self, q, k, v, mask=None):
    """Applies attention.

    Args:
      q: query input, (N, T, features).
      k: key input, (N, S, features).
      v: value input, (N, S, features).
      mask: optional (N, S) padding mask; non-zero entries mark key
        positions that may be attended to. None disables padding masking.

    Returns:
      Tensor of shape (N, T, d_model).
    """
    queries = self.query(q)
    keys = self.key(k)
    values = self.value(v)

    N = tf.shape(queries)[0]
    T = tf.shape(queries)[1]
    # Keys/values carry their own length so cross-attention (S != T) works.
    S = tf.shape(keys)[1]

    queries = tf.reshape(queries, (N, T, self.n_heads, self.d_k))
    keys = tf.reshape(keys, (N, S, self.n_heads, self.d_k))
    values = tf.reshape(values, (N, S, self.n_heads, self.d_k))

    # (N, heads, T, S) scaled dot products.
    attn_scores = tf.einsum('nthd,nshd->nhts', queries, keys) / math.sqrt(self.d_k)

    # Masked positions get the most negative finite value instead of relying
    # on "score == 0" as a sentinel: a genuine zero score stays unmasked and
    # a fully masked row yields a uniform distribution rather than NaN.
    mask_value = tf.constant(attn_scores.dtype.min, dtype=attn_scores.dtype)

    if mask is not None:
      key_visible = tf.cast(mask, tf.bool)[:, None, None, :]  # (N, 1, 1, S)
      attn_scores = tf.where(key_visible, attn_scores, mask_value)

    if self.cm is not None:
      attn_scores = tf.where(self.cm[:, :, :T, :S], attn_scores, mask_value)

    attn_weights = tf.nn.softmax(attn_scores, axis=-1)

    # Each head's weights are applied to that same head's values. The head
    # index must be shared between both operands; otherwise the weights of
    # all heads are summed together before being applied.
    A = tf.einsum('nhts,nshd->nthd', attn_weights, values)
    A = tf.reshape(A, (N, T, self.n_heads * self.d_k))
    return self.fc(A)

In [None]:
@keras.saving.register_keras_serializable()
class PositionalEncoding(layers.Layer):
  """Adds fixed sinusoidal position encodings to the input, then applies dropout.

  Args:
    d_model: size of the last input dimension (even or odd).
    max_len: number of positions pre-computed; inputs longer than this
      are not supported.
    dropout_prob: dropout rate applied after adding the encodings.
  """

  def __init__(self, d_model, max_len=2048, dropout_prob=0.1):
    super().__init__()
    self.dropout = layers.Dropout(rate=dropout_prob)

    position = np.arange(max_len, dtype=np.float32)[:, None]

    # 1/10000^(2i/d_model), one frequency per (sin, cos) pair
    exp_term = np.arange(0, d_model, 2, dtype=np.float32)
    div_term = np.exp(exp_term * (-math.log(10000.0) / d_model))

    # Multiplication instead of division because the minus sign was moved
    # into the exponent of div_term.
    angles = position * div_term  # (max_len, ceil(d_model / 2))

    # pe is of shape (1, T, d_model)
    pe = np.zeros((1, max_len, d_model), dtype=np.float32)

    # Even feature indices take the sine, odd indices the cosine. For an odd
    # d_model there is one more even column than odd columns, so the cosine
    # side drops the last frequency.
    pe[0, :, 0::2] = np.sin(angles)
    pe[0, :, 1::2] = np.cos(angles[:, :d_model // 2])
    self.pe = tf.constant(pe)

  def call(self, x):
    """Returns `dropout(x + pe[:, :T])` for x of shape (N, T, d_model)."""
    x = x + self.pe[:, :tf.shape(x)[1], :]
    return self.dropout(x)

pos = 0...T-1 (one for every position in the sequence)

i = 0...d_model/2 - 1 (one for every sin/cos pair of dimensions)

$
pe(pos, 2i) = sin(\frac{pos}{10000^{\frac{2i}{d_{model}}}})
$

$
pe(pos, 2i+1) = cos(\frac{pos}{10000^{\frac{2i}{d_{model}}}})
$

Implementation:

$
e^{\frac{2i \cdot (-\log 10000)}{d_{model}}}
$ = $
(e^{-\log 10000})^{\frac{2i}{d_{model}}}
$ = $
10000^{\frac{-2i}{d_{model}}}
$

In [None]:
@keras.saving.register_keras_serializable()
class TransformerBlock(layers.Layer):
  """Self-attention followed by a feed-forward network.

  Each sub-layer is wrapped as `LayerNorm(x + sublayer(x))`; dropout is
  applied to the block output.

  Args:
    d_k: per-head projection width handed to the attention layer.
    d_model: width of the block's input and output.
    n_heads: number of attention heads.
    dropout_prob: dropout rate used inside the feed-forward network and on
      the block output.
    cm_dim: causal-mask size handed to the attention layer (None disables
      causal masking).
  """

  def __init__(self, d_k, d_model, n_heads, dropout_prob, cm_dim):
    super().__init__()
    self.ln1 = layers.LayerNormalization()
    self.ln2 = layers.LayerNormalization()

    self.mha = MultiHeadAttentionEinsum(d_k, d_model, n_heads, cm_dim)

    # NOTE(review): the hidden width is 4 * d_k, not 4 * d_model - confirm
    # this is intended before changing it (saved weights depend on it).
    feed_forward_layers = [
        layers.Dense(units=d_k * 4, activation=tf.nn.gelu),
        layers.Dense(units=d_model),
        layers.Dropout(rate=dropout_prob),
    ]
    self.ann = tf.keras.Sequential(feed_forward_layers)
    self.dropout = layers.Dropout(rate=dropout_prob)

  def call(self, x, mask):
    """Runs the block on `x`, forwarding `mask` to the attention layer."""
    # residual connection around self-attention
    attended = self.mha(x, x, x, mask)
    hidden = self.ln1(x + attended)

    # residual connection around the feed-forward network
    transformed = self.ann(hidden)
    hidden = self.ln2(hidden + transformed)

    return self.dropout(hidden)

In [None]:
@dataclass
class TransformerConfig:
  """Hyper-parameters for an `EncoderOrDecoder`, plus factory helpers.

  `vocab_size` and `max_len` are required. When `n_classes` is omitted it is
  set to `vocab_size` (language-model head). After construction two derived
  attributes are available:

    decoder: True when `n_classes == vocab_size`.
    cm_dim:  `max_len` in decoder mode (causal-mask size), otherwise None.

  Raises:
    ValueError: if `vocab_size` or `max_len` is None.
  """
  d_k: int = 16
  d_model: int = 64
  n_heads: int = 4
  n_layers: int = 2
  dropout_prob: float = 0.1
  n_classes: int = None
  vocab_size: int = None
  max_len: int = None

  def __post_init__(self):
    if self.vocab_size is None:
      raise ValueError("vocab size cannot be none")
    if self.max_len is None:
      raise ValueError('max len cannot be none')
    if self.n_classes is None:
      print("n_classes is none, using language model head.")
      self.n_classes = self.vocab_size
    # An output size equal to the vocabulary size is treated as decoder mode.
    self.decoder = self.n_classes == self.vocab_size
    self.cm_dim = self.max_len if self.decoder else None

  def create_layer(self):
    """
    Creates an EncoderOrDecoder Layer from the config parameters.
    """
    return EncoderOrDecoder(self.d_k,
                            self.d_model,
                            self.n_heads,
                            self.n_layers,
                            self.dropout_prob,
                            self.n_classes,
                            self.vocab_size,
                            self.max_len,
                            self.decoder,
                            self.cm_dim)

  def create_model(self, metrics=['accuracy'], optimizer=None, loss_fn=None):
    """
    Creates an EncoderOrDecoder Layer from the config parameters and wraps it
    into a compiled Keras model.

    Args:
      metrics: metrics passed to `model.compile`.
      optimizer: optimizer to compile with; defaults to `AdamW()`.
      loss_fn: loss to compile with; defaults to
        `SparseCategoricalCrossentropy(from_logits=True)`.

    Returns:
      A compiled `keras.Model` taking a dict with `input_ids` and
      `attention_mask` (both int32, variable sequence length).
    """
    layer = self.create_layer()

    if optimizer is None:
      optimizer = keras.optimizers.AdamW()
    if loss_fn is None:
      loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    inputs = {
        'input_ids': layers.Input(shape=(None,), name='input_ids', dtype=tf.int32),
        'attention_mask': layers.Input(shape=(None,), name='attention_mask', dtype=tf.int32)
    }

    outputs = layer(inputs['input_ids'], inputs['attention_mask'])

    model = keras.Model(inputs=inputs,
                        outputs=outputs)
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=metrics)

    return model


@keras.saving.register_keras_serializable()
class EncoderOrDecoder(layers.Layer):
  """Token embedding + positional encoding + transformer blocks + output head.

  Args:
    d_k: per-head projection width.
    d_model: embedding / hidden width.
    n_heads: number of attention heads per block.
    n_layers: number of transformer blocks.
    dropout_prob: dropout rate used throughout.
    n_classes: number of output units of the final dense layer.
    vocab_size: size of the token vocabulary.
    max_len: number of positions in the positional-encoding table.
    decoder: if True, outputs are produced for every position; if False,
      only the first position is fed to the output head.
    cm_dim: causal-mask size passed to each block (None disables it).
    **kwargs: standard Keras layer arguments (e.g. `name`, `dtype`),
      forwarded to `layers.Layer`.
  """

  def __init__(self,
               d_k: int,
               d_model: int,
               n_heads: int,
               n_layers: int,
               dropout_prob: float,
               n_classes: int,
               vocab_size: int,
               max_len: int,
               decoder: bool,
               cm_dim,
               **kwargs):
    # Forward kwargs so name/dtype/trainable are honoured instead of being
    # silently dropped.
    super().__init__(**kwargs)

    # Constructor arguments are kept so `get_config` can reproduce them.
    self.d_k = d_k
    self.d_model = d_model
    self.n_heads = n_heads
    self.n_layers = n_layers
    self.dropout_prob = dropout_prob
    self.n_classes = n_classes
    self.vocab_size = vocab_size
    self.max_len = max_len
    self.decoder = decoder
    self.cm_dim = cm_dim

    self.embedding = layers.Embedding(input_dim=vocab_size,
                                      output_dim=d_model)
    self.pos_encoding = PositionalEncoding(d_model,
                                           max_len,
                                           dropout_prob)

    self.transformer_blocks = [
        TransformerBlock(
            d_k,
            d_model,
            n_heads,
            dropout_prob,
            cm_dim) for _ in range(n_layers)]
    self.ln = layers.LayerNormalization()
    self.fc = layers.Dense(units=n_classes)

  def get_config(self):
    """Returns the constructor arguments so the layer can be re-created."""
    config = super().get_config()
    config.update({
        'd_k': self.d_k,
        'd_model': self.d_model,
        'n_heads': self.n_heads,
        'n_layers': self.n_layers,
        'dropout_prob': self.dropout_prob,
        'n_classes': self.n_classes,
        'vocab_size': self.vocab_size,
        'max_len': self.max_len,
        'decoder': self.decoder,
        'cm_dim': self.cm_dim,
    })
    return config

  def call(self, x, mask):
    """Maps token ids `x` and padding mask `mask` to output logits.

    Returns per-position outputs in decoder mode; otherwise a single output
    vector per sequence computed from the first position.
    """
    x = self.embedding(x)
    x = self.pos_encoding(x)
    for block in self.transformer_blocks:
      x = block(x, mask)

    if not self.decoder:
      # Use the first token's representation as the sequence summary.
      x = x[:, 0, :]

    x = self.ln(x)
    x = self.fc(x)
    return x


class keras_text_class_pipeline:
  """Loads a saved Keras model and runs it on raw text.

  Args:
    model_path: path of the saved model, passed to
      `tf.keras.models.load_model`.
    tokenizer: tokenizer used to encode the text. When None, the
      'distilbert-base-cased' tokenizer is loaded.
  """

  def __init__(self, model_path, tokenizer=None):
    self.model = tf.keras.models.load_model(model_path)
    if tokenizer is None:
      self.tokenizer = AutoTokenizer.from_pretrained('distilbert-base-cased')
    else:
      self.tokenizer = tokenizer

  def __call__(self, input_sentence):
    """Tokenizes a sentence (or list of sentences) and returns the model output."""
    # padding=True is required to batch sentences of different lengths into
    # one rectangular tensor; the attention mask marks the padded positions.
    encoded = self.tokenizer(input_sentence,
                             return_tensors='tf',
                             padding=True,
                             truncation=True,
                             return_attention_mask=True)
    return self.model({'input_ids': encoded['input_ids'],
                       'attention_mask': encoded['attention_mask']})

## Deprecated

In [None]:
# deprecated, use built in methods
import time
def train_model(model,
                  tf_train_set,
                  tf_eval_set,
                  loss_fn,
                  optimizer,
                  epochs=2):
  """Manual training loop for a Keras model (deprecated: prefer `model.fit`).

  Args:
    model: callable Keras model taking one batch of inputs.
    tf_train_set: iterable of `(inputs, labels)` training batches.
    tf_eval_set: iterable of `(inputs, labels)` validation batches.
    loss_fn: callable `loss_fn(labels, logits)` returning a scalar loss.
    optimizer: Keras optimizer used to apply the gradients.
    epochs: number of passes over `tf_train_set`.

  Returns:
    The trained `model` (same object that was passed in).
  """
  acc_metric = {
      'train': keras.metrics.SparseCategoricalAccuracy(),
      'val': keras.metrics.SparseCategoricalAccuracy()
  }

  for epoch in range(epochs):
      print("\nStart of epoch %d" % (epoch,))
      start_time = time.time()

      for step, (inputs, labels) in enumerate(tf_train_set):
          with tf.GradientTape() as tape:
              # training=True is needed so dropout is active; a plain
              # model(inputs) call runs the layers in inference mode.
              logits = model(inputs, training=True)
              loss_value = loss_fn(labels, logits)
          grads = tape.gradient(loss_value, model.trainable_weights)
          optimizer.apply_gradients(zip(grads, model.trainable_weights))

          acc_metric['train'].update_state(labels, logits)
          # Log every 200 batches.
          if step % 200 == 0:
              print(
                  "Training loss (for one batch) at step %d: %.4f"
                  % (step, float(loss_value))
              )

      # Display metrics at the end of each epoch.
      train_acc = acc_metric['train'].result()
      print("Training acc over epoch: %.4f" % (float(train_acc),))

      # Reset training metrics at the end of each epoch
      acc_metric['train'].reset_state()

      # Run a validation loop at the end of each epoch.
      print("Running Validation...")
      for inputs, labels_val in tf_eval_set:
        # Direct call in inference mode; avoids the per-call overhead of
        # model.predict inside a batch loop.
        val_logits = model(inputs, training=False)
        # Defensive truncation to the label batch size, kept from the
        # previous implementation.
        val_logits = val_logits[:labels_val.shape[0]]
        acc_metric['val'].update_state(labels_val, val_logits)

      val_acc = acc_metric['val'].result()

      print("Validation acc: %.4f" % (float(val_acc),))

      acc_metric['val'].reset_state()
      print("Time taken: %.2fs" % (time.time() - start_time))
  return model


def train_layer(layer,
                tf_train_set,
                tf_eval_set,
                loss_fn,
                optimizer,
                epochs=2):
  """Manual training loop for a bare layer (deprecated: prefer `model.fit`).

  Args:
    layer: callable as `layer(input_ids, attention_mask, training=...)`.
    tf_train_set: iterable of `(inputs, labels)` batches, where `inputs`
      has the keys 'input_ids' and 'attention_mask'.
    tf_eval_set: validation batches in the same format.
    loss_fn: callable `loss_fn(labels, logits)` returning a scalar loss.
    optimizer: Keras optimizer used to apply the gradients.
    epochs: number of passes over `tf_train_set`.

  Returns:
    The trained `layer` (same object that was passed in).
  """
  train_accuracy = keras.metrics.SparseCategoricalAccuracy()
  val_accuracy = keras.metrics.SparseCategoricalAccuracy()

  for epoch_idx in range(epochs):
    print("\nStart of epoch %d" % (epoch_idx,))
    epoch_started = time.time()

    # ---- training pass ----
    for batch_idx, (batch, targets) in enumerate(tf_train_set):
      with tf.GradientTape() as tape:
        predictions = layer(batch['input_ids'],
                            batch['attention_mask'],
                            training=True)
        batch_loss = loss_fn(targets, predictions)

      weights = layer.trainable_weights
      gradients = tape.gradient(batch_loss, weights)
      optimizer.apply_gradients(zip(gradients, weights))

      train_accuracy.update_state(targets, predictions)

      # Report the loss of every 200th batch.
      if batch_idx % 200 == 0:
        print(
            "Training loss (for one batch) at step %d: %.4f"
            % (batch_idx, float(batch_loss))
        )

    # Epoch-level training accuracy, then clear the accumulator.
    print("Training acc over epoch: %.4f" % (float(train_accuracy.result()),))
    train_accuracy.reset_state()

    # ---- validation pass ----
    for batch, targets in tf_eval_set:
      predictions = layer(batch['input_ids'],
                          batch['attention_mask'],
                          training=False)
      predictions = predictions[:targets.shape[0]]
      val_accuracy.update_state(targets, predictions)

    print("Running Validation...")
    print("Validation acc: %.4f" % (float(val_accuracy.result()),))
    val_accuracy.reset_state()

    print("Time taken: %.2fs" % (time.time() - epoch_started))

  return layer

# Sentiment Analysis

In [None]:
import pandas as pd
from datasets import Dataset

# IMDB reviews CSV; the cell relies on its 'review' (text) and 'sentiment'
# ('negative' / 'positive') columns.
data = pd.read_csv('https://raw.githubusercontent.com/laxmimerit/All-CSV-ML-Data-Files-Download/master/IMDB-Dataset.csv')

# Fixed seed so the 70/30 train/test split is the same on every run.
SPLIT_SEED = 42

dataset = Dataset.from_pandas(data)
dataset = dataset.train_test_split(test_size=0.3, seed=SPLIT_SEED)

label2id = {'negative': 0, 'positive': 1}
id2label = {0:'negative', 1:'positive'}

# Integer class ids are stored in a new 'labels' column.
dataset = dataset.map(lambda x: {'labels': label2id[x['sentiment']]})

checkpoint = 'distilbert-base-cased'
tokenizer = AutoTokenizer.from_pretrained(checkpoint)


def tokenize_fn(batch):
  """Tokenizes a batch of reviews, truncating over-long ones."""
  return tokenizer(batch['review'], truncation=True)
tokenized_datasets = dataset.map(tokenize_fn,
                                 batched=True)

# Drop the raw string columns; only token ids, attention mask and labels
# are needed from here on.
tokenized_datasets = tokenized_datasets.remove_columns(["sentiment", "review"])

In [None]:
# Collator that pads the examples of a batch and returns TensorFlow tensors.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer,
                                        return_tensors="tf")

# Training batches: features are attention_mask / input_ids, targets are labels.
tf_train_set = tokenized_datasets["train"].to_tf_dataset(
    columns=["attention_mask", "input_ids"],
    label_cols=["labels"],
    shuffle=True,
    batch_size=16,
    collate_fn=data_collator)

# NOTE(review): the original author flagged the evaluation dataset as
# mislabeled - it is unclear from this notebook what that refers to; confirm.
tf_eval_set = tokenized_datasets["test"].to_tf_dataset(
    columns=["attention_mask", "input_ids"],
    label_cols=["labels"],
    shuffle=False,
    batch_size=16,
    collate_fn=data_collator
)

optimizer = keras.optimizers.AdamW()
# The model's final dense layer has no activation, hence from_logits=True.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)


# n_classes differs from vocab_size here, so the config selects encoder
# (classification) mode; max_len sizes the positional-encoding table.
config = TransformerConfig(vocab_size=tokenizer.vocab_size,
                           max_len=tokenizer.model_max_length,
                           d_k=8,
                           d_model=16,
                           n_heads=1,
                           n_layers=1,
                           n_classes=len(set(tokenized_datasets['train']['labels'])),
                           dropout_prob=0.2)

# Compiled Keras model wrapping the EncoderOrDecoder layer.
encoder_model = config.create_model(metrics=['accuracy'],
                                    loss_fn=loss_fn,
                                    optimizer=optimizer)

In [None]:
# Keep only the best full model (by validation accuracy) on disk.
checkpoint_path = "trained_model.keras"
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=False,
    monitor="val_accuracy",
    save_best_only=True,
)

# Multiply the learning rate by 0.2 after one epoch without improvement in
# validation loss, never going below 1e-7.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.2,
    patience=1,
    verbose=1,  # log a message whenever the learning rate is lowered
    min_lr=1e-7,
)

In [None]:
# Train for 5 epochs; the eval split doubles as validation data, which drives
# both best-model checkpointing and the learning-rate schedule.
encoder_model.fit(tf_train_set,
                  epochs=5,
                  validation_data=tf_eval_set,
                  callbacks=[checkpoint_callback, reduce_lr])

Epoch 1/5
[1m2188/2188[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m627s[0m 280ms/step - accuracy: 0.5508 - loss: 0.6736 - val_accuracy: 0.8715 - val_loss: 0.3059 - learning_rate: 0.0010
Epoch 2/5
[1m2188/2188[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m264s[0m 120ms/step - accuracy: 0.8737 - loss: 0.3117 - val_accuracy: 0.8937 - val_loss: 0.2609 - learning_rate: 0.0010
Epoch 3/5
[1m2188/2188[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step - accuracy: 0.9086 - loss: 0.2366
Epoch 3: ReduceLROnPlateau reducing learning rate to 0.00020000000949949026.
[1m2188/2188[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 47ms/step - accuracy: 0.9086 - loss: 0.2366 - val_accuracy: 0.8903 - val_loss: 0.2805 - learning_rate: 0.0010
Epoch 4/5
[1m2183/2188[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 67ms/step - accuracy: 0.9340 - loss: 0.1834
Epoch 4: ReduceLROnPlateau reducing learning rate to 4.0000001899898055e-05.
[1m2188/2188[0m [32m━━━━━━━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x7c6eb12a61a0>

In [None]:
# Returns [loss, accuracy]. NOTE: this split was also used as validation data
# during fit, so the score is not an independent test estimate.
encoder_model.evaluate(tf_eval_set)

[1m938/938[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 12ms/step - accuracy: 0.8991 - loss: 0.2796


[0.28324806690216064, 0.8981333374977112]

In [None]:
# NOTE(review): this is the same path ModelCheckpoint writes to, so the
# final-epoch model overwrites the best-by-val_accuracy checkpoint - confirm
# this is intended.
encoder_model.save('trained_model.keras')
# Round-trip through disk to check that the custom layers deserialize.
loaded_model = tf.keras.models.load_model('trained_model.keras')

In [None]:
# Should reproduce the metrics of the in-memory model evaluated above.
loaded_model.evaluate(tf_eval_set)

[1m938/938[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m72s[0m 74ms/step - accuracy: 0.8991 - loss: 0.2796


[0.28324806690216064, 0.8981333374977112]

In [None]:
# Inference on raw text using the saved model and the training tokenizer.
pipe = keras_text_class_pipeline('trained_model.keras', tokenizer)
# Displays the model's raw outputs (logits) for the sentence; index 0 is
# 'negative' and index 1 is 'positive' per label2id.
pipe(['this movie was great'])

In [None]:
# Colab-only: google.colab is not available outside Google Colab.
from google.colab import files
# Triggers a browser download of the saved model file.
files.download('trained_model.keras')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>