In [0]:
import json

# Retrieve the upstream task's parameters, passed in as a JSON string widget.
bronze_params = dbutils.widgets.get("bronze_params")
print(f"Raw bronze_params: {bronze_params}")

# Parse the JSON string; the rest of the notebook expects a JSON object.
output_data = json.loads(bronze_params)
if not isinstance(output_data, dict):
    raise ValueError(
        f"bronze_params must be a JSON object, got {type(output_data).__name__}"
    )

# These keys build the input and output paths. An empty default would silently
# yield paths such as "_earthquake_data.json" or a relative
# "earthquake_events_silver/" write target, so fail fast instead.
_required_keys = ("start_date", "bronze_adls", "silver_adls")
_missing_keys = [key for key in _required_keys if not output_data.get(key)]
if _missing_keys:
    raise ValueError(
        f"bronze_params is missing required key(s): {', '.join(_missing_keys)}"
    )

# Access individual variables (end_date and gold_adls are optional here).
start_date = output_data["start_date"]
end_date = output_data.get("end_date", "")
bronze_adls = output_data["bronze_adls"]
silver_adls = output_data["silver_adls"]
gold_adls = output_data.get("gold_adls", "")

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")

In [0]:
from datetime import date, timedelta

from pyspark.sql.functions import col, expr, isnull, when
from pyspark.sql.types import TimestampType

In [0]:
# Read this run's bronze file into a Spark DataFrame. The file is a JSON
# document spanning multiple lines (not JSON Lines), hence multiline=true.
bronze_input_path = f"{bronze_adls}{start_date}_earthquake_data.json"
df = (
    spark.read
    .option("multiline", "true")
    .json(bronze_input_path)
)

In [0]:
# Inspect the schema Spark inferred from the raw bronze JSON.
df.printSchema()

root
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- alert: string (nullable = true)
 |    |-- cdi: double (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- detail: string (nullable = true)
 |    |-- dmin: double (nullable = true)
 |    |-- felt: long (nullable = true)
 |    |-- gap: long (nullable = true)
 |    |-- ids: string (nullable = true)
 |    |-- mag: double (nullable = true)
 |    |-- magType: string (nullable = true)
 |    |-- mmi: double (nullable = true)
 |    |-- net: string (nullable = true)
 |    |-- nst: long (nullable = true)
 |    |-- place: string (nullable = true)
 |    |-- rms: double (nullable = true)
 |    |-- sig: long (nullable = true)
 |    |-- sources: string (nullable = true)
 |    |-- status: string (nullable = true)
 |   

In [0]:
# Row and column counts of the raw DataFrame, shown as a (rows, columns) tuple.
df.count(), len(df.columns)

(202, 4)

In [0]:
# Peek at the first raw record (nested geometry / properties structs).
df.head()

Row(geometry=Row(coordinates=[-104.349, 31.653, 4.4806], type='Point'), id='tx2025zbqbzw', properties=Row(alert=None, cdi=None, code='2025zbqbzw', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=tx2025zbqbzw&format=geojson', dmin=0.1, felt=None, gap=97, ids=',tx2025zbqbzw,', mag=2.1, magType='ml', mmi=None, net='tx', nst=31, place='58 km S of Whites City, New Mexico', rms=0.3, sig=68, sources=',tx,', status='automatic', time=1766360318551, title='M 2.1 - 58 km S of Whites City, New Mexico', tsunami=0, type='earthquake', types=',origin,phase-data,', tz=None, updated=1766383619946, url='https://earthquake.usgs.gov/earthquakes/eventpage/tx2025zbqbzw'), type='Feature')

In [0]:
# Flatten each nested feature record into one flat row per event.
# The coordinate array is split by position into three columns.
# NOTE(review): the USGS GeoJSON format documents coordinates[2] as depth (km),
# so "elevation" may be a misnomer — confirm with downstream before renaming.
_coords = col('geometry.coordinates')
_coordinate_columns = [
    _coords.getItem(position).alias(name)
    for position, name in enumerate(('longitude', 'latitude', 'elevation'))
]

# (nested source field, flat output name), in output column order.
_property_columns = [
    col(source).alias(target)
    for source, target in (
        ('properties.title', 'title'),
        ('properties.place', 'place_description'),
        ('properties.sig', 'sig'),
        ('properties.mag', 'mag'),
        ('properties.magType', 'magType'),
        ('properties.time', 'time'),
        ('properties.updated', 'updated'),
    )
]

df = df.select('id', *_coordinate_columns, *_property_columns)

In [0]:
# Show the DataFrame repr (column names and types) after flattening.
df

DataFrame[id: string, longitude: double, latitude: double, elevation: double, title: string, place_description: string, sig: bigint, mag: double, magType: string, time: bigint, updated: bigint]

In [0]:
# Confirm the flattened schema.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: long (nullable = true)
 |-- updated: long (nullable = true)



In [0]:
# Spot-check the first flattened row.
df.head()

Row(id='tx2025zbqbzw', longitude=-104.349, latitude=31.653, elevation=4.4806, title='M 2.1 - 58 km S of Whites City, New Mexico', place_description='58 km S of Whites City, New Mexico', sig=68, mag=2.1, magType='ml', time=1766360318551, updated=1766383619946)

In [0]:
# Replace nulls in longitude, latitude and time with a literal 0.
# NOTE(review): this imputes rather than validates — a null position becomes
# (0, 0) and a null time becomes the Unix epoch once converted to a timestamp.
# Consider dropping or flagging such rows instead; confirm with consumers.
for _column_name in ('longitude', 'latitude', 'time'):
    df = df.withColumn(
        _column_name,
        when(col(_column_name).isNull(), 0).otherwise(col(_column_name)),
    )

In [0]:
# Convert the epoch-millisecond 'time' and 'updated' columns to timestamps.
# timestamp_millis works on the integer value directly. This avoids rounding
# from a double division by 1000, and it still works with ANSI mode on, where
# Spark rejects CAST(numeric AS TIMESTAMP). Null inputs stay null.
df = (
    df
    .withColumn('time', expr('timestamp_millis(`time`)'))
    .withColumn('updated', expr('timestamp_millis(`updated`)'))
)

In [0]:
# Spot-check that 'time' and 'updated' are now timestamp values.
df.head()

Row(id='tx2025zbqbzw', longitude=-104.349, latitude=31.653, elevation=4.4806, title='M 2.1 - 58 km S of Whites City, New Mexico', place_description='58 km S of Whites City, New Mexico', sig=68, mag=2.1, magType='ml', time=datetime.datetime(2025, 12, 21, 23, 38, 38, 551000), updated=datetime.datetime(2025, 12, 22, 6, 6, 59, 946000))

In [0]:
# Build the Silver-container folder path for the Parquet output. This path is
# written to below and returned via dbutils.notebook.exit.
# NOTE(review): plain concatenation assumes silver_adls ends with "/" — confirm.
silver_output_path = f"{silver_adls}earthquake_events_silver/"

In [0]:
# Append this run's rows to the Silver Parquet dataset.
# NOTE(review): append mode is not idempotent — re-running for the same
# start_date writes the same events again; consider dedup or partition overwrite.
silver_writer = df.write.mode('append')
silver_writer.parquet(silver_output_path)

In [0]:
# End the notebook and return the Silver output path as its exit value.
# Presumably a downstream task reads it; verify against the orchestrator.
dbutils.notebook.exit(silver_output_path)