In [None]:
# Imports
import os
import re
from collections import Counter
from typing import cast
from datasets import load_dataset, Dataset
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_openai import ChatOpenAI
import torch
import string
import pandas as pd
from dotenv import load_dotenv


In [2]:
# Load environment variables from a local .env file (if one is present).
# OPENAI_API_KEY is presumably the variable the OpenAI chat client needs later - TODO confirm.
# Alternative when no .env file is available (never commit a real key):
# os.environ["OPENAI_API_KEY"] = ""
load_dotenv()

True

In [3]:
# Config (tweak these)
CHUNK_SIZE = 1000  # passed as chunk_size to RecursiveCharacterTextSplitter
CHUNK_OVERLAP = 200  # passed as chunk_overlap to RecursiveCharacterTextSplitter
K_RETRIEVE = 5  # default number of passages fetched per question by singlehop_answer / eval
EMBEDDING_MODEL = "BAAI/bge-large-en-v1.5"  # model_name handed to HuggingFaceEmbeddings
CHAT_MODEL = "gpt-4o-mini"  # used for both the answering LLM and the Phoenix evaluator model

# Check for GPU
# `device` is forwarded to the embedding model via model_kwargs when the vector store is built/loaded.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)


Using device: cpu


In [4]:
# Load HotpotQA ("fullwiki" config): both the train and validation splits.
# The corpus cell below indexes the context paragraphs of BOTH splits; the
# evaluation cell draws its questions from the validation split only.
print("Loading HotpotQA...")
ds_train = cast(Dataset, load_dataset("hotpot_qa", "fullwiki", split="train", streaming=False)) # cast to Dataset to avoid pylance error
ds_val = cast(Dataset, load_dataset("hotpot_qa", "fullwiki", split="validation", streaming=False))

# Sanity check: report how many examples each split contains.
print("Train size:", len(ds_train))
print("Validation size:", len(ds_val))


Loading HotpotQA...
Train size: 90447
Validation size: 7405


In [5]:
# Build the retrieval corpus: one row per context paragraph, taken from the
# training split first and the validation split second, keeping only the first
# occurrence of each (title, text) pair.
unique_seen = set()
unique_rows = []
for split in (ds_train, ds_val):
    for example in split:
        context = example["context"]
        for title, sents in zip(context["title"], context["sentences"]):
            paragraph_text = " ".join(sents)
            # Duplicates are detected on a case- and whitespace-insensitive
            # version of the text; the stored text itself is left untouched.
            key = (title, re.sub(r"\s+", " ", paragraph_text).strip().lower())
            if key in unique_seen:
                continue
            unique_seen.add(key)
            unique_rows.append({"title": title, "text": paragraph_text})

corpus_rows = unique_rows
print("Paragraphs:", len(corpus_rows))


Paragraphs: 508826


In [6]:
# Split every corpus paragraph into chunks with RecursiveCharacterTextSplitter.
# `texts` and `metas` are parallel lists: metas[i] holds the title of the
# paragraph that texts[i] was cut from.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)

texts, metas = [], []
for paragraph in corpus_rows:
    for piece in text_splitter.split_text(paragraph["text"]):
        texts.append(piece)
        metas.append({"title": paragraph["title"]})

print("Chunks indexed:", len(texts))


Chunks indexed: 533179


In [7]:
# Build or load FAISS vector store (TODO: move this and the code before it to a separate script to reuse later)
# NOTE(review): the on-disk index is reused whenever the "faiss_hotpotqa" path exists; nothing here
# checks that it was built with the current CHUNK_SIZE / CHUNK_OVERLAP / EMBEDDING_MODEL. Delete the
# directory after changing any of those, otherwise a stale index is loaded silently.
# NOTE(review): when the index is loaded from disk, `texts` and `metas` from the previous cells are not used.
embedding_model = HuggingFaceEmbeddings(
    model_name=EMBEDDING_MODEL, 
    model_kwargs={"device": device}, # Use GPU if available
    encode_kwargs={"normalize_embeddings": True}, 
    # show_progress=True
)

