In [None]:
from google.colab import drive
drive.mount('/content/drive')





Mounted at /content/drive


In [None]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

In [None]:
# Download the file
path_abm = 'drive/MyDrive/Bambara_translation/MT Data/bm_fr/train.bam.txt'
path_fr = 'drive/MyDrive/Bambara_translation/MT Data/bm_fr/train.fr.txt'

In [None]:
text_pairs=[]

with open(path_abm) as fp:
    data_bm = fp.readlines()

with open(path_fr) as fp:
    data_fr = fp.readlines()

for line1, line2 in zip(data_bm, data_fr):
  b=line1.split('\n')[0] 
  f= '[start]' + line2.split('\n')[0]+'[end]'
  text_pairs.append((b,f))
  



FileNotFoundError: ignored

In [None]:
import random

for _ in range(5):
    print(random.choice(text_pairs))

('An ka Ala deli !', '[start]Prions ![end]')
('gundo mara', '[start]garder secret[end]')
('I hakili jigin a la ko...', '[start]Souvenez-vous que...[end]')
('Gafe in ɲɛsinnen don jɔn ma ?', '[start]Ce livre a été écrit pour qui ?[end]')
('A ye muso maminɛ.', '[start]Il a fiancé une femme.[end]')


In [None]:
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

1558 total pairs
1092 training pairs
233 validation pairs
233 test pairs


In [None]:
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

vocab_size = 15000
sequence_length = 20
batch_size = 64


def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")


eng_vectorization = TextVectorization(
    max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length,
)
spa_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)

In [None]:
def format_dataset(eng, spa):
    eng = eng_vectorization(eng)
    spa = spa_vectorization(spa)
    return ({"encoder_inputs": eng, "decoder_inputs": spa[:, :-1],}, spa[:, 1:])


def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [None]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


In [None]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)


class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

In [None]:
embed_dim = 256
latent_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [None]:
epochs = 100  # This should be at least 30 for convergence

transformer.summary()
transformer.compile(
    "rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   3845120     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

In [None]:
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence


test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(input_sentence, translated)

In [None]:
translated

In [None]:
input_sentence

In [None]:
num_examples = 30000
input_tensor, target_tensor, inp_lang, targ_lang, max_length_inp, max_length_targ = load_dataset(path_to_file, num_examples)

In [None]:
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)

In [None]:
BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 64
N_BATCH = BUFFER_SIZE//BATCH_SIZE
embedding_dim = 256
units = 1024
vocab_inp_size = len(inp_lang.word2idx)
vocab_tar_size = len(targ_lang.word2idx)
dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

In [None]:
def gru(units):
 # If you have a GPU, we recommend using CuDNNGRU(provides a 3x speedup than GRU)
 # the code automatically does that.
 if tf.test.is_gpu_available():
   return tf.keras.layers.CuDNNGRU(units,
   return_sequences=True,
   return_state=True,
   recurrent_initializer=’glorot_uniform’)
 else:
   return tf.keras.layers.GRU(units,
   return_sequences=True,
   return_state=True,
   recurrent_activation=’sigmoid’,
   recurrent_initializer=’glorot_uniform’)
class Encoder(tf.keras.Model):
 def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
   super(Encoder, self).__init__()
   self.batch_sz = batch_sz
   self.enc_units = enc_units
   self.embedding = tf.keras.layers.Embedding(vocab_size,   embedding_dim)
   self.gru = gru(self.enc_units)
 
 def call(self, x, hidden):
   x = self.embedding(x)
   output, state = self.gru(x, initial_state = hidden) 
   return output, state
 
 def initialize_hidden_state(self):
   return tf.zeros((self.batch_sz, self.enc_units))
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = gru(self.dec_units)
        self.fc = tf.keras.layers.Dense(vocab_size)
       
        # used for attention
        self.W1 = tf.keras.layers.Dense(self.dec_units)
        self.W2 = tf.keras.layers.Dense(self.dec_units)
        self.V = tf.keras.layers.Dense(1)
       
    def call(self, x, hidden, enc_output):
        # enc_output shape == (batch_size, max_length, hidden_size)
       
        # hidden shape == (batch_size, hidden size)
     # hidden_with_time_axis shape == (batch_size, 1, hidden size)
     # we are doing this to perform addition to calculate the score
        hidden_with_time_axis = tf.expand_dims(hidden, 1)
       
        # score shape == (batch_size, max_length, 1)
        # we get 1 at the last axis because we are applying tanh(FC(EO) + FC(H)) to self.V
        score = self.V(tf.nn.tanh(self.W1(enc_output) + self.W2(hidden_with_time_axis)))
       
        # attention_weights shape == (batch_size, max_length, 1)
        attention_weights = tf.nn.softmax(score, axis=1)
       
        # context_vector shape after sum == (batch_size, hidden_size)
        context_vector = attention_weights * enc_output
        context_vector = tf.reduce_sum(context_vector, axis=1)
       
        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)
       
        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
       
        # passing the concatenated vector to the GRU
        output, state = self.gru(x)
       
        # output shape == (batch_size * 1, hidden_size)
        output = tf.reshape(output, (-1, output.shape[2]))
       
        # output shape == (batch_size * 1, vocab)
        x = self.fc(output)
       
        return x, state, attention_weights
       
    def initialize_hidden_state(self):
        return tf.zeros((self.batch_sz, self.dec_units))
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)


