In [1]:
!pip install TTS
!pip install "transformers==4.36.2"

Collecting TTS
  Downloading TTS-0.22.0-cp311-cp311-manylinux1_x86_64.whl.metadata (21 kB)
Collecting scikit-learn>=1.3.0 (from TTS)
  Downloading scikit_learn-1.7.1-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (11 kB)
Collecting anyascii>=0.3.0 (from TTS)
  Downloading anyascii-0.3.3-py3-none-any.whl.metadata (1.6 kB)
Collecting pysbd>=0.3.4 (from TTS)
  Downloading pysbd-0.3.4-py3-none-any.whl.metadata (6.1 kB)
Collecting pandas<2.0,>=1.4 (from TTS)
  Downloading pandas-1.5.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting trainer>=0.0.32 (from TTS)
  Downloading trainer-0.0.36-py3-none-any.whl.metadata (8.1 kB)
Collecting coqpit>=0.0.16 (from TTS)
  Downloading coqpit-0.0.17-py3-none-any.whl.metadata (11 kB)
Collecting pypinyin (from TTS)
  Downloading pypinyin-0.55.0-py2.py3-none-any.whl.metadata (12 kB)
Collecting hangul-romanize (from TTS)
  Downloading hangul_romanize-0.1.0-py3-none-any.whl.metadata (1.2 kB)
Collectin

In [2]:
# import json
# import os
# import torch
# import torchaudio
# import numpy as np
# from pathlib import Path
# import logging
# from typing import List, Dict, Optional, Tuple
# import re
# from dataclasses import dataclass
# from concurrent.futures import ThreadPoolExecutor, as_completed
# import time
# # XTTS imports
# from TTS.tts.configs.xtts_config import XttsConfig
# from TTS.tts.models.xtts import Xtts
# from TTS.utils.generic_utils import get_user_data_dir

In [3]:
import json
import os
import torch
import torchaudio
import numpy as np
from pathlib import Path
import logging
from typing import List, Dict, Optional, Tuple
import re
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# XTTS imports from the original script
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.generic_utils import get_user_data_dir

# Add this new import for the API
from TTS.api import TTS

  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


In [4]:
import torch
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import XttsAudioConfig, XttsArgs # <-- Added XttsArgs
from TTS.config.shared_configs import BaseDatasetConfig

# Allow-list the XTTS config classes so torch.load can unpickle them from the
# model checkpoint when it runs in weights-only mode.
# The allow-list API is only present on newer PyTorch releases; builds without
# it have nothing to register, so skip the call instead of raising
# AttributeError.
if hasattr(torch.serialization, "add_safe_globals"):
    torch.serialization.add_safe_globals([
        XttsConfig,
        XttsAudioConfig,
        BaseDatasetConfig,
        XttsArgs,
    ])

In [5]:
# Configure root logging at INFO so the pipeline's progress messages are shown,
# and create the module-level logger that XTTSPipeline writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [6]:
@dataclass
class SpeechConfig:
    """Configuration for speech synthesis.

    Bundles the output location, text-chunking limits and the sampling
    parameters that XTTSPipeline forwards to the XTTS inference call.
    """
    # Custom checkpoint / config locations. The visible XTTSPipeline always
    # loads the stock "xtts_v2" model by name, so these do not select a model.
    model_path: Optional[str] = None
    config_path: Optional[str] = None
    # Default reference recording for voice cloning; a script entry's own
    # 'speaker_wav' value is looked up first.
    speaker_wav_path: Optional[str] = None
    # Language code passed to the model's inference call.
    language: str = "en"
    # Directory the generated audio files are written to (created by XTTSPipeline).
    output_dir: str = "output_speech"
    chunk_size: int = 200  # Maximum words per chunk
    # Words repeated from the end of one chunk at the start of the next.
    # NOTE(review): each chunk is synthesized separately and the audio is simply
    # concatenated, so overlapped words appear to be spoken twice — confirm intent.
    overlap_words: int = 10
    # Sample rate in Hz for the saved audio.
    # NOTE(review): XTTS v2 is documented to emit 24 kHz audio — confirm this default.
    sample_rate: int = 22050
    # Sampling parameters forwarded unchanged to the model's inference call.
    temperature: float = 0.75
    length_penalty: float = 1.0
    repetition_penalty: float = 5.0
    top_k: int = 50
    top_p: float = 0.85
    speed: float = 1.0
    enable_text_splitting: bool = True
    # Intended for parallel processing; run_pipeline processes scripts sequentially.
    num_workers: int = 1

