In [18]:
%%writefile rule_engine.py
from dataclasses import dataclass
from datetime import date
from typing import List, Optional, Dict

# ----------------- Data models -----------------

@dataclass
class RuleRow:
    """One row of the diagnosis list: the eligibility rule for a single ICD code.

    Instances are consumed by ``check_rule``, which reads the fields as
    described in the per-field comments below.
    """

    # ICD code this rule applies to; also excluded when looking for a "second" ICD.
    icd: str
    # Human-readable diagnosis title, used in the explanation text.
    title: str
    # Diagnosis group label, appended to the explanation when non-empty.
    group: str
    eligibility: str                 # "BVB" | "LHB" | "NONE"
    # When true, the patient must carry at least one ICD other than `icd`.
    requires_second_icd: bool
    # Text shown in the "missing" message when the second ICD is absent.
    second_icd_hint: str
    # Maximum whole months since the acute event; None disables the window check.
    acute_window_months: Optional[int]
    # Free-text remark, appended to the explanation when non-empty.
    notes: str
    # Not read by any code visible in this file — presumably provenance; confirm.
    source_url: str
    # Copied verbatim into EligibilityResult.source_version.
    source_version: str

@dataclass
class PatientContext:
    """Patient-side input for rule evaluation."""

    # All ICD codes recorded for the patient; each one is evaluated in turn.
    icds: List[str]
    # Date of the acute event, if any; required by rules that define a window.
    acute_event_date: Optional[date] = None

@dataclass
class EligibilityResult:
    """Outcome of evaluating one ICD code for one patient."""

    # ICD code that was evaluated.
    icd: str
    # True only when every recorded condition holds and the rule is BVB/LHB.
    eligible: bool
    kind: Optional[str]              # "BVB" | "LHB" | None
    # Condition name -> outcome; check_rule uses the keys "is_listed",
    # "second_icd_present" and "acute_window_ok".
    conditions_met: Dict[str, bool]
    # User-facing messages describing each unmet requirement.
    missing: List[str]
    # One-line human-readable summary of the decision.
    explain: str
    # Version tag of the rule source ("unknown" when the ICD had no rule).
    source_version: str

# ----------------- Helpers -----------------

def months_between(d1: date, d2: date) -> int:
    """Count the completed calendar months from d2 (earlier) up to d1 (later).

    A month only counts as completed once d1's day-of-month has reached
    d2's day-of-month. The result is negative when d1 lies before d2.
    """
    elapsed = 12 * (d1.year - d2.year) + (d1.month - d2.month)
    if d1.day < d2.day:
        # The current month has not been completed yet.
        elapsed -= 1
    return elapsed

# ----------------- Core rule evaluation -----------------

def check_rule(rule: RuleRow, ctx: PatientContext, today: date) -> EligibilityResult:
    """Evaluate a single rule against a patient and explain the outcome.

    Args:
        rule: Rule for the ICD being evaluated.
        ctx: Patient ICD list and optional acute-event date.
        today: Reference date for the acute-event window.

    Returns:
        An EligibilityResult whose ``conditions_met`` records every check that
        was performed and whose ``missing`` lists one message per failed check.
    """
    listed = rule.eligibility in {"BVB", "LHB"}
    conds: Dict[str, bool] = {"is_listed": listed}
    missing: List[str] = []

    # Second ICD: any code on the patient other than the rule's own counts.
    if rule.requires_second_icd:
        has_other = any(code != rule.icd for code in ctx.icds)
        conds["second_icd_present"] = has_other
        if not has_other:
            missing.append(f"Zweiter ICD erforderlich ({rule.second_icd_hint or 'siehe Liste'})")

    # Acute-event window: only checked when the rule defines one.
    window = rule.acute_window_months
    if window is not None:
        if not ctx.acute_event_date:
            conds["acute_window_ok"] = False
            missing.append("Datum des Akutereignisses erforderlich")
        else:
            within = months_between(today, ctx.acute_event_date) <= window
            conds["acute_window_ok"] = within
            if not within:
                missing.append(f"Frist nach Akutereignis ≤ {window} Monate")

    eligible = all(conds.values()) and listed

    # Explanation text; empty/None fields are normalised to "" first.
    title = (rule.title or "").strip()
    group = (rule.group or "").strip()
    notes = (rule.notes or "").strip()

    pieces = [
        f"{rule.icd} – {title or 'Diagnose'}: ",
        "qualifiziert" if eligible else "qualifiziert nicht",
    ]
    if listed:
        pieces.append(f" für {rule.eligibility}")
    if group:
        pieces.append(f" (Diagnosegruppe {group})")
    if notes:
        pieces.append(f". {notes}")

    return EligibilityResult(
        icd=rule.icd,
        eligible=eligible,
        kind=rule.eligibility if eligible else None,
        conditions_met=conds,
        missing=missing,
        explain="".join(pieces).strip(),
        source_version=rule.source_version,
    )

