# Gradio Web App

This notebooks allows to run the code to set up and launch the RC Assist WebApp.

The following tasks are performed:

- Load of the child and parent documents indexes into memory (for faster inference)
- Build the retrieval system to get the relevant context to answer the queries
- Build several components to enhance the chatbot: query reformulation/expansion, memory management, context refining/reranking
- Add response streaming capability to the chatbot

Once the code executed, the web application will expose the chatbot web interface via a URL.

Import the Dependencies

In [1]:
import os
import sys
sys.path.append("../")

from pathlib import Path
import nest_asyncio
import gradio as gr
from typing import List
import warnings

from dotenv import load_dotenv, find_dotenv

from src.constants import (
    INDEX_PATH,
    CHILD_DOCUMENTS_INDEX,
    PARENT_DOCUMENTS_INDEX,
    TOP_K_RETRIEVED_CHILDREN,
    TOP_K_RERANKED_PARENTS,
    TOP_K_REFS,
    MEMORY_WINDOW,
    TEMPERATURE_QUERY_EXPANSION,
    TEMPERATURE_RESPONSE 
)
from src.doc_utils import retrieve_from_keyword_index
from src.utils.text_utils import get_token_count

from llama_index.core import (
    StorageContext,
    load_index_from_storage,
    Settings
)
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.schema import NodeWithScore
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.core.postprocessor import LLMRerank
from llama_index.core.llms import ChatMessage

warnings.filterwarnings("ignore")
nest_asyncio.apply()

load_dotenv(find_dotenv())



  from .autonotebook import tqdm as notebook_tqdm


True

Set up the Models

In [2]:
# Model Config
model_info = {
    "AZURE_OPENAI_RESOURCE": os.environ.get("AZURE_OPENAI_RESOURCE"),
    "AZURE_OPENAI_KEY": os.environ.get("AZURE_OPENAI_KEY"),
    "AZURE_OPENAI_MODEL_VISION": os.environ.get("AZURE_OPENAI_MODEL_VISION"),
    "AZURE_OPENAI_MODEL": os.environ.get("AZURE_OPENAI_MODEL"),
    "AZURE_OPENAI_API_VERSION": os.environ.get("AZURE_OPENAI_API_VERSION"),
}

embed_model_info = {
    "AZURE_OPENAI_EMBEDDING_MODEL_RESOURCE": os.environ.get("AZURE_OPENAI_RESOURCE"),
    "AZURE_OPENAI_EMBEDDING_MODEL_RESOURCE_KEY": os.environ.get("AZURE_OPENAI_EMBEDDING_MODEL_RESOURCE_KEY"),
    "AZURE_OPENAI_EMBEDDING_MODEL": os.environ.get("AZURE_OPENAI_EMBEDDING_MODEL"),
    "AZURE_OPENAI_EMBEDDING_MODEL_API_VERSION": os.environ.get("AZURE_OPENAI_EMBEDDING_MODEL_API_VERSION"),
}

# Main LLM
model = AzureOpenAI(
    model=model_info["AZURE_OPENAI_MODEL"],
    deployment_name=model_info["AZURE_OPENAI_MODEL"],
    api_key=model_info["AZURE_OPENAI_KEY"],
    azure_endpoint=f"https://{model_info['AZURE_OPENAI_RESOURCE']}.openai.azure.com/",
    api_version=model_info["AZURE_OPENAI_API_VERSION"],
    temperature=TEMPERATURE_RESPONSE
)

# Embedding model
embed_model = AzureOpenAIEmbedding(
    model=embed_model_info["AZURE_OPENAI_EMBEDDING_MODEL"],
    deployment_name=embed_model_info["AZURE_OPENAI_EMBEDDING_MODEL"],
    api_key=embed_model_info["AZURE_OPENAI_EMBEDDING_MODEL_RESOURCE_KEY"],
    azure_endpoint=f"https://{embed_model_info['AZURE_OPENAI_EMBEDDING_MODEL_RESOURCE']}.openai.azure.com/",
    api_version=embed_model_info["AZURE_OPENAI_EMBEDDING_MODEL_API_VERSION"]
)

# LLM for Query Expansion
query_expansion_model = AzureOpenAI(
    model=model_info["AZURE_OPENAI_MODEL"],
    deployment_name=model_info["AZURE_OPENAI_MODEL"],
    api_key=model_info["AZURE_OPENAI_KEY"],
    azure_endpoint=f"https://{model_info['AZURE_OPENAI_RESOURCE']}.openai.azure.com/",
    api_version=model_info["AZURE_OPENAI_API_VERSION"],
    temperature=TEMPERATURE_QUERY_EXPANSION
)

