#### Bronze Ingestion Pattern
- Read JSON
- Schema inference (allowed in Bronze)
- Add ingestion_ts
- Write as Delta
- Append mode


In [0]:
from pyspark.sql.functions import current_timestamp, col

# Bronze load for patients: read the raw JSON from the landing volume, tag each
# row with its source file path and an ingestion timestamp, then append to Delta.
df_patients = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/Volumes/medilakehealth_catalog/landing/operational_data/patients/patients.json")
    # Surface the file-metadata path as a regular column next to the source columns.
    .select("*", col("_metadata.file_path").alias("source_file"))
    .withColumn("ingestion_ts", current_timestamp())
)

# Append the rows to the bronz.patients table in Delta format.
(
    df_patients.write.format("delta")
    .mode("append")
    .saveAsTable("medilakehealth_catalog.bronz.patients")
)

display(df_patients)


`from pyspark.sql.functions import current_timestamp, col`
- current_timestamp() → captures ingestion time
- col() → used to reference columns (including metadata)


```

spark.read
    .format("json")
    .option("multiLine", "true")
    .load("/Volumes/medilakehealth_catalog/landing/operational_data/patients/patients.json")


```
- Reads raw JSON data
- multiLine = true is needed because the file is a JSON array of objects that spans multiple lines (by default Spark expects one complete JSON record per line)
- Data is read from a Unity Catalog Volume (governed storage)


```
.select("*", col("_metadata.file_path").alias("source_file"))

```
- _metadata.file_path records the exact source file
- Use this in Unity Catalog in place of input_file_name(), which is not supported on shared (standard) access mode compute
- Helps with auditing, debugging, and lineage


```

.withColumn("ingestion_ts", current_timestamp())

```

- Stores when the record entered the Bronze layer
- Useful for incremental logic and monitoring pipelines


```
df_patients.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("medilakehealth_catalog.bronz.patients")
```
- Writes data as a Delta Lake table
- append mode supports incremental ingestion
- Table is managed by Unity Catalog

In [0]:
%sql
-- Preview every row currently stored in the Bronze patients table.
SELECT *
FROM medilakehealth_catalog.bronz.patients;

encounters 


In [0]:
from pyspark.sql.functions import current_timestamp, col

# Bronze load for encounters: read the raw JSON from the landing volume, tag each
# row with its source file path and an ingestion timestamp, then append to Delta.
df_encounters = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/Volumes/medilakehealth_catalog/landing/operational_data/encounters.json")
    # Surface the file-metadata path as a regular column next to the source columns.
    .select("*", col("_metadata.file_path").alias("source_file"))
    .withColumn("ingestion_ts", current_timestamp())
)

# Append the rows to the bronz.encounters table in Delta format.
(
    df_encounters.write.format("delta")
    .mode("append")
    .saveAsTable("medilakehealth_catalog.bronz.encounters")
)

display(df_encounters)

In [0]:
%sql
-- Preview every row currently stored in the Bronze encounters table.
SELECT *
FROM medilakehealth_catalog.bronz.encounters;

diagnoses

In [0]:
from pyspark.sql.functions import current_timestamp, col

# Bronze load for diagnoses: read the raw JSON from the landing volume, tag each
# row with its source file path and an ingestion timestamp, then append to Delta.
df_diagnoses = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/Volumes/medilakehealth_catalog/landing/operational_data/diagnoses.json")
    # Surface the file-metadata path as a regular column next to the source columns.
    .select("*", col("_metadata.file_path").alias("source_file"))
    .withColumn("ingestion_ts", current_timestamp())
)

# Append the rows to the bronz.diagnoses table in Delta format.
(
    df_diagnoses.write.format("delta")
    .mode("append")
    .saveAsTable("medilakehealth_catalog.bronz.diagnoses")
)

display(df_diagnoses)

In [0]:
%sql
-- Preview every row currently stored in the Bronze diagnoses table.
SELECT *
FROM medilakehealth_catalog.bronz.diagnoses;

procedures

In [0]:

from pyspark.sql.functions import current_timestamp, col

# Bronze load for procedures: read the raw JSON from the landing volume, tag each
# row with its source file path and an ingestion timestamp, then append to Delta.
df_procedures = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/Volumes/medilakehealth_catalog/landing/operational_data/procedures.json")
    # Surface the file-metadata path as a regular column next to the source columns.
    .select("*", col("_metadata.file_path").alias("source_file"))
    .withColumn("ingestion_ts", current_timestamp())
)

# Append the rows to the bronz.procedures table in Delta format.
(
    df_procedures.write.format("delta")
    .mode("append")
    .saveAsTable("medilakehealth_catalog.bronz.procedures")
)

display(df_procedures)

In [0]:
%sql
-- Preview every row currently stored in the Bronze procedures table.
SELECT *
FROM medilakehealth_catalog.bronz.procedures;

lab_results

In [0]:
 

from pyspark.sql.functions import current_timestamp, col

# Bronze load for lab results: read the raw JSON from the landing volume, tag each
# row with its source file path and an ingestion timestamp, then append to Delta.
df_lab_results = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/Volumes/medilakehealth_catalog/landing/operational_data/lab_results.json")
    # Surface the file-metadata path as a regular column next to the source columns.
    .select("*", col("_metadata.file_path").alias("source_file"))
    .withColumn("ingestion_ts", current_timestamp())
)

# Append the rows to the bronz.lab_results table in Delta format.
(
    df_lab_results.write.format("delta")
    .mode("append")
    .saveAsTable("medilakehealth_catalog.bronz.lab_results")
)

display(df_lab_results)

In [0]:
%sql
-- Preview every row currently stored in the Bronze lab_results table.
SELECT *
FROM medilakehealth_catalog.bronz.lab_results;

medications

In [0]:
  

from pyspark.sql.functions import current_timestamp, col

# Bronze load for medications: read the raw JSON from the landing volume, tag each
# row with its source file path and an ingestion timestamp, then append to Delta.
df_medications = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/Volumes/medilakehealth_catalog/landing/operational_data/medications.json")
    # Surface the file-metadata path as a regular column next to the source columns.
    .select("*", col("_metadata.file_path").alias("source_file"))
    .withColumn("ingestion_ts", current_timestamp())
)

# Append the rows to the bronz.medications table in Delta format.
(
    df_medications.write.format("delta")
    .mode("append")
    .saveAsTable("medilakehealth_catalog.bronz.medications")
)

display(df_medications)

In [0]:
%sql
-- Preview every row currently stored in the Bronze medications table.
SELECT *
FROM medilakehealth_catalog.bronz.medications;

Validate Bronze Tables

In [0]:
%sql
-- List the tables registered in the bronz schema of medilakehealth_catalog.
SHOW TABLES
IN medilakehealth_catalog.bronz;



In [0]:
%sql
-- Spot-check: fetch five rows from the Bronze patients table.
SELECT *
FROM medilakehealth_catalog.bronz.patients
LIMIT 5;

In [0]:
%sql
-- Row count of the Bronze lab_results table.
SELECT COUNT(*)
FROM medilakehealth_catalog.bronz.lab_results;

In [0]:
%fs ls /Volumes/medilakehealth_catalog/landing/operational_data/