In [0]:
# Service-principal (OAuth client-credentials) configuration that lets this
# Spark session reach the ADLS Gen2 account and access its data.
# The client id, client secret and tenant (directory) id are read from the
# "hc-secret-scope" secret scope, so no credential is written in the notebook.

# Use OAuth for this storage account, with the client-credentials token provider.
spark.conf.set(
    "fs.azure.account.auth.type.hpadlsacc.dfs.core.windows.net",
    "OAuth",
)
spark.conf.set(
    "fs.azure.account.oauth.provider.type.hpadlsacc.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)

# Identity of the service principal: application (client) id and its secret.
spark.conf.set(
    "fs.azure.account.oauth2.client.id.hpadlsacc.dfs.core.windows.net",
    dbutils.secrets.get(scope="hc-secret-scope", key="app-key"),
)
spark.conf.set(
    "fs.azure.account.oauth2.client.secret.hpadlsacc.dfs.core.windows.net",
    dbutils.secrets.get(scope="hc-secret-scope", key="service-cred"),
)

# Token endpoint of the tenant the service principal belongs to.
tenant_id = dbutils.secrets.get(scope="hc-secret-scope", key="dir-id")
spark.conf.set(
    "fs.azure.account.oauth2.client.endpoint.hpadlsacc.dfs.core.windows.net",
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
)

# Creating the transactions table in the silver layer

In [0]:
# Spark SQL imports for this notebook
from pyspark.sql import SparkSession, functions as f

# Bronze-layer folders holding each hospital's transaction extract
src_hosa = "abfss://bronze@hpadlsacc.dfs.core.windows.net/hos-a/transactions"
src_hosb = "abfss://bronze@hpadlsacc.dfs.core.windows.net/hos-b/transactions"

# Load the Hospital A and Hospital B transactions (parquet) in that order
df_hosa, df_hosb = (
    spark.read.parquet(bronze_path) for bronze_path in (src_hosa, src_hosb)
)

# Stack both hospitals into one DataFrame; columns are matched by name,
# not by position
df_merged = df_hosa.unionByName(df_hosb)
display(df_merged)

# Expose the merged data to the %sql cells below as the view `transactions`
df_merged.createOrReplaceTempView("transactions")

In [0]:
%sql
-- Quality-check view built on top of the merged `transactions` temp view.
--   * TransactionID is rebuilt as '<source TransactionID>-<datasource>';
--     the untouched source value is kept as SRC_TransactionID.
--   * InsertDate / ModifiedDate are exposed as SRC_InsertDate / SRC_ModifiedDate.
--   * is_quarantined is TRUE when any of EncounterID, PatientID, TransactionID
--     or VisitDate is NULL, and FALSE otherwise.
-- NOTE(review): concat() returns NULL when any argument is NULL, so a row whose
-- source TransactionID is NULL also gets a NULL TransactionID in this view.

CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
    concat(TransactionID, '-', datasource) AS TransactionID,
    TransactionID                          AS SRC_TransactionID,
    EncounterID,
    PatientID,
    ProviderID,
    DeptID,
    VisitDate,
    ServiceDate,
    PaidDate,
    VisitType,
    Amount,
    AmountType,
    PaidAmount,
    ClaimID,
    PayorID,
    ProcedureCode,
    ICDCode,
    LineOfBusiness,
    MedicaidID,
    MedicareID,
    InsertDate                             AS SRC_InsertDate,
    ModifiedDate                           AS SRC_ModifiedDate,
    datasource,
    -- IS NULL always yields TRUE or FALSE, so this is never NULL itself
    (
        EncounterID IS NULL
        OR PatientID IS NULL
        OR TransactionID IS NULL
        OR VisitDate IS NULL
    )                                      AS is_quarantined
FROM transactions;

In [0]:
%sql
-- Preview the quality-checked rows, including the is_quarantined flag

SELECT *
FROM quality_checks;

In [0]:
%sql
-- External Delta table for the silver layer, stored at the LOCATION below.
-- IF NOT EXISTS: when the table is already registered this statement is a no-op.
CREATE TABLE IF NOT EXISTS silver.transactions (
  -- keys
  TransactionID      STRING,
  SRC_TransactionID  STRING,
  EncounterID        STRING,
  PatientID          STRING,
  ProviderID         STRING,
  DeptID             STRING,
  -- visit, service and payment details
  VisitDate          DATE,
  ServiceDate        DATE,
  PaidDate           DATE,
  VisitType          STRING,
  Amount             DOUBLE,
  AmountType         STRING,
  PaidAmount         DOUBLE,
  -- claim and coding
  ClaimID            STRING,
  PayorID            STRING,
  ProcedureCode      INTEGER,
  ICDCode            STRING,
  LineOfBusiness     STRING,
  MedicaidID         STRING,
  MedicareID         STRING,
  -- source-system metadata
  SRC_InsertDate     DATE,
  SRC_ModifiedDate   DATE,
  datasource         STRING,
  -- quality flag and row-versioning / audit columns
  is_quarantined     BOOLEAN,
  audit_insertdate   TIMESTAMP,
  audit_modifieddate TIMESTAMP,
  is_current         BOOLEAN
)
USING DELTA
LOCATION "abfss://silver@hpadlsacc.dfs.core.windows.net/transactions";


In [0]:
%sql
-- Step 1: expire the current version of every transaction that has changed.
-- A target row is matched when it has the same TransactionID as a source row
-- and is the active version (is_current = true). If any tracked column differs
-- between the two, the target row is marked historical:
--   target.is_current = false
--   target.audit_modifieddate = current_timestamp()
--
-- The comparison uses the null-safe equality operator <=>. With a plain !=,
-- "NULL != value" evaluates to NULL rather than TRUE, so a column that went
-- from NULL to a value (or from a value to NULL) was not seen as a change and
-- the old row was never expired. <=> treats NULL and NULL as equal and NULL
-- and a value as different, and it never returns NULL.

MERGE INTO silver.transactions AS target
USING quality_checks AS source
ON target.TransactionID = source.TransactionID
AND target.is_current = true
WHEN MATCHED
AND NOT (
  target.SRC_TransactionID <=> source.SRC_TransactionID
  AND target.EncounterID <=> source.EncounterID
  AND target.PatientID <=> source.PatientID
  AND target.ProviderID <=> source.ProviderID
  AND target.DeptID <=> source.DeptID
  AND target.VisitDate <=> source.VisitDate
  AND target.ServiceDate <=> source.ServiceDate
  AND target.PaidDate <=> source.PaidDate
  AND target.VisitType <=> source.VisitType
  AND target.Amount <=> source.Amount
  AND target.AmountType <=> source.AmountType
  AND target.PaidAmount <=> source.PaidAmount
  AND target.ClaimID <=> source.ClaimID
  AND target.PayorID <=> source.PayorID
  AND target.ProcedureCode <=> source.ProcedureCode
  AND target.ICDCode <=> source.ICDCode
  AND target.LineOfBusiness <=> source.LineOfBusiness
  AND target.MedicaidID <=> source.MedicaidID
  AND target.MedicareID <=> source.MedicareID
  AND target.SRC_InsertDate <=> source.SRC_InsertDate
  AND target.SRC_ModifiedDate <=> source.SRC_ModifiedDate
  AND target.datasource <=> source.datasource
  AND target.is_quarantined <=> source.is_quarantined
)
THEN UPDATE
SET
  target.is_current = false,
  target.audit_modifieddate = current_timestamp();


In [0]:
%sql
-- Step 2: insert new and changed records as the current version.
-- A source row is inserted when no target row has the same TransactionID with
-- is_current = true. That covers two cases:
--   * transactions that are not in the silver table yet, and
--   * transactions whose previous version was set to is_current = false by
--     step 1, so the match condition is no longer satisfied.
-- Inserted rows get both audit timestamps set to current_timestamp() and
-- is_current = true.

MERGE INTO silver.transactions AS target
USING quality_checks AS source
  ON  target.TransactionID = source.TransactionID
  AND target.is_current = true
WHEN NOT MATCHED THEN INSERT (
  TransactionID, SRC_TransactionID, EncounterID, PatientID, ProviderID, DeptID,
  VisitDate, ServiceDate, PaidDate, VisitType,
  Amount, AmountType, PaidAmount,
  ClaimID, PayorID, ProcedureCode, ICDCode, LineOfBusiness,
  MedicaidID, MedicareID,
  SRC_InsertDate, SRC_ModifiedDate, datasource, is_quarantined,
  audit_insertdate, audit_modifieddate, is_current
)
VALUES (
  source.TransactionID, source.SRC_TransactionID, source.EncounterID,
  source.PatientID, source.ProviderID, source.DeptID,
  source.VisitDate, source.ServiceDate, source.PaidDate, source.VisitType,
  source.Amount, source.AmountType, source.PaidAmount,
  source.ClaimID, source.PayorID, source.ProcedureCode, source.ICDCode,
  source.LineOfBusiness,
  source.MedicaidID, source.MedicareID,
  source.SRC_InsertDate, source.SRC_ModifiedDate, source.datasource,
  source.is_quarantined,
  current_timestamp(),  -- audit_insertdate
  current_timestamp(),  -- audit_modifieddate
  true                  -- is_current
);


In [0]:
%sql
-- Inspect the silver rows whose datasource is hos-a

SELECT *
FROM silver.transactions
WHERE datasource = "hos-a";

In [0]:
%sql
-- List the column names of the silver transactions table
SHOW COLUMNS FROM silver.transactions;


In [0]:
%sql
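-- Manual reset only: intentionally left commented out so that "Run All" never drops the silver table.
-- drop table silver.transactions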