In [1]:
import dotenv

# Load environment variables via python-dotenv before any later cell reads
# os.environ. NOTE(review): presumably picks up a local `.env` file — confirm
# where that file is expected to live.
dotenv.load_dotenv()

True

In [2]:
import os
import posixpath
import sys
import tempfile
import time
import warnings

import numpy as np
import s3fs
from cavmir.encoders.interface import AudioEncoder
from cavmir.encoders.muq import MuQEncoder

# Suppress all warnings for the rest of the kernel session (blanket filter,
# not limited to a specific category or module).
warnings.filterwarnings("ignore")

# Shared S3 filesystem handle used by the helper functions defined in this
# cell; anon=False requests authenticated (non-anonymous) access.
s3 = s3fs.S3FileSystem(anon=False)

# Both variables are mandatory: a missing one raises KeyError right here.
audio_prefix = os.environ["AUDIO_PREFIX"]
embedding_prefix = os.environ["EMBEDDING_PREFIX"]


def list_files_on_s3(s3_path, file_type=None):
    """Return the entries listed under ``s3_path`` as ``s3://`` URIs.

    Args:
        s3_path: Location handed to ``s3.ls`` (the module-level filesystem).
        file_type: Optional suffix; when given (and truthy), only entries
            whose path ends with it are kept. The match is a plain
            ``str.endswith`` check, so no dot is implied.

    Returns:
        A list of strings, each listing entry prefixed with ``"s3://"``.
    """
    uris = []
    for entry in s3.ls(s3_path):
        # Skip non-matching entries only when a suffix filter was requested.
        if file_type and not entry.endswith(file_type):
            continue
        uris.append("s3://" + entry)

    return uris


def create_embedding_uri(audio_uri, embedding_prefix, model_id):
    """Build the destination URI for the embedding of one audio file.

    The result has the form
    ``<embedding_prefix>/<model_id>/<audio stem>.<model_id>.npy``.

    Args:
        audio_uri: URI (or path) of the source audio file.
        embedding_prefix: Root location under which embeddings are stored.
        model_id: Identifier of the model; used both as a sub-folder and as
            part of the file name.

    Returns:
        The embedding URI as a string.
    """
    # Strip only the final extension. Cutting at the first dot would map
    # "track.v1.mp3" and "track.v2.mp3" to the same embedding name.
    audio_file_name, _ = posixpath.splitext(posixpath.basename(audio_uri))
    embedding_file_name = f"{audio_file_name}.{model_id}.npy"

    # S3 keys always use "/" as separator, independent of the host OS.
    return posixpath.join(embedding_prefix, model_id, embedding_file_name)


def create_and_store_embedding(
    audio_uri: str,
    embedding_prefix: str,
    encoder: AudioEncoder,
):
    """Embed one audio file from S3 and upload the result as a ``.npy`` file.

    The target location is derived with ``create_embedding_uri``. If an
    object already exists there, a message is printed and nothing else is
    done.

    Args:
        audio_uri: S3 URI of the audio file to embed.
        embedding_prefix: Root location under which embeddings are stored.
        encoder: Encoder providing ``model_id`` and ``embed_audio_file``.

    Returns:
        None.

    Raises:
        Any exception raised by the S3 filesystem, the encoder or
        ``np.save`` propagates to the caller unchanged.
    """
    out_uri = create_embedding_uri(audio_uri, embedding_prefix, encoder.model_id)

    if s3.exists(out_uri):
        print(f"Embedding already exists at {out_uri}")
        return

    # A private scratch directory is removed even when download, encoding or
    # upload fails, and it cannot collide with other runs that share the
    # current working directory.
    with tempfile.TemporaryDirectory() as scratch_dir:
        local_audio = os.path.join(scratch_dir, "temp_audio")
        local_embedding = os.path.join(scratch_dir, "temp_embedding.npy")

        s3.download(audio_uri, local_audio)
        embedding = encoder.embed_audio_file(local_audio)

        np.save(local_embedding, embedding)
        s3.upload(local_embedding, out_uri)


In [3]:
# Single encoder instance shared by every create_and_store_embedding call.
# NOTE(review): construction presumably loads model weights and may be slow — confirm.
encoder = MuQEncoder()

In [None]:
# Collect every mp3 under the audio prefix, then embed them one at a time.
audio_data = list_files_on_s3(audio_prefix, "mp3")
total_files = len(audio_data)

start = time.time()
# Count from 1 so the reported number equals the files actually handled
# (files skipped because their embedding already exists are counted too).
for processed, audio_uri in enumerate(audio_data, start=1):
    create_and_store_embedding(audio_uri, embedding_prefix, encoder)

    # Report every 100 files and once more at the very end, so the final
    # tally is shown even when the total is not a multiple of 100.
    if processed % 100 == 0 or processed == total_files:
        sys.stdout.write(
            f"\rProcessed {processed} files. Time taken: {time.time() - start}"
        )
        # "\r" updates never emit a newline; flush so the line shows promptly.
        sys.stdout.flush()
