In [0]:
from pyspark.sql import functions as f

# Load each hospital's bronze transactions, union them by column name and
# expose the result to the %sql cells below as the `transactions` view.


def read_bronze_transactions(path, source_name):
    """Read one hospital's bronze transaction parquet and ensure `datasource`.

    The `quality_checks` view builds its TransactionID key from the
    `datasource` column, so the column must exist. If the bronze files already
    carry it, it is left untouched; otherwise it is filled with `source_name`.

    Args:
        path: bronze parquet location for one hospital.
        source_name: label to use when the files have no `datasource` column.

    Returns:
        A Spark DataFrame that always contains a `datasource` column.
    """
    df = spark.read.parquet(path)
    if "datasource" not in df.columns:
        df = df.withColumn("datasource", f.lit(source_name))
    return df


df_hospital1 = read_bronze_transactions("/mnt/bronze/Hospital-1/transactions", "Hospital-1")
df_hospital2 = read_bronze_transactions("/mnt/bronze/Hospital-2/transactions", "Hospital-2")

# unionByName matches columns by name rather than position.
df_merged = df_hospital1.unionByName(df_hospital2)

df_merged.createOrReplaceTempView("transactions")

In [0]:
%sql
-- Inspect the schema of the unioned bronze `transactions` temp view.
DESCRIBE TABLE transactions

In [0]:
%sql
-- Silver staging view: every unioned bronze transaction, tagged with a
-- quarantine flag rather than filtered out.
--   * TransactionID becomes "<source id>-<datasource>" so ids coming from
--     different sources are kept apart; CONCAT returns NULL if either part
--     is NULL. The raw id is preserved as SRC_TransactionID.
--   * InsertDate / ModifiedDate are renamed with a SRC_ prefix.
--   * is_quarantined is TRUE when any of TransactionID, EncounterID,
--     PatientID or VisitDate is NULL, otherwise FALSE (never NULL).
CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
    CONCAT(t.TransactionID, '-', t.datasource) AS TransactionID,
    t.TransactionID                            AS SRC_TransactionID,
    t.EncounterID,
    t.PatientID,
    t.ProviderID,
    t.DeptID,
    t.VisitDate,
    t.ServiceDate,
    t.PaidDate,
    t.VisitType,
    t.Amount,
    t.AmountType,
    t.PaidAmount,
    t.ClaimID,
    t.PayorID,
    t.ProcedureCode,
    t.ICDCode,
    t.LineOfBusiness,
    t.MedicaidID,
    t.MedicareID,
    t.InsertDate                               AS SRC_InsertDate,
    t.ModifiedDate                             AS SRC_ModifiedDate,
    t.datasource,
    (
        t.TransactionID IS NULL
        OR t.EncounterID IS NULL
        OR t.PatientID IS NULL
        OR t.VisitDate IS NULL
    )                                          AS is_quarantined
FROM transactions AS t;

In [0]:
%sql
-- Inspect the schema of the `quality_checks` staging view.
DESCRIBE TABLE quality_checks;

In [0]:
%sql
-- Silver SCD Type 2 table for transactions. Several rows may exist per
-- TransactionID; is_current marks the live version, and audit_* record when
-- a version was inserted / expired.
--
-- Amount / PaidAmount: a bare DECIMAL in Spark means DECIMAL(10,0), which
-- rounds away any fractional part. DECIMAL(12,2) keeps the same 10 integer
-- digits plus 2 fractional ones (assumes currency amounts with cents --
-- TODO confirm scale against the bronze schema).
-- NOTE: CREATE TABLE IF NOT EXISTS does not alter an existing table; a table
-- already created with DECIMAL(10,0) must be migrated separately.
CREATE TABLE IF NOT EXISTS healthcarerevenuecyclemanagement_databricks.silver.transactions(
TransactionID STRING,
SRC_TransactionID STRING,
EncounterID STRING,
PatientID STRING,
ProviderID STRING,
DeptID STRING,
VisitDate DATE,
ServiceDate DATE,
PaidDate DATE,
VisitType STRING,
Amount DECIMAL(12,2),
AmountType STRING,
PaidAmount DECIMAL(12,2),
ClaimID STRING,
PayorID STRING,
ProcedureCode STRING,
ICDCode STRING,
LineOfBusiness STRING,
MedicaidID STRING,
MedicareID STRING,
SRC_InsertDate DATE,
SRC_ModifiedDate DATE,
datasource STRING,
is_quarantined BOOLEAN,
audit_insertdate TIMESTAMP,
audit_modifieddate TIMESTAMP,
is_current BOOLEAN
) USING DELTA;

