In [1]:
import tensorflow as tf
import keras

In [2]:
# a = tf.constant([[1,2,3],[4,5,6]], tf.int32)
# b = tf.constant([2,2], tf.int32)
# tf.tile(a, b)

In [3]:
import tensorflow as tf
from tensorflow import keras

class SelfAttention(keras.Model):
    def __init__(self, embed_size, head):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.head = head
        self.head_dim = embed_size // head

        assert (self.embed_size == self.head * self.head_dim), "Embedding size needs to be divisible by heads"

        self.values = keras.layers.Dense(units=embed_size)
        self.keys = keras.layers.Dense(units=embed_size)
        self.query = keras.layers.Dense(units=embed_size)
        self.fc = keras.layers.Dense(units=embed_size)

    def call(self, values, keys, query, mask):
        N = tf.shape(query)[0]
        value_len, key_len, query_len = tf.shape(values)[1], tf.shape(keys)[1], tf.shape(query)[1]

        # Linear transformations
        values = self.values(values)  # shape: (N, value_len, embed_size)
        keys = self.keys(keys)        # shape: (N, key_len, embed_size)
        queries = self.query(query)   # shape: (N, query_len, embed_size)

        # Reshape for multi-head attention
        values = tf.reshape(values, (N, value_len, self.head, self.head_dim))
        keys = tf.reshape(keys, (N, key_len, self.head, self.head_dim))
        queries = tf.reshape(queries, (N, query_len, self.head, self.head_dim))

        # Compute attention scores
        energy = tf.einsum("nqhd,nkhd->nhqk", queries, keys)

        # Apply the mask if present
        if mask is not None:
            mask = tf.cast(mask, dtype=tf.bool)
            energy = tf.where(mask, energy, tf.fill(tf.shape(energy), -1e9))

        # Attention weights
        attention = tf.nn.softmax(energy / tf.sqrt(tf.cast(self.head_dim, dtype=tf.float32)), axis=-1)

        # Weighted sum of values
        out = tf.einsum("nhqk,nkhd->nqhd", attention, values)
        out = tf.reshape(out, (N, query_len, self.head * self.head_dim))

        # Final linear layer
        out = self.fc(out)
        return out

class TransformerBlock(keras.Model):
    def __init__(self, embed_size, head, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attention_layer = SelfAttention(embed_size, head)
        self.norm1 = keras.layers.LayerNormalization(axis=-1)
        self.norm2 = keras.layers.LayerNormalization(axis=-1)
        self.feed_forward = keras.Sequential([
            keras.layers.Dense(units=forward_expansion * embed_size, activation="relu"),
            keras.layers.Dense(units=embed_size)
        ])
        self.dropout = keras.layers.Dropout(dropout)

    def call(self, value, key, query, mask):
        attention = self.attention_layer(value, key, query, mask)
        x = self.dropout(self.norm1(attention + query))

        forward = self.feed_forward(x)
        out = self.dropout(self.norm2(forward + x))
        return out

class Encoder(keras.Model):
    def __init__(self, src_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, max_length):
        super(Encoder, self).__init__()
        self.embed_size = embed_size
        self.word_embedding = keras.layers.Embedding(input_dim=src_vocab_size, output_dim=embed_size)
        self.position_embedding = keras.layers.Embedding(input_dim=max_length, output_dim=embed_size)
        self.encoder_layers = [TransformerBlock(embed_size, heads, dropout, forward_expansion) for _ in range(num_layers)]
        self.dropout = keras.layers.Dropout(dropout)

    def call(self, x, mask):
        N = tf.shape(x)[0]
        seq_length = tf.shape(x)[1]

        # Generate positions
        positions = tf.range(start=0, limit=seq_length, delta=1)
        positions = tf.expand_dims(positions, axis=0)
        positions = tf.tile(positions, [N, 1])

        # Apply embeddings
        out = self.dropout(self.word_embedding(x) + self.position_embedding(positions))

        # Pass through transformer layers
        for layer in self.encoder_layers:
            out = layer(out, out, out, mask)

        return out

class DecoderBlock(keras.Model):
    def __init__(self, embed_size, heads, forward_expansion, dropout):
        super(DecoderBlock, self).__init__()
        self.norm1 = keras.layers.LayerNormalization(axis=-1)
        self.attention_layer = SelfAttention(embed_size, heads)
        self.transformer_block = TransformerBlock(embed_size, heads, dropout, forward_expansion)
        self.dropout = keras.layers.Dropout(dropout)

    def call(self, x, value, key, src_mask, trg_mask):
        attention = self.attention_layer(x, x, x, trg_mask)
        query = self.dropout(self.norm1(attention + x))
        out = self.transformer_block(value, key, query, src_mask)
        return out

class Decoder(keras.Model):
    def __init__(self, trg_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, max_length):
        super(Decoder, self).__init__()
        self.word_embedding = keras.layers.Embedding(input_dim=trg_vocab_size, output_dim=embed_size)
        self.position_embedding = keras.layers.Embedding(input_dim=max_length, output_dim=embed_size)
        self.decoder_layers = [DecoderBlock(embed_size, heads, forward_expansion, dropout) for _ in range(num_layers)]
        self.fc_out = keras.layers.Dense(trg_vocab_size)
        self.dropout = keras.layers.Dropout(dropout)

    def call(self, x, enc_out, src_mask, trg_mask):
        N = tf.shape(x)[0]
        seq_length = tf.shape(x)[1]

        # Generate positions
        positions = tf.range(start=0, limit=seq_length)
        positions = tf.expand_dims(positions, axis=0)
        positions = tf.tile(positions, [N, 1])

        x = self.dropout(self.word_embedding(x) + self.position_embedding(positions))
        for layer in self.decoder_layers:
            x = layer(x, enc_out, enc_out, src_mask, trg_mask)
        
        out = self.fc_out(x)
        return out

class Transformer(keras.Model):
    def __init__(self, src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, embed_size=128,
                 num_layers=6, forward_expansion=4, heads=8, dropout=0, max_length=500):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, max_length)
        self.decoder = Decoder(trg_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, max_length)
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx

    def make_src_mask(self, source):
        src_mask = tf.cast(tf.math.not_equal(source, self.src_pad_idx), tf.float32)
        src_mask = tf.expand_dims(tf.expand_dims(src_mask, axis=1), axis=2) # (batch_size, 1, 1, src_len)
        return src_mask
    
    def make_trg_mask(self, target):
        N = tf.shape(target)[0]
        target_len = tf.shape(target)[1]

        mask = tf.linalg.band_part(input = tf.ones((target_len, target_len)), num_lower = -1, num_upper = 0)    ### Lower triangular matrix
        trg_mask = tf.tile(input= tf.expand_dims(input= mask, axis= 0), multiples= [N, 1, 1])       ### (batch_size, trg_len, trg_len)
        return trg_mask

    def call(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)

        enc_src = self.encoder(src, src_mask)
        out = self.decoder(trg, enc_src, src_mask, trg_mask)
        return out


