# Configurações


In [2]:
#!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
#!pip install --upgrade pip setuptools wheel  
#!pip install numpy

#!pip install ffmpeg-python

#!pip install pydub
#!pip install demucs
#!pip install diffq

#!pip install ipywidgets 
#!pip install soundfile
#!pip install pyannote.audio
#!pip install openai-whisper
#!pip install scikit-learn
#!pip install spacy==3.4.4
#!pip install Cython==0.29.30 ou Cython==0.29.21
#!pip install numpy==1.26.2
#!pip install librosa
#!pip install TTS
#!pip install moviepy

#Working so far
#Versão do TTS: 0.22.0
#Versão do spacy: 3.4.4
#Versão do Cython: 0.29.30
#Versão do numpy: 1.22.0


In [None]:
#Verificar GPU
import torch
print("GPU disponível:", torch.cuda.is_available())

# Célula 1 - Extração e Preparação do Áudio

In [None]:
# Extração de legendas de arquivos .mkv com Python
import subprocess

# Path do arquivo .mkv e nome do episódio
episode_path = ""
episode_name = ""
input_file = f"{episode_path}/{episode_name}.mkv"
output_path = f"{episode_path}/{episode_name}.srt"

# Extração de legendas de arquivos .mkv com Python
subtitle_stream_id = "0:3"  # Stream ID da legenda desejada (português)

# Função para listar as faixas de um arquivo .mkv
def list_mkv_tracks(file_path):
    try:
        command = ["ffmpeg", "-i", file_path]
        process = subprocess.run(command, stderr=subprocess.PIPE, text=True)
        print("Faixas disponíveis no arquivo:")
        print(process.stderr)
    except Exception as e:
        print(f"Erro ao listar faixas: {e}")

# Função para extrair e converter a legenda para .srt
def extract_subtitle_as_srt(file_path, stream_id, output_file):
    try:
        command = [
            "ffmpeg", "-i", file_path, 
            "-map", stream_id, 
            "-c:s", "srt",  # Converte para formato .srt
            output_file
        ]
        subprocess.run(command, check=True)
        print(f"Legenda extraída e convertida para .srt: {output_file}")
    except Exception as e:
        print(f"Erro ao extrair e converter legenda: {e}")

# Chamada das funções
#list_mkv_tracks(input_file)  # Listar faixas
extract_subtitle_as_srt(input_file, subtitle_stream_id, output_path)  # Extrair e converter para .srt


In [None]:
# Célula 1 - Extração e Preparação do Áudio
# Esta célula extrai o áudio de um arquivo de vídeo (episódio de anime) e o converte para um formato padronizado.
# Requer a instalação do FFmpeg para manipulação de mídia.

# Comando para instalar o FFmpeg Python wrapper caso ainda não esteja instalado
# pip install ffmpeg-python

import ffmpeg
import os

