# Rule Translation Reviewer

Use this notebook to step through each guideline clause, inspect its ASP translation, provide a rating (0–3), optionally add comments, and export the evaluation summary to CSV.



In [8]:
from pathlib import Path
import html

import pandas as pd
import yaml
import ipywidgets as widgets
from IPython.display import display

import sys
# Add the parent directory to the import search path before the `src` imports
# below run. NOTE(review): presumably the notebook lives one level below the
# project root, so that `src` becomes importable — confirm.
sys.path.append('..')
from src.review.review_data import build_rule_review_dataset
from src.processing.ASPRuleParser import ASPRuleParser
# Last expression of the cell: builds a small HTML widget so the cell output
# shows whether ipywidgets is usable in this environment.
widgets.HTML("<b>widgets ok</b>")

HTML(value='<b>widgets ok</b>')

In [9]:
# Paths are resolved relative to the parent of the current working directory.
PROJECT_ROOT = Path.cwd().parent
CONFIG_PATH = PROJECT_ROOT / "src/configs/config.yaml"

with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

guideline_path = PROJECT_ROOT / config["input_files"]["problem_text"]
lp_file_path = PROJECT_ROOT / config["experiment"]["output_dir"] / "rulegen_response_fired.lp"
review_csv_path = PROJECT_ROOT / config["experiment"]["output_dir"] / "rule_review.csv"

dataset = build_rule_review_dataset(str(guideline_path), str(lp_file_path))

# Columns of the review table before the reviewer's rating/comment are added.
REVIEW_COLUMNS = [
    "guideline_id",
    "guideline_text",
    "asp_rule_id",
    "asp_rule_text",
    "asp_rule_index",
    "total_asp_rules",
]

# Expand dataset: one review row per ASP rule, so each rule can be rated on
# its own. A guideline without any translation still gets one placeholder row.
expanded_records = []
for item in dataset:
    asp_rules = item.asp_rules
    if not asp_rules:
        # No translation - still show the guideline
        expanded_records.append({
            "guideline_id": item.guideline_id,
            "guideline_text": item.guideline_text,
            "asp_rule_id": None,
            "asp_rule_text": None,
            "asp_rule_index": 0,
            "total_asp_rules": 0,
        })
        continue

    # The guideline text is repeated on every row belonging to it.
    for idx, rule in enumerate(asp_rules):
        expanded_records.append({
            "guideline_id": item.guideline_id,
            "guideline_text": item.guideline_text,
            "asp_rule_id": rule["rule_id"],
            "asp_rule_text": rule["rule_text"],
            "asp_rule_index": idx,
            "total_asp_rules": len(asp_rules),
        })

# Passing the column list keeps the expected columns present even when the
# dataset produced no records, so the column lookups below cannot fail.
df = pd.DataFrame(expanded_records, columns=REVIEW_COLUMNS)
df["rating"] = None
df["comment"] = ""

# Try to load existing review if it exists
if review_csv_path.exists():
    try:
        # Read the ID columns as text. With dtype inference an ID such as
        # "1.10" would come back as the float 1.1 and no longer match its row.
        existing_df = pd.read_csv(
            review_csv_path,
            dtype={"guideline_id": str, "asp_rule_id": str},
        )
        # Rows without a translation are exported with an empty rule ID, which
        # is read back as NaN; normalise both key columns to plain strings.
        existing_df["asp_rule_id"] = existing_df["asp_rule_id"].fillna("").astype(str)
        existing_df["guideline_id"] = existing_df["guideline_id"].astype(str)

        # Index saved rows by (guideline_id, asp_rule_id). setdefault keeps the
        # first saved row when the CSV contains duplicates of a key.
        saved_reviews = {}
        for saved_row in existing_df.to_dict(orient="records"):
            saved_key = (saved_row["guideline_id"], saved_row["asp_rule_id"])
            saved_reviews.setdefault(saved_key, saved_row)

        # Merge existing ratings and comments by matching guideline_id and asp_rule_id
        loaded_count = 0
        for idx, row in df.iterrows():
            current_asp_rule_id = str(row["asp_rule_id"]) if pd.notna(row["asp_rule_id"]) else ""
            current_guideline_id = str(row["guideline_id"])

            match_row = saved_reviews.get((current_guideline_id, current_asp_rule_id))
            if match_row is None:
                continue

            rating_val = match_row.get("rating")
            comment_val = match_row.get("comment", "")

            if pd.notna(rating_val):
                try:
                    df.at[idx, "rating"] = int(float(rating_val))
                except (ValueError, TypeError):
                    # An unparsable saved rating is ignored; the row stays unrated.
                    pass
            if pd.notna(comment_val) and str(comment_val).strip() != "nan":
                df.at[idx, "comment"] = str(comment_val)
            loaded_count += 1
        if loaded_count > 0:
            display(f"Loaded existing review from {review_csv_path} ({loaded_count} items)")
    except Exception as e:
        # Best effort: a broken or incompatible CSV must not block a fresh review.
        display(f"Could not load existing review: {e}")