def evaluate_patient(ctx: PatientContext, rules_by_icd: Dict[str, RuleRow], today: date) -> List[EligibilityResult]:
    """Evaluate every ICD of the patient, in the order given in ``ctx.icds``.

    ICDs without an entry in ``rules_by_icd`` produce a non-eligible
    placeholder result instead of being skipped.
    """
    outcomes: List[EligibilityResult] = []
    for code in ctx.icds:
        matched = rules_by_icd.get(code)
        if not matched:
            # ICD not found in rules - carried over from version 1
            outcomes.append(
                EligibilityResult(
                    icd=code,
                    eligible=False,
                    kind=None,
                    conditions_met={"is_listed": False},
                    missing=["ICD nicht in Diagnoseliste gefunden"],
                    explain=f"{code} - ICD nicht in der Heilmittel-Diagnoseliste",
                    source_version="unknown",
                )
            )
            continue
        outcomes.append(check_rule(matched, ctx, today))
    return outcomes

# ----------------- Convenience utilities (UI/notebook) -----------------

def normalize_icds(s: str) -> List[str]:
    """Turn free text into a list of upper-cased ICD codes.

    Codes may be separated by any mix of commas, semicolons and whitespace;
    empty fragments are dropped. ``None`` or an empty string yields ``[]``.
    """
    import re

    cleaned = (s or "").strip()
    fragments = re.split(r"[,\s;]+", cleaned)
    # filter(None, ...) discards the empty strings left by leading separators.
    return [fragment.upper() for fragment in filter(None, fragments)]

def icd_neighbors(icd: str, all_icds: List[str], k: int = 20) -> List[str]:
    """List up to k codes that share the three-character family of ``icd``.

    The family prefix is the first three characters followed by a dot
    (e.g. ``R26.``), so the bare three-character code itself never matches.
    Results are returned in sorted order.
    """
    # Whether or not icd already has a dot in position 4, the stem is the same.
    prefix = icd[:3] + "."
    ordered = sorted(all_icds)
    family = [code for code in ordered if code.startswith(prefix)]
    return family[:k]


Overwriting rule_engine.py


In [19]:
# === ZERO-INSTALL, SINGLE-CELL RUN ===
# Purpose of this cell: make the rule engine importable, normalise the
# extracted diagnosis list, apply hand-curated overrides and write the
# curated CSV that the rest of the notebook loads.
# Paths
CSV_PATH = "/kaggle/input/heilmittel-bvb/diagnoseliste_extracted.csv"

# Source (read-only dataset) and destination (working dir) of the engine module.
SRC_ENGINE = "/kaggle/input/heilmittel-bvb/rule_engine.py"
DST_ENGINE = "/kaggle/working/rule_engine.py"

# 1) Ensure engine is importable
import shutil, os, pandas as pd
from datetime import date
# NOTE(review): this copy replaces any rule_engine.py already present at
# DST_ENGINE — presumably including one written by an earlier %%writefile
# cell if the working directory is /kaggle/working. Confirm which engine
# version is meant to be imported below.
shutil.copy(SRC_ENGINE, DST_ENGINE)

