# ATS Tracker MilvusDB Ops

In [1]:
import os
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
from PyPDF2 import PdfReader

In [48]:
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility, AnnSearchRequest, WeightedRanker
from pymilvus.model.hybrid import BGEM3EmbeddingFunction

In [3]:
# Locations used by this notebook, relative to the notebook's directory.
milvusdb_uri = "../db/milvus/ats_tracker.db"  # local Milvus database file
resumes_uri = "../data/master_data/resumes/v1.0/"  # root folder of the resume PDFs
# archived resume metadata next to the PDFs (not referenced in the cells shown here)
resumes_zipped_metadata_uri = resumes_uri + "resumes_metadata.tar.gz"

## Read Resumes

In [4]:
def read_pdf(pdf_path: str) -> str:
    """Return the text of every page of the PDF at ``pdf_path`` as one string.

    Each page's extracted text is preceded by a newline. The result therefore
    begins with a newline when the PDF has pages, and is empty when it has none.
    """
    pages = PdfReader(pdf_path).pages
    return "".join("\n" + page.extract_text() for page in pages)

In [46]:
# Walk the resume tree: every PDF sits in a sub-folder whose name is the resume's domain.
resumes = []
for (root, dirs, files) in tqdm(os.walk(resumes_uri)):
    # os.walk order depends on the filesystem; sorting `dirs` in place (top-down walk)
    # and iterating `files` sorted makes the row order reproducible across machines
    dirs.sort()
    for f in sorted(files):
        # match on the real extension only; a substring test would also accept names
        # like "x.pdf.bak" and `str.replace` would strip ".pdf" from inside the stem
        stem, ext = os.path.splitext(f)
        if ext.lower() != ".pdf":
            continue

        resume_domain = os.path.basename(os.path.normpath(root))
        resume_id = stem
        resume_uri = os.path.join(root, f)
        resume = read_pdf(resume_uri)

        resumes.append([resume_id, resume_domain, resume_uri, resume])

print("resume size: ", len(resumes))

0it [00:00, ?it/s]

resume size:  2484


In [47]:
# One row per resume: id, domain (parent folder name), source path and extracted text.
df_resumes = pd.DataFrame.from_records(
    resumes,
    columns=["resume_id", "resume_domain", "resume_uri", "resume"],
)
df_resumes.head()

Unnamed: 0,resume_id,resume_domain,resume_uri,resume
0,10554236,ACCOUNTANT,../data/master_data/resumes/v1.0/ACCOUNTANT/10...,\nACCOUNTANT\nSummary\nFinancial Accountant sp...
1,10674770,ACCOUNTANT,../data/master_data/resumes/v1.0/ACCOUNTANT/10...,\nSTAFF ACCOUNTANT\nSummary\nHighly analytical...
2,11163645,ACCOUNTANT,../data/master_data/resumes/v1.0/ACCOUNTANT/11...,\nACCOUNTANT\nProfessional Summary\nTo obtain ...
3,11759079,ACCOUNTANT,../data/master_data/resumes/v1.0/ACCOUNTANT/11...,\nSENIOR ACCOUNTANT\nExperience\nCompany Name\...
4,12065211,ACCOUNTANT,../data/master_data/resumes/v1.0/ACCOUNTANT/12...,\nSENIOR ACCOUNTANT\nProfessional Summary\nSen...


In [None]:
# Persist the extracted resume text so later steps can reload it without re-parsing PDFs.
# pandas' to_csv does not create missing parent folders, so make sure the folder exists.
processed_resumes_uri = "../data/processed_data/resumes.csv"
os.makedirs(os.path.dirname(processed_resumes_uri), exist_ok=True)
df_resumes.to_csv(processed_resumes_uri, index=False)

## BGE-M3 Embedding

In [35]:
# BGE-M3 embedding model via the pymilvus wrapper.
# With the wrapper's default return options it yields dense and sparse vectors;
# ColBERT vectors are not returned.
# fp16 has to stay disabled when the model runs on the CPU.
bge_m3 = BGEM3EmbeddingFunction(
    use_fp16=False,            # must be False for device='cpu'
    device='cpu',              # e.g. 'cpu' or 'cuda:0'
    model_name='BAAI/bge-m3',  # Hugging Face model id
)

