In [8]:
# install and import dependencies

%pip install datasets transformers wget opensearch-py envtpl gdown librosa torch torchvision torchaudio soundfile ipywidgets pyaudio

from transformers import pipeline, ClapModel, ClapProcessor, AutoTokenizer
import IPython as ip
import librosa
import json
import csv
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
import os

Collecting soundfile
  Using cached soundfile-0.12.1-py2.py3-none-macosx_11_0_arm64.whl.metadata (14 kB)
Using cached soundfile-0.12.1-py2.py3-none-macosx_11_0_arm64.whl (1.1 MB)
Installing collected packages: soundfile
Successfully installed soundfile-0.12.1
Note: you may need to restart the kernel to use updated packages.


In [None]:
!docker-compose up -d

In [2]:
# check opensearch connection

!curl -X GET http://localhost:9200

{
  "name" : "opensearch-node1",
  "cluster_name" : "opensearch-cluster",
  "cluster_uuid" : "ujpQJtHiSX2MwrT14-m2AA",
  "version" : {
    "distribution" : "opensearch",
    "number" : "2.12.0",
    "build_type" : "tar",
    "build_hash" : "2c355ce1a427e4a528778d4054436b5c4b756221",
    "build_date" : "2024-02-20T02:20:12.084014282Z",
    "build_snapshot" : false,
    "lucene_version" : "9.9.2",
    "minimum_wire_compatibility_version" : "7.10.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "The OpenSearch Project: https://opensearch.org/"
}


In [3]:
# create opensearch client

host = 'localhost'
port = 9200

# Create the client with ssl and auth disabled, NOT to be used for production!
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True, # enables gzip compression for request bodies
    use_ssl = False,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False,
)

print(client.info())

