In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS ibge.silver

In [0]:
# Import das funções
from pyspark.sql.functions import col, trim, expr
from pyspark.sql.types import IntegerType

In [0]:
#Lendo DataFrame na Silver
agua_df = spark.table("ibge.bronze.agua")

In [0]:
# Remover header residual
agua_linha_df = (
    agua_df
    .filter(col("NC") != "Nível Territorial (Código)")
)

# Padronização + Safe Cast
agua_final_df = (
    agua_linha_df

    # Renomeações
    .withColumnRenamed("NC", "cod_nivel_territorial")
    .withColumnRenamed("NN", "nivel_territorial")
    .withColumnRenamed("MN", "unid_medida")
    .withColumnRenamed("V", "valor")
    .withColumnRenamed("D1C", "cod_municipio")
    .withColumnRenamed("D1N", "municipio")
    .withColumnRenamed("D2N", "variavel")
    .withColumnRenamed("D3N", "ano")
    .withColumnRenamed("D4N", "tipo_abastecimento_agua")

    # Trim antes do cast
    .withColumn("valor", trim(col("valor")))

    # Safe Cast usando try_cast
    .withColumn("valor", expr("try_cast(valor as double)"))

    # Cast das demais colunas
    .withColumn("cod_municipio", col("cod_municipio").cast(IntegerType()))
    .withColumn("ano", col("ano").cast(IntegerType()))
    .withColumn("cod_nivel_territorial", col("cod_nivel_territorial").cast(IntegerType()))

    # Trim nas colunas string
    .withColumn("municipio", trim(col("municipio")))
    .withColumn("variavel", trim(col("variavel")))
    .withColumn("nivel_territorial", trim(col("nivel_territorial")))
    .withColumn("unid_medida", trim(col("unid_medida")))
    .withColumn("tipo_abastecimento_agua", trim(col("tipo_abastecimento_agua")))
)


In [0]:
# Caminho gerenciado por tabela
tabela_silver = "ibge.silver.agua"

(
    agua_final_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(tabela_silver)
)