<a href="https://colab.research.google.com/github/brunombo/Python/blob/master/long_TTS_xtts_V6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TTS XTTS v2 - Long Audio Generator v6

**Version 4.0** - Compatible PyTorch 2.9+ (Colab 2026)

Fonctionnalites:
- Generation audio longue duree (> 1 heure)
- Fix torchcodec/torchaudio pour PyTorch 2.9+
- Chunking intelligent par paragraphes
- Crossfade entre chunks
- Barre de progression avec ETA
- Support Google Drive

**Auteur:** Bruno | **Corrections:** Gemini, Claude

In [None]:
# Installation des dependances
# --------------------------------------------------------------
# Remarque importante (PyTorch>=2.9) :
# - Coqui TTS exige la bibliotheque `torchcodec` pour l'I/O audio.  (cf. message d'erreur)
# - La version de torchcodec doit etre compatible avec votre version de torch.
#
# Sources (documentation officielle) :
# - coqui-tts: installer torch, torchaudio et (seulement pour torch>=2.9) torchcodec.
# - torchcodec: table de compatibilite torch <-> torchcodec + note CUDA/CPU.

!pip install -q -U pip
!pip install -q numpy==2.0.2 scipy soundfile noisereduce
!pip install -q -U coqui-tts


# Installer torchcodec dans une version compatible avec torch (et CUDA si detecte)
import sys, subprocess, re
from google.colab import userdata

try:
    import torch
except Exception as e:
    raise RuntimeError(
        "PyTorch (torch) n'est pas importable. Installez d'abord torch/torchaudio, "
        "puis relancez cette cellule."
    ) from e


def _torch_major_minor(ver: str) -> str:
    base = ver.split("+")[0]
    parts = base.split(".")
    return ".".join(parts[:2]) if len(parts) >= 2 else base


torch_ver = torch.__version__
mm = _torch_major_minor(torch_ver)

# Mapping base sur la table de compatibilite officielle torchcodec.
if mm == "2.10":
    torchcodec_spec = "torchcodec==0.10.*"
elif mm == "2.9":
    torchcodec_spec = "torchcodec==0.9.*"
elif mm == "2.8":
    torchcodec_spec = "torchcodec==0.7.*"
else:
    torchcodec_spec = "torchcodec"

# Si votre torch est un build CUDA (ex: 2.9.0+cu126), on tente d'installer torchcodec
# depuis l'index PyTorch correspondant. Sinon, on installe la version CPU depuis PyPI.
index_url = None
if "+" in torch_ver:
    build = torch_ver.split("+", 1)[1]
    if build.startswith("cu"):
        index_url = f"https://download.pytorch.org/whl/{build}"

print(
    f"torch = {torch_ver} -> installation de {torchcodec_spec}"
    + (f" via {index_url}" if index_url else " (CPU PyPI)")
)


def _pip_install_torchcodec():
    cmd = [sys.executable, "-m", "pip", "install", "-q", "-U", torchcodec_spec]
    if index_url:
        cmd += ["--index-url", index_url]
    subprocess.check_call(cmd)


try:
    _pip_install_torchcodec()
except Exception as e:
    # Fallback : essayer sans index_url (CPU PyPI).
    if index_url:
        print(f"‚ö†Ô∏è Echec avec l'index PyTorch ({index_url}). Tentative CPU via PyPI‚Ä¶")
        index_url = None
        _pip_install_torchcodec()
    else:
        raise

# Verification (metadonnees pip)
import importlib.util, importlib.metadata

print("torchcodec detecte:", importlib.util.find_spec("torchcodec") is not None)
print("torchcodec version:", importlib.metadata.version("torchcodec"))

torch = 2.9.0+cpu -> installation de torchcodec==0.9.* (CPU PyPI)
torchcodec detecte: True
torchcodec version: 0.9.1


In [7]:
# -*- coding: utf-8 -*-
"""
TTS XTTS v2 - Version Long Audio v4
====================================

Module de synthese vocale haute qualite utilisant Coqui XTTS v2.
Compatible avec PyTorch 2.9+ (fix torchcodec/torchaudio).

Auteur: Bruno
Date: Janvier 2026
Corrections: Gemini, Claude
"""

# ==============================================================================
# IMPORTS STANDARDS (APRES LE FIX)
# ==============================================================================

import os
import re
import gc
import wave
import time
import hashlib
import warnings
import inspect
from pathlib import Path
from typing import Optional, List
from dataclasses import dataclass
import numpy as np

warnings.filterwarnings("ignore", category=UserWarning)

# ==============================================================================
# TORCHAUDIO FIX - Backend soundfile
# ==============================================================================


