<a href="https://colab.research.google.com/github/2003Yash/tranformer-language-translator/blob/main/Eng_%3E_Tel_Transformer_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

source: https://github.com/ajhalthor/Transformer-Neural-Network

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Dropout, ReLU, LayerNormalization
import numpy as np
import math

def get_device():
    """Return the TensorFlow device string to use: "/GPU:0" when a GPU is visible, else "/CPU:0"."""
    visible_gpus = tf.config.list_physical_devices('GPU')
    if visible_gpus:
        return "/GPU:0"
    return "/CPU:0"

def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    Args:
        q, k, v: tensors whose last two axes are (sequence, depth). The
            attention layers in this file pass (batch, heads, seq, head_dim).
        mask: optional additive mask (0 where attention is allowed, a large
            negative number where it is not). It may be anything that
            broadcasts against the scores, or a rank-3
            (batch, seq_q, seq_k) tensor, which is applied to every head.

    Returns:
        A ``(values, attention)`` pair: the attention-weighted sum of ``v``
        and the softmax-normalised attention weights.
    """
    d_k = tf.cast(tf.shape(q)[-1], tf.float32)
    scaled = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(d_k)
    if mask is not None:
        mask = tf.cast(mask, scaled.dtype)
        # Plain broadcasting aligns trailing axes, so a (batch, seq, seq) mask
        # would have its batch axis matched against the *heads* axis of the
        # (batch, heads, seq, seq) scores. Insert the heads axis explicitly.
        if mask.shape.rank == 3 and scaled.shape.rank == 4:
            mask = tf.expand_dims(mask, axis=1)
        scaled += mask
    attention = tf.nn.softmax(scaled, axis=-1)
    values = tf.matmul(attention, v)
    return values, attention

class PositionalEncoding(tf.keras.layers.Layer):
    """Build the sinusoidal positional-encoding table of shape (max_sequence_length, d_model)."""

    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def call(self):
        # One frequency per (sin, cos) pair of embedding channels.
        pair_index = tf.range(0, self.d_model, 2, dtype=tf.float32)
        denominator = tf.pow(10000.0, pair_index / self.d_model)
        # Column vector of positions so the division broadcasts to (positions, pairs).
        position = tf.range(self.max_sequence_length, dtype=tf.float32)[:, tf.newaxis]
        angles = position / denominator
        # Interleave sin (even channels) and cos (odd channels) along the last axis.
        interleaved = tf.stack([tf.sin(angles), tf.cos(angles)], axis=2)
        return tf.reshape(interleaved, (self.max_sequence_length, self.d_model))

class SentenceEmbedding(tf.keras.layers.Layer):
    """For a given batch of sentences, create character-level embeddings.

    Each sentence is split into characters, mapped to indices through
    ``language_to_index``, optionally wrapped in start/end tokens, padded to
    ``max_sequence_length``, embedded, and summed with positional encodings.
    """

    def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
        super(SentenceEmbedding, self).__init__()
        self.vocab_size = len(language_to_index)
        self.max_sequence_length = max_sequence_length
        self.embedding = tf.keras.layers.Embedding(self.vocab_size, d_model)
        self.language_to_index = language_to_index
        self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
        self.dropout = tf.keras.layers.Dropout(rate=0.1)
        self.START_TOKEN = START_TOKEN
        self.END_TOKEN = END_TOKEN
        self.PADDING_TOKEN = PADDING_TOKEN

    @staticmethod
    def _to_text(sentence):
        """Return *sentence* as a ``str``, decoding eager string tensors and UTF-8 bytes."""
        if hasattr(sentence, "numpy"):
            sentence = sentence.numpy()
        if isinstance(sentence, bytes):
            sentence = sentence.decode("utf-8")
        return sentence

    def batch_tokenize(self, batch, start_token, end_token):
        """Convert a batch of sentences into an int32 tensor of token indices.

        Args:
            batch: iterable of sentences (``str``, UTF-8 ``bytes`` or scalar
                string tensors, e.g. a batch produced by ``tf.data``).
            start_token: when truthy, prepend the START token index.
            end_token: when truthy, append the END token index.

        Returns:
            int32 tensor with one row per sentence, each row padded with the
            PADDING token index up to ``max_sequence_length``.

        Raises:
            KeyError: if a character is missing from ``language_to_index``.
        """
        def tokenize(sentence):
            indices = [self.language_to_index[token] for token in self._to_text(sentence)]
            if start_token:
                indices.insert(0, self.language_to_index[self.START_TOKEN])
            if end_token:
                indices.append(self.language_to_index[self.END_TOKEN])
            missing = self.max_sequence_length - len(indices)
            if missing > 0:
                indices.extend([self.language_to_index[self.PADDING_TOKEN]] * missing)
            return indices

        return tf.convert_to_tensor([tokenize(sentence) for sentence in batch], dtype=tf.int32)

    def call(self, x, start_token, end_token, training=None):
        """Tokenize, embed and position-encode the batch of sentences ``x``."""
        x = self.batch_tokenize(x, start_token, end_token)
        x = self.embedding(x)
        # PositionalEncoding.call takes no inputs; invoke it directly because
        # Keras' Layer.__call__ expects at least one input argument.
        pos = self.position_encoder.call()
        x = self.dropout(x + pos, training=training)
        return x

    def forward(self, x, start_token, end_token):  # sentence
        """PyTorch-style alias of :meth:`call`, kept for existing callers."""
        return self.call(x, start_token, end_token)

class MultiHeadAttention(Layer):
    """Multi-head self-attention with a fused query/key/value projection."""

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = Dense(3 * d_model)
        self.linear_layer = Dense(d_model)

    def call(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: input tensor; the reshapes below treat it as
                (batch_size, sequence_length, d_model).
            mask: optional additive attention mask forwarded to
                ``scaled_dot_product``.

        Returns:
            Tensor with the same leading axes as ``x`` and ``d_model`` channels.
        """
        # Index the dynamic shape instead of tuple-unpacking it: unpacking a
        # tensor requires iterating it, which only works eagerly.
        input_shape = tf.shape(x)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        qkv = self.qkv_layer(x)  # (batch_size, sequence_length, 3 * d_model)
        qkv = tf.reshape(qkv, (batch_size, sequence_length, self.num_heads, 3 * self.head_dim))
        qkv = tf.transpose(qkv, perm=[0, 2, 1, 3])  # (batch_size, num_heads, sequence_length, 3 * head_dim)
        q, k, v = tf.split(qkv, 3, axis=-1)
        # scaled_dot_product is a module-level function, not a method of this layer.
        values, _ = scaled_dot_product(q, k, v, mask)
        values = tf.transpose(values, perm=[0, 2, 1, 3])
        values = tf.reshape(values, (batch_size, sequence_length, self.num_heads * self.head_dim))
        return self.linear_layer(values)

