# Python ETL Synthea to OMOP

References: https://github.com/scivm/ETL-Synthea-Python/blob/master/python_etl/SyntheaToOmop.py

In [1]:
import pandas as pd
import numpy as np
import datetime
from database_postgres import DatabasePostgres

In [5]:

def getColumns(table_name):
    """Return the column names of ``table_name`` in ordinal order.

    The names are read from ``information_schema.columns`` through a fresh
    ``DatabasePostgres`` connection object.

    NOTE(review): ``table_name`` is interpolated directly into the SQL text,
    so it must only ever come from trusted code, never from user input.
    """
    connection = DatabasePostgres()
    query = f"""select column_name
                             from information_schema.columns where table_name = '{table_name}' 
                         order by ordinal_position;
                         """
    result = connection.select(query)
    return list(result['column_name'].values)

def insertData(data, table_name):
    """Insert the rows of ``data`` into table ``table_name`` of schema ``cdm``.

    The frame's own column names, joined with commas, are passed on as the
    target column list.

    Returns a dict holding the table name and whatever ``insert_many``
    returned, under the keys ``'table_name'`` and ``'response'``.
    """
    connection = DatabasePostgres()
    joined_columns = ','.join(data.columns)
    outcome = connection.insert_many(
        schema='cdm',
        table_name=table_name,
        data=data,
        column_names=joined_columns,
    )
    return {'table_name': table_name, 'response': outcome}

In [6]:
def patienthash(id):
    """Map a Synthea identifier to a stable, non-negative 63-bit integer.

    The built-in ``hash()`` is salted per interpreter process for strings,
    so ids derived from it differ between runs and cannot be used to link
    rows loaded at different times. A SHA-256 digest of the textual form of
    ``id`` is used instead, which is identical on every run and machine.

    Args:
        id: the source identifier (typically a UUID string); it is converted
            with ``str()`` before hashing.

    Returns:
        An ``int`` in the range ``[0, 2**63 - 1]``.
    """
    # Local import so this function works without touching the notebook's
    # import cell.
    import hashlib

    digest = hashlib.sha256(str(id).encode('utf-8')).digest()
    # Keep the first 8 bytes and clear the top bit so the value fits in a
    # signed 64-bit column.
    return int.from_bytes(digest[:8], 'big') & ((1 << 63) - 1)

def getYearFromSyntheaDate(date):
    """Return the year of a ``YYYY-MM-DD`` date string as an int."""
    parsed = datetime.datetime.strptime(date, "%Y-%m-%d")
    return parsed.year

def getMonthFromSyntheaDate(date):
    """Return the month (1-12) of a ``YYYY-MM-DD`` date string as an int."""
    parsed = datetime.datetime.strptime(date, "%Y-%m-%d")
    return parsed.month

def getDayFromSyntheaDate(date):
    """Return the day of month of a ``YYYY-MM-DD`` date string as an int."""
    parsed = datetime.datetime.strptime(date, "%Y-%m-%d")
    return parsed.day

def getGenderConceptCode(gender):
    """Translate a Synthea gender code into an OMOP gender concept code.

    Matching is case-insensitive. The previous version stored the
    upper-cased value under a misspelt name and then compared the raw
    input, so ``'m'`` / ``'f'`` were reported as unknown.

    Args:
        gender: ``'M'`` or ``'F'`` in any case. Anything else, including
            non-string values such as ``None`` or NaN, counts as unknown.

    Returns:
        ``'8507'`` for male, ``'8532'`` for female, and the integer ``0``
        for unknown. The unknown value is kept as an int because callers in
        this file filter on ``!= 0``.
    """
    if not isinstance(gender, str):
        # Missing values read from CSV arrive as float NaN, which has no
        # .upper(); treat them as unknown instead of crashing.
        return 0
    gender = gender.upper()
    if gender == 'M':
        return '8507'
    if gender == 'F':
        return '8532'
    return 0

