In [10]:
import pickle
from docuverse.utils import open_stream
from pymilvus import (
    MilvusClient,
    DataType,
    connections,
    utility
)


cache_file="/home/raduf/.local/share/elastic_ingestion/benchmark__beir_dev__quora____en__corpus.small.jsonl_512_100_True_all_gte-small.pickle.xz"
import json
data=pickle.load(open_stream(cache_file))
questions = [
    "How can I get free gems in Clash of Clans?",
    "How can I get free gems Clash of Clans?",
    "How do you feel when someone upvotes your answer on Quora?",
    "What are the best thriller movie in Hollywood?",
    "What should someone do to overcome anxiety?"
]

In [2]:
MODEL = ("/home/raduf/sandbox2/docuverse/models/slate.30m.english.rtrvr")
data_list = []
from docuverse.utils.embeddings.dense_embedding_function import DenseEmbeddingFunction
model = DenseEmbeddingFunction(MODEL)

=== done initializing model


In [3]:
embeddings = model.encode([d['text'] for d in data], show_progress_bar=True)
keys_to_keep = {"text"}
data_list = [{**{k:v for k, v in d.items() if k in keys_to_keep}, '_id': d['id'], 'qembedding':embeddings[i]} for i,d in enumerate(data)]

Batches:   0%|          | 0/301 [00:00<?, ?it/s]

In [4]:
data_list[0].keys()

dict_keys(['text', '_id', 'qembedding'])

In [27]:
def test_search(vectors, vector_for_query=None, metric="IP", reingest=False, milvus_server_addr="test.db",
                use_connections=False, ingest_batch_size=-1, collection_name="test3", vector_field_name="qembedding"):
    truncate_dim = 384
    if ingest_batch_size < 0:
        ingest_batch_size = len(vectors)

    if vector_for_query is None:
        entities = vectors
        test = vectors[0:3]
    else:
        entities = vectors
        if isinstance(vector_for_query, list) and isinstance(vector_for_query[0], dict):
            test = [d[vector_field_name] for d in vector_for_query]# [{vector_field_name: e} for e in vector_for_query]
        else:
            test = vector_for_query

    if use_connections:
        client = connections
        init, host, port = milvus_server_addr.split(":")
        host = host.replace("//", "")
        client.connect(host=host, port=port)
        client1 = MilvusClient(milvus_server_addr)
    else:
        client = MilvusClient(milvus_server_addr)
        client1 = client


    if reingest or not client.has_collection(collection_name=collection_name):
        schema = client1.create_schema(auto_id=True, enable_dynamic_field=True, primary_field="id")

        schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=50000)
        schema.add_field(field_name="_id", datatype=DataType.VARCHAR, max_length=50000)
        schema.add_field(field_name=vector_field_name, datatype=DataType.FLOAT_VECTOR, dim=truncate_dim)

        index_params = client1.prepare_index_params()
        index_params.add_index(
            field_name=vector_field_name,
            index_type="FLAT",
            metric_type=metric,
            index_name=vector_field_name,
            params={"nlist": 1024}
        )

        client1.drop_collection(collection_name=collection_name)
        client.create_collection(
            collection_name=collection_name, schema=schema, index_params=index_params
        )
        for i in range(0, len(vectors), ingest_batch_size):
            client.insert(collection_name=collection_name, data=entities[i:i + ingest_batch_size])
        # insert_result = client.insert(collection_name=collection_name, data=entities)
        # print({k: v for k, v in insert_result.items() if k != 'ids'})
        client.load_collection(collection_name=collection_name)
        ingested_items = 0
        connections.connect(host="localhost", port=19530)
        utility.wait_for_index_building_complete(collection_name=collection_name, index_name=vector_field_name)
        # print(
        #     client.count_entities(collection_name=collection_name)
        # )
        tm = timer()
        start = time.time()
        while ingested_items < len(vectors)-1:
            res = client.get_collection_stats(collection_name=collection_name)
            ingested_items = res["row_count"]
            print(f"{tm.time_since_beginning()}: Currently ingested items: {ingested_items}")
            time.sleep(10)
        print(f"Ingested in {tm.time_since_beginning()} seconds.")
        print(client.list_indexes(collection_name=collection_name))
        print(client.describe_index(collection_name=collection_name, index_name=vector_field_name))

    return client.search(
        collection_name=collection_name,
        # data=[t[vector_field_name] for t in test],
        data=test,
        #data=test,
        search_params={"metric_type": metric, "params": {"nprobe": 100, "efSearch": 128}},
        # anns_field=vector_field_name,
        limit=10,
        output_fields=["text", "_id"],
    )

