In [None]:
!pip install --upgrade "langchain>=0.1.0" langchain-community "arize-phoenix[evals]" nest-asyncio pyarrow python-docx pydantic phoenix requests pyprojroot

In [None]:
# --- Standard Library ---
import html
import json
import os
import re
import warnings
from typing import Any, Dict, List, Union
from urllib.request import urlopen

# --- Core Third-Party Libraries ---
import nest_asyncio
import numpy as np
import pandas as pd
import requests
from docx import Document
from pydantic import BaseModel, Field
from pyprojroot import here
from tqdm import tqdm

# --- Langchain and Langchain Community ---
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import SystemMessage, HumanMessage, Document as LangchainDocument
from langchain.utilities import SQLDatabase
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase as CommunitySQLDatabase
from langchain_core.prompts import (
    ChatPromptTemplate as CoreChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
)
from langchain_core.runnables import RunnableSerializable
from langchain_openai import AzureChatOpenAI

# --- Phoenix, OpenInference & OpenTelemetry (for tracing and evals) ---
import phoenix as px
from phoenix.evals import (
    HALLUCINATION_PROMPT_RAILS_MAP,
    HALLUCINATION_PROMPT_TEMPLATE,
    QA_PROMPT_RAILS_MAP,
    QA_PROMPT_TEMPLATE,
    TOXICITY_PROMPT_TEMPLATE,
    HallucinationEvaluator,
    OpenAIModel,
    QAEvaluator,
    RelevanceEvaluator,
    llm_classify,
    run_evals,
)
from phoenix.otel import register
from phoenix.session.evaluation import get_qa_with_reference, get_retrieved_documents
from phoenix.trace import DocumentEvaluations, SpanEvaluations
from phoenix.trace.dsl import SpanQuery
from openinference.instrumentation.langchain import LangChainInstrumentor
from opentelemetry import trace
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# --- Runtime Configuration ---
# Silence ALL Python warnings for the rest of the session. NOTE(review): this
# also hides deprecation warnings from the libraries imported above.
warnings.filterwarnings('ignore')
# Patch asyncio so nested event loops are allowed — presumably needed because
# the notebook kernel already runs a loop and later cells run async evals
# (TODO confirm).
nest_asyncio.apply()

In [None]:
def initialize_llm() -> AzureChatOpenAI:
    """Build the Azure OpenAI chat model used by the rest of the notebook.

    Every connection setting is read from an environment variable when that
    variable is set and otherwise falls back to the placeholder literal
    below. This keeps the API key out of the saved notebook while leaving
    the in-code placeholders usable for quick local edits.

    Environment variables consulted:
        AZURE_OPENAI_MODEL, AZURE_OPENAI_ENDPOINT, OPENAI_API_VERSION,
        AZURE_OPENAI_API_KEY, AZURE_OPENAI_DEPLOYMENT

    Returns:
        AzureChatOpenAI: The configured chat model client.

    Raises:
        RuntimeError: If constructing the client fails. The original
            exception is attached as ``__cause__``.
    """
    try:
        return AzureChatOpenAI(
            model=os.environ.get("AZURE_OPENAI_MODEL", "<your-model-name>"),
            azure_endpoint=os.environ.get(
                "AZURE_OPENAI_ENDPOINT", "https://<your-azure-openai-endpoint>"
            ),
            api_version=os.environ.get("OPENAI_API_VERSION", "<your-azure-api-version>"),
            api_key=os.environ.get("AZURE_OPENAI_API_KEY", "<your-azure-api-key>"),
            azure_deployment=os.environ.get(
                "AZURE_OPENAI_DEPLOYMENT", "<your-azure-deployment-name>"
            ),
        )
    except Exception as e:
        # Chain explicitly so the traceback shows the real cause of the failure.
        raise RuntimeError(f"\n❌ Failed to initialize LLM: {e}") from e

In [None]:
def extract_questions_from_doc(doc_path: str) -> List[str]:
    """Return every non-empty paragraph of a Word document as a question.

    Args:
        doc_path: Path to the ``.docx`` file to read.

    Returns:
        List[str]: Paragraph texts in document order, stripped of
        surrounding whitespace, with blank paragraphs skipped.

    Raises:
        FileNotFoundError: If ``doc_path`` does not exist.
        RuntimeError: If the document cannot be opened or read. The
            underlying exception is attached as ``__cause__``.
    """
    if not os.path.exists(doc_path):
        raise FileNotFoundError(f"Document not found: {doc_path}")
    try:
        doc = Document(doc_path)
        # Strip each paragraph once, then keep only the non-empty results.
        stripped_paragraphs = (para.text.strip() for para in doc.paragraphs)
        return [text for text in stripped_paragraphs if text]
    except Exception as e:
        # Chain explicitly so the traceback shows the real cause of the failure.
        raise RuntimeError(f"\n❌ Error reading document: {e}") from e

