# Loading MOVER SIS_EMR dataset

In [6]:
# Import necessary libraries
import pandas as pd
import numpy as np
from datetime import datetime

# MOVER SIS_EMR source tables
# NOTE(review): the figures quoted in the original comment ("about 39,685 patients,
# 64,354 cases") are unverified here - compare them with the subject counts printed
# at the end of this cell.
# Define the path to the MOVER SIS_EMR dataset
input_path = 'mover/SIS_EMR'

# Load the MOVER SIS_EMR source tables into dataframes.
# Tables read with on_bad_lines='skip' silently drop malformed rows, so their
# row counts can be lower than the line counts of the CSV files.
df_info = pd.read_csv(f'{input_path}/patient_information.csv')         # Load information data
df_medi = pd.read_csv(f'{input_path}/patient_medication.csv')       # Load medications data
df_labs = pd.read_csv(f'{input_path}/patient_labs.csv')       # Load labs data
df_io = pd.read_csv(f'{input_path}/patient_input_output.csv')       # Load input_output data
df_vitals = pd.read_csv(f'{input_path}/patient_vitals.csv', on_bad_lines='skip')          # Load vitals data
df_obs = pd.read_csv(f'{input_path}/patient_observations.csv', on_bad_lines='skip')    # Load observations data
df_vent = pd.read_csv(f'{input_path}/patient_ventilator.csv', on_bad_lines='skip')    # Load ventilator data
df_aline = pd.read_csv(f'{input_path}/patient_a_line.csv', on_bad_lines='skip')    # Load arterial line data
df_events = pd.read_csv(f'{input_path}/patient_procedure_events.csv', on_bad_lines='skip')          # Load procedure events data


# Display the number of records in each dataset
print(f'Size of the tables: information {len(df_info)}, medication {len(df_medi)},\
      labs {len(df_labs)}, input_output {len(df_io)}, vitals {len(df_vitals)}, observations {len(df_obs)},\
      ventilator {len(df_vent)}, aline {len(df_aline)}, events {len(df_events)}')

# Combine the subject ids (PID) of every loaded table.
# df_vitals is included as well so that the total really covers all nine tables
# (assumes patient_vitals.csv carries a PID column like the other tables - TODO confirm).
pid_tables = [df_io, df_labs, df_medi, df_info, df_vitals, df_obs, df_events, df_vent, df_aline]
subject_ids = [pid for table in pid_tables for pid in table['PID'].tolist()]

# Display the total unique subjects present in the combined dataset
print(f'total subjects in MOVER dataset: {len(np.unique(subject_ids))}')
print(f"total subjects in patient_information.csv: {len(np.unique(df_info['PID']))}")

  df_obs = pd.read_csv(f'{input_path}/patient_observations.csv', on_bad_lines='skip')    # Load observations table


Size of the tables: information 19114, medication 373852,      labs 14733, input_output 100993, vitals 3847548, observations 3663066,      ventilator 1048575, aline 2989, events 40801
total subjects in MOVER dataset: 19158
total subjects in patient_information.csv: 19114


In [36]:
# Read the Athena vocabulary exports; both files are tab-separated.
# CONCEPT_RELATIONSHIP is read with on_bad_lines='error', i.e. a malformed row aborts the load.
df_concept_rel = pd.read_csv(
    'vocab/CONCEPT_RELATIONSHIP.csv',
    sep='\t',
    on_bad_lines='error',
)
df_concept = pd.read_csv(
    'vocab/CONCEPT.csv',
    sep='\t',
)

  df_concept = pd.read_csv(f'vocab/CONCEPT.csv', sep='\t')


In [17]:
# Remove fully duplicated rows from every source table and renumber the index from 0
df_info = df_info.drop_duplicates(ignore_index=True)
df_io = df_io.drop_duplicates(ignore_index=True)
df_medi = df_medi.drop_duplicates(ignore_index=True)
df_vitals = df_vitals.drop_duplicates(ignore_index=True)
df_events = df_events.drop_duplicates(ignore_index=True)
df_labs = df_labs.drop_duplicates(ignore_index=True)
df_obs = df_obs.drop_duplicates(ignore_index=True)
df_vent = df_vent.drop_duplicates(ignore_index=True)
df_aline = df_aline.drop_duplicates(ignore_index=True)

