## 1. Imports & Environment Setup

In [1]:
import logging

# Root logging: WARNING and above, rendered as "LEVEL - logger -  message".
logging.basicConfig(
    level=logging.WARNING,
    format="%(levelname)s - %(name)s -  %(message)s",
)
# Pin Haystack's own logger to WARNING as well, so its INFO/DEBUG records are dropped.
haystack_logger = logging.getLogger("haystack")
haystack_logger.setLevel(logging.WARNING)

In [2]:
import os
from IPython.display import Markdown, display
import gradio as gr

from dotenv import load_dotenv
# Populate os.environ from a local .env file (AZURE_STORAGE_CONNECTION_STRING is read from it
# below; presumably the OpenAI API key used by the Haystack components comes from here too).
load_dotenv()

# Silence Python warnings from here on. Warnings raised while running the imports above
# (e.g. the tqdm notice printed below) have already been emitted.
import warnings
warnings.filterwarnings(action='ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from haystack import Pipeline
from haystack import Document
from haystack.dataclasses import ChatMessage

from haystack.utils.auth import Secret
from haystack.components.builders import PromptBuilder

from haystack.components.preprocessors.document_splitter import DocumentSplitter
from haystack.components.writers import DocumentWriter

from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever

from haystack.components.generators import OpenAIGenerator
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.embedders import OpenAITextEmbedder


## 2. Indexing Pipeline

In [4]:
# In-memory vector store shared by the indexing pipeline's DocumentWriter and the RAG
# InMemoryEmbeddingRetriever further down; it is not persisted anywhere by this notebook.
document_store = InMemoryDocumentStore()

In [5]:
import os
import pandas as pd
from azure.storage.blob import BlobServiceClient
from io import BytesIO

# Initialize the BlobServiceClient; fail fast with a clear message when the secret is missing
# instead of an obscure error from inside the Azure SDK.
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not connection_string:
    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is not set (check your .env file).")
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Get a client for the specific container
container_client = blob_service_client.get_container_client("prod-container")

# Specify the exact path of the folder
genie_product = 'brand_pulse'
client_name = 'kenvue'
product_category = 'US_skincare'
folder_path = f"outputs/{genie_product}/{client_name}/{product_category}/bot_output/"

# Supported file extensions -> pandas reader that accepts a file-like object.
BLOB_READERS = {
    'csv': pd.read_csv,
    'parquet': pd.read_parquet,
}

# List blobs in the exact folder
blob_list = container_client.list_blobs(name_starts_with=folder_path)
documents = []  # one {'content': <records JSON>, 'name': <blob name>} entry per table file
dfs = []        # the parsed DataFrames, in listing order

for blob in blob_list:
    blob_extension = blob.name.rsplit('.', 1)[-1].lower()
    reader = BLOB_READERS.get(blob_extension)
    if reader is None:
        continue  # Skip if it's not a CSV or Parquet file

    # Only create a client / download for files we can actually parse.
    blob_client = container_client.get_blob_client(blob)
    blob_content = blob_client.download_blob().readall()
    df = reader(BytesIO(blob_content))  # `df` stays bound to the last file read after the loop

    # Convert the entire DataFrame to a JSON string (records orientation) for indexing.
    documents.append({
        'content': df.to_json(orient='records'),
        'name': blob.name
    })
    dfs.append(df)

In [6]:
# Convert your documents into Haystack Documents and write them to the store
haystack_documents = [Document(content=doc['content'], meta={"name": doc['name']}) for doc in documents]
len(haystack_documents)

1

In [7]:
# Indexing pipeline: split the JSON documents into overlapping chunks -> embed -> write to store.
# NOTE: the RAG query embedder must use this same embedding model for retrieval to be meaningful.
splitter = DocumentSplitter(split_length=200, split_overlap=10, split_threshold=20)
embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")
writer = DocumentWriter(document_store=document_store)

indexing = Pipeline()
for stage_name, stage in (("splitter", splitter), ("embedder", embedder), ("writer", writer)):
    indexing.add_component(stage_name, stage)

indexing.connect("splitter", "embedder")
indexing.connect("embedder", "writer")

<haystack.core.pipeline.pipeline.Pipeline object at 0x76579cca9c70>
🚅 Components
  - splitter: DocumentSplitter
  - embedder: OpenAIDocumentEmbedder
  - writer: DocumentWriter
🛤️ Connections
  - splitter.documents -> embedder.documents (List[Document])
  - embedder.documents -> writer.documents (List[Document])

In [8]:
# Index only into an empty store. Embedding is the expensive step (millions of prompt tokens,
# see the output below), and re-writing the same documents would trip the writer's duplicate
# handling (NOTE(review): DuplicatePolicy.NONE defers to the store default, FAIL for the
# in-memory store), so re-running this cell is now a cheap no-op.
if document_store.count_documents() == 0:
    indexing_result = indexing.run({"splitter": {"documents": haystack_documents}})
else:
    indexing_result = {"writer": {"documents_written": 0}}
indexing_result

Calculating embeddings: 100%|██████████| 61/61 [00:58<00:00,  1.04it/s]


{'embedder': {'meta': {'model': 'text-embedding-3-small',
   'usage': {'prompt_tokens': 3924550, 'total_tokens': 3924550}}},
 'writer': {'documents_written': 1939}}

In [9]:
import yaml
import os
import json

file_path = "field_description.yaml"
yaml_file_path = os.path.join(os.getcwd(), file_path)

# Table / column descriptions used to ground the prompts and the pandasai connector.
with open(yaml_file_path, "r", encoding="utf-8") as file:
    field_data = yaml.safe_load(file) or {}  # an empty YAML file loads as None

df_name = "summary"
# Tolerate a missing or empty `summary` section rather than failing with AttributeError on None.
table_section = field_data.get(df_name) or {}
table_description_dict = table_section.get("table_description", " ")
field_descriptions_dict = table_section.get("field_description", " ")

# Serialised to JSON strings so they can be dropped straight into the Jinja prompt templates.
table_description = json.dumps(table_description_dict)
field_descriptions = json.dumps(field_descriptions_dict)

## 3. Custom Components

In [10]:
# Assistant persona. NOTE(review): in this notebook it is only stored on BatchGenerator and in the
# Gradio `messages` list; none of the pipeline's chat generators receives it.
system_prompt = """You are a 25+ year experienced `Advanced Business Analyst & counsellor` with statistical capability and domain expertise.
Your primary task is to assist non-technical users in analyzing data with deep dive along with inference and suggestions as needed.
Strictly, don't assume random data, in any case.
"""

### 3.0 Length Validator

In [11]:
from haystack import component
from pandasai.llm import OpenAI
from pandasai import Agent
from response_parser import GenieResponse
from pandasai.connectors import PandasConnector

import json
import yaml
import os
from typing import List
import pandas as pd

In [12]:
@component
class LengthValidator:
    """Trim the first chat message of a prompt so it stays within a size budget.

    Two caps are applied in order, both trimming from the end: a character cap
    (``max_length``) and then a whitespace-separated word cap (``max_tokens``, counted with
    ``str.split`` — only an approximation of model tokens).
    """

    def __init__(self, max_length=1000000, max_tokens=100000):
        self.max_length = max_length
        self.max_tokens = max_tokens

    @component.output_types(prompt=List[ChatMessage])
    def run(self, prompt: List[ChatMessage], **kwargs):
        """
        Check ``prompt[0].content`` against both caps and trim it in place if needed.

        :param prompt: chat messages from a ChatPromptBuilder. Only the first message is
            checked (the builders in this notebook each emit a single user message).
        :returns: ``{"prompt": prompt}`` — the same list, possibly with trimmed content.
        """
        if not prompt or prompt[0].content is None:
            # Nothing to measure; pass the prompt through untouched.
            return {"prompt": prompt}

        content_str = prompt[0].content

        if len(content_str) > self.max_length:
            # Trimming from the end
            content_str = content_str[:self.max_length]
            print(f"Content was too long, trimmed to {self.max_length} characters.")

        words = content_str.split()
        if len(words) > self.max_tokens:
            # Trimming from the end; re-joining also collapses runs of whitespace/newlines.
            content_str = ' '.join(words[:self.max_tokens])
            print(f"Content was too long, trimmed to {self.max_tokens} words.")

        # Write the (possibly trimmed) text back onto the first message.
        prompt[0].content = content_str

        return {"prompt": prompt}

### 3.1 RAG Components

In [13]:
# Map prompt: rendered once per batch of retrieved chunks by BatchGenerator.
map_prompt = """
Answer Strictly only for the asked original question in format of CRISP YET INSIGHTFUL Enterprise Grade Business Case Study (don't mention it).
Include focused analysis for all possible nuances in the asked question.
If the answer is not contained within the context, reply with clarification questions. Don't assume random data, in any case.

```
Context:
{% for doc in documents %}
   {{ doc.content }}
{% endfor %}
```

```
Table descriptions:
{{table_description}}

Fields/Columns description:
{{field_descriptions}}
```
Instructions:
1. Respond insightfully in crisp, structured and bulleted formats with facts, numbers (strictly up to 2 decimal only) and inferences.
2. Strictly Avoid mentioning phrases like "Based on the shared context/dataset" etc.
3. When need to provide analysis, Use quantitative values to provide factual accuracy instead of relying upon the text mentions.

Question: {{ query }}
"""

# Reduce prompt: `documents` are the per-batch ChatMessage replies produced by BatchGenerator,
# so render their `.content` rather than the message objects (whose repr carries role/metadata).
reduce_prompt = """Given the conversation history and the provided supporting documents, give a brief answer to the question.
Note that supporting documents are not part of the conversation. If question can't be answered from supporting documents, say so & if needed ask clarification questions.

Answer Strictly only for the asked original question in format of CRISP YET INSIGHTFUL Enterprise Grade Business Case Study (don't mention it).
Include focused analysis for all possible nuances in the asked question.
If the answer is not contained within the context, reply with clarification questions. Don't assume random data.

    Conversation history:
    {% for memory in memories %}
        {{ memory.content }}
    {% endfor %}

    Supporting documents:
    {% for doc in documents %}
        {{ doc.content }}
    {% endfor %}

Instructions:
1. Respond insightfully as a "Pro Business Analyst" in well-structured and bulleted formats with facts, numbers (strictly up to 2 decimal only) and drawn inferences (TLdr).
2. Strictly Avoid mentioning phrases like "Based on the shared context/dataset", "mentioned", "shared" etc.
3. When need to provide analysis, Use quantitative values to provide factual accuracy instead of relying upon the text mentions.

\nQuestion: {{query}}
\nAnswer:
"""

In [14]:
from haystack import component, Document
import concurrent.futures
from typing import List
from haystack.components.builders import ChatPromptBuilder, PromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator

@component
class BatchGenerator:
    """Map step of the RAG branch: answer the query separately for each batch of documents.

    Retrieved documents are split into consecutive batches of ``batch_size``. Each batch is
    rendered into ``question_prompt`` (with the table/field descriptions) and sent through its
    own ChatPromptBuilder -> LengthValidator -> OpenAIChatGenerator pipeline. Batches run in a
    thread pool and the first reply of each batch is returned, in batch order.
    """
    outgoing_edges = 1

    def __init__(self, question_prompt, system_prompt, table_description, field_descriptions, batch_size: int = 10):
        self.question_prompt = question_prompt
        # NOTE(review): stored, but not included in the per-batch prompt built below.
        self.system_prompt = system_prompt
        self.batch_size = batch_size
        self.table_description = table_description
        self.field_descriptions = field_descriptions

    @component.output_types(replies=List)
    def run(self, documents: List[Document], user_asked_query: str, **kwargs):
        """
        Answer ``user_asked_query`` once per batch of ``documents``.

        :param documents: retrieved documents to answer from.
        :param user_asked_query: the user's question.
        :returns: ``{"replies": [...]}`` with one ChatMessage per batch (empty if no documents).
        """
        # Kept only for backward compatibility. Workers receive the query as an argument, so two
        # overlapping runs of this shared component can no longer read each other's question.
        self.user_asked_query = user_asked_query
        batches = [documents[i:i + self.batch_size] for i in range(0, len(documents), self.batch_size)]
        print(f"Total Batch Size {len(batches)} from {len(documents)} documents")
        if not batches:
            return {"replies": []}

        # Process batches in parallel; Executor.map yields results in input (batch) order.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            batch_results = list(executor.map(
                lambda batch: self.process_batch(batch, user_asked_query), batches))

        return {"replies": batch_results}

    def process_batch(self, batch_doc: List[Document], user_asked_query: str = None):
        """
        Run the map prompt over one batch and return the generator's first reply.

        :param batch_doc: documents in this batch.
        :param user_asked_query: question to answer; when omitted, falls back to the query
            stored by the most recent ``run`` call (the original behaviour).
        :returns: the first ChatMessage produced by the generator.
        """
        if user_asked_query is None:
            user_asked_query = self.user_asked_query

        # Fresh builder/generator/pipeline per call, so worker threads share no components.
        prompt_builder = ChatPromptBuilder(template=[ChatMessage.from_user(self.question_prompt)])
        generator = OpenAIChatGenerator(model="gpt-4o-mini", generation_kwargs={'temperature':0, 'seed':101, 'n':1, 'max_tokens':1000 })

        batch_rag = Pipeline()
        batch_rag.add_component("prompt", prompt_builder)
        batch_rag.add_component("length_val", LengthValidator())
        batch_rag.add_component("generator", generator)
        batch_rag.connect("prompt", "length_val")
        batch_rag.connect("length_val", "generator")

        # Generate output for individual batch
        response = batch_rag.run({"prompt": {"documents": batch_doc,
                                             "query": user_asked_query,
                                             "table_description": self.table_description,
                                             "field_descriptions": self.field_descriptions}})

        return response["generator"]["replies"][0]

