In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import pickle
from tqdm import tqdm

## Clean vital-sign data and merge it with respiratory-complication labels

In [2]:
# Complication labels keyed by (mrn, log_id); respiratory_comp is the target used below.
COMPLICATIONS_CSV = '/home/common/mover_data/surginf_cleaned/complications_cleaned.csv'
df_comp = pd.read_csv(COMPLICATIONS_CSV)

In [3]:
# Check duplicates: rows whose (mrn, log_id) pair already appeared earlier in df_comp.
# An empty result means every surgery has exactly one label row.
df_comp.loc[df_comp.duplicated(['mrn', 'log_id'])]

Unnamed: 0,log_id,mrn,any_complication,n_complications,comp_abbr,comp_full,death,respiratory_comp


In [4]:
# Local Spark session. spark.driver.maxResultSize bounds how much a collect()/toPandas()
# may bring back to the driver. The key was previously misspelled ('spark.dirver...'),
# which Spark silently ignores, so the 8g limit was never applied.
spark = (
    SparkSession.builder.appName("mover")
    .config("spark.driver.memory", "32g")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)
# Raw vital-sign measurements stored as a parquet directory.
df_vital = spark.read.parquet('/home/common/mover_data/spark_vital/')

23/09/08 01:56:18 WARN Utils: Your hostname, argosafe resolves to a loopback address: 127.0.1.1; using 129.215.10.63 instead (on interface ens160)
23/09/08 01:56:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/08 01:56:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Attach the respiratory-complication label to every vital-sign reading; the inner
# join keeps only readings whose (mrn, log_id) has a label row.
comp_spark = spark.createDataFrame(df_comp[['mrn', 'log_id', 'respiratory_comp']])
# Joining on a list of column names already yields a single copy of each key column
# (name resolution is case-insensitive by default, so 'mrn' matches df_vital's 'MRN').
# The former .drop(comp_spark[...]) / .withColumnRenamed('df_vital.*', ...) calls were
# no-ops, and passing several Column objects to drop() raises TypeError on PySpark < 3.4.
df_vital = df_vital.join(comp_spark, on=['mrn', 'log_id'], how='inner')
# Numeric view of the raw measurement (non-numeric strings become null unless ANSI mode is on).
df_vital = df_vital.withColumn('MEAS_FLOAT', df_vital.MEAS_VALUE.cast('double'))
# One row per (surgery, timestamp, phase, label); each FLO_DISPLAY_NAME value becomes a
# column holding the mean MEAS_FLOAT. Post-operative rows are discarded.
df_vital = (
    df_vital
    .groupby('MRN', 'LOG_ID', 'RECORDED_TIME', 'RECORD_TYPE', 'respiratory_comp')
    .pivot('FLO_DISPLAY_NAME')
    .mean('MEAS_FLOAT')
    .filter(F.col('RECORD_TYPE') != 'POST-OP')
)

                                                                                

In [6]:
# Merge all readings of a surgery that fall in the same calendar day and clock hour
# into one row: numeric columns are averaged, string columns keep their first value.
df_vital = (
    df_vital
    .withColumn("day", F.to_date("RECORDED_TIME"))
    .withColumn("hour", F.hour("RECORDED_TIME"))
    .drop('temp source')
)

groupby_cols = ["MRN", "LOG_ID", "day", "hour"]
exclude_cols = ["MRN", "LOG_ID", "RECORDED_TIME", "day", "hour"]

# NOTE(review): F.first is order-dependent in Spark; for an hour that spans two
# RECORD_TYPE values (e.g. PRE-OP -> INTRA-OP) the kept value is not deterministic.
agg_expressions = [
    (F.first(name) if df_vital.schema[name].dataType == StringType() else F.mean(name)).alias(name)
    for name in df_vital.columns
    if name not in exclude_cols
]

agg_df = df_vital.groupBy(*groupby_cols).agg(*agg_expressions)


In [7]:
# Missing rate per column = 1 - (non-null count / total row count).
exprs = [(1 - F.count(F.col(c)) / F.count(F.lit(1))).alias(c) for c in agg_df.columns]

