# How to run the preprocessing pipeline (https://github.com/Ang-Li-Lab/NLPMed-Engine)

**Author:** Omid Jafari

**Created On:** 2025-10-08

**License:** AGPLv3

--- 

## Imports and settings:

In [None]:
# import os

# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
import json
from pathlib import Path
from typing import Literal

import pandas as pd
from termcolor import colored

from nlpmed_engine.data_structures.patient import Patient
from nlpmed_engine.pipelines.batch_pipeline import SinglePipeline

## Helper functions:

In [None]:
def log(text: str, msg_type: Literal["info", "error"] = "info"):
    """Print ``text`` to stdout behind a coloured severity tag.

    Args:
        text: Message to print after the tag.
        msg_type: ``"error"`` selects the white-on-red ``ERROR:`` tag; any
            other value selects the white-on-blue ``INFO:`` tag.
    """
    if msg_type == "error":
        tag = colored("ERROR:", "white", "on_red")
    else:
        tag = colored("INFO:", "white", "on_blue")

    # Flush on every call so messages show up immediately in notebook output.
    print(f"{tag}\t{text}", flush=True)

In [None]:
def read_df(
    input_path: Path,
    input_format: Literal["pickle", "parquet", "feather", "stata", "csv"],
):
    """Load a dataframe from ``input_path`` with the reader for ``input_format``.

    Args:
        input_path: Location of the file to read.
        input_format: Name of the on-disk format; one of ``"pickle"``,
            ``"parquet"``, ``"feather"``, ``"stata"`` or ``"csv"``.

    Returns:
        The dataframe produced by the matching ``pandas.read_*`` function, or
        ``None`` (after logging an error) when ``input_format`` is not one of
        the supported names.
    """
    # Dispatch on the ``input_format`` argument. Comparing against the builtin
    # ``format`` function can never match a string, which would make every
    # call fall through and return ``None``.
    readers = {
        # NOTE: unpickling can execute arbitrary code; only read pickle files
        # from a trusted source.
        "pickle": pd.read_pickle,
        "parquet": pd.read_parquet,
        "feather": pd.read_feather,
        "stata": pd.read_stata,
        "csv": pd.read_csv,
    }

    reader = readers.get(input_format)
    if reader is None:
        log(f"Unsupported input format: {input_format!r}", msg_type="error")
        return None

    log(f"Reading {input_path}")
    return reader(input_path)

In [None]:
def write_df(
    df: pd.DataFrame,
    output_dir: Path,
    output_name: str,
    formats: list | None = None,
):
    if formats is None:
        formats = ["pickle", "parquet", "feather", "stata", "csv", "excel"]

    if "pickle" in formats:
        output_path = Path(output_dir, f"{output_name}.pkl.gz")
        log(f"Saving {output_path}")
        df.to_pickle(output_path, compression="gzip")

    if "parquet" in formats:
        output_path = Path(output_dir, f"{output_name}.parquet.gz")
        log(f"Saving {output_path}")
        df.to_parquet(output_path, engine="pyarrow", index=False, compression="gzip")

    if "feather" in formats:
        output_path = Path(output_dir, f"{output_name}.feather")
        log(f"Saving {output_path}")
        df.to_feather(output_path)

    if "stata" in formats:
        tmp_df = df.copy(deep=True)
        output_path = Path(output_dir, f"{output_name}.dta")
        log(f"Saving {output_path}")
        object_cols = tmp_df.select_dtypes(["object"]).columns
        tmp_df[object_cols] = tmp_df[object_cols].astype("string")
        string_cols = tmp_df.select_dtypes(["string"]).columns
        tmp_df[string_cols] = tmp_df[string_cols].fillna("")
        tmp_df.to_stata(output_path, write_index=False, version=118, convert_strl=list(string_cols))

    if "csv" in formats:
        output_path = Path(output_dir, f"{output_name}.csv")
        log(f"Saving {output_path}")
        df.to_csv(output_path, index=False)

    if "excel" in formats:
        output_path = Path(output_dir, f"{output_name}.xlsx")
        log(f"Saving {output_path}")
        df.to_excel(output_path, index=False)

## Global variables:

In [None]:
# Placeholders: point these at the notes file, the radiology file and the
# directory that contains the "model" folder before running the notebook.
NOTES_PATH = Path()
RADS_PATH = Path()
MODEL_PATH = Path()

