# Pipeline Silver → Gold com Delta Lake

Este notebook demonstra um pipeline de transformação de dados da camada **Silver** para a camada **Gold** em um Data Lake, utilizando **Apache Spark** e **Delta Lake**.

Os comentários em cada célula de código descrevem, em terceira pessoa e de forma impessoal, o que é realizado em cada etapa do processo.

In [None]:
# Nesta etapa são importadas as funções necessárias do PySpark.
# São carregadas funções de agregação, manipulação de colunas e extração de componentes de data,
# que serão utilizadas na construção da camada Gold.

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, year, month, to_date,
    count, sum, avg, max as fmax, min as fmin,
    array_contains,
    when
)

In [None]:
# Aqui é inicializada a SparkSession configurada para trabalhar com Delta Lake.
# Nessa configuração são definidos parâmetros relacionados ao catálogo Delta,
# ao número de partições de shuffle e à adaptação dinâmica de planos de execução.
# A partir deste ponto, o ambiente distribuído fica pronto para ler, transformar
# e escrever dados nas camadas Silver e Gold.

spark = (SparkSession.builder
         .appName("SilverToGold-Delta")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config("spark.sql.shuffle.partitions", "128")
         .config("spark.sql.adaptive.enabled", "true")
         .getOrCreate())

In [None]:
# Agora é definida a função 'get_paths', responsável por centralizar os caminhos
# de leitura da camada Silver e de escrita da camada Gold no Data Lake.
# Dessa forma, os endereços ABFSS permanecem em um único ponto de configuração,
# facilitando ajustes entre ambientes.

def get_paths():
    storage = "tccprojectdlstorage"
    # Caminho da tabela Silver em formato Delta.
    silver = f"abfss://silver@{storage}.dfs.core.windows.net/iot_events_delta"
    # Caminho da tabela Gold em formato Delta, onde serão armazenados os agregados diários.
    gold = f"abfss://gold@{storage}.dfs.core.windows.net/iot_daily_summary_delta"
    return silver, gold

In [None]:
# Em seguida é criada a função 'load_silver', responsável pela leitura dos dados
# da camada Silver diretamente no formato Delta. Após a leitura, são selecionadas
# apenas as colunas necessárias para a camada Gold e aplicada uma lógica de fallback
# para garantir a existência das colunas derivadas de data.

def load_silver(path):
    print(f"Lendo dados Silver (Delta Lake) de: {path}")
    df = spark.read.format("delta").load(path)
    
    # Lista de colunas relevantes para o resumo diário na camada Gold.
    cols_needed = [
        "event_ts",
        "event_date",
        "event_year",
        "event_month",
        "device_id",
        "device_type",
        "location_id",
        "meta_location_group",
        "meta_device_group",
        "status_code",
        "tags_array",
        "temperature",
        "pressure",
        "energy_consumption",
        "battery_level",
        "dq_temp_out_of_range",
        "dq_pressure_out_of_range",
        "dq_battery_out_of_range",
    ]
    
    # Aqui são filtradas apenas as colunas que existem de fato no DataFrame,
    # evitando falhas em cenários de evolução de schema da camada Silver.
    existing = [c for c in cols_needed if c in df.columns]
    df = df.select(*existing)
    
    # Caso alguma coluna de data não esteja presente, ela é reconstruída a partir do timestamp.
    if "event_date" not in df.columns:
        df = df.withColumn("event_date", to_date(col("event_ts")))
    if "event_year" not in df.columns:
        df = df.withColumn("event_year", year(col("event_date")))
    if "event_month" not in df.columns:
        df = df.withColumn("event_month", month(col("event_date")))
    return df

In [None]:
# Nesta etapa é definida a função 'transform_to_gold', que aplica as regras de negócio
# e consolida os eventos em um nível mais agregado para compor a camada Gold.
#
# Dentro dessa função são realizadas as seguintes ações principais:
# - Definição de uma métrica binária de criticidade por evento (is_critical);
# - Conversão das flags de qualidade de dados em inteiros (0/1) para permitir somatórios;
# - Agrupamento por data, dispositivo e localidade, com cálculo de estatísticas como
#   contagem de eventos, temperatura média/máxima/mínima, pressão média, consumo médio
#   de energia e nível médio de bateria;
# - Cálculo das contagens de violações de qualidade e da porcentagem de eventos críticos.

