In [1]:
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import redis
from redis import Redis
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.field import VectorField, TagField, TextField, NumericField

# Import Data

In [None]:
# Load the track metadata. The file has two header rows, so the columns form
# a two-level index (e.g. ("track", "title")); the first column is the row index.
df = pd.read_csv(
    "../fma-metadata/tracks.csv",
    header=[0, 1],
    index_col=0,
)
df.head()

# Redis Connection

In [2]:
# Redis client on port 6379. Host and password are read from the environment,
# falling back to "localhost" and no password when the variables are unset.
redis_conn = Redis(
    host=os.environ.get('REDIS_ADDRESS', 'localhost'),
    port=6379,
    password=os.environ.get('REDIS_PASSWORD'),
)

# Redis Database Definition

### Definition of Fields

In [3]:
# Name of the search index; also used as the key prefix of the indexed hashes.
index_name = "audiosimilarity"
# Distance metric used by the vector index.
distance_metric:str="COSINE"
# Dimensionality of the stored feature vectors.
DIM = 100

# Free-text metadata fields.
track_title = TextField(name="track_title")
album_title = TextField(name="album_title")
artist_name = TextField(name="artist_name")
track_publisher = TextField(name="track_publisher")

# Numeric metadata fields.
album_tracks = NumericField(name="album_tracks")
bit_rate = NumericField(name="bit_rate")
duration = NumericField(name="duration")
genre_top = TextField(name="genre_top")

language_code = TextField(name="language_code")
album_date_released = TextField(name="album_date_released")

# The schema field name must match the hash field that holds the vector bytes
# ("feature_vector"). It was previously named after the index, which left every
# stored vector outside the vector index.
feature_vector = VectorField("feature_vector",
            "HNSW", {
                "TYPE": "FLOAT32",
                "DIM": DIM,
                "DISTANCE_METRIC": distance_metric,
                "INITIAL_CAP": 10000,
            })

### Create index

In [None]:
# Create the search index only when it does not exist yet, so that re-running
# this cell (e.g. Restart & Run All) does not fail with "Index already exists".
# NOTE: an existing index is left untouched; drop it first if the schema changed.
try:
    redis_conn.ft(index_name).info()
except redis.exceptions.ResponseError:
    # FT.INFO is rejected for an unknown index, so the index still has to be created.
    redis_conn.ft(index_name).create_index(
        fields = [track_title, album_title, artist_name, track_publisher, album_tracks, bit_rate, duration, genre_top, language_code, album_date_released, feature_vector],
        definition = IndexDefinition(prefix=[index_name], index_type=IndexType.HASH)
    )

# Populate Database

In [None]:
# Hash field -> (top-level, sub-level) column of the metadata frame.
text_columns = {
    "track_title": ("track", "title"),
    "album_title": ("album", "title"),
    "artist_name": ("artist", "name"),
    "track_publisher": ("track", "publisher"),
    "genre_top": ("track", "genre_top"),
    "language_code": ("track", "language_code"),
    "album_date_released": ("album", "date_released"),
}
numeric_columns = {
    "album_tracks": ("album", "tracks"),
    "bit_rate": ("track", "bit_rate"),
    "duration": ("track", "duration"),
}

# Only the first 10 tracks are written.
sample = df.iloc[:10, :]

# iterrows() is a generator without a length, so tqdm needs an explicit total.
for track_id, row in tqdm(sample.iterrows(), total=len(sample)):

    mapping = {}

    # Missing text values are stored as the placeholder string "null".
    for field, column in text_columns.items():
        value = row[column]
        mapping[field] = "null" if pd.isna(value) else value

    # Missing numeric values are left out of the hash: a non-numeric string in
    # a NUMERIC field would make RediSearch fail to index the whole hash.
    for field, column in numeric_columns.items():
        value = row[column]
        if not pd.isna(value):
            mapping[field] = value

    # Placeholder embedding: a random float32 vector of length DIM.
    mapping["feature_vector"] = np.random.rand(DIM).astype(dtype=np.float32).tobytes()

    redis_conn.hset(f"{index_name}:{track_id}", mapping=mapping)

# Test Query

In [None]:
# redis_conn.keys()

In [None]:
def base_query(return_fields: list=[], number_of_results: int=20):
    """Run a match-all search on the index and tabulate the hits.

    Args:
        return_fields: Accepted but currently not used by the query.
        number_of_results: Maximum number of documents to fetch.

    Returns:
        A DataFrame with the columns ``id``, ``track``, ``album`` and
        ``artist``, sorted by ``id``; an empty DataFrame when nothing matched.
    """
    match_all = Query('*').paging(0, number_of_results).dialect(2)
    docs = redis_conn.ft(index_name).search(match_all).docs

    # Nothing found: return an empty frame instead of sorting a missing column.
    if not docs:
        return pd.DataFrame()

    rows = [
        {'id': doc.id, 'track': doc.track_title, 'album': doc.album_title, 'artist': doc.artist_name}
        for doc in docs
    ]
    return pd.DataFrame(rows).sort_values(by='id')

In [None]:
def execute_query(np_vector:np.array, return_fields: list=[], search_type: str="KNN", number_of_results: int=20, vector_field_name: str="embeddings"):
    """Run a vector similarity search against the index.

    Args:
        np_vector: Query vector; it is cast to float32 and sent as raw bytes.
        return_fields: Names of the fields to return for each hit. When empty,
            no RETURN clause is sent and every attribute of the returned
            documents except the vector field is tabulated.
        search_type: Vector query keyword placed in the query string.
        number_of_results: Number of neighbours to request and to page.
        vector_field_name: Name of the vector field to search; it must be a
            vector field defined in the index schema.

    Returns:
        A DataFrame with one row per hit: ``id``, the returned fields and
        ``vector_score``. Empty when there are no hits.
    """
    # A bare string would otherwise be unpacked character by character below.
    if isinstance(return_fields, str):
        return_fields = [return_fields]
    fields = list(return_fields)

    # With an explicit RETURN clause only the listed fields come back, so the
    # score alias has to be requested as well.
    if fields and "vector_score" not in fields:
        fields.append("vector_score")

    # A pure KNN query needs the "*" prefilter in front of "=>".
    base_query = f'*=>[{search_type} {number_of_results} @{vector_field_name} $vec_param AS vector_score]'
    # return_fields() takes the names as separate arguments; passing the list
    # itself would put a nested list into the command arguments.
    query = Query(base_query)\
        .sort_by("vector_score")\
        .paging(0, number_of_results)\
        .return_fields(*fields)\
        .dialect(2)

    params_dict = {"vec_param": np_vector.astype(dtype=np.float32).tobytes()}

    results = redis_conn.ft(index_name).search(query, params_dict)

    def _to_row(doc):
        """Flatten one result document into a dict for the DataFrame."""
        if fields:
            values = {name: getattr(doc, name, None) for name in fields}
        else:
            # No RETURN clause: keep everything except bookkeeping attributes
            # and the raw vector bytes.
            skipped = ("id", "payload", vector_field_name)
            values = {name: value for name, value in vars(doc).items() if name not in skipped}
        return {'id': doc.id, **values}

    return pd.DataFrame([_to_row(doc) for doc in results.docs])