In [0]:
%sql
-- SCD Type 2, step 1: expire (is_current = FALSE) the current silver version
-- of every transaction whose incoming values differ from what is stored.
-- New current versions are inserted by the following MERGE.
--
-- * TransactionID itself is not compared: the ON clause already guarantees
--   equality. Comparing it against SRC_TransactionID (the raw id, without the
--   "-<datasource>" suffix) would always differ and expire every row on
--   every run.
-- * `a <=> b` is Spark's null-safe equality, so NOT (a <=> b) is TRUE
--   whenever the values differ, including NULL -> value and value -> NULL.
--   A plain `<>` yields NULL in those cases and misses the change.
-- * Date columns are cast to DATE exactly as the insert step casts them, so
--   the comparison is against the same value that was stored.
-- * Amounts are compared uncast; they only compare equal if the silver
--   column's scale can hold the source value.
MERGE INTO healthcarerevenuecyclemanagement_databricks.silver.transactions AS target
USING quality_checks AS source
ON target.TransactionID = source.TransactionID
AND target.is_current = TRUE
WHEN MATCHED AND
(
  NOT (target.SRC_TransactionID <=> source.SRC_TransactionID) OR
  NOT (target.EncounterID <=> source.EncounterID) OR
  NOT (target.PatientID <=> source.PatientID) OR
  NOT (target.ProviderID <=> source.ProviderID) OR
  NOT (target.DeptID <=> source.DeptID) OR
  NOT (target.VisitDate <=> CAST(source.VisitDate AS DATE)) OR
  NOT (target.ServiceDate <=> CAST(source.ServiceDate AS DATE)) OR
  NOT (target.PaidDate <=> CAST(source.PaidDate AS DATE)) OR
  NOT (target.VisitType <=> source.VisitType) OR
  NOT (target.Amount <=> source.Amount) OR
  NOT (target.AmountType <=> source.AmountType) OR
  NOT (target.PaidAmount <=> source.PaidAmount) OR
  NOT (target.ClaimID <=> source.ClaimID) OR
  NOT (target.PayorID <=> source.PayorID) OR
  NOT (target.ProcedureCode <=> source.ProcedureCode) OR
  NOT (target.ICDCode <=> source.ICDCode) OR
  NOT (target.LineOfBusiness <=> source.LineOfBusiness) OR
  NOT (target.MedicaidID <=> source.MedicaidID) OR
  NOT (target.MedicareID <=> source.MedicareID) OR
  NOT (target.SRC_InsertDate <=> CAST(source.SRC_InsertDate AS DATE)) OR
  NOT (target.SRC_ModifiedDate <=> CAST(source.SRC_ModifiedDate AS DATE)) OR
  NOT (target.datasource <=> source.datasource) OR
  NOT (target.is_quarantined <=> source.is_quarantined)
)
THEN UPDATE
SET target.is_current = FALSE,
    target.audit_modifieddate = current_timestamp();

In [0]:
%sql
-- SCD Type 2, step 2: insert a new current version for every source row
-- whose TransactionID has no current (is_current = TRUE) row in silver.
-- This covers brand-new transactions and those just expired by the previous
-- MERGE. Rows that still have a current version match and are skipped.
-- Matching on is_current, not is_quarantined, is what stops every
-- non-quarantined row from being re-inserted on each run.
MERGE INTO healthcarerevenuecyclemanagement_databricks.silver.transactions AS target
USING quality_checks AS source
ON target.TransactionID = source.TransactionID
AND target.is_current = TRUE
WHEN NOT MATCHED THEN INSERT(
TransactionID,
SRC_TransactionID,
EncounterID,
PatientID,
ProviderID,
DeptID,
VisitDate,
ServiceDate,
PaidDate,
VisitType,
Amount,
AmountType,
PaidAmount,
ClaimID,
PayorID,
ProcedureCode,
ICDCode,
LineOfBusiness,
MedicaidID,
MedicareID,
SRC_InsertDate,
SRC_ModifiedDate,
datasource,
is_quarantined,
audit_insertdate,
audit_modifieddate,
is_current
) VALUES (
source.TransactionID,
source.SRC_TransactionID,
source.EncounterID,
source.PatientID,
source.ProviderID,
source.DeptID,
CAST(source.VisitDate AS DATE),
CAST(source.ServiceDate AS DATE),
CAST(source.PaidDate AS DATE),
source.VisitType,
-- Bare DECIMAL is DECIMAL(10,0) and would round away cents.
CAST(source.Amount AS DECIMAL(12,2)),
source.AmountType,
CAST(source.PaidAmount AS DECIMAL(12,2)),
source.ClaimID,
source.PayorID,
source.ProcedureCode,
source.ICDCode,
source.LineOfBusiness,
source.MedicaidID,
source.MedicareID,
CAST(source.SRC_InsertDate AS DATE),
CAST(source.SRC_ModifiedDate AS DATE),
-- datasource is a STRING label; casting it to BOOLEAN produced NULL/errors.
source.datasource,
source.is_quarantined,
current_timestamp(),
current_timestamp(),
true
);

In [0]:
%sql
-- Integrity check on the silver SCD2 table.
-- Several rows per TransactionID are expected (one per historical version),
-- so the total count alone cannot reveal duplicates. The invariant is at
-- most one current version per TransactionID: any row with
-- current_versions > 1 indicates a broken merge and sorts to the top.
SELECT
    TransactionID,
    COUNT(*) AS total_versions,
    SUM(CASE WHEN is_current THEN 1 ELSE 0 END) AS current_versions
FROM healthcarerevenuecyclemanagement_databricks.silver.transactions
GROUP BY TransactionID
ORDER BY current_versions DESC, total_versions DESC;