## Notebook para el doblaje manual del audio de un vídeo de YouTube
##### NOTA: borrar el directorio correspondiente al audio antes de iniciar
##### NOTA: el servicio waves.styletts debe estar en ejecución

### Importa librerías de Python y Modelo Whisper Large V3 
#### Usa 'cuda' si está disponible; en macOS usa 'mps' como GPU.

In [1]:
import subprocess
import json
import os
import subprocess
import sys
import time
from pathlib import Path
import re
import math
import itertools


import librosa
import numpy as np
import requests
import scipy.signal as signal
import stable_whisper
import torch
import whisperx
from fastapi import APIRouter, Depends, HTTPException, status
from pydub import AudioSegment
from pytube import YouTube
from sqlalchemy.orm import Session
from transformers import (
    WhisperProcessor, WhisperForConditionalGeneration, pipeline
)

from IPython.display import display, Audio, Video

sys.path.insert(0, '..')

from dbms.database import get_db
from modules.combine import combine_chunks
from modules.demucs_impl import demucs_it
from modules.silero_based_chunk import chunk, split_audio


# Parent directory of the current working directory, as a string.
parent = str(Path(os.getcwd()).parent)

# Downloaded media and every derived file are stored under <parent>/audios.
output_path = f"{parent}/audios"

# Pick the best available accelerator: CUDA first, then Apple's MPS backend,
# and finally the CPU so the notebook still loads on machines without a GPU.
# `torch.backends.mps` is looked up defensively because older torch builds
# do not ship it.
_mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = "cuda:0"
elif _mps_backend is not None and _mps_backend.is_available():
    device = "mps"
else:
    device = "cpu"
model_name_hf = "openai/whisper-large-v3"


# Hugging Face speech-recognition pipeline shared by tr_audio_pipe().
pipe = pipeline("automatic-speech-recognition", model=model_name_hf, device=device,)

torchvision is not available - cannot save figures
Using cache found in /Users/beltre.wilton/.cache/torch/hub/snakers4_silero-vad_master
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


## Grupo de funciones encargadas de hacer una operación puntual

In [2]:
def mp4_to_wav(input_mp4, output_wav):
    """Convert a media file to 16-bit PCM WAV by invoking ``ffmpeg``.

    Args:
        input_mp4: Path of the source file passed to ffmpeg's ``-i``.
        output_wav: Destination path; overwritten if it exists (``-y``).

    Returns:
        None. When ``input_mp4`` does not exist an error is printed and the
        function returns without running ffmpeg.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero return code; ffmpeg's
            stderr is included in the message.
    """
    if not os.path.exists(input_mp4):
        print(f"Error: Input file '{input_mp4}' not found.")
        return

    # To convert only a time window, add '-ss', '<start>' and '-to', '<end>'
    # after the input arguments.
    command = [
        'ffmpeg',
        '-y',
        '-i', input_mp4,
        '-acodec', 'pcm_s16le',
        '-hide_banner',
        '-loglevel', 'error',
        output_wav
    ]

    # check=False: the return code is inspected by hand so that ffmpeg's
    # stderr can be attached to the raised error. (The previous
    # `except CalledProcessError` could never fire with check=False.)
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if result.returncode != 0:
        raise RuntimeError(f"Error ffmpeg encontrado durante ejecucion, {result.stderr}")
    print(f"Conversion successful: {input_mp4} -> {output_wav},\n{result.stderr} {result.stdout}")


def stereo_to_mono(data):
    """Convert multi-channel audio to mono by averaging the samples on axis 1.

    The mean is accumulated in float64 and then cast back to the input dtype.
    Accumulating directly in the input dtype (as ``np.mean(..., dtype=data.dtype)``
    does) wraps around for integer PCM such as int16, where the sum of two
    loud samples exceeds the type's range.

    Args:
        data: Array indexed as ``[frame, channel]``.

    Returns:
        1-D array with one averaged sample per frame, same dtype as ``data``.
    """
    return np.mean(data, axis=1, dtype=np.float64).astype(data.dtype)


def resample(data, orig_sr, target_sr):
    """Resample audio from ``orig_sr`` to ``target_sr`` (up or down).

    The output length is ``int(len(data) / (orig_sr / target_sr))``. Input
    with more than one dimension is first reduced to mono with
    ``stereo_to_mono`` and then passed to ``scipy.signal.resample``.
    """
    n_out = int(len(data) / (orig_sr / target_sr))
    mono = stereo_to_mono(data) if data.ndim > 1 else data
    return signal.resample(mono, n_out)

