In [None]:
import tensorflow as tf
import numpy as np
import re
import os
import zipfile

In [None]:
dataset_link = "/content/fra-eng.zip"
# The context manager closes the archive even if extraction raises, which the
# previous explicit close() call did not guarantee.
with zipfile.ZipFile(dataset_link) as zip_ref:
  zip_ref.extractall()

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
def tokenize(texts):
  """Fit a new Tokenizer (constructed with its defaults) on `texts` and return it."""
  fitted = Tokenizer()
  fitted.fit_on_texts(texts)
  return fitted



In [None]:
link = "/content/fra.txt"
en = []
fr = []
# The corpus contains accented French text, so decode it as UTF-8 explicitly
# instead of relying on the platform's default encoding.
with open(link, "r", encoding="utf-8") as f:
  for line in f:
    # Each record is tab separated: English, French, then optional extra
    # columns (e.g. attribution) that are ignored here.
    fields = line.strip().split('\t')
    if len(fields) < 2:
      # Blank or malformed line: there is no sentence pair to keep.
      continue
    eng, fra = fields[0], fields[1]
    en.append(eng)
    fr.append(fra)

In [None]:
# Preview only: displaying the whole list would print every sentence in the corpus.
en[:5]

In [None]:
# Preview only: displaying the whole list would print every sentence in the corpus.
fr[:5]

In [None]:
import unicodedata
def data_cleaning(texts):
  """Normalise raw sentences into lowercase ASCII words separated by single spaces.

  For every sentence: lowercase it, fold accented letters to their base
  letter (so "tôt" becomes "tot" instead of losing the accented character),
  drop @mentions, URLs, punctuation and digits, and collapse whitespace.

  Args:
    texts: iterable of str.

  Returns:
    list of cleaned str, one per input sentence, in the same order.
  """
  sents = []
  for text in texts:
    sent = text.lower()
    # Decompose accented characters (NFD) and drop the combining marks so the
    # ASCII-only filter below keeps the base letter rather than deleting it.
    sent = unicodedata.normalize("NFD", sent)
    sent = "".join(ch for ch in sent if unicodedata.category(ch) != "Mn")
    sent = re.sub(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?", "", sent)
    sent = re.sub(r"\d+", "", sent)
    # Removing punctuation/digits can leave stray or doubled spaces; tidy them
    # last so the result never carries leading/trailing whitespace.
    sent = " ".join(sent.split())
    sents.append(sent)
  return sents

In [None]:
def fra_sents(fra_sents):
  """Build the two framed variants of every sentence.

  Returns a pair of lists: the sentences prefixed with "BOS " and the
  sentences suffixed with " EOS", both in input order.
  """
  # Walk the input exactly once, producing both variants side by side.
  framed = [("BOS " + body, body + " EOS") for body in fra_sents]
  with_bos = [start_marked for start_marked, _ in framed]
  with_eos = [end_marked for _, end_marked in framed]
  return with_bos, with_eos

In [None]:
# Run both corpora through the same cleaning routine before tokenisation.
en_sent = data_cleaning(en)
fra_sent = data_cleaning(fr)

In [None]:
# Preview only: displaying the whole list would print every cleaned sentence.
en_sent[:5]

In [None]:
# BOS-prefixed sentences and EOS-suffixed sentences derived from the cleaned French corpus.
fra_sents_in, fra_sents_out = fra_sents(fra_sent)


In [None]:
fra_sent[0]

'va '

In [None]:
len(fra_sents_in)

191954

In [None]:
fra_sents_in[0]

'BOS va '

In [None]:
en_sent[0]

'go'

In [None]:
fra_sents_in[1]

'BOS marche'

In [None]:
class Encoder(tf.keras.Model):
  """Embedding followed by a GRU that yields only its last output and state."""

  def __init__(self, vocab_size, embedding_dim, maxlen, encoder_dim):
    super().__init__()
    self.encoder_dim = encoder_dim
    self.embedding = tf.keras.layers.Embedding(
        vocab_size, embedding_dim, input_length=maxlen)
    # return_sequences=False: only the last step's output is returned,
    # alongside the final state.
    self.rnn = tf.keras.layers.GRU(
        encoder_dim, return_sequences=False, return_state=True)

  def call(self, x, state):
    """Embed `x`, run the GRU from `state`, return (output, final_state)."""
    embedded = self.embedding(x)
    output, final_state = self.rnn(embedded, initial_state=state)
    return output, final_state

  def init_state(self, batch_size):
    """Return an all-zero tensor of shape (batch_size, encoder_dim)."""
    return tf.zeros((batch_size, self.encoder_dim))

In [None]:
class Decoder(tf.keras.Model):
  """Embedding -> GRU (full sequence) -> Dense projection onto the vocabulary."""

  def __init__(self, vocab_size, embedding_dim, maxlen, decoder_dim):
    super().__init__()
    self.embedding = tf.keras.layers.Embedding(
        vocab_size, embedding_dim, input_length=maxlen)
    # return_sequences=True: one output per input step, plus the final state.
    self.rnn = tf.keras.layers.GRU(
        decoder_dim, return_sequences=True, return_state=True)
    # No activation is set, so the layer emits unnormalised scores.
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, x, state):
    """Return (per-step vocabulary scores, final GRU state) for input `x`."""
    embedded = self.embedding(x)
    hidden_seq, final_state = self.rnn(embedded, state)
    scores = self.dense(hidden_seq)
    return scores, final_state