def extract_and_convert_audio(video_path, output_dir, audio_format="wav", sample_rate=16000):
    """
    Extrai o áudio de um vídeo, converte para o formato especificado e salva no diretório de saída.

    Args:
        video_path (str): Caminho para o arquivo de vídeo.
        output_dir (str): Caminho para salvar o áudio extraído.
        audio_format (str): Formato do áudio de saída (padrão: "wav").
        sample_rate (int): Taxa de amostragem do áudio (padrão: 16kHz).

    Returns:
        str: Caminho para o arquivo de áudio gerado.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    audio_output_path = os.path.join(
        output_dir, f"{os.path.splitext(os.path.basename(video_path))[0]}.{audio_format}"
    )

    try:
        # Extrai e converte o áudio
        ffmpeg.input(video_path).output(
            audio_output_path, 
            format=audio_format, 
            ar=sample_rate, 
            ac=1  # Mono
        ).run(overwrite_output=True)
        print(f"Áudio extraído e salvo em: {audio_output_path}")
        return audio_output_path
    except ffmpeg.Error as e:
        print("Erro ao processar o vídeo:", e)
        return None

def verify_audio_properties(audio_path):
    """
    Verifica as propriedades de um arquivo de áudio para garantir que estão conforme o esperado.

    Args:
        audio_path (str): Caminho para o arquivo de áudio.

    Returns:
        dict: Propriedades do arquivo de áudio, como taxa de amostragem, canais e duração.
    """
    try:
        probe = ffmpeg.probe(audio_path)
        audio_stream = next(stream for stream in probe["streams"] if stream["codec_type"] == "audio")
        properties = {
            "sample_rate": int(audio_stream["sample_rate"]),
            "channels": int(audio_stream["channels"]),
            "duration": float(audio_stream["duration"])
        }
        return properties
    except ffmpeg.Error as e:
        print("Erro ao verificar propriedades do áudio:", e)
        return None

# Exemplo de uso:
video_path = f"{episode_path}/{episode_name}.mkv"
output_dir = f"{episode_path}/{episode_name}_audio"

audio_path = extract_and_convert_audio(video_path, output_dir)
print('Propriedades do áudio:', verify_audio_properties(audio_path))

# Célula 2 - Separacão de Fontes (Remoção de Ruídos e Música): demucs

## Célula 2.1 - Hyperparâmetros e Configuração demucs

In [None]:
# Célula 2.1 - Hyperparâmetros e Configuração
import os
import subprocess
import numpy as np
from pydub import AudioSegment
from datetime import datetime

def current_time():
    """Retorna o horário atual no formato HH:MM:SS."""
    return datetime.now().strftime('%H:%M:%S')

def evaluate_output_quality(output_dir):
    """
    Avalia a qualidade dos arquivos de áudio gerados pela separação do Demucs.

    Args:
        output_dir (str): Caminho para o diretório contendo os arquivos separados.

    Returns:
        float: Uma métrica de qualidade (maior é melhor).
    """
    try:
        print(f"[{current_time()}] Iniciando avaliação de qualidade no diretório: {output_dir}")
        # Inicializa a qualidade total
        total_quality = 0
        count = 0

        for root, _, files in os.walk(output_dir):  # Verifica também subdiretórios
            for file in files:
                if file.endswith(".wav"):
                    file_path = os.path.join(root, file)
                    print(f"Processando arquivo: {file_path}")
                    audio = AudioSegment.from_file(file_path)

                    # Métrica baseada na relação sinal-ruído (SNR)
                    samples = np.array(audio.get_array_of_samples())
                    signal_power = np.mean(samples**2)
                    noise_power = np.var(samples)

                    if noise_power > 0:
                        snr = 10 * np.log10(signal_power / noise_power)
                    else:
                        snr = float('inf')

                    print(f"Arquivo: {file}, SNR calculado: {snr}\n")

                    # Incrementa qualidade total
                    total_quality += snr
                    count += 1

        # Retorna a média da qualidade
        if count > 0:
            average_quality = total_quality / count
            print(f"Média de qualidade calculada: {average_quality}")
            return average_quality
        else:
            print("Nenhum arquivo de áudio encontrado para avaliação.")
            return 0

    except Exception as e:
        print(f"Erro ao avaliar a qualidade no diretório {output_dir}: {e}")
        return 0

def optimize_demucs(audio_path, output_dir, models, segments, devices):
    best_config = None
    best_quality = float('-inf')

    print(f"[{current_time()}] Iniciando otimização do Demucs...")
    for model in models:
        print(f"Testando modelo: {model}")
        for segment in segments:
            print(f"  Tamanho de segmento: {segment}s")
            for device in devices:
                print(f"    Dispositivo: {device}")
                output_path = f"{output_dir}/{model}_{segment}_{device}"

                # Garante que o diretório existe
                os.makedirs(output_path, exist_ok=True)

                # Comando para executar o Demucs
                command = [
                    "demucs", audio_path, "--out", output_path,
                    "-n", model, "--device", device,
                    "--segment", str(segment)
                ]
                
                try:
                    print(f"[{current_time()}] Executando comando: {' '.join(command)}")
                    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    print(f"Saída do comando: {result.stdout.decode('utf-8')}")

                    # Avalia a qualidade do resultado
                    quality = evaluate_output_quality(output_path)
                    print(f"Qualidade obtida: {quality}")

                    if quality > best_quality:
                        print(f"*** Nova melhor configuração encontrada! {model}, {segment}, {device} com qualidade {quality} ***")
                        best_quality = quality
                        best_config = (model, segment, device)

                except Exception as e:
                    print(f"Erro ao executar o comando: {e}")

    print(f"Melhor configuração: {best_config} com qualidade {best_quality}")
    return best_config

# Configurações de exemplo
audio_path = f"{episode_path}/{episode_name}_audio"
output_dir = f"{episode_path}data/audio_file"
os.makedirs(output_dir, exist_ok=True)

models = ["mdx_extra_q"]#, "htdemucs"]
segments = [5, 10, 20]
devices = ["cuda"]

best_config = optimize_demucs(audio_path, output_dir, models, segments, devices)
print(f"A melhor configuração foi: {best_config}")


## Célula 2.2 - Separacão de Canais de áudio - Sem Tuning

In [None]:
# Célula 2.2 - Separacão de Fontes (Remoção de Ruídos e Música)
# Esta célula utiliza o Demucs para separar a trilha de diálogo das outras fontes, como música e efeitos sonoros.
# Requer a instalação do Demucs para realizar a separação de fontes.

# Comando para instalar o Demucs caso ainda não esteja instalado
# Para GPUs modernas, instale o PyTorch com suporte a CUDA mais recente:
# pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu181
# pip install demucs

import os
import subprocess

def separate_sources(audio_path, output_dir, demucs_model, segment, use_gpu=True):
    """
    Separa o áudio em diferentes fontes (diálogo, música, efeitos sonoros) usando o Demucs.

    Args:
        audio_path (str): Caminho para o arquivo de áudio a ser processado.
        output_dir (str): Diretório onde os arquivos de saída serão salvos.
        demucs_model (str): Nome do modelo Demucs a ser utilizado.
        use_gpu (bool): Se True, usa GPU para acelerar o processamento (requer CUDA).

    Returns:
        str: Caminho para o arquivo contendo apenas o diálogo, ou None em caso de erro.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    try:
        # Argumentos para Demucs
        demucs_command = ["demucs", audio_path, "--out", output_dir, "-n", demucs_model, "--segment", str(segment)]
        if use_gpu:
            demucs_command.append("--device=cuda")

        # Executa o Demucs para separar as fontes
        result = subprocess.run(demucs_command, check=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")

        # Log detalhado
        print("Saída do Demucs:")
        print(result.stdout)
        if result.stderr:
            print("Erros do Demucs:")
            print(result.stderr)

        # Caminho do diálogo separado
        vocals_path = os.path.join(output_dir, demucs_model, os.path.splitext(os.path.basename(audio_path))[0], "vocals.wav")
        if os.path.exists(vocals_path):
            print(f"Separação de fontes concluída. Diálogo salvo em: {vocals_path}")
            return vocals_path
        else:
            print("Não foi possível encontrar o arquivo de diálogo separado.")
            return None

    except subprocess.CalledProcessError as e:
        print(f"Erro ao executar o Demucs: {e}")
        print("Saída padrão (stdout):", e.stdout)
        print("Erro padrão (stderr):", e.stderr)
        return None
    except UnicodeDecodeError as e:
        print(f"Erro de decodificação: {e}")
        return None

# Exemplo de uso:
demucs_model = "mdx_extra_q"  # htdemucs ou mdx_extra_q
audio_path = f"{episode_path}/{episode_name}_audio.wav"
output_dir = f"{episode_path}/{episode_name}/audio_file"

dialogue_path = separate_sources(audio_path, output_dir, demucs_model, segment=10)


## Célula 2.3- Verificar Energia dos Segmentos de Áudio

In [None]:
# Célula 2.3 - Verificar Energia dos Segmentos de Áudio
# Esta célula analisa os arquivos segmentados pelo Demucs para identificar trechos com silêncio ou baixa energia.

import os
import numpy as np
import wave

def calculate_audio_energy(audio_path):
    """
    Calcula a energia de um arquivo de áudio.

    Args:
        audio_path (str): Caminho para o arquivo de áudio.

    Returns:
        float: Energia média do áudio.
    """
    try:
        with wave.open(audio_path, 'r') as wav_file:
            n_frames = wav_file.getnframes()
            audio_data = np.frombuffer(wav_file.readframes(n_frames), dtype=np.int16)
            energy = np.mean(audio_data**2)
        return energy
    except Exception as e:
        print(f"Erro ao calcular energia do áudio {audio_path}: {e}")
        return 0


def diagnose_audio_segments(audio_dir, energy_threshold=1e4):
    """
    Diagnostica os arquivos de áudio segmentados pelo Demucs, verificando energia.

    Args:
        audio_dir (str): Diretório contendo os arquivos segmentados.
        energy_threshold (float): Limite mínimo de energia para considerar um segmento válido.

    Returns:
        dict: Resultados do diagnóstico com energia de cada arquivo.
    """
    results = {}
    for audio_file in os.listdir(audio_dir):
        if audio_file.endswith(".wav"):
            audio_path = os.path.join(audio_dir, audio_file)
            energy = calculate_audio_energy(audio_path)
            results[audio_file] = energy

            if energy < energy_threshold:
                print(f"Segmento {audio_file} possui baixa energia ({energy:.2f}). Pode conter silêncio.")
            else:
                print(f"Segmento {audio_file} possui energia válida ({energy:.2f}).")

    return results

# Exemplo de uso:
audio_dir= f"{episode_path}/{episode_name}/audio_file"
print("audio_dir:", audio_dir)
segment_energy_results = diagnose_audio_segments(
    audio_dir,
    energy_threshold=1e4
)

print("Diagnóstico de energia concluído.")


# Célula 3 - Diarização e Identificação de Locutores: pyannote

## Célula 3.1 - Diarização e Identificação de Locutores

In [None]:
# Normalização de energia
# from pydub import AudioSegment
import numpy as np

def normalize_energy(audio_path, output_path):
    audio = AudioSegment.from_wav(audio_path)
    samples = np.array(audio.get_array_of_samples())
    normalized_samples = samples / np.max(np.abs(samples))  # Normalização
    normalized_audio = audio._spawn(normalized_samples.astype(np.int16).tobytes())
    normalized_audio.export(output_path, format="wav")
    print(f"Áudio normalizado salvo em: {output_path}")

normalize_energy(audio_path, "data/audio_file/normalized_vocals.wav")


In [None]:
# Célula 3.1 - Diarização e Identificação de Locutores
# Esta célula utiliza o pyannote-audio para identificar diferentes locutores em um arquivo de áudio.
# Requer a instalação do pyannote-audio e modelos correspondentes.

# Comando para instalação:
# pip install pyannote.audio
# pip install numpy==1.26.2

from pyannote.audio import Pipeline
import torch
from pyannote.audio.pipelines.utils.hook import ProgressHook
import torchaudio

def diarize_speakers(audio_path, model_name="pyannote/speaker-diarization-3.1", hf_token=None): 
    """
    Realiza diarização para identificar diferentes locutores em um áudio, utilizando GPU, memória pré-carregada e monitorando progresso.

    Args:
        audio_path (str): Caminho para o arquivo de áudio.
        model_name (str): Nome do modelo do pyannote para diarização.
        hf_token (str): Token de autenticação do Hugging Face.

    Returns:
        list: Lista de segmentos com identificação de locutores e intervalos de tempo.
    """
    try:
        if not hf_token:
            raise ValueError("Token de autenticação do Hugging Face é necessário.")

        # Carrega o pipeline com autenticação
        pipeline = Pipeline.from_pretrained(model_name, use_auth_token=hf_token)

        # Configura para usar GPU
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Usando dispositivo: {device}")
        pipeline.to(device)
        
        # Pré-carrega o áudio em memória
        print("Carregando o áudio...")
        waveform, sample_rate = torchaudio.load(audio_path)
        print(f"Shape do waveform: {waveform.shape}, Taxa de amostragem: {sample_rate}")

        # Reamostrar se necessário
        if sample_rate != 16000:
            print(f"Reamostrando o áudio para 16 kHz...")
            waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(waveform)
            sample_rate = 16000

        # Passa o áudio ao pipeline
        audio_input = {"waveform": waveform, "sample_rate": sample_rate}

        # Monitora o progresso do pipeline
        with ProgressHook() as hook:
            diarization = pipeline(audio_input, hook=hook)
        
        segments = []
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            segments.append({
                "start": turn.start,
                "end": turn.end,
                "speaker": speaker
            })
            print(f"Locutor {speaker}: {turn.start:.2f}s - {turn.end:.2f}s")

        # Salva a saída em formato RTTM
        output_rttm = audio_path.replace(".wav", ".rttm")
        with open(output_rttm, "w") as rttm:
            diarization.write_rttm(rttm)
        print(f"Arquivo RTTM salvo em: {output_rttm}")

        return segments
    except Exception as e:
        print(f"Erro ao realizar diarização: {e}")
        return None

# Exemplo de uso:
hf_token = ""
audio_path = ""
segments = diarize_speakers(audio_path, hf_token=hf_token)


## Célula 3.2 - Diarização Tunned


In [None]:
# Célula 3.2 - Diarização e Identificação de Locutores
# Esta célula utiliza o pyannote-audio para identificar diferentes locutores em um arquivo de áudio.
# Requer a instalação do pyannote-audio e modelos correspondentes.

# Comando para instalação:
# pip install pyannote.audio
# pip install numpy==1.26.2

import os
import torch
import numpy as np
from datetime import datetime
from pyannote.audio import Pipeline
from pyannote.metrics.diarization import DiarizationErrorRate
from pyannote.pipeline import Optimizer
from pyannote.core import Annotation, Segment
from huggingface_hub import login

# Ignorar Warnings
import warnings
warnings.filterwarnings("ignore")

# ---- Função para Obter Horário Atual ----
def current_time():
    """Retorna o horário atual no formato HH:MM:SS."""
    return datetime.now().strftime('%H:%M:%S')

# Configura para usar GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[{current_time()}] Usando dispositivo: {device}")

# ---- Função para Carregar RTTM ----
def load_rttm(rttm_path):
    annotation = Annotation()
    with open(rttm_path, "r") as file:
        for line in file:
            parts = line.strip().split()
            start, duration, speaker = float(parts[3]), float(parts[4]), parts[7]
            end = start + duration
            annotation[Segment(start, end)] = speaker
    return annotation

# ---- Configuração do Pipeline ----
# Realizar login na Hugging Face
token = ""
login(token=token)

# Caminhos dos arquivos de entrada e saída
audio_file = ""
annotation_file = "" # .rttm
optimized_rttm = ""

# Carregar pipeline pré-treinado
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=True)
pipeline.to(device)

# ---- Avaliação Inicial da Diarização ----
print(f"\n[{current_time()}] [INFO] Avaliando DER inicial...")
metric = DiarizationErrorRate()

# Carregar ground_truth
ground_truth = load_rttm(annotation_file)

# Aplicar o pipeline
initial_result = pipeline({"uri": audio_file, "audio": audio_file})

initial_der = metric(ground_truth, initial_result)
print(f"[{current_time()}] DER Inicial: {100 * abs(initial_der):.2f}%")

# Exibir valores iniciais de clustering e segmentation do pipeline
initial_clustering = pipeline.parameters(instantiated=True).get('clustering', {})
initial_segmentation = pipeline.parameters(instantiated=True).get('segmentation', {})
print(f"[{current_time()}] Parâmetros Iniciais - Clustering: {initial_clustering}, Segmentation: {initial_segmentation}")

