In [None]:
from pathlib import Path
from hashlib import sha1

import pandas as pd

# Data directory, resolved relative to the current working directory (../data).
base_path = Path.cwd().parent.joinpath("data")

In [None]:
from medminer.utils.data import Document

# Load the free-text subset; each row becomes one Document built from its
# "PID" and "textvalue" cells (presumably patient id and content, in that order).
df = pd.read_excel(base_path / "eval" / "freetext_vormedikation_subset.xlsx")

docs = [
    Document(pid, text)
    for pid, text in zip(df["PID"], df["textvalue"])
]

In [None]:
from medminer.utils.models import DefaultModel
from medminer.task.medication import MedicationTask

# Run the medication task once per document. The session id is the SHA-1 hex
# digest of the patient id, so documents of the same patient share a session.
results_dir = base_path / "examples" / "results" / "eval"
for doc in docs:
    session = sha1(doc.patient_id.encode()).hexdigest()
    task = MedicationTask(DefaultModel().model, base_dir=results_dir, session_id=session)
    task.run(doc.content)

In [None]:
import dask.dataframe as dd

# Columns to load from every medication.csv; all of them are read as strings.
cols = dict.fromkeys(
    [
        "patient_id",
        "medication_reference",
        "medication_name",
        "medication_translated",
        "active_ingredient",
        "dose",
        "unit",
        "route",
        "frequency",
        "frequency_code",
        "rxcui",
        "atc_id",
        "atc_name",
        "atc_type",
    ],
    str,
)

# Read every "<eval>/*/medication.csv" with dask and materialise one pandas
# frame. The file path of each row is kept in the "hash" column.
pred_df = dd.read_csv(
    base_path / "examples" / "results" / "eval" / "*" / "medication.csv",
    assume_missing=True,
    include_path_column="hash",
    dtype=cols,
    usecols=cols.keys(),
)
pred_df = pred_df.compute().reset_index(drop=True)

# Keep only the second-to-last "/"-separated path component, i.e. the name of
# the directory that contains medication.csv.
pred_df["hash"] = pred_df["hash"].str.split("/").str[-2]
pred_df

In [None]:
# Text clean-up of the predictions; "dose" and "rxcui" are left untouched.
skip_cols = [
    "dose",
    "rxcui",
]
for col in pred_df.columns:
    if col in skip_cols:
        continue

    # Remove the literal "_x000D_" marker first and strip afterwards: stripping
    # first would leave behind whitespace that sat next to the marker
    # (e.g. "Aspirin _x000D_" -> "Aspirin "). regex=False makes the literal
    # match explicit and independent of the pandas version's default.
    pred_df[col] = pred_df[col].str.replace("_x000D_", "", regex=False).str.strip()

pred_df

In [None]:
# Ground truth: keep only patients of the evaluated subset and derive the
# SHA-1 hex digest of the patient id as the join key ("hash").
true_df = pd.read_excel(base_path / "eval" / "medication.xlsx")
# Explicit copy so the column assignments below operate on an independent frame.
true_df = true_df[true_df["patient_id"].isin(df["PID"])].copy()
true_df["hash"] = true_df["patient_id"].apply(lambda x: sha1(x.encode()).hexdigest())

true_df = true_df.drop(columns=["Unnamed: 15", "Unnamed: 16"])
# NOTE(review): astype(str) turns missing values into the literal string "nan".
# For atc_id this is reverted below; for medication_name/medication_reference
# the "nan" strings stay in the frame - confirm this is intended.
true_df["atc_id"] = true_df["atc_id"].astype(str)
true_df["medication_name"] = true_df["medication_name"].astype(str)
true_df["medication_reference"] = true_df["medication_reference"].astype(str)
true_df.loc[true_df["atc_id"].isin(["nan", "0"]), "atc_id"] = None
true_df = true_df[true_df["comment"] != "kein Medikament"]

# Drop rows whose (stripped) reference consists only of these placeholders.
exact = [
    "_x000D_",
    "b.B.:_x000D_",
]
true_df = true_df[~true_df["medication_reference"].str.strip().isin(exact)]