if os.path.exists("faiss_hotpotqa"):
    print("Loading existing FAISS vector store from faiss_hotpotqa...")
    # SECURITY: allow_dangerous_deserialization=True opts in to a load path that the flag name itself
    # marks as unsafe for untrusted files. Only load an index that was produced by this notebook.
    vector_store = FAISS.load_local("faiss_hotpotqa", embedding_model, allow_dangerous_deserialization=True)
else:
    print("Creating new FAISS vector store...")
    # Embeds every chunk; with the chunk count printed above this is the slow path of the notebook.
    vector_store = FAISS.from_texts(
        texts,
        embedding_model, 
        metadatas=metas
    )

    # Save vector store to disk for future use
    vector_store.save_local("faiss_hotpotqa")
    print("FAISS vector store saved to faiss_hotpotqa")

Loading existing FAISS vector store from faiss_hotpotqa...


In [8]:
# Answer with LLM + Retrieved Docs
# Chat model used by singlehop_answer; temperature=0 is requested, presumably to keep
# answers as repeatable as possible between evaluation runs.
llm = ChatOpenAI(model=CHAT_MODEL, temperature=0)

# System message sent with every QA request.
# The two literals are joined by Python's implicit string concatenation, so the
# first one must end with a space; without it the sentences run together as
# "...no full sentences.If you are...".
SYSTEM_PROMPT = (
    "You are a precise QA assistant. Return just the short answer phrase with no explanation, and no full sentences. "
    "If you are COMPLETELY UNSURE of the answer based on the provided passages, respond with 'Unknown'."
)

def build_user_prompt(question, passages):
    """Format the retrieved passages and the question into the user message.

    Passages are labelled "PASSAGE 1:", "PASSAGE 2:", ... (1-based) and separated
    by blank lines. The question follows on a "QUESTION:" line and the text ends
    with "ANSWER:" so the model completes the answer.
    """
    numbered = []
    for number, passage in enumerate(passages, start=1):
        numbered.append(f"PASSAGE {number}:\n{passage}")
    bundle = "\n\n".join(numbered)
    return bundle + f"\n\nQUESTION: {question}\nANSWER:"

def singlehop_answer(question, k = K_RETRIEVE):
    """Answer `question` with one retrieval round followed by one LLM call.

    Args:
        question: the question text; it is also used verbatim as the search query.
        k: number of chunks to fetch from the vector store.

    Returns:
        (pred, passages): the `content` of the LLM response and the list of
        retrieved chunk texts that were shown to the model.
    """
    retrieved = vector_store.similarity_search(question, k=k)
    # Only the text is forwarded to the model (metadata is dropped) to reduce tokens.
    passages = [doc.page_content for doc in retrieved]
    messages = [
        {"role":"system","content": SYSTEM_PROMPT},
        {"role":"user","content": build_user_prompt(question, passages)},
    ]
    return llm.invoke(messages).content, passages

In [10]:
# Smoke test: run the pipeline on one question and show what was retrieved
query = "Were Scott Derrickson and Ed Wood of the same nationality?"
pred, passages = singlehop_answer(query, k=K_RETRIEVE)
print("Question:", query)
print("Predicted Answer:", pred)
print("Retrieved Passages:")
for rank, passage in enumerate(passages, start=1):
    print(f"Passage {rank}:\n{passage}\n")

Question: Were Scott Derrickson and Ed Wood of the same nationality?
Predicted Answer: Yes
Retrieved Passages:
Passage 1:
Scott Derrickson (born July 16, 1966) is an American director, screenwriter and producer.  He lives in Los Angeles, California.  He is best known for directing horror films such as "Sinister", "The Exorcism of Emily Rose", and "Deliver Us From Evil", as well as the 2016 Marvel Cinematic Universe installment, "Doctor Strange."

Passage 2:
Edward Davis Wood Jr. (October 10, 1924 – December 10, 1978) was an American filmmaker, actor, writer, producer, and director.

Passage 3:
Donald G. Jackson (April 24, 1943 – October 20, 2003) was an American filmmaker who is often referred to in the media as the Ed Wood of the video age.  This delination was given due to the bizarre nature, content, and lack of defined storyline prevalent in his film and because virtually all of his films were harshly criticized by film critics.

