# 4.3 - Part 1: Traditional NLP Baseline for PragmatiCQA

Implementing a baseline QA system using a pre-trained model from Hugging Face.

4.3. Part 1: The "Traditional" NLP Approach
In this part, you will use a pre-trained Question Answering model as a "traditional" baseline.

Use the retriever module from the rag.ipynb notebook to retrieve the relevant passages for a question. Concatenate the retrieved passages into a single context.

Write a program that uses a pre-trained QA model from Hugging Face's transformers library to generate an answer to the question given the context retrieved by the retriever module. A good model to use for this is 'distilbert-base-cased-distilled-squad' (see https://huggingface.co/distilbert/distilbert-base-cased-distilled-squad). This module will extract an answer from the context without explicit multi-step reasoning.
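
To make "extract an answer from the context" concrete: an extractive reader scores each token as a possible start and end of the answer and returns the best-scoring span. The sketch below illustrates only that decoding step. The helper `best_span` and all scores are hypothetical, chosen for illustration; they are not DistilBERT outputs and this is not the transformers pipeline's internal code.

```python
def best_span(start_scores, end_scores, max_answer_len=15):
    """Return (start, end) token indices maximizing start_scores[i] + end_scores[j], i <= j."""
    best_score, best_start, best_end = float("-inf"), 0, 0
    for i, s in enumerate(start_scores):
        # Only consider spans of at most max_answer_len tokens.
        last = min(i + max_answer_len, len(end_scores))
        for j in range(i, last):
            score = s + end_scores[j]
            if score > best_score:
                best_score, best_start, best_end = score, i, j
    return best_start, best_end

# Made-up scores for a six-token context (illustrative only).
tokens = ["The", "show", "began", "airing", "in", "2005"]
start_scores = [0.1, 0.0, 0.2, 0.1, 0.3, 2.5]
end_scores = [0.0, 0.1, 0.0, 0.2, 0.1, 3.0]

start, end = best_span(start_scores, end_scores)
answer = " ".join(tokens[start:end + 1])
print(answer)
```

Because the answer is always a contiguous span of the context, such a reader cannot add information that the context does not state literally, which is relevant to the literal versus pragmatic comparison later in this part.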

Evaluate on PRAGMATICQA: Run your model on the PRAGMATICQA test set. Since the answers are free-form, you need to use a metric like F1 score or ROUGE to evaluate performance. In this assignment, we will use the SemanticF1 metric provided in DSPy, which uses an LLM-as-a-judge approach.

Since the performance of the model depends on the accuracy of the retriever module, we will compute three different configurations:

- Literal answer: the answer generated by the distilbert model from the literal spans included in the dataset.
- Pragmatic answer: the answer generated by the distilbert model from the pragmatic spans included in the dataset.
- Retrieved answer: the answer generated by the distilbert model from the context computed by the retriever.

For each of these three configurations, report precision, recall and F1 as computed by SemanticF1 on the validation dataset for the first question of each conversation only (there are 179 such cases in val.jsonl). For the first question in each conversation, there is no "conversational context", hence the input to the model only includes the question and the retrieved passages.

Note: To improve performance, consider using the SemanticF1.batch method to perform the dataset evaluation (https://dspy.ai/api/evaluation/SemanticF1/) or the general dspy.Evaluate evaluation method which runs evaluation in parallel and outputs a tabular report.

Analyze the results: where does the model succeed, and where does it fail? Does it tend to give literal answers when a more pragmatic one is needed?

## Setup

In [1]:
from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.environ['XAI_API_KEY']


In [2]:
import dspy
from dspy.evaluate import SemanticF1
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline
import json
import os
from typing import List, Dict
import torch

# Configure DSPy with an LM FIRST (before creating SemanticF1)
lm = dspy.LM('xai/grok-3-mini', api_key=api_key)
dspy.configure(lm=lm)

# Set up the QA model
model_name = "distilbert/distilbert-base-cased-distilled-squad"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForQuestionAnswering.from_pretrained(model_name)
qa_pipeline = pipeline('question-answering', model=model, tokenizer=tokenizer)

# Set up SemanticF1 metric (now it will work because LM is configured)
metric = SemanticF1()

Device set to use cpu


## Load and extract questions

In [3]:
def load_pragmaticqa_test(dataset_dir="../PragmatiCQA/data"):
    """Load the val set from PragmatiCQA dataset."""
    corpus = []
    with open(os.path.join(dataset_dir, "val.jsonl"), 'r') as f:
        for line in f:
            corpus.append(json.loads(line))
    return corpus

def get_first_questions(data):
    """Extract only the first questions from each conversation - keep topic too."""
    first_questions = []
    for doc in data:
        qas = doc.get('qas', [])
        if not qas:
            continue
        first_qa = qas[0]
        first_questions.append({
            'topic': doc.get('topic', ''),  # needed to choose the right retriever
            'question': first_qa.get('q', ''),
            'answer': first_qa.get('a', ''),
            # support both possible field names
            'literal_spans': [obj.get('text','') for obj in first_qa.get('a_meta', {}).get('literal_obj', [])] or first_qa.get('literal_spans', []),
            'pragmatic_spans': [obj.get('text','') for obj in first_qa.get('a_meta', {}).get('pragmatic_obj', [])] or first_qa.get('pragmatic_spans', [])
        })
    return first_questions

In [4]:
# Load test data
test_data = load_pragmaticqa_test()
first_questions = get_first_questions(test_data)
print(f"Loaded {len(first_questions)} first questions from the val set.")

Loaded 179 first questions from the val set.


## Set up retriever


In [28]:
# Hard-coded aliases -> existing folder names
TOPIC_ALIASES = {
    "A Nightmare on Elm Street (2010 film)": "A Nightmare on Elm Street",
    "Alexander Hamilton": "Hamilton the Musical",      # proxy topic
    "Popeye": "Popeye the Sailor",
    "The Wonderful Wizard of Oz (book)": "Wizard of Oz",
}

import os

def list_available_topics(sources_dir):
    return {
        name for name in os.listdir(sources_dir)
        if os.path.isdir(os.path.join(sources_dir, name))
    }

def resolve_topic_name(topic, available_topics):
    """Return a folder topic to use for this logical topic."""
    # exact match first
    if topic in available_topics:
        return topic
    # alias match next
    alias = TOPIC_ALIASES.get(topic)
    if alias and alias in available_topics:
        return alias
    # nothing found
    return None
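
The alias table above is maintained by hand. As a diagnostic aid, a simple substring check can suggest candidate folders for topics that have no exact match. This is only a sketch: `suggest_folder` is a hypothetical helper, not part of the notebook, and its suggestions should be reviewed before being added to `TOPIC_ALIASES`.

```python
import re

def suggest_folder(topic, available_topics):
    """Suggest a source folder for a dataset topic via case-insensitive substring matching."""
    # Drop a trailing parenthetical such as "(2010 film)" or "(book)".
    base = re.sub(r"\s*\([^)]*\)\s*$", "", topic).strip().lower()
    if not base:
        return None
    candidates = [
        name for name in available_topics
        if name.lower() in base or base in name.lower()
    ]
    # Prefer the longest folder name, which is the most specific match.
    return max(candidates, key=len) if candidates else None

folders = ["A Nightmare on Elm Street", "Hamilton the Musical",
           "Popeye the Sailor", "Wizard of Oz", "Batman"]
for topic in ["A Nightmare on Elm Street (2010 film)", "Popeye",
              "The Wonderful Wizard of Oz (book)", "Alexander Hamilton"]:
    print(topic, "->", suggest_folder(topic, folders))
```

Proxy topics such as "Alexander Hamilton" share no substring with their folder name, so they still need an explicit alias.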


In [29]:
from sentence_transformers import SentenceTransformer
from bs4 import BeautifulSoup

# P2.2 - Load HTML sources and build per-topic retrievers

def create_retriever_for_topic(topic, sources_dir="../PragmatiCQA-sources", k=5):
    topic_dir = os.path.join(sources_dir, topic) if topic else None
    corpus = read_html_files_with_stopper(topic_dir) if topic_dir else []
    if not corpus:
        return None
    return dspy.retrievers.Embeddings(embedder=embedder, corpus=corpus, k=k)

def cache_topic_retrievers(split_data, sources_dir="../PragmatiCQA-sources", k=5):
    topics = {ex.get("topic","") for ex in split_data if ex.get("topic","")}
    available = list_available_topics(sources_dir)
    cache = {}
    missing = []

    for t in sorted(topics):
        folder_topic = resolve_topic_name(t, available) or t
        r = create_retriever_for_topic(folder_topic, sources_dir=sources_dir, k=k)
        if r is not None:
            # keep key as the ORIGINAL topic from the dataset,
            # but build the retriever from the resolved folder topic
            cache[t] = r
        else:
            missing.append(t)

    print(f"Built retrievers for {len(cache)}/{len(topics)} topics.")
    if missing:
        print("Missing or empty topic folders:", missing)
    return cache


# Use a distinct name so the QA `model` loaded in the setup cell is not overwritten.
st_model = SentenceTransformer("sentence-transformers/static-retrieval-mrl-en-v1", device="cpu")
embedder = dspy.Embedder(st_model.encode)

def read_html_files_with_stopper(directory):
    docs = []
    if not os.path.isdir(directory):
        return docs
    for fn in os.listdir(directory):
        if fn.endswith(".html"):
            p = os.path.join(directory, fn)
            try:
                with open(p, "r", encoding="utf-8", errors="ignore") as f:
                    soup = BeautifulSoup(f.read(), "html.parser")
                    text = soup.get_text(separator=" ", strip=True)
                    if text:
                        # clamp to avoid huge prompts
                        docs.append(text[:3000])
            except Exception as e:
                print(f"Skip {fn}: {e}")
    return docs

# Build per-topic retrievers from the topics present in this split
retr_lookup_val = cache_topic_retrievers(test_data, sources_dir="../PragmatiCQA-sources", k=5)

Built retrievers for 11/11 topics.
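
`read_html_files_with_stopper` keeps only the first 3000 characters of each page, so anything later in a long page can never be retrieved. One possible alternative, shown here as a sketch and not as what the notebook does, is to split each page into overlapping word windows and index every window. `chunk_words` and its default sizes are hypothetical.

```python
def chunk_words(text, size=200, overlap=50):
    """Split text into windows of `size` words, each sharing `overlap` words with the previous one."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    words = text.split()
    step = size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + size]))
        # Stop once a window has reached the end of the text.
        if start + size >= len(words):
            break
    return chunks

sample = " ".join(str(i) for i in range(10))
print(chunk_words(sample, size=4, overlap=1))
```

The trade-off is a larger corpus per topic, which makes embedding slower, in exchange for coverage of the full page.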


In [23]:
topics_in_data = {ex.get("topic","") for ex in test_data if ex.get("topic","")}
topics_in_cache = set(retr_lookup_val.keys())
missing = topics_in_data - topics_in_cache
print(f"Topics missing retrievers ({len(missing)}): {sorted(missing)}")


Topics missing retrievers (4): ['A Nightmare on Elm Street (2010 film)', 'Alexander Hamilton', 'Popeye', 'The Wonderful Wizard of Oz (book)']


In [24]:
import os

sources_root = r"C:\liran\Program\SMSTR8\NLPWithLLM\hw3\PragmatiCQA-sources"

if not os.path.exists(sources_root):
    print(f"❌ Path not found: {sources_root}")
else:
    folders = [name for name in os.listdir(sources_root)
               if os.path.isdir(os.path.join(sources_root, name))]
    print(f"📁 Found {len(folders)} topic folders under:\n{sources_root}\n")
    for f in sorted(folders):
        print(" -", f)


📁 Found 73 topic folders under:
C:\liran\Program\SMSTR8\NLPWithLLM\hw3\PragmatiCQA-sources

 - 'Cats' Musical
 - A Nightmare on Elm Street
 - Arrowverse
 - Barney
 - Baseball
 - Batman
 - Big Nate
 - Bleach
 - Britney Spears
 - Detective Conan
 - Dinosaur
 - Doctor Who
 - Doom Patrol
 - Dr. Stone
 - Dream Team
 - Edens Zero
 - Enter the Gungeon
 - Evangelion
 - Fallout
 - Fullmetal Alchemist
 - Game of Thrones
 - Girl Genius
 - Goosebumps
 - H. P. Lovecraft
 - Half-Life series
 - Halo
 - Hamilton the Musical
 - Harry Potter
 - Inazuma Eleven
 - Indiana Jones
 - Jujutsu Kaisen
 - Kingdom Hearts
 - Kung Fu Panda
 - LEGO
 - Lady Gaga
 - Lemony Snicket
 - MS Paint Adventures
 - Madagascar
 - My Hero Academia
 - Mystery Science Theater 3000
 - Non-alien Creatures
 - Olympics
 - One Piece
 - Peanuts Comics
 - Pixar
 - Popeye the Sailor
 - Rap
 - Reborn
 - Riordan
 - Serious Sam
 - Shaman King
 - ShowBiz Pizza
 - Six the Musical
 - Skulduggery Pleasant
 - Sonic the Hedgehog
 - Soul Eater
 - S

## Evaluate QA system

### New method with keeping the full report

In [44]:
RETR_TOPK = 3
CTX_MAX_CHARS = 2000

def to_strings(passages):
    out = []
    for p in passages:
        if isinstance(p, str):
            out.append(p)
        elif hasattr(p, "text"):
            out.append(str(p.text))
        else:
            out.append(str(p))
    return out

def get_retrieved_context(question, retr):
    """Return joined context string from a DSPy retriever result, robust to shapes."""
    try:
        res = retr(question)
    except Exception:
        return ""

    # 1) list[str] already
    if isinstance(res, list):
        passages = res

    # 2) has attribute .passages
    elif hasattr(res, "passages"):
        passages = res.passages

    # 3) dict with 'passages'
    elif isinstance(res, dict) and "passages" in res:
        passages = res["passages"]

    # 4) fallback - stringify
    else:
        passages = [str(res)]

    strings = to_strings(passages)[:RETR_TOPK]
    ctx = " ".join(s.strip() for s in strings if s and s.strip())
    return ctx[:CTX_MAX_CHARS]


In [45]:
import numpy as np
import dspy
from dspy.evaluate import SemanticF1

def _token_prf(prediction, reference):
    pt = set(str(prediction).lower().split())
    rt = set(str(reference).lower().split())
    if not pt or not rt:
        return 0.0, 0.0, 0.0
    overlap = pt & rt
    precision = len(overlap) / len(pt) if pt else 0.0
    recall    = len(overlap) / len(rt) if rt else 0.0
    f1 = 0.0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
    return precision, recall, f1

def evaluate_qa_system_full_metrics(
    questions,
    context_type='retrieved',
    batch_size=16,
    retr_lookup=None,
    qa_pipeline=None,
    metric: SemanticF1=None,
    verbose=True
):
    """
    Evaluates with:
    - DSPy SemanticF1 using dspy.Example (LLM-as-judge)
    - token precision, recall, F1
    Returns: (detailed_results, avg_precision, avg_recall, avg_f1, avg_semantic_f1)
    """
    if context_type == 'retrieved' and not isinstance(retr_lookup, dict):
        raise ValueError("context_type='retrieved' requires retr_lookup=dict.")
    if qa_pipeline is None:
        raise ValueError("qa_pipeline must be provided.")
    if metric is None:
        # make sure you did dspy.configure(lm=...) before this
        metric = SemanticF1()

    examples = []
    empty_ctx = 0

    # build contexts and predictions
    for q in questions:
        question  = q['question']
        reference = q['answer']

        if context_type == 'literal':
            context = ' '.join(q.get('literal_spans', [])).strip()
        elif context_type == 'pragmatic':
            context = ' '.join(q.get('pragmatic_spans', [])).strip()
        elif context_type == 'retrieved':
            topic = q.get('topic', '')
            retr = retr_lookup.get(topic) if retr_lookup else None
            context = get_retrieved_context(q['question'], retr) if retr else ''

        if not context:
            empty_ctx += 1
            pred = ""
        else:
            try:
                out = qa_pipeline(question=question, context=context)
                pred = (out.get('answer', '') if isinstance(out, dict) else "").strip()
            except Exception:
                pred = ""

        examples.append({
            'topic': q.get('topic',''),
            'question': question,
            'prediction': pred,
            'reference': reference,
            'context_type': context_type,
            'context': context
        })

    if verbose:
        n = len(examples)
        print(f"Built {n} examples - empty contexts: {empty_ctx}/{n} ({empty_ctx/max(1,n):.1%})")

    if not examples:
        return [], 0.0, 0.0, 0.0, 0.0

    # per-example SemanticF1 using dspy.Example
    detailed_results = []
    pr_list, re_list, f1_list, sf1_list = [], [], [], []

    total = len(examples)
    for i in range(0, total, batch_size):
        if verbose:
            print(f"scoring batch {i//batch_size+1}/{(total+batch_size-1)//batch_size}")
        batch = examples[i:i+batch_size]
        for ex in batch:
            pred = ex['prediction']
            ref  = ex['reference']

            # semantic-F1 via dspy.Example
            try:
                gold_ex = dspy.Example(question=ex['question'], response=ref)
                pred_ex = dspy.Example(question=ex['question'], response=pred)
                s_f1 = float(metric(gold_ex, pred_ex))
            except Exception:
                s_f1 = 0.0

            # token PRF
            p, r, f = _token_prf(pred, ref)

            detailed_results.append({
                **ex,
                'scores': {
                    'precision': p,
                    'recall': r,
                    'f1': f,
                    'semantic_f1': s_f1
                }
            })
            pr_list.append(p); re_list.append(r); f1_list.append(f); sf1_list.append(s_f1)

    avg_p   = float(np.mean(pr_list)) if pr_list else 0.0
    avg_r   = float(np.mean(re_list)) if re_list else 0.0
    avg_f   = float(np.mean(f1_list)) if f1_list else 0.0
    avg_sf1 = float(np.mean(sf1_list)) if sf1_list else 0.0

    if verbose:
        print(f"Mean PRF: P={avg_p:.3f} R={avg_r:.3f} F1={avg_f:.3f} | SemanticF1={avg_sf1:.3f}")

    return detailed_results, avg_p, avg_r, avg_f, avg_sf1


In [46]:
# make sure LM is configured before creating the metric
# lm = dspy.LM('xai/grok-3-mini', api_key=api_key)
# dspy.configure(lm=lm)

metric = SemanticF1()  # created after configure

clean_results = {}
for cfg in ['literal','pragmatic','retrieved']:
    print(f"\nEvaluating {cfg}...")
    kwargs = {'retr_lookup': retr_lookup_val} if cfg == 'retrieved' else {}
    detailed, avg_p, avg_r, avg_f, avg_sf1 = evaluate_qa_system_full_metrics(
        first_questions,
        context_type=cfg,
        batch_size=32,
        retr_lookup=kwargs.get('retr_lookup'),
        qa_pipeline=qa_pipeline,
        metric=metric,
        verbose=True
    )
    clean_results[cfg] = {
        'avg_precision': avg_p,
        'avg_recall': avg_r,
        'avg_f1': avg_f,
        'avg_semantic_f1': avg_sf1,
        'detailed_results': detailed
    }



Evaluating literal...
Built 179 examples - empty contexts: 0/179 (0.0%)
scoring batch 1/6
scoring batch 2/6
scoring batch 3/6
scoring batch 4/6
scoring batch 5/6
scoring batch 6/6
Mean PRF: P=0.522 R=0.074 F1=0.118 | SemanticF1=0.424

Evaluating pragmatic...
Built 179 examples - empty contexts: 0/179 (0.0%)
scoring batch 1/6
scoring batch 2/6
scoring batch 3/6
scoring batch 4/6
scoring batch 5/6
scoring batch 6/6
Mean PRF: P=0.563 R=0.091 F1=0.143 | SemanticF1=0.383

Evaluating retrieved...
Built 179 examples - empty contexts: 0/179 (0.0%)
scoring batch 1/6
scoring batch 2/6
scoring batch 3/6
scoring batch 4/6
scoring batch 5/6
scoring batch 6/6
Mean PRF: P=0.230 R=0.039 F1=0.062 | SemanticF1=0.149
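
The token-level scores above depend heavily on tokenization. `_token_prf` splits on whitespace only, so a predicted "2005" does not match a reference token "2005." that carries a trailing period. The sketch below reproduces that behaviour on the Supernanny example from this notebook's analysis output and compares it with a regex word tokenizer. `token_prf` and `word_tokens` are hypothetical variants for illustration, not the functions used to produce the reported numbers.

```python
import re

def token_prf(prediction, reference, tokenize):
    """Set-based token precision, recall and F1 under a given tokenizer."""
    pt, rt = set(tokenize(prediction)), set(tokenize(reference))
    if not pt or not rt:
        return 0.0, 0.0, 0.0
    overlap = pt & rt
    p = len(overlap) / len(pt)
    r = len(overlap) / len(rt)
    f1 = 0.0 if p + r == 0 else 2 * p * r / (p + r)
    return p, r, f1

def whitespace_tokens(text):
    # Same tokenization as _token_prf: lowercase, split on whitespace.
    return text.lower().split()

def word_tokens(text):
    # Alternative: keep only alphanumeric runs, dropping punctuation.
    return re.findall(r"\w+", text.lower())

pred = "2005"
ref = ("Good morning!The first American Supernanny show began airing "
       "on ABC on January 7, 2005.")

raw = token_prf(pred, ref, whitespace_tokens)
norm = token_prf(pred, ref, word_tokens)
print("whitespace:", raw)
print("word regex:", norm)
```

With the regex tokenizer the one-word prediction has perfect precision but recall of only 1/14, which mirrors the pattern in the averages above: short extracted spans score high precision and very low recall against long free-form references.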


In [51]:

import json
with open("clean_results.json", "w", encoding="utf-8") as f:
    json.dump(clean_results, f, ensure_ascii=False, indent=2)
print("Saved to clean_results.json")


Saved to clean_results.json


In [8]:
import math
import pandas as pd

def _safe_num(x, default=0.0):
    try:
        return float(x) if not (x is None or (isinstance(x, float) and math.isnan(x))) else default
    except Exception:
        return default

def report_overall_metrics(results):
    """Prints and returns a small dataframe with avg precision/recall/F1/semantic-F1 per configuration."""
    rows = []
    for config, res in results.items():
        rows.append({
            "config": config.capitalize(),
            "precision": _safe_num(res.get("avg_precision")),
            "recall": _safe_num(res.get("avg_recall")),
            "f1": _safe_num(res.get("avg_f1")),
            "semantic_f1": _safe_num(res.get("avg_semantic_f1")),
        })
    df = pd.DataFrame(rows).set_index("config").sort_index()
    print("\n📊 Overall Evaluation Report\n" + "="*40)
    display(df)
    return df

# Example usage
overall_df = report_overall_metrics(clean_results)



📊 Overall Evaluation Report


| config | precision | recall | f1 | semantic_f1 |
|---|---|---|---|---|
| Literal | 0.522104 | 0.073627 | 0.11831 | 0.424383 |
| Pragmatic | 0.563388 | 0.090601 | 0.143291 | 0.383401 |
| Retrieved | 0.230119 | 0.038806 | 0.062257 | 0.14934 |
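
When reading this table, note that the `f1` column is the mean of per-example F1 scores, not the harmonic mean of the averaged precision and recall. The short check below recomputes the latter from the reported averages to show that the two quantities differ. The helper `f1` is defined here for illustration only.

```python
def f1(p, r):
    """Harmonic mean of precision and recall."""
    return 0.0 if p + r == 0 else 2 * p * r / (p + r)

# (avg_precision, avg_recall, avg_f1) copied from the report above.
reported = {
    "Literal": (0.522104, 0.073627, 0.11831),
    "Pragmatic": (0.563388, 0.090601, 0.143291),
    "Retrieved": (0.230119, 0.038806, 0.062257),
}

for name, (p, r, mean_f1) in reported.items():
    print(f"{name}: F1 of averages = {f1(p, r):.3f}, average of F1 = {mean_f1:.3f}")
```

Either number can be reported, as long as the write-up states which one it is.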


In [6]:
import math  # needed for math.isnan below

def analyze_results(results, metric_key='semantic_f1', topk=1):
    """
    Prints best and worst examples per config by a chosen metric.
    metric_key in {'semantic_f1','f1','precision','recall'}.
    """
    for config, bundle in results.items():
        detailed = bundle.get('detailed_results', [])
        if not detailed:
            print(f"\n⚠️ No detailed results for configuration '{config}'")
            continue

        # collect (score, idx)
        scored = []
        for i, ex in enumerate(detailed):
            val = ex.get('scores', {}).get(metric_key, 0.0)
            try:
                val = float(val)
            except Exception:
                val = 0.0
            if math.isnan(val):
                val = 0.0
            scored.append((val, i))

        if not scored:
            print(f"\n⚠️ No '{metric_key}' scores found for '{config}'")
            continue

        scored.sort(key=lambda t: t[0], reverse=True)
        best = scored[:topk]
        worst = list(reversed(scored))[:topk]

        print(f"\n{'='*60}\n🔹 Analysis for '{config}' configuration ({metric_key.upper()})\n{'='*60}")

        print("\n✅ Best example(s):")
        for val, idx in best:
            ex = detailed[idx]
            print(f"- Score: {val:.4f}")
            print(f"  Q: {ex['question']}")
            print(f"  Pred: {ex['prediction']}")
            print(f"  Ref: {ex['reference']}")
            print(f"  Ctx: {ex['context'][:200]}...\n")

        print("❌ Worst example(s):")
        for val, idx in worst:
            ex = detailed[idx]
            print(f"- Score: {val:.4f}")
            print(f"  Q: {ex['question']}")
            print(f"  Pred: {ex['prediction']}")
            print(f"  Ref: {ex['reference']}")
            print(f"  Ctx: {ex['context'][:200]}...\n")


### Parallel eval


In [49]:
def analyze_results(results, metric_key='semantic_f1'):
    """
    Analyze where the model succeeds and fails across configurations.
    Args:
        results (dict): output of evaluate_qa_system_full_metrics() runs.
        metric_key (str): which metric to analyze (e.g., 'semantic_f1', 'f1', 'precision', 'recall')
    """
    for config in results:
        detailed = results[config].get('detailed_results', [])
        if not detailed:
            print(f"\n⚠️ No detailed results for configuration '{config}'")
            continue

        # Extract chosen metric for each example
        scores = [ex['scores'].get(metric_key, 0.0) for ex in detailed]

        if not scores:
            print(f"\n⚠️ No '{metric_key}' scores found for '{config}'")
            continue

        best_idx = int(max(range(len(scores)), key=lambda i: scores[i]))
        worst_idx = int(min(range(len(scores)), key=lambda i: scores[i]))

        best_example = detailed[best_idx]
        worst_example = detailed[worst_idx]

        print(f"\n{'='*60}\n🔹 Analysis for '{config}' configuration ({metric_key.upper()})\n{'='*60}")

        print("\n✅ Best performing example:")
        print(f"Question: {best_example['question']}")
        print(f"Prediction: {best_example['prediction']}")
        print(f"Reference: {best_example['reference']}")
        print(f"Context snippet: {best_example['context'][:150]}...")
        print(f"{metric_key}: {best_example['scores'][metric_key]:.4f}")

        print("\n❌ Worst performing example:")
        print(f"Question: {worst_example['question']}")
        print(f"Prediction: {worst_example['prediction']}")
        print(f"Reference: {worst_example['reference']}")
        print(f"Context snippet: {worst_example['context'][:150]}...")
        print(f"{metric_key}: {worst_example['scores'][metric_key]:.4f}")


## Analyze results (success/failure)

In [9]:
import os
import json

# Check if clean_results exists, otherwise load from file
if 'clean_results' not in globals() or not clean_results:
    if os.path.exists("clean_results.json"):
        with open("clean_results.json", "r", encoding="utf-8") as f:
            clean_results = json.load(f)
        print("Loaded clean_results from clean_results.json")
    else:
        raise RuntimeError("clean_results is not initialized and clean_results.json not found.")

analyze_results(clean_results)


🔹 Analysis for 'literal' configuration (SEMANTIC_F1)

✅ Best example(s):
- Score: 1.0000
  Q: what year was the show release ? 
  Pred: 2005
  Ref: Good morning!The first American Supernanny show began airing on ABC on January 7, 2005.
  Ctx: The first American Supernanny show began airing on ABC on January 7, 2005....

❌ Worst example(s):
- Score: 0.0000
  Q: How many books have been published in the Game of Thrones series so far?
  Pred: Yes
  Ref: There are 5 books in the series and 3 prequel novellas set in the same world.
  Ctx: Yes...


🔹 Analysis for 'pragmatic' configuration (SEMANTIC_F1)

✅ Best example(s):
- Score: 1.0000
  Q: Ok, Where does the Supernanny mainly live (country)?
  Pred: British
  Ref: Supernanny lives in the UK. It is originally a British TV series.
  Ctx: a British TV series...

❌ Worst example(s):
- Score: 0.0000
  Q: What is Game of thrones its real or not?
  Pred: George R.R. Martin
  Ref: It is based on the novel series A Song of Ice and Fire
  Ctx: wri

Analyze the results: where does the model succeed, and where does it fail? Does it tend to give literal answers when a more pragmatic one is needed?

# 4.4 - Part 2: The LLM Multi-Step Prompting Approach

4.4. Part 2: The LLM Multi-Step Prompting Approach
Now, you will build a more sophisticated model using an LLM with multi-step prompting.

We will now evaluate all the questions in the conversations, not only the first question of each conversation as in 4.3.

In each turn, the model will have access as input to the following:

- The previous turns as pairs (question, answer)
- The current question.
- The context retrieved by the retriever model given the current question.

Your model can enrich this by computing additional intermediary fields, for example:

- A summary of the student's goal or interests based on the conversation history
- A pragmatic or cooperative need underlying the student's current question (based on the past conversation and retrieved spans)
- A generated "cooperative" question which can be used to re-query the source documents and extract additional context
- A reasoning field computed by a Chain-of-Thought module for any of these intermediary steps

Implement the DSPy Module: Create a DSPy module that uses the strategy you devise to generate a cooperative answer.

For reference, you can start from the DSPy tutorials demonstrating variations around RAG:

- https://dspy.ai/tutorials/rag/ (using a retriever based on FAISS and passage embeddings)
- https://dspy.ai/tutorials/multihop_search/ (using a retriever based on BM25s)
- https://dspy.ai/tutorials/agents/ (using a ReAct module with a ColBERTv2 retriever)

4.4.1 First Questions
Perform the same evaluation as in 4.3 on the first questions in each conversation and compare the results of your model with the one in 4.3 based on the traditional text-to-text transformer.

4.4.2 Conversational Context
Now consider all questions in the conversations and take into account the conversational context.

Compile and Evaluate: Compile your DSPy program (you can use a small training set from the PRAGMATICQA data for this) and evaluate it on the validation set using the same metrics as in 4.3. Explain which metric you use to drive the optimization.



In [10]:
# P2.3 - Turn builders
def get_all_questions(split_data):
    out = []
    for conv in split_data:
        topic = conv.get("topic","")
        history = []
        for qa in conv.get("qas", []):
            q = qa["q"]; a = qa["a"]
            out.append({
                "topic": topic,
                "question": q,
                "answers": [a],
                "conversation_history": list(history)   # copy current history
            })
            history.append((q, a))  # update rolling history
    return out
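
`get_all_questions` stores the history as a list of `(question, answer)` tuples. When that list is passed to a prompt, it may help to render it as plain text and to cap the number of turns so prompts stay short. The helper below is a hypothetical sketch of that idea; `format_history`, the `Q:`/`A:` labels and the default of three turns are assumptions, not part of the notebook.

```python
def format_history(history, max_turns=3):
    """Render the last `max_turns` (question, answer) pairs as plain text."""
    # Guard max_turns <= 0 explicitly: history[-0:] would return the whole list.
    recent = history[-max_turns:] if max_turns > 0 else []
    lines = []
    for q, a in recent:
        lines.append(f"Q: {q.strip()}")
        lines.append(f"A: {a.strip()}")
    return "\n".join(lines)

history = [
    ("first question", "first answer"),
    ("second question", "second answer"),
    ("third question ", "third answer"),
]
print(format_history(history, max_turns=2))
```

For the first question of a conversation the history is empty and the helper returns an empty string.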


In [53]:
# P2.4 - Multi-step DSPy program - OLD
class ConversationAnalyzer(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyze = dspy.ChainOfThought(
            "conversation_history, current_question -> user_interests, conversation_goal"
        )
    def forward(self, conversation_history, current_question):
        return self.analyze(conversation_history=conversation_history,
                            current_question=current_question)

class PragmaticReasoner(dspy.Module):
    def __init__(self):
        super().__init__()
        self.reason = dspy.ChainOfThought(
            "conversation_history, current_question, retrieved_context -> pragmatic_needs, cooperative_query"
        )
    def forward(self, conversation_history, current_question, retrieved_context):
        return self.reason(conversation_history=conversation_history,
                           current_question=current_question,
                           retrieved_context=retrieved_context)

class CooperativeAnswerGenerator(dspy.Module):
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought(
            "question, user_interests, pragmatic_needs, literal_context, pragmatic_context -> response"
        )
    def forward(self, question, user_interests, pragmatic_needs, literal_context, pragmatic_context):
        return self.generate(question=question,
                             user_interests=user_interests,
                             pragmatic_needs=pragmatic_needs,
                             literal_context=literal_context,
                             pragmatic_context=pragmatic_context)

class PragmaticRAG(dspy.Module):
    def __init__(self, retriever_lookup, default_k=5, max_context_chars=12000):
        super().__init__()
        self.retriever_lookup = retriever_lookup
        self.default_k = default_k
        self.max_context_chars = max_context_chars
        self.conv = ConversationAnalyzer()
        self.reason = PragmaticReasoner()
        self.answerer = CooperativeAnswerGenerator()

    def _retrieve(self, topic, query, k=None):
        retr = self.retriever_lookup.get(topic)
        if retr is None or not query:
            return []
        retr.k = k or self.default_k
        try:
            res = retr(query)
            passages = list(res.passages) if hasattr(res, "passages") else []
            return passages
        except Exception as e:
            print("Retrieval error:", e)
            return []

    def _join_ctx(self, chunks):
        if not chunks:
            return ""
        text = " ".join(chunks)
        # clamp to keep prompts reasonable
        return text[: self.max_context_chars]

    def forward(self, topic, conversation_history, question, k=None):
        # 1) base retrieval on the literal question
        base_ctx_chunks = self._retrieve(topic, question, k=k)
        base_ctx = self._join_ctx(base_ctx_chunks)

        # 2) analyze conversation
        conv_out = self.conv(conversation_history=conversation_history,
                             current_question=question)
        user_interests = getattr(conv_out, "user_interests", "")
        conv_goal = getattr(conv_out, "conversation_goal", "")

        # 3) reason about pragmatic needs and propose a cooperative query
        reasoning = self.reason(conversation_history=conversation_history,
                                current_question=question,
                                retrieved_context=base_ctx)
        pragmatic_needs = getattr(reasoning, "pragmatic_needs", "")
        coop_query = getattr(reasoning, "cooperative_query", None) or question

        # 4) cooperative re-retrieval
        coop_ctx_chunks = self._retrieve(topic, coop_query, k=k)
        coop_ctx = self._join_ctx(coop_ctx_chunks)

        # 5) synthesize cooperative answer
        gen = self.answerer(
            question=question,
            user_interests=user_interests,
            pragmatic_needs=pragmatic_needs,
            literal_context=base_ctx,
            pragmatic_context=coop_ctx
        )
        final_text = getattr(gen, "response", "") or ""

        # Important for Evaluate: return an object with field `response`
        return dspy.Prediction(response=final_text)
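
In `PragmaticRAG.forward`, the literal query and the cooperative query hit the same per-topic retriever, so `base_ctx` and `coop_ctx` can contain the same passages. One option, sketched below under that assumption, is to merge the two passage lists and drop repeats before building the prompt. `merge_unique` is a hypothetical helper and is not used by the class above.

```python
def merge_unique(*passage_lists):
    """Concatenate passage lists, keeping the first occurrence of each passage."""
    seen = set()
    merged = []
    for passages in passage_lists:
        for p in passages:
            # Compare passages ignoring case and whitespace differences.
            key = " ".join(p.split()).lower()
            if key and key not in seen:
                seen.add(key)
                merged.append(p)
    return merged

base_chunks = ["Popeye is a sailor.", "He eats  spinach."]
coop_chunks = ["he eats spinach.", "Olive Oyl is his girlfriend."]
print(merge_unique(base_chunks, coop_chunks))
```

Deduplicating before clamping to `max_context_chars` leaves more of the character budget for distinct evidence.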



In [9]:
# P2.5 - Load PRAGMATICQA data splits
def read_jsonl(path):
    items = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            items.append(json.loads(line))
    return items

DATASET_DIR = "../PragmatiCQA/data"  # adjust if needed

TRAIN_PATH = os.path.join(DATASET_DIR, "train.jsonl")

# optional tiny train slice for compilation
train_data = read_jsonl(TRAIN_PATH)[:50]  # small subset to keep compile fast

len(test_data), len(train_data)


(179, 50)

In [54]:
# P2.6 - SemanticF1 evaluation wrapper
from dspy.evaluate import SemanticF1

def evaluate_semantic_f1(examples, decompositional=True):
    """Score dicts with 'question', 'prediction' and 'reference' keys using the SemanticF1 judge.

    Returns overall and per-example precision, recall and F1.
    """
    metric = SemanticF1(decompositional=decompositional)

    detailed = []
    P, R, F = [], [], []
    for ex in examples:
        # Calling the metric itself returns only the F1 score, so query the
        # underlying judge to keep precision and recall as well.
        # Assumption: the judge is exposed as `metric.module` and returns
        # `precision` and `recall` fields; check this against the installed DSPy version.
        # Examples are scored one at a time here (no parallel batching).
        sc = metric.module(
            question=ex["question"],
            ground_truth=ex["reference"],
            system_response=ex["prediction"],
        )
        p = float(sc.precision); r = float(sc.recall)
        f = 0.0 if (p + r) == 0 else 2 * p * r / (p + r)
        detailed.append({
            "question": ex["question"],
            "prediction": ex["prediction"],
            "reference": ex["reference"],
            "topic": ex.get("topic",""),
            "precision": p, "recall": r, "score": f
        })
        P.append(p); R.append(r); F.append(f)

    n = max(1, len(F))
    overall = {"precision": sum(P)/n, "recall": sum(R)/n, "f1": sum(F)/n}
    return {"overall": overall, "detailed_results": detailed}


In [55]:
# 4.4.1 - Build devset for first questions (no history)
from dspy.evaluate import Evaluate, SemanticF1

def build_devset_first_questions(conversations):
    dev = []
    for conv in conversations:
        qas = conv.get("qas", [])
        if not qas:
            continue
        q0 = qas[0]
        dev.append(
            dspy.Example(
                topic=conv.get("topic",""),
                conversation_history=[],          # no history for first turn
                question=q0["q"],
                response=q0["a"]                  # gold reference
            ).with_inputs("topic", "conversation_history", "question")
        )
    return dev

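# reuse the cached retriever lookup; retr_lookup_val must already exist
# (it is built with cache_topic_retrievers(test_data, k=5) in the 4.4.2 cell)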
prog_first = PragmaticRAG(retriever_lookup=retr_lookup_val, default_k=5)

devset_first = build_devset_first_questions(test_data)
evaluator_first = Evaluate(devset=devset_first, metric=SemanticF1(decompositional=True))
report_first = evaluator_first(prog_first)
print("4.4.1 (First questions) SemanticF1:", report_first)


2025/10/14 12:04:54 INFO dspy.evaluate.evaluate: Average Metric: 58.35448739312303 / 179 (32.6%)


4.4.1 (First questions) SemanticF1: 32.6
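
The percentage in the `dspy.evaluate` log line is the sum of the per-example scores divided by the number of examples. A quick arithmetic check using the totals logged above (the helper name `average_metric_percent` is illustrative, not a DSPy function):

```python
def average_metric_percent(total_score, n_examples, digits=1):
    """Turn a summed metric and an example count into a rounded percentage."""
    return round(100.0 * total_score / n_examples, digits)


# Totals copied from the log line of the 4.4.1 run above
first_run = average_metric_percent(58.35448739312303, 179)
print(first_run)
```

Because each example contributes a score between 0 and 1, this number is a mean SemanticF1 over the 179 first questions, not an accuracy.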


In [None]:
# 4.4.1 - Build devset for first questions (no history)
from dspy.evaluate import Evaluate, SemanticF1

def build_devset_first_questions(conversations):
    dev = []
    for conv in conversations:
        qas = conv.get("qas", [])
        if not qas:
            continue
        q0 = qas[0]
        dev.append(
            dspy.Example(
                topic=conv.get("topic",""),
                conversation_history=[],          # no history for first turn
                question=q0["q"],
                response=q0["a"]                  # gold reference
            ).with_inputs("topic", "conversation_history", "question")
        )
    return dev

# cache retrievers once
prog_first = PragmaticRAG(retriever_lookup=retr_lookup_val, default_k=5)

devset_first = build_devset_first_questions(test_data)
evaluator_first = Evaluate(devset=devset_first, metric=SemanticF1(decompositional=True))
report_first = evaluator_first(prog_first)
print("4.4.1 (First questions) SemanticF1:", report_first)


Built retrievers for 7/11 topics.


2025/10/12 19:26:24 INFO dspy.evaluate.evaluate: Average Metric: 59.187601294131746 / 179 (33.1%)


4.4.1 (First questions) SemanticF1: 33.07


In [15]:
# 4.4.2 - Build devset for all questions with rolling history
def build_devset_all_questions(conversations):
    dev = []
    for conv in conversations:
        topic = conv.get("topic","")
        hist = []
        for qa in conv.get("qas", []):
            dev.append(
                dspy.Example(
                    topic=topic,
                    conversation_history=list(hist),   # copy history so far
                    question=qa["q"],
                    response=qa["a"]
                ).with_inputs("topic", "conversation_history", "question")
            )
            hist.append((qa["q"], qa["a"]))  # update rolling history
    return dev

retr_lookup_val = cache_topic_retrievers(test_data, k=5)
prog_all = PragmaticRAG(retriever_lookup=retr_lookup_val, default_k=5)

devset_all = build_devset_all_questions(test_data)
evaluator_all = Evaluate(devset=devset_all, metric=SemanticF1(decompositional=True))
report_all = evaluator_all(prog_all)
print("4.4.2 (All turns) SemanticF1:", report_all)


Built retrievers for 7/11 topics.


2025/10/12 22:15:28 INFO dspy.evaluate.evaluate: Average Metric: 471.5585260847488 / 1526 (30.9%)


4.4.2 (All turns) SemanticF1: 30.9
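
To make the rolling history concrete, here is a small sketch on a made-up two-turn conversation. It uses plain dicts instead of `dspy.Example` so that it runs without DSPy; the function name `rolling_history_rows` and the toy data are assumptions for illustration only.

```python
def rolling_history_rows(conversation):
    """Return one row per turn, each carrying the (question, answer) pairs before it."""
    rows, hist = [], []
    for qa in conversation.get("qas", []):
        rows.append({
            "topic": conversation.get("topic", ""),
            "conversation_history": list(hist),  # copy, so later appends do not leak in
            "question": qa["q"],
            "response": qa["a"],
        })
        hist.append((qa["q"], qa["a"]))  # gold answer, not a model prediction
    return rows


# Hypothetical conversation in the same shape as the code above expects
toy = {"topic": "Example Topic", "qas": [
    {"q": "Who is the main character?", "a": "Alice."},
    {"q": "Where does she live?", "a": "In Wonderland."},
]}
rows = rolling_history_rows(toy)
for row in rows:
    print(len(row["conversation_history"]), row["question"])
```

Note that, as in `build_devset_all_questions`, the history holds the gold answers of earlier turns, so each turn is evaluated as if all previous turns had been answered perfectly.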


In [16]:
# --- Save all evaluation results to JSON ---
import json
from datetime import datetime

results_summary = {
    "timestamp": datetime.now().isoformat(),
    "model": "PragmaticRAG",
    "dataset": "PragmatiCQA",
    "results": {
        "part_4_4_1_first_questions": {
            "semantic_f1": str(report_first)
        },
        "part_4_4_2_all_turns": {
            "semantic_f1": str(report_all)
        }
    }
}

# Save to a JSON file
OUTPUT_FILE = "part2_results.json"
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
    json.dump(results_summary, f, ensure_ascii=False, indent=2)

print(f"✅ Results saved to {OUTPUT_FILE}")


✅ Results saved to part2_results.json


## Old code

In [None]:
import dspy
import numpy as np

def _token_prf(prediction, reference):
    pt = set(str(prediction).lower().split())
    rt = set(str(reference).lower().split())
    if not pt or not rt:
        return 0.0, 0.0, 0.0
    overlap = pt & rt
    # pt and rt are both non-empty here because of the early return above
    precision = len(overlap) / len(pt)
    recall = len(overlap) / len(rt)
    f1 = 0.0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
    return precision, recall, f1

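def evaluate_qa_system_full_metrics(questions, context_type='retrieved', batch_size=5):
    """
    Build predictions, then evaluate per example:
    - semantic_f1 from DSPy SemanticF1 (float)
    - token precision, recall, f1 as a simple set-overlap baseline
      (lowercased, whitespace-split, repeated tokens counted once)
    Relies on the globals `qa_pipeline` and, for the 'retrieved' context, `search`.
    batch_size only groups examples for progress printing; SemanticF1 is still
    called once per example.
    Returns: (detailed_results, avg_precision, avg_recall, avg_f1, avg_semantic_f1)
    """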
    examples = []

    # 1) Build examples with context and predictions
    for q in questions:
        question = q['question']
        reference = q['answer']

        if context_type == 'literal':
            context = ' '.join(q.get('literal_spans', []))
        elif context_type == 'pragmatic':
            context = ' '.join(q.get('pragmatic_spans', []))
        else:
            # If you use a retriever, make sure `search` exists. Otherwise default to empty.
            try:
                context = ' '.join(search(question).passages)  # replace if you have a different retriever
            except Exception:
                context = ''

        if context.strip():
            try:
                prediction = qa_pipeline(question=question, context=context)['answer']
            except Exception:
                prediction = ""
        else:
            prediction = ""

        examples.append({
            'question': question,
            'prediction': prediction,
            'reference': reference,
            'context': context
        })

    # 2) Evaluate
    metric = dspy.evaluate.SemanticF1()
    detailed_results = []
    pr_list, re_list, f1_list, sf1_list = [], [], [], []

    total = len(examples)
    if total == 0:
        # Nothing to evaluate
        return [], 0.0, 0.0, 0.0, 0.0

    for i in range(0, total, batch_size):
        batch_num = i // batch_size + 1
        total_batches = (total + batch_size - 1) // batch_size
        batch = examples[i:i + batch_size]
        print(f"🟢 Processing batch {batch_num}/{total_batches} ({len(batch)} examples)")
        
        for ex in batch:
            pred = ex['prediction']
            ref = ex['reference']

            # semantic f1 (float)
            try:
                gold_ex = dspy.Example(question=ex['question'], response=ref)
                pred_ex = dspy.Example(question=ex['question'], response=pred)
                semantic_f1 = metric(gold_ex, pred_ex)  # float
            except Exception:
                semantic_f1 = 0.0

            # token PRF
            p, r, f = _token_prf(pred, ref)

            detailed_results.append({
                **ex,
                'scores': {
                    'precision': p,
                    'recall': r,
                    'f1': f,
                    'semantic_f1': semantic_f1
                }
            })
            pr_list.append(p); re_list.append(r); f1_list.append(f); sf1_list.append(semantic_f1)

    avg_p = float(np.mean(pr_list))
    avg_r = float(np.mean(re_list))
    avg_f = float(np.mean(f1_list))
    avg_sf1 = float(np.mean(sf1_list))

    return detailed_results, avg_p, avg_r, avg_f, avg_sf1
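
As a worked example of the token-overlap baseline, the sketch below reimplements the logic of `_token_prf` under the name `token_prf` (a hypothetical standalone copy, so the block runs on its own) and applies it to two made-up prediction/reference pairs.

```python
def token_prf(prediction, reference):
    """Set-based token precision, recall and F1 (lowercased, whitespace-split)."""
    pt = set(str(prediction).lower().split())
    rt = set(str(reference).lower().split())
    if not pt or not rt:
        return 0.0, 0.0, 0.0
    overlap = pt & rt
    precision = len(overlap) / len(pt)
    recall = len(overlap) / len(rt)
    total = precision + recall
    f1 = 0.0 if total == 0 else 2 * precision * recall / total
    return precision, recall, f1


# Two of three predicted tokens appear in the four-token reference
p, r, f = token_prf("the red cat", "a red cat sat")
print(p, r, f)

# Repeated tokens collapse to one because the comparison uses sets
dup = token_prf("cat cat cat", "cat")
print(dup)
```

The second call illustrates a limitation of this baseline: a prediction that repeats a correct token is scored as a perfect match, which is one reason the LLM-judged SemanticF1 is reported alongside it.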
