In [0]:
# Databricks notebook source
from delta.tables import DeltaTable
from pyspark.sql.functions import col, udf
from pyspark.sql import DataFrame, Column
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from datetime import datetime, timedelta
from athena_mvsh import Athena, CursorParquetDuckdb
from dateutil.relativedelta import relativedelta

# Canonical output names for the sales sources. The two source-specific lists
# below (cols_cosmos, cols_pre_venda) are paired with these names position by
# position, so all three lists must keep the same length and order.
# Position 3 is the label ("etiqueta") column: view_pre_venda reads the label
# column name from index 3 of the source list it receives.
cols_rename = [
    "filial", "cod_prod",
    "periodo", "etiqueta",
    "perc_dsc_cupom", "venda",
    "venda_desconto"
]

# Source columns of the COSMOSMOV files, aligned with cols_rename.
cols_cosmos = [
    "MVVC_CD_FILIAL_MOV",
    "MVVP_NR_PRD",
    "MVVC_DT_MOV",
    "NUMERO_AUTORIZ_PAGUEMENOS",
    "MVVP_PR_DSC_ITE",
    "MVVP_VL_PRE_VDA",
    "MVVP_VL_PRD_VEN",
]

# Source columns of the PRE_VENDA files, aligned with cols_rename.
cols_pre_venda = [
    "VC_CD_FILIAL",
    "VD_CD_PRODUTO",
    "VC_DH_VENDA",
    "VD_COD_ETIQUETA_ULCH",
    "VD_PERC_DESCONTO",
    "VD_VL_PRODUTO",
    "VD_VL_PRODUTO_COM_DESCONTO",
]

# Columns kept from the authorization parquet files.
cols_autorizador = [
    "ulch_sq_autorizacao",
    "ulch_preco_venda",
    "ulch_percentual_desconto",
    "ulch_fl_tipo_produto",
    "ulch_cd_barras",
    "ulch_fl_situacao",
    "ulch_sq_produto"
]

# Columns kept from the product parquet files.
# "ulch_sq_produto" used to be listed twice, which made select(cols_produto)
# produce two columns with the same name; each column is now listed once.
cols_produto = [
    "ulch_sq_produto",
    "xxxx_dh_cad",
    "ulch_lote",
    "ulch_dt_vencimento",
]

# Processing window: from the first day of the current month up to now.
# `start` keeps the time of day of `end`; only the day of month is reset.
end = datetime.now()
start = end.replace(day=1)

# During the first five days of a month the window also covers the previous month.
start = start - relativedelta(months=1) if end.day <= 5 else start

# External access: AWS credentials, region and S3 locations, all read from
# the "externo" secret scope (same keys, same order as before).
username, password, region, location, location_tables = (
    dbutils.secrets.get(scope="externo", key=secret_key)
    for secret_key in (
        "username", "password", "region", "location", "location_tables"
    )
)

# Table coordinates: Delta target (catalog.schema.table_name) and Athena target.
catalog, schema, table_name = "bronze", "super_desconto", "venda"
schema_athena, table_athena = "prevencao-perdas", "super_desconto_vendas"

print(start, end)

In [0]:
def table_exists(
    catalog: str, 
    schema: str, 
    tablename: str
) -> bool:
    """Tell whether ``catalog.schema.tablename`` is known to the Spark catalog.

    Args:
        catalog: Catalog part of the table identifier.
        schema: Schema part of the table identifier.
        tablename: Table part of the table identifier.

    Returns:
        Whatever ``spark.catalog.tableExists`` reports for the three-part name.
    """
    qualified_name = "{}.{}.{}".format(catalog, schema, tablename)
    return spark.catalog.tableExists(qualified_name)


def etiqueta(colname: str) -> Column:
    """Build the normalized label expression for column ``colname``.

    The value is trimmed, left-padded with "0" to a width of 30 and cast to
    string, so label columns from different sources can be compared.
    """
    trimmed = F.trim(colname)
    padded = F.lpad(trimmed, 30, "0")
    return padded.cast(T.StringType())


def list_files(
    path: str, 
    start: datetime, 
    end: datetime
):
    """Yield one daily parquet path per calendar day from ``start`` to ``end``.

    Both endpoints are inclusive. Days are counted on calendar dates, so the
    time-of-day part of ``start`` and ``end`` never changes which days are
    listed (the previous timedelta-based count dropped the last day whenever
    ``end`` had an earlier time of day than ``start``).

    Args:
        path: Folder name under ``/Volumes/raw/super_desconto``.
        start: First day of the range.
        end: Last day of the range.

    Yields:
        Paths shaped like ``/Volumes/raw/super_desconto/{path}/YYYY/MM/DD.parquet``.
        Nothing is yielded when ``end`` falls on a day before ``start``.
    """
    first_day = start.date()
    n_days = (end.date() - first_day).days + 1

    # range() of a non-positive count is empty, so an inverted range yields nothing.
    for offset in range(n_days):
        day = first_day + timedelta(days=offset)
        yield f"/Volumes/raw/super_desconto/{path}/{day:%Y/%m/%d}.parquet"

