# 📚 Rule & Question Explorer

Interactive notebook to explore questions, results, and rules from the self-regulated pipeline.

In [1]:
import os
import json
import xml.etree.ElementTree as ET
from pathlib import Path
from IPython.display import display, HTML, Markdown
import ipywidgets as widgets

# Configuration
# The defaults are the original absolute locations. Set the environment
# variables named below to point the notebook at another checkout without
# editing this cell.
CHECKPOINT_DIR = os.environ.get(
    "RULEDISTILL_CHECKPOINT_DIR",
    "/root/hsin_research/ruledistill-main/data/checkpoints",
)
DATASET_PATH = os.environ.get(
    "FINQA_DATASET_PATH",
    "/root/hsin_research/FinQA-main/dataset/train.json",
)

In [6]:
# Read out the first item in the list of objects in the train.json
import json
# JSON text is UTF-8; pass the encoding explicitly so the read does not depend
# on the platform's locale default.
with open(DATASET_PATH, 'r', encoding='utf-8') as f:
    data = json.load(f)

first_item = data[0]
first_item



{'pre_text': ['interest rate to a variable interest rate based on the three-month libor plus 2.05% ( 2.05 % ) ( 2.34% ( 2.34 % ) as of october 31 , 2009 ) .',
  'if libor changes by 100 basis points , our annual interest expense would change by $ 3.8 million .',
  'foreign currency exposure as more fully described in note 2i .',
  'in the notes to consolidated financial statements contained in item 8 of this annual report on form 10-k , we regularly hedge our non-u.s .',
  'dollar-based exposures by entering into forward foreign currency exchange contracts .',
  'the terms of these contracts are for periods matching the duration of the underlying exposure and generally range from one month to twelve months .',
  'currently , our largest foreign currency exposure is the euro , primarily because our european operations have the highest proportion of our local currency denominated expenses .',
  'relative to foreign currency exposures existing at october 31 , 2009 and november 1 , 2008 , 

## 1. Load Data

In [2]:
def load_all_results(checkpoint_dir):
    """Load every ``results_batch_<N>.jsonl`` file into one flat list.

    Files are visited in ascending *numeric* batch order. A plain path sort is
    lexicographic and would place ``results_batch_10`` before
    ``results_batch_2``, which scrambles the positions used as question
    indices elsewhere in the notebook.

    Args:
        checkpoint_dir: Directory containing the ``results_batch_*.jsonl`` files.

    Returns:
        A list of dicts, one per non-blank line, each with an added
        ``'batch_num'`` key holding the integer parsed from the file name.

    Raises:
        ValueError: If a matching file name does not end in ``_<integer>``, or
            a non-blank line is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    # Pair each file with its batch number, e.g. "results_batch_12" -> 12, and
    # sort on that pair; the path breaks ties such as "_01" vs "_1".
    numbered_files = sorted(
        (int(path.stem.split('_')[-1]), path)
        for path in Path(checkpoint_dir).glob("results_batch_*.jsonl")
    )

    results = []
    for batch_num, path in numbered_files:
        # Explicit UTF-8 so the read does not depend on the platform locale.
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip():
                    item = json.loads(line)
                    item['batch_num'] = batch_num
                    results.append(item)

    return results

def load_rulebook(filepath):
    """Parse a rulebook XML file into a list of rule dicts.

    Args:
        filepath: Path to the rulebook XML file.

    Returns:
        One dict per ``<Rule>`` element found anywhere in the document, with
        string values under ``'id'``, ``'type'``, ``'source'`` (attributes)
        and ``'trigger'``, ``'action'`` (text of the ``<Trigger>`` and
        ``<Action>`` children). A missing attribute, a missing child, or an
        empty child all yield ``''``. If the file cannot be read or parsed,
        the error is printed and ``[]`` is returned.
    """
    def child_text(rule, tag):
        # findtext covers a missing child; `or ''` covers an empty element
        # such as <Action/>, whose text is None.
        return rule.findtext(tag, default='') or ''

    # Deliberately best-effort: one unreadable rulebook must not stop the
    # notebook from loading the others.
    try:
        # Let the XML parser decode the bytes so the document's own encoding
        # declaration is honoured instead of the platform locale.
        root = ET.parse(filepath).getroot()
        return [
            {
                'id': rule.get('id', ''),
                'type': rule.get('type', ''),
                'source': rule.get('source', ''),
                'trigger': child_text(rule, 'Trigger'),
                'action': child_text(rule, 'Action'),
            }
            for rule in root.findall('.//Rule')
        ]
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        return []

def load_all_rulebooks(checkpoint_dir):
    """Collect every ``rulebook_batch_<N>.xml`` under `checkpoint_dir`.

    Returns:
        A dict mapping the integer batch number parsed from each file name to
        the rule list produced by ``load_rulebook`` for that file.
    """
    xml_paths = sorted(Path(checkpoint_dir).glob("rulebook_batch_*.xml"))
    return {
        int(xml_path.stem.split('_')[-1]): load_rulebook(str(xml_path))
        for xml_path in xml_paths
    }

def load_metrics(checkpoint_dir):
    """Read ``metrics.jsonl`` from `checkpoint_dir`.

    Returns:
        A list with one parsed JSON value per non-blank line, or an empty list
        when the file does not exist.
    """
    metrics_path = os.path.join(checkpoint_dir, "metrics.jsonl")
    if not os.path.exists(metrics_path):
        return []

    with open(metrics_path, 'r') as handle:
        return [json.loads(raw_line) for raw_line in handle if raw_line.strip()]

# Load everything
print("Loading data...")
all_results = load_all_results(CHECKPOINT_DIR)
all_rulebooks = load_all_rulebooks(CHECKPOINT_DIR)
all_metrics = load_metrics(CHECKPOINT_DIR)

# One summary line per loaded collection
print(
    f"‚úì Loaded {len(all_results)} question results",
    f"‚úì Loaded {len(all_rulebooks)} rulebook versions",
    f"‚úì Loaded {len(all_metrics)} batch metrics",
    sep="\n",
)

# Show sample keys (taken from the first result only)
if len(all_results) > 0:
    print(f"\nAvailable fields: {list(all_results[0])}")

Loading data...
‚úì Loaded 50 question results
‚úì Loaded 10 rulebook versions
‚úì Loaded 10 batch metrics

Available fields: ['reasoning', 'answer', 'rules_applied', 'success', 'raw_response', 'idx', 'question', 'ground_truth', 'batch_num']


## 2. Question Explorer

In [3]:
import html as html_lib

def display_question(idx):
    """Render one entry of ``all_results`` as a styled HTML card.

    The card shows the question, ground truth, model answer, applied rules,
    and collapsible reasoning / raw-response sections. Every value taken from
    the result record is HTML-escaped before it is placed in the markup.

    Args:
        idx: Position of the result in ``all_results``. An out-of-range value
            prints the valid range and returns without rendering.
    """
    if idx < 0 or idx >= len(all_results):
        print(f"Invalid index. Valid range: 0 to {len(all_results)-1}")
        return
    
    q = all_results[idx]
    # Use 'success' field (correct field name from data)
    is_correct = q.get('success', False)
    status = "‚úÖ CORRECT" if is_correct else "‚ùå INCORRECT"
    status_color = "green" if is_correct else "red"
    
    # Get the answer (prediction) and reasoning
    answer = q.get('answer', 'N/A')
    reasoning = q.get('reasoning', '')
    raw_response = q.get('raw_response', '')
    rules_applied = q.get('rules_applied', [])
    
    # Escape HTML in content
    reasoning_safe = html_lib.escape(str(reasoning))
    raw_response_safe = html_lib.escape(str(raw_response))

    # Rule names come from model output: coerce each to str (join() raises on
    # non-strings) and escape it like every other field. A bare string is
    # treated as one rule rather than being joined character by character.
    if isinstance(rules_applied, str):
        rules_applied = [rules_applied]
    if rules_applied:
        rules_text = ', '.join(html_lib.escape(str(rule)) for rule in rules_applied)
    else:
        rules_text = 'None parsed'
    
    card_html = f"""
    <div style="border: 2px solid {status_color}; border-radius: 10px; padding: 15px; margin: 10px 0; background: #f9f9f9;">
        <h3 style="margin-top: 0;">Question #{idx} (Batch {q.get('batch_num', 'N/A')}) {status}</h3>
        
        <div style="background: #fff; padding: 10px; border-radius: 5px; margin: 10px 0;">
            <strong>üìù Question:</strong><br>
            <p style="font-size: 14px;">{html_lib.escape(str(q.get('question', 'N/A')))}</p>
        </div>
        
        <div style="display: flex; gap: 20px;">
            <div style="flex: 1; background: #e8f5e9; padding: 10px; border-radius: 5px;">
                <strong>üéØ Ground Truth:</strong><br>
                <code style="font-size: 16px;">{html_lib.escape(str(q.get('ground_truth', 'N/A')))}</code>
            </div>
            <div style="flex: 1; background: {'#e8f5e9' if is_correct else '#ffebee'}; padding: 10px; border-radius: 5px;">
                <strong>ü§ñ Model Answer:</strong><br>
                <code style="font-size: 16px;">{html_lib.escape(str(answer))}</code>
            </div>
        </div>
        
        <div style="margin-top: 10px; background: #e3f2fd; padding: 8px; border-radius: 5px;">
            <strong>üìò Rules Applied:</strong> {rules_text}
        </div>
        
        <details style="margin-top: 15px;">
            <summary style="cursor: pointer; font-weight: bold;">üí≠ Model Reasoning (click to expand)</summary>
            <pre style="background: #fff3e0; padding: 10px; border-radius: 5px; max-height: 400px; overflow-y: auto; font-size: 12px; white-space: pre-wrap;">{reasoning_safe}</pre>
        </details>
        
        <details style="margin-top: 10px;">
            <summary style="cursor: pointer; font-weight: bold;">üìÑ Full Raw Response (click to expand)</summary>
            <pre style="background: #f5f5f5; padding: 10px; border-radius: 5px; max-height: 400px; overflow-y: auto; font-size: 11px; white-space: pre-wrap;">{raw_response_safe}</pre>
        </details>
    </div>
    """
    display(HTML(card_html))

# Interactive slider
question_slider = widgets.IntSlider(
    value=0,
    min=0,
    max=len(all_results)-1 if all_results else 0,
    step=1,
    description='Question #:',
    continuous_update=False,
    layout=widgets.Layout(width='80%')
)

output = widgets.Output()

def update_display(change):
    """Redraw the question selected by the slider inside `output`.

    `change` is the traitlets change notification; it is unused because the
    current slider value is read directly.
    """
    from IPython.display import clear_output
    with output:
        clear_output(wait=True)
        display_question(question_slider.value)

question_slider.observe(update_display, names='value')

display(widgets.VBox([question_slider, output]))
# Draw the first question inside the Output widget. Rendering it directly in
# the cell output would leave a stale card that is never cleared when the
# slider moves.
update_display(None)

VBox(children=(IntSlider(value=0, continuous_update=False, description='Question #:', layout=Layout(width='80%‚Ä¶

## 3. Rule Explorer

In [None]:
def display_rule(batch_num, rule_idx):
    """Render one rule from ``all_rulebooks`` as a styled HTML card.

    Args:
        batch_num: Key into ``all_rulebooks``. An unknown batch prints the
            available keys and returns.
        rule_idx: Position of the rule within that batch's rule list. An
            out-of-range value prints the valid range and returns.
    """
    # Imported locally so this cell does not rely on an alias created in
    # another cell.
    from html import escape

    if batch_num not in all_rulebooks:
        print(f"Batch {batch_num} not found. Available: {list(all_rulebooks.keys())}")
        return
    
    rules = all_rulebooks[batch_num]
    if rule_idx < 0 or rule_idx >= len(rules):
        print(f"Invalid rule index. Valid range: 0 to {len(rules)-1}")
        return
    
    rule = rules[rule_idx]

    def safe(value):
        # Rule text is free-form and may contain '<' or '&'; escape it so it
        # is shown literally. None (e.g. an empty XML element) renders as
        # empty text instead of the word "None".
        return escape('' if value is None else str(value))

    rule_id = safe(rule['id'])
    rule_type = safe(rule['type'])
    rule_source = safe(rule['source'])
    trigger = safe(rule['trigger'])
    action = safe(rule['action'])
    
    card_html = f"""
    <div style="border: 2px solid #2196f3; border-radius: 10px; padding: 15px; margin: 10px 0; background: #e3f2fd;">
        <h3 style="margin-top: 0;">üìò Rule {rule_id} (Batch {batch_num})</h3>
        
        <div style="margin-bottom: 10px;">
            <span style="background: #1976d2; color: white; padding: 3px 8px; border-radius: 4px; font-size: 12px;">
                {rule_type}
            </span>
            <span style="background: #ff9800; color: white; padding: 3px 8px; border-radius: 4px; font-size: 12px; margin-left: 5px;">
                {rule_source}
            </span>
        </div>
        
        <div style="background: #fff; padding: 12px; border-radius: 5px; margin: 10px 0;">
            <strong>üéØ Trigger:</strong><br>
            <p style="font-size: 14px; margin: 5px 0;">{trigger}</p>
        </div>
        
        <div style="background: #fff; padding: 12px; border-radius: 5px;">
            <strong>‚ö° Action:</strong><br>
            <p style="font-size: 14px; margin: 5px 0;">{action}</p>
        </div>
    </div>
    """
    display(HTML(card_html))

def display_all_rules_for_batch(batch_num):
    """Print a header for `batch_num` and render each of its rules in order.

    An unknown batch prints a "not found" message and nothing else.
    """
    if batch_num in all_rulebooks:
        batch_rules = all_rulebooks[batch_num]
        header = f"\nüìö Rulebook for Batch {batch_num} ({len(batch_rules)} rules)\n" + "="*50
        print(header)

        # display_rule looks the rule up again by position
        for position in range(len(batch_rules)):
            display_rule(batch_num, position)
    else:
        print(f"Batch {batch_num} not found.")

# Get available batches
available_batches = sorted(all_rulebooks.keys())
print(f"Available batches: {available_batches}")

if available_batches:
    def _max_rule_index(batch_num):
        """Highest valid rule index for `batch_num`, floored at 0.

        The floor keeps the slider's max >= min even for an empty rulebook.
        """
        return max(len(all_rulebooks[batch_num]) - 1, 0)

    batch_dropdown = widgets.Dropdown(
        options=available_batches,
        value=available_batches[-1],
        description='Batch:'
    )
    
    # Bound the slider by the *selected* batch. A global maximum would expose
    # indices that are invalid for smaller rulebooks.
    max_rules = _max_rule_index(batch_dropdown.value)
    rule_slider = widgets.IntSlider(
        value=0,
        min=0,
        max=max_rules,
        step=1,
        description='Rule #:',
        continuous_update=False,
        layout=widgets.Layout(width='80%')
    )
    
    rule_output = widgets.Output()
    
    def update_rule_display(change):
        """Redraw the currently selected rule inside `rule_output`."""
        from IPython.display import clear_output
        with rule_output:
            clear_output(wait=True)
            display_rule(batch_dropdown.value, rule_slider.value)

    def update_batch(change):
        """Re-bound the rule slider for the newly selected batch, then redraw."""
        # Lowering max makes the widget clamp its value, so the selected rule
        # index always stays valid for the new batch.
        rule_slider.max = _max_rule_index(change['new'])
        update_rule_display(change)
    
    batch_dropdown.observe(update_batch, names='value')
    rule_slider.observe(update_rule_display, names='value')
    
    display(widgets.VBox([batch_dropdown, rule_slider, rule_output]))
    # Draw the initial rule inside the Output widget so later selections
    # replace it instead of appearing next to a stale card.
    update_rule_display(None)

## 4. Filter Questions by Correctness

In [None]:
# Split results on the 'success' flag; a record without the flag counts as incorrect
correct_questions = [result for result in all_results if result.get('success', False)]
incorrect_questions = [result for result in all_results if not result.get('success', False)]

print(f"‚úÖ Correct: {len(correct_questions)}")
print(f"‚ùå Incorrect: {len(incorrect_questions)}")
# Guard the division: accuracy is undefined when nothing was loaded
if all_results:
    print(f"üìä Accuracy: {len(correct_questions)/len(all_results)*100:.1f}%")
else:
    print("No data")

In [None]:
# Browse incorrect questions only
print("\n‚ùå INCORRECT QUESTIONS BROWSER\n" + "="*50)

if incorrect_questions:
    incorrect_slider = widgets.IntSlider(
        value=0,
        min=0,
        max=len(incorrect_questions)-1,
        step=1,
        description='Incorrect #:',
        continuous_update=False,
        layout=widgets.Layout(width='80%')
    )
    
    incorrect_output = widgets.Output()
    
    def show_incorrect(idx):
        """Render the idx-th incorrect question using its index in all_results."""
        q = incorrect_questions[idx]
        # Map back to the global index so the card header shows the same
        # question number as the main explorer.
        original_idx = all_results.index(q)
        display_question(original_idx)
    
    def update_incorrect(change):
        """Redraw the selected incorrect question inside `incorrect_output`."""
        from IPython.display import clear_output
        with incorrect_output:
            clear_output(wait=True)
            show_incorrect(incorrect_slider.value)
    
    incorrect_slider.observe(update_incorrect, names='value')
    display(widgets.VBox([incorrect_slider, incorrect_output]))
    # Draw the first entry inside the Output widget. Rendering it directly in
    # the cell output would leave a stale card below the browser.
    update_incorrect(None)
else:
    print("No incorrect questions found!")

## 5. Batch Comparison

In [None]:
def compare_batches(batch1, batch2):
    """Print how the rulebook changed from `batch1` to `batch2`.

    Rules are matched by their 'id'. A rule counts as changed when its
    'trigger' or 'action' differs between the two batches. The report lists
    the added, removed, changed and unchanged counts, followed by details of
    each added rule. A batch missing from ``all_rulebooks`` is treated as an
    empty rulebook.

    Args:
        batch1: Batch number of the older rulebook.
        batch2: Batch number of the newer rulebook.
    """
    rules1 = {r['id']: r for r in all_rulebooks.get(batch1, [])}
    rules2 = {r['id']: r for r in all_rulebooks.get(batch2, [])}
    
    ids1 = set(rules1)
    ids2 = set(rules2)
    
    # Sets iterate in arbitrary order; sort so the report is identical from
    # run to run. key=str keeps the sort valid if ids are not all strings.
    added = sorted(ids2 - ids1, key=str)
    removed = sorted(ids1 - ids2, key=str)
    common = sorted(ids1 & ids2, key=str)
    
    changed = [
        rid for rid in common
        if rules1[rid]['trigger'] != rules2[rid]['trigger']
        or rules1[rid]['action'] != rules2[rid]['action']
    ]
    
    print(f"\nüìä Batch {batch1} ‚Üí Batch {batch2} Comparison")
    print("="*50)
    print(f"‚ûï Added: {len(added)} rules {added if added else ''}")
    print(f"‚ûñ Removed: {len(removed)} rules {removed if removed else ''}")
    print(f"‚úèÔ∏è Changed: {len(changed)} rules {changed if changed else ''}")
    print(f"üìå Unchanged: {len(common) - len(changed)} rules")
    
    if added:
        print("\n--- NEW RULES ---")
        for rid in added:
            r = rules2[rid]
            # The action may be None when the XML element was empty; slicing
            # None would raise TypeError.
            action = r['action'] or ''
            # Only mark the preview as truncated when text was actually cut.
            ellipsis = '...' if len(action) > 100 else ''
            print(f"\n[{rid}] {r['type']}")
            print(f"  Trigger: {r['trigger']}")
            print(f"  Action: {action[:100]}{ellipsis}")

# Diff the two most recent rulebooks (needs at least two batches)
if len(available_batches) > 1:
    compare_batches(*available_batches[-2:])

## 6. Search Questions

In [None]:
def search_questions(keyword, field='question'):
    """Find results whose `field` contains `keyword`, ignoring case.

    Returns:
        A list of ``(index, result)`` tuples in ``all_results`` order, where
        ``index`` is the result's position in ``all_results``.
    """
    needle = keyword.lower()
    return [
        (position, record)
        for position, record in enumerate(all_results)
        if needle in str(record.get(field, '')).lower()
    ]

# Example: search for percentage questions
results = search_questions("percentage")
print(f"Found {len(results)} questions containing 'percentage'")

# Preview at most three matches, each cut to 80 characters
for idx, q in results[:3]:
    if q.get('success'):
        status = "‚úÖ"
    else:
        status = "‚ùå"
    print(f"  [{idx}] {status} {q.get('question', '')[:80]}...")

In [None]:
# Interactive search
search_box = widgets.Text(
    layout=widgets.Layout(width='50%'),
    description='Search:',
    placeholder='Enter keyword to search...',
    value='',
)

search_output = widgets.Output()

def do_search(change):
    """Re-run the search for the text box's current value and list up to ten hits."""
    from IPython.display import clear_output
    with search_output:
        clear_output(wait=True)
        query = search_box.value
        # An empty box just clears the previous listing
        if not query:
            return

        results = search_questions(query)
        print(f"Found {len(results)} matches for '{query}'")
        print("="*50)
        for idx, q in results[:10]:
            status = "‚úÖ" if q.get('success') else "‚ùå"
            print(f"[{idx}] {status} {q.get('question', '')[:70]}...")

        hidden = len(results) - 10
        if hidden > 0:
            print(f"... and {hidden} more")

search_box.observe(do_search, names='value')
display(widgets.VBox([search_box, search_output]))

## 7. Quick Stats

In [None]:
import matplotlib.pyplot as plt

# Accuracy per batch
if all_metrics:
    # A metrics row without 'batch_num' falls back to its position in the list
    batches = []
    accuracies = []
    for position, metric in enumerate(all_metrics):
        batches.append(metric.get('batch_num', position))
        accuracies.append(metric.get('accuracy', 0))

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(batches, accuracies, 'g-o', linewidth=2)
    ax.fill_between(batches, accuracies, alpha=0.3, color='green')
    ax.set_xlabel('Batch')
    ax.set_ylabel('Accuracy')
    ax.set_title('Accuracy by Batch')
    ax.set_ylim(0, 1)
    ax.grid(True, alpha=0.3)
    plt.show()

## 8. Export Functions

In [None]:
def export_incorrect_to_csv(output_path="incorrect_questions.csv"):
    """Write every entry of ``incorrect_questions`` to a UTF-8 CSV file.

    Columns: batch, question, ground_truth, model_answer, reasoning. The
    reasoning column is truncated to its first 500 characters.

    Args:
        output_path: Destination file; an existing file is overwritten.
    """
    import csv
    
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['batch', 'question', 'ground_truth', 'model_answer', 'reasoning'])
        
        for q in incorrect_questions:
            # The reasoning may be absent, None, or not a string (for example
            # when a response failed to parse). Normalise it to text before
            # slicing so one such record cannot abort the export.
            reasoning = q.get('reasoning')
            reasoning_text = '' if reasoning is None else str(reasoning)
            writer.writerow([
                q.get('batch_num', ''),
                q.get('question', ''),
                q.get('ground_truth', ''),
                q.get('answer', ''),
                reasoning_text[:500]  # Truncate reasoning
            ])
    
    print(f"Exported {len(incorrect_questions)} incorrect questions to {output_path}")

# Uncomment to export:
# export_incorrect_to_csv()

## 9. View Specific Question by Index

In [None]:
# Quick function to view any question by index
def view(idx):
    """Shorthand for ``display_question(idx)``: show the result card at `idx`."""
    return display_question(idx)

# Example usage (valid indices run from 0 to len(all_results) - 1):
# view(0)  # View question #0