In [1]:
# import libraries
# %pip install -U python-dotenv langchain langchain_community langchain_mongodb nltk langchain-ollama langchain-pinecone pinecone-notebooks einops langgraph langchain_huggingface

assuming you have a JSON copy of the data   
Data aquisition:   
import the json data

In [2]:
# read the json file
import json
from pathlib import Path
from dotenv import load_dotenv
import os
load_dotenv()
# load a json file as data
data  = json.loads(Path("../data/data.json").read_text())

Data ingestion:   
since it is better to supply the full context of structured and unstructured data fetched in the future,   
we will store the raw text in mongodb first   
and apply preprocessing later when we need semantically/meaning accurate search results (important for vectorization)

In [3]:
# Connect to your MongoDB Atlas(Cloud) cluster
from pymongo import MongoClient
client = MongoClient(os.environ["MONGODB_URI"])
db = client[os.environ["MONGODB_DB"]]

In [4]:
#specify outer level json objects as mongodb collection names, then based on the specified names, create new collections and insert corresponding data

#specify all the collections
collection_names = []
for key, value in data.items():
    collection_names.append(key)
    # if type(value) != str:
    #     value = json.dumps(value)
    # print(key+":    "+value)
    
#fetch existing collections
ignored_and_existing_collections = db.list_collection_names()

#specify the collections to ignore
if "questions_and_answers2" not in ignored_and_existing_collections:
        ignored_and_existing_collections.append("questions_and_answers2")
if "exclusions2" not in ignored_and_existing_collections:
        ignored_and_existing_collections.append("exclusions2")

for collection_name in collection_names:
    if collection_name in ignored_and_existing_collections:
       continue
    collection = db[collection_name]
    if isinstance(data[collection_name], list):
        print(f"""
inserting many records of {data[collection_name]} into {collection_name} collection
""")
        collection.insert_many(data[collection_name])
    else:
        print(f"""
inserting one record of {data[collection_name]} into {collection_name} collection
""")
        collection.insert_one(data[collection_name])

# separately insert the vector targets
collection = db["vector_targets"]
vector_target_records = list(collection.find())
if len(vector_target_records) == 0:
        print(f"""
inserting one record of {data["vector_targets"]} into {collection_name} collection
""")
        collection.insert_many(data["vector_targets"])        
else:
    print("vector_targets already exists in the database, skipping insertion")

print("insertion complete")

ignored_and_existing_collections = None
del ignored_and_existing_collections
collection_names = None
del collection_names
vector_target_records = None
del vector_target_records
data = None
del data


vector_targets already exists in the database, skipping insertion
insertion complete


further preprocessing:   
lowercasing   
special character removal

stop words removal

lemmatization

In [5]:
#use nltk to lowercase, remove special characters, tokenize, remove stopwords, and lemmatize the values in the json data
#fetch all data cleaning resources
import nltk
import re
nltk.download('wordnet')

from nltk.corpus import stopwords, wordnet
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

#initialize the lemmatizer
lemmatizer = WordNetLemmatizer()
#initialize the stopwords
stop_words = set(stopwords.words('english'))
#allow specific stopwords
stop_words.remove("both")


def preprocess_text(text):
    pattern = re.compile(r"[;@#&*!()\[\]]")
    def get_wordnet_pos(tag):
        match tag[0]:
            case 'J':
                return wordnet.ADJ
            case 'V':
                return wordnet.VERB
            case 'R':
                return wordnet.ADV
            case _:
                return wordnet.NOUN
    # Tokenize
    tokens = word_tokenize(text)
    pos_tags = nltk.pos_tag(tokens)
    # Remove stop words and lemmatize
    processed_tokens = [
        lemmatizer.lemmatize(word.lower(), get_wordnet_pos(tag)) 
        for word, tag in pos_tags 
        if word.lower() not in stop_words and not pattern.match(word)
    ]
    # Join tokens back to a string
    return ' '.join(processed_tokens)

#setup a function to recursively preprocess the json data
def traverse_json(data, json_text_action, target_key=None):
    if isinstance(data, dict):
        for key, value in data.items():
            if isinstance(value, str) and (target_key is None or key == target_key):
                # Preprocess the string value
                data[key] = json_text_action(value)
            elif isinstance(value, dict) or isinstance(value, list):
                # Recursively preprocess the dictionary or list
                traverse_json(value, json_text_action)
    elif isinstance(data, list):
        for i, element in enumerate(data):
            if isinstance(element, str):
                # Preprocess the string in the list and save it in the original position
                data[i] = json_text_action(element)
            elif isinstance(element, dict) or isinstance(element, list):
                # Recursively preprocess the dictionary or list
                traverse_json(element, json_text_action)
    return data