# Report the row counts that remain after de-duplication
print(f'Size of the tables: information {len(df_info)}, medication {len(df_medi)},\
      labs {len(df_labs)}, input_output {len(df_io)}, vitals {len(df_vitals)}, observations {len(df_obs)},\
      ventilator {len(df_vent)}, aline {len(df_aline)}, events {len(df_events)}')


Size of the tables: information 19114, medication 373616,      labs 14733, input_output 100967, vitals 3595596, observations 2684312,      ventilator 1048575, aline 2951, events 40697


In [24]:
### patient_information table
# Parse the OR / surgery timestamps from 'mm/dd/yy HH:MM' strings into datetimes.
# errors='raise' makes any value that does not match the format abort the cell.
for time_col in ('OR_start', 'OR_end', 'Surgery_start', 'Surgery_end'):
    df_info[time_col] = pd.to_datetime(df_info[time_col], format='%m/%d/%y %H:%M', errors='raise')

# Height and weight become numeric; entries that cannot be parsed turn into NaN
for size_col in ('Ht', 'Wt'):
    df_info[size_col] = pd.to_numeric(df_info[size_col], errors='coerce')

# Persist the typed table (the index is not written)
df_info.to_parquet(f'{input_path}/parquet/patient_information.parquet', index=False)

In [None]:
### patient medication table
# Source columns of the medication table. Columns produced by this cell are
# excluded so that re-running the cell does not duplicate them in the final selection.
columns = [col for col in df_medi.columns if col not in ('ingredient', 'concept_id')]

# Keep the leading part of the drug name, up to the first digit or '/'
df_medi['ingredient'] = df_medi['Drug_name'].str.extract(r'([^0-9/]+)', expand=False)
# Lower case and strip whitespace
df_medi['ingredient'] = df_medi['ingredient'].str.lower().str.strip()

# Split ingredients that are coupled by '-' into one row per ingredient
df_medi = df_medi.assign(ingredient=df_medi['ingredient'].str.split('-')).explode('ingredient').reset_index(drop=True)
df_medi['ingredient'] = df_medi['ingredient'].str.strip()

# Standard RxNorm / RxNorm Extension concepts, names lower-cased for matching
is_rxnorm = df_concept['vocabulary_id'].isin(['RxNorm', 'RxNorm Extension'])
is_standard = df_concept['standard_concept'] == 'S'
rxnorm_concepts = df_concept.loc[is_rxnorm & is_standard, ['concept_name', 'concept_id']].copy()
rxnorm_concepts['concept_name'] = rxnorm_concepts['concept_name'].str.lower()

# One concept_id per lower-cased name (the first one listed). A left merge against
# non-unique names returns more rows than df_medi, and assigning its 'concept_id'
# column back by position would then shift ids onto the wrong medication rows;
# a lookup through Series.map always stays aligned with df_medi.
rxnorm_lookup = (
    rxnorm_concepts
    .dropna(subset=['concept_name'])
    .drop_duplicates(subset='concept_name')
    .set_index('concept_name')['concept_id']
)

# Match on the first 3 words, then the first 2, then the first word only;
# the longest candidate that matches wins. The .str accessors leave missing
# ingredients as NaN instead of raising.
ingredient_words = df_medi['ingredient'].str.split()
concept_id = ingredient_words.str[:3].str.join(' ').map(rxnorm_lookup)
for n_words in (2, 1):
    candidate = ingredient_words.str[:n_words].str.join(' ')
    concept_id = concept_id.fillna(candidate.map(rxnorm_lookup))
df_medi['concept_id'] = concept_id.astype('Int64')

# Calculate and print the number of MEDICATIONS table records that couldn't be mapped to a standard concept
nan_sum = df_medi['concept_id'].isna().sum()
print(f'mismatched concepts in MEDICATIONS table: {nan_sum} / {len(df_medi)} ({nan_sum/len(df_medi)*100:.1f}%)')

# Keep the source columns plus the derived ingredient and mapped concept
df_medi = df_medi[columns+['ingredient', 'concept_id']]

In [55]:
# Previous method: compare the whole ingredient string with the exact concept name.
# Discard any earlier mapping; errors='ignore' keeps the cell runnable when
# df_medi has no concept_id column yet.
df_medi = df_medi.drop(columns='concept_id', errors='ignore')

