In [12]:
import nest_asyncio
import os
import pickle
import re
import uuid

from alive_progress import alive_bar
from dotenv import load_dotenv
from fastembed import TextEmbedding
from langchain_anthropic import ChatAnthropic
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from llama_index.core import Document
from llama_index.core.node_parser import MarkdownElementNodeParser
from llama_index.llms.anthropic import Anthropic
from llama_parse import LlamaParse
from pymilvus import (
    utility,
    CollectionSchema, DataType, FieldSchema, model,
    connections, Collection, AnnSearchRequest, WeightedRanker, RRFRanker,
)
from typing import List, Optional, Tuple

## Loading in of API Keys and Cloud Infrastructure
1. OpenAI (potentially for embedding models), Anthropic (for Claude 3.5 Sonnet), LLama (for LlamaParse)
2. Zillis' Endpoint and Token

In [13]:
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
CLAUDE_API_KEY = os.getenv('CLAUDE_API_KEY')
LLAMA_API_KEY = os.getenv('LLAMA_API_KEY')

os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY
os.environ["ANTHROPIC_API_KEY"] = CLAUDE_API_KEY
os.environ["LLAMA_CLOUD_API_KEY"] = LLAMA_API_KEY

ENDPOINT = os.getenv('ZILLIS_ENDPOINT')
TOKEN = os.getenv('ZILLIS_TOKEN')

connections.connect(uri=ENDPOINT, token=TOKEN)

In [14]:
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20240620",
    max_tokens=4096,
    temperature=0.0,
    stop=["\n\nHuman"],
)

llama_llm = Anthropic(
    model="claude-3-5-sonnet-20240620",
    max_tokens=4096,
    temperature=0.0
)

In [15]:
nest_asyncio.apply()

## Collection Creation
1. Drop existing collection (if one exists)
2. Define Schema -> How your documents will be Ingested
3. Create Collection with Schema defined in 2.

In [16]:
# Specify the collection name
collection_name = "vector_index"

def drop_collection(collection_name):
    # Check if the collection exists
    if utility.has_collection(collection_name):
        collection = Collection(name=collection_name)
        # Release the collection
        collection.release()
        # Drop the collection if it exists
        utility.drop_collection(collection_name)
        print(f"Collection '{collection_name}' has been dropped")
    else:
        print(f"Collection '{collection_name}' does not exist")

# drop_collection(collection_name)

In [17]:
auto_id = FieldSchema(
    name="pk",
    dtype=DataType.INT64,
    is_primary=True,
    auto_id=True)

doc_id = FieldSchema(
    name="doc_id",
    dtype=DataType.VARCHAR,
    max_length=500
)

doc_source = FieldSchema(
    name="doc_source",
    dtype=DataType.VARCHAR,
    max_length=1000,
    default_value="NA"
)

doc_content = FieldSchema(
    name="text",
    dtype=DataType.VARCHAR,
    max_length=50000,
    default_value=""
)

vec_embeddings = FieldSchema(
    name="dense_embeddings",
    dtype=DataType.FLOAT_VECTOR,
    dim=1024
)

keyword_embeddings = FieldSchema(
    name="sparse_embeddings",
    dtype=DataType.SPARSE_FLOAT_VECTOR
)

In [18]:
schema = CollectionSchema(
  fields=[auto_id, doc_id, doc_content, doc_source, vec_embeddings, keyword_embeddings],
  description="milvus_schema",
  enable_dynamic_field=True
)

In [19]:
def create_collection(collection_name, schema):
    # Check if the collection exists
    if utility.has_collection(collection_name):
        print(f"Collection '{collection_name}' already exists")
    # Create the collection
    return Collection(name=collection_name, schema=schema, using='default', shards_num=2)

In [20]:
collection = create_collection(collection_name, schema)

Collection 'vector_index' already exists


In [21]:
bge_embed_model = TextEmbedding(model_name="BAAI/bge-large-en-v1.5")
openai_embed_model = OpenAIEmbeddings(model_name="text-embedding-3-large")
spalde_embed_model = model.sparse.SpladeEmbeddingFunction(
    model_name="naver/splade-cocondenser-ensembledistil",
    device="cpu",
)

Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

                    model_name was transferred to model_kwargs.
                    Please confirm that model_name is what you intended.


## Pre-processing
1. Defining Helper functions for cleaning
2. Data Parsing using `LLamaParse` -> Node Parsing using MarkdownElementNodeParser. After this stage, Document objects are pickled
3. Chunk long texts using appropriate technique.

In [22]:
def remove_table_of_contents(text):
    pattern = r"TABLE OF CONTENTS.*?(?=#)"
    cleaned_text = re.sub(pattern, "", text, flags=re.DOTALL)
    return cleaned_text.strip()