class LayerNormalization(Layer):
    """Layer normalisation over the trailing ``len(parameters_shape)`` axes.

    Note: this class shadows the ``LayerNormalization`` imported from
    ``tensorflow.keras.layers`` at the top of the file.

    Args:
        parameters_shape: shape of the learnable scale/shift parameters; it
            is expected to match the trailing axes of the inputs.
        epsilon: small constant added to the variance for numerical stability.
    """

    def __init__(self, parameters_shape, epsilon=1e-5, **kwargs):
        super(LayerNormalization, self).__init__(**kwargs)
        self.parameters_shape = parameters_shape
        self.epsilon = epsilon
        self.gamma = self.add_weight(shape=parameters_shape, initializer='ones', trainable=True)
        self.beta = self.add_weight(shape=parameters_shape, initializer='zeros', trainable=True)

    def call(self, inputs):
        # Normalise each position over its feature axes (the trailing axes
        # covered by gamma/beta), not over the batch and sequence axes.
        feature_axes = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean, variance = tf.nn.moments(inputs, axes=feature_axes, keepdims=True)
        std = tf.sqrt(variance + self.epsilon)
        normalized = (inputs - mean) / std
        # Learnable scale and shift.
        return self.gamma * normalized + self.beta

class PositionwiseFeedForward(tf.keras.layers.Layer):
    """Two-layer feed-forward block: Dense(hidden) -> ReLU -> Dropout -> Dense(d_model)."""

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        # Both projections are linear; the non-linearity is a separate layer.
        self.dense1 = Dense(hidden, activation=None)
        self.relu = ReLU()
        self.dropout = Dropout(rate=drop_prob)
        self.dense2 = Dense(d_model, activation=None)

    def call(self, x, training=False):
        expanded = self.relu(self.dense1(x))
        expanded = self.dropout(expanded, training=training)
        return self.dense2(expanded)

class EncoderLayer(Layer):
    """One encoder block: self-attention and feed-forward, each followed by dropout, residual add and layer norm."""

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(epsilon=1e-6, parameters_shape=(d_model,))
        self.dropout1 = Dropout(rate=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(epsilon=1e-6, parameters_shape=(d_model,))
        self.dropout2 = Dropout(rate=drop_prob)

    def call(self, x, self_attention_mask):
        # Self-attention sub-layer with residual connection.
        attended = self.dropout1(self.attention(x, mask=self_attention_mask))
        x = self.norm1(attended + x)
        # Feed-forward sub-layer with residual connection.
        transformed = self.dropout2(self.ffn(x))
        return self.norm2(transformed + x)

class SequentialEncoder(tf.keras.layers.Layer):
    """Run a stack of encoder layers in order, passing the same mask to each.

    The stack is read from ``self._layers``, which ``Encoder`` assigns after
    constructing this object.
    """

    def call(self, inputs):
        """Apply every layer to ``x``.

        Args:
            inputs: a pair ``(x, self_attention_mask)``.

        Returns:
            The output of the last layer in the stack.
        """
        x, self_attention_mask = inputs
        for layer in self._layers:
            # EncoderLayer.call takes (x, self_attention_mask) as two separate
            # arguments, not a single list.
            x = layer(x, self_attention_mask)
        return x

class Encoder(tf.keras.layers.Layer):
    """Transformer encoder: sentence embedding followed by ``num_layers`` encoder layers."""

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super(Encoder, self).__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        # NOTE(review): the stack is injected through the private `_layers`
        # attribute that SequentialEncoder.call iterates; statement order is
        # kept as-is because it may interact with Keras' sub-layer tracking.
        self.layers = SequentialEncoder()
        self.layers._layers = [EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)]

    def call(self, x, self_attention_mask, start_token, end_token):
        """Embed the batch of sentences ``x`` and run it through the encoder stack.

        Args:
            x: batch of source sentences, forwarded to the sentence embedding.
            self_attention_mask: additive mask forwarded to every encoder layer.
            start_token: whether the embedding prepends the START token.
            end_token: whether the embedding appends the END token.
        """
        # The flags are plain Python bools, so pass them by keyword: Keras 3
        # rejects non-tensor values passed positionally to a layer.
        x = self.sentence_embedding(x, start_token=start_token, end_token=end_token)
        x = self.layers([x, self_attention_mask])
        return x

