In [1]:
import numpy as np

import os 

import time

import typing
from typing import Any, Tuple

import tensorflow as tf
from tensorflow import keras

import tensorflow_text as tf_text

import tensorflow_addons as tfa

import sklearn
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

 The versions of TensorFlow you are currently using is 2.10.0 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


In [2]:
import pathlib

path_to_zip = tf.keras.utils.get_file(
    'fra-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip',
    extract=True, cache_dir = "D:/Programming/Python/NLP tutorial")

path_to_file = pathlib.Path(path_to_zip).parent/'fra-eng/fra.txt'

In [3]:
def load_data(path):
    text = path.read_text(encoding='utf-8')

    lines = text.splitlines()
    pairs = [line.split('\t') for line in lines]

    inp = [inp for targ, inp in pairs]
    targ = [targ for targ, inp in pairs]

    return targ, inp

In [4]:
targ, inp = load_data(path_to_file)
print(inp[-1])

Il est peut-être impossible d'obtenir un Corpus complètement dénué de fautes, étant donnée la nature de ce type d'entreprise collaborative. Cependant, si nous encourageons les membres à produire des phrases dans leurs propres langues plutôt que d'expérimenter dans les langues qu'ils apprennent, nous pourrions être en mesure de réduire les erreurs.


In [5]:
class tf_lower_and_split_punct:
    def __init__(self, start = True, end = True):
        self.start = start
        self.end = end
        
    def __call__(self, text):
        # Split accented characters.
        text = tf_text.normalize_utf8(text, 'NFKD')
        text = tf.strings.lower(text)
        # Keep space, a to z, and select punctuation.
        text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
        # Add spaces around punctuation.
        text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
        # Strip whitespace.
        text = tf.strings.strip(text)
        if self.start == True:
            text = tf.strings.join(['[START]', text], separator=' ')
        if self.end == True:
            text = tf.strings.join([text, '[END]'], separator=' ')

        return text

In [6]:
max_vocab_size = 5000
BUFFER_SIZE = 32000
BATCH_SIZE = 64
max_input_sequence = 20
max_output_sequence = 20

input_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct(),
    max_tokens=max_vocab_size,
    output_sequence_length=max_input_sequence)

output_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct(),
    max_tokens=max_vocab_size,
    output_sequence_length=max_output_sequence)

In [7]:
input_text_processor.adapt(inp)
output_text_processor.adapt(targ)

In [8]:
train_input_data,valid_input_data, train_output_data, valid_output_data = train_test_split(inp, targ, test_size = 0.2)

In [9]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_input_data, train_output_data))
valid_dataset = tf.data.Dataset.from_tensor_slices((valid_input_data, valid_output_data))

In [10]:
def make_batches(ds):
    return (ds
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .map(lambda x, y : (input_text_processor(x), output_text_processor(y)), num_parallel_calls=tf.data.AUTOTUNE)
    .prefetch(buffer_size=tf.data.AUTOTUNE))

train_batches = make_batches(train_dataset)
valid_batches = make_batches(valid_dataset)

In [11]:
example_input_batch, example_output_batch = next(iter(train_batches))
example_input_batch, example_output_batch

(<tf.Tensor: shape=(64, 20), dtype=int64, numpy=
 array([[   2,   26,   57, ...,    0,    0,    0],
        [   2,   15,    7, ...,    0,    0,    0],
        [   2,    5,  168, ...,    0,    0,    0],
        ...,
        [   2,   15,    7, ...,    0,    0,    0],
        [   2,   15,    7, ...,    0,    0,    0],
        [   2,   30, 3997, ...,    0,    0,    0]], dtype=int64)>,
 <tf.Tensor: shape=(64, 20), dtype=int64, numpy=
 array([[   2,    5,  120, ...,    0,    0,    0],
        [   2,  142, 1082, ...,    0,    0,    0],
        [   2,    5,  120, ...,    0,    0,    0],
        ...,
        [   2,   14,  310, ...,    0,    0,    0],
        [   2,   14,  244, ...,    0,    0,    0],
        [   2,   52,  231, ...,    0,    0,    0]], dtype=int64)>)

In [12]:
embedding_dim = 256
units = 1024
num_examples = 30000
steps_per_epoch = num_examples//BATCH_SIZE