In [0]:
def view_pre_venda(
    path: str, 
    start: datetime, 
    end: datetime, 
    columns: list[str]
) -> DataFrame:
    """Load the daily sales files of one source and rename them to the canonical names.

    Args:
        path: Source folder passed on to ``list_files``.
        start: First day to load.
        end: Last day to load.
        columns: Source column names, aligned position by position with
            ``cols_rename``; the entry at index 3 is the label column.

    Returns:
        DataFrame with the selected columns renamed per ``cols_rename`` and
        the label column normalized by ``etiqueta``.
    """
    label_col = columns[3]
    daily_files = [*list_files(path, start, end)]
    rename_map = dict(zip(columns, cols_rename))

    # NOTE(review): every listed daily file is handed to the reader; presumably
    # a missing day makes the read fail -- confirm against the landing schedule.
    selected = spark.read.parquet(*daily_files).select(columns)
    normalized = selected.withColumn(label_col, etiqueta(label_col))
    return normalized.withColumnsRenamed(rename_map)


def view_cupom(start: datetime, end: datetime) -> DataFrame:
    """Combine both sales sources and keep a single row per label.

    Rows from the "COSMOSMOV" and "PRE_VENDA" folders are unioned; for each
    ``etiqueta`` only the row with the highest ``venda_desconto`` is kept.

    Args:
        start: First day to load.
        end: Last day to load.

    Returns:
        DataFrame with the canonical sales columns, one row per label.
    """
    by_label = Window.partitionBy("etiqueta").orderBy(col("venda_desconto").desc())

    cosmos = view_pre_venda("COSMOSMOV", start, end, cols_cosmos)
    pre_venda = view_pre_venda("PRE_VENDA", start, end, cols_pre_venda)

    # Rank rows inside each label and keep only the top-ranked one.
    ranked = cosmos.union(pre_venda).withColumn("id", F.row_number().over(by_label))
    return ranked.filter(col("id") == 1).drop("id")


def view_autorizador() -> DataFrame:
    """Load the authorization records, one row per normalized barcode.

    Reads the current and the history parquet files, keeps only rows whose
    ``ulch_fl_situacao`` is "F", normalizes ``ulch_cd_barras`` with
    ``etiqueta`` and replaces a null ``ulch_percentual_desconto`` with 0.
    When a barcode appears in both files, the row from the current file wins.

    Returns:
        DataFrame with the columns of ``cols_autorizador``, deduplicated on
        ``ulch_cd_barras``.
    """
    volume = "/Volumes/raw/super_desconto/cosmos_v14b"

    file = "cosmos_v14b_dbo_ultima_chance_autorizacao.parquet"
    file_hist = "cosmos_v14b_dbo_ultima_chance_autorizacao_hist.parquet"

    def inner_dataframe(filename: str, priority: int) -> DataFrame:
        # Tag every row with the priority of its file (1 = current, 2 = history).
        return (
            spark.read.parquet(f"{volume}/{filename}")
            .select(cols_autorizador)
            .filter(col("ulch_fl_situacao") == "F")
            .withColumn("ulch_cd_barras", etiqueta("ulch_cd_barras"))
            .withColumn("ulch_percentual_desconto", F.coalesce("ulch_percentual_desconto", F.lit(0)))
            .withColumn("priority", F.lit(priority))
        )

    # orderBy(...).dropDuplicates(...) does not guarantee that the first row of
    # the ordering survives, so the preferred row is picked explicitly with
    # row_number() over a window ordered by priority.
    by_priority = Window.partitionBy("ulch_cd_barras").orderBy("priority")

    return (
        inner_dataframe(file, 1)
        .union(inner_dataframe(file_hist, 2))
        .withColumn("_rank", F.row_number().over(by_priority))
        .filter(col("_rank") == 1)
        .drop("_rank", "priority")
    )