### patient medication table
# NOTE(review): if df_medi was already exploded by the multi-word mapping cell, the
# ingredient is re-derived from Drug_name for every row and exploded again here,
# which multiplies the rows of '-'-coupled drugs - verify the printed row count.
# Keep the leading part of the drug name, up to the first digit or '/'
df_medi['ingredient'] = df_medi['Drug_name'].str.extract(r'([^0-9/]+)', expand=False)
# Lower case and strip whitespace
df_medi['ingredient'] = df_medi['ingredient'].str.lower().str.strip()

# Split ingredients that are coupled by '-' into one row per ingredient
df_medi = df_medi.assign(ingredient=df_medi['ingredient'].str.split('-')).explode('ingredient').reset_index(drop=True)
df_medi['ingredient'] = df_medi['ingredient'].str.strip()

# Standard RxNorm / RxNorm Extension concepts, names lower-cased for matching
is_rxnorm = df_concept['vocabulary_id'].isin(['RxNorm', 'RxNorm Extension'])
is_standard = df_concept['standard_concept'] == 'S'
drug_concept = df_concept.loc[is_rxnorm & is_standard, ['concept_name', 'concept_id']].copy()
drug_concept['concept_name'] = drug_concept['concept_name'].str.lower()

# Map through a one-id-per-name lookup. Assigning the 'concept_id' column of a left
# merge back by position misaligns rows whenever a lower-cased concept name occurs
# more than once, because the merge result then has more rows than df_medi.
drug_lookup = (
    drug_concept
    .dropna(subset=['concept_name'])
    .drop_duplicates(subset='concept_name')
    .set_index('concept_name')['concept_id']
)
df_medi['concept_id'] = df_medi['ingredient'].map(drug_lookup)

# Calculate and print the number of MEDICATIONS table records that couldn't be mapped to a standard concept
nan_sum = df_medi['concept_id'].isna().sum()

print(f'mismatched concepts in MEDICATIONS table: {nan_sum} / {len(df_medi)} ({nan_sum/len(df_medi)*100:.1f}%)')


mismatched concepts in MEDICATIONS table: 133241 / 460664 (28.9%)


* medi_counts

In [58]:
# Medi_counts
# Count, per ingredient, the number of distinct subjects: every (PID, ingredient) pair is counted once
drugs_per_subject = df_medi.drop_duplicates(subset=['PID', 'ingredient'])
drug_counts = drugs_per_subject['ingredient'].value_counts().to_frame()
drug_counts.reset_index(inplace=True)
# Caveat: one ingredient can correspond to several medications (ingredients are produced
# by splitting medication names, so a split ingredient can coincide with a medication that
# consists of that single ingredient). Only the first Drug_name found per ingredient is kept.
drug_counts['medication'] = drug_counts.merge(drugs_per_subject.drop_duplicates(['ingredient']), on='ingredient', how='left')['Drug_name']

# Standard RxNorm / RxNorm Extension concepts, names lower-cased for matching.
# .copy() so that the column assignment below works on an independent frame
# instead of a slice of df_concept.
is_rxnorm = df_concept['vocabulary_id'].isin(['RxNorm', 'RxNorm Extension'])
is_standard = df_concept['standard_concept'] == 'S'
rxnorm_concepts = df_concept.loc[is_rxnorm & is_standard, ['concept_name', 'concept_id']].copy()
rxnorm_concepts['concept_name'] = rxnorm_concepts['concept_name'].str.lower()

# Exact-name mapping and its unmapped remainder; not used further in this cell,
# kept available for interactive inspection
drug_mapped = drug_counts.merge(rxnorm_concepts, left_on='ingredient', right_on='concept_name', how='left')
drug_mismatch = drug_mapped[drug_mapped['concept_id'].isna()]

# Share of subjects (in percent) that received each ingredient
drug_counts['perc'] = drug_counts['count'] / len(df_medi['PID'].unique()) * 100

# Remove rows that have empty ingredient; .copy() so that the helper columns
# added below are written to an independent frame rather than a filtered slice
drug_counts = drug_counts[drug_counts['ingredient']!=''].copy()

# Add an auxiliary column to maintain the order
drug_counts['order'] = range(len(drug_counts))

