# About 

Until EHR data became widely shared and standardized, administrative claims data were used for predicting costly events across populations of beneficiaries. Among the many issues with such data, a major one is defining an "event", for example in time-to-event analyses. Analysts typically handle this with algorithms called <b>episode groupers</b>. In the task I was assigned, we wanted to predict readmission probabilities. To do this, we needed to infer the "location" of a beneficiary over time from medical billing claims. A complication is that a single claim is often only a fragment of a medical visit (for example, an inpatient hospital visit). In addition, medical claims (or at least the LDS data we had) do not use a common ID to bind together all claims for such a visit. We therefore wrote an algorithm that takes the start and stop dates of a claim, the claim type (inpatient, outpatient, SNF, hospice, etc.), and the beneficiary ID, and infers the actual start and stop dates of the medical visit. Mainly this involved absorbing claims that occurred within a larger window and extending dates (not shown here) using SQL window functions. Below is a rough example of the code we started with. The actual code was more complex and accounted for more complications with billing and with what counted as medical encounters, transfers, readmissions, etc.

This is a self-contained package for defining episode time windows and adverse events. The code is a mix of SQL and Python.
First, we generate some preliminary fake data and reference tables. These are described below. The main table is an example of raw CMS LDS data. The field names and their contents are the same as in LDS 2008-2010. Once the tables are generated, you can run Section 1 or Section 2 independently. Section 1 illustrates the grouper algorithm, and Section 2 the adverse event tagging.

 

## Preliminaries (Setup Example Database and Tables)

We first initialize a toy database for the examples below. It is populated with several tables:

*   **M** (master raw data table) - before defining episode intervals and tagging adverse events, we compiled the LDS data into a single all-claims table such as M, either for a single year or combined across all years (for the episode interval estimation)
*   **UTMO** (our reference list of adverse event ICD-9 codes) - see details in *Section 2: Tag Adverse Events*
*   **clm_priority** - table used to determine the primary claim type for the episodes derived from our grouper algorithm. It breaks the tie when multiple claim types share the same start date. For now, we use a simple hierarchical scheme with **hosp>inpt>snf>hha>out>car>dme**
*   **priority_name** - lookup from priority rank to episode type name, used to label the final episodes
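
As a rough illustration of the tie-breaking rule, the sketch below picks the primary claim type among claims that share a start date. The dictionary `CLM_PRIORITY` and the function `primary_claim_type` are hypothetical names introduced here; the codes and ranks mirror the clm_priority table built later in this section.

```python
# Hypothetical in-memory copy of the clm_priority reference table:
# claim-type code -> (priority rank, name). A lower rank wins the tie.
CLM_PRIORITY = {
    "50": (1, "hospice"),
    "60": (2, "inpatient"), "61": (2, "inpatient"),
    "20": (3, "snf"), "30": (3, "snf"),
    "10": (4, "hha"),
    "40": (5, "outpatient"),
    "70": (6, "carrier"), "71": (6, "carrier"),
    "81": (7, "dme"), "82": (7, "dme"),
}


def primary_claim_type(clm_types):
    """Return the name of the highest-priority claim type in clm_types."""
    best = min(clm_types, key=lambda code: CLM_PRIORITY[code][0])
    return CLM_PRIORITY[best][1]


# A carrier claim (71) and an inpatient claim (60) share a start date.
print(primary_claim_type(["71", "60"]))  # inpatient
```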

More about **M**: we constructed M to illustrate some challenges of the data and how the scripts deal with them. We observed several issues with the data (and anticipate others): 1) multiple claims with shared admission, discharge, and/or through dates, 2) multiple claim types on these dates, 3) claims of other types occurring within inpatient stays, and 4) such embedded claims starting later and ending later than the inpatient event in which they are embedded. The M table contains all of these issues. For adverse events, some codes are specific to events associated with medical care, while others count only if the condition was acquired in the hospital, for example. We currently rely on the POA (present on admission) field to determine adverse events. The example M table includes a UTI diagnosis where POA is negative, next to other diagnoses where POA is positive. Finally, we also tag the admitting diagnosis.


In [13]:
import sqlite3
# Compare version tuples, not strings: as strings, '3.9.0' >= '3.30.1' is True.
assert sqlite3.sqlite_version_info >= (3, 30, 1), 'Update sqlite3 to 3.30.1 or higher'


from sqlalchemy import create_engine, MetaData, Column, String, Integer,Table
import sqlalchemy as db

from tabulate import tabulate
import pandas as pd

dbname = "toy.db"
engine = create_engine("sqlite:///"+dbname)
conn = engine.connect()
metadata = MetaData(bind=engine)

Columns = ([
    Column('master_key',Integer,primary_key=True,autoincrement=True),
    Column('DSYSRTKY',String(),nullable=False),
    Column('CLM_TYPE',String(),nullable=True),
    Column('ADMSN_DT',String(),nullable=True),
    Column('DSCHRGDT',String(),nullable=True),
    Column('THRU_DT',String(),nullable=True),
    Column('AD_DGNS',String(),nullable=True),
    Column('DGNSCD1',String(),nullable=True),
    Column('CLMPOA1',String(),nullable=True),
    Column('DGNSCD2',String(),nullable=True),
    Column('CLMPOA2',String(),nullable=True)
    ])

if engine.has_table('M'):
  conn.execute('drop table M;')

M = Table("M",metadata,*Columns)
M.create()
    

data = [{'DSYSRTKY':10007,'CLM_TYPE':71,'ADMSN_DT':None,'DSCHRGDT':None,'THRU_DT':20090218,'AD_DGNS':None,'DGNSCD1':51881,'CLMPOA1':None,'DGNSCD2':None,'CLMPOA2':None},
        {'DSYSRTKY':10007,'CLM_TYPE':60,'ADMSN_DT':20090218,'DSCHRGDT':20090310,'THRU_DT':20090310,'AD_DGNS':'0389','DGNSCD1':'0389','CLMPOA1':'Y','DGNSCD2':'486','CLMPOA2':'Y'},
{'DSYSRTKY':10007,'CLM_TYPE':60,'ADMSN_DT':20090326,'DSCHRGDT':20090402,'THRU_DT':20090402,'AD_DGNS':'0389','DGNSCD1':'0389','CLMPOA1':'Y','DGNSCD2':'51884','CLMPOA2':'Y'},
{'DSYSRTKY':10007,'CLM_TYPE':70,'ADMSN_DT':None,'DSCHRGDT':None,'THRU_DT':20090328,'AD_DGNS':None,'DGNSCD1':'7802','CLMPOA1':None,'DGNSCD2':None,'CLMPOA2':None},
{'DSYSRTKY':10007,'CLM_TYPE':60,'ADMSN_DT':20090329,'DSCHRGDT':20090603,'THRU_DT':20090603,'AD_DGNS':'4589','DGNSCD1':'4589','CLMPOA1':'Y','DGNSCD2':'51883','CLMPOA2':'Y'},
{'DSYSRTKY':10008,'CLM_TYPE':20,'ADMSN_DT':20090801,'DSCHRGDT':20090813,'THRU_DT':20090813,'AD_DGNS':'4589','DGNSCD1':'4589','CLMPOA1':'Y','DGNSCD2':'51883','CLMPOA2':'Y'},
{'DSYSRTKY':10008,'CLM_TYPE':60,'ADMSN_DT':20090813,'DSCHRGDT':20091020,'THRU_DT':20091020,'AD_DGNS':'51881','DGNSCD1':'51883','CLMPOA1':'Y','DGNSCD2':'5990','CLMPOA2':'N'},
{'DSYSRTKY':10008,'CLM_TYPE':60,'ADMSN_DT':20090813,'DSCHRGDT':None,'THRU_DT':20091020,'AD_DGNS':'51881','DGNSCD1':'71536','CLMPOA1':'Y','DGNSCD2':None,'CLMPOA2':None}
]


q = db.insert(M).values(data) 

# print(q)
conn.execute(q)

r = conn.execute("select * from M;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))



+----+--------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+
|    |   master_key |   DSYSRTKY |   CLM_TYPE |   ADMSN_DT |   DSCHRGDT |   THRU_DT |   AD_DGNS |   DGNSCD1 | CLMPOA1   |   DGNSCD2 | CLMPOA2   |
|----+--------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------|
|  0 |            1 |      10007 |         71 |            |            |  20090218 |           |     51881 |           |           |           |
|  1 |            2 |      10007 |         60 |   20090218 |   20090310 |  20090310 |      0389 |      0389 | Y         |       486 | Y         |
|  2 |            3 |      10007 |         60 |   20090326 |   20090402 |  20090402 |      0389 |      0389 | Y         |     51884 | Y         |
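|  3 |            4 |      10007 |         70 |            |            |  20090328 |           |      7802 |           |           |           |
|  4 |            5 |      10007 |         60 |   20090329 |   20090603 |  20090603 |      4589 |      4589 | Y         |     51883 | Y         |
|  5 |            6 |      10008 |         20 |   20090801 |   20090813 |  20090813 |      4589 |      4589 | Y         |     51883 | Y         |
|  6 |            7 |      10008 |         60 |   20090813 |   20091020 |  20091020 |     51881 |     51883 | Y         |      5990 | N         |
|  7 |            8 |      10008 |         60 |   20090813 |            |  20091020 |     51881 |     71536 | Y         |           |           |
+----+--------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+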



In the table **M** above, note that for master_key = 7 the Present On Admission variable CLMPOA2 is set to 'N', indicating that the corresponding diagnosis DGNSCD2 = '5990' ("Urinary tract infection, site not specified") was *not* present on admission.

Below is code that makes a mock-up of the [Utah-Missouri list of adverse event codes](http://health.utah.gov/psi/icd9.htm), in a new table named **UTMO**. We will insert only a few codes from the Utah-Missouri list into this mock-up table. We will use this table to detect adverse events below.

In [14]:
Columns = ([Column('dxICD',String(),nullable=True)])

# Drop via SQL: the Python name UTMO is not defined yet on a fresh kernel.
if engine.has_table('UTMO'):
  conn.execute('drop table UTMO;')


UTMO = Table("UTMO",metadata,*Columns)
UTMO.create()
    

data = [{'dxICD':'0389'},
        {'dxICD':'486'},
        {'dxICD':'7802'},
        {'dxICD':'51881'},
        {'dxICD':'5990'}
]


q = db.insert(UTMO).values(data) 

# print(q)
conn.execute(q)

r = conn.execute("select * from UTMO;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))

+----+---------+
|    |   dxICD |
|----+---------|
|  0 |    0389 |
|  1 |     486 |
|  2 |    7802 |
|  3 |   51881 |
|  4 |    5990 |
+----+---------+




Below we generate tables to help determine the primary type of claim for an episode.

In [15]:
Columns = ([Column('clm_type',String(),nullable=False),
            Column('priority',String(),nullable=False),
            Column('name',String(),nullable=False)])

if engine.has_table("clm_priority"):
  conn.execute("drop table clm_priority;")

clm_priority = Table("clm_priority",metadata,*Columns)
clm_priority.create()
    
#hosp>inpt>snf>hha>out>car>dme

data = [{'clm_type':'50','priority':1,'name':'hospice'},
        {'clm_type':'60','priority':2,'name':'inpatient'},
        {'clm_type':'61','priority':2,'name':'inpatient'},
        {'clm_type':'20','priority':3,'name':'snf'},
        {'clm_type':'30','priority':3,'name':'snf'},
        {'clm_type':'10','priority':4,'name':'hha'},
        {'clm_type':'40','priority':5,'name':'outpatient'},
        {'clm_type':'70','priority':6,'name':'carrier'},
        {'clm_type':'71','priority':6,'name':'carrier'},
        {'clm_type':'81','priority':7,'name':'dme'},
        {'clm_type':'82','priority':7,'name':'dme'}]

q = db.insert(clm_priority).values(data) 

# print(q)
conn.execute(q)

r = conn.execute("select * from clm_priority;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))


#----
Columns = ([Column('priority',String(),nullable=False),
            Column('name',String(),nullable=False)])

if engine.has_table("priority_name"):
  conn.execute("drop table priority_name;")

priority_name = Table("priority_name",metadata,*Columns)
priority_name.create()

data = [{'priority':1,'name':'hospice'},
        {'priority':2,'name':'inpatient'},
        {'priority':3,'name':'snf'},
        {'priority':4,'name':'hha'},
        {'priority':5,'name':'outpatient'},
        {'priority':6,'name':'carrier'},
        {'priority':7,'name':'dme'}]

q = db.insert(priority_name).values(data) 

# print(q)
conn.execute(q)


+----+------------+------------+------------+
|    |   clm_type |   priority | name       |
|----+------------+------------+------------|
|  0 |         50 |          1 | hospice    |
|  1 |         60 |          2 | inpatient  |
|  2 |         61 |          2 | inpatient  |
|  3 |         20 |          3 | snf        |
|  4 |         30 |          3 | snf        |
|  5 |         10 |          4 | hha        |
|  6 |         40 |          5 | outpatient |
|  7 |         70 |          6 | carrier    |
|  8 |         71 |          6 | carrier    |
|  9 |         81 |          7 | dme        |
| 10 |         82 |          7 | dme        |
+----+------------+------------+------------+





---


## Section 1: Get Episode Start and Stop Dates

### Step 1: Define start and stop dates for each row (also filter out irrelevant data)

We define start and stop dates for all claim types. The admission date takes precedence over the other date fields: if it is present, the admission and discharge dates are used as start and stop; otherwise the through date is used for both. Note that if the admission date is present but the discharge date is null, we keep the null stop and treat the discharge as not yet observed.
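
The rule can be sketched for a single claim in plain Python. The function name `claim_interval` is hypothetical, and dates are assumed to be YYYYMMDD integers as in the toy table; the SQL in the next cell is what the notebook actually runs.

```python
def claim_interval(admsn_dt, dschrgdt, thru_dt):
    """Return (start, stop) for one claim; dates are YYYYMMDD ints or None."""
    if admsn_dt is None:
        # No admission date: the through date serves as both start and stop.
        return thru_dt, thru_dt
    # A discharge date of 0 is treated as missing, like the SQL case expression.
    stop = None if dschrgdt in (None, 0) else dschrgdt
    return admsn_dt, stop


print(claim_interval(None, None, 20090218))          # (20090218, 20090218)
print(claim_interval(20090218, 20090310, 20090310))  # (20090218, 20090310)
print(claim_interval(20090813, None, 20091020))      # (20090813, None)
```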

In [16]:
q = "create table A as select "
q += "m.master_key, m.dsysrtky, m.clm_type, b.priority as clm_type_priority, "
q += "case when m.admsn_dt is null then m.thru_dt else m.admsn_dt end start, "
q += "case when m.admsn_dt is null then m.thru_dt when m.dschrgdt = 0 then null else m.dschrgdt end stop "
q += "from M m left join clm_priority b on m.clm_type=b.clm_type;"

if engine.has_table('A'):
  conn.execute('drop table A;')

conn.execute(q)

q = "create table M_startstop as select *, "
q += "case when admsn_dt is null then thru_dt else admsn_dt end start, "
q += "case when admsn_dt is null then thru_dt when dschrgdt = 0 then null else dschrgdt end stop "
q += "from M;"

if engine.has_table('M_startstop'):
  conn.execute('drop table M_startstop;')

conn.execute(q)

r = conn.execute("select * from A;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))



+----+--------------+------------+------------+---------------------+----------+----------+
|    |   master_key |   DSYSRTKY |   CLM_TYPE |   clm_type_priority |    start |     stop |
|----+--------------+------------+------------+---------------------+----------+----------|
|  0 |            1 |      10007 |         71 |                   6 | 20090218 | 20090218 |
|  1 |            2 |      10007 |         60 |                   2 | 20090218 | 20090310 |
|  2 |            3 |      10007 |         60 |                   2 | 20090326 | 20090402 |
|  3 |            4 |      10007 |         70 |                   6 | 20090328 | 20090328 |
|  4 |            5 |      10007 |         60 |                   2 | 20090329 | 20090603 |
|  5 |            6 |      10008 |         20 |                   3 | 20090801 | 20090813 |
|  6 |            7 |      10008 |         60 |                   2 | 20090813 | 20091020 |
|  7 |            8 |      10008 |         60 |                   2 | 20090813 |          |
+----+--------------+------------+------------+---------------------+----------+----------+



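### Step 2: Collapse common start dates and select latest stop date

Next we collapse records that share a beneficiary and a start date into one putative episode, keeping the latest stop date. If any record in the group has a null stop, the collapsed stop is also null. Among the claim types in the group, we keep the one with the highest priority (the lowest priority number).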
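
A minimal Python sketch of this collapse is given below, assuming each claim is a tuple `(master_key, bene_id, start, stop, priority)`. The function `collapse_same_start` and the output field names are hypothetical; the notebook does the same work in SQL.

```python
from collections import defaultdict


def collapse_same_start(claims):
    """Group claims by (beneficiary, start) and summarise each group."""
    groups = defaultdict(list)
    for key, bene, start, stop, priority in claims:
        groups[(bene, start)].append((key, stop, priority))

    collapsed = []
    for (bene, start), members in sorted(groups.items()):
        stops = [stop for _, stop, _ in members]
        # One open-ended claim makes the whole group open-ended.
        stop = None if any(s is None for s in stops) else max(stops)
        collapsed.append({
            "master_keys": [key for key, _, _ in members],
            "bene_id": bene,
            "start": start,
            "stop": stop,
            "priority": min(p for _, _, p in members),
        })
    return collapsed


# Claims 7 and 8 of beneficiary 10008 share a start date; claim 8 has no stop.
rows = collapse_same_start([
    (6, 10008, 20090801, 20090813, 3),
    (7, 10008, 20090813, 20091020, 2),
    (8, 10008, 20090813, None, 2),
])
print(rows[1]["master_keys"], rows[1]["stop"])  # [7, 8] None
```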

In [17]:
q = "create table B as "
q += "with INITIAL_GROUP_CONCAT as ( "
q += "    select group_concat(master_key) as master_key_groups, "
q += "           dsysrtky, "
q += "           start, "
q += "           case when min(stop) is null then null else max(stop) end as stop, "
q += "           group_concat(distinct clm_type) as clm_type_samestart, "
q += "           min(clm_type_priority) as clm_type_priority "
q += "    from A group by dsysrtky,start "
q += "), NULL_INDICATOR as ( "
q += "    select DSYSRTKY, "
q += "           start, "
q += "           case when stop is null then 1 else 0 end as STOP_IS_NULL "
q += "    from A "
q += "), MAX_NULL_INDICATOR as ( "
q += "    select DSYSRTKY, "
q += "           start, "
q += "           max(STOP_IS_NULL) as ANY_STOP_IS_NULL "
q += "    from NULL_INDICATOR "
q += "    group by DSYSRTKY, start "
q += ") "
q += "select a.master_key_groups, "
q += "    a.DSYSRTKY, "
q += "    a.start, "
q += "    case when b.ANY_STOP_IS_NULL = 0 then a.stop else null end as stop, "
q += "    a.clm_type_samestart,"
q += "    a.clm_type_priority "
q += "from INITIAL_GROUP_CONCAT a "
q += "left join MAX_NULL_INDICATOR b on a.DSYSRTKY=b.DSYSRTKY and a.start=b.start;"
if engine.has_table('B'):
  conn.execute('drop table B;')
conn.execute(q)
r = conn.execute("select * from B order by DSYSRTKY, start;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))


+----+---------------------+------------+----------+----------+----------------------+---------------------+
|    | master_key_groups   |   dsysrtky |    start |     stop | clm_type_samestart   |   clm_type_priority |
|----+---------------------+------------+----------+----------+----------------------+---------------------|
|  0 | 1,2                 |      10007 | 20090218 | 20090310 | 71,60                |                   2 |
|  1 | 3                   |      10007 | 20090326 | 20090402 | 60                   |                   2 |
|  2 | 4                   |      10007 | 20090328 | 20090328 | 70                   |                   6 |
|  3 | 5                   |      10007 | 20090329 | 20090603 | 60                   |                   2 |
|  4 | 6                   |      10008 | 20090801 | 20090813 | 20                   |                   3 |
|  5 | 7,8                 |      10008 | 20090813 |          | 60                   |                   2 |
+----+---------------------+------------+----------+----------+----------------------+---------------------+



### Step 3: Run interloper pruning
This is an iterative scheme. Given the putative episodes above, for each episode *i* the code checks whether the previous episode (ordered by start date within a beneficiary) starts before episode *i* starts and stops after episode *i* starts. A null discharge date is treated as infinite time. If the putative episode is embedded in the previous one (that is, it is an interloper), it is removed from the table. This is repeated until no conflicts remain. Note that master_key 5 is an interloper even though its stop date is later than that of the episode it starts in; in this case it is absorbed into the episode that groups claims 3, 4, and 5. (Not shown in the code: we usually extended the stop date to the latest one in the group and ran the interloper algorithm again with the new interval.) Finally, we label the primary claim type using a hierarchical rule for which type takes precedence when multiple claim types share the same start date.
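
The pruning loop can be sketched in plain Python for one beneficiary, assuming each putative episode is a `(start, stop)` tuple of YYYYMMDD integers with `None` for an open stop. The function `prune_interlopers` is a hypothetical name; like the SQL `lag`, each pass compares a row only with its immediate predecessor.

```python
def prune_interlopers(episodes):
    """Drop embedded episodes until none remain.

    Returns the surviving episodes and the number of passes made.
    """
    inf = float("inf")
    current = sorted(episodes, key=lambda ep: ep[0])
    passes = 0
    while True:
        passes += 1
        keep = current[:1]  # the first episode has no predecessor
        for prev, cur in zip(current, current[1:]):
            prev_stop = inf if prev[1] is None else prev[1]
            embedded = prev[0] < cur[0] and prev_stop > cur[0]
            if not embedded:
                keep.append(cur)
        if len(keep) == len(current):
            return keep, passes
        current = keep


# Putative episodes of beneficiary 10007 from the toy data.
survivors, passes = prune_interlopers([
    (20090218, 20090310),
    (20090326, 20090402),
    (20090328, 20090328),
    (20090329, 20090603),
])
print(survivors)  # [(20090218, 20090310), (20090326, 20090402)]
print(passes)     # 3
```

The second pass is needed because the episode starting 20090329 only becomes adjacent to the one starting 20090326 after the episode between them has been removed.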



In [18]:
if engine.has_table('C'):
  conn.execute('drop table C;')

check = 1
k = 0
while check != 0:
  k +=1
  print(k)
  q = "create table C as select *, "
  q += "case "
  q += "when (lag(start,1) over (partition by dsysrtky order by start) < start) and "
  q += "(lag(stop,1) over (partition by dsysrtky order by start) > start) then 1 "
  q += "when (lag(start,1) over (partition by dsysrtky order by start) < start) and "
  q += "(lag(stop,1) over (partition by dsysrtky order by start) is null) then 1 else 0 end interloper "
  q += "from B;"

  r = conn.execute(q)
  

  q = "select interloper from C;"
  r = conn.execute(q)

  check = sum([i['interloper'] for i in r])
  q='drop table B;'
  conn.execute(q)
  q='create table B as select master_key_groups, dsysrtky, start, stop, clm_type_samestart, clm_type_priority from C where interloper=0;'
  conn.execute(q)
  q='drop table C;'
  conn.execute(q)

if engine.has_table('episode_intervals'):
  conn.execute('drop table episode_intervals;')

q = "create table episode_intervals as select b.*,"
q += "c.name as episode_type "
q += "from B b join priority_name c on b.clm_type_priority=c.priority;"

conn.execute(q)

# r = conn.execute("select * from M;")
# print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))


r = conn.execute("select * from episode_intervals;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))


1
2
3
+----+---------------------+------------+----------+----------+----------------------+---------------------+----------------+
|    | master_key_groups   |   dsysrtky |    start |     stop | clm_type_samestart   |   clm_type_priority | episode_type   |
|----+---------------------+------------+----------+----------+----------------------+---------------------+----------------|
|  0 | 1,2                 |      10007 | 20090218 | 20090310 | 71,60                |                   2 | inpatient      |
|  1 | 3                   |      10007 | 20090326 | 20090402 | 60                   |                   2 | inpatient      |
|  2 | 6                   |      10008 | 20090801 | 20090813 | 20                   |                   3 | snf            |
|  3 | 7,8                 |      10008 | 20090813 |          | 60                   |                   2 | inpatient      |
+----+---------------------+------------+----------+----------+----------------------+---------------------+----------------+



### Step 4: Grab data for episode period

Once the episode intervals are defined, we join all claims that fall within these periods back onto the episodes. For example, claims 4 and 5 were absorbed into the episode that starts with claim 3, but we bring back all the data for these claims.
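
As a sketch of the matching condition, the function below (a hypothetical name, `claims_in_episode`) returns the claims whose start date falls inside an episode window, with a null stop treated as infinity. It also assumes that a claim must belong to the same beneficiary as the episode, which the prose implies.

```python
def claims_in_episode(episode, claims):
    """Return master_keys of claims starting inside the episode window.

    episode: dict with bene_id, start, stop (None means open-ended).
    claims:  iterable of dicts with master_key, bene_id, start.
    """
    stop = float("inf") if episode["stop"] is None else episode["stop"]
    return [
        c["master_key"]
        for c in claims
        if c["bene_id"] == episode["bene_id"]
        and episode["start"] <= c["start"] < stop
    ]


claims = [
    {"master_key": 1, "bene_id": 10007, "start": 20090218},
    {"master_key": 2, "bene_id": 10007, "start": 20090218},
    {"master_key": 3, "bene_id": 10007, "start": 20090326},
    {"master_key": 4, "bene_id": 10007, "start": 20090328},
    {"master_key": 5, "bene_id": 10007, "start": 20090329},
]
episode = {"bene_id": 10007, "start": 20090326, "stop": 20090402}
print(claims_in_episode(episode, claims))  # [3, 4, 5]
```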

In [20]:
if engine.has_table('episodes'):
  conn.execute('drop table episodes;')
fieldnames = metadata.tables['M'].c.keys()
fieldnames.remove('DSYSRTKY')
q = "create table episodes as "
q += "with REPLACE_STOP_NULLS_WITH_INFINITY as ( "
q += "    select master_key_groups, "
q += "           DSYSRTKY, "
q += "            episode_type,"
q += "           start, "
q += "           case when stop is null then 9e999 else stop end as stop, "
q += "           clm_type_samestart "
q += "    from episode_intervals "
q += ") "
q += "select b.*, "
for i in fieldnames:
  q += "group_concat(m."+i+"), "
q += "group_concat(distinct m.start) as group_starts, "
q += "group_concat(distinct m.stop) as group_stops "
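# Match claims to episodes of the same beneficiary only, then apply the date window.
q += "from REPLACE_STOP_NULLS_WITH_INFINITY b left join M_startstop m "
q += "on b.dsysrtky=m.dsysrtky and b.start<=m.start and cast(m.start as Integer)<b.stop "
q += "group by b.dsysrtky, b.start;"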
conn.execute(q)
r = conn.execute("select * from M_startstop;")
# print('M')
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))
print('episodes')
r = conn.execute("select * from episodes;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))

+----+--------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+----------+----------+
|    |   master_key |   DSYSRTKY |   CLM_TYPE |   ADMSN_DT |   DSCHRGDT |   THRU_DT |   AD_DGNS |   DGNSCD1 | CLMPOA1   |   DGNSCD2 | CLMPOA2   |    start |     stop |
|----+--------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+----------+----------|
|  0 |            1 |      10007 |         71 |            |            |  20090218 |           |     51881 |           |           |           | 20090218 | 20090218 |
|  1 |            2 |      10007 |         60 |   20090218 |   20090310 |  20090310 |      0389 |      0389 | Y         |       486 | Y         | 20090218 | 20090310 |
|  2 |            3 |      10007 |         60 |   20090326 |   20090402 |  20090402 |      0389 |      0389 | Y         |     51884 | Y         | 20090326 | 200





---


## Section 2: Tag Adverse Events

Adverse events were detected by comparing observed ICD-9 diagnostic codes (e.g., DGNSCD1, DGNSCD2, ...) to the list compiled by the [Utah/Missouri Patient Safety Consortium](http://health.utah.gov/psi/icd9.htm). At this point in this notebook, the Utah-Missouri code list is stored in a table named **UTMO**, while the claims data is stored in a table named **M**.

If an ICD-9 code from the Utah-Missouri list is observed in a claim *and* the corresponding Present On Admission code indicates that the diagnosis was *not* present on admission, it is tagged as an adverse event. For each ICD-9 column (e.g., DGNSCD1), a new column (e.g., AE_UTMO_1) is generated indicating whether or not the diagnosis was an adverse event. A summary variable (AE_UTMO) is also generated, indicating whether any of the diagnoses (e.g., DGNSCD1, DGNSCD2, ...) was an adverse event. Although the Admitting Diagnosis (AD_DGNS) does not have a Present On Admission variable, an output column named AE_UTMO_AD is also provided, indicating whether or not the Admitting Diagnosis was in the Utah-Missouri list.

In the mock-up table **M**, note that the only time an adverse event occurs is for master_key = 7, where DGNSCD2 = '5990' ("Urinary tract infection, site not specified"). The corresponding Present On Admission variable CLMPOA2 is set to 'N', indicating that this diagnosis was *not* present on admission. So the variable AE_UTMO_2 is set to 1 (True), indicating that this is an adverse event. The summary adverse event variable AE_UTMO is set to 1 if any adverse events were detected within a particular row/record in table **M**, i.e., if either AE_UTMO_1 or AE_UTMO_2 is set to 1. For master_key = 7, AE_UTMO is set to 1 since AE_UTMO_2 was set to 1.
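
The tagging rule for one claim can be sketched in Python as follows. The set `UTMO_CODES` copies the mock-up code list, and the function `tag_adverse_events` is a hypothetical name; the notebook itself computes these flags with the SQL in the next cell.

```python
# Mock-up subset of the Utah-Missouri list, as inserted into UTMO above.
UTMO_CODES = {"0389", "486", "7802", "51881", "5990"}


def tag_adverse_events(claim, codes=UTMO_CODES):
    """Return the AE_UTMO_* flags for one claim given as a dict of fields."""
    dx_poa = [
        (claim.get("DGNSCD1"), claim.get("CLMPOA1")),
        (claim.get("DGNSCD2"), claim.get("CLMPOA2")),
    ]
    # The admitting diagnosis has no POA flag, so list membership is enough.
    tags = {"AE_UTMO_AD": int(claim.get("AD_DGNS") in codes)}
    for i, (dx, poa) in enumerate(dx_poa, start=1):
        # Adverse event: listed code that was NOT present on admission.
        tags[f"AE_UTMO_{i}"] = int(dx in codes and poa == "N")
    tags["AE_UTMO"] = int(any(tags[f"AE_UTMO_{i}"] for i in (1, 2)))
    return tags


claim_7 = {"AD_DGNS": "51881", "DGNSCD1": "51883", "CLMPOA1": "Y",
           "DGNSCD2": "5990", "CLMPOA2": "N"}
print(tag_adverse_events(claim_7))
# {'AE_UTMO_AD': 1, 'AE_UTMO_1': 0, 'AE_UTMO_2': 1, 'AE_UTMO': 1}
```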


In [21]:
q = "create table AE as \
with BACKBONE as ( \
    select distinct master_key, DSYSRTKY \
    from M \
), REPLACE_NULLS as ( \
    select master_key, \
           DSYSRTKY, \
           coalesce(AD_DGNS,'') as AD_DGNS, \
           coalesce(DGNSCD1,'') as DGNSCD1, \
           coalesce(DGNSCD2,'') as DGNSCD2, \
           coalesce(CLMPOA1,'') as CLMPOA1, \
           coalesce(CLMPOA2,'') as CLMPOA2 \
    from M \
), COMPUTE_PAE_1 as ( \
    select master_key, \
           DSYSRTKY, \
           CLMPOA1, \
           case when ae.dxICD is null then 0 else 1 end as PAE_UTMO_1 \
    from REPLACE_NULLS rn \
    left join UTMO ae on rn.DGNSCD1 = ae.dxICD \
), COMPUTE_PAE_2 as ( \
    select master_key, \
           DSYSRTKY, \
           CLMPOA2, \
           case when ae.dxICD is null then 0 else 1 end as PAE_UTMO_2 \
    from REPLACE_NULLS rn \
    left join UTMO ae on rn.DGNSCD2 = ae.dxICD \
), COMPUTE_AE_AD as ( \
    select master_key, \
           DSYSRTKY, \
           case when ae.dxICD is null then 0 else 1 end as AE_UTMO_AD \
    from REPLACE_NULLS rn \
    left join UTMO ae on rn.AD_DGNS = ae.dxICD \
), COMPUTE_AE_1 as ( \
    select master_key, \
           DSYSRTKY, \
           case when PAE_UTMO_1 = 1 and CLMPOA1 = 'N' then 1 else 0 end as AE_UTMO_1 \
    from COMPUTE_PAE_1 \
), COMPUTE_AE_2 as ( \
    select master_key, \
           DSYSRTKY, \
           case when PAE_UTMO_2 = 1 and CLMPOA2 = 'N' then 1 else 0 end as AE_UTMO_2 \
    from COMPUTE_PAE_2 \
) \
select bb.master_key, \
       bb.DSYSRTKY, \
       aead.AE_UTMO_AD, \
       case when ae1.AE_UTMO_1 = 1 or ae2.AE_UTMO_2 = 1 then 1 else 0 end as AE_UTMO, \
       ae1.AE_UTMO_1, \
       ae2.AE_UTMO_2 \
from BACKBONE bb \
left join COMPUTE_AE_AD aead on bb.master_key = aead.master_key \
left join COMPUTE_AE_1 ae1 on bb.master_key = ae1.master_key \
left join COMPUTE_AE_2 ae2 on bb.master_key = ae2.master_key"


if engine.has_table('AE'):
  conn.execute('drop table AE;')

conn.execute(q)
print('M')
r = conn.execute("select * from M;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))
print('UTMO')
r = conn.execute("select * from UTMO;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))
print('AE tagged')
r = conn.execute("select * from AE;")
print(tabulate(pd.DataFrame(r), headers=r.keys(), tablefmt='psql'))

M
+----+--------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+
|    |   master_key |   DSYSRTKY |   CLM_TYPE |   ADMSN_DT |   DSCHRGDT |   THRU_DT |   AD_DGNS |   DGNSCD1 | CLMPOA1   |   DGNSCD2 | CLMPOA2   |
|----+--------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------|
|  0 |            1 |      10007 |         71 |            |            |  20090218 |           |     51881 |           |           |           |
|  1 |            2 |      10007 |         60 |   20090218 |   20090310 |  20090310 |      0389 |      0389 | Y         |       486 | Y         |
|  2 |            3 |      10007 |         60 |   20090326 |   20090402 |  20090402 |      0389 |      0389 | Y         |     51884 | Y         |
|  3 |            4 |      10007 |         70 |            |            |  20090328 |           |      7802 |           | 