def _patch_torchaudio():
    """
    Patch torchaudio.load pour utiliser le backend soundfile au lieu de torchcodec.
    Resout l'erreur: "Could not load libtorchcodec" sur Colab avec PyTorch 2.9+.
    """
    try:
        import torchaudio

        # Verifier si deja patche
        if hasattr(torchaudio, "_original_load_patched"):
            return

        # Sauvegarder la fonction originale
        _original_load = torchaudio.load

        def _patched_load(filepath, *args, **kwargs):
            """
            Version patchee de torchaudio.load qui utilise soundfile comme backend.
            """
            # Forcer le backend soundfile si non specifie
            if "backend" not in kwargs:
                kwargs["backend"] = "soundfile"

            try:
                return _original_load(filepath, *args, **kwargs)
            except Exception as e:
                # Si soundfile echoue, essayer sans specifier de backend
                if "backend" in kwargs:
                    del kwargs["backend"]
                    try:
                        return _original_load(filepath, *args, **kwargs)
                    except:
                        pass
                raise e

        # Appliquer le patch
        torchaudio.load = _patched_load
        torchaudio._original_load_patched = True
        print("‚úì Patch torchaudio applique (backend: soundfile)")

    except ImportError:
        pass
    except Exception as e:
        print(f"‚ö†Ô∏è Impossible de patcher torchaudio: {e}")


# Appliquer le patch torchaudio
_patch_torchaudio()

# ==============================================================================
# CONFIGURATION
# ==============================================================================


@dataclass
class TTSConfig:
    """Configuration globale du module TTS."""

    MODEL_NAME: str = "tts_models/multilingual/multi-dataset/xtts_v2"
    SAMPLE_RATE: int = 24000
    DEFAULT_LANGUAGE: str = "fr"
    GDRIVE_FOLDER: str = "/content/drive/MyDrive/TTS_Output"
    MAX_CHARS_PER_CHUNK: int = 500
    CROSSFADE_DURATION: float = 0.05
    ENABLE_TEXT_SPLITTING: bool = True
    PRESET_VOICES: dict = None

    def __post_init__(self):
        self.PRESET_VOICES = {
            "female_fr": "https://huggingface.co/spaces/coqui/xtts/resolve/main/examples/female.wav",
            "male_fr": "https://huggingface.co/spaces/coqui/xtts/resolve/main/examples/male.wav",
        }


Config = TTSConfig()

# ==============================================================================
# DEVICE MANAGEMENT
# ==============================================================================

_device = None
_device_name = "cpu"


def detect_device():
    """Detecte le meilleur device disponible."""
    global _device, _device_name
    import torch

    # Essayer TPU
    try:
        import torch_xla.core.xla_model as xm

        _device = xm.xla_device()
        _device_name = "tpu"
        print(f"‚öôÔ∏è Device: TPU")
        return
    except:
        pass

    # Essayer CUDA
    if torch.cuda.is_available():
        _device = torch.device("cuda")
        _device_name = f"cuda ({torch.cuda.get_device_name(0)})"
        print(f"‚öôÔ∏è Device: {_device_name}")
        return

    # Fallback CPU
    _device = torch.device("cpu")
    _device_name = "cpu"
    print(f"‚öôÔ∏è Device: CPU")


# ==============================================================================
# TEXT SPLITTING UTILITIES
# ==============================================================================


class TextSplitter:
    """Utilitaire pour decouper intelligemment les textes longs."""

    @staticmethod
    def estimate_audio_duration(text: str, chars_per_second: float = 15.0) -> float:
        """Estime la duree audio en secondes."""
        return len(text) / chars_per_second

    @staticmethod
    def split_into_sentences(text: str) -> List[str]:
        """Decoupe le texte en phrases."""
        pattern = r"(?<=[.!?])\s+"
        sentences = re.split(pattern, text)
        return [s.strip() for s in sentences if s.strip()]

    @staticmethod
    def split_into_paragraphs(text: str) -> List[str]:
        """Decoupe le texte en paragraphes."""
        paragraphs = re.split(r"\n\s*\n", text)
        return [p.strip() for p in paragraphs if p.strip()]

    @classmethod
    def split_for_long_audio(
        cls, text: str, max_chars: int = 500, preserve_sentences: bool = True
    ) -> List[str]:
        """Decoupe le texte pour generation audio longue."""
        if len(text) <= max_chars:
            return [text]

        chunks = []
        if preserve_sentences:
            sentences = cls.split_into_sentences(text)
            current_chunk = ""

            for sentence in sentences:
                if len(sentence) > max_chars:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                        current_chunk = ""
                    # Decouper la phrase trop longue par mots
                    words = sentence.split()
                    sub_chunk = ""
                    for word in words:
                        if len(sub_chunk) + len(word) + 1 <= max_chars:
                            sub_chunk += " " + word if sub_chunk else word
                        else:
                            if sub_chunk:
                                chunks.append(sub_chunk.strip())
                            sub_chunk = word
                    if sub_chunk:
                        current_chunk = sub_chunk
                elif len(current_chunk) + len(sentence) + 1 <= max_chars:
                    current_chunk += " " + sentence if current_chunk else sentence
                else:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                    current_chunk = sentence

            if current_chunk:
                chunks.append(current_chunk.strip())
        else:
            for i in range(0, len(text), max_chars):
                chunks.append(text[i : i + max_chars])

        return chunks


# ==============================================================================
# AUDIO PROCESSING
# ==============================================================================


