<a href="https://colab.research.google.com/github/JERSONMALDONA/senales-y-sistemas/blob/main/taler_2/Detector_de_genero_musical_en_Streamlit_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Detector de genero musical en Streamlit.

Utilizando la herramienta Streamlit, genere un dashboard
para los ejercicios: i) Aplicacion en comunicaciones - modulacion AM  y ii) Aplicacion en circuitos electricos - potencia (Ver material de apoyo Dashboards).
Link detector de Youtube: https://github.com/jmoralespineda/SENALES_Y_SISTEMAS/blob/main/youtube_detector.ipynb

In [None]:
#paquetespara simulación
import numpy as np
import scipy
import matplotlib.pyplot as plt
#%matplotlib inline
import scipy.signal as sig
import scipy.optimize as opt
#from IPython.display import Image
import sympy as sym
#sym.init_session()

In [None]:
#instalación de librerías
!pip install streamlit -q

In [None]:
#instalar librerias necesarias para descargar audios youtube
!python3 -m pip install --force-reinstall https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz -q
#Libreria para manipulacion de archivos de audio
!pip install soundfile -q

- Cookies

In [None]:
!pip install browser-cookie3

In [None]:
!apt install ffmpeg -y


In [None]:
import gdown
import os

# Enlace de descarga directa, este es el modelo de youtube detector creado par los cuadernos de python guardados en github  lo que hace es comparame las cosas
url = 'https://drive.google.com/uc?id=1vpvHR05UJ07oNIbJEQ6cKb1GNDFVwbNP
'
output_path = '/content/modelos/modelo.pkl'

# Crear la carpeta 'modelos' si no existe
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Descargar el archivo
gdown.download(url, output_path, quiet=False)

In [None]:
import os

# Verificar los archivos dentro de la carpeta 'modelos'
os.listdir('/content/modelos/')

In [None]:
!mkdir pages

In [None]:
%%writefile 🎼🎤_0_Detector.py

import streamlit as st

st.set_page_config(
    page_title="Detector de Generos Musicales YOUTUBE🎸🎵y otras funciones ",
    page_icon="🎻",
)

st.write("# Bienvenido al detector de géneros musicales en YOUTUBE. 🎵🎸🎼")

st.markdown(
    """
    ### En este dashboard de Streamlit podrás analizar un link de youtube para detectar si pertenece al género **Rock, Techno, Reggae** o **pop**.

    ### Utilizamos la **Transformada Rápida de Fourier (FFT)** para analizar el contenido en frecuencias de la música, lo que nos permitirá identificar características propias de cada género.
"""
)

In [None]:
%%writefile 1_Detector_FFT.py

import streamlit as st
import numpy as np
import librosa
import soundfile as sf
import matplotlib.pyplot as plt
import subprocess
import os
import tempfile
import joblib
from sklearn.preprocessing import MinMaxScaler # Importar MinMaxScaler
import yt_dlp as youtube_dl # Importar yt-dlp en lugar de youtube_dl
import browser_cookie3 # Importar browser-cookie3
import time # Importar time para generar nombres de archivo únicos


# Configuración de la página
st.set_page_config(
    page_title="Análisis FFT del audio 🎹",
    page_icon="🎶",
    layout="wide"
)

# Título y descripción
st.title(" Análisis de Frecuencias🎷 (FFT)")
st.markdown("""
Analiza archivos de audio o canciones de YouTube para predecir su género musical
usando la Transformada Rápida de Fourier (FFT).
""")

# Sidebar para configuración
with st.sidebar:
    st.header("Configuración")
    segment_duration = st.slider("Duración del segmento (segundos)", 5, 30, 5)
    start_time = st.slider("Tiempo inicial de análisis (segundos)", 0, 300, 30)

