In [0]:
from pyspark.sql import SparkSession, functions as f
# Load each hospital's bronze-layer patient extract and register it as a temp
# view so the %sql cells below can query it by name.
df_hosa = spark.read.format("parquet").load("/mnt/bronze/hosa/patients")
df_hosa.createOrReplaceTempView("patients_hosa")

df_hosb = spark.read.format("parquet").load("/mnt/bronze/hosb/patients")
df_hosb.createOrReplaceTempView("patients_hosb")

In [0]:
%sql
-- Preview of the raw hospital A patient rows.
-- Capped with LIMIT: the table carries patient PII (SSN, DOB, Address), so the
-- rendered output should stay small. Clear cell output before sharing the notebook.
SELECT *
FROM patients_hosa
LIMIT 10;

In [0]:
%sql
-- Preview of the raw hospital B patient rows.
-- Capped with LIMIT: the table carries patient PII (SSN, DOB, Address), so the
-- rendered output should stay small. Clear cell output before sharing the notebook.
SELECT *
FROM patients_hosb
LIMIT 10;

In [0]:
%sql
-- Common data model view: stacks the hospital A and hospital B patient rows
-- under one set of column names and prefixes every row with a composite
-- Patient_Key built as '<SRC_PatientID>-<datasource>'.
CREATE OR REPLACE TEMP VIEW cdm_patients AS
WITH unified_patients AS (
  -- Hospital A: only the ID column needs renaming.
  SELECT
    PatientID AS SRC_PatientID,
    FirstName,
    LastName,
    MiddleName,
    SSN,
    PhoneNumber,
    Gender,
    DOB,
    Address,
    ModifiedDate,
    datasource
  FROM patients_hosa

  UNION ALL

  -- Hospital B: map its column names onto the hospital A names.
  SELECT
    ID           AS SRC_PatientID,
    F_Name       AS FirstName,
    L_Name       AS LastName,
    M_Name       AS MiddleName,
    SSN,
    PhoneNumber,
    Gender,
    DOB,
    Address,
    Updated_Date AS ModifiedDate,
    datasource
  FROM patients_hosb
)
SELECT
  CONCAT(SRC_PatientID, '-', datasource) AS Patient_Key,
  *
FROM unified_patients;

In [0]:
%sql
-- Preview of the unified patient view.
-- Capped with LIMIT: the view carries patient PII (SSN, DOB, Address), so the
-- rendered output should stay small. Clear cell output before sharing the notebook.
SELECT *
FROM cdm_patients
LIMIT 10;

In [0]:
%sql
-- Data-quality view: passes every cdm_patients column through unchanged and adds
-- is_quarantined, which is TRUE when the source ID, DOB or first name is missing
-- (including a first name that is the literal text 'null' in any casing).
CREATE OR REPLACE TEMP VIEW quality_check AS
SELECT
  cdm.Patient_Key,
  cdm.SRC_PatientID,
  cdm.FirstName,
  cdm.LastName,
  cdm.MiddleName,
  cdm.SSN,
  cdm.PhoneNumber,
  cdm.Gender,
  cdm.DOB,
  cdm.Address,
  cdm.ModifiedDate,
  cdm.datasource,
  -- The predicate can never evaluate to NULL: the IS NULL test on FirstName
  -- short-circuits the OR before the lower(...) comparison could yield NULL.
  (
       cdm.SRC_PatientID IS NULL
    OR cdm.DOB IS NULL
    OR cdm.FirstName IS NULL
    OR lower(cdm.FirstName) = 'null'
  ) AS is_quarantined
FROM cdm_patients AS cdm;

In [0]:
%sql
-- Silver-layer patient table (Delta). Created only if it does not exist yet, so
-- re-running this cell leaves an existing table and its rows untouched.
CREATE TABLE IF NOT EXISTS `hrcm-data-catalog`.silver.patients (
  Patient_Key       STRING,
  SRC_PatientID     STRING,
  FirstName         STRING,
  LastName          STRING,
  MiddleName        STRING,
  SSN               STRING,
  PhoneNumber       STRING,
  Gender            STRING,
  DOB               DATE,
  Address           STRING,
  SRC_ModifiedDate  TIMESTAMP,  -- filled from the source ModifiedDate by the merge cells
  datasource        STRING,
  is_quarantined    BOOLEAN,
  InsertedDate      TIMESTAMP,  -- set to current_timestamp() on insert by the merge cells
  ModifiedDate      TIMESTAMP,  -- set to current_timestamp() on insert and on expiry
  is_current        BOOLEAN     -- TRUE on the active version of a Patient_Key
)
USING DELTA;

