In [1]:
import os, re, glob, json
import ollama
from pymilvus import MilvusClient
from pypdf import PdfReader

from typing import List

In [2]:
# ---------- Config ----------
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "phi3:latest")
EMBED_MODEL  = os.getenv("EMBED_MODEL",  "nomic-embed-text")
DB_PATH      = os.getenv("MILVUS_LITE_PATH", "./milvus.db")
COLLECTION   = os.getenv("MILVUS_COLLECTION", "docs")
CHUNK_CHARS  = int(os.getenv("CHUNK_CHARS", "1200"))
CHUNK_OVER   = int(os.getenv("CHUNK_OVERLAP", "200"))
TOP_K        = int(os.getenv("TOP_K", "5"))

In [27]:

def read_pdf(path:str):

    '''
    Read input PDF file, pagewise, and return text :str
    '''
    
    out = []
    r = PdfReader(path)
    
    for page in r.pages:

        t = page.extract_text() or ""
        out.append(t)

    return "\n".join(out)


def load_docs(folder: str):

    '''
    Load the .pdf and .txt documents
    '''
    docs = []

    for path in glob.glob(os.path.join(index,"**","*"),recursive=True):

        if not os.path.isfile(path):

            continue

        ext = os.path.splitext(path)[1].lower().strip()

        try:
            if ext in [".pdf"]:
    
                text = read_pdf(path)
    
            elif ext in [".txt"]:
    
                continue
    
            else:
                continue
    
            if text.strip():
    
                docs.append({"doc_id": os.path.basename(path),
                            "text": text
                            })
        except Exception as e:
            print(e)
            print(f"[skip] {path}")
            
            
    return docs

            

        
    
def chunk_text(text: str, size=1200, overlap=200):

    '''
    Chunk the input text for the minimum given size (default: 1200 chars), with overlap (default: 200 chars) for the next
    '''

    sents = re.split(r'(?<=[\.\!\?])\s+', text.strip())

    # print("sentence level split text loss chgeck")
    # print(len(text),[len(i) for i in sents], sum([len(i) for i in sents]) )

    chunks, cur = [], ""

    for s in sents:

        
        if len(s)+len(cur) <= size:

            cur = cur+(" " if cur else "") + s

        else:

            if cur:
                chunks.append(cur)

            tail = cur[-overlap:] if overlap >0 else ""

            cur = (tail + " " + s).strip()

    if cur:
        chunks.append(cur)

    return chunks



def embed_texts(texts: List[str]):
    '''
    Embed the input batch of chunks
    '''

    resp = ollama.embed(model=OLLAMA_MODEL, input=texts)
    return resp["embeddings"]



client = MilvusClient(DB_PATH)

def ensure_collection(dimension: int):

    collections_list = client.list_collections()
    print("collections_list : ", collections_list)
    
    if COLLECTION not in collections_list:

        
        client.create_collection(
            
            collection_name = COLLECTION,
            dimension = dimension,
            metric_type="COSINE",
            index_params = {"index_type": "AUTOINDEX"},
            auto_id=True
            
        )

        print(f"[milvus] created collection {COLLECTION} , dim={dimension}")
        
                

def index_folder(folder: str):

    '''
    Create chunks, embed them and push records with doc_id, chunk_id into Milvus(DB_PATH)

    TO DO: Prepare cluster (on Oracle Cloud Free Tier) for Milvus instead of .db file
    '''
    
    docs = load_docs(folder)

    if not docs:

        print("[index] no documents found")

    print([i["doc_id"] for i in docs])


    #---------Chunk & Embed Documents -------------------------------

    batch  = []
    meta_batch = []
    sample_emb_dim = None
    records = []
    
    for d in docs:

        chunks = chunk_text(d["text"], CHUNK_CHARS, CHUNK_OVER)
        
        for i, ch in enumerate(chunks):

            batch.append(ch)

            meta_batch.append({"doc_id":d["doc_id"],
                              "chunk_id": i,
                               "text": ch
                              })
            
            if len(batch) >= 64:

                print("batch length touched: 64")
                vecs = embed_texts(batch)

                if sample_emb_dim is None:

                    sample_emb_dim = len(vecs[0])
                    ensure_collection(sample_emb_dim)

                for m,v in zip(meta_batch, vecs):

                    records.append({"vector":v, **m})
                
                batch, meta_batch = [], []
        
    if batch:
        print("Final batch length: ", len(batch))
        vecs = embed_texts(batch)

        print("Vectors Length",len(vecs))
        
        if not client.has_collection(COLLECTION):
            print("Collection Not found")
            ensure_collection(len(vecs[0]))

        for m,v in zip(meta_batch, vecs):
            records.append({"vector":v, **m})
        
        batch, meta_batch = [], []

    if records:

        client.insert(collection_name=COLLECTION, data=records)
        print(f"records are inserted into collection: {COLLECTION}, records length: {len(records)}")


In [33]:
SYSMTEM = ''' You are a helpful research assistant.
- When answering user questions, call the 'retrive_from_mivlus' tool to gather relevant context when needed.
- Cite each quote with {doc_id;chunk:id}.
- If nothing is found, say so and ask the user to add documents.
'''



def milvus_search(query: str, k: int=TOP_K):

    qvec = embed_texts([query])
    res = client.search(
        collection_name = COLLECTION,
        data=[qvec],
        limit=k,
        output_fields = ["doc_id", "chunk_id", "text"]
    )

    hits = res[0] if res else []

    # Normalize to compact a summary for the LLM

    out = []

    for h in hits:

        out.append({
            "doc_id": h.get("entity", {}).get("doc_id"),
            "chunk_id": h.get("entity", {}).get("chunk_id"),
            "score": h.get("distance"),
            "text": h.get("entity", {}).get("text"),
        })

    return out    

    

def retrive_from_milvus(query: str, k: int = TOP_K ):

    '''
    Retrive top k chunks from the milvus DB. Include score variable along with, doc_id, text and chunk_id
    '''

    return milvus_search(query, k)
    

def chat_once(question: str):

    '''
    '''

    messages ={
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": question}
    }

    response = ollama.chat({
        model = OLLAMA_MODEL,
        messages = messages,
        tools = [retrived_from_milvus],
    })



    
    

    

    

SyntaxError: expected '(' (4027182605.py, line 24)

In [34]:
# agent initialize

# if __name__ == "__main__":

#     import argparse

#     ap = argparse.ArgumentParser()

#     ap.add_argument("--index",type= str,help="Path to folder of docs to ingest")
#     ap.add_argument("--ask", type= str, help="ASk a question, to get an answer based on documents using LLM")

#     args = ap.parse_args()

#     if args.index:
#         index_folder(args.index)
#     if args.ask:
#         print(chat_once(args.ask))


index = "data"
ask = "Tell me about radhika"


''' TO DO: Check if paths are already processed, process only newly added files, prepare checkpoint file to keep track of 
processd file metadata
'''
# if index:
#     index_folder(index)
if ask:
    print(chat_once(ask))




NameError: name 'chat_once' is not defined

In [32]:
# client.list_collections()

In [25]:
# client.flush(COLLECTION)

In [26]:
# client.drop_collection(COLLECTION)