# Halloween CSV to API db Initial Load POC

## Initial Set-up

In [1]:
import pandas as pd
import datetime
import os
from utils import createEngine
import uuid
import numpy as np
import warnings
from pangres import upsert
from urllib.parse import urljoin
import requests
import zipfile
import io

warnings.simplefilter(action='ignore', category=pd.errors.DtypeWarning)
warnings.simplefilter(
    action='ignore', category=pd.errors.SettingWithCopyWarning)


### Define helper functions

In [2]:

def convertBool(val):
    """
    The Halloween CSVs use 1 and 0 to represent True and False. This function maps the 1s to True and the 0s to False.
    """
    primary_to_bool = {1: True, 0: False}
    if val in primary_to_bool.keys():
        return primary_to_bool[val]
    else:
        return False

def show_or_load(df, table_name, schema_name, engine, load=True):
    """
    This function allows you to decide whether you do or don't proceed with loading data (so you can focus on preparing it/ debugging).
    It also prints out the name of the table that is being loaded, so you can tell what has been loaded and what is in progress.
    """
    if load:
        print(f'Loading {table_name}')
        df.to_sql(table_name, schema=schema_name, con=engine,
                  if_exists='append', index=False)
    else:
        print(f'Showing {table_name}')
        print(df.head())

def gen_uuid_id(base_url, id):
    url = urljoin(base_url, id)
    return uuid.uuid5(uuid.NAMESPACE_URL, url)

### Create a database engine from the settings specified in your .env file

In [3]:
engine = createEngine()

### Define the scratch workspace where the Halloween CSVs are located and where the NPPES Main File will be downloaded

In [5]:
scratch_dir = os.path.join('..','scratch')

## Get data to fill gaps in Halloween CSVs

### Get FIPS State Reference Data
We need to load the FIPS state reference data from the target db, to serve as a lookup table between state abbreviations and state codes, because the Halloween CSVs only contain state abbreviations but the db utilizes state codes

In [6]:
fips_state_df = pd.read_sql('select * from npd.fips_state', con = engine)
fips_state_df.set_index('abbreviation', inplace=True)

### Get NPI Data
Since the Halloween CSV files do not contain sufficient attributes for NPIs, we need to download the latest NPPES main file and get the additional NPI fields that the target db is expecting

In [7]:
def generateNPPESVersion(version = 'Monthly', days_ago = 0):
    current_date = datetime.datetime.now() - datetime.timedelta(days = days_ago)
    if version == 'Monthly':
        current_month = current_date.strftime("%B")
        current_year = current_date.year
        csv_version = f'{current_month}_{current_year}_V2'
    else:
        current_week_start = current_date - datetime.timedelta(days=current_date.weekday()-7).strftime("%")
        current_week_end = current_date + datetime.timedelta(days = 6)

    return csv_version

In [8]:
current_date = datetime.datetime.now()
current_month = current_date.strftime("%B")
prior_month = (current_date - datetime.timedelta(days = 30)).strftime()
current_year = current_date.year
csv_version = f'{current_month}_{current_year}_V2'
nppes_dir = os.path.join(scratch_dir,'nppes')

# Download and unzip the NPPES CSV files
#zipData = requests.get(f'https://download.cms.gov/nppes/NPPES_Data_Dissemination_{csv_version}.zip').content

TypeError: strftime() missing required argument 'format' (pos 1)

In [9]:
f'https://download.cms.gov/nppes/NPPES_Data_Dissemination_{csv_version}.zip'

NameError: name 'csv_version' is not defined

In [10]:
current_date = datetime.datetime.now()
current_month = current_date.strftime("%B")
current_year = current_date.year
csv_version = f'{current_month}_{current_year}_V2'
nppes_dir = os.path.join(scratch_dir,'nppes')

# Download and unzip the NPPES CSV files
#zipData = requests.get(f'https://download.cms.gov/nppes/NPPES_Data_Dissemination_{csv_version}.zip').content
#with zipfile.ZipFile(io.BytesIO(zipData), 'r') as zip_file:
#   zip_file.extractall(nppes_dir)
main_files = [f for f in os.listdir(nppes_dir) if 'npidata_pfile' in f and '_fileheader' not in f]
main_files.sort()
latest_main_file = main_files[-1]