def view_produto() -> DataFrame:
    """Load the product records, one row per ``ulch_sq_produto``.

    Reads the current and the history parquet files and upper-cases/trims
    ``ulch_lote``. When a product appears in both files, the row from the
    current file wins.

    Returns:
        DataFrame with the (distinct) columns of ``cols_produto``,
        deduplicated on ``ulch_sq_produto``.
    """
    volume = "/Volumes/raw/super_desconto/cosmos_v14b"
    file = "cosmos_v14b_dbo_ultima_chance_produto.parquet"
    file_hist = "cosmos_v14b_dbo_ultima_chance_produto_hist.parquet"

    # Select each column once, in first-seen order, so a repeated entry in
    # cols_produto cannot produce two columns with the same name.
    select_cols = list(dict.fromkeys(cols_produto))

    def inner_dataframe(filename: str, priority: int) -> DataFrame:
        # Tag every row with the priority of its file (1 = current, 2 = history).
        return (
            spark.read.parquet(f"{volume}/{filename}")
            .select(select_cols)
            .withColumn("ulch_lote", F.upper(F.trim("ulch_lote")))
            .withColumn("priority", F.lit(priority))
        )

    # orderBy(...).dropDuplicates(...) does not guarantee that the first row of
    # the ordering survives, so the preferred row is picked explicitly with
    # row_number() over a window ordered by priority.
    by_priority = Window.partitionBy("ulch_sq_produto").orderBy("priority")

    return (
        inner_dataframe(file, 1)
        .union(inner_dataframe(file_hist, 2))
        .withColumn("_rank", F.row_number().over(by_priority))
        .filter(col("_rank") == 1)
        .drop("_rank", "priority")
    )

In [0]:
# Full-load lookups: both views read fixed parquet files (current + history)
# and do not depend on the processing window.
produto = view_produto()
autorizador = view_autorizador()

In [0]:
# Sales labels sold inside the processing window (one row per label).
cupom = view_cupom(start, end)

# Attach the authorization and product records to every sold label.
# Both joins are inner joins, so labels without a match are left out.
view_create = (
    cupom
    .join(autorizador, cupom["etiqueta"] == autorizador["ulch_cd_barras"])
    .join(produto, autorizador["ulch_sq_produto"] == produto["ulch_sq_produto"])
    .select(
        autorizador["ulch_sq_autorizacao"],
        produto["ulch_sq_produto"],
        produto["xxxx_dh_cad"],
        cupom["periodo"].alias("dt_venda"),
        cupom["filial"],
        cupom["cod_prod"],
        produto["ulch_lote"],
        produto["ulch_dt_vencimento"],
        cupom["etiqueta"],
        cupom["perc_dsc_cupom"],
        cupom["venda"],
        cupom["venda_desconto"],
        autorizador["ulch_preco_venda"].alias("ulch_preco_venda"),
        autorizador["ulch_percentual_desconto"],
        autorizador["ulch_fl_tipo_produto"],
    )
)

target_table = f"{catalog}.{schema}.{table_name}"

if table_exists(catalog, schema, table_name):
    # Table already there: upsert by label.
    print(f"updating table {target_table}")

    target = DeltaTable.forName(spark, target_table)
    (
        target.alias("t")
        .merge(view_create.alias("s"), "t.etiqueta = s.etiqueta")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

else:
    # First run: create the Delta table from the current result.
    print(f"creating table {target_table}")

    (
        view_create.coalesce(1)
        .write.format("delta")
        .mode("overwrite")
        .saveAsTable(target_table)
    )

In [0]:
# ETL -- Athena: merge the processing window of the Delta table into the
# Iceberg table on Athena.
cursor = CursorParquetDuckdb(
    region_name=region,
    aws_access_key_id=username,
    aws_secret_access_key=password,
    s3_staging_dir=location,
    result_reuse_enable=True,
)

print(f"periodo: {start:%Y-%m-%d} - {end:%Y-%m-%d}")

# Bounds cover the whole first and last day of the window.
dt_venda_min = f"{start:%Y-%m-%d} 00:00:00.000"
dt_venda_max = f"{end:%Y-%m-%d} 23:59:59.999"

df = spark.table(f"{catalog}.{schema}.{table_name}").filter(
    col("dt_venda").between(dt_venda_min, dt_venda_max)
)

# Type adjustments applied before the pandas conversion: every decimal column
# becomes double and ulch_dt_vencimento becomes a timestamp without time zone.
# NOTE(review): presumably required by the pandas/Athena writer -- confirm.
columns = {
    **{
        field.name: col(field.name).cast(T.DoubleType())
        for field in df.schema
        if isinstance(field.dataType, T.DecimalType)
    },
    "ulch_dt_vencimento": col("ulch_dt_vencimento").cast(T.TimestampNTZType()),
}
df = df.withColumns(columns).toPandas()

print(f"Rows df: {df.shape}")

with Athena(cursor=cursor) as cliente:
    cliente.merge_table_iceberg(
        table_athena,
        df,
        schema=schema_athena,
        predicate="t.etiqueta = s.etiqueta",
        location=f"{location_tables}tables/{table_athena}/",
    )