In [0]:
from pyspark.sql import SparkSession, functions as f

# Load every hospital's claims extract from the landing zone. No schema is supplied and
# inferSchema is not set, so every column arrives as a string; the header row names them.
raw_claims = spark.read.csv("/mnt/landing/claims/*.csv", header=True)

# Tag each row with its source system, derived from the path of the file it was read from:
# a path containing "hospital1" -> "hosa", "hospital2" -> "hosb", anything else -> NULL.
# NOTE(review): plain substring matching would also map e.g. "hospital10" to "hosa" — confirm file naming.
source_path = f.input_file_name()
datasource_expr = (
    f.when(source_path.contains("hospital1"), "hosa")
    .when(source_path.contains("hospital2"), "hosb")
    .otherwise(None)
)

claims_df = raw_claims.withColumn("datasource", datasource_expr)

display(claims_df)

In [0]:
# Persist the tagged claims to the bronze layer as Parquet, replacing whatever a previous run wrote there.
claims_df.write.mode("overwrite").parquet("/mnt/bronze/claims/")

In [0]:
# Make the tagged claims queryable from the %sql cells below as the temporary view "claims".
claims_df.createOrReplaceTempView(name="claims")

In [0]:
%sql
-- Standardise the raw claims and flag rows that fail basic quality rules.
-- ClaimID is rebuilt as the surrogate key "<source ClaimID>_<datasource>"; the raw value is kept as SRC_ClaimID.
-- Date columns are cast from the CSV strings to DATE.
CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
  concat(ClaimID, '_', datasource) AS ClaimID,
  ClaimID AS SRC_ClaimID,
  TransactionID,
  PatientID,
  EncounterID,
  ProviderID,
  DeptID,
  cast(ServiceDate AS date) AS ServiceDate,
  cast(ClaimDate AS date) AS ClaimDate,
  PayorID,
  ClaimAmount,
  PaidAmount,
  ClaimStatus,
  PayorType,
  Deductible,
  Coinsurance,
  Copay,
  cast(InsertDate AS date) AS SRC_InsertDate,
  cast(ModifiedDate AS date) AS SRC_ModifiedDate,
  datasource,
  -- Qualified names refer to the raw columns of `claims`, not to the same-named aliases above.
  -- concat() yields NULL when any argument is NULL, so a row with no datasource has a NULL
  -- surrogate key: it can never match in the MERGE ON clause and would be re-inserted on
  -- every run. Quarantine it together with rows missing a mandatory source field.
  CASE
    WHEN claims.ClaimID IS NULL
      OR claims.TransactionID IS NULL
      OR claims.PatientID IS NULL
      OR claims.ServiceDate IS NULL
      OR claims.datasource IS NULL
    THEN TRUE
    ELSE FALSE
  END AS is_quarantined
FROM claims

In [0]:
%sql
-- Inspect the standardised claims and their quarantine flags before merging into silver.
SELECT qc.* FROM quality_checks AS qc

In [0]:
%sql
-- Silver-layer claims table, kept as SCD Type 2 history by the MERGE cells below.
-- Those cells expire a changed row (is_current = FALSE) and then insert a new current version.
-- The format is pinned to Delta because MERGE INTO needs a Delta target. Without an explicit
-- USING clause, the format would depend on the session/workspace default.
-- NOTE(review): amount columns stay STRING to match the untyped CSV source; consider DECIMAL
-- once quality_checks casts them.
CREATE TABLE IF NOT EXISTS silver.claims (
  ClaimID STRING,
  SRC_ClaimID STRING,
  TransactionID STRING,
  PatientID STRING,
  EncounterID STRING,
  ProviderID STRING,
  DeptID STRING,
  ServiceDate DATE,
  ClaimDate DATE,
  PayorID STRING,
  ClaimAmount STRING,
  PaidAmount STRING,
  ClaimStatus STRING,
  PayorType STRING,
  Deductible STRING,
  Coinsurance STRING,
  Copay STRING,
  SRC_InsertDate DATE,
  SRC_ModifiedDate DATE,
  datasource STRING,
  is_quarantined BOOLEAN,
  audit_insertedDate TIMESTAMP,
  audit_modifiedDate TIMESTAMP,
  is_current BOOLEAN
)
USING DELTA

