In [1]:
import tensorflow as tf

text_file = tf.keras.utils.get_file(
    fname='fra-eng.zip',
    origin="http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip",
    extract=True
)

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip
[1m3423204/3423204[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 1us/step


In [2]:
import pathlib


text_file = pathlib.Path(text_file).parent / 'fra.txt'

In [3]:
print(text_file)

C:\Users\gupta\.keras\datasets\fra.txt


In [10]:
with open('fra.txt') as fp:
    text_pair = [line for line in fp]

In [12]:
import random
for _ in range(5):
    print(random.choice(text_pair))

It looks amazing.	Cela semble merveilleux.

I ate potato chips.	J'ai mangÃ© des chips.

I don't know who that youngster is.	Je ne sais pas qui est ce jeune homme.

Take a walk every day.	PromÃ¨ne-toi chaque jour.

He stayed home from school because he wasn't feeling well.	Il resta chez lui plutÃ´t que d'aller Ã  l'Ã©cole parce qu'il ne se sentait pas bien.



# Data Preprocessing

In [13]:
import unicodedata
import re 

def normalize(line):
    line = unicodedata.normalize("NFKC" , line.strip().lower())
    line = re.sub(r"^([^ \w])(?!\s)", r"\1", line)
    line = re.sub(r"(\s[^ \w])(?!\s)", r"\1", line)
    line = re.sub(r"(?!\s)([^ \w])$", r"\1", line)
    line = re.sub(r"(?!\s)([^ \w]\s)", r"\1", line)
    eng, fre = line.split("\t")
    fre = '[start] ' + fre + ' [end]' # french
    return eng, fre



In [14]:
with open('fra.txt') as fp:
    text_pairs = [normalize(line) for line in fp]

In [16]:
import random
for _ in range(5):
    print(random.choice(text_pairs))

("where's my ring?", '[start] oã1 est ma bagueâ ? [end]')
('tom forgot to do his homework.', '[start] tom a oubliã© de faire ses devoirs. [end]')
("that's all tom did.", "[start] c'est tout ce que tom a fait. [end]")
('i felt left out.', '[start] je me sentis exclu. [end]')
('i want to go over a few things with you.', '[start] je veux parcourir quelques trucs avec toi. [end]')


# Tokenization and Statistics

In [17]:
eng_tokens, fre_tokens = set(), set()
eng_maxlen, fre_maxlen = 0, 0

for eng , fre in text_pairs:
    eng_token, fre_token = eng.split(), fre.split()
    eng_maxlen = max(eng_maxlen, len(eng_token))
    fre_maxlen = max(fre_maxlen, len(fre_token))
    eng_tokens.update(eng_token)
    fre_tokens.update(fre_token)

# Print statistics
print(f"Total tokens in English: {len(eng_tokens)}")
print(f"Total tokens in French: {len(fre_tokens)}")
print(f"Maximum length of English sequence: {eng_maxlen}")
print(f"Maximum length of French sequence: {fre_maxlen}")

Total tokens in English: 25380
Total tokens in French: 44866
Maximum length of English sequence: 47
Maximum length of French sequence: 56


# Data Serialization

In [18]:
import pickle

# Serialize preprocessed data for future use
with open("text_pairs.pickle", 'wb') as fp:
    pickle.dump(text_pairs, fp)

# Embedding Layer

In [19]:
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization
import pickle
import random


with open("text_pairs.pickle" , 'rb') as fp:
    text_pairs = pickle.load(fp)

In [41]:
random.shuffle(text_pairs)

n_val = int(.15*len(text_pairs))
n_train = len(text_pairs) - 2*n_val
train_pair = text_pairs[:n_train]
test_pair = text_pairs[n_train:n_train+n_val]

In [42]:
vocab_en = 10000
vocab_fr = 20000
seq_length = 25

In [43]:
# Initialize TextVectorization layers
eng_vect = TextVectorization(
    max_tokens=vocab_en,
    standardize=None,
    split='whitespace',
    output_mode='int',
    output_sequence_length=seq_length
)

In [44]:
fre_vect = TextVectorization(
    max_tokens=vocab_fr,
    standardize=None,
    split='whitespace',
    output_mode='int',
    output_sequence_length=seq_length + 1  # +1 for start token
)

In [45]:
# Adapt TextVectorization layers to training data
train_eng = [pair[0] for pair in train_pair]
train_fre = [pair[1] for pair in train_pair]

In [46]:
eng_vect.adapt(train_eng)
fre_vect.adapt(train_fre)

In [47]:
# Serialize the vectorization layers and training/test data
with open('vectorize.pickle', 'wb') as fp:
    data = {'train': train_pair,
            'test': test_pair,
            'eng_vect': eng_vect.get_config(),
            'fre_vect': fre_vect.get_config(),
            'eng_weights': eng_vect.get_weights(),
            'fre_weights': fre_vect.get_weights()
            }
    pickle.dump(data, fp)


In [48]:
# Load serialized data
with open("vectorize.pickle", 'rb') as fp:
    data = pickle.load(fp)

# Retrieve train and test pairs
train_pair = data['train']
test_pair = data['test']

# Reconstruct TextVectorization layers
eng_vect = TextVectorization.from_config(data['eng_vect'])
eng_vect.set_weights(data['eng_weights'])
fre_vect = TextVectorization.from_config(data['fre_vect'])
fre_vect.set_weights(data['fre_weights'])

In [49]:
# Define function to format dataset
def format_dataset(eng, fre):
    eng = eng_vect(eng)
    fre = fre_vect(fre)
    source = {'encode_inp': eng,
              'decode_inp': fre[:, :-1]
              }
    target = fre[:, 1:]
    return (source, target)

# Define function to create dataset
def make_dataset(pairs, batchsize=64):
    eng_text, fre_text = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_text), list(fre_text)))
    return dataset.shuffle(2048).batch(batchsize).map(format_dataset).prefetch(16).cache()