### 3.2 DataFrame Context Component

In [15]:
@component
class DataContextGenerator:
    """DataFrame branch: ask a pandasai Agent for the data that answers the user's question.

    The agent is built over the first DataFrame in ``df_list``, described with the ``summary``
    section of ``field_description.yaml``. It is queried with the user's question and with the
    agent's own rephrasing of it. The answers are returned as a JSON object mapping each prompt
    to its string result.
    """

    def __init__(self):
        self.llm_model = "gpt-4o-mini"
        self.temperature = 0.2
        self.memory_size = 25

    def load_yaml(self, file_path):
        """Load YAML file synchronously."""
        with open(file_path, "r", encoding="utf-8") as file:
            return yaml.safe_load(file)

    def run_chat(self, prompt, agent):
        """Run ``agent.chat`` on ``prompt`` and return the answer as a string.

        DataFrame answers are serialised to records-oriented JSON and Series to JSON; string
        answers pass through unchanged; any other non-None answer is stringified. Failures are
        logged and yield ``""`` so one failed prompt does not abort the whole turn.
        """
        try:
            result = agent.chat(prompt, output_type='dataframe')
        except Exception:
            logging.getLogger(__name__).exception("pandasai agent failed for prompt %r", prompt)
            return ""

        if isinstance(result, str):
            return result
        if isinstance(result, pd.DataFrame):
            return result.to_json(orient='records', date_format='iso')
        if isinstance(result, pd.Series):
            return result.to_json(date_format='iso')
        if result is None:
            return ""
        return str(result)

    def run_agent(self, question, df):
        """Build a pandasai Agent over ``df`` and answer ``question`` plus its rephrasing.

        :returns: dict mapping each prompt (original question, then the rephrased question if
            it is available and different) to the agent's string answer.
        """
        # Load the YAML file (assumes it's in the current directory)
        file_path = "field_description.yaml"
        yaml_file_path = os.path.join(os.getcwd(), file_path)
        field_data = self.load_yaml(yaml_file_path) or {}

        df_name = "summary"
        # Tolerate a missing/empty `summary` section instead of AttributeError on None.
        table_section = field_data.get(df_name) or {}

        connector = PandasConnector({"original_df": df}
                            , name=f"{df_name}"
                            , description=table_section.get("table_description", " ")
                            , field_descriptions=table_section.get("field_description", " "))

        # Initialize LLM
        llm = OpenAI(model=self.llm_model, temperature=self.temperature, seed=10)

        # Create the agent
        agent = Agent(
            connector,
            memory_size=self.memory_size,
            description="""You are a 25+ year experienced `Pro Python & Pandas Assistant`. You specialize in developing precise dataframe generation code.
                Your primary task is to assist non-technical users by providing relevant dataframe to start the analysis, by aggregating at all relevant granularity.
                If the answer is not contained within the documents, reply with 'no_data'.
                By default consider Latest few `Year_Month` data, unless asked specifically otherwise, and mention it.

                ```
                Instructions:
                1. STRICTLY AVOID CHARTS, REVERT ONLY DATAFRAME.
                2. ENSURE TO DO SANITY CHECKS & HANDLE ERRORS INTERNALLY TO ANSWER.
                3. WHEN FETCHING HIGHEST, STRICTLY CONSIDER ALL MATCHING ITEMS WITH SIMILAR VALUES TO INCLUDE IN RESPONSE.
                ```
                """,
            config={
                "llm": llm,
                "open_charts": False,
                "save_charts": False,
                "verbose": False,
                "save_logs": False,
                "response_parser": GenieResponse,
                "max_retries": 2
            }
        )

        # A rephrased query gives the agent a second, differently-worded attempt. If rephrasing
        # fails, carry on with the original question alone.
        try:
            rephrased_prompt = agent.rephrase_query(question)
        except Exception:
            logging.getLogger(__name__).exception("pandasai rephrase_query failed for %r", question)
            rephrased_prompt = None

        # Run the prompts sequentially on the same agent (original question first).
        response_dict = {question: self.run_chat(question, agent)}
        if rephrased_prompt and rephrased_prompt != question:
            response_dict[rephrased_prompt] = self.run_chat(rephrased_prompt, agent)

        return response_dict

    @component.output_types(replies=str)
    def run(self, user_asked_query: str, df_list: List, **kwargs):
        """
        Pipeline entry point.

        :param user_asked_query: the user's question.
        :param df_list: DataFrames to analyse; only the first one is used.
        :returns: ``{"replies": <JSON string of {prompt: answer}>}``.
        :raises ValueError: if ``df_list`` is empty.
        """
        if not df_list:
            raise ValueError("DataContextGenerator needs at least one DataFrame in df_list.")
        data_context_dict = self.run_agent(user_asked_query, df_list[0])
        return {"replies": json.dumps(data_context_dict)}