In [None]:
def _decode_literal_escapes(text: str) -> str:
    """Turn literal escape sequences (e.g. a backslash followed by ``n``) into
    the characters they denote, without damaging non-ASCII text.

    Encoding with ``latin-1`` + ``backslashreplace`` makes the round trip
    through ``unicode_escape`` lossless. If the text contains a malformed
    escape (for example a trailing backslash) the text is returned as-is.
    """
    try:
        return text.encode("latin-1", "backslashreplace").decode("unicode_escape")
    except UnicodeDecodeError:
        return text


def generate_variations(llm_chain, questions: List[str]) -> List[str]:
    """Ask the LLM chain for positive/negative rephrasings of each question.

    Args:
        llm_chain: Object exposing ``invoke(prompt_text)`` whose result has a
            ``content`` string attribute with one variation per line.
        questions: The seed questions.

    Returns:
        List[str]: De-duplicated variations in first-seen order. For each
        question the generated lines come first and the original question
        follows. Questions whose LLM call fails are reported via ``print``
        and contribute nothing.
    """
    variations = []

    for question in tqdm(questions, desc="Generating Questionnaire Variations...", unit="question"):
        prompt_text = (
            f"Generate all possible positive and negative variations of the question: '{question}'. "
            "Only output the questions. Do not include any explanations or answers or serial numbering or dashes."
        )
        try:
            result = llm_chain.invoke(prompt_text)
            content = _decode_literal_escapes(html.unescape(result.content))
            # Strip each line so whitespace-only lines and '\r' residue from
            # Windows line endings do not end up as "questions".
            stripped_lines = (line.strip() for line in content.split("\n"))
            variations.extend(line for line in stripped_lines if line)
            variations.append(question)  # Include original
        except Exception as e:
            print(f"\n❌ Error processing question: '{question}': {e}")

    # dict.fromkeys de-duplicates while preserving first-seen order, so the
    # result is identical from run to run (a set's order is not).
    return list(dict.fromkeys(variations))

In [None]:
# Path to the Word document containing sample questions.
# The user should keep example/sample questions in this document,
# from which additional question variations will be generated automatically.
doc_path = "C:/Users/Desktop/Questionnaire.docx"
try:
    # Initialize the Azure-hosted OpenAI LLM
    llm = initialize_llm()

    # Create a chat prompt with a system and human message template
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant."),
        ("human", "{text}")
    ])

    # Create a simple prompt-to-LLM pipeline (LangChain chain)
    chain = prompt | llm

    # Extract questions from the Word document
    questions = extract_questions_from_doc(doc_path)

    # Generate variations for each question using the LLM chain
    all_variations = generate_variations(chain, questions)

    # Output the generated question variations
    print("\nGenerated Variations:\n" + "-"*50)
    for variation in all_variations:
        print(variation)

except Exception as e:
    print(f"\n❌ An unexpected error occurred: {e}")
    # Re-raise: the following cells need `llm` and `all_variations`. Swallowing
    # the error here would only postpone the failure to a confusing NameError
    # further down and hide the real traceback.
    raise

In [None]:
# --- Path to the SQLite Database ---

db_path = "C:/path/to/your/database/MySQLDatabase.db"

# --- How many question variations to answer with the SQL agent ---
# Every variation costs at least one LLM round-trip through the agent, so the
# default keeps the run small. Set to None to process every variation.
REFERENCE_SAMPLE_SIZE = 1

# --- Load the SQLite database using LangChain's SQLDatabase utility ---
db = SQLDatabase.from_uri(f"sqlite:///{db_path}")

# --- Custom prompt instructions to guide the LLM's SQL generation and response formatting ---
custom_prompt_instructions = """
You are a SQLite expert and a data chatbot. You take in a user query, translate it into SQL, and output several pieces of metadata that help visualize the SQL output.
"""

# --- Create ChatPromptTemplate ---
# The "openai-tools" agent type will automatically get tool descriptions.
# We provide the main system instructions, the human input, and a placeholder for agent working steps.
prompt = ChatPromptTemplate.from_messages(
    [
        SystemMessagePromptTemplate.from_template(custom_prompt_instructions), # System context
        HumanMessagePromptTemplate.from_template("{input}"), # This is where user's question goes
        MessagesPlaceholder(variable_name="agent_scratchpad"), # Tracks agent's reasoning steps
    ]
)

agent_executor = create_sql_agent(
    llm,                       # Previously initialized LLM
    db=db,                     # SQLDatabase connection to SQLite
    agent_type="openai-tools", # Agent type that supports tool-use reasoning
    verbose=True,              # Enables detailed logging/output
    prompt=prompt              # Custom prompt template for SQL reasoning
)

