## Import required libraries and configure the Spark session.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce
# Create (or reuse) a SparkSession; getOrCreate() returns the active session
# if one already exists.
spark = SparkSession.builder \
    .appName("Load CSV") \
    .getOrCreate()

# Only surface errors in the driver log.
spark.sparkContext.setLogLevel("ERROR")

# AWS credentials come from the "aws-secrets" secret scope rather than being hard-coded.
aws_access_key = dbutils.secrets.get(scope="aws-secrets", key="aws-access-key")
aws_secret_key = dbutils.secrets.get(scope="aws-secrets", key="aws-secret-key")

spark.conf.set("fs.s3a.access.key", aws_access_key)
spark.conf.set("fs.s3a.secret.key", aws_secret_key)
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

# s3_loc: release folder / file-name suffix used to build the S3 paths of the input CSVs.
s3_loc = dbutils.widgets.get("s3_loc")

# load_type: expected to be 'full' or 'incremental' (the values the write step branches on).
# It must come from its own widget: reusing the s3_loc value means no load branch ever matches.
# NOTE(review): assumes the widget is named "load_type" — confirm against the job/notebook config.
load_type = dbutils.widgets.get("load_type")

## Load data from S3 into Databricks.

In [0]:
def _clean_column_name(name):
    """Normalise a raw CSV header into a Spark-friendly column name.

    Drops everything from the first "(" onward, trims surrounding whitespace,
    and replaces the remaining spaces with underscores, e.g.
    "Some Column (units)" -> "Some_Column".
    """
    return name.split('(')[0].strip().replace(" ", "_")


def _read_csv_clean(path):
    """Read a headered CSV from *path* and return it with cleaned column names.

    No schema inference is requested, so every column is read as a string.
    Renaming happens in a single toDF() projection instead of one
    withColumnRenamed() call per column, which would add a separate plan node
    for every column of these wide files.
    """
    raw = spark.read.option("header", "true").csv(path)
    return raw.toDF(*[_clean_column_name(name) for name in raw.columns])


# Provider records for the selected release.
nppes_data_renamed = _read_csv_clean(
    f"s3://nppes-npi-directory/{s3_loc}/npidata_pfile_{s3_loc}.csv"
)

# Endpoint file for the same release; supplies the Affiliation_* columns.
affiliations = _read_csv_clean(
    f"s3://nppes-npi-directory/{s3_loc}/endpoint_pfile_{s3_loc}.csv"
)

# Taxonomy code lookup, joined on its Code column when assigning specialties.
taxonomy_codes = _read_csv_clean("s3://nppes-npi-directory/nucc_taxonomy_250.csv")

## Add specialties to the data.

In [0]:
# Rename any NPI column in the lookup so it cannot collide with the provider NPI after the join.
taxonomy_codes = taxonomy_codes.withColumnRenamed('NPI', 'Tax_NPI')

# The provider file carries up to 15 taxonomy-code slots.
taxonomy_cols = [f'Healthcare_Provider_Taxonomy_Code_{slot}' for slot in range(1, 16)]

# Taxonomy_Code = first non-null slot, in slot order.
# NOTE(review): this is the first populated slot, not necessarily the provider's
# primary taxonomy — confirm that is the intended choice.
first_taxonomy_code = coalesce(*(nppes_data_renamed[name] for name in taxonomy_cols))
nppes_data_coalesced = nppes_data_renamed.withColumn('Taxonomy_Code', first_taxonomy_code)

# Left join keeps providers whose code has no match in the lookup (their specialty columns are null).
taxonomy_match = nppes_data_coalesced['Taxonomy_Code'] == taxonomy_codes['Code']
npi_specialties_assigned = nppes_data_coalesced.join(taxonomy_codes, taxonomy_match, 'left')


## Add affiliation data to the specialty-enriched records.

In [0]:
# Key endpoint rows by Affiliated_NPI so the column doesn't clash with the provider NPI.
affiliations = affiliations.withColumnRenamed('NPI', 'Affiliated_NPI')

# Endpoint columns carried into the output.
affiliation_cols = [
    'Affiliated_NPI',
    'Affiliation_Legal_Business_Name',
    'Affiliation_Address_Line_One',
    'Affiliation_Address_Line_Two',
    'Affiliation_Address_City',
    'Affiliation_Address_State',
    'Affiliation_Address_Country',
    'Affiliation_Address_Postal_Code',
]

# Only endpoint rows flagged Affiliation == "Y" are used.
is_affiliated = affiliations['Affiliation'] == "Y"
available_affiliations = affiliations.where(is_affiliated).select(*affiliation_cols)

# Left join keeps every provider; providers without an affiliation get nulls.
# NOTE(review): an NPI with several matching endpoint rows will appear once per row —
# confirm that duplication is intended.
npi_affiliations_added = npi_specialties_assigned.join(
    available_affiliations,
    on=npi_specialties_assigned['NPI'] == available_affiliations['Affiliated_NPI'],
    how='left',
)





