In [0]:
import json

# Retrieve the bronze_params directly as a widget
bronze_params = dbutils.widgets.get("bronze_params")
print(f"Raw bronze_params: {bronze_params}")

# Check if it's already a dictionary or a string
try:
    # If bronze_params is already a dictionary
    if isinstance(bronze_params, dict):
        output_data = bronze_params
    else:
        # Parse the JSON string
        output_data = json.loads(bronze_params)

    print(f"Parsed output_data: {output_data}")
except Exception as e:
    print(f"Error parsing bronze_params: {e}")
    raise ValueError("bronze_params is not a valid JSON or dictionary")

# Access individual variables
start_date = output_data.get("start_date", "")
end_date = output_data.get("end_date", "")
bronze_adls = output_data.get("bronze_adls", "")
silver_adls = output_data.get("silver_adls", "")
gold_adls = output_data.get("gold_adls", "")

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")


Raw bronze_params: 
Error parsing bronze_params: Expecting value: line 1 column 1 (char 0)


[0;31m---------------------------------------------------------------------------[0m
[0;31mJSONDecodeError[0m                           Traceback (most recent call last)
File [0;32m<command-3054874638674434>, line 14[0m
[1;32m     12[0m [38;5;28;01melse[39;00m:
[1;32m     13[0m     [38;5;66;03m# Parse the JSON string[39;00m
[0;32m---> 14[0m     output_data [38;5;241m=[39m json[38;5;241m.[39mloads(bronze_params)
[1;32m     16[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mParsed output_data: [39m[38;5;132;01m{[39;00moutput_data[38;5;132;01m}[39;00m[38;5;124m"[39m)

File [0;32m/usr/lib/python3.11/json/__init__.py:346[0m, in [0;36mloads[0;34m(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)[0m
[1;32m    343[0m [38;5;28;01mif[39;00m ([38;5;28mcls[39m [38;5;129;01mis[39;00m [38;5;28;01mNone[39;00m [38;5;129;01mand[39;00m object_hook [38;5;129;01mis[39;00m [38;5;28;01mNone[39;00m [38

In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta


# Load the JSON data into a Spark DataFrame
df = spark.read.option("multiline", "true").json(f"{bronze_adls}{start_date}_earthquake_data.json")

# Reshape earthquake data
df = (
    df
    .select(
        'id',
        col('geometry.coordinates').getItem(0).alias('longitude'),
        col('geometry.coordinates').getItem(1).alias('latitude'),
        col('geometry.coordinates').getItem(2).alias('elevation'),
        col('properties.title').alias('title'),
        col('properties.place').alias('place_description'),
        col('properties.sig').alias('sig'),
        col('properties.mag').alias('mag'),
        col('properties.magType').alias('magType'),
        col('properties.time').alias('time'),
        col('properties.updated').alias('updated')
    )
)

# Validate data: Check for missing or null values
df = (
    df
    .withColumn('longitude', when(isnull(col('longitude')), 0).otherwise(col('longitude')))
    .withColumn('latitude', when(isnull(col('latitude')), 0).otherwise(col('latitude')))
    .withColumn('time', when(isnull(col('time')), 0).otherwise(col('time')))
)

# Convert 'time' and 'updated' to timestamp
df = (
    df
    .withColumn('time', (col('time') / 1000).cast(TimestampType()))
    .withColumn('updated', (col('updated') / 1000).cast(TimestampType()))
)

# Save the transformed DataFrame to the Silver container
silver_output_path = f"{silver_adls}earthquake_events_silver/"

# Append DataFrame to Silver container in Parquet format
df.write.mode('append').parquet(silver_output_path)


In [0]:
dbutils.notebook.exit(silver_output_path)