In [16]:
import tiktoken

@component
class ContextWindowLimiter:
    """Keep a text inside a token budget by dropping tokens from its start.

    Tokens are counted with tiktoken's ``cl100k_base`` encoding. When the text is longer than
    the budget, only its last ``max_tokens`` tokens are kept, i.e. the end of the text survives.
    """

    @staticmethod
    def _encoding():
        """Return the ``cl100k_base`` tiktoken encoding used for counting and trimming."""
        return tiktoken.get_encoding("cl100k_base")

    def trim_context(self, paragraph: str, max_tokens=100000):
        """Return ``paragraph`` if it fits in ``max_tokens`` tokens, else its trailing ``max_tokens`` tokens decoded."""
        encoding = self._encoding()
        token_ids = encoding.encode(paragraph)
        if len(token_ids) > max_tokens:
            return encoding.decode(token_ids[-max_tokens:])
        return paragraph

    def count_tokens(self, paragraph: str):
        """Number of ``cl100k_base`` tokens in ``paragraph``."""
        return len(self._encoding().encode(paragraph))

    @component.output_types(replies=str)
    def run(self, paragraph: str, **kwargs):
        """Pipeline entry point: ``{"replies": <paragraph trimmed to the default budget>}``."""
        return {"replies": self.trim_context(paragraph)}