# ---- Otimização dos Hiperparâmetros ----
print(f"\n[{current_time()}] [INFO] Otimizando Hiperparâmetros de Segmentação...")
optimizer = Optimizer(pipeline)

# Atualizar dataset com áudio e ground truth
dataset = [{"uri": audio_file, "audio": audio_file, "annotation": ground_truth}]

# Configuração dos limites
n_iter = 10  # Número máximo de iterações
patience = 3  # Número de iterações sem melhora

# Extrair valores específicos de clustering e segmentation
initial_clustering_threshold = initial_clustering.get('threshold', 0.8)  # Valor padrão caso não exista
initial_segmentation_min_duration_off = initial_segmentation.get('min_duration_off', 0.01)

# Atribuir valores específicos ao warm_start_params
warm_start_params = {
    "segmentation": {"min_duration_off": initial_segmentation_min_duration_off},
    "clustering": {"threshold": initial_clustering_threshold}
}

# ---- Otimização de segmentation.min_duration_off ----
best_der = float("inf")
no_improvement_count = 0
best_segmentation_threshold = None

print(f"[{current_time()}] Parâmetros de WarmUp: {warm_start_params}")

iterations_seg = optimizer.tune_iter(dataset, warm_start=warm_start_params, show_progress=True)
for i, iteration in enumerate(iterations_seg):
    current_threshold = iteration['params'].get('segmentation', {}).get('min_duration_off')
    if current_threshold is not None:
        pipeline.instantiate({"segmentation": {"min_duration_off": current_threshold}})
        result = pipeline({"uri": audio_file, "audio": audio_file})
        current_der = metric(ground_truth, result)
        
        print(f"[{current_time()}] Iteração {i}: DER = {100 * abs(current_der):.2f}%, min_duration_off = {current_threshold}")

        if current_der < best_der:
            best_der = current_der
            best_segmentation_threshold = current_threshold
            no_improvement_count = 0
        else:
            no_improvement_count += 1

        if no_improvement_count >= patience:
            print(f"[{current_time()}] Nenhuma melhora em {patience} iterações. Parando otimização de segmentação.")
            break

    if i >= n_iter:
        print(f"[{current_time()}] Número máximo de {n_iter} iterações atingido.")
        break

# Atualizar pipeline com segmentation.min_duration_off otimizado
pipeline.instantiate({"segmentation": {"min_duration_off": best_segmentation_threshold}})

# ---- Otimização de clustering.threshold ----
best_clustering_threshold = None
no_improvement_count = 0

print(f"\n[{current_time()}] [INFO] Otimizando Hiperparâmetros de Clusterização...")
iterations_clust = optimizer.tune_iter(dataset, warm_start=warm_start_params, show_progress=True)
for i, iteration in enumerate(iterations_clust):
    current_threshold = iteration['params'].get('clustering', {}).get('threshold')
    if current_threshold is not None:
        pipeline.instantiate({"clustering": {"threshold": current_threshold}})
        result = pipeline({"uri": audio_file, "audio": audio_file})
        current_der = metric(ground_truth, result)
        
        print(f"[{current_time()}] Iteração {i}: DER = {100 * abs(current_der):.2f}%, threshold = {current_threshold}")

        if current_der < best_der:
            best_der = current_der
            best_clustering_threshold = current_threshold
            no_improvement_count = 0
        else:
            no_improvement_count += 1

        if no_improvement_count >= patience:
            print(f"[{current_time()}] Nenhuma melhora em {patience} iterações. Parando otimização de clusterização.")
            break

    if i >= n_iter:
        print(f"[{current_time()}] Número máximo de {n_iter} iterações atingido.")
        break

# Atualizar pipeline com clustering.threshold otimizado
pipeline.instantiate({
    "segmentation": {"min_duration_off": best_segmentation_threshold},
    "clustering": {"threshold": best_clustering_threshold}
})

# ---- Avaliação Final da Diarização ----
print(f"\n[{current_time()}] [INFO] Avaliando DER final após otimização...")
optimized_result = pipeline({"uri": audio_file, "audio": audio_file})
final_der = metric(ground_truth, optimized_result)
print(f"[{current_time()}] DER Final: {100 * abs(final_der):.2f}%")

# ---- Comparação e Salvar Resultados ----
print(f"\n[{current_time()}] [RESULTADOS] Comparação de DER:")
print(f"[{current_time()}] DER Inicial: {100 * abs(initial_der):.2f}%")
print(f"[{current_time()}] DER Final após Otimização: {100 * abs(final_der):.2f}%")

# Salvar o resultado otimizado em RTTM
with open(optimized_rttm, "w") as f:
    for turn, _, speaker in optimized_result.itertracks(yield_label=True):
        f.write(f"SPEAKER {audio_file} 1 {turn.start:.3f} {turn.duration:.3f} <NA> <NA> {speaker} <NA>\n")
print(f"[{current_time()}] RTTM otimizado salvo em: {optimized_rttm}")


In [None]:
# Diferenças entre os arquivos RTTM
def compare_rttm_files(original_rttm, new_rttm):
    """
    Compara dois arquivos RTTM e exibe as diferenças de rótulos de locutores.

    Args:
        original_rttm (str): Caminho para o RTTM original.
        new_rttm (str): Caminho para o RTTM reorganizado.
    """
    with open(original_rttm, "r") as orig, open(new_rttm, "r") as new:
        original_lines = [line.strip().split() for line in orig.readlines()]
        new_lines = [line.strip().split() for line in new.readlines()]

    print("Comparação de Locutores (Original vs Novo):\n")
    for i, (orig_line, new_line) in enumerate(zip(original_lines, new_lines)):
        orig_speaker = orig_line[7]
        new_speaker = new_line[7]
        if orig_speaker != new_speaker:
            print(f"Linha {i+1}: {orig_speaker} -> {new_speaker}")
    print("\nComparação concluída.")

# Caminhos dos arquivos RTTM
original_rttm = ""
new_rttm = ""

# Comparar os arquivos
compare_rttm_files(original_rttm, new_rttm)


## Céula 3.3 - Diarização sob Legendas

In [None]:
import os
import json
import torch
from datetime import datetime
from pyannote.audio import Pipeline

import warnings
warnings.filterwarnings("ignore")

#configurar print_now
def print_now(message):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

# Configurações do episódio
# Caminhos dos arquivos
base_path = os.path.join(episode_path, episode_name, "audio_file", "mdx_extra_q", episode_name)
# Lengenda Original
subtitle_path = os.path.join(episode_path, episode_name, "audio_file",  f"{episode_name}.srt")
# Áudio de Vocais
vocals_path = os.path.join(base_path, "vocals.wav")
# Saída da diarização em RTTM
rttm_output_path = os.path.join(base_path, "diarization_output.rttm")

# Pipeline de diarização
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pipeline.to(device)

# Executa a diarização no áudio de vocais
print_now("Iniciando a diarização...")
diarization = pipeline(vocals_path)

# Salvar saída em formato RTTM
with open(rttm_output_path, "w") as rttm_file:
    diarization.write_rttm(rttm_file)
print_now(f"Diarização salva em: {rttm_output_path}")

# Carregar legendas traduzidas
#subtitle_path = os.path.join(base_path, "legendado.srt")
with open(subtitle_path, "r", encoding="utf-8") as srt_file:
    subtitles = srt_file.read()

# Processamento das legendas para JSON
import re
subtitle_pattern = re.compile(r"(\d+)\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n(.+?)\n\n", re.DOTALL)
matches = subtitle_pattern.findall(subtitles)

subtitle_data = []
for match in matches:
    subtitle_data.append({
        "id": int(match[0]),
        "start_time": match[1],
        "end_time": match[2],
        "text": re.sub(r"<.*?>", "", match[3]).strip()
    })

subtitle_json_path = os.path.join(base_path, "subtitles.json")
with open(subtitle_json_path, "w", encoding="utf-8") as json_file:
    json.dump(subtitle_data, json_file, indent=4, ensure_ascii=False)
print_now(f"Subtítulos processados e salvos em: {subtitle_json_path}")

# Alinhamento automático entre RTTM e legendas
from pyannote.core import Segment
from datetime import timedelta

def time_to_seconds(timestamp):
    h, m, s = map(float, timestamp.replace(",", ".").split(":"))
    return h * 3600 + m * 60 + s

aligned_data = []
for turn, _, speaker in diarization.itertracks(yield_label=True):
    turn_segment = turn
    for subtitle in subtitle_data:
        start_time = time_to_seconds(subtitle["start_time"])
        end_time = time_to_seconds(subtitle["end_time"])
        subtitle_segment = Segment(start=start_time, end=end_time)
        if turn_segment.intersects(subtitle_segment):
            aligned_data.append({
                "speaker": speaker,
                "start": turn_segment.start,
                "end": turn_segment.end,
                "transcription": subtitle["text"],
                "duration": turn_segment.end - turn_segment.start
            })

# Salvar alinhamento em JSON
aligned_json_path = os.path.join(base_path, "aligned_transcriptions.json")
with open(aligned_json_path, "w", encoding="utf-8") as json_file:
    json.dump(aligned_data, json_file, indent=4, ensure_ascii=False)

print_now(f"Alinhamento concluído e salvo em: {aligned_json_path}")


## Celula 3.4 - Alinhamento de Legendas e Locutores(seguimentos originais)