In [13]:
class Encoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_size):
        super(Encoder, self).__init__()
        self.batch_size = batch_size
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        
        self.lstm_layer = tf.keras.layers.LSTM(self.enc_units,
                                              return_sequences = True,
                                              return_state = True,
                                              recurrent_initializer = "glorot_uniform")
        
    def call(self, inp, hidden = None):
        inp = self.embedding(inp)
        output, h, c = self.lstm_layer(inp, initial_state = hidden)
        return output, h, c

    def initialize_hidden_state(self):
        return [tf.fill((self.batch_size, self.enc_units), 0.), tf.fill((self.batch_size, self.enc_units), 0.)]

In [14]:
encoder = Encoder(max_vocab_size, embedding_dim, units, BATCH_SIZE)

sample_hidden = encoder.initialize_hidden_state()
sample_output, sample_h, sample_c = encoder(example_input_batch, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder h vecotr shape: (batch size, units) {}'.format(sample_h.shape))
print ('Encoder c vector shape: (batch size, units) {}'.format(sample_c.shape))

Encoder output shape: (batch size, sequence length, units) (64, 20, 1024)
Encoder h vecotr shape: (batch size, units) (64, 1024)
Encoder c vector shape: (batch size, units) (64, 1024)


In [15]:
class Decoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_size, attention_type = "luong"):
        super(Decoder, self).__init__()
        self.batch_size = batch_size
        self.dec_units = dec_units
        self.attention_type = attention_type
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.fc = tf.keras.layers.Dense(vocab_size)
        
        self.decoder_rnn_cell = tf.keras.layers.LSTMCell(self.dec_units)
        
        self.sampler = tfa.seq2seq.sampler.TrainingSampler()
        self.attention_mechanism = self.build_attention_mechanism(self.dec_units, 
                                                              None, self.batch_size*[max_input_sequence], self.attention_type)
        
        self.rnn_cell = self.build_rnn_cell(batch_size)
        self.decoder = tfa.seq2seq.BasicDecoder(self.rnn_cell, 
                                                sampler=self.sampler, 
                                                output_layer=self.fc)
    def build_rnn_cell(self, batch_size):
        rnn_cell = tfa.seq2seq.AttentionWrapper(self.decoder_rnn_cell, 
                                                self.attention_mechanism, 
                                                attention_layer_size=self.dec_units)
        return rnn_cell
    
    def build_attention_mechanism(self, dec_units, memory, memory_sequence_length, attention_type='luong'):
        if(attention_type=='bahdanau'):
            return tfa.seq2seq.BahdanauAttention(units=dec_units, 
                                                 memory=memory, 
                                                 memory_sequence_length=memory_sequence_length)
        else:
            return tfa.seq2seq.LuongAttention(units=dec_units, 
                                              memory=memory, 
                                              memory_sequence_length=memory_sequence_length)
        
    def build_initial_state(self, batch_size, encoder_state, dtype):
        decoder_initial_state = self.rnn_cell.get_initial_state(batch_size=batch_size, dtype=dtype)
        decoder_initial_state = decoder_initial_state.clone(cell_state=encoder_state)
        return decoder_initial_state
    
    def call(self, inputs, initial_state):
        x = self.embedding(inputs)
        outputs, _, _ = self.decoder(x, initial_state = initial_state,
                                    sequence_length = self.batch_size*[max_input_sequence-1])
        return outputs

In [16]:
decoder = Decoder(max_vocab_size, embedding_dim, units, BATCH_SIZE, 'luong')
sample_x = tf.random.uniform((BATCH_SIZE, max_output_sequence))
decoder.attention_mechanism.setup_memory(sample_output)
initial_state = decoder.build_initial_state(BATCH_SIZE, [sample_h, sample_c], tf.float32)

sample_decoder_outputs = decoder(sample_x, initial_state)

print("Decoder Outputs Shape: ", sample_decoder_outputs.rnn_output.shape)

Decoder Outputs Shape:  (64, 19, 5000)


In [17]:
optimizer = tf.keras.optimizers.Nadam()