class AudioProcessor:
    """Processeur audio pour post-traitement et concatenation."""

    @staticmethod
    def normalize(audio: np.ndarray, target_db: float = -3.0) -> np.ndarray:
        """Normalise l'audio au niveau cible."""
        if audio.dtype == np.int16:
            audio = audio.astype(np.float32) / 32768.0
        peak = np.max(np.abs(audio))
        if peak > 0:
            target_linear = 10 ** (target_db / 20)
            audio = audio * (target_linear / peak)
        return np.clip(audio, -1.0, 1.0)

    @staticmethod
    def crossfade(
        audio1: np.ndarray, audio2: np.ndarray, sample_rate: int, duration: float = 0.05
    ) -> np.ndarray:
        """Concatene deux segments audio avec crossfade."""
        if audio1.dtype == np.int16:
            audio1 = audio1.astype(np.float32) / 32768.0
        if audio2.dtype == np.int16:
            audio2 = audio2.astype(np.float32) / 32768.0

        fade_samples = int(sample_rate * duration)
        if len(audio1) < fade_samples or len(audio2) < fade_samples:
            return np.concatenate([audio1, audio2])

        fade_out = np.linspace(1.0, 0.0, fade_samples)
        fade_in = np.linspace(0.0, 1.0, fade_samples)
        audio1_end = audio1[-fade_samples:] * fade_out
        audio2_start = audio2[:fade_samples] * fade_in

        return np.concatenate(
            [audio1[:-fade_samples], audio1_end + audio2_start, audio2[fade_samples:]]
        )

    @classmethod
    def concatenate_chunks(
        cls,
        audio_chunks: List[np.ndarray],
        sample_rate: int,
        crossfade_duration: float = 0.05,
    ) -> np.ndarray:
        """Concatene plusieurs chunks audio avec crossfade."""
        if not audio_chunks:
            return np.array([], dtype=np.float32)
        if len(audio_chunks) == 1:
            audio = audio_chunks[0]
            if audio.dtype == np.int16:
                audio = audio.astype(np.float32) / 32768.0
            return audio

        result = audio_chunks[0]
        if result.dtype == np.int16:
            result = result.astype(np.float32) / 32768.0
        for chunk in audio_chunks[1:]:
            result = cls.crossfade(result, chunk, sample_rate, crossfade_duration)
        return result

    @staticmethod
    def enhance(
        audio: np.ndarray, sample_rate: int, normalize: bool = True, warmth: bool = True
    ) -> np.ndarray:
        """Ameliore l'audio avec normalisation et warmth."""
        if audio.dtype == np.int16:
            audio = audio.astype(np.float32) / 32768.0

        # Ajouter de la chaleur (boost basses frequences)
        if warmth:
            try:
                from scipy import signal

                nyquist = sample_rate / 2
                cutoff = min(300, nyquist * 0.9) / nyquist
                b, a = signal.butter(2, cutoff, btype="low")
                bass = signal.filtfilt(b, a, audio)
                audio = audio + 0.15 * bass
            except ImportError:
                pass

        # Normaliser
        if normalize:
            peak = np.max(np.abs(audio))
            if peak > 0:
                target = 10 ** (-3.0 / 20)
                audio = audio * (target / peak)

        return np.clip(audio, -1.0, 1.0)


# ==============================================================================
# PROGRESS TRACKER
# ==============================================================================


