In [0]:
# Run this cell after every cluster restart. The notebooks execute in order,
# so in a normal pipeline run only the first notebook needs it.

# One ADLS Gen2 container per medallion tier, all in the same storage account.
tiers = ["bronze", "silver", "gold"]
adls_paths = dict(
    (tier, f"abfss://{tier}@databrickstorage17.dfs.core.windows.net/")
    for tier in tiers
)

# Convenience names for the individual container roots (same order as `tiers`).
bronze_adls, silver_adls, gold_adls = (adls_paths[tier] for tier in tiers)

# List each container root; presumably a check that every container is
# reachable from this cluster. Only the last listing is displayed by the cell.
dbutils.fs.ls(bronze_adls)
dbutils.fs.ls(silver_adls)
dbutils.fs.ls(gold_adls)

[]

In [0]:
from datetime import date, timedelta

# Take a single snapshot of "today" and derive both bounds from it, so the
# window is always exactly one day wide even if the cell runs across midnight.
end_date = date.today()
start_date = end_date - timedelta(days=1)

In [0]:
# Retrieve the values published by the upstream "Bronze" task.
#
# `debugValue` is only returned when the notebook runs outside of a job
# (interactive development); in that case the values computed by the earlier
# cells of this notebook are used instead of raising.
# NOTE(review): assumes the Bronze task publishes `start_date` in the same
# text form as `str(date)` (YYYY-MM-DD) - confirm against the Bronze notebook.
bronze_output = dbutils.jobs.taskValues.get(
    taskKey="Bronze",
    key="bronze_output",
    debugValue={
        "start_date": str(start_date),
        "bronze_adls": bronze_adls,
        "silver_adls": silver_adls,
    },
)

# Fail fast on missing or empty entries. Silently defaulting to "" would make
# the later cells read "_earthquake_data.json" and write to the relative path
# "earthquake_events_silver/" instead of the intended ADLS locations.
required_keys = ("start_date", "bronze_adls", "silver_adls")
missing_keys = [key for key in required_keys if not bronze_output.get(key)]
if missing_keys:
    raise ValueError(
        f"Bronze task value 'bronze_output' is missing or empty for: {missing_keys}"
    )

# Access individual variables
start_date = bronze_output["start_date"]
bronze_adls = bronze_output["bronze_adls"]
silver_adls = bronze_output["silver_adls"]

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")

In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta

In [0]:
# Read the bronze extract for `start_date` into a Spark DataFrame. The file is
# parsed in multi-line mode, i.e. a JSON record may span several lines.
bronze_file_path = f"{bronze_adls}{start_date}_earthquake_data.json"
df = spark.read.json(bronze_file_path, multiLine=True)

In [0]:
# Peek at the first raw record as loaded from the bronze file.
df.first()

Row(geometry=Row(coordinates=[-114.822333333333, 44.342, 7.08], type='Point'), id='mb90073713', properties=Row(alert=None, cdi=None, code='90073713', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=mb90073713&format=geojson', dmin=0.2364, felt=None, gap=46.0, ids=',mb90073713,', mag=2.3, magType='ml', mmi=None, net='mb', nst=24, place='16 km NE of Stanley, Idaho', rms=0.21, sig=81, sources=',mb,', status='reviewed', time=1741389237870, title='M 2.3 - 16 km NE of Stanley, Idaho', tsunami=0, type='earthquake', types=',origin,phase-data,', tz=None, updated=1741392633820, url='https://earthquake.usgs.gov/earthquakes/eventpage/mb90073713'), type='Feature')

In [0]:
# Flatten the nested GeoJSON-style record into one flat row per event.

# The coordinates array is read positionally: [0], [1], [2].
# NOTE(review): the third coordinate is exposed as 'elevation'; USGS GeoJSON
# appears to document it as depth - confirm the intended column name.
coordinates = col('geometry.coordinates')

# Field under `properties` -> output column name (order defines column order).
property_aliases = {
    'title': 'title',
    'place': 'place_description',
    'sig': 'sig',
    'mag': 'mag',
    'magType': 'magType',
    'time': 'time',
    'updated': 'updated',
}

df = df.select(
    'id',
    coordinates[0].alias('longitude'),
    coordinates[1].alias('latitude'),
    coordinates[2].alias('elevation'),
    *[
        col(f'properties.{field}').alias(alias)
        for field, alias in property_aliases.items()
    ],
)

In [0]:
# Peek at the first row after flattening to confirm the new column layout.
df.first()

Row(id='mb90073713', longitude=-114.822333333333, latitude=44.342, elevation=7.08, title='M 2.3 - 16 km NE of Stanley, Idaho', place_description='16 km NE of Stanley, Idaho', sig=81, mag=2.3, magType='ml', time=1741389237870, updated=1741392633820)

In [0]:
# Null handling: replace missing longitude, latitude and time values with 0.
# NOTE(review): a time of 0 becomes the Unix epoch once the column is converted
# to a timestamp in the next step - confirm that 0 is the intended placeholder.
for column_name in ('longitude', 'latitude', 'time'):
    source_column = col(column_name)
    df = df.withColumn(
        column_name,
        when(source_column.isNull(), 0).otherwise(source_column),
    )

In [0]:
# 'time' and 'updated' hold Unix time in milliseconds; divide down to seconds
# and cast so both columns become proper timestamps.
MILLIS_PER_SECOND = 1000
for column_name in ('time', 'updated'):
    epoch_seconds = col(column_name) / MILLIS_PER_SECOND
    df = df.withColumn(column_name, epoch_seconds.cast('timestamp'))

In [0]:
# Peek at the first row to confirm 'time' and 'updated' are now timestamps.
df.first()

Row(id='mb90073713', longitude=-114.822333333333, latitude=44.342, elevation=7.08, title='M 2.3 - 16 km NE of Stanley, Idaho', place_description='16 km NE of Stanley, Idaho', sig=81, mag=2.3, magType='ml', time=datetime.datetime(2025, 3, 7, 23, 13, 57, 870000), updated=datetime.datetime(2025, 3, 8, 0, 10, 33, 820000))

In [0]:
# Destination folder for the transformed events inside the Silver container.
SILVER_DATASET_DIR = "earthquake_events_silver/"
silver_output_path = f"{silver_adls}{SILVER_DATASET_DIR}"

In [0]:
# Write the transformed rows as Parquet, appending to whatever the Silver
# dataset already contains.
# NOTE(review): because the mode is append, re-running for the same date adds
# the same rows again - confirm that duplicates are handled downstream.
df.write.parquet(silver_output_path, mode='append')

In [0]:
# Publish the Silver dataset location as a task value under "silver_output"
# so that later tasks in the job can look it up.
dbutils.jobs.taskValues.set(
    key="silver_output",
    value=silver_output_path,
)