In [50]:
# Create TensorFlow datasets for training and testing
train_ds = make_dataset(train_pair)


In [51]:
test_ds = make_dataset(test_pair)

# Positional Embedding

In [55]:
import numpy as np
import tensorflow as tf

# Function to generate positional encoding matrix
def pos_enc_matrix(L, d, n=10000):
    assert d % 2 == 0
    d2 = d // 2

    P = np.zeros((L, d))
    k = np.arange(L).reshape(-1, 1)
    i = np.arange(d2).reshape(1, -1)

    denom = np.power(n, -i / d2)
    args = k * denom

    P[:, ::2] = np.sin(args)
    P[:, 1::2] = np.cos(args)
    return P

# Custom Keras layer for positional embedding
class PositionalEmbedding(tf.keras.layers.Layer):

    def __init__(self, seq_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.seq_length = seq_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

        self.token_embeddings = tf.keras.layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim, mask_zero=True
        )
        matrix = pos_enc_matrix(seq_length, embed_dim)
        self.positional_embedding = tf.constant(matrix, dtype=tf.float32)

    def call(self, inputs):
        embedded_tokens = self.token_embeddings(inputs)
        return embedded_tokens + self.positional_embedding

    def compute_mask(self, inputs, mask=None):
        return self.token_embeddings.compute_mask(inputs, mask)

    def get_config(self):
        config = super().get_config()
        config.update({
            "seq_length": self.seq_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim
        })
        return config

vocab_size = 10000
seq_length = 25
embed_dim = 512

# Create a text vectorization layer
text_vectorization = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size, output_mode="int", output_sequence_length=seq_length
)

# Simulate raw text data (replace with actual dataset)
raw_text_data = tf.constant(["This is a test sentence", "Another example sentence"])

# Adapt the vectorization layer to the dataset
text_vectorization.adapt(raw_text_data)
# Simulate a dataset (replace with real dataset)
train_data = raw_text_data  # Replace with actual text dataset
train_ds = tf.data.Dataset.from_tensor_slices(train_data)
train_ds = train_ds.map(lambda x: {"encode_inp": text_vectorization(x), "target": x})
train_ds = train_ds.batch(2)

