In [0]:
class Framework:
    """
    Classe de controle para execução dos processos ETL/ELT, com lógica para criação de tabelas Bronze e Silver.
    
    Camada Bronze:
      - Lê os dados da tabela Delta no schema landing.
      - Gera a coluna BK_EXTRACAO via MD5.
      - Gera a coluna DATA_EXTRACAO com a data atual no formato YYYY-MM-DD.
      - Verifica se existe alguma coluna com todos os conteúdos nulos.
      - Grava a tabela Bronze no metastore (schema bronze) via APPEND_ALL.
    
    Camada Silver:
      - Lê os dados da tabela Bronze (schema bronze).
      - Verifica, no dicionário de metadados (schema dicionarios.dicionario_de_metadados),
        se há mapeamento para renomear a tabela na Silver.
      - Permite carga via APPEND_ALL ou UPSERT (merge com base na coluna BK_EXTRACAO).
    """
    
    class RawFileFormat:
        Delta = "delta"
    
    class LoadType:
        APPEND_ALL = "APPEND_ALL"
        UPSERT = "UPSERT"
    
    class SchemaEvolutionMode:
        ADD_NEW_COLUMNS = "ADD_NEW_COLUMNS"
    
    @classmethod
    def generate_bronze_table_full_name(cls, schema_name, table_name):
        """
        Gera o nome completo da tabela Bronze no metastore, no formato schema.table.
        """
        return f"{schema_name}.{table_name}"
    
    @classmethod
    def write_delta_table(cls, df, schema_name, table_full_name, load_type, key_columns, partition_columns, schema_evolution_mode):
        """
        Escrita da Delta Table no metastore (usado na camada Bronze).
        Utiliza to_csv para criar ou acrescentar dados na tabela.
        """
        df.to_csv(f"{table_full_name}.csv", index=False, mode='a', header=not os.path.exists(f"{table_full_name}.csv"))
        print("Escrita na camada Bronze realizada com sucesso.")

    def execute_etl(self, 
                    schema_name: str,
                    source_table: str,
                    landing_table: str,
                    target_table: str,
                    bk_columns_keys: list,
                    partition: str = ""):
        """
        Executa o fluxo ETL para a camada Bronze.
        
        :param schema_name: Schema de destino (ex.: "bronze").
        :param source_table: Nome da tabela fonte (ex.: "sales_oracle_system") – usado para identificar o processo.
        :param landing_table: Nome da tabela Delta no schema landing (ex.: "landing.sales_oracle_system").
        :param target_table: Nome da tabela destino (ex.: "sales_oracle_system").
        :param bk_columns_keys: Lista de colunas para gerar a chave de extração.
        :param partition: Coluna(s) de particionamento (opcional).
        """
        # Leitura direta da tabela Delta na camada landing
        df = pd.read_csv(landing_table)
        
        # Geração da coluna BK_EXTRACAO (hash MD5 da concatenação das colunas informadas)
        df['BK_EXTRACAO'] = df[bk_columns_keys].astype(str).agg('_'.join, axis=1).apply(lambda x: hashlib.md5(x.encode()).hexdigest())
        
        # Geração da coluna DATA_EXTRACAO com a data atual no formato YYYY-MM-DD
        df['DATA_EXTRACAO'] = pd.to_datetime('today').normalize()
        
        # Verificação de colunas com todos os conteúdos nulos
        null_columns = [c for c in df.columns if df[c].isnull().all()]
        if null_columns:
            print(f"Colunas com todos os conteúdos nulos: {null_columns}")
        
        key_columns = ["BK_EXTRACAO"]
        
        # Gera o nome completo da tabela Bronze (ex.: bronze.sales_oracle_system)
        table_full_name = Framework.generate_bronze_table_full_name(schema_name, target_table)
        
        # Escrita da Delta Table no schema Bronze
        Framework.write_delta_table(
            df=df,
            schema_name=schema_name,
            table_full_name=table_full_name,
            load_type=Framework.LoadType.APPEND_ALL,
            key_columns=key_columns,
            partition_columns=partition,
            schema_evolution_mode=Framework.SchemaEvolutionMode.ADD_NEW_COLUMNS,
        )
    
    def execute_silver(self, 
                       silver_schema: str,
                       bronze_table: str,
                       target_table: str,
                       load_type: str,
                       partition: str = ""):
        """
        Executa o fluxo para a camada Silver.
        
        Lê os dados da tabela Bronze (do schema bronze) e verifica, no dicionário de metadados,
        se existe um registro onde o campo 'nome_tabela_origem' seja igual ao nome da tabela Bronze.
        Se existir, o nome da tabela Silver passará a ser o valor do campo 'nome_tabela_transformada';
        caso contrário, utiliza o valor informado em target_table.
        
        Permite dois modos de carga:
          - APPEND_ALL: Acrescenta os dados;
          - UPSERT: Realiza merge (update/insert) usando a coluna BK_EXTRACAO.
        
        :param silver_schema: Schema de destino para a Silver (ex.: "silver").
        :param bronze_table: Nome da tabela Bronze (ex.: "sales_oracle_system") – presente no schema bronze.
        :param target_table: Nome sugerido para a tabela Silver; poderá ser alterado conforme o dicionário.
        :param load_type: Modo de carga: Framework.LoadType.APPEND_ALL ou Framework.LoadType.UPSERT.
        :param partition: Coluna(s) de particionamento (opcional).
        """
        # Leitura dos dados da camada Bronze (assumindo que ela esteja no schema "bronze")
        bronze_full_name = f"bronze.{bronze_table}.csv"
        df = pd.read_csv(bronze_full_name)
        
        # Verifica o dicionário de metadados para mapear o nome da tabela Silver
        try:
            meta_df = pd.read_csv("dicionarios/dicionario_de_metadados.csv")
            meta_filtered = meta_df[meta_df["nome_tabela_origem"] == bronze_table]
            if not meta_filtered.empty:
                silver_table_name = meta_filtered.iloc[0]["nome_tabela_transformada"]
                print(f"Mapeamento encontrado no dicionário: {bronze_table} -> {silver_table_name}")
            else:
                silver_table_name = target_table
                print("Nenhum mapeamento encontrado no dicionário; usando target_table informado.")
        except Exception as e:
            print(f"Erro ao acessar dicionário de metadados: {e}")
            silver_table_name = target_table
        
        # Define o nome completo da tabela Silver
        full_silver_table_name = f"{silver_schema}.{silver_table_name}.csv"
        
        if load_type == Framework.LoadType.APPEND_ALL:
            # Escrita simples via append
            df.to_csv(full_silver_table_name, index=False, mode='a', header=not os.path.exists(full_silver_table_name))
            print("Escrita na camada Silver realizada com sucesso (APPEND_ALL).")
        elif load_type == Framework.LoadType.UPSERT:
            # Lógica de UPSERT utilizando merge baseado em BK_EXTRACAO
            try:
                existing_df = pd.read_csv(full_silver_table_name)
                merged_df = pd.concat([existing_df, df]).drop_duplicates(subset=['BK_EXTRACAO'], keep='last')
                merged_df.to_csv(full_silver_table_name, index=False)
                print("Merge UPSERT na camada Silver realizado com sucesso.")
            except Exception as e:
                # Caso a tabela não exista, cria-a
                print(f"Tabela Silver não existe: {e}. Criando tabela como CSV...")
                df.to_csv(full_silver_table_name, index=False)
                print("Tabela Silver criada e dados inseridos (UPSERT via append).")
        else:
            raise NotImplementedError("Load type não implementado para Silver.")