Settings.embed_model = embed_model
Settings.llm = model

Load the child and parent documents indexes

In [3]:
try:
    child_index_path = Path(INDEX_PATH) / CHILD_DOCUMENTS_INDEX
    children_storage_context = StorageContext.from_defaults(persist_dir=child_index_path)
    child_documents_index = load_index_from_storage(children_storage_context)
    print("Index %s loaded successfully!", child_index_path.stem)
except (IOError, OSError) as e:
    print(
        "Failed to load index %s. The following error occurred:\n%s", child_index_path.stem, e
    )

try:
    parent_index_path = Path(INDEX_PATH) / PARENT_DOCUMENTS_INDEX
    parents_storage_context = StorageContext.from_defaults(persist_dir=parent_index_path)
    parent_documents_index = load_index_from_storage(parents_storage_context)
    print("Index %s loaded successfully!", parent_index_path.stem)
except (IOError, OSError) as e:
    print(
        "Failed to load index %s. The following error occurred:\n%s", parent_index_path.stem, e
    )

Index %s loaded successfully! .child_documents
Index %s loaded successfully! .parent_documents


Define Retiever and Reranker.

In [4]:
# Child Documents Retriever
child_documents_retriever = VectorIndexRetriever(
    index=child_documents_index,
    similarity_top_k=TOP_K_RETRIEVED_CHILDREN
)

# Reranker (to rank the relevant documents by descending similarity score)
reranker = LLMRerank(
    choice_batch_size=5,
    top_n=TOP_K_RERANKED_PARENTS
)

Build the Chatbot.

In [5]:
#######################################################################
# 1) Token-limit tools
#######################################################################
def limit_chat_history(history, max_tokens=MEMORY_WINDOW):
    """
    Truncate conversation history from the front if the total tokens exceed max_tokens.
    Each message in the history is expected to be a dictionary with keys "role" and "content".
    The function iterates backwards (i.e. from the most recent messages) and accumulates messages
    until adding another would exceed max_tokens, then returns the surviving messages in the correct order.
    """
    total_tokens = 0
    truncated_history = []
    for message in reversed(history):
        msg_tokens = get_token_count(message["content"])
        if total_tokens + msg_tokens > max_tokens:
            break
        truncated_history.append(message)
        total_tokens += msg_tokens
    truncated_history.reverse()  # Restore chronological order.
    return truncated_history

###################
# 2) Streaming generator
###################
def stream_llm_output(llm, messages):
    """Yields tokens from llm.stream_chat(messages)."""
    for token in llm.stream_chat(messages):
        yield token.delta

def run_streaming(llm, messages):
    """Accumulate tokens into a response, yielding partial outputs."""
    response = ""
    for token in stream_llm_output(llm, messages):
        response += token
        yield response

###################
# 3) Rewrite Query
###################
def rewrite_query(user_query: str, conversation_history_str: str = "") -> str:
    """
    Enhanced rewrite of the user's query to:
      1) Identify multiple sub-questions (if any).
      2) Rewrite each sub-question as a stand-alone query, incorporating any
         context from the previous conversation (e.g., clarifying pronouns).
      3) Return them as bullet points, each bullet containing a rewritten sub-question.
    
    The final string might have multiple lines if multiple sub-questions exist.
    If there's only one question, returns a single bullet with that question.

    Incorporates examples (BERT, Bill Clinton) to demonstrate referencing prior context.
    """

    # SYSTEM MESSAGE: instruct the LLM on how to rewrite
    system_message = (
        "You are able to reason from previous conversation and the recent question, "
        "to come up with a rewrite that is self-contained.\n"
        "Given the conversation and the user's new query:\n"
        "1) Identify multiple sub-questions (if any).\n"
        "2) Rewrite each sub-question as a stand-alone query, incorporating any context from the previous conversation (e.g., clarifying pronouns).\n"
        "3) Return them as bullet points, each bullet containing a rewritten sub-question.\n\n"
        "A few examples:\n\n"
        "# Example 1\n"
        "## Previous conversation\n"
        "user: Who is Bill Clinton?\n"
        "assistant: Bill Clinton is an American politician who served as the 42nd President of the United States.\n"
        "## New question\n"
        "user: When was he born?\n"
        "## Rewritten question\n"
        "- When was Bill Clinton born?\n\n"
        "# Example 2\n"
        "## Previous conversation\n"
        "user: What is BERT?\n"
        "assistant: BERT stands for \"Bidirectional Encoder Representations from Transformers.\" "
        "It is a natural language processing (NLP) model developed by Google.\n"
        "user: What data was used for its training?\n"
        "assistant: The BERT model was trained on a large corpus of publicly available text from the internet.\n"
        "## New question\n"
        "user: How else can I apply it?\n"
        "## Rewritten question\n"
        "- How can I apply the BERT model to other tasks?\n"
        "Example 3:\n"
        "User query: 'How much will the temperature rise by 2100 and what are the main causes?'\n"
        "Rewrite:\n"
        "- How much will the temperature rise by 2100?\n"
        "- What are the main causes of that temperature rise?\n\n"
        "If there is only one question, provide one bullet. If there are multiple sub-questions, each should be a separate bullet.\n"
    )

    # USER PROMPT: feed the conversation and new query
    user_prompt = f"""
    ## Previous conversation
    {conversation_history_str}

    ## New question
    {user_query}

    ## Rewritten question (please list bullets if multiple)
    """

    messages = [
        ChatMessage(role="system", content=system_message),
        ChatMessage(role="user", content=user_prompt),
    ]

    # Use the same query_expansion_model (or whichever LLM you want) to perform rewriting
    response = query_expansion_model.chat(messages)
    rewritten = response.message.content.strip()

    # Fallback if the model returned nothing
    if not rewritten:
        return f"- {user_query}"

    return rewritten