def getRaceConceptCode(race):
    """Translate a Synthea race value into an OMOP race concept code.

    Matching is case-insensitive. Every branch now returns a string; the
    previous version returned the int ``8515`` for ``ASIAN`` only, which
    gave the resulting column mixed types.

    Args:
        race: the Synthea race label. Non-string values (``None``, NaN) are
            treated as unknown.

    Returns:
        ``'8527'`` (white), ``'8516'`` (black), ``'8515'`` (asian), or
        ``'0'`` for any other value.
    """
    if not isinstance(race, str):
        # Missing CSV values arrive as float NaN and have no .upper().
        return '0'
    race = race.upper()
    if race == 'WHITE':
        return '8527'
    if race == 'BLACK':
        return '8516'
    if race == 'ASIAN':
        return '8515'
    return '0'

def getEthnicityConceptCode(eth):
    """Translate a Synthea ethnicity value into an OMOP ethnicity code.

    The comparison is case-insensitive. The listed origins map to
    ``'38003563'``; every other value maps to ``'0'``.
    """
    mapped_origins = (
        'CENTRAL_AMERICAN',
        'DOMINICAN',
        'MEXICAN',
        'PUERTO_RICAN',
        'SOUTH_AMERICAN',
    )
    return '38003563' if eth.upper() in mapped_origins else '0'

In [238]:
def patientsToOmop(column_names, df):
    """Build the OMOP ``person`` frame from the Synthea patients frame.

    Args:
        column_names: column names used to create the output frame.
        df: patients frame; the columns ``Id``, ``GENDER``, ``BIRTHDATE``,
            ``RACE`` and ``ETHNICITY`` are read.

    Returns:
        A frame with one row per patient whose gender maps to a known
        concept; rows whose gender code is ``0`` are dropped.
    """
    person = pd.DataFrame(columns=column_names)
    person['person_id'] = df['Id'].apply(patienthash)
    person['gender_concept_id'] = df['GENDER'].apply(getGenderConceptCode)
    person['year_of_birth'] = df['BIRTHDATE'].apply(getYearFromSyntheaDate)
    person['month_of_birth'] = df['BIRTHDATE'].apply(getMonthFromSyntheaDate)
    person['day_of_birth'] = df['BIRTHDATE'].apply(getDayFromSyntheaDate)
    person['race_concept_id'] = df['RACE'].apply(getRaceConceptCode)
    person['ethnicity_concept_id'] = df['ETHNICITY'].apply(getEthnicityConceptCode)
    # The location id is derived from the patient Id, the same way
    # locationToOmop derives it, so the two tables can be joined.
    person['location_id'] = df['Id'].apply(patienthash)
    person['person_source_value'] = df['Id']
    # Keep the raw gender next to its concept id, as is already done for
    # race and ethnicity; it was previously left empty.
    person['gender_source_value'] = df['GENDER']
    person['gender_source_concept_id'] = df['GENDER'].apply(getGenderConceptCode)
    person['race_source_value'] = df['RACE']
    person['race_source_concept_id'] = df['RACE'].apply(getRaceConceptCode)
    person['ethnicity_source_value'] = df['ETHNICITY']
    person['ethnicity_source_concept_id'] = df['ETHNICITY'].apply(getEthnicityConceptCode)
    # Filter out persons with missing or unknown gender.
    person = person[person['gender_concept_id'] != 0]
    # Ask pandas to replace null cells with None before the rows are inserted.
    person = person.where(pd.notnull(person), None)
    return person

def locationToOmop(column_names, df):
    """Build the OMOP ``location`` frame from the Synthea patients frame.

    Unlike the previous version, the caller's ``df`` is left untouched: the
    abbreviated state is computed into a local series instead of being
    written back into ``df['STATE']``.

    Args:
        column_names: column names used to create the output frame.
        df: patients frame; the columns ``Id``, ``ADDRESS``, ``CITY``,
            ``STATE``, ``ZIP`` and ``COUNTY`` are read.

    Returns:
        A frame with one row per input row. ``state`` is ``'MA'`` for
        ``'Massachusetts'`` and ``None`` for every other value.
    """
    # Only Massachusetts is abbreviated; every other state becomes None.
    state = pd.Series(
        np.where(df['STATE'] == 'Massachusetts', 'MA', None),
        index=df.index,
    )
    location = pd.DataFrame(columns=column_names)
    location['location_id'] = df['Id'].apply(patienthash)
    location['address_1'] = df['ADDRESS']
    location['city'] = df['CITY']
    location['state'] = state
    location['zip'] = df['ZIP']
    location['county'] = df['COUNTY']
    location['location_source_value'] = df['Id']
    # Ask pandas to replace null cells with None before the rows are inserted.
    location = location.where(pd.notnull(location), None)
    return location

