In [1]:
import pandas as pd
import numpy as np
import os
from collections import Counter
from datetime import datetime
from tqdm import tqdm

# pd.set_option('display.max_columns', None)
# pd.set_option('display.max_rows', None)

In [2]:
def readin_data(data_type: str):
    """ This function reads in test or train data, which must be in folders 'testing_data' and 'training_data' in the same directory.

    Parameters
    ----------
    'data_type' : str
        This must be 'test' or 'train'.
    """
    assert (data_type=='test') or (data_type=='train'), f'You gave data_type as {data_type}. Please define data_type as "test" or "train."'
    if data_type == 'test':
        inner_directory = './testing_data/'
        data_list = os.listdir('./testing_data')
    else:
        inner_directory = './training_data/'
        data_list = os.listdir('./training_data')
    data_dict = {}
    for file_name in data_list:
        data_dict[file_name.split('.')[0]] = pd.read_csv(inner_directory+file_name).drop_duplicates()
    return data_dict

In [3]:
# os.makedirs('./processed_data')

In [4]:
def add_uids(data_dictionary: dict):
    """ This function adds a UID to each row to establish unique instances between person_id & measurement datetimes for the various tables.
    
    This is not done for the demographics file since the information in it is not sensitive to the hour.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.
    """
    for table_ind in list(data_dictionary.keys()):
        if not table_ind.startswith("person_demographics"):
            table = data_dictionary[table_ind]
            datetime_index = np.argmax([i.find('datetime') for i in table.columns])
            date_column = table.columns[datetime_index]
            personid_index = np.argmax([i.find('person_id') for i in table.columns])
            personid_column = table.columns[personid_index]
            table['uid'] = table[date_column].astype(str) + table[personid_column].astype(str)
            table.drop(columns=[date_column,personid_column],inplace=True)
            data_dictionary[table_ind] = table
    
    return

In [5]:
def birthday_management(data_dictionary: dict):
    """
    This function processes the 'person_demographics' table of given data, which is inputed as a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.
    """
    demographics_ind_no = np.argmax([table.startswith("person_demographics") for table in data_dictionary.keys()])
    demographics_index = list(data_dictionary.keys())[demographics_ind_no]
    demographics = data_dictionary[demographics_index]

    new_birthday_col = pd.DataFrame(columns=['birthday_formatted', 'person_id'])
    new_visit_col = pd.DataFrame(columns=['visit_start_date','new_visit_startdate'])
    
    for person in np.unique(demographics['person_id']):
        birthday = demographics[demographics['person_id']==person]['birth_datetime'].to_list()[0]
        birthday_formatted = datetime.strptime(birthday,'%Y-%m-%d')
        new_birthday_col.loc[len(new_birthday_col)] = [birthday_formatted, person]

    for date in np.unique(demographics['visit_start_date']):
        visit_start = demographics[demographics['visit_start_date']==date]['visit_start_date'].to_list()[0]
        new_visit_startdate = datetime.strptime(visit_start,'%Y-%m-%d')
        # print(f'new {new_visit_startdate} old {date}')
        new_visit_col.loc[len(new_visit_col)] = [date, new_visit_startdate]


    demographics = pd.merge(left=demographics,right=new_birthday_col,how='left',on='person_id')
    demographics = pd.merge(left=demographics,right=new_visit_col,how='left',on='visit_start_date')
    demographics.drop(columns=['visit_start_date','birth_datetime'],inplace=True)
    demographics.to_csv(f'./processed_data/processed_{demographics_index}.csv')
    data_dictionary[demographics_index] = demographics
    return None

In [6]:
def measurement_meds_processing(data_dictionary: dict):
    """ This function processes the 'measurement_meds' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.
    """
    body_measurements_ind = np.argmax([table.startswith("measurement_meds") for table in data_dictionary.keys()])
    body_measurements_index = list(data_dictionary.keys())[body_measurements_ind]
    measurements = data_dictionary[body_measurements_index]
    
    measurements = measurements.dropna(subset=measurements.select_dtypes(float).columns, how='all')
    # measurements.drop(index=[i for i in measurements[measurements['Body temperature']>45].index], axis=1,inplace=True)
    measurements['Body temperature'] = measurements['Body temperature'].apply(lambda x: np.nan if x >46 else x)
    measurements.to_csv(f'./processed_data/processed_{body_measurements_index}.csv')
    data_dictionary[body_measurements_index] = measurements
    return None