def tr_audio_pipe(sr, audio_reps, resample_=False):
    """Run the module-level Whisper ``pipe`` on raw samples and return its text.

    Args:
        sr: Sample rate of ``audio_reps``; only used when ``resample_`` is true.
        audio_reps: Audio samples handed to the pipeline.
        resample_: When true, resample ``audio_reps`` from ``sr`` to 16 kHz first.

    Returns:
        The ``'text'`` field of the pipeline result. The pipeline is invoked
        with the ``<|es|>`` language token and the ``translate`` task.
    """
    samples = resample(audio_reps, sr, 16_000) if resample_ else audio_reps
    generation_options = {"language": "<|es|>", "task": "translate"}
    result = pipe(
        samples,
        return_timestamps=True,
        chunk_length_s=30,
        stride_length_s=[4, 2],
        batch_size=8,
        generate_kwargs=generation_options,
    )
    return result['text']


def dl_audio(url: str):
    """Download the audio and video streams of a YouTube URL and describe the audio.

    The video id is extracted from the URL and used both as the name of the
    per-video directory under ``output_path`` and as the file stem:
    ``<output_path>/<id>/<id>.mp4`` for the audio-only stream and
    ``<output_path>/<id>/<id>_V.mp4`` for the video stream. Nothing is
    downloaded when the audio file already exists.

    Compared with the previous implementation, the id is parsed with
    ``urllib.parse`` so extra query parameters (``&t=...``, ``?feature=...``)
    no longer leak into the id, and watch URLs no longer drop the first two
    characters of the id from the file name.

    Args:
        url: A ``.../watch?v=<id>``, ``.../shorts/<id>`` or ``.../<id>`` URL.

    Returns:
        Tuple ``(static_url, pathfile, duration_seconds, frame_rate, channels)``
        where the last three values come from pydub's ``AudioSegment``.

    Raises:
        ValueError: If no video id can be found in ``url``.
        RuntimeError: If pytube offers no stream for the audio or the video.
    """
    # Local import: only this function needs URL parsing.
    from urllib.parse import parse_qs, urlparse

    parsed = urlparse(url)
    segments = [part for part in parsed.path.split("/") if part]
    query_id = parse_qs(parsed.query).get("v", [""])[0]
    if "shorts" in segments[:-1]:
        ytb_key = segments[segments.index("shorts") + 1]
    elif query_id:
        ytb_key = query_id
    elif segments:
        # e.g. short links whose last path segment is the id
        ytb_key = segments[-1]
    else:
        raise ValueError(f"Cannot extract a YouTube video id from '{url}'")

    os.makedirs(output_path, exist_ok=True)
    os.makedirs(f"{output_path}/{ytb_key}", exist_ok=True)
    pathfile = f"{output_path}/{ytb_key}/{ytb_key}.mp4"

    if not os.path.exists(pathfile):
        yt = YouTube(url)

        # Audio-only stream: this is the file the rest of the pipeline works on.
        audio_stream = yt.streams.filter(only_audio=True).first()
        if audio_stream is None:
            raise RuntimeError(f"No audio-only stream available for '{url}'")
        # Rename before the second download so both files cannot collide in
        # output_path under pytube's default file name.
        os.rename(audio_stream.download(output_path=output_path), pathfile)

        # Video stream, stored next to the audio with a "_V" suffix.
        video_stream = yt.streams.filter(only_audio=False).first()
        if video_stream is None:
            raise RuntimeError(f"No video stream available for '{url}'")
        os.rename(video_stream.download(output_path=output_path), pathfile.replace(".mp4", "_V.mp4"))

    audio = AudioSegment.from_file(pathfile, format="mp4")
    static_url = f"https://127.0.0.1:8000/{pathfile}"
    return static_url, pathfile, audio.duration_seconds, audio.frame_rate, audio.channels


def get_audio_info(pathfile):
    """Load a WAV file with pydub and return ``(pathfile, duration_seconds, frame_rate, channels)``."""
    segment = AudioSegment.from_file(pathfile, format="wav")
    info = (pathfile, segment.duration_seconds, segment.frame_rate, segment.channels)
    return info


