In [0]:
from pyspark.sql import SparkSession, functions as f

# Load the bronze-layer transactions extracts for Hospital A and Hospital B
df_hosa = spark.read.format("parquet").load("/mnt/bronze/hosa/transactions")
df_hosb = spark.read.format("parquet").load("/mnt/bronze/hosb/transactions")

# Stack both hospitals' rows, aligning columns by name rather than by position
df_merged = df_hosa.unionByName(df_hosb)

# Expose the combined rows to the SQL cells as the `transactions` temp view,
# then preview them
df_merged.createOrReplaceTempView("transactions")
display(df_merged)

In [0]:
# Preview the Hospital B transactions frame on its own (inspection only;
# nothing is assigned or written by this cell)
display(df_hosb)

In [0]:
%sql
-- Stage the `transactions` view for loading: build a datasource-qualified key,
-- rename the source audit columns with an SRC_ prefix, and flag rows that are
-- missing a mandatory identifier or the visit date.
CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
  -- Key = source TransactionID suffixed with its datasource; the untouched
  -- source value is kept alongside it as SRC_TransactionID
  concat(TransactionID, '-', datasource) AS TransactionID,
  TransactionID AS SRC_TransactionID,
  -- Columns passed through unchanged
  EncounterID,
  PatientID,
  ProviderID,
  DeptID,
  VisitDate,
  ServiceDate,
  PaidDate,
  VisitType,
  Amount,
  AmountType,
  PaidAmount,
  ClaimID,
  PayorID,
  ProcedureCode,
  ICDCode,
  LineOfBusiness,
  MedicaidID,
  MedicareID,
  -- Source-system audit dates, renamed
  InsertDate AS SRC_InsertDate,
  ModifiedDate AS SRC_ModifiedDate,
  datasource,
  -- TRUE when any mandatory field is absent, FALSE otherwise
  -- (IS NULL never evaluates to NULL, so the flag is always populated)
  (
    EncounterID IS NULL
    OR PatientID IS NULL
    OR TransactionID IS NULL
    OR VisitDate IS NULL
  ) AS is_quarantined
FROM transactions;


In [0]:
%sql
-- Create the silver-layer Delta table for transactions if it does not exist yet.
-- The first 24 columns carry the same names, in the same order, as the
-- `quality_checks` view; the last three are housekeeping columns.
CREATE TABLE IF NOT EXISTS silver.transactions (
  -- Datasource-qualified key and the original source key
  TransactionID STRING,
  SRC_TransactionID STRING,
  -- Identifiers, dates and amounts carried over from the source
  EncounterID STRING,
  PatientID STRING,
  ProviderID STRING,
  DeptID STRING,
  VisitDate DATE,
  ServiceDate DATE,
  PaidDate DATE,
  VisitType STRING,
  Amount DOUBLE,
  AmountType STRING,
  PaidAmount DOUBLE,
  ClaimID STRING,
  PayorID STRING,
  -- NOTE(review): stored as INTEGER; confirm the source never supplies
  -- non-numeric procedure codes
  ProcedureCode INTEGER,
  ICDCode STRING,
  LineOfBusiness STRING,
  MedicaidID STRING,
  MedicareID STRING,
  -- Source-system audit dates
  SRC_InsertDate DATE,
  SRC_ModifiedDate DATE,
  datasource STRING,
  -- Data-quality flag computed in `quality_checks`
  is_quarantined BOOLEAN,
  -- Load-side audit timestamps and the current-version marker used by the
  -- SCD Type 2 MERGE (which sets is_current = false and stamps
  -- audit_modifieddate when a row is superseded)
  audit_insertdate TIMESTAMP,
  audit_modifieddate TIMESTAMP,
  is_current BOOLEAN
)
USING DELTA;