In [None]:
def loss_fn(ytrue, ypred):
  """Sparse categorical cross-entropy on logits, ignoring targets equal to 0.

  Positions where `ytrue` is 0 receive a sample weight of 0; every other
  position receives a weight of 1.
  """
  is_real_token = tf.math.not_equal(ytrue, 0)
  weights = tf.cast(is_real_token, dtype=tf.int64)
  criterion = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
  return criterion(ytrue, ypred, sample_weight=weights)

In [None]:
# Run configuration.
NUM_SENT_PAIRS = 30000
EMBEDDING_DIM = 256
# The decoder GRU is started from the encoder's final state (see train_step),
# so both recurrent sizes are kept equal.
ENCODER_DIM = 1024
DECODER_DIM = 1024
BATCH_SIZE = 64
NUM_EPOCHS = 30

In [None]:
# One tokenizer per language. The French one is fitted on both framed variants
# so that the vocabulary contains the BOS and the EOS marker.
tokenizer_en = tokenize(en_sent)
tokenizer_fr = tokenize(fra_sents_in + fra_sents_out)

In [None]:
# word -> integer id lookup tables exposed by the fitted tokenizers.
en_wordtoindex = tokenizer_en.word_index
fr_wordtoindex = tokenizer_fr.word_index

In [None]:
# Preview only: the full vocabulary is too large to display usefully.
list(en_wordtoindex.items())[:10]

In [None]:
# Preview only: the full vocabulary is too large to display usefully.
list(fr_wordtoindex.items())[:10]

In [None]:
def _to_padded_ids(tokenizer, sentences):
  """Encode `sentences` as id sequences and post-pad them to the longest one.

  Returns the padded array together with the length it was padded to.
  """
  sequences = tokenizer.texts_to_sequences(sentences)
  longest = max(len(seq) for seq in sequences)
  return pad_sequences(sequences, maxlen=longest, padding="post"), longest


data_en, max_seq_len = _to_padded_ids(tokenizer_en, en_sent)
data_fr_in, max_seq_len_fr_in = _to_padded_ids(tokenizer_fr, fra_sents_in)
data_fr_out, max_seq_len_fr_out = _to_padded_ids(tokenizer_fr, fra_sents_out)


In [None]:
data_en

array([[   42,     0,     0, ...,     0,     0,     0],
       [   42,     0,     0, ...,     0,     0,     0],
       [   42,     0,     0, ...,     0,     0,     0],
       ...,
       [  376,    55,    22, ...,     0,     0,     0],
       [   64,   292,    78, ...,     0,     0,     0],
       [   12,   174,    26, ...,     3, 10182,  3415]], dtype=int32)

In [None]:
data_fr_in

array([[   1,  112,    0, ...,    0,    0,    0],
       [   1,  818,    0, ...,    0,    0,    0],
       [   1, 2544,    0, ...,    0,    0,    0],
       ...,
       [   1, 8763,   48, ...,    0,    0,    0],
       [   1,   43,  158, ...,   15, 2916, 2594],
       [   1,   13,   14, ...,    0,    0,    0]], dtype=int32)

In [None]:
data_fr_out

array([[ 112,    2,    0, ...,    0,    0,    0],
       [ 818,    2,    0, ...,    0,    0,    0],
       [2544,    2,    0, ...,    0,    0,    0],
       ...,
       [8763,   48,   11, ...,    0,    0,    0],
       [  43,  158,   32, ..., 2916, 2594,    2],
       [  13,   14,  265, ...,    0,    0,    0]], dtype=int32)

In [None]:
batch_size = BATCH_SIZE
SPLIT_SEED = 42
dataset = tf.data.Dataset.from_tensor_slices((data_en, data_fr_in, data_fr_out))
# Shuffle ONCE, over the whole corpus, in a fixed order. By default shuffle()
# draws a new order on every iteration, so take() and skip() would select
# different, overlapping examples each time and test data would leak into
# training. A buffer covering the full dataset also ensures the test split is
# sampled from all sentences rather than from a window near the start of the file.
dataset = dataset.shuffle(
    len(data_en), seed=SPLIT_SEED, reshuffle_each_iteration=False)
test_size = NUM_SENT_PAIRS // 4
test_dataset = dataset.take(test_size).batch(batch_size, drop_remainder=True)
# Re-shuffle only the training portion so each epoch sees a different batch order.
train_dataset = (
    dataset.skip(test_size)
    .shuffle(10000)
    .batch(batch_size, drop_remainder=True)
)

