In [0]:
import json

# Read the pipeline parameters handed to this notebook through the
# "bronze_params" widget. The value is expected to be a JSON object string.
bronze_params = dbutils.widgets.get("bronze_params")
print(f"Raw bronze_params: {bronze_params}")

# Parse the JSON string. Anything other than a JSON object cannot be queried
# with .get() below, so reject it with an explicit message.
output_data = json.loads(bronze_params)
if not isinstance(output_data, dict):
    raise ValueError(
        f"bronze_params must be a JSON object, got {type(output_data).__name__}"
    )

# Access individual variables (absent keys still fall back to "").
start_date = output_data.get("start_date", "")
end_date = output_data.get("end_date", "")
bronze_adls = output_data.get("bronze_adls", "")
silver_adls = output_data.get("silver_adls", "")
gold_adls = output_data.get("gold_adls", "")

# Fail fast: later cells build the Bronze input path from bronze_adls and
# start_date, and the Silver output path from silver_adls. An empty value
# would otherwise yield a malformed path and an obscure downstream failure.
missing_params = [
    name
    for name, value in (
        ("start_date", start_date),
        ("bronze_adls", bronze_adls),
        ("silver_adls", silver_adls),
    )
    if not value
]
if missing_params:
    raise ValueError(
        f"bronze_params is missing required value(s): {', '.join(missing_params)}"
    )

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
from pyspark.sql.functions import col, isnull, when

In [0]:
from pyspark.sql.types import TimestampType
from datetime import date, timedelta

In [0]:
# Read the Bronze earthquake file for start_date into a Spark DataFrame.
# The JSON reader is configured with the "multiline" option enabled.
bronze_input_path = f"{bronze_adls}{start_date}_earthquake_data.json"
json_reader = spark.read.option("multiline", "true")
df = json_reader.json(bronze_input_path)

In [0]:
# Peek at the first raw record to see the nested geometry/properties layout.
df.head()

Row(geometry=Row(coordinates=[-66.941, 17.9355, 10.49], type='Point'), id='pr71496253', properties=Row(alert=None, cdi=None, code='71496253', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=pr71496253&format=geojson', dmin=0.07072, felt=None, gap=233.0, ids=',pr71496253,', mag=2.4, magType='md', mmi=None, net='pr', nst=5, place='5 km SW of Guánica, Puerto Rico', rms=0.02, sig=89, sources=',pr,', status='reviewed', time=1758498952810, title='M 2.4 - 5 km SW of Guánica, Puerto Rico', tsunami=0, type='earthquake', types=',origin,phase-data,', tz=None, updated=1758499520740, url='https://earthquake.usgs.gov/earthquakes/eventpage/pr71496253'), type='Feature')

In [0]:
# Flatten each nested record into one flat column per field of interest.
coordinates = col('geometry.coordinates')

# (field name under `properties`, output column name), in output order.
property_fields = [
    ('title', 'title'),
    ('place', 'place_description'),
    ('sig', 'sig'),
    ('mag', 'mag'),
    ('magType', 'magType'),
    ('time', 'time'),
    ('updated', 'updated'),
]

# NOTE(review): the third coordinate is exposed as 'elevation'; confirm against
# the data source whether it is actually a depth value.
df = df.select(
    'id',
    coordinates.getItem(0).alias('longitude'),
    coordinates.getItem(1).alias('latitude'),
    coordinates.getItem(2).alias('elevation'),
    *[col(f'properties.{source}').alias(target) for source, target in property_fields]
)

In [0]:
# Spot-check the first row after flattening into top-level columns.
df.head()

Row(id='pr71496253', longitude=-66.941, latitude=17.9355, elevation=10.49, title='M 2.4 - 5 km SW of Guánica, Puerto Rico', place_description='5 km SW of Guánica, Puerto Rico', sig=89, mag=2.4, magType='md', time=1758498952810, updated=1758499520740)

In [0]:
# Display the DataFrame repr, which lists each column name with its type.
df

DataFrame[id: string, longitude: double, latitude: double, elevation: double, title: string, place_description: string, sig: bigint, mag: double, magType: string, time: bigint, updated: bigint]

In [0]:
# Replace nulls in the coordinate and time columns with 0 (rows are kept,
# not dropped or rejected).
# NOTE(review): a 0 default yields a (0, 0) location and, once 'time' is
# converted downstream, an epoch-start timestamp; confirm this is intended.
for null_filled_col in ('longitude', 'latitude', 'time'):
    current_value = col(null_filled_col)
    df = df.withColumn(
        null_filled_col,
        when(isnull(current_value), 0).otherwise(current_value),
    )

In [0]:
# Convert 'time' and 'updated' to Spark timestamps when they are still numeric.
# Columns that already carry another type (e.g. timestamp) are left untouched.
from pyspark.sql.types import LongType, TimestampType

numeric_dtypes = ['bigint', 'long', 'int', 'double']

for ts_col in ('time', 'updated'):
    # Re-read the dtypes on every pass so the check reflects the current df.
    current_dtype = dict(df.dtypes)[ts_col]
    if current_dtype in numeric_dtypes:
        # Values are divided by 1000 before the cast (milliseconds -> seconds).
        df = df.withColumn(ts_col, (col(ts_col) / 1000).cast(TimestampType()))

In [0]:
# Re-check the first row to confirm 'time' and 'updated' are now datetimes.
df.head()

Row(id='pr71496253', longitude=-66.941, latitude=17.9355, elevation=10.49, title='M 2.4 - 5 km SW of Guánica, Puerto Rico', place_description='5 km SW of Guánica, Puerto Rico', sig=89, mag=2.4, magType='md', time=datetime.datetime(2025, 9, 21, 23, 55, 52, 810000), updated=datetime.datetime(2025, 9, 22, 0, 5, 20, 740000))

In [0]:
# Destination folder for the transformed data inside the Silver container.
silver_output_path = "{}earthquake_events_silver/".format(silver_adls)

In [0]:
# Write the DataFrame as Parquet under the Silver path, adding to whatever
# is already stored there.
# NOTE(review): because this appends, re-running the notebook for the same
# input adds the same rows again; confirm duplicates are handled downstream.
df.write.parquet(silver_output_path, mode='append')

In [0]:
# End the notebook run, passing the Silver output path as the exit value.
dbutils.notebook.exit(silver_output_path)