In [17]:

# Final-answer prompt for the DataFrame branch. `documents` is the JSON string produced by
# DataContextGenerator ({prompt: agent answer}).
data_context_prompt = """Given the conversation history and the provided supporting documents, give a brief answer to the question.
Note that supporting documents are not part of the conversation. If question can't be answered from supporting documents, say so & if needed ask clarification questions.

Answer Strictly only for the asked original question in format of Enterprise Grade Business Case Study (don't mention it).
Include focused analysis for all possible nuances in the asked question.
Here are the relevant dataframes extracted for the user asked questions and some clarification questions as well as context.

    Conversation history:
    {% for memory in memories %}
        {{ memory.content }}
    {% endfor %}

    Supporting documents:
    {{documents}}

Instructions:
1. Have facts, numbers (strictly up to 2 decimal only) and drawn inferences (TLdr).
2. Strictly Avoid mentioning phrases like "Based on the shared context/dataset", "mentioned", "shared" etc.
3. For numeric values, ensure to provide data in efficient PIVOT form tabular grids, if more than 2 rows, as per relevance and provide insights from it in reference to nuances of questions asked.
4. When analysing, prioritize only latest of months / years data, unless asked specifically otherwise.
5. Ensure to provide an enterprise grade answer in sections, but only limit to asked question.

\nQuestion: {{query}}
\nAnswer:
"""

