In [0]:
from pyspark.sql.functions import (
    col, avg, min, max, count, sum, when, round, row_number
)
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# 1. Citește datele din zona silver
df = spark.read.format("delta").load("/mnt/data/earthquakes/silver")

# 2. Fereastră pentru extragerea cutremurului cu magnitudinea maximă
window_spec = Window.partitionBy("date", "region_country").orderBy(col("mag").desc())

df_ranked = df.withColumn("rank", row_number().over(window_spec))
df_max = df_ranked.filter(col("rank") == 1).select(
    "date", "region_country", col("location_key_latlong").alias("max_location")
)

# 3. Agregări pe categorii
df_gold = df.groupBy("date", "region_country").agg(

    # Total și magnitudine
    count("*").alias("total_eq"),
    sum(when(col("mag") > 5, 1).otherwise(0)).alias("strong_eq"),
    round(min("mag"), 2).alias("min_mag"),
    round(max("mag"), 2).alias("max_mag"),
    round(avg("mag"), 2).alias("avg_mag"),

    # Adâncime
    round(min("depth_km"), 2).alias("min_depth_km"),
    round(max("depth_km"), 2).alias("max_depth_km"),
    round(avg("depth_km"), 2).alias("avg_depth_km"),

    # Zi / Noapte
    round(sum(when(col("is_night") == 0, 1).otherwise(0)) / count("*"), 2).alias("day_ratio"),
    round(sum(when(col("is_night") == 1, 1).otherwise(0)) / count("*"), 2).alias("night_ratio"),

    # Distribuție pe adâncime
    round(sum(when(col("depth_category_km") == "shallow", 1).otherwise(0)) / count("*"), 2).alias("shallow_pct"),
    round(sum(when(col("depth_category_km") == "intermediate", 1).otherwise(0)) / count("*"), 2).alias("intermediate_pct"),
    round(sum(when(col("depth_category_km") == "deep", 1).otherwise(0)) / count("*"), 2).alias("deep_pct"),

    # Impact: simțite și tsunami
    round(sum(when(col("has_felt") == 1, 1).otherwise(0)) / count("*"), 2).alias("felt_pct"),
    round(sum(when(col("has_tsunami") == 1, 1).otherwise(0)) / count("*"), 2).alias("tsunami_pct")
)

# 4. Alătură coordonatele cutremurului cu magnitudinea maximă
df_gold_final = df_gold.join(df_max, on=["date", "region_country"], how="left")

# 5. Scriere în zona GOLD (snapshot, cu MERGE)
gold_path = "/mnt/data/earthquakes/gold"

try:
    delta_gold = DeltaTable.forPath(spark, gold_path)

    delta_gold.alias("target").merge(
        df_gold_final.alias("source"),
        "target.date = source.date AND target.region_country = source.region_country"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

except:
    df_gold_final.write.format("delta") \
        .partitionBy("date") \
        .mode("overwrite") \
        .save(gold_path)

# 6. Salvare și în zona istorică (append-only)
df_gold_final.write.format("delta") \
    .mode("append") \
    .partitionBy("date") \
    .save("/mnt/data/earthquakes/gold_history")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7365336432482748>, line 67[0m
[1;32m     56[0m     delta_gold[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mtarget[39m[38;5;124m"[39m)[38;5;241m.[39mmerge(
[1;32m     57[0m         df_gold_final[38;5;241m.[39malias([38;5;124m"[39m[38;5;124msource[39m[38;5;124m"[39m),
[1;32m     58[0m         [38;5;124m"[39m[38;5;124mtarget.date = source.date AND target.region_country = source.region_country[39m[38;5;124m"[39m
[1;32m     59[0m     )[38;5;241m.[39mwhenMatchedUpdateAll() \
[1;32m     60[0m      [38;5;241m.[39mwhenNotMatchedInsertAll() \
[1;32m     61[0m      [38;5;241m.[39mexecute()
[1;32m     63[0m [38;5;28;01mexcept[39;00m:
[1;32m     64[0m     df_gold_final[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;1

In [0]:
# Afișare GOLD (snapshot)
df_snapshot = spark.read.format("delta").load("/mnt/data/earthquakes/gold")
display(df_snapshot)

# Afișare GOLD istoric (append-only)
df_history = spark.read.format("delta").load("/mnt/data/earthquakes/gold_history")
display(df_history)


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7365336432482748>, line 67[0m
[1;32m     56[0m     delta_gold[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mtarget[39m[38;5;124m"[39m)[38;5;241m.[39mmerge(
[1;32m     57[0m         df_gold_final[38;5;241m.[39malias([38;5;124m"[39m[38;5;124msource[39m[38;5;124m"[39m),
[1;32m     58[0m         [38;5;124m"[39m[38;5;124mtarget.date = source.date AND target.region_country = source.region_country[39m[38;5;124m"[39m
[1;32m     59[0m     )[38;5;241m.[39mwhenMatchedUpdateAll() \
[1;32m     60[0m      [38;5;241m.[39mwhenNotMatchedInsertAll() \
[1;32m     61[0m      [38;5;241m.[39mexecute()
[1;32m     63[0m [38;5;28;01mexcept[39;00m:
[1;32m     64[0m     df_gold_final[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;1