from rule_engine import RuleRow, PatientContext, evaluate_patient  # now import works

# Same input file as CSV_PATH above; CSV_OUT is the curated copy written below.
CSV_IN  = "/kaggle/input/heilmittel-bvb/diagnoseliste_extracted.csv"
CSV_OUT = "/kaggle/working/diagnoseliste_curated.csv"
# Fallback version tag for rows whose source_version is blank.
VERSION = "2025-01-01"

df = pd.read_csv(CSV_IN)

# Columns the engine's RuleRow expects; add any that the extraction lacks.
need = ["icd","title","group","eligibility","requires_second_icd","second_icd_hint",
        "acute_window_months","notes","source_url","source_version"]
for c in need:
    if c not in df.columns:
        if c == "requires_second_icd": df[c] = False
        elif c == "acute_window_months": df[c] = pd.NA
        else: df[c] = ""

df["icd"] = df["icd"].astype(str).str.upper().str.strip()
for c in ["title","group","eligibility","second_icd_hint","notes","source_url","source_version"]:
    # astype(str) renders missing cells as the text "nan"; map that back to "".
    df[c] = df[c].astype(str).replace({"nan":""}).fillna("")
df["eligibility"] = df["eligibility"].str.upper().str.strip()
# Accept the usual truthy spellings; anything else (including blanks) is False.
df["requires_second_icd"] = df["requires_second_icd"].map(lambda v: str(v).strip().lower() in {"1","true","t","yes","y"})
# Non-numeric entries become <NA>; Int64 keeps the column integer-typed with gaps.
df["acute_window_months"] = pd.to_numeric(df["acute_window_months"], errors="coerce").astype("Int64")
df.loc[df["source_version"].eq(""), "source_version"] = VERSION

# -------- EXPLICIT OVERRIDES (edit to your truth) --------
# Maps ICD -> column values that replace whatever the extraction produced.
overrides = {
    "R26.2": {"eligibility":"BVB", "title":"Gehbeschwerden", "group":"PN"},
    "R26.0": {"eligibility":"BVB", "title":"Ataktischer Gang", "group":"PN",
              "requires_second_icd": True, "second_icd_hint":"Neurologische Grunderkrankung"},
    "R26.1": {"eligibility":"BVB", "title":"Paretischer Gang", "group":"PN",
              "requires_second_icd": True, "second_icd_hint":"Neurologische Grunderkrankung"},
    # add more here as you curate…
}

for icd, vals in overrides.items():
    mask = df["icd"].eq(icd)
    if not mask.any():
        # if the ICD wasn't extracted, create a new row (optional)
        # NOTE(review): the new row starts with "" in every column, including
        # requires_second_icd and acute_window_months, and is added after the
        # source_version fallback above has already run.
        new = {k:"" for k in need}
        new.update({"icd": icd})
        df = pd.concat([df, pd.DataFrame([new])], ignore_index=True)
        mask = df["icd"].eq(icd)
    for k, v in vals.items():
        df.loc[mask, k] = v

# default any unknown eligibility to NONE (engine expects one of these)
df.loc[~df["eligibility"].isin({"BVB","LHB"}), "eligibility"] = "NONE"

# save curated copy
df.to_csv(CSV_OUT, index=False)
print("Wrote:", CSV_OUT)
# Show the overridden rows as a quick visual check.
print(df[df["icd"].isin(list(overrides.keys()))][["icd","title","eligibility","requires_second_icd","second_icd_hint","group","acute_window_months"]])

# From here on the notebook reads the curated file instead of the raw extraction.
CSV_PATH = "/kaggle/working/diagnoseliste_curated.csv"

