# Dialysis Data Processing Pipeline

In [None]:
import sys
from pathlib import Path
from config import Config as paths
import pandas as pd
import sys
import os

# Put the parent of the current working directory at the front of sys.path so
# the `data_cleaning` imports below resolve.
# NOTE(review): `from config import Config` above runs before this insert, so
# `config` must already be importable from the default path — confirm.
project_root = Path("..").resolve()
sys.path.insert(0, str(project_root))

from data_cleaning.renaming import rename_columns,generate_and_save_rename_columns_json
from data_cleaning.utils import save_json, load_json
from data_cleaning.cleaners.episode.clean_data_dialys import DialysisCleaner

## Load Data

In [None]:
# Load the raw dialysis table from the parquet file configured as DIALYSIS_PATH.
dialysis = pd.read_parquet(paths.DIALYSIS_PATH)

## Generate Renaming Files

In [None]:
# Build the rename-mapping JSON for the raw dialysis columns. The same file
# path is read back by the "Rename Columns" cell.
# NOTE(review): presumably this overwrites an existing mapping file — confirm
# before re-running if the JSON has been edited by hand.
rename_json_path = f"{paths.RENAME_FILES_DIALYSIS}/dialysis_rename_columns.json"
generate_and_save_rename_columns_json(dialysis, rename_json_path)

## Rename Columns

In [None]:
# Rename the raw columns using the mapping stored in the rename JSON file.
# (The target was previously assigned twice in a chained assignment; a single
# binding is all that is needed.)
dialysis_renamed = rename_columns(
    dialysis, f"{paths.RENAME_FILES_DIALYSIS}/dialysis_rename_columns.json"
)

## Clean Data

In [None]:
# Create the cleaner once; later cells reuse this instance for mapping,
# prefix matching and per-episode summarization.
dialysis_cleaner = DialysisCleaner()
# Run the cleaner over the renamed dialysis frame.
dialysis_cleaned = dialysis_cleaner.clean_data(df=dialysis_renamed)

## Map Data to Episodes

In [None]:
# Load the episode reference table, pass it through the same cleaner, and
# force patient_id to int.
# NOTE(review): the int cast presumably aligns the join key with the dialysis
# frame's patient_id dtype — confirm.
reference_data = dialysis_cleaner.clean_data(
    pd.read_parquet(paths.REFERENCE_DATA_PATH)
)
reference_data["patient_id"] = reference_data["patient_id"].astype(int)

# Window bounds handed to map_data_to_interval, relative to "sample_date".
# NOTE(review): the large "before" span and the negative "after" span look
# like "any dialysis up to one day before the sample" — confirm against
# map_data_to_interval's contract.
DAYS_BEFORE_BASELINE = pd.Timedelta(days=99999)
DAYS_AFTER_BASELINE = pd.Timedelta(days=-1)

# One row per distinct (episode, patient, baseline date) combination.
episode_baselines = reference_data[
    ["episode_id", "patient_id", "sample_date"]
].drop_duplicates()

dialysis_mapped = dialysis_cleaner.map_data_to_interval(
    reference_df=episode_baselines,
    df=dialysis_cleaned,
    patient_id_col_name="patient_id",
    date_col_name="dialysis_date",
    baseline_col_name="sample_date",
    time_before_baseline=DAYS_BEFORE_BASELINE,
    time_after_baseline=DAYS_AFTER_BASELINE,
)

## Add Dialysis Indicator

In [None]:
# Labels in "dialysis_type" that count as a dialysis treatment. They are data
# values matched by prefix, so the spelling must stay exactly as stored.
dialysis_type_prefixes = [
    "Hemodiafiltration (HDF)",
    "Hemodialys (HD)",
    "Hemofiltration (HF)",
]

# Store the cleaner's prefix-match mask as the per-row "had_dialysis" column.
dialysis_mapped["had_dialysis"] = dialysis_cleaner.get_prefix_match_mask(
    df=dialysis_mapped,
    target_cols=["dialysis_type"],
    prefixes=dialysis_type_prefixes,
)

In [None]:
# Bare expression: in a notebook cell this displays the mapped frame (which
# now carries the "had_dialysis" column) for a visual check.
dialysis_mapped

## Summarize Episodes

In [None]:
def summarize_episode(df: pd.DataFrame) -> dict:
    """Summarize the mapped dialysis rows of a single episode.

    Args:
        df: Rows belonging to one episode. Must be non-empty and contain the
            ``episode_id`` and ``had_dialysis`` columns.

    Returns:
        A dict with two keys: ``episode_id`` (taken from the first row) and
        ``dialysis_prior`` (the maximum of ``had_dialysis``, which for a
        boolean column is True when at least one row is flagged).

    Raises:
        IndexError: If ``df`` has no rows.
        KeyError: If a required column is missing.
    """
    # Nothing here mutates ``df``, so no defensive copy is made. Bracket access
    # is used so a column name can never be confused with a DataFrame attribute.
    return {
        "episode_id": df["episode_id"].iloc[0],
        "dialysis_prior": df["had_dialysis"].max(),
    }

# Hand the per-episode reducer to the cleaner, which applies it by the
# "episode_id" column.
# NOTE(review): presumably the result has one row per episode — confirm
# against summarize_data_by_episode.
dialysis_episode_summary = dialysis_cleaner.summarize_data_by_episode(
    df=dialysis_mapped,
    episode_id_col="episode_id",
    summary_function=summarize_episode,
)

## Save Processed Data

In [None]:
# Make sure both output directories exist before writing. Creating the nested
# "mapped" directory also creates its parent, and exist_ok=True makes this a
# no-op when they are already present. Previously "mapped" was only created
# when the base directory itself was missing, so an existing base directory
# without "mapped" made the second write fail.
mapped_output_dir = os.path.join(paths.STORE_DIALYSIS_DATA_PATH, "mapped")
os.makedirs(mapped_output_dir, exist_ok=True)

# Per-episode summary goes in the base directory; the row-level mapped data
# goes in the "mapped" subdirectory.
dialysis_episode_summary.to_parquet(f"{paths.STORE_DIALYSIS_DATA_PATH}/dialysis_episode_summary.parquet")
dialysis_mapped.to_parquet(f"{paths.STORE_DIALYSIS_DATA_PATH}/mapped/dialysis_episode_mapped.parquet")