npi_df = pd.read_csv(os.path.join(nppes_dir, latest_main_file), usecols = ['Provider Last Name (Legal Name)', 'NPI', 'Entity Type Code', 'Replacement NPI', 'Provider Enumeration Date', 'Last Update Date',
                   'NPI Deactivation Reason Code', 'NPI Deactivation Date', 'NPI Reactivation Date', 'Certification Date'])
npi_df_renamed = npi_df.rename(columns={
        'NPI': 'npi',
        'Entity Type Code': 'entity_type_code',
        'Replacement NPI': 'replacement_npi',
        'Provider Enumeration Date': 'enumeration_date',
        'Last Update Date': 'last_update_date',
        'NPI Deactivation Reason Code': 'deactivation_reason_code',
        'NPI Deactivation Date': 'deactivation_date',
        'NPI Reactivation Date': 'reactivation_date',
        'Certification Date': 'certification_date'
    })

### Populate NPI Fields
The NPPES main file strips certain field values for records with deactivated NPIs, so we populate those as needed for the target db.
1. Deactivated NPIs show up without entity_type_code values, but those are required in the db. We use the Provider Last Name (Legal Name) field to intuit whether a provider is an invidual (if there is a last name listed, the provider has entity_type_code 1) or an organization (if there is not a last name listed, the provider has entity_type_code 2)
2. Deactivated NPIs show up without enumeration_date and last_update_date values. We populate bogus dates of 1/1/1900.

In [11]:

deactivated_npi1_condition = (npi_df_renamed['entity_type_code'].isnull())&~(npi_df_renamed['Provider Last Name (Legal Name)'].isnull())
deactivated_npi2_condition = (npi_df_renamed['entity_type_code'].isnull())&(npi_df_renamed['Provider Last Name (Legal Name)'].isnull())
npi_df_renamed.loc[deactivated_npi1_condition, ['entity_type_code', 'enumeration_date', 'last_update_date']] = [1, '1900-01-01', '1900-01-01']
npi_df_renamed.loc[deactivated_npi2_condition, ['entity_type_code', 'enumeration_date', 'last_update_date']] = [2, '1900-01-01', '1900-01-01']
del npi_df_renamed['Provider Last Name (Legal Name)']

## Process Halloween Data