def transform_to_gold(df):
    print("Iniciando transformações (Gold)...")
    
    # Primeiro é verificado se a coluna 'tags_array' está disponível para ser usada na
    # definição de criticidade dos eventos.
    has_tags_array = "tags_array" in df.columns
    crit_expr = col("status_code") != "OK"
    if has_tags_array:
        crit_expr = crit_expr | array_contains(col("tags_array"), "critical")
    
    # Aqui é criada a coluna binária que indica se o evento é crítico.
    df = df.withColumn("is_critical", when(crit_expr, 1).otherwise(0))
    
    # Em seguida, as colunas de qualidade de dados são convertidas para inteiros,
    # permitindo o uso direto em agregações por soma.
    df = df.withColumn("dq_temp_flag", col("dq_temp_out_of_range").cast("int"))
    df = df.withColumn("dq_pressure_flag", col("dq_pressure_out_of_range").cast("int"))
    df = df.withColumn("dq_battery_flag", col("dq_battery_out_of_range").cast("int"))
    
    # Aqui é realizada a etapa de agregação em nível de data, dispositivo e localidade.
    grouped = (
        df.groupBy(
            "event_date",
            "event_year",
            "event_month",
            "device_id",
            "device_type",
            "location_id",
            "meta_location_group",
            "meta_device_group"
        )
        .agg(
            count("*").alias("events_total"),
            sum("is_critical").alias("events_critical"),
            avg("temperature").alias("avg_temperature"),
            fmax("temperature").alias("max_temperature"),
            fmin("temperature").alias("min_temperature"),
            avg("pressure").alias("avg_pressure"),
            avg("energy_consumption").alias("avg_energy_consumption"),
            avg("battery_level").alias("avg_battery_level"),
            sum("dq_temp_flag").alias("dq_temp_out_of_range_count"),
            sum("dq_pressure_flag").alias("dq_pressure_out_of_range_count"),
            sum("dq_battery_flag").alias("dq_battery_out_of_range_count")
        )
    )
    
    # Por fim, é calculada a porcentagem de eventos críticos dentro de cada grupo agregado.
    grouped = grouped.withColumn(
        "pct_critical_events",
        (col("events_critical") / col("events_total")).cast("double")
    )
    print("Transformações Gold concluídas.")
    return grouped

In [None]:
# Agora é definida a função 'write_gold', responsável por gravar o resultado agregado
# na camada Gold, utilizando o formato Delta. A escrita é realizada em modo 'overwrite'
# e particionada por ano e mês, o que favorece consultas analíticas filtradas por tempo.
# A opção 'overwriteSchema' permite que alterações estruturais sejam refletidas na tabela.

def write_gold(df, path):
    print(f"Iniciando escrita para a camada Gold (Delta Lake) em: {path}")
    (
        df.write
          .mode("overwrite")
          .partitionBy("event_year", "event_month")
          .format("delta")
          .option("overwriteSchema", "true")
          .save(path)
    )
    print("Escrita na camada Gold concluída.")

In [None]:
# Nesta célula é executado o pipeline Silver → Gold de ponta a ponta.
# São realizadas as seguintes etapas:
# 1. Recuperação dos caminhos das tabelas Silver e Gold;
# 2. Leitura da camada Silver em formato Delta;
# 3. Aplicação das transformações e agregações para gerar o resumo diário;
# 4. Escrita do resultado em uma tabela Delta particionada por ano e mês.

silver_path, gold_path = get_paths()

df_silver = load_silver(silver_path)
df_gold = transform_to_gold(df_silver)
write_gold(df_gold, gold_path)

print("Pipeline S->G (Delta) finalizado com sucesso.")