In [1]:
import psycopg2 as pg2

In [2]:
conn = pg2.connect(dbname='medicare', user='postgres')

In [3]:
cur = conn.cursor()

In [24]:
# Insert provider/drug data into psql

In [4]:
query = '''
        CREATE TABLE npi_drug_13 (
            npi integer
            , last_name text
            , first_name varchar(50)
            , provider_city varchar(50)
            , provider_state varchar(5)
            , specialty_desc text
            , description_flag varchar(5)
            , drug_name varchar(50)
            , generic_name varchar(50)
            , bene_count integer
            , total_claims integer
            , total_day_supply integer
            , total_drug_cost float
            , bene_count_ge65 integer
            , bene_count_ge65_redact varchar(5)
            , total_claim_count_ge65 integer
            , ge65_redact_flag varchar(5)
            , total_day_supply_ge65 integer
            , total_drug_cost_ge65 float
            );
        '''

cur.execute(query)

In [5]:
query = '''
        COPY npi_drug_13
        FROM '/home/ubuntu/medicare-project/data/pp_npi_drug_13.tab' 
        DELIMITER '\t' 
        HEADER
        CSV;
        '''
cur.execute(query)

In [6]:
conn.commit()

In [25]:
# Insert provider summary into psql

In [4]:
query = '''
        CREATE TABLE npi_13 (
            npi integer
            , last_name text
            , first_name varchar(50)
            , middle_initial varchar(5)
            , credentials varchar(20)
            , gender varchar(5)
            , entity_code varchar(10)
            , provider_street1 text
            , prover_street2 text
            , provider_city varchar(40)
            , provider_zip varchar(20)
            , provider_state varchar(5)
            , provider_country varchar(20)
            , specialty_desc text
            , description_flag varchar(10)
            , bene_count float
            , total_claim_count integer
            , total_drug_cost float
            , total_day_supply integer
            , bene_count_ge65 float
            , bene_count_ge65_redact varchar(5)
            , total_claim_count_ge65 int
            , ge65_redact_flag varchar(5)
            , total_drug_cost_ge65 float
            , total_day_supply_ge65 float
            , brand_claim_count float
            , brand_redact_flag varchar(5)
            , brand_claim_cost float
            , generic_claim_count float
            , generic_redact_flag varchar(5)
            , generic_claim_cost float
            , other_claim_count float
            , other_redact_flag varchar(5)
            , other_claim_cost float
            , mapd_claim_count float
            , mapd_redact_flag varchar(5)
            , mapd_claim_cost float
            , pdp_claim_count float
            , pdp_redact_flag varchar(5)
            , pdp_claim_cost float
            , lis_claim_count float
            , lis_redact_flag varchar(5)
            , lis_claim_cost float
            , nonlis_claim_count float
            , nonlis_redact_flag varchar(5)
            , nonlis_claim_cost float
            );
        '''

cur.execute(query)

In [5]:
query = '''
        COPY npi_13
        FROM '/home/ubuntu/medicare-project/data/pp_npi_13.tab' 
        DELIMITER '\t' 
        HEADER
        CSV;
        '''
cur.execute(query)
conn.commit()

In [26]:
# Helper code to insert large csv into psql. Inserts all columns as varchar

In [38]:
#%%writefile psql_helper.py

'''
A Psql helper module
~~~~~~~~~~~~~~~~~~~~
Author: Ming Huang
Last Edit Date: 09/14/2015
The module included is intended to make it easier to quickly to load csv data 
into Psql and prevent having to write multiple create and insert queries, and 
is a byproduct of the Galvanize Data Science Capstone project.
This is only a first iteration, please feel free to contact me if you see 
something wrong.  Error handling has not been included yet.
'''

import psycopg2
import os
import re