# Drop rows whose reference or name contains one of these words
# (case-insensitive, plain substring match).
words = [
    "keine",
    "unbekannt",
    "pflegeüberleitungsbogen",
    "unklar",
    "pausiert",
    "medikation",
    "protokoll",
    "brief",
    "erhalten",
    "fremdanamnestisch",
    "perfusoren",
    "verlegung",
]
# Lower-case both columns once and accumulate a single exclusion mask instead
# of re-filtering the frame twice per word; the resulting rows are the same.
reference_lower = true_df["medication_reference"].str.lower()
name_lower = true_df["medication_name"].str.lower()
excluded = pd.Series(False, index=true_df.index)
for word in words:
    excluded |= (
        reference_lower.str.contains(word, regex=False)
        | name_lower.str.contains(word, regex=False)
    )
true_df = true_df[~excluded].copy()

# Text clean-up of all remaining columns except the dose/dosage columns.
skip_cols = [
    "dose",
    "dosage_morning",
    "dosage_noon",
    "dosage_evening",
    "dosage_night",
]
for col in true_df.columns:
    if col in skip_cols:
        continue
    # Remove the "_x000D_" marker first and strip afterwards, so whitespace
    # adjacent to the marker does not survive the clean-up.
    true_df[col] = true_df[col].str.replace("_x000D_", "", regex=False).str.strip()

In [None]:
from difflib import SequenceMatcher

def match_medications_for_patient(hash, true_df, pred_df):
    """Match predicted medication rows to ground-truth rows for a single patient.

    The rows of both frames whose "hash" column equals ``hash`` are selected and
    passed through a cascade of matching passes. Each pass only sees the rows
    that are still unmatched when it starts.

    Args:
        hash: Value of the "hash" column that identifies the patient. (The name
            shadows the builtin; it is kept so existing callers keep working.)
        true_df: Ground-truth frame with a "hash" column.
        pred_df: Prediction frame with a "hash" column.

    Returns:
        dict: One entry per index label of the patient's prediction rows. The
        value is the index label of the matched ground-truth row, or ``None``
        when the prediction stayed unmatched.
    """
    _true_df = true_df[true_df["hash"] == hash]
    _pred_df = pred_df[pred_df["hash"] == hash]

    # Every prediction starts out unmatched.
    _mapping = {idx: None for idx in _pred_df.index}

    # Nothing can be matched when one side has no rows for this patient.
    if _true_df.empty or _pred_df.empty:
        return _mapping

    def _get_unmapped(mapping):
        """Return (unmatched true labels, unmatched pred labels) in row order."""
        mapped_true = set(mapping.values())
        mapped_pred = {k for k, v in mapping.items() if v is not None}

        # Walk the frames' own index instead of a set difference so the
        # candidate order follows the row order rather than set iteration order.
        unmapped_true = [idx for idx in _true_df.index.unique() if idx not in mapped_true]
        unmapped_pred = [idx for idx in _pred_df.index.unique() if idx not in mapped_pred]

        return unmapped_true, unmapped_pred

    # Pass 1: ATC code (first 5 characters)
    _match_by_atc(_true_df, _pred_df, _mapping)

    # Pass 2: exact medication name
    _match_by_exact_name(_true_df, _pred_df, _mapping, _get_unmapped)

    # Pass 3: fuzzy medication name
    _match_by_fuzzy_name(_true_df, _pred_df, _mapping, _get_unmapped)

    # Pass 4: substring containment between names/references
    _match_by_substring(_true_df, _pred_df, _mapping, _get_unmapped)

    # Pass 5: fuzzy medication reference
    _match_by_fuzzy_reference(_true_df, _pred_df, _mapping, _get_unmapped)

    return _mapping

def _match_by_atc(true_df, pred_df, mapping):
    """Match medications by ATC code (first 5 characters)."""
    pred_atc = pred_df["atc_id"].str[:5].dropna().drop_duplicates(keep=False)
    
    for true_idx, atc_id in true_df["atc_id"].str[:5].dropna().drop_duplicates(keep=False).items():
        pred_idx = pred_atc[pred_atc.str.startswith(atc_id)].index.values
        if len(pred_idx) == 1:
            mapping[pred_idx[0]] = true_idx

