# In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, to_date, regexp_replace
from delta.tables import DeltaTable

class DeltaPipeline:
    """Load a landing CSV and upsert it into three Delta tables.

    ``run`` reads the landing file, derives one fact DataFrame keyed by
    ``ID_Pedido`` and two dimension DataFrames keyed by ``ID_Cliente`` and
    ``Product_ID``, and merges each one into its own Delta path.
    """

    def __init__(self, minio_endpoint: str, access_key: str, secret_key: str):
        """Create (or reuse) a Spark session configured for S3A and Delta.

        Args:
            minio_endpoint: Value for ``spark.hadoop.fs.s3a.endpoint``.
            access_key: Value for ``spark.hadoop.fs.s3a.access.key``.
            secret_key: Value for ``spark.hadoop.fs.s3a.secret.key``.
        """
        self.spark = (
            SparkSession.builder
            .appName("spark")
            .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
            .config("spark.hadoop.fs.s3a.access.key", access_key)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)
            .config("spark.hadoop.fs.s3a.path.style.access", "true")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate()
        )

    @staticmethod
    def _quote(name: str) -> str:
        """Return *name* as a backtick-quoted Spark SQL identifier."""
        # A literal backtick inside a quoted identifier is escaped by doubling it.
        return "`" + name.replace("`", "``") + "`"

    @staticmethod
    def _merge_condition(key: str) -> str:
        """Build the join predicate matching target and source rows on *key*."""
        quoted = DeltaPipeline._quote(key)
        return f"target.{quoted} = source.{quoted}"

    @staticmethod
    def _changed_condition(columns, timestamp_col: str) -> str:
        """Build a predicate that is true when any compared column differs.

        Every column in *columns* except *timestamp_col* is compared with the
        null-safe equality operator ``<=>``; the conjunction is then negated.
        """
        comparisons = []
        for name in columns:
            if name == timestamp_col:
                continue
            quoted = DeltaPipeline._quote(name)
            comparisons.append(f"(target.{quoted} <=> source.{quoted})")
        return f"NOT ({' AND '.join(comparisons)})"

    def read_landing(self, landing_path):
        """Read the landing CSV (header row, ``;`` separator) into a DataFrame.

        The ``Product ID`` column is renamed to ``Product_ID`` and
        ``ID_Pedido`` is cast to string.
        """
        df = self.spark.read.option("header", "true").option("sep", ";").csv(landing_path)
        df = df.withColumnRenamed("Product ID", "Product_ID")
        df = df.withColumn("ID_Pedido", col("ID_Pedido").cast("string"))
        return df

    def prepare_fato(self, df):
        """Return the fact DataFrame: one row per non-null ``ID_Pedido``.

        ``Data_Pedido`` is parsed as a date and the numeric columns are cast
        to double.
        """
        df_fato = df.select(
            "ID_Pedido", "Data_Pedido", "ID_Cliente", "Product_ID",
            "Quantidade", "Total_Vendas", "Desconto", "Lucro"
        ).dropDuplicates(["ID_Pedido"]).dropna(subset=["ID_Pedido"])

        # Convert Data_Pedido from a "dd-MM-yyyy" string to a date.
        df_fato = df_fato.withColumn("Data_Pedido", to_date(col("Data_Pedido"), "dd-MM-yyyy"))

        # Numeric columns use a decimal comma: replace it with a dot, then cast to double.
        for col_name in ["Quantidade", "Total_Vendas", "Desconto", "Lucro"]:
            df_fato = df_fato.withColumn(
                col_name,
                regexp_replace(col(col_name), ",", ".").cast("double"),
            )

        return df_fato

    def prepare_dim_cliente(self, df):
        """Return the customer-dimension columns, one row per non-null ``ID_Cliente``."""
        return df.select(
            "ID_Cliente", "Segmento", "Regiao", "Pais", "Prioridade"
        ).dropDuplicates(["ID_Cliente"]).dropna(subset=["ID_Cliente"])

    def prepare_dim_produto(self, df):
        """Return the product-dimension columns, one row per non-null ``Product_ID``."""
        return df.select(
            "Product_ID", "Categoria", "SubCategoria"
        ).dropDuplicates(["Product_ID"]).dropna(subset=["Product_ID"])

    def merge_delta(self, df_source, path_delta, key, timestamp_col="process_timestamp"):
        """Upsert *df_source* into the Delta table stored at *path_delta*.

        A *timestamp_col* column holding the current timestamp is added to the
        source first. When *path_delta* already is a Delta table, rows matched
        on *key* are updated only if at least one non-timestamp column differs,
        and unmatched source rows are inserted. Otherwise the source is written
        to *path_delta* as a Delta table in overwrite mode.

        Args:
            df_source: DataFrame to merge.
            path_delta: Location of the Delta table.
            key: Name of the column used to match source and target rows.
            timestamp_col: Name of the processing-timestamp column to add.
        """
        df_source = df_source.withColumn(timestamp_col, current_timestamp())

        if not DeltaTable.isDeltaTable(self.spark, path_delta):
            df_source.write.format("delta").mode("overwrite").save(path_delta)
            return

        delta_table = DeltaTable.forPath(self.spark, path_delta)
        source_columns = df_source.columns

        # Identifiers are backtick-quoted in the generated SQL so that column
        # names containing spaces or other special characters still parse.
        assignments = {
            name: f"source.{self._quote(name)}" for name in source_columns
        }

        delta_table.alias("target").merge(
            df_source.alias("source"),
            self._merge_condition(key),
        ).whenMatchedUpdate(
            condition=self._changed_condition(source_columns, timestamp_col),
            set=assignments,
        ).whenNotMatchedInsertAll().execute()

    def run(self, landing_path, staging_fato_path, staging_dim_cliente_path, staging_dim_produto_path):
        """Read the landing file and merge the fact and dimension tables.

        Args:
            landing_path: Location of the landing CSV.
            staging_fato_path: Delta path merged on ``ID_Pedido``.
            staging_dim_cliente_path: Delta path merged on ``ID_Cliente``.
            staging_dim_produto_path: Delta path merged on ``Product_ID``.
        """
        df_landing = self.read_landing(landing_path)

        df_fato = self.prepare_fato(df_landing)
        df_dim_cliente = self.prepare_dim_cliente(df_landing)
        df_dim_produto = self.prepare_dim_produto(df_landing)

        self.merge_delta(df_fato, staging_fato_path, "ID_Pedido")
        self.merge_delta(df_dim_cliente, staging_dim_cliente_path, "ID_Cliente")
        self.merge_delta(df_dim_produto, staging_dim_produto_path, "Product_ID")

        print("Processamento concluído!")

# Pipeline usage
if __name__ == "__main__":
    import os

    # Credentials are read from the environment so that secrets are not
    # committed together with the source code.
    missing = [
        name
        for name in ("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY")
        if not os.environ.get(name)
    ]
    if missing:
        raise SystemExit(
            f"Missing required environment variable(s): {', '.join(missing)}"
        )

    pipeline = DeltaPipeline(
        minio_endpoint=os.environ.get("MINIO_ENDPOINT", "http://minio:9000"),
        access_key=os.environ["MINIO_ACCESS_KEY"],
        secret_key=os.environ["MINIO_SECRET_KEY"],
    )

    pipeline.run(
        landing_path="s3a://landing/dataset.csv",
        staging_fato_path="s3a://staging/fato_pedidos",
        staging_dim_cliente_path="s3a://staging/dim_cliente",
        staging_dim_produto_path="s3a://staging/dim_produto",
    )


# Output: Processamento concluído!
