In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf

# CRT Model

## SELF-ATTENTION Calculation

In [None]:
def scaled_dot_product_attention(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    The attention weights are softmax(q @ k^T / sqrt(depth)), optionally with
    masked positions suppressed, and the output is those weights applied to v.

    Args:
    q: query shape == (..., seq_len_q, depth) # NOTE: depth=dk
    k: key shape == (..., seq_len_k, depth)
    v: value shape == (..., seq_len_v, depth_v); seq_len_v must equal seq_len_k
       for the final matmul to be valid.
    mask: Optional float tensor with shape broadcastable to
          (..., seq_len_q, seq_len_k). Positions where the mask is 1 receive a
          large negative logit and therefore ~0 attention weight; positions
          where it is 0 are left untouched. Defaults to None (no masking).

    Returns:
    output, attention_weights
      output shape == (..., seq_len_q, depth_v)
      attention_weights shape == (..., seq_len_q, seq_len_k)
    """
    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # Scale by sqrt(dk), where dk is the size of the last axis of k (the
    # per-head depth when called from multi-head attention, not d_model).
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # Push masked positions towards -inf so that softmax assigns them ~0 weight.
    if mask is not None:
        scaled_attention_logits = scaled_attention_logits + (mask * -1e9)

    # softmax is normalized on the last axis (seq_len_k) so that the scores add up to 1.
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (..., seq_len_q, seq_len_k)
    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights

## MultiHeadAttention Calculation

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on top of scaled_dot_product_attention.

    Queries, keys and values are each projected by their own Dense(d_model)
    layer, split into `num_heads` heads of size `d_model // num_heads`,
    attended per head, merged back together and passed through a final
    Dense(d_model) layer.

    Args:
    d_model: width of every projection; must be divisible by num_heads.
    num_heads: number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        # Each head gets an equal slice of d_model, so the split must be exact.
        assert d_model % self.num_heads == 0
        self.depth = d_model // self.num_heads

        # Input projections for query / key / value.
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        # Output projection applied after the heads are merged.
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch_size, seq_len, d_model) into per-head form.

        Returns a tensor of shape (batch_size, num_heads, seq_len, depth).
        """
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q):
        """Run multi-head attention.

        Note the argument order: value, key, query.

        Returns:
        output, attention_weights
          output shape == (batch_size, seq_len_q, d_model)
          attention_weights shape == (batch_size, num_heads, seq_len_q, seq_len_k)
        """
        batch_size = tf.shape(q)[0]

        # Project, then split: (batch_size, num_heads, seq_len_*, depth)
        q_heads = self.split_heads(self.wq(q), batch_size)
        k_heads = self.split_heads(self.wk(k), batch_size)
        v_heads = self.split_heads(self.wv(v), batch_size)

        # head_outputs.shape == (batch_size, num_heads, seq_len_q, depth)
        head_outputs, attention_weights = scaled_dot_product_attention(
            q_heads, k_heads, v_heads)

        # Bring the head axis next to depth, then fold both back into d_model.
        head_outputs = tf.transpose(head_outputs, perm=[0, 2, 1, 3])
        merged = tf.reshape(head_outputs, (batch_size, -1, self.d_model))

        return self.dense(merged), attention_weights

## Attention Layers

In [None]:
class AttentionLayer(tf.keras.layers.Layer):
    """Two self-attention streams fused by one cross-attention step.

    Each input is refined by its own self-attention sub-layer (attention ->
    dropout -> residual add -> layer norm). The refined second stream then
    queries the refined first stream through a cross-attention sub-layer of
    the same form.

    Args:
    d_model: feature width used by every attention block.
    num_heads: number of heads in every attention block.
    rate: dropout rate applied to each attention output.
    """

    def __init__(self, d_model, num_heads, rate=0.1):
        super().__init__()

        # Self-attention for stream 1, self-attention for stream 2, cross-attention.
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.cross_mha = MultiHeadAttention(d_model, num_heads)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x1, x2, training):
        """Fuse x1 and x2; the result has the sequence length of x2.

        The residual additions require the last axis of x1 and x2 to be d_model.
        """
        # Stream 1: self-attention sub-layer.
        self_attn1, _ = self.mha1(x1, x1, x1)
        self_attn1 = self.dropout1(self_attn1, training=training)
        outx1 = self.layernorm1(x1 + self_attn1)

        # Stream 2: self-attention sub-layer.
        self_attn2, _ = self.mha2(x2, x2, x2)
        self_attn2 = self.dropout2(self_attn2, training=training)
        outx2 = self.layernorm2(x2 + self_attn2)

        # Cross-attention: MultiHeadAttention.call takes (v, k, q), so stream 1
        # supplies values and keys while stream 2 supplies the queries.
        cross_attn, _ = self.cross_mha(outx1, outx1, outx2)
        cross_attn = self.dropout3(cross_attn, training=training)

        return self.layernorm3(outx2 + cross_attn)

