In [None]:
!pip install datasets

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by path.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import re
import json
import pandas as pd
import requests
import time
from datasets import load_dataset
import csv

In [None]:
def read_dataset(filename):
  """Load the answers CSV and rename its answer columns for the judge pipeline.

  Rows that pandas cannot parse are skipped (``on_bad_lines="skip"``) and the
  number of rows that were loaded is printed.

  Args:
    filename: Path of the comma-delimited CSV file.

  Returns:
    A DataFrame in which ``answer`` is renamed to ``groundtruth-answer`` and
    ``answer_qwen4b_contriever`` to ``generated-answer``; other columns are
    left untouched.
  """
  column_aliases = {
      'answer': 'groundtruth-answer',
      'answer_qwen4b_contriever': 'generated-answer',
  }

  loaded = pd.read_csv(filename, on_bad_lines="skip")
  print(len(loaded))

  return loaded.rename(columns=column_aliases)

In [None]:
def load_hf_dataset(ds_name, subset_name, split_name):
  """Load one split of a Hugging Face dataset and re-emit its QA fields.

  Args:
    ds_name: Dataset name passed to ``load_dataset``.
    subset_name: Configuration/subset name passed to ``load_dataset``.
    split_name: Split passed as ``split=`` to ``load_dataset``.

  Returns:
    The result of mapping every example to its ``id``, ``question``,
    ``answer`` and ``context`` values.
  """
  def _qa_fields(example):
      # Copy the four fields used downstream from a single example.
      return {
          "id": example["id"],
          "question": example["question"],
          "answer": example["answer"],
          "context": example["context"]
      }

  raw_split = load_dataset(ds_name, subset_name, split=split_name)

  # NOTE(review): `map` is called without `remove_columns`, so any other
  # columns of the split are presumably still present in the result - confirm
  # against the `datasets` documentation if a strict projection is required.
  return raw_split.map(_qa_fields)

In [None]:
# Function to flatten context into a single text string
def flatten_context(context):
    """Concatenate every sentence of a context record into one string.

    ``context['sentences']`` is iterated as a sequence of sentence lists; all
    sentences are joined in order with a single space. Other keys of
    ``context`` are not read.
    """
    return " ".join(
        sentence
        for paragraph in context['sentences']
        for sentence in paragraph
    )

In [None]:
def sanitize_context(text):
    """Normalize and escape complex literary text for TSV/JSON safety.

    Processing happens in two passes:

    1. Typographic normalisation: curly quotes become straight quotes, en/em
       dashes become ``-`` and non-breaking spaces become regular spaces.
    2. Escaping: backslash, tab, carriage return, newline and double quote are
       replaced by their backslash-escaped, single-line representation.

    Normalisation runs first so that straight quotes produced from curly
    quotes are escaped like every other double quote (escaping first would
    leave them unescaped in the output).

    Args:
        text: Value to sanitise.

    Returns:
        The sanitised string with surrounding whitespace stripped and runs of
        two or more whitespace characters collapsed to one space. Any
        non-string input (e.g. ``None`` or NaN) yields ``""``.
    """
    if not isinstance(text, str):
        return ""

    normalized = (
        text.replace("\u201c", "\"").replace("\u201d", "\"")   # curly double quotes
            .replace("\u2019", "'").replace("\u2018", "'")     # curly single quotes
            .replace("\u2013", "-").replace("\u2014", "-")     # en / em dash
            .replace("\u00a0", " ")                            # non-breaking space
    )

    escaped = (
        normalized.replace("\\", "\\\\")   # must come first: keeps backslashes valid
                  .replace("\t", "\\t")    # escape tabs
                  .replace("\r", "\\r")    # escape carriage returns
                  .replace("\n", "\\n")    # escape in-field newlines
                  .replace("\"", "\\\"")   # escape quotes (including normalised ones)
    )

    # collapse redundant whitespace
    return re.sub(r"\s{2,}", " ", escaped.strip())

