In [0]:
# Configure OAuth (service principal) access to the ADLS Gen2 storage account
# and derive the container root paths used by the rest of the notebook.

# Storage account that hosts the `bronze` and `landing` containers.
storage_account_name = "healthcarercmra"

# Service-principal identifiers used for the OAuth client-credentials flow.
client_id = "55cb5f89-1a5c-41b2-a286-947e13e78c78"
tenant_id = "e1dd8e8f-9203-44c7-b497-48a69721f03b"

# SECURITY: the client secret must not live in notebook source. It is read
# from a Databricks secret scope at run time instead.
# NOTE(review): the scope and key names below are placeholders -- point them
# at the secret scope configured in this workspace, and rotate the secret that
# was previously embedded in this cell, since it was exposed in plain text.
client_secret = dbutils.secrets.get(scope="healthcare-rcm-kv", key="sp-client-secret")

# Hadoop ABFS settings for the service principal (client-credentials provider).
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}

# Apply every setting to the active Spark session.
for key, value in configs.items():
    spark.conf.set(key, value)

# Root URIs of the containers; both end with a trailing slash.
bronze_path = f"abfss://bronze@{storage_account_name}.dfs.core.windows.net/"
print(bronze_path)

landing_path = f"abfss://landing@{storage_account_name}.dfs.core.windows.net/"
print(landing_path)

In [0]:
# Databricks notebook source
# Load the claims Parquet dataset from the bronze container and expose it to
# the %sql cells below. The location is built from `bronze_path` (defined in
# the configuration cell, with a trailing slash) so the storage account is
# declared in one place only.
claims_df = spark.read.parquet(f"{bronze_path}claims_pq/")
claims_df.display()
# Register the DataFrame as temp view `claims` for use from SQL.
claims_df.createOrReplaceTempView("claims")

In [0]:
%sql
-- Temp view `quality_checks`: projects the `claims` view into the silver
-- column layout, derives a composite ClaimID, casts date columns to DATE and
-- adds an `is_quarantined` flag for rows missing required fields.
CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
  -- Composite key: source claim id, a dash, then the datasource.
  -- CONCAT yields NULL when either input is NULL.
  CONCAT(ClaimID, '-', datasource)  AS ClaimID,
  ClaimID                           AS SRC_ClaimID,

  -- Identifiers passed through unchanged.
  TransactionID, PatientID, EncounterID, ProviderID, DeptID,

  CAST(ServiceDate AS DATE)         AS ServiceDate,
  CAST(ClaimDate   AS DATE)         AS ClaimDate,

  -- Payer and amount fields passed through unchanged.
  PayorID, ClaimAmount, PaidAmount, ClaimStatus, PayorType,
  Deductible, Coinsurance, Copay,

  CAST(InsertDate   AS DATE)        AS SRC_InsertDate,
  CAST(ModifiedDate AS DATE)        AS SRC_ModifiedDate,
  datasource,

  -- TRUE when any required source field is missing, FALSE otherwise.
  -- IS NULL never evaluates to NULL, so the flag is always TRUE or FALSE.
  -- ClaimID here is the source column of `claims`, not the alias above.
  (
       ClaimID       IS NULL
    OR TransactionID IS NULL
    OR PatientID     IS NULL
    OR ServiceDate   IS NULL
  )                                 AS is_quarantined
FROM claims;

In [0]:
%sql
-- Silver-layer Delta table holding claim versions. Created only if absent,
-- so re-running this cell leaves an existing table untouched.
CREATE TABLE IF NOT EXISTS silver.claims (
  -- Keys and identifiers
  ClaimID             STRING,
  SRC_ClaimID         STRING,
  TransactionID       STRING,
  PatientID           STRING,
  EncounterID         STRING,
  ProviderID          STRING,
  DeptID              STRING,

  -- Claim dates
  ServiceDate         DATE,
  ClaimDate           DATE,

  -- Payer, status and amount attributes (stored as STRING)
  PayorID             STRING,
  ClaimAmount         STRING,
  PaidAmount          STRING,
  ClaimStatus         STRING,
  PayorType           STRING,
  Deductible          STRING,
  Coinsurance         STRING,
  Copay               STRING,

  -- Source-system bookkeeping
  SRC_InsertDate      DATE,
  SRC_ModifiedDate    DATE,
  datasource          STRING,

  -- Data-quality flag and version-tracking columns
  is_quarantined      BOOLEAN,
  audit_insertdate    TIMESTAMP,
  audit_modifieddate  TIMESTAMP,
  is_current          BOOLEAN
)
USING DELTA;

