In [39]:
import pyodbc
import pandas as pd
import numpy as np
import seaborn as sns
import urllib
import requests
import sqlalchemy as sqla
import matplotlib as mpl
import xml.etree.ElementTree as ET

from matplotlib import pyplot as plt
from importlib import reload
from tqdm import tqdm_notebook
from itertools import chain

In [42]:
hy_db = 'hy180_2021'

with open('dbmi-aetna-cxn-str') as f:
    cxn_params = f.read().strip()
    gmw_cxn_str = urllib.parse.quote_plus(cxn_params + 'Database=gmw3')
    aetna_raw_cxn_str = urllib.parse.quote_plus(cxn_params + 'Database=AetnaDataWarehouse')
    hy_cxn_str = urllib.parse.quote_plus(cxn_params + f'Database={hy_db}')
    
engine = sqla.create_engine("mssql+pyodbc:///?odbc_connect=%s" % gmw_cxn_str, connect_args = {'autocommit':True})
cxn = engine.connect()
ins = sqla.inspect(engine)

In [None]:
hy_engine = sqla.create_engine("mssql+pyodbc:///?odbc_connect=%s" % hy_cxn_str, connect_args = {'autocommit': True, 'fast_executemany': True})
hy_cxn = hy_engine.connect()

# Enrollment min length

In [43]:
min_enrollment_query = (f"drop table if exists {hy_db}.dbo.EnrollFourPlusYears;"
                       " select MemberNum, LongestEnrollmentStartDate as enrollDate, LongestEnrollmentMonths as enrollTotalMonths, dateadd(m, 48, LongestEnrollmentStartDate) as enrollDatePlusFourYears"
                       f" into {hy_db}.dbo.EnrollFourPlusYears"
                       " from MemberEnrollment"
                       " where LongestEnrollmentMonths > 48")

cxn.execute(min_enrollment_query)
pd.read_sql('select count(*) from EnrollFourPlusYears', hy_cxn)

Unnamed: 0,Unnamed: 1
0,12922926


In [44]:
def default_enroll_window(select_cols,
                          claims_table, 
                          claim_date_col,
                          enrollment_table=f'{hy_db}.dbo.EnrollFourPlusYears', 
                          window_date_col='enrollDatePlusFourYears',
                          enroll_date_col='enrollDate',
                          join_col='MemberNum'):
    
    select_cols.append(f't1.{join_col}')
    return (f"select {','.join(select_cols)}"
       f" from {claims_table} as t1 inner join {enrollment_table} as t2"
       f" on t1.{join_col}=t2.{join_col}"
       f" where {claim_date_col} between {enroll_date_col} and {window_date_col}")

In [45]:
def custom_enroll_window(select_cols, 
                          claims_table, 
                          claim_date_col,
                          claim_window=48,
                          window_unit='m',
                          enrollment_table=f'{hy_db}.dbo.EnrollFourPlusYears', 
                          enrollment_date_col='enrollDate',
                          join_col='MemberNum'):
    
    select_cols.append(f't1.{join_col}')
    return (f"select {','.join(select_cols)}"
           f" from {claims_table} as t1 inner join {enrollment_table} as t2"
           f" on t1.{join_col}=t2.{join_col}"
           f" where datediff({window_unit}, {claim_date_col}, {enrollment_date_col}) <= {claim_window}")

# Helper tables for T2D 

## Visit count

In [46]:
# Visit count per patient

visit_count_query = (f"drop table if exists {hy_db}.dbo.VisitCount;"
                    " select MemberNum, count(*) as visits"
                    f" into {hy_db}.dbo.VisitCount"
                    f" from ({default_enroll_window([], 'VisitMedicalClaim', 'FirstServiceStartDate')}) as t"
                    " group by MemberNum")
cxn.execute(visit_count_query)
pd.read_sql('select count(*) from VisitCount', hy_cxn)

Unnamed: 0,Unnamed: 1
0,12415038


## Labs

In [47]:
# Patients with glucose/hba1c lab values

glucose_tests_loinc = ('1558-6', '2339-0', '2345-7')
a1c_tests_loinc = ('4548-4', '17856-6', '4549-2', '17855-8')
lab_tests_loinc = glucose_tests_loinc + a1c_tests_loinc