def loss_func(real, pred):
    cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    loss = cross_entropy(y_true=real, y_pred=pred)
    mask = tf.logical_not(tf.math.equal(real,0))
    mask = tf.cast(mask, dtype=loss.dtype)
    loss = mask* loss
    loss = tf.reduce_mean(loss)
    return loss 

In [18]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                 encoder=encoder,
                                 decoder=decoder)

In [19]:
@tf.function
def train_step(inp, targ, enc_hidden):
    loss = 0
    
    with tf.GradientTape() as tape:
        enc_output, enc_h, enc_c = encoder(inp, enc_hidden)
        
        dec_input = targ[ : , :-1 ]
        real = targ[ : , 1: ]
        
        decoder.attention_mechanism.setup_memory(enc_output)
        
        decoder_initial_state = decoder.build_initial_state(BATCH_SIZE, [enc_h, enc_c], tf.float32)
        pred = decoder(dec_input, decoder_initial_state)
        logits = pred.rnn_output
        loss = loss_func(real, logits)
        
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    
    return loss

In [20]:
EPOCHS = 10

for epoch in range(EPOCHS):
    start = time.time()
    
    enc_hidden = encoder.initialize_hidden_state()
    
    total_loss = 0
    
    for (batch, (inp, targ)) in enumerate(train_batches.take(steps_per_epoch)):
        batch_loss = train_step(inp, targ, enc_hidden)
        total_loss += batch_loss
        
        if batch % 100 == 0:
            print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
                                                   batch,
                                                   batch_loss.numpy()))
            
    if (epoch + 1) % 2 == 0:
        checkpoint.save(file_prefix = checkpoint_prefix)
    
    print('Epoch {} Loss {:.4f}'.format(epoch + 1,
                                      total_loss / steps_per_epoch))
    print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

Epoch 1 Batch 0 Loss 3.6282
Epoch 1 Batch 100 Loss 1.9178
Epoch 1 Batch 200 Loss 1.9207
Epoch 1 Batch 300 Loss 1.8502
Epoch 1 Batch 400 Loss 1.6136
Epoch 1 Loss 1.8780
Time taken for 1 epoch 115.44803738594055 sec

Epoch 2 Batch 0 Loss 1.4823
Epoch 2 Batch 100 Loss 1.6008
Epoch 2 Batch 200 Loss 1.4328
Epoch 2 Batch 300 Loss 1.2871
Epoch 2 Batch 400 Loss 1.3157
Epoch 2 Loss 1.4232
Time taken for 1 epoch 105.48214387893677 sec

Epoch 3 Batch 0 Loss 1.3612
Epoch 3 Batch 100 Loss 1.2288
Epoch 3 Batch 200 Loss 1.2377
Epoch 3 Batch 300 Loss 0.9992
Epoch 3 Batch 400 Loss 0.9637
Epoch 3 Loss 1.1389
Time taken for 1 epoch 104.0552990436554 sec

Epoch 4 Batch 0 Loss 0.8672
Epoch 4 Batch 100 Loss 0.7556
Epoch 4 Batch 200 Loss 0.7260
Epoch 4 Batch 300 Loss 0.8262
Epoch 4 Batch 400 Loss 0.8023
Epoch 4 Loss 0.7979
Time taken for 1 epoch 106.8150954246521 sec

Epoch 5 Batch 0 Loss 0.5736
Epoch 5 Batch 100 Loss 0.6781
Epoch 5 Batch 200 Loss 0.5878
Epoch 5 Batch 300 Loss 0.5498
Epoch 5 Batch 400 Loss 0

In [21]:
class Translator(tf.Module):
    def __init__(self, encoder, decoder, input_text_processor, output_text_processor):
        self.encoder = encoder
        self.decoder = decoder
        self.input_text_processor = input_text_processor
        self.output_text_processor = output_text_processor
        
        self.output_token_string_from_index = (
            tf.keras.layers.StringLookup(
                vocabulary=output_text_processor.get_vocabulary(),
                mask_token='',
                invert=True))
        
        index_from_string = tf.keras.layers.StringLookup(
            vocabulary=output_text_processor.get_vocabulary(), mask_token='')
        
        token_mask_ids = index_from_string(['', '[UNK]', '[START]']).numpy()
        
        token_mask = np.zeros([index_from_string.vocabulary_size()], dtype=np.bool)
        token_mask[np.array(token_mask_ids)] = True
        self.token_mask = token_mask
        
        self.start_token = index_from_string(tf.constant('[START]'))
        self.end_token = index_from_string(tf.constant('[END]'))