In [None]:
optimizer = tf.train.AdamOptimizer()
def loss_function(real, pred):
 mask = 1 — np.equal(real, 0)
 loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask
 return tf.reduce_mean(loss_)
checkpoint_dir = ‘./training_checkpoints’
checkpoint_prefix = os.path.join(checkpoint_dir, “ckpt”)
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
 encoder=encoder,
 decoder=decoder)

In [None]:
EPOCHS = 10
for epoch in range(EPOCHS):
 start = time.time()
 
 hidden = encoder.initialize_hidden_state()
 total_loss = 0
 
 for (batch, (inp, targ)) in enumerate(dataset):
   loss = 0
 
   with tf.GradientTape() as tape:
     enc_output, enc_hidden = encoder(inp, hidden)
 
     dec_hidden = enc_hidden
 
     dec_input = tf.expand_dims([targ_lang.word2idx[‘<start>’]] *  BATCH_SIZE, 1) 
 
     # Teacher forcing — feeding the target as the next input
     for t in range(1, targ.shape[1]):
       # passing enc_output to the decoder
       predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
 
       loss += loss_function(targ[:, t], predictions)
 
       # using teacher forcing
       dec_input = tf.expand_dims(targ[:, t], 1)
 
   batch_loss = (loss / int(targ.shape[1]))
 
   total_loss += batch_loss
 
   variables = encoder.variables + decoder.variables
 
   gradients = tape.gradient(loss, variables)
 
   optimizer.apply_gradients(zip(gradients, variables))
 
   if batch % 100 == 0:
     print(‘Epoch {} Batch {} Loss {:.4f}’.format(epoch + 1,
     batch,
     batch_loss.numpy()))
 # saving (checkpoint) the model every 2 epochs
 if (epoch + 1) % 2 == 0:
   checkpoint.save(file_prefix = checkpoint_prefix)
 
 print(‘Epoch {} Loss {:.4f}’.format(epoch + 1,
   total_loss / N_BATCH))
 print(‘Time taken for 1 epoch {} sec\n’.format(time.time() —   start))

In [None]:
def evaluate(sentence, encoder, decoder, inp_lang, targ_lang, max_length_inp, max_length_targ):
 attention_plot = np.zeros((max_length_targ, max_length_inp))
 
 sentence = preprocess_sentence(sentence)
inputs = [inp_lang.word2idx[i] for i in sentence.split(‘ ‘)]
 inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],   maxlen=max_length_inp, padding=’post’)
 inputs = tf.convert_to_tensor(inputs)
 
 result = ‘’
hidden = [tf.zeros((1, units))]
 enc_out, enc_hidden = encoder(inputs, hidden)
dec_hidden = enc_hidden
 dec_input = tf.expand_dims([targ_lang.word2idx[‘<start>’]], 0)
for t in range(max_length_targ):
   predictions, dec_hidden, attention_weights = decoder(dec_input, dec_hidden, enc_out)
 
 # storing the attention weights to plot later on
   attention_weights = tf.reshape(attention_weights, (-1, ))
   attention_plot[t] = attention_weights.numpy()
predicted_id = tf.argmax(predictions[0]).numpy()
result += targ_lang.idx2word[predicted_id] + ‘ ‘
if targ_lang.idx2word[predicted_id] == ‘<end>’:
     return result, sentence, attention_plot
 
 # the predicted ID is fed back into the model
   dec_input = tf.expand_dims([predicted_id], 0)
return result, sentence, attention_plot

In [None]:
# function for plotting the attention weights
def plot_attention(attention, sentence, predicted_sentence):
 fig = plt.figure(figsize=(10,10))
 ax = fig.add_subplot(1, 1, 1)
 ax.matshow(attention, cmap=’viridis’)
 
 fontdict = {‘fontsize’: 14}
 
 ax.set_xticklabels([‘’] + sentence, fontdict=fontdict, rotation=90)
 ax.set_yticklabels([‘’] + predicted_sentence, fontdict=fontdict)
plt.show()
def translate(sentence, encoder, decoder, inp_lang, targ_lang, max_length_inp, max_length_targ):
 result, sentence, attention_plot = evaluate(sentence, encoder, decoder, inp_lang, targ_lang, max_length_inp, max_length_targ)
 
 print(‘Input: {}’.format(sentence))
 print(‘Predicted translation: {}’.format(result))
 
 attention_plot = attention_plot[:len(result.split(‘ ‘)), :len(sentence.split(‘ ‘))]
 plot_attention(attention_plot, sentence.split(‘ ‘), result.split(‘ ‘))

In [None]:
# restoring the latest checkpoint in checkpoint_dir
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))