In [0]:
# Raw FHIR bundles from the bronze layer; each row carries an `entry` array plus
# ingestion_time / source_file lineage columns (presumably one row per bundle file — confirm).
bronze_df = spark.read.table("sathyajith.demoprojectfhir.bronze_fhir_bundle")


In [0]:
from pyspark.sql.functions import explode, col

# One output row per bundle entry, carrying the bundle's lineage columns along.
# explode() emits no rows for a bundle whose `entry` array is NULL or empty.
exploded_df = bronze_df.select(
    explode("entry").alias("entry"),
    "ingestion_time",
    "source_file",
)


In [0]:
# Lift the identifying fields of each entry to top-level columns while keeping the
# full `resource` struct for the per-resource-type projections below.
silver_base_df = exploded_df.select(
    col("entry")["fullUrl"].alias("full_url"),
    col("entry")["resource"]["resourceType"].alias("resource_type"),
    col("entry")["resource"]["id"].alias("resource_id"),
    col("entry")["resource"].alias("resource"),
    "ingestion_time",
    "source_file",
)


In [0]:
# Only Patient resources.
patient_df = silver_base_df.where("resource_type = 'Patient'")


In [0]:
# Only Encounter resources.
encounter_df = silver_base_df.where(silver_base_df["resource_type"] == "Encounter")


In [0]:
from pyspark.sql.functions import element_at

# `resource.name` is an array of name structs (family / given); only its first
# element is kept (element_at is 1-based).
# NOTE(review): with spark.sql.ansi.enabled=true, element_at raises on an
# out-of-range index (e.g. an empty `name` array) instead of returning NULL —
# confirm the runtime's ANSI setting.
first_human_name = element_at(col("resource.name"), 1)

silver_patient_df = patient_df.select(
    col("resource_id").alias("patient_id"),
    first_human_name.getField("family").alias("family_name"),
    first_human_name.getField("given").alias("given_names"),
    col("resource.gender").alias("gender"),
    col("resource.birthDate").alias("birthDate"),
    "ingestion_time",
    "source_file",
)


In [0]:
# Flatten the Encounter fields used downstream.
# patient_reference is the raw subject.reference string — presumably
# "Patient/<id>" or a "urn:uuid:..." fullUrl; confirm before joining it to
# silver_patient.patient_id.
encounter_period = col("resource.period")

silver_encounter_df = encounter_df.select(
    col("resource_id").alias("encounter_id"),
    col("resource.status").alias("status"),
    col("resource.subject.reference").alias("patient_reference"),
    encounter_period.getField("start").alias("start_time"),
    encounter_period.getField("end").alias("end_time"),
    "ingestion_time",
    "source_file",
)


In [0]:
%sql
-- NOTE(review): this DDL has no column list and no AS query. The following
-- Python cell's saveAsTable(...) creates the table when it is missing, so this
-- statement looks redundant; confirm it succeeds on your runtime, or replace it
-- with explicit DDL.
create table if not exists sathyajith.demoprojectfhir.silver_patient

In [0]:
# Persist the flattened patients as a Delta table.
# silver_patient_df is rebuilt from the *entire* bronze table on every run, so the
# previous contents are replaced rather than appended to. Append mode duplicated
# every patient row each time the notebook was re-run.
# mergeSchema lets newly appearing columns be added to the table schema.
# NOTE(review): if bronze ever becomes incremental or retention-trimmed, switch to
# a keyed MERGE on patient_id instead of overwrite.
(
    silver_patient_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("sathyajith.demoprojectfhir.silver_patient")
)


In [0]:
%sql
-- NOTE(review): this DDL has no column list and no AS query. The following
-- Python cell's saveAsTable(...) creates the table when it is missing, so this
-- statement looks redundant; confirm it succeeds on your runtime, or replace it
-- with explicit DDL.
create table if not exists sathyajith.demoprojectfhir.silver_encounter