# Phrases handed to word_masker. Defined once and shared by both configs so
# the two pipelines cannot drift apart.
_WORDS_TO_MASK = (
    "PE CT",
    "PE-CT",
    "PE/CT",
    "CT PE",
    "CT/PE",
    "CT-PE",
    "DVT US",
    "DVT U/S",
    "DVT ultrasound",
    "PE protocol",
    "PE study",
    "PE technique",
    "PE scan",
    "DVT protocol",
    "DVT study",
    "DVT technique",
    "DVT scan",
    "VTE prophylaxis",
    "VTE prophy",
    "VTE ppx",
    "DVT prophylaxis",
    "DVT prophy",
    "DVT ppx",
)

# Search terms used by both note_filter and sentence_filter in both configs.
_VTE_KEYWORDS = (
    "DVT",
    "PE",
    "VTE",
    "deep vein thrombus",
    "deep vein thrombi",
    "deep vein thrombosis",
    "deep vein thromboses",
    "deep vein thromboembolism",
    "deep venous thrombus",
    "deep venous thrombi",
    "deep venous thrombosis",
    "deep venous thromboses",
    "deep venous thromboembolism",
    "pulmonary embolus",
    "pulmonary emboli",
    "pulmonary embolic",
    "pulmonary embolism",
    "pulmonary thromboembolism",
    "pulmonary artery embolus",
    "pulmonary artery emboli",
    "pulmonary artery embolic",
    "pulmonary artery embolism",
    "pulmonary artery thromboembolism",
    "pulmonary arterial embolus",
    "pulmonary arterial emboli",
    "pulmonary arterial embolic",
    "pulmonary arterial embolism",
    "pulmonary arterial thromboembolism",
    "thromboemboli",
    "clot",
    "embolus",
    "emboli",
    "embolic",
    "embolism",
    "embolisms",
    "thrombus",
    "thrombi",
    "thrombotic",
    "thrombosis",
    "thrombosed",
    "phlebitis",
    "thrombophlebitis",
    "filling defect",
    "filling defects",
)

# Configuration for clinical notes. Every shared sequence is copied with
# list(...) so each config section owns an independent list object.
CONFIG_NOTES = {
    "encoding_fixer": {"status": "enabled"},
    "pattern_replacer": {
        "status": "enabled",
        # r"\s{4,}" => at least 4 whitespaces
        # r"(?:\s*\n\s*){2,}" => at least two \n (each \n can be surrounded by any whitespaces)
        "pattern": r"\s{4,}",
        "target": "\n\n",
    },
    "word_masker": {
        "status": "enabled",
        "words_to_mask": list(_WORDS_TO_MASK),
        "mask_char": "*",
    },
    "note_filter": {
        "status": "enabled",
        "words_to_search": list(_VTE_KEYWORDS),
    },
    "section_splitter": {
        "status": "enabled",
        "delimiter": "\n\n",
    },
    "section_filter": {
        "status": "enabled",
        "section_inc_list": [
            "chief complaint",
            "cc:",
            "present illness",
            "history of present illness",
            "hpi",
            "clinical presentation",
            "clinical history",
            "interval history",
            "interim history",
            "clinical hx",
            "interval hx",
            "interim hx",
            "patient history",
            "patient hx",
            "subjective",
            "subj",
            "assessment",
            "impression",
            "impressions",
            "plan",
            "suggestion",
            "discussion",
            "conclusion",
            "conclusions",
            "recommendation",
            "rec",
            "recs",
            "a&p",
            "a &p",
            "a& p",
            "a/p",
            "a /p",
            "a/ p",
            "ap",
            "a p:",
            "s:",
            "a:",
            "p:",
            "attestation",
            "addendum",
            "attending note",
            "staff note",
            "teaching physician note",
            "ed course",
            "hospital course",
            "hospital summary",
            "brief summary of hospital course",
            "brief hospital course",
        ],
        "section_exc_list": [
            "review of system",
            "system review",
            "systems review",
            "hematologic history",
            "hematological history",
            "heme history",
            "oncological history",
            "oncologic history",
            "onc history",
            "hematologic hx",
            "hematological hx",
            "heme hx",
            "oncological hx",
            "oncologic hx",
            "onc hx",
            "past h",
            "past med",
            "past surg",
            "medical h",
            "social h",
            "family h",
            "medication",
            "allerg",
            "physical exam",
            "vital",
            "ros",
            "pmh:",
            "pmhx",
            "psh:",
            "pshx",
            "sh:",
            "shx",
            "fh:",
            "fhx",
            "med:",
            "meds",
            "general",
            "gen:",
            "pe:",
            "v/s",
            "data",
            "lab",
            "labs",
            "laboratory",
            "image",
            "imaging",
            "radiology",
            "pathology",
            "path",
            "problem",
            "problems",
            "objective",
            "obj",
            "o:",
            "diagnosis",
            "discharge condition",
            "disposition",
            "discharge medication",
            "medication list",
            "consults",
            "procedures",
        ],
        "fallback": True,
    },
    "sentence_segmenter": {
        "status": "enabled",
        "model_name": "en_core_sci_lg",
        "batch_size": 10,
    },
    "duplicate_checker": {
        "status": "disabled",
        "num_perm": 256,
        "sim_threshold": 0.9,
        "length_threshold": 50,
    },
    "sentence_filter": {
        "status": "enabled",
        "words_to_search": list(_VTE_KEYWORDS),
    },
    "sentence_expander": {
        "status": "enabled",
        "length_threshold": 50,
    },
    "joiner": {
        "status": "enabled",
        "sentence_delimiter": "\n",
        "section_delimiter": "\n\n",
    },
    "ml_inference": {
        "status": "enabled",
        "models": {
            "VTE": {
                "device": "cuda",
                "model_path": str(Path(MODEL_PATH, "model")),
                "tokenizer_path": str(Path(MODEL_PATH, "model")),
                "max_length": 512,
            }
        },
        "use_preped_text": True,
    },
}