# The aggregate is a single row; transpose it so each column name becomes a row label
# with its missing rate in column 0.
missing_rates_df = agg_df.agg(*exprs).toPandas().transpose()

23/09/08 01:58:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [17]:
# List every column's missing rate, lowest first, with pandas truncation disabled.
print('Feature and missing rate')
with pd.option_context('display.max_rows', None,
                       'display.max_columns', None):
    print(missing_rates_df.sort_values(by=0))

Feature and missing rate
                                                           0
MRN                                                 0.000000
LOG_ID                                              0.000000
day                                                 0.000000
hour                                                0.000000
RECORD_TYPE                                         0.000000
respiratory_comp                                    0.000000
SpO2                                                0.299477
Resp                                                0.321273
Pulse                                               0.369435
Max Diastolic                                       0.418675
Min Diastolic                                       0.418729
MAP (mmHg)                                          0.439630
Min Systolic                                        0.476646
Max Systolic                                        0.476646
Temp                                                0.525173

In [19]:
# Select features (and key columns) whose missing rate is below 95%.
selected_feats = missing_rates_df[missing_rates_df[0] < 0.95].index.tolist()

# NOTE(review): these two names carry a trailing space, so they are distinct from
# 'Max Systolic' / 'Min Systolic', which remain selected. Presumably they are duplicate
# pivot columns caused by inconsistent FLO_DISPLAY_NAME spellings; confirm.
# list.remove raises ValueError if a name disappears, which flags an upstream change.
selected_feats.remove('Max Systolic ')
selected_feats.remove('Min Systolic ')

# '%' needs no escaping in a Python string; the old '\%' was an invalid escape
# sequence (Deprecation/SyntaxWarning) and printed a literal backslash.
print('Selected features (Missing rate < 95%):')
print(selected_feats)

Selected features (Missing rate < 95\%):
['MRN', 'LOG_ID', 'day', 'hour', 'RECORD_TYPE', 'respiratory_comp', 'Air', 'Arterial Line MAP (ART)', 'ETCO2 (mmHg)', 'ETCO2RR', 'ETN2O', 'ETO2', 'Exhaled TV (ml)', 'Expired Min Volume (MV)', 'FICO2', 'FIN20 (%)', 'FiO2 (%)', 'Heart Rate', 'MAP (mmHg)', 'Max Diastolic', 'Max Systolic', 'Min Diastolic', 'Min Systolic', 'NIBP - MAP', 'O2 Flow Rate (L/min)', 'PAWP', 'PIP Observed', 'Pain Score', 'Pulse', 'Resp', 'SET TV', 'SEV ET', 'SEV In', 'Set PEEP', 'Set Resp Rate', 'SpO2', 'Temp', 'Urine', 'Urine Output', 'Volume']


In [45]:
# Identifier / label columns; every other selected column is a vital-sign feature.
index_cols = ['MRN', 'LOG_ID', 'day', 'hour', 'RECORD_TYPE', 'respiratory_comp']

# Keep only the selected columns, then discard hourly rows in which every
# feature column is null.
data_df = agg_df.select(selected_feats)
dropna_cols = list(filter(lambda name: name not in index_cols, data_df.columns))
data_df = data_df.dropna(how='all', subset=dropna_cols)

In [46]:
# Collect the filtered Spark DataFrame into driver memory as a pandas DataFrame.
data_df = data_df.toPandas()

                                                                                

In [50]:
# Cache the hourly vital-sign table so the imputation section can start without Spark.
# The context manager closes (and flushes) the file; the bare open() never closed it.
with open('/home/common/mover_data/surginf_cleaned/cleaned_vital_df.pkl', 'wb') as pkl_file:
    pickle.dump(data_df, pkl_file)

In [51]:
# Preview the hourly table (pandas truncates the rendered rows/columns for a frame this size).
data_df

