# Real-time speech translation system

**Task:**
* Speech recognition in the input language;
* Translation into the target language;
* Speech synthesis in the target language;
* Preservation of the speaker's voice characteristics (cloning);
* Support for multiple languages (at least 2 input and 2 output languages).

# Architecture

**Main modules:**

1. **Speech recognition module (ASR)**
   - Model: Whisper (OpenAI)
   - Function: Audio-to-text conversion
   - Supported languages: EN, RU, ES, DE, FR, ZH

2. **Machine translation module (NMT)**
   - Models: MarianMT (Helsinki-NLP)
   - Architecture: Transformer-based

3. **Speech synthesis module (TTS)**
   - Model: XTTS v2 (Coqui TTS)
   - Function: Voice cloning + synthesis
   - Feature: Support for multilingual synthesis

4. **Voice analysis module**
   - Library: Librosa
   - Analyzed parameters: Pitch, MFCC, Spectral Centroid

5. **Visualization module**
   - Library: Matplotlib + Seaborn
   - Visualizations: Waveforms, Spectrograms, MFCC, Pitch distribution

**Data processing workflow:**

```
Audio input → Recognition → Text → Translation → Synthesis → Audio output → Voice analysis → Visualization
```
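
The workflow above can be read as a chain of stages in which each stage consumes the output of the previous one. The following is a minimal sketch of that data flow only; the stage callables `recognize`, `translate`, and `synthesize` are hypothetical placeholders, not the notebook's actual methods.

```python
from typing import Callable, Dict


def run_pipeline(audio_path: str,
                 recognize: Callable[[str], str],
                 translate: Callable[[str], str],
                 synthesize: Callable[[str, str], str]) -> Dict[str, str]:
    """Chain the stages: audio -> text -> translated text -> audio."""
    text = recognize(audio_path)
    translated = translate(text)
    # The input recording doubles as the reference voice for cloning.
    output_path = synthesize(translated, audio_path)
    return {"text": text, "translated": translated, "audio": output_path}


# Placeholder stages so the sketch runs without any models installed.
result = run_pipeline(
    "input.wav",
    recognize=lambda path: "hello world",
    translate=lambda text: text.upper(),
    synthesize=lambda text, reference: "output.wav",
)
print(result)
```

Passing the stages as callables keeps each module replaceable, which matches the modular layout described in the list of main modules.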

**Implementation features**

Long text processing:
- Text division into chunks (200 characters) for TTS
- Sequential processing of sentences for NMT
- Concatenation of audio segments
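
The chunking step can be sketched as a greedy word packer. This is an illustrative standalone function (the name `split_into_chunks` is an assumption, not part of the notebook); it follows the same idea as the splitting loop in `synthesize_speech`, with the 200-character limit as the default.

```python
from typing import List


def split_into_chunks(text: str, max_chars: int = 200) -> List[str]:
    """Greedily pack whole words into chunks of at most max_chars characters.

    A single word longer than max_chars is kept whole in its own chunk.
    """
    chunks: List[str] = []
    current = ""
    for word in text.split():
        candidate = f"{current} {word}" if current else word
        if len(candidate) <= max_chars:
            current = candidate
        else:
            if current:
                chunks.append(current)
            current = word
    if current:
        chunks.append(current)
    return chunks


chunks = split_into_chunks("one two three four five", max_chars=9)
print(chunks)  # ['one two', 'three', 'four five']
```

Splitting on word boundaries avoids cutting a word in half, at the cost of chunks that may break mid-sentence.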

Memory management:
- Temporary files for intermediate storage
- Automatic resource cleanup
- Audio duration limit (30 seconds)
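
One way to express the cleanup and the duration limit is a context manager that always deletes its temporary file, plus a small duration check. This is a sketch under assumptions: the helper names `temporary_wav` and `within_limit` are hypothetical, and the notebook itself uses `tempfile.NamedTemporaryFile` with explicit `os.unlink` calls instead.

```python
import os
import tempfile
from contextlib import contextmanager

MAX_DURATION_S = 30.0  # limit taken from the list above


@contextmanager
def temporary_wav():
    """Yield a path to a temporary .wav file and delete it on exit."""
    fd, path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)  # only the path is needed; writers reopen the file
    try:
        yield path
    finally:
        if os.path.exists(path):
            os.unlink(path)


def within_limit(num_samples: int, sample_rate: int) -> bool:
    """Return True if the clip is no longer than MAX_DURATION_S."""
    return num_samples / sample_rate <= MAX_DURATION_S


with temporary_wav() as wav_path:
    existed_inside = os.path.exists(wav_path)
exists_after = os.path.exists(wav_path)
print(existed_inside, exists_after)
print(within_limit(16000 * 10, 16000), within_limit(16000 * 31, 16000))
```

Because the deletion sits in a `finally` clause, the file is removed even when processing inside the `with` block raises an exception.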

# Import

In [None]:
!pip install pypinyin > /dev/null 2>&1
!pip install torch torchaudio transformers gradio soundfile librosa numpy matplotlib seaborn > /dev/null 2>&1
!pip install openai-whisper > /dev/null 2>&1
!pip install SpeechRecognition > /dev/null 2>&1
!pip install pydub > /dev/null 2>&1
!pip install TTS > /dev/null 2>&1
!pip install coqui-tts > /dev/null 2>&1

# `>` is the output redirection operator.
# `/dev/null` is a special file in Unix-like systems that discards all output written to it.
# `2>&1` redirects stderr (standard error stream, file descriptor 2) to stdout (standard output, file descriptor 1).

In [None]:
# Library for working with arrays and mathematical operations
import numpy as np

# Library for working with the operating system (working with files and paths)
import os

# Popular Python library for converting Chinese characters to their corresponding pronunciations
import pypinyin

# Library for working with temporary files
import tempfile

# Main library for machine learning and working with tensors
import torch

# Library for working with audio data (loading, saving, processing)
import torchaudio

# Library for processing and analyzing audio data (feature extraction, conversion)
import librosa
import librosa.display  # explicit import so librosa.display.specshow is available

# Library for working with audio files (reading, writing various formats)
import soundfile as sf

# Library for creating web interfaces for machine learning models
import gradio as gr

# Library for automatic speech recognition (Whisper models)
import whisper

# Library for speech synthesis (Text-to-Speech) with support for various models
from TTS.api import TTS

# Library for type annotations (allows you to specify data types for variables and return values)
from typing import List, Tuple

# Library for working with transformer models (translation, text generation)
from transformers import MarianMTModel, MarianTokenizer

# For data visualization
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

# Disable unnecessary warnings
import warnings
warnings.filterwarnings('ignore')



# Defining the translator class

In [None]:
class RealTimeTranslator:
    def __init__(self):
        # Initialization of models
        print("Loading Whisper model for speech recognition...")
        self.asr_model = whisper.load_model("base")  # Use base for speed

        print("Loading translation models...")
        # Models for translation between languages
        self.translation_models = {
            'en-ru': 'Helsinki-NLP/opus-mt-en-ru',
            'ru-en': 'Helsinki-NLP/opus-mt-ru-en',
            'en-es': 'Helsinki-NLP/opus-mt-en-es',
            'es-en': 'Helsinki-NLP/opus-mt-es-en',
            'en-de': 'Helsinki-NLP/opus-mt-en-de',
            'de-en': 'Helsinki-NLP/opus-mt-de-en',
            'en-fr': 'Helsinki-NLP/opus-mt-en-fr',
            'fr-en': 'Helsinki-NLP/opus-mt-fr-en',
            'ru-es': 'Helsinki-NLP/opus-mt-ru-es',
            'es-ru': 'Helsinki-NLP/opus-mt-es-ru',
            'ru-fr': 'Helsinki-NLP/opus-mt-ru-fr',
            'fr-ru': 'Helsinki-NLP/opus-mt-fr-ru',
            'en-zh': 'Helsinki-NLP/opus-mt-en-zh',
            'zh-en': 'Helsinki-NLP/opus-mt-zh-en',
        }

        print("Loading TTS model for speech synthesis...")
        # Coqui TTS with voice cloning support
        self.tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=torch.cuda.is_available())

        self.supported_languages = {
            'english': 'en',
            'russian': 'ru',
            'spanish': 'es',
            'german': 'de',
            'french': 'fr',
            'chinese': 'zh'
        }

        self.whisper_languages = {
            'english': 'en',
            'russian': 'ru',
            'spanish': 'es',
            'german': 'de',
            'french': 'fr',
            'chinese': 'zh'
        }

    def transcribe_audio(self, audio_path: str, language: str) -> str:
        """Speech recognition with Whisper"""
        try:
            # We use codes for Whisper
            lang_code = self.whisper_languages[language.lower()]
            result = self.asr_model.transcribe(audio_path, language=lang_code)
            return result['text']
        except Exception as e:
            print(f"Recognition error: {e}")
            # Trying automatic language detection
            try:
                result = self.asr_model.transcribe(audio_path)
                return result['text']
            except Exception:
                return ""

    def translate_text(self, text: str, source_lang: str, target_lang: str) -> str:
        """Translation of text using MarianMT"""
        try:
            source_code = self.supported_languages[source_lang.lower()]
            target_code = self.supported_languages[target_lang.lower()]

            if source_code == target_code:
                return text

            model_key = f"{source_code}-{target_code}"

            # Direct translation, if available
            if model_key in self.translation_models:
                return self._translate_with_model(text, model_key)

            # For indirect translations, we use English as an intermediate language
            print(f"Using English as pivot language for {source_code}-{target_code}")

            # First, let's translate it into English.
            if source_code != 'en':
                intermediate_key = f"{source_code}-en"
                if intermediate_key in self.translation_models:
                    text = self._translate_with_model(text, intermediate_key)
                else:
                    print(f"No model for translating from {source_code} to en")
                    return text

            # Translate from English into the target language
            if target_code != 'en':
                intermediate_key = f"en-{target_code}"
                if intermediate_key in self.translation_models:
                    text = self._translate_with_model(text, intermediate_key)
                else:
                    print(f"No model for translation from en to {target_code}")

            return text

        except Exception as e:
            print(f"Translation error: {e}")
            return text

    def _translate_with_model(self, text: str, model_key: str) -> str:
        """Auxiliary function for translation with a specific model"""
        try:
            model_name = self.translation_models[model_key]
            tokenizer = MarianTokenizer.from_pretrained(model_name)
            model = MarianMTModel.from_pretrained(model_name)

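            # Split long text into sentences to stay within the model's input limit.
            # Note: the threshold is compared against characters, a rough proxy for tokens.
            max_length = 512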
            if len(text) > max_length:
                # Simple strategy of breaking by sentences
                sentences = text.split('. ')
                translated_sentences = []

                for sentence in sentences:
                    if sentence.strip():
                        inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=512)
                        translated = model.generate(**inputs)
                        translated_text = tokenizer.decode(translated[0], skip_special_tokens=True)
                        translated_sentences.append(translated_text)

                return '. '.join(translated_sentences)
            else:
                # Regular translation for short text
                inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
                translated = model.generate(**inputs)
                return tokenizer.decode(translated[0], skip_special_tokens=True)

        except Exception as e:
            print(f"Error in _translate_with_model: {e}")
            return text

    def synthesize_speech(self, text: str, target_lang: str, reference_audio: str) -> str:
        """Speech synthesis with voice cloning"""
        try:
            lang_code = self.supported_languages[target_lang.lower()]

            # Split the text into parts if it is too long
            max_chars = 200  # Safe limit for XTTS
            text_parts = []
            current_part = ""

            for word in text.split():
                if len(current_part) + len(word) + 1 <= max_chars:
                    current_part += " " + word if current_part else word
                else:
                    text_parts.append(current_part)
                    current_part = word

            if current_part:
                text_parts.append(current_part)

            # Create a temporary file for the output audio
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                output_path = tmp_file.name

            # Process each part of the text
            audio_segments = []
            for part in text_parts:
                if part.strip():  # Skip empty parts
                    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as part_file:
                        part_path = part_file.name

                    # Generate speech for each part
                    self.tts.tts_to_file(
                        text=part,
                        speaker_wav=reference_audio,
                        language=lang_code,
                        file_path=part_path
                    )

                    # Load the generated audio
                    y, sr = sf.read(part_path)
                    audio_segments.append((y, sr))
                    os.unlink(part_path)  # Delete the temporary file

            # Combine all audio segments
            if audio_segments:
                combined_audio = np.concatenate([seg[0] for seg in audio_segments])
                sf.write(output_path, combined_audio, audio_segments[0][1])
                return output_path

            return None

        except Exception as e:
            print(f"Speech synthesis error: {e}")
            return None

    def analyze_voice_characteristics(self, audio_path: str) -> dict:
        """Voice characteristics analysis"""
        try:
            y, sr = librosa.load(audio_path, sr=None)

            # Main characteristics
            duration = librosa.get_duration(y=y, sr=sr)
            pitch, magnitudes = librosa.piptrack(y=y, sr=sr)
            pitch = pitch[pitch > 0]
            mean_pitch = np.mean(pitch) if len(pitch) > 0 else 0

            # Spectral characteristics
            spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
            mean_spectral_centroid = np.mean(spectral_centroid)

            mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
            mean_mfcc = np.mean(mfcc, axis=1)

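            # Cast NumPy scalars to plain floats so the results serialize cleanly (for example in gr.JSON)
            return {
                'duration': float(duration),
                'mean_pitch': float(mean_pitch),
                'mean_spectral_centroid': float(mean_spectral_centroid),
                'mfcc': mean_mfcc,
                'sample_rate': sr
            }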

        except Exception as e:
            print(f"Voice analysis error: {e}")
            return {}

    def compare_voices(self, original_audio: str, synthesized_audio: str) -> dict:
        """Comparison of voice characteristics"""
        original_stats = self.analyze_voice_characteristics(original_audio)
        synthesized_stats = self.analyze_voice_characteristics(synthesized_audio)

        comparison = {}
        for key in original_stats:
            if key != 'mfcc':  # MFCC is compared separately below
                if key in synthesized_stats:
                    diff = abs(original_stats[key] - synthesized_stats[key])
                    comparison[key] = {
                        'original': original_stats[key],
                        'synthesized': synthesized_stats[key],
                        'difference': diff,
                        'similarity_percentage': max(0, 100 - (diff / original_stats[key] * 100)) if original_stats[key] != 0 else 0
                    }

        # MFCC comparison
        if 'mfcc' in original_stats and 'mfcc' in synthesized_stats:
            mfcc_diff = float(np.mean(np.abs(original_stats['mfcc'] - synthesized_stats['mfcc'])))
            mfcc_scale = float(np.mean(np.abs(original_stats['mfcc'])))
            comparison['mfcc_similarity'] = {
                'difference': mfcc_diff,
                'similarity_percentage': max(0.0, 100 - (mfcc_diff / mfcc_scale * 100)) if mfcc_scale != 0 else 0.0
            }

        return comparison

    def create_visualizations(self, original_audio: str, synthesized_audio: str, output_dir: str = "."):
        """Creating visualizations for audio comparison"""
        # Loading audio files
        y_orig, sr_orig = librosa.load(original_audio, sr=None)
        y_synth, sr_synth = librosa.load(synthesized_audio, sr=None)

        # Creating graphs
        fig, axes = plt.subplots(3, 2, figsize=(15, 12))

        # Waveform
        axes[0, 0].plot(np.linspace(0, len(y_orig)/sr_orig, len(y_orig)), y_orig)
        axes[0, 0].set_title('Original Audio Waveform')
        axes[0, 0].set_xlabel('Time (s)')
        axes[0, 0].set_ylabel('Amplitude')

        axes[0, 1].plot(np.linspace(0, len(y_synth)/sr_synth, len(y_synth)), y_synth)
        axes[0, 1].set_title('Synthesized Audio Waveform')
        axes[0, 1].set_xlabel('Time (s)')
        axes[0, 1].set_ylabel('Amplitude')

        # Spectrogram
        D_orig = librosa.amplitude_to_db(np.abs(librosa.stft(y_orig)), ref=np.max)
        img_orig = librosa.display.specshow(D_orig, sr=sr_orig, x_axis='time', y_axis='log', ax=axes[1, 0])
        axes[1, 0].set_title('Original Spectrogram')
        fig.colorbar(img_orig, ax=axes[1, 0], format='%+2.0f dB')

        D_synth = librosa.amplitude_to_db(np.abs(librosa.stft(y_synth)), ref=np.max)
        img_synth = librosa.display.specshow(D_synth, sr=sr_synth, x_axis='time', y_axis='log', ax=axes[1, 1])
        axes[1, 1].set_title('Synthesized Spectrogram')
        fig.colorbar(img_synth, ax=axes[1, 1], format='%+2.0f dB')

        # MFCC Comparison
        mfcc_orig = librosa.feature.mfcc(y=y_orig, sr=sr_orig, n_mfcc=13)
        mfcc_synth = librosa.feature.mfcc(y=y_synth, sr=sr_synth, n_mfcc=13)

        axes[2, 0].plot(np.mean(mfcc_orig, axis=1), label='Original', marker='o')
        axes[2, 0].plot(np.mean(mfcc_synth, axis=1), label='Synthesized', marker='x')
        axes[2, 0].set_title('MFCC Comparison')
        axes[2, 0].legend()
        axes[2, 0].set_xlabel('MFCC Coefficients')
        axes[2, 0].set_ylabel('Value')

        # Pitch comparison
        pitch_orig, _ = librosa.piptrack(y=y_orig, sr=sr_orig)
        pitch_synth, _ = librosa.piptrack(y=y_synth, sr=sr_synth)

        pitch_orig_vals = pitch_orig[pitch_orig > 0]
        pitch_synth_vals = pitch_synth[pitch_synth > 0]

        axes[2, 1].hist(pitch_orig_vals, alpha=0.5, label='Original', bins=50)
        axes[2, 1].hist(pitch_synth_vals, alpha=0.5, label='Synthesized', bins=50)
        axes[2, 1].set_title('Pitch Distribution Comparison')
        axes[2, 1].legend()
        axes[2, 1].set_xlabel('Pitch (Hz)')
        axes[2, 1].set_ylabel('Frequency')

        plt.tight_layout()
        plot_path = os.path.join(output_dir, 'voice_comparison.png')
        plt.savefig(plot_path)
        plt.close()

        return plot_path

    def process_audio(self, input_audio, source_lang, target_lang):
        """The main method of audio processing"""
        try:
            if input_audio is None:
                return "Please upload the audio file", "", None, {}, None

            # Validate the language names; a KeyError for an unknown language is caught below.
            # Pairs without a direct model are handled by pivot translation in translate_text.
            source_code = self.supported_languages[source_lang.lower()]
            target_code = self.supported_languages[target_lang.lower()]

            # Save the input audio to a temporary file
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_input:
                input_path = tmp_input.name
                sf.write(input_path, input_audio[1], input_audio[0])

            # Check the audio duration
            duration = librosa.get_duration(path=input_path)  # 'filename' was renamed to 'path' in librosa 0.10
            if duration > 30:  # 30-second limit
                os.unlink(input_path)
                return "Audio is too long (max 30 seconds)", "", None, {}, None

            # Step 1: Speech recognition
            print("Speech recognition...")
            transcribed_text = self.transcribe_audio(input_path, source_lang)

            if not transcribed_text.strip():
                os.unlink(input_path)
                return "Speech could not be recognized", "", None, {}, None

            # Step 2: Translation of the text
            print("Translation of text...")
            translated_text = self.translate_text(transcribed_text, source_lang, target_lang)

            # Step 3: Speech synthesis with voice cloning
            print("Speech synthesis...")
            synthesized_audio_path = self.synthesize_speech(translated_text, target_lang, input_path)

            if synthesized_audio_path:
                # Loading synthesized audio
                y_synth, sr_synth = sf.read(synthesized_audio_path)
                synthesized_audio = (sr_synth, y_synth)

                # Analyzing and comparing voices
                print("Voice characteristics analysis...")
                voice_comparison = self.compare_voices(input_path, synthesized_audio_path)

                # Creating visualizations
                print("Creating visualizations...")
                plot_path = self.create_visualizations(input_path, synthesized_audio_path)

                # Cleaning temporary files
                os.unlink(input_path)
                os.unlink(synthesized_audio_path)

                return transcribed_text, translated_text, synthesized_audio, voice_comparison, plot_path

            os.unlink(input_path)
            return "Speech synthesis error", "", None, {}, None

        except Exception as e:
            print(f"Processing error: {e}")
            # Delete temporary files in case of error
            if 'input_path' in locals() and os.path.exists(input_path):
                os.unlink(input_path)
            if 'synthesized_audio_path' in locals() and os.path.exists(synthesized_audio_path):
                os.unlink(synthesized_audio_path)
            return f"Error: {str(e)}", "", None, {}, None
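
Note that `_translate_with_model` calls `from_pretrained` on every invocation, so the same MarianMT weights are reloaded for each translation. A small cache is one possible way to avoid this. The sketch below is an assumption, not the notebook's implementation: `ModelCache` and `fake_loader` are hypothetical names, and the loader is a stand-in so the example runs without downloading models.

```python
from typing import Callable, Dict, Generic, List, TypeVar

T = TypeVar("T")


class ModelCache(Generic[T]):
    """Load each model at most once and reuse it on later calls."""

    def __init__(self, loader: Callable[[str], T]) -> None:
        self._loader = loader
        self._models: Dict[str, T] = {}

    def get(self, name: str) -> T:
        if name not in self._models:
            self._models[name] = self._loader(name)
        return self._models[name]


# Stand-in loader that records its calls. In the notebook, a loader could
# instead return the tokenizer/model pair built with from_pretrained(name).
load_calls: List[str] = []


def fake_loader(name: str) -> str:
    load_calls.append(name)
    return f"model:{name}"


cache = ModelCache(fake_loader)
first = cache.get("Helsinki-NLP/opus-mt-en-ru")
second = cache.get("Helsinki-NLP/opus-mt-en-ru")
print(load_calls)  # ['Helsinki-NLP/opus-mt-en-ru']
```

The trade-off is memory: every cached model stays resident, which matters when many language pairs are used in one session.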

In [None]:
# Initialization of the translator
translator = RealTimeTranslator()

Loading Whisper model for speech recognition...


Loading translation models...
Loading TTS model for speech synthesis...
 > You must confirm the following:
 | > "I have purchased a commercial license from Coqui: licensing@coqui.ai"
 | > "Otherwise, I agree to the terms of the non-commercial CPML: https://coqui.ai/cpml" - [y/n]
 | | > y



# Gradio interface

In [None]:
def create_interface():
    with gr.Blocks(title="Speech Translator") as demo:
        gr.Markdown("# 🎙️ Speech Translator")
        gr.Markdown("Upload an audio file for translation while preserving voice characteristics")

        with gr.Row():
            with gr.Column():
                audio_input = gr.Audio(label="Input Audio", type="numpy")
                source_lang = gr.Dropdown(
                    choices=["english", "russian", "spanish", "german", "french", "chinese"],
                    label="Source Language",
                    value="english"
                )
                target_lang = gr.Dropdown(
                    choices=["english", "russian", "spanish", "german", "french", "chinese"],
                    label="Target Language",
                    value="russian"
                )
                process_btn = gr.Button("START", variant="primary")

            with gr.Column():
                transcribed_text = gr.Textbox(label="Transcribed Text")
                translated_text = gr.Textbox(label="Translated Text")
                audio_output = gr.Audio(label="Translated Audio", type="numpy")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### Voice Characteristics Comparison")
                voice_stats = gr.JSON(label="Voice Similarity Metrics")

            with gr.Column():
                gr.Markdown("### Audio Visualizations")
                plot_output = gr.Image(label="Voice Comparison Analysis")

        # Event processing
        process_btn.click(
            fn=translator.process_audio,
            inputs=[audio_input, source_lang, target_lang],
            outputs=[transcribed_text, translated_text, audio_output, voice_stats, plot_output]
        )

    return demo

# Test

In [None]:
print("Initializing speech translation system...")
demo = create_interface()
demo.launch(share=True, debug=True)

# Conclusion

The developed **Real-Time Speech Translation with Voice Cloning** system translates speech end to end while preserving the speaker's voice characteristics. Key achievements:

**Main functionality:**
- **Speech recognition** via Whisper (`base` model) for 6 languages: EN, RU, ES, DE, FR, ZH
- **Translation** using MarianMT, directly where a model exists and through English as a pivot otherwise
- **Speech synthesis** with voice cloning via XTTS v2, using the input recording as the reference voice
- **Analysis of voice characteristics** (pitch, spectral centroid, MFCC) with visualization of results
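
The similarity percentages reported by `compare_voices` follow a simple relative-difference rule: the absolute difference is divided by the original value and subtracted from 100, clamped at zero. A minimal standalone sketch of that rule is shown below. The function name is hypothetical, and it divides by the absolute value of the reference, a small deviation from the class that keeps the score at or below 100 for negative features.

```python
def similarity_percentage(original: float, synthesized: float) -> float:
    """Relative closeness of two scalar features, clamped to [0, 100]."""
    if original == 0:
        # The relative difference is undefined; mirror the class's fallback.
        return 0.0
    relative_diff = abs(original - synthesized) / abs(original)
    return max(0.0, 100.0 - relative_diff * 100.0)


close = similarity_percentage(200.0, 180.0)  # 10% off the reference
far = similarity_percentage(200.0, 500.0)    # more than 100% off, clamped
print(close, far)
```

This is a rough heuristic rather than a perceptual measure: two voices can score high on mean pitch and still sound different.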

**Technical advantages:**
- **Modular architecture** with clear separation of responsibilities
- **Efficient processing of long texts** through a chunk-based approach
- **Reliable error handling** and resource management
- **Web interface** on Gradio

**Current limitations**
1. **Language limitations**: Pairs without a direct MarianMT model (for example RU-DE and DE-RU) are translated through English as a pivot, which can reduce quality
2. **Audio duration**: 30-second limit for stable operation
3. **Translation quality**: Depends on available MarianMT models
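
The fallback used by `translate_text` for pairs without a direct model can be sketched as a route planner: try the direct model first, then a two-step route through English. The function name `plan_translation_route` and the sample set of pairs are illustrative assumptions, not part of the notebook.

```python
from typing import List, Optional, Set


def plan_translation_route(source: str, target: str,
                           available: Set[str],
                           pivot: str = "en") -> Optional[List[str]]:
    """Return the model keys to apply in order, or None if no route exists.

    An empty list means source and target match and nothing needs translating.
    """
    if source == target:
        return []
    direct = f"{source}-{target}"
    if direct in available:
        return [direct]
    first, second = f"{source}-{pivot}", f"{pivot}-{target}"
    if first in available and second in available:
        return [first, second]
    return None


available_pairs = {"en-ru", "ru-en", "en-de", "de-en"}
print(plan_translation_route("ru", "de", available_pairs))  # ['ru-en', 'en-de']
print(plan_translation_route("en", "ru", available_pairs))  # ['en-ru']
```

Planning the whole route before translating makes it possible to report a missing model up front instead of returning partially translated text.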

**Conclusion**

The developed system is an **end-to-end solution** for speech translation that preserves voice identity. Its architecture aims for a **balance** between processing quality, performance, and ease of use.

The system is ready for practical application in various scenarios:
- **Educational platforms** for language learning
- **International communications** with preservation of voice identity
- **Media production** for content localization
- **Accessibility** for people with disabilities

Further development of the system will focus on **expanding language support**, **improving translation quality**, and **optimizing performance** for mass use.