def load_rules(csv_path: str, version_hint: str = "2025-01-01"):
    """Load the diagnosis list and build one RuleRow per ICD code.

    Args:
        csv_path: Path of the CSV file to read.
        version_hint: Value used for rows whose ``source_version`` is blank.

    Returns:
        A ``(df, rules)`` tuple: the normalised DataFrame and a dict mapping
        each upper-cased ICD code to its RuleRow. When an ICD occurs more than
        once, the last row wins.
    """
    df = pd.read_csv(csv_path)
    need = ["icd","title","group","eligibility","requires_second_icd",
            "second_icd_hint","acute_window_months","notes","source_url","source_version"]

    # Add any column the file lacks, with a type-appropriate empty value.
    for c in need:
        if c not in df.columns:
            if c == "requires_second_icd":
                df[c] = False
            elif c == "acute_window_months":
                df[c] = pd.NA
            else:
                df[c] = ""

    # Convert all string fields to proper strings, replacing NaN with "".
    string_fields = ["icd", "title", "group", "eligibility", "second_icd_hint", "notes", "source_url", "source_version"]
    for col in string_fields:
        df[col] = df[col].fillna("").astype(str).str.strip()

    df["icd"] = df["icd"].str.upper()
    df["eligibility"] = df["eligibility"].str.upper()
    df["requires_second_icd"] = df["requires_second_icd"].map(lambda v: str(v).strip().lower() in {"1","true","t","yes","y"})
    df["acute_window_months"] = pd.to_numeric(df["acute_window_months"], errors="coerce").astype("Int64")

    # Rows without a version inherit the caller-supplied hint.
    df.loc[df["source_version"] == "", "source_version"] = version_hint

    rules = {}
    # Only the RuleRow columns are passed on, so extra CSV columns cannot
    # break the constructor call.
    for record in df[need].to_dict(orient="records"):
        # A missing window comes out of the Int64 column as pd.NA, which is
        # "not None"; the engine would then treat the rule as having a window.
        window = record["acute_window_months"]
        record["acute_window_months"] = None if pd.isna(window) else int(window)
        record["requires_second_icd"] = bool(record["requires_second_icd"])
        rules[record["icd"]] = RuleRow(**record)
    return df, rules

# Demo: evaluate a sample patient against the curated rules.
df, rules = load_rules(CSV_PATH)
icd_input = "R26.2 G35 I63.9"
patient_icds = [token.strip().upper() for token in icd_input.split()]
ctx = PatientContext(icds=patient_icds, acute_event_date=None)
results = evaluate_patient(ctx, rules, today=date.today())

# Bucket the eligible ICDs by benefit kind, keeping evaluation order.
eligible_by_kind = {"BVB": [], "LHB": []}
for res in results:
    if res.eligible and res.kind in eligible_by_kind:
        eligible_by_kind[res.kind].append(res.icd)
bvb = eligible_by_kind["BVB"]
lhb = eligible_by_kind["LHB"]

print("BVB:", ", ".join(bvb) or "—")
print("LHB:", ", ".join(lhb) or "—")


import re
def clean_explain(txt: str) -> str:
    """Remove "nan" artefacts from an explanation string.

    Strips a "(Diagnosegruppe nan)" fragment and a stand-alone trailing "nan",
    then collapses runs of whitespace. Falsy input yields "".
    """
    if not txt:
        return ""
    # strip "(Diagnosegruppe nan)" and stray "nan"
    txt = re.sub(r"\s*\(Diagnosegruppe\s+nan\)", "", txt)
    # \b restricts the match to a stand-alone token, so a word that merely
    # ends in "nan" is left intact.
    txt = re.sub(r"\s*\bnan\s*$", "", txt)
    return re.sub(r"\s{2,}", " ", txt).strip()

# Print one four-line report per evaluated ICD.
for r in results:
    msg = clean_explain(r.explain)
    marker = "🟢" if r.eligible else "⚪️"
    missing_text = ", ".join(r.missing) if r.missing else "—"
    print(marker, r.icd, "—", msg)
    print("  Bedingungen:", r.conditions_met)
    print("  Fehlend   :", missing_text)
    print("  Version   :", r.source_version)


Wrote: /kaggle/working/diagnoseliste_curated.csv
       icd             title eligibility  requires_second_icd  \
127  R26.0  Ataktischer Gang         BVB                 True   
128  R26.1  Paretischer Gang         BVB                 True   
129  R26.2    Gehbeschwerden         BVB                False   

                   second_icd_hint group  acute_window_months  