In [7]:
def convert_list_to_dict(json_path: str, output_dir: str = '/kaggle/working/') -> str:
    """
    Converts a JSON file from a list of items to a dictionary.

    If the input JSON is a list, each item becomes a value keyed by
    'item_XXX' (zero-padded position in the list) and the result is written
    to '<name>_converted<ext>' inside ``output_dir``. Any other top-level
    JSON value is left alone.

    Args:
        json_path: The full path to the input JSON file.
        output_dir: Directory for the converted file; created if missing.
            Defaults to the Kaggle working directory.

    Returns:
        The path to the newly created (converted) file, or the original
        path if no conversion was needed.

    Raises:
        OSError: If the input cannot be read or the output cannot be written.
        json.JSONDecodeError: If the input is not valid JSON.
    """
    # Read as UTF-8 explicitly; run_pipeline opens the same file as UTF-8.
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Nothing to convert: hand the caller back the file it gave us.
    if not isinstance(data, list):
        return json_path

    # Keys look like "item_000", "item_001", ...
    converted = {f"item_{i:03d}": item for i, item in enumerate(data)}

    # Only the final extension is touched, so a '.json' elsewhere in the
    # file name is left as-is.
    stem, ext = os.path.splitext(os.path.basename(json_path))
    new_path = os.path.join(output_dir, f"{stem}_converted{ext or '.json'}")

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(new_path, 'w', encoding='utf-8') as f:
        json.dump(converted, f, indent=2)

    return new_path

