In [0]:
%sql
-- List the catalogs available to the current session.
show catalogs;

catalog
earthquake_pip
hive_metastore
samples
system


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Global variables: storage account, credentials and the bronze (raw) input location.
storage_account = "ataaearthquakelake"
# NOTE(review): an account key must not live in the notebook as a literal.
# Load it from a secret store (e.g. a Databricks secret scope) before sharing or committing.
datalake_key = "xxxxxxx"
bucket_name = "bronze"
folder_path = "raw_data"
raw_data_location = f"abfss://{bucket_name}@{storage_account}.dfs.core.windows.net/{folder_path}/"

# Register the account key on the Spark session so abfss:// paths on this
# storage account can be read and written by the cells below.
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    datalake_key,
)


In [0]:
# Look up the newest creation_date already present in the gold Delta table.
# The value is used further down as the cut-off for the incremental silver load.
# The path is derived from storage_account so it always targets the same account
# that the Spark session was configured for.
gold_table_path = f"abfss://gold@{storage_account}.dfs.core.windows.net/earthqu_details_gold"
date_query = f"""select max(creation_date) as max_date  from delta.`{gold_table_path}`"""

# The aggregate always returns exactly one row; its single value is None when
# the gold table contains no rows.
max_date = spark.sql(date_query).collect()[0][0]
max_date

datetime.date(2025, 2, 12)

In [0]:
# Load every raw JSON document under the bronze location.
# The multiline option lets a single JSON record span several lines of a file.
bronze_reader = spark.read.option("multiline", "true")
df_bronze = bronze_reader.json(raw_data_location)

In [0]:
# Inspect the schema Spark inferred from the raw JSON files.
df_bronze.printSchema()

root
 |-- bbox: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- creation_date: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- geometry: struct (nullable = true)
 |    |    |    |-- coordinates: array (nullable = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- properties: struct (nullable = true)
 |    |    |    |-- alert: string (nullable = true)
 |    |    |    |-- cdi: double (nullable = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- detail: string (nullable = true)
 |    |    |    |-- dmin: double (nullable = true)
 |    |    |    |-- felt: long (nullable = true)
 |    |    |    |-- gap: double (nullable = true)
 |    |    |    |-- ids: string (nullable = true)
 |    |    |    |-- mag: double (nullable = true)
 |    |

In [0]:
# Keep only the feature array and the file's creation_date, then turn each
# element of the array into its own row (creation_date is repeated per row).
df_exploded = (
    df_bronze
    .select("features", "creation_date")
    .withColumn("features", F.explode(F.col("features")))
)

In [0]:
# After the explode, `features` is a single struct per row instead of an array.
df_exploded.printSchema()

root
 |-- features: struct (nullable = true)
 |    |-- geometry: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- properties: struct (nullable = true)
 |    |    |-- alert: string (nullable = true)
 |    |    |-- cdi: double (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- detail: string (nullable = true)
 |    |    |-- dmin: double (nullable = true)
 |    |    |-- felt: long (nullable = true)
 |    |    |-- gap: double (nullable = true)
 |    |    |-- ids: string (nullable = true)
 |    |    |-- mag: double (nullable = true)
 |    |    |-- magType: string (nullable = true)
 |    |    |-- mmi: double (nullable = true)
 |    |    |-- net: string (nullable = true)
 |    |    |-- nst: long (nullable = true)
 |    |    |-- place: string (nullable = true)
 |    |    |-- rms: double (

In [0]:
# Flatten each exploded feature struct into one row of scalar columns.
_feature = F.col("features")
_properties = _feature["properties"]
_coordinates = _feature["geometry"]["coordinates"]

df_flat_silver = df_exploded.select(
    _feature["id"].alias("id"),
    # The coordinates array is read positionally: index 0, 1 and 2.
    _coordinates[0].alias("Longitude"),
    _coordinates[1].alias("Latitude"),
    _coordinates[2].alias("Depth"),
    _properties["title"].alias("title"),
    _properties["place"].alias("place_description"),
    _properties["sig"].alias("sig"),
    _properties["mag"].alias("mag"),
    _properties["magType"].alias("magType"),
    # `time` and `updated` are divided by 1000 before the timestamp cast,
    # i.e. the raw values are treated as epoch milliseconds.
    (_properties["time"] / 1000).cast(T.TimestampType()).alias("time"),
    (_properties["updated"] / 1000).cast(T.TimestampType()).alias("updated"),
    # creation_date arrives as a string and is converted to a proper date.
    F.col("creation_date").cast(T.DateType()).alias("creation_date"),
)

In [0]:
# Confirm the flattened column names and the timestamp/date casts.
df_flat_silver.printSchema()

root
 |-- id: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Depth: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- creation_date: date (nullable = true)



In [0]:
# Incremental filter: keep only records created after the newest date already in gold.
# max_date is None when the gold table has no rows. Comparing a column against
# NULL evaluates to NULL for every row, so the filter would silently discard the
# whole batch; in that case every flattened record counts as new.
if max_date is None:
    df_silver_new = df_flat_silver
else:
    df_silver_new = df_flat_silver.filter(F.col("creation_date") > max_date)

In [0]:
# Preview the new records (up to 20 rows); an empty table means nothing is newer than max_date.
df_silver_new.show()

+---+---------+--------+-----+-----+-----------------+---+---+-------+----+-------+-------------+
| id|Longitude|Latitude|Depth|title|place_description|sig|mag|magType|time|updated|creation_date|
+---+---------+--------+-----+-----+-----------------+---+---+-------+----+-------+-------------+
+---+---------+--------+-----+-----+-----------------+---+---+-------+----+-------+-------------+



In [0]:
# Persist the incremental batch to the silver layer as parquet.
#
# isEmpty() launches a Spark job, so it is evaluated exactly once here instead
# of once in an `if` and again in an `elif`.
if df_silver_new.isEmpty():
    print("no new data to be written to the silver layer")

# NOTE(review): the write runs whether or not the batch is empty, exactly as both
# original branches did, so an empty batch still overwrites the silver folder.
# Presumably this stops a downstream step from re-reading the previous batch;
# confirm, and reword the message above if that is the intent.
bucket_name = "silver"
folder_path = "earthqu_details_silver"
silver_data_location = f"abfss://{bucket_name}@{storage_account}.dfs.core.windows.net/{folder_path}/"
df_silver_new.write.mode("overwrite").format("parquet").save(silver_data_location)

no new data to be written to the silver layer


In [0]:
# Show the batch again after the write (same DataFrame as the earlier preview).
df_silver_new.show()

+---+---------+--------+-----+-----+-----------------+---+---+-------+----+-------+-------------+
| id|Longitude|Latitude|Depth|title|place_description|sig|mag|magType|time|updated|creation_date|
+---+---------+--------+-----+-----+-----------------+---+---+-------+----+-------+-------------+
+---+---------+--------+-----+-----+-----------------+---+---+-------+----+-------+-------------+

