In [3]:
# !wget https://object.pouta.csc.fi/Tatoeba-Challenge-v2021-08-07/eng-swa.tar
# !tar -xvf eng-swa.tar
# !pip install datasets

In [4]:
# from datasets import load_dataset

# swa_dataset = load_dataset("swahili_news")


In [11]:

# eng_dataset=load_dataset("english_news")

In [12]:
# print(dataset)

In [13]:
# for i in range(5):
#   print(dataset["train"]["text"][i])


In [5]:

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

In [9]:
# separate translation pairs from a CSV file into two separate files with the translation sentence pairs
import csv

# Input CSV file and output file paths
input_csv_file = 'ensw.csv'
output_english_file = 'english_sentences.txt'
output_swahili_file = 'swahili_sentences.txt'

# Open the CSV file for reading with the correct encoding
with open(input_csv_file, 'r', newline='', encoding='utf-8') as csv_file:
    reader = csv.reader(csv_file)
    
    # Open the output files for writing with UTF-8 encoding
    with open(output_english_file, 'w', encoding='utf-8') as english_output_file, open(output_swahili_file, 'w', encoding='utf-8') as swahili_output_file:
        for row in reader:
            if len(row) >= 2:
                english_sentence = row[0]
                swahili_sentence = row[1]
                
                # Write the sentences to the respective output files
                english_output_file.write(english_sentence + '\n')
                swahili_output_file.write(swahili_sentence + '\n')

# Close the output files
english_output_file.close()
swahili_output_file.close()

print("Translation pairs separated and saved to files.")


Translation pairs separated and saved to files.


In [10]:
import re
import random
data_path = "eng.txt"
data_path2 = "swa.txt"
# Defining lines as a list of each line
with open(data_path, 'r', encoding='utf-8') as f:
  lines = f.read().strip().split('\n')
with open(data_path2, 'r', encoding='utf-8') as f:
  lines2 = f.read().strip().split('\n')

lines = [" ".join(re.findall(r"[A-Za-z0-9]+",line)) for line in lines]
lines2 = [" ".join(re.findall(r"[A-Za-z0-9]+",line)) for line in lines2]

In [11]:
# Grouping lines by response pair
pairs = list(zip(lines,lines2))
# pairs = pairs[:3000]
random.shuffle(pairs)
print(len(pairs))

2446


In [13]:
print(pairs[273])

('This week I ve watched foreign car action movies with subtitles for three days in a row', 'Wiki hii nimetazama sinema za vitendo vya gari la kigeni na nukuu kwa siku tatu mfululizo')


# printing random pairs


In [14]:
import random
print(random.choice(pairs))

('It is Mrs Lee Susan s mother in London', 'Ni Bibi Lee mama ya Susan Uingereza')


In [15]:
import random
random.shuffle(pairs)
num_val_samples = int(0.15 * len(pairs))
num_train_samples = len(pairs) - 2 * num_val_samples
train_pairs = pairs[:num_train_samples]
val_pairs = pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs =pairs[num_train_samples + num_val_samples:]


# Vectorizing the English and Swahili text pairs


In [16]:

import tensorflow as tf
import string
import re
from tensorflow import keras
from tensorflow.keras import layers

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")

vocab_size = 15000
sequence_length = 20

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_swahili_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_swahili_texts)


# Preparing datasets for the translation task


In [18]:

batch_size = 512

def format_dataset(eng, swa):
    eng = source_vectorization(eng)
    swa = target_vectorization(swa)
    return ({
        "english": eng,
        "swahili": swa[:, :-1],
    }, swa[:, 1:])

