In [0]:
# importando as bibliotecas necessárias
from pyspark.sql import functions as f
from pyspark.sql.window import Window
import datetime

In [0]:
bronze_transactions_path = "/Volumes/rp/bronze/transactions"
bronze_calendar_table = "rp.bronze.dim_calendario"
silver_eligible_balances_table = "rp.silver.daily_eligible_balances"

In [0]:
# Parametros do Job
dbutils.widgets.text("processing_date", "2025-08-16")
processing_date_str = dbutils.widgets.get("processing_date")
# processing_date_str = "2025-08-16" # usando a data atual como exemplo
processing_date = datetime.datetime.strptime(processing_date_str, "%Y-%m-%d").date()

In [0]:
# limite mínimo de saldo para elegibilidade
balance_threshold = 100.0

In [0]:
# Simulação da tabela de taxas de juros
# esta tabela seria mantida e atualizada por uma equipe financeira.
interest_rates_data = [
    (datetime.date(2025, 8, 14), 0.1150), # quinta-feira
    (datetime.date(2025, 8, 15), 0.1155)  # sexta-feira
]
interest_rates_df = spark.createDataFrame(interest_rates_data, ["rate_date", "annual_rate"]) \
    .withColumn("daily_rate", f.pow(f.lit(1) + f.col("annual_rate"), f.lit(1/252)) - 1)

print(f"pipeline configurado para a data de processamento: {processing_date_str}")
print("fontes e destinos:")
print(f"  bronze (volume): {bronze_transactions_path}")
print(f"  bronze (tabela): {bronze_calendar_table}")
print(f"  silver (tabela): {silver_eligible_balances_table}")

In [0]:
# Lendo tabela Bronze
wallet_cdc_df = spark.read.format("parquet") \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load(bronze_transactions_path) \
    .withColumn("timestamp", f.to_timestamp(f.col("event_time")))

In [0]:
# lendo a tabela de calendário considerando informações da FEBRABAN
calendar_df = spark.table(bronze_calendar_table) \
    .withColumn("date", f.to_date(f.col("data")))

In [0]:
# determinar o último dia útil antes da data de processamento.
# A coluna 'flag_dia_util_bancario' para identificar os dias úteis.
last_business_day_row = calendar_df \
    .filter((f.col("data") < processing_date) & (f.col("flag_dia_util_bancario") == "S")) \
    .orderBy(f.col("data").desc()) \
    .first()

if not last_business_day_row:
    dbutils.notebook.exit(f"nenhum dia útil encontrado antes de {processing_date_str}. verifique a tabela de calendário.")

last_business_day = last_business_day_row.data
print(f"último dia útil considerado para o cálculo: {last_business_day}")

In [0]:
# Calcular o saldo de abertura no início do último dia útil.
opening_balance_df = (
    wallet_cdc_df
    .filter(f.col("timestamp") < f.to_timestamp(f.lit(str(last_business_day))))
    .groupBy("user_id")
    .agg(f.sum("amount").alias("balance_on_last_biz_day"))
)

In [0]:
# Identificando usuários com movimentação desde o último dia útil.
unstable_users_df = (
    wallet_cdc_df
    .filter(
        (f.col("timestamp") >= f.to_timestamp(f.lit(str(last_business_day)))) &
        (f.col("timestamp") < f.to_timestamp(f.lit(processing_date_str)))
    )
    .select("user_id")
    .distinct()
)

In [0]:
# Filtrando somnente os saldos estáveis e que atendem ao critério de valor mínimo de 100 reais
eligible_balances_df = opening_balance_df \
    .join(unstable_users_df, on="user_id", how="left_anti") \
    .filter(f.col("balance_on_last_biz_day") > balance_threshold) \
    .withColumn("processing_date", f.lit(processing_date).cast("date")) \
    .withColumn("last_business_day", f.lit(last_business_day).cast("date"))

print("saldos elegíveis para o dia (a serem salvos na camada silver):")
eligible_balances_df.display()

In [0]:
# Salvando a tabela na camada silver
(
    eligible_balances_df.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"processing_date = '{processing_date_str}'")
    .partitionBy("processing_date")
    .saveAsTable(silver_eligible_balances_table)
)

print(f"tabela de saldos elegíveis salva com sucesso em '{silver_eligible_balances_table}' para a data {processing_date_str}.")