# Silver Layer for Data Transformations

In [0]:
s3_path = "s3://external-storage-db/"
dbutils.fs.ls(s3_path)

In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta

In [0]:
start_date = date.today() - timedelta(days=1)
end_date = date.today()

In [0]:
data = spark.read.option("multiline", "true").json(f"{s3_path}{start_date}_earthquake_bronze.json")

In [0]:
data = (
    data.select(
        'id',
        col('geometry.coordinates').getItem(0).alias('longitude'),
        col('geometry.coordinates').getItem(1).alias('latitude'),
        col('geometry.coordinates').getItem(2).alias('depth'),
        col('properties.title').alias('title'),
        col('properties.mag').alias('magnitude'),
        col('properties.time').alias('time'),
        col('properties.updated').alias('updated'),
        col('properties.place').alias('place'),
        col('properties.magType').alias('magType'),
        col('properties.sig').alias('sig')
    )
)

In [0]:
data = (
    data.withColumn('longitude', when(isnull(col('longitude')), 0).otherwise(col('longitude')))
    .withColumn('latitude', when(isnull(col('latitude')), 0).otherwise(col('latitude')))
    .withColumn('depth', when(isnull(col('depth')), 0).otherwise(col('depth')))
    .withColumn('magnitude', when(isnull(col('magnitude')), 0).otherwise(col('magnitude')))
    .withColumn('sig', when(isnull(col('sig')), 0).otherwise(col('sig')))
    .withColumn('time', when(isnull(col('time')), 0).otherwise(col('time')))
)

In [0]:
data = (
    data
    .withColumn('time', (col('time') / 1000).cast(TimestampType()))
    .withColumn('updated', (col('updated') / 1000).cast(TimestampType()))
)

In [0]:
data.head()

In [0]:
silver_output = f"{s3_path}{start_date}_earthquake_silver.json"
data.write.mode("append").parquet(silver_output)

In [0]:
dbutils.jobs.taskValues.set(key="silver_output", value=silver_output)