## Imports

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DecimalType, TimestampType, StringType, LongType
from datetime import datetime, timedelta

## Data Loading

In [0]:
try:
  df_transactions = spark.read.format("parquet").load("dbfs:/FileStore/shared_uploads/gabrielsribe@gmail.com/*.parquet")
except:
  print("Failed to read the data!")
  raise Exception("Failed to read the data!")

## Cleaning and Checking Data
It has been verified that I have a wallet creation event for each account_id

In [0]:
#Adjust schema
df_transactions_cleaned = df_transactions \
    .withColumn("event_time", F.col("event_time").cast(TimestampType())) \
    .withColumn("user_id", F.col("user_id").cast(StringType())) \
    .withColumn("account_id", F.col("account_id").cast(StringType())) \
    .withColumn("amount", F.col("amount").cast(DecimalType(10, 2))) \
    .withColumn("transaction_type", F.col("transaction_type").cast(StringType())) \
    .withColumn("cdc_operation", F.col("cdc_operation").cast(StringType())) \
    .withColumn("cdc_sequence_num", F.col("cdc_sequence_num").cast(LongType())) \
    .withColumn("source_system", F.col("source_system").cast(StringType()))

In [0]:
#Data quality:
errors = []
if df_transactions_cleaned.filter(F.col("user_id").isNull()).count() > 0:
    errors.append("Column ‘user_id’ has null values.")

if df_transactions_cleaned.filter(F.col("account_id").isNull()).count() > 0:
    errors.append("Column ‘account_id’ has null values.")

if df_transactions_cleaned.filter(F.col("amount").isNull()).count() > 0:
    errors.append("Column ‘amount’ has null values.")

if df_transactions_cleaned.filter(F.col("event_time").isNull()).count() > 0:
    errors.append("Column ‘event_time’ has null values.")

if df_transactions_cleaned.filter(F.col("cdc_sequence_num").isNull()).count() > 0:
    errors.append("Column ‘cdc_sequence_num’ has null values.")

if df_transactions_cleaned.filter(~ F.col("transaction_type")\
    .isin("TRANSFER_OUT","DEPOSIT","WITHDRAWAL","TRANSFER_IN","WALLET_CREATED")).count() > 0:
    errors.append("Column transaction_type has null values.")

if errors:
    error_message = "\n".join(errors)
    raise Exception(f"Failed data quality validation:\n{error_message}")
else:
    print("Dataset validated successfully!")

Dataset validated successfully!


##Intermediate table: User balance over time

In [0]:
# Get only necessary events
df_valid_events = df_transactions_cleaned.filter(F.col("cdc_operation").isin("insert", "update")) \
    .withColumn("date", F.to_date("event_time"))

In [0]:
# Group transactions by account_id and date
df_daily_movements = (
    df_valid_events.groupBy("account_id", "date")
    .agg(F.sum("amount").alias("daily_amount"))
)

In [0]:
#Get date range from data
date_range = df_daily_movements.select(F.min("date").alias("start"), F.max("date").alias("end")).collect()[0]

#Select start date and end date
start_date, end_date = date_range["start"], date_range["end"]

#Generate Dataframe with the dates
def generate_date_range(start, end):
    days = (end - start).days + 1
    return [(start + timedelta(days=i),) for i in range(days)]

df_dates = spark.createDataFrame(
    generate_date_range(start_date, end_date),
    ["date"]
)

In [0]:
# Cross Join between account_id and dates
df_accounts = df_valid_events.select("account_id").distinct()
df_accounts_with_dates = df_accounts.crossJoin(df_dates)

In [0]:
#Join with daily movement data, filling in null values with zero
df_accounts_with_dates_daily_movements = (
    df_accounts_with_dates
    .join(df_daily_movements, on=["account_id", "date"], how="left")
    .withColumn("daily_amount", F.coalesce(F.col("daily_amount"), F.lit(0.0)))
)

In [0]:
#Calculate accumulated balance by account_id
window_spec = Window.partitionBy("account_id").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_daily_balance = df_accounts_with_dates_daily_movements.withColumn("daily_balance", F.sum("daily_amount").over(window_spec))

In [0]:
#Daily Balance dataset Data Quality:
errors = []
if df_daily_balance.filter(F.col("date").isNull()).count() > 0:
    errors.append("Column date has null values.")

if df_daily_balance.filter(F.col("account_id").isNull()).count() > 0:
    errors.append("Column ‘account_id’ has null values.")

if df_daily_balance.filter((F.col("daily_amount").isNull())).count() > 0:
    errors.append("Column cdi_interest_value has null values.")