## Remove deactivated NPIs from the data (keeping those that were later reactivated).

In [0]:
from pyspark.sql.functions import to_date

# The CSVs are read without schema inference, so both date columns are plain strings,
# and comparing them directly is lexicographic ("01/15/2020" < "12/01/2019").
# Parse them to dates before comparing.
# NOTE(review): assumes the NPPES MM/dd/yyyy date layout — confirm against the source file.
NPPES_DATE_FORMAT = "MM/dd/yyyy"
deactivation_date = to_date(npi_affiliations_added["NPI_Deactivation_Date"], NPPES_DATE_FORMAT)
reactivation_date = to_date(npi_affiliations_added["NPI_Reactivation_Date"], NPPES_DATE_FORMAT)

# NPIs reactivated after their deactivation date (null dates never satisfy the comparison).
reactivated_npis = npi_affiliations_added.filter(reactivation_date > deactivation_date)

# NPIs that were never deactivated.
no_deactivated_npis = npi_affiliations_added.filter(
    npi_affiliations_added['NPI_Deactivation_Date'].isNull()
)

# The two conditions above are disjoint (reactivation requires a non-null deactivation date).
# A single OR filter therefore yields the same rows as their union, without computing the
# joined data once per branch.
activated_npis = npi_affiliations_added.filter(
    npi_affiliations_added['NPI_Deactivation_Date'].isNull()
    | (reactivation_date > deactivation_date)
)

## Select relevant columns. 

In [0]:
# Output schema, grouped by topic. The groups are concatenated in this order,
# which fixes the column order of the final table.
_identity_columns = [
    'NPI',
    'Entity_Type_Code',
    'Provider_Organization_Name',
    'Provider_Last_Name',
    'Provider_First_Name',
    'Provider_Middle_Name',
]
_mailing_address_columns = [
    'Provider_First_Line_Business_Mailing_Address',
    'Provider_Second_Line_Business_Mailing_Address',
    'Provider_Business_Mailing_Address_City_Name',
    'Provider_Business_Mailing_Address_State_Name',
    'Provider_Business_Mailing_Address_Postal_Code',
    'Provider_Business_Mailing_Address_Country_Code',
    'Provider_Business_Mailing_Address_Telephone_Number',
    'Provider_Business_Mailing_Address_Fax_Number',
]
_practice_location_columns = [
    'Provider_First_Line_Business_Practice_Location_Address',
    'Provider_Second_Line_Business_Practice_Location_Address',
    'Provider_Business_Practice_Location_Address_City_Name',
    'Provider_Business_Practice_Location_Address_State_Name',
    'Provider_Business_Practice_Location_Address_Postal_Code',
    'Provider_Business_Practice_Location_Address_Country_Code',
    'Provider_Business_Practice_Location_Address_Telephone_Number',
    'Provider_Business_Practice_Location_Address_Fax_Number',
]
# Gender from the provider file, plus the specialty columns from the taxonomy join.
_gender_and_specialty_columns = [
    'Provider_Gender_Code',
    'Grouping',
    'Classification',
    'Specialization',
]
# Columns added by the affiliation join.
_affiliation_columns = [
    'Affiliation_Legal_Business_Name',
    'Affiliation_Address_Line_One',
    'Affiliation_Address_Line_Two',
    'Affiliation_Address_City',
    'Affiliation_Address_State',
    'Affiliation_Address_Country',
    'Affiliation_Address_Postal_Code',
]

rel_columns = (
    _identity_columns
    + _mailing_address_columns
    + _practice_location_columns
    + _gender_and_specialty_columns
    + _affiliation_columns
)

npis = activated_npis.select(*rel_columns)

## Write the data to the target table according to the load type.

In [0]:
# Both load paths read and write this one fully-qualified table.
target_table = "dbdemo.default.npi_reference_nppes"

if load_type == 'full':
    # Replace the table wholesale with the current extract.
    npis.write.mode("overwrite").saveAsTable(target_table)

elif load_type == 'incremental':
    existing_npis = spark.read.table(target_table)

    # Keep existing rows only for NPIs absent from the new extract; the new extract
    # wins for every NPI it contains. A left_anti join keeps this distributed instead
    # of collecting every NPI to the driver to build a huge isin() predicate.
    old_npis = existing_npis.join(
        npis.select('NPI').distinct(), on='NPI', how='left_anti'
    )

    # unionByName matches columns by name, not position, so stored column order can't misalign rows.
    final_npi_data = npis.unionByName(old_npis)

    # NOTE(review): this overwrites the table it reads from. That relies on the table being
    # Delta (snapshot reads); confirm the table format.
    final_npi_data.write.mode("overwrite").saveAsTable(target_table)

else:
    # Fail loudly rather than silently writing nothing.
    raise ValueError(
        f"Unsupported load_type {load_type!r}; expected 'full' or 'incremental'"
    )