translations_count = df["asp_rule_id"].notna().sum()
display(f"Loaded {len(df)} review items ({len(dataset)} guidelines, {translations_count} ASP rule translations).")



'Loaded 52 review items (25 guidelines, 52 ASP rule translations).'

In [10]:
# Rating scale offered to the reviewer: stored value -> label shown in the UI.
rating_labels = {
    0: "0 – Completely wrong interpretation",
    1: "1 – Completely right",
    2: "2 – Key information missed",
    3: "3 – Information hallucinated",
}

# Dropdown entries as (label, value) pairs, led by a placeholder whose value is None.
rating_options = [
    ("Select rating", None),
    *((label, value) for value, label in rating_labels.items()),
]

# Parser used to produce the natural-language explanation of each ASP rule.
asp_parser = ASPRuleParser()

# Position of the review item currently shown; changed by the navigation callbacks.
current_index = 0

# --- Display and input widgets ---
rule_display = widgets.HTML(layout=widgets.Layout(width="100%"))
rating_dropdown = widgets.Dropdown(
    options=rating_options,
    value=None,
    description="Rating:",
)
comment_box = widgets.Textarea(
    value="",
    description="Comment:",
    placeholder="Optional notes...",
    layout=widgets.Layout(width="100%", height="100px"),
)
progress_label = widgets.Label()
output_area = widgets.Output()

# --- Action buttons ---
prev_button = widgets.Button(description="◀ Previous", button_style="info")
next_button = widgets.Button(description="Next ▶", button_style="info")
save_button = widgets.Button(description="Save to CSV", button_style="success")
summary_button = widgets.Button(description="Show Summary", button_style="warning")


def _format_html_text(text: str) -> str:
    escaped = html.escape(text)
    return escaped.replace("\n", "<br>")


def build_rule_html(row: pd.Series) -> str:
    """Render one review row as an HTML fragment for the reviewer panel.

    The fragment holds the guideline heading and text, followed either by the
    ASP rule under review (its ID, its text and a natural-language explanation
    obtained from ``asp_parser.explain_rule``) or by a "No translation found."
    notice when the row carries no ASP rule.

    Args:
        row: One row of the review table. ``guideline_id`` and
            ``guideline_text`` are read directly; ``asp_rule_id``,
            ``asp_rule_text``, ``total_asp_rules`` and ``asp_rule_index`` are
            read with ``.get``.

    Returns:
        The HTML string to show in the rule display widget.
    """
    # Escape the guideline ID like every other value interpolated into the markup.
    guideline_id_escaped = html.escape(str(row['guideline_id']))
    guideline_html = f"<h3>Guideline {guideline_id_escaped}</h3><p>{_format_html_text(row['guideline_text'])}</p>"
    # Show the specific ASP rule being reviewed
    asp_rule_id = row.get("asp_rule_id")
    asp_rule_text = row.get("asp_rule_text")
    total_asp_rules = row.get("total_asp_rules", 0)
    asp_rule_index = row.get("asp_rule_index", 0)

    if pd.notna(asp_rule_id) and asp_rule_text:
        # Show which rule we're reviewing if there are multiple
        rule_counter = ""
        if total_asp_rules > 1:
            rule_counter = f" (Rule {asp_rule_index + 1} of {total_asp_rules})"

        rule_id_escaped = html.escape(str(asp_rule_id))
        rule_text_escaped = _format_html_text(str(asp_rule_text))

        # Generate natural language explanation. Any failure is rendered
        # inside the panel so the reviewer can still rate the rule.
        try:
            nl_explanation = asp_parser.explain_rule(str(asp_rule_text), str(asp_rule_id))
            nl_explanation_escaped = _format_html_text(nl_explanation)
            nl_section = f"<div style='margin-top:12px; padding:12px; background-color:#e8f4f8; border-left: 4px solid #2196F3;'><strong>Natural Language:</strong><br>{nl_explanation_escaped}</div>"
        except Exception as e:
            nl_section = f"<div style='margin-top:12px; padding:8px; background-color:#ffebee; border-left: 4px solid #f44336;'><em>Error parsing rule: {html.escape(str(e))}</em></div>"

        asp_html = f"""
        <h4>ASP Translation{rule_counter}</h4>
        <div style='margin:10px 0;'>
            <code>{rule_id_escaped}</code><br>
            <div style='margin-top:8px; font-family:monospace; padding:8px; background-color:#f5f5f5;'>
                {rule_text_escaped}
            </div>
            {nl_section}
        </div>
        """
    else:
        asp_html = "<p><strong>No translation found.</strong></p>"

    return guideline_html + asp_html


def persist_current_state():
    """Copy the widgets' rating and comment into the ``df`` row labelled ``current_index``."""
    # A missing/empty comment is stored as "", otherwise with outer whitespace removed.
    trimmed_comment = (comment_box.value or "").strip()
    df.at[current_index, "rating"] = rating_dropdown.value
    df.at[current_index, "comment"] = trimmed_comment