def print_answer(q, res):
    print(f"Question: {q['text'] if isinstance(q, dict) else q}")
    if len(res) == 0:
        print("  ** No results found. **")
    else:
        for r in res:
            print({'id': r['entity']['_id'], 'answer': r['entity']['text'], 'score': r['distance']})
        print("\n")

def test_setup(milvus_server_addr="test.db", reingest=False, use_connections=False, collection_name="test3", vector_field_name="qembedding", metric="IP"):
    global questions, score
    print(f"Testing {milvus_server_addr}")
    questions = data_list
    answers = test_search(data_list, questions, reingest=reingest, milvus_server_addr=milvus_server_addr,
                          use_connections=use_connections, collection_name=collection_name, vector_field_name=vector_field_name,
                          metric=metric)
    score = 0
    total = len(questions)

    for q, res in zip(questions, answers):
        print(res)
        if isinstance(q, dict):
            if q['_id'] in [r['entity']['_id'] for r in res[0:3]]:
                score += 1
            else:
                if len(res) > 0:
                    print_answer(q, res)
        else:
            print_answer(q, res)
    print(f"Score: {score * 1.0 / total:.2f}")



In [28]:
online_milvus = "http://localhost:19530"
file_milvus = "test.db"
use_connections = True
conns = {
    "quora_docuverse": ("beir_quora_small_milvus_dense_512_100_08292024", "embeddings", "COSINE"),
    "test": ("test3", "qembedding", "IP")
}
test = conns["quora_docuverse"]
keys_to_keep = {"text"}
data_list = [{**{k:v for k, v in d.items() if k in keys_to_keep}, '_id': d['id'], test[1]:embeddings[i]} for i,d in enumerate(data)]
test_setup(online_milvus, reingest=False, use_connections=False, collection_name=test[0], vector_field_name=test[1], metric=test[2])