In [None]:
def escape_text(text):
  """Replace tab, newline and double-quote characters with visible escapes.

  Strings get ``\\t``, ``\\n`` and ``\\"`` substituted for the raw characters
  (in that order); any non-string value is returned unchanged.
  """
  if not isinstance(text, str):
    return text

  escaped = text
  for raw_char, visible in (("\t", "\\t"), ("\n", "\\n"), ('"', '\\"')):
    escaped = escaped.replace(raw_char, visible)
  return escaped

In [None]:
def safe_escape(text: str) -> str:
    """
    Escape a string the way ``json.dumps`` does, minus the enclosing quotes.

    Quotes, backslashes and control characters come back in their JSON-escaped
    form. Anything that is not a ``str`` produces an empty string.
    """
    if isinstance(text, str):
        quoted = json.dumps(text)   # JSON string literal, including its outer quotes
        return quoted[1:-1]         # drop the first and last character (the quotes)
    return ""

In [None]:
def create_triples(filtered_dataset):
  """Build the (question, generated answer, ground-truth answer) triples.

  Each row of ``filtered_dataset`` is read through ``iterrows()``; its
  ``question``, ``groundtruth-answer`` and ``generated-answer`` values are
  passed through ``sanitize_context`` before being stored.

  Args:
    filtered_dataset: DataFrame providing the three columns named above.

  Returns:
    A list of ``(question, model_answer, groundtruth_answer)`` tuples, one per
    row, in row order.
  """
  source_columns = ('question', 'groundtruth-answer', 'generated-answer')

  triple_list = []
  for _, row in filtered_dataset.iterrows():
    question, groundtruth_answer, model_answer = [
        sanitize_context(row[column]) for column in source_columns
    ]
    # The judge expects the model answer before the reference answer.
    triple_list.append((question, model_answer, groundtruth_answer))

  print("Triples created!")

  return triple_list

In [None]:
def batch_prompting_model_old(triple_list):
    """
    Legacy request builder for the LLM-as-a-judge batch.

    triple_list: list of (question, generated-answer, groundtruth-answer)

    Returns one request dict per triple, in order. Each request carries a
    ``custom_id`` of the form ``judge-<position>`` and its own copy of the
    system block, plus a user message embedding the three values.
    """
    def _judge_system_block():
        # Built afresh on every call so that requests do not share one dict.
        return {
            "type": "text",
            "text": (
                "You are serving as an impartial evaluator (LLM-as-a-Judge) for model-generated QA responses.\n"
                "You will be given a question, a model-generated answer, and a ground-truth reference answer.\n"
                "Your goal is to determine the correctness of the generated answer according to the following rules:\n\n"
                "1. Correct: The generated answer semantically matches the ground-truth answer.\n"
                "2. Hallucinated: The generated answer clearly contradicts or diverges from the ground-truth answer.\n"
                "3. Insufficient Context: If the generated answer explicitly states that the context does not provide "
                "sufficient information (e.g., contains phrases such as 'the context does not provide sufficient information', "
                "'cannot be determined from context', or similar), label it as Insufficient Context.\n\n"
                "Provide exactly one JSON object as output in the following format:\n"
                "{\"label\": one of [\"Correct\", \"Hallucinated\", \"Insufficient Context\"], "
                "\"explanation\": one concise sentence explaining your reasoning.}\n\n"
                "Example:\n"
                "{\"label\": \"Correct\", \"explanation\": \"The generated answer accurately matches the gold reference answer.\"}"
            ),
        }

    def _build_request(position, triple):
        question, generated, ground_truth = triple
        user_prompt = f"""
        QUESTION: {question}

        MODEL-GENERATED ANSWER: {generated}

        GROUND-TRUTH ANSWER: {ground_truth}

        Compare the generated answer against the ground truth.
        Return exactly one JSON object containing:
        - "label": one of ["Correct", "Hallucinated", "Insufficient Context"]
        - "explanation": a single concise justification.
        """
        return {
            "custom_id": f"judge-{position}",
            "params": {
                "model": "claude-sonnet-4-5",
                "max_tokens": 120,
                "system": [_judge_system_block()],
                "messages": [
                    {"role": "user", "content": user_prompt}
                ],
            },
        }

    return [
        _build_request(position, triple)
        for position, triple in enumerate(triple_list)
    ]