# Configuration for radiology impressions. It differs from CONFIG_NOTES only
# in the pattern_replacer pattern, the section splitter/filter settings and
# the disabled sentence_expander.
CONFIG_RADS = {
    "encoding_fixer": {"status": "enabled"},
    "pattern_replacer": {
        "status": "enabled",
        "pattern": r"(?:\s*\n\s*){2,}",
        "target": "\n\n",
    },
    "word_masker": {
        "status": "enabled",
        "words_to_mask": list(_WORDS_TO_MASK),
        "mask_char": "*",
    },
    "note_filter": {
        "status": "enabled",
        "words_to_search": list(_VTE_KEYWORDS),
    },
    "section_splitter": {
        "status": "enabled",
        "delimiter": "\n\n\n\n\n\n\n\n\n\n\n\n",  # Artificially disabling section_splitter
    },
    "section_filter": {
        "status": "enabled",
        "section_inc_list": [],  # Artificially disabling section_filter
        "section_exc_list": [],  # Artificially disabling section_filter
        "fallback": True,
    },
    "sentence_segmenter": {
        "status": "enabled",
        "model_name": "en_core_sci_lg",
        "batch_size": 10,
    },
    "duplicate_checker": {
        "status": "disabled",
        "num_perm": 256,
        "sim_threshold": 0.9,
        "length_threshold": 50,
    },
    "sentence_filter": {
        "status": "enabled",
        "words_to_search": list(_VTE_KEYWORDS),
    },
    "sentence_expander": {
        "status": "disabled",
        "length_threshold": 50,
    },
    "joiner": {
        "status": "enabled",
        "sentence_delimiter": "\n",
        "section_delimiter": "\n\n",
    },
    "ml_inference": {
        "status": "enabled",
        "models": {
            "VTE": {
                "device": "cuda",
                "model_path": str(Path(MODEL_PATH, "model")),
                "tokenizer_path": str(Path(MODEL_PATH, "model")),
                "max_length": 512,
            }
        },
        "use_preped_text": True,
    },
}

In [None]:
# Build one pipeline per document type, each from its own configuration dict.
pipeline_notes = SinglePipeline(config=CONFIG_NOTES)
pipeline_rads = SinglePipeline(config=CONFIG_RADS)

## Preprocessing:

### Load notes and radiology impressions:

In [None]:
# Load both inputs as parquet. NOTES_PATH and RADS_PATH must already point at
# the clinical-notes file and the radiology file respectively.
notes_df = read_df(input_path=NOTES_PATH, input_format="parquet")
rads_df = read_df(input_path=RADS_PATH, input_format="parquet")

### Sort by patient_id, note_date, note_id:

In [None]:
# Order rows by patient, then note date, then note ID. sort_values returns a
# new frame, so notes_df / rads_df keep their original order.
notes_df2 = notes_df.sort_values(by=["patient_id", "note_date", "note_id"])
rads_df2 = rads_df.sort_values(by=["patient_id", "note_date", "note_id"])

### Drop duplicates (rows sharing the same patient, note ID, text, and date):

In [None]:
# Rows count as duplicates only when patient_id, note_id, note_text and
# note_date all match; the first occurrence (in the sorted order) is kept.
# NOTE(review): because note_id is part of the subset, a note with identical
# text on the same day but a different note_id is NOT dropped - confirm this
# is the intended behavior.
notes_df3 = notes_df2.drop_duplicates(subset=["patient_id", "note_id", "note_text", "note_date"], keep="first")
rads_df3 = rads_df2.drop_duplicates(subset=["patient_id", "note_id", "note_text", "note_date"], keep="first")

