# Package Installations

In [3]:
# !pip install langgraph
# !pip install rank_bm25
# !pip install flair
# !pip install faiss-cpu

In [4]:
# import nltk
# nltk.download('punkt_tab')
# !pip install --upgrade google-genai

# Library Imports

In [1]:
from collections import defaultdict
from datetime import datetime
from flair.data import Sentence
from flair.models import SequenceTagger

from google import genai
from google.cloud import aiplatform
from google.genai import types
from google.genai.types import HarmBlockThreshold
from google.genai.types import HarmCategory
import google.auth

import faiss
import numpy as np
import pandas as pd
import os
import pandas
import pickle
import json

from langchain.output_parsers import PydanticOutputParser
from langchain.tools import tool
# from langchain_community.vectorstores import FAISS
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.language_models import BaseLLM
from langchain_core.outputs import Generation,LLMResult
from langchain.prompts import PromptTemplate
from langgraph.graph import END,StateGraph

from nltk.tokenize import word_tokenize
from pydantic import BaseModel, Field
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
from sklearn.preprocessing import normalize
import re

from typing import Any, Dict, List, Optional, Union
from typing_extensions import Optional, TypedDict
import vertexai
from vertexai.generative_models import FinishReason, GenerativeModel, Part, SafetySetting
from vertexai.language_models import TextEmbedding, TextEmbeddingModel
import warnings
warnings.filterwarnings("ignore")

from time import time

In [2]:
from google.colab import drive

# All data, prompt, RAG and dictionary paths below live under this Drive mount.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Prompt import
import sys

# Folder that contains master_prompt.py (prompt templates used by the builders below).
PROMPT_DIR = '/content/drive/MyDrive/Colab Notebooks/Conversational BI'
# Guard so re-running this cell does not append a duplicate sys.path entry each time.
if PROMPT_DIR not in sys.path:
  sys.path.append(PROMPT_DIR)
from master_prompt import db1Main,db1Clarification,summarizerPrompt


In [4]:
# print(db1Main)

In [5]:
# authentication
from google.colab import auth
# Authenticate the Colab user. This is presumably what lets google.auth.default()
# (called in BusinessAssistantBackend) and the Vertex AI clients find credentials.
auth.authenticate_user()

In [6]:
# Imported under an alias so it does not shadow `genai` from `from google import genai`
# (the google-genai SDK whose Client the LLM wrapper uses).
import google.generativeai as legacy_genai
from google.colab import userdata

# API key from Colab secrets, registered with the legacy google.generativeai SDK.
# NOTE(review): nothing visible in this notebook calls legacy_genai afterwards (the LLM
# wrapper authenticates to Vertex AI via genai.Client), so this cell may be removable.
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
legacy_genai.configure(api_key=GOOGLE_API_KEY)

In [7]:
# Google Cloud project settings shared by every Vertex AI / genai client in this notebook.
myproject = "conversational-bi-080825"
mylocation = "us-central1"
# Regional Vertex AI endpoint, derived from the location so the two cannot drift apart.
myapi_endpoint = f"{mylocation}-aiplatform.googleapis.com"


In [8]:
from google import genai