for inputs in train_ds.take(1):
    embed_en = PositionalEmbedding(seq_length, vocab_size, embed_dim=embed_dim)
    en_emb = embed_en(inputs["encode_inp"])
    mask = embed_en.compute_mask(inputs["encode_inp"])
    print("Mask:", mask)


Mask: tf.Tensor(
[[ True  True  True  True  True False False False False False False False
  False False False False False False False False False False False False
  False]
 [ True  True  True False False False False False False False False False
  False False False False False False False False False False False False
  False]], shape=(2, 25), dtype=bool)


# Self-Attention Layer

In [56]:
import tensorflow as tf

def self_attention(input_shape, prefix='att', mask=False, **kwargs):
    # Define inputs
    inputs = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f"{prefix}_in1")

    # Multi-head attention layer
    attention = tf.keras.layers.MultiHeadAttention(name=f"{prefix}_att1", **kwargs)
    norm = tf.keras.layers.LayerNormalization(name=f'{prefix}_norm1')
    add = tf.keras.layers.Add(name=f'{prefix}_add1')

    # Apply attention mechanism
    attout = attention(query=inputs, value=inputs, key=inputs, use_causal_mask=mask)

    # Apply normalization and residual connection
    output = norm(add([inputs, attout]))

    # Create the model
    model = tf.keras.Model(inputs=inputs, outputs=output, name=f"{prefix}_att")

    return model

# Cross-Attention Layer

In [57]:
def cross_attention(input_shape, context_shape, prefix='att', **kwargs):
    # Define inputs
    context = tf.keras.layers.Input(shape=context_shape, dtype='float32', name=f"{prefix}_ctx2")
    inputs = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f'{prefix}_in2')

    # Multi-head attention layer
    attention = tf.keras.layers.MultiHeadAttention(name=f'{prefix}_att2', **kwargs)
    norm = tf.keras.layers.LayerNormalization(name=f'{prefix}_norm2')
    add = tf.keras.layers.Add(name=f'{prefix}_add2')

    # Apply attention mechanism
    attout = attention(query=inputs, key=context, value=context)

    # Apply normalization and residual connection
    output = norm(add([attout, inputs]))

    # Create the model
    model = tf.keras.Model(inputs=[context, inputs], outputs=output, name=f'{prefix}_crs_at')

    return model

# Feed-Forward Layer

In [58]:
def feed_forward(input_shape, model_dim, ff_dim, dropout=.1, prefix='ff'):
    # Define inputs
    inputs = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f'{prefix}_in3')

    # Dense layers
    dense1 = tf.keras.layers.Dense(ff_dim, name=f'{prefix}_ff1', activation='relu')
    dense2 = tf.keras.layers.Dense(model_dim, name=f'{prefix}_ff2')
    drop = tf.keras.layers.Dropout(dropout, name=f'{prefix}_drop')
    add = tf.keras.layers.Add(name=f"{prefix}_add3")

    # Apply feed-forward transformation
    ffout = drop(dense2(dense1(inputs)))

    # Layer normalization and residual connection
    norm = tf.keras.layers.LayerNormalization(name=f'{prefix}_norm3')
    output = norm(add([inputs, ffout]))

    # Create the model
    model = tf.keras.Model(inputs=inputs, outputs=output, name=f'{prefix}_ff')

    return model

# Encoder Layer

In [59]:
import tensorflow as tf

def encoder(input_shape, key_dim, ff_dim, dropout=0.1, prefix='enc', **kwargs):
    # Define a Sequential model for the encoder
    model = tf.keras.models.Sequential([
        tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f'{prefix}_in0'), # Input layer
        self_attention(input_shape, prefix=prefix, key_dim=key_dim, mask=False, **kwargs), # Self-attention layer
        feed_forward(input_shape, key_dim, ff_dim, dropout, prefix) # Feed-forward layer
    ])

    return model

In [68]:
seq_length = 25
key_dim = 128
ff_dim = 512
num_heads = 8

model = encoder(input_shape = (seq_length,key_dim),key_dim = key_dim , ff_dim = ff_dim , num_heads = num_heads)

