In [1]:
#Import ImageBind
import torch
from imagebind import data
from imagebind.models import imagebind_model
from imagebind.models.imagebind_model import ModalityType
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
#Import para FAISS
import numpy as np
import faiss
import json
import glob

In [3]:
#Import para AUDIO
import sounddevice as sd
import wave
import threading
import soundfile as sf
from lameenc import Encoder

In [4]:
#Funciones ImageBind.
def GenerarEmbeddings(audio_path):
    audio_data = data.load_and_transform_audio_data([audio_path], device)

    with torch.no_grad():
        embeddings = model({ModalityType.AUDIO: audio_data})

    audio_embeddings = embeddings[ModalityType.AUDIO].cpu().numpy().flatten()

    return audio_embeddings

In [5]:
#Funciones .json.
def CargarLabels(path):
    with open(path, "r") as file:
        dict_labels = json.load(file)
    return dict_labels

def GuardarLabels(path, dict_labels):
    with open(path, "w") as file:
        json.dump(dict_labels, file)

def GenerarNuevoLabel(path, label):
    dict_labels = CargarLabels(path)
    values = list(dict_labels.values())
    dict_labels[label] = max(values) + 1
    GuardarLabels(path, dict_labels)

In [31]:
#Funciones FAISS.

def CrearIDX(path, dimension):
    idx_index = faiss.IndexIDMap(faiss.IndexFlatIP(dimension))
    faiss.write_index(idx_index, path)

def LeerIDX(path):
    idx_index = faiss.read_index(path)
    return idx_index

def InsertarAudio(list_emb, label, path_label, path_faiss):
    #CheckeoDeLabel
    while True:
        dict_labels = CargarLabels(path_label)
        if label in dict_labels.keys(): 
            break
        GenerarNuevoLabel(path_label, label)

    labels = [dict_labels[label]] * len(list_emb)

    faiss.normalize_L2(list_emb)

    idx_index = LeerIDX(path_faiss)

    idx_index.add_with_ids(list_emb, labels) 

    faiss.write_index(idx_index, path_faiss)

def BuscarIDX(idx_index, dict_labels, query):
    faiss.normalize_L2(query)
    resultado, id = idx_index.search(query, 1)
    label = [label for label,v in dict_labels.items() if v == id.item()]
    return (resultado.item(), label[0])

def BusquedaAudio(audio_path, idx_path, label_path, i):
    idx_index = LeerIDX(idx_path)
    dict_labels = CargarLabels(label_path)
    query = GenerarEmbeddings(audio_path).reshape(1, -1)
    resultado = BuscarIDX(idx_index, dict_labels, query)
    if float(resultado[0]) > 0.75: 
        print(f"Análisis {i}: {resultado}")
    else:
       print(f"Análisis: No se reconoció.")

def PrepararAudios(path):
    if os.path.exists(path):
      audios_wav = glob.glob(os.path.join(path, '*'))
      if len(audios_wav) > 0:
        list_emb = np.array([GenerarEmbeddings(i) for i in audios_wav])
        return list_emb
      else:
        print('No existen audios en el path.')
    else:
      print('No existe el path.')

In [7]:
#Funciones de Audio
def GenerarAudio(hz, tiempo, i, path):
    print(f"\nGrabando: {i}")
    audio = sd.rec(int(tiempo * hz), samplerate= hz, dtype='int16', channels=1)
    sd.wait()
    filename = f"{path}/demo{i}"
    
    #Hilo de guardado
    thread_guardado = threading.Thread(target = GuardarAudio, args = (filename , 1, hz, audio, i))
    thread_guardado.start()
    
def GuardarAudio(filename, channels, hz, audio, i ):
    print(f"Guardando {i}: {filename}")
    # with wave.open(f"{filename}.wav", 'wb') as wf:
    #     wf.setnchannels(channels)
    #     wf.setsampwidth(2) 
    #     wf.setframerate(hz) 
    #     wf.writeframes(audio.tobytes()) 

    encoder = Encoder()
    encoder.set_bit_rate(128)         # Tasa de bits (calidad)
    encoder.set_in_sample_rate(hz)   # Frecuencia de muestreo
    encoder.set_channels(channels)   # Número de canales
    encoder.set_quality(2)           # Calidad (más bajo es mejor)

    # Codificar los datos en MP3
    mp3_data = encoder.encode(audio.tobytes())
    mp3_data += encoder.flush()

    # Guardar el archivo MP3
    with open(f"{filename}.mp3", "wb") as f:
        f.write(mp3_data)
    
    #Hilo de búsqueda de embedding
    thread_busqueda = threading.Thread(target= BusquedaAudio, args = (filename + '.mp3', 'idx_index.faiss', 'labels.json', i))
    thread_busqueda.start()
    
