In [0]:
from pyspark.sql import SparkSession, functions as F

In [0]:
class MaturacaoSSPSilver:
    """
    Classe para maturação incremental detalhada dos dados da camada Bronze para Silver.
    Realiza tratamentos específicos por coluna, incluindo conversão de tipos e padronizações.
    """

    def __init__(self, spark_session: SparkSession, tabela_bronze: str, tabela_silver: str):
        self.spark = spark_session
        self.tabela_bronze = tabela_bronze
        self.tabela_silver = tabela_silver

    def carregar_dados_bronze(self):
        print("📥 [INFO] Carregando dados da camada Bronze...")
        return self.spark.table(self.tabela_bronze)

    def tratar_dados(self, df_bronze):
        print("🛠️ [INFO] Tratando e preparando dados para camada Silver...")

        # Tratamento específico das datas
        colunas_data = [col for col in df_bronze.columns if 'DATA' in col]
        for coluna in colunas_data:
            df_bronze = df_bronze.withColumn(
                coluna,
                F.to_date(F.col(coluna), 'yyyy-MM-dd').cast('date')
            )

        # Tratamento específico de numéricos (exemplo: LATITUDE, LONGITUDE)
        colunas_numericas = ['LATITUDE', 'LONGITUDE']
        for coluna in colunas_numericas:
            df_bronze = df_bronze.withColumn(
                coluna,
                F.col(coluna).cast('decimal(10,18)')
            )
        # Manter coluna mes_estatistica como inteiro
        df_bronze = df_bronze.withColumn("mes_estatistica", F.col("mes_estatistica").cast("int"))

        # Tratamento específico das horas
        colunas_hora = [col for col in df_bronze.columns if 'HORA' in col]
        for coluna in colunas_hora:
            df_bronze = df_bronze.withColumn(
                coluna,
                F.to_timestamp(F.col(coluna), 'yyyy-MM-dd HH:mm:ss').cast('timestamp')
            )

        # Padronização de campos string
        colunas_string = ['DELEGACIA_NOME', 'CIDADE', 'BAIRRO', 'RUBRICA', 'PERIDOOCORRENCIA', 'LOGRADOURO', 'DESCRICAOLOCAL', 'SOLUCAO']
        for coluna in colunas_string:
            df_bronze = df_bronze.withColumn(
                coluna,
                F.upper(F.trim(F.col(coluna)))
            )

        # Limpeza de linhas vazias (totalmente nulas)
        df_bronze = df_bronze.na.drop("all")

        print("✅ [INFO] Dados tratados detalhadamente com sucesso.")
        return df_bronze

    def salvar_silver(self, df_silver):
        print(f"💾 [INFO] Salvando dados tratados na camada Silver ({self.tabela_silver})...")
        try:
            df_silver.write \
                .format("delta") \
                .mode("overwrite") \
                .option("mergeSchema", "true") \
                .saveAsTable(self.tabela_silver)

            print("✅ [INFO] Dados salvos com sucesso na camada Silver.")
        except Exception as e:
            print(f"🔴 [ERRO] Falha ao salvar dados na camada Silver: {e}")

    def executar_maturacao(self):
        print("🚀 [INFO] Iniciando maturação detalhada de Bronze para Silver...")
        df_bronze = self.carregar_dados_bronze()
        df_silver = self.tratar_dados(df_bronze)
        self.salvar_silver(df_silver)
        print("🏁 [INFO] Processo de maturação concluído com sucesso!")

In [0]:
maturacao = MaturacaoSSPSilver(
    spark_session=spark,
    tabela_bronze="bronze_sp_dados_criminais",
    tabela_silver="silver_sp_dados_criminais"
)

maturacao.executar_maturacao()