In [7]:
# REDONE NOT TESTED

def drugs_exposure_processing(data_dictionary: dict):
    """ This function processes the 'drugsexposure' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.

    Requirements
    ------------
    You need to run add_uids first.
    """
    # assert uids_added == True, 'You need to run add_uids before this function.'
    drugs_exposure_ind = np.argmax([table.startswith("drugsexposure") for table in data_dictionary.keys()])
    drugs_exposure_index = list(data_dictionary.keys())[drugs_exposure_ind]
    drugs_exposure = data_dictionary[drugs_exposure_index]

    drugs_exposure_processed = pd.DataFrame(columns = ['uid', 'drugs', 'routes', 'visit_occurrence_id'])
    for x in tqdm(np.unique(drugs_exposure['uid'])):
        drugs = drugs_exposure[drugs_exposure['uid']==x]['drug_concept_id'].to_list()
        drugs.sort()
        try:
            route = drugs_exposure[drugs_exposure['uid']==x]['route_concept_id'].to_list()
            route = list(set(route))
            route = [str(i) for i in route]
            route.sort()
        except:
            route = drugs_exposure[drugs_exposure['uid']==x]['route_concept_id'].to_list()
            route = list(set(route))
        visit_occurrence = drugs_exposure[drugs_exposure['uid']==x]['visit_occurrence_id'].to_list()[0]
        drugs_exposure_processed.loc[len(drugs_exposure_processed)]= [x,drugs,route, visit_occurrence]
    data_dictionary[drugs_exposure_index] = drugs_exposure_processed
    drugs_exposure.to_csv(f'./processed_data/processed_{drugs_exposure_index}.csv')
    return None

In [8]:
def measurement_lab_processing(data_dictionary: dict):
    """ This function processes the 'measurement_lab' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.

    Requirements
    ------------
    You need to run add_uids first.
    """
    # assert uids_added == True, 'You need to run add_uids before this function.'

    measurement_lab_ind = np.argmax([table.startswith("measurement_lab") for table in data_dictionary.keys()])
    measurement_lab_index = list(data_dictionary.keys())[measurement_lab_ind]
    measurement_lab = data_dictionary[measurement_lab_index]

    measurement_lab = measurement_lab.dropna(subset=measurement_lab.select_dtypes(float).columns, how='all')
    measurement_lab_count = pd.DataFrame([list(i) for i in Counter(measurement_lab['uid']).items()],columns=['uid','count'])
    measurement_lab_count['count'].astype(int)
    measurement_lab_rows = pd.DataFrame()
    measurement_lab_extras = measurement_lab_count[measurement_lab_count['count']>1]
    for j in [i for i in measurement_lab_extras['uid']]:
        measurement_lab_rows = pd.concat([measurement_lab_rows,measurement_lab[measurement_lab['uid']==j].max().to_frame().T]).reset_index(drop=True)
    measurement_lab.drop(index=[i for i in measurement_lab[measurement_lab['uid']==measurement_lab_extras['uid'].item()].index], axis=1,inplace=True)
    measurement_lab = pd.concat([measurement_lab,measurement_lab_rows]).reset_index(drop=True)

    measurement_lab.to_csv(f'./processed_data/processed_{measurement_lab_index}.csv')
    data_dictionary[measurement_lab_index] = measurement_lab


In [9]:
def measurement_observation_processing(data_dictionary: dict):
    """ This function processes the 'measurement_observation' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.
    """

    measurement_obs_ind = np.argmax([table.startswith("measurement_observation") for table in data_dictionary.keys()])
    measurement_obs_index = list(data_dictionary.keys())[measurement_obs_ind]
    measurement_obs = data_dictionary[measurement_obs_index]

    # measurement_obs = measurement_obs.dropna(subset=measurement_obs.select_dtypes(float).columns, how='all')
    measurement_obs.to_csv(f'./processed_data/processed_{measurement_obs_index}.csv')
    data_dictionary[measurement_obs_index] = measurement_obs    

In [10]:
def observation_processing(data_dictionary: dict):
    """ This function processes the 'observation' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.
    """

    observation_ind = np.argmax([table.startswith("observation") for table in data_dictionary.keys()])
    observation_index = list(data_dictionary.keys())[observation_ind]
    observation = data_dictionary[observation_index]

    observation = observation.dropna(subset=observation.select_dtypes(object).columns, how='all')

    observation.to_csv(f'./processed_data/processed_{observation_index}.csv')

    data_dictionary[observation_index] = observation 

