In [4]:
!pip install tensorflow==2.10


Collecting tensorflow==2.10
  Downloading tensorflow-2.10.0-cp310-cp310-win_amd64.whl.metadata (3.1 kB)
Collecting gast<=0.4.0,>=0.2.1 (from tensorflow==2.10)
  Downloading gast-0.4.0-py3-none-any.whl.metadata (1.1 kB)
Collecting keras-preprocessing>=1.1.1 (from tensorflow==2.10)
  Downloading Keras_Preprocessing-1.1.2-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting protobuf<3.20,>=3.9.2 (from tensorflow==2.10)
  Downloading protobuf-3.19.6-cp310-cp310-win_amd64.whl.metadata (806 bytes)
Collecting tensorboard<2.11,>=2.10 (from tensorflow==2.10)
  Downloading tensorboard-2.10.1-py3-none-any.whl.metadata (1.9 kB)
Collecting tensorflow-estimator<2.11,>=2.10.0 (from tensorflow==2.10)
  Downloading tensorflow_estimator-2.10.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting keras<2.11,>=2.10.0 (from tensorflow==2.10)
  Downloading keras-2.10.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting google-auth<3,>=1.6.3 (from tensorboard<2.11,>=2.10->tensorflow==2.10)
  Downloading google_auth

  You can safely remove it manually.
  You can safely remove it manually.


In [5]:
import io
import os
import re
import time
import unicodedata
import zipfile

import numpy as np
import tensorflow as tf

In [None]:
# Print every device TensorFlow reports for this machine.
# NOTE(review): `tensorflow.python.*` is an internal module path; the next cell uses
# the public `tf.config.list_physical_devices` API for the GPU check.
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

In [6]:
# Report how many GPU devices TensorFlow detects.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


Num GPUs Available:  0


In [4]:
# Download dataset (English–French sentence pairs) and unpack it.
path_to_zip = tf.keras.utils.get_file(
    'fra-eng.zip',
    origin='http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip',
    extract=True
)

# Kept for backward compatibility with any cell that still reads it.
extracted_folder = os.path.splitext(path_to_zip)[0]  # removes .zip

# Locate fra.txt instead of hard-coding a machine-specific absolute path.
# Depending on the installed Keras version, `get_file` may return the archive path or
# the extraction directory, and the archive may be unpacked next to the zip or into a
# sub-folder, so search downward from the appropriate root.
dataset_root = path_to_zip if os.path.isdir(path_to_zip) else os.path.dirname(path_to_zip)
path_to_file = next(
    (
        os.path.join(dirpath, 'fra.txt')
        for dirpath, _dirnames, filenames in os.walk(dataset_root)
        if 'fra.txt' in filenames
    ),
    None,
)
if path_to_file is None:
    raise FileNotFoundError(f"fra.txt not found under {dataset_root!r}")

print("Using dataset at:", path_to_file)

Using dataset at: C:\Users\Lenovo\.keras\datasets\fra-eng_extracted\fra.txt


In [5]:
# Load and Clean Data
def preprocess_sentence(sentence):
    """Normalise a raw sentence into space-separated tokens wrapped in <start>/<end>.

    Steps: lowercase and trim, fold accented letters to their ASCII base, put
    spaces around the punctuation marks ``? . ! , ¿``, collapse repeated
    spaces/double quotes, and replace every remaining character outside
    ``a-z A-Z ? . ! , ¿`` with a space.

    Args:
        sentence: Raw text of one sentence.

    Returns:
        The cleaned sentence as ``'<start> ... <end>'``.
    """
    sentence = sentence.lower().strip()
    # Strip diacritics (é -> e, ç -> c). Without this step the character filter
    # below replaces every accented letter with a space and splits French words
    # apart ("très" would become "tr s").
    sentence = ''.join(
        ch for ch in unicodedata.normalize('NFD', sentence)
        if unicodedata.category(ch) != 'Mn'
    )
    # Isolate punctuation so each mark becomes its own token.
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence)
    # Collapse runs of spaces (and double quotes) into a single space.
    sentence = re.sub(r'[" "]+', " ", sentence)
    # Anything that is not an ASCII letter or kept punctuation becomes a space.
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence)
    sentence = sentence.strip()
    return '<start> ' + sentence + ' <end>'

