In [1]:
import time

import numpy as np
from milvus import default_server
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
    utility
)

from utils import extract_sample_data, split_into_batches

2023-05-16 13:37:31.981142: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Notebook-wide constants: where the server is reached and which
# collection this notebook works with.
HOST = "127.0.0.1"
COLLECTION_NAME = "a2o_bioacoustics"

# Clean up any lingering data from an earlier run, then start the
# embedded milvus server.
default_server.cleanup()
default_server.start()

# The port is read from the server object instead of being hard-coded.
PORT = default_server.listen_port

# Open a client connection to the server (remember to close after)
connections.connect(host=HOST, port=PORT)
print("Connections: ", connections.list_connections())

# Drop the collection if it already exists so it can be re-created below
if utility.has_collection(COLLECTION_NAME):
    collection = Collection(name=COLLECTION_NAME)
    collection.drop()

# Define the collection fields.
# Every field carries a human-readable description; note that the keyword
# must be spelled `description` exactly, otherwise it is not picked up as
# the field's description.

# Primary key, generated by Milvus on insert (auto_id=True), so it is not
# part of the inserted data.
id_field = FieldSchema(
    name="id",
    dtype=DataType.INT64,
    description="primary field",
    is_primary=True,
    auto_id=True,
)

# The vector field that indexes are built on and searches run against.
embedding_field = FieldSchema(
    name="embedding",
    dtype=DataType.FLOAT_VECTOR,
    description="Float32 vector with dim 1280",
    dim=1280,
    is_primary=False,
)

# Scalar metadata stored alongside each embedding.
file_timestamp_field = FieldSchema(
    name="file_timestamp",
    dtype=DataType.INT64,
    description="File timestamp (in seconds since 1970-01-01T00:00:00)",
)
offset_field = FieldSchema(
    name="offset",
    dtype=DataType.INT64,
    description="Offset (in seconds) from start of file where embedding window starts",
)
site_id_field = FieldSchema(
    name="site_id",
    dtype=DataType.INT64,
    description="Site ID",
)
site_name_field = FieldSchema(
    name="site_name",
    dtype=DataType.VARCHAR,
    description="Site name",
    max_length=1000,
)
subsite_name_field = FieldSchema(
    name="subsite_name",
    dtype=DataType.VARCHAR,
    description="Subsite name",
    max_length=1000,
)
file_seq_id_field = FieldSchema(
    name="file_seq_id",
    dtype=DataType.INT64,
    description="File sequence ID",
)
filename_field = FieldSchema(
    name="filename",
    dtype=DataType.VARCHAR,
    description="Filename",
    max_length=1000,
)

# Field order of the collection; the auto-generated primary key comes first.
schema_fields = [
    id_field,
    embedding_field,
    file_timestamp_field,
    offset_field,
    site_id_field,
    site_name_field,
    subsite_name_field,
    file_seq_id_field,
    filename_field,
]

# NOTE(review): "A20" in the description below may be a typo for "A2O"
# (compare COLLECTION_NAME) -- confirm before changing the stored text.
schema = CollectionSchema(
    fields=schema_fields,
    description="Collection for searching A20 bird embeddings",
)

# Set TTL to 0 to disable
collection_properties = {"collection.ttl.seconds": 0}

# Create the (empty) collection from the schema.
collection = Collection(
    name=COLLECTION_NAME,
    schema=schema,
    data=None,
    properties=collection_properties,
)

print(f"Collections: {utility.list_collections()}")
print(f"Collection {COLLECTION_NAME} instantiated with {collection.num_entities} entities")

# Pull the sample records that will populate the collection.
data = extract_sample_data()

# Columns sent to Milvus, in schema order. The "id" field is left out
# because it is declared with auto_id=True.
insert_columns = (
    "embedding",
    "file_timestamp",
    "offset",
    "site_id",
    "site_name",
    "subsite_name",
    "file_seq_id",
    "filename",
)

