# Camada Gold
Essa etapa contém dados agregados e otimizados para consultas e consumo por ferramentas de BI.

## Objetivos:
- Efetuei a leitura dos dados da tabela Delta da Camada Silver.

- Criei tabelas agregadas por período (ano/mês), distrito e batalhão.

- As tabelas Gold criadas estão no formato Delta e são gerenciadas pelo Unity Catalog.

## Decisões Técnicas:
- Novamente, priorizei o paralelismo: operações como _groupBy_ (agrupamento) e _agg_ (agregação) são executadas de forma distribuída e paralela pelo Spark.

- As agregações são pré-calculadas e armazenadas em tabelas separadas, evitando recalculá-las a cada consulta.

- Verifico se as tabelas agregadas já existem antes de criá-las; em caso afirmativo, os novos dados são mesclados com *MERGE INTO*.


In [0]:
# Importando as bibliotecas necessárias para agregações
from pyspark.sql.functions import col, count, sum, year, month, dayofmonth, to_date, lit, current_timestamp
from pyspark.sql.types import LongType, DoubleType, StringType, DateType

# Catalog and schema shared by every table used in this notebook.
_CATALOG = "workspace"
_SCHEMA = "default"


def _qualified_table_name(table):
    """Return the backtick-quoted three-part name `catalog`.`schema`.`table`."""
    return f"`{_CATALOG}`.`{_SCHEMA}`.`{table}`"


# Source: Silver-layer table.
silver_table_name_simple = "fire_incidents_silver"
silver_table_name_full = _qualified_table_name(silver_table_name_simple)

# Destinations: one Gold table per aggregation grain.
gold_agg_by_time_simple = "fire_incidents_gold_by_time"
gold_agg_by_time_full = _qualified_table_name(gold_agg_by_time_simple)

gold_agg_by_district_simple = "fire_incidents_gold_by_district"
gold_agg_by_district_full = _qualified_table_name(gold_agg_by_district_simple)

gold_agg_by_battalion_simple = "fire_incidents_gold_by_battalion"
gold_agg_by_battalion_full = _qualified_table_name(gold_agg_by_battalion_simple)

for _label, _full_name in (
    ("Fonte de dados (Camada Silver)", silver_table_name_full),
    ("Destino de dados (Gold - Por Tempo)", gold_agg_by_time_full),
    ("Destino de dados (Gold - Por Distrito)", gold_agg_by_district_full),
    ("Destino de dados (Gold - Por Batalhão)", gold_agg_by_battalion_full),
):
    print(f"{_label}: {_full_name}")

In [0]:
# Load the Silver-layer table that feeds every Gold aggregation in this notebook.
print(f"\nLendo dados da Camada Silver ({silver_table_name_full})...")
silver_df = spark.table(silver_table_name_simple)

# Log the input schema and row count so each run documents what it consumed.
print("Schema da Camada Silver (esperado com tipos corretos e nulos tratados):")
silver_df.printSchema()
_silver_row_count = silver_df.count()
print(f"Total de registros lidos da Camada Silver: {_silver_row_count}")

In [0]:
# Gold-layer aggregations
# Helper that builds the SQL fragments for the UPDATE and INSERT clauses of MERGE INTO.
def generate_merge_sql_strings(df_columns_list):
    """Build the column fragments used by the MERGE INTO statements.

    Each column name is quoted as a Spark SQL identifier: wrapped in
    backticks, with any embedded backtick doubled. Names containing spaces,
    dots or backticks can therefore be interpolated into SQL safely.

    Args:
        df_columns_list: Iterable of column names (e.g. ``DataFrame.columns``).

    Returns:
        A 3-tuple of strings:
          1. the ``target.<col> = source.<col>`` assignments for UPDATE SET,
             separated by a comma, a newline and four spaces;
          2. the comma-separated quoted column list for INSERT (...);
          3. the comma-separated ``source.<col>`` list for VALUES (...).

    Raises:
        ValueError: if no column names are given, since the resulting MERGE
            statement would be syntactically invalid.
    """
    # Double any backtick inside the name so it cannot terminate the quoting.
    quoted_columns = [
        "`" + str(name).replace("`", "``") + "`" for name in df_columns_list
    ]
    if not quoted_columns:
        raise ValueError("generate_merge_sql_strings requires at least one column name")

    update_set_sql_string = ",\n    ".join(
        f"target.{quoted} = source.{quoted}" for quoted in quoted_columns
    )
    insert_columns_sql_string = ", ".join(quoted_columns)
    insert_values_sql_string = ", ".join(f"source.{quoted}" for quoted in quoted_columns)
    return update_set_sql_string, insert_columns_sql_string, insert_values_sql_string

In [0]:
# Gold aggregation by time (year/month)
print("\nCriando agregação Gold por Tempo (Ano/Mês)")

# Ensure 'Incident Date' is a DateType column before extracting year/month.
silver_df_cleaned_dates = silver_df.withColumn("Incident Date", col("Incident Date").cast(DateType()))