def create_dataset(path, num_examples=None):
    """Read a tab-separated sentence-pair file and return cleaned columns.

    Each line is split on tabs; only the first two fields are kept and each is
    passed through ``preprocess_sentence``.

    Args:
        path: Path to the UTF-8 encoded text file.
        num_examples: Maximum number of leading lines to use; ``None`` uses all.

    Returns:
        A ``zip`` iterator that yields one tuple per column (first fields, then
        second fields), intended to be unpacked by the caller.
    """
    # Context manager so the file handle is closed deterministically.
    with io.open(path, encoding='UTF-8') as dataset_file:
        lines = dataset_file.read().strip().split('\n')
    word_pairs = [
        [preprocess_sentence(field) for field in line.split('\t')[:2]]
        for line in lines[:num_examples]
    ]
    return zip(*word_pairs)

# Use only the first 30000 lines of the file.
# NOTE(review): the unpacking assumes the first tab-separated column is English and
# the second is French — confirm against the dataset file.
en_sentences, fr_sentences = create_dataset(path_to_file, 30000)


In [6]:
#Tokenize and Pad
def tokenize(lang):
    """Fit a Keras tokenizer on ``lang`` and return padded id sequences.

    Args:
        lang: Iterable of already-preprocessed sentences.

    Returns:
        ``(tensor, tokenizer)`` where ``tensor`` holds the integer-encoded
        sentences, post-padded to a common length, and ``tokenizer`` is the
        fitted ``Tokenizer``.
    """
    # filters='' keeps punctuation and the <start>/<end> markers as tokens.
    tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    tokenizer.fit_on_texts(lang)
    sequences = tokenizer.texts_to_sequences(lang)
    padded = tf.keras.preprocessing.sequence.pad_sequences(sequences, padding='post')
    return padded, tokenizer

# French sentences are the model input; English sentences are the target.
input_tensor, inp_lang = tokenize(fr_sentences)
target_tensor, targ_lang = tokenize(en_sentences)

# Padded sequence lengths (second dimension of each tensor).
max_length_inp = input_tensor.shape[1]
max_length_targ = target_tensor.shape[1]

# No validation split: the full tensors are used for training.
input_tensor_train, target_tensor_train = input_tensor, target_tensor


In [7]:
# Build Encoder & Decoder
BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 64
# A full pass over the data would be len(input_tensor_train) // BATCH_SIZE steps;
# the value is capped here for quick testing.
steps_per_epoch = 50 # 100 or 50 for quick testing

# `units` is reduced (1024 was the earlier setting) to speed up training.
embedding_dim = 256
units = 512

# +1 because Keras tokenizer ids start at 1; id 0 is the padding value.
vocab_inp_size = len(inp_lang.word_index) + 1
vocab_tar_size = len(targ_lang.word_index) + 1

# Input pipeline: cache in memory, reshuffle, batch, prefetch.
# drop_remainder=True guarantees every batch has exactly BATCH_SIZE rows, which the
# encoder's initial hidden state and the decoder's <start> input both assume.
dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train))
dataset = (
    dataset.cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)


class Encoder(tf.keras.Model):
    """Embedding layer followed by a GRU that returns all outputs and its final state."""

    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        """Create the layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Size of each embedding vector.
            enc_units: Number of GRU units.
            batch_sz: Batch size used by ``initialize_hidden_state``.
        """
        super().__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )

    def call(self, x, hidden):
        """Embed the token ids in ``x`` and run the GRU starting from ``hidden``.

        Returns:
            ``(output, state)``: the GRU's per-step outputs and its final state.
        """
        embedded = self.embedding(x)
        sequence_output, final_state = self.gru(embedded, initial_state=hidden)
        return sequence_output, final_state

    def initialize_hidden_state(self):
        """Return an all-zero state of shape ``(batch_sz, enc_units)``."""
        state_shape = (self.batch_sz, self.enc_units)
        return tf.zeros(state_shape)


