<a href="https://colab.research.google.com/github/hamed-jamali-software/mp32face/blob/main/coqui_XTTS_v2_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%cd /content
!git clone -b dev https://github.com/camenduru/xtts2-hf
%cd /content/xtts2-hf
!pip install -q gradio==3.50.2 TTS==0.21.1 langid unidic-lite unidic deepspeed
!pip install -q numpy<2.0.0 -U
!wget https://huggingface.co/spaces/coqui/xtts/resolve/main/examples/female.wav -O /content/xtts2-hf/examples/female.wav
!wget https://huggingface.co/spaces/coqui/xtts/resolve/main/examples/male.wav -O /content/xtts2-hf/examples/male.wav
!wget https://huggingface.co/spaces/coqui/xtts/resolve/main/ffmpeg.zip -O /content/xtts2-hf/ffmpeg.zip


In [None]:
# Runs the repository's app.py as a shell command.
# NOTE(review): presumably this launches the Gradio demo and blocks until it is
# interrupted, in which case the cells below never run under "Run All" — confirm.
!python app.py

In [None]:
import sys
import io, os, stat
import subprocess
import random
from zipfile import ZipFile
import uuid
import time
import torch
import torchaudio
import subprocess
import uuid
import time
import torch
import torchaudio
import langid
import re
from zipfile import ZipFile
from TTS.api import TTS
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.generic_utils import get_user_data_dir
from huggingface_hub import HfApi


In [None]:

# Download the unidic dictionary for mecab. Run it with the kernel's own
# interpreter so the data lands in the environment this notebook imports from;
# like the previous os.system call, a failure here does not stop the cell.
subprocess.run([sys.executable, "-m", "unidic", "download"], check=False)

# By using XTTS you agree to CPML license https://coqui.ai/cpml
os.environ["COQUI_TOS_AGREED"] = "1"

# Hub client used by text_to_speech when it reports an unrecoverable CUDA error.
HF_TOKEN = os.environ.get("HF_TOKEN")
api = HfApi(token=HF_TOKEN)
repo_id = "coqui/xtts"

# Use newer ffmpeg binary for Ubuntu20 to use denoising for microphone input
print("Export newer ffmpeg binary for denoise filter")
# Context manager closes the archive handle once extraction is done.
with ZipFile("ffmpeg.zip") as ffmpeg_archive:
    ffmpeg_archive.extractall()
print("Make ffmpeg binary executable")
st = os.stat("ffmpeg")
os.chmod("ffmpeg", st.st_mode | stat.S_IEXEC)

# This will trigger downloading model
print("Downloading if not downloaded Coqui XTTS V2")
from TTS.utils.manage import ModelManager

model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
ModelManager().download_model(model_name)
# The model directory name is the model id with "/" replaced by "--".
model_path = os.path.join(get_user_data_dir("tts"), model_name.replace("/", "--"))
print("XTTS downloaded")

Export newer ffmpeg binary for denoise filter
Make ffmpeg binary executable
Downloading if not downloaded Coqui XTTS V2
 > Downloading model to /root/.local/share/tts/tts_models--multilingual--multi-dataset--xtts_v2
 > Model's license - CPML
 > Check https://coqui.ai/cpml.txt for more info.
XTTS downloaded


In [None]:

# Read the XTTS configuration stored next to the downloaded checkpoint.
config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))

# Build the model from that configuration, load its weights and vocabulary in
# eval mode with DeepSpeed enabled, then move it to the GPU.
model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    eval=True,
    use_deepspeed=True,
    checkpoint_path=os.path.join(model_path, "model.pth"),
    vocab_path=os.path.join(model_path, "vocab.json"),
)
model.cuda()

# Module-level record of the first CUDA device-side assert seen by text_to_speech.
DEVICE_ASSERT_DETECTED = 0
DEVICE_ASSERT_PROMPT = DEVICE_ASSERT_LANG = None

supported_languages = config.languages


In [None]:
import textwrap
import subprocess
import uuid
import time
import re
import datetime
import csv
from io import StringIO

In [None]:
def split_text(text, max_length=200):
    """Wrap ``text`` into a list of lines no wider than ``max_length`` characters.

    Lines are broken on whitespace only: a single word longer than
    ``max_length`` is kept whole and becomes its own (over-long) line.
    Returns an empty list for empty input.
    """
    wrapper = textwrap.TextWrapper(width=max_length, break_long_words=False)
    return wrapper.wrap(text)


