In [0]:
df_hosa = spark.read.parquet("/mnt/bronze/hosa/patients")
df_hosa.createOrReplaceTempView("patients_hosa")

df_hosb = spark.read.parquet("/mnt/bronze/hosb/patients")
df_hosb.createOrReplaceTempView("patients_hosb")



In [0]:
%sql
select * from patients_hosa

In [0]:
%sql
select * from patients_hosb

In [0]:
%sql
create or replace temp view cdm_patient as
select concat(SRC_PatientID,'-',datasource) as Patient_Key,* from
(
  select
  PatientID as SRC_PatientID,
  FirstName,
  LastName,
  MiddleName,
  SSN,
  PhoneNumber,
  Gender,
  DOB,
  Address,
  ModifiedDate,
  datasource
  from patients_hosa

  union all
  
  select
  ID as SRC_PatientID,
  F_Name as FirstName,
  L_Name as LastName,
  M_Name as MiddleName,
  SSN,
  PhoneNumber,
  Gender,
  DOB,
  Address,
  Updated_Date as ModifiedDate,
  datasource
  from patients_hosb
)

In [0]:
%sql
select * from cdm_patient

In [0]:
%sql
create or replace temp view  quality_checks as
select
  Patient_Key,
  SRC_PatientID,
  FirstName,
  LastName,
  MiddleName,
  SSN,
  PhoneNumber,
  Gender,
  DOB,
  Address,
  ModifiedDate as SRC_ModifiedDate,
  datasource,
  CASE
    WHEN SRC_PatientID is null or DOB is null or FirstName is null or lower(FirstName)='null' then true
    else false
  end as is_quarantined
from cdm_patient


In [0]:
%sql    
select * from quality_checks

In [0]:
%sql
create table if not exists silver.patients(
  Patient_Key string,
  SRC_PatientID string,
  FirstName string,
  LastName string,
  MiddleName string,
  SSN string,
  PhoneNumber string,
  Gender string,
  DOB date,
  Address string,
  SRC_ModifiedDate date,
  datasource string,
  is_quarantined boolean,
  inserted_date timestamp,
  modified_date timestamp,
  is_current boolean
)
using delta

In [0]:
%sql
merge into silver.patients as target
using quality_checks as source
on target.Patient_Key = source.Patient_Key
and is_current = true
when matched
and (
  target.SRC_PatientID != source.SRC_PatientID
  or target.FirstName != source.FirstName
  or target.LastName != source.LastName
  or target.MiddleName != source.MiddleName
  or target.SSN != source.SSN
  or target.PhoneNumber != source.PhoneNumber
  or target.Gender != source.Gender
  or target.DOB != source.DOB
  or target.Address != source.Address
  or target.SRC_ModifiedDate != source.SRC_ModifiedDate
  or target.datasource != source.datasource
  or target.is_quarantined != source.is_quarantined
)
Then update set
  target.is_current = false,
  target.modified_date = current_timestamp()
  
when not matched
Then insert (
  Patient_Key,
  SRC_PatientID,
  FirstName,
  LastName,
  MiddleName,
  SSN,
  PhoneNumber,
  Gender,
  DOB,
  Address,
  SRC_ModifiedDate,
  datasource,
  is_quarantined,
  inserted_date,
  modified_date,
  is_current
) values (
  source.Patient_Key,
  source.SRC_PatientID,
  source.FirstName,
  source.LastName,
  source.MiddleName,
  source.SSN,
  source.PhoneNumber,
  source.Gender,
  source.DOB,
  source.Address,
  source.SRC_ModifiedDate,
  source.datasource,
  source.is_quarantined,
  current_timestamp(),
  current_timestamp(),
  true
)