# General Imports

In [1]:
import os
import time
import re
import warnings
import gc

gc.collect()
warnings.filterwarnings('ignore')

# General Tasks

In [2]:
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env")

google_search_project_api_key = os.getenv('google_search_api_key')
google_search_project_id = os.getenv('google_search_project_id')
google_gemini_api_key = os.getenv('google_gemini_api_key')

# LLM

In [3]:
from langchain_google_genai import ChatGoogleGenerativeAI

llm = ChatGoogleGenerativeAI(
        model='gemini-1.5-pro',
        temperature=0.9,
        google_api_key=google_gemini_api_key,
    )


import json
def string_to_json(string):
    try:
        string = string.content.replace('```json', '').replace('```', '').strip()
        json_obj = json.loads(string)
        return json_obj
    except json.JSONDecodeError:
        print("Invalid JSON string")
        return None
    

gc.collect()

# response = llm.invoke('how many calories are in 100 grams of wheat flour roti in asia?(Give me short and to the point answer only. do not add markdowns, etc)')
# print(response.content)

20

# Search Engine

In [4]:
import requests

def googlesearch_results(query:str, number_of_results:int=10):
    query = "how many calories are in 100 grams of wheat flour roti in asia?"
    google_search_url_template = f"https://www.googleapis.com/customsearch/v1?key={google_search_project_api_key}&cx={google_search_project_id}&q={query}&num={number_of_results}&gl=pk&cr=countryPK&hl=en&lr=lang_en"
    response = requests.get(google_search_url_template)
    results = []
    for item in response.json()['items']:
        results.append(item['link'])
    return results


from duckduckgo_search import DDGS
def duckduckgo_results(query:str, num_results:int=10):
    results = DDGS().text(
        query+" filetype:html",
        max_results=num_results, region="pk"
    )
    result_list = []
    for result in results:
        result_list.append(result["href"])
    return result_list

# from googlesearch import search
# def googlesearch_results(query:str, num_results:int=10):
#     result_obj = search(
#         query, num_results=num_results,
#         lang="en"
#     )
#  
#     result_list = []
#     for i in result_obj:
#         result_list.append(i)
#     return result_list

def query_list_to_urls_results(query_list:list, num_results_per_query:int=3, search_engine_func:callable=None):
    """
    Get web results from a list of queries
    INPUT:
        query_list: list of queries
        num_results_per_query: number of results per query
        search_engine_func: search engine function
    OUTPUT:
        web_results: list of web results
    
    Example:
        web_results_urls_list = query_list_to_web_results_as_urls(json_res['search_query_list'], 5, duckduckgo_results)
    """
    
    web_results = []
    for query in query_list:
        web_results.extend(search_engine_func(query, num_results_per_query))
    return list(set(web_results))

gc.collect()

0

In [5]:
# web_results_urls = duckduckgo_results("how many calories are in 100 grams of wheat flour roti in asia?", 4)

# Text Loading

In [7]:
from langchain.document_loaders import WebBaseLoader
import nest_asyncio
nest_asyncio.apply()

def load_webpages(website_url:list, async_mode:bool=False, requests_per_second:int=2):
    """
    Load webpages from a list of urls

    INPUT:
        website_url: list of urls
    OUTPUT:
        langchain document object
    """
    loader = WebBaseLoader(website_url, continue_on_failure=True, verify_ssl=False)
    if async_mode:
        loader.requests_per_second = requests_per_second
        return loader.aload()
    else:
        return loader.load()


def text_cleaner(text:str):
    """
    Clean text from html tags, extra spaces, newlines, etc
    INPUT:
        text: string
    OUTPUT:
        cleaned_text: string
    """
    cleaned_text = re.sub(r"\n{3,}", "\n\n", text)
    cleaned_text = re.sub(r"\xa0|\r|\t", " ", cleaned_text)
    # cleaned_text = re.sub(r"\s+", " ", cleaned_text)
    cleaned_text = re.sub(r"\s{2,}", " ", cleaned_text)
    cleaned_text = re.sub(r"<[^>]+>", "", cleaned_text)
    return cleaned_text

def langchain_document_cleaner(document_obj):
    """
    Clean page_content of langchain document object
    INPUT:
        document: langchain document object
    OUTPUT:
        cleaned_document: langchain document object
    """
    for i in range(len(document_obj)):
        document_obj[i].page_content = text_cleaner(document_obj[i].page_content)
    return document_obj