In [None]:
def batch_prompting_model(triple_list, model="claude-sonnet-4-5", max_tokens=200):
    """
    Build the batch of LLM-as-a-judge requests.

    triple_list: list of (question, generated-answer, groundtruth-answer)
    model: model identifier placed in every request.
    max_tokens: completion budget placed in every request.

    Returns a list with one request dict per triple. Request ``i`` has
    ``custom_id == "judge-<i>"`` so that results can be matched back to
    ``triple_list[i]`` later. All requests reference the same system block.

    The label set named in the user prompt is identical to the one defined in
    the system prompt ("Correct", "Hallucinated", "Insufficient Context");
    offering a different third label in the user turn would give the judge two
    conflicting instructions.
    """
    batch_requests = []

    system_block = {
        "type": "text",
        "text": (
            "You are serving as an impartial evaluator (LLM-as-a-Judge) for model-generated QA responses.\n"
            "You will be given a question, a model-generated answer, and a ground-truth reference answer.\n"
            "Your goal is to determine the correctness of the generated answer according to the following rules:\n\n"
            "1. Correct: The generated answer semantically matches the ground-truth answer.\n"
            "2. Hallucinated: The generated answer clearly contradicts or diverges from the ground-truth answer.\n"
            "3. Insufficient Context: If the generated answer explicitly states that the context does not provide "
            "sufficient information (e.g., contains phrases such as 'the context does not provide sufficient information', "
            "'cannot be determined from context', or similar), label it as Insufficient Context.\n\n"
            "Provide exactly one JSON object as output in the following format:\n"
            "{\"label\": one of [\"Correct\", \"Hallucinated\", \"Insufficient Context\"], "
            "\"explanation\": one concise sentence explaining your reasoning.}\n\n"
            "Example:\n"
            "{\"label\": \"Correct\", \"explanation\": \"The generated answer accurately matches the gold reference answer.\"}"
        )
    }

    for idx, (question, generated, ground_truth) in enumerate(triple_list):
        user_prompt = f"""
        QUESTION: {question}

        MODEL-GENERATED ANSWER: {generated}

        GROUND-TRUTH ANSWER: {ground_truth}

        Compare the generated answer against the ground truth.
        Return exactly one JSON object containing:
        - "label": one of ["Correct", "Hallucinated", "Insufficient Context"]
        - "explanation": a single concise justification.
        """

        request = {
            "custom_id": f"judge-{idx}",
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "system": [system_block],
                "messages": [
                    {"role": "user", "content": user_prompt}
                ],
            },
        }

        batch_requests.append(request)

    return batch_requests

In [None]:
def send_batch_request(API_KEY, API_URL, batch_requests, timeout=120):
  """Submit the prepared requests to the batch endpoint and return its reply.

  Args:
    API_KEY: Value sent in the ``x-api-key`` header.
    API_URL: URL the batch payload is POSTed to.
    batch_requests: List of request dicts, sent as ``{"requests": [...]}``.
    timeout: Seconds passed to ``requests.post`` so that a stalled connection
      cannot block the notebook indefinitely.

  Returns:
    The decoded JSON response (a dict containing at least ``id`` and
    ``processing_status``).

  Raises:
    ValueError: The response body is not valid JSON (re-raised after the raw
      body has been printed).
    RuntimeError: The response contains an ``error`` entry.
    KeyError: ``id`` or ``processing_status`` is missing from the response.
  """
  # Prepare batch payload
  payload = {
    "requests": batch_requests
  }

  # Send request to Batch API
  headers = {
    "x-api-key": API_KEY,
    "anthropic-version": "2023-06-01",
    "content-type": "application/json"
  }
  response = requests.post(API_URL, json=payload, headers=headers, timeout=timeout)

  try:
    result = response.json()
  except ValueError:
    # ValueError is the common base class of json.JSONDecodeError and of the
    # decode errors `requests` can raise (including the simplejson-backed one),
    # so this also covers environments where simplejson is installed.
    print("Error: Could not decode JSON.")
    print("HTTP Status:", response.status_code)
    print("Response text:")
    print(response.text)
    raise

  # Debugging output
  print("HTTP Status:", response.status_code)
  print("Response Data:", json.dumps(result, indent=2))

  # Handle errors explicitly
  if "error" in result:
    print("API Error:", result["error"])
    raise RuntimeError(f"API Error: {result['error']}")

  # Ensure the expected keys exist
  if "id" not in result or "processing_status" not in result:
    print("Unexpected response keys:", result.keys())
    raise KeyError("Missing 'id' or 'processing_status' in response")

  print("Batch Created:", result["id"])
  print("Processing Status:", result["processing_status"])

  return result

