In [1]:
import json
import math
import random
import re
import numpy as np

In [2]:
# Path to your PlausibleQA sample dataset
DATA_PATH = './data/random_test_data/verified_w_candidates_from_3000.json'
OUTPUT_DIR = "./data/advanced/"

def load_data(path):
    with open(path, 'r', encoding="utf-8") as f:
        return json.load(f)

# Load once
data = load_data(DATA_PATH)
print(f"Loaded {len(data)} questions")

Loaded 1266 questions


In [3]:
CJK_RE = re.compile(r'[\u4E00-\u9FFF]')
EMOJI_RE = re.compile(
    '['
    '\U0001F300-\U0001F5FF'  # symbols & pictographs
    '\U0001F600-\U0001F64F'  # emoticons
    '\U0001F680-\U0001F6FF'  # transport & map symbols
    '\U0001F700-\U0001F77F'  # alchemical symbols
    '\U0001F780-\U0001F7FF'  # geometric shapes extended
    '\U0001F800-\U0001F8FF'  # supplemental arrows-C
    ']'
)
# Matches *any* non-ASCII byte
NON_ASCII_RE = re.compile(r'[^\x00-\x7F]')

def is_valid_answer(ans: str) -> bool:
    """
    Returns False if ans:
      - is not a non-empty str
      - contains any CJK character
      - contains any emoji
      - contains any non-ASCII character
    """
    if not isinstance(ans, str) or not ans.strip():
        return False
    if CJK_RE.search(ans):
        return False
    if EMOJI_RE.search(ans):
        return False
    if NON_ASCII_RE.search(ans):
        return False
    return True

In [4]:
def normalize_scores(candidate_answers):
    """Normalize raw listwise scores so they sum to 1.""" 
    raw = [info['listwise'] for info in candidate_answers.values()]
    total = sum(raw)
    return [s/total if total>0 else 0 for s in raw]

def compute_entropy(probs):
    """Shannon entropy in bits."""
    return -sum(p * math.log2(p) for p in probs if p>0)


In [None]:
def threshold_classification(questions):
    confusing, non_confusing = [], []
    for q in questions:
        if not is_valid_answer(q['answer']):
            continue
        for c in q['candidate_answers'].values():
            if not is_valid_answer(c):
                continue
        if q["id"].startswith("nq"):
            continue
        probs = normalize_scores(q['candidate_answers'])
        top_two = sorted(probs, reverse=True)[:2]
        if top_two[0] >= 0.5 or sum(top_two) >= 0.5:
            confusing.append(q)
        elif top_two[0] <= 0.15 or sum(top_two) <= 0.2:
            non_confusing.append(q)
    return confusing, non_confusing

thr_conf, thr_non = threshold_classification(data)
print(f"[Threshold] Confusing: {len(thr_conf)}, Non-confusing: {len(thr_non)}")