In [None]:
class MultiLevelAttention(tf.keras.layers.Layer):
    """Embed two token sequences and fuse them with an AttentionLayer.

    Each input has its own Embedding table (both sized
    input_vocab_size x d_model); the embedded sequences are then passed to a
    single AttentionLayer.

    Args:
    d_model: embedding width and attention feature width.
    num_heads: number of attention heads.
    input_vocab_size: number of rows in each embedding table.
    rate: dropout rate forwarded to the AttentionLayer.
    """

    def __init__(self, d_model, num_heads, input_vocab_size, rate=0.1):
        super().__init__()
        self.d_model = d_model

        # Separate tables: the two inputs do not share embeddings.
        self.embedding1 = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.embedding2 = tf.keras.layers.Embedding(input_vocab_size, d_model)

        self.attention_layers = AttentionLayer(d_model, num_heads, rate)

    def call(self, x1, x2, training):
        """Return the fused representation of the two embedded inputs."""
        embedded1 = self.embedding1(x1)
        embedded2 = self.embedding2(x2)
        return self.attention_layers(embedded1, embedded2, training)

## Dual Feed Forward Layer

In [None]:
def point_wise_feed_forward_network(d_model, dff):
    """Build a two-layer feed-forward network.

    Args:
    d_model: number of units in the output layer.
    dff: number of units in the hidden (ReLU) layer.

    Returns:
    A tf.keras.Sequential of Dense(dff, relu) followed by Dense(d_model).
    """
    hidden = tf.keras.layers.Dense(dff, activation='relu')
    projection = tf.keras.layers.Dense(d_model)
    return tf.keras.Sequential([hidden, projection])

class DualFeedForwardLayer(tf.keras.layers.Layer):
    """Two independent feed-forward branches over the same input.

    Each branch is feed-forward -> dropout -> residual add -> layer norm, with
    its own weights. Both branch outputs are returned.

    Args:
    d_model: output width of each feed-forward network.
    dff: hidden width of each feed-forward network.
    rate: dropout rate applied to each feed-forward output.
    """

    def __init__(self, d_model, dff, rate=0.1):
        super().__init__()

        self.ffn1 = point_wise_feed_forward_network(d_model, dff)
        self.ffn2 = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training):
        """Return (out1, out2), one tensor per branch, each shaped like x."""
        # Branch 1
        branch1 = self.dropout1(self.ffn1(x), training=training)
        out1 = self.layernorm1(x + branch1)

        # Branch 2 starts from the same x, not from out1.
        branch2 = self.dropout2(self.ffn2(x), training=training)
        out2 = self.layernorm2(x + branch2)

        return out1, out2

## Classification Regression Transformer

