In [6]:
from flask import Flask, request, jsonify
import os
import time
import logging
import librosa
import torch
import torchaudio
import joblib
import numpy as np
from pytube import YouTube
import pyktok as pyk
from moviepy.editor import VideoFileClip
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from datasets import load_dataset
from demucs.apply import apply_model
from demucs import pretrained
import threading

app = Flask(__name__)

# Configurar logging
logging.basicConfig(level=logging.DEBUG)

#-------------------Descargar videos-----------------------------------

def descargar_video(url):
    if "youtu" in url:
        logging.debug("Es YouTube")
        try:
            video = YouTube(url)
            video.streams.first().download()
            nombre_archivo = video.title + ".mp4"
            return nombre_archivo
        except Exception as e:
            logging.error(f"Error al descargar el video de YouTube: {e}")
            return None
    elif "tiktok" in url:
        try:
            logging.debug("Es TikTok")
            # Descargar un video de TikTok y su metadato
            pyk.save_tiktok(url, True, 'video_data.csv', 'chrome')
            # Listar archivos descargados
            archivos_descargados = os.listdir()
            # Filtrar archivos con extensión .mp4
            archivos_mp4 = [archivo for archivo in archivos_descargados if archivo.endswith('.mp4')]
            if archivos_mp4:
                # Tomar el primer archivo mp4 encontrado
                nombre_archivo = archivos_mp4[0]
                return nombre_archivo
            else:
                logging.error("No se encontraron archivos MP4 descargados.")
                return None
        except Exception as e:
            logging.error(f"Error al descargar el video de TikTok: {e}")
            return None
    else:
        logging.error("Enlace no reconocido")
        return None


#----------------Mp4 a wav-------------------

def mp4_to_wav(input_file):
    output_file = os.path.splitext(input_file)[0] + ".wav"
    try:
        video = VideoFileClip(input_file)
        audio = video.audio
        audio.write_audiofile(output_file)
        video.close()
        time.sleep(1)
        os.remove(input_file)
        return output_file
    except Exception as e:
        logging.error(f"Error al convertir {input_file} a WAV: {e}")
        return None

#----------------Extraer letra -------------------

def extraerletra(audio):
    import torch
    from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
    from datasets import load_dataset
    
    
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
    
    model_id = "openai/whisper-large-v3"
    
    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
    )
    model.to(device)
    
    processor = AutoProcessor.from_pretrained(model_id)
    
    pipe = pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        max_new_tokens=128,
        chunk_length_s=30,
        batch_size=16,
        return_timestamps=True,
        torch_dtype=torch_dtype,
        device=device,
    )
    
    dataset = load_dataset("distil-whisper/librispeech_long", "clean", split="validation")
    sample = dataset[0]["audio"]
    
    result = pipe(audio)
    return(result["text"])

#----------------Aislar audio-----------------

def aislar(audio_file):
    try:
        # Cargar el archivo de audio WAV
        waveform, sample_rate = torchaudio.load(audio_file)
        
        # Asegurarse de que el archivo tenga el formato correcto (2 canales y frecuencia de muestreo de 44100 Hz)
        if waveform.shape[0] != 2 or sample_rate != 44100:
            raise ValueError("El archivo WAV debe tener 2 canales y una frecuencia de muestreo de 44100 Hz.")
        
        # Normalizar la forma de onda si es necesario
        waveform = waveform / torch.max(torch.abs(waveform))
        
        # Convertir la forma de onda en tensores de PyTorch y agregar una dimensión para el lote
        x = waveform.unsqueeze(0)
        
        # Cargar el modelo DEMUCS pre-entrenado
        model = pretrained.get_model('mdx')
        
        # Aplicar el modelo al archivo de audio
        out = apply_model(model, x)[0]  # El resultado tiene forma [S, C, T], donde S es el número de fuentes
        
        # Sumar las fuentes, ignorando "vocals" si está presente
        output_waveform = torch.zeros_like(waveform)
        for i, source in enumerate(out):
            if model.sources[i] != "vocals":
                output_waveform += source
        
        # Sobrescribir el archivo original con el audio resultante sin la voz humana
        torchaudio.save(audio_file, output_waveform, sample_rate)
        
        return audio_file
    except Exception as e:
        logging.error(f"Error al aislar audio {audio_file}: {e}")
        return None