def synth_req(audio_path: str, text: str, alpha: float = 0.3, beta: float = 0.2, use_vc = True) -> dict:
    """POST a synthesis request to the local TTS service and return its JSON reply.

    Args:
        audio_path: Sent as ``"audio_path"`` in the request body.
        text: Sent as ``"text"``.
        alpha: Sent as ``"alpha"``.
        beta: Sent as ``"beta"``.
        use_vc: Sent as ``"use_vc"``.

    Returns:
        The decoded JSON body of the response.
    """
    payload = {
        "audio_path": audio_path,
        "text": text,
        "alpha": alpha,
        "beta": beta,
        "use_vc": use_vc,
    }
    url = "https://127.0.0.1:8060/tts/synth"
    # NOTE(security): verify=False disables TLS certificate verification.
    # Presumably the local service uses a self-signed certificate; pass the
    # certificate path via `verify=` instead if this ever leaves localhost.
    response = requests.post(url=url, data=json.dumps(payload), verify=False)
    return response.json()


def tr_chunks(audio_path: Path, cuts: dict) -> dict:
    """Translate every ``vocals_<n>.wav`` chunk and attach the result to its cut.

    Chunk files are looked up recursively under ``audio_path.parent``. Only
    names of the exact form ``vocals_<n>.wav`` are used, so other files that
    share the prefix (for example ``vocals_0_synth_....wav``) are ignored.
    Files are processed in numeric order and matched to ``cuts`` by their
    number, which keeps ``vocals_10.wav`` from being paired with cut 2.

    Args:
        audio_path: Path whose parent directory holds the chunk files.
        cuts: Mapping ``index -> cut dict``. It is not modified; each cut
            dict is copied before being annotated.

    Returns:
        A new mapping with the same keys as ``cuts`` where every cut that has
        a chunk file gained ``'path'`` (absolute path of the chunk) and
        ``'transcript'`` (text returned by ``tr_audio_pipe``).
    """
    chunk_name = re.compile(r"vocals_(\d+)\.wav")
    vocals_dir = audio_path.parent
    # Copy the inner dicts too, so the caller's `cuts` is left untouched.
    trs = {index: dict(cut) for index, cut in cuts.items()}

    numbered = []
    for wav in vocals_dir.rglob("vocals_*.wav"):
        match = chunk_name.fullmatch(wav.name)
        if match:
            numbered.append((int(match.group(1)), wav))

    for index, v in sorted(numbered):
        if index not in trs:
            # Stale file from an earlier run: there is no cut to attach it to.
            print(f"skip {v}: no cut with index {index}")
            continue
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        print(f"translate {v}")
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        # Loaded as 16 kHz mono; the original author notes that this load
        # also normalizes the samples.
        data, sr = librosa.load(v, sr=16_000, mono=True)
        start_time = time.time()
        transcript = tr_audio_pipe(sr, data)
        print("--- %s seconds ---" % (time.time() - start_time))
        trs[index]['path'] = str(v.absolute())
        trs[index]['transcript'] = transcript
    return trs


def synth_chunks(trs: dict, use_vc = True):
    """Send every entry of ``trs`` to ``synth_req`` and print each reply.

    Each entry must provide ``'path'`` and ``'transcript'``; ``use_vc`` is
    forwarded unchanged.
    """
    divider = '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
    for entry in trs.values():
        print(divider)
        print(f'synthetize {entry["path"]}')
        print(divider)
        reply = synth_req(audio_path=entry['path'], text=entry['transcript'], use_vc = use_vc)
        print(reply)


# TODO: masking is still pending implementation.
# Example mask from the original note (shape to be confirmed):
# [{'start': 1.87, 'end': 4.67}, {'start': 15.00, 'end': 18.97}]
def mask_seq(cuts: dict, mask: dict):
    """Return ``cuts`` re-numbered from 0 with an empty dict prepended.

    ``mask`` is accepted but not used yet; the per-cut masking logic is
    still to be written.
    """
    shifted = [{}, *cuts.values()]
    # Placeholder: per-cut mask handling goes here once implemented.
    return dict(enumerate(shifted))