Unnamed: 0,MRN,LOG_ID,day,hour,RECORD_TYPE,respiratory_comp,Air,Arterial Line MAP (ART),ETCO2 (mmHg),ETCO2RR,...,SET TV,SEV ET,SEV In,Set PEEP,Set Resp Rate,SpO2,Temp,Urine,Urine Output,Volume
0,0089e170012bc07e,9e2441c763ec19d9,2022-04-16,6,PRE-OP,No,,,,,...,,,,,,98.000000,98.400000,,,
1,0089e170012bc07e,9e2441c763ec19d9,2022-04-16,7,PRE-OP,No,1.360,,31.285714,11.428571,...,548.214286,1.319048,1.804762,3.214286,10.375,99.372093,97.130769,,,
2,0089e170012bc07e,9e2441c763ec19d9,2022-04-16,8,INTRA-OP,No,1.675,,34.333333,16.000000,...,575.000000,2.280000,2.768333,5.000000,,97.716667,97.420000,,,
3,0089e170012bc07e,9e2441c763ec19d9,2022-04-16,9,INTRA-OP,No,1.500,,41.250000,11.983333,...,564.285714,1.955000,2.076667,5.683333,,98.150000,98.715094,,0.0,400.0
4,0089e170012bc07e,9e2441c763ec19d9,2022-04-16,10,INTRA-OP,No,,,33.375000,8.375000,...,,0.250000,0.025000,6.000000,,99.000000,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3589914,fffb5d2e00cf670f,ca86eca1473b99bf,2021-01-12,0,PRE-OP,No,,,,,...,,,,,,97.000000,98.600000,,,
3589915,fffb5d2e00cf670f,ca86eca1473b99bf,2021-01-12,1,PRE-OP,No,,,,,...,,,,,,100.000000,,,,
3589916,fffb5d2e00cf670f,ca86eca1473b99bf,2021-01-12,2,PRE-OP,No,,,41.500000,2.500000,...,500.000000,0.000000,0.000000,0.000000,10.000,97.600000,,,,
3589917,fffb5d2e00cf670f,ca86eca1473b99bf,2021-01-12,3,INTRA-OP,No,1.300,,36.600000,11.983333,...,496.666667,1.335000,1.728333,5.500000,10.000,99.183333,97.073077,,,300.0


## Data imputation and split

In [2]:
# Reload the cached hourly vital-sign table written by the previous section.
# NOTE: pickle.load can execute arbitrary code; only load this trusted, self-produced file.
with open('/home/common/mover_data/surginf_cleaned/cleaned_vital_df.pkl', 'rb') as pkl_file:
    data_df = pickle.load(pkl_file)

In [3]:
# Drop the phase label; it is not used as a feature downstream.
cleaned_df = data_df.drop(['RECORD_TYPE'], axis=1)

# Forward-fill depends on row order, and rows collected from a Spark groupBy carry no
# ordering guarantee, so sort chronologically before each fill (stable sort keeps ties
# in their current relative order).

# 1. FFill nulls within each surgery, in (day, hour) order.
cleaned_df = cleaned_df.sort_values(['MRN', 'LOG_ID', 'day', 'hour'], kind='stable')
surgery_fill_cols = cleaned_df.columns.difference(['MRN', 'LOG_ID'], sort=False)
cleaned_df[surgery_fill_cols] = cleaned_df.groupby(['MRN', 'LOG_ID'])[surgery_fill_cols].ffill()

# 2. FFill remaining nulls within each patient, carrying values across surgeries in time
#    order. LOG_IDs look like hashed ids (e.g. '9e2441c763ec19d9'), so ordering by them is
#    presumably not chronological; LOG_ID is only used as a tie-breaker here.
cleaned_df = cleaned_df.sort_values(['MRN', 'day', 'hour', 'LOG_ID'], kind='stable')
patient_fill_cols = cleaned_df.columns.difference(['MRN'], sort=False)
cleaned_df[patient_fill_cols] = cleaned_df.groupby('MRN')[patient_fill_cols].ffill()

# 3. If values are still missing, use the global median of each numeric column.
#    numeric_only=True skips the string/date columns (pandas >= 2.0 raises otherwise).
# NOTE(review): this median is computed before the train/val/test split, so held-out
# rows influence the value imputed into training rows; consider a train-only median.
global_median = cleaned_df.median(numeric_only=True)
cleaned_df = cleaned_df.fillna(global_median)


  global_median = cleaned_df.median()


