# Prepare CCW proxy

* Chronic condition algorithm (CCW, Alzheimer's disease): https://www2.ccwdata.org/documents/10280/19139608/ccw-cond-algo-alzdisease.pdf

In [None]:
## Load packages ----
import numpy as np
import pandas as pd
import pyarrow.parquet as pq

import sshtunnel
import psycopg2 as pg
import json
import os

import duckdb

In [None]:
## read icd_json ----
# Load the ICD code dictionary and render the Alzheimer's ICD-9 + ICD-10 codes
# as a comma-separated list of single-quoted SQL literals (`diag_string`).
# The queries below interpolate it into `ARRAY[...]` / `IN (...)` clauses.
with open('../data/input/remote_data/icd_codes.json', 'r', encoding='utf-8') as json_file:
    icd_dict = json.load(json_file)

diag_codes = [*icd_dict['alzheimer']['icd9'], *icd_dict['alzheimer']['icd10']]
if not diag_codes:
    # An empty list would render `IN ()` / `ARRAY[]`, which is invalid SQL.
    raise ValueError("icd_codes.json lists no 'alzheimer' ICD-9/ICD-10 codes")

# Joining one combined list (instead of two joins glued with ",") avoids a
# stray leading/trailing comma when either code list is empty. Doubling any
# embedded single quote keeps each code inside its SQL string literal.
diag_string = ",".join("'" + str(code).replace("'", "''") + "'" for code in diag_codes)
diag_string

## Fetch admissions from Dorieh

In [None]:
## Open ssh tunnel to DB host ----
# Forward port 5432 on the DB host to a local port (exposed afterwards as
# tunnel.local_bind_port). SSH credentials are read from environment variables.
tunnel = sshtunnel.SSHTunnelForwarder(
    ('nsaph.rc.fas.harvard.edu', 22),
    ssh_username=os.environ["MY_NSAPH_SSH_USERNAME"],
    ssh_private_key=os.environ["HOME"] + '/.ssh/id_rsa',
    ssh_password=os.environ["MY_NSAPH_SSH_PASSWORD"],
    remote_bind_address=("localhost", 5432),
)

tunnel.start()

In [None]:
## Open connection to DB ----
# Connect to the `nsaph2` database through the local end of the SSH tunnel.
# DB credentials are read from environment variables.
connection = pg.connect(
    host='localhost',
    database='nsaph2',
    user=os.environ["MY_NSAPH_DB_USERNAME"],
    password=os.environ["MY_NSAPH_DB_PASSWORD"],
    port=tunnel.local_bind_port,
)

In [None]:
## read icd_json ----
# Reload the ICD code dictionary from the same JSON file.
# NOTE(review): this repeats the load done in the first cell. `diag_string` is
# not rebuilt here, so JSON edits only reach the queries if that cell is re-run.
with open('../data/input/remote_data/icd_codes.json', 'r') as json_file:
  icd_dict = json.load(json_file)

In [None]:
# Exploratory Postgres query: admissions whose `diagnoses` array shares at
# least one element with the Alzheimer's code list (`&&` is the Postgres
# array-overlap operator). LIMIT 10 returns only a small sample.
# NOTE(review): diag_string is interpolated directly into the SQL text. Its
# values come from the local icd_codes.json, not from user input.
sql_query = f"""
SELECT
    bene_id, 
    diagnoses
FROM medicare.admissions
WHERE diagnoses && ARRAY[{diag_string}]
LIMIT 10
"""
sql_query

In [None]:
## Request query ----
# Run the sample query over the tunnelled connection. bene_id comes back as an
# ordinary column, and the frame keeps a default RangeIndex.
df = pd.read_sql_query(sql_query, connection)

## Fetch admissions from the parquet data warehouse

In [None]:
# Path prefix of the yearly admissions parquet files. File names are built as
# f"{adm_prefix}_{year}.parquet" in the cells below.
adm_prefix = "../data/input/local_data/data_warehouse/dw_adm_xu_sabath_00_16/adm"

In [None]:
# List the column names of the 2000 admissions file. read_schema reads only the
# parquet metadata, so the (potentially large) table is not loaded into memory
# just to inspect its schema.
pq.read_schema(f"{adm_prefix}_2000.parquet").names

In [None]:
## start connection ----
# DuckDB connection without a database file argument (in-memory by default).
# It is used to query the parquet files directly by path.
conn = duckdb.connect()

In [None]:
# Exploratory check on the 2000 admissions file: distinct beneficiaries with at
# least one Alzheimer's code. UNNEST expands each row's `diagnoses` into one row
# per code (aliased adm.diag), so the codes can be matched with IN (...).
# NOTE(review): this rebinds `df`, replacing the Postgres sample fetched above.
df = conn.execute(f"""
     SELECT 
          DISTINCT bene_id
     FROM '{adm_prefix}_2000.parquet', UNNEST(diagnoses) AS adm(diag)
     WHERE adm.diag IN ({diag_string})
""").fetchdf()

In [None]:
# Sanity check: columns and (rows, columns) shape of the 2000-file result.
print(df.columns)
print(df.shape)

## Obtain chronic conditions proxy

In [None]:
# Path prefix of the yearly beneficiary parquet files. File names are built as
# f"{bene_prefix}_{year}.parquet" in the cells below.
bene_prefix = "../data/input/local_data/data_warehouse/dw_bene_xu_sabath_00_16/bene"

In [None]:
# List the column names of the 2000 beneficiary file. read_schema reads only
# the parquet metadata, so the (potentially large) table is not loaded into
# memory just to inspect its schema.
pq.read_schema(f"{bene_prefix}_2000.parquet").names

In [None]:
# Qualifying admissions over the reference period. For every admission in the
# 2001-2003 admission files, keep each Alzheimer's ICD code it carries, then
# summarise per beneficiary.
# NOTE(review): the 3-year window ending in the 2003 reference year is assumed
# to follow the CCW Alzheimer's algorithm linked at the top -- confirm.
ccw_years = [2001, 2002, 2003]
diag_files = [f"{adm_prefix}_{year}.parquet" for year in ccw_years]
diag_queries = [
    f"""
        SELECT DISTINCT
            bene_id,
            admission_date,
            diag
        FROM '{file}', UNNEST(diagnoses) AS adm(diag)
        WHERE adm.diag IN ({diag_string})
    """
    for file in diag_files
]

diag_query = " UNION ALL ".join(diag_queries)

# `n` counts distinct admission dates with a qualifying code. Counting `diag`
# rows (one per admission date AND code) would count an admission carrying
# several Alzheimer's codes several times.
adm_query = f"""
    WITH diag AS ({diag_query})
    SELECT
        bene_id,
        MIN(admission_date) AS min_adm_date,
        MAX(admission_date) AS max_adm_date,
        COUNT(DISTINCT admission_date) AS n
    FROM diag
    GROUP BY bene_id
"""

adm = conn.execute(adm_query).fetchdf()

In [None]:
# Sanity check: columns and (rows, columns) shape of the per-beneficiary summary.
print(adm.columns)
print(adm.shape)

In [None]:
# Beneficiaries with at least one qualifying admission in the 2001-2003 files.
# `adm` is already one row per bene_id (GROUP BY bene_id), so unique() drops nothing.
bene_in_claims = adm['bene_id'].unique()
len(bene_in_claims)

In [None]:
# HMO months per beneficiary and year, from the 2001-2003 beneficiary files.
# NOTE(review): the per-(bene_id, year) SUM suggests a bene file can hold more
# than one row per beneficiary per year -- confirm against the file schema.
# NOTE(review): `hmo` is only inspected below. The FFS cell rebuilds the same
# per-year query (with column hmo_y) instead of reusing this frame.
hmo_files = [f"{bene_prefix}_{year}.parquet" for year in [2001, 2002, 2003]]
hmo_queries = []

for file in hmo_files:
    query = f"""
        SELECT
            bene_id,
            year, 
            SUM(hmo_mo) as hmo_mo 
        FROM '{file}'
        GROUP BY 
            bene_id, year
    """
    hmo_queries.append(query)

hmo = conn.execute(" UNION ALL ".join(hmo_queries)).fetchdf()

In [None]:
# Sanity check: shape and columns of the per-(bene_id, year) HMO table.
print(hmo.shape)
print(hmo.columns)

In [None]:
# FFS-coverage flag per beneficiary over 2001-2003. Per-year HMO months are
# summed across the three bene files, and ffs_coverage = 1 only when that total is 0.
# NOTE(review): if every hmo_mo for a beneficiary is NULL, SUM(hmo_y) is NULL,
# `= 0` is not true, and the CASE yields 0 (not FFS) -- confirm this is intended.
# NOTE(review): only HMO months are checked. Beneficiaries missing from some of
# the yearly files can still get ffs_coverage = 1 -- confirm this proxy is
# acceptable for the CCW coverage criterion.
hmo_files = [f"{bene_prefix}_{year}.parquet" for year in [2001, 2002, 2003]]
hmo_queries = []
for file in hmo_files:
    q = f"""
        SELECT
            bene_id,
            year, 
            SUM(hmo_mo) as hmo_y 
        FROM '{file}'
        GROUP BY 
            bene_id, year
    """
    hmo_queries.append(q)
hmo_query = " UNION ALL ".join(hmo_queries)

ffs_query = f"""
    WITH hmo AS ({hmo_query}) 
    SELECT 
        bene_id,
        CASE WHEN SUM(hmo_y) = 0 THEN 1 ELSE 0 END AS ffs_coverage
    FROM hmo
    GROUP BY bene_id
    """

ffs = conn.execute(ffs_query).fetchdf()

In [None]:
# Sanity check: shape and columns of the per-beneficiary FFS flag table.
print(ffs.shape)
print(ffs.columns)

In [None]:
# bene_ids whose ffs_coverage flag is 1 (zero summed HMO months over 2001-2003).
bene_in_ffs = ffs.bene_id[ffs.ffs_coverage == 1].values
len(bene_in_ffs)

In [None]:
# Beneficiaries in the 2003 (reference-year) bene file, with `year` renamed to
# the reference year `rfrnc_yr`.
# DISTINCT: the HMO cells sum hmo_mo per (bene_id, year), which suggests a bene
# file can hold several rows per beneficiary. Without DISTINCT, such duplicates
# would carry through the merges and duplicate the proxy rows.
bene = conn.execute(f"""
    SELECT DISTINCT
        bene_id,
        year AS rfrnc_yr
    FROM '{bene_prefix}_2003.parquet'
""").fetchdf()

In [None]:
# Sanity check: shape and columns of the reference-year beneficiary table.
print(bene.shape)
print(bene.columns)

In [None]:
# Number of ffs rows whose bene_id also appears in the 2003 bene file.
ffs['bene_id'].isin(bene['bene_id']).sum()

In [None]:
# Number of adm rows (one per beneficiary with a qualifying admission) whose
# bene_id also appears in the 2003 bene file.
adm['bene_id'].isin(bene['bene_id']).sum()

In [None]:
# Attach the FFS-coverage flag and flag beneficiaries with at least one
# qualifying admission in `adm` (diag = 1, otherwise 0).
# validate='many_to_one' makes pandas raise a MergeError if `ffs` ever has
# duplicate bene_ids, instead of silently duplicating beneficiary rows.
bene = bene.merge(ffs, on='bene_id', how='left', validate='many_to_one')
bene['diag'] = bene['bene_id'].isin(adm['bene_id']).astype(int)

In [None]:
def compute_ccw_proxy(row):
    """Return the CCW-style indicator code for one beneficiary row.

    Combines the claims flag ``row['diag']`` and the coverage flag
    ``row['ffs_coverage']`` into a single code:

    * 0 -- no qualifying claim, coverage criterion not met
    * 1 -- qualifying claim, coverage criterion not met
    * 2 -- no qualifying claim, coverage criterion met
    * 3 -- qualifying claim and coverage criterion met

    Each flag counts as met only when it equals 1. Any other value, including
    a NaN ``ffs_coverage`` left by a left merge, counts as "not met", so a
    missing flag can never be coded 3 by falling through to a catch-all branch.

    Args:
        row: mapping (e.g. a DataFrame row) with ``'diag'`` and ``'ffs_coverage'``.

    Returns:
        int in {0, 1, 2, 3}.
    """
    claims_met = int(row['diag'] == 1)
    coverage_met = int(row['ffs_coverage'] == 1)
    # Bit 0 encodes the claims criterion, bit 1 the coverage criterion.
    return claims_met + 2 * coverage_met

# CCW-style 0-3 code per beneficiary: diag + 2 * ffs_coverage. For 0/1 flags
# this matches the coding in compute_ccw_proxy. Any flag value other than 1
# (including NaN after the left merge) counts as 0. The vectorised form avoids
# a Python-level DataFrame.apply(axis=1) call per beneficiary row.
bene['alzh'] = (
    bene['diag'].eq(1).astype(int)
    + 2 * bene['ffs_coverage'].eq(1).astype(int)
)

In [None]:
# Attach each beneficiary's earliest qualifying admission date. The value is
# missing for beneficiaries without one, because this is a left merge.
bene = bene.merge(adm[['bene_id', 'min_adm_date']], on='bene_id', how='left')

In [None]:
# Expose the earliest qualifying admission date as `alzh_ever`.
# NOTE(review): this is the first qualifying date within the 2001-2003 files
# only, not necessarily the first ever -- confirm against the intended CCW definition.
bene = bene.rename(columns={'min_adm_date': 'alzh_ever'})

In [None]:
# Remove the intermediate flag columns, which are now encoded in `alzh`.
bene = bene.drop(columns=['ffs_coverage', 'diag'])

In [None]:
# Final column set of the proxy table.
bene.columns

In [None]:
# Spot-check a random sample of reference years.
bene.rfrnc_yr.sample(5)

In [None]:
# Spot-check a random sample of the 0-3 alzh codes.
bene.alzh.sample(5)

In [None]:
# Spot-check non-missing alzh_ever dates. sample(5) without replacement raises
# ValueError if fewer than 5 non-missing values exist.
bene.alzh_ever[bene.alzh_ever.notna()].sample(5)