Fetching 30 files:   0%|          | 0/30 [00:00<?, ?it/s]

In [4]:
def encode_documents(docs):
    """Encode ``docs`` with the BGE-M3 model.

    Returns a ``(dense, sparse)`` pair. ``dense`` is the model's dense output
    unchanged. ``sparse`` is the model's sparse output materialised as a list,
    with one entry per document.
    """
    output = bge_m3.encode_documents(docs)
    dense_vectors = output["dense"]
    sparse_rows = [row for row in output["sparse"]]
    return dense_vectors, sparse_rows

## MilvusDB Ops

### Connect to MilvusDB

In [8]:
# Open the default pymilvus connection to the database file configured in `milvusdb_uri`.
connections.connect(alias="default", uri=milvusdb_uri)

### Create Milvus Collection

In [9]:
# Length of one BGE-M3 dense embedding vector (i.e. len(dense_embeds) for a single document).
dense_dim = 1024

In [10]:
# Field names shared by the schema definition, the inserts and the searches.
pk_field, id_field, domain_field = "doc_id", "id", "domain"
dense_field, sparse_field = "dense_vector", "sparse_vector"

# Each resume gets an auto-generated primary key, its resume id and domain as
# string fields, and one dense plus one sparse embedding.
fields = [
    FieldSchema(name=pk_field, dtype=DataType.VARCHAR, is_primary=True, auto_id=True, max_length=100),
    FieldSchema(name=id_field, dtype=DataType.VARCHAR, max_length=100),
    FieldSchema(name=domain_field, dtype=DataType.VARCHAR, max_length=200),
    FieldSchema(name=dense_field, dtype=DataType.FLOAT_VECTOR, dim=dense_dim),
    FieldSchema(name=sparse_field, dtype=DataType.SPARSE_FLOAT_VECTOR),
]

# collection schema built from the field list above
schema = CollectionSchema(fields=fields)

In [11]:
# (Re)create the resumes collection. An existing collection with this name is
# dropped first, so every run of this cell starts from an empty collection.
collection_name = "ats_tracker_resumes_collection"

if utility.has_collection(collection_name):
    Collection(collection_name).drop()

collection = Collection(name=collection_name, schema=schema, consistency_level="Strong")

# Index parameters for both vector fields; both use the inner-product (IP) metric.
sparse_index = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}
dense_index = {"index_type": "AUTOINDEX", "metric_type": "IP"}

collection.create_index(field_name=sparse_field, index_params=sparse_index)
collection.create_index(field_name=dense_field, index_params=dense_index)

collection.load()

### Insert Resumes

In [36]:
# Number of resumes embedded and inserted per Milvus insert call.
batch_size = 20


def _csr_row_to_dict(sparse_row):
    """Convert one sparse CSR row to a {dimension index: weight} dict with plain int/float types."""
    return {int(idx): float(val) for idx, val in zip(sparse_row.indices, sparse_row.data)}


def process_batch(batch):
    """Embed a slice of ``df_resumes`` and insert it into ``collection``.

    All resume texts in ``batch`` are encoded with a single model call. One
    entity per row is then built: resume id, domain, and dense and sparse vectors.

    Returns the result of ``collection.insert``, or ``None`` if ``batch`` is
    empty (nothing is inserted in that case).
    """
    # guard: previously an empty batch left `resp` unbound and raised UnboundLocalError
    if batch.empty:
        return None

    # one model call per batch instead of one call per resume
    dense_embeds, sparse_embeds = encode_documents(batch["resume"].tolist())

    entities = [
        {
            id_field: resume_id,
            domain_field: domain,
            dense_field: dense_embeds[row_index],
            sparse_field: _csr_row_to_dict(sparse_embeds[row_index]),
        }
        for row_index, (resume_id, domain) in enumerate(
            zip(batch["resume_id"], batch["resume_domain"])
        )
    ]

    # plain insert (not upsert): the primary key is auto-generated, so re-running
    # this cell without recreating the collection adds duplicate rows
    return collection.insert(entities)