# --- Store variations and responses for traceability and evaluation ---
ref_inputs = []    # User input variations
ref_outputs = []   # Agent-generated answers

# --- Get a reference answer from the SQL agent for the first
# --- REFERENCE_SAMPLE_SIZE question variations (all of them when it is None) ---
for variation in all_variations[:REFERENCE_SAMPLE_SIZE]:
    response = agent_executor.invoke({"input": variation})
    ref_inputs.append(variation)  # input to agent
    ref_outputs.append(response["output"])  # agent's SQL-based answer

# --- Create a Pandas DataFrame containing 'input' and 'reference' fields, as required by Arize Phoenix for evaluation ---
ref_df = pd.DataFrame({"input": ref_inputs, "reference": ref_outputs})

In [None]:
# Display the DataFrame of inputs and reference outputs for review
# (a bare last expression is rendered by the notebook; nothing is printed).
ref_df

In [None]:
# Create the tracer provider via phoenix.otel.register(). It is called with
# no arguments, so project name and collector endpoint come from the
# library's defaults/environment (not visible here).
tracer_provider = register()

# Instrument the LangChain library with that tracer provider to enable automatic tracing of LangChain operations.
# skip_dep_check=True bypasses the instrumentor's dependency check.
# NOTE(review): only LangChain calls made AFTER this cell are traced; the
# variation-generation chain and the SQL agent above ran earlier.
LangChainInstrumentor(tracer_provider=tracer_provider).instrument(skip_dep_check=True)

In [None]:
# Launch a local Phoenix session for interactive exploration and visualization of traces and evaluation data.
# The web app by default runs on port 6006.
# `session` keeps a handle to the running app.
session = px.launch_app()

In [None]:
# Initialize an OpenTelemetry tracer named "langchain"
tracer = trace.get_tracer("langchain")

# Compiled once at definition time instead of on every response item.
_HTML_TAG_RE = re.compile(r'<[^>]+>')                 # any HTML tag
_ANSWER_RE = re.compile(r'Answer:\s*(.*)', re.DOTALL)  # everything after "Answer:"


class RequestsChain(RunnableSerializable):
    """Runnable that forwards a question to an HTTP chatbot API and returns
    the plain-text answers found in the response."""

    # Define the API endpoint for chatbot requests
    endpoint: str = Field(..., description="API endpoint for chatbot requests")
    # Seconds to wait for the API before giving up. Without a timeout,
    # `requests` waits indefinitely on an unresponsive server.
    timeout: float = 120.0

    @tracer.chain  # Instrument the invoke method to capture tracing data for observability
    def invoke(self, question) -> Union[List[str], Dict[str, Any]]:
        """
        Sends a POST request with the input question, processes the response,
        extracts answers from HTML content, and returns cleaned answer texts.

        Args:
            question (str): The user question to send to the API.

        Returns:
            Union[List[str], Dict[str, Any]]: On success, the list of answer
            strings (one per "Text" item that contains an "Answer:" section).
            On any failure, a dict ``{"error": <message>, "input": <payload>}``.
        """
        # Built outside the try block so the error handler can always report it.
        payload = {"ques_text": question}
        try:
            # Make POST request to the configured endpoint with a JSON-encoded body
            response = requests.post(
                self.endpoint, data=json.dumps(payload), timeout=self.timeout
            )
            response.raise_for_status()  # Ensure any HTTP errors raise exceptions
            data = response.json()  # Parse JSON response from the API
            answers = []
            # Loop through the items under the key "0" in the response data
            for item in data["0"]:
                if item["type"] != "Text":
                    continue
                text = _HTML_TAG_RE.sub('', item["value"])  # Strip HTML tags to get plain text
                match = _ANSWER_RE.search(text)  # Extract the part of the text after "Answer:"
                if match:
                    answers.append(match.group(1).strip())  # Clean extracted answer text
            return answers  # Return the list of cleaned answers
        except Exception as e:  # On failure, return error message along with the original input for debugging
            return {"error": str(e), "input": payload}

In [None]:
# Replace <your-api-domain> and <your-api-endpoint> with your actual API base URL and path
API_URL = "https://<your-api-domain>/<your-api-endpoint>" # Define the chatbot API endpoint
chain = RequestsChain(endpoint=API_URL) # Create an instance of the RequestsChain with the specified endpoint (rebinds `chain`, which earlier held the prompt | llm pipeline)
chain_type = "stuff" # Define the type of chain being used (here "stuff" is a placeholder type)
chain_metadata={"application_type": "question_answering"} # Additional metadata about the chain, useful for tracking in observability tools like Arize Phoenix
# NOTE(review): chain_type and chain_metadata are not passed to anything in this cell.