Passage 4:
Ed Wood is a 1994 American biographical p

In [11]:
# EM/F1 evaluation
def normalize_answer(s):
    """Canonicalise an answer string before EM/F1 comparison.

    Steps, in order: lowercase, drop ASCII punctuation, replace the standalone
    words "a", "an" and "the" with a space, then collapse all whitespace runs
    to single spaces (which also trims both ends).
    """
    lowered = s.lower()
    punctuation = set(string.punctuation)
    without_punc = ''.join(ch for ch in lowered if ch not in punctuation)
    without_articles = re.sub(r'\b(a|an|the)\b', ' ', without_punc)
    return ' '.join(without_articles.split())


def f1_score(prediction, ground_truth):
    """Token-level F1 between a predicted and a reference answer.

    Both strings are normalised with `normalize_answer` and split on whitespace.
    If either side is exactly "yes" or "no" after normalisation, only an exact
    match can score; any mismatch returns 0. A pair with no shared tokens also
    returns 0. Otherwise the harmonic mean of token precision and recall is
    returned.
    """
    pred_norm = normalize_answer(prediction)
    gold_norm = normalize_answer(ground_truth)

    # yes/no answers get no partial credit
    is_yes_no = pred_norm in ['yes', 'no'] or gold_norm in ['yes', 'no']
    if is_yes_no and pred_norm != gold_norm:
        return 0

    pred_tokens = pred_norm.split()
    gold_tokens = gold_norm.split()
    # Multiset intersection: a repeated token only counts as often as it occurs on both sides.
    overlap = Counter(pred_tokens) & Counter(gold_tokens)
    num_same = sum(overlap.values())
    if num_same == 0:
        return 0

    precision = 1.0 * num_same / len(pred_tokens)
    recall = 1.0 * num_same / len(gold_tokens)
    return (2 * precision * recall) / (precision + recall)


def exact_match_score(prediction, ground_truth):
    """Return 1.0 when both answers are equal after `normalize_answer`, else 0.0."""
    return float(normalize_answer(prediction) == normalize_answer(ground_truth))

In [None]:
def eval(ds, n, k=K_RETRIEVE):
    """Score the single-hop pipeline on the first `n` examples of `ds`.

    NOTE: this function's name shadows Python's built-in `eval` from this cell on.

    Args:
        ds: indexable dataset whose items provide "question" and "answer".
        n: maximum number of examples to evaluate (capped at len(ds)).
        k: number of passages retrieved per question.

    Returns:
        (metrics, detailed_results): `metrics` holds the example count "n", "k",
        and the mean "EM" and "F1"; `detailed_results` has one dict per example
        with the question, prediction, ground truth, scores and retrieved passages.
    """
    ems, f1s = [], []
    detailed_results = []

    # Always the leading examples of the dataset, never a random sample.
    for i in range(min(n, len(ds))):
        example = ds[i]
        question = example["question"]
        gold = example["answer"]

        # Predictions from your singlehop system
        pred, passages = singlehop_answer(question, k=k)
        print(f"Q: {question}")
        print(f"Pred: {pred}")
        print(f"Ground Truth: {gold}")

        em = exact_match_score(pred, gold)
        f1 = f1_score(pred, gold)
        ems.append(em)
        f1s.append(f1)

        # Collect structured data for Phoenix analysis
        detailed_results.append({
            "index": i,
            "question": question,
            "prediction": pred,
            "ground_truth": gold,
            "em_score": em,
            "f1_score": f1,
            "retrieved_passages": passages,
        })

    evaluated = len(ems)
    # Divide by 1 when nothing was evaluated so the averages come out as 0 instead of raising.
    denominator = evaluated if evaluated else 1
    metrics = {
        "n": evaluated,
        "k": k,
        "EM": sum(ems)/denominator,
        "F1": sum(f1s)/denominator,
    }
    return metrics, detailed_results