# preprocess_text("i need information on performing funds. how much my premium is allocated? the allocation rate, the pricing plan, how much will i be paying per premium payment, what is the coverage performance, what are the fundings, testing the word post-apocalyptic, 80 90% 20$")

[nltk_data] Downloading package wordnet to /Users/damonng/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [6]:
# Connect to your pinecone vector database and specify the working index
from pinecone import Pinecone
pc = Pinecone(os.environ["PINECONE_API_KEY"])
index = pc.Index(os.environ["PINECONE_INDEX_NAME"])

  from tqdm.autonotebook import tqdm


data embedding//vectorization

target descriptive values and generate embeddings for semantic search

In [7]:
# Load the embedding model (https://huggingface.co/nomic-ai/nomic-embed-text-v1.5")
# %pip install sentence-transformers scipy
from sentence_transformers import SentenceTransformer

global_embedding_model = SentenceTransformer("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True)

number_of_dimensions = 768 #the embedding dimensions according to the model documentation

<All keys matched successfully>


In [8]:
#specify the target fields to embed
from copy import deepcopy

def vectorize(texts):
    "vectorize the provided texts using a globally declared model."    
    vectors = global_embedding_model.encode(texts)
    return vectors

ignore_collections = ["vector_targets", "premium_plans","important_notice","total_investment_estimations","premium_allocations","test2", "test","questions_and_answers2","exclusions2"]

def vectorize_and_index(mongo_database=db, vector_index=index, ignore_collections=ignore_collections, debug=False):
    """vectorize the target fields and store the embeddings in the pinecone index.
    
current implmentation does not support converting mongo collection name to pinecone namespaces."""
    if mongo_database == None:
        raise ValueError("mongo_database is required, the mongo database should also have a vector_targets collection specifying the target fields to vectorize")
    if not vector_index:
        raise ValueError("vector_index is required")
    vectors = []
    for collection_name in mongo_database.list_collection_names():
        if collection_name in ignore_collections:
            continue                                            #skip collections that should not to be vectorized
        if debug:
            print(f"In {collection_name}")
        results = list(mongo_database[collection_name].find())
        targets = list(mongo_database.vector_targets.find({"name": collection_name}))
        for record in results:
            if targets:
                for target in targets:                              #iterate through the target fields from the same collection
                    old_data = deepcopy(record[target['field']])              #store original info
                    traverse_json(record, preprocess_text, target['field'])
                    cleaned_data = record[target['field']]                     #clean the data
                    vector = vectorize(record[target['field']])                #store the vectorized embeddings
                    if debug:
                        print(f"original: {old_data}\ncleaned : {cleaned_data}\nlength  : {len(vector)}\nvector  : {vector}")
                    if len(vector) == number_of_dimensions: #check if the vectorized embeddings are of the correct length
                        vectors.append({"id": str(record['_id'])+"-"+collection_name+"-"+target['field'], 
                                        "values": vector
                                        ,"metadata": {"original":cleaned_data}
                                        })
                        pass
                    else:                                           
                        #if the vectorized embeddings length not correct, possibility of other datatypes, iterate through the iterable
                        for j , sub_vector in enumerate(vector):
                            if debug:
                                print(len(sub_vector),cleaned_data[j])
                            vectors.append({"id": str(record['_id'])+"-"+collection_name+"-"+target['field']+"-"+str(j), 
                                            "values": sub_vector
                                            ,"metadata": {"original":cleaned_data[j]}
                                            })
            else:
                if debug:
                    print("no matching target, skipping vectorization")
                continue
            
    # vector_index.upsert(vectors=vectors)

# vectorize_and_index(debug=True)


In [18]:
#using local(ollama) base model as chatbot model
from langchain_ollama.chat_models import ChatOllama as Ollama
intent_classifier = Ollama(model='llama3.2',#:1B
                           temperature=0.1,
                           num_predict=4,
                           request_timeout=10,
                           verbose=False )
chatbot_model = Ollama(model='llama3.2', request_timeout=60)

In [10]:
#setup the semantic search function
def semantic_search(question, model=global_embedding_model, index=index, top_k=6, verbose=False):
    """vectorizes the question and queries the pinecone index for the top 3 closest matches.

current implementation does not support querying multiple namespaces or using mongoDB indexes.

Args:
    question (str): question string used to query the pinecone index
    verbose (bool, optional): flag to turn on debug mode. Defaults to False.
    model (SentenceTransformer, optional): model used for vectorizing a cleaned question. Defaults to global_embedding_model.
    index (Pinecone.Index, optional): the provided pinecone index. Defaults to index.

Returns:
    list(QueryResponse): the top closest query results from the pinecone index
    """
    if not index:
        print("Index not found, please specify your pinecone or mongo search index")
        return
    if not model:
        print("Model not found")
        return
    if verbose:
        print("Encoding question...")
        
    vectors = model.encode(question)
    
    if not isinstance(vectors, list): # Ensure vectors is a list of floats
        vectors = vectors.tolist()
    matches = []
    spaces = index.describe_index_stats()['namespaces']
    for key, value in spaces.items():
        res = index.query(
            namespace=key,
            vector=vectors,
            top_k=top_k,
            include_metadata=False,
            include_values=False
        )
        matches.append(res)
        print(res['usage'])
    return matches

# Example usage
response = semantic_search("who manages this product")
response

{'read_units': 5}


[{'matches': [{'id': '67171ded2ca774a8bfd83375-questions_and_answers-answer-2',
               'score': 0.499227405,
               'values': []},
              {'id': '67171ded2ca774a8bfd83372-questions_and_answers-answer',
               'score': 0.469530821,
               'values': []},
              {'id': '67171dec2ca774a8bfd8334d-advantages-advantage',
               'score': 0.464928329,
               'values': []},
              {'id': '6717364e2ca774a8bfd83399-funds-description-2',
               'score': 0.463731647,
               'values': []},
              {'id': '67171dec2ca774a8bfd8334b-critical_illnesses-disclaimer',
               'score': 0.460437477,
               'values': []},
              {'id': '67171ded2ca774a8bfd83352-requirements-description',
               'score': 0.454422921,
               'values': []}],
  'namespace': '',
  'usage': {'read_units': 5}}]

In [11]:
#setup a function to receive semantic search results and return the collection name and ids
def get_collection_matches(response : list, verbose=False) -> list:
    """based on the response from a pinecone semantic search, extract and consolidate the collection name and ids of the matching documents.

    Args:
        response (list): a list of QueryResponse objects from a pinecone semantic search
        verbose (bool, optional): flag to use debug mode. Defaults to False.

    Returns:
        list: list of mongodb documents
    """
    if verbose:
        print("Getting collection matches...")
    document_metadata = []
    for collection_matches in response:
        collection = collection_matches['namespace']
        is_using_default_namespace = False
        if collection == "":
            is_using_default_namespace = True
            if verbose:
                print("no pinecone namespace found, checking id or additional metadata")
        ids = []
        for match in collection_matches['matches']:
            if verbose:
                print(match)
            data_from_id = match['id'].split("-")
            match_id = data_from_id[0]
            if is_using_default_namespace:
                collection = data_from_id[1]
            #filter out duplicate ids
            if match_id not in ids:
                ids.append(match_id)
            else:
                if verbose:
                    print(f"Duplicate id {match_id} for {collection} found in fetch list, ignoring")
                continue
            if is_using_default_namespace:
                document_metadata.append({'collection':collection, 'id':match_id})
        if not is_using_default_namespace:
            document_metadata.append({'collection':collection, 'ids':ids})
        
    return document_metadata

document_metadata = get_collection_matches(response, verbose=True)

Getting collection matches...
no pinecone namespace found, checking id or additional metadata
{'id': '67171ded2ca774a8bfd83375-questions_and_answers-answer-2',
 'score': 0.499227405,
 'values': []}
{'id': '67171ded2ca774a8bfd83372-questions_and_answers-answer',
 'score': 0.469530821,
 'values': []}
{'id': '67171dec2ca774a8bfd8334d-advantages-advantage',
 'score': 0.464928329,
 'values': []}
{'id': '6717364e2ca774a8bfd83399-funds-description-2',
 'score': 0.463731647,
 'values': []}
{'id': '67171dec2ca774a8bfd8334b-critical_illnesses-disclaimer',
 'score': 0.460437477,
 'values': []}
{'id': '67171ded2ca774a8bfd83352-requirements-description',
 'score': 0.454422921,
 'values': []}


In [12]:
#find the mongo documents based on their collection and ids
from bson.objectid import ObjectId as oid 
def find_documents(collection_matches : list, verbose=False, database=db) -> list:
    """use the collection matches to find the corresponding documents in the specified mongo database.

    Args:
        collection_matches (list): list of document metadata containing collection names and ids
        verbose (bool, optional): flag to turn on debug mode. Defaults to False.
        database (pymongo.synchronous.database.Database, optional): the mongodb database to use. Defaults to the db variable.

    Returns:
        list: list of mongodb documents
    """
    if verbose:
        print("Finding documents...")
    documents = []
    for collection_match in collection_matches:
        collection = database[collection_match['collection']]
        if 'ids' in collection_match:
            for id in collection_match['ids']:
                documents.append(collection.find_one({"_id": oid(id)}))
        else:
            documents.append(collection.find_one({"_id": oid(collection_match['id'])}))
    return documents

documents = find_documents(document_metadata)
documents

[{'_id': ObjectId('67171ded2ca774a8bfd83375'),
  'question': 'What are the current fees and charges?',
  'answer': ['Insurance charges are Applicable to the sum assured, and vary depending on the average age profile and claim experience of the scheme.',
   'Monthly Policy Fee is RM5.00',
   'Fund Management Charge is 0.50% per annum. Note: The fees and charges levied may change from time to time.']},
 {'_id': ObjectId('67171ded2ca774a8bfd83372'),
  'question': 'Can the Assured Member/spouse/children apply to contribute more',
  'answer': 'Yes. The Assured Member/spouse/children will be required to reapply by completing a standard Group Proposal form, subject to approval by the Company and up to the maximum benefit allowed'},
 {'_id': ObjectId('67171dec2ca774a8bfd8334d'),
  'advantage': 'Automatic premium remittance via credit card / bank deduction / JomPay ensures continuous protection.'},
 {'_id': ObjectId('6717364e2ca774a8bfd83399'),
  'name': 'Dana Gemilang',
  'description': ['The 

the previous 3 functions only allow access to unstructured data, but ignores structured data like tables   
add a component for fetching structured data

In [13]:
#setup the chatbot model for tool use
from langchain_core.tools import tool
import re

@tool
def fetch_metrics(question: str) -> list:
    """Retrieves only JSON of table data for either premium plans, total investment estimations, premium allocation or fund performance based on the user query."""
    cleaned_question = preprocess_text(question)
    documents = []
    has_fetched_preimum_plans = False
    has_fetched_total_investment_estimation = False
    has_fetched_fund = False
    has_fetched_premium_allocation = False
    for keyword in re.findall(r'\b\w+\b', cleaned_question):
        match keyword:
            case "premium" | "plan" | "coverage" | "price" | "pricing":
                if not has_fetched_preimum_plans:
                    print("fetching premium plans")
                    
                    documents.append(list(db["premium_plans"].find()))
                    has_fetched_preimum_plans = True
            case "investment" | "value" | "allocation":
                if not has_fetched_total_investment_estimation:
                    print("fetching investment plans")
                    
                    documents.append(list(db["total_investment_estimations"].find()))
                    has_fetched_total_investment_estimation = True
            case "performance" | "perform" | "fund":
                if not has_fetched_fund:
                    print("fetching fund performance")
                    
                    documents.append(list(db["funds"].find()))
                    has_fetched_fund = True
            case "allocation":
                if not has_fetched_premium_allocation:
                    print("fetching premium allocation")
                    
                    documents.append(list(db["premium_allocations"].find()))
                    has_fetched_premium_allocation = True
            case _:
                continue

    return documents
print()
@tool
def get_context(question: str, debug: bool = False) -> list:
    """Retrieves text-based information for an insurance product only based on the user query.
does not answer quwstions about the chat agent"""
    print(question)
    matches = semantic_search(question, verbose=debug)
    document_data = get_collection_matches(matches, verbose=debug)
    context = find_documents(document_data, verbose=debug)
    return context

tools = [fetch_metrics,
        get_context
        ]
toolPicker = Ollama(model='llama3.2').bind_tools(tools)
# for group in fetch_metrics("i need information on performing funds, how much my premium is allocated, the allocation rate, the pricing plan, how much will i be paying per premium payment, what is the coverage performance, what are the fundings"):
#     for document in group:
#         print(document)

# fetch_metrics("i need information on performing funds. how much my premium is allocated? the allocation rate, the pricing plan, how much will i be paying per premium payment, what is the coverage performance, what are the fundings, testing the word post-apocalyptic, 80 90% 20$")




In [19]:
#setup the RAG workflow
from langchain_core.prompts import ChatPromptTemplate
# from langchain_core.output_parsers import StrOutputParser as parser
# from langchain.schema.runnable import RunnablePassthrough
from pydantic import ValidationError

def RAG(query : str, verbose : bool | None = False):
    if query in ["exit", "quit", "bye", "end", "stop", ""]:
        return
    if verbose:
        tool_query=query+" (Turn on debug mode)"
    else:
        tool_query=query
    messages = [query]
    
    ai_msg = toolPicker.invoke(tool_query)
    for tool_call in ai_msg.tool_calls:
        selected_tool = {"fetch_metrics": fetch_metrics,
                        "get_context": get_context
                        }[tool_call["name"].lower()]

        tool_msg = None 
        try:
            tool_msg = selected_tool.invoke(tool_call)
        except ValidationError as e:
            print(f"Error: {e.json()}")
        print(f"called {tool_call["name"].lower()} with {tool_call['args']}")
        messages.append(f"connected {tool_call["name"].lower()} function returned {tool_msg.content}")

    if verbose:
        for message in messages:
            print(message)

    
    return chatbot_model.stream(f"""fullfill the query with the provided information
query      :{query}
information:{messages}""")

In [15]:
#setup a simple intent classifier
def classify_intent(user_input:str) -> str:
    """classify the intent of the user input

current implementation uses a lightweight model (llama3.2:1B)
with few-shot prompting examples
for classifying 'normal', 'register', 'RAG' intents

"""
    return intent_classifier.invoke(f"""Classify the given input, answer only with 'normal','register','RAG':

example:

Input: "Is there a contact number", Intent: RAG
Input: "How do I create a new account", Intent: register
Input: "How do I make a claim", Intent: RAG
Input: "Search the web for cat videos", Intent: normal
Input: "Help me register for this service", Intent: register
Input: "Where can I get started", Intent: register
Input: "What is your name", Intent: normal
Input: "What entities are attached to this service", Intent: RAG
Input: "What is your purpose", Intent: normal
Input: "Can I see some fund performance metrics", Intent: RAG
Input: "What is the weather in your country", Intent: normal
Input: "How can i register for an account", Intent: register
Input: "Who owns the product", Intent: RAG
Input: "Tell me how to subscribe", Intent: register
Input: "Guide me through the registration process", Intent: register
Input: "How do I sign up for the trial", Intent: register
Input: "Can you explain your features", Intent: normal
Input: "Sign me up", Intent: register
Input: "Can you assist me with enrolling", Intent: register
Input: "What services does the product provide", Intent: RAG
Input: "Where can i wash my dog", Intent: normal
Input: "Goodbye", Intent: normal
Input: "How do i pay for the service", Intent: RAG
Input: "how is my premium allocated", Intent: RAG
Input: "Hello", Intent: normal
Input: "What are the coverage options", Intent: RAG
Input: "What's the first step to register", Intent: register
Input: "What funds are involved", Intent: RAG
Input: "Where are you located", Intent: normal
Input: "Who do I contact for help", Intent: RAG
Input: "Can I pay with a credit card", Intent: RAG
Input: "Why should i trust you", Intent: normal
Input: "What should i get ready for enrollment", Intent: register
Input: "How are you", Intent": normal
Input: "What company distributes this service", Intent: RAG
Input: "Are there any additional charges", Intent: RAG

Input: {user_input}""").content

In [20]:
def chat(user_input: str) -> None:
    intent = classify_intent(user_input)
    if intent == "RAG":
        stream = RAG(user_input)
    elif intent == "register":
        print("register function work in progress")
    else:
        stream = chatbot_model.stream(user_input)
    for chunk in stream:
        print(chunk.content, end="", flush=True)


In [21]:
chat("what are the covered medical conditions")

covered medical conditions
{'read_units': 5}
called get_context with {'debug': False, 'question': 'covered medical conditions'}
Based on the provided information, the covered medical conditions are:

1. Critical Illnesses:
   - Heart Attack - of specified severity
   - Stroke - resulting in permanent neurological deficit
   - Coronary Artery By-pass Surgery
   - Cancer - of specified severity
   - Kidney Failure - requiring dialysis or transplant
   - Fulminant Viral Hepatitis
   - Major Organ/Bone Marrow Transplant
   - Paralysis of Limbs
   - Multiple Sclerosis
   - Primary Pulmonary Arterial Hypertension
   - Blindness - Permanent and Irreversible
   - Heart Valve Surgery
   - Deafness - Permanent and Irreversible
   - Surgery to Aorta
   - Loss of Speech
   - Alzheimer's Disease / Severe Dementia
   - Third Degree Burns - of specified severity
   - Coma - resulting in permanent neurological deficit
   - Cardiomyopathy - of specified severity
   - Motor Neuron Disease
   - HIV Infec