# Step 1: Extract the first word of 'ingredient'
drug_counts['first_word'] = drug_counts['ingredient'].str.split().str[0]

# Step 2: Merge with rxnorm_concepts on 'ingredient' for exact matches
exact_matches = pd.merge(drug_counts, rxnorm_concepts, left_on='ingredient', right_on='concept_name', how='left')

# Step 3: Identify which entries didn't get a match, then keep only matched rows in exact_matches
non_matches = drug_counts[~drug_counts['ingredient'].isin(exact_matches['concept_name'])]
exact_matches = exact_matches.dropna(subset='concept_id')

# Step 4: Merge the non-matches with the 'concept_name' based on 'first_word'
partial_matches = pd.merge(non_matches, rxnorm_concepts, left_on='first_word', right_on='concept_name', how='left')

# Step 5: Combine the exact matches with the partial matches
combined_matches = pd.concat([exact_matches, partial_matches]).sort_values(by='order')

# Step 6: Keep one row per original ingredient ('order' is unique per ingredient), then drop the helper columns
final_df = combined_matches.drop_duplicates(['first_word', 'order']).drop(columns=['first_word', 'order'])

# Check mismatch: 1 when no concept was found, 0 otherwise
final_df['mismatch'] = (final_df['concept_id'].isna()).astype(int)

final_df.to_csv('results/mover_sis_medi_counts.csv', index=False)

In [77]:
# Next, load the manually mapped medication results
medi_mapped = pd.read_csv('results/mover_sis_medi_counts+manual.csv')

# Replace concept_id with the manually curated mapping, looked up by ingredient.
# A one-id-per-ingredient lookup is used instead of assigning the column of a left
# merge: if the hand-edited file lists an ingredient more than once, the merge
# returns extra rows and the positional assignment would shift ids onto the wrong rows.
manual_lookup = (
    medi_mapped
    .dropna(subset=['ingredient'])
    .drop_duplicates(subset='ingredient')
    .set_index('ingredient')['concept_id']
)
df_medi['concept_id'] = df_medi['ingredient'].map(manual_lookup)

# Convert string into datetime: a malformed Start_time aborts the cell,
# a malformed End_time becomes NaT
df_medi['Start_time'] = pd.to_datetime(df_medi['Start_time'], format='%m/%d/%y %H:%M', errors='raise')
df_medi['End_time'] = pd.to_datetime(df_medi['End_time'], format='%m/%d/%y %H:%M', errors='coerce')

# Drop rows without a drug name or start time and save the result in parquet
df_medi = df_medi.dropna(subset=['Drug_name', 'Start_time'], ignore_index=True)
df_medi.to_parquet(f'{input_path}/parquet/patient_medications.parquet')

In [104]:
# Read the manually curated parameter-to-concept mapping
df_params = pd.read_csv('mover/parameters_mover-sis_mapped.csv')


### patient_labs table ###
# Wide -> long: the first two columns stay as identifiers, every remaining
# column becomes a (Label, value) row
labs_columns = list(df_labs.columns)
df_labs = pd.melt(df_labs, id_vars=labs_columns[:2], var_name='Label')

# Attach unit and concept ids from the mapping rows that belong to the labs table
labs_mapped = df_params.loc[df_params['Table'].eq('labs')]
df_labs = pd.merge(
    df_labs,
    labs_mapped[['Label', 'Unit', 'concept_id', 'unit_concept_id']],
    how='left',
    on='Label',
)

# Parse the observation time; anything not matching the format becomes NaT
df_labs['Obs_time'] = pd.to_datetime(df_labs['Obs_time'].astype(str), format='%Y-%m-%d %H:%M:%S', errors='coerce')

# Keep only rows that have a time, label, value and mapped concept, then write parquet
df_labs = df_labs.dropna(subset=['Obs_time', 'Label', 'value', 'concept_id'], ignore_index=True)
df_labs.to_parquet(f'{input_path}/parquet/patient_labs.parquet')

In [108]:
### patient vitals table ###
# Wide -> long: the first two columns stay as identifiers, every remaining
# column becomes a (Label, value) row
vitals_columns = list(df_vitals.columns)
df_vitals = pd.melt(df_vitals, id_vars=vitals_columns[:2], var_name='Label')

