In [0]:
from pyspark.sql import functions as F
from datetime import date, datetime, timedelta

In [0]:
# --- Pipeline configuration ---------------------------------------------------
# NOTE(review): MODE, CHUNK_BY_MONTH and VERBOSE are not read by any cell in this
# notebook; only SAFETY_DAYS is. Confirm whether the FULL_INIT path is still planned.
MODE = "FULL_INIT"   # "FULL_INIT" or "INCREMENTAL"
SAFETY_DAYS = 7        # for INCREMENTAL: also process last N days even if not detected
CHUNK_BY_MONTH = True  # for FULL_INIT: process month-by-month
VERBOSE = True

# The SQL statements and the Volumes path in this notebook all address the catalog
# as `climate-risk` (hyphen), so the constant must carry the same spelling.
CATALOG = "climate-risk"
BRONZE_SCHEMA = "bronze"
SILVER_SCHEMA = "silver"

# A hyphenated identifier has to be backtick-quoted to be usable in Spark SQL.
SILVER_TABLE   = f"`{CATALOG}`.{SILVER_SCHEMA}.usgs_earthquakes"
STATE_TABLE    = f"`{CATALOG}`.{SILVER_SCHEMA}.pipeline_state"


In [0]:
# Read the watermark (last processed event date) for this pipeline.
wm_rows = spark.sql("""
    select last_processed_event_date as d
    from `climate-risk`.silver.pipeline_state
    where pipeline_name='usgs_silver'
""").collect()

# Fail with an explicit message instead of an IndexError (no state row) or a
# later TypeError (NULL watermark) when the date window is computed.
if not wm_rows or wm_rows[0]["d"] is None:
    raise ValueError(
        "No watermark found for pipeline 'usgs_silver' in "
        "`climate-risk`.silver.pipeline_state; seed last_processed_event_date "
        "before running this notebook."
    )

wm = wm_rows[0]["d"]
print(wm)

In [0]:
# Processing window: reach back SAFETY_DAYS before the watermark and run up to
# today's date.
# NOTE(review): date.today() follows the driver's local clock — confirm it lines
# up with how the bronze event_date folders are named.
end_date = date.today()
start_date = wm - timedelta(days=SAFETY_DAYS)

In [0]:
def _is_missing_path_error(exc):
    """Return True when ``exc`` indicates that the listed path does not exist."""
    if isinstance(exc, FileNotFoundError):
        return True
    # NOTE(review): dbutils.fs.ls is expected to report a missing path with
    # "java.io.FileNotFoundException" in the error text — confirm on the target runtime.
    return "FileNotFoundException" in str(exc)


def list_latest_response_paths(start_date, end_date,
                               bronze_base_path='/Volumes/climate-risk/bronze/usgs_raw'):
    """Collect the newest ``response.json`` path for each day in a date range.

    For every day from ``start_date`` to ``end_date`` (both inclusive) the folder
    ``<bronze_base_path>/event_date=<YYYY-MM-DD>`` is listed, and among its
    ``run_id=*`` entries the one with the greatest ``modificationTime`` is chosen.

    Args:
        start_date: First day to inspect (``datetime.date``).
        end_date: Last day to inspect (``datetime.date``).
        bronze_base_path: Root folder holding the ``event_date=`` directories.

    Returns:
        A list of ``.../run_id=<id>/response.json`` paths in ascending date order.
        Days whose folder does not exist, or that contain no ``run_id=`` entry,
        contribute nothing. The list is empty when ``start_date > end_date``.

    Raises:
        Exception: Any listing error other than "path not found" is re-raised,
            so permission or storage failures are not mistaken for missing days.
    """
    paths = []
    day = start_date
    while day <= end_date:
        day_dir = f"{bronze_base_path}/event_date={day.isoformat()}"
        try:
            entries = dbutils.fs.ls(day_dir)
        except Exception as exc:
            if not _is_missing_path_error(exc):
                raise
            entries = []  # day not ingested yet
        runs = [entry for entry in entries if entry.name.startswith("run_id=")]
        if runs:
            latest = max(runs, key=lambda run: run.modificationTime)
            # Directory paths may end with "/"; strip it to avoid a "//" in the result.
            paths.append(f"{latest.path.rstrip('/')}/response.json")
        day += timedelta(days=1)
    return paths


In [0]:
# bronze_base_path = '/Volumes/climate-risk/bronze/usgs_raw'
# event_dirs = [f.path for f in dbutils.fs.ls(bronze_base_path) if f.name.startswith("event_date=")]
# latest_response_paths = []
# for ed in event_dirs:
#     runs = [r for r in dbutils.fs.ls(ed) if r.name.startswith("run_id=")]
#     latest_run = max(runs, key=lambda r: r.modificationTime)
#     latest_response_paths.append(f'{latest_run.path}/response.json')

In [0]:
# Resolve the newest response.json per event_date inside [start_date, end_date].
latest_response_paths = list_latest_response_paths(start_date, end_date)

# Fail fast with a clear message: an empty list would otherwise be handed
# straight to spark.read.json(...) in the next step.
if not latest_response_paths:
    raise RuntimeError(
        f"No bronze response files found between {start_date} and {end_date}; "
        "nothing to process."
    )