# Run eval
# NOTE(review): everything after the metrics print relies on create_eval_dataframe,
# hallucination_evaluator, completeness_evaluator, run_phoenix_evaluations and analyze_errors,
# which are defined in cells further down this notebook. On a fresh kernel (Restart & Run All)
# this cell therefore raises NameError; run the Phoenix setup cells first, or move this driver
# code below them.
metrics, detailed_results = eval(ds_val, 100, k=K_RETRIEVE) # TODO: change N to ds_val size for full eval later
print("Metrics:", metrics)

# Create DataFrame for Phoenix analysis
eval_df = create_eval_dataframe(detailed_results, model_type="single_hop")
print(f"\nCreated evaluation DataFrame with {len(eval_df)} examples")

# Run Phoenix evaluations
# Each row is scored individually by both evaluators, hence the warning about run time.
print("\nRunning Phoenix evaluations (this may take a while)...")
evaluators = [hallucination_evaluator, completeness_evaluator]
eval_df_with_scores = run_phoenix_evaluations(eval_df, evaluators)

# Analyze errors
print("\nError Analysis:")
error_analysis = analyze_errors(eval_df_with_scores)
# Print the scalar summary values; the DataFrame of failures is shown separately below.
for key, value in error_analysis.items():
    if key != "failed_examples_df":
        print(f"{key}: {value}")

# Display sample of failed examples if any
# analyze_errors only adds "failed_examples_df" when at least one example failed.
if "failed_examples_df" in error_analysis and len(error_analysis["failed_examples_df"]) > 0:
    print(f"\nSample of failed examples (showing first 5):")
    print(error_analysis["failed_examples_df"].head())

# Optionally save to CSV for later analysis
# eval_df_with_scores.to_csv("single_hop_eval_results.csv", index=False)
# print("\nResults saved to single_hop_eval_results.csv")

Q: Were Scott Derrickson and Ed Wood of the same nationality?
Pred: Yes
Ground Truth: yes
Q: What government position was held by the woman who portrayed Corliss Archer in the film Kiss and Tell?
Pred: Unknown
Ground Truth: Chief of Protocol
Q: What science fantasy young adult series, told in first person, has a set of companion books narrating the stories of enslaved worlds and alien species?
Pred: Unknown
Ground Truth: Animorphs
Q: Are the Laleli Mosque and Esma Sultan Mansion located in the same neighborhood?
Pred: No.
Ground Truth: no
Q: The director of the romantic comedy "Big Stone Gap" is based in what New York city?
Pred: Unknown
Ground Truth: Greenwich Village, New York City
Q: 2014 S/S is the debut album of a South Korean boy group that was formed by who?
Pred: Unknown
Ground Truth: YG Entertainment
Q: Who was known by his stage name Aladin and helped organizations improve their performance as a consultant?
Pred: Eenasul Fateh
Ground Truth: Eenasul Fateh
Q: The arena where th

In [None]:
# Phoenix Error Analysis Setup
import phoenix as px
from phoenix.evals import (
    HallucinationEvaluator,
    ClassificationEvaluator,
    LLM,
    run_evaluator,
)
from phoenix.evals.models import OpenAIModel

def create_eval_dataframe(results_list, model_type="single_hop"):
    """
    Turn per-example evaluation results into a pandas DataFrame for Phoenix analysis.

    Args:
        results_list: List of dicts, each with "index", "question", "prediction",
            "ground_truth", "em_score", "f1_score" and "retrieved_passages"
        model_type: Label stored in the "model_type" column, e.g. "single_hop" or "multi_hop"

    Returns:
        pandas DataFrame with one row per result; the retrieved passages are
        joined with blank lines into a single "context" string ("" when there are none)
    """
    copied_fields = ("index", "question", "prediction", "ground_truth", "em_score", "f1_score")

    def to_row(result):
        row = {"model_type": model_type}
        for field in copied_fields:
            row[field] = result[field]
        passages = result["retrieved_passages"]
        row["context"] = "\n\n".join(passages) if passages else ""
        return row

    return pd.DataFrame([to_row(result) for result in results_list])