class PsqlConnection(object):
    '''
    Provides an interface for quick table creation and data insertion into Psql
    databases using Python.  This module is created with the intention for 
    quick data and automated Psql data storage without having to experiment 
    with writing create and insert queries against uncertain data types.  
    While this is not conventional database practice, and does make it easier 
    to quickly transition into EDA using SQL for Data Science projects.
    Usage:
    psql = PsqlConnection(db='dbname', user='username')
    psql.create_table(headers, 'tablename')
    psql.insert_csv('tablename', 'csvpath')
    psql.load_csvs_in_directory('folderpath')
    '''

    def __init__(self, db, user, host='localhost'):
        '''
        INPUT:
            db -> string; name of database
            user -> string; name of database user
            host -> string; database ip address
        Initiates PsqlConnection class.  This class enables automated table 
        generations and data insertions on an existing psql database.
        '''
        self.conn = psycopg2.connect(dbname=db, user=user)
        self.cur = self.conn.cursor()
        self.db = db
        self.user = user
        self.host = host
        print 'Connection Open'

    def create_table(self, headers, table_name):
        '''
        INPUT:
            headers -> list; list of names of columns for new table
            table_name -> string; name of table to be created
        Creates a new table in existing database using the given list of 
        headers.  The query gets automatically generated by setting all columns
         to varchar.
        '''
        create_query = 'CREATE TABLE {0} ({1});'
        cols = self._varchar_columns(headers)
        self.cur.execute(create_query.format(table_name, cols))
        self.conn.commit()
        print 'Table {0} created in {1}'.format(table_name, self.db)

    def _varchar_columns(self, headers):
        '''
        INPUT:
            headers -> list; list of names of columns for new table
        OUTPUT:
            string; the column section of a create table query
        Helper function to generate the column section of a create table query.
        '''
        var = '{0} varchar(100)'
        cols = [var.format(header) for header in headers]
        return ', '.join(cols)

    def insert_csv(self, table_name, csv_path, if_header=True):
        '''
        INPUT:
            table_name -> string; name of table to insert csv file
            csv_path -> string; file path of the csv file with data
            if_header -> boolean; true or false if csv includes headers
        Inserts csv file to table in the current database.  Table must already 
        exist in database.
        '''
        copy_query = "COPY {0} FROM '{1}' WITH DELIMITER ',' CSV {2};"
        if if_header:
            header = 'HEADER'
        else:
            header = ''
        self.cur.execute(copy_query.format(table_name, csv_path, header))
        self.conn.commit()
        print 'CSV inserted into {0}.'.format(table_name)

    def drop_table(self, table_name):
        '''
        INPUT:
            table_name -> string; name of table to drop
        Drops a table in the current database.  Will not attempt to drop if 
        table does not exist.
        '''
        drop_query = 'DROP TABLE IF EXISTS {0};'
        self.cur.execute(drop_query.format(table_name))
        self.conn.commit()
        print 'Table {0} dropped.'.format(table_name)

    def load_csvs_in_directory(self, directory):
        '''
        INPUT:
            directory -> string; directory of files to insert into psql
        Takes all csv files from the given directory, and creates tables and 
        inserts data into the current psql database.  Will not work if the data
         is not in csv or comma delimited format. 
        '''
        for f in os.listdir(directory):
            if f.endswith('.csv'):
                file_path = '{0}/{1}'.format(directory, f)
                table_name = re.sub('-| ', '_', f.split('.')[0])
                headers = self._get_headers(file_path)
                self.drop_table(table_name)
                self.create_table(headers, table_name)
                self.insert_csv(table_name, file_path)

    def _get_headers(self, file_path):
        '''
        INPUT:
            file_path -> string; file_path of data file 
        OUTPUT:
            list; list of formatted headers
        Gets and formats the headers for query generation from the given file
        path.
        '''
        with open(file_path) as f:
            headers = f.readline().split(',')
            headers = [re.sub('\n| |"|\.', '', h) for h in headers]
            headers = [re.sub(' |/', '_', h) for h in headers]
            return [re.sub('_+', '_', h) for h in headers]

    def end_connection(self):
        '''
        Closes the current psql connection.
        '''
        self.conn.close()
        print 'Connection Closed'

In [2]:
import pandas as pd
import string
import re

In [4]:
df = pd.read_csv('../data/npidata_20050523-20160313FileHeader.csv')

In [5]:
header = df.columns.values.tolist()

In [6]:
# function to replace all the whitespace and punctuation in columns names
def remove_white_punc(lst):
    '''
    Input: List of strings (with whitespaces/punctuation)
    Output: List of strings (with whitespace/punctuation removed)
    '''
    regex = re.compile('[%s]'%re.escape(string.punctuation))
    
    for ix, name in enumerate(lst):
        lst[ix] = regex.sub('', name).replace(' ', '_')
    
    return lst

In [7]:
header_strip = remove_white_punc(header)

In [8]:
header_strip[3]

'Employer_Identification_Number_EIN'

In [27]:
# Insert NPI lookup file into psql

In [23]:
psql = PsqlConnection(db='medicare', user='postgres')
psql.create_table(header_strip, 'npi_name')
psql.insert_csv('npi_name', "/home/ubuntu/medicare-project/data/npidata_20050523-20160313.csv")