In [0]:
%sql
-- SCD Type 2, expire step: for every current silver row whose key appears in
-- `quality_checks` with at least one differing attribute, mark the row as no
-- longer current and stamp the modification time.
--
-- Change detection uses the null-safe equality operator (<=>). With a plain
-- `!=`, a comparison involving NULL evaluates to NULL rather than TRUE, so a
-- column moving from NULL to a value (or from a value to NULL) would never be
-- detected and the stale row would stay current.
MERGE INTO silver.transactions AS target
USING quality_checks AS source
ON target.TransactionID = source.TransactionID AND target.is_current = true
WHEN MATCHED
AND (
  NOT (target.SRC_TransactionID <=> source.SRC_TransactionID)
  OR NOT (target.EncounterID <=> source.EncounterID)
  OR NOT (target.PatientID <=> source.PatientID)
  OR NOT (target.ProviderID <=> source.ProviderID)
  OR NOT (target.DeptID <=> source.DeptID)
  OR NOT (target.VisitDate <=> source.VisitDate)
  OR NOT (target.ServiceDate <=> source.ServiceDate)
  OR NOT (target.PaidDate <=> source.PaidDate)
  OR NOT (target.VisitType <=> source.VisitType)
  OR NOT (target.Amount <=> source.Amount)
  OR NOT (target.AmountType <=> source.AmountType)
  OR NOT (target.PaidAmount <=> source.PaidAmount)
  OR NOT (target.ClaimID <=> source.ClaimID)
  OR NOT (target.PayorID <=> source.PayorID)
  OR NOT (target.ProcedureCode <=> source.ProcedureCode)
  OR NOT (target.ICDCode <=> source.ICDCode)
  OR NOT (target.LineOfBusiness <=> source.LineOfBusiness)
  OR NOT (target.MedicaidID <=> source.MedicaidID)
  OR NOT (target.MedicareID <=> source.MedicareID)
  OR NOT (target.SRC_InsertDate <=> source.SRC_InsertDate)
  OR NOT (target.SRC_ModifiedDate <=> source.SRC_ModifiedDate)
  OR NOT (target.datasource <=> source.datasource)
  OR NOT (target.is_quarantined <=> source.is_quarantined)
) THEN
  UPDATE
  SET
    target.is_current = false,
    target.audit_modifieddate = current_timestamp();


In [0]:
%sql
-- SCD Type 2, expire step: for every current silver row whose key appears in
-- `quality_checks` with at least one differing attribute, mark the row as no
-- longer current and stamp the modification time.
--
-- NOTE(review): this cell appears to repeat the expire step of the preceding
-- cell. Rows already expired there no longer satisfy `is_current = true` in
-- the ON clause below. Confirm whether this cell was meant to be the step
-- that inserts the new row versions instead.
--
-- Change detection uses the null-safe equality operator (<=>). With a plain
-- `!=`, a comparison involving NULL evaluates to NULL rather than TRUE, so a
-- column moving from NULL to a value (or from a value to NULL) would never be
-- detected and the stale row would stay current.
MERGE INTO silver.transactions AS target
USING quality_checks AS source
ON target.TransactionID = source.TransactionID AND target.is_current = true
WHEN MATCHED
AND (
  NOT (target.SRC_TransactionID <=> source.SRC_TransactionID)
  OR NOT (target.EncounterID <=> source.EncounterID)
  OR NOT (target.PatientID <=> source.PatientID)
  OR NOT (target.ProviderID <=> source.ProviderID)
  OR NOT (target.DeptID <=> source.DeptID)
  OR NOT (target.VisitDate <=> source.VisitDate)
  OR NOT (target.ServiceDate <=> source.ServiceDate)
  OR NOT (target.PaidDate <=> source.PaidDate)
  OR NOT (target.VisitType <=> source.VisitType)
  OR NOT (target.Amount <=> source.Amount)
  OR NOT (target.AmountType <=> source.AmountType)
  OR NOT (target.PaidAmount <=> source.PaidAmount)
  OR NOT (target.ClaimID <=> source.ClaimID)
  OR NOT (target.PayorID <=> source.PayorID)
  OR NOT (target.ProcedureCode <=> source.ProcedureCode)
  OR NOT (target.ICDCode <=> source.ICDCode)
  OR NOT (target.LineOfBusiness <=> source.LineOfBusiness)
  OR NOT (target.MedicaidID <=> source.MedicaidID)
  OR NOT (target.MedicareID <=> source.MedicareID)
  OR NOT (target.SRC_InsertDate <=> source.SRC_InsertDate)
  OR NOT (target.SRC_ModifiedDate <=> source.SRC_ModifiedDate)
  OR NOT (target.datasource <=> source.datasource)
  OR NOT (target.is_quarantined <=> source.is_quarantined)
) THEN
  UPDATE
  SET
    target.is_current = false,
    target.audit_modifieddate = current_timestamp();