In [4]:
# Data split at the patient level: 70% train / 10% val / 20% test, stratified by
# whether the patient had a respiratory complication in any surgery. Splitting on
# MRN keeps all of one patient's surgeries in the same split.
from sklearn.model_selection import train_test_split

# Chronological row order within each surgery; label encoded as 0/1.
full_df = cleaned_df.copy().sort_values(by=['MRN', 'LOG_ID', 'day', 'hour'])
full_df['respiratory_comp'] = full_df['respiratory_comp'].map({'No': 0, 'Yes': 1})

# One row per patient: positive if any of their surgeries is positive.
patient_df = full_df.groupby(['MRN'], as_index=False).agg({'respiratory_comp': 'max'})
patient_list, y_list = patient_df['MRN'].tolist(), patient_df['respiratory_comp'].tolist()

# First hold out 20% of patients for test, then 12.5% of the remaining 80%
# (= 10% overall) for validation.
p_train, p_test, y_train, y_test = train_test_split(
    patient_list, y_list,
    test_size=0.2, random_state=42, stratify=y_list,
)
p_train, p_val, y_train, y_val = train_test_split(
    p_train, y_train,
    test_size=0.125, random_state=42, stratify=y_train,
)

train_df, val_df, test_df = (
    full_df[full_df['MRN'].isin(patients)].copy()
    for patients in (p_train, p_val, p_test)
)

In [5]:
# Data normalization: z-score every feature column. The scaler's statistics come
# from the training split only and are applied unchanged to val/test.
from sklearn.preprocessing import StandardScaler

columns_to_scale = train_df.columns.difference(
    ['MRN', 'LOG_ID', 'day', 'hour', 'respiratory_comp']
)

scaler = StandardScaler()
scaler.fit(train_df[columns_to_scale])
train_df[columns_to_scale] = scaler.transform(train_df[columns_to_scale])
val_df[columns_to_scale] = scaler.transform(val_df[columns_to_scale])
test_df[columns_to_scale] = scaler.transform(test_df[columns_to_scale])

In [16]:
# Dataframe to per-surgery numpy matrices
def get_dataset_dict(df):
    """Convert an hourly vitals frame into one feature matrix per surgery.

    Args:
        df: pandas DataFrame containing the key columns 'MRN' and 'LOG_ID', the
            label column 'respiratory_comp', optionally the time columns 'day'
            and 'hour', and any number of feature columns.

    Returns:
        dict mapping (MRN, LOG_ID) -> (matrix, label). ``matrix`` is a 2-D numpy
        array with one row per record of that surgery, ordered by (day, hour),
        and one column per feature (feature columns in sorted name order).
        ``label`` is the surgery's 'respiratory_comp' value from its first row.
    """
    feat_cols = df.columns.difference(['MRN', 'LOG_ID', 'day', 'hour', 'respiratory_comp'])

    # Rows become time steps, so order them chronologically within each surgery
    # rather than relying on the caller having sorted the frame.
    order_cols = [c for c in ['MRN', 'LOG_ID', 'day', 'hour'] if c in df.columns]
    df = df.sort_values(order_cols, kind='stable')

    data_dict = {}
    for cur_id, group in tqdm(df.groupby(['MRN', 'LOG_ID'])):
        # The group key already is the (MRN, LOG_ID) tuple.
        matrix = group[feat_cols].to_numpy(copy=True)
        label = group['respiratory_comp'].iloc[0]
        data_dict[cur_id] = (matrix, label)
    return data_dict


In [17]:
# Per-surgery (matrix, label) dictionaries for each split, built in train/val/test order.
dataset_dict = {
    split_name: get_dataset_dict(split_frame)
    for split_name, split_frame in (('train', train_df), ('val', val_df), ('test', test_df))
}


100%|██████████| 39609/39609 [00:17<00:00, 2249.76it/s]
100%|██████████| 5739/5739 [00:02<00:00, 2268.27it/s]
100%|██████████| 11331/11331 [00:05<00:00, 2231.96it/s]


In [19]:
# Persist the train/val/test dictionaries; the context manager closes (and flushes)
# the file, which the bare open() never did.
with open('/home/common/mover_data/surginf_cleaned/dataset_dict.pkl', 'wb') as pkl_file:
    pickle.dump(dataset_dict, pkl_file)