Connection Open
Table npi_name created in medicare
CSV inserted into npi_name.


In [4]:
# Change npi column in NPI lookup to integer

In [5]:
query = '''
        ALTER TABLE npi_name
        ALTER COLUMN npi
        TYPE integer 
        USING npi::integer;
        '''
cur.execute(query)
conn.commit()

In [37]:
# Insert healthcare provider taxonomy code description into psql

In [41]:
df = pd.read_csv('data/nucc_taxonomy_160.csv')

In [42]:
df.head()

Unnamed: 0,Code,Grouping,Classification,Specialization,Definition,Notes
0,101Y00000X,Behavioral Health & Social Service Providers,Counselor,,A provider who is trained and educated in the ...,Sources: Abridged from definitions provided by...
1,101YA0400X,Behavioral Health & Social Service Providers,Counselor,Addiction (Substance Use Disorder),Definition to come...,
2,101YM0800X,Behavioral Health & Social Service Providers,Counselor,Mental Health,Definition to come...,
3,101YP1600X,Behavioral Health & Social Service Providers,Counselor,Pastoral,Definition to come...,
4,101YP2500X,Behavioral Health & Social Service Providers,Counselor,Professional,Definition to come...,


In [8]:
# Drop definition and notes columns - not needed
# Write out to csv

In [44]:
df_taxonomy_striped = df[['Code', 'Grouping', 'Classification', 'Specialization']]

In [59]:
df_taxonomy_striped.to_csv('data/nucc_taxonomy_160_stripped.csv', index=False, na_rep='None')

In [9]:
# non-utf8 character in file, stripped using unix 

In [4]:
query = '''
        CREATE TABLE taxonomy_lookup (
            code varchar(15)
            , grouping varchar(100)
            , classification varchar(100)
            , specialization varchar(100)
            );
        '''

cur.execute(query)

In [5]:
query = '''
        COPY taxonomy_lookup
        FROM '/home/ubuntu/medicare-project/data/nucc_taxonomy_160_stripped_clean.csv' 
        DELIMITER ',' 
        HEADER
        CSV;
        '''
cur.execute(query)

In [6]:
conn.commit()

### Insert Medicare Provider Payments into PSQL

In [4]:
import pandas as pd

In [5]:
# Header was stripped from file and placed into its own file using unix

In [6]:
misc_payments_header = pd.read_csv('../data/Medicare_Provider_Util_Payment_PUF_CY2013_header.txt', sep='\t')

In [7]:
misc_payments_header.columns

Index([u'NPI', u'NPPES_PROVIDER_LAST_ORG_NAME', u'NPPES_PROVIDER_FIRST_NAME',
       u'NPPES_PROVIDER_MI', u'NPPES_CREDENTIALS', u'NPPES_PROVIDER_GENDER',
       u'NPPES_ENTITY_CODE', u'NPPES_PROVIDER_STREET1',
       u'NPPES_PROVIDER_STREET2', u'NPPES_PROVIDER_CITY',
       u'NPPES_PROVIDER_ZIP', u'NPPES_PROVIDER_STATE',
       u'NPPES_PROVIDER_COUNTRY', u'PROVIDER_TYPE',
       u'MEDICARE_PARTICIPATION_INDICATOR', u'PLACE_OF_SERVICE', u'HCPCS_CODE',
       u'HCPCS_DESCRIPTION', u'HCPCS_DRUG_INDICATOR', u'LINE_SRVC_CNT',
       u'BENE_UNIQUE_CNT', u'BENE_DAY_SRVC_CNT',
       u'AVERAGE_MEDICARE_ALLOWED_AMT', u'STDEV_MEDICARE_ALLOWED_AMT',
       u'AVERAGE_SUBMITTED_CHRG_AMT', u'STDEV_SUBMITTED_CHRG_AMT',
       u'AVERAGE_MEDICARE_PAYMENT_AMT', u'STDEV_MEDICARE_PAYMENT_AMT'],
      dtype='object')