def divide_chunks(df, batch_size):
    """Yield successive ``batch_size``-row slices of ``df``."""
    for i in range(0, len(df), batch_size):
        yield df.iloc[i:i + batch_size]


# process and insert df_resumes in batches; ceiling division gives tqdm a total
n_batches = -(-len(df_resumes) // batch_size)
for batch in tqdm(divide_chunks(df_resumes, batch_size), total=n_batches):
    resp = process_batch(batch)

# flush once after all inserts to persist the data (per-batch flushes are
# discouraged by Milvus guidance - NOTE(review): confirm for the deployed version)
collection.flush()

# load the collection into memory for querying
collection.load()

0it [00:00, ?it/s]

You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


## Resumes Filter

In [42]:
def resumes_filter(
    collection,
    query_dense_embedding,
    query_sparse_embedding,
    sparse_weight=1.0,
    dense_weight=1.0,
    limit=10,
    dense_field_name="dense_vector",
    sparse_field_name="sparse_vector",
    output_field="id",
):
    """Find resumes for one query by fusing a dense and a sparse ANN search.

    One inner-product ANN request is run per vector field. The two result lists
    are merged with ``WeightedRanker``, whose weights follow the order of the
    request list: sparse first, then dense.

    Args:
        collection: Milvus collection holding the resume vectors.
        query_dense_embedding: dense query vector.
        query_sparse_embedding: sparse query vector ({index: weight} dict).
        sparse_weight: fusion weight of the sparse-search scores.
        dense_weight: fusion weight of the dense-search scores.
        limit: hits requested from each ANN search and from the fused result.
        dense_field_name: dense vector field to search (default matches the schema).
        sparse_field_name: sparse vector field to search (default matches the schema).
        output_field: scalar field whose value is returned for each hit.

    Returns:
        list of ``output_field`` values for the fused hits of the query, in the
        order returned by ``hybrid_search``.
    """
    # both vector fields are searched with IP, the metric their indexes were built with
    sparse_search_params = {"metric_type": "IP", "params": {}}
    sparse_req = AnnSearchRequest(
        [query_sparse_embedding], sparse_field_name, sparse_search_params, limit=limit
    )
    dense_search_params = {"metric_type": "IP", "params": {}}
    dense_req = AnnSearchRequest(
        [query_dense_embedding], dense_field_name, dense_search_params, limit=limit
    )
    # weights map positionally onto the request list below: sparse, then dense
    rerank = WeightedRanker(sparse_weight, dense_weight)
    hits = collection.hybrid_search(
        [sparse_req, dense_req], rerank=rerank, limit=limit, output_fields=[output_field]
    )[0]  # single query -> first (only) result list
    return [hit.get(output_field) for hit in hits]

In [43]:
query = "python developer"  # free-text search query for the resume filter

In [45]:
# Encode the search text with the model's query-side API; the resumes were
# encoded with the document-side API.
query_embeddings = bge_m3.encode_queries([query])
dense_query_embeds = query_embeddings["dense"]
sparse_query_embeds = list(query_embeddings["sparse"])

# convert the single sparse CSR row to a {dimension index: weight} dict
# with plain int/float types, the same form used when inserting resumes
row_index = 0
sparse_row = sparse_query_embeds[row_index]
sparse_dictionary = {int(idx): float(val) for idx, val in zip(sparse_row.indices, sparse_row.data)}

# equal weighting of sparse (lexical) and dense (semantic) scores
results = resumes_filter(
    collection,
    dense_query_embeds[row_index],
    sparse_dictionary,
    sparse_weight=0.5,
    dense_weight=0.5,
)

print("Resumes search results: ", results)
print("\n")

Resumes search results:  ['20674668', '71767359', '62994611', '26503829', '12632728', '26746496', '14771530', '16186411', '19557384', '12144825']