def update_view():
    """Refresh every widget so it reflects the row of ``df`` at position ``current_index``."""
    row = df.iloc[current_index]
    rule_display.value = build_rule_html(row)

    # A missing rating selects the dropdown's placeholder entry (value None).
    stored_rating = row["rating"]
    rating_dropdown.value = None if pd.isna(stored_rating) else stored_rating

    stored_comment = row["comment"]
    comment_box.value = str(stored_comment) if pd.notna(stored_comment) else ""

    item_count = len(df)
    progress_label.value = f"Item {current_index + 1} of {item_count}"

    # Navigation is disabled at either end of the table.
    prev_button.disabled = current_index == 0
    next_button.disabled = current_index == item_count - 1


def on_prev(_):
    """Click handler: store the current inputs, step back one item if possible, redraw."""
    global current_index
    persist_current_state()
    has_previous = current_index > 0
    if has_previous:
        current_index = current_index - 1
    update_view()


def on_next(_):
    """Click handler: store the current inputs, step forward one item if possible, redraw."""
    global current_index
    persist_current_state()
    last_position = len(df) - 1
    if current_index < last_position:
        current_index = current_index + 1
    update_view()


def prepare_export_dataframe() -> pd.DataFrame:
    """Return a copy of ``df`` whose text columns have missing values replaced by ""."""
    export_df = df.copy()
    # Missing values become empty strings for CSV export; ``df`` itself is not modified.
    for text_column in ("asp_rule_id", "asp_rule_text", "comment"):
        export_df[text_column] = export_df[text_column].fillna("")
    return export_df


def on_save(_):
    """Click handler: store the current inputs and write the review table to ``review_csv_path``."""
    persist_current_state()
    frame_to_write = prepare_export_dataframe()
    frame_to_write.to_csv(review_csv_path, index=False)
    saved_rows = len(frame_to_write)
    # Replace whatever the output area showed before with the confirmation line.
    with output_area:
        output_area.clear_output()
        print(f"Saved {saved_rows} rows to {review_csv_path}")


def on_summary(_):
    """Click handler: show rating counts/percentages and the commented rows in the output area.

    The current inputs are stored first. Only rows that have a rating are
    considered; when there are none, a short notice is printed instead.
    """
    persist_current_state()
    rated_rows = df.dropna(subset=["rating"])
    with output_area:
        output_area.clear_output()
        if rated_rows.empty:
            print("No ratings captured yet.")
            return

        # Count rows per rating, listing every rating of the scale (0 when unused).
        scale_values = sorted(rating_labels.keys())
        counts = rated_rows.groupby("rating").size().reindex(scale_values, fill_value=0)
        grand_total = counts.sum()
        summary_df = pd.DataFrame({
            "rating": counts.index,
            "description": [rating_labels[rating] for rating in counts.index],
            "count": counts.values,
            "percentage": (counts.values / grand_total * 100).round(2),
        })
        display(summary_df)

        def has_comment(val):
            """True when ``val`` is a real comment: not missing, not blank, not the text "nan"."""
            if pd.isna(val):
                return False
            cleaned = str(val).strip()
            return cleaned.lower() not in ("", "nan")

        comment_mask = rated_rows["comment"].apply(has_comment)
        comments_df = rated_rows[comment_mask]
        if not comments_df.empty:
            display(comments_df[["guideline_id", "asp_rule_id", "comment"]])


# Attach each button to its click handler.
for button, handler in (
    (prev_button, on_prev),
    (next_button, on_next),
    (save_button, on_save),
    (summary_button, on_summary),
):
    button.on_click(handler)

# Panel layout, top to bottom: rule view, rating row, comment box, buttons, messages.
rating_row = widgets.HBox([rating_dropdown, progress_label])
button_row = widgets.HBox([prev_button, next_button, save_button, summary_button])
controls = widgets.VBox([
    rule_display,
    rating_row,
    comment_box,
    button_row,
    output_area,
])

display(controls)
# Populate the widgets with the first review item.
update_view()



VBox(children=(HTML(value='', layout=Layout(width='100%')), HBox(children=(Dropdown(description='Rating:', opt…

In [4]:
from src.processing.RuleProcessor import RuleProcessor

# Build the guideline path from PROJECT_ROOT (set in the configuration cell)
# instead of a user-specific absolute path, so this check runs on any checkout.
# NOTE(review): assumes PROJECT_ROOT is the repository root that contains
# src/input_files — confirm.
pancreatic_guideline_path = (
    PROJECT_ROOT / "src/input_files/input_guidelines/pancreatic_cancer_guidelines.txt"
)
rp = RuleProcessor(str(pancreatic_guideline_path))

# Check what got parsed for the selected rules, with a separator line between them.
for position, rule_id in enumerate(("1.1.1", "1.1.4")):
    if position:
        print("\n" + "="*80 + "\n")
    print(f"Rule {rule_id}:")
    print(rp.guideline_text.get(rule_id, 'NOT FOUND'))

Rule 1.1.1:
People with obstructive jaundice

For people with obstructive jaundice and suspected pancreatic cancer, offer a pancreatic protocol CT scan before draining the bile duct


Rule 1.1.4:
People without jaundice who have pancreatic abnormalities on imaging

Offer a pancreatic protocol CT scan to people with pancreatic abnormalities but no jaundice