### 3.3 Pipeline Selector

In [18]:
# Routing prompt for the selector LLM. Its reply is matched by the ConditionalRouter `routes`
# on the lowercase substrings 'rag', 'dataframe' and 'generic', so the three return labels at
# the end of this prompt must keep those words.
selector_prompt = """Based on the conversation history, user question, and the table structure with column descriptions, decide which ONE of the following should be used for the next step of analysis:

1. **RAG Pipeline**: Choose this if:
   - The question involves analyzing textual data embedded in the table (e.g., free-text fields, long descriptions).
   - The user question involves a broad analysis that isn't about performing calculations or operations on specific columns of the table.

2. **DATAFRAME Pipeline**: Choose this if:
   - The question directly/indirectly may require numerical columns or operations or date month year column
   - The user is asking for a straightforward statistical or numerical operation that can be executed directly on the dataset (e.g., sorting, filtering, grouping by a column).

3. **Generic Response**: Choose this ONLY if:
   - The user input is clearly a greeting, a generic comment, or irrelevant text that does not require any analytical pipeline.

### Decision Rules:
- If the question involves broad or interpretive insights, choose the `RAG Pipeline`.
- If the question involves quantified analysis on data (e.g., sums, averages, comparisons, trends, date / month / year), choose the `DATAFRAME Pipeline`.
- When both text and numerical elements are involved, select the pipeline that aligns best with the core requirement of the question (e.g., is the focus more on numeric calculations or text-based insights?).
- **Prioritize the `DATAFRAME Pipeline`**: Use the `DATAFRAME Pipeline` for any question that involves quantified analysis or structured data, even if the question includes some general context. Only choose the `RAG Pipeline` if the core of the question clearly requires textual analysis.
- **Limit `RAG Pipeline`** to situations where structured numerical analysis cannot solve the problem, such as when dealing with broad natural language queries or complex text-based analysis.
- **Fallback to the `DATAFRAME Pipeline`** if there is ambiguity between numerical and textual analysis.
- **Fallback to `Generic Response`** for anything irrelevant, greetings, or broad queries unrelated to the table data and including external calculations, not relevant to data.

### Table Structure and Column Descriptions:
    ```
    Table descriptions:
    {{table_description}}

    Fields/Columns description:
    {{field_descriptions}}
    ```

Carefully analyze the conversation history and the user question. Choose ONLY ONE option based on the best analysis of the input.

    Conversation history:
    {% for memory in memories %}
        {{ memory.content }}
    {% endfor %}

    User Question:
    {{query}}

Return only one: `RAG Pipeline`, `DATAFRAME Pipeline`, or `GENERIC Response`.
"""

In [19]:
# ConditionalRouter routes for the selector LLM's reply. Routes are evaluated in order and the
# first matching condition wins. Conditions inspect the reply text (`.content`), not the
# ChatMessage object itself, whose string form also contains role and metadata.
routes = [
    {
        "condition": "{{'rag' in replies[0].content|lower}}",
        "output": "{{query}}",
        "output_name": "go_to_rag",
        "output_type": str,
    },
    {
        "condition": "{{'dataframe' in replies[0].content|lower}}",
        "output": "{{query}}",
        "output_name": "go_to_df",
        "output_type": str,
    },
    {
        "condition": "{{'generic' in replies[0].content|lower}}",
        "output": "{{query}}",
        "output_name": "go_to_generic",
        "output_type": str,
    },
    {
        # Fallback when the reply names none of the pipelines: surface the raw reply text.
        "condition": "{{'generic' not in replies[0].content|lower}}",
        "output": "{{replies[0].content}}",
        "output_name": "answer",
        "output_type": str,
    },
]

In [20]:
# from haystack import Pipeline
# from haystack.dataclasses import ChatMessage

# from haystack.components.builders import ChatPromptBuilder, PromptBuilder
# from haystack.components.generators.chat import OpenAIChatGenerator
# from haystack.components.routers import ConditionalRouter

# router = ConditionalRouter(routes=routes)


# demo_pipe = Pipeline()
# demo_pipe.add_component("selector_prompt", ChatPromptBuilder(template=[ChatMessage.from_user(selector_prompt)]))
# demo_pipe.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", generation_kwargs={'temperature':0, 'seed':101, 'n':1 }))
# demo_pipe.add_component("router", ConditionalRouter(routes=routes))

# demo_pipe.connect("selector_prompt.prompt", "llm.messages")
# demo_pipe.connect("llm.replies", "router.replies")