def len_ctrl(trs: dict) -> dict:
    """Split every chunk whose ``'length'`` reaches 29.0 into two halves.

    For such a chunk the transcript is divided at the middle word and the
    duration at its midpoint. The chunk is replaced by two entries flagged
    ``'cut': True``, every entry is re-numbered, all ``'path'`` values are
    rewritten to ``<dir>/vocals_<j>.wav``, the existing ``vocals_*.wav`` and
    ``vocals-silence*.wav`` files are deleted, and ``split_audio`` is called
    again on ``<dir>/vocals.wav``.

    Fixes over the previous version: the position of the chunk being split is
    tracked in the re-numbered list, so a second long chunk no longer
    overwrites the wrong entry after an earlier split; the regex is a raw
    string. The halves produced by a split are not examined again, as before.

    TODO (from the original author): split the text at the period nearest to
    the middle (or a comma if there is none), count the words on each side
    and cut the audio in that proportion; the silence must be moved too.

    Args:
        trs: Mapping ``0..n-1 -> chunk dict`` with at least ``'length'``; a
            chunk that is split must also have ``'transcript'``, ``'silence'``,
            ``'end'`` and ``'path'``. Chunk dicts are updated in place
            (``'path'``) when a split happens.

    Returns:
        ``trs`` itself when nothing was split, otherwise a new mapping
        ``0..m-1 -> chunk dict``.
    """
    threshold = 29.0
    entries = [trs[i] for i in range(len(trs))]

    pos = 0
    while pos < len(entries):
        entry = entries[pos]
        if entry['length'] >= threshold:
            words = re.split(r'\s', entry['transcript'].strip())
            half = math.floor(len(words) / 2)
            text_1 = " ".join(words[:half])
            text_2 = " ".join(words[half:])

            br = entry['length'] * .50  # also the length of part 1
            # NOTE(review): part 1 starts at 0 instead of entry['start'],
            # while part 2 ends at the absolute entry['end'] — kept as is;
            # confirm against what split_audio expects.
            part_1_start = 0
            part_1_end = part_1_start + br
            part_1_silence = entry['silence']

            part_2_silence = 0.050
            part_2_start = part_1_end + part_2_silence
            part_2_end = entry['end']

            parent = Path(entry['path']).parent

            p1 = {'start': part_1_start, 'end': part_1_end, 'length': br, 'silence': part_1_silence, 'cut': True,
                  'path': entry['path'],
                  'transcript': f"{text_1}."}

            p2 = {'start': part_2_start, 'end': part_2_end, 'length': (part_2_end - part_2_start), 'silence': part_2_silence, 'cut': True,
                  'path': f"{parent}/vocals_{pos + 1}.wav",
                  'transcript': text_2}

            entries[pos] = p1
            entries.insert(pos + 1, p2)
            # Every chunk after the split shifts by one, so all paths are renamed.
            for j, item in enumerate(entries):
                item['path'] = f"{parent}/vocals_{j}.wav"
            trs = dict(enumerate(entries))

            # Drop the old chunk files before regenerating them from vocals.wav.
            for file in itertools.chain(
                    parent.rglob("vocals_*.wav"),
                    parent.rglob("vocals-silence*.wav"),
            ):
                file.unlink(missing_ok=True)

            vocals = f"{parent}/vocals.wav"
            split_audio(trs, vocals)

            pos += 2  # skip both halves
        else:
            pos += 1

    return trs


def merge_video(input_video: str, final_sound: str, output_video: str):
    """Mux the video track of one file with the audio track of another via ``ffmpeg``.

    Equivalent command:
    ``ffmpeg -y -i <input_video> -i <final_sound> -c:v copy -c:a aac
    -map 0:v:0 -map 1:a:0 <output_video>``

    Args:
        input_video: File providing the first video stream (copied as is).
        final_sound: File providing the first audio stream (encoded as AAC).
        output_video: Destination path; overwritten if it exists (``-y``).

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero return code; ffmpeg's
            stderr is included in the message.
    """
    command = [
        'ffmpeg',
        '-y',
        '-i', input_video,
        '-i', final_sound,
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-map', '0:v:0',
        '-map', '1:a:0',
        '-hide_banner',
        '-loglevel', 'error',
        output_video
    ]

    # check=False: the return code is inspected by hand so that ffmpeg's
    # stderr can be attached to the raised error. (The previous
    # `except CalledProcessError` could never fire with check=False.)
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if result.returncode != 0:
        raise RuntimeError(f"Error ffmpeg encontrado durante ejecucion, {result.stderr}")
    print(f"Merged successful\n{result.stderr} {result.stdout}")


## Pasos Secuenciales