# Attach unit and concept ids from the mapping rows that belong to the vitals table
vitals_mapped = df_params.loc[df_params['Table'].eq('vitals')]
df_vitals = pd.merge(
    df_vitals,
    vitals_mapped[['Label', 'Unit', 'concept_id', 'unit_concept_id']],
    how='left',
    on='Label',
)

# Parse the observation time; anything not matching the format becomes NaT
df_vitals['Obs_time'] = pd.to_datetime(df_vitals['Obs_time'].astype(str), format='%Y-%m-%d %H:%M:%S', errors='coerce')

# Keep only rows that have a time, label, value and mapped concept, then write parquet
df_vitals = df_vitals.dropna(subset=['Obs_time', 'Label', 'value', 'concept_id'], ignore_index=True)
df_vitals.to_parquet(f'{input_path}/parquet/patient_vitals.parquet')

In [158]:
### patient observations table ###
# Wide -> long: the first two columns stay as identifiers, every remaining
# column becomes a (Label, value) row
obs_columns = list(df_obs.columns)
df_obs = pd.melt(df_obs, id_vars=obs_columns[:2], value_vars=obs_columns[2:], var_name='Label')

# Attach unit and concept ids from the mapping rows that belong to the observations table
obs_mapped = df_params.loc[df_params['Table'].eq('observations')]
df_obs = pd.merge(
    df_obs,
    obs_mapped[['Label', 'Unit', 'concept_id', 'unit_concept_id']],
    how='left',
    on='Label',
)

# Parse time and value; entries that cannot be parsed become NaT / NaN
df_obs['Obs_time'] = pd.to_datetime(df_obs['Obs_time'].astype(str), format='%Y-%m-%d %H:%M:%S', errors='coerce')
df_obs['value'] = pd.to_numeric(df_obs['value'].astype(str), errors='coerce')

# Keep only rows that have a time, label, numeric value and mapped concept, then write parquet
df_obs = df_obs.dropna(subset=['Obs_time', 'Label', 'value', 'concept_id'], ignore_index=True)
df_obs.to_parquet(f'{input_path}/parquet/patient_observations.parquet')

In [156]:
# Summarize the observations table: row count, column dtypes and memory usage
df_obs.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6162792 entries, 0 to 6162791
Data columns (total 7 columns):
 #   Column           Dtype         
---  ------           -----         
 0   PID              object        
 1   Obs_time         datetime64[ns]
 2   Label            object        
 3   value            float64       
 4   Unit             object        
 5   concept_id       object        
 6   unit_concept_id  int64         
dtypes: datetime64[ns](1), float64(1), int64(1), object(4)
memory usage: 329.1+ MB


In [189]:
### patient ventilator table ###
# Column list is captured before renaming; its first three entries serve as id columns below
vent_columns = list(df_vent.columns)
# Column names spelled with the digit 0 are renamed to the spelling with the letter O
gas_renames = {'ETC02':'ETCO2', 'FIC02': 'FICO2', 'FIN20': 'FIN2O', 'ETN20': 'ETN2O', 'FI02': 'FIO2', 'ET02':'ETO2'}
df_vent = df_vent.rename(columns=gas_renames)
# Wide -> long: every column after the first three becomes a (Label, value) row
df_vent = df_vent.melt(id_vars=vent_columns[:3], var_name='Label')


# Labels containing 'Agent' are rewritten as '<Agent value>_<part after the first underscore>',
# e.g. Agent_Fi / Agent_Et become {Agent}_Fi / {Agent}_Et
agent_rows = df_vent['Label'].str.contains('Agent')
agent_name = df_vent.loc[agent_rows, 'Agent']
label_suffix = df_vent.loc[agent_rows, 'Label'].str.split('_').str[1]
df_vent.loc[agent_rows, 'Label'] = agent_name + '_' + label_suffix
# The Agent column is folded into Label now and no longer needed
df_vent = df_vent.drop(columns='Agent')

# Attach unit and concept ids from the mapping rows that belong to the ventilator table
vent_mapped = df_params.loc[df_params['Table'].eq('ventilator')]
df_vent = pd.merge(
    df_vent,
    vent_mapped[['Label', 'Unit', 'concept_id', 'unit_concept_id']],
    how='left',
    on='Label',
)