In [None]:
class Transformer(tf.keras.Model):
    """Joint classification / regression model over two token sequences.

    Pipeline: MultiLevelAttention -> DualFeedForwardLayer -> mean over the
    sequence axis -> one Dense head per task.

    Args:
    d_model: feature width used throughout the model.
    num_heads: number of attention heads.
    dff: hidden width of the feed-forward networks.
    input_vocab_size: size of each input embedding table.
    target_categories: number of classes produced by the classification head.
    rate: dropout rate.
    """

    def __init__(self, d_model, num_heads, dff, input_vocab_size,
                 target_categories, rate=0.1):
        super().__init__()

        self.mla = MultiLevelAttention(d_model, num_heads, input_vocab_size, rate)
        self.dffnl = DualFeedForwardLayer(d_model, dff, rate)

        # Classification head emits raw scores; softmax is applied in call().
        self.Classification_layer = tf.keras.layers.Dense(target_categories)
        # Regression head: the ReLU restricts predictions to values >= 0.
        self.Regression_Layer = tf.keras.layers.Dense(1, activation='relu')

    def call(self, x1, x2, training):
        """Run both heads.

        Returns:
        (class_probabilities, regression_output)
          class_probabilities shape == (batch_size, target_categories), rows sum to 1
          regression_output shape == (batch_size, 1)
        """
        fused = self.mla(x1, x2, training)
        cls_features, reg_features = self.dffnl(fused, training)

        # Average over the sequence axis to get one vector per example.
        cls_scores = self.Classification_layer(tf.reduce_mean(cls_features, axis=1))
        regression_output = self.Regression_Layer(tf.reduce_mean(reg_features, axis=1))

        class_probabilities = tf.nn.softmax(cls_scores, axis=-1)

        return class_probabilities, regression_output

## Model Summary

In [None]:
# Hyperparameters used for every Transformer instance in this notebook
d_model = 128
num_heads = 4
dff = 128
input_vocab_size = 10000
target_categories = 1200
rate = 0.1

# Throwaway instance used only to build the weights and print the summary.
# `rate` is passed explicitly so that editing it above actually takes effect.
sample_transformer = Transformer(d_model, num_heads, dff, input_vocab_size,
                                 target_categories, rate=rate)

# Temp inputs: integer token ids, because both inputs feed Embedding layers
batch_size = 32
sequence_length1 = 100
sequence_length2 = 150
dummy_input1 = tf.ones((batch_size, sequence_length1), dtype=tf.int32)
dummy_input2 = tf.ones((batch_size, sequence_length2), dtype=tf.int32)

# Call the model once on the dummy data so that all weights get created
sample_transformer(dummy_input1, dummy_input2, training=False)

# Now the summary can be printed
sample_transformer.summary()


Model: "transformer"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 multi_level_attention (Mul  multiple                  2758912   
 tiLevelAttention)                                               
                                                                 
 dual_feed_forward_layer (D  multiple                  66560     
 ualFeedForwardLayer)                                            
                                                                 
 dense_16 (Dense)            multiple                  154800    
                                                                 
 dense_17 (Dense)            multiple                  129       
                                                                 
Total params: 2980401 (11.37 MB)
Trainable params: 2980401 (11.37 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


## Instantiate the Transformer model

In [None]:
# The model that is actually trained below. `rate` is forwarded explicitly so
# the dropout rate configured with the other hyperparameters is honoured.
transformer_model = Transformer(d_model, num_heads, dff, input_vocab_size,
                                target_categories, rate=rate)

## Instantiate the Loss and Accuracy Metrics

In [None]:
# Loss functions, one per task.
# from_logits=False because Transformer.call already applies softmax to the
# classification head, so the model outputs probabilities.
classification_loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=False,
)
regression_loss_fn = tf.keras.losses.MeanSquaredError()

# Optimizer (Adam with its default settings)
optimizer = tf.keras.optimizers.Adam()

# Running metrics tracked during training: mean of each loss, plus accuracy
# of the classification head against integer labels.
classification_loss_metric = tf.keras.metrics.Mean(
    name='classification_loss',
)
regression_loss_metric = tf.keras.metrics.Mean(
    name='regression_loss',
)
classification_accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy(
    name='classification_accuracy',
)

## Model checkpoint

In [None]:
# Where checkpoints and the plain-text training log are written.
# NOTE(review): ".ipynb_checkpoints" is also the directory name Jupyter uses
# for its own notebook autosaves - consider a dedicated directory; confirm
# before changing, since existing checkpoints are looked up here.
checkpoint_path = ".ipynb_checkpoints"
log_file_path = "logging.txt"

# Track model weights and optimizer state together so training can resume.
ckpt = tf.train.Checkpoint(
    transformer=transformer_model,
    optimizer=optimizer,
)

