# PySpark Account Balance Report

## Task Description
Create a report with the following information:
- Account number
- Customer  
- Opening balance 2025
- Balance end of January, February, March, ... through December
- Sum of transactions YTD (Year-to-Date)
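The requested layout amounts to a fixed column order. Below is a minimal sketch of that order, using the column names from the notebook's final report. The helper `report_columns` and the tuple `MONTH_ABBRS` are hypothetical and exist only for illustration.

```python
# Month abbreviations as used in the notebook's column names
MONTH_ABBRS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
               "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def report_columns():
    """Column order of the report: identifiers, opening balance, 12 month-end balances, YTD sum."""
    return (
        ["Account", "Customer", "OpeningBalance2025"]
        + [f"BalanceEndOf_{abbr}" for abbr in MONTH_ABBRS]
        + ["SumTransactionsYTD"]
    )


print(report_columns())
```

The month abbreviations are hard-coded rather than taken from `calendar.month_abbr`, because that attribute follows the active locale.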

## Data Sources
1. **Account Entries** - transaction data (AccountEntries.csv)
2. **Account Information** - account details (AccountInformation.csv)

In [1]:
# Import required libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType  # only DoubleType is used from this module
from pyspark.storagelevel import StorageLevel
import pandas as pd
from IPython.display import display  # implicit in Jupyter; explicit import for scripts

print("All libraries successfully imported!")

All libraries successfully imported!


In [21]:
# Configuration constants for better maintainability
TARGET_YEAR = 2025
PREVIOUS_YEAR = TARGET_YEAR - 1  # 2024
CSV_SEPARATOR = ";"
DATE_FORMAT = "dd/MM/yyyy"
OUTPUT_LIMIT = 10  # Number of rows to display in final report

print(f"Configuration: Analyzing data for year {TARGET_YEAR}")
print(f"Output will be limited to {OUTPUT_LIMIT} rows for display")

Configuration: Analyzing data for year 2025
Output will be limited to 10 rows for display
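The later cells hard-code `2025` and `"2024-12-31"` instead of reading these constants. Here is a minimal sketch of how the opening-balance cutoff could be derived from `TARGET_YEAR`. The constant name `OPENING_BALANCE_EFFECTIVE_DATE` is hypothetical.

```python
from datetime import date

TARGET_YEAR = 2025
PREVIOUS_YEAR = TARGET_YEAR - 1

# Hypothetical constant: the date on which "Opening balance" rows are booked
# for calculation purposes (the last day of the previous year).
OPENING_BALANCE_EFFECTIVE_DATE = date(PREVIOUS_YEAR, 12, 31)

# The ISO form matches the literal used later, e.g. F.to_date(F.lit("2024-12-31"))
print(OPENING_BALANCE_EFFECTIVE_DATE.isoformat())  # 2024-12-31
```

In the Spark cells this value could stand in for the literal, for example `F.to_date(F.lit(OPENING_BALANCE_EFFECTIVE_DATE.isoformat()))`. Likewise, `F.year(...) == TARGET_YEAR` could replace `== 2025`.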


In [3]:
# Environment diagnostics
import sys
import os

print("🔍 Environment Diagnostics:")
print(f"Python executable: {sys.executable}")
print(f"Python version: {sys.version}")
print(f"Current working directory: {os.getcwd()}")

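# Check if Java is available (`java -version` prints its banner to stderr)
print("\n🔍 Java check:")
try:
    import subprocess

    # Pass the command as a list WITHOUT shell=True so it behaves the same on Windows and POSIX
    result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    if result.returncode == 0:
        print("✅ Java is available")
        print(result.stderr)
    else:
        print("❌ Java returned a non-zero exit code")
except FileNotFoundError:
    print("❌ Java not found in PATH")
except Exception as e:
    print(f"❌ Error checking Java: {e}")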

# Check PySpark installation
print("\n🔍 PySpark check:")
try:
    import pyspark

    print(f"✅ PySpark version: {pyspark.__version__}")
    print(f"PySpark location: {pyspark.__file__}")
except ImportError as e:
    print(f"❌ PySpark import error: {e}")

print("\n🔍 Environment variables:")
for var in ["JAVA_HOME", "SPARK_HOME", "PYTHONPATH"]:
    value = os.environ.get(var, "Not set")
    print(f"{var}: {value}")

🔍 Environment Diagnostics:
Python executable: c:\Users\juraj\Desktop\ucoudify_pyspark\pyspark_env_final\Scripts\python.exe
Python version: 3.11.9 (tags/v3.11.9:de54cf5, Apr  2 2024, 10:12:12) [MSC v.1938 64 bit (AMD64)]
Current working directory: c:\Users\juraj\Desktop\ucoudify_pyspark\Tryout-Data-Dev-2025Q3-PySpark

🔍 Java check:
✅ Java is available
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment Temurin-11.0.28+6 (build 11.0.28+6)
OpenJDK 64-Bit Server VM Temurin-11.0.28+6 (build 11.0.28+6, mixed mode)


🔍 PySpark check:
✅ PySpark version: 3.5.1
PySpark location: c:\Users\juraj\Desktop\ucoudify_pyspark\pyspark_env_final\Lib\site-packages\pyspark\__init__.py

🔍 Environment variables:
JAVA_HOME: C:\Program Files\Eclipse Adoptium\jdk-11.0.28.6-hotspot\
SPARK_HOME: Not set
PYTHONPATH: Not set
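On POSIX systems, `subprocess.run` with a list and `shell=True` treats only the first element as the shell command. The remaining items go to the shell itself, so `-version` never reaches `java`. The sketch below is a portable alternative: it locates Java with `shutil.which` and calls it without a shell. The helper name `java_version_banner` is hypothetical.

```python
import shutil
import subprocess


def java_version_banner():
    """Return the `java -version` banner, or None if `java` is not on PATH or exits with an error."""
    java_path = shutil.which("java")  # absolute path of the executable, or None
    if java_path is None:
        return None
    # No shell involved: the list is passed straight to the executable on every OS.
    result = subprocess.run([java_path, "-version"], capture_output=True, text=True)
    # `java -version` writes its banner to stderr rather than stdout.
    return result.stderr.strip() if result.returncode == 0 else None


banner = java_version_banner()
print(banner if banner else "Java not found on PATH")
```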

In [20]:
# Initialize Spark Session

print("Initializing Spark Session...")

spark = (
    SparkSession.builder.appName("Account Balance Report")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

print("Spark Session initialized successfully!")
print(f"Spark Version: {spark.version}")
print(f"Application Name: {spark.sparkContext.appName}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

Initializing Spark Session...
Spark Session initialized successfully!
Spark Version: 3.5.1
Application Name: Account Balance Report
Spark UI: http://BENDIKFILMSXPS:4040


## Data Loading - Account Information

Loading account master data. The dd/MM/yyyy date columns are cast to DateType, the literal `NULL` is read as null, and customer names are trimmed.
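For a single value, `nullValue="NULL"` combined with `to_date(..., "dd/MM/yyyy")` behaves roughly like the plain-Python sketch below. The helper name `parse_eu_date` is hypothetical. One difference: this sketch raises `ValueError` on a malformed date, so it is stricter than the Spark pipeline may be.

```python
from datetime import date, datetime


def parse_eu_date(raw):
    """Parse a dd/MM/yyyy string into a date; the literal 'NULL' (or empty/None) maps to None."""
    if raw is None or raw.strip() in ("", "NULL"):
        return None
    # %d/%m/%Y is the strptime equivalent of Spark's "dd/MM/yyyy" pattern.
    return datetime.strptime(raw.strip(), "%d/%m/%Y").date()


print(parse_eu_date("15/03/2024"))  # 2024-03-15
print(parse_eu_date("NULL"))        # None
```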

In [19]:
# Load Account Information (cast dates; treat 'NULL' as null)
# Note: CSV files use semicolon (;) as separator and comma (,) as decimal separator

account_info_df = (
    spark.read.option("header", "true")
    .option("sep", ";")
    .option("inferSchema", "true")
    .option("nullValue", "NULL")
    .csv("Data/AccountInformation.csv")
)

# Cast date columns from dd/MM/yyyy to proper DateType
info_prepared_df = (
    account_info_df.withColumn(
        "OpeningDate", F.to_date(F.col("OpeningDate"), "dd/MM/yyyy")
    )
    .withColumn("ClosingDate", F.to_date(F.col("ClosingDate"), "dd/MM/yyyy"))
    .withColumn("Customer", F.trim(F.col("Customer")))
)

print("Account Information loaded and date columns cast to DateType")
print("Schema:")
info_prepared_df.printSchema();
print(
    f"Row count: {info_prepared_df.count()}"
)

# Cache the DataFrame to improve performance for subsequent actions
info_prepared_df.persist(StorageLevel.MEMORY_AND_DISK)

Account Information loaded and date columns cast to DateType
Schema:
root
 |-- Account: integer (nullable = true)
 |-- Customer: string (nullable = true)
 |-- OpeningDate: date (nullable = true)
 |-- ClosingDate: date (nullable = true)

Row count: 4


DataFrame[Account: int, Customer: string, OpeningDate: date, ClosingDate: date]

## Data Loading - Account Entries

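Loading transaction data from a semicolon-separated CSV whose amounts use European notation (period as thousands separator, comma as decimal mark). Amounts are normalized and cast to double. Rows whose amount cannot be parsed are reported and excluded, and Date and Account are cast to proper types.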
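The amount cleaning below consists of two string replacements followed by a numeric cast. Here is a plain-Python sketch of the same steps for one value. The helper name `parse_eu_amount` is hypothetical. Returning `None` mirrors how `cast(DoubleType())` yields null for non-numeric strings when ANSI mode is off, which is Spark 3.5's default.

```python
def parse_eu_amount(raw):
    """Convert a European-formatted amount ('1.234,56') to float; return None if it is not numeric."""
    if raw is None:
        return None
    # Same two steps as the Spark code: drop '.' thousands separators,
    # then turn the ',' decimal mark into '.'.
    normalized = raw.replace(".", "").replace(",", ".")
    try:
        return float(normalized)
    except ValueError:
        return None  # analogous to the cast producing null for malformed input


print(parse_eu_amount("1.234,56"))  # 1234.56
print(parse_eu_amount("-200,00"))   # -200.0
print(parse_eu_amount("BAD_DATA"))  # None
```

Because the first step removes every period, a dot-decimal value such as `"1500.00"` would be read as 150000. This cleaning therefore only suits input that is consistently in European format. The test data later in the notebook uses dot decimals and is cast directly.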

In [17]:
# --- Step 1: Load the AccountEntries.csv data from the file ---
print("--> Loading AccountEntries.csv...")
account_entries_df = (
    spark.read.option("header", "true")
    .option("sep", ";")
    .csv("Data/AccountEntries.csv")
)

# --- Step 2: Clean, Validate, and Transform the data ---
print("--> Data loaded. Starting cleaning, validation, and transformation...")

# --- Part 2a: Handle malformed amounts ---
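# Normalize European-formatted amounts (e.g. '1.234,56' -> '1234.56') by removing the
# '.' thousands separators and turning the ',' decimal mark into '.'.
# The cast to DoubleType then yields null for anything still non-numeric
# (with ANSI mode off), which is how malformed rows are detected below.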
entries_cleaned_amount_df = account_entries_df.withColumn(
    "Amount_cleaned",
    F.regexp_replace(
        F.regexp_replace(F.col("Amount"), "\\.", ""),
        ",",
        ".",
    ),
).withColumn(
    "Amount_numeric",
    F.col("Amount_cleaned").cast(DoubleType()),
)

# Identify and report rows with bad amounts
bad_amount_rows = entries_cleaned_amount_df.filter(F.col("Amount_numeric").isNull())
bad_amount_count = bad_amount_rows.count()

if bad_amount_count > 0:
    print(
        f"\nWARNING: Found {bad_amount_count} rows with malformed 'Amount'. These rows will be excluded."
    )
    print("Rows with malformed amounts:")
    bad_amount_rows.select("Date", "Account", "Amount").show()

# Keep only the rows where the amount could be successfully converted to a number
entries_valid_amount_df = entries_cleaned_amount_df.filter(
    F.col("Amount_numeric").isNotNull()
).select(
    F.col("Date"),
    F.col("Account"),
    F.col("Amount_numeric").alias("Amount"),
    F.col("Currency"),
    F.col("Text"),
)

# --- Part 2b: Final Type Casting ---
entries_prepared_df = entries_valid_amount_df.withColumn(
    "Date", F.to_date(F.col("Date"), "dd/MM/yyyy")
).withColumn("Account", F.col("Account").cast("integer"))

# --- Step 3: Verify the result ---
print("\n--> Preparation complete. Verifying the result:")
print("\nFinal Schema:")
entries_prepared_df.printSchema();

print(f"\nFinal row count after cleaning: {entries_prepared_df.count()}")

# Cache the DataFrame to improve performance for subsequent actions
entries_prepared_df.persist(StorageLevel.MEMORY_AND_DISK)

--> Loading AccountEntries.csv...
--> Data loaded. Starting cleaning, validation, and transformation...

--> Preparation complete. Verifying the result:

Final Schema:
root
 |-- Date: date (nullable = true)
 |-- Account: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Text: string (nullable = true)


Final row count after cleaning: 16


DataFrame[Date: date, Account: int, Amount: double, Currency: string, Text: string]

In [7]:
# Core calculations: EffectiveDate and opening balance

# --- Data Preparation: create an 'EffectiveDate' column ---
# 'Opening balance' rows are treated as if they occurred on the last day of the
# previous year (2024-12-31); all other rows keep their own Date. A single
# date-based rule then separates the opening balance (EffectiveDate before 2025)
# from 2025 activity (EffectiveDate within 2025).
print("--> Preparing data with an 'EffectiveDate' for robust calculations...")

entries_with_effective_date_df = entries_prepared_df.withColumn(
    "EffectiveDate",
    F.when(
        F.col("Text") == "Opening balance",
        F.to_date(F.lit("2024-12-31")),
    ).otherwise(
        F.col("Date")
    ),
)

# 1. Identify all accounts that have an explicit "Opening balance" entry.
accounts_with_explicit_ob = (
    entries_prepared_df.filter(F.col("Text") == "Opening balance")
    .select(F.col("Account").alias("acc_with_ob"))
    .distinct()
)

# 2. Filter the transactions to implement the exclusion rule.
# We keep a transaction if:
#   a) its EffectiveDate is in 2025 or later (a current-year transaction), OR
#   b) it is the "Opening balance" row itself (its EffectiveDate is 2024-12-31), OR
#   c) its account is NOT in the list of accounts with an explicit opening balance.
# This drops historical transactions ONLY for accounts that have an explicit OB.
# Note: the steps below aggregate `entries_with_effective_date_df`, so this
# filtered DataFrame is not used in the report calculations.
final_entries_for_calc_df = (
    entries_with_effective_date_df.join(
        F.broadcast(accounts_with_explicit_ob),
        F.col("Account") == F.col("acc_with_ob"),
        "left",
    )
    .filter(
        (F.year(F.col("EffectiveDate")) >= 2025)
        | (F.col("Text") == "Opening balance")
        | F.col("acc_with_ob").isNull()
    )
    .drop("acc_with_ob")
)

# --- Step 1: Calculate Opening Balance ---
# The opening balance is ALWAYS the sum of all transactions with an EffectiveDate before 2025.
print("--> Step 1: Calculating Opening Balance...")
opening_balance_df = (
    entries_with_effective_date_df.filter(F.year(F.col("EffectiveDate")) < 2025)
    .groupBy("Account")
    .agg(F.sum("Amount").alias("OpeningBalance2025"))
)

--> Preparing data with an 'EffectiveDate' for robust calculations...
--> Step 1: Calculating Opening Balance...
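The EffectiveDate rule and the opening-balance sum can be checked on single accounts without Spark. The plain-Python sketch below does that. The helper names are hypothetical; the sample rows are the "Historical Harry" and "Corrupt Carla" entries from the test data later in the notebook. Like the Spark step, it sums every row dated before 2025. An account with both an "Opening balance" row and older transactions would therefore include both.

```python
from datetime import date

OB_EFFECTIVE_DATE = date(2024, 12, 31)  # last day of the year before the report year


def effective_date(entry_date, text):
    """'Opening balance' rows move to 31 Dec of the previous year; other rows keep their Date."""
    return OB_EFFECTIVE_DATE if text == "Opening balance" else entry_date


def opening_balance(entries, year=2025):
    """Sum of all amounts whose effective date falls before `year`."""
    return sum(
        amount
        for entry_date, amount, text in entries
        if effective_date(entry_date, text).year < year
    )


# (date, amount, text) rows from the 'Historical Harry' and 'Corrupt Carla' test cases
harry = [
    (date(2024, 5, 20), 1500.0, "Past Income"),
    (date(2024, 11, 10), -200.0, "Past Expense"),
    (date(2025, 1, 25), 100.0, "Jan Deposit"),
]
carla = [
    (date(2025, 1, 1), 5000.0, "Opening balance"),
    (date(2025, 2, 15), -50.0, "Feb Expense"),
]
print(opening_balance(harry), opening_balance(carla))  # 1300.0 5000.0
```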


## Business Logic Implementation

This section implements the core business logic for calculating:
- Opening balances for 2025
- Monthly running totals  
- Year-to-date transaction sums
- Final balance calculations with mid-year account handling
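As implemented in the cells of this section, the month-end balances and the YTD sum are tied together by a simple identity. Write $OB_a$ for account $a$'s opening balance and $S_{a,k}$ for the net amount of its 2025 transactions whose EffectiveDate falls in month $k$ (zero for months without activity). Then:

$$
\text{BalanceEndOf}_{a,m} = OB_a + \sum_{k=1}^{m} S_{a,k}, \qquad m = 1, \dots, 12
$$

$$
\text{SumTransactionsYTD}_a = \sum_{k=1}^{12} S_{a,k}
\quad\Longrightarrow\quad
\text{BalanceEndOf}_{a,12} = OB_a + \text{SumTransactionsYTD}_a
$$

For example, account 1000001 in the final report has $3{,}322{,}909.90 + 298{,}927.46 = 3{,}621{,}837.36$, which matches its December balance. The first formula does not apply to months that the mid-year rule sets to NULL.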

In [8]:
# --- Step 2: Calculate Year-To-Date (YTD) Transactions for 2025 ---
# Filter for transactions with an EffectiveDate within 2025.
# 'Opening balance' rows are automatically excluded because we moved their EffectiveDate to 2024.
print("--> Step 2: Calculating Sum of Transactions for 2025 (YTD)...")
ytd_transactions_df = (
    entries_with_effective_date_df.filter(F.year(F.col("EffectiveDate")) == 2025)
    .groupBy("Account")
    .agg(F.sum("Amount").alias("SumTransactionsYTD"))
)

--> Step 2: Calculating Sum of Transactions for 2025 (YTD)...
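The YTD step is a per-account `groupBy`/`sum` over rows whose EffectiveDate lies in 2025. Below is a plain-Python sketch of the same aggregation, using the "Josh" test rows after the EffectiveDate step. The helper name `ytd_by_account` is hypothetical.

```python
from collections import defaultdict
from datetime import date


def ytd_by_account(rows, year=2025):
    """Per-account sum of amounts whose effective date lies in `year`."""
    totals = defaultdict(float)
    for account, effective_date, amount in rows:
        if effective_date.year == year:
            totals[account] += amount
    return dict(totals)


# (account, effective_date, amount); the 'Opening balance' row already sits on 2024-12-31
rows = [
    (604, date(2024, 12, 31), 3000.0),
    (604, date(2025, 2, 15), 500.0),
    (604, date(2025, 3, 10), -200.0),
]
print(ytd_by_account(rows))  # {604: 300.0}
```

Accounts without any 2025 rows are absent from the result. In the Spark version they come back as null from the left join and are then filled with 0 by `na.fill`.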


In [9]:
# --- Step 3: Calculate End-of-Month cumulative sums for 2025 ---
print("--> Step 3: Calculating monthly running totals for 2025...")
window_spec = Window.partitionBy("Account").orderBy("Month")
monthly_balances_df = (
    entries_with_effective_date_df.filter(F.year(F.col("EffectiveDate")) == 2025)
    .withColumn("Month", F.month(F.col("EffectiveDate")))
    .groupBy("Account", "Month")
    .agg(F.sum("Amount").alias("MonthlySum"))
    .withColumn("RunningTotal2025", F.sum("MonthlySum").over(window_spec))
    .groupBy("Account")
    .pivot("Month", list(range(1, 13)))
    .agg(F.first("RunningTotal2025"))
)

month_map = {
    1: "Jan",
    2: "Feb",
    3: "Mar",
    4: "Apr",
    5: "May",
    6: "Jun",
    7: "Jul",
    8: "Aug",
    9: "Sep",
    10: "Oct",
    11: "Nov",
    12: "Dec",
}
for month_num, month_abbr in month_map.items():
    if str(month_num) in monthly_balances_df.columns:
        monthly_balances_df = monthly_balances_df.withColumnRenamed(
            str(month_num), f"RunningTotal_{month_abbr}"
        )

# --- Step 4: Assemble the Final Report by Joining All Components ---
print("--> Step 4: Assembling the final report...")
final_report_df = (
    info_prepared_df.join(opening_balance_df, "Account", "left")
    .join(ytd_transactions_df, "Account", "left")
    .join(monthly_balances_df, "Account", "left")
)

# --- Step 5: Final Calculations and Cleanup ---
print("--> Step 5: Performing final calculations and cleanup...")
final_report_df = final_report_df.na.fill(
    0, ["OpeningBalance2025", "SumTransactionsYTD"]
)

# --- Part 5a: First, forward-fill the RUNNING TOTALS to handle months with no activity ---
# This ensures that a running total from a previous month is carried over.
print("--> Step 5a: Forward-filling monthly running totals...")
monthly_running_total_cols = []
for i in range(1, 13):
    col_name = f"RunningTotal_{month_map[i]}"
    if col_name in final_report_df.columns:
        monthly_running_total_cols.append(col_name)
        if i > 1:
            prev_col_name = f"RunningTotal_{month_map[i-1]}"
            # If current month's running total is null, take it from the previous month
            final_report_df = final_report_df.withColumn(
                col_name,
                F.when(F.col(col_name).isNull(), F.col(prev_col_name)).otherwise(
                    F.col(col_name)
                ),
            )

# Fill any remaining nulls (for early months with no transactions) with 0
final_report_df = final_report_df.na.fill(0, monthly_running_total_cols)

# --- Part 5b: Now, create the final BalanceEndOf columns by adding the OpeningBalance ---
print("--> Step 5b: Calculating final end-of-month balances...")
for month_abbr in month_map.values():
    running_total_col = f"RunningTotal_{month_abbr}"
    final_balance_col = f"BalanceEndOf_{month_abbr}"

    if running_total_col in final_report_df.columns:
        final_report_df = final_report_df.withColumn(
            final_balance_col, F.col("OpeningBalance2025") + F.col(running_total_col)
        )

--> Step 3: Calculating monthly running totals for 2025...
--> Step 4: Assembling the final report...
--> Step 5: Performing final calculations and cleanup...
--> Step 5a: Forward-filling monthly running totals...
--> Step 5b: Calculating final end-of-month balances...
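Treating months without transactions as zero before taking a cumulative sum gives the same result as the pivot-then-forward-fill approach in the cell above. A month with no activity simply repeats the previous month's running total. Below is a plain-Python sketch using `itertools.accumulate`; the helper name `month_end_balances` is hypothetical.

```python
from itertools import accumulate


def month_end_balances(opening_balance, monthly_sums):
    """
    Return the 12 month-end balances.

    monthly_sums maps month number (1-12) to the net amount booked in that month;
    months without activity are absent and contribute 0.
    """
    per_month = [monthly_sums.get(m, 0.0) for m in range(1, 13)]
    # accumulate() yields the running total after each month.
    return [opening_balance + running for running in accumulate(per_month)]


# 'Josh' test case: opening balance 3000, +500 in February, -200 in March.
balances = month_end_balances(3000.0, {2: 500.0, 3: -200.0})
print(balances[:4])  # [3000.0, 3500.0, 3300.0, 3300.0]
```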


In [14]:
# --- Step 6: Final Adjustments and Display ---

# --- Part 6a: Handle accounts opened mid-year ---
# Apply mid-year nullification ONLY for accounts that do NOT have an "Opening balance" entry in 2025.
# For accounts with "Opening balance" in 2025, ignore OpeningDate/ClosingDate constraints.
print(
    "--> Step 6a: Nullifying balances for months prior to account opening (refined logic)..."
)

# 1. Identify accounts that have "Opening balance" transactions in 2025
accounts_with_2025_ob = (
    entries_with_effective_date_df.filter(
        (F.col("Text") == "Opening balance") & (F.year(F.col("Date")) == 2025)
    )
    .select(F.col("Account").alias("acc_with_2025_ob"))
    .distinct()
)

# 2. Join this information to our final report
final_report_adjusted_df = final_report_df.join(
    F.broadcast(accounts_with_2025_ob),
    F.col("Account") == F.col("acc_with_2025_ob"),
    "left",
).withColumn("OpeningMonth", F.month(F.col("OpeningDate")))

# 3. Apply mid-year nullification ONLY for accounts WITHOUT 2025 opening balance
for i, month_abbr in month_map.items():
    col_name = f"BalanceEndOf_{month_abbr}"
    if col_name in final_report_adjusted_df.columns:
        final_report_adjusted_df = final_report_adjusted_df.withColumn(
            col_name,
            F.when(
                # Apply nullification ONLY if account has NO 2025 opening balance AND opened mid-year
                (F.col("acc_with_2025_ob").isNull())
                & (F.year(F.col("OpeningDate")) == 2025)
                & (F.col("OpeningMonth") > i),
                None,
            ).otherwise(F.col(col_name)),
        )

# Drop the temporary helper columns
final_report_adjusted_df = final_report_adjusted_df.drop(
    "acc_with_2025_ob", "OpeningMonth"
)

# --- Part 6b: Display the Final Report using Pandas for professional formatting ---
print(
    "\n--> Step 6b: Formatting final report for presentation..."
)

# First, perform all rounding calculations on our adjusted DataFrame
final_report_rounded_df = final_report_adjusted_df.select(
    F.col("Account"),
    F.col("Customer"),
    F.round(F.col("OpeningBalance2025"), 2).alias("OpeningBalance2025"),
    *[
        F.round(F.col(f"BalanceEndOf_{month_map[i]}"), 2).alias(
            f"BalanceEndOf_{month_map[i]}"
        )
        for i in range(1, 13)
        if f"BalanceEndOf_{month_map[i]}" in final_report_adjusted_df.columns
    ],
    F.round(F.col("SumTransactionsYTD"), 2).alias("SumTransactionsYTD"),
)

# Convert the first OUTPUT_LIMIT rows (ordered by account, so the selection is deterministic) to Pandas
pandas_df = final_report_rounded_df.orderBy("Account").limit(OUTPUT_LIMIT).toPandas()

# Define the columns that need financial formatting
financial_columns = [
    col for col in pandas_df.columns if col not in ["Account", "Customer"]
]

# Apply formatting (thousands separator, two decimal places) to the financial columns.
# Null balances (e.g. months before a mid-year opening) become empty strings.
for col in financial_columns:
    pandas_df[col] = pd.to_numeric(pandas_df[col], errors="coerce").apply(
        lambda x: f"{x:,.2f}" if pd.notnull(x) else ""
    )

# Display formatted Pandas DataFrame, replacing any remaining NaNs with an empty string
display(pandas_df.fillna(""))

--> Step 6a: Nullifying balances for months prior to account opening (refined logic)...

--> Step 6b: Formatting final report for presentation...


| | Account | Customer | OpeningBalance2025 | BalanceEndOf_Jan | BalanceEndOf_Feb | BalanceEndOf_Mar | BalanceEndOf_Apr | BalanceEndOf_May | BalanceEndOf_Jun | BalanceEndOf_Jul | BalanceEndOf_Aug | BalanceEndOf_Sep | BalanceEndOf_Oct | BalanceEndOf_Nov | BalanceEndOf_Dec | SumTransactionsYTD |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1000001 | John | 3322909.9 | 3346333.3 | 3623080.7 | 3621837.36 | 3621837.36 | 3621837.36 | 3621837.36 | 3621837.36 | 3621837.36 | 3621837.36 | 3621837.36 | 3621837.36 | 3621837.36 | 298927.46 |
| 1 | 1000002 | Betty | 38383992.4 | 38383680.0 | 38384334.4 | 38389768.8 | 38389768.8 | 38389768.8 | 38389768.8 | 38389768.8 | 38389768.8 | 38389768.8 | 38389768.8 | 38389768.8 | 38389768.8 | 5776.4 |
| 2 | 1000003 | Jessica | 5584843.9 | 5596967.23 | 5591321.83 | 5588998.43 | 5588998.43 | 5588998.43 | 5588998.43 | 5588998.43 | 5588998.43 | 5588998.43 | 5588998.43 | 5588998.43 | 5588998.43 | 4154.53 |
| 3 | 1000004 | Josh | 988786.9 | 986432.6 | 1462888.2 | 1459465.3 | 1459465.3 | 1459465.3 | 1459465.3 | 1459465.3 | 1459465.3 | 1459465.3 | 1459465.3 | 1459465.3 | 1459465.3 | 470678.4 |
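The mid-year rule in Step 6a blanks month-end balances only under two conditions. The account must have no "Opening balance" row dated in 2025, and its OpeningDate must fall in 2025. The plain-Python sketch below restates that rule for one account. The helper name `apply_mid_year_rule` is hypothetical. Like the notebook, it ignores `ClosingDate`.

```python
from datetime import date


def apply_mid_year_rule(balances, opening_date, has_opening_balance_row, year=2025):
    """
    Return the 12 month-end balances with months before the opening month set to None.

    Applies only if the account has no 'Opening balance' row dated in `year` AND its
    OpeningDate falls in `year`. A missing OpeningDate leaves the balances unchanged
    (in Spark the condition evaluates to null, so `otherwise` keeps the value).
    """
    if has_opening_balance_row or opening_date is None or opening_date.year != year:
        return list(balances)
    return [
        None if month < opening_date.month else balance
        for month, balance in enumerate(balances, start=1)
    ]


# 'Mid-year Mike' test case: opened 22 May 2025, no 'Opening balance' row.
mike = [0.0] * 4 + [2000.0] + [2150.0] * 7
print(apply_mid_year_rule(mike, date(2025, 5, 22), has_opening_balance_row=False)[:6])
# [None, None, None, None, 2000.0, 2150.0]
```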


## Unit Testing Setup - Edge Case Scenarios

Creating controlled test data to validate the robustness of our business logic. This section defines specific edge cases that cover different account types and data quality scenarios.

In [15]:
# ==============================================================================
# BONUS: Unit Testing with Defined Edge Cases
# ==============================================================================
# To validate the logic, build a small, controlled set of test data that
# covers specific edge cases, run the entire calculation pipeline on it,
# and verify the output.

print("--> Setting up test environment and data...")

# --- Test Case Data Definition ---

# Case 1: "Historical Harry" - No 'Opening balance' entry, balance calculated from past transactions.
# Expected OpeningBalance2025 = 1500 - 200 = 1300
harry_info = [(601, "Historical Harry", "2024-03-15", None)]
harry_entries = [
    (601, "2024-05-20", "1500.00", "Past Income"),
    (601, "2024-11-10", "-200.00", "Past Expense"),
    (601, "2025-01-25", "100.00", "Jan Deposit"),
]

# Case 2: "Corrupt Carla" - Has an 'Opening balance' entry but also a row with a malformed amount.
# Expected: The bad row should be filtered out and reported.
carla_info = [(602, "Corrupt Carla", "2023-01-01", None)]
carla_entries = [
    (602, "2025-01-01", "5000.00", "Opening balance"),
    (602, "2025-02-10", "BAD_DATA", "Corrupted Entry"),
    (602, "2025-02-15", "-50.00", "Feb Expense"),
]

# Case 3: "Mid-year Mike" - Account opened in the middle of 2025.
# Expected: Balances for Jan-Apr should be NULL.
mike_info = [(603, "Mid-year Mike", "2025-05-22", None)]
mike_entries = [
    (603, "2025-05-25", "2000.00", "Initial Deposit"),
    (603, "2025-06-10", "150.00", "Jun Deposit"),
]

# Case 4: "Josh" - Has Opening balance in 2025 but wrong OpeningDate (should ignore date constraints)
# Expected: All months should have valid balances, ignore the OpeningDate
josh_info = [(604, "Josh", "2025-06-01", None)]
josh_entries = [
    (604, "2025-01-01", "3000.00", "Opening balance"),
    (604, "2025-02-15", "500.00", "Feb Income"),
    (604, "2025-03-10", "-200.00", "Mar Expense"),
]

# --- Create Spark DataFrames from the test data ---

# Combine all test data
test_info_data = harry_info + carla_info + mike_info + josh_info
test_entries_data = harry_entries + carla_entries + mike_entries + josh_entries

# Define schemas to match our main DataFrames
# NOTE: Using pyspark.sql.types aliased as T is a common convention
from pyspark.sql import types as T

info_schema = T.StructType(
    [
        T.StructField("Account", T.IntegerType()),
        T.StructField("Customer", T.StringType()),
        T.StructField("OpeningDate", T.StringType()),
        T.StructField("ClosingDate", T.StringType()),
    ]
)

entries_schema = T.StructType(
    [
        T.StructField("Account", T.IntegerType()),
        T.StructField("Date", T.StringType()),
        T.StructField(
            "Amount", T.StringType()
        ),  # Read as string so the pipeline's numeric cast can flag malformed values
        T.StructField("Text", T.StringType()),
    ]
)

# Create the test DataFrames
test_info_df = spark.createDataFrame(test_info_data, schema=info_schema)
test_entries_df = spark.createDataFrame(test_entries_data, schema=entries_schema)

print("--> Test DataFrames created successfully.")
test_info_df.show();
test_entries_df.show();

--> Setting up test environment and data...
--> Test DataFrames created successfully.
+-------+----------------+-----------+-----------+
|Account|        Customer|OpeningDate|ClosingDate|
+-------+----------------+-----------+-----------+
|    601|Historical Harry| 2024-03-15|       NULL|
|    602|   Corrupt Carla| 2023-01-01|       NULL|
|    603|   Mid-year Mike| 2025-05-22|       NULL|
|    604|            Josh| 2025-06-01|       NULL|
+-------+----------------+-----------+-----------+

+-------+----------+--------+---------------+
|Account|      Date|  Amount|           Text|
+-------+----------+--------+---------------+
|    601|2024-05-20| 1500.00|    Past Income|
|    601|2024-11-10| -200.00|   Past Expense|
|    601|2025-01-25|  100.00|    Jan Deposit|
|    602|2025-01-01| 5000.00|Opening balance|
|    602|2025-02-10|BAD_DATA|Corrupted Entry|
|    602|2025-02-15|  -50.00|    Feb Expense|
|    603|2025-05-25| 2000.00|Initial Deposit|
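|    603|2025-06-10|  150.00|    Jun Deposit|
|    604|2025-01-01| 3000.00|Opening balance|
|    604|2025-02-15|  500.00|     Feb Income|
|    604|2025-03-10| -200.00|    Mar Expense|
+-------+----------+--------+---------------+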

## Unit Testing & Validation

This section contains unit tests covering four edge cases:
- **Historical Harry**: Account with historical transactions (no opening balance entry)
- **Corrupt Carla**: Data validation testing with malformed amounts
- **Mid-year Mike**: Account opened mid-year (should have NULL balances before opening)
- **Josh**: Account with 2025 opening balance but wrong opening date (should ignore date constraints)
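Each expected value in the assertions below follows from simple arithmetic on the test entries. Writing them out makes every assertion traceable to its inputs:

```python
# Expected values for the four test accounts, derived by hand from the test entries.
harry_opening = 1500.00 - 200.00                # pre-2025 rows only
harry_jan = harry_opening + 100.00              # plus the January deposit
carla_opening = 5000.00                         # 'Opening balance' row; BAD_DATA row dropped
carla_ytd = -50.00                              # only the valid February expense
mike_may, mike_jun = 2000.00, 2000.00 + 150.00  # no opening balance; NULL before May
josh_opening = 3000.00                          # 2025 'Opening balance' row
josh_feb = josh_opening + 500.00
josh_mar = josh_feb - 200.00
josh_ytd = 500.00 - 200.00
print(harry_opening, harry_jan, carla_opening, carla_ytd,
      mike_may, mike_jun, josh_feb, josh_mar, josh_ytd)
# 1300.0 1400.0 5000.0 -50.0 2000.0 2150.0 3500.0 3300.0 300.0
```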

In [12]:
# --- Encapsulate the entire ETL pipeline into a function ---
# This makes our logic reusable for both production data and our test data.


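def generate_financial_report(info_df, entries_df):
    """
    Build the account balance report from raw info and entries DataFrames.

    Mirrors the main notebook logic with two differences: dates are parsed as
    yyyy-MM-dd, and Amount is cast directly to double (no European-format
    cleaning). Returns an unformatted Spark DataFrame with one row per account.
    """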
    print("\n--> Starting report generation pipeline...")

    # --- Part 1: Prepare AccountInformation data ---
    info_prepared_df = (
        info_df.withColumn("OpeningDate", F.to_date(F.col("OpeningDate"), "yyyy-MM-dd"))
        .withColumn("ClosingDate", F.to_date(F.col("ClosingDate"), "yyyy-MM-dd"))
        .withColumn("Customer", F.trim(F.col("Customer")))
    )

    # --- Part 2: Prepare and Validate AccountEntries data ---
    entries_cleaned_amount_df = entries_df.withColumn(
        "Amount_numeric", F.col("Amount").cast(DoubleType())
    )
    bad_amount_rows = entries_cleaned_amount_df.filter(F.col("Amount_numeric").isNull())
    bad_amount_count = bad_amount_rows.count()
    if bad_amount_count > 0:
        print(
            f"\nVALIDATION: Found {bad_amount_count} rows with malformed 'Amount'. These will be excluded."
        )
        bad_amount_rows.select("Date", "Account", "Amount").show()

    entries_prepared_df = (
        entries_cleaned_amount_df.filter(F.col("Amount_numeric").isNotNull())
        .select(
            F.to_date(F.col("Date"), "yyyy-MM-dd").alias("Date"),
            "Account",
            "Amount_numeric",
            "Text",
        )
        .withColumnRenamed("Amount_numeric", "Amount")
    )

    # --- Part 3: Create EffectiveDate and calculate report components ---
    entries_with_effective_date_df = entries_prepared_df.withColumn(
        "EffectiveDate",
        F.when(
            F.col("Text") == "Opening balance", F.to_date(F.lit("2024-12-31"))
        ).otherwise(F.col("Date")),
    )

    opening_balance_df = (
        entries_with_effective_date_df.filter(F.year(F.col("EffectiveDate")) < 2025)
        .groupBy("Account")
        .agg(F.sum("Amount").alias("OpeningBalance2025"))
    )
    ytd_transactions_df = (
        entries_with_effective_date_df.filter(F.year(F.col("EffectiveDate")) == 2025)
        .groupBy("Account")
        .agg(F.sum("Amount").alias("SumTransactionsYTD"))
    )

    window_spec = Window.partitionBy("Account").orderBy("Month")
    monthly_balances_df = (
        entries_with_effective_date_df.filter(F.year(F.col("EffectiveDate")) == 2025)
        .withColumn("Month", F.month(F.col("EffectiveDate")))
        .groupBy("Account", "Month")
        .agg(F.sum("Amount").alias("MonthlySum"))
        .withColumn("RunningTotal2025", F.sum("MonthlySum").over(window_spec))
        .groupBy("Account")
        .pivot("Month", list(range(1, 13)))
        .agg(F.first("RunningTotal2025"))
    )
    for month_num, month_abbr in month_map.items():
        if str(month_num) in monthly_balances_df.columns:
            monthly_balances_df = monthly_balances_df.withColumnRenamed(
                str(month_num), f"RunningTotal_{month_abbr}"
            )

    # --- Part 4: Assemble and finalize the report ---
    final_report_df = (
        info_prepared_df.join(opening_balance_df, "Account", "left")
        .join(ytd_transactions_df, "Account", "left")
        .join(monthly_balances_df, "Account", "left")
    )
    final_report_df = final_report_df.na.fill(
        0, ["OpeningBalance2025", "SumTransactionsYTD"]
    )

    monthly_running_total_cols = [
        f"RunningTotal_{month_map[i]}"
        for i in range(1, 13)
        if f"RunningTotal_{month_map[i]}" in final_report_df.columns
    ]
    for i, col_name in enumerate(monthly_running_total_cols):
        if i > 0:
            prev_col = monthly_running_total_cols[i - 1]
            final_report_df = final_report_df.withColumn(
                col_name,
                F.when(F.col(col_name).isNull(), F.col(prev_col)).otherwise(
                    F.col(col_name)
                ),
            )
    final_report_df = final_report_df.na.fill(0, monthly_running_total_cols)

    for month_abbr in month_map.values():
        running_total_col = f"RunningTotal_{month_abbr}"
        final_balance_col = f"BalanceEndOf_{month_abbr}"
        if running_total_col in final_report_df.columns:
            final_report_df = final_report_df.withColumn(
                final_balance_col,
                F.col("OpeningBalance2025") + F.col(running_total_col),
            )

    # --- Part 5: Handle mid-year opening edge case ---
    # Apply mid-year nullification ONLY for accounts that do NOT have "Opening balance" in 2025
    accounts_with_2025_ob = (
        entries_with_effective_date_df.filter(
            (F.col("Text") == "Opening balance") & (F.year(F.col("Date")) == 2025)
        )
        .select(F.col("Account").alias("acc_with_2025_ob"))
        .distinct()
    )

    final_report_df = final_report_df.join(
        F.broadcast(accounts_with_2025_ob),
        F.col("Account") == F.col("acc_with_2025_ob"),
        "left",
    ).withColumn("OpeningMonth", F.month(F.col("OpeningDate")))

    for i, month_abbr in month_map.items():
        col_name = f"BalanceEndOf_{month_abbr}"
        if col_name in final_report_df.columns:
            final_report_df = final_report_df.withColumn(
                col_name,
                F.when(
                    # Apply nullification ONLY if account has NO 2025 opening balance AND opened mid-year
                    (F.col("acc_with_2025_ob").isNull())
                    & (F.year(F.col("OpeningDate")) == 2025)
                    & (i < F.col("OpeningMonth")),
                    None,
                ).otherwise(F.col(col_name)),
            )

    final_report_df = final_report_df.drop("acc_with_2025_ob", "OpeningMonth")

    # --- Part 6: Final Selection and Formatting ---
    final_columns = (
        ["Account", "Customer", "OpeningBalance2025"]
        + [
            f"BalanceEndOf_{m}"
            for m in month_map.values()
            if f"BalanceEndOf_{m}" in final_report_df.columns
        ]
        + ["SumTransactionsYTD"]
    )
    final_report_df = final_report_df.select(final_columns)

    print("--> Report generation pipeline finished.")
    return final_report_df

In [22]:
# --- Run the pipeline on our test data ---
test_report_df = generate_financial_report(test_info_df, test_entries_df)

print("\n\n--> FINAL TEST REPORT (for visual validation):")
test_report_df.show(truncate=False);

# --- Automated Assertions for Verification ---
print("\n--> Running automated assertions on the test report...")

# Convert to Pandas for easy row-by-row checking
test_report_pd = test_report_df.toPandas()
test_report_pd.set_index("Account", inplace=True)

# Test Case 1: Historical Harry
assert (
    test_report_pd.loc[601]["OpeningBalance2025"] == 1300.0
), "Test Failed: Harry's opening balance is incorrect."
assert (
    test_report_pd.loc[601]["SumTransactionsYTD"] == 100.0
), "Test Failed: Harry's YTD sum is incorrect."
assert (
    test_report_pd.loc[601]["BalanceEndOf_Jan"] == 1400.0
), "Test Failed: Harry's Jan balance is incorrect."
print("✅ Test Passed: Historical Harry was calculated correctly.")

# Test Case 2: Corrupt Carla
assert (
    test_report_pd.loc[602]["OpeningBalance2025"] == 5000.0
), "Test Failed: Carla's opening balance is incorrect."
assert (
    test_report_pd.loc[602]["SumTransactionsYTD"] == -50.0
), "Test Failed: Carla's YTD sum is incorrect (should ignore bad data)."
print("✅ Test Passed: Corrupt Carla was calculated correctly (bad data was filtered).")

# Test Case 3: Mid-year Mike
assert (
    test_report_pd.loc[603]["OpeningBalance2025"] == 0.0
), "Test Failed: Mike's opening balance should be 0."
assert pd.isna(
    test_report_pd.loc[603]["BalanceEndOf_Apr"]
), "Test Failed: Mike's April balance should be NULL."
assert (
    test_report_pd.loc[603]["BalanceEndOf_May"] == 2000.0
), "Test Failed: Mike's May balance is incorrect."
assert (
    test_report_pd.loc[603]["BalanceEndOf_Jun"] == 2150.0
), "Test Failed: Mike's June balance is incorrect."
print("✅ Test Passed: Mid-year Mike was calculated correctly (nulls before opening).")

# Test Case 4: Josh (Opening balance in 2025, should ignore OpeningDate)
assert (
    test_report_pd.loc[604]["OpeningBalance2025"] == 3000.0
), "Test Failed: Josh's opening balance is incorrect."
assert (
    test_report_pd.loc[604]["BalanceEndOf_Jan"] == 3000.0
), "Test Failed: Josh's Jan balance should be valid (ignore OpeningDate)."
assert (
    test_report_pd.loc[604]["BalanceEndOf_Feb"] == 3500.0
), "Test Failed: Josh's Feb balance is incorrect."
assert (
    test_report_pd.loc[604]["BalanceEndOf_Mar"] == 3300.0
), "Test Failed: Josh's Mar balance is incorrect."
assert (
    test_report_pd.loc[604]["SumTransactionsYTD"] == 300.0
), "Test Failed: Josh's YTD sum is incorrect."
print(
    "✅ Test Passed: Josh was calculated correctly (Opening balance in 2025 ignores OpeningDate)."
)

print("\n--> All unit tests passed successfully!")


--> Starting report generation pipeline...

VALIDATION: Found 1 rows with malformed 'Amount'. These will be excluded.
+----------+-------+--------+
|      Date|Account|  Amount|
+----------+-------+--------+
|2025-02-10|    602|BAD_DATA|
+----------+-------+--------+

--> Report generation pipeline finished.


--> FINAL TEST REPORT (for visual validation):
+-------+----------------+------------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------------------+
|Account|Customer        |OpeningBalance2025|BalanceEndOf_Jan|BalanceEndOf_Feb|BalanceEndOf_Mar|BalanceEndOf_Apr|BalanceEndOf_May|BalanceEndOf_Jun|BalanceEndOf_Jul|BalanceEndOf_Aug|BalanceEndOf_Sep|BalanceEndOf_Oct|BalanceEndOf_Nov|BalanceEndOf_Dec|SumTransactionsYTD|
+-------+----------------+------------------+----------------+----------------+----------------+------
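The assertions compare doubles with `==`. That works here because every expected value (1300.0, 3300.0, and so on) is exactly representable in binary floating point. Sums of arbitrary decimal amounts generally are not. Below is a tolerance-based sketch; the helper `assert_money_equal` and its half-cent tolerance are assumptions, not part of the notebook.

```python
import math


def assert_money_equal(actual, expected, tol=0.005):
    """Assert two monetary amounts agree to within `tol` (half a cent by default)."""
    assert math.isclose(actual, expected, rel_tol=0.0, abs_tol=tol), f"{actual} != {expected}"


# Exact equality fails for some decimal sums even though the values "look" equal:
print(0.1 + 0.2 == 0.3)             # False
assert_money_equal(0.1 + 0.2, 0.3)  # passes
```

In the test cell, a call such as `assert_money_equal(test_report_pd.loc[601, "OpeningBalance2025"], 1300.0)` would replace the corresponding `==` check.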

## Summary

### Successfully Completed:
1. **Data Loading**: Account Information and Entries loaded with proper validation
2. **Data Cleaning**: European number format handling and malformed data filtering  
3. **Business Logic**: Opening balances, monthly running totals, and YTD calculations
4. **Edge Case Handling**: Mid-year accounts and 2025 opening balance logic
5. **Quality Assurance**: Unit tests covering 4 edge-case scenarios

### Key Features Implemented:
- **Robust Date Handling**: Automatic EffectiveDate assignment for opening balances
- **Caching**: `persist(StorageLevel.MEMORY_AND_DISK)` on the prepared DataFrames avoids recomputing them across actions
- **Error Recovery**: Graceful handling of malformed amount data
- **Professional Formatting**: Pandas display with thousands separators

### Final Output:
Account balance report with monthly progression from January to December 2025, including opening balances and year-to-date transaction summaries.