In [0]:
import logging
from pyspark.sql import functions as F

# Configure the root logger at INFO so the helper's progress messages are emitted,
# then obtain this module's named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name=__name__)

class SaveHelper:
    """Write a Spark DataFrame to a Delta table for a bronze/silver/gold layer.

    Supported modes (case-insensitive):
      * ``"append"``    - append rows (``mergeSchema`` enabled).
      * ``"overwrite"`` - replace the table contents (``mergeSchema`` enabled).
      * ``"delta"``     - insert-only ``MERGE``: source rows whose ``key_columns``
        match an existing target row are skipped; existing rows are never updated.

    For the ``"bronze"`` layer an ingestion-date column ``data_ingestao``
    (``F.current_date()``) is added when the DataFrame does not already have it.

    NOTE(review): relies on a global ``spark`` session being in scope
    (e.g. a Databricks notebook) - confirm for non-notebook use.
    """

    def __init__(self, df, layer, table_path, mode, key_columns=None):
        """Validate the arguments and record whether the target table exists.

        Args:
            df: Spark DataFrame to persist.
            layer: One of "bronze", "silver", "gold" (case-insensitive).
            table_path: Qualified table name, e.g. "catalog.schema.table".
            mode: One of "append", "overwrite", "delta" (case-insensitive).
            key_columns: List/tuple of column names used as merge keys;
                required (non-empty) when ``mode`` is "delta".

        Raises:
            ValueError: If any argument is invalid.
        """
        # Validate before normalising or querying the catalog: otherwise bad
        # arguments surface as AttributeError / Spark errors, not ValueError.
        self._validate_inputs(df, layer, table_path, mode, key_columns)

        self.df = df
        self.layer = layer.lower()
        self.table_path = table_path
        self.mode = mode.lower()
        self.key_columns = list(key_columns) if key_columns is not None else None
        self.table_exists = spark.catalog.tableExists(self.table_path)

        logger.info(f"SaveHelper inicializado - Tabela: {table_path} - Mode: {mode} - Layer: {layer}")

    def _validate_inputs(self, df, layer, table_path, mode, key_columns):
        """Check constructor arguments, raising ValueError on the first problem.

        Cheap argument checks run before any Spark action. An empty DataFrame
        is allowed but logged as a warning.

        Raises:
            ValueError: If ``df``, ``mode``, ``layer``, ``table_path`` or
                ``key_columns`` is invalid.
        """
        if df is None:
            raise ValueError("DataFrame não pode ser None")

        if not hasattr(df, 'write'):
            raise ValueError(f"Esperado DataFrame do Spark, recebido: {type(df)}")

        valid_modes = ["append", "overwrite", "delta"]
        if not isinstance(mode, str) or mode.lower() not in valid_modes:
            raise ValueError(f"Mode deve ser um entre as opções: {valid_modes}")

        valid_layers = ["bronze", "silver", "gold"]
        if not isinstance(layer, str) or layer.lower() not in valid_layers:
            raise ValueError(f"Layer deve ser uma entre as opções: {valid_layers}")

        if not isinstance(table_path, str) or "." not in table_path:
            raise ValueError("O table_path deve seguir formato 'catalog.schema.table'")

        if key_columns is not None:
            if not isinstance(key_columns, (list, tuple)):
                raise ValueError(f"Esperado lista no parâmetro key_columns, recebido {type(key_columns)}")
            if not all(isinstance(key, str) for key in key_columns):
                raise ValueError("Todos os itens de key_columns devem ser strings")

        if mode.lower() == "delta" and not key_columns:
            raise ValueError("Mode 'delta' requer key_columns para fazer o merge")

        # take(1) needs at most one row, unlike count(), which evaluates everything.
        if not df.take(1):
            logger.warning("DataFrame está vazio!")

    def _create_table_if_not_exists(self):
        """Create an empty Delta table with ``self.df``'s schema if it is missing.

        Raises:
            Exception: Any catalog error is logged and re-raised.
        """
        if not self.table_exists:
            try:
                spark.catalog.createTable(self.table_path, source="delta", schema=self.df.schema)
                logger.info(f"✅ Tabela criada com sucesso: {self.table_path}")
                self.table_exists = True
            except Exception as e:
                logger.error(f"❌ Erro ao criar a tabela: {self.table_path} - {e}")
                raise

    @staticmethod
    def _quote_identifier(name):
        """Backtick-quote a column name for Spark SQL, doubling embedded backticks."""
        return "`" + name.replace("`", "``") + "`"

    @staticmethod
    def _build_merge_query(table_path, source_view, key_columns, columns):
        """Build the insert-only MERGE statement used by mode "delta".

        A source row is inserted (with every column in ``columns``) only when
        no target row has equal values in all ``key_columns``. There is no
        WHEN MATCHED clause, so existing target rows are never updated.
        NOTE: SQL ``=`` never matches NULL, so source rows with a NULL key
        are always inserted.
        """
        quote = SaveHelper._quote_identifier
        join_condition = " AND ".join(
            f"target.{quote(key)} = source.{quote(key)}" for key in key_columns
        )
        columns_str = ", ".join(quote(col) for col in columns)
        values_str = ", ".join(f"source.{quote(col)}" for col in columns)
        return f"""
                MERGE INTO {table_path} AS target
                USING {source_view} AS source
                ON {join_condition}
                
                WHEN NOT MATCHED THEN
                INSERT ({columns_str})
                VALUES ({values_str})
            """

    def _merge_data(self):
        """Insert-only merge of ``self.df`` into ``self.table_path`` on ``self.key_columns``.

        Raises:
            ValueError: If a key column is not present in the DataFrame.
            Exception: Any Spark error from the MERGE is logged and re-raised.
        """
        import uuid  # local import: only needed for a collision-free view name

        missing = [key for key in self.key_columns if key not in self.df.columns]
        if missing:
            raise ValueError(f"key_columns ausentes no DataFrame: {missing}")

        # Unique per call so helpers sharing a session don't replace each other's view.
        temp_view = f"temp_new_data_merge_{uuid.uuid4().hex}"
        try:
            logger.info(f"Executando merge com chaves: {self.key_columns}")
            self.df.createOrReplaceTempView(temp_view)

            merge_query = self._build_merge_query(
                self.table_path, temp_view, self.key_columns, self.df.columns
            )

            logger.info("Executando MERGE SQL...")
            spark.sql(merge_query)
            logger.info("✅ Merge executado com sucesso!")

        except Exception as e:
            logger.error(f"❌ Erro no merge: {e}")
            raise
        finally:
            # Temp views live for the whole session; drop ours so it doesn't linger.
            spark.catalog.dropTempView(temp_view)

    def _add_metadata(self):
        """Add a ``data_ingestao`` column (current date) unless it already exists."""
        if "data_ingestao" not in self.df.columns:
            self.df = self.df.withColumn("data_ingestao", F.current_date())
            logger.info("✅ Coluna data_ingestao adicionada")

    def _saveTable(self):
        """Persist ``self.df`` to ``self.table_path`` according to ``self.mode``.

        Raises:
            Exception: Any write/merge error is logged and re-raised.
        """
        try:
            if self.mode == "append":
                self.df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(self.table_path)
                # NOTE: count() is a separate Spark action that re-evaluates the DataFrame.
                logger.info(f"✅ {self.df.count()} registros inseridos com sucesso em modo APPEND!")

            elif self.mode == "overwrite":
                self.df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(self.table_path)
                logger.info(f"✅ Tabela sobrescrita com {self.df.count()} registros em modo OVERWRITE!")

            elif self.mode == "delta":
                self._merge_data()
                logger.info(f"✅ Merge realizado na tabela {self.table_path}!")

        except Exception as e:
            logger.error(f"❌ Erro ao salvar os dados na tabela {self.table_path}: {e}")
            raise

    def execute(self):
        """Run the save: add bronze metadata, ensure the table exists, then write."""
        logger.info(f"Iniciando processo de salvamento - Mode: {self.mode}")

        if self.layer == 'bronze':
            self._add_metadata()

        if self.mode == "delta" and not self.table_exists:
            logger.info("Tabela não existe para merge - criando tabela primeiro")

        # Every mode needs the target table; this is a no-op when it already exists.
        self._create_table_if_not_exists()

        self._saveTable()

        logger.info("✅ Processo concluído com sucesso!")