In [4]:
# # Constants
# src_pad_idx = 0
# trg_pad_idx = 0
# src_vocab_size = 10
# trg_vocab_size = 10
# embed_size = 128
# num_layers = 2
# heads = 2
# forward_expansion = 4
# dropout = 0.1
# max_length = 9  # Length of your sequences

# # Sample training data
# x = tf.constant([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]])
# trg = tf.constant([[1, 7, 4, 3, 5, 9, 2, 0, 0], [1, 5, 6, 2, 4, 7, 6, 2, 0]])

# # Initialize the Transformer model
# model = Transformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, 
#                     embed_size=embed_size, num_layers=num_layers, heads=heads, 
#                     forward_expansion=forward_expansion, dropout=dropout, max_length=max_length)

# # Prepare target data
# trg_input = trg[:, :-1]
# trg_output = trg[:, 1:]

# # Define loss and optimizer
# loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
# optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# # Custom loss function to handle padding
# def custom_loss(y_true, y_pred):
#     mask = tf.math.logical_not(tf.math.equal(y_true, trg_pad_idx))
#     loss = loss_object(y_true, y_pred)

#     mask = tf.cast(mask, dtype=loss.dtype)
#     loss *= mask

#     return tf.reduce_sum(loss) / tf.reduce_sum(mask)

# # Compile the model
# model.compile(optimizer=optimizer, loss=custom_loss)

# # Train the model
# model.fit([x, trg_input], trg_output, batch_size=2, epochs=5)

In [None]:
# class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
#     def __init__(self, d_model, warmup_steps=4000):
#         super(CustomSchedule, self).__init__()
#         self.d_model = d_model
#         self.d_model = tf.cast(self.d_model, tf.float32)
#         self.warmup_steps = warmup_steps

#     def __call__(self, step):
#         arg1 = tf.math.rsqrt(step)
#         arg2 = step * (self.warmup_steps**-1.5)

#         return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# learning_rate = CustomSchedule(embed_size)
# optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)


In [5]:
import tensorflow as tf
from tensorflow import keras

class CustomTransformer(Transformer):
    def __init__(self, src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, embed_size=128,
                 num_layers=2, forward_expansion=4, heads=2, dropout=0.1, max_length=500):
        super(CustomTransformer, self).__init__(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx,
                                                embed_size, num_layers, forward_expansion, heads, dropout, max_length)
    
    def train_step(self, data):
        # Unpack the data
        (src, trg_input), trg_output = data

        with tf.GradientTape() as tape:
            # Forward pass
            y_pred = self.call(src, trg_input)
            
            # Compute the custom loss
            loss = self.compiled_loss(trg_output, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        
        # Update metrics
        self.compiled_metrics.update_state(trg_output, y_pred)
        
        # Return a dict mapping metric names to current values
        return {m.name: m.result() for m in self.metrics}

# Constants
src_pad_idx = 0
trg_pad_idx = 0
src_vocab_size = 10
trg_vocab_size = 10
embed_size = 128
num_layers = 2
heads = 2
forward_expansion = 4
dropout = 0.1
max_length = 9  # Length of your sequences

# Sample training data
x = tf.constant([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]])
trg = tf.constant([[1, 7, 4, 3, 5, 9, 2, 0, 0], [1, 5, 6, 2, 4, 7, 6, 2, 0]])

