In [None]:
# @title Bibliotecas
!pip install pydub
!pip install gitpython
!rm -r Repositorio
import os #Parta iterar entre archivos
import pickle #Para serializar
import numpy as np
import matplotlib.pyplot as plt

from git import Repo
from scipy.io import wavfile
from pydub import AudioSegment
from google.colab import userdata
from IPython.display import Audio
from scipy.signal import find_peaks
from scipy.signal import spectrogram
from matplotlib.colors import LogNorm
from scipy.ndimage import label, maximum_filter

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1
Collecting gitpython
  Downloading GitPython-3.1.40-py3-none-any.whl (190 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m190.6/190.6 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting gitdb<5,>=4.0.1 (from gitpython)
  Downloading gitdb-4.0.11-py3-none-any.whl (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.7/62.7 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting smmap<6,>=3.0.1 (from gitdb<5,>=4.0.1->gitpython)
  Downloading smmap-5.0.1-py3-none-any.whl (24 kB)
Installing collected packages: smmap, gitdb, gitpython
Successfully installed gitdb-4.0.11 gitpython-3.1.40 smmap-5.0.1
rm: cannot remove 'Repositorio': No such file or directory


In [None]:
# @title Clonación

ruta_local = "Repositorio"
url_repositorio = "https://"+ userdata.get('passGitHub')+ "@github.com/JC-UCuenca/FFT_shazam.git"

# Clonar el repositorio
try:
  Repo.clone_from(url_repositorio, ruta_local)
except:
  None

In [None]:
# @title convertir_a_mono
def convertir_a_mono(audio_mp3: str) -> str:
  """
  Carga el audio mp3 y lo guarda con un solo canal de audio en formato wav en
  la carpeta actual.

  Args:
      audio_mp3: Ruta del archivo mp3

  Returns:
      El nombre del archivo wav generado.
  """

  audio = AudioSegment.from_file(audio_mp3) # Obtiene el archivo mp3

  audio_mono = audio.set_channels(1) # Convertir a formato mono

  nombre_audio_wav = os.path.basename(audio_mp3)[:-4] + ".wav"
  audio_mono.export(nombre_audio_wav, format="wav")  # Guardar el archivo de audio mono

  return nombre_audio_wav

In [None]:
# @title get_magnitud_fase_frecuencias
def get_magnitud_fase_frecuencias(data: list, sample_rate: int) -> tuple:
  """
  Calcula las frecuencias de los datos del audio. Aplica la FFT a los
  datos y calcula la magnitud y fase de los mismos.

  Args:
      data: Datos del audio a analizar.
      sample_rate: Frecuencia de muestreo de los datos.

  Returns:
      Una tupla de tres elementos. El primero corresponde a la magnitud de la
      FFT de los datos del audio. El segunda es la fase de los mismos. El
      último elemento corresponde a las frecuencias totales de la señal de audio
  """

  frecuencias = np.fft.fftfreq(len(data), d=1/sample_rate)
  data_fft = np.fft.fft(data)
  magnitud = np.abs(data_fft)
  fase = np.angle(data_fft)

  return magnitud, fase, frecuencias

In [None]:
# @title get_ventanas_tiempos
def get_ventanas_tiempos(data: list, sample_rate: int, ancho_ventana: int, overlap: int) -> tuple:
  """
  Divide el audio en varias ventanas (segmentos), las ventanas se
  sobreponen de acuerdo al overlap. También calcula el tiempo medio
  de cada ventana. Si la ventana tiene menos datos que ancho_ventana,
  entonces la elimina de la lista.

  Args:
      data: Datos del audio a analizar.
      sample_rate: Frecuencia de muestre del audio.
      ancho_ventana: Cantidad de datos que se toma por cada ventana.
        Además debe de ser potencia de 2.
      overlap: Cantidad de sobreposición que existe entre cada ventana.
        Además debe de ser potencia de 2 y menor que ancho_ventana.

  Returns:
      Regresa dos elementos. El primero es una lista con las ventanas
      de la señal de audio. El segundo en una lista con los tiempos
      medios de cada ventana.
  """

  tiempos_medios = list()
  ventanas = list()
  for i in range(0, len(data)-1, ancho_ventana - overlap):
    tiempos_medios.append((i + (ancho_ventana//2)) / sample_rate)
    ventana_i = data[i: i + ancho_ventana].tolist()
    ventanas.append(ventana_i)

  while len(ventanas[-1]) < ancho_ventana:  #Elimina ventanas incompletas con sus tiempos
    ventanas.pop(-1)
    tiempos_medios.pop(-1)

  return ventanas, tiempos_medios

In [None]:
# @title get_ventanasHann_FFT
def get_ventanasHann_FFT(ventanas: list) -> tuple:
  """
  Multiplica cada ventana de la lista con una ventana Hann del mismo ancho
  de cada ventana. Aplica la FFT a cada ventana multiplicada utilizando
  un método optimizado de la FFT para datos reales.

  Args:
      ventanas: Lista de ventanas de la segmentación del audio analizado.

  Returns:
      Una tupla de dos elementos. El primero es una lista de las ventanas del
      audio analizado, multiplicadas por la ventana Hann. El segundo es la FFT
      del primer elemento de la tupla
  """

  ventana_hann = np.hanning(len(ventanas[0]))
  ventanas_hanneadas = [ventana * ventana_hann for ventana in ventanas]
  ventanas_hanneadas_fft = [np.fft.rfft(ventana) for ventana in ventanas_hanneadas]

  return ventanas_hanneadas, ventanas_hanneadas_fft

In [None]:
# @title get_frecuenciasPositivas
def get_frecuenciasPositivas(ancho_ventana: int, sample_rate: int) -> list:
  """
  Calcula las frecuencias positivas para graficar en el espectrograma.

  Args:
      ancho_ventana: Cantidad de datos que se toma por cada ventana.
        Además debe de ser potencia de 2.
      sample_rate: Frecuencia de muestreo del audio.
  Returns:
      Una lista con las frecuencias positivas de las ventanas.
  """

  frecuencias = np.fft.fftfreq(ancho_ventana, d=1/sample_rate)
  frecuencias = np.fft.fftshift(frecuencias)
  frecuencias = frecuencias[len(frecuencias)//2:]
  #Agregar la última frecuencia positiva
  frecuencias = np.append(frecuencias, frecuencias[-1] + frecuencias[1])

  return frecuencias

In [None]:
# @title get_amplitudesVentanasFFT
def get_amplitudesVentanasFFT(cant_frecuencias: int, ventanas_FFT: list) -> np.ndarray:
  """
  Calcula la matriz 2D de amplitudes a utilizar para graficar el espetrograma.

  Args:
      cant_frecuencias: Cantidad de frecuencias positivas para el espectrograma.
        Es decir, las filas de la matriz.
      ventanas_FFT: Ventanas multiplicadas con la ventana de Hann y con la FFT.
        En otras palabras, las columnas de la matriz.

  Returns:
      Una matriz 2D con las apmplitudes de cada ventana. El índice x corresponde
      al índice del tiempo. El índice y corresponde al índice de las frecuencias
      positivas.
  """

  amplitudes = np.zeros((cant_frecuencias, len(ventanas_FFT))) #Crea la matriz de amplitudes para el espectrograma

  for i in range(len(ventanas_FFT)):
    amplitudes[:, i] = np.abs(ventanas_FFT[i]) #Inserta las amplitudes en la columna i de la matriz

  return amplitudes

In [None]:
# @title get_picos_coordenadas
def get_picos_coordenadas(num_picos: int, amplitudes: np.ndarray) -> tuple:
  # Asegurarse de que el número de picos por segmento es razonable
  num_segmentos = 10
  if num_picos < num_segmentos:
      raise ValueError("El número total de picos debe ser mayor que el número de segmentos")

  # Encontrar picos locales en toda la matriz
  maximos_locales = (amplitudes == maximum_filter(amplitudes, size=(3, 3)))

  # Calcular el número de columnas (tiempo) por segmento
  num_columnas = amplitudes.shape[1]
  columnas_por_segmento = num_columnas // num_segmentos

  picos_seleccionados = []

  for i in range(num_segmentos):
      inicio = i * columnas_por_segmento
      fin = inicio + columnas_por_segmento if i < num_segmentos - 1 else num_columnas

      # Encontrar picos en el segmento actual
      picos_segmento = np.column_stack(np.where(maximos_locales[:, inicio:fin]))
      picos_segmento[:, 1] += inicio  # Ajustar la columna a la posición global

      # Ordenar los picos por amplitud y seleccionar los top N
      picos_segmento_amplitudes = [amplitudes[pico[0], pico[1]] for pico in picos_segmento]
      picos_ordenados = sorted(zip(picos_segmento_amplitudes, picos_segmento), key=lambda x: x[0], reverse=True)
      num_picos_por_segmento = max(1, num_picos // num_segmentos)
      picos_seleccionados.extend(picos_ordenados[:num_picos_por_segmento])

  # Ajustar el número de picos seleccionados al total deseado
  picos_seleccionados = sorted(picos_seleccionados, key=lambda x: x[0], reverse=True)[:num_picos]
  coordenadas_picos = [pico[1] for pico in picos_seleccionados]  # Extraer solo las coordenadas

  return picos_seleccionados, coordenadas_picos

In [None]:
# @title get_TFA
def get_TFA(tiempos: list, frecuencias: list, picos: list, coordenadas: list) -> list:
  """
  Com los parámetros datos genera una lista donde cada elementos almacena
  el tiempo, frecuencia y amplitud de los picos filtrados. El cálculo se basa
  en el hecho que los índices "x" de las coordenadas corresponden a
  los índices de los tiempos. Los mismo con los índices "y" de las frecuencias.

  Args:
      tiempos: Tiempos medios de cada ventana.
      frecuencias: Frecuencias positivas para el espectrograma.
      picos: Amplitudes más altas filtradas.
      coordenadas: Coordenas de las amplitudes más altas filtradas en
        su matriz.

  Returns:
      Una lista con una tupla de tres elementos. El primero corresponde al
      tiempo en donde ocurre el pico, segundo es la frecuencia a la que ocurre,
      y el tercero es la amplitud del pico
  """
  TFA = list() # Lista con tuplas (tiempo, frecuencia, amplitud)
  for i in range(len(picos)):
    TFA.append((tiempos[coordenadas[i][1]],
                frecuencias[coordenadas[i][0]], picos[i]))

  TFA.sort(key=lambda x: x[0])

  return TFA

In [None]:
# @title get_FFD
def get_FFD(TFA: list, distancia_maxima=2) -> list:
  """
  Con los parámteros datos genera una lista donde cada elementos almacena dos
  frecuencias y la distancia en segundos entre las dos. Relaciona cada elemento
  de TFA un única vez con su elemento siguiente.

  Args:
      TFA: Lista con el tiempo, frecuencia y amplitud correspondiente a los
        picos filtrados.
      distancia_maxima: Distancia máxima de tolerancia en segundos que puede
      existir entre los elementos de TFA

  Returns:
      Una lista con una tupla de tres elementos. El primero corresponde al
      tiempo en donde ocurre el pico, segundo es la frecuencia a la que ocurre,
      y el tercero es la amplitud del pico
  """

  FFD = list() # Lista con tuplas (frecuencia1, frecuencia2, distancia en segundos)
  for i in range(len(TFA) -1):
    distancia = TFA[i+1][0] - TFA[i][0]

    if not (distancia > distancia_maxima):
      FFD.append((TFA[i][1], TFA[i+1][1], distancia))

  return FFD

In [None]:
# @title recopilar_archivos
def recopilar_archivos(carpeta: str) -> list:
  """
  Regresa todos los archivos de la carpeta pasada.

  Args:
      carpeta: Dirección de la carpeta en donde se encuentran los archivos a
        recuperar.

  Returns:
      Una lista los nombres de los archivos dentro de la carpeta del parámetro.
  """
  direcciones_archivos = list()
  for raiz, carpeta, archivos in os.walk(carpeta):
      for nombre_archivo in archivos:
          archivo = os.path.join(raiz, nombre_archivo)
          direcciones_archivos.append(archivo)

  return direcciones_archivos

In [None]:
# @title normalizar_audio
def normalizar_audio(data):
  """
  Divide la lista para el máximo valor aboluto de los elementos
  contenidos en dicha lista para normalizar los valores.

  Args:
      data: Datos del audio de la funcion wavfile.read

  Returns:
      Datos normalizados
  """
  # Encuentra el valor absoluto máximo en la señal
  max_val = np.max(np.abs(data))

  # Evita la división por cero
  if max_val == 0:
      return data

  # Escala la señal para que el valor pico sea 1 o -1
  data_normalizada = data / max_val
  return data_normalizada

In [None]:
# @title guardar_canciones_datos
def guardar_canciones_datos(canciones: list, direccion: str) -> None:
  """
  Analiza cada cancion de la lista y almacena el nombre de la cancion, su TFA,
  FFD en un diccionario. El diccionario se serealiza y se guarda en un archivo
  en formato data.

  Args:
      canciones: Ruta completa de las canciones a almacenar.

  Returns:
      None
  """
  for cancion_mp3 in canciones:
    cancion_wav = convertir_a_mono(cancion_mp3)
    sample_rate, data = wavfile.read(cancion_wav)
    data = normalizar_audio(data)
    tiempo = np.arange(0, len(data)) / sample_rate

    ancho_ventana = 1024
    ventanas, tiempos_medios = get_ventanas_tiempos(data, sample_rate, ancho_ventana, overlap=256)
    ventanas_hanneadas, ventanas_hanneadas_FFT = get_ventanasHann_FFT(ventanas)

    frecuencias = get_frecuenciasPositivas(ancho_ventana, sample_rate)
    amplitudesFFT = get_amplitudesVentanasFFT(len(frecuencias), ventanas_hanneadas_FFT)

    picos_filtrados, coordenadas_filtradas = get_picos_coordenadas(750, amplitudesFFT)

    print('Canción:', cancion_wav)
    print('Cantidad de ventanas:', len(ventanas))
    print('')

    TFA = get_TFA(tiempos_medios, frecuencias, picos_filtrados, coordenadas_filtradas)
    FFD = get_FFD(TFA)

    nombre_cancion = os.path.basename(cancion_mp3)[:-4]
    datos = {
        'cancion': nombre_cancion,
        'TFA': TFA,
        'FFD': FFD,
        'duracion': tiempo[-1],
        'amplitud': amplitudesFFT,
        'frecuencia': frecuencias,
        'tiempo': tiempos_medios
    }

    if not os.path.exists(direccion):
      os.makedirs(direccion)
      print("Carpeta creada")

    nombre_archivo = os.path.join(direccion, f"{nombre_cancion}.data")

    # nombre_archivo = direccion + nombre_cancion + '.data'
    with open(nombre_archivo, 'wb') as archivo:
      pickle.dump(datos, archivo)


In [None]:
def actualizar_GitHub():
  repo = Repo('/content/Repositorio')
  repo.git.add("--all")
  repo.index.commit("")

  # Hacer push al repositorio remoto
  a = repo.git.push("origin", "main", "--force")

In [None]:
carpeta = '/content/Repositorio/canciones_originales'
canciones = recopilar_archivos(carpeta)
carpeta2 = '/content/Repositorio/canciones_originales_data'
!rm /content/Repositorio/canciones_originales_data/*.data
actualizar_GitHub()
guardar_canciones_datos(canciones, carpeta2)
actualizar_GitHub()

Canción: One More - S3RL & Atef ft Hannah Fortune & lowstattic.wav
Cantidad de ventanas: 10573
len(picos_filtrados)=750
len(coordenadas_filtradas)=750

Canción: Lady Gaga - Bad Romance.wav
Cantidad de ventanas: 17768
len(picos_filtrados)=750
len(coordenadas_filtradas)=750

Canción: Yo-Yo Ma - Bach Cello Suite No. 1 in G Major, Prélude.wav
Cantidad de ventanas: 8212
len(picos_filtrados)=750
len(coordenadas_filtradas)=750

Canción: virtual diva - don omar.wav
Cantidad de ventanas: 13618
len(picos_filtrados)=750
len(coordenadas_filtradas)=750

Canción: Beethoven - Para Elisa.wav
Cantidad de ventanas: 10151
len(picos_filtrados)=750
len(coordenadas_filtradas)=750

Canción: Влюблино.wav
Cantidad de ventanas: 10448
len(picos_filtrados)=750
len(coordenadas_filtradas)=750

Canción: PXNDX Los Malaventurados No Lloran.wav
Cantidad de ventanas: 11797
len(picos_filtrados)=750
len(coordenadas_filtradas)=750

Canción: Andrea Bocelli, Sarah Brightman - Time To Say Goodbye.wav
Cantidad de ventanas: 126