def deathToOmop(column_names, df):
    """Build the OMOP ``death`` frame from the Synthea patients frame.

    Only rows with a non-null ``DEATHDATE`` are kept. Each kept row gets the
    constant ``death_type_concept_id`` of ``'38003564'``.
    """
    frame = pd.DataFrame(columns=column_names)
    frame['person_id'] = df['Id'].apply(patienthash)
    frame['death_date'] = df['DEATHDATE']
    # Drop the patients who have no recorded death date.
    deceased = frame[frame['death_date'].notnull()]
    deceased = deceased.where(pd.notnull(deceased), None)
    deceased['death_type_concept_id'] = '38003564'
    return deceased

def conditionsToOmop(df, column_names, condition_id=1):
    """Build the OMOP ``condition_occurrence`` frame from Synthea conditions.

    Rows are numbered consecutively starting at ``condition_id``. The
    Synthea ``CODE`` is copied unchanged into the concept id, source value
    and source concept id columns.
    """
    row_count = len(df)
    out = pd.DataFrame(columns=column_names)
    out['condition_occurrence_id'] = range(condition_id, condition_id + row_count)
    out['person_id'] = df['PATIENT'].apply(patienthash)
    out['condition_start_date'] = df['START']
    out['condition_end_date'] = df['STOP']
    out['visit_occurrence_id'] = df['ENCOUNTER'].apply(patienthash)
    for code_column in ('condition_concept_id',
                        'condition_source_value',
                        'condition_source_concept_id'):
        out[code_column] = df['CODE']
    out['condition_type_concept_id'] = '32020'
    return out.where(pd.notnull(out), None)

def drugToOmop(df, column_names, drug_id=1):
    """Build the OMOP ``drug_exposure`` frame from Synthea medications.

    Args:
        df: medications frame; the columns ``PATIENT``, ``START``, ``STOP``,
            ``ENCOUNTER`` and ``CODE`` are read.
        column_names: column names used to create the output frame.
        drug_id: first ``drug_exposure_id`` to assign.

    Returns:
        ``(drug_exposure, last_id)``, where ``last_id`` is the highest id
        assigned. For an empty ``df`` it is ``drug_id - 1``, so callers can
        always continue numbering from ``last_id + 1``. The previous version
        raised on empty input because it converted a NaN maximum to int.
    """
    row_count = len(df)
    drug_exposure = pd.DataFrame(columns=column_names)
    drug_exposure['drug_exposure_id'] = range(drug_id, drug_id + row_count)
    # Ids are consecutive, so the last one follows from the row count and
    # the frame does not have to be scanned.
    drug_id_max = drug_id + row_count - 1
    drug_exposure['person_id'] = df['PATIENT'].apply(patienthash)
    drug_exposure['drug_exposure_start_date'] = df['START']
    drug_exposure['drug_exposure_end_date'] = df['STOP']
    drug_exposure['verbatim_end_date'] = df['STOP']
    drug_exposure['visit_occurrence_id'] = df['ENCOUNTER'].apply(patienthash)
    drug_exposure['drug_concept_id'] = df['CODE']
    drug_exposure['drug_source_value'] = df['CODE']
    drug_exposure['drug_source_concept_id'] = df['CODE']
    drug_exposure['drug_type_concept_id'] = '581452'
    drug_exposure['days_supply'] = '1'
    drug_exposure['route_concept_id'] = '0'
    # Ask pandas to replace null cells with None before the rows are inserted.
    drug_exposure = drug_exposure.where(pd.notnull(drug_exposure), None)
    return drug_exposure, drug_id_max