# Recompute the per-(year, month) aggregates over the whole Silver table.
# Rows whose 'Incident Date' is null fall into a single (null, null) group.
gold_by_time_df_staged = (
    silver_df_cleaned_dates
    .groupBy(
        year(col("Incident Date")).alias("incident_year"),
        month(col("Incident Date")).alias("incident_month"),
    )
    .agg(
        count("ID").alias("total_incidents"),
        sum("Estimated Property Loss").alias("total_property_loss"),
        sum("Estimated Contents Loss").alias("total_contents_loss"),
    )
    .withColumn("gold_loaded_at", current_timestamp())  # load-time metadata
)

gold_by_time_df_staged.printSchema()
print(f"Total de registros no staging Gold (Por Tempo): {gold_by_time_df_staged.count()}")

# Register the staged aggregates so the MERGE statement can reference them.
gold_by_time_df_staged.createOrReplaceTempView("gold_by_time_staging")

# SQL fragments for the UPDATE SET / INSERT clauses of the MERGE.
update_time_sql, insert_columns_time_sql, insert_values_time_sql = generate_merge_sql_strings(gold_by_time_df_staged.columns)

# Create the Gold table only if it does not exist yet. With mode("ignore"),
# saveAsTable is a no-op for an existing table. The table, with its Delta
# history and grants, is therefore kept between runs. Previously it was dropped
# and recreated every time, which also made it vanish for BI consumers.
# NOTE: an existing table is not migrated; if the staged schema changes, alter
# or drop the table manually before running this cell.
print(f"\nVerificando e garantindo que a tabela Gold '{gold_agg_by_time_full}' exista com o schema correto")
(
    gold_by_time_df_staged.limit(0).write
    .format("delta")
    .mode("ignore")
    .option("delta.columnMapping.mode", "name")
    .saveAsTable(gold_agg_by_time_simple)
)
print(f"Tabela Gold '{gold_agg_by_time_full}' criada/garantida com sucesso com o esquema correto.")

# Upsert the recomputed aggregates:
# - "<=>" (null-safe equality) lets the (null, null) group match its existing
#   row; with "=" it would never match and would be re-inserted on every run.
# - The staging view is a full recomputation from Silver, so keys that no
#   longer appear there are deleted and the table mirrors the current data.
print(f"\nIniciando operação MERGE INTO para a tabela Gold '{gold_agg_by_time_full}'...")

spark.sql(f"""
  MERGE INTO {gold_agg_by_time_full} AS target
  USING gold_by_time_staging AS source
  ON target.incident_year <=> source.incident_year
     AND target.incident_month <=> source.incident_month
  WHEN MATCHED THEN UPDATE SET
    {update_time_sql}
  WHEN NOT MATCHED THEN INSERT (
    {insert_columns_time_sql}
  )
  VALUES (
    {insert_values_time_sql}
  )
  WHEN NOT MATCHED BY SOURCE THEN DELETE
""")
print(f"MERGE INTO concluído para a tabela Gold '{gold_agg_by_time_full}'.")
print("A Camada Gold foi atualizada.")

In [0]:
# Gold aggregation by district
print("\nCriando agregação Gold por Distrito")

# Recompute the per-district aggregates over the whole Silver table.
# Rows with a null 'neighborhood_district' fall into a single null group.
gold_by_district_df_staged = (
    silver_df
    .groupBy(col("neighborhood_district").alias("district_name"))
    .agg(
        count("ID").alias("total_incidents"),
        sum("Estimated Property Loss").alias("total_property_loss"),
        sum("Estimated Contents Loss").alias("total_contents_loss"),
    )
    .withColumn("gold_loaded_at", current_timestamp())  # load-time metadata
)

print("\nSchema da Tabela Gold")
gold_by_district_df_staged.printSchema()
print(f"Total de registros no staging Gold (Por Distrito): {gold_by_district_df_staged.count()}")

# Register the staged aggregates so the MERGE statement can reference them.
gold_by_district_df_staged.createOrReplaceTempView("gold_by_district_staging")

# SQL fragments for the UPDATE SET / INSERT clauses of the MERGE.
update_district_sql, insert_columns_district_sql, insert_values_district_sql = generate_merge_sql_strings(gold_by_district_df_staged.columns)

# Create the Gold table only if it does not exist yet. This replaces the
# debug-time DROP TABLE, which destroyed the table (history, grants, BI
# availability) on every run. mode("ignore") makes saveAsTable a no-op when
# the table already exists.
# NOTE: an existing table is not migrated; if the staged schema changes, alter
# or drop the table manually before running this cell.
print(f"\nVerificando e garantindo que a tabela Gold '{gold_agg_by_district_full}' exista com o schema correto")
(
    gold_by_district_df_staged.limit(0).write
    .format("delta")
    .mode("ignore")
    .option("delta.columnMapping.mode", "name")
    .saveAsTable(gold_agg_by_district_simple)
)
print(f"Tabela Gold '{gold_agg_by_district_full}' criada/garantida com sucesso com o esquema correto.")

# Upsert the recomputed aggregates:
# - "<=>" (null-safe equality) lets the null-district group match its existing
#   row; with "=" it would never match and would be re-inserted on every run.
# - The staging view is a full recomputation from Silver, so districts that no
#   longer appear there are deleted and the table mirrors the current data.
print(f"\nIniciando operação MERGE INTO para a tabela Gold '{gold_agg_by_district_full}'...")

