In [0]:
# Retrieve the task value from the previous task (bronze)
a={}
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output",
                                            debugValue=a)

# Access individual variables
start_date = bronze_output.get("start_date", "")
bronze_adls = bronze_output.get("bronze_adls", "")
silver_adls = bronze_output.get("silver_adls", "")

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")

Start Date: , Bronze ADLS: 


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

In [0]:

#file_path="abfss://bronze@earthquakedatafiles.dfs.core.windows.net/2025-07-19_earthquake_data.json"
#print(file_path)
# Load the JSON data into a Spark DataFrame
df = spark.read.option("multiline", "true").json(f"{bronze_adls}{start_date}_earthquake_data.json")


abfss://bronze@earthquakedatafiles.dfs.core.windows.net/2025-07-19_earthquake_data.json


In [0]:
###Loading the json data into the pyspark dataframe
#df = spark.read.option("multiline", "true").json(file_path)

In [0]:
df.printSchema()
df.show()
df.count()

root
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- alert: string (nullable = true)
 |    |-- cdi: double (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- detail: string (nullable = true)
 |    |-- dmin: double (nullable = true)
 |    |-- felt: long (nullable = true)
 |    |-- gap: double (nullable = true)
 |    |-- ids: string (nullable = true)
 |    |-- mag: double (nullable = true)
 |    |-- magType: string (nullable = true)
 |    |-- mmi: double (nullable = true)
 |    |-- net: string (nullable = true)
 |    |-- nst: long (nullable = true)
 |    |-- place: string (nullable = true)
 |    |-- rms: double (nullable = true)
 |    |-- sig: long (nullable = true)
 |    |-- sources: string (nullable = true)
 |    |-- status: string (nullable = true)
 | 

268

In [0]:
df.head()

Row(geometry=Row(coordinates=[-117.8531667, 36.1331667, 6.55], type='Point'), id='ci41026639', properties=Row(alert=None, cdi=None, code='41026639', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ci41026639&format=geojson', dmin=0.01561, felt=None, gap=56.0, ids=',ci41026639,', mag=0.91, magType='ml', mmi=None, net='ci', nst=21, place='13 km NE of Coso Junction, CA', rms=0.14, sig=13, sources=',ci,', status='automatic', time=1752969268010, title='M 0.9 - 13 km NE of Coso Junction, CA', tsunami=0, type='earthquake', types=',nearby-cities,origin,phase-data,scitech-link,', tz=None, updated=1752969476770, url='https://earthquake.usgs.gov/earthquakes/eventpage/ci41026639'), type='Feature')

Data Transformation

In [0]:
# Reshaping the dataframe
df = (
    df
    .select(
        'id',
        col('geometry.coordinates').getItem(0).alias('longitude'),
        col('geometry.coordinates').getItem(1).alias('latitude'),
        col('geometry.coordinates').getItem(2).alias('elevation'),
        col('properties.title').alias('title'),
        col('properties.place').alias('place_description'),
        col('properties.sig').alias('sig'),
        col('properties.mag').alias('mag'),
        col('properties.magType').alias('magType'),
        col('properties.time').alias('time'),
        col('properties.updated').alias('updated')
    )
)

In [0]:
df.head()

Row(id='ci41026639', longitude=-117.8531667, latitude=36.1331667, elevation=6.55, title='M 0.9 - 13 km NE of Coso Junction, CA', place_description='13 km NE of Coso Junction, CA', sig=13, mag=0.91, magType='ml', time=1752969268010, updated=1752969476770)

In [0]:
df.show(3)

+------------+------------+----------+---------+--------------------+--------------------+---+----+-------+-------------+-------------+
|          id|   longitude|  latitude|elevation|               title|   place_description|sig| mag|magType|         time|      updated|
+------------+------------+----------+---------+--------------------+--------------------+---+----+-------+-------------+-------------+
|  ci41026639|-117.8531667|36.1331667|     6.55|M 0.9 - 13 km NE ...|13 km NE of Coso ...| 13|0.91|     ml|1752969268010|1752969476770|
|ak025975facy|   -153.4387|   59.8083|    121.5|M 1.6 - 37 km E o...|37 km E of Pedro ...| 39| 1.6|     ml|1752968516020|1752968662697|
|tx2025odcpdo|    -102.171|    32.356|    4.054|M 1.5 - 34 km E o...|34 km E of McKinn...| 35| 1.5|     ml|1752968430560|1752969639272|
+------------+------------+----------+---------+--------------------+--------------------+---+----+-------+-------------+-------------+
only showing top 3 rows


In [0]:
##Handle Missing values
df = (
    df
    .withColumn('longitude', when(isnull(col('longitude')), 0).otherwise(col('longitude')))
    .withColumn('latitude', when(isnull(col('latitude')), 0).otherwise(col('latitude')))
    .withColumn('time', when(isnull(col('time')), 0).otherwise(col('time')))
)

In [0]:
df.show(3)

+------------+------------+----------+---------+--------------------+--------------------+---+----+-------+--------------------+--------------------+
|          id|   longitude|  latitude|elevation|               title|   place_description|sig| mag|magType|                time|             updated|
+------------+------------+----------+---------+--------------------+--------------------+---+----+-------+--------------------+--------------------+
|  ci41026639|-117.8531667|36.1331667|     6.55|M 0.9 - 13 km NE ...|13 km NE of Coso ...| 13|0.91|     ml|2025-07-19 23:54:...|2025-07-19 23:57:...|
|ak025975facy|   -153.4387|   59.8083|    121.5|M 1.6 - 37 km E o...|37 km E of Pedro ...| 39| 1.6|     ml|2025-07-19 23:41:...|2025-07-19 23:44:...|
|tx2025odcpdo|    -102.171|    32.356|    4.054|M 1.5 - 34 km E o...|34 km E of McKinn...| 35| 1.5|     ml|2025-07-19 23:40:...|2025-07-20 00:00:...|
+------------+------------+----------+---------+--------------------+--------------------+---+----+-

In [0]:
###Here time column shows the number of seconds passed after 1970 (Unix time).So changing the format to make it more readable.
#
df = (
    df
    .withColumn('time', (col('time') / 1000).cast(TimestampType()))
    .withColumn('updated', (col('updated') / 1000).cast(TimestampType()))
)


In [0]:
df.show(3)

+------------+------------+----------+---------+--------------------+--------------------+---+----+-------+--------------------+--------------------+
|          id|   longitude|  latitude|elevation|               title|   place_description|sig| mag|magType|                time|             updated|
+------------+------------+----------+---------+--------------------+--------------------+---+----+-------+--------------------+--------------------+
|  ci41026639|-117.8531667|36.1331667|     6.55|M 0.9 - 13 km NE ...|13 km NE of Coso ...| 13|0.91|     ml|2025-07-19 23:54:...|2025-07-19 23:57:...|
|ak025975facy|   -153.4387|   59.8083|    121.5|M 1.6 - 37 km E o...|37 km E of Pedro ...| 39| 1.6|     ml|2025-07-19 23:41:...|2025-07-19 23:44:...|
|tx2025odcpdo|    -102.171|    32.356|    4.054|M 1.5 - 34 km E o...|34 km E of McKinn...| 35| 1.5|     ml|2025-07-19 23:40:...|2025-07-20 00:00:...|
+------------+------------+----------+---------+--------------------+--------------------+---+----+-

In [0]:
# Save the transformed DataFrame to the Silver container
#silver_output_path = f"abfss://silver@earthquakedatafiles.dfs.core.windows.net/earthquake_events_silver/"
#print (silver_output_path)

silver_output_path = f"{silver_adls}earthquake_events_silver/"


abfss://silver@earthquakedatafiles.dfs.core.windows.net/earthquake_events_silver/


In [0]:

# Append DataFrame to Silver container in Parquet format
df.write.mode('append').parquet(silver_output_path)

In [0]:
dbutils.jobs.taskValues.set(key = "silver_output", value = silver_output_path)