In [None]:
# Initialize Phoenix Evaluators
# A separate Phoenix model wrapper is created here (the ChatOpenAI `llm` instance is NOT reused);
# it is configured with the same CHAT_MODEL name as the answering model.
eval_llm = OpenAIModel(model_name=CHAT_MODEL)

# Hallucination Evaluator - checks if prediction is grounded in retrieved context
hallucination_evaluator = HallucinationEvaluator(model=eval_llm)

# Completeness Evaluator - assesses if prediction fully answers the question
# The {query} and {response} placeholders correspond to the "query" / "response"
# columns that run_phoenix_evaluations supplies for each row.
completeness_prompt = """
You are an expert at judging the completeness of a response to a query.
Given a query and response, rate the completeness of the response.
A response is complete if it fully answers all parts of the query.
A response is partially complete if it only answers part of the query.
A response is incomplete if it does not answer any part of the query or is not related to the query.

Query: {query}
Response: {response}

Is the response complete, partially complete, or incomplete?
"""

# `choices` maps each allowed label to a numeric score; analyze_errors later
# treats any completeness score below 1.0 as an incomplete answer.
completeness_evaluator = ClassificationEvaluator(
    model=eval_llm,
    name="completeness",
    prompt_template=completeness_prompt,
    choices={"complete": 1.0, "partially complete": 0.5, "incomplete": 0.0},
)


In [None]:
# Run Phoenix Evaluations on DataFrame
def _collect_scores(df, label, resolve_evaluator, build_inputs):
    """Score every row of `df` with one evaluator and return the scores as a list.

    Args:
        df: DataFrame whose rows are evaluated one at a time.
        label: evaluator name used in the progress and error messages.
        resolve_evaluator: zero-argument callable returning the evaluator object.
            It is called inside the per-row try block, so a missing or broken
            evaluator is reported per row instead of aborting the whole run.
        build_inputs: callable mapping a row to the dict of input columns the
            evaluator expects.

    Returns:
        One entry per row of `df`: the row's score, or None when the evaluator
        returned nothing or raised.
    """
    scores = []
    print(f"Evaluating {label} scores...")
    for idx, row in df.iterrows():
        try:
            # The evaluator is run on a single-row dataframe for this example.
            single_row_df = pd.DataFrame([build_inputs(row)])
            result = run_evaluator(resolve_evaluator(), dataframe=single_row_df)
            if len(result) == 0:
                scores.append(None)
            elif "score" in result.columns:
                scores.append(result.iloc[0]["score"])
            else:
                # Fall back to the first value if the column name is different
                scores.append(result.iloc[0].iloc[0])
        except Exception as e:
            # Best effort: one failing example must not lose the other rows' scores.
            print(f"Error evaluating {label} for row {idx}: {e}")
            scores.append(None)
    return scores


def run_phoenix_evaluations(df, evaluators):
    """
    Run the Phoenix hallucination and completeness evaluators on a dataframe.

    Args:
        df: pandas DataFrame with columns: question, prediction, context
        evaluators: list of evaluator objects (not used directly, but kept for API
            consistency; the module-level `hallucination_evaluator` and
            `completeness_evaluator` are the ones actually run)

    Returns:
        Copy of `df` with "hallucination_score" and "completeness_score" columns
        added; a score is None where evaluation failed. `df` itself is not modified.
    """
    df_eval = df.copy()

    # HallucinationEvaluator expects: input (query), output (response), context
    df_eval["hallucination_score"] = _collect_scores(
        df,
        "hallucination",
        lambda: hallucination_evaluator,
        lambda row: {
            "input": row["question"],
            "output": row["prediction"],
            "context": row["context"] if row["context"] else "",
        },
    )

    # ClassificationEvaluator expects: query, response
    df_eval["completeness_score"] = _collect_scores(
        df,
        "completeness",
        lambda: completeness_evaluator,
        lambda row: {
            "query": row["question"],
            "response": row["prediction"],
        },
    )

    return df_eval