# Cargar modelo (cacheado)
@st.cache_resource
def load_model():
    try:
        # Asumiendo que el modelo se llama 'modelo.pkl'
        # Asegúrate de que la ruta y el nombre del archivo sean correctos
        model_path = '/content/modelos/modelo.pkl'
        if os.path.exists(model_path):
            model_data = joblib.load(model_path)
            # --- REMOVED DEBUGGING LINES ---
            # st.write("Tipo de model_data['modelo']:", type(model_data.get('modelo')))
            # st.write("Atributos de model_data['modelo']:", dir(model_data.get('modelo')))
            # -------------------------------
            return model_data
        else:
            st.error(f"Error: El archivo del modelo no se encuentra en la ruta esperada: {model_path}")
            return None
    except Exception as e:
        st.error(f"Error cargando el modelo: {e}")
        return None

model_data = load_model()

if model_data is None:
    st.warning("No se pudo cargar el modelo. La aplicación no funcionará correctamente.")
    # st.stop() # No detener completamente, solo mostrar advertencia


# Funciones de procesamiento de audio

# Función para descargar cookies del navegador
try:
    cookies = browser_cookie3.firefox()
except:
    print("No se pueden descargar cookies desde firefox. Intentando Chrome...")
    try:
        cookies = browser_cookie3.chrome()
    except:
        print("No se pueden descargar cookies desde Chrome. Por favor asegúrate de estar logueado en Youtube desde tu navegador.")
        cookies = None

#funcion para descargar mp3 desde youtube
def download_ytvid_as_mp3(video_url,name):
    #video_url = input("enter url of youtube video:")
    options={
        'format':'bestaudio/best',
        'keepvideo':False,
        'outtmpl':f'{name}.mp3',
        'quiet': True, # Added quiet and no_warnings for cleaner output
        'no_warnings': True,
    }
    if cookies:
        options['cookiefile'] = None
        options['cookiejar'] = cookies;

    try:
        with youtube_dl.YoutubeDL(options) as ydl:
            video_info = ydl.extract_info(video_url, download=False)
            ydl.download([video_info['webpage_url']])
            # The original code had a typo here 'filename', changed to f'{name}.mp3'
            st.success(f"Download complete... {name}.mp3")
            return f"{name}.mp3" # Return the downloaded file path
    except Exception as e:
        st.error(f"Error descargando {video_url}: {e}")
        return None


def convert_to_wav(input_file):
    """Convierte un archivo de audio a WAV y devuelve la ruta del archivo WAV temporal"""
    # Usar tempfile para un nombre único de archivo de output WAV
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
        wav_file = tmp_wav.name

    try:
        subprocess.run([
            'ffmpeg', '-y', '-i', input_file,
            '-ac', '2', '-ar', '48000',  # Estéreo, 48kHz
            wav_file
        ], check=True, capture_output=True, text=True)
        return wav_file
    except subprocess.CalledProcessError as e:
        st.error(f"Error convirtiendo a WAV: {e.stderr}")
        # Limpiar archivo temporal WAV si se creó parcialmente
        if os.path.exists(wav_file):
             os.unlink(wav_file)
        return None
    except Exception as e:
        st.error(f"Error inesperado durante la conversión a WAV: {e}")
        # Limpiar archivo temporal WAV si se creó parcialmente
        if os.path.exists(wav_file):
             os.unlink(wav_file)
        return None


def process_audio(audio_path, start_time, duration):
    """Procesa un segmento de audio y devuelve los datos y tasa de muestreo"""
    try:
        audio_data, sr = sf.read(audio_path)

        start_sample = int(start_time * sr)
        end_sample = int((start_time + duration) * sr)

        if end_sample > len(audio_data):
            st.warning(f"El audio es demasiado corto para el segmento seleccionado. Se analizará hasta el final del audio.")
            end_sample = len(audio_data)
            if start_sample >= end_sample:
                st.error("El tiempo de inicio es mayor o igual a la duración total del audio.")
                return None, None


        segment = audio_data[start_sample:end_sample]

        # Convertir a estéreo si es mono
        if segment.ndim == 1:
            segment = np.column_stack((segment, segment))

        return segment, sr
    except Exception as e:
        st.error(f"Error procesando el audio: {e}")
        return None, None