# Keep only the five most recent checkpoints on disk.
ckpt_manager = tf.train.CheckpointManager(
    checkpoint=ckpt,
    directory=checkpoint_path,
    max_to_keep=5,
)

def save_checkpoint(epoch):
    """Save a checkpoint and record training progress.

    Writes three things:
      * a checkpoint through ckpt_manager,
      * a '<checkpoint path>-epoch.txt' sidecar holding the number of
        completed epochs (epoch + 1), which load_checkpoint reads back,
      * one line with the current metric values appended to log_file_path.

    Args:
    epoch: zero-based index of the epoch that just finished.
    """
    completed_epochs = epoch + 1

    # save() returns the path prefix of the checkpoint it just wrote.
    ckpt_save_path = ckpt_manager.save()
    with open(ckpt_save_path + '-epoch.txt', 'w') as f:
        f.write(str(completed_epochs))
    print('Saving checkpoint for epoch {} at {}'.format(completed_epochs, ckpt_save_path))

    # Convert metric results to plain floats: interpolating the result objects
    # directly would write their tensor repr into the log instead of a number.
    c_loss = float(classification_loss_metric.result())
    r_loss = float(regression_loss_metric.result())
    c_acc = float(classification_accuracy_metric.result())

    with open(log_file_path, 'a') as log_file:
        log_file.write(f"Epoch {completed_epochs}: C-Loss - {c_loss:.6f}, R-Loss - {r_loss:.6f}, C-Acc - {c_acc:.6f}\n")

# Function to load the model and epoch information
def load_checkpoint():
    """Restore the newest checkpoint, if any, and report the epoch to resume from.

    Returns:
    The number of completed epochs read from the checkpoint's '-epoch.txt'
    sidecar file, or 0 when there is no checkpoint or the sidecar file is
    missing or does not contain an integer.
    """
    latest_checkpoint = ckpt_manager.latest_checkpoint
    if not latest_checkpoint:
        print("No checkpoint found.")
        return 0

    ckpt.restore(latest_checkpoint)
    print('Latest checkpoint restored!!')

    try:
        with open(latest_checkpoint + '-epoch.txt', 'r') as f:
            epoch = int(f.read())
    except FileNotFoundError:
        print(f"Epoch information not found for {latest_checkpoint}")
        return 0
    except ValueError:
        # Empty or corrupt sidecar file: keep the restored weights but fall
        # back to epoch 0 instead of aborting the notebook.
        print(f"Epoch information unreadable for {latest_checkpoint}")
        return 0

    print(f"Model restored from {latest_checkpoint}, trained up to epoch {epoch}")
    return epoch

In [None]:
# Restore the newest checkpoint if one exists; the returned value is the epoch
# to resume from (0 when load_checkpoint finds nothing to resume).
latest_checkpoint_epoch = load_checkpoint()

No checkpoint found.


### Using GradientTape to compute the derivatives of the loss w.r.t. the weights, then applying them with the optimizer => BACKPROPAGATION