def observationToOmop(df, column_names, observation_id=1):
    """Build the OMOP ``observation`` frame from Synthea observations.

    The category-to-concept lookup is done on a local series, so the
    caller's ``df`` no longer gains a ``CATEGORY_ID`` column as a side
    effect.

    Args:
        df: observations frame; the columns ``CATEGORY``, ``PATIENT``,
            ``DATE``, ``ENCOUNTER`` and ``CODE`` are read.
        column_names: column names used to create the output frame.
        observation_id: first ``observation_id`` to assign.

    Returns:
        ``(observation, last_id)``, where ``last_id`` is the highest id
        assigned. For an empty ``df`` it is ``observation_id - 1``; the
        previous version raised on empty input.
    """
    # Synthea category -> concept code; unknown categories fall back to '0'.
    category_codes = {
        'laboratory': '261904005',
        'survey': '44804317',
        'vital-signs': '46680005',
        'social-history': '161033005',
        'procedure': '71388002',
        'exam': '425044008',
        'therapy': '276239002',
        '363679005': '44804317',
    }
    category_id = pd.Series(
        [category_codes.get(category, '0') for category in df['CATEGORY']],
        index=df.index,
        dtype=object,
    )

    row_count = len(df)
    observation = pd.DataFrame(columns=column_names)
    observation['observation_id'] = range(observation_id, observation_id + row_count)
    # Ids are consecutive, so the last one follows from the row count.
    observation_id_max = observation_id + row_count - 1
    observation['person_id'] = df['PATIENT'].apply(patienthash)
    observation['observation_date'] = df['DATE']
    observation['visit_occurrence_id'] = df['ENCOUNTER'].apply(patienthash)
    observation['observation_concept_id'] = category_id
    observation['observation_source_value'] = df['CODE']
    observation['observation_source_concept_id'] = df['CODE']
    observation['observation_type_concept_id'] = '38000280'
    observation['observation_datetime'] = df['DATE']
    observation['qualifier_concept_id'] = 0
    # Ask pandas to replace null cells with None before the rows are inserted.
    observation = observation.where(pd.notnull(observation), None)
    return observation, observation_id_max

def allergiesToOmop(df, column_names, allergy_id=1):
    """Build OMOP ``observation`` rows from Synthea allergies.

    Args:
        df: allergies frame; the columns ``PATIENT``, ``START``,
            ``ENCOUNTER`` and ``CODE`` are read.
        column_names: column names used to create the output frame.
        allergy_id: first ``observation_id`` to assign.

    Returns:
        ``(observation, last_id)``, where ``last_id`` is the highest id
        assigned. For an empty ``df`` it is ``allergy_id - 1``; the previous
        version raised on empty input.
    """
    row_count = len(df)
    observation = pd.DataFrame(columns=column_names)
    observation['observation_id'] = range(allergy_id, allergy_id + row_count)
    # Ids are consecutive, so the last one follows from the row count.
    observation_id_max = allergy_id + row_count - 1
    observation['person_id'] = df['PATIENT'].apply(patienthash)
    observation['observation_date'] = df['START']
    observation['visit_occurrence_id'] = df['ENCOUNTER'].apply(patienthash)
    observation['observation_concept_id'] = df['CODE']
    observation['observation_source_value'] = df['CODE']
    observation['observation_source_concept_id'] = df['CODE']
    observation['observation_datetime'] = df['START']
    observation['observation_type_concept_id'] = '43021226'
    observation['qualifier_concept_id'] = 0
    # Ask pandas to replace null cells with None before the rows are inserted.
    observation = observation.where(pd.notnull(observation), None)
    return observation, observation_id_max

def proceduresToOmop(df, column_names, procedure_id=1):
    """Build the OMOP ``procedure_occurrence`` frame from a Synthea frame.

    ``START`` fills both start columns and ``STOP`` fills both end columns.
    Rows are numbered consecutively starting at ``procedure_id``, and
    ``CODE`` is copied into the three procedure code columns.
    """
    out = pd.DataFrame(columns=column_names)
    out['person_id'] = df['PATIENT'].apply(patienthash)
    for start_column in ('procedure_date', 'procedure_datetime'):
        out[start_column] = df['START']
    for end_column in ('procedure_end_datetime', 'procedure_end_date'):
        out[end_column] = df['STOP']

    out['procedure_occurrence_id'] = range(procedure_id, procedure_id + len(df))
    out['visit_occurrence_id'] = df['ENCOUNTER'].apply(patienthash)
    for code_column in ('procedure_concept_id',
                        'procedure_source_value',
                        'procedure_source_concept_id'):
        out[code_column] = df['CODE']
    out['procedure_type_concept_id'] = '71388002'
    out['modifier_concept_id'] = 0
    return out.where(pd.notnull(out), None)

