In [1]:
import pandas as pd
import numpy as np

import time

import sqlalchemy
import psycopg2

from tqdm import tqdm
from datetime import datetime

pd.set_option('display.max_columns', 500)   # to display 500 columns
pd.set_option('display.max_rows', 500) # to display 500 rows

In [2]:
#![](FileStructure.png)
#from IPython.display import Image
#Image(filename='FileStructure.png')

In [3]:
#import psycopg2

# DSN (data source name) format for database connections:  
# [protocol / database  name]://[username]:[password]@[hostname / ip]:[port]/[database name here]

# on your computer you are the user postgres (full administrative access)
db_user = 'postgres'
# if you need a password to access a database, put it here
db_password = ''
# on your computer, use localhost
db_host = 'localhost'
# the default port for postgres is 5432
db_port = 5432
# we want to connect to the northwind database
database =   'cms_medicare_claims'  # 'cms_claims' #

conn_str = f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{database}'
conn = psycopg2.connect(conn_str)

In [4]:
# Check Tables in the Database
query = """
SELECT tablename 
FROM pg_catalog.pg_tables 
WHERE schemaname='public'
"""

pd.read_sql(query, con=conn)

Unnamed: 0,tablename
0,icd9_diagonsis
1,beneficiary2008
2,beneficiary2009
3,beneficiary2010
4,prescription_drug_events
5,carrier_claims
6,icd9_procedures
7,hcpcs
8,inpatient_claims
9,outpatient_claims


In [5]:
# to view All tables and columns
''' query = """
SELECT table_name, column_name, data_type, table_schema
FROM information_schema.columns
WHERE table_schema = 'public'
order by table_name
"""
pd.read_sql(query, con=conn) '''

' query = """\nSELECT table_name, column_name, data_type, table_schema\nFROM information_schema.columns\nWHERE table_schema = \'public\'\norder by table_name\n"""\npd.read_sql(query, con=conn) '

#### Function

In [6]:
def query_func(query, conn):
    df = pd.read_sql(query , con=conn)
    return df

### Data processing & lookups for Carrier Claims table

In [7]:
datetime.now()

datetime.datetime(2020, 5, 2, 20, 51, 51, 700700)

In [8]:
q = '''SELECT * FROM carrier_claims where "DESYNPUF_ID" = '0061141AB18FFA96' '''

carrier_claimsDF = query_func(q, conn)
carrier_claimsDF.head(1) 