# The data is inserted batch by batch instead of in one call.
# TODO: find documentation on why this is necessary; batching was added to
# try to get around the kernel dying when inserting the entire collection
# at once.
# NOTE(review): the batch size is whatever split_into_batches defaults to
# (10_000 according to the original note) -- confirm in utils.
for _batch in split_into_batches(data):
    # pivot the row-oriented records into one list of values per column
    columns = []
    for column_name in insert_columns:
        columns.append([record[column_name] for record in _batch])
    collection.insert(columns)

# Flush after the last batch, then report the entity count.
collection.flush()
print(f"Collection {COLLECTION_NAME} currently loaded with {collection.num_entities} entities")

TimeoutError: Milvus not startd in 30.0 seconds

In [None]:
%%time
# FLAT index with L2 distance and no index-specific parameters. The search
# run against this index provides the reference results that the other
# index types are scored against further down.
index_params = dict(index_type="FLAT", params={}, metric_type="L2")
collection.create_index(field_name="embedding", index_params=index_params)
print(f"Created index {collection.index().params}")

In [None]:
# Randomly select a set of vectors to use as search vectors when comparing
# the different indexing strategies.
# A seeded generator is used so that every kernel run draws the same query
# vectors, which keeps timings and recall comparable between runs.
SEARCH_SEED = 42
N_SEARCH_VECTORS = 100

search_rng = np.random.default_rng(SEARCH_SEED)
search_vectors = [
    data[i]["embedding"]
    # indices are drawn with replacement, so a vector may be picked twice
    for i in search_rng.integers(0, len(data), size=N_SEARCH_VECTORS)
]

In [None]:
%%time
# Run the search against the currently built index and keep the result as
# the reference that the other index types are scored against.
search_param = {
    "data": search_vectors,
    "anns_field": "embedding",
    "param": {"metric_type": "L2", "params": {}},
    "limit": 100,
}

collection.load()
try:
    reference_results = collection.search(**search_param)
finally:
    # Release even when the search raises, so a failed run does not leave
    # the collection loaded for the cells that follow.
    collection.release()

In [None]:
# The result set behaves like a 2D list with dims (100,100): one entry per
# input vector, each holding that vector's 100 "closest" vectors.
# Show the first 5 hits for the first 5 queries.
preview = [hits[0:5] for hits in reference_results[0:5]]
print(preview)

# The result set is made of custom Milvus types; print the nesting of
# container -> per-query entry -> single hit.
print(f"{type(reference_results)}[{type(reference_results[0])}[{type(reference_results[0][0])}]]")

In [None]:
def score_recall(search, reference, at=100):
    """Return recall@``at`` of one search result against its reference result.

    Args:
        search: object exposing an ``ids`` sequence, best match first.
        reference: object exposing an ``ids`` sequence for the same query,
            treated as the ground truth. Ids must be hashable.
        at: cutoff; only the first ``at`` ids of each sequence are compared.
            Must be non-zero (it is the divisor).

    Returns:
        float: number of the first ``at`` search ids that also appear among
        the first ``at`` reference ids, divided by ``at``. The divisor is
        always ``at``, even when fewer than ``at`` ids are available.
    """
    # A set gives constant-time membership checks instead of re-scanning the
    # reference list for every search id.
    reference_ids = set(reference.ids[:at])
    matches = sum(1 for i in search.ids[:at] if i in reference_ids)
    return matches / at
    
# Returns the average recall@1, recall@10 and recall@100 over all search
# results, each compared with the matching entry of the reference result
# set computed above.
def score(search_results):
    """Average recall@1/@10/@100 of ``search_results`` against ``reference_results``.

    Args:
        search_results: search output, one entry per query, in the same
            query order as the module-level ``reference_results``.

    Returns:
        numpy array of three values: mean recall@1, recall@10, recall@100.
    """
    cutoffs = (1, 10, 100)

    # The search results class should be iterable, however something was
    # affecting the order of the ids when zipping the result class directly,
    # so both sides are converted to plain lists of hits first.
    found_per_query = list(search_results)
    expected_per_query = list(reference_results)

    per_query_scores = []
    for found, expected in zip(found_per_query, expected_per_query):
        per_query_scores.append(
            [score_recall(found, expected, at=k) for k in cutoffs]
        )
    return np.mean(np.array(per_query_scores), axis=0)

