In [0]:
# dbutils.fs.mkdirs("/Volumes/workspace/recargapay/vol_rp/bronze")

In [0]:
# dbutils.fs.mkdirs("/Volumes/workspace/recargapay/vol_rp/silver")

In [0]:
# dbutils.fs.mkdirs("/Volumes/workspace/recargapay/vol_rp/gold")

In [0]:
# Load every Parquet file from the bronze volume, sorted by account and event time.
# Parquet files carry their own schema, so the CSV-only 'header' / 'inferSchema'
# reader options are not needed for this read.
df = (
    spark.read
         .parquet("/Volumes/workspace/recargapay/vol_rp/bronze/*.parquet")
         .orderBy("account_id", "event_time")
)
display(df)

In [0]:
from pyspark.sql.functions import col, date_format, sum as _sum, explode, sequence, lit, min as _min, max as _max
from pyspark.sql.window import Window

# Build a gap-free daily running balance per account from the raw events in `df`.
# Dates are kept as 'yyyy-MM-dd' strings throughout, so the join keys and the
# type of the resulting `date` column stay exactly as before.
event_date = date_format("event_time", "yyyy-MM-dd")

# STEP 1: Format date and aggregate total amount per day
daily_totals = (
    df.withColumn("date", event_date)
      .groupBy("account_id", "user_id", "date")
      .agg(_sum("amount").alias("daily_amount"))
)

# STEP 2: Generate full calendar date range
# A single aggregation returns both bounds, so the source is scanned once
# instead of twice. 'yyyy-MM-dd' strings sort chronologically, which makes
# min/max on the formatted value safe.
date_bounds = df.agg(
    _min(event_date).alias("min_date"),
    _max(event_date).alias("max_date"),
).first()
min_date, max_date = date_bounds["min_date"], date_bounds["max_date"]

# With no events (or only null event_time values) both bounds are None and no
# calendar can be built; fail with a clear message instead of an obscure
# schema-inference error further down.
if min_date is None or max_date is None:
    raise ValueError("Cannot build the daily calendar: `df` has no rows with a non-null event_time")

# One seed row exploded into one row per calendar day. For date sequences the
# step defaults to one day, so no explicit interval is required.
calendar = (
    spark.range(1)
         .select(explode(sequence(lit(min_date).cast("date"), lit(max_date).cast("date"))).alias("date"))
         .select(date_format("date", "yyyy-MM-dd").alias("date"))
)

# STEP 3: Cross join all account_id + user_id pairs with the full date range
account_users = df.select("account_id", "user_id").distinct()
full_grid = account_users.crossJoin(calendar)

# STEP 4: Join with actual daily totals and fill missing values
# Days without any event get a daily_amount of 0 so the running total carries forward.
daily_complete = (
    full_grid.join(daily_totals, on=["account_id", "user_id", "date"], how="left")
             .fillna({"daily_amount": 0.0})
)

# STEP 5: Define window and compute cumulative balance
# A RANGE frame treats rows that share the same date as peers, so they all get
# the same cumulative total. A ROWS frame would give tied rows (several user_id
# rows of one account on the same day) totals that depend on arbitrary row order.
# NOTE(review): the balance accumulates per account_id across all of its user_id
# rows -- confirm that an account-level (not per-user) balance is intended.
window_spec = (
    Window.partitionBy("account_id")
          .orderBy("date")
          .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

result = (
    daily_complete.withColumn("daily_balance", _sum("daily_amount").over(window_spec))
                  .orderBy("account_id", "date")
)

display(result)

In [0]:
# Persist the daily balances under the silver volume path as a Delta table,
# replacing whatever was previously stored at that location.
(
    result.write
          .format("delta")
          .mode("overwrite")
          .save("/Volumes/workspace/recargapay/vol_rp/silver/daily_balance_table")
)