class MultiHeadCrossAttention(Layer):
    """Multi-head attention where keys/values come from ``x`` and queries from ``y``."""

    def __init__(self, d_model, num_heads):
        super(MultiHeadCrossAttention, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.kv_layer = Dense(2 * d_model)
        self.q_layer = Dense(d_model)
        self.linear_layer = Dense(d_model)

    def split_heads(self, x, batch_size, depth=None):
        """Reshape (batch, seq, heads * depth) into (batch, heads, seq, depth).

        Args:
            x: tensor to split.
            batch_size: size of the leading axis.
            depth: channels per head; defaults to ``self.head_dim``.
        """
        if depth is None:
            depth = self.head_dim
        x = tf.reshape(x, (batch_size, -1, self.num_heads, depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, x, y, mask=None):
        """Attend from ``y`` (queries) over ``x`` (keys and values).

        Args:
            x: tensor projected to keys and values (the encoder output in
                ``DecoderLayer``).
            y: tensor projected to queries (the decoder state in
                ``DecoderLayer``).
            mask: optional additive mask forwarded to ``scaled_dot_product``.

        Returns:
            Tensor with ``d_model`` channels, one row per query position.
        """
        batch_size = tf.shape(x)[0]

        kv = self.kv_layer(x)
        q = self.q_layer(y)

        # The fused projection carries keys AND values, so every head owns
        # 2 * head_dim channels. Splitting with head_dim would double the
        # sequence axis and halve the per-head depth.
        kv = self.split_heads(kv, batch_size, depth=2 * self.head_dim)
        q = self.split_heads(q, batch_size)

        k, v = tf.split(kv, num_or_size_splits=2, axis=-1)

        # scaled_dot_product is a module-level function, not a method of this layer.
        values, _ = scaled_dot_product(q, k, v, mask)

        values = tf.transpose(values, perm=[0, 2, 1, 3])
        values = tf.reshape(values, (batch_size, -1, self.d_model))

        return self.linear_layer(values)

class DecoderLayer(Layer):
    """One decoder block: masked self-attention, encoder-decoder attention and feed-forward.

    Every sub-layer is followed by dropout, a residual connection and layer
    normalisation.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        # Sub-layer 1: self-attention over the decoder input.
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm1 = LayerNormalization(epsilon=1e-6, parameters_shape=(d_model,))
        self.dropout1 = Dropout(rate=drop_prob)

        # Sub-layer 2: attention over the encoder output.
        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm2 = LayerNormalization(epsilon=1e-6, parameters_shape=(d_model,))
        self.dropout2 = Dropout(rate=drop_prob)

        # Sub-layer 3: position-wise feed-forward network.
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.layer_norm3 = LayerNormalization(epsilon=1e-6, parameters_shape=(d_model,))
        self.dropout3 = Dropout(rate=drop_prob)

    def call(self, x, y, self_attention_mask, cross_attention_mask, training=False):
        residual = tf.identity(y)
        attended = self.self_attention(y, mask=self_attention_mask)
        attended = self.dropout1(attended, training=training)
        y = self.layer_norm1(attended + residual)

        residual = tf.identity(y)
        crossed = self.encoder_decoder_attention(x, y, mask=cross_attention_mask)
        crossed = self.dropout2(crossed, training=training)
        y = self.layer_norm2(crossed + residual)

        residual = tf.identity(y)
        transformed = self.dropout3(self.ffn(y), training=training)
        return self.layer_norm3(transformed + residual)

class SequentialDecoder(tf.keras.Model):
    """Apply a list of decoder layers in order, feeding each one the same encoder output and masks."""

    def __init__(self, layers):
        super().__init__()
        self.seq_decoder_layers = layers

    def call(self, x, y, self_attention_mask, cross_attention_mask):
        decoded = y
        for decoder_layer in self.seq_decoder_layers:
            decoded = decoder_layer(x, decoded, self_attention_mask, cross_attention_mask)
        return decoded

class Decoder(tf.keras.Model):
    """Transformer decoder: sentence embedding followed by ``num_layers`` decoder layers."""

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.decoder_layers = SequentialDecoder([DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)])

    def call(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token):
        """Embed the target sentences ``y`` and decode them against the encoder output ``x``.

        Args:
            x: encoder output, forwarded to every decoder layer.
            y: batch of target sentences, forwarded to the sentence embedding.
            self_attention_mask: additive mask for decoder self-attention.
            cross_attention_mask: additive mask for encoder-decoder attention.
            start_token: whether the embedding prepends the START token.
            end_token: whether the embedding appends the END token.
        """
        # The flags are plain Python bools, so pass them by keyword: Keras 3
        # rejects non-tensor values passed positionally to a layer.
        y = self.sentence_embedding(y, start_token=start_token, end_token=end_token)
        y = self.decoder_layers(x, y, self_attention_mask, cross_attention_mask)
        return y

# Character-level model: the translation is produced letter by letter, not word by word.
class Transformer(Model):
    """Encoder-decoder Transformer whose output layer scores every target-vocabulary entry.

    All architecture parameters are supplied when the object is created.
    """

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 kn_vocab_size,
                 english_to_index,
                 telugu_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        # Arguments shared by the encoder and the decoder.
        architecture = (d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length)
        special_tokens = (START_TOKEN, END_TOKEN, PADDING_TOKEN)
        # The encoder reads English, the decoder reads/writes Telugu.
        self.encoder = Encoder(*architecture, english_to_index, *special_tokens)
        self.decoder = Decoder(*architecture, telugu_to_index, *special_tokens)
        # Final projection with as many units as the target vocabulary size.
        self.linear = Dense(kn_vocab_size)

    def call(self,
             x,  # batch of English sentences
             y,  # batch of Telugu sentences
             encoder_self_attention_mask=None,
             decoder_self_attention_mask=None,
             decoder_cross_attention_mask=None,
             enc_start_token=False,
             enc_end_token=False,
             dec_start_token=False,
             dec_end_token=False):
        # Encode the source sentences.
        encoded = self.encoder(x, encoder_self_attention_mask, start_token=enc_start_token, end_token=enc_end_token)
        # The decoder tokenizes and embeds the Telugu batch itself, then
        # attends over the encoder output.
        decoded = self.decoder(encoded, y, decoder_self_attention_mask, decoder_cross_attention_mask, start_token=dec_start_token, end_token=dec_end_token)
        # Project the decoder output onto the target vocabulary.
        return self.linear(decoded)

Transformer training code starts here

In [None]:
# Mount Google Drive inside the Colab runtime so the dataset files below are readable.
from google.colab import drive
drive.mount('/content/drive')

# Parallel corpus files: they are read line by line later on and paired by line index.
english_file = '/content/drive/MyDrive/transformer_dataset/train.en' # path of english file
telugu_file = '/content/drive/MyDrive/transformer_dataset/train.te' # path of telugu file


Mounted at /content/drive


In [None]:
# Generated this by filtering Appendix code

import unicodedata

# Special tokens. They must be three distinct, non-empty strings: the
# token -> index dictionaries are keyed by these values, so identical tokens
# collapse into one index and padding can no longer be told apart from
# end-of-sentence.
START_TOKEN = '<START>'
PADDING_TOKEN = '<PADDING>'
END_TOKEN = '<END>'

# Punctuation, digits and symbols accepted in Telugu sentences.
_TELUGU_COMMON_SYMBOLS = [' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/',
                          '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', '<', '=', '>', '?', 'ˌ']

# Every assigned code point of the Telugu Unicode block (U+0C00..U+0C7F).
# Sentences are tokenized one code point at a time, so each vocabulary entry
# must be exactly one code point: vowel signs and the virama are separate
# entries, never pre-combined strings.
#source -> https://en.wikipedia.org/wiki/Telugu_(Unicode_block)
_TELUGU_CODE_POINTS = [chr(code_point)
                       for code_point in range(0x0C00, 0x0C80)
                       if unicodedata.name(chr(code_point), '')]

# U+200C (zero width non-joiner) occurs inside Telugu words in the dataset.
telugu_vocabulary = [START_TOKEN,
                     *_TELUGU_COMMON_SYMBOLS,
                     *_TELUGU_CODE_POINTS,
                     '\u200c',
                     PADDING_TOKEN, END_TOKEN]

english_vocabulary = [START_TOKEN, ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/',
                        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                        ':', '<', '=', '>', '?', '@', '[',
                        ']', '^', '_', '`',
                        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
                        'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
                        'y', 'z',
                        '{', '|', '}', '~', PADDING_TOKEN, END_TOKEN]

In [None]:
'క' + 'ిి'

'కిి'

In [None]:
# Lookup tables in both directions (index -> token and token -> index) for each language.
index_to_telugu = dict(enumerate(telugu_vocabulary))
telugu_to_index = {token: index for index, token in enumerate(telugu_vocabulary)}
index_to_english = dict(enumerate(english_vocabulary))
english_to_index = {token: index for index, token in enumerate(english_vocabulary)}

In [None]:
def read_lines_from_file(file_path, total_sentences):
    """Return the first ``total_sentences`` lines of ``file_path`` as stripped UTF-8 strings."""
    first_lines = tf.data.TextLineDataset(file_path).take(total_sentences)
    sentences = []
    for raw_line in first_lines:
        sentences.append(raw_line.numpy().decode('utf-8').strip())
    return sentences

# Upper bound on the number of sentence pairs read from the files
# (can be increased up to about 2 million).
TOTAL_SENTENCES = 10000

# English side: read, then lower-case every sentence.
english_sentences = [
    sentence.lower()
    for sentence in read_lines_from_file(english_file, TOTAL_SENTENCES)
]

# Telugu side: read as-is.
telugu_sentences = read_lines_from_file(telugu_file, TOTAL_SENTENCES)

In [None]:
english_sentences[:10] #all cols and first 10 rows i..e, first 10 english sentences from dataset

['rise again.',
 'how do we glorify jehovahs undeserved kindness?',
 'india also continues to push back economically.',
 'i remember my childhood days.',
 'all transactions are made online.',
 'i love night shoots in my city.',
 'three members of a family were killed in the incident.',
 'the film is directed by v v vinayak.',
 'the hyderabad weather office forecast more rains.',
 'we are family!']

In [None]:
telugu_sentences[:10] #all cols and first 10 rows i..e, first 10 telugu sentences from dataset

['మళ్లీ ఉదయిస్తాడు.',
 'యెహోవా కృపను మనమెలా మహిమపరచవచ్చు?',
 'ఆర్థికంగా కూడా భారత్\u200c వే గంగా పయనిస్తున్నది.',
 '‘విద్యార్థులను చూస్తుంటే నాకు చిన్నప్పటి రోజులు గుర్తుకొస్తున్నాయి.',
 'ఆర్థిక లావాదేవీలన్నీ ఆన్\u200cలైన్\u200cలోనే',
 'పద్మినీ స్త్రీ రాత్రి వేళల్లో రతికి ఇష్టపడదు.',
 'ఓకే కుటుంబానికి చెందిన ముగ్గురు మృతిచెందటంతో ఈ సంఘటన కలవరపర్చింది.',
 'ఆ సినిమాకు వి. వి. వినాయక్\u200c దర్శకత్వం వహించనున్నారు.',
 'హైదరాబాద్\u200cకు మరో భారీ వర్షం సూచనలున్నాయని వాతావరణ శాఖ హెచ్చరించింది.',
 'మనం అంటున్న మమ్ముట్టి ఫ్యామిలీ !']

In [None]:
import numpy as np
PERCENTILE = 97
# Report the sentence length (in characters) below which PERCENTILE % of each corpus falls.
telugu_lengths = [len(x) for x in telugu_sentences]
english_lengths = [len(x) for x in english_sentences]
print(f"{PERCENTILE}th percentile length telugu: {np.percentile(telugu_lengths, PERCENTILE)}")
print(f"{PERCENTILE}th percentile length English: {np.percentile(english_lengths, PERCENTILE)}")

97th percentile length telugu: 168.0
97th percentile length English: 172.02999999999884


In [None]:
max_sequence_length = 200

def is_valid_tokens(sentence, vocab):
    """Return True when every distinct token of ``sentence`` is contained in ``vocab``."""
    return all(token in vocab for token in set(sentence))

def is_valid_length(sentence, max_sequence_length):
    """Return True when ``sentence`` is short enough to leave room for the end token."""
    token_count = sum(1 for _ in sentence)
    # One slot stays reserved because the end token is re-added later.
    longest_allowed = max_sequence_length - 1
    return token_count < longest_allowed

# Keep only the sentence pairs in which BOTH sides fit the sequence length and
# use only characters of their vocabulary. The English side is checked too:
# tokenization looks every character up in the vocabulary and raises KeyError
# on an unknown one.
valid_sentence_indicies = []
for index, (telugu_sentence, english_sentence) in enumerate(zip(telugu_sentences, english_sentences)):
    if is_valid_length(telugu_sentence, max_sequence_length) \
      and is_valid_length(english_sentence, max_sequence_length) \
      and is_valid_tokens(telugu_sentence, telugu_vocabulary) \
      and is_valid_tokens(english_sentence, english_vocabulary):
        valid_sentence_indicies.append(index)

print(f"Number of sentences: {len(telugu_sentences)}")
print(f"Number of valid sentences: {len(valid_sentence_indicies)}")

Number of sentences: 10000
Number of valid sentences: 59


In [None]:
# Restrict both corpora to the pairs that passed validation, keeping them aligned.
telugu_sentences = [telugu_sentences[valid_index] for valid_index in valid_sentence_indicies]
english_sentences = [english_sentences[valid_index] for valid_index in valid_sentence_indicies]

In [None]:
telugu_sentences[:3] #first 3 telugu valid sentences

['14 కోట్లు సమర్పరణ', '1 అక్టోబర్ 2001.', 'ఆత్మహత్యతో కలకలం']

In [None]:
# Model and training hyper-parameters.
d_model = 512  # width of the embeddings and of every attention/feed-forward output
batch_size = 3  # sentences per batch (used when the dataset is batched below)
ffn_hidden = 2048  # hidden width of the position-wise feed-forward blocks
num_heads = 8  # attention heads per attention layer
drop_prob = 0.1  # dropout rate passed to the encoder/decoder layers
num_layers = 3 # number of encoder and decoder block so this is simple and fast
max_sequence_length = 200  # every tokenized sentence is padded to this length
te_vocab_size = len(telugu_vocabulary)  # size of the output projection

# Instantiate the model; arguments are positional and follow Transformer.__init__.
transformer = Transformer( d_model,
                          ffn_hidden,
                          num_heads,
                          drop_prob,
                          num_layers,
                          max_sequence_length,
                          te_vocab_size,
                          english_to_index,
                          telugu_to_index,
                          START_TOKEN,
                          END_TOKEN,
                          PADDING_TOKEN
                         )


In [None]:
transformer

<Transformer name=transformer, built=False>

In [None]:
class TextDataset(tf.data.Dataset):
    """Factory for a ``tf.data`` dataset of (english, telugu) sentence pairs.

    ``__new__`` returns the dataset built by ``from_tensor_slices`` directly,
    so no ``TextDataset`` instance is ever created.
    """

    def __new__(cls, english_sentences, telugu_sentences):
        sentence_pairs = (english_sentences, telugu_sentences)
        return tf.data.Dataset.from_tensor_slices(sentence_pairs)

dataset = TextDataset(english_sentences, telugu_sentences)

In [None]:
len(dataset)

59

In [None]:
dataset[1]

TypeError: '_TensorSliceDataset' object is not callable

In [None]:
# Group consecutive sentence pairs into batches of `batch_size`; from here on
# `dataset` yields (english_batch, telugu_batch) pairs of string tensors.
dataset = dataset.batch(batch_size)  # Batching the dataset
# One-shot iterator, consumed by the preview loop in the next cell.
iterator = iter(dataset)  # Creating an iterator

In [None]:
# Print the first five batches as a sanity check.
for batch_num, batch in enumerate(iterator):
    print(batch)
    if batch_num >= 4:
        break

(<tf.Tensor: shape=(3,), dtype=string, numpy=
array([b'rs 14 crore', b'1 apr 2001.', b'suicide by hanging'],
      dtype=object)>, <tf.Tensor: shape=(3,), dtype=string, numpy=
array([b'14 \xe0\xb0\x95\xe0\xb1\x8b\xe0\xb0\x9f\xe0\xb1\x8d\xe0\xb0\xb2\xe0\xb1\x81 \xe0\xb0\xb8\xe0\xb0\xae\xe0\xb0\xb0\xe0\xb1\x8d\xe0\xb0\xaa\xe0\xb0\xb0\xe0\xb0\xa3',
       b'1 \xe0\xb0\x85\xe0\xb0\x95\xe0\xb1\x8d\xe0\xb0\x9f\xe0\xb1\x8b\xe0\xb0\xac\xe0\xb0\xb0\xe0\xb1\x8d 2001.',
       b'\xe0\xb0\x86\xe0\xb0\xa4\xe0\xb1\x8d\xe0\xb0\xae\xe0\xb0\xb9\xe0\xb0\xa4\xe0\xb1\x8d\xe0\xb0\xaf\xe0\xb0\xa4\xe0\xb1\x8b \xe0\xb0\x95\xe0\xb0\xb2\xe0\xb0\x95\xe0\xb0\xb2\xe0\xb0\x82'],
      dtype=object)>)
(<tf.Tensor: shape=(3,), dtype=string, numpy=
array([b'120 crores.', b'12 the bibles viewpoint', b'time is less.'],
      dtype=object)>, <tf.Tensor: shape=(3,), dtype=string, numpy=
array([b'120 \xe0\xb0\x95\xe0\xb1\x8b\xe0\xb0\x9f\xe0\xb1\x8d\xe0\xb0\xb2\xe0\xb1\x81 \xe0\xb0\x85\xe0\xb0\x9f.',
       b'12 \xe0\xb0\x9

In [None]:
from tensorflow.keras import layers, initializers, optimizers

# Define the loss function
# Index of the padding token in the Telugu vocabulary.
padding_token_index = telugu_to_index[PADDING_TOKEN]
# Per-element (unreduced) cross-entropy on logits.
# NOTE(review): `loss_object` is not referenced by any later visible cell.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)

# Initialize the weights of the transformer model
# Re-draws, with Glorot-uniform values, the kernel of each entry of
# `transformer.layers` that exposes a `kernel` attribute.
# NOTE(review): presumably only direct children of the model are visited and
# the model is not built yet at this point — confirm this loop changes any weight.
for layer in transformer.layers:
    if hasattr(layer, 'kernel'):
        initializer = initializers.GlorotUniform()
        layer.kernel.assign(initializer(layer.kernel.shape, dtype=layer.kernel.dtype))

# Define the optimizer
# NOTE: later cells rebind `optimizer` to `tf.keras.optimizers.Adam()` with default settings.
optimizer = optimizers.Adam(learning_rate=1e-4)

# Set the device
# NOTE(review): `device` is not referenced by any later visible cell.
device = '/GPU:0' if tf.config.list_physical_devices('GPU') else '/CPU:0'

In [None]:
NEG_INFTY = -1e9

def create_masks(eng_batch, kn_batch, max_sequence_length=None):
    """Build the additive attention masks for one batch of sentence pairs.

    Args:
        eng_batch: batch of source sentences (``str``, UTF-8 ``bytes`` or
            scalar string tensors; a batched string tensor also works).
        kn_batch: batch of target sentences, same accepted types.
        max_sequence_length: padded sequence length. When omitted, the
            module-level ``max_sequence_length`` is used.

    Returns:
        ``(encoder_self_attention_mask, decoder_self_attention_mask,
        decoder_cross_attention_mask)``: three float32 tensors of shape
        (num_sentences, max_sequence_length, max_sequence_length) holding 0
        where attention is allowed and ``NEG_INFTY`` where it is blocked.
    """
    if max_sequence_length is None:
        max_sequence_length = globals()["max_sequence_length"]

    def char_count(sentence):
        # Lengths are counted in characters, so tensors/bytes are decoded first.
        if hasattr(sentence, "numpy"):
            sentence = sentence.numpy()
        if isinstance(sentence, bytes):
            sentence = sentence.decode("utf-8")
        return len(sentence)

    eng_lengths = [char_count(sentence) for sentence in eng_batch]
    kn_lengths = [char_count(sentence) for sentence in kn_batch]
    num_sentences = len(eng_lengths)
    full_shape = (num_sentences, max_sequence_length, max_sequence_length)

    # Look-ahead mask: strictly above the diagonal, so every position may
    # still attend to itself.
    look_ahead_mask = np.triu(np.ones((max_sequence_length, max_sequence_length), dtype=bool), k=1)

    # Padding masks (True = blocked).
    encoder_padding_mask = np.zeros(full_shape, dtype=bool)
    decoder_padding_mask_self_attention = np.zeros(full_shape, dtype=bool)
    decoder_padding_mask_cross_attention = np.zeros(full_shape, dtype=bool)

    for idx, (eng_sentence_length, kn_sentence_length) in enumerate(zip(eng_lengths, kn_lengths)):
        # Positions from length + 1 onwards are treated as padding.
        eng_padding = slice(eng_sentence_length + 1, max_sequence_length)
        kn_padding = slice(kn_sentence_length + 1, max_sequence_length)

        encoder_padding_mask[idx, :, eng_padding] = True
        encoder_padding_mask[idx, eng_padding, :] = True
        decoder_padding_mask_self_attention[idx, :, kn_padding] = True
        decoder_padding_mask_self_attention[idx, kn_padding, :] = True
        # Cross-attention: rows are target (query) positions, columns are
        # source (key) positions.
        decoder_padding_mask_cross_attention[idx, :, eng_padding] = True
        decoder_padding_mask_cross_attention[idx, kn_padding, :] = True

    def to_additive(blocked):
        return tf.constant(np.where(blocked, NEG_INFTY, 0.0), dtype=tf.float32)

    encoder_self_attention_mask = to_additive(encoder_padding_mask)
    decoder_self_attention_mask = to_additive(look_ahead_mask | decoder_padding_mask_self_attention)
    decoder_cross_attention_mask = to_additive(decoder_padding_mask_cross_attention)

    return encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask


In [None]:
num_epochs = 10
total_loss = 0

# Ensure the transformer is on the correct device
# No explicit `to(device)` is needed in TensorFlow as the model should already be on the GPU if available.

# Create an optimizer (equivalent to optim in PyTorch)
optimizer = tf.keras.optimizers.Adam()


def _decode_batch(batch):
    """Return the sentences of ``batch`` as a list of Python ``str``."""
    sentences = []
    for sentence in batch:
        if hasattr(sentence, "numpy"):
            sentence = sentence.numpy()
        if isinstance(sentence, bytes):
            sentence = sentence.decode("utf-8")
        sentences.append(sentence)
    return sentences


def train_step(eng_batch, te_batch):
    """Run one optimisation step on a batch of sentence pairs.

    Not wrapped in ``tf.function``: tokenization walks Python strings and
    dictionaries, which cannot be traced into a graph.

    Args:
        eng_batch: batch of English sentences.
        te_batch: batch of Telugu sentences.

    Returns:
        ``(mean_loss, te_predictions)``: the mean cross-entropy over
        non-padding target positions, and the logits of shape
        (batch, max_sequence_length, te_vocab_size).
    """
    eng_text = _decode_batch(eng_batch)
    te_text = _decode_batch(te_batch)

    # Creating masks
    encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask = create_masks(eng_text, te_text, max_sequence_length)

    # Labels are the target sentence followed by the end token (no start token).
    labels = transformer.decoder.sentence_embedding.batch_tokenize(te_text, start_token=False, end_token=True)
    labels_flat = tf.reshape(labels, [-1])
    # Create a mask for valid indices (non-padding tokens)
    valid_indices = tf.not_equal(labels_flat, telugu_to_index[PADDING_TOKEN])

    # The forward pass AND the loss must be recorded by the tape.
    with tf.GradientTape() as tape:
        te_predictions = transformer(
            eng_batch,
            te_batch,
            encoder_self_attention_mask,
            decoder_self_attention_mask,
            decoder_cross_attention_mask,
            enc_start_token=False,
            enc_end_token=False,
            dec_start_token=True,
            dec_end_token=True,
            training=True
        )
        te_predictions_flat = tf.reshape(te_predictions, [-1, te_vocab_size])
        loss = tf.keras.losses.sparse_categorical_crossentropy(labels_flat, te_predictions_flat, from_logits=True)
        masked_loss = tf.reduce_sum(tf.boolean_mask(loss, valid_indices))
        # Guard against a batch without any non-padding label.
        mean_loss = masked_loss / tf.maximum(tf.reduce_sum(tf.cast(valid_indices, tf.float32)), 1.0)

    # Perform backpropagation
    grads = tape.gradient(mean_loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(grads, transformer.trainable_variables))
    return mean_loss, te_predictions


def greedy_decode(eng_text):
    """Greedily translate one English sentence; returns a 1-tuple holding the Telugu text."""
    special_tokens = (START_TOKEN, PADDING_TOKEN, END_TOKEN)
    te_sentence = ("",)
    eng_sentence = (eng_text,)
    for word_counter in range(max_sequence_length):
        encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask = create_masks(eng_sentence, te_sentence, max_sequence_length)

        # Feed string tensors, like the batched dataset does during training.
        predictions = transformer(
            tf.constant(eng_sentence),
            tf.constant(te_sentence),
            encoder_self_attention_mask,
            decoder_self_attention_mask,
            decoder_cross_attention_mask,
            enc_start_token=False,
            enc_end_token=False,
            dec_start_token=True,
            dec_end_token=False,
            training=False
        )

        next_token_prob_distribution = predictions[0][word_counter]
        next_token_index = int(tf.argmax(next_token_prob_distribution).numpy())
        next_token = index_to_telugu[next_token_index]
        # Stop on any special token: it cannot be appended, because the
        # sentence is re-tokenized character by character on the next pass.
        if next_token in special_tokens:
            break
        te_sentence = (te_sentence[0] + next_token,)
    return te_sentence


def report_progress(batch_num, mean_loss, eng_batch, te_batch, te_predictions):
    """Every 100 batches, print the loss, the first training pair and a sample translation."""
    if batch_num % 100 != 0:
        return
    print(f"Iteration {batch_num} : {mean_loss.numpy()}")
    print(f"English: {eng_batch[0]}")
    print(f"telugu Translation: {te_batch[0]}")
    te_sentence_predicted = tf.argmax(te_predictions[0], axis=1).numpy()

    # Generate the predicted sentence
    predicted_tokens = []
    for idx in te_sentence_predicted:
        if idx == telugu_to_index[END_TOKEN]:
            break
        predicted_tokens.append(index_to_telugu[int(idx)])
    predicted_sentence = "".join(predicted_tokens)
    print(f"telugu Prediction: {predicted_sentence}")

    te_sentence = greedy_decode("should we go to the mall?")
    print(f"Evaluation translation (should we go to the mall?) : {te_sentence}")
    print("-------------------------------------------")



NameError: name 'te_batch' is not defined

In [None]:
optimizer = tf.keras.optimizers.Adam()
num_epochs = 10
# Iterate the dataset itself every epoch: a single `iter(dataset)` would be
# exhausted after the first epoch.
train_loader = dataset


def _batch_to_text(batch):
    """Return the sentences of ``batch`` as a list of Python ``str``."""
    sentences = []
    for sentence in batch:
        if hasattr(sentence, "numpy"):
            sentence = sentence.numpy()
        if isinstance(sentence, bytes):
            sentence = sentence.decode("utf-8")
        sentences.append(sentence)
    return sentences


def _sample_translation(eng_text):
    """Greedily translate one English sentence; returns a 1-tuple holding the Telugu text."""
    special_tokens = (START_TOKEN, PADDING_TOKEN, END_TOKEN)
    kn_sentence = ("",)
    eng_sentence = (eng_text,)
    for word_counter in range(max_sequence_length):
        encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask = create_masks(eng_sentence, kn_sentence, max_sequence_length)

        # Feed string tensors, like the batched dataset does during training.
        predictions = transformer(
            tf.constant(eng_sentence),
            tf.constant(kn_sentence),
            encoder_self_attention_mask,
            decoder_self_attention_mask,
            decoder_cross_attention_mask,
            enc_start_token=False,
            enc_end_token=False,
            dec_start_token=True,
            dec_end_token=False,
            training=False
        )

        # Pick the highest-scoring token for the next position.
        next_token_prob_distribution = predictions[0][word_counter]
        next_token_index = int(tf.argmax(next_token_prob_distribution).numpy())
        next_token = index_to_telugu[next_token_index]

        # Stop on any special token: it cannot be appended, because the
        # sentence is re-tokenized character by character on the next pass.
        if next_token in special_tokens:
            break
        kn_sentence = (kn_sentence[0] + next_token,)
    return kn_sentence


for epoch in range(num_epochs):
    print(f"Epoch {epoch}")
    for batch_num, batch in enumerate(train_loader):
        eng_batch, kn_batch = batch
        # The dataset yields UTF-8 byte tensors; masks and labels need
        # character counts, so work on decoded text.
        eng_text = _batch_to_text(eng_batch)
        kn_text = _batch_to_text(kn_batch)
        encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask = create_masks(eng_text, kn_text, max_sequence_length)

        labels = transformer.decoder.sentence_embedding.batch_tokenize(kn_text, start_token=False, end_token=True)
        labels = tf.reshape(labels, [-1])
        # Padding positions do not contribute to the loss.
        is_real_token = tf.not_equal(labels, telugu_to_index[PADDING_TOKEN])

        with tf.GradientTape() as tape:
            kn_predictions = transformer(
                eng_batch,
                kn_batch,
                encoder_self_attention_mask,
                decoder_self_attention_mask,
                decoder_cross_attention_mask,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=True,
                dec_end_token=True,
                training=True
            )

            # Reshape for loss calculation
            flat_predictions = tf.reshape(kn_predictions, (-1, te_vocab_size))
            token_losses = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=flat_predictions, labels=labels)
            valid_losses = tf.boolean_mask(token_losses, is_real_token)
            loss = tf.reduce_sum(valid_losses) / tf.maximum(tf.cast(tf.size(valid_losses), tf.float32), 1.0)

        # Gradients are taken after the tape has finished recording.
        gradients = tape.gradient(loss, transformer.trainable_variables)
        optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

        if batch_num % 100 == 0:
            print(f"Iteration {batch_num} : {float(loss)}")
            print(f"English: {eng_batch[0]}")
            print(f"telugu Translation: {kn_batch[0]}")
            kn_sentence_predicted = tf.argmax(kn_predictions[0], axis=1).numpy()
            predicted_tokens = []
            for idx in kn_sentence_predicted:
                if idx == telugu_to_index[END_TOKEN]:
                    break
                predicted_tokens.append(index_to_telugu[int(idx)])
            predicted_sentence = "".join(predicted_tokens)
            print(f"telugu Prediction: {predicted_sentence}")

            kn_sentence = _sample_translation("should we go to the mall?")
            print(f"Evaluation translation (should we go to the mall?) : {kn_sentence}")
            print("-------------------------------------------")

Epoch 0


TypeError: Scalar tensor has no `len()`

Inference

In [None]:

def translate(eng_sentence):
    """Greedily translate one English sentence into Telugu, one character at a time.

    Args:
        eng_sentence: the English sentence as a ``str``. Its characters must
            all be in the English vocabulary.

    Returns:
        The generated Telugu text, without any special token.
    """
    eng_sentence = (eng_sentence,)
    kn_sentence = ("",)
    special_tokens = (START_TOKEN, PADDING_TOKEN, END_TOKEN)

    for word_counter in range(max_sequence_length):
        # create_masks needs the padded length; the text tuples give it the
        # character counts.
        encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask = create_masks(eng_sentence, kn_sentence, max_sequence_length)

        # Feed string tensors, like the batched dataset does during training.
        predictions = transformer(
            tf.constant(eng_sentence),
            tf.constant(kn_sentence),
            encoder_self_attention_mask,
            decoder_self_attention_mask,
            decoder_cross_attention_mask,
            enc_start_token=False,
            enc_end_token=False,
            dec_start_token=True,
            dec_end_token=False,
            training=False  # Make sure the model is in inference mode
        )

        next_token_prob_distribution = predictions[0][word_counter]
        next_token_index = int(tf.argmax(next_token_prob_distribution).numpy())
        next_token = index_to_telugu[next_token_index]

        # Stop on any special token: it cannot be appended, because the
        # sentence is re-tokenized character by character on the next pass.
        if next_token in special_tokens:
            break
        kn_sentence = (kn_sentence[0] + next_token,)

    return kn_sentence[0]

In [None]:
translation = translate("what should we do when the day starts?")
print(translation)

In [None]:
translation = translate("how is this the truth?")
print(translation)

Insights:  ->
When training, we can treat every letter as a single unit instead of splitting it into its component parts, to preserve meaning. For example, మా should be one unit when computing the loss. It should not be decomposed into మ + ా.
Using word-based or BPE based tokenizations may help mitigate (1). Also, we will get valid word (or BPE) units if we do so.
Make sure the training set has a large variety of sentences that are not just about one topic like "work" and "government"
Increase the number of encoder / decoder units for better translations. It was set to only 3 of each here (`num_layers = 3`) to keep the model simple and fast.