# Initialize the Custom Transformer model
model = CustomTransformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, 
                          embed_size=embed_size, num_layers=num_layers, heads=heads, 
                          forward_expansion=forward_expansion, dropout=dropout, max_length=max_length)

# Prepare target data
trg_input = trg[:, :-1]
trg_output = trg[:, 1:]

# Define loss and optimizer
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Custom loss function to handle padding
def custom_loss(y_true, y_pred):
    mask = tf.math.logical_not(tf.math.equal(y_true, trg_pad_idx))
    loss = loss_object(y_true, y_pred)

    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask

    return tf.reduce_sum(loss) / tf.reduce_sum(mask)

# Compile the model
model.compile(optimizer=optimizer, loss=custom_loss)

# Train the model
model.fit([x, trg_input], trg_output, batch_size=2, epochs=100)


Epoch 1/100


```
for metric in self.metrics:
    metric.update_state(y, y_pred)
```

  return self._compiled_metrics_update_state(


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 12s/step - loss: -0.6230
Epoch 2/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 66ms/step - loss: 0.0103
Epoch 3/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step - loss: -0.0335
Epoch 4/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step - loss: -0.1241
Epoch 5/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step - loss: 0.0353
Epoch 6/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 61ms/step - loss: 0.1162
Epoch 7/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step - loss: 0.1392
Epoch 8/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 73ms/step - loss: 0.1283
Epoch 9/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step - loss: 0.0772
Epoch 10/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step - loss: -0.0068
Epoch 11/100
[1m

<keras.src.callbacks.history.History at 0x1d5359097c0>

In [6]:
# import tensorflow as tf
# from tensorflow import keras

# class CustomTransformer(Transformer):
#     def __init__(self, src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, embed_size=128,
#                  num_layers=2, forward_expansion=4, heads=2, dropout=0.1, max_length=500):
#         super(CustomTransformer, self).__init__(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx,
#                                                 embed_size, num_layers, forward_expansion, heads, dropout, max_length)

#     def train_step(self, data):
#         # Unpack the data
#         (src, trg_input), trg_output = data

#         with tf.GradientTape() as tape:
#             # Forward pass
#             y_pred = self.call(src, trg_input)  # Ensure training=True to enable dropout and other training-specific behavior
            
#             # Compute the custom loss
#             loss = self.loss(trg_output, y_pred)

#         # Compute gradients
#         trainable_vars = self.trainable_variables
#         gradients = tape.gradient(loss, trainable_vars)

#         # Update weights
#         self.optimizer.apply_gradients(zip(gradients, trainable_vars))

#         # Flatten the prediction and target tensors
#         y_pred_flat = tf.reshape(y_pred, [-1, y_pred.shape[-1]])  # Flatten to (batch_size * sequence_length, vocab_size)
#         trg_output_flat = tf.reshape(trg_output, [-1])  # Flatten to (batch_size * sequence_length,)

#         # Update metrics
#         for metric in self.metrics:
#             metric.update_state(trg_output_flat, y_pred_flat)

#         # Return a dict mapping metric names to current values
#         return {m.name: m.result() for m in self.metrics}


# # Constants
# src_pad_idx = 0
# trg_pad_idx = 0
# src_vocab_size = 10
# trg_vocab_size = 10
# embed_size = 128
# num_layers = 2
# heads = 2
# forward_expansion = 4
# dropout = 0.1
# max_length = 9  # Length of your sequences

# # Sample training data
# x = tf.constant([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]])
# trg = tf.constant([[1, 7, 4, 3, 5, 9, 2, 0, 0], [1, 5, 6, 2, 4, 7, 6, 2, 0]])

# # Initialize the Custom Transformer model
# model = CustomTransformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, 
#                           embed_size=embed_size, num_layers=num_layers, heads=heads, 
#                           forward_expansion=forward_expansion, dropout=dropout, max_length=max_length)

# # Prepare target data
# trg_input = trg[:, :-1]
# trg_output = trg[:, 1:]

# # Define loss and optimizer
# loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
# optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# # Custom loss function to handle padding
# def custom_loss(y_true, y_pred):
#     mask = tf.math.logical_not(tf.math.equal(y_true, trg_pad_idx))
#     loss = loss_object(y_true, y_pred)

#     mask = tf.cast(mask, dtype=loss.dtype)
#     loss *= mask

#     return tf.reduce_sum(loss) / tf.reduce_sum(mask)

# # Compile the model
# model.compile(optimizer=optimizer, loss=custom_loss)

# # Train the model
# model.fit([x, trg_input], trg_output, batch_size=2, epochs=100)