In [None]:
train_dataset

<BatchDataset element_spec=(TensorSpec(shape=(64, 47), dtype=tf.int32, name=None), TensorSpec(shape=(64, 55), dtype=tf.int32, name=None), TensorSpec(shape=(64, 55), dtype=tf.int32, name=None))>

In [None]:
# Lower-case aliases of the configuration constants, used when building the models.
embedding_dim = EMBEDDING_DIM
encoder_dim, decoder_dim = ENCODER_DIM, DECODER_DIM

In [None]:
# One extra slot on top of the word index: id 0 is the padding value seen in
# the padded arrays and has no entry in the word index.
vocab_size_en = len(en_wordtoindex) + 1
vocab_size_fr = len(fr_wordtoindex) + 1

In [None]:
max_seq_len

47

In [None]:
print(max_seq_len_fr_in)
print(max_seq_len_fr_out)

55
55


In [None]:
# English-side encoder and French-side decoder, each sized to its own vocabulary
# and padded sequence length.
encoder = Encoder(vocab_size_en, embedding_dim, max_seq_len, encoder_dim)
decoder = Decoder(vocab_size_fr, embedding_dim, max_seq_len_fr_in, decoder_dim)

In [None]:
optimizer = tf.keras.optimizers.Adam()

@tf.function
def train_step(encoder_in, decoder_in, decoder_out, encoder_state):
  """Run one optimisation step on a batch and return its loss.

  The encoder's final state seeds the decoder, which is fed `decoder_in`;
  its scores are compared against `decoder_out` with `loss_fn`.
  """
  with tf.GradientTape() as tape:
    _, final_state = encoder(encoder_in, encoder_state)
    scores, _ = decoder(decoder_in, final_state)
    loss = loss_fn(decoder_out, scores)

  # Encoder and decoder weights are updated together by the shared optimizer.
  trainable = encoder.trainable_variables + decoder.trainable_variables
  grads = tape.gradient(loss, trainable)
  optimizer.apply_gradients(zip(grads, trainable))
  return loss

In [None]:
# integer id -> word lookup tables, used to turn predicted ids back into text.
indtoword_en = tokenizer_en.index_word
indtoword_fr = tokenizer_fr.index_word

In [None]:
def predict(encoder, decoder, batch_size, en_sent, data_en, fra_sents_out,
            word2idx_fr, idx2word_fr, max_len=100):
  """Greedily translate one randomly chosen sentence and print the result.

  Args:
    encoder, decoder: the models; the decoder is started from the encoder's
      final state.
    batch_size: unused; kept so existing callers keep working.
    en_sent: source sentences; only its length is used, to pick a random index.
    data_en: encoded source sentences, indexed in parallel with `en_sent`.
    fra_sents_out: unused; kept so existing callers keep working.
    word2idx_fr: word -> id mapping; must contain "bos".
    idx2word_fr: id -> word mapping for the target language.
    max_len: upper bound on the number of generated tokens, so decoding
      terminates even if the model never emits "eos".
  """
  random_id = np.random.choice(len(en_sent))

  encoder_in = tf.expand_dims(data_en[random_id], axis=0)

  encoder_state = encoder.init_state(1)
  _, encoder_state = encoder(encoder_in, encoder_state)
  decoder_state = encoder_state
  decoder_in = tf.expand_dims(
        tf.constant([word2idx_fr["bos"]]), axis=0)
  pred_sent_fr = []

  for _ in range(max_len):
    decoder_pred, decoder_state = decoder(decoder_in, decoder_state)
    decoder_pred_index = tf.argmax(decoder_pred, axis = -1)
    pred_id = int(decoder_pred_index.numpy()[0][0])

    # An id with no word attached (e.g. the padding id 0) cannot be rendered;
    # treat it as the end of the sentence instead of raising KeyError.
    if pred_id not in idx2word_fr:
        break
    pred_word = idx2word_fr[pred_id]

    pred_sent_fr.append(pred_word)
    if pred_word == "eos":
        break

    # Feed the predicted token back in as the next decoder input.
    decoder_in = decoder_pred_index

  print("predicted: ", " ".join(pred_sent_fr))

In [None]:
# Number of passes over train_dataset for this run; the NUM_EPOCHS constant
# defined earlier is not used by this loop.
num_epochs = 1

for e in range(num_epochs):
  # Zero initial encoder state, created once per epoch and passed unchanged to every batch.
  encoder_state = encoder.init_state(batch_size)

  for batch, data in enumerate(train_dataset):
    encoder_in, decoder_in, decoder_out = data

    loss = train_step(encoder_in, decoder_in, decoder_out, encoder_state)

  # After each epoch, print the model's translation of one random sentence.
  predict(encoder, decoder, batch_size, en_sent, data_en, fra_sents_out, fr_wordtoindex, indtoword_fr)

predicted:  je veux me lever tt le matin suivant eos


In [None]:
# Preview only: the full vocabulary is too large to display usefully.
list(fr_wordtoindex.items())[:10]