### Read in the Halloween CSVs 
We loop through the halloween_data folder (downloaded from the [Halloween Release Google Drive Folder](https://drive.google.com/drive/folders/1zvneyQi7xNReIfeKkdpTgxX1BziaeKkT)) and store each in a dictionary. Optionally, we can load them to the raw_csv schema within the database, to facilitate querying and inspection.

In [12]:
load_raw_csvs = False
df_dict={}
for f in os.listdir(os.path.join(scratch_dir, 'halloween_data')):
    if '.csv' in f:
        tablename = f.split('.csv')[0]
        df = pd.read_csv(os.path.join(scratch_dir,'halloween_data',f), na_values=[''], keep_default_na=False)
        df_dict[f]=df
        if load_raw_csvs:
            df.to_sql(tablename, index=False, schema = 'raw_csv', con = engine, if_exists='replace')

### Structure Practitioner Data
Using the pracitioner csv and the practiitonerrole csv, we transform the data into the formats necessary for the individual provider tables:
* Assign a uuid id to each practitioner record
* Map the following csv fields to db fields:
    * gender_code to sex, because that is how the field is captured in NPPES (in the ETL process it is renamed to gender_code, but here it is renamed back)
    * name_prefix to prefix
    * name_suffix to suffix
* Assign a name_use_id of 1 for "usual," which is the FHIR code assigned to the primary name. Since we don't have any other provider names listed, we assume that the name provided here is the primary name
* Join practitioner data with taxonomy data, so we can associate our new uuids with the taxonomy records
* Filter out the practitioner with the NPI 1770923773, because that NPI does not exist
* Filter out the taxonomy records with a state code of 'ZZ,' because that state code does not exist
* Separate out taxonomy information from license information

In [13]:
practitioner_df = df_dict['practitioner.csv']
#note: we can do this because each practitioner only appears once in this table
practitioner_df['id'] = [gen_uuid_id('https://api/individual/', str(i)) for i in practitioner_df['id']]
practitioner_df_renamed = practitioner_df.rename(columns = {'gender_code': 'sex', 'name_prefix': 'prefix', 'name_suffix': 'suffix'})
practitioner_df_renamed['name_use_id'] = 1
practitioner_taxonomy_df = df_dict['practitionerrole.csv']
filtered_practitioner_taxonomy_df = practitioner_taxonomy_df.loc[practitioner_taxonomy_df['practitioner_id']!=1770923773]
merged_taxonomy_df = filtered_practitioner_taxonomy_df.merge(practitioner_df_renamed, left_on = 'practitioner_id', right_on = 'npi', suffixes = ('tax', 'individual')) 
merged_taxonomy_df = merged_taxonomy_df.loc[merged_taxonomy_df['state_code']!='ZZ']
merged_taxonomy_df['state_code'] = [fips_state_df.loc[i]['id'] if i in fips_state_df.index else np.nan for i in merged_taxonomy_df['state_code']]
merged_taxonomy_df_renamed = merged_taxonomy_df.rename(columns={'idindividual': 'individual_id', 'taxonomy_code':'nucc_code'})
provider_to_taxonomy_df = merged_taxonomy_df_renamed[['npi', 'nucc_code', 'is_primary']]
provider_to_taxonomy_df['is_primary'] = provider_to_taxonomy_df['is_primary'].apply(lambda x: convertBool(x))
dedup_taxonomy_df = provider_to_taxonomy_df.sort_values(by='is_primary', ascending=False)[
        ['npi', 'nucc_code', 'is_primary']].drop_duplicates(subset=['nucc_code', 'npi'])
dedup_taxonomy_df['id'] = [uuid.uuid4() for i in dedup_taxonomy_df.index]
license_df = dedup_taxonomy_df.merge(merged_taxonomy_df_renamed, on = ['npi', 'nucc_code'], suffixes = ('tax', 'cred'))
license_df_renamed = license_df.rename(columns={'id': 'provider_to_taxonomy_id'})

### Structure Organization Data
Using the organization csv and the organization_npi csv, we attempt to discern a hierarchical organization structure and transform the data into the formats necessary for the organization tables:
* Since we only have one name per organization, we assume this is the primary name and set the is_primary field (which will later be loaded into the organization_to_name table) to True
* We associate a uuid id with each organization 
* We back calculate the organization hierarchy by backpopulating the uuids into the old id and parent id fields
* We also ensure that each NPI is associated with its own organization and that the hierarchy is maintained when doing so


In [14]:
organization_df = df_dict['organization.csv']
organization_df['is_primary'] = True
organization_df_renamed = organization_df.rename(columns={'id':'old_org_id', 'parent_id':'old_parent_id', 'organization_name':'name'})
organization_df_renamed.set_index(['old_org_id'], inplace=True)
organization_df_renamed['org_id'] = [uuid.uuid4() for i in organization_df_renamed.index]
organization_df_renamed['org_parent_id'] = [organization_df_renamed.loc[i]['org_id'] if i in fips_state_df.index else np.nan for i in organization_df_renamed['old_parent_id']]
organization_npi_df = df_dict['organization_npi.csv']
organization_npi_df_renamed = organization_npi_df.rename(columns={'organization_id':'old_org_id'})
organization_npi_df_renamed['id'] = [gen_uuid_id('https://api/organization/', str(i)) for i in organization_npi_df_renamed['npi']]
clinical_organization_df = organization_npi_df_renamed.merge(organization_df_renamed, on='old_org_id', how='outer')
clinical_organization_df_renamed = clinical_organization_df.rename(columns={'org_id':'parent_id'})
other_organization_df = organization_df_renamed.rename(columns = {'org_id':'id', 'org_parent_id': 'parent_id'})


### Structure Endpoint Data
Using the endpoint csv, we transform the data into the necessary structure for the endpoint tables:
* Rename `fhir_url` to `address`
* Create a table of unique ehr vendors and assign a uuid to each
* Join the vendor uuids back to the endpoint data
* Populate fields that are not present in the dataset (environment_type_id and endpoint_connection_type_id) with hardcoded values
* Assign a uuid to each endpoint record

In [15]:
endpoint_df = df_dict['endpoint.csv']
endpoint_df_renamed = endpoint_df.rename(columns={'id':'endpoint_id','fhir_url':'address'})
ehr_vendor_df = endpoint_df.drop_duplicates(subset='vendor_name')
ehr_vendor_df['id'] = [gen_uuid_id('https://api/ehr_vendor/', str(i)) for i in ehr_vendor_df['vendor_name']]
ehr_vendor_df_renamed = ehr_vendor_df.rename(columns={'vendor_name':'name'})
ehr_vendor_df_renamed.set_index('name', inplace=True, drop=False)
endpoint_df_renamed['ehr_vendor_id'] = endpoint_df_renamed['vendor_name'].apply(lambda x: ehr_vendor_df_renamed.loc[x]['id'])
endpoint_df_renamed['environment_type_id'] = 'prod'
endpoint_df_renamed['endpoint_connection_type_id'] = 'hl7-fhir-rest'
endpoint_df_renamed['id'] = [gen_uuid_id('https://api/endpoint_instance/', str(i)) for i in endpoint_df_renamed['endpoint_id']]

### Structure Organization to Endpoint Data
Using the organization_endpoint csv, we transform the data into the necessary structure for the organization to endpoint relationship:
* Join the endpoint data to the organization_to_endpoint data so we can associate the endpoint uuids with the org_to_endpoint records and also join the organization data to the organization_to_endpoint data, so we can associate the organization uuids with the org_to_endpoint records

In [16]:
org_to_endpoint_df = df_dict['organization_endpoint.csv']
merged_org_to_endpoint_df = org_to_endpoint_df.merge(endpoint_df_renamed, on = 'endpoint_id', how='outer').merge(clinical_organization_df_renamed, left_on = 'organization_npi', right_on = 'npi', suffixes = ('endpoint', 'organization'), how='outer')
merged_org_to_endpoint_df= merged_org_to_endpoint_df[['idendpoint', 'idorganization']].rename(columns = {'idendpoint': 'endpoint_instance_id', 'idorganization':'organization_id'}).dropna()

### Structure Address Data
Using the location csv and the npi_location csv, transform the data into the necessary structure for the address and location tables:
* Rename the columns to align with the naming in the database:
    * `line` to `delivery_line_1` (note: ideally we would have multiple fields, one for each line)
    * `postalcode` to `zipcode`
    * `city` to `city_name`
* Assign a uuid to each address record
* Filter out the states that do not exist (FM, ~, UK, MH) (note: FM and MH are US territories, but the addresses with those values listed as states do not correspond to those territories)
* Populate the fips state codes based on state abbreviation
* Join the address data to npi_location in order to populate the address uuids
* Join the practitioner and organization data to npi_location in order to populate practitioner uuid and organization uuid
* Populate the address_use_id with a hard coded value of 2 (for work address), since there is no reference to the address type
* Assign a location uuid to each location (address associated with an organization)
* Associate the endpoints with the appropriate locations, based on their organization affiliations (we assume that each organization uses each endpoint at all their locations)

In [17]:
address_df = df_dict['location.csv']
address_df_renamed = address_df.rename(columns={'id':'address_us_id', 'line':'delivery_line_1', 'postalcode':'zipcode', 'city':'city_name'})
address_df_renamed['id']= [uuid.uuid4() for i in address_df_renamed.index]
address_df_renamed = address_df_renamed.loc[(address_df_renamed['state'] != 'FM') & (address_df_renamed['state'] != '~') & (address_df_renamed['state'] != 'UK') & (address_df['state'] != 'MH')]
address_df_renamed['state_code'] = address_df_renamed['state'].apply(lambda x: fips_state_df.loc[x]['id'])
location_npi_df = df_dict['npi_location.csv']
merged_df_1 = location_npi_df.merge(address_df_renamed, left_on='location_id', right_on = 'address_us_id', how='outer')
merged_df_2 = merged_df_1.merge(npi_df_renamed, on = 'npi', suffixes=('address','npi'), how='outer')
merged_df_3 = merged_df_2.merge(practitioner_df_renamed, on = 'npi', suffixes = ('address', 'individual'), how='outer')
merged_location_df = merged_df_3.merge(clinical_organization_df_renamed, on = 'npi', suffixes = ('address', 'organization'), how='outer')
merged_location_df_renamed = merged_location_df.rename(columns={'idaddress':'address_id', 'idindividual':'individual_id', 'id':'organization_id', 'nameaddress':'name'})
merged_location_df_renamed['address_use_id'] = 2
individual_to_address_df = merged_location_df_renamed[['address_id','individual_id', 'address_use_id']].dropna(how='any')
location_df = merged_location_df_renamed[['address_id','organization_id','name', 'address_use_id']].dropna(how='any')
location_df['id'] = [uuid.uuid4() for i in location_df.index]
location_to_endpoint_df = location_df.merge(merged_org_to_endpoint_df, on = 'organization_id', how='outer')[['id', 'endpoint_instance_id']].dropna(how = 'any').rename(columns = {'id':'location_id'})


### Structure Provider to Organization Data
Using the personal_npi_to_organizational_npi csv, transform the data into the necessary structure for the provider to organization and provider to location relationships:
* Join provider and organization data to associate the provider and organization uuids with the NPIs listed
* Assign a uuid to each provider to organization relationship
* Assign a relationship_type_id value of 2 for each relationship where the affiliation_source is 'PECOS Assignment Relationships'
* Join location information based on organization NPI (we assume each provider works at every location owned by the organization that they have a relationship with)
* Assign a uuid to each provider to location relationship

In [18]:
provider_to_organization_df = df_dict['personal_npi_to_organizational_npi.csv']
merged_provider_to_org_df = provider_to_organization_df.merge(practitioner_df_renamed, left_on = 'personal_npi', right_on = 'npi', how='inner').merge(clinical_organization_df_renamed, left_on = 'organizational_npi', right_on = 'npi', suffixes = ('individual', 'organization'), how='inner')
provider_to_org_df_renamed = merged_provider_to_org_df.rename(columns = {'idindividual':'individual_id', 'idorganization':'organization_id'})
provider_to_org_df_renamed['id'] = [uuid.uuid4() for i in provider_to_org_df_renamed.index]
provider_to_org_df_renamed['relationship_type_id'] = [2 if val=='PECOS Assignment Relationships' else val for val in provider_to_org_df_renamed['affiliation_source']]
provider_to_location_df = provider_to_org_df_renamed.merge(location_df, on='organization_id', how='inner', suffixes=('porg','location'))
provider_to_location_df['id'] = [uuid.uuid4() for i in provider_to_location_df.index]
provider_to_location_df_renamed = provider_to_location_df.rename(columns={'idlocation':'location_id', 'idporg':'provider_to_organization_id'})

## Load the Data

In [None]:
schema_name = 'npd'
load = True

# load npi
#show_or_load(npi_df_renamed, 'npi', schema_name, engine, load)

# load individual
#show_or_load(practitioner_df_renamed[['id', 'sex']], 'individual', schema_name, engine, load)
practitioner_df_renamed_renamed = practitioner_df_renamed.rename(columns={'id':'individual_id'})

# load individual_to_name
#show_or_load(practitioner_df_renamed_renamed[['individual_id', 'first_name', 'middle_name', 'last_name', 'prefix', 'suffix', 'name_use_id']], 'individual_to_name', schema_name, engine, load)

# load provider
#show_or_load(practitioner_df_renamed_renamed.merge(npi_df_renamed, on = 'npi', how='inner')[['npi', 'individual_id']], 'provider', schema_name, engine, load)

# load organization
#show_or_load(other_organization_df[['id']], 'organization', schema_name, engine, load)
other_organization_df.set_index('id', drop=False, inplace=True)
if load:
    print('adding parent_id to organization')
    #upsert(df = other_organization_df[['parent_id']], con = engine, schema = schema_name, if_row_exists='update', table_name = 'organization')
#show_or_load(clinical_organization_df_renamed[['id']], 'organization', schema_name, engine, load)
clinical_organization_df_renamed.set_index('id', drop=False, inplace=True)
if load:
    print('adding parent_id to clinical_organization organizations')
    #upsert(df = clinical_organization_df_renamed[['parent_id']], con = engine, schema = schema_name, if_row_exists='update', table_name = 'organization')

other_organization_df_renamed = other_organization_df.rename(columns={'id':'organization_id', 'organization_name':'name'})
clinical_organization_df_renamed_renamed = clinical_organization_df_renamed.rename(columns={'id':'organization_id'})

# load organization_to_name

#show_or_load(other_organization_df_renamed[['organization_id', 'name', 'is_primary']], 'organization_to_name', schema_name, engine, load)
#show_or_load(clinical_organization_df_renamed_renamed[['organization_id', 'name', 'is_primary']], 'organization_to_name', schema_name, engine, load)

# load clinical_organization
#show_or_load(clinical_organization_df_renamed_renamed[['organization_id', 'npi']], 'clinical_organization', schema_name, engine, load)

# load ehr_vendor
#show_or_load(ehr_vendor_df_renamed[['id', 'name']], 'ehr_vendor', schema_name, engine, load)

# load endpoint_instance
#show_or_load(endpoint_df_renamed[['id', 'ehr_vendor_id', 'address', 'endpoint_connection_type_id', 'environment_type_id']], 'endpoint_instance', schema_name, engine, load)

# load address_us
#show_or_load(address_df_renamed[['address_us_id', 'delivery_line_1','city_name','state_code','zipcode']].rename(columns={'address_us_id':'id'}), 'address_us', schema_name, engine, load)

# load address
#show_or_load(address_df_renamed[['id', 'address_us_id']], 'address', schema_name, engine, load)

# load individual_to_address
show_or_load(individual_to_address_df, 'individual_to_address', schema_name, engine, load)

# load organization_to_address
show_or_load(location_df[['address_id','organization_id', 'address_use_id']], 'organization_to_address', schema_name, engine, load)

# load location
show_or_load(location_df[['id','address_id','organization_id']], 'location', schema_name, engine, load)

# load location_to_endpoint_instance
show_or_load(location_to_endpoint_df, 'location_to_endpoint_instance', schema_name, engine, load)

# load provider_to_organization
show_or_load(provider_to_org_df_renamed[['individual_id', 'organization_id', 'relationship_type_id','id']], 'provider_to_organization', schema_name, engine, load)

# load provider_to_location
show_or_load(provider_to_location_df_renamed[['location_id', 'provider_to_organization_id', 'id']], 'provider_to_location', schema_name, engine, load)

# load provider_to_taxonomy
show_or_load(dedup_taxonomy_df, 'provider_to_taxonomy', schema_name, engine, load)

# load provider_to_credential
###show_or_load(credential_df_renamed[['license_number', 'state_code', 'provider_to_taxonomy_id']], 'provider_to_credential', schema_name, engine, load)

Loading npi
Loading individual
Loading individual_to_name
Loading provider
Loading organization
adding parent_id to organization
Loading organization
adding parent_id to clinical_organization organizations
Loading organization_to_name
Loading organization_to_name
Loading clinical_organization
Loading ehr_vendor
Loading endpoint_instance
Loading address_us
Loading address
Loading individual_to_address


PendingRollbackError: Can't reconnect until invalid transaction is rolled back.  Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)