In [None]:
def download_results(API_KEY, results_url, output_file = "results.json"):
  """Fetch the batch results and store the raw response body on disk.

  Args:
    API_KEY: Value sent in the ``x-api-key`` header.
    results_url: URL the results are downloaded from.
    output_file: Local path the response bytes are written to.

  A ``requests`` failure (including a non-success HTTP status) is reported
  with a printed message instead of being raised; nothing is returned.
  """
  auth_headers = {
      "x-api-key": API_KEY,
      "anthropic-version": "2023-06-01"
  }

  try:
    reply = requests.get(results_url, headers=auth_headers)
    # Turn HTTP error statuses into exceptions handled below.
    reply.raise_for_status()

    # Persist the payload exactly as received (binary mode, no re-encoding).
    with open(output_file, "wb") as sink:
      sink.write(reply.content)

    print(f"Download complete! Results saved as: {output_file}")

  except requests.exceptions.RequestException as e:
    print(f"Failed to download results: {e}")

In [None]:
def poll_batchcompletion(result, headers, poll_interval=30, output_file="results.json", timeout=60):
  """Poll a submitted batch until it has ended, then download its results.

  Args:
    result: Batch-creation response; only ``result["id"]`` is read.
    headers: HTTP headers used for polling. Must include ``x-api-key``; the
      same key is reused for the download, so the function no longer depends
      on a notebook-level ``API_KEY`` variable.
    poll_interval: Seconds to sleep between two status checks.
    output_file: Path handed to ``download_results``.
    timeout: Seconds passed to each ``requests.get`` status check.

  Raises:
    ValueError: ``headers`` has no ``x-api-key`` entry.
    requests.exceptions.HTTPError: A status check returned an HTTP error.
    KeyError: The status response lacks ``processing_status`` or, once ended,
      ``results_url``.
  """
  batch_id = result["id"]
  poll_url = f"https://api.anthropic.com/v1/messages/batches/{batch_id}"

  # Header names are case-insensitive, so look the key up accordingly.
  api_key = next(
      (value for name, value in headers.items() if name.lower() == "x-api-key"),
      None,
  )
  if api_key is None:
    raise ValueError("headers must contain an 'x-api-key' entry")

  # Start the timer
  start_time = time.perf_counter()

  while True:
    poll_response = requests.get(poll_url, headers=headers, timeout=timeout)
    # Fail with the HTTP error itself instead of a confusing KeyError below.
    poll_response.raise_for_status()
    poll_data = poll_response.json()
    status = poll_data["processing_status"]
    print("Current Status:", status)
    if status == "ended":
      results_url = poll_data["results_url"]
      break
    time.sleep(poll_interval)  # Wait before the next status check

  # Stop the timer
  elapsed = time.perf_counter() - start_time

  print(f"Batch complete! Total time: {elapsed / 60:.2f} minutes")
  print("Batch complete! Download results at:", results_url)

  download_results(api_key, results_url, output_file=output_file)

In [None]:
import json
import re
import pandas as pd