# while True:
#     question = input("Enter your question or Q to exit.\n🧑 ")
#     if question=="Q":
#         break
#     print(question)
#     response = demo_pipe.run( data={
#         "selector_prompt": {"query": question, "table_description":table_description, "field_descriptions":field_descriptions},
#         "router": {"query": question},
#         })
#     print(response)
#     # print(response["llm"]["replies"][0].content)


### 3.4 Generic Component

In [21]:
# Prompt for the generic branch (greetings, feedback, off-topic messages); it also asks the model
# to suggest follow-up questions grounded in the table/field descriptions.
generic_response_prompt = """You are Genie Bot, an advanced and friendly analytical assistant developed by i-Genie.
Given the conversation history, and mainly based on the user's message, respond appropriately, whether it's a greeting, generic question, or analytical inquiry.
Adjust your tone according to the user's sentiment, whether positive, neutral, or negative, while always maintaining professionalism.

### Response Instructions:

1. **If the user greets you**:
   - Respond with enthusiasm: "Hello! It's great to hear from you! 😊 How can I assist you today with your data analysis?"

2. **If the user asks a generic or irrelevant question**:
   - Attempt to answer based on general knowledge, but suggest the user ask more data-related questions for better insights.
   - Politely guide the user with examples of business analysis questions, dynamically generated based on the provided data description.

3. **If the user provides positive feedback**:
   - Acknowledge their satisfaction: "I'm glad that was helpful! 😊"
   - Then offer further analysis suggestions tailored to the data: "Would you like to explore more detailed insights based on the current dataset?"

4. **If the user responds negatively**:
   - Stay calm and acknowledge potential limitations: "I understand this might not be what you expected, and I’m still learning. Sometimes, I may not be perfect."
   - Encourage collaboration: "Let’s explore further together. Based on the dataset, you can ask specific questions that may help refine the analysis."
   - Reassure them: "Your feedback helps me improve, and we can work together to find the right insights."

5. **If the user asks a data-related question**:
   - Provide the analysis as requested, and then offer additional suggestions for further exploration, based on the dataset.

6. **MUST suggest dynamic follow-up questions** based on the table and data description, without revealing schema details. Use the dataset to generate relevant, meaningful questions dynamically to keep the conversation engaging, but ensure that they are answerable from dataset.

````
### Underlying dataset Reference:
Below is the general description of the dataset, which should guide your suggestions:
    Table descriptions:
    {{table_description}}

    Fields/Columns description:
    {{field_descriptions}}
````

    Conversation history:
    {% for memory in memories %}
        {{ memory.content }}
    {% endfor %}

### Task:
1. Judge the user's message and tone to respond accordingly, whether it's a greeting, irrelevant query, positive or negative feedback, or data-related question.
2. Always suggest helpful business analysis question examples, crafted based on the data description & conversation history. You should be dynamically tailoring responses and follow-up questions based on the data description, conversation history and more importantly user question.
3. While suggesting question examples, ensure they are not much complex yet insightful, and data oriented.
4. Encourage deeper data exploration, keeping your tone friendly and professional, while guiding the user toward meaningful insights.

Keep your tone friendly, professional, and energetic where appropriate! You are here to help the user make the most of the tool!

\n **User Message**: {{query}}
\n **Response**:
"""

## 4. Pipeline Development

In [22]:
from typing import List
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder, PromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.generators import OpenAIGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.routers import ConditionalRouter

from haystack_experimental.chat_message_stores.in_memory import InMemoryChatMessageStore
from haystack_experimental.components.retrievers import ChatMessageRetriever
from haystack_experimental.components.writers import ChatMessageWriter

########################################
##   CUSTOM COMPONENT INSTANTIATION   ##
########################################

# Memory Components Instantiation
memory_store = InMemoryChatMessageStore()
memory_retriever = ChatMessageRetriever(memory_store)
memory_writer = ChatMessageWriter(memory_store)

# Custom Components Instantiation
batch_generator = BatchGenerator(question_prompt=map_prompt
                                 , system_prompt=system_prompt
                                 , table_description=table_description
                                 , field_descriptions=field_descriptions
                                 , batch_size=5)

data_context_generator = DataContextGenerator()

# Router Instantiation
# Routes are evaluated in order and the first matching condition wins. Conditions inspect the
# reply text (`.content`), not the ChatMessage object, whose string form also carries metadata.
routes = [
    {
        "condition": "{{'rag' in replies[0].content|lower}}",
        "output": "{{query}}",
        "output_name": "go_to_rag",
        "output_type": str,
    },
    {
        "condition": "{{'dataframe' in replies[0].content|lower}}",
        "output": "{{query}}",
        "output_name": "go_to_df",
        "output_type": str,
    },
    {
        "condition": "{{'generic' in replies[0].content|lower}}",
        "output": "{{query}}",
        "output_name": "go_to_generic",
        "output_type": str,
    },
    {
        # Fallback when the reply names none of the pipelines (output is left unconnected).
        "condition": "{{'generic' not in replies[0].content|lower}}",
        "output": "{{replies[0].content}}",
        "output_name": "answer",
        "output_type": str,
    },
]

########################################
##     PIPELINE DEFINITION            ##
########################################

pipeline = Pipeline()