def compute_fft(audio_segment, sr):
    """Calcula la FFT y devuelve frecuencias y magnitudes normalizadas"""
    try:
        # Calcular FFT
        fft_result = np.fft.rfft(audio_segment, axis=0)
        fft_magnitude = np.abs(fft_result.mean(axis=1))  # Promedio de canales

        # Normalizar
        # Verificar si fft_magnitude tiene suficientes datos para la normalización
        if len(fft_magnitude.shape) == 1:
             fft_magnitude = fft_magnitude.reshape(-1, 1)

        scaler = MinMaxScaler()
        fft_normalized = scaler.fit_transform(fft_magnitude).flatten()

        freqs = np.fft.rfftfreq(len(audio_segment), 1/sr)

        return freqs, fft_normalized
    except Exception as e:
        st.error(f"Error calculando FFT: {e}")
        return None, None

# Interfaz principal
tab1, tab2 = st.tabs(["Desde YouTube", "Desde Archivo"])

with tab1:
    st.header("Analizar canción de YouTube")
    youtube_url = st.text_input("Ingresa la URL de YouTube:")

    if youtube_url and st.button("Analizar canción"):
        if model_data is None:
            st.warning("El modelo no se ha cargado correctamente. No se puede realizar la predicción.")
        else:
            with st.spinner("Descargando y procesando audio..."):
                # Descargar audio usando la función original con yt-dlp API
                # Generate a unique name for the downloaded file to avoid conflicts
                temp_file_name = f"youtube_audio_{os.getpid()}_{int(time.time())}"
                mp3_file = download_ytvid_as_mp3(youtube_url, temp_file_name)


                if mp3_file is None:
                    st.stop()

                # Convertir a WAV
                wav_file = convert_to_wav(mp3_file)
                os.unlink(mp3_file)  # Eliminar MP3 temporal

                if wav_file is None:
                    st.stop()

                # Procesar audio
                audio_segment, sr = process_audio(wav_file, start_time, segment_duration)
                os.unlink(wav_file)  # Eliminar WAV temporal

                if audio_segment is not None:
                    # Calcular FFT
                    freqs, fft_norm = compute_fft(audio_segment, sr)

                    if freqs is not None:
                        # Mostrar resultados
                        col1, col2 = st.columns(2)

                        with col1:
                            st.subheader("Forma de onda")
                            fig1, ax1 = plt.subplots()
                            time_axis = np.linspace(0, segment_duration, len(audio_segment))
                            ax1.plot(time_axis, audio_segment[:, 0], label='Canal izquierdo')
                            if audio_segment.shape[1] > 1:
                                ax1.plot(time_axis, audio_segment[:, 1], label='Canal derecho', alpha=0.7)
                            ax1.set_xlabel("Tiempo (s)")
                            ax1.set_ylabel("Amplitud")
                            ax1.legend()
                            st.pyplot(fig1)

                        with col2:
                            st.subheader("Espectro de frecuencia")
                            fig2, ax2 = plt.subplots()
                            ax2.plot(freqs, 20*np.log10(fft_norm + 1e-10))
                            ax2.set_xlabel("Frecuencia (Hz)")
                            ax2.set_ylabel("Magnitud (dB)")
                            st.pyplot(fig2)

                        # Predecir género
                        # Verificar si el tamaño de fft_norm coincide con el esperado por el modelo
                        # expected_features = model_data['modelos'].n_features_in_ # Assuming the model has this attribute
                        if 'modelo' in model_data and model_data['modelo'] is not None: # Check if 'modelo' key exists and is not None
                            # Eliminamos la validación con n_features_in_ ya que el modelo cargado no lo tiene
                            prediction = model_data['modelo'].predict([fft_norm])[0]
                            genre = model_data.get('type', ['Desconocido'])[int(prediction-1)] # Usar .get para evitar KeyError y manejar índice
                            st.success(f"**Género predicho:** {genre}")
                        else:
                            st.error("Error: No se pudo acceder al modelo correctamente.")


