In [31]:
import functools
import logging
import os
import re

import gradio as gr
import numpy as np
import simpleaudio as sa
import sounddevice as sd
import soundfile as sf
import torch
from colorama import Fore, Style, init
from scipy import signal
from transformers import WhisperProcessor, WhisperForConditionalGeneration, M2M100ForConditionalGeneration, M2M100Tokenizer, SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan

In [2]:
# Initialize colorama with autoreset enabled.
# NOTE(review): presumably so colour codes do not bleed into later output;
# confirm against the colorama documentation for `autoreset`.
init(autoreset=True)

# Custom logger
class ColoredLogger(logging.Logger):
    """Logger subclass that wraps messages in a colorama colour per level.

    ``info`` is rendered green, ``warning`` yellow and ``error`` red. Every
    message is followed by ``Style.RESET_ALL``. Other levels (``debug``,
    ``critical``...) are inherited unchanged from ``logging.Logger``.
    """

    def __init__(self, name):
        super().__init__(name)

    @staticmethod
    def _tint(color, msg):
        """Return ``msg`` prefixed with ``color`` and followed by a style reset."""
        return f"{color}{msg}{Style.RESET_ALL}"

    def info(self, msg, *args, **kwargs):
        """Log ``msg`` at INFO level in green."""
        painted = self._tint(Fore.GREEN, msg)
        super().info(painted, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        """Log ``msg`` at WARNING level in yellow."""
        painted = self._tint(Fore.YELLOW, msg)
        super().warning(painted, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        """Log ``msg`` at ERROR level in red."""
        painted = self._tint(Fore.RED, msg)
        super().error(painted, *args, **kwargs)

In [3]:
# Set up logging.
# Order matters: the custom class is registered first so that the module-level
# logger obtained below is created as a ColoredLogger.
logging.setLoggerClass(ColoredLogger)
# Root configuration: INFO level, timestamp - logger name - level - message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used by every class and cell below.
logger = logging.getLogger(__name__)

def log_execution(func):
    """Decorator that logs a "Starting"/"Finished" line around each call.

    The wrapped callable's metadata (``__name__``, ``__doc__``,
    ``__wrapped__``) is preserved through ``functools.wraps``, so decorated
    methods still introspect as themselves rather than as ``wrapper``.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func`` and returns its result unchanged. If ``func`` raises, the
        exception propagates and no "Finished" line is logged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info(f"{Fore.CYAN}Starting: {func.__name__}{Style.RESET_ALL}")
        result = func(*args, **kwargs)
        logger.info(f"{Fore.CYAN}Finished: {func.__name__}{Style.RESET_ALL}")
        return result
    return wrapper

In [4]:
class AudioRecorder:
    """Captures audio with ``sounddevice`` and writes it out with ``soundfile``.

    Attributes are set from the constructor arguments:
        rate: Sample rate in Hz used for both recording and saving.
        channels: Number of input channels requested from the device.
    """

    def __init__(self, rate=16000, channels=1):
        self.rate, self.channels = rate, channels

    @log_execution
    def record(self, duration):
        """Record ``duration`` seconds of audio and return the captured array.

        Args:
            duration: Recording length in seconds (multiplied by ``self.rate``
                and truncated to an integer frame count).

        Returns:
            Whatever ``sd.rec`` returns, after ``sd.wait()`` has completed.
        """
        logger.info(f"{Fore.BLUE}Recording for {duration} seconds...{Style.RESET_ALL}")
        frame_count = int(duration * self.rate)
        captured = sd.rec(frame_count, samplerate=self.rate, channels=self.channels)
        # Wait for the capture started by sd.rec before reporting completion.
        sd.wait()
        logger.info(f"{Fore.BLUE}Recording finished{Style.RESET_ALL}")
        return captured

    def save_audio(self, recording, filename):
        """Write ``recording`` to ``filename`` at ``self.rate`` and log the path."""
        sf.write(filename, recording, self.rate)
        logger.info(f"{Fore.BLUE}Audio saved as '{filename}'{Style.RESET_ALL}")

In [5]:
class SpeechRecognizer:
    """Speech-to-text using the ``openai/whisper-large-v2`` checkpoint.

    The model is placed on CUDA when ``torch.cuda.is_available()`` is true,
    otherwise on the CPU.
    """

    def __init__(self):
        if torch.cuda.is_available():
            self.device = "cuda"
        else:
            self.device = "cpu"
        logger.info(f"{Fore.MAGENTA}Initializing Whisper model on {self.device}{Style.RESET_ALL}")
        self.processor = WhisperProcessor.from_pretrained("openai/whisper-large-v2")
        whisper = WhisperForConditionalGeneration.from_pretrained("openai/whisper-large-v2")
        self.model = whisper.to(self.device)

    @log_execution
    def recognize(self, audio_file, language):
        """Transcribe the audio stored in ``audio_file``.

        Args:
            audio_file: Path (or file object) readable by ``soundfile.read``.
            language: Language identifier forwarded to
                ``get_decoder_prompt_ids`` with ``task="transcribe"``.

        Returns:
            The first decoded transcription string.
        """
        logger.info(f"{Fore.MAGENTA}Recognizing speech in {language}{Style.RESET_ALL}")
        # NOTE(review): the file's own sample rate is passed straight to the
        # processor; confirm callers always supply audio at the rate it expects.
        samples, file_rate = sf.read(audio_file)
        encoded = self.processor(samples, sampling_rate=file_rate, return_tensors="pt")
        features = encoded.input_features.to(self.device)
        prompt_ids = self.processor.get_decoder_prompt_ids(language=language, task="transcribe")
        token_ids = self.model.generate(features, forced_decoder_ids=prompt_ids)
        decoded = self.processor.batch_decode(token_ids, skip_special_tokens=True)
        return decoded[0]

In [6]:
class Translator:
    """Text translation using the ``facebook/m2m100_1.2B`` checkpoint."""

    def __init__(self):
        logger.info(f"{Fore.YELLOW}Initializing M2M100 translation model{Style.RESET_ALL}")
        self.model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_1.2B")
        self.tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_1.2B")

    @log_execution
    def translate(self, text, source_lang, target_lang):
        """Translate ``text`` from ``source_lang`` to ``target_lang``.

        Args:
            text: Source text; tokenised with truncation at 512 tokens.
            source_lang: Language code assigned to ``tokenizer.src_lang``.
            target_lang: Language code whose id is forced as the first
                generated token.

        Returns:
            The decoded translation after ``post_process_translation``.
        """
        logger.info(f"{Fore.YELLOW}Translating from {source_lang} to {target_lang}{Style.RESET_ALL}")
        self.tokenizer.src_lang = source_lang
        encoded_text = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        generated_tokens = self.model.generate(
            **encoded_text,
            forced_bos_token_id=self.tokenizer.get_lang_id(target_lang),
            max_length=200,
            num_beams=5,
            length_penalty=0.8,
            no_repeat_ngram_size=3
        )
        translated = self.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
        return self.post_process_translation(translated)

    def post_process_translation(self, text):
        """Tidy a raw translation: drop repeated sentences and fix punctuation.

        The text is split on sentence-ending periods, each sentence gets an
        upper-case first letter and a terminating period (unless it already
        ends in ``?``, ``!`` or ``.``), and exact repeats are removed while
        keeping first-occurrence order.

        Args:
            text: Decoded model output.

        Returns:
            The cleaned sentences joined by single spaces; an empty string if
            ``text`` contains no non-blank sentence.
        """
        seen = set()
        cleaned = []
        # Split only on periods that actually end a sentence (followed by
        # whitespace or end of text) so decimals such as "3.5" and dotted
        # abbreviations are not torn apart.
        for fragment in re.split(r'\.+(?=\s|$)', text):
            fragment = fragment.strip()
            if not fragment:
                continue
            # Upper-case only the first character: str.capitalize() would also
            # lower-case the rest and destroy proper nouns / German nouns.
            sentence = fragment[0].upper() + fragment[1:]
            if not sentence.endswith(('?', '!', '.')):
                sentence += '.'
            # Compare the fully normalised form; comparing the raw fragment
            # against stored sentences (which carry the trailing '.') never
            # matches, so duplicates would slip through.
            if sentence in seen:
                continue
            seen.add(sentence)
            cleaned.append(sentence)
        return ' '.join(cleaned)

In [7]:
class TextToSpeech:
    """Speech synthesis with ``microsoft/speecht5_tts`` and its HiFi-GAN vocoder."""

    def __init__(self):
        logger.info(f"{Fore.GREEN}Initializing SpeechT5 text-to-speech model{Style.RESET_ALL}")
        self.processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
        self.model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
        self.vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
        # The speaker embedding is a random (1, 512) tensor drawn once per
        # instance, so the voice differs between instantiations.
        self.speaker_embeddings = torch.randn(1, 512)

    @log_execution
    def synthesize(self, text):
        """Generate speech for ``text`` and return it as a NumPy array."""
        logger.info(f"{Fore.GREEN}Synthesizing speech{Style.RESET_ALL}")
        encoded = self.processor(text=text, return_tensors="pt")
        waveform = self.model.generate_speech(
            encoded["input_ids"],
            self.speaker_embeddings,
            vocoder=self.vocoder,
        )
        return waveform.numpy()

    def adjust_speed(self, audio, speed_factor=0.8):
        """Resample ``audio`` by linear interpolation to change its duration.

        Args:
            audio: 1-D NumPy array of samples.
            speed_factor: Step between interpolation points; values below 1
                yield more output samples than input samples.

        Returns:
            The interpolated samples cast back to ``audio.dtype``.
        """
        n_samples = len(audio)
        source_positions = np.arange(0, n_samples)
        query_positions = np.arange(0, n_samples, speed_factor)
        stretched = np.interp(query_positions, source_positions, audio)
        return stretched.astype(audio.dtype)

In [8]:
def play_audio(audio_file):
    """Play the WAV file at ``audio_file`` and wait until playback is done."""
    playback = sa.WaveObject.from_wave_file(audio_file).play()
    playback.wait_done()

In [9]:
class VoiceTranslator:
    """Interactive record -> transcribe -> translate -> synthesise pipeline.

    Owns one instance each of ``AudioRecorder``, ``SpeechRecognizer``,
    ``Translator`` and ``TextToSpeech``.
    """

    def __init__(self):
        self.recorder = AudioRecorder()
        self.recognizer = SpeechRecognizer()
        self.translator = Translator()
        self.tts = TextToSpeech()

    def get_language(self, prompt):
        """Prompt on stdin until one of the accepted language codes is entered.

        Input is stripped and lower-cased before it is checked.

        Returns:
            The accepted two-letter code.
        """
        accepted = ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'zh', 'ja', 'ko')
        colored_prompt = f"{Fore.CYAN}{prompt}{Style.RESET_ALL}"
        while True:
            lang = input(colored_prompt).strip().lower()
            if lang not in accepted:
                logger.warning(f"Unsupported language code: {lang}. Please try again.")
                continue
            return lang

    def get_duration(self):
        """Prompt on stdin until a positive number of seconds is entered.

        Returns:
            The duration as a ``float``.
        """
        question = f"{Fore.CYAN}Enter recording duration in seconds: {Style.RESET_ALL}"
        while True:
            try:
                seconds = float(input(question))
                if not seconds > 0:
                    logger.warning("Duration must be positive. Please try again.")
                    continue
                return seconds
            except ValueError:
                logger.warning("Invalid input. Please enter a number.")

    @log_execution
    def translate_speech(self):
        """Run the full pipeline once, asking the user for languages and duration.

        The recording is written to ``input_audio.wav`` before transcription.

        Returns:
            The synthesised (speed-adjusted) audio array, or ``None`` when the
            recogniser returns only whitespace.
        """
        source_lang = self.get_language("Enter the source language code (e.g., 'en' for English): ")
        target_lang = self.get_language("Enter the target language code (e.g., 'es' for Spanish): ")
        duration = self.get_duration()

        # Capture the microphone input and persist it for the recogniser.
        captured = self.recorder.record(duration)
        self.recorder.save_audio(captured, "input_audio.wav")

        recognized = self.recognizer.recognize("input_audio.wav", language=source_lang)
        logger.info(f"{Fore.WHITE}Recognized text: {recognized}{Style.RESET_ALL}")

        if recognized.strip() == "":
            logger.warning("No speech detected. Please try again.")
            return None

        translation = self.translator.translate(recognized, source_lang, target_lang)
        logger.info(f"{Fore.WHITE}Translated text: {translation}{Style.RESET_ALL}")

        synthesized = self.tts.synthesize(translation)
        return self.tts.adjust_speed(synthesized)

In [13]:
# Run one interactive translation round from the notebook.
translator = VoiceTranslator()
audio_output = translator.translate_speech()
if audio_output is None:
    # translate_speech returns None when no speech was recognised.
    logger.error("Translation failed. Please try again.")
else:
    sf.write("translated_audio.wav", audio_output, samplerate=16000)
    logger.info(f"{Fore.GREEN}Translation complete. Output saved as 'translated_audio.wav'{Style.RESET_ALL}")
    play_prompt = f"{Fore.CYAN}Do you want to play the translated audio? (y/n): {Style.RESET_ALL}"
    play_option = input(play_prompt).strip().lower()
    if play_option == 'y':
        play_audio("translated_audio.wav")

2024-08-12 11:15:26,671 - __main__ - INFO - Initializing Whisper model on cpu
2024-08-12 11:15:28,235 - __main__ - INFO - Initializing M2M100 translation model
2024-08-12 11:15:29,642 - __main__ - INFO - Initializing SpeechT5 text-to-speech model
2024-08-12 11:15:31,453 - __main__ - INFO - Starting: translate_speech
2024-08-12 11:15:37,876 - __main__ - INFO - Starting: record
2024-08-12 11:15:37,882 - __main__ - INFO - Recording for 5.0 seconds...
2024-08-12 11:15:43,102 - __main__ - INFO - Recording finished
2024-08-12 11:15:43,103 - __main__ - INFO - Finished: record
2024-08-12 11:15:43,107 - __main__ - INFO - Audio saved as 'input_audio.wav'
2024-08-12 11:15:43,109 - __main__ - INFO - Starting: recognize
2024-08-12 11:15:43,110 - __main__ - INFO - Recognizing speech in es
2024-08-12 11:16:05,951 - __main__ - INFO - Finished: recognize
2024-08-12 11:16:05,954 - __main__ - INFO - Recognized text:  Buenos días.
2024-08-12 11:16:05,955 - __main__ - INFO - Starting: translate
2024-08-12 

In [32]:
class GradioInterface:
    """Gradio web front end for the speech-to-speech translation pipeline.

    Builds a ``VoiceTranslator`` and exposes its recognizer, translator and
    text-to-speech components through a ``gr.Interface`` with an audio input
    and two language dropdowns.
    """

    def __init__(self):
        self.translator = VoiceTranslator()
        # Display name -> language code passed to the recognizer/translator.
        self.languages = {
            'English': 'en', 'Spanish': 'es', 'French': 'fr', 'German': 'de',
            'Italian': 'it', 'Portuguese': 'pt', 'Russian': 'ru', 'Chinese': 'zh',
            'Japanese': 'ja', 'Korean': 'ko'
        }
        logger.info("GradioInterface initialized")
        self.verify_models()

    def verify_models(self):
        """Assert that the three pipeline models are present.

        Raises:
            AssertionError: If any of the model attributes is ``None``.
        """
        logger.info("Verifying models...")
        assert self.translator.recognizer.model is not None, "Speech recognition model not loaded"
        assert self.translator.translator.model is not None, "Translation model not loaded"
        assert self.translator.tts.model is not None, "Text-to-speech model not loaded"
        logger.info("All models verified successfully")

    def resample_audio(self, audio, orig_sr, target_sr=16000):
        """Resample ``audio`` from ``orig_sr`` to ``target_sr`` with ``scipy.signal.resample``.

        Args:
            audio: 1-D array of samples.
            orig_sr: Sample rate of ``audio`` in Hz.
            target_sr: Desired sample rate in Hz.

        Returns:
            The resampled array with ``int(len(audio) * target_sr / orig_sr)`` samples.
        """
        logger.info(f"Resampling audio from {orig_sr} Hz to {target_sr} Hz")
        resampled = signal.resample(audio, int(len(audio) * target_sr / orig_sr))
        return resampled

    def translate(self, audio, source_lang, target_lang):
        """Gradio callback: transcribe, translate and re-synthesise ``audio``.

        Args:
            audio: ``(sample_rate, samples)`` tuple as indexed below, or
                ``None`` when nothing was recorded/uploaded.
            source_lang: Display name of the spoken language (key of
                ``self.languages``).
            target_lang: Display name of the output language (key of
                ``self.languages``).

        Returns:
            ``(translated_text, (16000, samples))`` on success, or
            ``(message, None)`` when input is missing, no speech is
            recognised, or an exception occurs (the exception is logged, not
            raised).
        """
        temp_file = "temp_input_audio.wav"
        try:
            if audio is None:
                logger.warning("No audio detected")
                return "No audio detected. Please try again.", None

            # An unselected dropdown yields a value that is not a key; report
            # it clearly instead of surfacing a KeyError.
            if source_lang not in self.languages or target_lang not in self.languages:
                logger.warning("Source or target language not selected")
                return "Please select both a source and a target language.", None

            logger.info(f"Translating from {source_lang} to {target_lang}")
            source_lang_code = self.languages[source_lang]
            target_lang_code = self.languages[target_lang]

            # Resample the audio to 16000 Hz.
            orig_sr = audio[0]
            # Work in float so integer PCM cannot overflow in np.abs
            # (abs(int16(-32768)) wraps to -32768).
            audio_data = np.asarray(audio[1], dtype=np.float64)
            if audio_data.ndim > 1:
                audio_data = audio_data.mean(axis=1)  # Downmix to mono if stereo
            if audio_data.size == 0:
                logger.warning("No audio detected")
                return "No audio detected. Please try again.", None
            if orig_sr != 16000:
                audio_data = self.resample_audio(audio_data, orig_sr)

            # Peak-normalise; skip when the clip is pure silence to avoid a
            # division by zero that would fill the signal with NaNs.
            peak = np.max(np.abs(audio_data))
            if peak > 0:
                audio_data = audio_data / peak

            # Save the prepared audio to a temporary file for the recognizer.
            sf.write(temp_file, audio_data, 16000)
            logger.info(f"Audio saved to {temp_file}")

            # Speech recognition
            text = self.translator.recognizer.recognize(temp_file, language=source_lang_code)
            logger.info(f"Recognized text: {text}")
            if not text.strip():
                logger.warning("No speech detected in the audio")
                return "No speech detected. Please try again.", None

            # Translation
            translated_text = self.translator.translator.translate(text, source_lang_code, target_lang_code)
            logger.info(f"Translated text: {translated_text}")

            # Speech synthesis
            audio_output = self.translator.tts.synthesize(translated_text)
            audio_output_slowed = self.translator.tts.adjust_speed(audio_output)

            # Save the output audio
            output_file = "translated_audio.wav"
            sf.write(output_file, audio_output_slowed, samplerate=16000)
            logger.info(f"Translated audio saved to {output_file}")

            return translated_text, (16000, audio_output_slowed)
        except Exception as e:
            logger.error(f"An error occurred: {str(e)}", exc_info=True)
            return f"An error occurred: {str(e)}", None
        finally:
            # Clean up the temporary file on every exit path, not only on success.
            if os.path.exists(temp_file):
                os.remove(temp_file)

    def launch(self):
        """Build the Gradio interface around ``translate`` and launch it."""
        iface = gr.Interface(
            fn=self.translate,
            inputs=[
                gr.Audio(type="numpy", label="Input Audio"),
                gr.Dropdown(list(self.languages.keys()), label="Source Language"),
                gr.Dropdown(list(self.languages.keys()), label="Target Language")
            ],
            outputs=[
                gr.Textbox(label="Translated Text"),
                gr.Audio(type="numpy", label="Translated Audio")
            ],
            title="Voice Translator",
            description="Translate speech from one language to another."
        )
        iface.launch()

In [33]:
# Free cached CUDA memory when a GPU is available.
# NOTE(review): this cell does not load any model itself; the log line below is
# emitted unconditionally, before GradioInterface() constructs the models.
if torch.cuda.is_available():
    torch.cuda.empty_cache()

logger.info("Models loaded successfully")

2024-08-12 11:31:57,581 - __main__ - INFO - Models loaded successfully


In [34]:
# Start the Gradio interface (constructing GradioInterface builds a VoiceTranslator
# and verifies its models before the UI is launched).
interface = GradioInterface()
interface.launch()

2024-08-12 11:31:58,267 - __main__ - INFO - Initializing Whisper model on cpu
2024-08-12 11:32:01,284 - __main__ - INFO - Initializing M2M100 translation model
2024-08-12 11:32:02,510 - __main__ - INFO - Initializing SpeechT5 text-to-speech model
2024-08-12 11:32:04,701 - __main__ - INFO - GradioInterface initialized
2024-08-12 11:32:04,702 - __main__ - INFO - Verifying models...
2024-08-12 11:32:04,703 - __main__ - INFO - All models verified successfully


Running on local URL:  http://127.0.0.1:7865


2024-08-12 11:32:04,802 - httpx - INFO - HTTP Request: GET http://127.0.0.1:7865/startup-events "HTTP/1.1 200 OK"
2024-08-12 11:32:04,810 - httpx - INFO - HTTP Request: HEAD http://127.0.0.1:7865/ "HTTP/1.1 200 OK"



To create a public link, set `share=True` in `launch()`.