### funciones de los pasos

In [3]:
def cuts_step(url: str) -> "tuple[dict, str, Path]":
    """Download a video's audio, isolate the vocals and compute the cuts.

    Steps: ``dl_audio`` -> ``mp4_to_wav`` -> ``demucs_it`` -> ``chunk`` on the
    ``vocals.wav`` file located next to the converted WAV.

    Args:
        url: YouTube URL passed to ``dl_audio``.

    Returns:
        ``(cuts, vocals, audio_path)``: the value returned by ``chunk``, the
        path of ``vocals.wav`` as a string, and the converted WAV as a ``Path``.

    Raises:
        Exception: Any error from the steps above is printed and re-raised.
            Previously it was swallowed and ``None`` was returned, which made
            the caller's tuple unpacking fail with an unrelated ``TypeError``.
    """
    try:
        # Only the file path is needed here; the other values are ignored.
        _static_url, mp4_file, _duration, _frame_rate, _channels = dl_audio(url)
        output_wav = mp4_file.replace(".mp4", ".wav")
        mp4_to_wav(input_mp4=mp4_file, output_wav=output_wav)
        audio_path = Path(output_wav)

        demucs_it(str(audio_path.absolute()))
        vocals = f"{audio_path.parent.absolute()}/vocals.wav"

        cuts = chunk(vocals)
        return cuts, vocals, audio_path
    except Exception as ex:
        print(ex)
        raise


def mask_step(cuts: dict, mask: dict):
    """Placeholder for the masking step; it ignores both arguments and returns None."""
    return None


def tr_step(cuts: dict, vocals: str, audio_path: Path) -> dict:
    """Split the vocals into chunks, translate them and enforce the length limit.

    Runs ``split_audio(cuts, vocals)``, then ``tr_chunks`` and ``len_ctrl``.

    Args:
        cuts: Cut mapping passed to ``split_audio`` and ``tr_chunks``.
        vocals: Path of the vocals file passed to ``split_audio``.
        audio_path: Path passed to ``tr_chunks``.

    Returns:
        The mapping returned by ``len_ctrl``.

    Raises:
        Exception: Any error from the steps above is printed and re-raised.
            Previously it was swallowed and ``None`` was returned, which only
            surfaced later as an unrelated failure in the synthesis step.
    """
    try:
        split_audio(cuts, vocals)
        trs = tr_chunks(audio_path, cuts)
        return len_ctrl(trs)
    except Exception as ex:
        print(ex)
        raise


def synth_step(trs: dict, audio_path: Path, use_vc = True) -> str:
    """Synthesize every chunk, combine them, and return the final audio path.

    Calls ``synth_chunks(trs, use_vc)`` followed by
    ``combine_chunks(audio_path)``; the returned string is
    ``<absolute parent of audio_path>/final_sound.wav``.
    """
    synth_chunks(trs, use_vc)
    combine_chunks(audio_path)
    out_dir = str(audio_path.parent.absolute())
    return f"{out_dir}/final_sound.wav"



### Paso 'cuts' o de corte se realiza lo siguiente:
#### - Se implementa silero 
#### - Se hace demucs (separación de voz | instrumentales)
#### - Implementación de algoritmo propio para mejorar alineamiento

In [36]:
# Download the audio and segment it into utterances.
link = 'https://www.youtube.com/shorts/OtWmhxrq70c'
cuts, vocals, audio_path = cuts_step(link)
cuts  # bare expression so the notebook displays the cuts

python(2238) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.
python(2239) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.
python(2240) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


Conversion successful: /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/onNuYtDJH-U.mp4 -> /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/onNuYtDJH-U.wav,
 
torch.Size([4, 2, 1374208])


{0: {'start': 0.066,
  'end': 9.662,
  'length': 9.596,
  'silence': 0.066,
  'cut': False},
 1: {'start': 10.338,
  'end': 29.342,
  'length': 19.003999999999998,
  'silence': 0.6759999999999984},
 2: {'start': 30.274,
  'end': 31.1611875,
  'length': 0.8871874999999996,
  'silence': 0.9320000000000022}}

### Paso para agregar una máscara, para que no se toquen algunas partes del audio.

In [37]:
# If masks are required, this is the place to add them.
mask = {}
# NOTE(review): an empty dict literal is passed here rather than `mask` — confirm intent.
mask_step(cuts, {})