In [0]:
%sql
-- SCD Type 2, step 1: expire the current version of a claim when any tracked
-- attribute differs from the incoming row in `quality_checks`.
--
-- Comparisons use the null-safe equality operator (<=>) wrapped in NOT.
-- A plain `!=` evaluates to NULL when either side is NULL, so a change from
-- NULL to a value (or a value to NULL) would not satisfy the condition and
-- the stale row would stay current. `NOT (a <=> b)` is TRUE whenever the two
-- sides differ, treating NULL as a comparable value, and FALSE when both are
-- NULL or both are equal.
MERGE INTO silver.claims AS target
USING quality_checks AS source
ON target.ClaimID = source.ClaimID AND target.is_current = true

WHEN MATCHED AND (
  NOT (target.SRC_ClaimID      <=> source.SRC_ClaimID)      OR
  NOT (target.TransactionID    <=> source.TransactionID)    OR
  NOT (target.PatientID        <=> source.PatientID)        OR
  NOT (target.EncounterID      <=> source.EncounterID)      OR
  NOT (target.ProviderID       <=> source.ProviderID)       OR
  NOT (target.DeptID           <=> source.DeptID)           OR
  NOT (target.ServiceDate      <=> source.ServiceDate)      OR
  NOT (target.ClaimDate        <=> source.ClaimDate)        OR
  NOT (target.PayorID          <=> source.PayorID)          OR
  NOT (target.ClaimAmount      <=> source.ClaimAmount)      OR
  NOT (target.PaidAmount       <=> source.PaidAmount)       OR
  NOT (target.ClaimStatus      <=> source.ClaimStatus)      OR
  NOT (target.PayorType        <=> source.PayorType)        OR
  NOT (target.Deductible       <=> source.Deductible)       OR
  NOT (target.Coinsurance      <=> source.Coinsurance)      OR
  NOT (target.Copay            <=> source.Copay)            OR
  NOT (target.SRC_InsertDate   <=> source.SRC_InsertDate)   OR
  NOT (target.SRC_ModifiedDate <=> source.SRC_ModifiedDate) OR
  NOT (target.datasource       <=> source.datasource)       OR
  NOT (target.is_quarantined   <=> source.is_quarantined)
)
-- Only the version flag and audit timestamp change; attribute values of the
-- expired row are left as they were.
THEN UPDATE SET
  target.is_current = false,
  target.audit_modifieddate = current_timestamp();

In [0]:
%sql
-- SCD Type 2, insert step: add a current row for every `quality_checks` row
-- that has no current row with the same ClaimID in silver.claims. This covers
-- claims seen for the first time and claims whose current row has already
-- been flagged is_current = false.
MERGE INTO silver.claims AS target
USING quality_checks AS source
ON  target.ClaimID = source.ClaimID
AND target.is_current = TRUE

WHEN NOT MATCHED THEN
INSERT (
  ClaimID, SRC_ClaimID, TransactionID, PatientID,
  EncounterID, ProviderID, DeptID,
  ServiceDate, ClaimDate,
  PayorID, ClaimAmount, PaidAmount, ClaimStatus, PayorType,
  Deductible, Coinsurance, Copay,
  SRC_InsertDate, SRC_ModifiedDate, datasource, is_quarantined,
  audit_insertdate, audit_modifieddate, is_current
)
VALUES (
  source.ClaimID, source.SRC_ClaimID, source.TransactionID, source.PatientID,
  source.EncounterID, source.ProviderID, source.DeptID,
  source.ServiceDate, source.ClaimDate,
  source.PayorID, source.ClaimAmount, source.PaidAmount, source.ClaimStatus, source.PayorType,
  source.Deductible, source.Coinsurance, source.Copay,
  source.SRC_InsertDate, source.SRC_ModifiedDate, source.datasource, source.is_quarantined,
  -- Both audit timestamps are set to load time; the new row is the current version.
  current_timestamp(), current_timestamp(), TRUE
);


In [0]:
%sql
-- Spot-check: show four rows from the silver claims table.
SELECT *
FROM silver.claims
LIMIT 4;