In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, when

# ==============================================================================
# Constants & Configuration
# ==============================================================================
# Explicit mapping: { "Raw original name": "clean_new_name" }
# Keys are the source column names (some carry units and special characters);
# values are the snake_case names the columns are renamed to.
COLUMN_MAPPING = {
    "Year": "year",
    "Continent": "continent",
    "Country": "country",
    "Avg_Temperature(°C)": "avg_temperature_celsius",
    "CO2_Emissions(Mt)": "co2_emissions_mt",
    "Sea_Level_Rise(mm)": "sea_level_rise_mm",
    "Climate_Risk_Index": "climate_risk_index"
}

# Name of the table the renamed (silver) DataFrame is saved to.
TABLE_NAME_SILVER = "silver_climate_data"

# ==============================================================================
# Transformation Functions
# ==============================================================================

def rename_columns(df: DataFrame, mapping: dict) -> DataFrame:
    """
    Apply every ``old name -> new name`` pair in ``mapping`` to ``df``.

    Renames are applied one at a time, in the mapping's iteration order, by
    calling ``withColumnRenamed`` on the result of the previous rename.

    Args:
        df: DataFrame whose columns should be renamed.
        mapping: Current column names mapped to their replacement names.

    Returns:
        The DataFrame produced by the last rename, or ``df`` itself when
        ``mapping`` is empty.
    """
    renamed = df
    for source_name in mapping:
        target_name = mapping[source_name]
        renamed = renamed.withColumnRenamed(source_name, target_name)
    return renamed

def check_data_quality(df: DataFrame) -> None:
    """
    Print a basic data-quality report for ``df`` to stdout.

    Two checks are performed:
      1. Null count per column, shown as a one-row table.
      2. Fully duplicated rows, computed as ``count() - distinct().count()``.

    Column names are backtick-quoted before they are resolved, so a column
    whose name contains a dot (e.g. ``"a.b"``) is treated as a plain top-level
    column rather than being parsed as a struct-field path.

    Note: ``show()`` and both ``count()`` calls are separate Spark actions, so
    ``df`` is evaluated more than once.

    Args:
        df: DataFrame to inspect.

    Returns:
        None. The report is only printed.
    """
    print("--- Data Quality Report ---")

    # 1. Check for Nulls
    # Build one "count rows where this column is null" expression per column.
    null_exprs = []
    for name in df.columns:
        # Quote the identifier (doubling any embedded backtick) so dots and
        # other special characters in the name are taken literally.
        quoted_name = "`" + name.replace("`", "``") + "`"
        null_exprs.append(
            count(when(col(quoted_name).isNull(), name)).alias(name)
        )
    null_counts = df.select(null_exprs)
    print("\nNull Values per Column:")
    null_counts.show()

    # 2. Check for Duplicates
    total_count = df.count()
    distinct_count = df.distinct().count()
    duplicate_count = total_count - distinct_count

    print(f"Total Rows: {total_count}")
    print(f"Duplicate Rows: {duplicate_count}")

    if duplicate_count > 0:
        print("⚠️ Warning: Duplicates detected.")
    else:
        print("✅ Quality Check: No duplicates found.")
def save_as_delta_table(df: DataFrame, table_name: str) -> None:
    """
    Write ``df`` to the table ``table_name`` in Delta format.

    The write goes through ``saveAsTable`` with mode ``overwrite``, so running
    it again replaces the table's previous contents instead of appending to
    them. A confirmation line is printed once the write call returns.

    Args:
        df: DataFrame to persist.
        table_name: Name of the destination table.
    """
    delta_writer = df.write.format("delta")
    overwrite_writer = delta_writer.mode("overwrite")
    overwrite_writer.saveAsTable(table_name)

    print(f"\n✅ Success: Data saved to table '{table_name}'")

# ==============================================================================
# Main Execution
# ==============================================================================

# 1. Load Bronze Data (already loaded in the previous notebook, or via direct reference)
# Re-read it from the volume so this notebook is guaranteed to have the DataFrame
# (notebook-isolation good practice).
file_path = "/Volumes/workspace/default/mvp_engenharia/global_climate_change_2020_2025.csv"
df_bronze = (
    spark.read
    .options(header="true", inferSchema="true")
    .format("csv")
    .load(file_path)
)

# 2. Transform (Rename)
df_silver = rename_columns(df=df_bronze, mapping=COLUMN_MAPPING)

# 3. Quality Assurance
check_data_quality(df=df_silver)

# 4. Load (Save to Silver)
save_as_delta_table(df=df_silver, table_name=TABLE_NAME_SILVER)

# 5. Final Verification: preview a few rows read back from the saved table
display(
    spark.sql(f"SELECT * FROM {TABLE_NAME_SILVER} LIMIT 5")
)

In [0]:
# Generate descriptive statistics used to fill in the Data Catalog
display(df_silver.describe())