In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, to_timestamp, trim, lit, date_format,try_to_timestamp
from pyspark.sql.types import *

In [0]:
class SilverSalesPipeline:
    """
    Utility class with the transformation and upsert logic for the Silver layer.

    Typical use: call ``apply_transformations`` on a Bronze DataFrame, then
    pass ``get_upsert_function()`` to a streaming writer's ``foreachBatch`` so
    each micro-batch is merged into ``silver_table_name``.
    """

    # Columns written to the Silver table, in select / insert order. Kept in
    # one place so the projection and the MERGE statement cannot drift apart.
    _SILVER_COLUMNS = (
        "TransactionNo",
        "CustomerNo",
        "Transaction_Datetime",
        "Price",
        "Total_Price",
        "Quantity",
        "Product",
        "ProductNo",
        "Country",
        "Update_Time_Silver",
        "Ingestion_Time_Bronze",
        "Raw_Data_Source",
    )

    # Temp view name under which each micro-batch is exposed to the MERGE.
    _UPDATES_VIEW = "updates"

    def __init__(self, spark, silver_table_name: str):
        """
        Args:
            spark: Spark session, stored on the instance for callers to use.
            silver_table_name: Name of the MERGE target table.
        """
        self.spark = spark
        self.silver_table_name = silver_table_name

        # Primary keys used both for deduplication and for the MERGE condition.
        self.primary_keys = ["TransactionNo", "ProductNo"]

    def apply_transformations(self, bronze_df):
        """
        Apply cleaning, typing, metadata columns and quality filters.

        Args:
            bronze_df: Bronze DataFrame. Must expose the columns referenced
                below (Country, Price, Quantity, Date, CustomerNo, ...).

        Returns:
            A DataFrame projected onto the Silver column set.
        """
        # NOTE(review): whether an unparseable "Date" yields null or raises
        # depends on the session's ANSI setting -- confirm the desired behavior.
        return (
            bronze_df

            # 1. Cleaning: strip surrounding whitespace from the country name.
            .withColumn("Country", trim(col("Country")))

            # 2. Total price = unit price * integer quantity.
            .withColumn(
                "Total_Price",
                (col("Price") * col("Quantity").cast("int"))
                .cast("decimal(18,2)")
            )

            # 3. Date conversion from the "M/d/yyyy" text format.
            .withColumn(
                "Transaction_Datetime",
                to_timestamp(col("Date"), "M/d/yyyy")
            )

            # 4. Update-time metadata column.
            .withColumn("Update_Time_Silver", current_timestamp())

            # Basic quality filter: drop rows without a customer number.
            .filter(col("CustomerNo").isNotNull())

            # 5. Final projection.
            .select(*[col(name) for name in self._SILVER_COLUMNS])
        )

    def _build_merge_query(self) -> str:
        """Build the MERGE statement from the primary keys and Silver columns."""
        on_clause = " AND ".join(
            f"target.{key} = source.{key}" for key in self.primary_keys
        )
        # Key columns are equal on matched rows by definition of the ON
        # clause, so only non-key columns need to be updated.
        set_clause = ", ".join(
            f"target.{name} = source.{name}"
            for name in self._SILVER_COLUMNS
            if name not in self.primary_keys
        )
        insert_columns = ", ".join(self._SILVER_COLUMNS)
        insert_values = ", ".join(
            f"source.{name}" for name in self._SILVER_COLUMNS
        )
        return (
            f"MERGE INTO {self.silver_table_name} AS target\n"
            f"USING {self._UPDATES_VIEW} AS source\n"
            f"ON {on_clause}\n"
            f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
            f"WHEN NOT MATCHED THEN INSERT ({insert_columns})\n"
            f"VALUES ({insert_values})"
        )

    def upsert_to_delta(self, batch_df, batch_id):
        """
        Run MERGE INTO against the Silver table for one micro-batch.

        Args:
            batch_df: Micro-batch DataFrame (already in Silver shape).
            batch_id: Micro-batch identifier; only used for logging.
        """
        print(f"Processando lote {batch_id}")

        # MERGE requires at most one source row per key.
        # NOTE(review): which duplicate survives is not controlled here.
        deduped_df = batch_df.dropDuplicates(self.primary_keys)
        if deduped_df.isEmpty():
            print("Lote vazio após deduplicação. Pulando MERGE.")
            return

        deduped_df.createOrReplaceTempView(self._UPDATES_VIEW)

        # The temp view lives in the session that owns the batch DataFrame,
        # which inside foreachBatch may differ from self.spark. Run the MERGE
        # on that same session so the view is always resolvable.
        batch_df.sparkSession.sql(self._build_merge_query())

    def get_upsert_function(self):
        """
        Return a ``(batch_df, batch_id)`` callable suitable for foreachBatch.
        """
        def wrapper(batch_df, batch_id):
            return self.upsert_to_delta(batch_df, batch_id)
        return wrapper