In [0]:
# Load the selected response files as multi-line JSON documents and record the
# originating file path of each row in `_file`.
multiline_json_reader = spark.read.option("multiLine","true")
bronze_df = multiline_json_reader.json(latest_response_paths).withColumn(
    "_file", F.col("_metadata.file_path")
)

# Quick look at the first few raw rows.
display(bronze_df.limit(5))

In [0]:
# One output row per element of the top-level `features` array; the element is
# exposed as a single column named "F" (all other bronze columns are dropped).
exploded_feature = F.explode(F.col("features")).alias("F")
features_df = bronze_df.select(exploded_feature)

In [0]:
# Preview at most five exploded feature rows to sanity-check the structure.
display(features_df.limit(5))

In [0]:
def _prop(field, out_name, dtype=None):
    """Column for F.properties.<field>, optionally cast to ``dtype``, aliased to ``out_name``."""
    column = F.col(f"F.properties.{field}")
    if dtype is not None:
        column = column.cast(dtype)
    return column.alias(out_name)


def _epoch_ms_to_ts(field, out_name):
    """Column for F.properties.<field> divided by 1000 and cast to timestamp."""
    # presumably the source value is epoch milliseconds — TODO confirm against the feed spec
    return (F.col(f"F.properties.{field}")/1000).cast("timestamp").alias(out_name)


def _coordinate(index, out_name):
    """Column for element ``index`` of F.geometry.coordinates, cast to double."""
    return F.col("F.geometry.coordinates").getItem(index).cast("double").alias(out_name)


# Flatten each feature struct into typed silver columns (order is preserved).
staged_columns = [
    F.col("F.id").alias("event_id"),
    _epoch_ms_to_ts("time", "event_ts"),
    _epoch_ms_to_ts("updated", "updated_ts"),
    _prop("status", "status"),
    _prop("mag", "magnitude", "double"),
    _prop("magType", "mag_type"),
    _prop("felt", "felt", "int"),
    _prop("cdi", "cdi", "double"),
    _prop("mmi", "mmi", "double"),
    _prop("tsunami", "tsunami", "int"),
    _coordinate(1, "latitude"),   # coordinates[1] is read as latitude
    _coordinate(0, "longitude"),  # coordinates[0] is read as longitude
    _prop("place", "place"),
    _prop("dmin", "dmin", "double"),
    _prop("rms", "rms", "double"),
    # NOTE(review): an int cast drops any fractional part of `gap` — confirm the
    # source field is integral.
    _prop("gap", "gap", "int"),
    _prop("nst", "nst", "int"),
    _prop("sig", "sig", "double"),
]

# event_date is the calendar date of event_ts.
# NOTE(review): to_date uses the Spark session time zone — confirm it matches the
# convention of the bronze event_date folders.
silver_staged = features_df.select(*staged_columns).withColumn(
    "event_date", F.to_date("event_ts")
)

In [0]:

# Expose the staged DataFrame to Spark SQL under the name "silver_staged"
# (replaces any existing temp view with that name).
silver_staged.createOrReplaceTempView("silver_staged")

In [0]:
# Keep exactly one row per event_id: rows are ranked by updated_ts (newest
# first, NULLs last) and only rank 1 survives.
ranked_latest = spark.sql("""
    with cte as (
        select *, row_number() over(partition by event_id order by updated_ts desc nulls last) as rn
        from silver_staged
    )                     
    select *
    from cte
    where rn=1    
""")

# The helper rank column is not part of the silver schema.
silver_dedup = ranked_latest.drop("rn")

# Register the result for the SQL statements that follow.
silver_dedup.createOrReplaceTempView("silver_dedup")

In [0]:
# Upsert the deduplicated batch into the silver table, keyed on event_id:
#   * existing event -> overwritten only when the incoming updated_ts is newer,
#                       or the stored updated_ts is NULL
#   * unseen event   -> inserted
merge_statement = """
    merge into `climate-risk`.silver.usgs_earthquakes as tgt
    using silver_dedup as src
    on tgt.event_id = src.event_id
    when matched and (src.updated_ts > tgt.updated_ts or tgt.updated_ts is null) then update set *
    when not matched then insert *
"""
spark.sql(merge_statement)

In [0]:
# Advance the pipeline watermark after a successful merge.
#
# The newest event_date of this batch is computed up front so that an empty
# batch (max() -> NULL) cannot overwrite the stored watermark with NULL, which
# would break the next run's start-date computation.
max_event_date = silver_dedup.agg(F.max("event_date").alias("d")).collect()[0]["d"]

if max_event_date is None:
    candidate_watermark = "cast(null as date)"
else:
    candidate_watermark = f"date'{max_event_date.isoformat()}'"

# greatest() skips NULL arguments in Spark SQL, so the watermark stays put for
# an empty batch and never moves backwards; last_run_ts is always refreshed.
spark.sql(f"""
    update `climate-risk`.silver.pipeline_state
    set last_processed_event_date = greatest(last_processed_event_date, {candidate_watermark})
    ,last_run_ts = current_timestamp()
    where pipeline_name = 'usgs_silver'
""")