tf.keras.utils.plot_model(model)

You must install graphviz (see instructions at https://graphviz.gitlab.io/download/) for `plot_model` to work.


# Decoder Layer

In [60]:
def decoder(input_shape, key_dim, ff_dim, dropout=0.1, prefix='dec', **kwargs):
    # Define inputs for decoder
    inputs = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f'{prefix}_in0')
    context = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f'{prefix}_ctx0')
    
    # Self-attention and cross-attention layers
    att_model = self_attention(input_shape, key_dim=key_dim, mask=True, prefix=prefix, **kwargs)
    cross_model = cross_attention(input_shape, input_shape, key_dim=key_dim, prefix=prefix, **kwargs)
    
    # Feed-forward layer
    ff_model = feed_forward(input_shape, key_dim, ff_dim, dropout, prefix)

    # Connect layers
    x = att_model(inputs)
    x = cross_model([context, x])
    output = ff_model(x)

    # Define decoder model
    model = tf.keras.Model(inputs=[inputs, context], outputs=output, name=prefix)

    return model

In [70]:
seq_length = 25
key_dim = 128
ff_dim = 512
num_heads = 8

model =decoder(input_shape = (seq_length,key_dim),key_dim = key_dim , ff_dim = ff_dim , num_heads = num_heads)

tf.keras.utils.plot_model(model)

You must install graphviz (see instructions at https://graphviz.gitlab.io/download/) for `plot_model` to work.


# Transformer Model

In [72]:
def transformer(num_layers, num_heads, seq_length, key_dim, ff_dim, vocab_size_en, vocab_size_fr, dropout=0.1, name='transformer'):
    # Define encoder and decoder inputs
    input_enc = tf.keras.layers.Input(shape=(seq_length,), dtype='int32', name='encode_inp')
    input_dec = tf.keras.layers.Input(shape=(seq_length,), dtype='int32', name='decode_inp')


    # Positional embeddings for encoder and decoder inputs
    emb_enc = PositionalEmbedding(seq_length, vocab_size_en, key_dim, name='embed_enc')
    emb_dec = PositionalEmbedding(seq_length, vocab_size_fr, key_dim, name='embed_dec')

    # Create encoder and decoder layers
    encoders = [encoder(input_shape=(seq_length, key_dim), key_dim=key_dim, ff_dim=ff_dim, dropout=dropout, prefix=f"enc{i}", num_heads=num_heads)
                for i in range(num_layers)]
    decoders = [decoder(input_shape=(seq_length, key_dim), key_dim=key_dim, ff_dim=ff_dim, dropout=dropout, prefix=f"dec{i}", num_heads=num_heads)
                for i in range(num_layers)]

    # Final dense layer
    final = tf.keras.layers.Dense(vocab_size_fr, name='linear')

    # Apply encoder and decoder layers to inputs
    x1 = emb_enc(input_enc)
    x2 = emb_dec(input_dec)
    for layer in encoders:
        x1 = layer(x1)
    for layer in decoders:
        x2 = layer([x2, x1])

    # Generate output
    output = final(x2)

    try:
        del output.keras_mask
    except:
        pass

    # Define transformer model
    model = tf.keras.Model(inputs=[input_enc, input_dec], outputs=output, name=name)

    return model

In [73]:
seq_length = 25
num_layers = 4
num_heads = 8
key_dim = 128
ff_dim = 512
dropout = .1
vocab_size_en = 10000
vocab_size_fr = 20000

model = transformer(num_layers, num_heads,seq_length,key_dim,ff_dim,vocab_size_en,vocab_size_fr,dropout)






In [75]:
model.compile(loss = 'sparse_categorical_cross_entropy' , optimizer = 'adam', metrics = ['accuracy'])
model.summary()

# Custom Learning Rate Schedule

In [81]:
import tensorflow as tf

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):

  def __init__(self, key_dim, warmup_steps=40000):
    super().__init__()
    self.key_dim = key_dim
    self.warmup_steps = warmup_steps
    self.d = tf.cast(self.key_dim, tf.float32)

  def __call__(self, step):
    # Convert step to float32
    step = tf.cast(step, dtype=tf.float32)
    # Calculate learning rate schedule
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)
    return tf.math.rsqrt(self.d) * tf.math.minimum(arg1, arg2)

  def get_config(self):
    # Configuration for serialization
    config ={
      "key_dim": self.key_dim,
      "warmup_steps": self.warmup_steps
    }
    return config