In [None]:
# Função para validar tempos e duplicatas e salvar JSON final
def validate_and_save_aligned_transcriptions(json_path, output_json_path):
    with open(json_path, 'r', encoding='utf-8') as json_file:
        data = json.load(json_file)

    seen = set()
    valid_segments = []
    duplicate_segments = []

    for segment in data:
        key = (segment['start'], segment['end'])
        if key in seen:
            duplicate_segments.append(segment)
        else:
            seen.add(key)
            valid_segments.append(segment)

    # Salvar os segmentos válidos em um novo JSON
    with open(output_json_path, 'w', encoding='utf-8') as output_file:
        json.dump(valid_segments, output_file, ensure_ascii=False, indent=4)

    return valid_segments, duplicate_segments

# Caminho do novo JSON final
aligned_json_final_path = os.path.join(base_path, "aligned_transcriptions_final.json")

# Validar e salvar o JSON final
valid_segments, duplicate_segments = validate_and_save_aligned_transcriptions(
    aligned_json_path, aligned_json_final_path
)

# Exibir resumo
print(f"Segmentos válidos: {len(valid_segments)}")
print(f"Segmentos duplicados: {len(duplicate_segments)}")
print(f"Arquivo final salvo em: {aligned_json_final_path}")

# Exibir duplicatas para análise manual (opcional)
if duplicate_segments:
    print("\nSegmentos duplicados encontrados:")
    for seg in duplicate_segments:
        print(seg)


In [None]:
# Celula 3.4 - Alinhamento de Legendas e Locutores(seguimentos originais)
import os
import re
import json
import subprocess
from datetime import datetime

# Função para limpar formatações HTML e extrair apenas texto
def clean_text(text):
    return re.sub(r"<[^>]*>", "", text).strip()

#configurar print_now
def print_now(message):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

# Função para converter tempo de "hh:mm:ss,ms" para segundos com formato .3f
def convert_time_to_seconds(time_str):
    time_parts = re.split(r'[:,]', time_str)
    if len(time_parts) != 4:
        raise ValueError(f"Formato de tempo inesperado: {time_str}")
    hours, minutes, seconds, milliseconds = map(float, time_parts)
    return round(hours * 3600 + minutes * 60 + seconds + milliseconds / 1000, 3)

# Função para processar legendas em formato SRT
def process_srt_to_json(srt_file_path):
    with open(srt_file_path, 'r', encoding='utf-8') as srt_file:
        lines = srt_file.readlines()

    subtitles = []
    subtitle = {}

    for line in lines:
        line = line.strip()
        if re.match(r"^\d+$", line):
            if subtitle:
                subtitles.append(subtitle)
                subtitle = {}
            subtitle['id'] = int(line)
        elif re.match(r"^\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}$", line):
            start, end = line.split(' --> ')
            subtitle['start_time'] = start
            subtitle['end_time'] = end
        elif line:
            subtitle['text'] = clean_text(line) if 'text' not in subtitle else f"{subtitle['text']} {clean_text(line)}"

    if subtitle:
        subtitles.append(subtitle)

    return subtitles

# Função para verificar sobreposição excessiva entre locutores e legendas
def check_overlap(diarization_segments, subtitles):
    validated_segments = []
    for subtitle in subtitles:
        subtitle_start = convert_time_to_seconds(subtitle['start_time'])
        subtitle_end = convert_time_to_seconds(subtitle['end_time'])
        overlap_found = False

        for segment in diarization_segments:
            segment_start = segment['start']
            segment_end = segment['end']

            if not (segment_end < subtitle_start or segment_start > subtitle_end):
                overlap_found = True
                validated_segments.append({
                    "speaker": segment['speaker'],
                    "start": max(segment_start, subtitle_start),
                    "end": min(segment_end, subtitle_end),
                    "text": subtitle['text']
                })

        if not overlap_found:
            print(f"Legenda sem correspondência: {subtitle}")

    return validated_segments

# Função para combinar fragmentos e ajustar os tempos
def adjust_fragments(validated_segments):
    combined_segments = []
    previous_segment = None

    for segment in validated_segments:
        if previous_segment and previous_segment['speaker'] == segment['speaker'] and \
                abs(segment['start'] - previous_segment['end']) < 0.3:  # Ajuste para combinar fragmentos próximos
            previous_segment['end'] = segment['end']
            previous_segment['text'] += f" {segment['text']}"
        else:
            if previous_segment:
                combined_segments.append(previous_segment)
            previous_segment = segment

    if previous_segment:
        combined_segments.append(previous_segment)

    return combined_segments

# Função para segmentar áudio usando FFmpeg e atualizar o JSON
def segment_audio(audio_file_path, aligned_transcriptions, output_dir, resume=False):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    updated_segments = []

    for idx, segment in enumerate(aligned_transcriptions, 1):
        output_path = os.path.join(output_dir, f"{segment['speaker']}_{segment['start']:.3f}_{segment['end']:.3f}.wav")

        # Verifica se o arquivo já foi criado e pula, se necessário
        if resume and os.path.exists(output_path):
            print_now(f"[Ignorado] Segmento já existe: {output_path}")
            segment['audio_path'] = output_path
            segment['duration'] = round(segment['end'] - segment['start'], 3)
            updated_segments.append(segment)
            continue

        ffmpeg_command = [
            "ffmpeg",
            "-i", audio_file_path,
            "-ss", str(segment['start']),
            "-to", str(segment['end']),
            "-c", "copy",
            output_path
        ]
        try:
            print_now(f"[Iniciando] Segmento {idx}/{len(aligned_transcriptions)}")
            print(f"[Comando FFmpeg]: {' '.join(ffmpeg_command)}")
            subprocess.run(ffmpeg_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
            print_now(f"[Concluído] Segmento criado: {output_path}")
            segment['audio_path'] = output_path
            segment['duration'] = round(segment['end'] - segment['start'], 3)
            updated_segments.append(segment)
        except subprocess.CalledProcessError as e:
            print_now(f"[Erro FFmpeg] Segmento {idx} falhou com erro: {e}")
        except Exception as e:
            print_now(f"[Erro] Falha inesperada no segmento {idx}: {e}")

    return updated_segments

# Caminhos dos arquivos
episode_path = "E:/Animes_Raw/[Erai-raws] 3-gatsu no Lion - 01 ~ 22 [1080p][Multiple Subtitle]"
episode_name = "[Erai-raws] 3-gatsu no Lion - 01 [1080p][Multiple Subtitle][C4EC59D5]"
base_path = os.path.join(episode_path, episode_name, "audio_file", "mdx_extra_q", episode_name)

# Transcrições alinhadas filtradas
aligned_json_filtered_path = os.path.join(base_path, "aligned_transcriptions_final.json")


# Lengenda Original
srt_file_path = os.path.join(episode_path, episode_name, "audio_file", f"{episode_name}.srt")
# Diairização
diary_file_path = os.path.join(base_path, "diarization_output.rttm")
# Transcrições alinhadas
aligned_json_path = os.path.join(base_path, "aligned_transcriptions_ready.json")
# Áudio original de vocais
input_audio_path = os.path.join(base_path, "vocals.wav")
# Diretório para salvar segmentos de áudio
output_audio_dir = os.path.join(episode_path, episode_name, "audio_file", "original_segments")
print_now(f"\nSegmentos originais salvos em: {output_audio_dir}")

# Carregar legendas
subtitles = process_srt_to_json(srt_file_path)

# Carregar segmentos de diarização
with open(diary_file_path, 'r', encoding='utf-8') as diary_file:
    diarization_segments = [
        {
            "speaker": line.split()[7],
            "start": round(float(line.split()[3]), 3),
            "end": round(float(line.split()[3]) + float(line.split()[4]), 3)
        }
        for line in diary_file.readlines() if line.strip()
    ]

# Validar alinhamento
validated_segments = check_overlap(diarization_segments, subtitles)

# Ajustar fragmentos
final_segments = adjust_fragments(validated_segments)

# Segmentar áudio e atualizar os segmentos com caminhos de áudio
updated_segments = segment_audio(input_audio_path, final_segments, output_audio_dir, resume=True)

# Salvar resultados em JSON
with open(aligned_json_path, 'w', encoding='utf-8') as aligned_json:
    json.dump(updated_segments, aligned_json, ensure_ascii=False, indent=4)

print_now(f"\nAlinhamento concluído. Resultados salvos em {aligned_json_path}")


# Celula 4 - Extração de Transcrições e Traduções: whisper

## Celula 4.1 - Extração de Transcrições e Traduções

In [None]:
#  Extração de Transcrições e Traduções
import os
import json
import torch
import librosa
from tqdm import tqdm
from datetime import datetime
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import ffmpeg

import warnings
warnings.filterwarnings("ignore")

#configurar print_now
def print_now(message):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

# Carregar o modelo Whisper do Hugging Face

def load_whisper_model_hf(model_name="openai/whisper-large-v3", device=None):
    """
    Carrega o modelo Whisper do Hugging Face com o processador e tokenizer associados.

    Args:
        model_name (str): Nome do modelo do Hugging Face.
        device (str): Dispositivo onde o modelo será carregado ("cpu" ou "cuda").

    Returns:
        tuple: Processador, modelo e dispositivo configurados.
    """
    try:
        if not device:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        print_now(f"Carregando o modelo Whisper ({model_name}) no dispositivo: {device}...")

        processor = WhisperProcessor.from_pretrained(model_name)
        model = WhisperForConditionalGeneration.from_pretrained(model_name).to(device)

        print_now("Modelo carregado com sucesso!")
        return processor, model, device
    except Exception as e:
        print_now(f"Erro ao carregar o modelo Whisper: {e}")
        return None, None, None

# Função para analisar arquivo RTTM

def parse_rttm_file(rttm_path):
    """
    Analisa um arquivo RTTM e retorna os segmentos em um formato estruturado.

    Args:
        rttm_path (str): Caminho para o arquivo RTTM.

    Returns:
        list: Lista de segmentos com informações de início, fim e locutor.
    """
    segments = []
    try:
        with open(rttm_path, "r") as file:
            for line in file:
                parts = line.strip().split()
                if len(parts) < 8:
                    continue

                start_time = float(parts[3])
                duration = float(parts[4])
                end_time = start_time + duration
                speaker = parts[7]

                segments.append({
                    "start": start_time,
                    "end": end_time,
                    "speaker": speaker
                })

        print_now(f"Arquivo RTTM processado. {len(segments)} segmentos carregados.")
        return segments
    except Exception as e:
        print_now(f"Erro ao analisar o arquivo RTTM: {e}")
        return []

# Transcrever e traduzir segmentos com Hugging Face

def transcribe_and_translate_hf(processor, model, device, audio_path, segments, target_language="pt", save_temp=False, temp_dir="data/original_segments", beam_size=5):
    """
    Realiza transcrição e tradução utilizando o modelo Whisper do Hugging Face.

    Args:
        processor (WhisperProcessor): Processador do modelo Whisper.
        model (WhisperForConditionalGeneration): Modelo Whisper carregado.
        device (str): Dispositivo de execução ("cpu" ou "cuda").
        audio_path (str): Caminho para o arquivo de áudio.
        segments (list): Lista de segmentos com intervalos de tempo e IDs de locutores.
        target_language (str): Idioma de destino para a tradução.
        save_temp (bool): Se True, salva os arquivos de segmento temporários.
        temp_dir (str): Diretório onde os segmentos temporários serão salvos.
        beam_size (int): Tamanho do beam search para maior precisão.

    Returns:
        list: Lista de transcrições e traduções para cada segmento.
    """
    results = []
    errored_segments = []

    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    for segment in tqdm(segments, desc="Transcrevendo segmentos"):
        try:
            start = float(segment["start"])
            end = float(segment["end"])
            duration = end - start

            if end <= start:
                raise ValueError(f"Segmento inválido: end ({end}) <= start ({start})")

            speaker = segment["speaker"]
            temp_audio_path = os.path.join(temp_dir, f"{speaker}_{start:.3f}-{duration:.3f}.wav")

            # Extração de segmento com FFmpeg
            ffmpeg_command = (
                f"ffmpeg -y -i \"{audio_path}\" -ss {start} -to {end} -ar 16000 -ac 1 \"{temp_audio_path}\""
            )
            ffmpeg_result = os.system(ffmpeg_command)
            if ffmpeg_result != 0:
                raise RuntimeError(f"FFmpeg falhou ao processar o segmento {start}-{end}.")

            # Preparar entrada para o modelo
            audio_input, _ = librosa.load(temp_audio_path, sr=16000)
            input_features = processor(audio_input, return_tensors="pt").input_features.to(device)

            # Gerar transcrição e tradução
            generate_kwargs = {
                "language": target_language,  # Define o idioma-alvo diretamente
                "num_beams": beam_size,
            }

            attention_mask = torch.ones(input_features.shape, dtype=torch.long, device=device)
            predicted_ids = model.generate(input_features, attention_mask=attention_mask, **generate_kwargs)

            transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
            print_now(f"Transcrição para o segmento {segment}: {transcription}")

            # Adicionar resultados
            results.append({
                "speaker": speaker,
                "start": start,
                "end": end,
                "transcription": transcription
            })

            # Remover arquivo temporário, se configurado
            if not save_temp:
                os.remove(temp_audio_path)

        except Exception as e:
            print_now(f"Erro ao processar o segmento {segment}: {e}")
            errored_segments.append(segment)

    # Salvar resultados e segmentos com erro
    output_path = os.path.join(temp_dir, "transcriptions_results_hf.json")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=4)
    print_now(f"Resultados salvos em: {output_path}")

    if errored_segments:
        error_log_path = os.path.join(temp_dir, "errored_segments_hf.json")
        with open(error_log_path, "w", encoding="utf-8") as f:
            json.dump(errored_segments, f, ensure_ascii=False, indent=4)
        print_now(f"Segmentos com erro salvos em: {error_log_path}")

    return results