Unnamed: 0,DESYNPUF_ID,CLM_ID,CLM_FROM_DT,CLM_THRU_DT,ICD9_DGNS_CD_1,ICD9_DGNS_CD_2,ICD9_DGNS_CD_3,ICD9_DGNS_CD_4,ICD9_DGNS_CD_5,ICD9_DGNS_CD_6,ICD9_DGNS_CD_7,ICD9_DGNS_CD_8,PRF_PHYSN_NPI_1,PRF_PHYSN_NPI_2,PRF_PHYSN_NPI_3,PRF_PHYSN_NPI_4,PRF_PHYSN_NPI_5,PRF_PHYSN_NPI_6,PRF_PHYSN_NPI_7,PRF_PHYSN_NPI_8,PRF_PHYSN_NPI_9,PRF_PHYSN_NPI_10,PRF_PHYSN_NPI_11,PRF_PHYSN_NPI_12,PRF_PHYSN_NPI_13,TAX_NUM_1,TAX_NUM_2,TAX_NUM_3,TAX_NUM_4,TAX_NUM_5,TAX_NUM_6,TAX_NUM_7,TAX_NUM_8,TAX_NUM_9,TAX_NUM_10,TAX_NUM_11,TAX_NUM_12,TAX_NUM_13,HCPCS_CD_1,HCPCS_CD_2,HCPCS_CD_3,HCPCS_CD_4,HCPCS_CD_5,HCPCS_CD_6,HCPCS_CD_7,HCPCS_CD_8,HCPCS_CD_9,HCPCS_CD_10,HCPCS_CD_11,HCPCS_CD_12,HCPCS_CD_13,LINE_NCH_PMT_AMT_1,LINE_NCH_PMT_AMT_2,LINE_NCH_PMT_AMT_3,LINE_NCH_PMT_AMT_4,LINE_NCH_PMT_AMT_5,LINE_NCH_PMT_AMT_6,LINE_NCH_PMT_AMT_7,LINE_NCH_PMT_AMT_8,LINE_NCH_PMT_AMT_9,LINE_NCH_PMT_AMT_10,LINE_NCH_PMT_AMT_11,LINE_NCH_PMT_AMT_12,LINE_NCH_PMT_AMT_13,LINE_BENE_PTB_DDCTBL_AMT_1,LINE_BENE_PTB_DDCTBL_AMT_2,LINE_BENE_PTB_DDCTBL_AMT_3,LINE_BENE_PTB_DDCTBL_AMT_4,LINE_BENE_PTB_DDCTBL_AMT_5,LINE_BENE_PTB_DDCTBL_AMT_6,LINE_BENE_PTB_DDCTBL_AMT_7,LINE_BENE_PTB_DDCTBL_AMT_8,LINE_BENE_PTB_DDCTBL_AMT_9,LINE_BENE_PTB_DDCTBL_AMT_10,LINE_BENE_PTB_DDCTBL_AMT_11,LINE_BENE_PTB_DDCTBL_AMT_12,LINE_BENE_PTB_DDCTBL_AMT_13,LINE_BENE_PRMRY_PYR_PD_AMT_1,LINE_BENE_PRMRY_PYR_PD_AMT_2,LINE_BENE_PRMRY_PYR_PD_AMT_3,LINE_BENE_PRMRY_PYR_PD_AMT_4,LINE_BENE_PRMRY_PYR_PD_AMT_5,LINE_BENE_PRMRY_PYR_PD_AMT_6,LINE_BENE_PRMRY_PYR_PD_AMT_7,LINE_BENE_PRMRY_PYR_PD_AMT_8,LINE_BENE_PRMRY_PYR_PD_AMT_9,LINE_BENE_PRMRY_PYR_PD_AMT_10,LINE_BENE_PRMRY_PYR_PD_AMT_11,LINE_BENE_PRMRY_PYR_PD_AMT_12,LINE_BENE_PRMRY_PYR_PD_AMT_13,LINE_COINSRNC_AMT_1,LINE_COINSRNC_AMT_2,LINE_COINSRNC_AMT_3,LINE_COINSRNC_AMT_4,LINE_COINSRNC_AMT_5,LINE_COINSRNC_AMT_6,LINE_COINSRNC_AMT_7,LINE_COINSRNC_AMT_8,LINE_COINSRNC_AMT_9,LINE_COINSRNC_AMT_10,LINE_COINSRNC_AMT_11,LINE_COINSRNC_AMT_12,LINE_COINSRNC_AMT_13,LINE_ALOWD_CHRG_AMT_1,LINE_ALOWD_CHRG_AMT_2,LINE_ALOWD_CHRG_AMT_3,LINE_ALOWD_CHRG_AMT_4,LINE_ALOWD_CHRG_AMT_5,LINE_ALOWD_CHRG_AMT_6,LINE_ALOWD_CHRG_AMT_7,LINE_ALOWD_CHRG_AMT_8,LINE_ALOWD_CHRG_AMT_9,LINE_ALOWD_CHRG_AMT_10,LINE_ALOWD_CHRG_AMT_11,LINE_ALOWD_CHRG_AMT_12,LINE_ALOWD_CHRG_AMT_13,LINE_PRCSG_IND_CD_1,LINE_PRCSG_IND_CD_2,LINE_PRCSG_IND_CD_3,LINE_PRCSG_IND_CD_4,LINE_PRCSG_IND_CD_5,LINE_PRCSG_IND_CD_6,LINE_PRCSG_IND_CD_7,LINE_PRCSG_IND_CD_8,LINE_PRCSG_IND_CD_9,LINE_PRCSG_IND_CD_10,LINE_PRCSG_IND_CD_11,LINE_PRCSG_IND_CD_12,LINE_PRCSG_IND_CD_13,LINE_ICD9_DGNS_CD_1,LINE_ICD9_DGNS_CD_2,LINE_ICD9_DGNS_CD_3,LINE_ICD9_DGNS_CD_4,LINE_ICD9_DGNS_CD_5,LINE_ICD9_DGNS_CD_6,LINE_ICD9_DGNS_CD_7,LINE_ICD9_DGNS_CD_8,LINE_ICD9_DGNS_CD_9,LINE_ICD9_DGNS_CD_10,LINE_ICD9_DGNS_CD_11,LINE_ICD9_DGNS_CD_12,LINE_ICD9_DGNS_CD_13
0,0061141AB18FFA96,887453388368559,2008-01-27,2008-01-27,43400,V5873,7859,4389,34830,4241,6929,5571,2408497000.0,,,,,,,,,,,,,620499266,,,,,,,,,,,,,99231,,,,,,,,,,,,,10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,O,,,,,,,,,,,,,4019,,,,,,,,,,,,