# Parse time and value; entries that cannot be parsed become NaT / NaN
df_vent['Obs_time'] = pd.to_datetime(df_vent['Obs_time'], format='%m/%d/%y %H:%M', errors='coerce')
df_vent['value'] = pd.to_numeric(df_vent['value'].astype(str), errors='coerce')

# Keep only rows that have a time, label, numeric value and mapped concept, then write parquet
df_vent = df_vent.dropna(subset=['Obs_time', 'Label', 'value', 'concept_id'], ignore_index=True)
# NOTE(review): 'patient_ventervations' looks like a typo for a ventilator file name;
# left unchanged because other code may read this path - confirm before renaming.
df_vent.to_parquet(f'{input_path}/parquet/patient_ventervations.parquet')

In [195]:
# Summarize the ventilator table: row count, column dtypes and memory usage
df_vent.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12582900 entries, 0 to 12582899
Data columns (total 7 columns):
 #   Column           Dtype         
---  ------           -----         
 0   PID              object        
 1   Obs_time         datetime64[ns]
 2   Label            object        
 3   value            float64       
 4   Unit             object        
 5   concept_id       object        
 6   unit_concept_id  int64         
dtypes: datetime64[ns](1), float64(1), int64(1), object(4)
memory usage: 672.0+ MB


In [200]:
### I/O table ###
# Parse the timestamp (values not matching the format become NaT) and write parquet
io_time_text = df_io['IO_datetime'].astype(str)
df_io['IO_datetime'] = pd.to_datetime(io_time_text, format='%Y-%m-%d %H:%M:%S', errors='coerce')
df_io.to_parquet(f'{input_path}/parquet/patient_input_output.parquet')

### Procedure Events table ###
# Same treatment for the event timestamp
event_time_text = df_events['Event_time'].astype(str)
df_events['Event_time'] = pd.to_datetime(event_time_text, format='%Y-%m-%d %H:%M:%S', errors='coerce')
df_events.to_parquet(f'{input_path}/parquet/patient_procedure_events.parquet')

# Person

In [204]:
import os

# Output directory for the generated OMOP tables
save_path = 'MOVER_ETL/sis'

# Make the result directory and its 'sample' sub-directory.
# makedirs also creates missing parent directories (os.mkdir fails when
# 'MOVER_ETL' does not exist yet) and exist_ok=True makes re-runs a no-op.
os.makedirs(f'{save_path}/sample', exist_ok=True)


# start_index for each table_id: ids of a table are generated as start_index + 1, 2, ...
start_index = {
'person': 1000000,
'observation_period': 2000000,
'visit_occurrence': 3000000,
'visit_detail': 4000000,
'condition_occurrence': 5000000,
'drug_exposure': 6000000,
'procedure_occurrence': 7000000,
'measurement': 8000000,
'death': 9000000,
'note': 10000000,
'location': 20000000 
}

In [205]:
# Create an empty dataframe for the PERSON table
df_person = pd.DataFrame()

# Assign a unique PERSON_ID to each distinct 'PID' of the patient information table
unique_ids = df_info['PID'].unique()
df_person['PERSON_ID'] = start_index['person'] + np.arange(1, len(unique_ids) + 1)
df_person['PID'] = unique_ids

# Merge relevant columns from the patient information table into PERSON based on 'PID'
usecols = ['PID', 'Age', 'Surgery_start', 'Gender']
df_person = df_person.merge(df_info[usecols], on = 'PID', how='left')
# Keep one row per person: the last one in df_info row order
# (rows are not sorted by Surgery_start here, so "last" is file order)
df_person.drop_duplicates(subset = 'PERSON_ID', keep = 'last', inplace = True, ignore_index = True)

# Map gender values ('Male' or 'Female') to the corresponding GENDER_CONCEPT_ID values
df_person['GENDER_CONCEPT_ID'] = df_person['Gender'].map({'Male': 8507, 'Female': 8532})

# Remove any rows with missing gender values. The result has to be assigned back:
# dropna returns a new frame, so a bare call leaves df_person unchanged.
df_person = df_person.dropna(subset=['GENDER_CONCEPT_ID'], ignore_index=True)
# No NaN is left, so the concept id can be stored as an integer
df_person['GENDER_CONCEPT_ID'] = df_person['GENDER_CONCEPT_ID'].astype('int64')