#----------------Predecir acordes-----------------

svm_chord = joblib.load('svm_chord_model.pkl')
scaler = joblib.load('scaler.pkl')
svm_style = joblib.load('svm_style_model.pkl')

def dividir_audio_largo(ruta_audio, duracion_segmento=2, max_length=1000):
    audio, sr = librosa.load(ruta_audio, sr=None)
    duracion_total = librosa.get_duration(y=audio, sr=sr)
    num_segmentos = int(np.ceil(duracion_total / duracion_segmento))
    
    segmentos = []
    segmentos_caracteristicas = []
    
    for i in range(num_segmentos):
        inicio = i * duracion_segmento
        fin = min((i + 1) * duracion_segmento, duracion_total)
        segmento_actual = audio[int(inicio * sr):int(fin * sr)]
        
        spectrograma = librosa.feature.melspectrogram(y=segmento_actual, sr=sr)
        if spectrograma.shape[1] < max_length:
            pad_width = max_length - spectrograma.shape[1]
            spectrograma = np.pad(spectrograma, pad_width=((0, 0), (0, pad_width)), mode='constant', constant_values=0)
        else:
            spectrograma = spectrograma[:, :max_length]
        caracteristicas_segmento = librosa.power_to_db(spectrograma, ref=np.max).flatten()
        
        segmentos.append(segmento_actual)
        segmentos_caracteristicas.append(caracteristicas_segmento)
    
    return segmentos, segmentos_caracteristicas

def arraypredicciones(audio_file):
    segmentos, segmentos_caracteristicas = dividir_audio_largo(audio_file, 2)
    acordes = []
    
    for caracteristicas_segmento in segmentos_caracteristicas:
        caracteristicas_segmento_escaladas = scaler.transform(caracteristicas_segmento.reshape(1, -1))
        acorde_predicho = svm_chord.predict(caracteristicas_segmento_escaladas)
        estilo_predicho = svm_style.predict(caracteristicas_segmento_escaladas)
        
        # Convertir los resultados a listas para que sean serializables en JSON
        acordes.append((acorde_predicho[0].item(), estilo_predicho[0].item()))
    
    return acordes


#----------------API Endpoint-----------------

@app.route('/procesar_video', methods=['POST'])
def procesar_video():
    try:
        data = request.get_json()
        if not data or 'url' not in data:
            raise ValueError("Falta 'url' en el JSON enviado")
        url = data['url']
        # Descargar video
        video_file = descargar_video(url)
        if not video_file:
            logging.error("Error al descargar el video")
            return jsonify({"error": "Error al descargar el video"}), 500
        
        logging.debug(f"Video descargado: {video_file}")
        
        # Convertir MP4 a WAV
        audio_file = mp4_to_wav(video_file)
        if not audio_file:
            logging.error("Error al convertir el video a WAV")
            return jsonify({"error": "Error al convertir el video a WAV"}), 500
        
        logging.debug(f"Audio convertido: {audio_file}")
        
        # Extraer letra
        letra = extraerletra(audio_file)
        if not letra:
            logging.error("Error al extraer la letra")
            return jsonify({"error": "Error al extraer la letra"}), 500
        
        logging.debug(f"Letra extraída: {letra}")
        
        # Aislar audio
        audio_file = aislar(audio_file)
        time.sleep(1)
        # Predecir acordes
        acordes = arraypredicciones(audio_file)
        if not acordes:
            logging.error("Error al predecir los acordes")
            return jsonify({"error": "Error al predecir los acordes"}), 500
        
        logging.debug(f"Acordes predichos: {acordes}")
        
        # Eliminar archivo de audio temporal
        if os.path.exists(audio_file):
            os.remove(audio_file)
    
        response = {"status": "success", "message": "URL procesada correctamente", "url": url}
        return jsonify(response), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 400


# Ejecución del servidor Flask en un hilo separado
def run_flask():
    app.run(port=5000, debug=True, use_reloader=False)

flask_thread = threading.Thread(target=run_flask)
flask_thread.start()



 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug:127.0.0.1 - - [04/Jun/2024 21:53:16] "OPTIONS /procesar_video HTTP/1.1" 200 -


In [None]:
# no está importado el modelo preentrenado