In [11]:
def procedures_processing(data_dictionary: dict):
    """ This function processes the 'observation' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.

    Requirements
    ------------
    You need to run add_uids first.
    """
    # assert uids_added == True, 'You need to run add_uids before this function.'

    procedures_ind = np.argmax([table.startswith("proceduresoccurrences") for table in data_dictionary.keys()])
    procedures_index = list(data_dictionary.keys())[procedures_ind]
    procedures = data_dictionary[procedures_index]

    procedures = procedures.dropna(subset=procedures.select_dtypes(object).columns, how='all')


    visit_id_index = np.argmax([i.find('visit_occurrence') for i in procedures.columns])
    visit_column = procedures.columns[visit_id_index]
    procedures.drop(columns=visit_column,inplace=True)
    procedures.drop_duplicates(inplace=True)

    procedures.to_csv(f'./processed_data/processed_{procedures_index}.csv')


    data_dictionary[procedures_index] = procedures 

In [12]:
def devices_processing(data_dictionary: dict):
    """ This function processes the 'devices' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.
    """

    devices_ind = np.argmax([table.startswith("devices") for table in data_dictionary.keys()])
    devices_index = list(data_dictionary.keys())[devices_ind]
    devices = data_dictionary[devices_index]

    devices = devices.dropna(subset=devices.select_dtypes(object).columns, how='all')

    devices.to_csv(f'./processed_data/processed_{devices_index}.csv')

    data_dictionary[devices_index] = devices

In [13]:
def sepsis_processing(data_dictionary: dict):
    """ This function processes the 'sepsis' table of given data, which is inputed in a dictionary. The data in dictionary is replaced by index, hence function returns nothing.

    Parameters
    ----------
    'data_dictionary' : dict
        A dictionary of pandas DataFrames.
    """
    sepsis_ind = np.argmax([table.startswith("SepsisLabel") for table in data_dictionary.keys()])
    sepsis_index = list(data_dictionary.keys())[sepsis_ind]
    sepsis = data_dictionary[sepsis_index]

    #no NA values found:
    sepsis = sepsis.dropna(subset=sepsis.select_dtypes(int).columns, how='all')

In [14]:
def process_data(data_type: str):
    """ This function reads in test or train data and goes through functions to preprocess it. For further details see specific functions.

    If processing has already been run then files from the 'processed_data' folder will be loaded instead of reprocessing.

    Returns factors, a DataFrame of processed data.

    Parameters
    ----------
    'data_type' : str
        This must be 'test' or 'train'.
    """
    assert (data_type=='test') or (data_type=='train'), f'You gave data_type as {data_type}. Please define data_type as "test" or "train."'

    if data_type == 'train':
        training_data = readin_data('train')
        add_uids(training_data)
        birthday_management(training_data)
        measurement_meds_processing(training_data)
        drugs_exposure_processing(training_data)
        measurement_lab_processing(training_data)
        procedures_processing(training_data)

        factors = pd.merge(left=training_data['measurement_meds_train'], right=training_data['measurement_lab_train'],how='outer',on='uid')
        factors = pd.merge(left=factors, right=training_data['drugsexposure_train'],how='outer',on='uid')
        factors = pd.merge(left=factors, right=training_data['proceduresoccurrences_train'],how='outer',on='uid')
        factors = pd.merge(left=factors, right=training_data['person_demographics_episode_train'], how='outer',on='visit_occurrence_id')
        factors = pd.merge(left=training_data['SepsisLabel_train'],right=factors,how='left',on='uid')

    else:
        testing_data = readin_data('test')
        add_uids(testing_data)
        birthday_management(testing_data)
        measurement_meds_processing(testing_data)
        drugs_exposure_processing(testing_data)
        measurement_lab_processing(testing_data)

        factors = pd.merge(left=testing_data['measurement_meds_test'], right=testing_data['measurement_lab_test'],how='outer',on='uid')
        factors = pd.merge(left=factors, right=testing_data['drugsexposure_test'],how='outer',on='uid')
        factors = pd.merge(left=factors, right=testing_data['person_demographics_episode_test'], how='outer',on='visit_occurrence_id')
        factors = pd.merge(left=testing_data['SepsisLabel_test'],right=factors,how='left',on='uid')

    return factors

In [None]:
process_data('train')

  3%|▎         | 4636/150407 [02:52<1:29:52, 27.03it/s]