# Calculate the year of birth from the age and the surgery start date
df_person['YEAR_OF_BIRTH'] = df_person['Surgery_start'].dt.year - df_person['Age']
# Birth datetime is set to January 1st of the birth year (only the year is derived above).
# datetime() needs integer years, i.e. Age and Surgery_start must not be missing.
df_person['BIRTH_DATETIME'] = df_person['YEAR_OF_BIRTH'].apply(lambda x: datetime(year=x, month=1, day=1))

# TODO: RACE_CONCEPT_ID / RACE_SOURCE_VALUE / RACE_SOURCE_CONCEPT_ID and PROVIDER_ID are not populated

# Assign default values for LOCATION_ID (2 for MOVER-SIS)
df_person['LOCATION_ID'] = 2

# Populate source value columns from the patient information data
df_person['PERSON_SOURCE_VALUE'] = df_person['PID']
df_person['GENDER_SOURCE_VALUE'] = df_person['Gender']

# Remove columns that aren't part of the final PERSON table format
df_person.drop(columns=usecols, inplace=True)

# Write the processed data to CSV and parquet
df_person.to_csv(f'{save_path}/MOVER_PERSON.csv', index=False)
df_person.to_parquet(f'{save_path}/MOVER_PERSON.parquet')
# sample: first 1000 rows
df_person[:1000].to_csv(f'{save_path}/sample/MOVER_PERSON.csv', index=False)

# Observation period

In [None]:
# Build the OBSERVATION_PERIOD table with one row per PERSON row.
# NOTE(review): this rebinds df_obs, the name used earlier for the patient
# observations table - confirm nothing later still needs that table.
# NOTE(review): the 'MRN', 'HOSP_ADMSN_TIME' and 'HOSP_DISCH_TIME' columns of df_info
# are not referenced anywhere else in this notebook - confirm they exist in the
# SIS patient information table and already hold datetimes (.dt is used on them below).
df_obs = pd.DataFrame({
    'PERSON_ID': df_person['PERSON_ID'],
    'OBSERVATION_PERIOD_ID': start_index['observation_period'] + np.arange(1, len(df_person) + 1),
    # PERSON_SOURCE_VALUE is carried as 'MRN' only to serve as the merge key
    'MRN': df_person['PERSON_SOURCE_VALUE'],
})


usecols = ['MRN', 'HOSP_ADMSN_TIME', 'HOSP_DISCH_TIME']
# Earliest admission time and latest discharge time for each MRN
grouped = df_info.groupby('MRN', as_index=False).agg(
    HOSP_ADMSN_TIME=('HOSP_ADMSN_TIME', 'min'),
    HOSP_DISCH_TIME=('HOSP_DISCH_TIME', 'max'),
)
# Bring both times onto the observation period rows
df_obs = df_obs.merge(grouped, on='MRN', how='left')

# Start / end of the period are the dates (time of day removed) of admission / discharge
admission_date = df_obs['HOSP_ADMSN_TIME'].dt.date
discharge_date = df_obs['HOSP_DISCH_TIME'].dt.date
df_obs['OBSERVATION_PERIOD_START_DATE'] = pd.to_datetime(admission_date)
df_obs['OBSERVATION_PERIOD_END_DATE'] = pd.to_datetime(discharge_date)

# PERIOD_TYPE_CONCEPT_ID indicating the data source is an EHR
df_obs['PERIOD_TYPE_CONCEPT_ID'] = 32817

# Drop the helper columns that are not part of the OBSERVATION_PERIOD format
df_obs = df_obs.drop(columns=usecols)

# Write the table to CSV and parquet
df_obs.to_csv(f'{save_path}/MOVER_OBSERVATION_PERIOD.csv', index=False)
df_obs.to_parquet(f'{save_path}/MOVER_OBSERVATION_PERIOD.parquet')
# sample: first 1000 rows
df_obs[:1000].to_csv(f'{save_path}/sample/MOVER_OBSERVATION_PERIOD.csv', index=False)

In [206]:
!pip -V

/bin/bash: /opt/anaconda-3-2020.02/envs/hskim/lib/libtinfo.so.6: no version information available (required by /bin/bash)
pip 21.3.1 from /usr/local/lib/python3.8/dist-packages/pip (python 3.8)