class ProgressTracker:
    """Suivi de progression avec estimation du temps restant."""

    def __init__(self, total: int, description: str = ""):
        self.total = total
        self.current = 0
        self.description = description
        self.start_time = time.time()
        self.chunk_times = []

    def update(self, chunk_duration: float = None):
        """Met a jour la progression."""
        self.current += 1
        if chunk_duration:
            self.chunk_times.append(chunk_duration)
        self._display()

    def _display(self):
        """Affiche la barre de progression."""
        elapsed = time.time() - self.start_time
        percent = (self.current / self.total) * 100

        if self.chunk_times:
            avg_time = np.mean(self.chunk_times)
            remaining = avg_time * (self.total - self.current)
            eta_str = self._format_time(remaining)
        else:
            eta_str = "..."

        bar_length = 30
        filled = int(bar_length * self.current / self.total)
        bar = "‚ñà" * filled + "‚ñë" * (bar_length - filled)
        elapsed_str = self._format_time(elapsed)

        print(
            f"\r{self.description} [{bar}] {self.current}/{self.total} "
            f"({percent:.1f}%) | Temps: {elapsed_str} | ETA: {eta_str}",
            end="",
        )

        if self.current >= self.total:
            print()

    @staticmethod
    def _format_time(seconds: float) -> str:
        """Formate les secondes en HH:MM:SS."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        return f"{minutes:02d}:{secs:02d}"


# ==============================================================================
# TTS ENGINE
# ==============================================================================

_tts_model = None
_voices_cache = {}
os.environ["COQUI_TOS_AGREED"] = "1"


def get_model():
    """Charge le modele XTTS v2 avec cache."""
    global _tts_model

    if _tts_model is None:
        print("üîÑ Chargement du modele XTTS v2...")

        from TTS.api import TTS

        _tts_model = TTS(Config.MODEL_NAME)

        # Deplacement sur GPU (selon la version, .to() peut etre sur le wrapper ou sur le sous-modele)
        if _device is not None and _device_name.startswith("cuda"):
            try:
                if hasattr(_tts_model, "to"):
                    _tts_model = _tts_model.to(_device)
                elif hasattr(_tts_model, "tts_model") and hasattr(
                    _tts_model.tts_model, "to"
                ):
                    _tts_model.tts_model = _tts_model.tts_model.to(_device)
            except Exception as e:
                print(f"‚ö†Ô∏è Impossible de deplacer le modele sur CUDA: {e}")

        print("‚úì Modele charge")

    return _tts_model


def get_voice_path(voice: str) -> str:
    """Obtient le chemin vers un fichier de voix."""
    global _voices_cache
    import urllib.request

    if voice in _voices_cache:
        return _voices_cache[voice]

    if os.path.isfile(voice):
        _voices_cache[voice] = voice
        return voice

    if voice in Config.PRESET_VOICES:
        url = Config.PRESET_VOICES[voice]
        path = f"/tmp/{voice}.wav"
        if not os.path.exists(path):
            print(f"üì• Telechargement de la voix '{voice}'...")
            urllib.request.urlretrieve(url, path)
        _voices_cache[voice] = path
        return path

    raise FileNotFoundError(f"Voix '{voice}' non trouvee")


# ==============================================================================
# MAIN SYNTHESIS FUNCTIONS
# ==============================================================================


def _filter_kwargs(fn, kwargs: dict) -> dict:
    """Garde uniquement les kwargs acceptes par fn (compatibilite entre versions)."""
    try:
        sig = inspect.signature(fn)
        return {k: v for k, v in kwargs.items() if k in sig.parameters}
    except (TypeError, ValueError):
        # Signature indisponible (ex: fonction C++) -> on ne filtre pas
        return kwargs


def _get_conditioning_latents_compat(xtts_model, voice_path: str):
    """Compat: get_conditioning_latents() a change de signature selon les versions."""
    fn = getattr(xtts_model, "get_conditioning_latents", None)
    if fn is None:
        raise AttributeError(
            "Le modele XTTS ne fournit pas get_conditioning_latents()."
        )

    base_kwargs = {"gpt_cond_len": 30, "max_ref_length": 60}

    # Tentative par introspection
    try:
        sig = inspect.signature(fn)
        params = sig.parameters

        if "audio_path" in params:
            # Certaines versions veulent une liste, d'autres une str
            try:
                return fn(audio_path=[voice_path], **_filter_kwargs(fn, base_kwargs))
            except TypeError:
                return fn(audio_path=voice_path, **_filter_kwargs(fn, base_kwargs))

        if "audio_paths" in params:
            return fn(audio_paths=[voice_path], **_filter_kwargs(fn, base_kwargs))

        if "speaker_wav" in params:
            return fn(speaker_wav=voice_path, **_filter_kwargs(fn, base_kwargs))

    except (TypeError, ValueError):
        pass

    # Fallback brut (plus permissif)
    try:
        return fn(audio_path=[voice_path], gpt_cond_len=30, max_ref_length=60)
    except Exception:
        try:
            return fn(audio_path=voice_path, gpt_cond_len=30, max_ref_length=60)
        except Exception:
            return fn(voice_path)


def synthesize_chunk(
    text: str, voice_path: str, language: str = "fr", enable_text_splitting: bool = True
) -> np.ndarray:
    """Synthetise un chunk de texte en audio via l'inference directe."""
    model_wrapper = get_model()

    # Acceder au modele XTTS directement (bypass SpeakerManager bug)
    if hasattr(model_wrapper, "synthesizer"):
        xtts_model = model_wrapper.synthesizer.tts_model
    else:
        xtts_model = model_wrapper.tts_model

    # Calculer les latents de conditionnement (compat multi-versions)
    try:
        gpt_cond_latent, speaker_embedding = _get_conditioning_latents_compat(
            xtts_model, voice_path
        )
    except Exception as e:
        print(f"‚ö†Ô∏è Erreur calcul latents: {e}")
        raise e

    # Inference directe (filtrage des kwargs selon la signature)
    try:
        inference_kwargs = {
            "text": text,
            "language": language,
            "gpt_cond_latent": gpt_cond_latent,
            "speaker_embedding": speaker_embedding,
            "temperature": 0.7,
            "length_penalty": 1.0,
            "repetition_penalty": 2.0,
            "top_k": 50,
            "top_p": 0.8,
            "enable_text_splitting": enable_text_splitting,
        }

        # Alias possibles selon versions
        try:
            sig = inspect.signature(xtts_model.inference)
            params = sig.parameters
            if "speaker_embedding" not in params and "speaker_latents" in params:
                inference_kwargs["speaker_latents"] = inference_kwargs.pop(
                    "speaker_embedding"
                )
        except (TypeError, ValueError):
            pass

        out = xtts_model.inference(
            **_filter_kwargs(xtts_model.inference, inference_kwargs)
        )

        if isinstance(out, dict) and "wav" in out:
            wav = out["wav"]
        else:
            wav = out

        if hasattr(wav, "cpu"):
            wav = wav.cpu().numpy()
        if isinstance(wav, list):
            wav = np.array(wav, dtype=np.float32)

        return wav

    except Exception as e:
        print(f"‚ö†Ô∏è Erreur lors de l'inference directe : {e}")
        raise e


