## Import libraries

In [0]:
from datetime import date, timedelta

from pyspark.sql.functions import col, expr, isnull, when
from pyspark.sql.types import TimestampType

## Retrieve upstream (Bronze) task output

In [0]:
# Retrieve the dictionary published by the upstream "Bronze" task.
# taskValues.get returns None when the key cannot be found, so normalise to {}
# and let the explicit check below produce a readable error instead of an
# AttributeError on None.get(...).
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output") or {}

# Fail fast if any value the rest of the notebook depends on is missing or empty.
# Previously these silently defaulted to "": an empty bronze_adls/start_date gives a
# bogus read path, and an empty silver_adls makes the silver write path the bare
# relative "earthquake_data_silver/", i.e. data lands outside the intended container.
REQUIRED_BRONZE_KEYS = ("start_date", "bronze_adls", "silver_adls")
missing_bronze_keys = [key for key in REQUIRED_BRONZE_KEYS if not bronze_output.get(key)]
if missing_bronze_keys:
    raise ValueError(
        f"Bronze task output is missing required value(s): {missing_bronze_keys}"
    )

# Access individual variables
start_date = bronze_output["start_date"]
bronze_adls = bronze_output["bronze_adls"]
silver_adls = bronze_output["silver_adls"]

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}, Silver ADLS: {silver_adls}")


## Transformations

In [0]:
# Load the raw JSON file produced by the Bronze task for `start_date`.
# multiline=true lets Spark parse JSON records that span several lines; the
# reader's default expects exactly one JSON record per line.
bronze_input_path = f"{bronze_adls}{start_date}_earthquake_data.json"
df = (
    spark.read
    .option("multiline", "true")
    .json(bronze_input_path)
)

In [0]:
# Flatten the nested records into one row of scalar columns per feature:
# the three entries of geometry.coordinates become separate columns, and the
# selected properties.* fields are promoted to top level under new names.
# NOTE(review): in USGS-style GeoJSON the third coordinate is usually documented as
# depth (km), not elevation; confirm before relying on the "elevation" column name.
geo_coordinates = col('geometry.coordinates')

# (nested source field, flat output column), in output order
flattened_properties = [
    ('properties.title', 'title'),
    ('properties.place', 'place_description'),
    ('properties.sig', 'sig'),
    ('properties.mag', 'mag'),
    ('properties.magType', 'magnitude_type'),
    ('properties.time', 'time'),
    ('properties.updated', 'updated'),
]

df = df.select(
    "id",
    geo_coordinates.getItem(0).alias('longitude'),
    geo_coordinates.getItem(1).alias('latitude'),
    geo_coordinates.getItem(2).alias('elevation'),
    *[col(source).alias(target) for source, target in flattened_properties],
)

In [0]:
# Replace nulls with 0 in the coordinate and event-time columns, in this order.
# This is a fill, not a validation: no rows are rejected.
# NOTE(review): (0, 0) is a real longitude/latitude, and a 0 epoch time becomes
# 1970-01-01 once converted to a timestamp, so these substitutes are
# indistinguishable from genuine data. Consider dropping or flagging such rows
# instead; confirm with downstream consumers.
ZERO_FILL_COLUMNS = ("longitude", "latitude", "time")
for fill_column in ZERO_FILL_COLUMNS:
    df = df.withColumn(
        fill_column,
        when(isnull(col(fill_column)), 0).otherwise(col(fill_column)),
    )

In [0]:
# Convert 'time' and 'updated' from Unix epoch milliseconds to timestamps.
# timestamp_millis() interprets the integer milliseconds exactly. The previous
# `(col / 1000).cast(TimestampType())` went through a double, and the
# double-to-timestamp cast truncates to whole microseconds, so floating-point
# rounding could shift a value (e.g. ...123 ms -> ...122.999 ms).
# Null inputs (possible for 'updated') stay null.
# Assumes both columns are integral epoch-millisecond values (JSON integers are
# inferred as long); confirm if the source schema changes.
# Backticks keep `time` parsed as a column identifier rather than a keyword.
df = (
    df
    .withColumn('time', expr('timestamp_millis(`time`)'))
    .withColumn('updated', expr('timestamp_millis(`updated`)'))
)

## Store data on Azure

In [0]:
# Destination folder for the silver-layer dataset inside the silver ADLS container.
# silver_adls is concatenated as-is, so it presumably ends with "/"; TODO confirm
# against the Bronze task that publishes it.
SILVER_DATASET_DIR = "earthquake_data_silver/"
silver_output_path = f"{silver_adls}{SILVER_DATASET_DIR}"

In [0]:
# Persist the transformed rows as Parquet under the silver path, adding to any
# files already there.
# NOTE(review): append mode is not idempotent. Re-running or retrying this task for
# the same start_date writes the same events again; consider a dedup/overwrite
# strategy if downstream expects one row per event id.
(
    df.write
    .format("parquet")
    .mode("append")
    .save(silver_output_path)
)

## Pass output to the next task

In [0]:
# Publish the silver dataset location as a task value so downstream tasks can
# read it with dbutils.jobs.taskValues.get(taskKey=<this task's key>, key="silver_output").
# (This task's key is not visible here; presumably "Silver"; confirm in the job definition.)
SILVER_TASK_VALUE_KEY = "silver_output"
dbutils.jobs.taskValues.set(key=SILVER_TASK_VALUE_KEY, value=silver_output_path)