### Transcripción, traducción con Whisper, además:
#### - Se divide el audio de voces (vocals) en partes pequeñas
#### - Algoritmo propio para controlar la longitud de cada enunciado

In [38]:
# Split the vocals per cut, translate each chunk and apply the length control.
trs = tr_step(cuts, vocals, audio_path)

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
translate /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_0.wav
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--- 24.071895837783813 seconds ---
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
translate /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_1.wav
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--- 13.105316638946533 seconds ---
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
translate /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_2.wav
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--- 5.920491695404053 seconds ---


### En este punto se puede modificar `trs` para corregir lo que amerite en la traducción

In [39]:
trs  # bare expression so the notebook displays the translated chunks



{0: {'start': 0.066,
  'end': 9.662,
  'length': 9.596,
  'silence': 0.066,
  'cut': False,
  'path': '/Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_0.wav',
  'transcript': " Look at a shark! Oh, he doesn't want to open his mouth! Keep watching! Look at this big one!"},
 1: {'start': 10.338,
  'end': 29.342,
  'length': 19.003999999999998,
  'silence': 0.6759999999999984,
  'path': '/Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_1.wav',
  'transcript': " Keep watching Look how many Ahamaya Keep watching Look at these psychopath shrimp How big Keep watching My people, I already taught you a lot of maripos So you already know Give it a like To cook this psychopath Shark So follow me if you don't follow me"},
 2: {'start': 30.274,
  'end': 31.1611875,
  'length': 0.8871874999999996,
  'silence': 0.9320000000000022,
  'path': '/Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_2.wav',
  'transcript': ' See you next time.'}}

### Sintetizado de audio + combinación

In [40]:
# Synthesize every chunk and combine them; the result is the path of final_sound.wav.
synth_wav_file = synth_step(trs, audio_path, use_vc=True)

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
synthetize /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_0.wav
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
{'synth_wav_file': '/Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_0_synth_a-0.3_b-0.2_df-10_em-1.wav', 'synth_name': 'vocals_0_synth_a-0.3_b-0.2_df-10_em-1.wav', 'response': 'completed'}
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
synthetize /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_1.wav
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
{'synth_wav_file': '/Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_1_synth_a-0.3_b-0.2_df-10_em-1.wav', 'synth_name': 'vocals_1_synth_a-0.3_b-0.2_df-10_em-1.wav', 'response': 'completed'}
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
synthetize /Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/vocals_2.wav
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
{'synth_wav_file': '/Users/beltre.wilto

python(2258) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.
python(2259) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


In [43]:
synth_wav_file  # bare expression so the notebook displays the path

'/Users/beltre.wilton/apps/waves.api/audios/onNuYtDJH-U/final_sound.wav'

In [None]:
# Play the synthesized track and then the separated vocals file for comparison.
print('Audio Sintetizado')
display(Audio(synth_wav_file))

print('Audio Original')
display(Audio(vocals))

In [45]:
# The downloaded video sits next to the WAV with a "_V.mp4" suffix.
input_video = str(audio_path).replace('.wav', '_V.mp4')
output_video = f"{audio_path.parent}/video_merged.mp4"

# Replace the video's audio track with the synthesized one.
merge_video(input_video, synth_wav_file, output_video)


python(2279) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


Merged successful
 


In [19]:
output_video  # bare expression so the notebook displays the path

'/Users/beltre.wilton/apps/waves.api/audios/pgQBYfbCc6w/video_merged.mp4'

In [None]:
from IPython.display import HTML
from  base64 import b64encode

# Show both videos side by side. Each file is embedded as a base64 data URI.
# Files are read inside `with` blocks so the handles are closed, and the
# variables are named *_uri so the builtin `input` is not shadowed in the
# notebook's namespace.
with open(output_video, 'rb') as merged_file:
    output_uri = "data:video/mp4;base64," + b64encode(merged_file.read()).decode()

with open(input_video, 'rb') as original_file:
    input_uri = "data:video/mp4;base64," + b64encode(original_file.read()).decode()


HTML(f"""
<table>
   <tr>
      <td>
         <h3>Sintetizado</h3>
         <video width=400 controls>
            <source src="{output_uri}" type="video/mp4">
        </video>
      </td>
      <td>
         <h3>Original</h3>
         <video width=400 controls>
            <source src="{input_uri}" type="video/mp4">
        </video>
      </td>
   </tr>
</table>
""")