# Loop through ALL variations (no slice is applied) and invoke the chain for each.
# The return value is discarded here; the call is made for the trace that the
# instrumented invoke() records.
# tqdm is used to show a progress bar during iteration
for variation in tqdm(all_variations):
    chain.invoke(variation)

In [None]:
# Query all CHAIN span inputs from Phoenix traces
# Selects the span id (as column `span_id`) and the input value (as column
# `input`) of every span whose kind is CHAIN.
query = SpanQuery().where("span_kind == 'CHAIN'").select(
    span_id="context.span_id", input="input.value"
)
input_df = pd.DataFrame(px.Client().query_spans(query))

# Query all CHAIN span outputs
# Same filter; selects the span id and the output value (as column `output`).
query = SpanQuery().where("span_kind == 'CHAIN'").select(
    span_id="context.span_id", output="output.value"
)
output_df = pd.DataFrame(px.Client().query_spans(query))

# Merge input and output DataFrames on "context.span_id" (outer join, so a
# span present in only one of the two query results is kept).
# NOTE(review): "context.span_id" is not one of the selected column names, so
# this presumably joins on the index name of the query results — confirm.
# If both frames carry a `span_id` column, pandas will suffix them `_x`/`_y`.
queries_df = pd.merge(input_df, output_df, on="context.span_id", how="outer")

# Display the queries_df DataFrame combining context.span_id, inputs, outputs
queries_df

In [None]:
# Arize Phoenix evaluation expects one row per span with these columns:
#   'context.span_id' - unique identifier of the trace/span
#   'input'           - the user query or prompt
#   'output'          - the model's predicted response
#   'reference'       - the ground truth answer used by the evaluation metrics

# Surface 'context.span_id' as a regular column when it is not one already
# (for example when it is currently the index).
if not ("context.span_id" in queries_df.columns):
    queries_df = queries_df.reset_index()

# Attach the reference answers by joining on the question text.
merged_df = queries_df.merge(ref_df, on="input")

In [None]:
# Display the merged DataFrame (context.span_id, inputs, outputs and
# references) that the evaluation cells below consume.
merged_df

In [None]:
# Initialize the OpenAIModel for evaluation using Azure OpenAI deployment
# This model will be used to compare model outputs against reference answers
#
# Each setting is read from an environment variable when set and otherwise
# falls back to the placeholder literal, so the API key does not have to be
# written into (and saved with) the notebook.

eval_model = OpenAIModel(
    model=os.environ.get("AZURE_OPENAI_MODEL", "<your-model-name>"),
    azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", "https://<your-azure-openai-endpoint>"),
    api_version=os.environ.get("OPENAI_API_VERSION", "<your-azure-api-version>"),
    api_key=os.environ.get("AZURE_OPENAI_API_KEY", "<your-azure-api-key>"),
    azure_deployment=os.environ.get("AZURE_OPENAI_DEPLOYMENT", "<your-azure-deployment-name>"),
)

In [None]:
# Set the index to 'context.span_id' for traceability during evaluations.
# Guarded so the cell can be re-run: after the first run the column has
# already become the index and an unconditional set_index would raise KeyError.
if merged_df.index.name != "context.span_id":
    merged_df = merged_df.set_index("context.span_id")

# Perform QA Correctness evaluation using LLM classification
qa_correctness_eval = llm_classify(
    dataframe=merged_df,
    model=eval_model,  # The evaluation model (e.g., GPT-4o via Azure)
    template=QA_PROMPT_TEMPLATE,  # Prompt template for evaluating correctness
    rails=list(QA_PROMPT_RAILS_MAP.values()),  # Expected answer categories (e.g., Correct, Incorrect)
    provide_explanation=True,  # Ask LLM to explain its reasoning for transparency
    concurrency=4  # Run 4 evaluations concurrently for performance
)

# Perform Hallucination evaluation using LLM classification
hallucination_eval = llm_classify(
    dataframe=merged_df,
    model=eval_model,
    template=HALLUCINATION_PROMPT_TEMPLATE,  # Prompt template for hallucination detection
    rails=list(HALLUCINATION_PROMPT_RAILS_MAP.values()),  # Expected outputs like Hallucinated, Factual
    provide_explanation=True,
    concurrency=4
)

# Log the evaluations back into Arize Phoenix using the Phoenix client.
# The evaluation frames are passed on as returned by llm_classify.
px.Client().log_evaluations(
    SpanEvaluations(eval_name="Hallucination", dataframe=hallucination_eval),
    SpanEvaluations(eval_name="QA Correctness", dataframe=qa_correctness_eval)
)