In [9]:
print(carrier_claimsDF.shape)
carrier_claimsDF.dtypes

(74, 142)


DESYNPUF_ID                              object
CLM_ID                                    int64
CLM_FROM_DT                      datetime64[ns]
CLM_THRU_DT                      datetime64[ns]
ICD9_DGNS_CD_1                           object
ICD9_DGNS_CD_2                           object
ICD9_DGNS_CD_3                           object
ICD9_DGNS_CD_4                           object
ICD9_DGNS_CD_5                           object
ICD9_DGNS_CD_6                           object
ICD9_DGNS_CD_7                           object
ICD9_DGNS_CD_8                           object
PRF_PHYSN_NPI_1                         float64
PRF_PHYSN_NPI_2                         float64
PRF_PHYSN_NPI_3                         float64
PRF_PHYSN_NPI_4                         float64
PRF_PHYSN_NPI_5                         float64
PRF_PHYSN_NPI_6                         float64
PRF_PHYSN_NPI_7                          object
PRF_PHYSN_NPI_8                          object
PRF_PHYSN_NPI_9                         

In [10]:
datetime.now()

datetime.datetime(2020, 5, 2, 20, 52, 7, 529373)

#### Adding lookup for 10 Diagnostic codes for carrier  claims

In [None]:
datetime.now()

In [None]:
q = '''SELECT  
            CC."DESYNPUF_ID", 
            ICD9D1.long_desc as DGNS_CD_1_desc,
            ICD9D2.long_desc as DGNS_CD_2_desc,
            ICD9D3.long_desc as DGNS_CD_3_desc,
            ICD9D4.long_desc as DGNS_CD_4_desc,
            ICD9D5.long_desc as DGNS_CD_5_desc,
            ICD9D6.long_desc as DGNS_CD_6_desc,
            ICD9D7.long_desc as DGNS_CD_7_desc,
            ICD9D8.long_desc as DGNS_CD_8_desc
                 
       FROM 
                      carrier_claims as CC
            LEFT JOIN icd9_diagonsis as ICD9D1 ON CC."ICD9_DGNS_CD_1" = ICD9D1.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D2 ON CC."ICD9_DGNS_CD_2" = ICD9D2.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D3 ON CC."ICD9_DGNS_CD_3" = ICD9D3.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D4 ON CC."ICD9_DGNS_CD_4" = ICD9D4.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D5 ON CC."ICD9_DGNS_CD_5" = ICD9D5.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D6 ON CC."ICD9_DGNS_CD_6" = ICD9D6.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D7 ON CC."ICD9_DGNS_CD_7" = ICD9D7.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D8 ON CC."ICD9_DGNS_CD_8" = ICD9D8.diagnosis_cd

            ;
    '''

ICD_descDF = query_func(q, conn)
print(ICD_descDF.shape)
ICD_descDF.head(1)

In [None]:
datetime.now()

#### Adding lookup for Line Diagnosis codes for carrier  claims

In [None]:
datetime.now()