127  Neurologische Grunderkrankung    PN                 <NA>  
128  Neurologische Grunderkrankung    PN                 <NA>  
129                                   PN                 <NA>  
BVB: R26.2, I63.9
LHB: —
🟢 R26.2 — R26.2 – Gehbeschwerden: qualifiziert für BVB (Diagnosegruppe PN)
  Bedingungen: {'is_listed': True}
  Fehlend   : —
  Version   : 2025-07-01
⚪️ G35 — G35 - ICD nicht in der Heilmittel-Diagnoseliste
  Bedingungen: {'is_listed': False}
  Fehlend   : ICD nicht in Diagnoseliste gefunden
  Version   : unknown
🟢 I63.9 — I63.9 – Hirninfarkt nicht näher bezeichnet: qualifiziert für BVB. Längstens 1 Jahr 

In [20]:
# === KAGGLE DATEN EXPORT & VALIDIERUNG ===

import pandas as pd
import os

# 1) DATEN EXPORTIEREN (für lokale Nutzung/Docker)
def _json_safe(value, default):
    """Return *value* as a plain Python scalar, or *default* when it is missing."""
    if pd.isna(value):
        return default
    if hasattr(value, "item"):
        # NumPy scalars are not JSON-serialisable; unwrap to the native type.
        value = value.item()
    return value


def export_for_production(
    csv_in="/kaggle/input/heilmittel-bvb/diagnoseliste_extracted.csv",
    out_dir="/kaggle/working",
):
    """Export the diagnosis list as CSV and JSON for use outside the notebook.

    Prints a short dataset summary, writes ``diagnoseliste_for_docker.csv``
    and ``rules.json`` into ``out_dir`` and returns the loaded DataFrame.

    Missing cells are written to the JSON file as the field's default
    ("" for text, ``null`` for the acute window) rather than as ``NaN``,
    which is not valid JSON for strict parsers.

    Args:
        csv_in: Path of the CSV file to load.
        out_dir: Existing directory that receives both export files.

    Returns:
        The DataFrame exactly as read from ``csv_in``.
    """
    import json

    # Load the current data.
    df = pd.read_csv(csv_in)

    print("=== DATASET ANALYSE ===")
    print(f"Zeilen: {len(df)}")
    print(f"Spalten: {list(df.columns)}")
    print(f"\nEligibility Verteilung:")
    print(df['eligibility'].value_counts() if 'eligibility' in df.columns else "Keine eligibility Spalte!")

    # Show a sample.
    print(f"\n=== SAMPLE DATEN ===")
    print(df.head(10))

    # CSV copy (for download).
    csv_out = os.path.join(out_dir, "diagnoseliste_for_docker.csv")
    df.to_csv(csv_out, index=False)
    print(f"\n✅ Exportiert nach: {csv_out}")

    # Field -> value used when the column is absent or the cell is empty.
    defaults = {
        "icd": "",
        "title": "",
        "group": "",
        "eligibility": "NONE",
        "requires_second_icd": False,
        "second_icd_hint": "",
        "acute_window_months": None,
        "notes": "",
        "source_url": "",
        "source_version": "2025-01-01",
    }

    # Additionally: JSON format for the rule engine.
    rules_data = []
    for _, row in df.iterrows():
        rule_dict = {
            key: _json_safe(row.get(key, default), default)
            for key, default in defaults.items()
        }
        window = rule_dict["acute_window_months"]
        if isinstance(window, float) and window.is_integer():
            # A column with gaps is read as float; keep whole months as ints.
            rule_dict["acute_window_months"] = int(window)
        rules_data.append(rule_dict)

    json_out = os.path.join(out_dir, "rules.json")
    with open(json_out, "w", encoding="utf-8") as f:
        json.dump(rules_data, f, ensure_ascii=False, indent=2)
    print(f"✅ JSON exportiert nach: {json_out}")

    return df