lab_values_query = (f"drop table if exists {hy_db}.dbo.GlucoseLabs;"
                    " select distinct MemberNum, LoincCode, TestResultValue, ServiceStartDate as date"
                   f" into {hy_db}.dbo.GlucoseLabs"
                   f" from ({default_enroll_window(['LoincCode', 'ServiceStartDate', 'TestResultValue'], 'ObservationLab', 'ServiceStartDate')}) as t"
                    " where LoincCode in %s" % str(lab_tests_loinc))
cxn.execute(lab_values_query)

pd.read_sql('select count(*) from GlucoseLabs', hy_cxn)

Unnamed: 0,Unnamed: 1
0,19175213


In [48]:
glucose_tests_cpt = ('82947', )
glucose_panels_cpt = ('80047', '80048', '80053', '80069')
a1c_tests_cpt = ('83036', )
all_tests_cpt = glucose_tests_cpt + glucose_panels_cpt + a1c_tests_cpt

lab_orders_query = (f"drop table if exists {hy_db}.dbo.GlucoseLabsCPT"
                   " select distinct MemberNum, DateServiceStarted as date, LineLevelProcedureCode as cpt"
                  f" into {hy_db}.dbo.GlucoseLabsCPT"
                  f" from ({default_enroll_window(['DateServiceStarted', 'LineLevelProcedureCode'], 'ObservationProcedure', 'DateServiceStarted')}) as t"
                   " where LineLevelProcedureCode in %s"
                   % str(all_tests_cpt)
                   )
cxn.execute(lab_orders_query)

pd.read_sql('select count(*) from GlucoseLabsCPT', hy_cxn)

Unnamed: 0,Unnamed: 1
0,35895150


## Diagnosis

In [49]:
# Patients with diagnosis of diabetes

dm_dx_icd = str(('790.21', '790.22', '790.2', '790.29', '791.5', '277.7', 'V18.0', 'V77.1'))
dm_dx_icd_fuzzy = ' or '.join(["DiagnosisCode like '%s'" % s for s in ('250%', '648.8%', '648.0%')])
dm_dx_icd10 = str(('R81', 'E88.81', 'Z83.3', 'Z13.1'))
dm_dx_icd10_fuzzy = ' or '.join(["DiagnosisCode like '%s'" % s for s in ('E10%', 'E11%', 'R73.%', 'O99.81%', 'O24%')])

# ICD for Family hx included here 
diabetes_dx_query = (f"drop table if exists {hy_db}.dbo.DxDm;"
                    " select distinct MemberNum, DiagnosisCode as diagCode, ICD10 as icd10, StartDate as date"
                   f" into {hy_db}.dbo.DxDm"
                   f" from ({default_enroll_window(['DiagnosisCode', 'ICD10', 'StartDate'], 'ObservationDiagnosis', 'StartDate')}) as t"
                   f" where (ICD10=0 and (DiagnosisCode in {dm_dx_icd} or {dm_dx_icd_fuzzy}))"
                   f"  or (ICD10=1 and (DiagnosisCode in {dm_dx_icd10} or {dm_dx_icd10_fuzzy}))")

cxn.execute(diabetes_dx_query)
pd.read_sql('select count(*) from DxDm', hy_cxn)

Unnamed: 0,Unnamed: 1
0,24980119


In [50]:
# Split into t1d and t2d tables
t1dm_icd_fuzzy = "diagCode like '250._1' or diagCode like '250._3'"
t1dm_icd10_fuzzy = "diagCode like 'E10.%'"
t2dm_icd_fuzzy = "diagCode like '250.[0,2-9]0' or diagCode like '250.[0,2-9]2'"
t2dm_icd10_fuzzy = "diagCode like 'E11.%'"

t1dm_query = (f"drop table if exists {hy_db}.dbo.DxT1dm;"
             " select MemberNum, diagCode, icd10, date"
             f" into {hy_db}.dbo.DxT1dm"
             f" from {hy_db}.dbo.DxDm with (nolock)"
             " where (icd10=0 and %s) or (icd10=1 and %s)" % (t1dm_icd_fuzzy, t1dm_icd10_fuzzy))
t2dm_query = (f"drop table if exists {hy_db}.dbo.DxT2dm;"
             " select MemberNum, diagCode, icd10, date"
             f" into {hy_db}.dbo.DxT2dm"
             f" from {hy_db}.dbo.DxDm with (nolock)"
             " where (icd10=0 and %s) or (icd10=1 and %s)" % (t2dm_icd_fuzzy, t2dm_icd10_fuzzy))