In [None]:
def text_to_speech(prompt, language, audio_file_pth=None, mic_file_path=None, use_mic=False, voice_cleanup=False, no_lang_auto_detect=False, agree=True, output_file=None):
    """Synthesize ``prompt`` with the module-level XTTS ``model`` and save it as a WAV file.

    Args:
        prompt: Text to speak.
        language: Language code passed to ``model.inference``.
        audio_file_pth: Reference recording used to compute the speaker conditioning.
        mic_file_path, use_mic, voice_cleanup, no_lang_auto_detect: Not used for
            synthesis here; they are only included in the error report.
        agree: When false, nothing is generated and ``output_file`` is returned as is.
        output_file: Destination path for the generated 24 kHz WAV file.

    Returns:
        ``output_file`` after the audio was written (or when ``agree`` is false);
        ``None`` when inference failed with a ``RuntimeError``.
    """
    # The handler below assigns these module-level flags. Without the declaration
    # Python treats them as locals and the `if not DEVICE_ASSERT_DETECTED` check
    # raises UnboundLocalError instead of recording the failure.
    global DEVICE_ASSERT_DETECTED, DEVICE_ASSERT_PROMPT, DEVICE_ASSERT_LANG

    if not agree:
        return output_file

    speaker_wav = audio_file_pth
    sample_rate = 24000  # rate used both for the RTF figure and for the saved file

    try:
        # Insert a space before '.', '。' or '?' and double the mark when it
        # directly follows a word character or a non-ASCII character.
        prompt = re.sub(r"([^\x00-\x7F]|\w)(\.|\。|\?)", r"\1 \2\2", prompt)
        print("Generating new audio...")
        t0 = time.time()  # start of the audio generation step
        conditioning = model.get_conditioning_latents(
            audio_path=speaker_wav,
            gpt_cond_len=30,
            gpt_cond_chunk_len=4,
            max_ref_length=60,
        )
        out = model.inference(
            prompt,
            language,
            *conditioning,
            repetition_penalty=5.0,
            temperature=0.75,
        )
        inference_time = time.time() - t0
        print(f"Time to generate audio: {round(inference_time * 1000)} milliseconds")
        # Processing time divided by the duration of the generated audio.
        real_time_factor = (time.time() - t0) / out['wav'].shape[-1] * sample_rate
        print(f"Real-time factor (RTF): {real_time_factor}")
        torchaudio.save(output_file, torch.tensor(out["wav"]).unsqueeze(0), sample_rate)
        print("Audio saved to  "+output_file)

    except RuntimeError as e:
        if "device-side assert" not in str(e):
            if "Failed to decode" in str(e):
                print("Speaker encoding error", str(e))
            else:
                print("RuntimeError: non device-side assert error:", str(e))
            return None

        print(f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}")
        print("Unhandled Exception encounter, please retry in a minute")
        print("Cuda device-assert Runtime encountered need restart")
        # Remember only the first offending prompt/language.
        if not DEVICE_ASSERT_DETECTED:
            DEVICE_ASSERT_DETECTED = 1
            DEVICE_ASSERT_PROMPT = prompt
            DEVICE_ASSERT_LANG = language

        # Serialize the call's arguments into a one-row CSV report.
        error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S")
        error_data = [
            error_time,
            prompt,
            language,
            audio_file_pth,
            mic_file_path,
            use_mic,
            voice_cleanup,
            no_lang_auto_detect,
            agree,
        ]
        error_data = [field if isinstance(field, str) else str(field) for field in error_data]
        print(error_data)
        write_io = StringIO()
        csv.writer(write_io).writerows([error_data])
        csv_upload = write_io.getvalue().encode()

        filename = error_time + "_" + str(uuid.uuid4()) + ".csv"
        print("Writing error csv")
        error_api = HfApi()
        error_api.upload_file(
            path_or_fileobj=csv_upload,
            path_in_repo=filename,
            repo_id="coqui/xtts-flagged-dataset",
            repo_type="dataset",
        )

        # NOTE(review): this uploads the reference recording to an external
        # dataset repository — confirm that is wanted outside the original Space.
        print("Writing error reference audio")
        speaker_filename = error_time + "_reference_" + str(uuid.uuid4()) + ".wav"
        error_api.upload_file(
            path_or_fileobj=speaker_wav,
            path_in_repo=speaker_filename,
            repo_id="coqui/xtts-flagged-dataset",
            repo_type="dataset",
        )

        # Ask the Hub to restart the Space unless it is already rebuilding.
        space = api.get_space_runtime(repo_id=repo_id)
        if space.stage != "BUILDING":
            api.restart_space(repo_id=repo_id)
        else:
            print("TRIED TO RESTART but space is building")
        # No audio was written, so do not hand back a path as if it had been.
        return None

    return output_file