def convert_nodes_to_documents(text_nodes, object_nodes, source):
    """
    Converts nodes to Documents

    Args:
        text_nodes (List[Nodes]): List of text nodes
        object_nodes (List[Nodes]): List of object nodes
        source (str): Source of the document

    Returns:
        documents (List[Documents]): List of Documents
    """
    documents = []
    for node in text_nodes:
        text = node.text
        doc = Document(
            text= text,
            metadata = {
                "is_table": False,
                "source": source
            }
        )
        documents.append(doc)
        
    for node in object_nodes:
        text = node.text
        doc = Document(
            text= text,
            metadata = {
                "is_table": True,
                "source": source
            }
        )
        documents.append(doc)
        
    return documents

In [23]:
# instantiate doc parser
parser = LlamaParse(
    result_type="markdown",
    num_workers=8,
    verbose = True,
    language="en",
)

# instantiate node parser
node_parser = MarkdownElementNodeParser(llm=llama_llm, num_workers=8)

In [24]:
# create folder to store parsed data
data_folder = "data"
os.makedirs(data_folder, exist_ok=True)

def parse_docs(file_location: str, data_folder: Optional[str] = None) -> List[Document]:
    """
    Parses PDF Folder and returns a list of Documents

    Args:
        file_location (str): PDF Folder Location
        data_folder (Optional[str], optional): Folder to save pickles (Optional). Defaults to None.

    Returns:
        List[Document]: _description_
    """
    all_docs = []
    for file_name in os.listdir(file_location):
        if not file_name.endswith(".pdf"):
            continue

        print("File: " + str(file_name))
        doc_path = os.path.join(file_location, file_name)
        modified_file_name = os.path.splitext(file_name)[0].lower().replace(' ', '_')

        # results in a list of Document Objects
        documents = parser.load_data(doc_path)
        
        for idx, doc in enumerate(documents):
            doc.text = remove_table_of_contents(doc.text)
            if idx > 4:
                break

        raw_nodes = node_parser.get_nodes_from_documents(documents)
        # list of text_nodes, list of objects
        text_nodes, objects = node_parser.get_nodes_and_objects(raw_nodes)
        
        final_docs = convert_nodes_to_documents(text_nodes, objects, modified_file_name)
        all_docs.append(final_docs)
        
        if data_folder:
            data_path = os.path.join(data_folder, modified_file_name + '.pkl')
            pickle.dump(final_docs, open(data_path, "wb"))
    
    return [item for sublist in all_docs for item in sublist]

In [37]:
file_location = "pdfs"

doc_list = parse_docs(file_location=file_location, data_folder=data_folder)

File: Diabetes Medications.pdf
Started parsing the file under job_id d19e906a-11eb-48e6-9777-5c980136bffc
..

0it [00:00, ?it/s]
1it [00:00, 15887.52it/s]
0it [00:00, ?it/s]


File: managing-pre-diabetes-(updated-on-27-jul-2021)c2bfc77474154c2abf623156a4b93002.pdf
Started parsing the file under job_id 97500a3f-44f8-4826-9e80-eddcce309858


0it [00:00, ?it/s]
1it [00:00, 11748.75it/s]
0it [00:00, ?it/s]
1it [00:00, 19599.55it/s]
3it [00:00, 41665.27it/s]
0it [00:00, ?it/s]


File: Diabetic Foot Ulcer_ Symptoms and Treatment.pdf
Started parsing the file under job_id 90905b25-f816-4f6e-b938-0a9434cea268


0it [00:00, ?it/s]
0it [00:00, ?it/s]


File: Diabetes Treatment_ Insulin.pdf
Started parsing the file under job_id 0d47bc63-fe00-4659-b2f4-adb9fe8f3af2


0it [00:00, ?it/s]
0it [00:00, ?it/s]
0it [00:00, ?it/s]
1it [00:00, 10782.27it/s]


## Reading of pickles after parsing

In [38]:
def read_pickles(data_folder: str) -> List[Document]:
    doc_list = []
    for file_name in os.listdir(data_folder):
        doc_path = os.path.join(data_folder, file_name)
        if file_name.endswith(".pkl"):
            with open(doc_path, 'rb') as file:
                # data will be a doc_list
                data = pickle.load(file)
                doc_list.append(data)
                
    # since doc_list is a list of list of documents, we need to flatten it
    doc_list = [item for sublist in doc_list for item in sublist]
    return doc_list

In [39]:
doc_list = read_pickles(data_folder)

In [40]:
doc_list