In [None]:
@tf.function
def train_step(input_data1, input_data2, targets_classification, targets_regression):
    """Run one optimisation step on a single batch.

    Performs a forward pass in training mode, sums the classification and
    regression losses, back-propagates the total through transformer_model,
    applies the gradients with the module-level optimizer, and updates the
    three running metrics. Returns nothing.

    Args:
    input_data1, input_data2: the two model inputs for this batch.
    targets_classification: labels passed to classification_loss_fn.
    targets_regression: targets passed to regression_loss_fn.
    """
    with tf.GradientTape() as tape:
        # Forward pass (training=True keeps dropout active)
        class_probs, reg_preds = transformer_model(input_data1, input_data2, training=True)

        # Per-task losses and their unweighted sum
        cls_loss = classification_loss_fn(targets_classification, class_probs)
        reg_loss = regression_loss_fn(targets_regression, reg_preds)
        combined_loss = cls_loss + reg_loss

    # Read the variables only after the forward pass, as the original does.
    weights = transformer_model.trainable_variables
    grads = tape.gradient(combined_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Running metrics
    classification_loss_metric.update_state(cls_loss)
    regression_loss_metric.update_state(reg_loss)
    classification_accuracy_metric.update_state(targets_classification, class_probs)

## Training

In [None]:
# Dummy training data (replace with actual data)
num_train_samples = 1000
num_classes = 10
# Labels go to a sparse categorical loss, so each one must be a valid index
# into the model's target_categories outputs.
assert num_classes <= target_categories

rng = np.random.default_rng(42)  # fixed seed so the dummy data is reproducible

# Both inputs feed Embedding layers, so they must be integer token ids in
# [0, input_vocab_size); uniform floats in [0, 1) are not meaningful ids.
train_data1 = rng.integers(0, input_vocab_size,
                           size=(num_train_samples, sequence_length1), dtype=np.int32)
train_data2 = rng.integers(0, input_vocab_size,
                           size=(num_train_samples, sequence_length2), dtype=np.int32)
train_targets_classification = rng.integers(0, num_classes,
                                            size=(num_train_samples,), dtype=np.int32)
train_targets_regression = rng.random((num_train_samples, 1)).astype(np.float32)

# Define hyperparameters and training configurations
num_epochs = 5
batch_size = 32
checkpoint_every = 5  # save a checkpoint every this many epochs
metrics_names = ['C_loss', 'R_loss', 'C_acc']

# Training loop (resumes after the last checkpointed epoch, if any)
for epoch in range(latest_checkpoint_epoch, num_epochs):
    # Reset the metrics at the start of each epoch.
    # reset_state() is the current name; reset_states() is deprecated.
    for metric in (classification_loss_metric,
                   regression_loss_metric,
                   classification_accuracy_metric):
        metric.reset_state()

    print("\nepoch {}/{}".format(epoch+1, num_epochs))
    pb_i = tf.keras.utils.Progbar(train_data1.shape[0], stateful_metrics=metrics_names)

    # Iterate over the training dataset in batches
    for i in range(0, len(train_data1), batch_size):
        batch_data1 = train_data1[i:i+batch_size]
        batch_data2 = train_data2[i:i+batch_size]
        batch_targets_classification = train_targets_classification[i:i+batch_size]
        batch_targets_regression = train_targets_regression[i:i+batch_size]

        # Perform a training step
        train_step(batch_data1, batch_data2, batch_targets_classification, batch_targets_regression)

        values = [('C_loss', classification_loss_metric.result()),
                  ('R_loss', regression_loss_metric.result()),
                  ('C_acc', classification_accuracy_metric.result())]

        # Advance by the real batch length: the last batch can be shorter than
        # batch_size, and adding batch_size would overshoot the bar's target.
        pb_i.add(len(batch_data1), values=values)

    if (epoch+1) % checkpoint_every == 0:
        save_checkpoint(epoch)



epoch 1/5

epoch 2/5

epoch 3/5

epoch 4/5

epoch 5/5
Saving checkpoint for epoch 5 at .ipynb_checkpoints/ckpt-1


# Predictions

In [None]:
# Define batch size and sequence lengths
# NOTE: these reassign the notebook-level batch_size / sequence_length* used
# by the training cell; re-run the earlier cells before training again.
batch_size = 1
sequence_length1 = 5
sequence_length2 = 15

# Generate random input data: integer token ids in [0, input_vocab_size),
# because both inputs feed Embedding layers.
input_data1 = np.random.randint(0, input_vocab_size,
                                size=(batch_size, sequence_length1), dtype=np.int32)
input_data2 = np.random.randint(0, input_vocab_size,
                                size=(batch_size, sequence_length2), dtype=np.int32)

# Call the model to get predictions.
# training=False disables dropout so that inference is deterministic.
classification_predictions, regression_predictions = transformer_model(input_data1, input_data2, training=False)

# Process the outputs
# For classification predictions: index of the most probable class
class_predictions = tf.argmax(classification_predictions, axis=-1)

# For regression predictions: round to the nearest integer
regression_values = tf.cast(tf.round(regression_predictions), dtype=tf.int32)


print("Classification Predictions:", class_predictions)
print("Regression Predictions:", regression_values)


Classification Predictions: tf.Tensor([9], shape=(1,), dtype=int64)
Regression Predictions: tf.Tensor([[0]], shape=(1, 1), dtype=int32)