# components for Selected Routing
pipeline.add_component("s_prompt", ChatPromptBuilder(template=[ChatMessage.from_user(selector_prompt)]))
pipeline.add_component("s_lv", LengthValidator())
pipeline.add_component("s_llm", OpenAIChatGenerator(model="gpt-4o-mini", generation_kwargs={'temperature':0, 'seed':101, 'n':1, 'max_tokens':2000 }))
pipeline.add_component("s_router", ConditionalRouter(routes=routes))

# components for Generic Response
pipeline.add_component("g_prompt", ChatPromptBuilder(template=[ChatMessage.from_user(generic_response_prompt)]))
pipeline.add_component("g_lv", LengthValidator())
pipeline.add_component("g_llm", OpenAIChatGenerator(model="gpt-4o-mini", generation_kwargs={'temperature':0.5, 'seed':101, 'n':1, 'max_tokens':2000 }))

# components for DataFrame Context
pipeline.add_component("dc_generator", data_context_generator)
pipeline.add_component("dc_prompt_builder", ChatPromptBuilder(template=[ChatMessage.from_user(data_context_prompt)]))
pipeline.add_component("dc_lv", LengthValidator())
# NOTE(review): max_tokens=500 is much tighter than the other branches (2000) for an answer the
# prompt asks to be "enterprise grade ... in sections"; long answers may be cut off.
pipeline.add_component("dc_llm", OpenAIChatGenerator(model="gpt-4o-mini", generation_kwargs={'temperature':0.3, 'seed':101, 'n':1 , 'max_tokens':500}))