{'name': 'opensearch-node1', 'cluster_name': 'opensearch-cluster', 'cluster_uuid': 'ujpQJtHiSX2MwrT14-m2AA', 'version': {'distribution': 'opensearch', 'number': '2.12.0', 'build_type': 'tar', 'build_hash': '2c355ce1a427e4a528778d4054436b5c4b756221', 'build_date': '2024-02-20T02:20:12.084014282Z', 'build_snapshot': False, 'lucene_version': '9.9.2', 'minimum_wire_compatibility_version': '7.10.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'The OpenSearch Project: https://opensearch.org/'}


In [4]:
# Create clap index if it doesn't already exist.

index_name = 'clap'

response = client.indices.exists(index=index_name)
print('\nDoes Index already exist?')
print(response)
if response == True:
  print('Skipping creating index')
else:
  # generate the index mappings and settings and create the index
  f = open('./clap_mapping.json')
  index_mappings_and_settings = json.load(f)

  response = client.indices.create(index_name, body=index_mappings_and_settings)
  print('\nCreating index:')
  print(response)


Does Index already exist?
True
Skipping creating index


In [5]:
# init ML processors/models/tokenizers
audio_classifier = pipeline(task="zero-shot-audio-classification", model="laion/larger_clap_music_and_speech")
model = ClapModel.from_pretrained("laion/larger_clap_music_and_speech")
processor = ClapProcessor.from_pretrained("laion/larger_clap_music_and_speech")
tokenizer = AutoTokenizer.from_pretrained("laion/larger_clap_music_and_speech")

In [6]:
# function to create audio embedding

def embed_audio(filepath):
    y, sr = librosa.load(filepath)
    inputs = processor(audios=y, return_tensors="pt", sampling_rate=48000)
    audio_embed = model.get_audio_features(**inputs)
    arr = audio_embed.detach().numpy()
    return arr

In [13]:
# List to store documents for bulk indexing
bulk_docs = []
# es bulk batch size
batch_size = 100

# Function to perform bulk indexing
def bulk_index_documents(documents):
    actions = []
    for doc in documents:
        action = {
            "_index": index_name,
            "_source": doc
        }
        actions.append(action)
    
    bulk(client, actions)

In [None]:
# FMA Audio Set

# define range for fma sub directories
start_fma_directory = 0
end_fma_directory = 155  

# Define the common part of fma directory path
base_directory = '../audio_data/fma/data/fma_small/'

# csv with metadata for fma tracks
fma_metadata = '../audio_data/fma/data/fma_metadata/raw_tracks.csv'


# Read CSV file into a dictionary for easy lookup
fma_mapping = {}
with open(fma_metadata, 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        fma_mapping[row['track_id']] = {'artist': row['artist_name'], 'title': row['track_title'], 'album': row['album_title'], 'genres': row['track_genres']}

# Iterate over all audio files in the directory and generate es doc
for directory_number in range(start_fma_directory, end_fma_directory + 1):
    # Construct the directory path
    fma_directory = os.path.join(base_directory, f"{directory_number:03d}")

    for filename in os.listdir(fma_directory):
        track_file = filename.lstrip('0')
        track_id = track_file[:-4]

        genres_arr = fma_mapping[track_id]['genres'].replace("'", '"')
        genres_j = json.loads(genres_arr)
        genres = [genre['genre_title'] for genre in genres_j]
        if filename.endswith(".mp3"):
            filepath = os.path.join(fma_directory, filename)
            print("Processing:", filepath)
            print("track_id: ", track_id)

            # y, sr = librosa.load(filepath)
            # inputs = processor(audios=y, return_tensors="pt", sampling_rate=48000)
            # audio_embed = model.get_audio_features(**inputs)
            # arr = audio_embed.detach().numpy()

            arr = embed_audio(filepath)

            doc = {
                "audio_embedding": arr[0],
                "audio_set": "fma",
                "title": fma_mapping[track_id]['title'],
                "artist": fma_mapping[track_id]['artist'],
                "album": fma_mapping[track_id]['album'],
                "track_id": track_id,
                "genres": genres,
                "filepath": filepath,
            }

            # Add document to bulk indexing list
            bulk_docs.append(doc)
            
            # Perform bulk indexing if batch size is reached
            if len(bulk_docs) == batch_size:
                bulk_index_documents(bulk_docs)
                bulk_docs = []
                
# Index any remaining documents
if bulk_docs:
    bulk_index_documents(bulk_docs)

print("Bulk indexing completed.")


In [None]:
# Vocal Imitations Audio Set

# Iterate over all audio files in the directory and generate es doc
vocal_imitations_directory = '../audio_data/vocal_imitations/included/'
for filename in os.listdir(vocal_imitations_directory):
    track_id = filename[:-4]

    if filename.endswith(".wav"):
        filepath = os.path.join(vocal_imitations_directory, filename)
        print("Processing:", filepath)
        print("track_id: ", track_id)

        # y, sr = librosa.load(filepath)
        # inputs = processor(audios=y, return_tensors="pt", sampling_rate=48000)
        # audio_embed = model.get_audio_features(**inputs)
        # arr = audio_embed.detach().numpy()

        arr = embed_audio(filepath)

        doc = {
            "audio_embedding": arr[0],
            "audio_set": "vocal_imitations",
            "track_id": track_id,
            "filepath": filepath,
        }

        # Add document to bulk indexing list
        bulk_docs.append(doc)
        
        # Perform bulk indexing if batch size is reached
        if len(bulk_docs) == batch_size:
            bulk_index_documents(bulk_docs)
            bulk_docs = []
            
# Index any remaining documents
if bulk_docs:
    bulk_index_documents(bulk_docs)

print("Bulk indexing completed.")

In [None]:
# FUSS Audio Set

# Iterate over all audio files in the directory and generate es doc
fuss_directory = '../audio_data/FUSS/source_pure/fsd_data/train/sound'
for filename in os.listdir(fuss_directory):
    track_id = filename[:-4]

    if filename.endswith(".wav"):
        filepath = os.path.join(fuss_directory, filename)
        print("Processing:", filepath)
        print("track_id: ", track_id)

        arr = embed_audio(filepath)

        doc = {
            "audio_embedding": arr[0],
            "audio_set": "fuss",
            "track_id": track_id,
            "filepath": filepath,
        }

        # Add document to bulk indexing list
        bulk_docs.append(doc)
        
        # Perform bulk indexing if batch size is reached
        if len(bulk_docs) == batch_size:
            bulk_index_documents(bulk_docs)
            bulk_docs = []
            
# Index any remaining documents
if bulk_docs:
    bulk_index_documents(bulk_docs)

print("Bulk indexing completed.")

In [None]:
# Check how many docs have audio_embedding
query = {
  'size': 5,
  'query': {
    'exists': {'field': 'audio_embedding'}
  }
}

response = client.search(
    body = query,
    index = index_name
)
print('\nSearch results:')
print(response['hits']['total'])

In [21]:
# similarity search with text input against audio_embeddings

query = input('type a search query: ')
text_data = tokenizer([query], padding=True, return_tensors="pt")
text_embed = model.get_text_features(**text_data)
text_arr = text_embed.detach().numpy()[0]

# Search for the document.
query = {
  'size': 5,
  'query': {
    'knn': {
        'audio_embedding': {
            'k': 10,
            'vector': text_arr
        }
    }
  }
}


response = client.search(
    body = query,
    index = index_name
)
print('\nSearch results:')
hits = response['hits']['hits']

def displayResults(hits):
  for hit in hits:
    if 'title' in hit['_source'] and 'genres' in hit['_source']:
      ip.display.display(hit['_source']['title'], hit['_source']['genres'])
    ip.display.display(hit['_score'])
    filepath = hit['_source']['filepath']
    ip.display.display(ip.display.Audio(filepath))

displayResults(hits)



Search results:


'Dieter Rams'

['Soundtrack', 'Instrumental']

0.49459127

'Silent Passing'

['Ambient']

0.48520157

'Girls On A Swing'

['Folk']

0.47270966

'Seeking'

['Pop']

0.47017682

'Butterfly'

['Experimental Pop']

0.46832645

In [None]:
%pip install ipywidgets

In [None]:
%pip install pyaudio

In [None]:
import ipywidgets as widgets
import pyaudio
import wave
import threading

recording = False
audio_data = []
filename = input('enter a name for the recording')
ip.display.display("filename: " + filename + ".wav")

def record_audio():
    global recording
    global audio_data

    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 44100

    audio = pyaudio.PyAudio()

    # Enumerate available audio devices
    num_devices = audio.get_device_count()
    devices = [audio.get_device_info_by_index(i) for i in range(num_devices)]

    # Find the index of the desired device by name
    desired_device_name = 'MacBook Pro Microphone'
    desired_device_index = None
    for i, device in enumerate(devices):
        if device['name'] == desired_device_name:
            desired_device_index = i
            break

    stream = audio.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        input_device_index=desired_device_index,
                        frames_per_buffer=CHUNK)

    audio_data = []
    print("Recording...")

    while recording:
        data = stream.read(CHUNK)
        audio_data.append(data)

    print("Finished recording")

    stream.stop_stream()
    stream.close()
    audio.terminate()

def on_button_clicked(button):
    global recording
    global audio_data

    if button.description == "Record":
        button.description = "Stop"
        recording = True
        threading.Thread(target=record_audio).start()
        
    else:
        button.description = "Record"
        recording = False

        if audio_data:
            wf = wave.open(f"./recordings/{filename}.wav", 'wb')
            wf.setnchannels(1)
            wf.setsampwidth(pyaudio.PyAudio().get_sample_size(pyaudio.paInt16))
            wf.setframerate(44100)
            wf.writeframes(b''.join(audio_data))
            wf.close()
        else:
            print("No audio data recorded.")

# Create the button widget
record_button = widgets.Button(description="Record")

# Define the button click event handler
record_button.on_click(on_button_clicked)

# Display the button
display(record_button)