In [None]:
q = '''SELECT  
            CC."DESYNPUF_ID", 
            ICD9D1.long_desc as LINE_DGNS_CD_1_desc,
            ICD9D2.long_desc as LINE_DGNS_CD_2_desc,
            ICD9D3.long_desc as LINE_DGNS_CD_3_desc,
            ICD9D4.long_desc as LINE_DGNS_CD_4_desc,
            ICD9D5.long_desc as LINE_DGNS_CD_5_desc,
            ICD9D6.long_desc as LINE_DGNS_CD_6_desc,
            ICD9D7.long_desc as LINE_DGNS_CD_7_desc,
            ICD9D8.long_desc as LINE_DGNS_CD_8_desc,
            ICD9D9.long_desc as LINE_DGNS_CD_9_desc,
            ICD9D10.long_desc as LINE_DGNS_CD_10_desc,
            ICD9D11.long_desc as LINE_DGNS_CD_11_desc,
            ICD9D12.long_desc as LINE_DGNS_CD_12_desc,
            ICD9D13.long_desc as LINE_DGNS_CD_13_desc
            
                 
       FROM 
                      carrier_claims as CC
            LEFT JOIN icd9_diagonsis as ICD9D1 ON CC."LINE_ICD9_DGNS_CD_1" = ICD9D1.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D2 ON CC."LINE_ICD9_DGNS_CD_2" = ICD9D2.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D3 ON CC."LINE_ICD9_DGNS_CD_3" = ICD9D3.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D4 ON CC."LINE_ICD9_DGNS_CD_4" = ICD9D4.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D5 ON CC."LINE_ICD9_DGNS_CD_5" = ICD9D5.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D6 ON CC."LINE_ICD9_DGNS_CD_6" = ICD9D6.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D7 ON CC."LINE_ICD9_DGNS_CD_7" = ICD9D7.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D8 ON CC."LINE_ICD9_DGNS_CD_8" = ICD9D8.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D9 ON CC."LINE_ICD9_DGNS_CD_9" = ICD9D9.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D10 ON CC."LINE_ICD9_DGNS_CD_10" = ICD9D10.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D11 ON CC."LINE_ICD9_DGNS_CD_11" = ICD9D11.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D12 ON CC."LINE_ICD9_DGNS_CD_12" = ICD9D12.diagnosis_cd
            LEFT JOIN icd9_diagonsis as ICD9D13 ON CC."LINE_ICD9_DGNS_CD_13" = ICD9D13.diagnosis_cd
            
            ;
    '''

LineICD_descDF = query_func(q, conn)
print(LineICD_descDF.shape)
LineICD_descDF.head(2)

In [None]:
datetime.now()

#### Adding lookup for HCPCS codes for carrier  claims

In [None]:
datetime.now()