def _match_by_exact_name(true_df, pred_df, mapping, get_unmapped_func):
    """Match medications by exact medication name."""
    not_mapped_true, not_mapped_pred = get_unmapped_func(mapping)
    
    for true_idx, true_name in true_df.loc[not_mapped_true, "medication_name"].str.strip().str.lower().items():
        pred_matches = pred_df.loc[not_mapped_pred]
        pred_idx = pred_matches[
            pred_matches["medication_name"].str.strip().str.lower() == true_name
        ].index.values
        
        if len(pred_idx) == 1:
            mapping[pred_idx[0]] = true_idx

def _match_by_fuzzy_column(true_df, pred_df, mapping, get_unmapped_func, column_name, threshold):
    """Fuzzy-match still-unmatched rows on one text column (shared helper).

    For every unmatched ground-truth row, the best-scoring unmatched prediction
    is looked up with ``_find_best_fuzzy_match``. A prediction that has been
    claimed is removed from the candidates right away, so a later ground-truth
    row cannot silently overwrite an earlier match.
    """
    not_mapped_true, not_mapped_pred = get_unmapped_func(mapping)
    # Work on a private copy; the list returned by the callback is not mutated.
    candidates = list(not_mapped_pred)

    targets = true_df.loc[not_mapped_true, column_name].str.strip().str.lower()
    for true_idx, target_text in targets.items():
        if not candidates:
            break

        best_match_idx, _ = _find_best_fuzzy_match(
            target_text, pred_df, candidates, column_name, threshold
        )

        if best_match_idx is not None:
            mapping[best_match_idx] = true_idx
            candidates.remove(best_match_idx)


def _match_by_fuzzy_name(true_df, pred_df, mapping, get_unmapped_func, threshold=0.8):
    """Match medications by fuzzy comparison of the "medication_name" column.

    Args:
        true_df: Ground-truth rows of one patient.
        pred_df: Prediction rows of one patient.
        mapping: dict of pred index label -> true index label; updated in place.
        get_unmapped_func: Callable taking ``mapping`` and returning
            ``(unmatched true labels, unmatched pred labels)``.
        threshold: Similarity ratio a candidate has to exceed.
    """
    _match_by_fuzzy_column(
        true_df, pred_df, mapping, get_unmapped_func, "medication_name", threshold
    )


def _match_by_fuzzy_reference(true_df, pred_df, mapping, get_unmapped_func, threshold=0.8):
    """Match medications by fuzzy comparison of the "medication_reference" column.

    Args:
        true_df: Ground-truth rows of one patient.
        pred_df: Prediction rows of one patient.
        mapping: dict of pred index label -> true index label; updated in place.
        get_unmapped_func: Callable taking ``mapping`` and returning
            ``(unmatched true labels, unmatched pred labels)``.
        threshold: Similarity ratio a candidate has to exceed.
    """
    _match_by_fuzzy_column(
        true_df, pred_df, mapping, get_unmapped_func, "medication_reference", threshold
    )

def _find_best_fuzzy_match(target_text, pred_df, not_mapped_pred, column_name, threshold):
    """Find the best fuzzy match for a given text in the specified column."""
    best_match_idx = None
    best_score = 0
    
    for pred_idx in not_mapped_pred:
        pred_text = pred_df.loc[pred_idx, column_name]
        if pd.notna(pred_text):
            pred_text = pred_text.strip().lower()
            score = SequenceMatcher(None, target_text, pred_text).ratio()
            
            if score > threshold and score > best_score:
                best_score = score
                best_match_idx = pred_idx
    
    return best_match_idx, best_score