with tab2:
    st.header("Analizar archivo de audio")
    uploaded_file = st.file_uploader("Sube un archivo de audio", type=["wav", "mp3"])

    if uploaded_file and st.button("Analizar archivo"):
         if model_data is None:
            st.warning("El modelo no se ha cargado correctamente. No se puede realizar la predicción.")
         else:
            with st.spinner("Procesando audio..."):
                # Guardar archivo temporal
                file_ext = os.path.splitext(uploaded_file.name)[1].lower()
                with tempfile.NamedTemporaryFile(suffix=file_ext, delete=False) as tmp_file:
                    tmp_file.write(uploaded_file.getbuffer())
                    tmp_path = tmp_file.name

                # Convertir a WAV si es MP3
                if file_ext == ".mp3":
                    wav_path = convert_to_wav(tmp_path)
                    os.unlink(tmp_path)
                    if wav_path is None:
                        st.stop()
                else:
                    wav_path = tmp_path

                # Procesar audio
                audio_segment, sr = process_audio(wav_path, start_time, segment_duration)
                os.unlink(wav_path)

                if audio_segment is not None:
                    # Calcular FFT
                    freqs, fft_norm = compute_fft(audio_segment, sr)

                    if freqs is not None:
                        # Mostrar resultados (similar a la pestaña de YouTube)
                        col1, col2 = st.columns(2)

                        with col1:
                            st.subheader("Forma de onda")
                            fig1, ax1 = plt.subplots()
                            time_axis = np.linspace(0, segment_duration, len(audio_segment))
                            ax1.plot(time_axis, audio_segment[:, 0], label='Canal izquierdo')
                            if audio_segment.shape[1] > 1:
                                ax1.plot(time_axis, audio_segment[:, 1], label='Canal derecho', alpha=0.7)
                            ax1.set_xlabel("Tiempo (s)")
                            ax1.set_ylabel("Amplitud")
                            ax1.legend()
                            st.pyplot(fig1)

                        with col2:
                            st.subheader("Espectro de frecuencia")
                            fig2, ax2 = plt.subplots()
                            ax2.plot(freqs, 20*np.log10(fft_norm + 1e-10))
                            ax2.set_xlabel("Frecuencia (Hz)")
                            ax2.set_ylabel("Magnitud (dB)")
                            st.pyplot(fig2)

                        # Predecir género
                        # Verificar si el tamaño de fft_norm coincide con el esperado por el modelo
                        # expected_features = model_data['modelos'].n_features_in_ # Assuming the model has this attribute
                        if 'modelo' in model_data and model_data['modelo'] is not None: # Check if 'modelo' key exists and is not None
                            # Eliminamos la validación con n_features_in_ ya que el modelo cargado no lo tiene
                            prediction = model_data['modelo'].predict([fft_norm])[0]
                            genre = model_data.get('type', ['Desconocido'])[int(prediction-1)] # Usar .get para evitar KeyError y manejar índice
                            st.success(f"**Género predicho:** {genre}")
                        else:
                             st.error("Error: No se pudo acceder al modelo correctamente.")

In [None]:
!mv 1_Detector_FFT.py pages/

Dasborad Rectificador RC

In [None]:
%%writefile 2_Rectificador_RC.py
import streamlit as st
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as sig
from scipy.fft import rfft, rfftfreq
import os # Import the os module

# Create the 'pages' directory if it doesn't exist
if not os.path.exists('pages'):
    os.makedirs('pages')


st.set_page_config(page_title="Rectificador RC - Análisis", layout="wide")
st.title("📈 Rectificador de Onda Completa con Carga RC")

# Sidebar: Parámetros del usuario
with st.sidebar:
    st.header("Configuración del Circuito")
    R = st.slider("Resistencia R (Ω)", 10, 5000, 100)
    C = st.slider("Capacitancia C (μF)", 1, 1000, 100)

# Configuración de simulación
Fo = 60           # Frecuencia de alimentación
Fs = 30 * Fo      # Frecuencia de muestreo
To = 1 / Fo       # Período fundamental
Ts = 1 / Fs       # Período de muestreo
t = np.arange(0, 5*To, Ts)  # Simular 5 ciclos
A = 170           # Amplitud de entrada (pico)

# Señal sinusoidal rectificada
in_o = A * np.sin(2 * np.pi * Fo * t)
rec_c = sig.square(2 * np.pi * Fo * t)
in_rc = in_o * rec_c  # Señal rectificada