# Sanity-check the scoring method by scoring the reference set against
# itself; every value comes out as 1.0 as long as each query returned at
# least 100 hits.
score(reference_results)

In [None]:
def evaluate_index(index_name, index_params, search_params):
    """Build an index on the "embedding" field and measure build time, search time and recall.

    Any existing index on the collection is dropped first. The new index is
    created with the L2 metric, the collection is loaded, the module-level
    ``search_vectors`` are searched with a limit of 100, and the collection
    is released again. The results are scored against the module-level
    ``reference_results`` via ``score``.

    Args:
        index_name: index type to build, e.g. "IVF_FLAT".
        index_params: build-time parameters for that index type.
        search_params: value passed to ``Collection.search`` as its ``param``
            argument, e.g. ``{"metric_type": "L2", "params": {"nprobe": 16}}``.

    Returns:
        dict with "index_build_time" and "search_time" (seconds, rounded to
        2 decimals) and "search_scores" holding recall@1, recall@10 and
        recall@100.
    """
    if len(collection.indexes):
        print(f"Dropping current index on field: {collection.index().field_name} -> {collection.index().params}")
        collection.drop_index()
        print(f"Indexes remaining after dropping: {collection.indexes}")

    print(f"Creating new index: {index_name} with params {index_params}")
    # Full index definition; kept under its own name so the `index_params`
    # argument is not overwritten.
    index_spec = {
        "index_type": index_name,
        "params": index_params,
        "metric_type": "L2",
    }

    start = time.time()
    collection.create_index("embedding", index_spec)
    index_build_time = round((time.time()-start), 2)

    start = time.time()
    collection.load()
    try:
        print(f"Loading collection took {round((time.time()-start), 2)} seconds")

        search_param = {
            "data": search_vectors,
            "anns_field": "embedding",
            # The keyword is `param` (singular), matching the reference
            # search earlier in the notebook.
            "param": search_params,
            "limit": 100,
        }

        start = time.time()
        search_results = collection.search(**search_param)
        search_time = round((time.time()-start), 2)
    finally:
        # Release even when the search fails, so the next evaluation does
        # not start with the collection still loaded.
        collection.release()

    recall_score = score(search_results)

    return {
        "index_build_time": index_build_time,
        "search_time": search_time,
        "search_scores": {
            "recall@1": recall_score[0],
            "recall@10": recall_score[1],
            "recall@100": recall_score[2],
        },
    }

In [None]:
# Evaluate an IVF_FLAT index: nlist=1024 at build time, nprobe=16 at search time.
ivf_flat_eval = evaluate_index(
    "IVF_FLAT",
    {"nlist": 1024},
    {"metric_type": "L2", "params": {"nprobe": 16}},
)
ivf_flat_eval

In [None]:
# Evaluate an IVF_SQ8 index - noted by the author as a good mix of improved
# speed and reduced memory footprint.
# NOTE: alternative indexing strategies still to investigate:
#   - HNSW/ANNOY for search time reduction
#   - Product Quantization and PCA dimensionality reduction to reduce the
#     memory footprint
ivf_sq8_eval = evaluate_index(
    "IVF_SQ8",
    {"nlist": 1024},
    {"metric_type": "L2", "params": {"nprobe": 16}},
)
ivf_sq8_eval

In [None]:
# Evaluate an IVF_PQ index: nlist=1024, m=128 and nbits=8 at build time,
# nprobe=16 at search time.
ivf_pq4_eval = evaluate_index(
    "IVF_PQ",
    {"nlist": 1024, "m": 128, "nbits": 8},
    {"metric_type": "L2", "params": {"nprobe": 16}},
)
ivf_pq4_eval

In [None]:
# Next steps:
# - include resource consumption (RAM/disk) for each indexing strategy
# - include various options for building the IVF_PQ index (m/nbits)
# - add a PCA pre-computation step to the index evaluation