# components for RAG
# The query must be embedded with the same model as the indexed documents
# (OpenAIDocumentEmbedder(model="text-embedding-3-small") in the indexing pipeline);
# OpenAITextEmbedder's default model differs, which would compare vectors from different spaces.
pipeline.add_component("r_query_embedder", OpenAITextEmbedder(model="text-embedding-3-small"))
pipeline.add_component("r_retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipeline.add_component("r_batch_generator", batch_generator)
pipeline.add_component("r_prompt_builder", ChatPromptBuilder(template=[ChatMessage.from_user(reduce_prompt)]))
pipeline.add_component("r_lv", LengthValidator())
pipeline.add_component("r_llm", OpenAIChatGenerator(model="gpt-4o-mini", generation_kwargs={'temperature':0.3, 'seed':101, 'n':1, 'max_tokens':2000 }))

# components for memory
pipeline.add_component("memory_retriever", memory_retriever)
pipeline.add_component("memory_writer", memory_writer)
pipeline.add_component("memory_joiner", BranchJoiner(List[ChatMessage]))

# connections for Selected routing
pipeline.connect("s_prompt", "s_lv")
pipeline.connect("s_lv.prompt", "s_llm.messages")
pipeline.connect("s_llm.replies", "s_router.replies")
pipeline.connect("s_router.go_to_rag", "r_query_embedder.text")
pipeline.connect("s_router.go_to_df", "dc_generator.user_asked_query")
pipeline.connect("s_router.go_to_generic", "g_prompt.query")

# connection for Generic Response
pipeline.connect("g_prompt", "g_lv")
pipeline.connect("g_lv.prompt", "g_llm.messages")
pipeline.connect("g_llm.replies", "memory_joiner")

# connections for RAG
pipeline.connect("r_query_embedder.embedding", "r_retriever.query_embedding")
pipeline.connect("r_retriever.documents", "r_batch_generator.documents")
pipeline.connect("r_batch_generator.replies", "r_prompt_builder.documents")
pipeline.connect("r_prompt_builder", "r_lv")
pipeline.connect("r_lv.prompt", "r_llm.messages")
pipeline.connect("r_llm.replies", "memory_joiner")

# connections for DataFrame Context
pipeline.connect("dc_generator.replies", "dc_prompt_builder.documents")
pipeline.connect("dc_prompt_builder", "dc_lv")
pipeline.connect("dc_lv.prompt", "dc_llm.messages")
pipeline.connect("dc_llm.replies", "memory_joiner")

# connections for memory
pipeline.connect("memory_joiner", "memory_writer")
pipeline.connect("memory_retriever", "s_prompt.memories")
pipeline.connect("memory_retriever", "dc_prompt_builder.memories")
pipeline.connect("memory_retriever", "r_prompt_builder.memories")
pipeline.connect("memory_retriever", "g_prompt.memories")

<haystack.core.pipeline.pipeline.Pipeline object at 0x765784312a50>
🚅 Components
  - s_prompt: ChatPromptBuilder
  - s_lv: LengthValidator
  - s_llm: OpenAIChatGenerator
  - s_router: ConditionalRouter
  - g_prompt: ChatPromptBuilder
  - g_lv: LengthValidator
  - g_llm: OpenAIChatGenerator
  - dc_generator: DataContextGenerator
  - dc_prompt_builder: ChatPromptBuilder
  - dc_lv: LengthValidator
  - dc_llm: OpenAIChatGenerator
  - r_query_embedder: OpenAITextEmbedder
  - r_retriever: InMemoryEmbeddingRetriever
  - r_batch_generator: BatchGenerator
  - r_prompt_builder: ChatPromptBuilder
  - r_lv: LengthValidator
  - r_llm: OpenAIChatGenerator
  - memory_retriever: ChatMessageRetriever
  - memory_writer: ChatMessageWriter
  - memory_joiner: BranchJoiner
🛤️ Connections
  - s_prompt.prompt -> s_lv.prompt (List[ChatMessage])
  - s_lv.prompt -> s_llm.messages (List[ChatMessage])
  - s_llm.replies -> s_router.replies (List[ChatMessage])
  - s_router.go_to_rag -> r_query_embedder.text (str)
  

In [23]:
# pipeline.show()

## 5. Conversational Bot

In [24]:
# system_message = ChatMessage.from_system(system_prompt)

# while True:
#     messages = [system_message,]
#     question = input("Enter your question or Q to exit.\n🧑 ")
#     if question=="Q":
#         break

#     res = pipeline.run(data={"s_prompt": {"query": question, "table_description":table_description, "field_descriptions":field_descriptions},
#                              "g_prompt": {"table_description":table_description, "field_descriptions":field_descriptions},
#                              "s_router": {"query": question},
#                              "dc_generator": {"df_list":[df,]},
#                              "dc_prompt_builder": {"query": question},
#                              "r_retriever": {"top_k": 50},
#                              "r_batch_generator": {"user_asked_query": question,},
#                              "r_prompt_builder": {"query": question},
#                              "memory_joiner": {"value": [ChatMessage.from_user(question)]}
#                              },
#                             include_outputs_from=['g_llm', 'dc_llm', 'r_llm', "s_prompt", "s_router",
#                                                   "dc_generator", "dc_prompt_builder",
#                                                   "r_retriever", "r_batch_generator", "r_prompt_builder", "memory_retriever"  ]
#                         )

#     llm_type = [key for key in ['g_llm', 'dc_llm', 'r_llm'] if key in res][0]
#     assistant_resp = res[llm_type]['replies'][0]
#     display(Markdown(f"{assistant_resp.content}"))

In [25]:
# res["memory_retriever"]["messages"]

## 6. Gradio Bot

In [26]:
# Running transcript of the Gradio session: the system prompt, then every user/assistant turn.
# NOTE(review): this list is never passed to the pipeline (conversation memory comes from the
# memory retriever/writer components) and is shared by all Gradio sessions in this process.
messages = [ChatMessage.from_system(system_prompt)]

def chat(question, history):
    """Gradio ``ChatInterface`` callback: run one user turn through the routed pipeline.

    :param question: the user's message.
    :param history: chat history supplied by Gradio (not used here).
    :returns: the text of the assistant's answer.
    """
    messages.append(ChatMessage.from_user(question))
    response = pipeline.run(data={"s_prompt": {"query": question, "table_description":table_description, "field_descriptions":field_descriptions},
                             "g_prompt": {"table_description":table_description, "field_descriptions":field_descriptions},
                             "s_router": {"query": question},
                             # Most recently loaded DataFrame (the object the loading loop's `df`
                             # ends on), taken from `dfs` instead of the leaked loop variable.
                             "dc_generator": {"df_list": dfs[-1:]},
                             "dc_prompt_builder": {"query": question},
                             "r_retriever": {"top_k": 50},
                             "r_batch_generator": {"user_asked_query": question,},
                             "r_prompt_builder": {"query": question},
                             "memory_joiner": {"value": [ChatMessage.from_user(question)]}
                             },
                            include_outputs_from=['g_llm', 'dc_llm', 'r_llm', "s_prompt", "s_router",
                                                  "dc_generator", "dc_prompt_builder",
                                                  "r_retriever", "r_batch_generator", "r_prompt_builder",
                                                   "memory_retriever" ]
                        )

    # The router sends a turn down at most one answering branch; find the one that replied.
    llm_type = next((key for key in ['g_llm', 'dc_llm', 'r_llm'] if key in response), None)
    if llm_type is None:
        # The selector reply matched none of the branches, so no answering LLM ran. Fall back to
        # the router's unconnected `answer` output instead of failing with IndexError.
        fallback_text = response.get("s_router", {}).get("answer")
        if not fallback_text:
            fallback_text = "Sorry, I couldn't work out how to answer that. Could you rephrase your question?"
        assistant_resp = [ChatMessage.from_assistant(str(fallback_text))]
    else:
        assistant_resp = response[llm_type]['replies']
    messages.extend(assistant_resp)

    return assistant_resp[0].content

In [27]:
# Starter questions shown under the chat box. With cache_examples=True Gradio pre-computes their
# answers through `chat` (real pipeline/LLM calls) and replays them from its cache folder.
chat_examples = [
    "What is the overall score?",
    "How are my brands performing?",
]

demo = gr.ChatInterface(
    fn=chat,
    title="Brand Pulse Bot",
    examples=chat_examples,
    cache_examples=True,
    show_progress='full',
    fill_height=True,
    analytics_enabled=True,
)
# NOTE(review): share=True publishes a public *.gradio.live URL (see output) with no auth in front
# of a bot that answers from client data; consider `auth=` or share=False outside demos.
demo.launch(share=True)

Using cache from '/workspaces/g_bot/gradio_cached_examples/15' directory. If method or examples have changed since last caching, delete this folder to clear cache.

Running on local URL:  http://127.0.0.1:7860
Running on public URL: https://93b0cbeeb1d167ac58.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