spark.sql(f"""
  MERGE INTO {gold_agg_by_district_full} AS target
  USING gold_by_district_staging AS source
  ON target.district_name <=> source.district_name
  WHEN MATCHED THEN UPDATE SET
    {update_district_sql}
  WHEN NOT MATCHED THEN INSERT (
    {insert_columns_district_sql}
  )
  VALUES (
    {insert_values_district_sql}
  )
  WHEN NOT MATCHED BY SOURCE THEN DELETE
""")
print(f"MERGE INTO concluído para a tabela Gold '{gold_agg_by_district_full}'.")
print("A Camada Gold (Por Distrito) foi atualizada.")

In [0]:

# Gold aggregation by battalion
print("\nCriando agregação Gold por Batalhão.")

# Recompute the per-battalion aggregates over the whole Silver table.
# Rows with a null 'Battalion' fall into a single null group.
gold_by_battalion_df_staged = (
    silver_df
    .groupBy(col("Battalion").alias("battalion_id"))
    .agg(
        count("ID").alias("total_incidents"),
        sum("Estimated Property Loss").alias("total_property_loss"),
        sum("Estimated Contents Loss").alias("total_contents_loss"),
    )
    .withColumn("gold_loaded_at", current_timestamp())  # load-time metadata
)

print("\nSchema da Tabela Gold (Por Batalhão - Staging):")
gold_by_battalion_df_staged.printSchema()
print(f"Total de registros por Batalhão: {gold_by_battalion_df_staged.count()}")

# Register the staged aggregates so the MERGE statement can reference them.
gold_by_battalion_df_staged.createOrReplaceTempView("gold_by_battalion_staging")

# SQL fragments for the UPDATE SET / INSERT clauses of the MERGE.
update_battalion_sql, insert_columns_battalion_sql, insert_values_battalion_sql = generate_merge_sql_strings(gold_by_battalion_df_staged.columns)

# Create the Gold table only if it does not exist yet. This replaces the
# debug-time DROP TABLE, which destroyed the table (history, grants, BI
# availability) on every run. mode("ignore") makes saveAsTable a no-op when
# the table already exists.
# NOTE: an existing table is not migrated; if the staged schema changes, alter
# or drop the table manually before running this cell.
print(f"\nVerificando e garantindo que a tabela Gold '{gold_agg_by_battalion_full}' exista com o schema correto")
(
    gold_by_battalion_df_staged.limit(0).write
    .format("delta")
    .mode("ignore")
    .option("delta.columnMapping.mode", "name")
    .saveAsTable(gold_agg_by_battalion_simple)
)
print(f"Tabela Gold '{gold_agg_by_battalion_full}' criada/garantida com sucesso com o esquema correto.")

# Upsert the recomputed aggregates:
# - "<=>" (null-safe equality) lets the null-battalion group match its existing
#   row; with "=" it would never match and would be re-inserted on every run.
# - The staging view is a full recomputation from Silver, so battalions that no
#   longer appear there are deleted and the table mirrors the current data.
print(f"\nIniciando operação MERGE INTO para a tabela Gold '{gold_agg_by_battalion_full}'...")

spark.sql(f"""
  MERGE INTO {gold_agg_by_battalion_full} AS target
  USING gold_by_battalion_staging AS source
  ON target.battalion_id <=> source.battalion_id
  WHEN MATCHED THEN UPDATE SET
    {update_battalion_sql}
  WHEN NOT MATCHED THEN INSERT (
    {insert_columns_battalion_sql}
  )
  VALUES (
    {insert_values_battalion_sql}
  )
  WHEN NOT MATCHED BY SOURCE THEN DELETE
""")
print(f"MERGE INTO concluído para a tabela Gold '{gold_agg_by_battalion_full}'.")
print("A Camada Gold (Por Batalhão) foi atualizada.")

print("\n--- Agregações da Camada Gold concluídas.")

In [0]:
# Final check: show the first rows of each Gold table.
def _preview_gold_table(header, table_name, *ordering):
    """Print *header*, load *table_name*, display its first 10 rows sorted by *ordering*, and return the table DataFrame."""
    print(header)
    table_df = spark.table(table_name)
    display(table_df.orderBy(*ordering).limit(10))
    return table_df


final_gold_by_time_df = _preview_gold_table(
    "\n--- Verificando a Tabela Gold: Por Tempo (Ano/Mês) ---",
    gold_agg_by_time_simple,
    "incident_year",
    "incident_month",
)

final_gold_by_district_df = _preview_gold_table(
    "\n--- Verificando a Tabela Gold: Por Distrito ---",
    gold_agg_by_district_simple,
    col("total_incidents").desc(),
)

final_gold_by_battalion_df = _preview_gold_table(
    "\n--- Verificando a Tabela Gold: Por Batalhão ---",
    gold_agg_by_battalion_simple,
    col("total_incidents").desc(),
)

print("As tabelas agregadas estão prontas para consumo.")