cxn.execute(';'.join([t1dm_query, t2dm_query]))
print(pd.read_sql('select count(*) from DxT1dm', hy_cxn))
print(pd.read_sql('select count(*) from DxT2dm', hy_cxn))

          
0  2040786
           
0  18777230


## Meds

In [52]:
t1dm_rxnorm = (139825, 274783, 314684, 352385, 400008, 51428, 5856, 86009, 139953)
t2dm_rxnorm = (173, 10633, 2404, 4821, 217360, 4815, 25789, 73044, 274332, 6809, 84108, 33738, 72610, 16681, 30009, 593411, 60548)
dm_rxnorm = (126958, 412956, 412959, 637321, 668291, 668370, 686655, 692383, 748611, 880998, 881056, 751128, 847187, 847191, 847197, 847203, 847207, 847211, 847230, 847239, 847252, 847256, 847259, 847263, 847278, 847416, 847417, 806905, 806903, 408119)

### Ingredient labeling

In [53]:
all_drugs = pd.read_sql('select NationalDrugCode,NdcDescription,NumberOfClaims from ConceptMedication', cxn, index_col='NationalDrugCode')
all_drugs.to_csv('all_drugs.csv')

In [55]:
drug_ingredients_create_table = ("drop table if exists RxIngredients;"
                                " create table RxIngredients ("
                                "  ndc bigint not null,"
                                "  rxcui bigint,"
                                "  ingredientRxcui bigint"
                                " );"
                                " create index indexByNdc on RxIngredients (ndc);")
hy_cxn.execute(drug_ingredients_create_table)

<sqlalchemy.engine.result.ResultProxy at 0x7fb70440f310>

In [60]:
# Ingredient fetching happens in an external script
drug_ingredients = pd.read_csv('drug_ingredients.csv', usecols=['ndc', 'rxcui', 'ingredient_rxcui'], dtype=np.float)
drug_ingredients = drug_ingredients.dropna()
drug_ingredients = drug_ingredients.astype(np.int_)