# Exemplo de uso
model_name = "openai/whisper-large-v3"
audio_path = ""
temp_dir = ""
rttm_path = ""

# Carregar modelo
processor, model, device = load_whisper_model_hf(model_name)

# Carregar segmentos do RTTM
segments = parse_rttm_file(rttm_path)

# Executar transcrição e tradução
transcriptions = transcribe_and_translate_hf(processor, model, device, audio_path, segments, target_language="pt", save_temp=True, temp_dir=temp_dir, beam_size=5)
print_now("Transcrições e traduções completas!")


## Célula 4.2 - Salvar Transcrições e Traduções Formato Global(json e csv)

In [None]:
# Célula 4.2 - Filtragem de Transcrições Vazias e salvamento dos resultados
# Esta célula salva os resultados em JSON e CSV com os campos adicionais 'duration' e 'audio_data'.

import json
import csv
import os

def filter_empty_transcriptions(results):
    """
    Filtra os resultados para remover transcrições vazias.

    Args:
        results (list): Lista de dicionários com transcrições e traduções.

    Returns:
        list: Lista filtrada sem transcrições vazias.
    """
    filtered_results = [r for r in results if r.get("transcription", "").strip()]
    print(f"Resultados filtrados: {len(filtered_results)} de {len(results)} segmentos válidos.")
    return filtered_results

def save_results_with_duration_and_audio(results, json_path="results.json", csv_path="results.csv", audio_dir="data/audio_file/original_segments"):
    """
    Salva os resultados de transcrição em arquivos JSON e CSV com duração e caminho real dos arquivos de áudio.

    Args:
        results (list): Lista de dicionários com transcrições e traduções.
        json_path (str): Caminho para o arquivo JSON.
        csv_path (str): Caminho para o arquivo CSV.
        audio_dir (str): Diretório onde os arquivos de áudio estão armazenados.
    """
    for r in results:
        # Calcula a duração e define o caminho real do arquivo de áudio
        r["duration"] = round(r["end"] - r["start"], 4)
        r["audio_data"] = os.path.join(audio_dir, f"{r['speaker']}_{r['start']:.3f}-{r['duration']:.3f}.wav").replace("\\", "/")

    # Salvar em JSON
    with open(json_path, "w", encoding="utf-8") as json_file:
        json.dump(results, json_file, ensure_ascii=False, indent=4)
    print(f"Resultados salvos em JSON: {json_path}")

    # Salvar em CSV
    with open(csv_path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=["speaker", "start", "end", "duration", "transcription", "audio_data"])
        writer.writeheader()
        for r in results:
            writer.writerow(r)
    print(f"Resultados salvos em CSV: {csv_path}")

# Caminhos de saída
json_path = ""
csv_path = ""
audio_dir = ""

# Carregar os resultados do arquivo JSON
transcriptions_result = ""
with open(transcriptions_result, "r") as file:
    transcriptions = json.load(file)

# Filtragem e salvamento dos resultados
results = filter_empty_transcriptions(transcriptions)
save_results_with_duration_and_audio(results, json_path=json_path, csv_path=csv_path, audio_dir=audio_dir)


In [None]:
# Verificação de Arquivos de Áudio
# import os
import json

def verify_audio_files(json_path, audio_dir):
    """
    Verifica se todos os arquivos de áudio listados no JSON estão presentes no diretório especificado.

    Args:
        json_path (str): Caminho para o arquivo JSON com os dados das transcrições.
        audio_dir (str): Diretório onde os arquivos de áudio devem estar.

    Returns:
        None
    """
    # Carregar transcrições do JSON
    with open(json_path, "r", encoding="utf-8") as f:
        transcriptions = json.load(f)
    
    # Verificar a existência dos arquivos
    missing_files = []
    for transcription in transcriptions:
        audio_path = transcription.get("audio_path", "")
        full_audio_path = os.path.join(audio_dir, os.path.basename(audio_path))
        if not os.path.exists(full_audio_path):
            missing_files.append(full_audio_path)

    # Resultado
    if missing_files:
        print(f"Arquivos ausentes ({len(missing_files)}):")
        for missing in missing_files:
            print(missing)
    else:
        print("Todos os arquivos de áudio estão presentes.")

# Exemplo de uso
json_path = ""
audio_dir = ""
verify_audio_files(json_path, audio_dir)


# Célula 5 - Geração de Áudio Personalizados por Locutor: TTS

## Célula 5.1 - Compilação de Trilhas Originais por Locutor Concatenadas


In [None]:
# Célula 5.1 - Compilação de Trilhas Originais por Locutor
import os
from pydub import AudioSegment
import json
import glob

def compile_audio_by_speaker(segments, input_dir, output_dir):
    """
    Compila trilhas de áudio originais separadas por locutor.

    Args:
        segments (list): Lista de segmentos com informações de locutores e tempos.
        input_dir (str): Diretório contendo os arquivos de áudio segmentados.
        output_dir (str): Diretório onde os arquivos compilados serão salvos.

    Returns:
        None
    """
    os.makedirs(output_dir, exist_ok=True)
    speaker_audio = {}

    # Listar todos os arquivos reais no diretório para evitar erros de correspondência
    existing_files = glob.glob(os.path.join(input_dir, "*.wav"))
    existing_files = {os.path.basename(f): f for f in existing_files}

    # Iterar pelos segmentos e agrupar por locutor
    for segment in segments:
        speaker = segment["speaker"]
        audio_path = segment["audio_path"]  # Usar o caminho real do arquivo

        # Verificar se o arquivo existe
        if not os.path.exists(audio_path):
            print(f"[AVISO] Arquivo de áudio não encontrado: {audio_path}")
            continue

        audio_segment = AudioSegment.from_wav(audio_path)

        # Agrupar os segmentos por locutor
        if speaker not in speaker_audio:
            speaker_audio[speaker] = audio_segment
        else:
            speaker_audio[speaker] += audio_segment


    # Exportar áudios compilados
    for speaker, audio in speaker_audio.items():
        output_path = os.path.join(output_dir, f"{speaker}_compiled.wav")
        audio.export(output_path, format="wav")
        print(f"Trilha compilada salva para {speaker}: {output_path}")