if df_daily_balance.filter((F.col("daily_balance").isNull())).count() > 0:
    errors.append("Column daily_balance has null values.")

if errors:
    error_message = "\n".join(errors)
    raise Exception(f"Failed data quality validation:\n{error_message}")
else:
    print("Dataset validated successfully!")

Dataset validated successfully!


In [0]:
# Result
df_daily_balance.filter("account_id = '001a008c-031f-5c97-9f5f-6369a53b4b4f'").display()

account_id,date,daily_amount,daily_balance
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-01,58.98,58.98
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-02,91.76,150.74
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-03,0.0,150.74
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-04,0.0,150.74
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-05,0.0,150.74
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-06,0.0,150.74
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-07,0.0,150.74
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-08,224.46,375.2000000000001
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-09,-372.52,2.680000000000064
001a008c-031f-5c97-9f5f-6369a53b4b4f,2024-05-10,-18.18,-15.499999999999936


In [0]:
#Select only necessary columns
df_daily_balance = df_daily_balance.select("date", "account_id", "daily_balance")
df_daily_balance = df_daily_balance.withColumnRenamed("daily_balance", "current_balance")

##Table logic: CDI Bonus

### Previous Day Balance

In [0]:
#Generate column with previous day's balance
window_spec = Window.partitionBy("account_id").orderBy("date")
df_wallet_balance_cdi = df_daily_balance.withColumn("previous_balance", F.lag("current_balance").over(window_spec))

### Unchanged Balance Value

In [0]:
#Calculate how much of the balance over 100 has not changed in the last 24 hours
df_wallet_balance_cdi_unchanged_value = df_wallet_balance_cdi.withColumn(
    "balance_above_100_for_one_day",
    F.greatest(
        F.lit(0),
        F.least(
            F.when(F.col("current_balance") > 100, F.col("current_balance") - 100).otherwise(0),
            F.when(F.col("previous_balance") > 100, F.col("previous_balance") - 100).otherwise(0)
        )
    )
)

In [0]:
df_wallet_balance_cdi_unchanged_value.filter("date >= '2024-09-29'").filter("account_id = '001a008c-031f-5c97-9f5f-6369a53b4b4f'").display()

date,account_id,current_balance,previous_balance,balance_above_100_for_one_day
2024-09-29,001a008c-031f-5c97-9f5f-6369a53b4b4f,1084.5100000000002,616.7000000000003,516.7000000000003
2024-09-30,001a008c-031f-5c97-9f5f-6369a53b4b4f,865.1000000000003,1084.5100000000002,765.1000000000003
2024-10-01,001a008c-031f-5c97-9f5f-6369a53b4b4f,996.9900000000002,865.1000000000003,765.1000000000003
2024-10-02,001a008c-031f-5c97-9f5f-6369a53b4b4f,896.9700000000003,996.9900000000002,796.9700000000003
2024-10-03,001a008c-031f-5c97-9f5f-6369a53b4b4f,794.2400000000002,896.9700000000003,694.2400000000002
2024-10-04,001a008c-031f-5c97-9f5f-6369a53b4b4f,961.5200000000002,794.2400000000002,694.2400000000002
2024-10-05,001a008c-031f-5c97-9f5f-6369a53b4b4f,974.5300000000002,961.5200000000002,861.5200000000002
2024-10-06,001a008c-031f-5c97-9f5f-6369a53b4b4f,521.4300000000002,974.5300000000002,421.4300000000002
2024-10-07,001a008c-031f-5c97-9f5f-6369a53b4b4f,521.4300000000002,521.4300000000002,421.4300000000002


### Apply CDI Bonus

In [0]:
#Daily CDI calculation, simulating values
df_cdi = spark.createDataFrame([
    ("2024-10-01", 0.0015),
    ("2024-10-02", 0.0014),
    ("2024-10-03", 0.0015),
    ("2024-10-04", 0.0016),
    ("2024-10-05", 0.0014),
    ("2024-10-06", 0.0015),
    ("2024-10-07", 0.0016),
    ("2024-10-08", 0.0014),
    ("2024-10-09", 0.0015),
    ("2024-10-10", 0.0016),
    ("2024-10-11", 0.0014),
    ("2024-10-12", 0.0015),
    ("2024-10-13", 0.0016),
    ("2024-10-14", 0.0017),
    ("2024-10-15", 0.0014),
    ("2024-10-16", 0.0015),
    ("2024-10-17", 0.0016),
    ("2024-10-18", 0.0017),
    ("2024-10-19", 0.0014),
    ("2024-10-20", 0.0016),
], ["date", "daily_cdi_rate"]).withColumn("date", F.to_date("date"))

