In [1]:
import os
#Legacy TensorFlow BackEnd
os.environ['TF_USE_LEGACY_KERAS'] = '1'

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import pathlib




In [3]:
config={
    "max_vocab_size":5000,
    "max_length":50,
    "BATCH_SIZE":64,
    "Split_Ratio":0.9,
}

In [4]:
path_to_zip = tf.keras.utils.get_file(
    'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True)

path_to_file = pathlib.Path(path_to_zip).parent/'spa-eng/spa.txt'

In [5]:
def load_data(path):
  text = path.read_text(encoding='utf-8')

  lines = text.splitlines()
  pairs = [line.split('\t') for line in lines]

  context = np.array([context for target, context in pairs])
  target = np.array([target for target, context in pairs])

  return target, context

In [None]:
context_raw,target_raw = load_data(path_to_file)

If you want to sound like a native speaker, you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo.


In [7]:
split_idx = int(config["Split_Ratio"] * len(target_raw))

X_train = context_raw[:split_idx]
y_train = target_raw[:split_idx]

X_val = context_raw[split_idx:]
y_val = target_raw[split_idx:]

BUFFER_SIZE = len(X_train)

train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(config['BATCH_SIZE'], drop_remainder=True).prefetch(tf.data.AUTOTUNE)

# Validation dataset (no shuffle needed)
val_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val))
val_dataset = val_dataset.batch(config['BATCH_SIZE'], drop_remainder=True).prefetch(tf.data.AUTOTUNE)

In [8]:
def tf_lower_and_split_punct_w_special_tokens(text):
  text = tf.strings.lower(text)
  text = tf.strings.regex_replace(text, '[^ a-z.?!,多]', '')
  text = tf.strings.regex_replace(text, '[.?!,多]', r' \0 ')
  text = tf.strings.strip(text)

  text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
  return text

def tf_lower_and_split_punct(text):
  text = tf.strings.lower(text)
  text = tf.strings.regex_replace(text, '[^ a-z.?!,多]', '')
  text = tf.strings.regex_replace(text, '[.?!,多]', r' \0 ')
  text = tf.strings.strip(text)

  return text

In [10]:
context_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct,
    max_tokens=config['max_vocab_size'],
    output_sequence_length = config['max_length'],
    ragged=False)

target_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct_w_special_tokens,
    max_tokens=config['max_vocab_size'],
    output_sequence_length = config['max_length'] + 1,
    ragged=False)




In [11]:
context_text_processor.adapt(train_dataset.map(lambda context, target: context))
target_text_processor.adapt(train_dataset.map(lambda context, target: target))




In [12]:
def process_text(context, target):
  context = context_text_processor(context)
  target = target_text_processor(target)
  targ_in = target[:,:-1]
  targ_out = target[:,1:]
  return (context, targ_in), targ_out


train_dataset = train_dataset.map(process_text, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.map(process_text, num_parallel_calls = tf.data.AUTOTUNE)

In [13]:
def Build_Seq2Seq(max_length=157, vocab_size_en=10000, vocab_size_es=10000, embedding_dim=256, units=512):
    encoder_input = tf.keras.layers.Input(shape=(max_length,), dtype="int32", name='encoder_input')
    
    enc_emb = tf.keras.layers.Embedding(vocab_size_en, embedding_dim, mask_zero=True)(encoder_input)
    enc_emb = tf.keras.layers.Dropout(0.2)(enc_emb)
    
    encoder = tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(
            units, 
            return_sequences=True, 
            return_state=True,
            name='encoder_lstm'
        )
    )
    
    encoder_outputs, forward_h, forward_c, backward_h, backward_c = encoder(enc_emb)
    
    encoder_state_h = tf.keras.layers.Concatenate(axis=-1)([forward_h, backward_h])
    encoder_state_c = tf.keras.layers.Concatenate(axis=-1)([forward_c, backward_c])
    
    decoder_input = tf.keras.layers.Input(shape=(max_length,), dtype="int32", name='decoder_input')
    
    dec_emb = tf.keras.layers.Embedding(vocab_size_es, embedding_dim, mask_zero=True)(decoder_input)
    dec_emb = tf.keras.layers.Dropout(0.2)(dec_emb)
    
    decoder_outputs = tf.keras.layers.LSTM(
        units * 2,
        return_sequences=True,
        return_state=False,
        name='decoder_lstm'
    )(dec_emb, initial_state=[encoder_state_h, encoder_state_c])
    
   
    attention_output = tf.keras.layers.MultiHeadAttention(
        num_heads=8,
        key_dim=units * 2, 
        name='cross_attention'
    )(
        query=decoder_outputs, 
        value=encoder_outputs,  
        key=encoder_outputs     
    )
    
    decoder_combined = tf.keras.layers.Concatenate(axis=-1)([decoder_outputs, attention_output])
    
    outputs = tf.keras.layers.Dense(vocab_size_es, activation='softmax', name='output_dense')(decoder_combined)
    
    model = tf.keras.Model(inputs=[encoder_input, decoder_input], outputs=outputs, name='seq2seq')
    return model
