In [0]:
# Databricks notebook source
from pyspark.sql import SparkSession, functions as f

# Bronze-layer department extracts, one parquet dataset per hospital.
df_hosa = spark.read.parquet("/mnt/bronze/hosa/departments")  # Hospital A
df_hosb = spark.read.parquet("/mnt/bronze/hosb/departments")  # Hospital B

# Composite key "<deptid>-<datasource>", presumably so department ids coming from
# the two hospitals cannot collide. Note: f.concat yields NULL if any input is NULL.
dept_key = f.concat(f.col("deptid"), f.lit('-'), f.col("datasource"))

# Stack both hospitals (columns matched by name, not position), copy the original
# id into SRC_Dept_id, add the composite Dept_id, then drop the raw deptid column.
df_merged = (
    df_hosa.unionByName(df_hosb)
    .withColumn("SRC_Dept_id", f.col("deptid"))
    .withColumn("Dept_id", dept_key)
    .drop("deptid")
)

# Expose the merged rows to the SQL cells below as the temp view `departments`.
df_merged.createOrReplaceTempView("departments")

In [0]:
%sql
-- Silver-layer departments table, addressed by its three-level name
-- (catalog `hcrcm`, schema `silver`, table `departments`).
CREATE SCHEMA IF NOT EXISTS hcrcm.silver;

CREATE TABLE IF NOT EXISTS hcrcm.silver.departments (
    Dept_Id string,
    SRC_Dept_Id string,
    Name string,
    datasource string,
    is_quarantined boolean -- flag column marking records that fail the data quality checks
)
USING DELTA;

-- Later cells refer to this table by the two-part name `silver.departments`,
-- which resolves against the session's current catalog. Pin the current catalog
-- to `hcrcm` so those references hit the table created above rather than a
-- same-named table in whatever the workspace default catalog happens to be.
-- (Temp views such as `departments` still resolve first for one-part names.)
USE CATALOG hcrcm;

In [0]:
%sql
-- Empty the silver departments table so it can be fully reloaded.
truncate table silver.departments;

In [0]:
%sql
-- Full refresh of the silver departments table from the `departments` temp view.
-- INSERT OVERWRITE replaces the existing rows instead of appending to them, so
-- re-running this cell on its own does not duplicate data.
INSERT OVERWRITE TABLE silver.departments
SELECT
    Dept_Id,
    SRC_Dept_Id,
    Name,
    Datasource,
    -- Quarantine rows missing the source department id or the department name.
    -- IS NULL never evaluates to NULL, so this is always TRUE or FALSE (same as
    -- the former CASE WHEN ... THEN TRUE ELSE FALSE).
    (SRC_Dept_Id IS NULL OR Name IS NULL) AS is_quarantined
FROM departments

In [0]:
# Read the loaded silver table back into a DataFrame for the path-based write that follows.
silver_departments_sql = "SELECT * FROM silver.departments"
departments_df = spark.sql(silver_departments_sql)

In [0]:
# Persist the silver departments snapshot to a path-based Delta location,
# replacing whatever was previously stored there.
# NOTE(review): the path uses singular "department" while the bronze paths use
# "departments" — confirm this is intended.
(
    departments_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/silver/department/")
)