In [22]:
translator = Translator(
    encoder= encoder,
    decoder= decoder,
    input_text_processor=input_text_processor,
    output_text_processor=output_text_processor,
)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  token_mask = np.zeros([index_from_string.vocabulary_size()], dtype=np.bool)


In [23]:
def tokens_to_text(self, result_tokens):
    print("tokens : ",result_tokens)
    result_text_tokens = self.output_token_string_from_index(result_tokens)
    result_text = tf.strings.reduce_join(result_text_tokens,
                                       axis=1, separator=' ')
    result_text = tf.strings.strip(result_text)
    return result_text

In [24]:
Translator.tokens_to_text = tokens_to_text

In [25]:
def tf_sample(self, sentence):
    inputs = input_text_processor(sentence)
    
    inference_batch_size = tf.shape(inputs)[0]
    
    enc_start_state = [tf.fill([inference_batch_size, units], 0.), tf.fill([inference_batch_size,units], 0.)]
    
    print(inputs)
    enc_out, enc_h, enc_c = self.encoder(inputs, enc_start_state)
    
    dec_h = enc_h
    dec_c = enc_c
    
    start_tokens = tf.fill([inference_batch_size],  2)
    end_token = 3 #output_text_processor.get_vocabulary().index('[END]')
    
    greedy_sampler = tfa.seq2seq.GreedyEmbeddingSampler()
    
    decoder_instance = tfa.seq2seq.BasicDecoder(cell=self.decoder.rnn_cell, 
                                                sampler=greedy_sampler, 
                                                output_layer=self.decoder.fc)
    self.decoder.attention_mechanism.setup_memory(enc_out)
    
    decoder_initial_state = self.decoder.build_initial_state(inference_batch_size, [enc_h, enc_c], tf.float32)
    
    decoder_embedding_matrix = self.decoder.embedding.variables[0]
    
    outputs, _, _ = decoder_instance(decoder_embedding_matrix, 
                                     start_tokens = start_tokens, 
                                     end_token= end_token, 
                                     initial_state=decoder_initial_state)
    return outputs.sample_id


In [26]:
Translator.tf_sample = tf_sample

In [27]:
@tf.function(input_signature=[tf.TensorSpec(dtype=tf.string, shape=[None])])
def __call__(self, sentence):
    sample_id = self.tf_sample(sentence)
    return self.tokens_to_text(sample_id)

In [28]:
Translator.__call__ = __call__

In [29]:
input_text = tf.constant([
    "Je veux un taxi près de l'aéroport", 
    "ou est la table"
])

In [30]:
result = translator(input_text)
result

Tensor("text_vectorization/RaggedToTensor/RaggedTensorToTensor:0", shape=(None, 20), dtype=int64)
tokens :  Tensor("basic_decoder/decoder/transpose_1:0", shape=(None, None), dtype=int32)


<tf.Tensor: shape=(2,), dtype=string, numpy=
array([b'i want a cab near the airport . [END]',
       b'where is the table ? [END] . [END] .'], dtype=object)>

In [31]:
class ExportTranslator(tf.Module):
  def __init__(self, translator):
    self.translator = translator

  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
  def __call__(self, sentence):
    result = self.translator(sentence)

    return result

In [32]:
translator = ExportTranslator(translator)

In [33]:
tf.saved_model.save(translator, export_dir = "attention_translator")

Tensor("text_vectorization/RaggedToTensor/RaggedTensorToTensor:0", shape=(None, 20), dtype=int64)
tokens :  Tensor("basic_decoder/decoder/transpose_1:0", shape=(None, None), dtype=int32)




INFO:tensorflow:Assets written to: attention_translator\assets


INFO:tensorflow:Assets written to: attention_translator\assets


In [34]:
reloaded = tf.saved_model.load("attention_translator")

In [35]:
reloaded(input_text)

<tf.Tensor: shape=(2,), dtype=string, numpy=
array([b'i want a cab near the airport . [END]',
       b'where is the table ? [END] . [END] .'], dtype=object)>