vocab_size_en = context_text_processor.vocabulary_size()
vocab_size_fr = target_text_processor.vocabulary_size()
print(f"English vocab size: {vocab_size_en}")
print(f"French vocab size: {vocab_size_fr}")

model = Build_Seq2Seq(
    max_length=config['max_length'], 
    vocab_size_en=vocab_size_en,
    vocab_size_es=vocab_size_fr,
    embedding_dim=256,
    units=256  
  
)

English vocab size: 5000
French vocab size: 5000


In [14]:
model.summary()

Model: "seq2seq"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_input (InputLayer)  [(None, 50)]                 0         []                            
                                                                                                  
 embedding (Embedding)       (None, 50, 256)              1280000   ['encoder_input[0][0]']       
                                                                                                  
 decoder_input (InputLayer)  [(None, 50)]                 0         []                            
                                                                                                  
 dropout (Dropout)           (None, 50, 256)              0         ['embedding[0][0]']           
                                                                                            

In [15]:
def masked_loss(y_true, y_pred):
    
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=False,
        reduction='none'
    )
    loss = loss_fn(y_true, y_pred) 

    mask = tf.cast(y_true != 0, loss.dtype)
    loss *= mask

    return tf.reduce_sum(loss) / tf.reduce_sum(mask)


def masked_acc(y_true, y_pred):
    
    y_pred = tf.argmax(y_pred, axis=-1)
    y_pred = tf.cast(y_pred, y_true.dtype)

    match = tf.cast(y_true == y_pred, tf.float32)
    
    mask = tf.cast(y_true != 0, tf.float32)

    return tf.reduce_sum(match) / tf.reduce_sum(mask)

In [None]:
model.compile(optimizer='adam',
              loss=masked_loss, 
              metrics=[masked_acc])

In [None]:
history = model.fit(
    train_dataset, 
    epochs=30,
    validation_data=val_dataset,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.1,          
            patience=3,           
            min_lr=1e-7,          
            verbose=1             
        )
    ]
)

In [None]:
model.save("BI-LSTM-Cross-ATT.keras")

In [24]:
model.load_weights("Model Weights\BI-LSTM-Cross-ATT.h5")

  model.load_weights("Model Weights\BI-LSTM-Cross-ATT.h5")


In [None]:
loaded_model = tf.keras.models.load_model("Model Weights\BI-LSTM-Cross-ATT.keras",compile=False)

In [25]:
(ex_context_tok, ex_tar_in), ex_tar_out = next(iter(val_dataset))
print(ex_context_tok.shape)
print(ex_tar_in[0, :10].numpy()) 
print(ex_tar_out[0, :10].numpy())
print(ex_tar_out.shape)

(64, 50)
[   2   46 4227   14 1118   37    9  598  145    4]
[  46 4227   14 1118   37    9  598  145    4    3]
(64, 50)


In [29]:
def get_initial_state(model, context, target_text_processor):
    if len(context.shape) == 1:
        context = tf.expand_dims(context, 0)
    
    batch_size = tf.shape(context)[0]
    vocab = target_text_processor.get_vocabulary()
    start_token = vocab.index('[START]')
    
    next_token = tf.fill([batch_size, 1], start_token)
    done = tf.zeros([batch_size, 1], dtype=tf.bool)
    
    return next_token, done, context