def loader_with_cleaner(website_url:list, async_mode:bool=False, requests_per_second:int=2):
    """
    Load webpages from a list of urls and clean them
    INPUT:
        website_url: list of urls
    OUTPUT:
        langchain document object
    """
    return langchain_document_cleaner(load_webpages(website_url, async_mode, requests_per_second))




In [8]:
# documents = loader_with_cleaner(web_results_urls, async_mode=True, requests_per_second=3)
# gc.collect()

# Creating Chunks

In [9]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# all-MiniLM-L12-v2 model has a limit of 256 words. ==> 4 x 256 = 1024 characters max
def splitter(documents, chunk_size:int=900, chunk_overlap:int=200):
    """
    Get chunks of text from the documents
    INPUT:
        documents: langchain document objects
        chunk_size: size of each chunk
        chunk_overlap: overlap between chunks
    OUTPUT:
        chunks: langchain document objects
    """
    s = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
    chunks = s.split_documents(documents)
    return chunks

In [10]:
# chunks = splitter(documents)

# Embeddings

In [11]:
from langchain_community.embeddings import HuggingFaceEmbeddings
# embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L12-v2", cache_folder="temp")
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", cache_folder="temp")
gc.collect()

embeddings.embed_query("Warm Up")[:3]

  warn_deprecated(


[-0.06030871719121933, 0.023757167160511017, 0.03897421434521675]

# Vector DB

In [12]:
from langchain_community.vectorstores import FAISS

async def get_vDB(chunks, embeddings, folder_path:str=None, index_name:str=None, async_mode:bool=False):
    '''
    Get vector database from chunks
    INPUT:
        chunks: langchain document objects
        embeddings: langchain embeddings object
        folder_path: path to save the index
        index_name: name of the index
        async_mode: bool
    OUTPUT:
        vector database
    '''
    if async_mode:
        db = await FAISS.afrom_documents(chunks, embeddings)
    else:
        db = FAISS.from_documents(chunks, embeddings)
    if folder_path and index_name:
        db.save_local(folder_path=folder_path, index_name=index_name)
        print(f"Index saved successfully at {folder_path}/{index_name}")
    return db


def get_similar_docs(query_list:list[str], db, docs_per_query:int=5):
    """
    Get similar documents from the query
    INPUT:
        query: string
        db: vector database
        k: number of similar documents
    OUTPUT:
        similar_docs: list of similar documents
    """
    docs_list = []
    for query in query_list:
        docs_list.extend(db.similarity_search(query, docs_per_query))
    return docs_list

def documents_to_contextText(documents):
    temp = []
    context_text = ""
    for doc in documents:
        page_content = doc.page_content
        if page_content in temp:
            continue
        context_text += f"CONTENT:\n'{page_content}'\n\n"
        temp.append(page_content)
    return context_text

In [13]:
# db = await get_vDB(chunks, embeddings, folder_path="vdbs", index_name="index_2", async_mode=True)
# gc.collect()


# Prompts

In [53]:
from langchain_core.prompts import PromptTemplate

prompt_template = PromptTemplate.from_template(
    '''You are an expert content writer and RAG specialist. Your task is to:
1. Rephrase the user query into list of concise search string optimized for search engines. Use multiple search queries if needed. and return as a list of strings.
2. "Use general values in search queries which have higher chances of existance on search engines, not specific numbers which is difficult to find (Example: if user asks for finding price of 340 grams of potatos, then you can search for general/standard/easy to find value such as 100 grams or 1 kilogram). 
3. Then calculate the result for specific numbers from the general/standard/easy to find value from search results. (Like instead of searching for "calories in 340 grams of wheat flour roti in asia", search for "calories in 100 grams of wheat flour roti in asia" and then calculate the value for 500 grams using values fom 100 grams)
4. Maximum number of search queries should be 4.
5. IMPORTANT: Always give answer as a valid JSON object with keys: "original_query", "search_query_list". Make sure the JSON is valid.

Example:
User Query: "I'm feeling tired all the time. What foods can help?"
Answer:
{{
    "original_query": "how many calories are in 500 grams of wheat flour roti in asia?",
    "search_query": ["calories in 100g wheat flour roti in aisa",
                      "calories in 1kg wheat flour roti in asia",
                      "calories in wheat flour roti in asia"],
}}

####################################################
User Query: "{user_query}"
Answer:

    '''
)


prompt_template_2 = PromptTemplate.from_template(
    '''
## Task:
Answer user queries in an informative way, leveraging the provided context for calculations and responses.

## Instructions:
Respond directly to the user query, avoiding additional explanations or markdown.
Utilize the given context for calculations and formulating answers.
Strictly adhere to the provided context, omitting external information.

## Input:
> Context:
{context}

> User Query:{user_query}

> Answer:'''
)

# Main Work Pipline

In [16]:
user_query = "Explain how Allah tell us the method of distribution of inheritance? Also mention the verse number and the name of the Surah."
prompt_1 = prompt_template.format(user_query=user_query)
llm_res_1 = llm.invoke(prompt_1)
llm_res_1_json = string_to_json(llm_res_1)
print("Step 1: LLM queries generated")

web_urls = query_list_to_urls_results(llm_res_1_json['search_query_list'], 3, duckduckgo_results)
chunks = splitter(loader_with_cleaner(web_urls, async_mode=True, requests_per_second=2))
print("Step 2: Data Loaded and cleaned")

db = await get_vDB(chunks, embeddings, folder_path="vdbs", index_name="index_5", async_mode=True)
similar_chunks = get_similar_docs(llm_res_1_json['search_query_list'], db, 3)
print("Step 3: Vector database is prepared and used")

context = documents_to_contextText(similar_chunks)
prompt_2 = prompt_template_2.format(context=context, user_query=user_query)
llm_res_2 = llm.invoke(prompt_2)
gc.collect()
print("Step 4: Final response from LLM is generated")

print(f"\n\nAnswer:\n {llm_res_2.content.strip()} ")

Index saved successfully at vdbs/index_5
Step 3: Vector database is prepared and used
Step 4: Final response from LLM is generated


Answer:
 Allah decrees in the Quran, specifically in Surah Nisa, verses 4:11-12 and 4:176, the shares of inheritance for each eligible recipient. 


In [20]:
llm_res_1

AIMessage(content='```json\n{\n    "original_query": "Explain how Allah tell us the method of distribution of inheritance? Also mention the verse number and the name of the Surah.",\n    "search_query_list": [\n        "islamic inheritance laws verse",\n        "quran verses on inheritance distribution",\n        "surah about inheritance in islam",\n        "method of inheritance distribution in islam"\n    ]\n}\n```', response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': [{'category': 'HARM_CATEGORY_SEXUALLY_EXPLICIT', 'probability': 'NEGLIGIBLE', 'blocked': False}, {'category': 'HARM_CATEGORY_HATE_SPEECH', 'probability': 'NEGLIGIBLE', 'blocked': False}, {'category': 'HARM_CATEGORY_HARASSMENT', 'probability': 'NEGLIGIBLE', 'blocked': False}, {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'probability': 'NEGLIGIBLE', 'blocked': False}]}, id='run-56a07ffe-d12e-4056-9977-522f9314b04d-0', usage_metadata={'input_tokens'

In [28]:
dict(llm_res_1)["usage_metadata"]

{'input_tokens': 396, 'output_tokens': 91, 'total_tokens': 487}

In [43]:
def usage_calculator(response_list:list, input_price, output_price, token_per_unit:int=1000, round_off:int=6):
    """ 
    INPUT:
        response_list: list of responses from LLMs
        input_price: price of per unit [default is 1000] input tokens
        output_price: price of per unit [default is 1000] output tokens
    OUTPUT:
        cost: cost of the responses
    """
    
    
    input_price_per_token = input_price/token_per_unit
    output_price_per_token = output_price/token_per_unit
    input_tokens = 0
    output_tokens = 0
    
    for response in response_list:
        input_tokens += dict(response)["usage_metadata"]['input_tokens']
        output_tokens += dict(response)["usage_metadata"]['output_tokens']
    
    input_cost = round(input_tokens * input_price_per_token, round_off)
    output_cost = round(output_tokens * output_price_per_token, round_off)
    
    return {
        "cost": {
            "total_cost": input_cost + output_cost,
            "input_cost": input_cost,
            "output_cost": output_cost
        },
        "token": {
            "total_tokens": input_tokens + output_tokens,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens
        }
    }

In [45]:
usage = usage_calculator(
    [llm_res_1, llm_res_2],
    input_price=3.5,
    output_price=10.5,
    token_per_unit=1e6,
    round_off=10)

usage

{'cost': {'total_cost': 0.0062125,
  'input_cost': 0.0048475,
  'output_cost': 0.001365},
 'token': {'total_tokens': 1515, 'input_tokens': 1385, 'output_tokens': 130}}

In [52]:
usage['cost']['total_cost']

0.0062125