###################
# 4) Query Expansion
###################
def parse_expanded_queries(response_text: str, max_queries: int = 5) -> List[str]:
    """
    Parse the LLM output to extract candidate expanded queries from the section
    labeled "Step 3 - Expanded Queries:" and return up to 'max_queries' queries.
    Expected format in LLM output (for example):
        Step 3 - Expanded Queries:
        - query 1
        - query 2
        ...
    If nothing is found, returns an empty list.
    """
    marker = "Step 3"
    start = response_text.find(marker)
    if start == -1:
        return []
    substring = response_text[start:]
    lines = substring.splitlines()
    candidate_queries = []
    capture = False
    for line in lines:
        if "Expanded Queries:" in line:
            capture = True
            continue
        if capture:
            stripped_line = line.strip()
            if stripped_line:
                if stripped_line.startswith("-"):
                    stripped_line = stripped_line.lstrip("- ").strip()
                candidate_queries.append(stripped_line)
    return candidate_queries[:max_queries]

def expand_query(user_query: str) -> List[str]:
    """
    Generates expansions for each sub-question found in the user's rewritten query.
    1) Parses the bullet points from 'user_query' (which may contain multiple sub-questions).
    2) For each sub-question, we call the LLM to produce expanded variants or synonyms.
    3) Returns a combined list of expansions from all sub-questions.
    """

    # Split 'user_query' by lines/bullets. We assume each bullet starts with '-' or '*'.
    sub_questions = []
    for line in user_query.splitlines():
        # Trim and check if it starts with a bullet
        line_strip = line.strip("-* \t")
        if line_strip:
            sub_questions.append(line_strip)

    # If nothing parsed, treat the entire user_query as one question
    if not sub_questions:
        sub_questions = [user_query]

    all_expanded = []

    # We'll reuse the same system prompt structure for each sub-question,
    # but you can modify this as needed.
    system_message_template = (
        "You are an expert assistant in query expansion. Your task:\n"
        "1. Take the sub-question provided.\n"
        "2. Identify synonyms or semantically related terms.\n"
        "3. Return final expansions under 'Step 3 - Expanded Queries:' as a bullet list.\n"
        "Do not add external info. Use only the sub-question."
    )

    for sq in sub_questions:
        user_prompt = f"""
        Sub-question to expand:
        "{sq}"

        Instructions:
        - Identify sub-questions if any remain.
        - Expand each key concept with synonyms or related terms.
        - Write them under "Step 3 - Expanded Queries:" as a concise bullet list.
        """

        messages = [
            ChatMessage(role="system", content=system_message_template),
            ChatMessage(role="user", content=user_prompt),
        ]

        response = query_expansion_model.chat(messages)
        response_text = response.message.content

        # Parse expansions from this single sub-question
        candidate_queries = parse_expanded_queries(response_text, max_queries=5)

        # If no expansions are found, fallback to original
        if not candidate_queries:
            candidate_queries = [sq]

        # Collect expansions
        all_expanded.extend(candidate_queries)

    return all_expanded