def entropy_classification(questions, h_pct=50, s_pct=50, fallback_pct=25):
    """
    Returns two lists: confusing, non_confusing
    
    confusing     = {q | S >= S_thresh and H >= H_thresh}
    non_confusing = {q | S >= S_thresh and H <  H_thresh}
    
    Where S_thresh = percentile(S_vals, s_pct)
          H_thresh = percentile(H_vals, h_pct)
          
    Prints distribution stats so you can choose sensible cutoffs.
    If either pool is empty, retries with fallback_pct.
    """
    records = []
    for q in questions:
        if not is_valid_answer(q['answer']): continue
        for cand_text in q['candidate_answers'].keys():
            if not is_valid_answer(cand_text):
                continue   # now this really checks the string "40", "20", etc.

        #if q["id"].startswith("nq"): continue

        # Raw scores and sum
        scores = [info['listwise'] for info in q['candidate_answers'].values()]
        S = sum(scores)
        
        # Normalize and compute entropy
        total = float(S) if S>0 else 1.0
        probs = [s / total for s in scores]
        H = compute_entropy(probs)

        item = q.copy()
        item.pop("pairwise", None)
        records.append((item, S, H))

    if not records:
        print("No valid records found.")
        return [], []

    # Gather S and H arrays
    S_vals = np.array([r[1] for r in records])
    H_vals = np.array([r[2] for r in records])

    # Print distribution summaries
    for name, arr in [("S", S_vals), ("H", H_vals)]:
        p0, p25, p50, p75, p100 = np.percentile(arr, [0,25,50,75,100])
        print(f"{name}-percentiles → min:{p0:.2f}, 25th:{p25:.2f}, median:{p50:.2f}, 75th:{p75:.2f}, max:{p100:.2f}")

    # Compute thresholds
    S_thresh = np.percentile(S_vals, s_pct)
    H_thresh = np.percentile(H_vals, h_pct)
    print(f"Using S_thresh={S_thresh:.2f} (pct={s_pct}), H_thresh={H_thresh:.2f} (pct={h_pct})")

    # Build strata
    confusing = [q for (q,S,H) in records if S >= S_thresh and H >= H_thresh]
    non_conf  = [q for (q,S,H) in records if S >= S_thresh and H <  H_thresh]

    # If either is empty, retry with fallback_pct
    if not confusing or not non_conf:
        print("Empty stratum detected, retrying thresholds at", fallback_pct, "percentile.")
        return entropy_classification(questions, h_pct=fallback_pct, s_pct=fallback_pct, fallback_pct=fallback_pct)

    print(f"[Entropy+Sum] Confusing: {len(confusing)}, Non-confusing: {len(non_conf)}")
    return confusing, non_conf

ent_conf, ent_non = entropy_classification(data)

[Threshold] Confusing: 75, Non-confusing: 76
S-percentiles → min:32.00, 25th:156.00, median:254.00, 75th:380.00, max:825.00
H-percentiles → min:1.03, 25th:3.00, median:3.13, 75th:3.20, max:3.32
Using S_thresh=254.00 (pct=50), H_thresh=3.13 (pct=50)
[Entropy+Sum] Confusing: 344, Non-confusing: 283


In [11]:
def sample_questions(q_list, k=5, seed=42):
    random.seed(seed)
    sample = q_list if len(q_list) < k else random.sample(q_list, k)
    if len(q_list) < k:
        print(f"Warning: only {len(q_list)} available; returning all.")
    return sample

sample_size = 50

entropy_conf_samp = sample_questions(ent_conf, sample_size)
entropy_non_samp  = sample_questions(ent_non, sample_size)

with open(OUTPUT_DIR + f'entropy_{sample_size}_samples.json', 'w', encoding="utf-8") as f:
    json.dump({'confusing': entropy_conf_samp, 'non_confusing': entropy_non_samp}, f, indent=2, ensure_ascii=False)

# Save to JSON
""" with open(OUTPUT_DIR + f'threshold_{sample_size}_samples.json', 'w', encoding="utf-8") as f:
    json.dump({'confusing': thr_conf_samp, 'non_confusing': thr_non_samp}, f, indent=2, ensure_ascii=True)
print(f"Saved threshold_{sample_size}_samples.json") """
""" 
# Sample for entropy method
ent_conf_samp = sample_questions(ent_conf, sample_size)
ent_non_samp  = sample_questions(ent_non, sample_size)
# Save to JSON
with open(OUTPUT_DIR + f'entropy_{sample_size}_samples.json', 'w', encoding="utf-8") as f:
    json.dump({'confusing': ent_conf_samp, 'non_confusing': ent_non_samp}, f, indent=2, ensure_ascii=False)
print(f"Saved entropy_{sample_size}_samples.json") """


' \n# Sample for entropy method\nent_conf_samp = sample_questions(ent_conf, sample_size)\nent_non_samp  = sample_questions(ent_non, sample_size)\n# Save to JSON\nwith open(OUTPUT_DIR + f\'entropy_{sample_size}_samples.json\', \'w\', encoding="utf-8") as f:\n    json.dump({\'confusing\': ent_conf_samp, \'non_confusing\': ent_non_samp}, f, indent=2, ensure_ascii=False)\nprint(f"Saved entropy_{sample_size}_samples.json") '