In [2]:
# STEP 1: Load and prepare the dataset
import pathlib
import numpy as np
import re
import tensorflow as tf
from tensorflow import keras
from keras.models import Model
from keras.layers import Input, LSTM, Embedding, Dense, Attention, Concatenate
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences

# Load dataset (first 1000 lines)
zip_path = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
data_dir = pathlib.Path(zip_path).parent / "spa-eng"
text_file = data_dir / "spa.txt"
with open(text_file, "r", encoding="utf-8") as f:
    lines = f.read().strip().split("\n")[:10000]
sentence_pairs = [line.split("\t") for line in lines]

# STEP 2: Preprocessing
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip()
    sentence = re.sub(r"([?.!,多])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = re.sub(r"[^a-zA-Z?.!,多]+", " ", sentence)
    return sentence.strip()

cleaned_pairs = []
for eng, spa in sentence_pairs:
    eng = preprocess_sentence(eng)
    spa = preprocess_sentence(spa)
    spa = "sos " + spa + " eos"
    cleaned_pairs.append((eng, spa))

# STEP 3: Tokenization
eng_texts, spa_texts = zip(*cleaned_pairs)

eng_tokenizer = Tokenizer(filters='', lower=True)
spa_tokenizer = Tokenizer(filters='', lower=True)
eng_tokenizer.fit_on_texts(eng_texts)
spa_tokenizer.fit_on_texts(spa_texts)
reverse_spa_index = {v: k for k, v in spa_tokenizer.word_index.items()}


eng_seq = eng_tokenizer.texts_to_sequences(eng_texts)
spa_seq = spa_tokenizer.texts_to_sequences(spa_texts)

max_eng_len = max(len(seq) for seq in eng_seq)
max_spa_len = max(len(seq) for seq in spa_seq)

encoder_input = pad_sequences(eng_seq, maxlen=max_eng_len, padding='post')
decoder_input = pad_sequences([seq[:-1] for seq in spa_seq], maxlen=max_spa_len-1, padding='post')
decoder_target = pad_sequences([seq[1:] for seq in spa_seq], maxlen=max_spa_len-1, padding='post')

eng_vocab_size = len(eng_tokenizer.word_index) + 1
spa_vocab_size = len(spa_tokenizer.word_index) + 1

# STEP 4: Build the model (LSTM + Attention)
embedding_dim = 128
latent_dim = 256

# Encoder
encoder_inputs = Input(shape=(None,), name='encoder_input')
encoder_emb = Embedding(eng_vocab_size, embedding_dim)(encoder_inputs)
encoder_lstm, state_h, state_c = LSTM(latent_dim, return_sequences=True, return_state=True)(encoder_emb)

# Decoder
decoder_inputs = Input(shape=(None,), name='decoder_input')
decoder_emb = Embedding(spa_vocab_size, embedding_dim)(decoder_inputs)
decoder_lstm, _, _ = LSTM(latent_dim, return_sequences=True, return_state=True)(decoder_emb, initial_state=[state_h, state_c])

# Attention
attention = Attention()
context_vector = attention([decoder_lstm, encoder_lstm])
decoder_concat = Concatenate(axis=-1)([decoder_lstm, context_vector])

# Output layer
decoder_outputs = Dense(spa_vocab_size, activation='softmax')(decoder_concat)

model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()

# STEP 5: Train
model.fit([encoder_input, decoder_input], decoder_target,
          batch_size=32,
          epochs=25,
          validation_split=0.2)

model.save("final_nmt_model.keras")



Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_input (InputLayer)  [(None, None)]               0         []                            
                                                                                                  
 decoder_input (InputLayer)  [(None, None)]               0         []                            
                                                                                                  
 embedding_2 (Embedding)     (None, None, 128)            294912    ['encoder_input[0][0]']       
                                                                                                  
 embedding_3 (Embedding)     (None, None, 128)            556800    ['decoder_input[0][0]']       
                                                                                            

In [3]:
# Inference setup

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Concatenate

# Encoder inputs and outputs (from trained model)
encoder_inf_inputs = model.get_layer('encoder_input').input
encoder_outputs, state_h_enc, state_c_enc = model.get_layer('lstm_2').output
encoder_model = Model(encoder_inf_inputs, [encoder_outputs, state_h_enc, state_c_enc])

# Decoder inputs for inference
decoder_state_input_h = Input(shape=(256,))
decoder_state_input_c = Input(shape=(256,))
decoder_hidden_state_input = Input(shape=(None, 256))  # From encoder outputs

decoder_inf_inputs = Input(shape=(1,))

# Use correct embedding and LSTM layer names
dec_emb_layer = model.get_layer('embedding_3')
decoder_lstm_layer = model.get_layer('lstm_3')
attention_layer = model.get_layer('attention_1')
concat_layer = model.get_layer('concatenate_1')
dense_layer = model.get_layer('dense_1')

# Embedding
dec_emb_inf = dec_emb_layer(decoder_inf_inputs)

# Decoder LSTM
decoder_outputs, state_h, state_c = decoder_lstm_layer(
    dec_emb_inf, initial_state=[decoder_state_input_h, decoder_state_input_c]
)

# Attention
attn_out_inf = attention_layer([decoder_outputs, decoder_hidden_state_input])
decoder_concat_inf = concat_layer([decoder_outputs, attn_out_inf])

# Final output layer
decoder_outputs_final = dense_layer(decoder_concat_inf)

# Decoder inference model
decoder_model = Model(
    [decoder_inf_inputs, decoder_hidden_state_input, decoder_state_input_h, decoder_state_input_c],
    [decoder_outputs_final, state_h, state_c]
)


In [4]:
def decode_sequence(input_seq):
    enc_outs, h, c = encoder_model.predict(input_seq)
    target_seq = np.array([[spa_tokenizer.word_index['sos']]])
    decoded_sentence = ''
    stop_condition = False

    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq, enc_outs, h, c])
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_word = reverse_spa_index.get(sampled_token_index, '')

        if sampled_word == 'eos' or len(decoded_sentence.split()) > max_spa_len:
            stop_condition = True
        else:
            decoded_sentence += ' ' + sampled_word

        target_seq = np.array([[sampled_token_index]])

    return decoded_sentence.strip()