###################
# Helper: Convert internal history to Chatbot tuples for Gradio interface
###################
def convert_history_to_tuples(history: List[dict]) -> List[tuple]:
    """
    Converts our internal conversation history (a list of dictionaries) into
    the format expected by Gradio Chatbot: a list of (user, assistant) tuples.
    """
    output = []
    temp = None
    for msg in history:
        if msg["role"] == "user":
            temp = msg["content"]
        elif msg["role"] == "assistant":
            # If a user message exists, pair it with the assistant message.
            if temp is not None:
                output.append((temp, msg["content"]))
                temp = None
            else:
                output.append(("", msg["content"]))
    # In case the last message is from the user without a response.
    if temp is not None:
        output.append((temp, ""))
    return output

###################
# 5) Main conversation function
###################
def chat_conversation(user_message, history):
    """
    Orchestrates the chat conversation by:
      - Maintaining a conversation history (as a list of dictionaries).
      - Expanding the user's query.
      - Retrieving, de-duplicating, and re-ranking relevant documents.
      - Constructing a final prompt with conversation history and retrieved context.
      - Streaming the LLM response and appending document references.
    The output is converted to a list of (user, assistant) tuples as required by the Gradio Chatbot.
    """
    # If history comes in as tuples, convert it to our internal dictionary format.
    if history is None:
        history = []
    elif len(history) > 0 and isinstance(history[0], (list, tuple)):
        new_history = []
        for tup in history:
            if isinstance(tup, (list, tuple)) and len(tup) == 2:
                new_history.append({"role": "user", "content": tup[0]})
                new_history.append({"role": "assistant", "content": tup[1]})
            else:
                new_history.append(tup)
        history = new_history

    # Append the user's message.
    history.append({"role": "user", "content": user_message})
    # Limit the history based on the token budget.
    history = limit_chat_history(history, max_tokens=MEMORY_WINDOW)
    # Build a textual summary for query expansion.
    conversation_history_str = ""
    for msg in history:
        conversation_history_str += f"{msg['role'].capitalize()}: {msg['content']}\n"

    # Rewrite the user's query.
    rewritten_query = rewrite_query(
        user_query=user_message,
        conversation_history_str=conversation_history_str
    )

    # Expand the user's query.
    candidate_queries = expand_query(
        user_query=rewritten_query
    )
    print(f"Candidate Expanded Queries: {candidate_queries}")
    # Retrieve child nodes for each candidate query.
    retrieved_child_nodes = []
    for cq in candidate_queries:
        retrieved_child_nodes.extend(child_documents_retriever.retrieve(cq))
    # Retrieve parent documents and de-duplicate.
    raw_parents = []
    for node in retrieved_child_nodes:
        parent_node = retrieve_from_keyword_index(
            index=parent_documents_index,
            key="name",
            value=node.metadata["parent"]
        )
        if parent_node:
            raw_parents.append(parent_node[0])
    unique_parents = []
    seen_names = set()
    for p in raw_parents:
        name = p.metadata.get("name")
        if name not in seen_names:
            unique_parents.append(p)
            seen_names.add(name)
    # Rerank the parent documents.
    parents_with_score = [NodeWithScore(node=p, score=0.0) for p in unique_parents]
    combined_query_str = user_message + " | " + " | ".join(candidate_queries)        # alternatively, if we can rerank according to the user_message. This way we will display higher relevance scores for the reference docs. But, we will not be covering all the aspects of the question (sub-queries).
    reranked_parents_with_score = reranker.postprocess_nodes(
        parents_with_score,
        query_str=combined_query_str
    )
    reranked_parents = [nws.node for nws in reranked_parents_with_score]

    # Build the retrieved context and references strings.
    retrieved_context = "\n".join(node.text for node in reranked_parents)
    
    # Build the references to display the final 'score' (relevance)
    unique_ref_node_with_score = []
    seen_doc_page = set()

    for nws in reranked_parents_with_score:
        # Retrieve the nested metadata
        source_dict = nws.node.metadata.get("source", {})
        doc_name = source_dict.get("document")
        page_num = source_dict.get("page")

        # Check if this (document, page) was seen before
        if (doc_name, page_num) not in seen_doc_page:
            unique_ref_node_with_score.append(nws)
            seen_doc_page.add((doc_name, page_num))

    # Now build the references from the unique NodeWithScore objects, 
    # slicing to your desired TOP_K_REFS if needed.
    references = "\n\n\n".join(
        f"{idx}.   {nws.node.metadata.get('source', {}).get('document')}   -   "
        f"Page {nws.node.metadata.get('source', {}).get('page')}\n"
        # f"Relevance:   {int(nws.score)}%"
        for idx, nws in enumerate(unique_ref_node_with_score[:TOP_K_REFS], 1)
    )

    # Build a conversation summary (exclude the last user message).
    conversation_summary_str = ""
    for message in history[:-1]:
        conversation_summary_str += f"{message['role'].capitalize()}: {message['content']}\n"
    current_user_question = history[-1]["content"]
    # Construct the final system and user prompts.
    system_message = (
        "You are a helpful and factual assistant. Provide clear, concise, and accurate answers "
        "based solely on the provided context and conversation history. If the context and "
        "history do not include sufficient information, state 'No relevant details are available.' "
        "Do not invent or fabricate information, and do not refer explicitly to the retrieval process."
    )
    user_prompt = (
        "Below are the conversation history, the user's current question, and the retrieved context "
        "from our knowledge base. The retrieved context may include relevant text passages, tables "
        "in Markdown format, or image descriptions.\n\n"
        f"Conversation History:\n{conversation_summary_str}\n\n"
        f"User Question:\n{current_user_question}\n\n"
        f"Retrieved Context:\n{retrieved_context}\n\n"
        "Please provide a clear and integrated answer covering all parts of the user's question. "
        "If there are multiple sub-questions, address each one. Do not reference the retrieval process or the provided context. "
        "If there is insufficient information, say: 'No relevant details are available.' "
        "Respond in English."
    )
    messages = [
        ChatMessage(role="system", content=system_message),
        ChatMessage(role="user", content=user_prompt),
    ]
    # Append an empty assistant message before streaming.
    history.append({"role": "assistant", "content": ""})
    # Stream the LLM response in partial chunks.
    for chunk in run_streaming(model, messages):
        history[-1]["content"] = chunk
        yield convert_history_to_tuples(history)
    # Once streaming is complete, append references unless the answer starts with a trigger phrase.
    partial_answer = history[-1]["content"]
    trigger_phrases = ("no relevant details are available", "no information is available.", "i'm sorry", "i am sorry", "sorry")
    if partial_answer.strip().lower().startswith(trigger_phrases):
        final_answer = partial_answer
    else:
        final_answer = partial_answer + "\n\n---\nReferences:\n\n" + references
    history[-1]["content"] = final_answer
    yield convert_history_to_tuples(history)