In [None]:
def generate_speech(prompt, language_code, chunk_id, audio_file_path='/content/1.mp3'):
    """Synthesize one text chunk into ``chunk_<chunk_id>.wav`` using a cleaned reference voice.

    The reference recording is first passed through the bundled ``./ffmpeg``
    (low-pass / high-pass filtering plus silence trimming) and the filtered copy
    is used as the speaker reference.

    Args:
        prompt: Text of the chunk to speak.
        language_code: Language code forwarded to ``text_to_speech``. It used to
            be overwritten with ``"en"`` inside this function, so the argument
            was silently ignored.
        chunk_id: Number used to build the output file name.
        audio_file_path: Reference recording; defaults to the previously
            hard-coded ``/content/1.mp3``.

    Returns:
        Whatever ``text_to_speech`` returns: the chunk's file path, or ``None``
        when synthesis failed.

    Raises:
        subprocess.CalledProcessError: If the ffmpeg filtering step fails.
    """
    output_file = f"chunk_{chunk_id}.wav"

    # Filters joined with "," so the chain has no trailing empty filter entry.
    silence_trim = "silenceremove=start_periods=1:start_silence=0:start_threshold=0.02"
    filter_chain = ",".join([
        "lowpass=8000",
        "highpass=75",
        # Trim trailing silence (on the reversed stream), then leading silence.
        "areverse",
        silence_trim,
        "areverse",
        silence_trim,
    ])

    filtered_wav = audio_file_path + "filter.wav"
    # Argument list instead of str.split(" ") so paths containing spaces survive.
    ffmpeg_command = ["./ffmpeg", "-y", "-i", audio_file_path, "-af", filter_chain, filtered_wav]
    subprocess.run(ffmpeg_command, capture_output=False, text=True, check=True)

    return text_to_speech(
        prompt=prompt,
        language=language_code,
        audio_file_pth=filtered_wav,
        mic_file_path=None,
        use_mic=False,
        voice_cleanup=True,
        no_lang_auto_detect=True,
        agree=True,
        output_file=output_file,
    )