[Document(id_='c5007184-5a14-48c2-b267-df334bae0e90', embedding=None, metadata={'is_table': False, 'source': 'diabetic_foot_ulcer__symptoms_and_treatment'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, text='Diabetic Foot Ulcer: Symptoms and Treatment\n\n What causes diabetic ulcers?\n\nHere’s what you need to know in terms of self-care, prevention, and the treatment of diabetic ulcer.\n\n What Is a Diabetic Ulcer?\n\nA diabetic ulcer is an open sore or wound resulting from poor circulation or lack of sensation due to nerve damage caused by elevated blood glucose levels.\n\nThe legs and feet are most at risk for these ulcers. Diabetes makes it hard for the body to heal itself, increasing the risk of wounds becoming chronic and raising the risk of infection.\n\nSignificantly, nonhealing diabetic ulcers result in a large number of amputations in Singapore. About two major limb amputations are carried out daily to remove lower limbs affected by diabete

### Recursive splitting for longer texts

In [41]:
LONG_CHUNK_SIZE = 2000

def further_split_long_docs(doc_list: List[Document]) -> Tuple[List[Document], List[Document]]:
    long_docs, short_docs = [], []
    for doc in doc_list:
        is_table = doc.metadata["is_table"]
        if not is_table:
            if len(doc.text) > LONG_CHUNK_SIZE:
                long_docs.append(doc)
            else:
                short_docs.append(doc)
        else:
            short_docs.append(doc)
    return long_docs, short_docs

long_docs, short_docs = further_split_long_docs(doc_list)

In [42]:
def chunk_doc(doc: Document, text_splitter: RecursiveCharacterTextSplitter) -> List[Document]:
    chunks = text_splitter.split_text(doc.text)
    return [
        Document(
            text=chunk,
            metadata={
                'is_table': doc.metadata['is_table'],
                'source': doc.metadata.get('source', '')
            }
        )
        for i, chunk in enumerate(chunks)
    ]
    
def recursive_chunk_documents(long_docs: List[Document],
                              short_docs: List[Document], 
                              chunk_size: int = 1024, 
                              chunk_overlap: int = 128,
                              separators: List[str] = ["\n\n", "\n", " ", ""]) -> List[Document]:
    
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=separators
    )

    for doc in long_docs:
        short_docs.extend(chunk_doc(doc, text_splitter))

    return short_docs

In [43]:
final_docs = recursive_chunk_documents(long_docs, short_docs)

In [44]:
final_docs

[Document(id_='c5007184-5a14-48c2-b267-df334bae0e90', embedding=None, metadata={'is_table': False, 'source': 'diabetic_foot_ulcer__symptoms_and_treatment'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, text='Diabetic Foot Ulcer: Symptoms and Treatment\n\n What causes diabetic ulcers?\n\nHere’s what you need to know in terms of self-care, prevention, and the treatment of diabetic ulcer.\n\n What Is a Diabetic Ulcer?\n\nA diabetic ulcer is an open sore or wound resulting from poor circulation or lack of sensation due to nerve damage caused by elevated blood glucose levels.\n\nThe legs and feet are most at risk for these ulcers. Diabetes makes it hard for the body to heal itself, increasing the risk of wounds becoming chronic and raising the risk of infection.\n\nSignificantly, nonhealing diabetic ulcers result in a large number of amputations in Singapore. About two major limb amputations are carried out daily to remove lower limbs affected by diabete

In [45]:
def get_required_data(final_docs: List[Document]) -> Tuple[List[str], List[str], List[str]]:
    all_ids, all_texts, all_sources = [], [], []

    with alive_bar(len(final_docs), title='Metadata', force_tty=True) as bar:
        for doc in final_docs:
            doc_id = str(uuid.uuid4())
            text = doc.text
            source = doc.metadata['source']
            
            all_ids.append(doc_id)
            all_texts.append(text)
            all_sources.append(source)
            bar()
    
    return all_ids, all_texts, all_sources

all_ids, all_texts, all_sources = get_required_data(final_docs)

Metadata |████████████████████████████████████████| 37/37 [100%] in 0.1s (348.81


## Feeding texts (and tables) into respective Embedding Models
1. Dense Embeddings with BGE
2. Sparse Embeddings with SPLADE

In [46]:
def get_dense_and_sparse_embeddings(all_texts: List[str]):
    dense_embeddings_list = list(bge_embed_model.embed(all_texts))
    sparse_embeddings_list = spalde_embed_model.encode_documents(all_texts)
    
    return dense_embeddings_list, sparse_embeddings_list

In [47]:
dense_embeddings_list, sparse_embeddings_list = get_dense_and_sparse_embeddings(all_texts)

### Saving of respective embedding lists

In [None]:
with open('data/dense_embeddings.pkl', 'wb') as f:
    pickle.dump(dense_embeddings_list, f)
    
with open('data/sparse_embeddings.pkl', 'wb') as f:
    pickle.dump(sparse_embeddings_list, f)

## Batch Ingestion

In [51]:
def batch_ingestion(collection, final_docs):
    # Get required data
    all_ids, all_texts, all_sources = get_required_data(final_docs)
    # Get dense and sparse embeddings
    # dense_embeddings_list, sparse_embeddings_list = get_dense_and_sparse_embeddings(all_texts)
    
    # Start ingestion process
    data = [
        all_ids,
        all_texts,
        all_sources,
        dense_embeddings_list,
        sparse_embeddings_list
    ]
    # Set batch size
    batch_size = 10
    total_elements = len(data[0])  # All lists have the same length

    # Calculate the total number of batches
    total_batches = (total_elements + batch_size - 1) // batch_size

    # Initialize alive_bar with the total number of batches
    with alive_bar(total_batches, force_tty=True) as bar:
        for start in range(0, total_elements, batch_size):
            end = min(start + batch_size, total_elements)
            batch = [sublist[start:end] for sublist in data]
            collection.insert(batch)
            bar()

In [52]:
batch_ingestion(collection, final_docs)

Metadata |████████████████████████████████████████| 37/37 [100%] in 0.1s (347.55
|████████████████████████████████████████| 4/4 [100%] in 2.3s (1.36/s)          


## Creating of Index
1. Delete any existing Index
2. Create new Indexes

In [53]:
def drop_indexes(collection: Collection, index_names: List[str]) -> None:
    # Release or drop the existing collection index
    collection.release()
    for name in index_names:
        collection.drop_index(index_name=name)
        print(f"Index '{name}' has been dropped")

In [54]:
drop_indexes(collection, index_names=["sparse_embeddings", "dense_embeddings"])

Index 'sparse_embeddings' has been dropped
Index 'dense_embeddings' has been dropped


In [55]:
def create_all_indexes(collection: Collection) -> None:
    # Dense embeddings index
    collection.create_index(
        field_name="dense_embeddings",
        index_params={
            "metric_type": "COSINE",
            "index_type": "HNSW",
            "params": {
                "M": 5,
                "efConstruction": 512
            }
        },
        index_name="dense_embeddings_index"
    )
    
    print("Dense embeddings index created")

    # Sparse embeddings index
    collection.create_index(
        field_name="sparse_embeddings",
        index_params={
            "metric_type": "IP",
            "index_type": "SPARSE_INVERTED_INDEX",
            "params": {
                "drop_ratio_build": 0.2
            }
        },
        index_name="sparse_embeddings_index"
    )
    
    print("Sparse embeddings index created")
    collection.load()
    print("Collection loaded")

In [56]:
create_all_indexes(collection)

Dense embeddings index created
Sparse embeddings index created
Collection loaded


## Loading in of collection and setting up Hybrid Searches

In [57]:
collection = Collection(name=collection_name)

In [58]:
def hybrid_search(query: str) -> str:
    dense_embedding = list(bge_embed_model.query_embed(query))[0]
    sparse_embedding = list(spalde_embed_model.encode_queries([query]))
    
    search_results = collection.hybrid_search(
            reqs=[
                AnnSearchRequest(
                    data=[dense_embedding],  # content vector embedding
                    anns_field='dense_embeddings',  # content vector field
                    param={"metric_type": "COSINE", "params": {"M": 64, "efConstruction": 512}}, # Search parameters
                    limit=3
                ),
                AnnSearchRequest(
                    data=list(sparse_embedding),  # keyword vector embedding
                    anns_field='sparse_embeddings',  # keyword vector field
                    param={"metric_type": "IP", "params": {"drop_ratio_build": 0.2}}, # Search parameters
                    limit=3
                )
            ],
            output_fields=['doc_id', 'text', 'doc_source'],
            # Use WeightedRanker to combine results with specified weights
            # Alternatively, use RRFRanker for reciprocal rank fusion reranking
            rerank=RRFRanker(),
            limit=3
            )
    
    hits = search_results[0]
    
    context = []
    for res in hits:
        text = res.text
        source = res.doc_source
        context.append(f"Source: {source}\nContext: {text}")
    
    return "\n\n".join(context)

In [59]:
res = hybrid_search("How should i know whether i should throw away my insulin?")

In [60]:
print(res)

Source: diabetes_treatment__insulin
Context: Insulin should be stored in the lower part of the fridge, away from the freezer. You should always keep at least one extra bottle of each type of insulin.

Source

Source: diabetes_treatment__insulin
Context: Check your injection areas every few days. You can press gently and run your fingertips across each area. If there are any lumps, painful spots or any change of colour in the area, report these to your diabetes care team. Avoid using these areas until the problem stops.

 Timing of Insulin Injections

Inject your insulin 30 minutes before meals. The insulin will move into your blood to match the rise in the blood sugar level. You should eat regularly throughout the day. This is to avoid a low blood sugar levels caused by the injected insulin.

If you are using a new insulin called Lispro Insulin (Humalog®) you do not have to wait 30 minutes before eating your meal. Lispro insulin is absorbed and goes to work almost immediately after inj