In [5]:
def translate(sentence):
    sentence = preprocess_sentence(sentence)
    seq = eng_tokenizer.texts_to_sequences([sentence])
    padded = pad_sequences(seq, maxlen=max_eng_len, padding='post')
    translation = decode_sequence(padded)
    print(f"English: {sentence}")
    print(f"Spanish: {translation}")

translate("I love you.")
translate("Slowly")
translate("What are you doing?")


English: i love you .
Spanish: te amo .
English: slowly
Spanish: termina de aqu .
English: what are you doing ?
Spanish: 多 qu lo sabes ?


In [6]:
import pickle
import json

# Save model
model.save("spanish_translation_model.keras")

# Save tokenizers
with open("eng_tokenizer.json", "w") as f:
    f.write(eng_tokenizer.to_json())

with open("spa_tokenizer.json", "w") as f:
    f.write(spa_tokenizer.to_json())


# Save reverse index
reverse_spa_index = {v: k for k, v in spa_tokenizer.word_index.items()}
with open("reverse_spa_index.pkl", "wb") as f:
    pickle.dump(reverse_spa_index, f)

# Save max sequence lengths
with open("seq_lengths.json", "w") as f:
    json.dump({"max_eng_len": max_eng_len, "max_spa_len": max_spa_len}, f)


