# Milvus Database

Experimenting with vector storage in Milvus, following the Python client guide: https://milvus.io/docs/v1.0.0/connect_milvus_python.md


In [1]:
import sys  # was missing: `sys.path.append` raised NameError on a fresh kernel

# Make the parent directory importable so the local utils package can be loaded.
# Guarded so that re-running the cell does not keep appending duplicate entries.
if ".." not in sys.path:
    sys.path.append("..") #to load in dir files

from milvus import Milvus, IndexType, MetricType, Status
# Star import: this notebook relies on it for `get_data` (used just below).
from alexandria_utils.es_ds import *


# Connect to the Server (running in Docker Presumably)
host = 'localhost'
port = '19530'

milvus = Milvus(host=host, port=port)


# Get a Couple of Vectors from Local Storage + Longformer API
# `pdfs` is used throughout the notebook as a dict keyed by document URL.
pdfs = get_data("../data/pdfs.json")

In [80]:
# Endpoint Request (GPU Machine on Wifi)
# Builds the embedding-service URL and fires one sample request at it.
import json
import requests
import numpy as np
import time

# Configure URL and Desired Endpoint
# Each pair below is "option, then override": only the LAST assignment of a name takes effect.
endpoint = '/summarize' #overridden on the next line; in base could pick from: ["summarize", "e-mlm", "e-lf", "keyphrase"]
endpoint = '/e-lf' #longformer Endpoint (the one actually used)
url = "http://localhost:5003" + endpoint    #CPU local (overridden on the next line)
url = "http://192.168.1.26:5003" + endpoint #GPU Machine (the one actually used)
endpoint_url = url #stable copy used by get_embedding and the embedding loop; `url` is reused as a loop variable later (original note: posting data to wrong place!)


# Sample Text for Request
txt = """
- cascaded diffusion models are capable of generating high quality images on the class-conditional ImageNet generation benchmark
- The cascaded model uses a combination of two super-resolution models to boost sample quality
- The models achieve FID scores of 63.02% and 84.06% for high-fidelity images
"""
payload = json.dumps({
    "input_text": txt
})
headers = {
    'Content-Type': 'application/json'
}


# POST the Request
# Sample request only: the response is stored but not inspected anywhere in this cell.
response = requests.request("POST", url, headers=headers, data=payload)

def get_embedding(txt, normalize=False, retries=3, retry_delay=1.0):
    """
    Request a 768-dim embedding for `txt` from the embedding endpoint.

    Posts `txt` as JSON to the module-level `endpoint_url` and returns the
    response reshaped to a flat vector.

    Args:
        txt: text to embed.
        normalize: when True, adds `"normalize_vecs": True` to the payload
            (per the original note, the endpoint then returns unit-length vectors).
        retries: total number of attempts before giving up (at least 1).
        retry_delay: seconds to sleep between failed attempts.

    Returns:
        numpy array of shape (768,).

    Raises:
        RuntimeError: if no attempt returned HTTP 200. Previously the failed
            response body was parsed anyway, which surfaced as a confusing
            JSON/reshape error or silently produced a bad vector.
    """
    body = {"input_text": txt}
    if normalize:
        body["normalize_vecs"] = True
    payload = json.dumps(body)
    headers = {'Content-Type': 'application/json'}

    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        response = requests.request("POST", endpoint_url, headers=headers, data=payload)
        if response.status_code == 200:
            vec = np.array(response.json()) #get JSON from Endpoint
            # endpoint has support for batch requests, remove unnecessary dimensions
            return vec.reshape(768)

        if attempt < attempts:
            print(f"Error, Code Given: {response.status_code} -- sleeping {retry_delay} sec")
            time.sleep(retry_delay)

    raise RuntimeError(
        f"Embedding endpoint {endpoint_url} returned {response.status_code} "
        f"after {attempts} attempt(s)"
    )



In [137]:

# Scratch check: join bullet strings with newlines, then remove every "- " occurrence.
t = ["- wow", "- this", "- is cool"]
"\n".join(t).replace("- ", "")

'wow\nthis\nis cool'

In [96]:
from tqdm import tqdm

# Get Embeddings for Each of the Content Summaries
# Delegates to get_embedding instead of duplicating its request/parse logic. The inline copy
# called response.json() before checking the status code and overwrote the module-level
# `url` and `txt` names with loop values.
for pdf_url in tqdm(pdfs.keys()):
    summary_txt = "\n".join(pdfs[pdf_url]["SUMMARY"])

    # Parse out Vec and Add to Temp Data Dict
    pdfs[pdf_url]["VEC"] = get_embedding(summary_txt)
    # For the normalized variant, store get_embedding(summary_txt, normalize=True) under "NORMVEC".



100%|██████████| 9/9 [00:04<00:00,  2.07it/s]


In [4]:
pdfs

{'https://www.cs.princeton.edu/courses/archive/fall13/cos597E/papers/howtoread.pdf': {'TIMESTAMP': 1664211358862,
  'TITLE': 'howtoread',
  'SUMMARY': ['- researchers often waste effort reading research papers',
   '- This article outlines a three-pass method for reading papers',
   "- The method can be used to do a literature survey of a paper's content and contents",
   '- The three passes can be read in up to ten minutes each'],
  'TOKEN_COUNTS': {'paper': 43,
   'pass': 27,
   'read': 21,
   'papers': 18,
   'work': 10,
   'research': 9,
   'survey': 9,
   'understand': 9,
   'reading': 8,
   'approach': 8},
  'UID': 5874606664423525727,
  'VEC': array([ 1.32313028e-01, -2.59309351e-01,  1.09914623e-01, -2.74222856e-03,
          3.22457463e-01, -2.07663819e-01, -4.81771350e-01, -4.18430001e-01,
         -9.00319293e-02, -2.58529842e-01, -3.67496550e-01, -1.49059400e-01,
          2.10911095e-01, -2.90587455e-01,  1.09449416e-01, -1.16337143e-01,
         -3.89987528e-01, -8.155464

In [5]:
# Get Version of the Client
milvus.client_version()

'1.0.1'

#### Collections

A collection in Milvus is a group of vectors that share the same dimension and characteristics (presumably because they come from the same model).


In [161]:
# Get a List of all the Collections currently in Milvus
# list_collections() returns a (status, names) pair; only the names are displayed.

status, vals = milvus.list_collections() 
vals


['ckg_test', 'test_non_norm', 'test_norm']

In [118]:
# Creating a New Collection
# The commented-out entries are the alternative configuration ("ckg_test" with L2).
# NOTE(review): this creates the *non-normalized* test collection with the IP metric, which the
# note on the metric line says requires unit vectors — presumably intentional for the
# norm vs. non-norm comparison; confirm.
collection_params = {
    # "collection_name": "ckg_test",
    "collection_name": "test_non_norm",
    "dimension": 768,              #dim of the vecs in collection
    "index_file_size": 2048,       #int, in MB (1,024 by default; up to 4,096)
    # "metric_type": MetricType.L2   #a metric from- https://milvus.io/docs/metric.md#Similarity-Metrics
    "metric_type": MetricType.IP   #requires normalization of Embeddings! (unit vecs)
}

milvus.create_collection(collection_params) #can pass in a config dict or fill in the func params manually
    #if a collection with this name already exists, throws an `ILLEGAL_COLLECTION_NAME` error


Status(code=0, message='Create collection successfully!')

In [133]:
# Get Info on a Created Collection --> dim of vectors, file sizes, distance metric
milvus.get_collection_info("ckg_test") #default name for testing

(Status(code=0, message='Describe collection successfully!'),
 CollectionSchema(collection_name='ckg_test', dimension=768, index_file_size=2048, metric_type=<MetricType: L2>))

In [116]:
# Delete a Collection (presumably along with every vector stored in it — confirm in the Milvus docs)
# NOTE(review): other cells in this notebook still use "ckg_test"; on a top-to-bottom run the
# collection must be re-created before they execute.
milvus.drop_collection("ckg_test")

Status(code=0, message='Delete collection successfully!')

In [10]:
# Get Statistics of Data in Collection --> number of partitions | rows/vecs, index_names, etc.
milvus.get_collection_stats("ckg_test")

(Status(code=0, message='Success'),
 {'partitions': [{'row_count': 9,
    'segments': [{'data_size': 27720,
      'index_name': 'IDMAP',
      'name': '1668544671005882000',
      'row_count': 9}],
    'tag': '_default'}],
  'row_count': 9})

#### Partitions


In [140]:
# Creating Partitions 

milvus.create_partition("ckg_test", "application/pdf") #this is how we could separate the different schemas (filter search/added vecs)


Status(code=0, message='OK')

In [39]:
# Listing all the Named Partitions

milvus.list_partitions("ckg_test") #"_default" is always present, can use these to filter searches?


(Status(code=0, message='Success'),
 [(collection_name='ckg_test', tag='_default'),
  (collection_name='ckg_test', tag='application/pdfs')])

In [None]:
# Deleting a Partition
# NOTE(review): no visible cell creates the plural "application/pdfs" tag (the create cell uses the
# singular "application/pdf"); it appears only in the captured listing output, so this call
# presumably fails on a fresh run.

milvus.drop_partition("ckg_test", "application/pdfs") #removes the misnamed plural tag (the intended tag is singular); doubles as an example of removing a partition


#### Inserting Data/Vectors


In [141]:
# Insert Instances/Entities to a Collection

urls = list(pdfs.keys()) #need get vals for Upload by Key (url)
vecs = np.array([pdfs[key]["VEC"] for key in urls], dtype=float) #get vecs

# "NORMVEC" is only present if the embedding loop was run with the normalized variant.
# Indexing it unconditionally raised KeyError on a clean run, so build the array only when
# every record has it and otherwise leave it as None.
has_normvecs = all("NORMVEC" in pdfs[key] for key in urls)
normalized_vecs = (
    np.array([pdfs[key]["NORMVEC"] for key in urls], dtype=float) if has_normvecs else None
)
uids = [int(pdfs[key]["UID"]) for key in urls] #get unique ids
# tmp_ids = [i for i in range(9)] #for testing if these work and not the 128-bit ids

# Notes from earlier experiments: 128-bit ids cannot be inserted (they need downsizing);
# small ints work; the upper limit for Milvus ids is 64 bits.
# milvus.insert(collection_name="ckg_test", records=vecs, ids=uids)
# milvus.insert(collection_name="ckg_test", records=vecs, ids=tmp_ids)

milvus.insert(collection_name="ckg_test", records=vecs, ids=uids, partition_tag="application/pdf") #add vecs to a specific partition

# TESTING OUT DIFFERENCE IN NORM VS NON-NORM (requires normalized_vecs to be populated)
# milvus.insert(collection_name="test_norm", records=normalized_vecs, ids=uids)
# milvus.insert(collection_name="test_non_norm", records=vecs, ids=uids)

(Status(code=0, message='Add vectors successfully!'),
 [5874606664423525727,
  8265254332739437035,
  149267493308284940,
  8642705718651734456,
  2519719930320209347,
  3698312979005721555,
  3461076549439540103,
  6365688196714612853,
  5970674455019340766])

In [138]:
# Count number of Entities/Instances in a Collection

milvus.count_entities("ckg_test") #returns a tuple of; (request_status, num_entites) --> 2nd element is the count of vecs in the collection

(Status(code=0, message='Success!'), 0)

#### Searching 


In [144]:
# Sample Search (for minimal distance of a query vec to others in the DB)

query_text = """
Wasting time reading through academic papers
"""
# dim 768, from longformer -- matches the dim of our vector db.
# Search needs a 2d array, so add a leading "number of queries" dimension to the base vector.
query_embedding = get_embedding(query_text).reshape(1, 768)

# Search based on Distance Metric
# Run the search once and keep the result. The cell used to issue the identical search a second
# time just to display it, costing an extra round trip to the server.
status, results = milvus.search("ckg_test", 7, query_embedding, partition_tags=["application/pdf"])

# Displays the same (status, results) pair the second call used to return:
# a list of distances, top entry is least far away (in metric space).
# The ids can be used to look up the original data and display semantic query results.
status, results

In [None]:
# Normalized Embeddings
# Embed the same query with normalization requested, shaped (1, 768) as search expects.
query_embedding_norm = get_embedding(query_text, normalize=True).reshape(1, 768)

# list of distances, top entry is Least far away (in metric space)
milvus.search("test_norm", 9, query_embedding_norm)

In [None]:
# Check the Results from above and below versions of same query
# NOTE(review): uid_to_content is only defined in a later cell of this notebook (or comes from the
# star import, if it has already been moved into the utils package) — confirm it is in scope here.
uid_to_content(3461076549439540103, "../data/pdfs.json")

In [142]:
# Un-Normalized Embeddings
# "test_non_norm" is created with MetricType.IP in the collection-creation cell, so the values
# labelled `distance` in the output are presumably inner-product scores, not L2 distances — confirm.

milvus.search("test_non_norm", 10, query_embedding) #returns (status, hits); each hit carries an id and a distance/score for the single query vector

(Status(code=0, message='Search vectors successfully!'),
 [
 [
 (id:8642705718651734456, distance:44.47812271118164),
 (id:5874606664423525727, distance:44.20343780517578),
 (id:8265254332739437035, distance:44.18317413330078),
 (id:2519719930320209347, distance:43.855403900146484),
 (id:5970674455019340766, distance:43.84170913696289),
 (id:3698312979005721555, distance:43.8220100402832),
 (id:3461076549439540103, distance:43.53544998168945),
 (id:6365688196714612853, distance:43.41592788696289),
 (id:149267493308284940, distance:43.39691162109375)
 ]
 ])

If we use [IP](https://milvus.io/docs/metric.md#Inner-product-IP) as a metric (as opposed to the Euclidean L2) then we NEED to normalize any embeddings that go into our vector store + any queries that we want to compare those vectors against 

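For now Euclidean (L2) seems to be the better choice. Results returned with inner product come back in descending score order; since a larger inner product means *more* similar, this is the reverse of L2's ascending-distance ordering (rather than least-similar-first). Anecdotally, IP also does not put the most relevant result in the top position.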


In [78]:
len(vecs) # evaluated but not displayed — only the last expression of a cell is shown
type(vecs[0])

numpy.ndarray

In [105]:
# This function assumes we know WHICH Content JSON to get data from --> moved into `data.py` in the utils dir
def uid_to_content(uid=None, path=None):
    """
    Look up a stored record by its UID in the content JSON at `path`.

    Assumes the caller knows WHICH content JSON holds the record.

    Args:
        uid: record id, as a string or any integral number. Besides plain `int`
            this now also accepts numpy integers, which the previous
            `type(uid) == int` check left unconverted and so failed to match.
        path: path to the content JSON, passed straight to `get_data`.

    Returns:
        Whatever `get_data(path)` stores under the string form of `uid`.

    Raises:
        KeyError: if the loaded data has no entry for `uid`.
    """
    import numbers  # local import so this cell does not depend on another cell's imports

    if isinstance(uid, numbers.Integral):
        uid = str(int(uid)) #need convert to string if reading from datafile (so match the uid)
    data = get_data(path) #from the ES controller file
    return data[uid]


uid_to_content(5874606664423525727, "../data/pdfs.json")


{'TIMESTAMP': 1664215209121,
 'TITLE': '1706.03762',
 'SUMMARY': ['- a new neural network architecture based solely on attention mechanisms',
  '- The Transformer is superior in quality and parallelizable to recurrent neural networks',
  '- It can be used to perform machine translation tasks with better results',
  '- The authors propose a new approach to sequence transduction models for language and machine translation'],
 'TOKEN_COUNTS': {'attention': 14,
  'output': 13,
  'sequence': 11,
  'models': 11,
  'model': 11,
  'positions': 11,
  'encoder': 10,
  'transformer': 10,
  'decoder': 9,
  'input': 9},
 'UID': 3461076549439540103,
 'URL': 'https://arxiv.org/pdf/1706.03762.pdf'}

In [75]:
# `results` comes from the `status, results = milvus.search(...)` cell; results[q][k] is presumably
# the k-th hit for the q-th query vector. The first two expressions are evaluated but not
# displayed; only the print below produces output.
results[0][1].id #example of indexing a search result for ids
results[0][1].distance #same but for the distance score

print(f"Search Result ID- {results[0][0].id} is {results[0][0].distance:.4f} away")


Search Result ID- 5874606664423525727 is 0.2953 away


In [None]:
def parse_milvus_res(results, out_val="ids", query_idx=0):
    """
    Util to parse out relevant values from a Milvus Search Result Object

    The previous body iterated over `results` and used each element as an
    index (`results[1][i]`), which raised TypeError for any non-empty result,
    and `out_val` was only checked for truthiness.

    Args:
        results: either the raw `(status, results)` tuple returned by
            `milvus.search(...)` or just its `results` part. `results[q]` must
            be the sequence of hits for query `q`, each with `.id` and `.distance`.
        out_val: "ids" (default) for the hit ids, "distances" for the hit distances.
        query_idx: which query's hits to read when several vectors were searched.

    Returns:
        A list with one value per hit, in result order; [] when there are no results.

    Raises:
        ValueError: if `out_val` is not "ids" or "distances".
    """
    # Accept the un-split return value of milvus.search as a convenience.
    if isinstance(results, tuple) and len(results) == 2:
        results = results[1]

    if out_val not in ("ids", "distances"):
        raise ValueError(f"out_val must be 'ids' or 'distances', got {out_val!r}")

    if not results:
        return []

    hits = results[query_idx]
    if out_val == "ids":
        return [hit.id for hit in hits]
    return [hit.distance for hit in hits]


In [16]:
np.array(vecs)
type(uids)

milvus.list_partitions("ckg_test")

(Status(code=0, message='Success'),
 [(collection_name='ckg_test', tag='_default')])

In [159]:
# NOTE(review): `res` is not assigned in any visible cell — presumably res = milvus.search(...),
# making res[1][0] the hit list for the first query vector. Confirm before relying on this cell.
type(res[1][0])
len(res[1][0])


7