In [None]:
# Error Analysis Functions
def analyze_errors(df, f1_threshold=0.5):
    """
    Analyze errors in evaluation results.

    An example counts as failed when its em_score is 0 or its f1_score is below
    `f1_threshold`.

    Args:
        df: DataFrame with evaluation results including em_score and f1_score, and
            optionally hallucination_score and completeness_score. Rows whose
            Phoenix score is missing (None/NaN, e.g. because evaluation failed)
            are not counted in the corresponding error tally.
        f1_threshold: F1 score threshold below which to consider an error

    Returns:
        Dictionary with the error analysis summary. "failed_examples_df" is only
        present when at least one example failed.
    """
    # Filter failed examples
    failed = df[(df["em_score"] == 0) | (df["f1_score"] < f1_threshold)].copy()

    total = len(df)
    num_failed = len(failed)

    def share_of_failed(count):
        # Rates are relative to the failed examples; 0 when nothing failed.
        return count / num_failed if num_failed > 0 else 0

    analysis = {
        "total_examples": total,
        "failed_examples": num_failed,
        "error_rate": num_failed / total if total > 0 else 0,
        "avg_em_score": df["em_score"].mean(),
        "avg_f1_score": df["f1_score"].mean(),
    }

    # Analyze error patterns if we have Phoenix scores
    if "hallucination_score" in df.columns:
        num_hallucinated = int((failed["hallucination_score"] == 0).sum())
        analysis["hallucination_errors"] = num_hallucinated
        analysis["hallucination_error_rate"] = share_of_failed(num_hallucinated)

    if "completeness_score" in df.columns:
        # A column made up only of None has object dtype, and `< 1.0` on it raises
        # TypeError. Coercing to numeric turns missing scores into NaN, which
        # compares False and is therefore not counted as incomplete.
        completeness = pd.to_numeric(failed["completeness_score"], errors="coerce")
        num_incomplete = int((completeness < 1.0).sum())
        analysis["incomplete_answers"] = num_incomplete
        analysis["incompleteness_rate"] = share_of_failed(num_incomplete)

    # Error breakdown by type
    if num_failed > 0:
        analysis["failed_examples_df"] = failed[["index", "question", "prediction", "ground_truth", "em_score", "f1_score"]]

    return analysis

def compare_models(single_hop_df, multi_hop_df):
    """
    Compare error rates and performance between single-hop and multi-hop models.

    Args:
        single_hop_df: DataFrame with single-hop evaluation results
        multi_hop_df: DataFrame with multi-hop evaluation results

    Both frames need "em_score" and "f1_score" columns. The per-question counts
    are only computed when the frames have the same length, and rows are paired
    by position. NOTE(review): this assumes row i of both frames refers to the
    same question; callers should confirm both were built from the same examples
    in the same order.

    Returns:
        Dictionary with comparison metrics
    """
    def summarize(df):
        # An "error" here is an example without an exact match.
        return {
            "avg_em": df["em_score"].mean(),
            "avg_f1": df["f1_score"].mean(),
            "error_rate": (df["em_score"] == 0).mean(),
        }

    single = summarize(single_hop_df)
    multi = summarize(multi_hop_df)
    comparison = {"single_hop": single, "multi_hop": multi}

    # Calculate improvements (positive values mean multi-hop is better)
    comparison["em_improvement"] = multi["avg_em"] - single["avg_em"]
    comparison["f1_improvement"] = multi["avg_f1"] - single["avg_f1"]
    comparison["error_reduction"] = single["error_rate"] - multi["error_rate"]

    # Find questions where one model performs better
    if len(single_hop_df) == len(multi_hop_df):
        # Compare the raw arrays: comparing the two Series directly raises ValueError
        # whenever their index labels differ, even if the lengths match.
        single_em = single_hop_df["em_score"].to_numpy()
        multi_em = multi_hop_df["em_score"].to_numpy()
        single_hop_better = int((single_em > multi_em).sum())
        multi_hop_better = int((multi_em > single_em).sum())
        comparison["single_hop_better_count"] = single_hop_better
        comparison["multi_hop_better_count"] = multi_hop_better
        comparison["tie_count"] = len(single_hop_df) - single_hop_better - multi_hop_better

    return comparison