In [0]:
%sql
-- SCD Type 2, step 1 of 2: expire the current version of every claim whose incoming record
-- differs in any tracked column. The next cell then inserts the new current versions.
-- Comparisons use the null-safe NOT (a <=> b). A plain `a != b` evaluates to NULL when either
-- side is NULL, which would hide any change to or from NULL and leave the stale row current.
-- NOTE(review): MERGE fails if several source rows match the same target row; this assumes
-- quality_checks holds at most one row per ClaimID.
MERGE INTO silver.claims AS target
USING quality_checks AS source
ON target.ClaimID = source.ClaimID AND target.is_current = TRUE
WHEN MATCHED AND (
  NOT (target.SRC_ClaimID <=> source.SRC_ClaimID) OR
  NOT (target.TransactionID <=> source.TransactionID) OR
  NOT (target.PatientID <=> source.PatientID) OR
  NOT (target.EncounterID <=> source.EncounterID) OR
  NOT (target.ProviderID <=> source.ProviderID) OR
  NOT (target.DeptID <=> source.DeptID) OR
  NOT (target.ServiceDate <=> source.ServiceDate) OR
  NOT (target.ClaimDate <=> source.ClaimDate) OR
  NOT (target.PayorID <=> source.PayorID) OR
  NOT (target.ClaimAmount <=> source.ClaimAmount) OR
  NOT (target.PaidAmount <=> source.PaidAmount) OR
  NOT (target.ClaimStatus <=> source.ClaimStatus) OR
  NOT (target.PayorType <=> source.PayorType) OR
  NOT (target.Deductible <=> source.Deductible) OR
  NOT (target.Coinsurance <=> source.Coinsurance) OR
  NOT (target.Copay <=> source.Copay) OR
  NOT (target.SRC_InsertDate <=> source.SRC_InsertDate) OR
  NOT (target.SRC_ModifiedDate <=> source.SRC_ModifiedDate) OR
  NOT (target.datasource <=> source.datasource) OR
  NOT (target.is_quarantined <=> source.is_quarantined)
)
THEN UPDATE SET
  is_current = FALSE,
  audit_modifiedDate = current_timestamp()

In [0]:
%sql
-- SCD Type 2, step 2 of 2 (run after the expire step): add a current version for every
-- incoming claim that has no is_current = TRUE row in silver.claims. That covers both
-- brand-new claims and claims whose previous version the expire step just closed.
-- NOTE(review): a NULL source.ClaimID never satisfies the equality below, so such rows take
-- the NOT MATCHED branch and are inserted again on every run.
MERGE INTO silver.claims AS target
USING quality_checks AS source
  ON  target.ClaimID = source.ClaimID
  AND target.is_current = TRUE
WHEN NOT MATCHED THEN
  INSERT (
    ClaimID, SRC_ClaimID, TransactionID, PatientID, EncounterID, ProviderID, DeptID,
    ServiceDate, ClaimDate,
    PayorID, ClaimAmount, PaidAmount, ClaimStatus, PayorType, Deductible, Coinsurance, Copay,
    SRC_InsertDate, SRC_ModifiedDate, datasource,
    is_quarantined, audit_insertedDate, audit_modifiedDate, is_current
  )
  VALUES (
    source.ClaimID, source.SRC_ClaimID, source.TransactionID, source.PatientID,
    source.EncounterID, source.ProviderID, source.DeptID,
    source.ServiceDate, source.ClaimDate,
    source.PayorID, source.ClaimAmount, source.PaidAmount, source.ClaimStatus,
    source.PayorType, source.Deductible, source.Coinsurance, source.Copay,
    source.SRC_InsertDate, source.SRC_ModifiedDate, source.datasource,
    -- pipeline bookkeeping: both audit timestamps start at load time; the new row is current
    source.is_quarantined, current_timestamp(), current_timestamp(), TRUE
  )