### Convert to JSON:

In [None]:
def _note_payloads(df: pd.DataFrame) -> list[dict]:
    """Return one ``{"note_id": ..., "text": ...}`` dict per row of ``df``.

    The values are read from the ``note_id`` and ``note_text`` columns, in
    row order, so the result can be assigned back as a column of ``df``.
    """
    return [
        {"note_id": note_id, "text": note_text}
        for note_id, note_text in zip(df["note_id"], df["note_text"])
    ]


# Attach the per-note payloads using each row's actual note_id and note_text
# values (not the column names). ``assign`` returns a new frame instead of
# writing into the result of ``drop_duplicates``.
notes_df3 = notes_df3.assign(notes=_note_payloads(notes_df3))
# Collect the payloads into one list per patient; each record ends up shaped
# like {"patient_id": ..., "notes": [{"note_id": ..., "text": ...}, ...]}.
grouped_notes = notes_df3.groupby("patient_id")["notes"].apply(list).reset_index()
notes_patients_list = grouped_notes.to_dict(orient="records")

# Same conversion for the radiology impressions.
rads_df3 = rads_df3.assign(notes=_note_payloads(rads_df3))
grouped_rads = rads_df3.groupby("patient_id")["notes"].apply(list).reset_index()
rads_patients_list = grouped_rads.to_dict(orient="records")

### Run preprocessing:

In [None]:
def _process_patients(pipeline, patient_records, config):
    """Run ``pipeline.process`` once per patient record and collect the results.

    Each record is serialised with ``json.dumps`` and turned into a
    ``Patient`` through ``Patient.from_json`` before being processed with
    ``config``. Results are returned in the order of ``patient_records``.
    """
    return [
        pipeline.process(Patient.from_json(json.dumps(record)), config=config)
        for record in patient_records
    ]


# Clinical notes go through the notes pipeline with the notes configuration.
processed_notes_patients = _process_patients(
    pipeline_notes,
    notes_patients_list,
    CONFIG_NOTES,
)

# Radiology impressions go through their own pipeline and configuration.
processed_rads_patients = _process_patients(
    pipeline_rads,
    rads_patients_list,
    CONFIG_RADS,
)

### Merge back to initial dataframe:

In [None]:
def merge_back_outputs(df, processed):
    """Left-join per-note pipeline outputs onto a copy of ``df``.

    Args:
        df: Dataframe with ``patient_id`` and ``note_id`` columns. It is
            copied, never modified.
        processed: Iterable of patient objects exposing ``patient_id`` and,
            optionally, ``notes``. Each note may expose ``note_id``,
            ``preprocessed_text``, ``predicted_label`` and
            ``predicted_score``; a missing attribute is recorded as ``None``.

    Returns:
        A new dataframe holding the rows of ``df`` plus ``preped_text``,
        ``pred_label`` and ``pred_score``. Rows with no matching processed
        note carry missing values in those columns.
    """
    result_columns = ("preped_text", "pred_label", "pred_score")

    # Flatten every processed patient into one record per note.
    records = []
    for patient in processed:
        patient_id = patient.patient_id
        patient_notes = getattr(patient, "notes", []) or []
        records.extend(
            {
                "patient_id": patient_id,
                "note_id": getattr(note, "note_id", None),
                "preped_text": getattr(note, "preprocessed_text", None),
                "pred_label": getattr(note, "predicted_label", None),
                "pred_score": getattr(note, "predicted_score", None),
            }
            for note in patient_notes
        )

    extracted = pd.DataFrame(records)
    merged = df.copy()

    # With nothing to join there are no key columns in ``extracted``, so the
    # merge is skipped and only the placeholder columns are added below.
    if not extracted.empty:
        merged = merged.merge(
            extracted,
            on=["patient_id", "note_id"],
            how="left",
            sort=False,
        )

    # Guarantee the output columns exist whichever path was taken.
    for column in result_columns:
        if column not in merged.columns:
            merged[column] = pd.NA

    return merged


# Join the pipeline outputs back onto the de-duplicated input frames, matching
# rows on patient_id and note_id.
final_notes = merge_back_outputs(notes_df3, processed_notes_patients)
final_rads = merge_back_outputs(rads_df3, processed_rads_patients)

### Write dataframes:

In [None]:
# Each frame gets its own output name: with identical (empty) names both calls
# resolve to the same ".parquet.gz" path and the second write replaces the
# first. Change output_dir / output_name to the desired destination.
write_df(df=final_notes, output_name="final_notes", output_dir=Path(), formats=["parquet"])
write_df(df=final_rads, output_name="final_rads", output_dir=Path(), formats=["parquet"])