In [8]:
class TextProcessor:
    """Handles text preprocessing and chunking for long texts.

    The normalisation rules (abbreviations, number words) are English-only.
    """

    # Spoken expansions for common English abbreviations.
    _ABBREVIATIONS = {
        'Mr.': 'Mister', 'Mrs.': 'Missus', 'Dr.': 'Doctor', 'Prof.': 'Professor',
        'St.': 'Saint', 'etc.': 'et cetera', 'vs.': 'versus', 'Inc.': 'Incorporated',
        'Ltd.': 'Limited', 'Co.': 'Company'
    }
    # Match an abbreviation only where it starts a word, so that e.g. "revs."
    # is not rewritten to "reversus".
    _ABBREVIATION_RE = re.compile(
        r'(?<!\w)(?:'
        + '|'.join(re.escape(abbr) for abbr in sorted(_ABBREVIATIONS, key=len, reverse=True))
        + r')'
    )
    # Stand-alone number: comma-grouped ("1,000") or plain digits, with an
    # optional decimal part captured separately.
    _NUMBER_RE = re.compile(r'\b(\d{1,3}(?:,\d{3})+|\d+)(?:\.(\d+))?\b')

    _DIGITS = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"]
    _TEENS = ["ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen", "sixteen",
              "seventeen", "eighteen", "nineteen"]
    _TENS = ["", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy", "eighty", "ninety"]
    _SCALES = ((1_000_000_000, "billion"), (1_000_000, "million"), (1_000, "thousand"))

    def __init__(self, chunk_size: int = 200, overlap_words: int = 10):
        """
        Args:
            chunk_size: Maximum number of words per chunk.
            overlap_words: Number of trailing words of a chunk repeated at the
                start of the next chunk (0 disables the overlap).
        """
        self.chunk_size = chunk_size
        self.overlap_words = overlap_words

    def clean_text(self, text: str) -> str:
        """Clean and normalize text for TTS.

        Collapses whitespace, expands the known abbreviations and spells out
        stand-alone numbers ("3.14" -> "three point one four",
        "1,000" -> "one thousand").
        """
        # Remove extra whitespace and normalize
        text = re.sub(r'\s+', ' ', text.strip())

        # Handle common abbreviations (single pass, word-start anchored)
        text = self._ABBREVIATION_RE.sub(lambda m: self._ABBREVIATIONS[m.group(0)], text)

        # Handle numbers
        return self._NUMBER_RE.sub(self._spell_number, text)

    def _spell_number(self, match: "re.Match") -> str:
        """Turn one _NUMBER_RE match into words; decimals are read digit by digit."""
        words = self._number_to_words(int(match.group(1).replace(",", "")))
        fraction = match.group(2)
        if fraction:
            words += " point " + " ".join(self._DIGITS[int(d)] for d in fraction)
        return words

    def _number_to_words(self, num: int) -> str:
        """Convert an integer to English words.

        Values below one trillion are spelled out; larger magnitudes are
        returned as their digit string.
        """
        if num < 0:
            return "minus " + self._number_to_words(-num)
        if num == 0:
            return "zero"
        if num < 10:
            return self._DIGITS[num]
        if num < 20:
            return self._TEENS[num - 10]
        if num < 100:
            return self._TENS[num // 10] + ("" if num % 10 == 0 else " " + self._DIGITS[num % 10])
        if num < 1000:
            return self._DIGITS[num // 100] + " hundred" + (
                "" if num % 100 == 0 else " " + self._number_to_words(num % 100)
            )
        if num >= 10 ** 12:
            return str(num)
        for scale_value, scale_name in self._SCALES:
            if num >= scale_value:
                head, rest = divmod(num, scale_value)
                words = f"{self._number_to_words(head)} {scale_name}"
                return words if rest == 0 else f"{words} {self._number_to_words(rest)}"
        return str(num)  # unreachable; kept as a defensive fallback

    def split_into_sentences(self, text: str) -> List[str]:
        """Split text after '.', '!' or '?' when followed by whitespace and a capital letter."""
        sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
        return [s.strip() for s in sentences if s.strip()]

    def chunk_text(self, text: str) -> List[str]:
        """Split long text into manageable chunks.

        Sentences are packed greedily until adding the next one would exceed
        ``chunk_size`` words. A single sentence longer than ``chunk_size`` is
        kept whole. Returns an empty list for empty/blank/None input.
        """
        if not text or not text.strip():
            return []
        text = self.clean_text(text)
        sentences = self.split_into_sentences(text)
        chunks, current_chunk, current_word_count = [], "", 0
        for sentence in sentences:
            sentence_word_count = len(sentence.split())
            if current_word_count + sentence_word_count > self.chunk_size and current_chunk:
                chunks.append(current_chunk.strip())
                if self.overlap_words > 0 and current_word_count >= self.overlap_words:
                    # Seed the next chunk with the tail of the one just closed.
                    overlap_text = ' '.join(current_chunk.split()[-self.overlap_words:])
                    current_chunk = overlap_text + " " + sentence
                    current_word_count = self.overlap_words + sentence_word_count
                else:
                    current_chunk, current_word_count = sentence, sentence_word_count
            else:
                current_chunk += " " + sentence if current_chunk else sentence
                current_word_count += sentence_word_count
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        return chunks

In [9]:


class XTTSPipeline:
    """Main pipeline for XTTS v2 speech synthesis with voice cloning.

    Build it with a SpeechConfig, then call run_pipeline() with the path of a
    JSON file that maps script ids to objects carrying a 'text' entry and,
    optionally, 'file' (output file name) and 'speaker_wav' (reference voice).
    """

    # Length of the pause inserted between consecutive chunks, in seconds.
    CHUNK_GAP_SECONDS = 0.25

    def __init__(self, config: SpeechConfig):
        """Store the config, build the text processor and create the output directory.

        The model itself is not loaded here; see load_model().
        """
        self.config = config
        self.model = None
        self.text_processor = TextProcessor(
            chunk_size=config.chunk_size,
            overlap_words=config.overlap_words
        )
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Conditioning latents per reference recording, so a voice shared by
        # many scripts is only analysed once.
        self._speaker_cache: Dict[str, Tuple[torch.Tensor, torch.Tensor]] = {}
        logger.info(f"Using device: {self.device}")
        if config.overlap_words > 0:
            # Chunks are synthesized independently and joined with silence, so
            # overlapped words end up in the audio twice.
            logger.warning(
                f"overlap_words={config.overlap_words}: words shared by neighbouring chunks "
                "will be spoken twice; set overlap_words=0 to avoid repeated speech."
            )
        Path(self.config.output_dir).mkdir(parents=True, exist_ok=True)

    def load_model(self):
        """Load the stock XTTS v2 model by name and move it to the selected device.

        Raises:
            Exception: Whatever the TTS loader raises, after logging it.
        """
        if self.config.model_path or self.config.config_path:
            logger.warning(
                "SpeechConfig.model_path/config_path are set but not used; "
                "loading the stock xtts_v2 model instead."
            )
        try:
            logger.info("Loading XTTS model...")
            self.model = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(self.device)
            logger.info("Model loaded successfully!")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise

        native_rate = self._output_sample_rate()
        if native_rate != self.config.sample_rate:
            logger.warning(
                f"Model outputs {native_rate} Hz audio; using that instead of "
                f"config.sample_rate={self.config.sample_rate} so pitch and speed stay correct."
            )

    def _output_sample_rate(self) -> int:
        """Return the sample rate the loaded model reports, else config.sample_rate.

        Writing the waveform with any other rate would change its pitch and
        duration on playback.
        """
        synthesizer = getattr(self.model, "synthesizer", None)
        native_rate = getattr(synthesizer, "output_sample_rate", None)
        return int(native_rate) if native_rate else self.config.sample_rate

    def load_speaker_conditioning(self, speaker_wav_path: str) -> Tuple[torch.Tensor, torch.Tensor]:
        """Compute (or fetch from cache) the conditioning latents for a reference recording.

        Args:
            speaker_wav_path: Path to the reference voice audio file.

        Returns:
            The (gpt_cond_latent, speaker_embedding) pair from the model.

        Raises:
            Exception: Whatever the model raises, after logging it.
        """
        cached = self._speaker_cache.get(speaker_wav_path)
        if cached is not None:
            return cached
        try:
            # The low-level XTTS model sits behind model.synthesizer.tts_model.
            latents = self.model.synthesizer.tts_model.get_conditioning_latents(
                audio_path=[speaker_wav_path]
            )
        except Exception as e:
            logger.error(f"Error processing speaker audio: {e}")
            raise
        gpt_cond_latent, speaker_embedding = latents
        self._speaker_cache[speaker_wav_path] = (gpt_cond_latent, speaker_embedding)
        return gpt_cond_latent, speaker_embedding

    def synthesize_chunk(self, text: str, gpt_cond_latent: torch.Tensor,
                        speaker_embedding: torch.Tensor) -> torch.Tensor:
        """Synthesize speech for a single text chunk using the low-level inference method.

        Returns:
            The waveform from the model's 'wav' output with a leading
            dimension of size 1 added.
        """
        try:
            out = self.model.synthesizer.tts_model.inference(
                text,
                self.config.language,
                gpt_cond_latent,
                speaker_embedding,
                temperature=self.config.temperature,
                length_penalty=self.config.length_penalty,
                repetition_penalty=self.config.repetition_penalty,
                top_k=self.config.top_k,
                top_p=self.config.top_p,
                speed=self.config.speed,
                enable_text_splitting=self.config.enable_text_splitting,
            )
            return torch.tensor(out['wav']).unsqueeze(0)
        except Exception as e:
            logger.error(f"Error synthesizing chunk: {e}")
            raise

    def concatenate_audio(self, audio_chunks: List[torch.Tensor]) -> torch.Tensor:
        """Join audio chunks into one waveform with a short silence between them.

        Returns an empty tensor for an empty list and the single chunk
        (leading dimension squeezed) when there is only one.
        """
        if not audio_chunks:
            return torch.tensor([])
        if len(audio_chunks) == 1:
            return audio_chunks[0].squeeze(0)

        silence = torch.zeros(
            int(self.CHUNK_GAP_SECONDS * self._output_sample_rate()),
            dtype=audio_chunks[0].dtype,
        )
        result = []
        for i, chunk in enumerate(audio_chunks):
            result.append(chunk.squeeze(0))
            if i < len(audio_chunks) - 1:
                result.append(silence)
        return torch.cat(result)

    def process_single_script(self, script_data: Dict, output_filename: str = None) -> Optional[str]:
        """Synthesize one script entry and write it to the output directory.

        Args:
            script_data: Entry with 'text' and optionally 'file' / 'speaker_wav'.
            output_filename: Name to use when the entry has no non-empty 'file'.

        Returns:
            Path of the written audio file, or None when the entry was skipped
            (no file name, no text, missing reference audio) or failed.
        """
        try:
            # A non-empty 'file' entry wins over the caller-supplied name.
            output_filename = script_data.get('file') or output_filename
            if not output_filename:
                return None

            text = script_data.get('text', '')
            if not isinstance(text, str) or not text.strip():
                logger.warning(f"No text to synthesize for {output_filename}. Skipping.")
                return None

            # An empty/None per-script value falls back to the configured voice.
            speaker_wav = script_data.get('speaker_wav') or self.config.speaker_wav_path
            if not speaker_wav or not os.path.exists(speaker_wav):
                logger.error(f"Speaker audio not found at '{speaker_wav}'. Skipping.")
                return None

            logger.info(f"Processing {output_filename}...")

            text_chunks = self.text_processor.chunk_text(text)
            if not text_chunks:
                logger.warning(f"Text for {output_filename} produced no chunks. Skipping.")
                return None
            gpt_cond_latent, speaker_embedding = self.load_speaker_conditioning(speaker_wav)

            audio_chunks = []
            for i, chunk in enumerate(text_chunks):
                logger.info(f"Synthesizing chunk {i+1}/{len(text_chunks)} for {output_filename}")
                audio_chunks.append(self.synthesize_chunk(chunk, gpt_cond_latent, speaker_embedding))

            final_audio = self.concatenate_audio(audio_chunks)

            output_path = os.path.join(self.config.output_dir, output_filename)
            if not os.path.splitext(output_path)[1]:
                # The audio writer picks the container from the extension.
                output_path += ".wav"
            # 'file' may name a sub-directory that does not exist yet.
            os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
            torchaudio.save(output_path, final_audio.unsqueeze(0), self._output_sample_rate())
            logger.info(f"Successfully generated: {output_path}")
            return output_path

        except Exception as e:
            # Best effort per script: log with traceback and let the caller continue.
            logger.exception(f"An error occurred while processing {output_filename}: {e}")
            return None

    def run_pipeline(self, json_file_path: str) -> List[str]:
        """Synthesize every script in a JSON file, one after another.

        Args:
            json_file_path: UTF-8 JSON file holding either an object mapping
                script ids to entries, or a list of entries (keyed
                'item_000', 'item_001', ... by position).

        Returns:
            Paths of the audio files that were generated successfully.

        Raises:
            ValueError: If the top-level JSON value is neither object nor list.
            Exception: Any loading error, after logging it.
        """
        try:
            logger.info(f"Loading scripts from: {json_file_path}")
            with open(json_file_path, 'r', encoding='utf-8') as f:
                scripts = json.load(f)

            if isinstance(scripts, list):
                scripts = {f"item_{i:03d}": item for i, item in enumerate(scripts)}
            elif not isinstance(scripts, dict):
                raise ValueError(
                    f"Expected a JSON object or list of scripts, got {type(scripts).__name__}"
                )
            logger.info(f"Loaded {len(scripts)} scripts")

            if self.model is None:
                self.load_model()

            logger.info("Processing scripts sequentially...")
            successful_outputs = []
            for script_id, script_data in scripts.items():
                if not isinstance(script_data, dict):
                    logger.warning(
                        f"Skipping {script_id}: expected an object, got {type(script_data).__name__}"
                    )
                    continue
                # Fall back to the script id when 'file' is missing or empty.
                output_filename = script_data.get('file') or f"{script_id}.wav"
                result = self.process_single_script(script_data, output_filename)
                if result:
                    successful_outputs.append(result)
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

            logger.info(f"Pipeline completed! Generated {len(successful_outputs)} audio files")
            return successful_outputs
        except Exception as e:
            logger.error(f"Pipeline execution failed: {e}")
            raise

In [10]:
# Settings for this run. Fields not listed keep their SpeechConfig defaults.
config = SpeechConfig(
    # model_path="path/to/your/model.pth",  # Optional: use custom model
    # config_path="path/to/your/config.json",  # Optional: use custom config
    # Reference voice recording used for cloning (required).
    speaker_wav_path="/kaggle/input/sample-voice/sample_voice.wav",
    language="en",
    output_dir="generated_speech",
    # Chunking: at most 150 words per chunk, 5 words carried over between chunks.
    chunk_size=150,
    overlap_words=5,
    enable_text_splitting=True,
    temperature=0.75,
    # Scripts are processed one at a time by run_pipeline.
    num_workers=1,
)

In [11]:
# Build the pipeline. This creates the output directory but does not load the
# model; run_pipeline() loads it on first use.
pipeline = XTTSPipeline(config)

In [12]:
# Input JSON with the scripts to synthesize (a Kaggle dataset path).
json_path = "/kaggle/input/annotationsv2/real_time_wargaming_dialogues_v4.json"

In [13]:
# NOTE: despite its name, `scripts` holds a file path, not parsed data:
# convert_list_to_dict returns the converted file's path, or json_path unchanged
# when the JSON was not a list (so the message below is only accurate for lists).
scripts = convert_list_to_dict(json_path)
print("Converted file saved to:", scripts)

Converted file saved to: /kaggle/working/real_time_wargaming_dialogues_v4_converted.json


In [None]:
# Run the full pipeline on the converted script file and list what was written.
# A failure is reported as a one-line message rather than raised.
try:
    results = pipeline.run_pipeline(scripts)
    summary_lines = [f"Successfully generated {len(results)} audio files:"]
    summary_lines += [f"  - {wav_path}" for wav_path in results]
    print("\n".join(summary_lines))
except Exception as e:
    print(f"Pipeline failed: {e}")

 > You must confirm the following:
 | > "I have purchased a commercial license from Coqui: licensing@coqui.ai"
 | > "Otherwise, I agree to the terms of the non-commercial CPML: https://coqui.ai/cpml" - [y/n]


 | | >  y


 > Downloading model to /root/.local/share/tts/tts_models--multilingual--multi-dataset--xtts_v2


100%|█████████▉| 1.86G/1.87G [00:18<00:00, 103MiB/s] 
100%|██████████| 1.87G/1.87G [00:19<00:00, 98.0MiB/s]
4.37kiB [00:00, 20.4kiB/s]

361kiB [00:00, 2.03MiB/s]0 [00:00<?, ?iB/s][A
100%|██████████| 32.0/32.0 [00:00<00:00, 156iB/s]


 > Model's license - CPML
 > Check https://coqui.ai/cpml.txt for more info.
 > Using model: xtts


In [None]:
# Bundle the generated audio into output.zip. The directory name is hard-coded
# and must match the output_dir passed to SpeechConfig ("generated_speech").
!zip -r output.zip generated_speech