def make_dataset(pairs):
    eng_texts, swa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    swa_texts = list(swa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, swa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [19]:
for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['swahili'].shape: {inputs['swahili'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['english'].shape: (512, 20)
inputs['swahili'].shape: (512, 20)
targets.shape: (512, 20)


# Sequence-to-sequence learning with RNNs
 GRU-based encoder

In [10]:
# Sequence-to-sequence learning with RNNs
# GRU-based encoder

from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024

source = keras.Input(shape=(None,), dtype="int64", name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
encoded_source = layers.Bidirectional(
    layers.GRU(latent_dim), merge_mode="sum")(x)

# GRU-based decoder and the end-to-end model


In [11]:

past_target = keras.Input(shape=(None,), dtype="int64", name="swahili")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

# Training our recurrent sequence-to-sequence model


In [12]:

seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
# seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)
history=seq2seq_rnn.fit(train_ds, epochs=50, validation_data=val_ds)


Epoch 1/50
 13/314 [>.............................] - ETA: 5:44:29 - loss: 9.4535 - accuracy: 0.1559

# Translating new sentences with our RNN encoder and decoder


In [None]:

import numpy as np
swa_vocab = target_vectorization.get_vocabulary()
swa_index_lookup = dict(zip(range(len(swa_vocab)), swa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = seq2seq_rnn.predict(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = swa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

Visualization of accuracy and validation

In [None]:
import matplotlib.pyplot as plt

%matplotlib inline
import matplotlib.pyplot as plt
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(acc))

plt.plot(epochs, acc, 'g', label='Training accuracy')
plt.plot(epochs, val_acc, 'b', label='Validation accuracy')
plt.title('Training and validation accuracy')
plt.legend(loc=0)
plt.savefig('wordA.png')
plt.figure()
plt.show()

plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'o', label='Validation loss')
plt.title('Training and validation loss')
plt.legend(loc=0)
plt.savefig('wordL.png')
plt.figure()

# **# Sequence-to-sequence learning with Transformer**

*The TransformerDecoder*
 *The Transformer decoder*

In [21]:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self,inputs,mask=None):
      if mask is not None:
        mask=mask[:, tf.newaxis,:]
      attention_output=self.attention(
          inputs,attention_mask=mask
      )
      proj_input=self.layernorm_1(inputs + attention_output)
      proj_output=self.dense_proj(proj_input)
      return self.layernorm_2(proj_input+proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)


# PositionalEmbedding layer


```
# This is formatted as code
```



In [22]:

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [None]:
!pip install transformers

In [23]:
import tensorflow as tf

class MyTransformerEncoder(tf.keras.layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(MyTransformerEncoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense1 = tf.keras.layers.Dense(dense_dim, activation="relu")
        self.dense2 = tf.keras.layers.Dense(embed_dim)
        self.dropout1 = tf.keras.layers.Dropout(0.1)
        self.dropout2 = tf.keras.layers.Dropout(0.1)
        self.layer_norm1 = tf.keras.layers.LayerNormalization()
        self.layer_norm2 = tf.keras.layers.LayerNormalization()

    def call(self, inputs, training=False):
        attn_output = self.attention(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layer_norm1(inputs + attn_output)
        dense_output = self.dense1(out1)
        dense_output = self.dense2(dense_output)
        dense_output = self.dropout2(dense_output, training=training)
        out2 = self.layer_norm2(out1 + dense_output)
        return out2


In [24]:
embed_dim = 256
dense_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = MyTransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="swahili")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)


In [25]:
transformer.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 english (InputLayer)        [(None, None)]               0         []                            
                                                                                                  
 swahili (InputLayer)        [(None, None)]               0         []                            
                                                                                                  
 positional_embedding (Posi  (None, None, 256)            3845120   ['english[0][0]']             
 tionalEmbedding)                                                                                 
                                                                                                  
 positional_embedding_1 (Po  (None, None, 256)            3845120   ['swahili[0][0]']         

# Training the sequence-to-sequence Transformer





In [None]:

transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
history=transformer.fit(train_ds, epochs=100, validation_data=val_ds)


Epoch 1/100
Epoch 2/100

**visualization of accuracy and validation**

In [None]:
import matplotlib.pyplot as plt

%matplotlib inline
import matplotlib.pyplot as plt
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(acc))

plt.plot(epochs, acc, 'g', label='Training accuracy')
plt.plot(epochs, val_acc, 'b', label='Validation accuracy')
plt.title('Training and validation accuracy')
plt.legend(loc=0)
plt.savefig('wordA2.png')
plt.figure()
plt.show()

plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'o', label='Validation loss')
plt.title('Training and validation loss')
plt.legend(loc=0)
plt.savefig('wordL2.png')
plt.figure()

# Translating new sentences with our Transformer model






In [None]:


import numpy as np
swa_vocab = target_vectorization.get_vocabulary()
swa_index_lookup = dict(zip(range(len(swa_vocab)), swa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = swa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)

    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))


In [None]:
!pip install gradio

In [None]:
!pip install transformers ipywidgets gradio --upgrade

In [None]:
import gradio as gr

def greet(name):
    return "Hello " + name + "!"

demo = gr.Interface(
    fn=greet,
    inputs=gr.Textbox(lines=2, placeholder="Enter text to be Translated  Here..."),
    outputs="text",
)
demo.launch()