# 2) DATENQUALITÄT PRÜFEN
def validate_bvb_data(df):
    """Print a plausibility report for the eligibility data in ``df``.

    First looks up a handful of sample ICD codes and reports their
    eligibility, then prints the overall NONE/BVB/LHB distribution. A missing
    ``eligibility`` column is reported instead of raising, and an empty frame
    is reported with 0.0 % shares.

    Args:
        df: DataFrame with an ``icd`` column and, optionally, ``eligibility``.
    """
    print("\n=== BVB/LHB VALIDIERUNG ===")

    # Known BVB-relevant ICDs (examples)
    known_bvb_icds = [
        'I63.9',  # cerebral infarction
        'G35',    # multiple sclerosis
        'M79.3',  # panniculitis
        'F32.9',  # depression
        'M25.9'   # joint disorder
    ]

    has_eligibility = 'eligibility' in df.columns

    print("Prüfung bekannter BVB-relevanter ICDs:")
    for icd in known_bvb_icds:
        matches = df[df['icd'] == icd]
        if matches.empty:
            print(f"  {icd}: NICHT GEFUNDEN")
        elif not has_eligibility:
            # Same guard as the statistics section below.
            print(f"  {icd}: GEFUNDEN (keine eligibility Spalte)")
        else:
            # With duplicate ICDs the first row is reported.
            eligibility = matches['eligibility'].iloc[0]
            print(f"  {icd}: {eligibility} ({'✅' if eligibility in ['BVB', 'LHB'] else '❌'})")

    # Statistics
    if has_eligibility:
        total = len(df)
        none_count = (df['eligibility'] == 'NONE').sum()
        bvb_count = (df['eligibility'] == 'BVB').sum()
        lhb_count = (df['eligibility'] == 'LHB').sum()

        def _share(count):
            # Avoid 0/0 on an empty frame.
            return count / total * 100 if total else 0.0

        print(f"\n📊 VERTEILUNG:")
        print(f"  Gesamt: {total}")
        print(f"  NONE:   {none_count} ({_share(none_count):.1f}%)")
        print(f"  BVB:    {bvb_count} ({_share(bvb_count):.1f}%)")
        print(f"  LHB:    {lhb_count} ({_share(lhb_count):.1f}%)")

        if bvb_count + lhb_count == 0:
            print("\n🚨 KRITISCH: Keine BVB/LHB Qualifikationen gefunden!")
            print("   -> Prüfe die ursprüngliche Datenquelle")
            print("   -> Eventuell müssen Klassifikationen manuell hinzugefügt werden")

# 3) AUSFÜHREN
# Run the export, then validate the exported frame.
df = export_for_production()
validate_bvb_data(df)

# Tell the user what to do with the exported files.
print("\n=== NÄCHSTE SCHRITTE ===")
for step in (
    "1. Download die exportierten Dateien aus /kaggle/working/",
    "2. Falls alle 'NONE': Originaldaten prüfen oder manuell BVB/LHB zuweisen",
    "3. Für Docker: diagnoseliste_for_docker.csv + rules.json verwenden",
):
    print(step)

=== DATASET ANALYSE ===
Zeilen: 130
Spalten: ['icd', 'title', 'group', 'eligibility', 'requires_second_icd', 'second_icd_hint', 'acute_window_months', 'notes', 'source_url', 'source_version']

Eligibility Verteilung:
eligibility
LHB     64
BVB     62
NONE     4
Name: count, dtype: int64

=== SAMPLE DATEN ===
     icd                                          title  group eligibility  \
0  B94.1            Folgezustände der Virusenzephalitis    NaN         BVB   
1    C00                                              -    NaN        NONE   
2  C70.0                                      Hirnhäute    NaN         BVB   
3  C70.1                                Rückenmarkhäute    NaN         BVB   
4  C70.9                Meningen nicht näher bezeichnet    NaN         BVB   
5  C71.0  Zerebrum ausgenommen Hirnlappen und Ventrikel    NaN         BVB   
6  C71.1                                  Frontallappen    NaN         BVB   
7  C71.2                                 Temporallappen    NaN    

  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