In [0]:
#Join ballance dataset with cdi dataset
df_ballance_cdi = df_wallet_balance_cdi_unchanged_value.join(df_cdi, on=["date"], how="left")

#Generate the CDI interest value
df_bonus_cdi = df_ballance_cdi.withColumn(
    "cdi_interest_value", F.col("balance_above_100_for_one_day") * F.col("daily_cdi_rate")
).select("date", "account_id", "balance_above_100_for_one_day", "cdi_interest_value")

#As in our simulation we only have the CDI value for the month of 2024-10-* , we will filter the table records to only include this month. Ideally, we would have the entire historical CDI record
df_bonus_cdi = df_bonus_cdi.filter((F.col("cdi_interest_value").isNotNull()))

In [0]:
#Data CDI dataset quality:
errors = []
if df_bonus_cdi.filter(F.col("date").isNull()).count() > 0:
    errors.append("Column date has null values.")

if df_bonus_cdi.filter(F.col("account_id").isNull()).count() > 0:
    errors.append("Column ‘account_id’ has null values.")

if df_bonus_cdi.filter((F.col("cdi_interest_value").isNull()) | (F.col("cdi_interest_value") < 0)).count() > 0:
    errors.append("Column cdi_interest_value has null or negative values.")

if errors:
    error_message = "\n".join(errors)
    raise Exception(f"Failed data quality validation:\n{error_message}")
else:
    print("Dataset validated successfully!")

Dataset validated successfully!


In [0]:
#Result
df_bonus_cdi.filter("account_id = '001a008c-031f-5c97-9f5f-6369a53b4b4f'").display()

date,account_id,balance_above_100_for_one_day,cdi_interest_value
2024-10-02,001a008c-031f-5c97-9f5f-6369a53b4b4f,796.9700000000003,1.1157580000000002
2024-10-01,001a008c-031f-5c97-9f5f-6369a53b4b4f,765.1000000000003,1.1476500000000005
2024-10-04,001a008c-031f-5c97-9f5f-6369a53b4b4f,694.2400000000002,1.1107840000000004
2024-10-03,001a008c-031f-5c97-9f5f-6369a53b4b4f,694.2400000000002,1.0413600000000005
2024-10-05,001a008c-031f-5c97-9f5f-6369a53b4b4f,861.5200000000002,1.2061280000000003
2024-10-06,001a008c-031f-5c97-9f5f-6369a53b4b4f,421.4300000000002,0.6321450000000003
2024-10-07,001a008c-031f-5c97-9f5f-6369a53b4b4f,421.4300000000002,0.6742880000000003


##Suggestion on how to add this information to a relational database

In [0]:
#Generating dataset with only CDI amounts to be deposited (round value)
df_payout = df_bonus_cdi.filter(F.col("cdi_interest_value") > 0)\
    .withColumn("transaction_type", F.lit("cdi_interest")) \
    .withColumn("amount", F.round(F.col("cdi_interest_value"), 2)) \
    .withColumn("source_system", F.lit("cdi_bonus_system")) \
    .withColumn("event_time", F.current_timestamp()) \
    .withColumn("user_id", F.lit("system_user_id")) \
    .select("event_time", "user_id", "account_id", "amount", "transaction_type", "source_system")

df_payout.display()

event_time,user_id,account_id,amount,transaction_type,source_system
2025-05-20T17:31:15.188+0000,system_user_id,0004bb3f-fc20-512e-b0a4-09a47cbe8ce8,3.28,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,001a008c-031f-5c97-9f5f-6369a53b4b4f,1.12,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,00309825-24a5-53e2-92b2-6d6b095b6c01,18.13,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,007002bb-b320-5352-be3b-4bf55d3b33ca,0.57,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,007105d7-4da8-59ab-90f3-cea488e3aec4,10.49,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,00875b71-52a7-55ca-90de-919b290ef376,0.04,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,00aa366b-99b2-5db6-8657-391b574addf7,12.14,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,00d03a67-60ab-5582-a9f3-ca14802d1f95,0.43,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,00d47593-67eb-518f-a2d5-982f61b289c2,0.48,cdi_interest,cdi_bonus_system
2025-05-20T17:31:15.188+0000,system_user_id,00d59762-ec6e-5a44-bd66-2cc9820399fd,4.88,cdi_interest,cdi_bonus_system


In [0]:
#Connection configuration, ideally get this information from the vault
jdbc_url = "my_jdbc_url"
dbtable = "my_dbtable"
user_name = "USER"
password = "PASSWORD"



In [0]:
#Write to database
df.write.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", db_table) \
    .option("user", "USER") \
    .option("password", "PASSWORD") \
    .mode("append") \
    .save()