def _match_by_substring(true_df, pred_df, mapping, get_unmapped_func):
    """Match still-unmatched rows when a name/reference contains the other side's.

    For every unmatched ground-truth row, the unmatched predictions are scanned
    and the first one accepted by ``_check_substring_match`` is taken. The scan
    stops at that point, so one ground-truth row is assigned to at most one
    prediction, and a claimed prediction is not offered to later rows.

    Args:
        true_df: Ground-truth rows of one patient.
        pred_df: Prediction rows of one patient.
        mapping: dict of pred index label -> true index label; updated in place.
        get_unmapped_func: Callable taking ``mapping`` and returning
            ``(unmatched true labels, unmatched pred labels)``.
    """
    not_mapped_true, not_mapped_pred = get_unmapped_func(mapping)
    # Private copy that shrinks as predictions get claimed.
    candidates = list(not_mapped_pred)

    for true_idx in not_mapped_true:
        true_name = _safe_lower_strip(true_df.loc[true_idx, "medication_name"])
        true_ref = _safe_lower_strip(true_df.loc[true_idx, "medication_reference"])

        for pred_idx in candidates:
            pred_name = _safe_lower_strip(pred_df.loc[pred_idx, "medication_name"])
            pred_ref = _safe_lower_strip(pred_df.loc[pred_idx, "medication_reference"])

            if _check_substring_match(true_name, true_ref, pred_name, pred_ref):
                mapping[pred_idx] = true_idx
                candidates.remove(pred_idx)
                # Without this break the scan would go on and assign further
                # predictions to the same ground-truth row.
                break

def _safe_lower_strip(text):
    """Safely convert text to lowercase and strip whitespace."""
    return text.strip().lower() if pd.notna(text) else None

def _check_substring_match(true_name, true_ref, pred_name, pred_ref):
    """Check if any combination of true/pred names/references contain each other."""
    pairs = [
        (true_name, pred_name),
        (true_name, pred_ref),
        (true_ref, pred_name),
        (true_ref, pred_ref)
    ]
    
    for text1, text2 in pairs:
        if text1 and text2:
            if text1 in text2 or text2 in text1:
                return True
    
    return False

# Build the overall pred-index -> true-index mapping, one patient hash at a
# time. The full frames are handed over; the matcher selects the patient's rows.
mapping = {}
for patient_id, _patient_rows in true_df.groupby("hash"):
    mapping.update(match_medications_for_patient(patient_id, true_df, pred_df))

In [None]:
# Re-label every prediction with the index of the ground-truth row it was
# matched to, so that an index join lines matched rows up side by side.
pred_df_copy = pred_df.copy()
pred_df_copy.index = pred_df.index.map(mapping)

# Outer join keeps rows that exist on only one side.
res_df = true_df.join(pred_df_copy, lsuffix="_true", rsuffix="_pred", how="outer")
# Coalesce the key columns: prefer the ground-truth value, fall back to the prediction.
res_df["patient_id"] = res_df["patient_id_true"].fillna(res_df["patient_id_pred"])
res_df["hash"] = res_df["hash_true"].fillna(res_df["hash_pred"])
res_df = res_df.sort_values(["patient_id", "atc_id_true", "medication_name_true"]).reset_index(drop=True)
res_df.to_csv(base_path / "examples" / "results" / "medication_comparison.csv", index=False)

# (rows in the comparison, predictions seen by the matcher, predictions matched).
# Matches are counted with an explicit None test: filter(None, ...) would also
# discard a legitimate match to the ground-truth row with index label 0.
len(res_df), len(mapping), sum(v is not None for v in mapping.values())

In [None]:
# Review view: rows where the reference is missing on the true or the
# predicted side (presumably the rows that found no counterpart).
res2 = res_df.copy()

review_cols = [
    "hash",
    "patient_id",
    'medication_reference_true',
    'medication_reference_pred',
    'medication_name_true',
    'medication_name_pred',
    'medication_translated_true',
    'medication_translated_pred',
    'active_ingredient_true',
    'active_ingredient_pred',
    "atc_id_true",
    "atc_id_pred",
]
one_sided = res2["medication_reference_true"].isna() | res2["medication_reference_pred"].isna()

res2.loc[one_sided, review_cols].sort_values(["hash", "atc_id_true", "medication_name_true"])