In [4]:
import json
from datetime import date, timedelta

from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType

StatementMeta(synapsesApache, 7, 5, Finished, Available, Finished)

In [None]:
# Parse the JSON string produced by the upstream (bronze) step.
# `bronze_output` is not defined in this notebook; it is presumably injected by
# the caller (e.g. as a pipeline parameter) — TODO confirm.
output_data = json.loads(bronze_output)

# Fail fast on missing or empty fields. With a plain .get(), a missing key
# becomes None and is silently interpolated as the text "None" into the
# storage paths built later in the notebook.
required_keys = ("start_date", "silver_adls", "bronze_adls")
missing_keys = [key for key in required_keys if not output_data.get(key)]
if missing_keys:
    raise ValueError(
        f"bronze_output is missing required field(s): {', '.join(missing_keys)}"
    )

# Expose the individual values under the names the following cells use.
start_date = output_data["start_date"]
silver_adls = output_data["silver_adls"]
bronze_adls = output_data["bronze_adls"]

In [8]:
# Read the bronze-layer file for this run into a Spark DataFrame.
# The file is parsed in multi-line mode, i.e. a JSON record may span several lines.
bronze_file = f"{bronze_adls}{start_date}_earthquake_data.json"
df = spark.read.json(bronze_file, multiLine=True)

StatementMeta(synapsesApache, 7, 9, Finished, Available, Finished)

In [9]:
# Show the DataFrame repr to check the schema Spark inferred from the bronze JSON.
df

StatementMeta(synapsesApache, 7, 10, Finished, Available, Finished)

DataFrame[geometry: struct<coordinates:array<double>,type:string>, id: string, properties: struct<cdi:double,code:string,detail:string,dmin:double,felt:bigint,gap:double,ids:string,mag:double,magType:string,net:string,nst:bigint,place:string,rms:double,sig:bigint,sources:string,status:string,time:bigint,title:string,tsunami:bigint,type:string,types:string,updated:bigint,url:string>, type: string]

In [10]:
# Fetch the first row to eyeball one raw (still nested) record.
df.head()

StatementMeta(synapsesApache, 7, 11, Finished, Available, Finished)

Row(geometry=Row(coordinates=[49.9597, 27.3505, 10.0], type='Point'), id='us6000q3ua', properties=Row(cdi=3.8, code='6000q3ua', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=us6000q3ua&format=geojson', dmin=5.546, felt=6, gap=72.0, ids=',us6000q3ua,', mag=4.4, magType='mb', net='us', nst=47, place='49 km NE of Al Jubayl, Saudi Arabia', rms=0.69, sig=300, sources=',us,', status='reviewed', time=1743723576440, title='M 4.4 - 49 km NE of Al Jubayl, Saudi Arabia', tsunami=0, type='earthquake', types=',dyfi,origin,phase-data,', updated=1743771830285, url='https://earthquake.usgs.gov/earthquakes/eventpage/us6000q3ua'), type='Feature')

In [11]:
# Flatten each nested record into one row of scalar columns.
# The coordinates array is unpacked positionally.
# NOTE(review): the USGS GeoJSON format documents the third coordinate as depth
# (km), yet it is exposed here as 'elevation' — confirm what downstream
# consumers expect before renaming.
coordinates = col('geometry.coordinates')

# (field under `properties`, output column name), in output order.
property_columns = [
    ('title', 'title'),
    ('place', 'place_description'),
    ('sig', 'sig'),
    ('mag', 'mag'),
    ('magType', 'magType'),
    ('time', 'time'),
    ('updated', 'updated'),
]

df = df.select(
    'id',
    coordinates[0].alias('longitude'),
    coordinates[1].alias('latitude'),
    coordinates[2].alias('elevation'),
    *[
        col(f'properties.{source_field}').alias(output_name)
        for source_field, output_name in property_columns
    ],
)

StatementMeta(synapsesApache, 7, 12, Finished, Available, Finished)

In [12]:
# Show the DataFrame repr to confirm the flattened column names and types.
df

StatementMeta(synapsesApache, 7, 13, Finished, Available, Finished)

DataFrame[id: string, longitude: double, latitude: double, elevation: double, title: string, place_description: string, sig: bigint, mag: double, magType: string, time: bigint, updated: bigint]

In [13]:
# Spot-check the first row of the flattened DataFrame.
df.head()

StatementMeta(synapsesApache, 7, 14, Finished, Available, Finished)

Row(id='us6000q3ua', longitude=49.9597, latitude=27.3505, elevation=10.0, title='M 4.4 - 49 km NE of Al Jubayl, Saudi Arabia', place_description='49 km NE of Al Jubayl, Saudi Arabia', sig=300, mag=4.4, magType='mb', time=1743723576440, updated=1743771830285)

In [14]:
# Replace nulls in the position and event-time columns with 0; every other
# column is left untouched.
# NOTE(review): a defaulted row ends up at longitude/latitude 0 and, once 'time'
# is later converted, at the Unix epoch — confirm that is preferable to
# dropping or flagging such rows.
for column_name in ('longitude', 'latitude', 'time'):
    current_value = col(column_name)
    df = df.withColumn(
        column_name,
        when(current_value.isNull(), 0).otherwise(current_value),
    )

StatementMeta(synapsesApache, 7, 15, Finished, Available, Finished)

In [15]:
# Turn the numeric 'time' and 'updated' columns into Spark timestamps.
# The raw values are divided by 1000 first (epoch milliseconds -> seconds),
# because the numeric-to-timestamp cast interprets its input as seconds.
for timestamp_column in ('time', 'updated'):
    epoch_seconds = col(timestamp_column) / 1000
    df = df.withColumn(timestamp_column, epoch_seconds.cast(TimestampType()))

StatementMeta(synapsesApache, 7, 16, Finished, Available, Finished)

In [16]:
# Spot-check the first row after the timestamp conversion.
df.head()

StatementMeta(synapsesApache, 7, 17, Finished, Available, Finished)

Row(id='us6000q3ua', longitude=49.9597, latitude=27.3505, elevation=10.0, title='M 4.4 - 49 km NE of Al Jubayl, Saudi Arabia', place_description='49 km NE of Al Jubayl, Saudi Arabia', sig=300, mag=4.4, magType='mb', time=datetime.datetime(2025, 4, 3, 23, 39, 36, 440000), updated=datetime.datetime(2025, 4, 4, 13, 3, 50, 285000))

In [17]:
# Destination folder for the silver-layer dataset.
silver_data = f"{silver_adls}earthquake_events_silver/"

# Persist as Parquet, adding to whatever already exists at the destination.
# NOTE(review): append mode does not deduplicate, so re-running for the same
# start_date would write those events again — confirm this is handled downstream.
df.write.parquet(silver_data, mode='append')

StatementMeta(synapsesApache, 7, 18, Finished, Available, Finished)

In [19]:
# End the notebook run and hand the silver output location back to the caller.
# The exit value is the plain path string built above, not a JSON document.
mssparkutils.notebook.exit(silver_data)

StatementMeta(synapsesApache, 7, 20, Finished, Available, Finished)

ExitValue: abfss://silver@apistoragewem.dfs.core.windows.net/earthquake_events_silver/