def ComprimirAudioFLAC(filename):
    data, samplerate = sf.read(f"{filename}.wav")
    sf.write(f"{filename}.flac", data, samplerate, format='FLAC')
    print(f"Comprimido en: {filename}.flac")
    
    
    thread_eliminar = threading.Thread(target= EliminarWAV, args = (filename,))
    thread_eliminar.start()
     
def EliminarWAV(filename):
    path_wav = f"{filename}.wav"
    if os.path.exists(path_wav):
        os.remove(path_wav)
        print(f"Eliminando: {path_wav}")

In [8]:
def Inicializar(carpetas):
    CrearIDX('idx_index.faiss', 1024)
    GuardarLabels('labels.json', {'Génesis': 0})
    for label in carpetas:
        list_emb = PrepararAudios(f'./{label}')
        InsertarAudio(list_emb, label, 'labels.json', 'idx_index.faiss')

In [9]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = imagebind_model.imagebind_huge(pretrained=True)
model.eval()

ImageBindModel(
  (modality_preprocessors): ModuleDict(
    (vision): RGBDTPreprocessor(
      (cls_token): tensor((1, 1, 1280), requires_grad=True)
      
      (rgbt_stem): PatchEmbedGeneric(
        (proj): Sequential(
          (0): PadIm2Video()
          (1): Conv3d(3, 1280, kernel_size=(2, 14, 14), stride=(2, 14, 14), bias=False)
        )
      )
      (pos_embedding_helper): SpatioTemporalPosEmbeddingHelper(
        (pos_embed): tensor((1, 257, 1280), requires_grad=True)
        
      )
    )
    (text): TextPreprocessor(
      (pos_embed): tensor((1, 77, 1024), requires_grad=True)
      (mask): tensor((77, 77), requires_grad=False)
      
      (token_embedding): Embedding(49408, 1024)
    )
    (audio): AudioPreprocessor(
      (cls_token): tensor((1, 1, 768), requires_grad=True)
      
      (rgbt_stem): PatchEmbedGeneric(
        (proj): Conv2d(1, 768, kernel_size=(16, 16), stride=(10, 10), bias=False)
        (norm_layer): LayerNorm((768,), eps=1e-05, elementwise_affine=

In [33]:
device = 1
hz = 44100
tiempo = 2
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
carpetas = ['Seba','Diego']

# Inicializar(carpetas)

In [34]:
i = 0
while True:
    GenerarAudio(hz, tiempo, i, './audio_rt')
    i += 1


Grabando: 0
Guardando 0: ./audio_rt/demo0
Grabando: 1

Análisis: No se reconoció.
Guardando 1: ./audio_rt/demo1

Grabando: 2
Análisis: No se reconoció.
Guardando 2: ./audio_rt/demo2

Grabando: 3
Análisis: No se reconoció.
Guardando 3: ./audio_rt/demo3

Grabando: 4
Análisis 3: (0.8316513299942017, 'Seba')
Guardando 4: ./audio_rt/demo4
Grabando: 5

Análisis: No se reconoció.
Guardando 5: ./audio_rt/demo5

Grabando: 6
Análisis: No se reconoció.
Guardando 6: ./audio_rt/demo6

Grabando: 7
Análisis: No se reconoció.
Guardando 7: ./audio_rt/demo7

Grabando: 8
Análisis: No se reconoció.
Guardando 8: ./audio_rt/demo8

Grabando: 9
Análisis: No se reconoció.
Guardando 9: ./audio_rt/demo9
Grabando: 10

Análisis: No se reconoció.
Guardando 10: ./audio_rt/demo10

Grabando: 11
Análisis: No se reconoció.
Guardando 11: ./audio_rt/demo11

Grabando: 12
Análisis: No se reconoció.
Guardando 12: ./audio_rt/demo12

Grabando: 13
Análisis: No se reconoció.
Guardando 13: ./audio_rt/demo13

Grabando: 14
Análisi

KeyboardInterrupt: 