Silver Notebook - Transform and clean the raw data stored in bronze, save in delta format

In [0]:
# Get bronze output metadata (e.g., paths)

bronze_output = dbutils.jobs.taskValues.get(taskKey="bronze",key="bronze_output")
#file_path = "abfss://silver@abychen.dfs.core.windows.net/2025-04-14_earthquake_data.json"

start_date = bronze_output.get("start_date","") # "" at the end is used for setting a default value if output is not present
end_date = bronze_output.get("end_date","")
bronze_adls = bronze_output.get("bronze_adls","")
silver_adls = bronze_output.get("silver_adls","")
gold_adls = bronze_output.get("gold_adls","")



In [0]:
# Import required Spark SQL functions

from pyspark.sql.functions import when, col, to_timestamp, to_date, date_format, isnull
from pyspark.sql.types import TimestampType

In [0]:
# Flatten nested schema and select relevant fields

df = spark.read.option("multiline", "true").json(bronze_adls)

df2 = df.select(
    df.id,
    df.geometry.coordinates[0].alias("longitude"),
    df.geometry.coordinates[1].alias("latitude"),
    df.geometry.coordinates[2].alias("elevation"),
    df.properties.title.alias("title"),
    df.properties.mag.alias("magnitude"),
    df.properties.place.alias("place_description"),
    df.properties.sig.alias("sig"),
    df.properties.magType.alias("magType"),
    df.properties.time.alias("time"),
    df.properties.updated.alias("updated")
)
df2 = df2.withColumn("longitude", when(isnull(col("longitude")), 0).otherwise(col("longitude")))\
                   .withColumn("latitude",when(isnull(col('latitude')),0).otherwise(col("latitude")))\
                       .withColumn("time",when(df2.time.isNull(),0).otherwise(df2.time))
# Replace null values with defaults

df2 = df2.withColumn("time",((df2.time)/1000).cast(TimestampType()))\
       .withColumn("updated",((df2.updated)/1000).cast(TimestampType()))
# Convert UNIX epoch (ms) to timestamp

df2 = df2.withColumn("event_date", to_date(to_timestamp(col("time"))))\
         .withColumn("event_time", date_format(to_timestamp(col("time")),"HH:mm:ss:SSS"))\
         .withColumn("updated_date", to_date(to_timestamp(col("updated"))))\
         .withColumn("updated_time", date_format(to_timestamp(col("updated")),"HH:mm:ss:SSS"))
# Create formatted date/time columns

df2 = df2.drop("time","updated")

In [0]:
# Save processed data in Delta format to ADLS silver container

silver_file_path = f"{silver_adls}earthquake_events_silver"
df2.write.mode("append").format("delta").save(silver_file_path)

In [0]:
# Output path for gold notebook

dbutils.jobs.taskValues.set(key="silver_output",value=silver_file_path) 
# sending the file_path as a single string