def immunizationsToOmop(df, column_names, inmunization_id=1):
    """Build OMOP ``drug_exposure`` rows from Synthea immunizations.

    Args:
        df: immunizations frame; the columns ``PATIENT``, ``DATE``,
            ``ENCOUNTER`` and ``CODE`` are read.
        column_names: column names used to create the output frame.
        inmunization_id: first ``drug_exposure_id`` to assign. The spelling
            is kept because callers pass it by keyword.

    Returns:
        ``(drug_exposure, last_id)``, where ``last_id`` is the highest id
        assigned. For an empty ``df`` it is ``inmunization_id - 1``; the
        previous version raised on empty input.
    """
    row_count = len(df)
    drug_exposure = pd.DataFrame(columns=column_names)
    drug_exposure['drug_exposure_id'] = range(inmunization_id, inmunization_id + row_count)
    # Ids are consecutive, so the last one follows from the row count.
    drug_exposure_max_id = inmunization_id + row_count - 1
    drug_exposure['person_id'] = df['PATIENT'].apply(patienthash)
    # The single DATE column fills the start, end and verbatim end dates.
    drug_exposure['drug_exposure_start_date'] = df['DATE']
    drug_exposure['drug_exposure_end_date'] = df['DATE']
    drug_exposure['verbatim_end_date'] = df['DATE']
    drug_exposure['visit_occurrence_id'] = df['ENCOUNTER'].apply(patienthash)
    drug_exposure['drug_concept_id'] = df['CODE']
    drug_exposure['drug_source_value'] = df['CODE']
    drug_exposure['drug_source_concept_id'] = df['CODE']
    drug_exposure['drug_type_concept_id'] = '581452'
    drug_exposure['days_supply'] = '1'
    drug_exposure['route_concept_id'] = '0'
    # Ask pandas to replace null cells with None before the rows are inserted.
    drug_exposure = drug_exposure.where(pd.notnull(drug_exposure), None)
    return drug_exposure, drug_exposure_max_id

def imagesToOmop(df, column_names):
    """Build OMOP ``procedure_occurrence`` rows from Synthea imaging studies.

    ``DATE`` fills all four date/datetime columns. The row id is the hash of
    the study ``Id``, and ``PROCEDURE_CODE`` is copied into the three
    procedure code columns.
    """
    out = pd.DataFrame(columns=column_names)
    out['person_id'] = df['PATIENT'].apply(patienthash)
    for date_column in ('procedure_date',
                        'procedure_datetime',
                        'procedure_end_datetime',
                        'procedure_end_date'):
        out[date_column] = df['DATE']
    out['procedure_occurrence_id'] = df['Id'].apply(patienthash)
    out['visit_occurrence_id'] = df['ENCOUNTER'].apply(patienthash)
    for code_column in ('procedure_concept_id',
                        'procedure_source_value',
                        'procedure_source_concept_id'):
        out[code_column] = df['PROCEDURE_CODE']
    out['procedure_type_concept_id'] = '24587005'
    out['modifier_concept_id'] = 0
    return out.where(pd.notnull(out), None)

def providersToOmop(df, column_names):
    """Build the OMOP ``provider`` frame from the Synthea providers frame.

    The output frame is now created from the ``column_names`` argument, like
    every other ``*ToOmop`` function in this file. The previous version
    ignored the argument and queried the database for the ``provider``
    columns a second time.

    Args:
        df: providers frame; the columns ``Id``, ``ORGANIZATION``, ``NAME``
            and ``GENDER`` are read.
        column_names: column names used to create the output frame.

    Returns:
        A frame with one row per input row.
    """
    provider = pd.DataFrame(columns=column_names)
    provider['provider_id'] = df['Id'].apply(patienthash)
    provider['care_site_id'] = df['ORGANIZATION'].apply(patienthash)
    provider['provider_name'] = df['NAME']
    provider['gender_source_value'] = df['GENDER']
    provider['gender_concept_id'] = df['GENDER'].apply(getGenderConceptCode)
    provider['gender_source_concept_id'] = df['GENDER'].apply(getGenderConceptCode)
    provider['specialty_concept_id'] = 393262001  # general practice
    provider['specialty_source_concept_id'] = '0'
    provider['provider_source_value'] = df['Id']
    # Ask pandas to replace null cells with None before the rows are inserted.
    provider = provider.where(pd.notnull(provider), None)
    return provider