In [None]:
q = '''SELECT  
            CC."DESYNPUF_ID", 
            h1."DESCRIPTION" as hcpcs_CD_1_desc,
            h2."DESCRIPTION" as hcpcs_CD_2_desc,
            h3."DESCRIPTION" as hcpcs_CD_3_desc,
            h4."DESCRIPTION" as hcpcs_CD_4_desc,
            h5."DESCRIPTION" as hcpcs_CD_5_desc,
            h6."DESCRIPTION" as hcpcs_CD_6_desc,
            h7."DESCRIPTION" as hcpcs_CD_7_desc,
            h8."DESCRIPTION" as hcpcs_CD_8_desc,
            h9."DESCRIPTION" as hcpcs_CD_9_desc,
            h10."DESCRIPTION" as hcpcs_CD_10_desc,
            h11."DESCRIPTION" as hcpcs_CD_11_desc
            
       FROM 
                      carrier_claims as CC
            LEFT JOIN hcpcs as h1 ON CAST(CC."HCPCS_CD_1" as varchar) = CAST(h1."HCPCS" as varchar)
            LEFT JOIN hcpcs as h2 ON CAST(CC."HCPCS_CD_2" as varchar) = CAST(h2."HCPCS" as varchar)
            LEFT JOIN hcpcs as h3 ON CAST(CC."HCPCS_CD_3" as varchar) = CAST(h3."HCPCS" as varchar)
            LEFT JOIN hcpcs as h4 ON CAST(CC."HCPCS_CD_4" as varchar) = CAST(h4."HCPCS" as varchar)
            LEFT JOIN hcpcs as h5 ON CAST(CC."HCPCS_CD_5" as varchar) = CAST(h5."HCPCS" as varchar)
            LEFT JOIN hcpcs as h6 ON CAST(CC."HCPCS_CD_6" as varchar) = CAST(h6."HCPCS" as varchar)
            LEFT JOIN hcpcs as h7 ON CAST(CC."HCPCS_CD_7" as varchar) = CAST(h7."HCPCS" as varchar)
            LEFT JOIN hcpcs as h8 ON CAST(CC."HCPCS_CD_8" as varchar) = CAST(h8."HCPCS" as varchar)
            LEFT JOIN hcpcs as h9 ON CAST(CC."HCPCS_CD_9" as varchar) = CAST(h9."HCPCS" as varchar)
            LEFT JOIN hcpcs as h10 ON CAST(CC."HCPCS_CD_10" as varchar) = CAST(h10."HCPCS" as varchar)
            LEFT JOIN hcpcs as h11 ON CAST(CC."HCPCS_CD_11" as varchar) = CAST(h11."HCPCS" as varchar)

            ;
    '''

hcpcsDF_carrier = query_func(q, conn)
print(hcpcsDF_carrier.shape)
hcpcsDF_carrier.head(5)

In [None]:
datetime.now()

#### Merging ICD diagnostic description to Carrier claims DF

In [None]:
carrier_claimsDF.index, ICD_descDF.index, LineICD_descDF.index, hcpcsDF_carrier.index

In [None]:
ICD_descDF.index

In [None]:
datetime.now()

In [None]:
carrier_claimsDF = carrier_claimsDF.merge(ICD_descDF, on='DESYNPUF_ID', 
                                              how='inner',  left_index=True, right_index=True)

In [None]:
datetime.now()  

In [None]:
print(carrier_claimsDF.shape)
carrier_claimsDF.head(2)

#### Merging Line ICD diagnostic description to Carrier claims DF

In [None]:
datetime.now()  

In [None]:
carrier_claimsDF = carrier_claimsDF.merge(LineICD_descDF, on='DESYNPUF_ID', 
                                              how='inner',  left_index=True, right_index=True)

In [None]:
datetime.now()  

In [None]:
print(carrier_claimsDF.shape)
carrier_claimsDF.head(2)

In [None]:
datetime.now()

#### Merging HCPCS description to carrier claims DF

In [None]:
datetime.now()  

In [None]:
carrier_claimsDF = carrier_claimsDF.merge(hcpcsDF_carrier, on='DESYNPUF_ID', 
                                              how='inner',  left_index=True, right_index=True)

In [None]:
datetime.now()  

In [None]:
print(carrier_claimsDF.shape)
carrier_claimsDF.head(2)

### Adding these DataFrames to the Database (overwriting)

In [None]:
# create an sqlalchemy connection
conn_postgres = f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}'
engine = sqlalchemy.engine.create_engine(conn_postgres)

In [None]:
# connect using sqlalchemy
connection = engine.connect()

In [None]:
# commit
connection.execute('commit')

In [None]:
# close connection
connection.close()

In [None]:
# open a new connection to the database that we created
conn_str = f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{database}'
conn = psycopg2.connect(conn_str)

In [None]:
# connect using sqlalchemy
engine = sqlalchemy.engine.create_engine(conn_str)

#### Loading files to DataBase

##### Carrier_claims

In [None]:
datetime.now()

In [None]:
carrier_claimsDF.to_sql(name = 'carrier_claims', con = engine, if_exists = 'replace', index = False) 

In [None]:
datetime.now()

In [None]:
conn.commit() 

In [None]:
del [carrier_claimsDF]
#gc.collect()

carrier_claimsDF = pd.DataFrame()