###################
# 6) Reset conversation function
###################
def reset_conversation():
    """
    Return an empty list for the chatbot (no history)
    and an empty string for the user input box.
    """
    return [], ""


Create the Chatbot Interface.

In [6]:
###################
# 7) Gradio UI
###################
with gr.Blocks() as demo:
    gr.Markdown("# RC Chat Assistant\n\nA Gen AI-Powered Solution for Resource Managers, Coordinators & Assistants.")
    chatbot = gr.Chatbot(
        height=600,
        label="RC Chat Assistant"
    )
    user = gr.Textbox(
        label="User",
        show_label=True,
        placeholder="Message RC Chat Assistant here and press Enter...",
        lines=1,
    )
    submit_button = gr.Button("Submit")
    reset_button = gr.Button("Reset Conversation")
    # On submit, call chat_conversation (generator) to stream partial answers
    submit_button.click(
        fn=chat_conversation,
        inputs=[user, chatbot],
        outputs=chatbot,
        queue=True,
    )
    # Also allow pressing 'Enter' in the textbox
    user.submit(
        fn=chat_conversation,
        inputs=[user, chatbot],
        outputs=chatbot,
        queue=True,
    )
    # Reset button clears the chatbot and the textbox
    reset_button.click(
        fn=reset_conversation,
        inputs=[],
        outputs=[chatbot, user],
        queue=False,
    )

# Launch the WebApp
print("Launching the RC Chatbot Assistant ...")
demo.launch()

Launching the RC Chatbot Assistant ...
* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.




Candidate Expanded Queries: ['What does SAP stand for?', 'What is the meaning of SAP?', 'What is SAP software?', 'What is SAP ERP?', 'What is the purpose of SAP?']
Candidate Expanded Queries: ['"What is Material Requirements Planning (MRP) in the context of SAP ERP?"', '"How does Material Requirements Planning (MRP) function in SAP systems?"', '"What is the role of MRP in SAP supply chain management?"', '"How is MRP used in SAP for production planning and inventory control?"', '"What are the features of Material Requirements Planning in SAP software?"', '"How does Material Requirements Planning (MRP) function within SAP to handle material planning and inventory management?"', '"What is the process of MRP in SAP for managing supply chain planning and stock control?"', '"How does SAP\'s MRP system operate to optimize material procurement and inventory levels?"', '"What role does MRP play in SAP for streamlining material scheduling and warehouse management?"', '"How does SAP use MRP to su