In [None]:
def combine_audio_files(audio_files, output_file):
    """Concatenate ``audio_files`` into ``output_file`` with ffmpeg's concat demuxer.

    The inputs are listed in ``file_list.txt`` (written to the current
    directory) and stream-copied into ``output_file``, overwriting it if it
    exists. The input files are deleted only after ffmpeg succeeded; on failure
    the error is printed and the inputs are kept so the step can be retried.

    Args:
        audio_files: Paths of the audio files to join, in playback order.
        output_file: Path of the combined file to write.

    Returns:
        None.
    """
    audio_files = [os.fspath(audio_file) for audio_file in audio_files]
    if not audio_files:
        print("No audio files to combine.")
        return

    list_path = "file_list.txt"
    with open(list_path, "w") as file_list:
        for audio_file in audio_files:
            # Inside a single-quoted entry a literal quote is written as '\''.
            escaped = audio_file.replace("'", "'\\''")
            file_list.write(f"file '{escaped}'\n")

    # -y: overwrite an existing output instead of waiting for a confirmation.
    command = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", output_file]

    try:
        result = subprocess.run(
            command,
            check=True,
            stdin=subprocess.DEVNULL,  # never block waiting for keyboard input
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error occurred during combining audio files: {e}")
        print(e.stdout)
        print(e.stderr)
        return
    except OSError as e:
        # Raised e.g. when no ffmpeg executable can be found.
        print(f"Error occurred during combining audio files: {e}")
        return

    print(f"Audio files successfully combined into {output_file}")
    print(result.stdout)
    print(result.stderr)

    # Clean up the temporary chunk files now that the combined file exists.
    for audio_file in audio_files:
        os.remove(audio_file)

In [None]:
# Manually stitch chunk files left over from an earlier run. Only files that
# actually exist are passed on, so this cell does not fail on a fresh kernel
# where no chunks have been generated yet.
audio_files = [chunk for chunk in ["chunk_1.wav", "chunk_2.wav", "chunk_3.wav"] if os.path.exists(chunk)]
output_file = "combined_output.wav"
if audio_files:
    combine_audio_files(audio_files, output_file)
else:
    print("No chunk files found; nothing to combine.")

In [None]:
def process_text_to_speech(text, language_code):
    """Convert ``text`` to speech chunk by chunk and join the result into ``combined_output.wav``.

    The text is split with ``split_text``, each chunk is synthesized with
    ``generate_speech`` and the chunk files that were produced are concatenated
    with ``combine_audio_files``. A previous ``combined_output.wav`` is removed first.

    Args:
        text: Full text to speak.
        language_code: Language code passed to ``generate_speech`` for every chunk.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    combined_audio = "combined_output.wav"

    # Split text into chunks (split_text's default maximum length applies).
    chunks = split_text(text)

    # Start clean so a leftover file from an earlier run is never mistaken for
    # this run's result.
    if os.path.isfile(combined_audio):
        os.remove(combined_audio)

    all_audio_files = []
    for chunk_id, chunk in enumerate(chunks, start=1):
        temp_file = generate_speech(chunk, language_code, chunk_id)
        # generate_speech can return None when synthesis fails, and
        # os.path.exists(None) would raise TypeError, so test the value first.
        if temp_file and os.path.exists(temp_file):
            all_audio_files.append(temp_file)
        else:
            print(f"Failed to generate audio for chunk {chunk_id}.")

    # Combine all audio files into one
    if not all_audio_files:
        print("No audio files to combine.")
        return

    combine_audio_files(all_audio_files, combined_audio)
    # The old output was removed above, so its presence now means the combine step worked.
    if os.path.isfile(combined_audio):
        print(f"All audio files combined into {combined_audio}")
    else:
        print(f"Combining failed; {combined_audio} was not created.")

In [None]:

# Example usage: the sample narration below is passed to process_text_to_speech,
# which splits it into chunks, synthesizes each chunk and writes the joined
# audio to combined_output.wav.
prompt_text = """Hey everyone, welcome back to the channel! Today, I want to talk about a topic that's been on my mind for a while: motivation versus discipline. We all know that motivation is great when it hits us, but there's a catch: you can’t always count on it. It tends to come at the most random times, like at 3 AM when you’re about to go back to bed but suddenly feel the urge to exercise, or in the middle of a party when you're looking at yourself in the mirror.

So, what’s the answer? Discipline. It took me 18 years to figure this out. I started with motivation and watched countless videos on discipline, from big channels to smaller ones, in both Persian and English. And let me tell you, about 90% of them were pretty much useless. It wasn’t because they didn’t try, but because they tackled the subject all wrong. My goal with this video is to give you real, useful advice on how to integrate discipline into your life—no clickbait, just genuine help."""

# Synthesize the sample text as English speech.
process_text_to_speech(prompt_text, language_code="en")

In [None]:
# 1. Slow the combined narration down to 87% of its original tempo.
command = [
    "ffmpeg",
    "-y",
    "-i", "combined_output.wav",
    "-filter:a", "atempo=0.87",
    "output_final.wav"
]

# 2. Add silence.
# NOTE(review): the original comment says "between sentences", but apad pads
# the end of the stream — confirm the intended effect.
command_silence = [
    "ffmpeg",
    "-y",  # overwrite the output file automatically if it exists
    "-i", "output_final.wav",
    "-af", "aresample=async=1:min_hard_comp=0.100000:first_pts=0,apad=pad_dur=2:pad_len=2",
    "output_with_silence.wav"
]

# 3. Band-limit the result and trim silence at both ends with the bundled
#    ./ffmpeg. The filters are joined with "," so the chain does not end in a
#    trailing comma (an empty filter entry).
silence_trim = "silenceremove=start_periods=1:start_silence=0:start_threshold=0.02"
filter_chain = ",".join([
    "lowpass=8000",
    "highpass=75",
    "areverse",
    silence_trim,
    "areverse",
    silence_trim,
])
out_filename = "output_with_silence_end.wav"
speaker_wav = "output_with_silence.wav"
# Argument list rather than a space-split string, so each value stays one argument.
shell_command = ["./ffmpeg", "-y", "-i", speaker_wav, "-af", filter_chain, out_filename]

# Run the FFmpeg commands in order; the first failure aborts the rest.
try:
    result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    print("Command executed successfully.")
    result_silence = subprocess.run(command_silence, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    # Capture output here as well, so e.stderr below is populated if this step fails.
    subprocess.run(shell_command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    speaker_wav = out_filename
    print("Filtered microphone input")

    print(result.stdout)
except subprocess.CalledProcessError as e:
    print("Error occurred:")
    print(e.stderr)