Testing http://localhost:19530
[{'id': 452755057875654967, 'distance': 0.07759928703308105, 'entity': {'text': 'Why does my cat like to jump on my leg?'}}, {'id': 452755057875651688, 'distance': 0.07471269369125366, 'entity': {'text': 'What is Brahmakumaris?'}}, {'id': 452755057875652495, 'distance': 0.06591494381427765, 'entity': {'text': 'Do coffee cure headaches?'}}, {'id': 452755057875650110, 'distance': 0.06302182376384735, 'entity': {'text': 'Is (space-) time quantized or continuous?'}}, {'id': 452755057875650941, 'distance': 0.06174270436167717, 'entity': {'text': 'I know the email account used for Facebook but I forgot my password and the phone number I used is no longer active. How do I recover it?'}}, {'id': 452755057875648732, 'distance': 0.06101927161216736, 'entity': {'text': 'Will India end up having a Muslim majority in the future? What will be the social, economical and political consequences if that happens?'}}, {'id': 452755057875648202, 'distance': 0.0606855824589729

KeyError: '_id'

In [26]:
data_list[0].keys()

dict_keys(['text', '_id', 'embeddings'])

In [24]:
for q, res in zip(questions, answers):
    print(f"Question: {q['text']}")
    if len(res) == 0:
        print("  ** No results found. **")
    else:
        for r in res:
            print({'id': r['entity']['_id'], 'answer': r['entity']['text'], 'score': r['distance']})
        print("\n")

Question: I was suddenly logged off Gmail. I can't remember my Gmail password and just realized the recovery email is no longer alive. What can I do?
{'id': '117-0-139', 'answer': "I was suddenly logged off Gmail. I can't remember my Gmail password and just realized the recovery email is no longer alive. What can I do?", 'score': 1.000000238418579}
{'id': '106577-0-51', 'answer': 'What is the best music to listen to while studying?', 'score': 0.9982950091362}
{'id': '348001-0-30', 'answer': 'What is New Zealand known for?', 'score': 0.7475979328155518}
{'id': '383079-0-123', 'answer': 'How many days will it take to dispatch pan card after application has received at income tax office for further processing?', 'score': 0.741649866104126}
{'id': '124606-0-30', 'answer': 'How do I play cards by myself?', 'score': 0.7245960235595703}
{'id': '416946-0-42', 'answer': 'What are the best ways to confront a liar?', 'score': 0.6999878883361816}
{'id': '471345-0-108', 'answer': "I am a engineerin

In [23]:
questions[0]['text']

"I was suddenly logged off Gmail. I can't remember my Gmail password and just realized the recovery email is no longer alive. What can I do?"

In [13]:
client = MilvusClient("http://localhost:19530")

In [14]:
client.list_collections()

['ibmsw_gte_small_512_100_1889300',
 'beir_quora_hybrid_milvus_bm25_512_100_08292024',
 'beir_quora_hybrid_milvus_dense_512_100_08292024',
 'beir_quora_new_milvus_hybrid_512_100_08292024',
 'test1',
 'test3',
 'beir_fiqa_milvus_bm25_512_100_08292024',
 'hello_sparse',
 'test_collection',
 'beir_quora_milvus_hybrid_512_100_08292024',
 'test2',
 'beir_quora_small_milvus_dense_512_100_08292024']

In [18]:
client.get_collection_stats("beir_quora_small_milvus_dense_512_100_08292024")

{'row_count': 9618}

In [19]:
client.list_indexes("beir_quora_small_milvus_dense_512_100_08292024")

['embeddings']

In [83]:
client.describe_collection("test1")

{'collection_name': 'test1',
 'auto_id': True,
 'num_shards': 1,
 'description': '',
 'fields': [{'field_id': 100,
   'name': 'id',
   'description': '',
   'type': <DataType.INT64: 5>,
   'params': {},
   'auto_id': True,
   'is_primary': True},
  {'field_id': 101,
   'name': 'text',
   'description': '',
   'type': <DataType.VARCHAR: 21>,
   'params': {'max_length': 50000}},
  {'field_id': 102,
   'name': '_id',
   'description': '',
   'type': <DataType.VARCHAR: 21>,
   'params': {'max_length': 50000}},
  {'field_id': 103,
   'name': 'qembedding',
   'description': '',
   'type': <DataType.FLOAT_VECTOR: 101>,
   'params': {'dim': 384}}],
 'aliases': [],
 'collection_id': 452890847312229091,
 'consistency_level': 2,
 'properties': {},
 'num_partitions': 1,
 'enable_dynamic_field': True}

In [47]:
from pymilvus import utility, connections
connections.connect(host="localhost", port='19530')

In [81]:
utility.index_building_progress("test2", "qembedding")


{'total_rows': 0,
 'indexed_rows': 0,
 'pending_index_rows': 0,
 'state': 'Finished'}

In [66]:
client.get_collection_stats("test")

{'row_count': 9617}

In [78]:
client.describe_index("test1", "qembedding")

{'nlist': '1024',
 'index_type': 'FLAT',
 'metric_type': 'IP',
 'field_name': 'qembedding',
 'index_name': 'qembedding',
 'total_rows': 0,
 'indexed_rows': 0,
 'pending_index_rows': 0,
 'state': 'Finished'}

In [76]:
client.drop_collection("test3")

In [None]:
client.describe_index("beir_quora_small_milvus_dense_512_100_08292024", "embeddings")

In [33]:
client.flush()

AttributeError: 'MilvusClient' object has no attribute 'flush'