In [0]:
# Persist the flattened encounters as a Delta table.
# silver_encounter_df is rebuilt from the *entire* bronze table on every run, so
# the previous contents are replaced rather than appended to. Append mode
# duplicated every encounter row each time the notebook was re-run.
# mergeSchema lets newly appearing columns be added to the table schema.
# NOTE(review): if bronze ever becomes incremental or retention-trimmed, switch to
# a keyed MERGE on encounter_id instead of overwrite.
(
    silver_encounter_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("sathyajith.demoprojectfhir.silver_encounter")
)


In [0]:
%sql
-- Preview the silver patient table. The encounter table is previewed in the next
-- cell. A second SELECT here would surface only the last result set, hiding this one.
SELECT * FROM sathyajith.demoprojectfhir.silver_patient;


In [0]:
%sql
-- Preview the silver encounter table.
SELECT *
FROM sathyajith.demoprojectfhir.silver_encounter;

In [0]:
from pyspark.sql.functions import col

# Only Observation resources.
observation_df = silver_base_df.where(silver_base_df["resource_type"] == "Observation")


In [0]:
# One row per Observation with its quantity reading.
# NOTE(review): only code.text, valueQuantity and effectiveDateTime are read.
# Observations that carry their name only in code.coding, use another value[x]
# type, or use effectivePeriod will presumably get NULLs in those columns —
# confirm this is acceptable.
observation_quantity = col("resource.valueQuantity")

silver_observation_df = observation_df.select(
    col("resource_id").alias("observation_id"),
    col("resource.subject.reference").alias("patient_reference"),
    col("resource.code.text").alias("observation_name"),
    observation_quantity.getField("value").alias("value"),
    observation_quantity.getField("unit").alias("unit"),
    col("resource.effectiveDateTime").alias("observation_time"),
    "ingestion_time",
    "source_file",
)


In [0]:
%sql
-- NOTE(review): this DDL has no column list and no AS query. The following
-- Python cell's saveAsTable(...) creates the table when it is missing, so this
-- statement looks redundant; confirm it succeeds on your runtime, or replace it
-- with explicit DDL.
create table if not exists sathyajith.demoprojectfhir.silver_observation
    

In [0]:
# Persist the flattened observations as a Delta table.
# silver_observation_df is rebuilt from the *entire* bronze table on every run, so
# the previous contents are replaced rather than appended to. Append mode
# duplicated every observation row each time the notebook was re-run.
# mergeSchema lets newly appearing columns be added to the table schema.
# NOTE(review): if bronze ever becomes incremental or retention-trimmed, switch to
# a keyed MERGE on observation_id instead of overwrite.
(
    silver_observation_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("sathyajith.demoprojectfhir.silver_observation")
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Keep the most recent row per patient.
# silver_patient_df renamed resource_id to patient_id, so partitioning on
# "resource_id" failed analysis because that column no longer exists.
# source_file is a secondary sort key so rows sharing an ingestion_time are not
# chosen arbitrarily. Rows that also share source_file can still tie.
window = Window.partitionBy("patient_id").orderBy(
    col("ingestion_time").desc_nulls_last(),
    col("source_file").desc_nulls_last(),
)

dedup_patient_df = (
    silver_patient_df
    .withColumn("rn", row_number().over(window))
    .filter(col("rn") == 1)
    .drop("rn")
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Latest row per patient_id: newest ingestion_time first, with NULL timestamps
# sorted last.
# row_number() over a tied ordering picks an arbitrary row, so source_file is
# added as a secondary sort key. Rows that share both ingestion_time and
# source_file can still tie.
window = Window.partitionBy("patient_id").orderBy(
    col("ingestion_time").desc_nulls_last(),
    col("source_file").desc_nulls_last(),
)

dedup_patient_df = (
    silver_patient_df
    .withColumn("rn", row_number().over(window))
    .filter(col("rn") == 1)
    .drop("rn")
)

dedup_patient_df.display()


In [0]:
# Spot-check: the first five patient ids with their ingestion timestamps, as plain text.
silver_patient_df.select(col("patient_id"), col("ingestion_time")).show(n=5)