In [8]:
class BahdanauAttention(tf.keras.layers.Layer):
    """Additive attention: score = V(tanh(W1(query) + W2(values)))."""

    def __init__(self, units):
        """Create the three dense projections; ``units`` is the width of W1 and W2."""
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        """Compute the context vector and the attention weights.

        Args:
            query: Tensor that gets an extra axis inserted at position 1 so it
                can be added to the projected ``values``.
            values: Tensor that is weighted and summed over axis 1.

        Returns:
            ``(context_vector, attention_weights)``; the weights are a softmax
            over axis 1 of the scores.
        """
        expanded_query = tf.expand_dims(query, 1)
        projected = self.W1(expanded_query) + self.W2(values)
        score = self.V(tf.nn.tanh(projected))
        attention_weights = tf.nn.softmax(score, axis=1)
        context_vector = tf.reduce_sum(attention_weights * values, axis=1)
        return context_vector, attention_weights

In [9]:
class Decoder(tf.keras.Model):
    """Single-step decoder: attention, embedding, GRU, then a dense vocabulary projection."""

    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        """Create the layers.

        Args:
            vocab_size: Embedding table size and width of the output projection.
            embedding_dim: Size of each embedding vector.
            dec_units: Number of GRU units; also the attention projection width.
            batch_sz: Stored on the instance; not read by ``call``.
        """
        super().__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.fc = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(dec_units)

    def call(self, x, hidden, enc_output):
        """Run one decoding step.

        ``hidden`` is used only as the attention query; the GRU itself is called
        without an explicit initial state.

        Returns:
            ``(logits, state, attention_weights)``.
        """
        context_vector, attention_weights = self.attention(hidden, enc_output)
        embedded = self.embedding(x)
        # Prepend the context vector to the embedded token along the feature axis.
        gru_input = tf.concat([tf.expand_dims(context_vector, 1), embedded], axis=-1)
        output, state = self.gru(gru_input)
        # Merge the leading axes so the dense layer receives a 2-D tensor.
        flat_output = tf.reshape(output, (-1, output.shape[2]))
        logits = self.fc(flat_output)
        return logits, state, attention_weights