In [0]:
%sql
-- Merge step 1: expire changed versions and insert brand-new keys.
--   * WHEN MATCHED: the current target row for a Patient_Key is flagged
--     is_current = false when any tracked attribute differs from the source.
--   * WHEN NOT MATCHED: a source row with no current target row is inserted as
--     the current version.
--
-- Change detection uses the null-safe equality operator (<=>). With a plain
-- `!=`, a comparison involving NULL yields NULL, so a change from NULL to a
-- value (or from a value to NULL) was never detected and the stale row stayed
-- current. `a <=> b` always returns TRUE or FALSE, treating two NULLs as equal.
--
-- NOTE(review): a source row whose Patient_Key is NULL can never satisfy the ON
-- condition, so it takes the NOT MATCHED branch on every run. Confirm whether
-- such rows should be filtered out upstream.
MERGE INTO `hrcm-data-catalog`.silver.patients AS target
USING quality_check AS source
   ON target.Patient_Key = source.Patient_Key
  AND target.is_current = true
WHEN MATCHED
 AND NOT (
          target.SRC_PatientID    <=> source.SRC_PatientID
      AND target.FirstName        <=> source.FirstName
      AND target.LastName         <=> source.LastName
      AND target.MiddleName       <=> source.MiddleName
      AND target.SSN              <=> source.SSN
      AND target.PhoneNumber      <=> source.PhoneNumber
      AND target.Gender           <=> source.Gender
      AND target.DOB              <=> source.DOB
      AND target.Address          <=> source.Address
      AND target.SRC_ModifiedDate <=> source.ModifiedDate
      AND target.datasource       <=> source.datasource
      AND target.is_quarantined   <=> source.is_quarantined
 )
THEN UPDATE
 SET is_current   = false,
     ModifiedDate = current_timestamp()
WHEN NOT MATCHED
THEN INSERT (
  Patient_Key,
  SRC_PatientID,
  FirstName,
  LastName,
  MiddleName,
  SSN,
  PhoneNumber,
  Gender,
  DOB,
  Address,
  SRC_ModifiedDate,
  datasource,
  is_quarantined,
  InsertedDate,
  ModifiedDate,
  is_current
)
VALUES (
  source.Patient_Key,
  source.SRC_PatientID,
  source.FirstName,
  source.LastName,
  source.MiddleName,
  source.SSN,
  source.PhoneNumber,
  source.Gender,
  source.DOB,
  source.Address,
  source.ModifiedDate,      -- lands in SRC_ModifiedDate
  source.datasource,
  source.is_quarantined,
  current_timestamp(),      -- InsertedDate
  current_timestamp(),      -- ModifiedDate
  true                      -- is_current
)


In [0]:
%sql
-- Merge step 2: insert-only pass. Any source row that has no target row with
-- the same Patient_Key and is_current = true is inserted as the current version.
-- Run after the previous merge cell, this picks up the keys whose old version
-- was just flagged is_current = false.
MERGE INTO `hrcm-data-catalog`.silver.patients AS target
USING quality_check AS source
   ON target.Patient_Key = source.Patient_Key
  AND target.is_current = true
WHEN NOT MATCHED THEN
  INSERT (
    Patient_Key,
    SRC_PatientID,
    FirstName,
    LastName,
    MiddleName,
    SSN,
    PhoneNumber,
    Gender,
    DOB,
    Address,
    SRC_ModifiedDate,
    datasource,
    is_quarantined,
    InsertedDate,
    ModifiedDate,
    is_current
  )
  VALUES (
    source.Patient_Key,
    source.SRC_PatientID,
    source.FirstName,
    source.LastName,
    source.MiddleName,
    source.SSN,
    source.PhoneNumber,
    source.Gender,
    source.DOB,
    source.Address,
    source.ModifiedDate,    -- lands in SRC_ModifiedDate
    source.datasource,
    source.is_quarantined,
    current_timestamp(),    -- InsertedDate
    current_timestamp(),    -- ModifiedDate
    true                    -- is_current
  )

In [0]:
# Read the full silver patients table (all versions, not only is_current rows).
df_silver = spark.sql("select * from `hrcm-data-catalog`.silver.patients")

# Replace the Delta data at the silver mount path with the table's contents.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/silver/patients")
)