In [21]:
def realtime_eng_to_spa(duration=10, chunk=1024):
    """
    Records English speech in real-time, displays recognized text,
    translates to Spanish, and reads it aloud.
    """
    q = queue.Queue()
    recognizer = sr.Recognizer()

    def callback(indata, frames, time_info, status):
        if status:
            print(status, flush=True)
        q.put(indata.copy())

    print("Start speaking (English)...")
    with sd.InputStream(channels=1, samplerate=16000, blocksize=chunk, callback=callback):
        audio_buffer = []
        start_time = time.time()

        while time.time() - start_time < duration:
            try:
                data = q.get(timeout=1)
                audio_buffer.append(data)

                # Process every ~1 second of audio
                if len(audio_buffer) * chunk / 16000 >= 1.0:
                    audio_chunk = np.concatenate(audio_buffer, axis=0).flatten()
                    audio_chunk = (audio_chunk * 32767).astype('int16')
                    audio_obj = sr.AudioData(audio_chunk.tobytes(), 16000, 2)
                    try:
                        text = recognizer.recognize_google(audio_obj, language='en-US')
                        if text:
                            print("You said:", text)
                            # Translate
                            seq = eng_tokenizer.texts_to_sequences([preprocess_sentence(text)])
                            padded = pad_sequences(seq, maxlen=max_eng_len, padding='post')
                            translation = decode_sequence(padded)
                            print("Translation (Spanish):", translation)
                            speak_text(translation)
                    except sr.UnknownValueError:
                        pass
                    # Clear buffer after processing
                    audio_buffer = []

            except queue.Empty:
                pass

    print("Recording stopped.")


In [29]:
# ==========================
# RUN REAL-TIME ENGLISH -> SPANISH TRANSLATOR
# ==========================

# duration=10 means it will record for 10 seconds max
realtime_eng_to_spa(duration=10)


Start speaking (English)...
You said: hi how are you
Translation (Spanish): 多 qu tal ah ?
You said: what are you doing
Translation (Spanish): tienes por mo .
Recording stopped.


In [28]:
# ==========================
# Spanish -> English Live Translator (Demo using Google Translate)
# ==========================

!pip install SpeechRecognition sounddevice pyttsx3 googletrans==4.0.0-rc1 --quiet

import sounddevice as sd
import queue
import numpy as np
import speech_recognition as sr
import pyttsx3
from googletrans import Translator
import time

# Initialize speech recognizer, translator, and TTS
recognizer = sr.Recognizer()
translator = Translator()
tts = pyttsx3.init()

def speak_text(text):
    tts.say(text)
    tts.runAndWait()

def realtime_spa_to_eng(duration=10, chunk=1024):
    """
    Records Spanish speech in real-time, displays recognized text,
    translates to English using Google Translate, and reads it aloud.
    """
    q = queue.Queue()

    def callback(indata, frames, time_info, status):
        if status:
            print(status, flush=True)
        q.put(indata.copy())

    print("Start speaking (Spanish)...")
    with sd.InputStream(channels=1, samplerate=16000, blocksize=chunk, callback=callback):
        audio_buffer = []
        start_time = time.time()

        while time.time() - start_time < duration:
            try:
                data = q.get(timeout=1)
                audio_buffer.append(data)

                # Process every ~1 second of audio
                if len(audio_buffer) * chunk / 16000 >= 1.0:
                    audio_chunk = np.concatenate(audio_buffer, axis=0).flatten()
                    audio_chunk = (audio_chunk * 32767).astype('int16')
                    audio_obj = sr.AudioData(audio_chunk.tobytes(), 16000, 2)
                    try:
                        # Recognize Spanish speech
                        text = recognizer.recognize_google(audio_obj, language='es-ES')
                        if text:
                            print("You said (Spanish):", text)
                            # Translate to English
                            translation = translator.translate(text, src='es', dest='en').text
                            print("Translation (English):", translation)
                            speak_text(translation)
                    except sr.UnknownValueError:
                        pass
                    # Clear buffer after processing
                    audio_buffer = []

            except queue.Empty:
                pass

    print("Recording stopped.")

# ==========================
# Example usage: record 10 seconds
# ==========================
realtime_spa_to_eng(duration=10)


  DEPRECATION: Building 'googletrans' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'googletrans'. Discussion can be found at https://github.com/pypa/pip/issues/6334


Start speaking (Spanish)...
You said (Spanish): hola
Translation (English): hello
Recording stopped.