In [10]:
# Training
# Adam with default hyper-parameters.
optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per element so padding can be masked out in
# loss_function; from_logits=True because the decoder's final Dense layer has no
# activation.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    """Cross-entropy with padding positions (target id 0) zeroed out.

    The mean is taken over every position, including the zeroed ones.

    Args:
        real: Target token ids.
        pred: Logits predicted for those targets.

    Returns:
        Scalar mean of the masked per-element losses.
    """
    per_token_loss = loss_object(real, pred)
    is_real_token = tf.math.not_equal(real, 0)
    weights = tf.cast(is_real_token, dtype=per_token_loss.dtype)
    return tf.reduce_mean(per_token_loss * weights)

# Instantiate the two networks; both use the same embedding size, unit count and
# batch size, and differ only in vocabulary size.
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)

@tf.function
def train_step(inp, targ, enc_hidden):
    """Run one teacher-forced optimisation step on a single batch.

    Args:
        inp: Batch of source token ids.
        targ: Batch of target token ids; column 0 is expected to be ``<start>``.
        enc_hidden: Initial encoder hidden state.

    Returns:
        The accumulated loss divided by the target sequence length.
    """
    # Accumulator for the per-time-step losses. This must start at 0: there is no
    # `loss` or `device` to read from before this point.
    loss = 0
    with tf.GradientTape() as tape:
        enc_output, enc_hidden = encoder(inp, enc_hidden)
        dec_hidden = enc_hidden
        # Every sequence in the batch starts decoding from the <start> token.
        dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)

        for t in range(1, targ.shape[1]):
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
            loss += loss_function(targ[:, t], predictions)
            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(targ[:, t], 1)

    # Normalise by the sequence length for reporting; gradients use the summed loss.
    batch_loss = loss / int(targ.shape[1])
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return batch_loss

# Number of passes of `steps_per_epoch` batches each.
EPOCHS = 10

# Reference: eager version of the training loop, kept commented out.
# for epoch in range(EPOCHS):
#     start = time.time()
#     enc_hidden = encoder.initialize_hidden_state()
#     total_loss = 0

#     for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
#         batch_loss = train_step(inp, targ, enc_hidden)
#         total_loss += batch_loss
#     print(f'Epoch {epoch+1}, Loss: {total_loss/steps_per_epoch:.4f}, Time: {time.time()-start:.2f}s')

@tf.function
def run_epoch():
    """Train on ``steps_per_epoch`` batches and return the summed batch loss.

    Every batch is started from the same freshly zeroed encoder state.
    """
    total_loss = 0.0
    enc_hidden = encoder.initialize_hidden_state()
    for inp, targ in dataset.take(steps_per_epoch):
        batch_loss = train_step(inp, targ, enc_hidden)
        total_loss += batch_loss
    return total_loss

# Train for EPOCHS epochs, reporting the mean batch loss and wall-clock time of each.
for epoch in range(EPOCHS):
    start = time.time()
    total_loss = run_epoch()
    print(f'Epoch {epoch+1}, Loss: {total_loss/steps_per_epoch:.4f}, Time: {time.time()-start:.2f}s')


Epoch 1, Loss: 2.9685, Time: 37.54s
Epoch 2, Loss: 2.3750, Time: 26.41s
Epoch 3, Loss: 2.1951, Time: 27.29s
Epoch 4, Loss: 2.0886, Time: 27.15s
Epoch 5, Loss: 1.9876, Time: 27.64s
Epoch 6, Loss: 1.8746, Time: 27.22s
Epoch 7, Loss: 1.8120, Time: 27.27s
Epoch 8, Loss: 1.7449, Time: 27.19s
Epoch 9, Loss: 1.6853, Time: 27.20s
Epoch 10, Loss: 1.6369, Time: 27.27s


In [11]:
#Translation on User Input
def evaluate(sentence):
    """Greedily decode a translation for one source sentence.

    The sentence is preprocessed, mapped to ids (unknown words map to 0), padded
    to ``max_length_inp`` and encoded. Decoding starts from ``<start>`` and
    takes the arg-max token at each step until ``<end>`` is produced or
    ``max_length_targ`` steps have run.

    Args:
        sentence: Raw source-language text.

    Returns:
        The predicted tokens joined by spaces; a generated ``<end>`` marker is
        included in the string.
    """
    tokens = preprocess_sentence(sentence).split(' ')
    token_ids = [inp_lang.word_index.get(tok, 0) for tok in tokens]
    padded = tf.keras.preprocessing.sequence.pad_sequences(
        [token_ids], maxlen=max_length_inp, padding='post')
    inputs = tf.convert_to_tensor(padded)

    # Batch of one, so the initial encoder state has a single row.
    enc_out, dec_hidden = encoder(inputs, tf.zeros((1, units)))
    dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0)

    words = []
    for _ in range(max_length_targ):
        predictions, dec_hidden, _weights = decoder(dec_input, dec_hidden, enc_out)
        predicted_id = tf.argmax(predictions[0]).numpy()

        word = targ_lang.index_word.get(predicted_id)
        words.append('' if word is None else word)

        if word == '<end>':
            break

        # Feed the prediction back in as the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)

    return ''.join(w + ' ' for w in words).strip()

def translate(sentence):
    """Print ``sentence`` followed by the model's translation of it."""
    translation = evaluate(sentence)
    print(f'Input: {sentence}')
    print(f'Translated: {translation}')


In [12]:
# Smoke test: translate a single French word with the trained model.
translate("bonjour")   # Example French word input


Input: bonjour
Translated: it s a lot . <end>
