# AKI Onset Determination, Staging, and Recovery

This document outlines the steps to determine AKI (Acute Kidney Injury) onset, stage, and resolving status for our cohort using the definitions from KDIGO guidelines (Kidney Disease: Improving Global Outcomes, 2012). The process involves multiple steps, including importing necessary packages, setting up data paths, defining helper functions, and applying the KDIGO criteria to identify and stage AKI events in the cohort.

### 1. Importing Required Packages and Defining Helper Functions
We begin by importing the necessary Python packages and defining some helper functions.

In [1]:
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import numpy as np
import logging
import os
import re

In [2]:
def get_data_file_path(home_path, site):
    input_path = os.path.join(home_path, 'raw_data', site) + '/'
    output_path = os.path.join(home_path, 'data', site) + '/'
    aux_path = os.path.join(home_path, 'aux_files') + '/'
    # Check if the output path exists, if not, create it
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    
    return [input_path, output_path, aux_path]

In [3]:
def load_and_filter_scr(onset, filepath_lst):
    xxx = pd.read_pickle(filepath_lst[0]+'AKI_LAB_SCR.pkl') 
    xxx = xxx[['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE',  'RESULT_NUM']] 
    
    xxx = xxx.groupby(['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE']).mean()
    xxx = xxx.sort_values(['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE'])
    xxx = xxx.reset_index()
    # merge with onset data
    xxx = onset.merge(xxx, on = ['ENCOUNTERID', 'PATID'], how='inner')
    xxx['DAYS_SINCE_ONSET'] = (xxx['SPECIMEN_DATE']-xxx['ONSET_DATE']).dt.days
    return xxx

In [4]:
def inverse_MDRD(row, KDIGO_baseline=None):
    """Look up the estimated baseline SCr (mg/dL) from the KDIGO table.

    The table holds SCr values back-calculated from the MDRD equation for an
    eGFR of 75 mL/min/1.73 m², indexed by age band and by sex and race group.
    """
    age = row["AGE"]
    is_male = bool(row["MALE"])
    is_black = bool(row["RACE_BLACK"])

    # Build the default lookup table unless the caller supplies one
    if KDIGO_baseline is None:
        KDIGO_baseline = pd.DataFrame(
            [[1.5, 1.3, 1.2, 1.0],
             [1.5, 1.2, 1.1, 1.0],
             [1.4, 1.2, 1.1, 0.9],
             [1.3, 1.1, 1.0, 0.9],
             [1.3, 1.1, 1.0, 0.8],
             [1.2, 1.0, 0.9, 0.8]],
            columns=["Black males", "Other males", "Black females", "Other females"],
            index=["20-24", "25-29", "30-39", "40-54", "55-65", ">65"])

    # Column for the patient's sex and race group
    group = ("Black " if is_black else "Other ") + ("males" if is_male else "females")

    # Without a recorded age no estimate can be looked up
    if pd.isna(age):
        return None

    # Age band; ages below 20 fall into the youngest band
    upper_bounds = [(24, "20-24"), (29, "25-29"), (39, "30-39"), (54, "40-54"), (65, "55-65")]
    band = next((label for upper, label in upper_bounds if age <= upper), ">65")
    return KDIGO_baseline.loc[band, group]
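The lookup table in `inverse_MDRD` holds pre-computed values. As a rough illustration of where such values come from, the sketch below inverts the MDRD study equation, eGFR = k × SCr^(−1.154) × age^(−0.203) × 0.742 (if female) × 1.212 (if Black), for a target eGFR. The names `egfr_mdrd` and `scr_from_egfr_mdrd` and the `coefficient` parameter are assumptions made for this example and are not part of the notebook. The default k = 175 is the IDMS-traceable form of the equation; published tables may use a different coefficient and rounding, so the results need not match `KDIGO_baseline` exactly.

```python
def egfr_mdrd(scr, age, is_male, is_black, coefficient=175.0):
    """Forward MDRD equation: eGFR (mL/min/1.73 m^2) from SCr (mg/dL)."""
    factor = coefficient * age ** -0.203
    if not is_male:
        factor *= 0.742
    if is_black:
        factor *= 1.212
    return factor * scr ** -1.154


def scr_from_egfr_mdrd(egfr, age, is_male, is_black, coefficient=175.0):
    """Inverse MDRD equation: SCr (mg/dL) that yields the given eGFR."""
    factor = coefficient * age ** -0.203
    if not is_male:
        factor *= 0.742
    if is_black:
        factor *= 1.212
    # Solve egfr = factor * scr ** -1.154 for scr
    return (egfr / factor) ** (-1.0 / 1.154)


# Hypothetical patient: 50-year-old non-Black male, assumed eGFR of 75
estimated_scr = scr_from_egfr_mdrd(75, age=50, is_male=True, is_black=False)
```

Feeding `estimated_scr` back into `egfr_mdrd` with the same demographics returns the assumed eGFR, which is a quick sanity check on the inversion.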

### 2. Determine AKI Onset, Stage, and Recovery Status
To accurately identify and stage AKI, we adhere to the definitions in KDIGO guidelines. These guidelines provide specific criteria for determining AKI onset and categorizing the severity of the condition.

#### **AKI Onset:**
AKI onset is defined as any of the following (Not Graded):
* Increase in SCr by >= 0.3 mg/dl (>= 26.5 μmol/l) within 48 hours; or
* Increase in SCr to >= 1.5 times baseline, which is known or presumed to have occurred within the prior 7 days; or 
* Urine volume < 0.5 ml/kg/h for 6 hours.
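As a minimal sketch of the two SCr-based onset criteria, the function below evaluates a single measurement against a baseline and a 48-hour reference value. The names `meets_scr_onset` and `mgdl_to_umoll` are hypothetical helpers introduced for this example; the conversion factor of 88.4 µmol/L per mg/dL is the standard one for creatinine and reproduces the thresholds quoted above.

```python
MGDL_TO_UMOLL = 88.4  # creatinine: 1 mg/dL corresponds to 88.4 umol/L


def mgdl_to_umoll(value_mgdl):
    """Convert a creatinine value from mg/dL to umol/L (rounded to 0.1)."""
    return round(value_mgdl * MGDL_TO_UMOLL, 1)


def meets_scr_onset(scr, baseline, min_prev_48h):
    """Return True if a measurement meets either SCr-based onset criterion.

    scr          -- current SCr (mg/dL)
    baseline     -- baseline SCr (mg/dL)
    min_prev_48h -- lowest SCr observed in the preceding 48 hours (mg/dL)
    """
    absolute_rise = (scr - min_prev_48h) >= 0.3   # rise of >= 0.3 mg/dL within 48 h
    relative_rise = scr >= 1.5 * baseline         # >= 1.5 times baseline
    return absolute_rise or relative_rise
```

Because the thresholds are compared in floating point, values that sit exactly on a boundary (for example a rise from 0.9 to 1.2 mg/dL) can fall on either side; a production version may want to round inputs or apply a small tolerance.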

#### **AKI Staging:**
AKI is staged for severity according to the following criteria:
* **Stage 1**: If SCr increases to 1.5–2.0 (strictly less than 2.0) times the baseline value or increases by >= 0.3 mg/dl (>= 26.5 μmol/l); or if urine output <0.5 ml/kg/h for 6–12 hours.
* **Stage 2**: If SCr increases to 2.0–3.0 (strictly less than 3.0) times the baseline value; or if urine output <0.5 ml/kg/h for >=12 hours.
* **Stage 3**: If SCr increases to >= 3.0 times the baseline value or to >= 4.0 mg/dl (>= 353.6 μmol/l); or initiation of renal replacement therapy (RRT); or if urine output <0.3 ml/kg/h for >=24 hours or anuria for >=12 hours.
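The SCr and RRT parts of the staging rules can be sketched as a small function. This is an illustration only: `kdigo_stage_scr` and its parameters are hypothetical names, urine output is ignored, and Stage 3 via the 4.0 mg/dl threshold or RRT is only assigned once an onset criterion has been met, which mirrors how the notebook stages patients after AKI onset.

```python
def kdigo_stage_scr(scr, baseline, rise_48h=0.0, on_rrt=False):
    """Return the AKI stage (0 to 3) implied by SCr and RRT status.

    scr      -- current SCr (mg/dL)
    baseline -- baseline SCr (mg/dL)
    rise_48h -- increase over the lowest SCr of the last 48 hours (mg/dL)
    on_rrt   -- True if RRT was initiated
    """
    ratio = scr / baseline
    # Stage 0 unless one of the SCr-based onset criteria is met
    if not (ratio >= 1.5 or rise_48h >= 0.3):
        return 0
    if on_rrt or ratio >= 3.0 or scr >= 4.0:
        return 3
    if ratio >= 2.0:
        return 2
    return 1
```

For example, `kdigo_stage_scr(4.2, 2.0)` is Stage 3 through the absolute 4.0 mg/dl threshold even though the ratio to baseline is only 2.1.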

#### **Resolving AKI:**
Resolving AKI is defined as a decrease in serum creatinine concentration of 0.3 mg/dL or more, or 25% or more from the maximum in the first 72 hours after AKI onset.

We are currently excluding the urine output condition from the definition due to the lack of relevant data. Instead, we are focusing on the SCr condition. To begin, we need to establish the AKI baseline value and ascertain whether a patient has been on RRT. In what follows, we will detail the functions used to define the baseline SCr value and to determine the RRT status and timing for patients undergoing RRT.

#### 2.1. Calculating Baseline SCr Level

To define AKI onset and stage AKI according to the KDIGO guidelines, we first need to determine the baseline SCr level for each patient. The following definition is used to determine the baseline SCr level.

#### Workflow for Defining SCr Baseline:
If SCr records exist from the past 7 days before admission:
* Use the lower value between the 7-day SCr and the SCr value within the first 24 hours of admission as the SCr baseline. The 7-day SCr is the lowest value from the past 7 days or, with the default setting `c7day = 'MOST_RECENT'` of the function below, the most recent one.

If no SCr records exist from the past 7 days but are available for the period 365 days to 7 days before admission:
* Use the lower value between the SCr from this period and the SCr value within the first 24 hours of admission as the SCr baseline. The SCr from this period is the most recent value or, with the default setting `c365day = 'AVERAGE'`, the mean of all values.

If no SCr records are available from the past 365 days:
* For non-CKD patients: The SCr baseline is determined by the lower value between the SCr inferred using the [MDRD formula](https://www.kidney.org/content/mdrd-study-equation) with an eGFR of 75 mL/min/1.73 m² and the SCr value within the first 24 hours of admission.
* For CKD patients: The SCr baseline is determined by the lower value between the SCr inferred using the MDRD formula with an eGFR corresponding to the CKD stage (45, 22.5, and 7.5 mL/min/1.73 m² for stages 3, 4, and 5) and the SCr value within the first 24 hours of admission. The `cckd` argument of the function below (default `'DROP'`) appears intended to exclude these patients instead.
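The priority order of this workflow can be sketched for a single encounter as follows. The function `pick_baseline` and its argument names are hypothetical and only illustrate the decision logic; the values passed in are assumed to have been computed already (7-day SCr, 365-day SCr, MDRD-imputed SCr, and admission SCr).

```python
def pick_baseline(admission_scr, week_scr=None, year_scr=None, imputed_scr=None):
    """Return (baseline, source) following the priority 7-day > 365-day > imputed.

    Each candidate is compared with the admission SCr and the lower value is
    used. None marks a value that is not available.
    """
    candidates = (("7d", week_scr), ("365d", year_scr), ("imputed", imputed_scr))
    for source, prior in candidates:
        if prior is not None:
            available = [v for v in (prior, admission_scr) if v is not None]
            return min(available), source
    # No prior record and no imputed value: the encounter has no baseline
    return None, "missing"
```

Returning the source alongside the value makes it easy to tabulate how many baselines come from each tier, similar to the counts the notebook collects in `cohort_table`.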

In [5]:
def get_scr_baseline(df_scr, df_admit, filepath_lst, c7day = 'MOST_RECENT', c365day = 'AVERAGE', cckd = 'DROP'):

    cohort_table = dict()
    
    # load & process dx data
    dx = pd.read_pickle(filepath_lst[0]+'AKI_DX.pkl')  
    dx = dx[['PATID', 'ENCOUNTERID', 'DX', 'DX_DATE', 'DX_TYPE']] 
    dx = df_admit[['PATID', 'ENCOUNTERID', 'ADMIT_DATE']].merge(dx, on = ['PATID', 'ENCOUNTERID'], how = 'inner')
    dx['DAYS_SINCE_ADMIT'] = (dx['DX_DATE']-dx['ADMIT_DATE']).dt.days

    dx['DX'] = dx['DX'].astype(str)
    dx['DX_TYPE'] = dx['DX_TYPE'].astype(str)
    dx['DX_TYPE'] = dx['DX_TYPE'].replace('09', '9')
    
    # load & process demo data
    demo = pd.read_pickle(filepath_lst[0]+'AKI_DEMO'+'.pkl')  
    demo['MALE'] = demo['SEX'] == 'M'

    demo['RACE_WHITE'] = demo['RACE'] == '05'
    demo['RACE_BLACK'] = demo['RACE'] == '03'
    demo = demo[['PATID', 'ENCOUNTERID', 'AGE', 'MALE', 'RACE_WHITE', 'RACE_BLACK']]
    demo = demo.drop_duplicates()
    
    # estimate SCr Baseline
    pat_id_cols = ['PATID', 'ENCOUNTERID']
    complete_df = df_scr[['ENCOUNTERID', 'PATID', 'ADMIT_DATE', 'SPECIMEN_DATE', 'RESULT_NUM']]
 
    # 1. min between the min of 1-week prior admission SCr and within 24 hour after admission SCr
    # SCr within 24 hour after admission, that is admission day and one day after, get mean
    admission_SCr = complete_df[(complete_df.SPECIMEN_DATE >= complete_df.ADMIT_DATE) & \
                                (complete_df.SPECIMEN_DATE <= (complete_df.ADMIT_DATE + pd.Timedelta(days=1)))].copy()

    # Admission SCr is the mean of all the SCr within 24h admission
    admission_SCr = admission_SCr.groupby(pat_id_cols)['RESULT_NUM'].mean().reset_index()

    admission_SCr.rename(columns = {'RESULT_NUM': 'ADMISSION_SCR'}, inplace = True)

    # merge the ADMISSION_SCR back to the main frame
    complete_df = complete_df.merge(admission_SCr, 
                                    on = pat_id_cols,
                                    how = 'left')

    # SCr within 7 days prior to admission
    one_week_prior_admission = complete_df[(complete_df.SPECIMEN_DATE >= complete_df.ADMIT_DATE - pd.Timedelta(days=7)) & \
                                           (complete_df.SPECIMEN_DATE < complete_df.ADMIT_DATE)].copy()
    one_week_prior_admission = one_week_prior_admission.sort_values(by = ['PATID', 'ENCOUNTERID','SPECIMEN_DATE'])
    
    if c7day == 'MOST_RECENT':
        one_week_prior_admission = one_week_prior_admission.groupby(pat_id_cols)['RESULT_NUM'].last().reset_index()
    else:
        one_week_prior_admission = one_week_prior_admission.groupby(pat_id_cols)["RESULT_NUM"].min().reset_index()
        
    one_week_prior_admission.rename(columns = {'RESULT_NUM': 'ONE_WEEK_SCR'}, inplace = True)

    complete_df = complete_df.merge(one_week_prior_admission, 
                                    on = pat_id_cols,
                                    how = 'left')

    # take the min between one week SCr and admission SCr
    complete_df.loc[complete_df.ONE_WEEK_SCR.notna(), 'BASELINE_EST_1'] = \
                np.min(complete_df.loc[complete_df.ONE_WEEK_SCR.notna(), ['ONE_WEEK_SCR','ADMISSION_SCR']], axis = 1)
    
    cohort_table['ONE_WEEK_SCR_YES'] = complete_df.ONE_WEEK_SCR.notna().sum()
    cohort_table['ONE_WEEK_SCR_NO'] = complete_df.ONE_WEEK_SCR.isna().sum()    
    # compare columns of complete_df (cohort_table is only the dict of summary counts)
    cohort_table['ONE_WEEK_SCR_ONE_WEEK_SCR'] = (complete_df.ONE_WEEK_SCR.notna() & (complete_df['ONE_WEEK_SCR']==complete_df['BASELINE_EST_1'])).sum()
    cohort_table['ONE_WEEK_SCR_ADMISSION_SCR'] = (complete_df.ONE_WEEK_SCR.notna() & (complete_df['ONE_WEEK_SCR']!=complete_df['BASELINE_EST_1'])).sum()
        
    ori_num_unique_combinations = df_scr.groupby(['PATID', 'ENCOUNTERID']).ngroups
    # get the percentage of encounters that do not have past 7-day records
    criterion1_no_missing = complete_df.loc[complete_df.ONE_WEEK_SCR.notna(), :].groupby(pat_id_cols).ngroups
    criterion1_missing_rate = 1 - criterion1_no_missing / ori_num_unique_combinations

    # 2. pre-admission 365-7 day mean
    # here we only care about SCr measurements within 1 year before hospitalization
    one_year_prior_admission = complete_df[(complete_df.SPECIMEN_DATE < (complete_df.ADMIT_DATE - pd.Timedelta(days=7))) & \
                                     (complete_df.SPECIMEN_DATE >= (complete_df.ADMIT_DATE - pd.Timedelta(days=365.25)))].copy()
    one_year_prior_admission = one_year_prior_admission.sort_values(by = ['PATID', 'ENCOUNTERID','SPECIMEN_DATE'])
    one_year_prior_admission = one_year_prior_admission.loc[:, pat_id_cols + ['RESULT_NUM']]
    
    if c365day == 'AVERAGE':
        one_year_prior_admission = one_year_prior_admission.groupby(pat_id_cols)['RESULT_NUM'].mean().reset_index()
    else:
        one_year_prior_admission = one_year_prior_admission.groupby(pat_id_cols)['RESULT_NUM'].last().reset_index()  # or mean()
    
    one_year_prior_admission.rename(columns = {'RESULT_NUM': 'ONE_YEAR_SCR'}, inplace = True)
    
    complete_df = complete_df.merge(one_year_prior_admission, 
                                    on = pat_id_cols,
                                    how = 'left')
    
    # take the min between one year SCr and admission SCr
    complete_df.loc[complete_df.ONE_YEAR_SCR.notna(), 'BASELINE_EST_2'] = \
                np.min(complete_df.loc[complete_df.ONE_YEAR_SCR.notna(), ['ONE_YEAR_SCR', 'ADMISSION_SCR']], axis = 1)

    # priority 1: 7day SCr, priority 2: one year SCr
    complete_df['BASELINE_NO_INVERT'] = \
                np.where(complete_df['BASELINE_EST_1'].isna(), complete_df['BASELINE_EST_2'], complete_df['BASELINE_EST_1'])

    cohort_table['ONE_YEAR_SCR_YES'] = (complete_df.ONE_WEEK_SCR.notna() & complete_df.ONE_YEAR_SCR.notna()).sum()
    cohort_table['ONE_YEAR_SCR_NO'] = (complete_df.ONE_WEEK_SCR.isna() & complete_df.ONE_YEAR_SCR.isna()).sum()
    # compare columns of complete_df (cohort_table is only the dict of summary counts)
    cohort_table['ONE_YEAR_SCR_ONE_WEEK_SCR'] = (complete_df.ONE_WEEK_SCR.notna() & complete_df.ONE_YEAR_SCR.notna() & (complete_df['ONE_YEAR_SCR']==complete_df['BASELINE_EST_2'])).sum()
    cohort_table['ONE_YEAR_SCR_ADMISSION_SCR'] = (complete_df.ONE_WEEK_SCR.notna() & complete_df.ONE_YEAR_SCR.notna() & (complete_df['ONE_YEAR_SCR']!=complete_df['BASELINE_EST_2'])).sum()    
    
    # 3. Invert MDRD (via the KDIGO lookup table) to estimate baseline when no prior SCr exists
    # get those encounters for which we need to impute baseline
    pat_to_invert = complete_df.loc[complete_df.BASELINE_NO_INVERT.isna(), pat_id_cols+['ADMIT_DATE', 'ADMISSION_SCR']]
    # one patient one row
    pat_to_invert.drop_duplicates(subset=pat_id_cols, keep='first', inplace = True)


    pat_dx = pat_to_invert.merge(dx.drop(['ENCOUNTERID', 'ADMIT_DATE'], axis = 1), 
                                              on = 'PATID', 
                                              how = 'left')

    # calculate DX_DATE when it is missing
    pat_dx.loc[pat_dx.DX_DATE.isna(), 'DX_DATE'] = \
            pat_dx.loc[pat_dx.DX_DATE.isna(), 'ADMIT_DATE'] + \
            pd.to_timedelta(pat_dx.loc[pat_dx.DX_DATE.isna(), 'DAYS_SINCE_ADMIT'], unit='D')

    # check patients that do not have DX in the database
    #pat_dx.DX_DATE.isna().mean()

    # filter out those DX after admission
    pat_dx = pat_dx[pat_dx.DX_DATE <= pat_dx.ADMIT_DATE]

    # get the default eGFR for inversion: default to 75 for non-CKD patients, average of eGFR from staging criteria for CKD patients
    pat_dx['DFLT_eGFR'] = 75

    pat_dx.loc[pat_dx['DX'].isin(['585.3', 'N18.3']), 'DFLT_eGFR'] = 90/2
    pat_dx.loc[pat_dx['DX'].isin(['585.4', 'N18.4']), 'DFLT_eGFR'] = 45/2
    pat_dx.loc[pat_dx['DX'].isin(['585.5', 'N18.5']), 'DFLT_eGFR'] = 15/2
#    pat_dx.loc[pat_dx['DX'].isin(['585.6', 'N18.6']), 'DFLT_eGFR'] = 15/2

    pat_def_egfr = pat_dx.groupby(pat_id_cols)['DFLT_eGFR'].min().reset_index()

    # store counts, consistent with the other cohort_table entries
    cohort_table['MDRD_NOCKD'] = (pat_def_egfr['DFLT_eGFR'] == 75).sum()
    cohort_table['MDRD_CKD3']  = (pat_def_egfr['DFLT_eGFR'] == 90/2).sum()
    cohort_table['MDRD_CKD4']  = (pat_def_egfr['DFLT_eGFR'] == 45/2).sum()
    cohort_table['MDRD_CKD5']  = (pat_def_egfr['DFLT_eGFR'] == 15/2).sum()
        
    pat_to_invert= pat_to_invert.merge(pat_def_egfr, on = pat_id_cols, how = 'left')
    pat_to_invert['DFLT_eGFR'] = pat_to_invert['DFLT_eGFR'].fillna(75)

    pat_to_invert['DROPCKD'] = pat_to_invert['DFLT_eGFR'] != 75
    
    #pat_to_invert.DFLT_eGFR.value_counts()

    # Backcalculation for patients
    # merge DEMO with pat_to_invert
    pat_to_invert = pat_to_invert.merge(demo, on = pat_id_cols, how = 'left')
    
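    # estimate SCr from eGFR; inverse_MDRD builds its own lookup table, so no table is passed in
    # (KDIGO_baseline is not defined in this scope)
    pat_to_invert.loc[:, 'BASELINE_INVERT'] = pat_to_invert.apply(inverse_MDRD, args = (None,), axis = 1) #pat_to_invert.apply(inverse_CKDEPI21, axis = 1)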

    # take minimum of inverted SCr and admission SCr
    pat_to_invert['BASELINE_EST_3'] = np.min(pat_to_invert[['ADMISSION_SCR', 'BASELINE_INVERT']], axis = 1)

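    # merge back the computation results (DROPCKD is needed for the CKD filter below)
    complete_df = complete_df.merge(pat_to_invert[pat_id_cols + ['BASELINE_EST_3', 'DROPCKD']], 
                                    on = pat_id_cols,
                                    how = 'left')

    # replace the old baseline
    complete_df['SERUM_CREAT_BASE'] = np.min(complete_df[['BASELINE_NO_INVERT', 'BASELINE_EST_3']], axis = 1)

    # Assumed intent of cckd = 'DROP': exclude CKD encounters whose baseline had to be imputed,
    # because the lookup table in inverse_MDRD assumes an eGFR of 75
    if cckd == 'DROP':
        drop_mask = complete_df['DROPCKD'].eq(True) & complete_df['BASELINE_NO_INVERT'].isna()
        complete_df = complete_df[~drop_mask]
    complete_df = complete_df.drop(columns = 'DROPCKD')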
        
    # drop those still cannot find baseline
    complete_df = complete_df.dropna(subset=['SERUM_CREAT_BASE'])

    return complete_df.drop_duplicates(), cohort_table

#### 2.2. Determining RRT Status

Determining whether a patient has received renal replacement therapy (RRT), which includes both dialysis procedures and kidney transplants, is essential for accurately staging AKI. The following function can be used to identify the sub-cohort of patients who have received RRT and to determine the timing of the RRT.

In [6]:
def determine_rrt_status(df_admit, filepath_lst):
    px = pd.read_pickle(filepath_lst[0]+'AKI_PX.pkl')   

    idx_transplant = np.logical_or(np.logical_or(
                           np.logical_and(px['PX_TYPE']=='CH',px['PX'].isin(['50300','50320','50323','50325','50327','50328','50329','50340','50360','50365','50370','50380'])),
                           np.logical_and(px['PX_TYPE']=='09',px['PX'].isin(['55.51','55.52','55.53','55.54','55.61','55.69']))),np.logical_or(
                           np.logical_and(px['PX_TYPE']=='9',px['PX'].isin(['55.51','55.52','55.53','55.54','55.61','55.69'])),                       
                           np.logical_and(px['PX_TYPE']=='10',px['PX'].isin(['0TY00Z0','0TY00Z1','0TY00Z2','0TY10Z0','0TY10Z1','0TY10Z2','0TB00ZZ','0TB10ZZ','0TT00ZZ','0TT10ZZ','0TT20ZZ']))))

    idx_dialysis =(((px['PX_TYPE']=='CH') & (px['PX'].isin(['90935', '90937']))) |  
                  ((px['PX_TYPE']=='CH') & (pd.to_numeric(px['PX'], errors='coerce').between(90940, 90999))) |   
                  ((px['PX_TYPE']=='9') & ((px['PX'].isin(['39.93','39.95','54.98', 'V45.11'])))) | 
                  ((px['PX_TYPE']=='09') & (px['PX'].isin(['39.93','39.95','54.98', 'V45.11']))) |  
                  ((px['PX_TYPE']=='10') & (px['PX'].isin(['5A1D00Z','5A1D60Z','5A1D70Z','5A1D80Z','5A1D90Z', 'Z99.2'])))) 
 
    rrt_stage =  px[idx_transplant | idx_dialysis] 

    rrt_stage = rrt_stage[['PATID','ENCOUNTERID','PX_DATE']]
    rrt_stage.columns = ['PATID','ENCOUNTERID','RRT_ONSET_DATE']

    rrt_stage = rrt_stage.merge(df_admit, on=['PATID', 'ENCOUNTERID'], how='inner')
    rrt_stage['RRT_SINCE_ADMIT'] = (rrt_stage['RRT_ONSET_DATE']-rrt_stage['ADMIT_DATE']).dt.total_seconds()/(3600*24)
    rrt_stage = rrt_stage.loc[rrt_stage[['ENCOUNTERID', 'RRT_SINCE_ADMIT']].groupby('ENCOUNTERID').idxmin().reset_index()['RRT_SINCE_ADMIT']]
    rrt_stage.drop('ADMIT_DATE', axis = 1, inplace = True)
    return rrt_stage
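To make the selection logic easier to follow, here is a simplified sketch of the dialysis part of this function on plain Python records. It is not a replacement for `determine_rrt_status`: the names `is_dialysis` and `earliest_rrt` are hypothetical, only a subset of the dialysis codes listed above is included, and transplant codes are omitted for brevity.

```python
from datetime import datetime

# Subset of the dialysis codes used above, keyed by PX_TYPE
DIALYSIS_CODES = {
    "CH": {"90935", "90937"},
    "9": {"39.93", "39.95", "54.98"},
    "10": {"5A1D00Z", "5A1D60Z", "5A1D70Z", "5A1D80Z", "5A1D90Z"},
}


def is_dialysis(px_type, px):
    """Return True if the (PX_TYPE, PX) pair denotes a dialysis procedure."""
    px_type = "9" if px_type == "09" else px_type  # treat '09' and '9' alike
    if px in DIALYSIS_CODES.get(px_type, set()):
        return True
    # CPT codes 90940 to 90999 also count as dialysis
    return px_type == "CH" and px.isdigit() and 90940 <= int(px) <= 90999


def earliest_rrt(procedures, admit_dates):
    """Map each encounter to (first RRT date, days since admission).

    procedures  -- iterable of (encounter_id, px_type, px, px_date)
    admit_dates -- dict of encounter_id -> admission datetime
    """
    first = {}
    for enc, px_type, px, px_date in procedures:
        if enc not in admit_dates or not is_dialysis(px_type, px):
            continue
        if enc not in first or px_date < first[enc]:
            first[enc] = px_date
    return {enc: (d, (d - admit_dates[enc]).total_seconds() / 86400)
            for enc, d in first.items()}
```

Encounters without an admission record are skipped, which corresponds to the inner merge with `df_admit` in the function above.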

#### 2.3. Determining AKI Onset and Staging

After preparing the necessary helper functions, we use the following function to determine AKI onset and staging based on the KDIGO definition presented before. This function combines the calculated baseline SCr values, admission data, and RRT status to identify the onset of AKI and stage its severity. 

The `get_aki_onset` function performs the following steps:
1. Merges the baseline SCr values into the SCr measurements.
2. Calculates the rolling 48-hour minimum SCr, which serves as the reference for the 0.3 mg/dl criterion.
3. Checks the AKI Stage 1 criteria: SCr at least 1.5 times the baseline, or an increase of at least 0.3 mg/dl over the 48-hour minimum.
4. Identifies the earliest AKI Stage 1 onset for each patient.
5. Checks for AKI Stage 2 (2.0x - <3.0x increase) and AKI Stage 3 (>=3.0x increase or >=4 mg/dl SCr) criteria.
6. Identifies patients who initiated RRT after AKI onset.
7. Merges AKI staging information and determines the final stage by taking the highest AKI stage recorded for each patient.
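Steps 2 to 4 can be illustrated on a single encounter with plain Python. The sketch below is an assumption-laden simplification, not the notebook's implementation: `find_onset` is a hypothetical name, the input is a time-sorted list of `(timestamp, scr)` pairs, and the 48-hour window is taken as the half-open interval ending at the current measurement, which is how a time-based rolling window with default settings behaves in pandas.

```python
from datetime import datetime, timedelta


def find_onset(measurements, baseline, window=timedelta(days=2)):
    """Return (time, scr, met_1_5x, met_0_3) for the earliest onset, or None.

    measurements -- list of (timestamp, scr) sorted by timestamp
    baseline     -- baseline SCr (mg/dL)
    """
    for i, (t, scr) in enumerate(measurements):
        # Rolling minimum over (t - window, t], current measurement included
        in_window = [v for (u, v) in measurements[: i + 1] if t - u < window]
        reference = min(in_window)
        met_0_3 = (scr - reference) >= 0.3
        met_1_5x = scr >= 1.5 * baseline
        if met_0_3 or met_1_5x:
            return t, scr, met_1_5x, met_0_3
    return None


t0 = datetime(2022, 1, 1)
series = [(t0, 1.0), (t0 + timedelta(days=1), 1.1), (t0 + timedelta(days=2), 1.5)]
onset = find_onset(series, baseline=1.0)
```

Here the third measurement is the onset: it is 0.4 mg/dl above the lowest value of the preceding 48 hours and also reaches 1.5 times the baseline, so both flags are set.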

In [7]:
def get_aki_onset(df_scr, df_admit, df_rrt, df_baseline, aki_criteria = 'either'):
    xxx = df_scr.copy()
    yyy = df_admit.copy()

    zzz = df_baseline[['PATID', 'ENCOUNTERID', 'SERUM_CREAT_BASE']].drop_duplicates()
    zzz.columns= ['PATID', 'ENCOUNTERID',  'RESULT_NUM_BASE_7d']
    xxx = xxx.merge(zzz, on = ['PATID', 'ENCOUNTERID'], how='left')

    zzz2 = xxx[['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE', 'RESULT_NUM']].groupby(['PATID', 'ENCOUNTERID']).rolling('2d', on='SPECIMEN_DATE').min().reset_index()
    zzz2 = zzz2[['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE', 'RESULT_NUM']]
    zzz2.columns= ['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE', 'RESULT_NUM_BASE_2d']
    xxx = xxx.merge(zzz2, on = ['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE'], how='left')

    # Check condition for AKI1
    #1.5 increase in 7 days
    xxx['AKI1.5'] = (xxx['RESULT_NUM']>=1.5*xxx['RESULT_NUM_BASE_7d']) & (xxx['DAYS_SINCE_ADMIT']>=0) 
    #0.3 increase in 48 hours
    xxx['AKI0.3'] = (xxx['RESULT_NUM']-xxx['RESULT_NUM_BASE_2d']>=0.3) & (xxx['DAYS_SINCE_ADMIT']>=0)      
    
    # Rows that satisfy the requested onset criterion
    criteria_filters = {
        '2d': xxx['AKI0.3'],
        '7d': xxx['AKI1.5'],
        'either': xxx['AKI0.3'] | xxx['AKI1.5'],
    }
    if aki_criteria not in criteria_filters:
        raise ValueError(f"aki_criteria must be one of {list(criteria_filters)}, got {aki_criteria!r}")
    xxx = xxx[criteria_filters[aki_criteria]]

    # Earliest qualifying measurement per encounter (highest SCr first on ties)
    xxx = xxx.sort_values(['SPECIMEN_DATE', 'RESULT_NUM'], ascending=[True, False])
    xxx_backup = xxx.copy()
    xxx = xxx.groupby(['PATID', 'ENCOUNTERID']).first().reset_index()
    # Staging always compares against the encounter-level baseline
    xxx['RESULT_NUM_BASE'] = xxx['RESULT_NUM_BASE_7d']


    xxx['AKI1_SINCE_ADMIT'] = xxx['DAYS_SINCE_ADMIT'].copy()
    xxx['AKI1_DATE'] = xxx['SPECIMEN_DATE'].copy()
    xxx['AKI1_SCR'] = xxx['RESULT_NUM'].copy()
    xxx['SCR_BASELINE'] = xxx['RESULT_NUM_BASE'].copy()
    xxx['SCR_REFERENCE'] = xxx['RESULT_NUM_BASE_2d'].copy()
    xxx['AKI1_7D'] = xxx['AKI1.5'].copy()
    xxx['AKI1_2D'] = xxx['AKI0.3'].copy()
    xxx = xxx[['PATID', 'ENCOUNTERID', 'SCR_BASELINE', 'SCR_REFERENCE',  'AKI1_DATE', 'AKI1_SCR', 'AKI1_SINCE_ADMIT', 'AKI1_7D', 'AKI1_2D']]

    # Check condition for AKI2: 2.0x - <3.0x
    aki2 = xxx.merge(xxx_backup, on=['PATID', 'ENCOUNTERID'], how='left')
    aki2 = aki2[aki2['SPECIMEN_DATE']>=aki2['AKI1_DATE']]
    aki2 = aki2[aki2['RESULT_NUM']>=2*aki2['SCR_BASELINE']]
    aki2 = aki2.groupby(['PATID', 'ENCOUNTERID']).first().reset_index()
    aki2['AKI2_SINCE_ADMIT'] = aki2['DAYS_SINCE_ADMIT'].copy()
    aki2['AKI2_DATE'] = aki2['SPECIMEN_DATE'].copy()
    aki2['AKI2_SCR'] = aki2['RESULT_NUM'].copy()
    aki2 = aki2[['PATID', 'ENCOUNTERID', 'AKI2_DATE', 'AKI2_SCR', 'AKI2_SINCE_ADMIT']]
    
    # Check condition for AKI3: SCR >= 3.0x Baseline
    aki3 = xxx.merge(xxx_backup, on=['PATID', 'ENCOUNTERID'], how='left')
    aki3 = aki3[aki3['SPECIMEN_DATE']>=aki3['AKI1_DATE']]
    aki3 = aki3[(aki3['RESULT_NUM']>=3*aki3['SCR_BASELINE']) | (aki3['RESULT_NUM']>=4)]
    aki3 = aki3.groupby(['PATID', 'ENCOUNTERID']).first().reset_index()
    aki3['AKI3_SINCE_ADMIT'] = aki3['DAYS_SINCE_ADMIT'].copy()
    aki3['AKI3_DATE'] = aki3['SPECIMEN_DATE'].copy()
    aki3['AKI3_SCR'] = aki3['RESULT_NUM'].copy()
    aki3 = aki3[['PATID', 'ENCOUNTERID', 'AKI3_DATE', 'AKI3_SINCE_ADMIT', 'AKI3_SCR']]
    
    # Check condition for AKI3: initiation of RRT
    #df_rrt = get_rrt(path, ext, sep, yyy)
    rrt = df_rrt.merge(xxx[['PATID', 'ENCOUNTERID', 'AKI1_DATE']], on=['PATID', 'ENCOUNTERID'], how='left')
    rrt = rrt[rrt['RRT_ONSET_DATE'] >= rrt['AKI1_DATE']]
    aki3b =  aki3.merge(rrt, on = ['PATID', 'ENCOUNTERID'], how = 'outer')
    cond_rrt = (aki3b['RRT_SINCE_ADMIT'] < aki3b['AKI3_SINCE_ADMIT']) | (aki3b['AKI3_SINCE_ADMIT'].isna() & aki3b['RRT_SINCE_ADMIT'].notna())
    aki3b.loc[cond_rrt, 'AKI3_SINCE_ADMIT'] = aki3b.loc[cond_rrt, 'RRT_SINCE_ADMIT']
    aki3b.loc[cond_rrt, 'AKI3_DATE'] = aki3b.loc[cond_rrt, 'RRT_ONSET_DATE']
    
    aki3_all = aki3b[['PATID', 'ENCOUNTERID', 'AKI3_DATE', 'AKI3_SINCE_ADMIT', 'AKI3_SCR']]
    
    # Merge AKI staging information
    onset = xxx.merge(aki2, on=['PATID', 'ENCOUNTERID'], how='outer').merge(aki3_all, on=['PATID', 'ENCOUNTERID'], how='outer')
    onset = onset.merge(yyy, on=['PATID', 'ENCOUNTERID'], how='left')

    onset.columns = onset.columns.str.upper()
    onset['ONSET_DATE'] = onset['AKI1_DATE'].copy()  
    onset['SCR_ONSET'] = onset['AKI1_SCR'].copy() 
    
    onset['DISCHARGE_SINCE_ONSET'] = (onset['DISCHARGE_DATE'] - onset['ONSET_DATE']).dt.days
    
    onset = onset[['PATID','ENCOUNTERID', 'ADMIT_DATE', 'DISCHARGE_DATE', 
                   'ONSET_DATE', 'AKI1_SINCE_ADMIT', 'AKI2_SINCE_ADMIT', 
                   'AKI3_SINCE_ADMIT',  'DISCHARGE_SINCE_ONSET','SCR_ONSET', 
                   'SCR_BASELINE',  'SCR_REFERENCE', 'AKI1_7D', 'AKI1_2D']]

    onset['FLAG'] = (onset['AKI2_SINCE_ADMIT'].notna()) | (onset['AKI3_SINCE_ADMIT'].notna())
    onset['ONSET_SINCE_ADMIT'] = onset['AKI1_SINCE_ADMIT'].copy()  
    
    # Get AKI stage by taking the highest stage reached
    onset['AKI_STAGE'] = 0
    filter_aki3 = onset['AKI3_SINCE_ADMIT'].notna()
    filter_aki2 = onset['AKI2_SINCE_ADMIT'].notna() & onset['AKI3_SINCE_ADMIT'].isna()
    filter_aki1 = onset['AKI1_SINCE_ADMIT'].notna() & onset['AKI2_SINCE_ADMIT'].isna() & onset['AKI3_SINCE_ADMIT'].isna()
    
    onset.loc[filter_aki3, 'AKI_STAGE'] = 3
    onset.loc[filter_aki2, 'AKI_STAGE'] = 2
    onset.loc[filter_aki1, 'AKI_STAGE'] = 1
    
    return onset.drop_duplicates()

#### 2.4. Determining AKI Recovery Status

The following functions are used to determine whether an episode of AKI is resolving or non-resolving. Resolving AKI is defined as a decrease in serum creatinine concentration of 0.3 mg/dL or more, or 25% or more from the maximum in the first 72 hours after AKI onset. Non-resolving AKI does not meet these criteria.

- The `akiresolving` function evaluates whether an AKI episode is resolving based on sCr values:
    1. Filters the data to include only the first 72 hours after AKI onset.
    2. Identifies the maximum sCr value during this period for each patient.
    3. Evaluates whether the sCr decrease meets the criteria for resolving AKI (a decrease of ≥0.3 mg/dL or 25% or more from the maximum).
    4. Flags patients who meet these criteria with an `AKI_resolving` status.


- The `akiresolvingsustain` function checks if the resolving status is sustained within the 72-hour period:
    1. Identifies the first time point when the resolving criteria were met.
    2. Checks subsequent sCr values to ensure that the resolving status is sustained.
    3. Flags patients with sustained resolving AKI using the `sustain` variable.


- The `get_aki_resolving` function integrates the above steps to determine and summarize the AKI recovery status for each patient:
    1. Filters the sCr data to include only observations after AKI onset.
    2. Applies the `akiresolving` function to determine the resolving status.
    3. Applies the `akiresolvingsustain` function to check if the resolving status is sustained.
    4. Generates two outcome variables:
       - `AKI_TRIGG`: Indicates whether the resolving condition was triggered.
       - `AKI_RESOL`: Indicates whether the resolving condition was sustained.

Finally, the function merges these outcomes with the onset data and returns the final dataset, summarizing the AKI recovery status for each patient.
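The resolving and sustained checks can be sketched for one encounter as follows. This is a simplified illustration under stated assumptions: `resolving_status` is a hypothetical name, the input is a time-sorted list of `(days_since_onset, scr)` pairs, and "after the maximum" is decided by position in that list rather than by timestamp.

```python
def resolving_status(series, dlevel=0.3, ratio=0.75, max_day=3):
    """Return (triggered, sustained) for one encounter.

    series  -- list of (days_since_onset, scr) sorted by time
    dlevel  -- absolute drop from the maximum that counts as resolving (mg/dL)
    ratio   -- value/maximum at or below which the drop counts as resolving
    max_day -- last day since onset that is considered (72 hours = day 3)
    """
    window = [(d, v) for d, v in series if 0 <= d <= max_day]
    if not window:
        return False, False
    # Earliest occurrence of the maximum SCr within the window
    peak_idx = max(range(len(window)), key=lambda i: window[i][1])
    peak = window[peak_idx][1]
    # Evaluate the resolving criteria for every measurement after the peak
    flags = [(peak - v >= dlevel) or (v / peak <= ratio)
             for _, v in window[peak_idx + 1:]]
    if True not in flags:
        return False, False
    first = flags.index(True)
    # Sustained if every later measurement still meets the criteria
    return True, all(flags[first + 1:])
```

In terms of the outcome variables above, the first element of the result plays the role of `AKI_TRIGG` and the second that of `AKI_RESOL`. An encounter whose only qualifying measurement is the last one in the window counts as sustained, matching the default applied in `akiresolvingsustain`.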

In [8]:
def akiresolving(dall, time, dlevel, ratio):
    time_window = (dall['onset_day']<=3) & (dall['onset_day']>=time) 
    dall = dall[time_window].reset_index().drop('index',axis=1)
    dall2 = dall.sort_values(['value', 'charttime'], ascending=[False, True])[['subject_id', 'hadm_id', 'charttime', 'value']]
    max_idx = dall2.groupby(['subject_id', 'hadm_id'])['value'].idxmax()
    dmax_sCr = dall2.loc[max_idx]
    dmax_sCr.columns = ['subject_id', 'hadm_id', 'charttime_max', 'value_max']
    dall = dall.merge(dmax_sCr, left_on=['subject_id', 'hadm_id'], right_on=['subject_id', 'hadm_id'], how='left')
    # redefine time windows
    time_window = (dall['onset_day']<=3) & (dall['onset_day']>=time)
    dall['AKI_resolving']=0
   
    dall['ddays']= (dall['charttime']-dall['charttime_max'])
    dall['dvalues'] = dall['value_max']-dall['value']
    dall['dvalues2'] = dall['value']/dall['value_max']
    # within 72 hrs, which means onset_day equals 1, 2, or 3
    filter_lvl = (dall['ddays']>pd.Timedelta(value=0, unit='s')) & time_window & (dall['dvalues']>=dlevel)
    filter_rat = (dall['ddays']>pd.Timedelta(value=0, unit='s')) & time_window & (dall['dvalues2']<=ratio)
    dall.loc[filter_lvl,'AKI_resolving']=1
    dall.loc[filter_rat,'AKI_resolving']=1
    dall=dall.drop(['ddays', 'dvalues', 'dvalues2'],axis=1)
    dall['AKI_resolving2']=dall['AKI_resolving'].copy()

    dmaxRelv = dall[['subject_id', 'hadm_id', 'AKI_resolving']].groupby(['subject_id', 'hadm_id']).max().reset_index()
    dmaxRelv.columns = ['subject_id', 'hadm_id', 'max_AKI_resolving']
    dall = dall.merge(dmaxRelv, left_on=['subject_id', 'hadm_id'], right_on=['subject_id', 'hadm_id'], how='left')
    dall['AKI_resolving'] = dall['max_AKI_resolving']
    return dall.drop('max_AKI_resolving',axis = 1)

def akiresolvingsustain(dall, time):
    
    dresolv = dall[dall['AKI_resolving2']==1]
    dresolv = dresolv[['subject_id', 'hadm_id', 'charttime']].sort_values('charttime').groupby(['subject_id', 'hadm_id']).first().reset_index()
    dresolv.columns = ['subject_id', 'hadm_id', 'charttime_resolv']

    dall = dall.merge(dresolv, left_on=['subject_id', 'hadm_id'], right_on=['subject_id', 'hadm_id'], how='left')
    time_window = (dall['onset_day']<=3) & (dall['onset_day']>=time) 
    dall['after_ddays'] = ((dall['charttime']-dall['charttime_resolv']) > pd.Timedelta(value=0, unit='s')) & time_window 

    dresolv2 = dall[dall['after_ddays']][['subject_id', 'hadm_id', 'AKI_resolving2']].groupby(['subject_id', 'hadm_id']).min().reset_index()
    dresolv2.columns = ['subject_id', 'hadm_id', 'sustain']

    dall = dall.merge(dresolv2, left_on=['subject_id', 'hadm_id'], right_on=['subject_id', 'hadm_id'], how='left')
    # assign the result instead of calling fillna in place on a column selection
    dall['sustain'] = dall['sustain'].fillna(True)
    return dall

def get_aki_resolving(scr, onset):
    scr = scr[scr['DAYS_SINCE_ONSET']>=0]
    scr = scr[['PATID', 'ENCOUNTERID', 'SPECIMEN_DATE', 'RESULT_NUM', 'DAYS_SINCE_ONSET']]
    scr.columns = ['subject_id', 'hadm_id', 'charttime', 'value', 'onset_day']

    scr = akiresolving(dall = scr, time = 0, dlevel = 0.3, ratio = 0.75)
    scr = akiresolvingsustain(dall = scr, time = 0)

    scr['AKI_sustain'] = scr['AKI_resolving']*scr['sustain']
    scr['PATID'] = scr['subject_id']
    scr['ENCOUNTERID'] = scr['hadm_id']
    scr = scr[['PATID', 'ENCOUNTERID', 'AKI_resolving', 'AKI_sustain']].groupby(['PATID', 'ENCOUNTERID']).sum().reset_index()
    scr['AKI_TRIGG'] = scr['AKI_resolving']>0
    scr['AKI_RESOL'] = scr['AKI_sustain']>0
    onset = onset.merge(scr[['PATID', 'ENCOUNTERID',  'AKI_TRIGG', 'AKI_RESOL']],on=['PATID', 'ENCOUNTERID'], how='left')
    return onset.drop_duplicates()

#### 2.5. Processing AKI Onset and Recovery

The `process_onset_and_recovery` function runs the full pipeline for a single site. It chains the previously defined components and writes the final AKI onset and recovery datasets to the site's output folder.

In [9]:
def process_onset_and_recovery(home_path, site):
    filepath_lst = get_data_file_path(home_path, site)
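    # NOTE: load_onset_data is not defined in this notebook and must be provided before running;
    # it is expected to return the SCr table and the admission table for the site
    df_scr, df_admit = load_onset_data(filepath_lst)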
    df_baseline, cohort_table = get_scr_baseline(df_scr, df_admit, filepath_lst)
    df_rrt = determine_rrt_status(df_admit, filepath_lst)
    onset = get_aki_onset(df_scr, df_admit, df_rrt, df_baseline)
    onset.to_pickle(filepath_lst[1] + 'onset.pkl')
    df_scr_filtered = load_and_filter_scr(onset, filepath_lst)
    recovery = get_aki_resolving(scr = df_scr_filtered, onset = onset)
    recovery.to_pickle(filepath_lst[1]+'recovery.pkl')
    return onset, recovery, cohort_table

#### 2.6. Main Script for Processing AKI Onset and Recovery Across Sites

The script is designed to be executed iteratively across a list of sites, as demonstrated below:

In [11]:
home_path = '/blue/yonghui.wu/hoyinchan/Data/data2022/'

# Set up logging
logging.basicConfig(filename= home_path+'log/processing_errors.log', level=logging.ERROR)
site_lst = ['KUMC', 'UTSW', 'MCW', 'UIOWA', 'UMHC', 'UTHSCSA', 'UPITT', 'UNMC', 'MCRI', 'UofU'] 

# Restrict this run to a single site (overrides the full list above)
site_lst = ['KUMC']

for site in site_lst:
    try:
        onset, recovery, cohort_table = process_onset_and_recovery(home_path, site)
        print(f"Finished generating AKI onset and recovery for {site}.", flush = True)
    except Exception as e:
        error_message = f"Error processing site {site}: {str(e)}"
        logging.error(error_message)
        print(error_message)

Error processing site KUMC: name 'load_onset_data' is not defined
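The message above shows that a failure inside one site is caught and reported without stopping the loop. A minimal sketch of the same pattern, written so that failures are also collected for later inspection, is given below. The function `run_sites` and the `process_fn` callback are hypothetical and stand in for the loop and for `process_onset_and_recovery`; `logging.exception` is used so that the full traceback ends up in the log.

```python
import logging


def run_sites(site_lst, process_fn):
    """Run process_fn for each site and return (results, failures).

    results  -- dict of site -> return value of process_fn
    failures -- dict of site -> short error description
    """
    results, failures = {}, {}
    for site in site_lst:
        try:
            results[site] = process_fn(site)
        except Exception as exc:
            # Log the traceback and keep going with the next site
            logging.exception("Error processing site %s", site)
            failures[site] = f"{type(exc).__name__}: {exc}"
    return results, failures
```

Keeping the exception type in the failure summary makes it easier to tell a missing definition (a `NameError`, as in the output above) from a data problem at a particular site.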
