In [0]:
# Databricks notebook source
from pyspark.sql import SparkSession, functions as f

# Load the bronze-layer departments data for each hospital.
df_hosa = spark.read.parquet("/mnt/bronze/hosa/departments")
df_hosb = spark.read.parquet("/mnt/bronze/hosb/departments")

# Build a department key of the form "<deptid>-<datasource>".
# The same deptid can mean different departments in each hospital
# (e.g. deptid 2 could be radiology in Hospital A and surgery in Hospital B),
# so the source system is appended to keep the key unambiguous.
dept_key = f.concat(f.col("deptid"), f.lit('-'), f.col("datasource"))

# Stack both hospitals' rows (columns matched by name), keep the original id
# as SRC_Dept_id, add the composite Dept_id and drop the raw deptid column.
df_merged = (
    df_hosa.unionByName(df_hosb)
    .withColumn("SRC_Dept_id", f.col("deptid"))
    .withColumn("Dept_id", dept_key)
    .drop("deptid")
)

# Expose the merged data to the SQL cells below as the "departments" view.
df_merged.createOrReplaceTempView("departments")

In [0]:
%sql
-- Create the silver schema if it does not already exist, so that silver.departments can be created in it.
CREATE SCHEMA IF NOT EXISTS silver;

In [0]:
 %sql
-- Silver-layer Delta table holding the merged departments of both hospitals.
CREATE TABLE IF NOT EXISTS silver.departments (
    Dept_Id        STRING,
    SRC_Dept_Id    STRING,
    Name           STRING,
    datasource     STRING,
    is_quarantined BOOLEAN
)
USING DELTA;

 -- is_quarantined flags rows that failed a data quality check; such rows are quarantined so they are not mixed with valid data.

In [0]:
%sql
-- This is a full load: remove any rows already present in silver.departments
-- before the table is repopulated by the insert that follows.
truncate table silver.departments

In [0]:
 %sql
-- Load the merged departments view into the silver table.
-- A row is quarantined when its source department id or its name is missing.
INSERT INTO silver.departments
SELECT
    Dept_Id,
    SRC_Dept_Id,
    Name,
    Datasource,
    (SRC_Dept_Id IS NULL OR Name IS NULL) AS is_quarantined
FROM departments

In [0]:
%sql
-- Show every row and column of silver.departments for inspection.
select * from silver.departments