## Transformations

In [239]:
drug_id = 1
observation_id = 1

# Person, location and death all come from the Synthea patients file. The
# file is re-read for each call so every transform gets its own frame.
person = patientsToOmop(column_names=getColumns(table_name='person'),
                        df=pd.read_csv('data/patients.csv'))

# Location is built with locationToOmop. An earlier, mistaken call that
# built it with patientsToOmop (and was later overwritten) has been removed.
location = locationToOmop(column_names=getColumns(table_name='location'),
                          df=pd.read_csv('data/patients.csv'))

death = deathToOmop(column_names=getColumns(table_name='death'),
                    df=pd.read_csv('data/patients.csv'))

# Conditions
condition = conditionsToOmop(column_names=getColumns(table_name='condition_occurrence'),
                             df=pd.read_csv('data/conditions.csv'))

# Observations and allergies share the observation table; allergy ids
# continue right after the last observation id.
observation, observation_id = observationToOmop(
    column_names=getColumns(table_name='observation'),
    df=pd.read_csv('data/observations.csv'))

allergy, observation_id = allergiesToOmop(
    column_names=getColumns(table_name='observation'),
    df=pd.read_csv('data/allergies.csv'),
    allergy_id=observation_id + 1)

# After concatenation the ids are renumbered from the fresh 0-based index.
observation = pd.concat([observation, allergy]).reset_index(drop=True)
observation['observation_id'] = observation.index.values

# Procedures, devices and imaging studies share the procedure table.
procedure = proceduresToOmop(column_names=getColumns(table_name='procedure_occurrence'),
                             df=pd.read_csv('data/procedures.csv'))

device = proceduresToOmop(column_names=getColumns(table_name='procedure_occurrence'),
                          df=pd.read_csv('data/devices.csv'))

images = imagesToOmop(column_names=getColumns(table_name='procedure_occurrence'),
                      df=pd.read_csv('data/imaging_studies.csv'))

procedure = pd.concat([procedure, images, device]).reset_index(drop=True)
procedure['procedure_occurrence_id'] = procedure.index.values

provider = providersToOmop(column_names=getColumns(table_name='provider'),
                           df=pd.read_csv('data/providers.csv'))

# Medications and immunizations share the drug_exposure table; immunization
# ids continue right after the last medication id.
drug_exposure, drug_id = drugToOmop(
    column_names=getColumns(table_name='drug_exposure'),
    df=pd.read_csv('data/medications.csv'))

inmunization, drug_id = immunizationsToOmop(
    column_names=getColumns(table_name='drug_exposure'),
    df=pd.read_csv('data/immunizations.csv'),
    inmunization_id=drug_id + 1)

drug_exposure = pd.concat([drug_exposure, inmunization]).reset_index(drop=True)
drug_exposure['drug_exposure_id'] = drug_exposure.index.values

## Load Data 

In [None]:
# Load every transformed frame into its table, in the original order.
for _frame, _table in (
    (person, 'person'),
    (condition, 'condition_occurrence'),
    (death, 'death'),
    (provider, 'provider'),
    (procedure, 'procedure_occurrence'),
    (observation, 'observation'),
    (drug_exposure, 'drug_exposure'),
    (location, 'location'),
):
    insertData(data=_frame, table_name=_table)

## Vocabularies

In [None]:

# Bulk-load the tab-separated concept file into vocabularies.concept.
db = DatabasePostgres()
column_names = [
    'concept_id',
    'concept_name',
    'domain_id',
    'vocabulary_id',
    'concept_class_id',
    'standard_concept',
    'concept_code',
    'valid_start_date',
    'valid_end_date',
    'invalid_reason',
]
db.copy(
    filepath='data/concept.csv',
    table='concept',
    schema='vocabularies',
    columns=column_names,
    delimeter='\t',
)



In [17]:
# Fetch a single row from vocabularies.concept as a quick check of the load.
db.select('select * from vocabularies.concept limit 1;')

Unnamed: 0,concept_id,valid_start_date,valid_end_date,concept_name,domain_id,vocabulary_id,concept_class_id,concept_code,standard_concept,invalid_reason
0,21600572,1970-01-01,2099-12-31,lactitol; oral,Drug,ATC,ATC 5th,A06AD12,C,