# Función de transferencia del circuito RC
num = [1]
den = [R * C * 1e-6, 1]  # Convertir μF a F
system = sig.TransferFunction(num, den)
tout, out, _ = system.output(U=in_rc, T=t, X0=[0])

# FFT para análisis armónico
Xf = rfft(out)
vfre = rfftfreq(len(Xf), Ts)

# Asegurar que vfre y Xf tengan la misma longitud
min_len = min(len(vfre), len(Xf))
vfre = vfre[:min_len]
Xf_fft = Xf[:min_len]

# Calcular magnitud normalizada
magnitude = abs(Xf_fft / len(Xf_fft))

# Cálculo del THD
f0 = 60
fundamental_idx = np.argmin(np.abs(vfre - f0))
V1 = magnitude[fundamental_idx]

harmonics_sum = 0
for k in range(2, int(np.max(vfre) // f0) + 1):
    harmonic_freq = k * f0
    idx = np.argmin(np.abs(vfre - harmonic_freq))
    if idx < len(magnitude):
        Vk = magnitude[idx]
        harmonics_sum += Vk**2

THD = np.sqrt(harmonics_sum) / V1 if V1 > 0 else 0
PF = 1 / np.sqrt(1 + THD**2) if THD != 0 else 1

# Mostrar resultados
col1, col2 = st.columns(2)

with col1:
    st.subheader("Formas de Onda")
    fig, ax = plt.subplots(3, 1, figsize=(10, 8))
    ax[0].plot(t, in_o)
    ax[0].set_title("Voltaje de Entrada (Sinusoidal)")
    ax[1].plot(t, in_rc)
    ax[1].set_title("Señal Rectificada")
    ax[2].plot(tout, out)
    ax[2].set_title("Salida del Circuito RC")
    for a in ax:
        a.grid(True)
    plt.tight_layout()
    st.pyplot(fig)

with col2:
    st.subheader("Espectro de Frecuencias")
    fig2, ax2 = plt.subplots(figsize=(10, 4))
    ax2.stem(vfre, magnitude)
    ax2.set_xlabel("Frecuencia (Hz)")
    ax2.set_ylabel("Magnitud")
    ax2.set_title("Espectro de Frecuencias (FFT)")
    ax2.grid(True)
    st.pyplot(fig2)

    st.subheader("Resultados")
    st.metric(label="THD", value=f"{THD * 100:.2f}%")
    st.metric(label="Factor de Potencia (estimado)", value=f"{PF:.4f}")

st.markdown("""
### Notas:
- Un THD bajo indica una señal más limpia y menos armónicos.
- Un factor de potencia cercano a 1 significa mejor eficiencia energética.
""")


In [None]:
!mv 2_Rectificador_RC.py pages/

In [None]:
!wget https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64
!chmod +x cloudflared-linux-amd64
!mv cloudflared-linux-amd64 /usr/local/bin/cloudflared

#Ejecutar Streamlit
!streamlit run 🎼_0_Detector.py &>/content/logs.txt & #Cambiar 0_👋_Hello.py por el nombre de tu archivo principal

#Exponer el puerto 8501 con Cloudflare Tunnel
!cloudflared tunnel --url http://localhost:8501 > /content/cloudflared.log 2>&1 &

#Leer la URL pública generada por Cloudflare
import time
time.sleep(5)  # Esperar que se genere la URL

import re
found_context = False  # Indicador para saber si estamos en la sección correcta

with open('/content/cloudflared.log') as f:
    for line in f:
        #Detecta el inicio del contexto que nos interesa
        if "Your quick Tunnel has been created" in line:
            found_context = True

        #Busca una URL si ya se encontró el contexto relevante
        if found_context:
            match = re.search(r'https?://\S+', line)
            if match:
                url = match.group(0)  #Extrae la URL encontrada
                print(f'Tu aplicación está disponible en: {url}')
                break  #Termina el bucle después de encontrar la URL

In [None]:
import os

res = input("Digite (1) para finalizar la ejecución del Dashboard: ")

if res.upper() == "1":
    os.system("pkill streamlit")  # Termina el proceso de Streamlit
    print("El proceso de Streamlit ha sido finalizado.")