# Caminhos de entrada e saída
audio_files_path = os.path.join(episode_path, episode_name, "audio_file")

INPUT_DIR = f"{audio_files_path}/original_segments"
OUTPUT_DIR = f"{audio_files_path}/compiled_speaker_audio"
TRANSCRIPTIONS_PATH = ""

# Carregar transcrições
with open(TRANSCRIPTIONS_PATH, "r", encoding="utf-8") as f:
    segments = json.load(f)

# Compilar áudios por locutor
compile_audio_by_speaker(segments, INPUT_DIR, OUTPUT_DIR)


## Célula 5.2 - Teste de Geração direta com Vits / xtts

In [None]:
# Célula 5.2 - Geração Direta de Áudio com VITS Multspeaker
# Atualização para lidar com o aviso do torch.load
import functools
import warnings
import os
import json
import torch
from TTS.api import TTS

# Função para gerar áudio diretamente com VITS Multspeaker
from pydub import AudioSegment

# Ocultar Warnings
import warnings
warnings.filterwarnings("ignore")

#configurar print_now
def print_now(message):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

def generate_audio_directly_with_vits(segments, tts_model, output_dir, compiled_audio_dir, language="pt", speed=1.0):
    """
    Gera áudio traduzido diretamente com IDs de locutores e transcrições usando VITS,
    ou insere arquivos de silêncio para transcrições ausentes ou inválidas.

    Args:
        segments (list): Lista de segmentos com transcrições e IDs de locutores.
        tts_model (TTS): Instância do modelo VITS carregado.
        output_dir (str): Diretório para salvar os arquivos de áudio gerados.
        audio_dir (str): Diretório contendo os arquivos de áudio de entrada por segmento.
        compiled_audio_dir (str): Diretório contendo os arquivos compilados de cada locutor.
        language (str): Idioma alvo para a geração de áudio.

    Returns:
        list: Lista de caminhos dos arquivos de áudio gerados.
    """
    os.makedirs(output_dir, exist_ok=True)
    MIN_DURATION = 0.3  # duração mínima em segundos
    generated_files = []

    for segment in segments:
        speaker = segment["speaker"]
        start = segment["start"]
        duration = segment["duration"]
        #transcription = segment.get("transcription", "").strip()
        transcription = segment["text"]

        # Definir caminho do áudio compilado do locutor
        #speaker_wav = os.path.join(compiled_audio_dir, f"{speaker}_compiled.wav")
        speaker_wav = segment["audio_path"]

        if not os.path.exists(speaker_wav):
            print_now(f"[ERRO] Arquivo do segmento {speaker_wav} não encontrado. Pulando...")
            continue

        # Caminho para o áudio de saída
        output_path = os.path.join(output_dir, f"{speaker}_{start:.3f}-{duration:.3f}.wav")

        if os.path.exists(output_path):
            print_now(f"[INFO] Arquivo já gerado: {output_path}. Pulando.")
            continue

        try:
            # Verificar duração mínima
            if duration < MIN_DURATION:
                print_now(f"[INFO] Segmento muito curto para {speaker} ({start:.3f}-{duration:.3f}). Gerando silêncio...")
                duration_ms = duration * 1000
                silence = AudioSegment.silent(duration=duration_ms)
                silence.export(output_path, format="wav")
                generated_files.append(output_path)
                continue

            # Validar transcrição
            if transcription:
                print_now(f"Gerando áudio para locutor {speaker}, segmento {start:.3f}-{duration:.3f}...")
                tts_model.tts_to_file(
                    text=transcription,
                    speaker_wav=speaker_wav,
                    language=language,
                    file_path=output_path,
                    speed=speed,
                    split_sentences=False, # Se False, consome mais memória
                )
                print_now(f"Áudio gerado com sucesso para {speaker}, salvo em: {output_path}")
            else:
                print_now(f"[INFO] Transcrição inválida ou ausente para {speaker}, segmento {start:.3f}-{duration:.3f}. Gerando silêncio...")
                duration_ms = duration * 1000
                silence = AudioSegment.silent(duration=duration_ms)
                silence.export(output_path, format="wav")

            generated_files.append(output_path)

        except Exception as e:
            print_now(f"Erro ao processar segmento {speaker} ({start:.3f}-{duration:.3f}): {e}")

    return generated_files

# Caminhos e inicialização
LANGUAGE = "pt"
SPEED = 1.8
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

audio_files_path = os.path.join(episode_path, episode_name, "audio_file")

TRANSCRIPTIONS_PATH = ""
AUDIO_OUTPUT_DIR = ""
COMPILED_AUDIO_DIR = ""

# Carregar transcrições
with open(TRANSCRIPTIONS_PATH, "r", encoding="utf-8") as f:
    segments = json.load(f)

# Inicializar o modelo VITS
MODEL_NAME = "tts_models/multilingual/multi-dataset/xtts_v2"
print_now("Carregando modelo VITS...")
tts = TTS(model_name=MODEL_NAME).to(DEVICE)
print_now(f"Modelo carregado com sucesso no dispositivo: {DEVICE}")

# Gerar áudios diretamente com VITS
print_now("Iniciando a geração direta de áudio com VITS...")
generated_files = generate_audio_directly_with_vits(segments, tts, AUDIO_OUTPUT_DIR, COMPILED_AUDIO_DIR, LANGUAGE, SPEED)

print_now("Processo de geração concluído!")
print_now(f"Total de arquivos gerados: {len(generated_files)}")

# Célula 6 - Mixagem de Aúdio e Vídeo

In [None]:
# Plotagem da Linha do Tempo
import json
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.ticker import FuncFormatter

# Caminho do arquivo JSON
aligned_json_path = ""

# Carregar os dados do JSON
with open(aligned_json_path, 'r', encoding='utf-8') as json_file:
    data = json.load(json_file)

