This pipeline will:

Flatten nested fields (geometry.coordinates, properties.*)

Cast to correct types

Handle nulls (replace with 0 for numeric)

Remove mismatches (invalid lat/lon, mag, sig, elevation)

Deduplicate (keep latest record per id using updated)

In [0]:
#Required each time the cluster is restored which should be only on the first notebbok as they run in order.

tiers = ["bronze", "silver", "gold"]
adls_path = {tier: f"abfss://{tier}@dbprojectearthquack.dfs.core.windows.net/"for tier in tiers}

#Accesssing Paths 

bronze_adls = adls_path["bronze"]
silver_adls = adls_path["silver"]
gold_adls = adls_path["gold"]

dbutils.fs.ls(bronze_adls)
dbutils.fs.ls(silver_adls)
dbutils.fs.ls(gold_adls)


[]

In [0]:
from datetime import date, timedelta 
start_date = date.today() - timedelta(1)
end_time = date.today()

print(start_date)
print(end_time)


2025-08-31
2025-09-01


In [0]:
# Retrieve the task value from the previous task (bronze), with a debugValue for local testing

bronze_output = dbutils.jobs.taskValues.get(taskKey="bronze", key="bronze_output", debugValue={"start_date": "", "bronze_adls": "", "silver_adls": ""})

#bronze_output = dbutils.jobs.taskValues.get(taskKey="bronze", key="bronze_output", debugValue=None)

# Access individual variables
start_date = bronze_output.get("start_date", "'")
bronze_adls = bronze_output.get("bronze_adls", "")
silver_adls = bronze_output.get("silver_adls", "")

#print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")

print("bronze_output:", bronze_output)
print("start_date:", start_date)

bronze_output: {'start_date': '', 'bronze_adls': '', 'silver_adls': ''}
start_date: 


In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta


In [0]:
# Load JSON data into dataframe.

df = spark.read.option("multiline", True).json(f"{bronze_adls}{start_date}_earthquake_data.json")

#df = spark.read.option("multiline", True).json(
    #f"abfss://bronze@dbprojectearthquack.dfs.core.windows.net{start_date}_earthquake_data.json")

In [0]:
df.printSchema()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-6971199580977343>, line 1[0m
[0;32m----> 1[0m df[38;5;241m.[39mprintSchema()

File [0;32m/databricks/python/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:2027[0m, in [0;36mDataFrame.printSchema[0;34m(self, level)[0m
[1;32m   2025[0m     [38;5;28mprint[39m([38;5;28mself[39m[38;5;241m.[39mschema[38;5;241m.[39mtreeString(level))
[1;32m   2026[0m [38;5;28;01melse[39;00m:
[0;32m-> 2027[0m     [38;5;28mprint[39m([38;5;28mself[39m[38;5;241m.[39mschema[38;5;241m.[39mtreeString())

File [0;32m/databricks/python/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:2000[0m, in [0;36mDataFrame.schema[0;34m(self)[0m
[1;32m   1998[0m [38;5;28;01mif[39;00m [38;5;28mself[39m[38;5;241m.[39m_cached_schema [38;5;129;01mis[39;00m [38;5;28;0

In [0]:
# Reshape earthquake data
df = (
    df
    .select(
        col('geometry.coordinates').getItem(0).alias('longitude'),
        col('geometry.coordinates').getItem(1).alias('latitude'),
        col('geometry.coordinates').getItem(2).alias('elevation'),
        col('properties.title').alias('title'),
        col('properties.place').alias('place_description'),
        col('properties.sig').alias('sig'),
        col('properties.mag').alias('mag'),
        col('properties.magType').alias('magType'),
        col('properties.time').alias('time'),
        col('properties.updated').alias('updated'),
    )
)



In [0]:
df.show()



In [0]:
from pyspark.sql.types import DoubleType, TimestampType
from pyspark.sql.functions import col, from_unixtime, round, current_timestamp

df = (
    df
    .withColumn("longitude", round(col("longitude").cast(DoubleType()), 3))
    .withColumn("latitude", round(col("latitude").cast(DoubleType()), 3))
    .withColumn("elevation", round(col("elevation").cast(DoubleType()), 3))
    .withColumn("sig", col("sig").cast("int"))
    .withColumn("mag", round(col("mag").cast(DoubleType()), 3))
    # Convert epoch (ms) -> seconds -> timestamp
    .withColumn("time", from_unixtime((col("time").cast("bigint")/1000)).cast(TimestampType()))
    .withColumn("updated", from_unixtime((col("updated").cast("bigint")/1000)).cast(TimestampType()))
)




In [0]:
df.show()



In [0]:
# remove null with "0" leaving col(time) and col(updated)

from pyspark.sql.functions import col, when

df = (
    df
    .withColumn("longitude", when(col("longitude").isNull(), 0.0).otherwise(col("longitude")))
    .withColumn("latitude", when(col("latitude").isNull(), 0.0).otherwise(col("latitude")))
    .withColumn("elevation", when(col("elevation").isNull(), 0.0).otherwise(col("elevation")))
    .withColumn("sig", when(col("sig").isNull(), 0).otherwise(col("sig")))
    .withColumn("mag", when(col("mag").isNull(), 0.0).otherwise(col("mag")))
)
df.show()



In [0]:
# 5. Deduplicate (keep latest per id)
df = df.dropDuplicates(["time", "latitude", "longitude"])
df.show()



In [0]:
# Save the transformed DataFrame to the Silver container

silver_output_path = f"{silver_adls}earthquake_events_silver/"



In [0]:
# Append DataFrame to Silver container in Parquet format
df.write.mode("Append").parquet(silver_output_path)



In [0]:
dbutils.jobs.taskValues.set(key = "silver_output", value = silver_output_path)

