In [0]:
# Load the bronze-layer claims (stored as Parquet) into a DataFrame.
bronze_claims_df = spark.read.parquet("/mnt/rcmabhi/bronze/claims/")

# Register the DataFrame as the temp view `bronze_claims` so the SQL cells can query it.
bronze_claims_df.createOrReplaceTempView("bronze_claims")

In [0]:
%sql
-- Shapes the rows of bronze_claims for the silver load:
--   * builds UniqueClaimID from ClaimID and datasource,
--   * casts the date-like columns to DATE,
--   * flags rows that are missing any of the required fields.
-- NOTE(review): CONCAT returns NULL when ClaimID (or datasource) is NULL, so such
-- rows get a NULL UniqueClaimID; confirm how downstream key matching should treat them.
CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
  -- Keys and identifiers
  CONCAT(ClaimID, '-', datasource)  AS UniqueClaimID,
  ClaimID                           AS SRC_ClaimID,
  TransactionID, PatientID, EncounterID, ProviderID, DeptID,
  -- Service / claim dates
  CAST(ServiceDate AS DATE)         AS ServiceDate,
  CAST(ClaimDate AS DATE)           AS ClaimDate,
  -- Payor and amounts
  PayorID, ClaimAmount, PaidAmount, ClaimStatus, PayorType,
  Deductible, Coinsurance, Copay,
  -- Source-system timestamps, truncated to dates
  CAST(InsertDate AS DATE)          AS SRC_InsertDate,
  CAST(ModifiedDate AS DATE)        AS SRC_ModifiedDate,
  datasource,
  -- TRUE when any required field is missing. IS NULL never yields NULL,
  -- so this expression is always strictly TRUE or FALSE.
  (
    ClaimID IS NULL
    OR TransactionID IS NULL
    OR PatientID IS NULL
    OR ServiceDate IS NULL
  )                                 AS is_quarantined
FROM bronze_claims

In [0]:
%sql
-- Silver-layer claims table (Delta). Created only if it does not already exist.
-- The first block of columns mirrors the projection of the quality_checks view;
-- the audit_* and is_current columns are populated by the MERGE statements.
-- NOTE(review): the amount columns (ClaimAmount, PaidAmount, Deductible,
-- Coinsurance, Copay) are declared STRING; confirm whether a numeric type is intended.
CREATE TABLE IF NOT EXISTS silver.claims (
  -- Keys and identifiers
  UniqueClaimID       STRING,
  SRC_ClaimID         STRING,
  TransactionID       STRING,
  PatientID           STRING,
  EncounterID         STRING,
  ProviderID          STRING,
  DeptID              STRING,
  -- Dates
  ServiceDate         DATE,
  ClaimDate           DATE,
  -- Payor and amounts
  PayorID             STRING,
  ClaimAmount         STRING,
  PaidAmount          STRING,
  ClaimStatus         STRING,
  PayorType           STRING,
  Deductible          STRING,
  Coinsurance         STRING,
  Copay               STRING,
  -- Source-system dates and origin
  SRC_InsertDate      DATE,
  SRC_ModifiedDate    DATE,
  datasource          STRING,
  -- Data-quality flag computed in quality_checks
  is_quarantined      BOOLEAN,
  -- Load bookkeeping and row-version flag
  audit_insertdate    TIMESTAMP,
  audit_modifieddate  TIMESTAMP,
  is_current          BOOLEAN
)
USING DELTA;

In [0]:
%sql
-- Step 1 of the versioned (SCD Type 2 style) load: for every claim whose current
-- row in silver.claims differs from the incoming row in quality_checks, mark the
-- current row as no longer current and stamp audit_modifieddate.
--
-- Comparisons use the null-safe equality operator (<=>), which always returns
-- TRUE or FALSE. With plain `!=`, a comparison where either side is NULL
-- evaluates to NULL, so a change to or from NULL (e.g. PaidAmount going from
-- NULL to a value) would never be detected and the stale row would stay current.
MERGE INTO silver.claims AS target
USING quality_checks AS source
ON target.UniqueClaimID = source.UniqueClaimID AND target.is_current = true
WHEN MATCHED AND NOT (
    -- The row is unchanged only if every tracked attribute is null-safely equal.
    target.TransactionID  <=> source.TransactionID  AND
    target.PatientID      <=> source.PatientID      AND
    target.EncounterID    <=> source.EncounterID    AND
    target.ProviderID     <=> source.ProviderID     AND
    target.DeptID         <=> source.DeptID         AND
    target.ServiceDate    <=> source.ServiceDate    AND
    target.ClaimDate      <=> source.ClaimDate      AND
    target.PayorID        <=> source.PayorID        AND
    target.ClaimAmount    <=> source.ClaimAmount    AND
    target.PaidAmount     <=> source.PaidAmount     AND
    target.ClaimStatus    <=> source.ClaimStatus    AND
    target.PayorType      <=> source.PayorType      AND
    target.Deductible     <=> source.Deductible     AND
    target.Coinsurance    <=> source.Coinsurance    AND
    target.Copay          <=> source.Copay          AND
    target.is_quarantined <=> source.is_quarantined
) THEN
  UPDATE SET
    target.is_current = false,
    target.audit_modifieddate = current_timestamp()

In [0]:
%sql
-- Step 2 of the versioned load: insert a current row for every incoming claim
-- that has no current row in silver.claims. That covers both
--   * brand-new claims (no row at all for the key), and
--   * changed claims whose previous current row was just marked is_current = false.
--
-- quality_checks alone is the correct source. The ON clause only matches rows
-- with is_current = true, so a changed claim is already NOT MATCHED here.
-- The earlier version UNION ALL'ed a second copy of those same changed rows
-- (one copy per historical version of the key), and since WHEN NOT MATCHED
-- inserts every unmatched source row, each changed claim was inserted several
-- times as "current".
-- NOTE(review): rows whose UniqueClaimID is NULL can never satisfy the equality
-- in the ON clause, so they are inserted on every run; confirm the intended
-- handling of such rows.
MERGE INTO silver.claims AS target
USING quality_checks AS source
ON target.UniqueClaimID = source.UniqueClaimID AND target.is_current = true
WHEN NOT MATCHED THEN
  INSERT (
    UniqueClaimID,
    SRC_ClaimID,
    TransactionID,
    PatientID,
    EncounterID,
    ProviderID,
    DeptID,
    ServiceDate,
    ClaimDate,
    PayorID,
    ClaimAmount,
    PaidAmount,
    ClaimStatus,
    PayorType,
    Deductible,
    Coinsurance,
    Copay,
    SRC_InsertDate,
    SRC_ModifiedDate,
    datasource,
    is_quarantined,
    audit_insertdate,
    audit_modifieddate,
    is_current
  )
  VALUES (
    source.UniqueClaimID,
    source.SRC_ClaimID,
    source.TransactionID,
    source.PatientID,
    source.EncounterID,
    source.ProviderID,
    source.DeptID,
    source.ServiceDate,
    source.ClaimDate,
    source.PayorID,
    source.ClaimAmount,
    source.PaidAmount,
    source.ClaimStatus,
    source.PayorType,
    source.Deductible,
    source.Coinsurance,
    source.Copay,
    source.SRC_InsertDate,
    source.SRC_ModifiedDate,
    source.datasource,
    source.is_quarantined,
    -- Both audit timestamps are set to the load time for a freshly inserted version.
    current_timestamp(),
    current_timestamp(),
    true
  );