# Configurar a linha do tempo com tamanho ajustável
def plot_timeline(figsize=(12, 6)):
    fig, ax = plt.subplots(figsize=figsize)

    # Criar o gráfico de segmentos
    speakers = set(segment['speaker'] for segment in data)
    colors = plt.cm.tab10.colors  # Paleta de cores
    speaker_colors = {speaker: colors[i % len(colors)] for i, speaker in enumerate(speakers)}

    for segment in data:
        start = segment['start']
        end = segment['end']
        duration = segment['duration']
        speaker = segment['speaker']
        ax.broken_barh([(start, duration)], (int(speaker.split('_')[1]) * 10, 9), color=speaker_colors[speaker], label=speaker)

    # Configurar eixos
    ax.set_xlabel("Tempo (MM:SS)")
    ax.set_title("Linha do Tempo dos Segmentos de Áudio")
    ax.grid(True, which='both', linestyle='--', linewidth=0.5)

    # Remover legenda do eixo Y
    ax.set_yticks([])
    ax.set_yticklabels([])

    # Formatar o eixo X para MM:SS
    def seconds_to_mmss(x, pos):
        minutes = int(x // 60)
        seconds = int(x % 60)
        return f"{minutes:02}:{seconds:02}"

    ax.xaxis.set_major_formatter(FuncFormatter(seconds_to_mmss))

    # Legenda
    handles = [mpatches.Patch(color=color, label=speaker) for speaker, color in speaker_colors.items()]
    ax.legend(handles=handles, bbox_to_anchor=(1.05, 1), loc='upper left', title="Locutores")

    # Mostrar gráfico
    plt.tight_layout()
    plt.show()

# Chamada para plotar com tamanho ajustável
plot_timeline(figsize=(16, 8))


## Célula 6.1 - Criação da Trilha Final de Áudio Dublado

In [None]:
# Célula 6.1 - Criação da Trilha Final de Áudio Dublado

import os
from pydub import AudioSegment

def calculate_total_duration(audio_path):
    """
    Calcula a duração total do áudio original.

    Args:
        audio_path (str): Caminho para o arquivo de áudio original.

    Returns:
        float: Duração total do áudio em segundos.
    """
    audio = AudioSegment.from_wav(audio_path)
    return len(audio) / 1000.0  # Retorna em segundos

def create_final_audio_track(segment_dir, output_path, audio_path):
    """
    Cria uma trilha de áudio alinhada com o tempo total da trilha original, 
    inserindo os segmentos dublados nos tempos corretos.

    Args:
        segment_dir (str): Diretório contendo os segmentos dublados.
        output_path (str): Caminho para salvar a trilha final.
        audio_path (str): Caminho para o áudio original.

    Returns:
        str: Caminho da trilha final criada.
    """
    # Calcular a duração total do áudio original
    total_duration = calculate_total_duration(audio_path)

    # Criar uma trilha vazia com a duração total
    final_track = AudioSegment.silent(duration=total_duration * 1000)  # Convertendo para milissegundos

    # Iterar pelos arquivos de segmentos dublados
    for segment_file in os.listdir(segment_dir):
        if segment_file.endswith(".wav"):
            # Extrair informações do nome do arquivo
            try:
                speaker, start_duration = segment_file.rsplit("_", 1)
                start, duration = map(float, start_duration.replace(".wav", "").split("-"))
            except ValueError:
                print(f"[ERRO] Nome de arquivo inválido: {segment_file}. Pulando...")
                continue

            # Carregar o segmento de áudio
            segment_path = os.path.join(segment_dir, segment_file)
            segment_audio = AudioSegment.from_wav(segment_path)

            # Calcular a posição inicial na trilha final
            start_time_ms = int(start * 1000)  # Convertendo para milissegundos

            # Inserir o segmento na trilha final
            final_track = final_track.overlay(segment_audio, position=start_time_ms)
            print(f"[INFO] Segmento {segment_file} inserido em {start_time_ms} ms.")

    # Salvar a trilha final
    final_track.export(output_path, format="wav")
    print(f"Trilha final criada e salva em: {output_path}")

    return output_path

# Caminhos e inicialização
LANGUAGE = "pt"
SPEED = 1.8
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


audio_files_path = os.path.join(episode_path, episode_name, "audio_file")

# Caminhos
SEGMENT_DIR = f"{audio_files_path}/generated_audio_condicional_{LANGUAGE}_{SPEED}"
OUTPUT_PATH = f"{audio_files_path}/final_audio_track.wav"
AUDIO_PATH = f"{audio_files_path}/{episode_name}.wav"

# Criar a trilha final
create_final_audio_track(SEGMENT_DIR, OUTPUT_PATH, AUDIO_PATH)


## Célula 6.2 - Mixagem Final das Trilhas de Áudio

In [None]:
# Célula 6.2 - Mixagem Final das Trilhas de Áudio
import os
from pydub import AudioSegment, effects
from moviepy import VideoFileClip, AudioFileClip

def normalize_audio(audio):
    """
    Normaliza o volume de uma faixa de áudio.

    Args:
        audio (AudioSegment): Faixa de áudio a ser normalizada.

    Returns:
        AudioSegment: Faixa de áudio normalizada.
    """
    return effects.normalize(audio)

def mix_audio_tracks(audio_paths, output_path):
    """
    Realiza a mixagem das faixas de áudio especificadas.

    Args:
        audio_paths (list): Lista de caminhos para as faixas de áudio.
        output_path (str): Caminho para salvar a trilha mixada.

    Returns:
        str: Caminho da trilha mixada.
    """
    if not audio_paths:
        raise ValueError("Nenhum caminho de áudio fornecido para mixagem.")

    # Carregar e normalizar as faixas de áudio
    tracks = []
    for path in audio_paths:
        print(f"Carregando e normalizando faixa: {path}")
        track = AudioSegment.from_wav(path)
        tracks.append(normalize_audio(track))

    # Mixar as faixas de áudio
    mixed_audio = tracks[0]
    for track in tracks[1:]:
        mixed_audio = mixed_audio.overlay(track)

    # Salvar a trilha mixada
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    mixed_audio.export(output_path, format="wav")
    print(f"Trilha mixada salva em: {output_path}")

    return output_path

def replace_audio_in_video(video_path, audio_path, output_video_path):
    """
    Substitui o áudio original de um vídeo pela trilha mixada.

    Args:
        video_path (str): Caminho para o vídeo original.
        audio_path (str): Caminho para a trilha de áudio mixada.
        output_video_path (str): Caminho para salvar o vídeo com o áudio substituído.

    Returns:
        str: Caminho do vídeo final.
    """
    os.makedirs(os.path.dirname(output_video_path), exist_ok=True)
    print(f"Substituindo áudio no vídeo: {video_path}")
    video = VideoFileClip(video_path)
    new_audio = AudioFileClip(audio_path)
    video.audio = new_audio
    #video = video.set_audio(new_audio)
    video.write_videofile(output_video_path, codec="libx264", audio_codec="aac")
    print(f"Vídeo final salvo em: {output_video_path}")

    return output_video_path


# Caminhos
BASS_PATH = ".../bass.wav"
DRUMS_PATH = ".../drums.wav"
OTHER_PATH = ".../other.wav"
DUBBED_PATH = ""
FINAL_AUDIO_PATH = ""
VIDEO_PATH = ""
FINAL_VIDEO_PATH = ""

# Mixar as faixas de áudio
AUDIO_PATHS = [BASS_PATH, DRUMS_PATH, OTHER_PATH, DUBBED_PATH]
mix_audio_tracks(AUDIO_PATHS, FINAL_AUDIO_PATH)

# Substituir o áudio original do vídeo
replace_audio_in_video(VIDEO_PATH, FINAL_AUDIO_PATH, FINAL_VIDEO_PATH)


## Célula 6.3 - Adicionar Legendas ao Vídeo com MoviePy

In [None]:
# Célula 6.1.5 - Adicionar Legendas ao Vídeo com MoviePy
# Esta célula adiciona legendas de um arquivo .srt ao vídeo original.

# Instalando dependências necessárias
# pip install moviepy pysrt

from moviepy import VideoFileClip
import subprocess

def add_subtitles_to_video(video_path, subtitles_path, output_path):
    """
    Adiciona legendas ao vídeo usando um arquivo .srt.

    Args:
        video_path (str): Caminho para o arquivo de vídeo original.
        subtitles_path (str): Caminho para o arquivo de legendas .srt.
        output_path (str): Caminho para o vídeo de saída com legendas.

    Returns:
        None
    """
    try:
        print("Verificando dependência do FFmpeg...")
        ffmpeg_installed = subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if ffmpeg_installed.returncode != 0:
            raise EnvironmentError("FFmpeg não está instalado ou não está disponível no PATH.")

        print("Iniciando adição de legendas...")
        command = (
            f"ffmpeg -i {video_path} -vf subtitles={subtitles_path} -c:v libx264 -c:a copy {output_path}"
        )
        subprocess.run(command, shell=True, check=True)

        print(f"Vídeo com legendas gerado em: {output_path}")

    except Exception as e:
        print(f"Erro ao adicionar legendas ao vídeo: {e}")

# Exemplo de uso:
add_subtitles_to_video(
    video_path="",
    subtitles_path="",
    output_path=""
)


# Célula 7- Fine-Tuning com Xtts_V2 e GPT

In [None]:
# Criar Metadata CSV
import csv
import json

# Criar metadata.csv
def create_and_save_metadata_csv(json_path, csv_path):
    with open(json_path, "r", encoding="utf-8") as json_file:
        data = json.load(json_file)
    with open(csv_path, "w", encoding="utf-8", newline="") as csv_file:
        writer = csv.writer(csv_file, delimiter="|")
        for entry in data:
            writer.writerow([
                entry["audio_data"],  # Caminho do arquivo de áudio
                entry["transcription"],  # Texto da transcrição
                entry.get("speaker", "custom_speaker")  # Nome do locutor
            ])
    print(f"Arquivo {csv_path} criado com sucesso!")

TRANSCRIPTIONS_PATH = ""
METADATA_CSV_PATH = ""

create_and_save_metadata_csv(TRANSCRIPTIONS_PATH, METADATA_CSV_PATH)


## Célula 7.1 - Treino

In [None]:
# Verificar arquivos de áudio
def verify_audio_files(manifest_file, dataset_path):
    issues = []
    print("Verificando arquivos de áudio...")
    with open(manifest_file, "r", encoding="utf-8") as file:
        for line in file:
            cols = line.strip().split("|")
            audio_file = os.path.join(cols[0])  # Caminho do arquivo de áudio
            if not os.path.isfile(audio_file):
                issues.append(f"Arquivo não encontrado: {audio_file}")
                continue
            try:
                # Carregar o áudio
                waveform, sample_rate = torchaudio.load(audio_file)
                if sample_rate != 22050:
                    issues.append(f"Taxa de amostragem inesperada em {audio_file}: {sample_rate} Hz")
                if waveform.abs().max() > 1.0:
                    issues.append(f"Amplitude fora do intervalo esperado [-1, 1] em {audio_file}")
            except Exception as e:
                issues.append(f"Erro ao carregar {audio_file}: {e}")
    if not issues:
        print("Todos os arquivos estão no formato esperado!")
    else:
        print("Problemas encontrados:")
        for issue in issues:
            print(f" - {issue}")

# Exemplo de uso para verificar dados
manifest_file = ".../metadata.csv"  # Caminho para o arquivo de metadados
dataset_path = ""  # Caminho para os arquivos de áudio
verify_audio_files(manifest_file, dataset_path)

In [None]:
# Célula 7.1 - Configuração e Treinamento do Modelo VITS
# Train TRAIN_GPT_XTTS.py
import os
import torch
from datetime import datetime
from trainer import Trainer, TrainerArgs

from TTS.config.shared_configs import BaseDatasetConfig
from TTS.tts.datasets import load_tts_samples
from TTS.tts.layers.xtts.trainer.gpt_trainer import GPTArgs, GPTTrainer, GPTTrainerConfig, XttsAudioConfig
from TTS.utils.manage import ModelManager

import warnings
warnings.filterwarnings("ignore")

# Configurar dispositivo
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

#configurar print_now
def print_now(message):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

# Logging parameters
RUN_NAME = "GPT_XTTS_v2.0_Custom"
PROJECT_NAME = "XTTS_trainer"
RUN_DESCRIPTION = "Fine_tuning."
DASHBOARD_LOGGER = "tensorboard"
LOGGER_URI = None

# Formatter customizado
def custom_formatter(root_path, manifest_file, **kwargs):
    """Formatter customizado para o nosso metadata.csv."""
    txt_file = os.path.join(root_path, manifest_file)
    items = []
    with open(txt_file, "r", encoding="utf-8") as ttf:
        for line in ttf:
            cols = line.strip().split("|")
            wav_file = os.path.normpath(cols[0])  # Usa o caminho direto do arquivo
            if not os.path.isfile(wav_file):
                raise FileNotFoundError(f"Arquivo de áudio não encontrado: {wav_file}")
            text = cols[1]
            speaker_name = cols[2]
            items.append({
                "text": text,
                "audio_file": wav_file,
                "speaker_name": speaker_name,
                "root_path": root_path
            })
    return items

# Set here the path that the checkpoints will be saved. Default: ./run/training/
OUT_PATH = os.path.join("model_new", "run", "training")
DATASET_PATH = "data/audio_file/original_segments"
METADATA_PATH = os.path.join("metadata.csv")

# Training Parameters
EPOCHS = 20  # set here the number of epochs
OPTIMIZER_WD_ONLY_ON_WEIGHTS = True  # for multi-gpu training please make it False
START_WITH_EVAL = True  # if True it will star with evaluation
BATCH_SIZE = 3  # set here the batch size
GRAD_ACUMM_STEPS = 84  # set here the grad accumulation steps
# Note: we recommend that BATCH_SIZE * GRAD_ACUMM_STEPS need to be at least 252 for more efficient training. You can increase/decrease BATCH_SIZE but then set GRAD_ACUMM_STEPS accordingly.

# Define here the dataset that you want to use for the fine-tuning on.
config_dataset = BaseDatasetConfig(
    formatter="custom_formatter",
    dataset_name="my_dataset",
    path=DATASET_PATH,
    meta_file_train=METADATA_PATH,
    language="en",
)

# Add here the configs of the datasets
DATASETS_CONFIG_LIST = [config_dataset]

# Define the path where XTTS v2.0.1 files will be downloaded
CHECKPOINTS_OUT_PATH = os.path.join(OUT_PATH, "XTTS_v2.0_original_model_files/")
os.makedirs(CHECKPOINTS_OUT_PATH, exist_ok=True)


# DVAE files
DVAE_CHECKPOINT_LINK = "https://coqui.gateway.scarf.sh/hf-coqui/XTTS-v2/main/dvae.pth"
MEL_NORM_LINK = "https://coqui.gateway.scarf.sh/hf-coqui/XTTS-v2/main/mel_stats.pth"

# Set the path to the downloaded files
DVAE_CHECKPOINT = os.path.join(CHECKPOINTS_OUT_PATH, os.path.basename(DVAE_CHECKPOINT_LINK))
MEL_NORM_FILE = os.path.join(CHECKPOINTS_OUT_PATH, os.path.basename(MEL_NORM_LINK))

# download DVAE files if needed
if not os.path.isfile(DVAE_CHECKPOINT) or not os.path.isfile(MEL_NORM_FILE):
    print_now(" > Downloading DVAE files!")
    ModelManager._download_model_files([MEL_NORM_LINK, DVAE_CHECKPOINT_LINK], CHECKPOINTS_OUT_PATH, progress_bar=True)


# Download XTTS v2.0 checkpoint if needed
TOKENIZER_FILE_LINK = "https://coqui.gateway.scarf.sh/hf-coqui/XTTS-v2/main/vocab.json"
XTTS_CHECKPOINT_LINK = "https://coqui.gateway.scarf.sh/hf-coqui/XTTS-v2/main/model.pth"

# XTTS transfer learning parameters: You we need to provide the paths of XTTS model checkpoint that you want to do the fine tuning.
TOKENIZER_FILE = os.path.join(CHECKPOINTS_OUT_PATH, os.path.basename(TOKENIZER_FILE_LINK))  # vocab.json file
XTTS_CHECKPOINT = os.path.join(CHECKPOINTS_OUT_PATH, os.path.basename(XTTS_CHECKPOINT_LINK))  # model.pth file

# download XTTS v2.0 files if needed
if not os.path.isfile(TOKENIZER_FILE) or not os.path.isfile(XTTS_CHECKPOINT):
    print_now(" > Downloading XTTS v2.0 files!")
    ModelManager._download_model_files(
        [TOKENIZER_FILE_LINK, XTTS_CHECKPOINT_LINK], CHECKPOINTS_OUT_PATH, progress_bar=True
    )

LANGUAGE = config_dataset.language

#def main():
# init args and config
print_now("Inicilizando Configurações...")
model_args = GPTArgs(
    max_conditioning_length=132300,  # 6 secs
    min_conditioning_length=66150,  # 3 secs
    debug_loading_failures=False,
    max_wav_length=255995,  # ~11.6 seconds
    max_text_length=200,
    mel_norm_file=MEL_NORM_FILE,
    dvae_checkpoint=DVAE_CHECKPOINT,
    xtts_checkpoint=XTTS_CHECKPOINT,  # checkpoint path of the model that you want to fine-tune
    tokenizer_file=TOKENIZER_FILE,
    gpt_num_audio_tokens=1026,
    gpt_start_audio_token=1024,
    gpt_stop_audio_token=1025,
    gpt_use_masking_gt_prompt_approach=True,
    gpt_use_perceiver_resampler=True,
)
# define audio config
audio_config = XttsAudioConfig(sample_rate=22050, dvae_sample_rate=22050, output_sample_rate=24000)
# training parameters config
config = GPTTrainerConfig(
    output_path=OUT_PATH,
    model_args=model_args,
    run_name=RUN_NAME,
    project_name=PROJECT_NAME,
    run_description=RUN_DESCRIPTION,
    epochs=EPOCHS,
    dashboard_logger=DASHBOARD_LOGGER,
    logger_uri=LOGGER_URI,
    audio=audio_config,
    batch_size=BATCH_SIZE,
    batch_group_size=48,
    eval_batch_size=BATCH_SIZE,
    num_loader_workers=8,
    eval_split_size=0.1,
    eval_split_max_size=256,
    print_step=50,
    plot_step=100,
    log_model_step=1000,
    save_step=10000,
    save_n_checkpoints=1,
    save_checkpoints=True,
    # target_loss="loss",
    print_eval=False,
    # Optimizer values like tortoise, pytorch implementation with modifications to not apply WD to non-weight parameters.
    optimizer="AdamW",
    optimizer_wd_only_on_weights=OPTIMIZER_WD_ONLY_ON_WEIGHTS,
    optimizer_params={"betas": [0.9, 0.96], "eps": 1e-8, "weight_decay": 1e-2},
    lr=5e-06,  # learning rate
    lr_scheduler="MultiStepLR",
    # it was adjusted accordly for the new step scheme
    lr_scheduler_params={"milestones": [50000 * 18, 150000 * 18, 300000 * 18], "gamma": 0.5, "last_epoch": -1},
    mixed_precision=False,
)
print_now("Configurações inicializadas.")

# init the model from config
print_now("Inicializando o Modelo...")
model = GPTTrainer.init_from_config(config)
#model = model.to(DEVICE)
print_now("Modelo inicializado.")

# load training samples
train_samples, eval_samples = load_tts_samples(
    DATASETS_CONFIG_LIST,
    eval_split=True,
    eval_split_max_size=config.eval_split_max_size,
    eval_split_size=config.eval_split_size,
    formatter=custom_formatter,
)
print_now(f"Quantidade de Amostras de Treino: {len(train_samples)}, Quantidade de Amostras de Validação: {len(eval_samples)}")
print_now("Amostras de treino e validação carregadas.")

# init the trainer and 🚀
print_now("Iniciando o treinamento...")
trainer = Trainer(
    TrainerArgs(
        restore_path=None,  # xtts checkpoint is restored via xtts_checkpoint key so no need of restore it using Trainer restore_path parameter
        skip_train_epoch=False,
        start_with_eval=START_WITH_EVAL,
        grad_accum_steps=GRAD_ACUMM_STEPS,
    ),
    config,
    output_path=OUT_PATH,
    model=model,
    train_samples=train_samples,
    eval_samples=eval_samples,
)
trainer.fit()
print_now("Treinamento concluído.")




## Celula 7.2 - Inferência 

In [None]:
# Célula 7.2 - Configuração e Treinamento do Modelo VITS Default Script
# Inferência - TRAIN_GPT_XTTS.py
import os
import torch
from datetime import datetime
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts

# Caminhos para os arquivos necessários
VOCAB_PATH = ".../vocab.json"
CONFIG_PATH = ".../config.json"
BEST_MODEL = ".../best_model.pth"
METADATA_FILE = ".../metadata.csv"
OUTPUT_DIR = ""

os.makedirs(OUTPUT_DIR, exist_ok=True)

# Função para impressão com timestamp
def print_now(message):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

# Carregar o modelo
def load_model(config_path, model_path, vocab_path):
    print_now("Carregando modelo e configuração para inferência...")
    if not os.path.isfile(config_path):
        raise FileNotFoundError(f"Configuração não encontrada: {config_path}")
    if not os.path.isfile(model_path):
        raise FileNotFoundError(f"Modelo não encontrado: {model_path}")
    if not os.path.isfile(vocab_path):
        raise FileNotFoundError(f"Arquivo de vocabulário não encontrado: {vocab_path}")

    config = XttsConfig()
    config.load_json(config_path)
    model = Xtts.init_from_config(config)
    model.load_checkpoint(config, checkpoint_path=model_path, vocab_path=vocab_path, use_deepspeed=False)
    model.cuda()
    print_now("Modelo carregado com sucesso.")
    return model

# Realizar inferência
def run_inference(model, metadata_file, output_dir):
    with open(metadata_file, "r", encoding="utf-8") as meta:
        for line in meta:
            cols = line.strip().split("|")
            audio_file = cols[0]
            text = cols[1]
            speaker_name = cols[2]

            output_file = os.path.join(output_dir, f"{os.path.basename(audio_file).replace('.wav', '_synthesized.wav')}")

            print_now(f"Sintetizando: {text} para locutor: {speaker_name}")

            # Inferência
            gpt_cond_latent, speaker_embedding = model.get_conditioning_latents(audio_path=[audio_file])
            out = model.inference(text, "en", gpt_cond_latent, speaker_embedding, temperature=0.7)

            # Salvar áudio sintetizado
            torchaudio.save(output_file, torch.tensor(out["wav"]).unsqueeze(0), 24000)
            print_now(f"Áudio sintetizado salvo em: {output_file}")

# Inferência principal
def main():
    model = load_model(CONFIG_PATH, BEST_MODEL, VOCAB_PATH)
    run_inference(model, METADATA_FILE, OUTPUT_DIR)
    print_now("Inferência concluída.")

if __name__ == "__main__":
    main()
