In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [0]:
# Load every CPT-code CSV found in the landing folder; the first line of each
# file supplies the column names.
cptcodesDf = (
    spark.read
    .option('header', True)
    .csv('/mnt/landing/cptcodes/*.csv')
)
# The row count is the cell's displayed result.
cptcodesDf.count()

In [0]:
# Replace spaces in column names with underscores, then expose the result to
# the SQL cells as the temp view "cptcodes".
#
# All columns are renamed in a single toDF() call. This avoids a loop variable
# named `col`, which would overwrite pyspark.sql.functions.col (brought in by
# the wildcard import) for every later cell, and it avoids stacking one
# projection per column on the query plan.
cptcodesDf = cptcodesDf.toDF(*[name.replace(' ', '_') for name in cptcodesDf.columns])
cptcodesDf.createOrReplaceTempView("cptcodes")

In [0]:
# Render cptcodesDf in the notebook output for a visual check of the data and
# its column names.
display(cptcodesDf)

In [0]:
# Persist the CPT codes to the bronze path as parquet. The data is collapsed
# to one partition first, and 'overwrite' replaces any previous output.
(
    cptcodesDf
    .repartition(1)
    .write
    .format('parquet')
    .mode('overwrite')
    .save('/mnt/bronze/cptcodes')
)

In [0]:
%sql
-- Data-quality view over the cptcodes temp view: every row is kept, and
-- is_quarantined is TRUE when the code or its description is missing.
CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
  cpt_codes,
  procedure_code_category,
  procedure_code_descriptions,
  code_status,
  (cpt_codes IS NULL OR procedure_code_descriptions IS NULL) AS is_quarantined
FROM cptcodes

In [0]:
%sql
-- Make emrcatalog the current catalog for the statements that follow.
USE CATALOG emrcatalog;

In [0]:
%sql
-- Silver-layer Delta table for CPT codes; a no-op when the table already exists.
CREATE TABLE IF NOT EXISTS silver.cptcodes (
  cpt_codes                   STRING,
  procedure_code_category     STRING,
  procedure_code_descriptions STRING,
  code_status                 STRING,
  is_quarantined              BOOLEAN,    -- data-quality flag carried over from quality_checks
  audit_insertdate            TIMESTAMP,  -- set when the row is inserted
  audit_modifieddate          TIMESTAMP,  -- set on insert and again when the row is deactivated
  is_active                   BOOLEAN     -- TRUE on insert; the merge cells match only active rows
)
USING DELTA

In [0]:
%sql
-- SCD Type 2, expire step: deactivate the active version of every code whose
-- attributes differ from the incoming row. The replacement version is added
-- by a separate insert-only MERGE.
--
-- Attributes are compared with the null-safe operator <=>. A plain != yields
-- NULL (not TRUE) when either side is NULL, so a value changing to or from
-- NULL would never be detected and the stale row would stay active.

MERGE INTO silver.cptcodes AS target
USING quality_checks AS source
ON target.cpt_codes = source.cpt_codes AND target.is_active = true
WHEN MATCHED AND (
  NOT (target.procedure_code_category <=> source.procedure_code_category) OR
  NOT (target.procedure_code_descriptions <=> source.procedure_code_descriptions) OR
  NOT (target.code_status <=> source.code_status) OR
  NOT (target.is_quarantined <=> source.is_quarantined)
) THEN
UPDATE SET
  target.is_active = false,
  target.audit_modifieddate = current_timestamp()

In [0]:
%sql
-- SCD Type 2, insert step: add a new active row for every source code that
-- has no active row in silver.cptcodes. That covers codes not seen before and
-- codes whose existing rows all have is_active = false.
-- Both audit timestamps are set to the insert time and is_active to true.
-- NOTE(review): a source row whose cpt_codes is NULL can never satisfy the
-- equality in ON, so it is always NOT MATCHED and is inserted again on every
-- run. Confirm whether that is intended.

MERGE INTO silver.cptcodes AS target
USING quality_checks AS source
ON target.cpt_codes = source.cpt_codes AND target.is_active = true
WHEN not MATCHED then
insert
(
  target.cpt_codes,
  target.procedure_code_category,
  target.procedure_code_descriptions,
  target.code_status,
  target.is_quarantined,
  target.audit_insertdate,
  target.audit_modifieddate,
  target.is_active
)
values
(
  source.cpt_codes,
  source.procedure_code_category,
  source.procedure_code_descriptions,
  source.code_status,
  source.is_quarantined,
  current_timestamp(),
  current_timestamp(),
  true
)