def get_next_token(model, context, next_token, done, state, target_text_processor, temperature=0.0):
    vocab = target_text_processor.get_vocabulary()
    end_token = vocab.index('[END]')
    
    padded_state = tf.pad(state, [[0, 0], [0, config['max_length'] - tf.shape(state)[1]]])[:, :config['max_length']]
    logits = model.predict([context, padded_state], verbose=0)
    last_logits = logits[:, tf.shape(state)[1] - 1, :]
    
    if temperature == 0.0:
        next_token_id = tf.argmax(last_logits, axis=-1, output_type=tf.int32)
    else:
        next_token_id = tf.squeeze(tf.random.categorical(last_logits / temperature, 1, dtype=tf.int32), -1)
    
    next_token_id = tf.expand_dims(next_token_id, -1)
    done = done | (next_token_id == end_token)
    next_token_id = tf.where(done, tf.constant(0, dtype=tf.int32), next_token_id)
    new_state = tf.concat([state, next_token_id], axis=1)
    
    return next_token_id, done, new_state


def translate(model, spanish_text, target_text_processor, temperature=0.0):
    if len(spanish_text.shape) == 1:
        spanish_text = tf.expand_dims(spanish_text, 0)
    
    next_token, done, context = get_initial_state(model, spanish_text, target_text_processor)
    state = next_token
    tokens = []
    
    for n in range(config['max_length']):
        next_token, done, state = get_next_token(model, context, next_token, done, state, target_text_processor, temperature)
        tokens.append(next_token)
        if tf.reduce_all(done):
            break
    
    tokens = tf.concat(tokens, axis=-1)
    vocab = target_text_processor.get_vocabulary()
    
    words = []
    for token_id in tokens[0].numpy():
        if token_id == 0:
            break
        word = vocab[token_id]
        if word == '[END]':
            break
        if word not in ['[START]', '[UNK]', '']:
            words.append(word)
    
    return ' '.join(words)


def compare_translations(model, spanish_input, target_out, context_text_processor, target_text_processor, n=5):
    spanish_vocab = context_text_processor.get_vocabulary()
    english_vocab = target_text_processor.get_vocabulary()
    
    for i in range(min(n, spanish_input.shape[0])):
        # Spanish input
        sp_words = [spanish_vocab[t] for t in spanish_input[i].numpy() if t > 0]
        spanish = ' '.join(sp_words)
        
        # Ground truth English
        gt_words = [english_vocab[t] for t in target_out[i].numpy() 
                    if t > 0 and english_vocab[t] not in ['[START]', '[END]']]
        ground_truth = ' '.join(gt_words)
        
        # Model translation
        model_output = translate(model, spanish_input[i], target_text_processor)
        
        print(f"\n{i+1}. English: {spanish}")
        print(f"   GROUNDTRUTH: {ground_truth}")
        print(f"   TRANSLATION: {model_output}")

In [30]:

val_iter = iter(val_dataset)

(ex_context_tok, ex_tar_in), ex_tar_out = next(val_iter)
compare_translations(model, ex_context_tok, ex_tar_out, context_text_processor, target_text_processor, n=10)


(ex_context_tok, ex_tar_in), ex_tar_out = next(val_iter)
compare_translations(model, ex_context_tok, ex_tar_out, context_text_processor, target_text_processor, n=10)


1. English: this is the same necklace that i lost yesterday .
   GROUNDTRUTH: este collar es igual al que perd ayer .
   TRANSLATION: eso es no ingls de gatos tres .

2. English: this is the strongest dog that i have ever seen .
   GROUNDTRUTH: este es el perro ms fuerte que jams haya visto .
   TRANSLATION: se es para tom para de si auto en el estado .

3. English: this is your last chance to spend time with tom .
   GROUNDTRUTH: esta es tu ltima oportunidad de pasar tiempo con tom .
   TRANSLATION: se es te pasar cuatro tom edad tiempo le que .

4. English: this material will stand up to lots of [UNK] .
   GROUNDTRUTH: este material aguantar un montn de [UNK] .
   TRANSLATION: se vino ? tom .

5. English: this medicine should be taken every three hours .
   GROUNDTRUTH: este medicamento debe ser tomado cada tres horas .
   TRANSLATION: eso trato decir aos siente parece gato .

6. English: this morning the teacher got very angry with me .
   GROUNDTRUTH: la profesora se enoj mucho co