def process_json(file_name="results.json", triple_list=None):
    """Convert the downloaded batch results (JSON Lines) into a TSV report.

    Every non-blank line of ``file_name`` is parsed as one batch result. The
    judge's ``label`` and ``explanation`` are extracted from the first JSON
    object found in the model's text output, and combined with the matching
    (question, model answer, ground-truth answer) triple.

    Results are matched to ``triple_list`` through the numeric suffix of their
    ``custom_id`` (``judge-<i>``), not through their position in the file: per
    the Message Batches API documentation, results may be returned in any
    order. The line position is only used as a fallback when ``custom_id`` is
    missing or has another format.

    Args:
        file_name: Path of the JSON Lines results file.
        triple_list: Optional list of (question, generated-answer,
            groundtruth-answer) tuples in request order. When absent, or when
            the index is out of range, the three text columns are left empty.

    Output:
        Writes ``processed_results.tsv`` (tab separated, unquoted) to the
        current working directory. Lines that cannot be processed produce a
        row with ``id`` (``error-<line index>``) and ``error`` columns instead.
    """
    def _single_line(value):
        # The TSV is written with csv.QUOTE_NONE and no escapechar, which
        # raises if a field contains a tab or line break; make such characters
        # visible instead of letting the export fail.
        if not isinstance(value, str):
            return value
        return value.replace("\t", "\\t").replace("\r", "\\r").replace("\n", "\\n")

    results = []
    with open(file_name, "r", encoding="utf-8") as f:
        for idx, line in enumerate(f):
            if not line.strip():
                continue

            try:
                data = json.loads(line)
                custom_id = data.get("custom_id", f"judge-{idx}")

                # Recover the request index from the custom_id.
                id_match = re.fullmatch(r"judge-(\d+)", str(custom_id))
                triple_idx = int(id_match.group(1)) if id_match else idx

                # Retrieve question, model-generated answer, and ground-truth answer if provided
                if triple_list and 0 <= triple_idx < len(triple_list):
                    question, model_answer, groundtruth_answer = triple_list[triple_idx]
                else:
                    question, model_answer, groundtruth_answer = "", "", ""

                stop_reason = data.get("result", {}).get("message", {}).get("stop_reason", "")
                if stop_reason != "refusal":
                    # Extract label and explanation JSON from the model output
                    content = data["result"]["message"]["content"][0]["text"]
                    match = re.search(r"\{.*\}", content, re.DOTALL)
                    if match:
                        inner_json = json.loads(match.group(0))
                        label = inner_json.get("label", "")
                        explanation = inner_json.get("explanation", "")
                    else:
                        label, explanation = "", ""
                else:
                    label = "Refusal"
                    explanation = "Model refused to generate output due to safety filters."

                results.append({
                    "question": question,
                    "model-generated answer": model_answer,
                    "groundtruth answer": groundtruth_answer,
                    "LLM-as-a-judge label": _single_line(label),
                    "LLM-as-a-judge explanation": _single_line(explanation)
                })

            except Exception as e:
                # Keep going: one malformed result must not discard the rest.
                results.append({"id": f"error-{idx}", "error": _single_line(str(e))})

    # Convert to DataFrame and save
    df = pd.DataFrame(results)
    df.to_csv("processed_results.tsv", sep="\t", index=True, index_label="row_id", encoding="utf-8", quoting=3)
    print("Results saved → processed_results.tsv")

In [None]:
# --- Credentials -------------------------------------------------------------
# The key is never stored in the notebook: it is taken from the
# ANTHROPIC_API_KEY environment variable, or typed in without echo when that
# variable is not set.
import os
from getpass import getpass

API_KEY = os.environ.get("ANTHROPIC_API_KEY") or getpass("Anthropic API key: ")
API_URL = "https://api.anthropic.com/v1/messages/batches"

# Headers reused when polling the batch status
headers = {
  "x-api-key": API_KEY,
  "anthropic-version": "2023-06-01",
  "content-type": "application/json"
}

# --- Input data ----------------------------------------------------------------
folder_path = "/content/drive/MyDrive/GenerationRAG/"
file_name = "hotpotqa_answers_contriever_qwen3_4b_1200rows_final_cut.csv"
ds = read_dataset(folder_path + file_name)

# Alternative input path (disabled): load the examples from the Hugging Face hub.
#filtered_dataset = load_hf_dataset(ds_name, subset_name, split_name)
#filtered_dataset = filtered_dataset.select(range(1000))

# --- Build, submit and await the judge batch -----------------------------------
triple_list = create_triples(ds)
batch_requests = batch_prompting_model(triple_list)

result = send_batch_request(API_KEY, API_URL, batch_requests)
poll_batchcompletion(result, headers)

# --- Post-process the JSON Lines file returned by the LLM-as-a-judge run -------
file_name = "results.json"
process_json(file_name, triple_list)