In [1]:
%%capture
!pip install chromadb
!pip install datasets
!pip install pyarrow
!pip install open-clip-torch
!pip install sentence-transformers
!pip install langchain_core langchain_openai
!pip install torchcodec

In [2]:
import IPython
from IPython.display import HTML, display, Image, Markdown, Video, Audio
from typing import Optional, Sequence, List, Dict, Union

import soundfile as sf

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

from google.colab import userdata

from sentence_transformers import SentenceTransformer
from transformers import ClapModel, ClapProcessor
from datasets import load_dataset

import chromadb
from chromadb.utils.embedding_functions import OpenCLIPEmbeddingFunction
from chromadb.utils.data_loaders import ImageLoader
from chromadb.api.types import Document, Embedding, EmbeddingFunction, URI, DataLoader

import numpy as np
import torchaudio
import base64
import torch
import json
import cv2
import os

In [3]:
path = "mm_vdb"
client = chromadb.PersistentClient(path=path)

In [4]:
ds = load_dataset("ashraq/esc50") #LOAD THE DATASET

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Repo card metadata block was not found. Setting CardData to empty.


In [5]:
# Define the directory to save audio files
path = "esc50"
os.makedirs(path, exist_ok=True)

# Process and save audio files
for item in ds['train']:
    audio_array = item['audio']['array']
    sample_rate = item['audio']['sampling_rate']
    file_name = item['filename']
    target_path = os.path.join(path, file_name)

    # Write the audio file to the new directory
    sf.write(target_path, audio_array, sample_rate)

print("All audio files have been processed and saved.")

All audio files have been processed and saved.


In [7]:
class AudioLoader(DataLoader[List[Optional[Dict[str, any]]]]):
    def __init__(self, target_sample_rate: int = 48000) -> None:
        self.target_sample_rate = target_sample_rate

    def _load_audio(self, uri: Optional[URI]) -> Optional[Dict[str, any]]:
        if uri is None:
            return None

        try:
            waveform, sample_rate = torchaudio.load(uri)

            # Resample if necessary
            if sample_rate != self.target_sample_rate:
                resampler = torchaudio.transforms.Resample(sample_rate, self.target_sample_rate)
                waveform = resampler(waveform)

            # Convert to mono if stereo
            if waveform.shape[0] > 1:
                waveform = torch.mean(waveform, dim=0, keepdim=True)

            return {"waveform": waveform.squeeze(), "uri": uri}
        except Exception as e:
            print(f"Error loading audio file {uri}: {str(e)}")
            return None

    def __call__(self, uris: Sequence[Optional[URI]]) -> List[Optional[Dict[str, any]]]:
        return [self._load_audio(uri) for uri in uris]

In [8]:
class CLAPEmbeddingFunction(EmbeddingFunction[Union[Document, Dict[str, any]]]):
    def __init__(
        self,
        model_name: str = "laion/larger_clap_general",
        device: str = None
    ) -> None:
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = ClapModel.from_pretrained(model_name).to(device)
        self.processor = ClapProcessor.from_pretrained(model_name)
        self.device = device

    def _encode_audio(self, audio: torch.Tensor) -> Embedding:
        inputs = self.processor(audios=audio.numpy(), sampling_rate=48000, return_tensors="pt")
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            audio_embedding = self.model.get_audio_features(**inputs)
        return audio_embedding.squeeze().cpu().numpy().tolist()

    def _encode_text(self, text: Document) -> Embedding:
        inputs = self.processor(text=text, return_tensors="pt")
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            text_embedding = self.model.get_text_features(**inputs)
        return text_embedding.squeeze().cpu().numpy().tolist()

    def __call__(self, input: Union[List[Document], List[Optional[Dict[str, any]]]]) -> List[Optional[Embedding]]:
        embeddings = []
        for item in input:
            if isinstance(item, dict) and 'waveform' in item:
                embeddings.append(self._encode_audio(item['waveform']))
            elif isinstance(item, str):
                embeddings.append(self._encode_text(item))
            elif item is None:
                embeddings.append(None)
            else:
                raise ValueError(f"Unsupported input type: {type(item)}")
        return embeddings

In [11]:
audio_collection = client.get_or_create_collection(
    name='audio_collection',
    embedding_function=CLAPEmbeddingFunction(),
    data_loader=AudioLoader()
)

Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

In [12]:
# Takes a couple mins with GPU
def add_audio(audio_collection, folder_path):
    # List to store IDs and URIs
    ids = []
    uris = []

    # Iterate through all files in the folder
    for filename in os.listdir(folder_path):
        if filename.endswith('.wav'):
            file_id = os.path.splitext(filename)[0]
            file_uri = os.path.join(folder_path, filename)

            ids.append(file_id)
            uris.append(file_uri)

    # Add files to the collection
    audio_collection.add(ids=ids, uris=uris)

# Running it
folder_path = 'esc50'
add_audio(audio_collection, folder_path)


  s = torchaudio.io.StreamReader(src, format, None, buffer_size)