class CustomerVertexAILLM(BaseLLM):
    """Custom LangChain LLM wrapper for Gemini on Vertex AI (google-genai SDK).

    Each prompt is sent as a single user turn to ``gemini-2.5-flash``. Errors are
    not raised: the error message itself becomes the generated text (and is
    printed), so callers always get exactly one generation per prompt.
    """

    # Default generation settings (read with fallbacks in _generate).
    generation_config: Dict = {
        "temperature": 0,
        "top_p": 0.95,
        "seed": 0,
        "max_output_tokens": 8192
    }

    # Content moderation thresholds forwarded as GenerateContentConfig.safety_settings.
    # NOTE(review): these are vertexai.generative_models.SafetySetting objects, but the
    # config belongs to the google-genai SDK (which has its own types.SafetySetting);
    # confirm the thresholds actually reach the API.
    safety_settings: List[SafetySetting] = [
        SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_MEDIUM_AND_ABOVE"),
        SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_MEDIUM_AND_ABOVE"),
        SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_MEDIUM_AND_ABOVE"),
        SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_MEDIUM_AND_ABOVE"),
    ]

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any
    ) -> LLMResult:
        """Generate one completion per prompt.

        Args:
            prompts: Prompt strings; each one becomes a single user message.
            stop: Optional stop sequences, forwarded to Gemini as ``stop_sequences``.
            run_manager: LangChain callback manager (accepted for interface
                compatibility; not used).
            **kwargs: Ignored.

        Returns:
            LLMResult with one single-element ``[Generation]`` list per prompt.
        """
        # ===== 1. Initialize Vertex AI (global SDK config) =====
        vertexai.init(
            project=myproject,
            location=mylocation,
            api_endpoint=myapi_endpoint
        )

        # ===== 2. One genai client for the whole batch =====
        llm_client = genai.Client(
            vertexai=True,
            project=myproject,
            location=mylocation
        )

        model_name = "gemini-2.5-flash"

        # ===== 3. Generation config is identical for every prompt: build it once =====
        generate_content_config = types.GenerateContentConfig(
            temperature=self.generation_config.get("temperature", 0),
            top_p=self.generation_config.get("top_p", 0.95),
            seed=self.generation_config.get("seed", 0),
            max_output_tokens=self.generation_config.get("max_output_tokens", 8192),
            safety_settings=self.safety_settings,
            # Honor LangChain stop sequences (previously ignored); [] is sent as None.
            stop_sequences=stop or None,
            thinking_config=types.ThinkingConfig(
                thinking_budget=-1  # -1 = dynamic thinking: the model picks its own budget
            ),
        )

        generations: List[List[Generation]] = []

        # ===== 4. Call Gemini once per prompt =====
        for prompt in prompts:
            contents = [
                types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=prompt)]
                )
            ]
            try:
                response = llm_client.models.generate_content(
                    model=model_name,
                    contents=contents,
                    config=generate_content_config,
                )
                text = self._extract_text(response)
            except Exception as e:
                text = f"Error during model generation: {e}"
                print(text)  # Debugging

            generations.append([Generation(text=text)])

        # ===== 5. Return LangChain-standard result =====
        return LLMResult(generations=generations)

    @staticmethod
    def _extract_text(response: Any) -> str:
        """Concatenate the non-thought text parts of the first candidate.

        Returns "" when there is no candidate, no content (presumably e.g. a
        blocked response) or no text part. Previously a None content raised
        AttributeError, and a non-text first part yielded text=None.
        """
        if not response.candidates:
            return ""
        content = response.candidates[0].content
        if content is None or not content.parts:
            return ""
        return "".join(
            part.text
            for part in content.parts
            if part.text and not getattr(part, "thought", False)
        )

    @property
    def _llm_type(self) -> str:
        return "custom_vertexai"


In [9]:


class CustomVertexAIEmbeddings(Embeddings):
    """LangChain Embeddings backed by a Vertex AI TextEmbeddingModel.

    Failures are printed and turned into empty vectors rather than raised.
    """

    model_name: str = "text-embedding-004"
    # Maximum number of texts sent per get_embeddings request.
    batch_size: int = 200

    def __init__(self,
                 model_name: str = "text-embedding-004",
                 project: str = myproject,
                 location: str = mylocation,
                 api_endpoint: str = myapi_endpoint):
        super().__init__()
        vertexai.init(project=project, location=location, api_endpoint=api_endpoint)
        # Record the model actually loaded. Previously the attribute kept the class
        # default even when a different model_name was passed.
        self.model_name = model_name
        self.model = TextEmbeddingModel.from_pretrained(model_name)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed `texts` in batches of `batch_size`, preserving order.

        If any batch fails, the error is printed and one empty list per input
        text is returned (all-or-nothing: earlier successful batches are dropped).
        """
        all_embeddings = []
        try:
            for start in range(0, len(texts), self.batch_size):
                batch = texts[start:start + self.batch_size]
                all_embeddings.extend(emb.values for emb in self.model.get_embeddings(batch))
            return all_embeddings
        except Exception as e:
            print(f"Error embedding documents: {e}")
            return [[] for _ in texts]

    def embed_query(self, text: str) -> List[float]:
        """Embed one query string; on failure print the error and return []."""
        try:
            return self.model.get_embeddings([text])[0].values
        except Exception as e:
            print(f"Error embedding query: {e}")
            return []

    @property
    def _embedding_type(self) -> str:
        return "custom_vertexai_embeddings"


In [10]:
# Structured output expected from the SQL-generation LLM call; `parser.parse` turns the
# LLM's JSON reply into this model in the main loop.
# NOTE: field names and Field descriptions are rendered into `format_instruction` (the
# schema shown to the LLM), so renaming fields or adding a class docstring would likely
# change the prompt. "explaination" is misspelled but is the JSON key the LLM is asked
# to emit; it may also be referenced by the external prompt, so rename both together.
class NL2SQLResponse(BaseModel):
  # The generated SQL (may arrive wrapped in ``` fences; see extract_sql_query).
  sql_query:str = Field(description = "The SQL Query")
  explaination:str = Field(description = "Explaination of how SQL Query was generated")
  assumption:str = Field(description = "Assumptions made while generating SQL Query")

# Parser for NL2SQLResponse; its format instructions are injected into the SQL prompt
# (see sql_gen_prompt_builder).
parser = PydanticOutputParser(pydantic_object= NL2SQLResponse)
format_instruction = parser.get_format_instructions()

In [11]:
class BusinessAssistantBackend:
  """Initializes Vertex AI and owns the shared LLM and embedding clients."""

  def __init__(self):
    vertexai.init(
        project=myproject,
        location=mylocation,
        api_endpoint=myapi_endpoint
    )
    # Application Default Credentials plus the project they report (may be None).
    self.credentials, self.project = google.auth.default()
    # vertexai.init and aiplatform.init write the same global SDK config. Pin the
    # project and location explicitly: passing the ADC project here could silently
    # override `myproject` set just above.
    aiplatform.init(project=myproject, location=mylocation, credentials=self.credentials)

    self.llm = CustomerVertexAILLM()
    self.embeddings = CustomVertexAIEmbeddings()

  def llm_call(self, query):
    """Run a single prompt through the LLM and return its generated text."""
    return self.llm.generate([query]).generations[0][0].text


backend = BusinessAssistantBackend()
# Module-level handle used by the RAG retrieval functions.
embeddings = backend.embeddings

In [12]:
def format_conversation_history(history: list) -> str:
  """Render chat turns as a newline-separated transcript.

  Turns with role 'user' become "User: <content>" and role 'Agent' becomes
  "Assistant: <content>"; any other role is skipped. The joined transcript is
  stripped of surrounding whitespace ("" when nothing qualifies).
  """
  speaker_by_role = {'user': 'User', 'Agent': 'Assistant'}
  lines = [
      f"{speaker_by_role[turn['role']]}: {turn['content']}"
      for turn in history
      if turn['role'] in speaker_by_role
  ]
  return "\n".join(lines).strip()


def remove_query_ready_flag(text):
  """Strip the "[query_ready: True|False]" marker from an LLM plan.

  Brackets are optional and the flag matches case-insensitively. After removal,
  every run of 2+ whitespace characters (including newlines) collapses to a single
  space and the result is stripped. Non-string input is returned unchanged.
  """
  # The previous pattern read "Flase", so "[query_ready: False]" was never removed.
  pattern = r'\[?\s*query_ready\s*:\s*(?:true|false)\s*\]?'
  try:
    cleaned_text = re.sub(pattern, '', text, flags=re.IGNORECASE)
    return re.sub(r'\s{2,}', ' ', cleaned_text).strip()
  except TypeError:
    # e.g. None or bytes: keep the best-effort behaviour of returning input as-is.
    return text


def normalize_scores(results):
  """Min-max normalize each result's "score" into a new "norm_score" key.

  Mutates and returns `results`. The 1e-8 term avoids division by zero when all
  scores are equal (every result then gets norm_score 0.0). An empty list is
  returned unchanged; previously min() raised ValueError on it.
  """
  if not results:
    return results
  scores = [r["score"] for r in results]
  lowest, highest = min(scores), max(scores)
  span = highest - lowest + 1e-8
  for r in results:
    r["norm_score"] = (r["score"] - lowest) / span
  return results

In [13]:
# RAG Architecture functions related to query generation and clarification
# Lazily loaded flair NER tagger, shared across query_ner calls.
_NER_TAGGER = None


def query_ner(query):
  """Return the text of the "ner" spans flair's NER tagger finds in `query`.

  The query is upper-cased before tagging.
  NOTE(review): flair NER models are case-sensitive; confirm upper-casing is intended.
  """
  global _NER_TAGGER
  # Loading the flair model is slow, and it used to be reloaded on every call.
  if _NER_TAGGER is None:
    _NER_TAGGER = SequenceTagger.load("ner")

  sentence = Sentence(query.upper())
  _NER_TAGGER.predict(sentence)
  return [entity.text for entity in sentence.get_spans("ner")]

# Folder holding the per-dashboard FAISS indexes and their pickled metadata.
_RAG_DIR = '/content/drive/MyDrive/Colab Notebooks/Conversational BI/RAG'

# dashboard -> (FAISS index file, metadata pickle) under _RAG_DIR.
# NOTE(review): 'Call_Center_Interaction' reuses db1_metadata.pkl, exactly as before. If
# its index was built from other documents, the metadata returned by
# dense_rag_retreival will not line up with its vectors. Confirm the correct pickle.
_RAG_ASSETS = {
    'Dashboard1': ('db1_index.faiss', 'db1_metadata.pkl'),
    'Call_Center_Interaction': ('Call_center_interaction_index.faiss', 'db1_metadata.pkl'),
}

# In-process caches so repeated calls do not re-read Drive or reload the reranker.
_RAG_ASSET_CACHE = {}
_CROSS_ENCODER_CACHE = {}


def _load_rag_assets(dashboard):
  """Return (faiss_index, docs) for `dashboard`, or None if it has no RAG assets."""
  if dashboard not in _RAG_ASSETS:
    return None
  if dashboard not in _RAG_ASSET_CACHE:
    index_file, metadata_file = _RAG_ASSETS[dashboard]
    index = faiss.read_index(os.path.join(_RAG_DIR, index_file))
    # pickle.load can execute arbitrary code: only point this at trusted files.
    with open(os.path.join(_RAG_DIR, metadata_file), 'rb') as f:
      docs = pickle.load(f)
    _RAG_ASSET_CACHE[dashboard] = (index, docs)
  return _RAG_ASSET_CACHE[dashboard]


def _get_cross_encoder(model_name='BAAI/bge-reranker-large'):
  """Return a cached CrossEncoder reranker (constructed once per model name)."""
  if model_name not in _CROSS_ENCODER_CACHE:
    _CROSS_ENCODER_CACHE[model_name] = CrossEncoder(model_name)
  return _CROSS_ENCODER_CACHE[model_name]


def hybrid_search(query, distinct_value_df, dashboard):
  """Retrieve "Value: v, Column: c" candidates via BM25 + FAISS, then rerank.

  Args:
    query: Natural-language user query.
    distinct_value_df: DataFrame with 'Value' and 'Column_Name' columns; each row
      becomes one corpus entry.
    dashboard: Key into _RAG_ASSETS selecting the FAISS index.

  Returns:
    The reranked candidates, each followed by " \\n", ordered by ascending
    cross-encoder score. Returns "" when nothing was retrieved.

  Raises:
    ValueError: if `dashboard` has no RAG assets (previously UnboundLocalError).
  """
  assets = _load_rag_assets(dashboard)
  if assets is None:
    raise ValueError(f"Unknown dashboard for hybrid search: {dashboard!r}")
  index, _docs = assets

  texts = distinct_value_df['Value'].astype(str).tolist()
  columns = distinct_value_df['Column_Name'].astype(str).tolist()
  corpus = [f'Value: {v}, Column: {c}' for v, c in zip(texts, columns)]

  # Dense (FAISS) retrieval.
  # NOTE(review): FAISS ids are used as positions into `corpus` (built from
  # distinct_value_df). That is only correct if the index was built from this exact
  # corpus, in this order. Confirm.
  query_embedding = embeddings.embed_query(query)
  query_vec = normalize(np.array([query_embedding], dtype='float32'), axis=1)
  D_faiss, I_faiss = index.search(query_vec, 6)
  faiss_results = [
      {'Value': corpus[i], "score": float(d), 'source': 'faiss'}
      for d, i in zip(D_faiss[0], I_faiss[0])
      if 0 <= i < len(corpus)  # FAISS pads missing hits with -1 (would wrap to corpus[-1])
  ]

  # Sparse (BM25) retrieval.
  bm25 = BM25Okapi([word_tokenize(doc.lower()) for doc in corpus])
  bm25_scores = bm25.get_scores(word_tokenize(query.lower()))
  top_bm25 = sorted(enumerate(bm25_scores), key=lambda x: x[1], reverse=True)[:6]
  bm25_results = [{'Value': corpus[i], "score": float(s), 'source': 'bm25'} for i, s in top_bm25]

  # Normalize each list separately, then fuse 50/50. A missing source counts as 0.
  faiss_results = normalize_scores(faiss_results) if faiss_results else []
  bm25_results = normalize_scores(bm25_results) if bm25_results else []
  merged = defaultdict(lambda: {'bm25': 0, 'faiss': 0})
  for r in faiss_results + bm25_results:
    merged[r['Value']][r['source']] = r['norm_score']

  fused = [{"Value": val, "score": 0.5 * v['bm25'] + 0.5 * v['faiss']} for val, v in merged.items()]
  fused = sorted(fused, key=lambda x: x['score'], reverse=True)[:8]
  if not fused:
    return ""

  # Rerank with the cross encoder (scored once; previously predict ran twice).
  pairs = [(query, r['Value']) for r in fused]
  rerank_scores = _get_cross_encoder().predict(pairs)
  # NOTE(review): ascending order puts the most relevant candidate LAST. This is kept
  # from the original; confirm it is intended.
  reranked = sorted(zip(fused, rerank_scores), key=lambda x: x[1], reverse=False)
  return "".join(f'{r["Value"]} \n' for r, _ in reranked)


# Dense RAG functions - FAISS
def dense_rag_retreival(dashboard, query, k=15):
  """Return the top-`k` FAISS matches for `query` as a JSON list string.

  Each entry is {"Column_Name": str(doc['metadata']), "Value": str(doc['text']),
  "Score": str(score)}, in FAISS result order. Returns "  " (two spaces) for a
  dashboard without RAG assets.
  NOTE(review): no similarity threshold is applied. A previously declared but unused
  `threshold = 0.60` was dropped.
  """
  assets = _load_rag_assets(dashboard)
  if assets is None:
    return "  "
  index, docs = assets

  query_embedding = embeddings.embed_query(query)
  query_vec = normalize(np.array([query_embedding], dtype='float32'), axis=1)
  D, I = index.search(query_vec, k)

  samples = [
      {"Column_Name": str(docs[idx]['metadata']), "Value": str(docs[idx]['text']), "Score": str(score)}
      for idx, score in zip(I[0], D[0])
      if 0 <= idx < len(docs)  # FAISS pads missing hits with -1 (would wrap to docs[-1])
  ]
  return json.dumps(samples)


In [14]:
# Clarification prompt info provider function

# Data-dictionary CSV per dashboard, rendered as markdown into the clarification prompt.
_CLARI_DD_CSV = {
    'Dashboard1': '/content/drive/MyDrive/Colab Notebooks/Conversational BI/Dictionary/DB1_dd - Sheet1.csv',
    'Call_Center_Interaction': '/content/drive/MyDrive/Colab Notebooks/Conversational BI/Dictionary/dd_call_center_interaction.csv',
}


def clari_prompt_info_provider(dashboard):
  """Return {"dd_db1": <data dictionary markdown>, "prompt": db1Clarification}.

  Both dashboards share the db1Clarification template; only the data dictionary
  differs. Returns None for an unknown dashboard.
  """
  csv_path = _CLARI_DD_CSV.get(dashboard)
  if csv_path is None:
    return None
  return {"dd_db1": pd.read_csv(csv_path).to_markdown(), "prompt": db1Clarification}

# clarification prompt builder
# Physical SQLite table name behind each dashboard.
_CLARI_TABLE_NAMES = {
    'Dashboard1': 'table_dashboard1',
    'Call_Center_Interaction': 'table_call_center_interaction',
}


def clarification_prompt_builder(dashboard, history):
  """Build the clarification prompt for the current conversation.

  The RAG retrieval query is the only message when `history` has one turn;
  otherwise it is the first and last messages joined by a space.

  Returns:
    The formatted prompt string, or None (after printing "no prompt") for an
    unknown dashboard.

  Raises:
    ValueError: if `history` is empty (previously an IndexError).
  """
  if not history:
    raise ValueError("history must contain at least one turn")
  if len(history) == 1:
    query = str(history[-1]["content"])
  else:
    query = str(history[0]["content"]) + " " + str(history[-1]["content"])

  table_nm = _CLARI_TABLE_NAMES.get(dashboard)
  if table_nm is None:
    print("no prompt", dashboard)
    return None

  payload = clari_prompt_info_provider(dashboard)
  dense_rag_data = dense_rag_retreival(dashboard, query)
  total_rag_data = dense_rag_data + "\n"  # + ner_hybrid_rag_data (hybrid RAG currently disabled)

  prompt = PromptTemplate(
      input_variables=["table_nm", "schema", "rag_data", "history"],
      template=payload['prompt'].strip()
  )
  return prompt.format(
      table_nm=table_nm,
      schema=payload['dd_db1'],
      rag_data=total_rag_data,
      history=format_conversation_history(history)
  )

In [15]:
# SQL Generation prompt info provider

# Data-dictionary CSV per dashboard; both dashboards share the db1Main SQL template.
_SQL_GEN_DD_CSV = {
    "Dashboard1": '/content/drive/MyDrive/Colab Notebooks/Conversational BI/Dictionary/DB1_dd - Sheet1.csv',
    "Call_Center_Interaction": '/content/drive/MyDrive/Colab Notebooks/Conversational BI/Dictionary/dd_call_center_interaction.csv',
}


def sql_gen_prompt_info_provider(dashboard):
  """Return {'dd_db1': <data dictionary markdown>, "prompt": db1Main}, or None if unknown."""
  if dashboard not in _SQL_GEN_DD_CSV:
    return None
  data_dictionary_md = pd.read_csv(_SQL_GEN_DD_CSV[dashboard]).to_markdown()
  return {'dd_db1': data_dictionary_md, "prompt": db1Main}



# SQL Generation prompt builder function

# Physical SQLite table name behind each dashboard.
_SQL_GEN_TABLE_NAMES = {
    "Dashboard1": 'table_dashboard1',
    "Call_Center_Interaction": 'table_call_center_interaction',
}


def sql_gen_prompt_builder(dashboard, plan, chat_history):
  """Build the SQL-generation prompt from the clarified plan and the chat history.

  Fills the template with the table name, the data-dictionary schema, the plan,
  the pydantic `format_instruction`, and the formatted history. Returns None for
  an unknown dashboard.
  """
  table_nm = _SQL_GEN_TABLE_NAMES.get(dashboard)
  if table_nm is None:
    return None

  payload = sql_gen_prompt_info_provider(dashboard)
  prompt = PromptTemplate(
      # Previously misspelled `input_varibles`, so the keyword never took effect.
      input_variables=["table_nm", "schema", "plan", "format_instruction", "history"],
      template=payload['prompt'].strip()
  )
  return prompt.format(
      table_nm=table_nm,
      schema=payload['dd_db1'],
      plan=plan,
      format_instruction=format_instruction,
      history=format_conversation_history(chat_history)
  )




In [16]:
# SQL Summary prompt info provider

def sql_summary_prompt_info_provider():
  """Wrap the summarizer template as {"prompt": summarizerPrompt}."""
  return {"prompt": summarizerPrompt}


# SQL Summary prompt builder function

def sql_summary_prompt_builder(question, plan, data):
  """Fill the summarizer prompt with the user question, the plan, and the query result.

  Args:
    question: The user's latest message.
    plan: The clarified plan (query_ready flag already removed).
    data: The SQL result rendered as a markdown table.

  Returns:
    The formatted prompt string.
  """
  payload = sql_summary_prompt_info_provider()
  prompt = PromptTemplate(
      # Previously misspelled `input_varibles`, so the keyword never took effect.
      input_variables=["question", "plan", "data"],
      template=payload['prompt'].strip()
  )
  return prompt.format(question=question, plan=plan, data=data)




In [17]:
def extract_sql_query(text: str) -> str:
    """
    Extract a SQL query from an LLM reply that may be wrapped in a ``` fence.

    Strips surrounding whitespace, a leading ``` fence with an optional
    (case-insensitive) "sql" language tag, and a trailing ```. Text without a
    fence is returned stripped but otherwise unchanged.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        # Case-insensitive so "```SQL" does not leave "SQL" glued to the query.
        if cleaned[:3].lower() == "sql":
            cleaned = cleaned[3:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    return cleaned.strip()


def execute_sql(query, connection=None):
  """Run `query` and return the result as a markdown table string.

  Args:
    query: SQL text to execute. Here it is LLM-generated, so run it only against a
      disposable/read-only database such as the in-memory SQLite copy.
    connection: DB-API connection to use. Defaults to the module-level `conn`
      created in the main cell.

  Note: DataFrame.to_markdown requires the optional `tabulate` package.
  """
  if connection is None:
    connection = conn
  return pd.read_sql(query, connection).to_markdown()

In [18]:
import pandas as pd
import sqlite3
# dashboard -> (source CSV path, SQLite table name the prompts refer to).
_DASHBOARD_TABLES = {
    'Dashboard1': (
        "/content/drive/MyDrive/Colab Notebooks/Conversational BI/Data/Dashboard1 - Sheet1.csv",
        "table_dashboard1",
    ),
    'Call_Center_Interaction': (
        "/content/drive/MyDrive/Colab Notebooks/Conversational BI/Data/InteractionSummary_Consolidated (1).xlsx - in.csv",
        "table_call_center_interaction",
    ),
}


def create_sql_table(dashboard, db_path=":memory:"):
  """Load the dashboard's CSV into a SQLite table and return (df, connection).

  Args:
    dashboard: 'Dashboard1' or 'Call_Center_Interaction'.
    db_path: SQLite database location. ":memory:" (default) keeps it in RAM; a
      file path gives a persistent file-based DB.

  Raises:
    ValueError: for an unknown dashboard (previously an UnboundLocalError).
  """
  if dashboard not in _DASHBOARD_TABLES:
    raise ValueError(f"Unknown dashboard: {dashboard!r}")
  csv_path, table_name = _DASHBOARD_TABLES[dashboard]

  df = pd.read_csv(csv_path)
  conn = sqlite3.connect(db_path)
  # Replace any existing table so re-running the cell is idempotent.
  df.to_sql(table_name, conn, index=False, if_exists="replace")
  return df, conn

In [19]:
if __name__ == "__main__":
  # Interactive loop: clarify the question; once the plan is flagged ready, generate
  # SQL, run it on the in-memory SQLite table, and summarize the result.
  chat_history = []
  current_dashboard = 'Call_Center_Interaction'
  previous_dashboard = None
  df, conn = create_sql_table(current_dashboard)

  # SQL generation is gated on the "[query_ready: True]" flag from the clarification
  # step (the same flag remove_query_ready_flag strips). The old check fired on any
  # "true" substring anywhere in the reply.
  query_ready_re = re.compile(r'query_ready\W*true\b', re.IGNORECASE)

  print("Business Assistant is ready. Type 'exit' or 'q' to quit")

  while True:
    user_input = input("You: ").strip()
    if user_input.lower() in ['exit', 'q']:
      print('GoodBye!')
      break

    chat_history.append({'role': 'user', 'content': user_input})

    print(f"Dashboard Context: {current_dashboard}")
    start_time1 = time()
    if previous_dashboard != current_dashboard:
      print(f"Initializing context for '{current_dashboard}'. Resetting conversation history for this topic")
      # Keep only the most recent user message when switching dashboards.
      chat_history = [msg for msg in chat_history if msg['role'] == 'user'][-1:]
      previous_dashboard = current_dashboard
    print('Final Chat History for LLM:', chat_history)
    prompt_clarification = clarification_prompt_builder(current_dashboard, chat_history)
    response_plan = backend.llm_call(prompt_clarification)
    end_time1 = time()
    print(response_plan)
    print("time elapsed in clarification: ", end_time1 - start_time1, "s")

    if query_ready_re.search(response_plan):
      start_time2 = time()
      response_temp = remove_query_ready_flag(response_plan)
      sql_gen_prompt = sql_gen_prompt_builder(current_dashboard, response_temp, chat_history)
      response_query = backend.llm_call(sql_gen_prompt)
      try:
        parsed_output = parser.parse(response_query)
        sql_query = parsed_output.sql_query
        extracted_sql_query = extract_sql_query(sql_query)
        sql_query_res = execute_sql(extracted_sql_query)
      except Exception as e:
        # Malformed LLM JSON or invalid SQL: report it and keep the session alive
        # instead of ending the loop with a traceback.
        print(f"Agent: Could not generate or execute SQL: {e}")
        print("Total time", time() - start_time1, 's')
        continue
      print(f"Agent: Generated SQL: \n{sql_query}")
      end_time2 = time()
      print("time elapsed in main: ", end_time2 - start_time2, "s")

      sql_summary_prompt = sql_summary_prompt_builder(user_input, response_temp, sql_query_res)
      start_time3 = time()
      response_summary = backend.llm_call(sql_summary_prompt)
      print(response_summary)
      end_time3 = time()
      print("time elapsed in summary: ", end_time3 - start_time3, "s")

      chat_history.append({'role': 'Agent', "content": sql_query})
      if len(chat_history) > 10:
        chat_history = chat_history[-10:]
      # After an answered query, keep only the first and the most recent turn.
      chat_history[1:-1] = []
    else:
      chat_history.append({"role": "Agent", "content": response_plan})
      if len(chat_history) > 10:
        chat_history = chat_history[-10:]

    print("Total time", time() - start_time1, 's')


Business Assistant is ready. Type 'exit' or 'q' to quit
You: county wise conversion rate
Dashboard Context: Call_Center_Interaction
Initializing context for 'Call_Center_Interaction'. Resetting conversation history for this topic
Final Chat History for LLM: [{'role': 'user', 'content': 'county wise conversion rate'}]
```sql
SELECT
  County,
  CAST(SUM(Enrollments) AS FLOAT64) / NULLIF(COUNT(ContactId), 0) AS conversion_rate
FROM table_call_center_interaction
GROUP BY
  County;
```
[query_ready: True]
time elapsed in clarification:  5.935588836669922 s
Agent: Generated SQL: 
```sql
SELECT
  County,
  ROUND(CAST(SUM(Enrollments) AS FLOAT64) / NULLIF(COUNT(ContactId), 0), 2) AS conversion_rate
FROM
  table_call_center_interaction
GROUP BY
  County;
```
time elapsed in main:  4.738275766372681 s
**User Intent**
   - The user's intent was to determine the conversion rate (enrollments per contact) for each county.

**Result Summary**
   - The query successfully calculated the conversion rate

In [26]:
# Run SQL queries
# Sanity check: fetch a single row from the loaded table to eyeball its columns.
query = """
SELECT
    *
FROM
    table_call_center_interaction
    limit 1;
    """
result = pd.read_sql(sql=query, con=conn)
print(result)

  State        AgentName CallStatus  CallType        CampaignPocName  \
0    AL  Robert Thompson   Answered  Callback  HealthPlan United-IVR   

    Channel County                Factor1         Manager            Partner  \
0  CarrierX   None  HealthPlan United-IVR  Jessica Martel  HealthPlan United   

   ...      Newani OriginalLeadCost  RecordId StatTime SubDisposition  \
0  ...  56788999.0              0.0  386842.0     None           None   

   TeamCaptain TotalCost  UniqueCalls   ZIP   MA  
0     John Doe       $0        0.2283  None  0.0  

[1 rows x 50 columns]