In [62]:
rows = [str(tuple(l)) for l in drug_ingredients.values]
i = 0
for i in tqdm_notebook(range(len(rows)//1000 + 1)):
    insert_sql = ("insert into RxIngredients (ndc, rxcui, ingredientRxcui) values %s;" % ','.join(rows[1000*i:1000*(i+1)]))
    hy_cxn.execute(insert_sql)

HBox(children=(IntProgress(value=0), HTML(value='')))




### Insulin supplies

DM med supplies listed in emerge algorithm are not ingredient-level, but SCDs, so we need explicitly find related NDCs

In [None]:
# Get NDCs relating to DM med supplies list
BASE_URL = "https://rxnav.nlm.nih.gov/REST"

explore_queue = list(dm_rxnorm)
all_rxcuis = set(dm_rxnorm)
insulin_ndcs = []

for rxcui in explore_queue:
    print(rxcui)
    # Get related RxCUIs 
    resp = requests.get(BASE_URL + '/rxcui/%s/allrelated' % rxcui)
    root = ET.fromstring(resp.text)
    related_rxcui = list(chain.from_iterable([[n.text for n in root.findall("./allRelatedGroup/conceptGroup/[tty='%s']/conceptProperties/rxcui" % tty)] for tty in ['SCD', 'GPCK', 'SBD', 'BPCK']]))
    explore_queue.extend(set(related_rxcui).difference(all_rxcuis))
    all_rxcuis = all_rxcuis.union(related_rxcui)
    
    # Get NDCs for RxCUI
    resp = requests.get(BASE_URL + '/rxcui/%s/allhistoricalndcs' % rxcui)
    root = ET.fromstring(resp.text)
    insulin_ndcs += [int(n.text) for n in root.findall("./historicalNdcConcept/historicalNdcTime/ndcTime/ndc")]
    
with open('insulin_ndcs', 'w') as f:
    for ndc in insulin_ndcs:
        f.write('%d\n' % ndc)

In [29]:
with open('insulin_ndcs') as f:
    insulin_ndcs = [int(l) for l in f.readlines()]

### Validation of ingredient labeling

In [63]:
# Claims coverage 
all_drugs.loc[drug_ingredients.ndc.unique()].NumberOfClaims.sum() / all_drugs.NumberOfClaims.sum()

0.9830098750410705

In [64]:
# Count DM-related drugs discovered
drug_ingredients[drug_ingredients.ingredient_rxcui.isin(t1dm_rxnorm) | drug_ingredients.ingredient_rxcui.isin(t2dm_rxnorm) | drug_ingredients.ingredient_rxcui.isin(dm_rxnorm)].shape[0]

2621

## Patients on DM meds

In [65]:
insulin_med_temp_query = ("drop table if exists #InsulinMeds;"
                          " select ingredientRxcui, ndc"
                          " into #InsulinMeds"
                          " from RxIngredients"
                          " where ndc in %s" % str(tuple(insulin_ndcs)))
insulin_rx_query = ("drop table if exists RxInsulin"
                    " select distinct t1.MemberNum, t2.ndc, t2.ingredientRxcui, t1.DispenseDate"
                    " into RxInsulin"
                   f" from ({default_enroll_window(['NationalDrugCode', 'DispenseDate'], 'gmw3.dbo.ObservationMedication', 'DispenseDate')}) as t1"
                    " inner join #InsulinMeds as t2"
                    " on t1.NationalDrugCode=t2.ndc")

hy_cxn.execute(';'.join([insulin_med_temp_query, insulin_rx_query]))
pd.read_sql('select count(*) from RxInsulin', hy_cxn)

Unnamed: 0,Unnamed: 1
0,836421


In [66]:
t1dm_med_temp_query = ("drop table if exists #T1DmMeds;"
                    " select ingredientRxcui, ndc"
                    " into #T1DmMeds"
                    " from RxIngredients"
                    " where ingredientRxcui in %s" % str(t1dm_rxnorm))
t1dm_rx_query = ("drop table if exists RxT1Dm"
              " select distinct t1.MemberNum, t2.ndc, t2.ingredientRxcui, t1.DispenseDate"
              " into RxT1Dm"
             f" from ({default_enroll_window(['NationalDrugCode', 'DispenseDate'], 'gmw3.dbo.ObservationMedication', 'DispenseDate')}) as t1"
              " inner join #T1DmMeds as t2"
              " on t1.NationalDrugCode=t2.ndc")

hy_cxn.execute(';'.join([t1dm_med_temp_query, t1dm_rx_query]))
pd.read_sql('select count(*) from RxT1Dm', hy_cxn)

Unnamed: 0,Unnamed: 1
0,2011636


In [67]:
t2dm_med_temp_query = ("drop table if exists #T2DmMeds;"
                    " select ingredientRxcui, ndc"
                    " into #T2DmMeds"
                    " from RxIngredients"
                    " where ingredientRxcui in %s" % str(t2dm_rxnorm))
t2dm_rx_query = ("drop table if exists RxT2dm;"
              " select distinct t1.MemberNum, t2.ndc, t2.ingredientRxcui, t1.DispenseDate"
              " into RxT2dm"
             f" from ({default_enroll_window(['NationalDrugCode', 'DispenseDate'], 'gmw3.dbo.ObservationMedication', 'DispenseDate')}) as t1"
              " inner join #T2DmMeds as t2"
              " on t1.NationalDrugCode=t2.ndc")

hy_cxn.execute(';'.join([t2dm_med_temp_query, t2dm_rx_query]))
pd.read_sql('select count(*) from RxT2dm', hy_cxn)

Unnamed: 0,Unnamed: 1
0,9073871


In [68]:
dm_rx_query = ("drop table if exists RxDm;"
              " select *"
              " into RxDm"
              " from (select * from RxT2dm union select * from RxInsulin union select * from RxT1dm) as t1")
hy_cxn.execute(dm_rx_query)
pd.read_sql('select count(*) from RxDm', hy_cxn)

Unnamed: 0,Unnamed: 1
0,11101913


# Helper tables for depression

In [51]:
depression_icds = ('293.83', '296.20', '296.21', '296.22', '296.23', '296.24', '296.25', '296.26', '296.30', '296.31', '296.32', '296.33', '296.34', '296.35', '296.36', '300.4', '311')

# Get gaps between depression claims 
depression_dx_query = (f"drop table if exists {hy_db}.dbo.dxDepression;"
                       " select distinct MemberNum, DiagnosisCode as diagCode, StartDate as date"
                      f" into {hy_db}.dbo.dxDepression"
                      f" from ({default_enroll_window(['DiagnosisCode', 'StartDate'], 'ObservationDiagnosis', 'StartDate')}) as t"
                      f" where DiagnosisCode in {str(depression_icds)}")

cxn.execute(depression_dx_query)
pd.read_sql('select count(*) from dxDepression', hy_cxn)

Unnamed: 0,Unnamed: 1
0,8353409
