###Syed Sharjeelullah
February 28th, 2022

#####Please note that I have made a few assumptions here based on my understanding of the question. I have created the staging and production database environments in my personal Azure cloud account, and have dropped the two CSV files into an Azure Storage blob container in my personal Azure Storage account.

Assumptions:
1. Staging and production environments are in the Azure cloud. (I can provide the credentials for testing this code.)
2. The CSV files were dropped in an Azure Storage blob container (credentials for which will be provided).
3. The final data views are stored in tables 'unique_providers' and 'unique_clinicians' in the production database in Azure.
4. It is assumed that the input file format would be exactly the same for this ETL process.
5. There are obvious improvements in this pipeline, but the focus was to implement a feasible ETL pipeline.
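6. Please fill out lines 16-31 (the connection variables: url, user, password and their production counterparts url_p, user_p, password_p) with the accurate credentials supplied to you in an encrypted file.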
7. Hardcoding credentials is a bad practice, and this was only done for demonstration purposes.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
import datetime, time
from pyspark.sql.types import IntegerType,BooleanType,DateType,StringType
from pyspark.sql.functions import lit,unix_timestamp,to_date,to_timestamp,from_unixtime
from pyspark.sql.functions import substring_index

#SparkSession initialization: reuse the active session if one exists, otherwise create one
session = SparkSession.builder.getOrCreate()

#Connection variables for the staging environment, which is a PostgreSQL database hosted in the Azure environment
#Please note that for the purposes of this exercise the credentials are hardcoded here, as these databases are hosted in my personal Azure account
#However, it is recommended to never hardcode credentials as a general practice, and encryption would be required for HIPAA compliance
driver = "org.postgresql.Driver"
url = ""
#Staging tables that receive the raw rows prepared from the csv files
table_c = "public.clinicians"
table_p = "public.providers"
user = ""
password = ""

#Connection variables for the production environment, which is a PostgreSQL database hosted in the Azure environment
driver_p = "org.postgresql.Driver"
url_p = ""
#Data mart tables that receive the cleaned, de-duplicated rows
table_c1 = "prod.clinicians_mart"
table_p1 = "prod.providers_mart"
#Final tables that hold the (internal id, NPI, title, created_at) views
table_c2 = "prod.unique_clinicians"
table_p2 = "prod.unique_providers"
user_p = ""
password_p = ""

def fetch_data(session):
  """Read the providers and clinician csv files from Azure Blob Storage.

  Args:
    session: SparkSession used to register the storage SAS token and to read
      the two csv files.

  Returns:
    A tuple (providers, clinician_data, flag). On success both entries are
    dataframes and flag is True. On any failure both entries are None and
    flag is False.
  """
  #Defaults keep the return statement valid when anything below raises.
  #Without them a failure left these names unbound and the function crashed
  #with UnboundLocalError instead of reporting the failure through the flag.
  providers = None
  clinician_data = None
  flag = False

  try:
    #Establishing connection to Azure storage blob
    #NOTE(review): the SAS token is hardcoded and carries a fixed validity
    #window (st/se); it should come from a secret store rather than source.
    session.conf.set(
      "fs.azure.sas.files.blob.core.windows.net",
      "sp=r&st=2022-02-26T23:31:30Z&se=2022-02-27T07:31:30Z&spr=https&sv=2020-08-04&sr=c&sig=CiblBjHXmj7dJizB69IMWhRaoWhAoGBMljo8XFDpe6o%3D"
    )
    #Reading files into dataframes
    providers = session.read.csv(
      "wasb://files@clinicianfiles.blob.core.windows.net/providers.csv", header = True
    )
    clinician_data = session.read.csv(
      "wasb://files@clinicianfiles.blob.core.windows.net/clinician data.csv", header = True
    )
    flag = True
  except Exception as e:
    print("Error connecting to Azure storage or in reading files from Azure Storage")
    print(f"Details: {e}")
    #Never hand back a half-loaded pair: callers rely on the flag alone
    providers = None
    clinician_data = None

  return providers, clinician_data, flag

def staging_data(providers, clinician_data):
  """Shape the raw csv dataframes to the layout used by the staging tables.

  Both dataframes are trimmed to the needed columns, renamed to the staging
  column names, given 'row_inserted_at' / 'row_updated_at' timestamps and a
  'status' of 'Ready for Processing', and have 'id' cast to int.

  Args:
    providers: dataframe read from providers.csv.
    clinician_data: dataframe read from the clinician csv.

  Returns:
    A tuple (providers, clinician_data) of the prepared dataframes.
  """
  #One load timestamp is shared by every row prepared in this call
  loaded_at = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
  loaded_at_col = unix_timestamp(lit(loaded_at), 'yyyy-MM-dd HH:mm:ss').cast("timestamp")
  ready_status = lit('Ready for Processing')

  #Source column name -> staging column name
  clinician_renames = (
    ('ID', 'id'),
    ("provider first", 'first_name'),
    ("provider last", 'last_name'),
    ('email address', 'email'),
    ('Care Center Name', 'care_center'),
    ('Sex', 'gender'),
    ('primary language', 'languages'),
  )
  provider_renames = (
    ('first name', 'first_name'),
    ('Care Center', 'care_center'),
  )

  #Preparing clinician dataframe to be stored into the staging environment
  clinicians = clinician_data.select(
    "ID", "provider first", 'provider last', 'email address', 'Sex',
    'Care Center Name', 'primary language', 'NPI', 'title', 'updated_date'
  )
  for source_name, staging_name in clinician_renames:
    clinicians = clinicians.withColumnRenamed(source_name, staging_name)
  clinicians = (
    clinicians
    .withColumn('row_inserted_at', loaded_at_col)
    .withColumn('row_updated_at', loaded_at_col)
    .withColumn("status", ready_status)
    .withColumn("id", clinicians.id.cast('int'))
  )

  #Preparing providers dataframe to be stored into the staging environment
  staged_providers = providers.select(
    "id", "first name", 'last_name', 'email', 'gender',
    'Care Center', 'languages', 'NPI', 'title'
  )
  for source_name, staging_name in provider_renames:
    staged_providers = staged_providers.withColumnRenamed(source_name, staging_name)
  staged_providers = (
    staged_providers
    .withColumn("row_inserted_at", loaded_at_col)
    .withColumn("row_updated_at", loaded_at_col)
    .withColumn("status", ready_status)
    .withColumn("id", staged_providers.id.cast('int'))
  )

  return staged_providers, clinicians

def load_to_staging(providers, clinicians_data):
  """Append the prepared dataframes to the staging tables over JDBC.

  Args:
    providers: prepared providers dataframe (see staging_data).
    clinicians_data: prepared clinician dataframe (see staging_data).

  Returns:
    'SUCCESS' when both writes complete, 'FAIL' when either write raises.
  """
  clinician_columns = ['id', 'first_name', 'last_name', 'email', 'gender', 'care_center', 'languages',
                       'NPI', 'title', 'updated_date', 'row_inserted_at', 'row_updated_at', 'status']
  provider_columns = ['id', 'first_name', 'last_name', 'email', 'gender', 'care_center', 'languages',
                      'NPI', 'title', 'row_inserted_at', 'row_updated_at', 'status']

  try:
    #Writing out dataframes to the staging environment which contains already created tables for these two schemas
    #The clinician frame is taken from the 'clinicians_data' parameter; the
    #body used to read an undefined name 'clinician_data', and the resulting
    #NameError was swallowed below, so the load always reported 'FAIL'.
    #NOTE(review): the two writes are independent, so a failure of the second
    #one leaves the clinician rows already appended.
    for frame, columns, table in (
      (clinicians_data, clinician_columns, table_c),
      (providers, provider_columns, table_p),
    ):
      frame.select(*columns).write.format("jdbc") \
        .option("url", url) \
        .option("driver", driver).option("dbtable", table) \
        .option("user", user).option("password", password) \
        .mode("append") \
        .save()

    return_code = 'SUCCESS'

  except Exception as e:
    #Surface the cause instead of discarding it
    print(f"Load to staging failed: {e}")
    return_code = 'FAIL'

  return return_code

#Reading from the staging database and only fetching unprocessed rows, to be prepared to be transformed and loaded into the data mart
def read_for_processing():
  """Fetch the unprocessed provider and clinician rows from staging.

  Each staging table is loaded over JDBC, registered as a temp view
  ('providers' / 'clinicians') and filtered to rows whose status is
  'Ready for Processing'.

  Returns:
    A tuple (provider_df, clinician_df, return_bool). On success both entries
    are dataframes and return_bool is True. On any failure both entries are
    None and return_bool is False.
  """
  #Defaults keep the return statement valid on failure; the dataframes used
  #to be unbound there, which raised UnboundLocalError instead of returning.
  provider_df = None
  clinician_df = None
  return_bool = False

  def load_unprocessed(table, view_name):
    #Load one staging table and keep only rows not yet processed
    staged = session.read.format("jdbc").option("url", url) \
      .option("driver", driver).option("dbtable", table) \
      .option("user", user).option("password", password).load()
    staged.createOrReplaceTempView(view_name)
    return session.sql(f"SELECT * FROM {view_name} WHERE status = 'Ready for Processing'")

  try:
    #'session' is the SparkSession created at the top of this file; the name
    #'spark' only exists in environments that predefine it.
    provider_df = load_unprocessed(table_p, 'providers')
    clinician_df = load_unprocessed(table_c, 'clinicians')
    return_bool = True
  except Exception as e:
    return_code = 'FAILED TO READ DATA FROM STAGING'
    print(return_code)
    print(f"Details: {e}")
    #Do not hand back a partial result when only one of the reads succeeded
    provider_df = None
    clinician_df = None
    return_bool = False

  return provider_df, clinician_df, return_bool

def prep_for_datamart(provider_df, clinician_df):
  """Clean the staged rows and append them to the production data mart.

  For both dataframes the NPI is reduced to the part after the last hyphen,
  rows are de-duplicated on NPI, and fresh 'row_inserted_at' /
  'row_updated_at' timestamps are set before the rows are appended to the
  mart tables over JDBC.

  Args:
    provider_df: unprocessed provider rows read from staging.
    clinician_df: unprocessed clinician rows read from staging.

  Returns:
    'LOADED DATA IN THE MART SUCCESSFULLY' when both writes complete,
    'FAILED TO LOAD DATA INTO THE MART' when either write raises.
  """
  timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
  loaded_at_col = unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp")

  #NOTE(review): 'provider_staging_id' / 'clinician_staging_id' are not created
  #anywhere in this file; presumably they are generated by the staging tables
  #themselves - confirm against the staging schema.
  provider_columns = ['first_name', 'last_name', 'email', 'gender', 'care_center', 'languages',
                      'NPI', 'title', 'row_inserted_at', 'row_updated_at', 'provider_staging_id']
  clinician_columns = ['first_name', 'last_name', 'email', 'gender', 'care_center', 'languages',
                       'NPI', 'title', 'updated_date', 'row_inserted_at', 'row_updated_at',
                       'clinician_staging_id']

  #Provider data being prepped to be loaded into datamart
  #Only including NPI digits after the hyphen '-'
  #Only including Unique NPI's
  cleaned_provider_df = (
    provider_df
    .withColumn("NPI", substring_index(provider_df["NPI"], '-', -1))
    .dropDuplicates(["NPI"])
    .withColumn('row_inserted_at', loaded_at_col)
    .withColumn('row_updated_at', loaded_at_col)
    .select(*provider_columns)
  )

  #Clinician data being prepped to be loaded into datamart
  #Only including NPI digits after the hyphen '-'
  #Only including Unique NPI's
  cleaned_clinician_df = (
    clinician_df
    .withColumn("NPI", substring_index(clinician_df["NPI"], '-', -1))
    .dropDuplicates(["NPI"])
    .withColumn('row_inserted_at', loaded_at_col)
    .withColumn('row_updated_at', loaded_at_col)
    .select(*clinician_columns)
  )

  try:
    for cleaned_df, table in (
      (cleaned_provider_df, table_p1),
      (cleaned_clinician_df, table_c1),
    ):
      cleaned_df.write.format("jdbc") \
        .option("url", url_p) \
        .option("driver", driver_p).option("dbtable", table) \
        .option("user", user_p).option("password", password_p) \
        .mode("append") \
        .save()
    return_code = 'LOADED DATA IN THE MART SUCCESSFULLY'
  except Exception as e:
    #This step writes to the mart, so the failure code says so; it used to
    #repeat the unrelated 'FAILED TO READ DATA FROM STAGING' message.
    print(f"Load to data mart failed: {e}")
    return_code = 'FAILED TO LOAD DATA INTO THE MART'

  return return_code

#This function stores the transformed dataframes in the tables 'unique_providers' and 'unique_clinicians', with the internal id, NPI and title as the data points, as given in the assignment
def storing_transformed_views():
  """Copy (internal id, NPI, title) from the mart into the final tables.

  Reads both mart tables from production, projects the internal id, NPI and
  title, stamps a 'created_at' timestamp and appends the result to the
  'unique_providers' / 'unique_clinicians' tables.

  Returns:
    "Complete Success" when the read and both writes complete. Otherwise the
    message describing the step that failed: 'FAILED TO READ DATA FROM
    PRODUCTION' or the write-failure message.
  """
  timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

  #Reading from the production datamart
  try:
    #'session' is the SparkSession created at the top of this file; the name
    #'spark' only exists in environments that predefine it.
    final_providers_df = session.read.format("jdbc").option("url", url_p) \
      .option("driver", driver_p).option("dbtable", table_p1) \
      .option("user", user_p).option("password", password_p).load()

    final_clinician_df = session.read.format("jdbc").option("url", url_p) \
      .option("driver", driver_p).option("dbtable", table_c1) \
      .option("user", user_p).option("password", password_p).load()
  except Exception as e:
    msg = 'FAILED TO READ DATA FROM PRODUCTION'
    print(msg)
    print(f"Details: {e}")
    #Return right away: the success flags used to be unbound on this path,
    #so the function crashed with UnboundLocalError instead of returning.
    return msg

  try:
    #Now creating a new dataframe with field ['provider_internal_id', 'NPI', 'title', 'created_at']
    #This would be stored in tables unique_clinicians and unique_providers
    created_at_col = unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp")
    exports = (
      (final_providers_df, 'providermart',
       "SELECT provider_internal_id, NPI, title FROM providermart", table_p2),
      (final_clinician_df, 'clinicianmart',
       "SELECT clinician_internal_id, NPI, title FROM clinicianmart", table_c2),
    )
    #NOTE(review): mode "append" re-adds every mart row on each run - confirm
    #whether the target tables are expected to be emptied between runs.
    for mart_df, view_name, query, target_table in exports:
      mart_df.createOrReplaceTempView(view_name)
      unique_df = session.sql(query).withColumn('created_at', created_at_col)
      unique_df.write.format("jdbc") \
        .option("url", url_p) \
        .option("driver", driver_p).option("dbtable", target_table) \
        .option("user", user_p).option("password", password_p) \
        .mode("append") \
        .save()
  except Exception as e:
    msg = "Writing to production tables 'unique_providers' and/or 'unique_clinicians' failed"
    print(msg)
    print(f"Details: {e}")
    return msg

  return "Complete Success"

#The main function or program handler  
def main():
  """Run the five ETL steps in order and return the status log.

  Steps: (1) fetch the csv files, (2) prepare them for staging, (3) load to
  staging and read back the unprocessed rows, (4) clean and load into the
  data mart, (5) store the final views. A step whose prerequisite failed is
  reported as failed without being attempted.

  Returns:
    The status message of each of the five steps joined with ', ', e.g.
    "Step 1 Successful, Step 2 Successful, ..., Step 5 Successful".
  """
  start = time.time()
  entries = []

  def record(message):
    #Mirror every status line to stdout and to the returned log
    print(message)
    entries.append(message)

  providers = clinician_data = None
  provider_df = clinician_df = None

  #Step 1: fetch the csv files from Azure storage
  fetched = False
  try:
    providers, clinician_data, flag = fetch_data(session)
    fetched = bool(flag)
  except Exception as e:
    print(f"Step 1 error: {e}")
  record("Step 1 Successful" if fetched else "Step 1 Failed")

  #Step 2: prepare the dataframes for the staging tables
  staged = False
  if fetched:
    try:
      providers, clinician_data = staging_data(providers, clinician_data)
      staged = True
    except Exception as e:
      print(f"Step 2 error: {e}")
  record("Step 2 Successful" if staged else "Step 2 Failed")

  #Step 3: load to staging, then read back the unprocessed rows
  ready_for_mart = False
  step3_message = "Step 3 Failed"
  if staged:
    try:
      if load_to_staging(providers, clinician_data) == 'SUCCESS':
        provider_df, clinician_df, readbool = read_for_processing()
        #The read flag decides the outcome; it used to be ignored
        if readbool:
          ready_for_mart = True
          step3_message = "Step 3 Successful"
      else:
        step3_message = "Load to Staging Failed"
    except Exception as e:
      print(f"Step 3 error: {e}")
  record(step3_message)

  #Step 4: clean the rows and load them into the data mart
  #step4 always has a value here; it used to stay unbound when
  #prep_for_datamart returned a failure code, which crashed the step 5 check.
  step4 = False
  if ready_for_mart:
    try:
      step4 = prep_for_datamart(provider_df, clinician_df) == 'LOADED DATA IN THE MART SUCCESSFULLY'
    except Exception as e:
      print(f"Step 4 error: {e}")
  record("Step 4 Successful" if step4 else "Step 4 Failed")

  #Step 5: store the final views; a failure here is now logged as well
  step5 = False
  if step4:
    try:
      step5 = storing_transformed_views() == "Complete Success"
    except Exception as e:
      print(f"Step 5 error: {e}")
  if step5:
    record("Step 5 Successful")
    print("The ETL pipeline ran successfully")
  else:
    record("Step 5 Failed")

  print('Runtime: ', time.time() - start, ' Seconds')

  return ", ".join(entries)

if __name__ == "__main__": 
  print(main())

In [0]:
import pytest
  
def test_main():
  """Run the whole pipeline and check that no step reported a failure."""
  logs = main()
  #Check for a missing log first: running the substring check on None would
  #raise TypeError and hide the intended assertion message.
  assert logs is not None, "Test Failed, logs did not populate"
  assert 'Failed' not in logs, "Test Failed, one or more of the steps Failed in main()"
  print("Test Successful: main test passed")
  
def test_Azure_connectivity():
  """Check that fetch_data reports a successful read from Azure Storage."""
  _providers, _clinicians, loaded = fetch_data(session)
  failure_message = "Test failed: could not load data from Azure Storage"
  assert loaded == True, failure_message
  print("Test Successful: Azure Storage Connectivity Successful")
  
def test_storing_transformed_views():
  """Check that the final views are written back to the production database."""
  outcome = storing_transformed_views()
  failure_message = "Test Failed: transfored views were not stored back in the prod db"
  assert outcome == "Complete Success", failure_message
  print("Test Successful: test_storing_transformed_views Successful")
  
def test_prep_for_datamart():
  """Check that unprocessed staging rows are loaded into the mart tables."""
  providers, clinicians, flag = read_for_processing()
  #Fail loudly when the staging read fails; the test used to skip its only
  #assertion in that case and therefore passed without checking anything.
  assert flag == True, "Test Failed: Was not able to successfully retrieve unprocessed rows from staging"
  assert prep_for_datamart(providers, clinicians) == 'LOADED DATA IN THE MART SUCCESSFULLY', "Test Failed: dataframes were not stored successfuly in the mart tables"
  print("Test Successful: test_prep_for_datamart successful")
  
def test_read_for_processing():
  """Check that unprocessed rows can be retrieved from the staging database."""
  _provider_rows, _clinician_rows, readflag = read_for_processing()
  failure_message = "Test Failed: Was not able to successfully retrieve unprocessed rows from staging"
  assert readflag == True, failure_message
  print("Test Successful: Unprocessed rows retreived successfully from staging")

In [0]:
#test_Azure_connectivity()
# test_main()
# test_storing_transformed_views()
# test_read_for_processing()
#test_prep_for_datamart()