In [4]:
query = '''
        CREATE TABLE util_payments_2013 (
            npi integer
            , last_name varchar(100)
            , first_name varchar(50)
            , middle_initial varchar(20)
            , credentials varchar(50)
            , gender varchar(5)
            , entity_code varchar(20)
            , street1 varchar(100)
            , street2 varchar(100)
            , city varchar(50)
            , zip varchar(20)
            , state varchar(10)
            , country varchar(10)
            , provider_type varchar(50)
            , medicare_participation_indicator varchar(10)
            , place_of_service varchar(10)
            , hcpcs_code varchar(15)
            , hcpcs_desc text
            , hcpcs_drug_indicator varchar(10)
            , line_srvc_count float
            , bene_unique_count integer
            , bene_day_srvc_count integer
            , avg_medicare_allowed_amt float
            , stddev_medicare_allowed_amt float
            , avg_submitted_chg_amt float
            , stddev_submitted_chg_amt float
            , avg_medicare_payment_amt float
            , stddev_medicare_payment_amt float
            );
        '''

cur.execute(query)

In [5]:
query = '''
        COPY util_payments_2013
        FROM '/home/ubuntu/medicare-project/data/Medicare_Provider_Util_Payment_PUF_CY2013_Strip.txt' 
        DELIMITER '\t' 
        CSV;
        '''
cur.execute(query)

In [6]:
conn.commit()

### Insert Medicare Provider Payments Aggregate into PSQL

In [4]:
query = '''
        CREATE TABLE util_payments_agg_2013 (
            index integer
            , npi integer
            , last_name varchar(100)
            , first_name varchar(50)
            , middle_initial varchar(20)
            , credentials varchar(50)
            , gender varchar(5)
            , entity_code varchar(20)
            , street1 varchar(100)
            , street2 varchar(100)
            , city varchar(50)
            , zip varchar(20)
            , state varchar(10)
            , country varchar(10)
            , provider_type varchar(50)
            , medicare_participation_indicator varchar(10)
            , num_hcpcs integer
            , num_services float
            , num_unique_bene integer
            , total_submitted_charges float
            , total_medicare_allowed_amt float
            , total_medicare_payment_amt float
            , drug_suppress_indicator varchar(10)
            , num_hcpcs_associated_drug_srvc float
            , num_drug_srvc float
            , num_unique_bene_with_drug_srvc float
            , total_drug_submitted_charges float
            , total_drug_medicare_allowed_amt float
            , total_drug_medicare_payment_amt float
            , medical_suppress_indicator varchar(10)
            , num_hcpcs_associated_med_srvc float
            , num_med_srvc float
            , num_unique_bene_with_med_srvc float
            , total_med_submitted_charges float
            , total_med_medicare_allowed_amt float
            , total_med_medicare_payment_amt float
            , avg_age_bene float
            , num_bene_le65 float
            , num_bene_65to74 float
            , num_bene_75to84 float
            , num_bene_ge84 float
            , num_female float
            , num_male float
            , num_non_his_white float
            , num_african_american float
            , num_asian float
            , num_hispanic float
            , num_american_indian float
            , num_no_race float
            , num_medicare_only float
            , num_medicare_medicaid float
            , pcnt_alzheimers_dementia float
            , pcnt_asthma float
            , pcnt_artrial_fibrillation float
            , pcnt_cancer float
            , pcnt_chronic_kidney float
            , pcnt_chronic_obstructive_pulmonary float
            , pcnt_depression float
            , pcnt_diabetes float
            , pcnt_heart_failure float
            , pcnt_hyperlipidemia float
            , pcnt_hypertension float
            , pcnt_ischemic_heart float
            , pcnt_osteoporosis float
            , pcnt_rheumatoid_arthritis_osteoarthirtis float
            , pcnt_schizophrenia_psychotic float
            , pcnt_stroke float
            , avg_hcc_risk_score float
            );
        '''

cur.execute(query)

In [5]:
query = '''
        COPY util_payments_agg_2013
        FROM '/home/ubuntu/medicare-project/data/Medicare-Physician-and-Other-Supplier-NPI-Aggregate-CY2013.csv' 
        DELIMITER ',' 
        HEADER
        CSV;
        '''
cur.execute(query)

In [6]:
conn.commit()

In [4]:
# Input label data into psql

In [4]:
query = '''
        CREATE TABLE indictments_2013 (
            index integer
            , first_name varchar(15)
            , middle_name varchar(15)
            , last_name varchar(50)
            , state varchar(5)
            , npi_status boolean
            , link text
            , status varchar(50)
            );
        '''

cur.execute(query)

In [5]:
query = '''
        COPY indictments_2013
        FROM '/home/ubuntu/medicare-project/data/medicare_labels_2013.csv' 
        DELIMITER ',' 
        HEADER
        CSV;
        '''
cur.execute(query)

In [6]:
conn.commit()