def text_to_speech_long(
    text: str,
    voice: str = "female_fr",
    language: str = "fr",
    output_path: Optional[str] = None,
    enhance: bool = False,
    use_gdrive: bool = False,
    gdrive_folder: str = None,
    max_chars_per_chunk: int = None,
    show_progress: bool = True,
    enable_text_splitting: bool = True,
) -> dict:
    """Genere un fichier audio long (> 1 heure) a partir de texte."""
    import torch

    max_chars = max_chars_per_chunk or Config.MAX_CHARS_PER_CHUNK
    voice_path = get_voice_path(voice)

    # Estimation
    estimated_duration = TextSplitter.estimate_audio_duration(text)
    print(f"\nüìù Texte: {len(text):,} caracteres")
    print(f"‚è±Ô∏è  Duree estimee: {ProgressTracker._format_time(estimated_duration)}")

    # Decoupage
    chunks = TextSplitter.split_for_long_audio(text, max_chars=max_chars)
    print(f"üì¶ Chunks: {len(chunks)}")

    # Synthese
    progress = ProgressTracker(len(chunks), "üéôÔ∏è Synthese") if show_progress else None
    audio_chunks = []

    for i, chunk in enumerate(chunks):
        chunk_start = time.time()
        try:
            wav = synthesize_chunk(
                text=chunk,
                voice_path=voice_path,
                language=language,
                enable_text_splitting=enable_text_splitting,
            )
            audio_chunks.append(wav)
        except Exception as e:
            print(f"\n‚ö†Ô∏è Erreur chunk {i + 1}: {e}")
            continue

        # Nettoyage memoire periodique
        if _device_name.startswith("cuda") and (i + 1) % 10 == 0:
            torch.cuda.empty_cache()

        if progress:
            progress.update(time.time() - chunk_start)

    if not audio_chunks:
        raise RuntimeError("Aucun audio genere")

    # Concatenation
    print("\nüîó Concatenation des chunks...")
    final_audio = AudioProcessor.concatenate_chunks(
        audio_chunks, Config.SAMPLE_RATE, Config.CROSSFADE_DURATION
    )

    # Nettoyage memoire
    del audio_chunks
    gc.collect()
    if _device_name.startswith("cuda"):
        torch.cuda.empty_cache()

    # Post-traitement
    if enhance:
        print("‚ú® Post-traitement...")
        final_audio = AudioProcessor.enhance(
            final_audio, Config.SAMPLE_RATE, normalize=True, warmth=True
        )
    else:
        final_audio = AudioProcessor.normalize(final_audio)

    # Conversion en int16
    final_audio = (final_audio * 32767).astype(np.int16)

    # Chemin de sortie
    if output_path is None:
        h = hashlib.md5(text[:100].encode()).hexdigest()[:8]
        output_path = f"tts_long_{voice}_{h}.wav"

    if use_gdrive:
        folder = Path(gdrive_folder or Config.GDRIVE_FOLDER)
        folder.mkdir(parents=True, exist_ok=True)
        final_path = folder / Path(output_path).name
    else:
        final_path = Path(output_path)

    # Sauvegarde WAV
    print(f"üíæ Sauvegarde: {final_path}")
    with wave.open(str(final_path), "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(Config.SAMPLE_RATE)
        wav_file.writeframes(final_audio.tobytes())

    duration = len(final_audio) / Config.SAMPLE_RATE

    print(f"\n‚úÖ Audio genere avec succes!")
    print(f"   üìÅ Fichier: {final_path}")
    print(f"   ‚è±Ô∏è  Duree: {ProgressTracker._format_time(duration)}")
    print(f"   üì¶ Chunks: {len(chunks)}")
    print(f"   üé§ Voix: {voice}")

    return {
        "path": str(final_path),
        "sample_rate": Config.SAMPLE_RATE,
        "duration_seconds": duration,
        "duration_formatted": ProgressTracker._format_time(duration),
        "audio_data": final_audio,
        "voice": voice,
        "language": language,
        "device": _device_name,
        "chunks_count": len(chunks),
        "text_length": len(text),
    }


def text_to_speech(
    text: str,
    voice: str = "female_fr",
    language: str = "fr",
    output_path: Optional[str] = None,
    enhance: bool = False,
    use_gdrive: bool = False,
    gdrive_folder: str = None,
    enable_text_splitting: bool = True,
) -> dict:
    """Genere un fichier audio a partir de texte avec XTTS v2."""
    # Rediriger vers version longue si necessaire
    if len(text) > 10000:
        print("üì¢ Texte long detecte - utilisation de text_to_speech_long()")
        return text_to_speech_long(
            text=text,
            voice=voice,
            language=language,
            output_path=output_path,
            enhance=enhance,
            use_gdrive=use_gdrive,
            gdrive_folder=gdrive_folder,
            enable_text_splitting=enable_text_splitting,
        )

    voice_path = get_voice_path(voice)

    # Synthese
    wav = synthesize_chunk(
        text=text,
        voice_path=voice_path,
        language=language,
        enable_text_splitting=enable_text_splitting,
    )

    # Post-traitement
    if enhance:
        audio = AudioProcessor.enhance(wav, Config.SAMPLE_RATE)
    else:
        audio = AudioProcessor.normalize(wav)

    audio = (audio * 32767).astype(np.int16)

    # Chemin de sortie
    if output_path is None:
        h = hashlib.md5(text.encode()).hexdigest()[:8]
        output_path = f"tts_{voice}_{h}.wav"

    if use_gdrive:
        folder = Path(gdrive_folder or Config.GDRIVE_FOLDER)
        folder.mkdir(parents=True, exist_ok=True)
        final_path = folder / Path(output_path).name
    else:
        final_path = Path(output_path)

    # Sauvegarde WAV
    with wave.open(str(final_path), "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(Config.SAMPLE_RATE)
        wav_file.writeframes(audio.tobytes())

    duration = len(audio) / Config.SAMPLE_RATE
    print(f"‚úì Audio genere: {final_path}")
    print(f"  Duree: {duration:.2f}s | Voix: {voice}")

    return {
        "path": str(final_path),
        "sample_rate": Config.SAMPLE_RATE,
        "duration_seconds": duration,
        "audio_data": audio,
        "voice": voice,
        "language": language,
        "device": _device_name,
    }


# ==============================================================================
# UTILITIES
# ==============================================================================


def preview_audio(result: dict) -> None:
    """Previsualise l'audio dans le notebook."""
    from IPython.display import Audio, display

    audio = result["audio_data"]
    if audio.dtype == np.int16:
        audio = audio.astype(np.float32) / 32768.0
    display(Audio(audio, rate=result["sample_rate"]))


def list_voices() -> list:
    """Liste les voix disponibles."""
    return list(Config.PRESET_VOICES.keys())


def list_languages() -> list:
    """Liste les langues supportees."""
    return [
        "en",
        "es",
        "fr",
        "de",
        "it",
        "pt",
        "pl",
        "tr",
        "ru",
        "nl",
        "cs",
        "ar",
        "zh-cn",
        "ja",
        "hu",
        "ko",
        "hi",
    ]


def clear_cache():
    """Vide le cache du modele."""
    global _tts_model
    import torch

    _tts_model = None
    gc.collect()
    if _device_name.startswith("cuda"):
        torch.cuda.empty_cache()
    print("‚úì Cache vide")


def estimate_duration(text: str) -> dict:
    """Estime la duree audio pour un texte."""
    duration = TextSplitter.estimate_audio_duration(text)
    chunks = len(TextSplitter.split_for_long_audio(text))
    return {
        "chars": len(text),
        "estimated_seconds": duration,
        "estimated_formatted": ProgressTracker._format_time(duration),
        "chunks_estimate": chunks,
    }


# Aliases
tts = text_to_speech
tts_long = text_to_speech_long

# ==============================================================================
# INITIALIZATION
# ==============================================================================


def init():
    """Initialise le module."""
    detect_device()
    print("‚úÖ Module XTTS v2 Long Audio v4 charge")
    print(f"   Device: {_device_name}")
    print(f"   Voix disponibles: {list_voices()}")
    print(f"   enable_text_splitting: active par defaut")
    print(f"   Fix torchcodec: actif")


# Auto-init
try:
    detect_device()
except:
    pass

print("\n" + "=" * 60)
print("TTS XTTS v2 - Long Audio Generator v4")
print("Compatible PyTorch 2.9+ (fix torchcodec)")
print("=" * 60)
print(f"Voix disponibles: {list_voices()}")

‚öôÔ∏è Device: CPU

TTS XTTS v2 - Long Audio Generator v4
Compatible PyTorch 2.9+ (fix torchcodec)
Voix disponibles: ['female_fr', 'male_fr']


In [8]:
# Monter Google Drive (optionnel)
# Le notebook peut aussi s'executer hors Colab : dans ce cas on ignore simplement le montage.

try:
    from google.colab import drive  # type: ignore

    drive.mount("/content/drive")
except Exception as e:
    print("‚ÑπÔ∏è Google Colab non detecte ou Drive indisponible -> montage ignore.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
# Initialisation du module
init()

‚öôÔ∏è Device: CPU
‚úÖ Module XTTS v2 Long Audio v4 charge
   Device: cpu
   Voix disponibles: ['female_fr', 'male_fr']
   enable_text_splitting: active par defaut
   Fix torchcodec: actif


In [10]:
# ==============================================================================
# EXEMPLE 1: Texte court
# ==============================================================================

text_court = """
Bonjour! Ceci est un test de synthese vocale avec XTTS v2.
Le module est maintenant compatible avec PyTorch 2.9 et superieur.
"""

# Choisir la voix
voice_gender = "female_fr"  # ou "male_fr"

# Generation
result = text_to_speech(text_court.strip(), voice=voice_gender, enhance=True)

# Previsualisation
preview_audio(result)

üì• Telechargement de la voix 'female_fr'...
üîÑ Chargement du modele XTTS v2...


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1.87G/1.87G [00:38<00:00, 48.0MiB/s]
4.37kiB [00:00, 2.19MiB/s]
361kiB [00:00, 25.7MiB/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 32.0/32.0 [00:00<00:00, 42.1kiB/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 7.75M/7.75M [00:00<00:00, 34.4MiB/s]


‚úì Modele charge
‚úì Audio genere: tts_female_fr_3f623356.wav
  Duree: 10.12s | Voix: female_fr


In [16]:
# ==============================================================================
# GEMINI PROSODY ENHANCER (NOUVEAU)
# ==============================================================================

class ProsodyEnhancer:
    """
    Utilise l'API Gemini pour r√©√©crire le texte et am√©liorer la prosodie TTS.
    """
    
    SYSTEM_PROMPT = """
    Tu es un expert en ing√©nierie textuelle pour la synth√®se vocale neuronale (TTS). 
    Ta t√¢che est de r√©√©crire le texte fourni pour qu'il soit lu de mani√®re ultra-naturelle et humaine.
    
    CONSIGNES STRICTES :
    1. PONCTUATION RESPIRATOIRE : Ajoute des virgules (,) pour les courtes respirations et des points de suspension (...) pour les pauses r√©fl√©chies ou les fins de phrases douces. C'est le levier le plus important.
    2. NOMBRES : Convertis TOUS les chiffres, dates, et pourcentages en toutes lettres (ex: "2025" -> "deux mille vingt-cinq").
    3. SIGLES : S√©pare les lettres des sigles par des espaces si elles doivent √™tre √©pel√©es (ex: "IA" -> "I A", "USA" -> "U S A").
    4. SEGMENTATION : Coupe les phrases trop longues en propositions plus digestes.
    5. FID√âLIT√â : Ne change pas le sens du texte. Ne supprime pas d'informations. Garde le style original.
    6. FORMAT : Renvoie uniquement le texte brut modifi√©, sans balises markdown ni commentaires.
    """

    @staticmethod
    def enhance_text(text: str, api_key: str) -> str:
        """
        Envoie le texte √† Gemini pour am√©lioration prosodique.
        """
        if not api_key:
            print("‚ö†Ô∏è Pas de cl√© API Gemini fournie. Utilisation du texte brut.")
            return text
            
        import google.generativeai as genai
        
        print("\n‚ú® Appel √† Gemini pour am√©lioration de la prosodie...")
        try:
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel(Config.GEMINI_MODEL)
            
            # Pour les textes tr√®s longs, on s'assure que Gemini peut g√©rer le contexte
            # Gemini 1.5 a une √©norme fen√™tre de contexte, donc on envoie tout d'un bloc
            # pour la coh√©rence, sauf si c'est vraiment gigantesque.
            
            response = model.generate_content(
                f"{ProsodyEnhancer.SYSTEM_PROMPT}\n\nTEXTE √Ä TRAITER :\n{text}"
            )
            
            enhanced_text = response.text.strip()
            print("‚úÖ Texte am√©lior√© avec succ√®s.")
            return enhanced_text
            
        except Exception as e:
            print(f"‚ùå Erreur Gemini : {e}")
            print("   -> Repli sur le texte original.")
            return text




In [20]:
import os
import sys

def get_gemini_key():
    # 1. Tentative via Colab Secrets (Uniquement si UI Web)
    if 'google.colab' in sys.modules:
        try:
            from google.colab import userdata
            # On utilise un timeout court pour ne pas bloquer VS Code
            return userdata.get('GEMINI_API_KEY')
        except Exception:
            # Si on est sur VS Code via Colab, userdata.get √©choue ou timeout
            print("‚ÑπÔ∏è  Colab UI non d√©tect√©e (VS Code ?). Recherche d'une alternative...")

    # 2. Tentative via Variable d'environnement (Id√©al pour VS Code / Local)
    # Pensez √† faire : export GEMINI_API_KEY="votre_cle" dans le terminal VS Code
    key = os.getenv('GEMINI_API_KEY')
    if key:
        return key

    # 3. Ultime recours : Saisie manuelle (bloquant mais efficace)
    # print("‚ö†Ô∏è Aucune cl√© trouv√©e dans les secrets ou l'environnement.")
    # return input("Veuillez coller votre cl√© API Gemini ici : ").strip()
    
    return None

# Utilisation
GEMINI_KEY = get_gemini_key()

if GEMINI_KEY:
    print("‚úÖ Cl√© GEMINI_API_KEY charg√©e.")
else:
    print("‚ùå Erreur : Cl√© introuvable. Utilisez 'export GEMINI_API_KEY=...' dans votre terminal.")

‚ÑπÔ∏è  Colab UI non d√©tect√©e (VS Code ?). Recherche d'une alternative...
‚ùå Erreur : Cl√© introuvable. Utilisez 'export GEMINI_API_KEY=...' dans votre terminal.


In [22]:
# 1. Importation de la biblioth√®que principale
import ipywidgets as widgets

# 2. Importation de la fonction d'affichage (optionnel mais recommand√© pour la clart√©)
from IPython.display import display

In [None]:
# ==========================================
# 3. INTERFACE UTILISATEUR
# ==========================================
DRIVE_FOLDER="/content/drive/MyDrive/Audio_Projet"
text_input = widgets.Textarea(
    placeholder="Entrez votre texte ici...",
    value="texte",
    layout=widgets.Layout(width="100%", height="150px"),
)
button = widgets.Button(
    description="G√©n√©rer Audio", button_style="success", icon="check"
)
progress_bar = widgets.IntProgress(
    value=0, min=0, max=100, layout=widgets.Layout(width="100%", visibility="hidden")
)
output = widgets.Output()

def on_click(b):
    # On vide la zone de message avant chaque nouvelle g√©n√©ration
    output.clear_output()

    with output:
        # 1. On r√©cup√®re la cha√Æne de caract√®res depuis le Textarea
        texte_a_traiter = text_input.value.strip()

        # 2. V√©rification de s√©curit√©
        if not texte_a_traiter:
            print("‚ùå Le texte est vide. Merci de saisir un message.")
            return


        # Interface : on d√©sactive le bouton et montre la barre de progression
        button.disabled = True
        progress_bar.layout.visibility = "visible"
        progress_bar.bar_style = "info"
        progress_bar.value = 10

        # 1. Humanisation via Gemini
        if GEMINI_KEY :
            texte_a_traiter = ProsodyEnhancer.enhance_text(texte_a_traiter, GEMINI_KEY )

        try:
            # 3. Appel de la fonction TTS avec la variable texte_a_traiter
            result = text_to_speech(texte_a_traiter, voice=voice_gender, enhance=True)

            # Pr√©visualisation
            preview_audio(result)

            # 4. Sauvegarde sur le Drive
            if not os.path.exists("/content/drive"):
                drive.mount("/content/drive")

            if not os.path.exists(DRIVE_FOLDER):
                os.makedirs(DRIVE_FOLDER)

            progress_bar.value = 100
            progress_bar.bar_style = "success"
            print("‚úÖ Audio g√©n√©r√© et pr√™t !")

        except Exception as e:
            progress_bar.bar_style = "danger"
            print(f"‚ùå Erreur lors du traitement : {e}")
        finally:
            # On r√©active le bouton quoi qu'il arrive
            button.disabled = False


button.on_click(on_click)

# Affichage avec rappel de la config charg√©e
display(
    widgets.VBox(
        [
            widgets.HTML(f"<h3>G√©n√©rateur TTS</h3>"),
            widgets.Label(f"Voix charg√©e"),
            text_input,
            button,
            progress_bar,
            output,
        ]
    )
)

VBox(children=(HTML(value='<h3>G√©n√©rateur TTS - Config </h3>'), Label(value='Voix charg√©e'), Textarea(value='t‚Ä¶

In [None]:
# Nettoyer le cache si necessaire (libere la memoire GPU)
clear_cache()

In [None]:
stop

In [None]:
# Generation du texte long
result_long = text_to_speech(
    text_long.strip(),
    voice="female_fr",
    enhance=True,
    use_gdrive=True,  # Mettre True pour sauvegarder sur Drive
)

# Previsualisation
preview_audio(result_long)

In [None]:
# ==============================================================================
# EXEMPLE 2: Texte long
# ==============================================================================

text_long = """
La synthese vocale, egalement appelee text-to-speech ou TTS, est une technologie
qui permet de convertir du texte ecrit en parole audible. Cette technologie a
considerablement evolue au fil des annees, passant de voix robotiques et
mecaniques a des voix naturelles et expressives.

XTTS v2 est l'un des modeles les plus avances dans ce domaine. Developpe par
Coqui AI, il utilise des techniques d'apprentissage profond pour generer une
parole de haute qualite dans plusieurs langues. Le modele peut meme cloner
des voix a partir d'un court echantillon audio de reference.

Les applications de la synthese vocale sont nombreuses: assistants virtuels,
livres audio, accessibilite pour les personnes malvoyantes, doublage video,
et bien d'autres encore. Avec les avancees recentes en intelligence artificielle,
la qualite de la synthese vocale continue de s'ameliorer, rendant la distinction
entre voix humaine et voix synthetique de plus en plus difficile.
"""

# Estimation avant generation
estimation = estimate_duration(text_long)
print(f"üìä Estimation:")
print(f"   Caracteres: {estimation['chars']:,}")
print(f"   Duree estimee: {estimation['estimated_formatted']}")
print(f"   Chunks estimes: {estimation['chunks_estimate']}")