# Define key dimension and create learning rate schedule
key_dim = 128
lr = CustomSchedule(key_dim)
optimizer = tf.keras.optimizers.Adam(lr,beta_1 = .9 , beta_2 = .98 , epsilon = 1e-9)

# Masked Loss Function

In [84]:
def masked_loss(label, pred):
  # Create mask for non-padded tokens
  mask = label != 0

  # Sparse categorical cross-entropy loss
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none'
  )
  loss = loss_object(label, pred)

  # Apply mask to loss
  mask = tf.cast(mask, dtype=loss.dtype)
  loss *= mask

  # Compute average loss
  loss = tf.reduce_sum(loss) / tf.reduce_sum(mask)
  return loss

# Masked Accuracy Metric

In [85]:
def mask_accuracy(label, pred):
  # Convert predictions to class labels
  pred = tf.argmax(pred, axis=2)
  label = tf.cast(label, pred.dtype)

  # Calculate match between labels and predictions
  match = label == pred

  # Apply mask to match
  mask = label != 0
  match = match & mask

  # Compute accuracy
  match = tf.cast(match, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  return tf.reduce_sum(match) / tf.reduce_sum(mask)

# Compile and Train the Model

In [89]:
# Compile the model with custom loss and metrics
model.compile(loss=masked_loss, optimizer=optimizer, metrics=[mask_accuracy])
model.summary()


In [None]:
# Train the model  
# this take lots of time
history = model.fit(train_ds, epochs=20, validation_data=test_ds)

# Visualizing Training History

In [None]:
import matplotlib.pyplot as plt

# Visualizing Training History
fig, axs = plt.subplots(2, figsize=(6, 8), sharex=True)
fig.suptitle('Training history')
x = list(range(1, 21))  # Assuming 20 epochs
axs[0].plot(x, history.history["loss"], alpha=0.5, label="loss")
axs[0].plot(x, history.history["val_loss"], alpha=0.5, label="val_loss")
axs[0].set_ylabel("Loss")
axs[0].legend(loc="upper right")
axs[1].plot(x, history.history["masked_accuracy"], alpha=0.5, label="mask_accuracy")
axs[1].plot(x, history.history["val_masked_accuracy"], alpha=0.5, label="val_mask_accuracy")
axs[1].set_ylabel("Accuracy")
axs[1].set_xlabel("Epoch")
axs[1].legend(loc="lower right")
plt.show()

# Model Testing

In [93]:
def translate(sentence):
    # Encode input sentence
    enc_tokens = eng_vect([sentence])
    lookup = list(fra_vect.get_vocabulary())
    start_sent, end_sent = "[start]", "[end]"
    output_sent = [start_sent]
    for i in range(seq_length):
        # Prepare decoder input
        vector = fra_vect([" ".join(output_sent)])
        assert vector.shape == (1, seq_length + 1)
        dec_tokens = vector[:, :-1]
        assert dec_tokens.shape == (1, seq_length)
        # Generate predictions
        pred = model([enc_tokens, dec_tokens])
        assert pred.shape == (1, seq_len, vocab_size_fr)
        # Decode predicted token
        word = lookup[np.argmax(pred[0, i, :])]
        output_sent.append(word)
        if word == end_sent:
            break
    return output_sent



#### Test the model on sample test cases

In [None]:
test_count = 20
for n in range(test_count):
    eng_sent, fre_sent = random.choice(test_pairs)
    translated = translate(eng_sent)
    print(f"Test case: {n}")
    print(f"English sentence: {eng_sent}")
    print(f"Translated sentence: {' '.join(translated)}")
    print(f"French sentence: {fre_sent}")
    print()