# INIT — imports, configuration and aggregation expressions

In [0]:
from pyspark.sql.functions import coalesce, col, lit, sum, row_number, to_date, when, count, countDistinct, round as spark_round
from pyspark.sql.types import DecimalType
from pyspark.sql.window import Window

# Reporting window interpolated into the silver-loans query (ISO yyyy-mm-dd strings).
COVERED_DATE = {"start": "2025-01-01", "end": "2025-01-31"}

# Grain of one loan transaction row; charges and discounts are pivoted at this grain.
group_keys = ["loanid", "pawnticketno", "transactiondate", "branchcode", "tag", "principal"]

# Amount columns that must exist before aggregation. Charge groups use the C01/C02 prefixes and
# discount groups the D01/D02 prefixes; both prefixes of a kind share the same code suffixes.
CHARGE_CODES = ["400001", "400002", "400003", "400004", "500005", "205004"]
DISCOUNT_CODES = ["400007", "400008", "400011", "653002", "653003", "653004"]

REQUIRED_AMOUNT_COLS = [
    *[f"{prefix}_{code}" for prefix in ("C01", "C02") for code in CHARGE_CODES],
    *[f"{prefix}_{code}" for prefix in ("D01", "D02") for code in DISCOUNT_CODES],
]

# L01 tags feed the "new / from-renewal" pawn-ticket metrics, L02 tags the "closed" ones.
L01_TAGS = ["L01_NewPawn", "L01_FrRenewal"]
L02_TAGS = ["L02_Renewed", "L02_RenewedOnline", "L02_Redeemed", "L02_Foreclosed"]


def zero_if_null(expr):
    """Return ``expr`` with NULL replaced by 0 (a SUM over no matching rows is NULL)."""
    return coalesce(expr, lit(0))


def _tag_count(tag):
    """Count of rows whose ``tag`` equals ``tag``, aliased ``<tag lower-cased>cnt``."""
    return zero_if_null(sum(when(col("tag") == tag, 1))).alias(f"{tag.lower()}cnt")


def _tag_principal(tag):
    """Total ``principal`` of rows whose ``tag`` equals ``tag``, aliased ``<tag lower-cased>prn``."""
    return zero_if_null(sum(when(col("tag") == tag, col("principal")))).alias(f"{tag.lower()}prn")


def _distinct_tickets(tags):
    """Number of distinct ``pawnticketno`` values among rows tagged with any of ``tags``."""
    return zero_if_null(countDistinct(when(col("tag").isin(tags), col("pawnticketno"))))


def _first_row_principal(tags):
    """Total ``principal`` of rows tagged with any of ``tags`` whose ``rn`` (row_number) is 1."""
    return zero_if_null(sum(when((col("tag").isin(tags)) & (col("rn") == 1), col("principal"))))


agg_exprs = [
    # --- L01 then L02: per-tag counts, then per-tag principal ---
    *[_tag_count(t) for t in L01_TAGS],
    *[_tag_principal(t) for t in L01_TAGS],
    *[_tag_count(t) for t in L02_TAGS],
    *[_tag_principal(t) for t in L02_TAGS],

    # --- Pawn tickets ---
    _distinct_tickets(L01_TAGS).alias("newfrrnwptcnt"),
    _first_row_principal(L01_TAGS).alias("newfrrnwptprn"),
    _distinct_tickets(L02_TAGS).alias("closedptcnt"),
    _first_row_principal(L02_TAGS).alias("closedptprn"),

    # --- Totals ---
    zero_if_null(count("*")).alias("totalcnt"),
    zero_if_null(sum("principal")).alias("totalprn"),

    # --- Charges and discounts (output names are lower-cased) ---
    *[zero_if_null(sum(col(c))).alias(c.lower()) for c in REQUIRED_AMOUNT_COLS],
]

# Output columns rounded to 2 decimal places before the gold write.
ROUND_2DP_COLS = [
    # --- Principal / loan amounts ---
    *[f"{t.lower()}prn" for t in L01_TAGS + L02_TAGS],
    "newfrrnwptprn",
    "closedptprn",
    "totalprn",
    "grossprn",
    "aveprnperpt",

    # --- C01/C02 charges and D01/D02 discounts ---
    *[c.lower() for c in REQUIRED_AMOUNT_COLS],

    # --- Totals ---
    "total_01charges",
    "total_02charges",
    "total_charges",
    "total_01discounts",
    "total_02discounts",
    "total_discounts",

    # --- Gross income ---
    "grossincome01",
    "grossincome02",
    "grossincometotal",
]

# Read from silver: loans, loan charges and loan discounts

In [0]:
# Pull the loans in COVERED_DATE together with their charges and discounts.
#
# Charges and discounts are stacked with UNION ALL instead of being joined side by side: joining
# both child tables to the same loan row yields (#charges x #discounts) rows per loan, so the later
# SUMs would count every charge once per discount and every discount once per charge. Each UNION
# branch leaves the other side's group/amount columns NULL (Spark widens the bare NULLs to the
# other branch's types); the cleaning cell coalesces those NULLs to "UNKNOWN" / 0. A loan with no
# charges and no discounts still appears once, via the LEFT JOIN in the first branch.
#
# The date filter is half-open, [start, end + 1 day), so every transaction on the end date is kept
# even if transactiondate carries a time of day; for a DATE column it is equivalent to BETWEEN.
# No ORDER BY: the groupBy that follows discards row order, so a global sort here is pure cost.
query = f"""
        WITH loans AS (
            SELECT
                id,
                transactiondate,
                branchcode,
                tag,
                principal,
                pawnticketno
            FROM workspace.silver.silver_loans
            WHERE transactiondate >= DATE('{COVERED_DATE["start"]}')
              AND transactiondate <  DATE_ADD(DATE('{COVERED_DATE["end"]}'), 1)
        )
        SELECT
            l.id AS loanid,
            l.transactiondate,
            l.branchcode,
            l.tag,
            l.principal,
            l.pawnticketno,
            c.chargegrp,
            c.amount AS chargeamount,
            NULL AS discountgrp,
            NULL AS discountamount
        FROM loans l
        LEFT JOIN workspace.silver.silver_loancharges c
            ON l.id = c.loanid

        UNION ALL

        SELECT
            l.id AS loanid,
            l.transactiondate,
            l.branchcode,
            l.tag,
            l.principal,
            l.pawnticketno,
            NULL AS chargegrp,
            NULL AS chargeamount,
            d.discountgrp,
            d.amount AS discountamount
        FROM loans l
        JOIN workspace.silver.silver_loandiscounts d
            ON l.id = d.loanid
"""

branchdailyagg_df = spark.sql(query)

# Data transformations

## Clean nulls: set missing charge/discount groups to "UNKNOWN" and missing amounts to 0

In [0]:
# NULL group/amount values (e.g. rows where a LEFT JOIN found no match) get a placeholder group
# and a zero amount so the pivot step below never groups or sums NULLs.
null_replacements = {
    "chargegrp": lit("UNKNOWN"),
    "discountgrp": lit("UNKNOWN"),
    "chargeamount": lit(0),
    "discountamount": lit(0),
}

# Start from the raw query result each time, so re-running this cell is idempotent.
clean_df = branchdailyagg_df
for column_name, replacement in null_replacements.items():
    clean_df = clean_df.withColumn(column_name, coalesce(col(column_name), replacement))

## Pivot charges and discounts

In [0]:
# Charge amount columns carry the C0x_ prefix and discount amount columns the D0x_ prefix.
CHARGE_GROUP_COLS = [c for c in REQUIRED_AMOUNT_COLS if c.startswith("C")]
DISCOUNT_GROUP_COLS = [c for c in REQUIRED_AMOUNT_COLS if c.startswith("D")]

# Pivot charges and discounts in a single pass: one conditional SUM per required group is a pivot
# restricted to those group values. This avoids joining two separate pivots back on group_keys.
# An equi-join never matches NULL key values (e.g. a NULL tag or pawnticketno), so such a join
# would split those loans into two rows and double-count them in totalcnt/totalprn. It also avoids
# the extra distinct-values job an open-ended pivot runs, and a duplicated "UNKNOWN" column.
# Groups with no matching rows sum to NULL and are zero-filled; only the amount columns are filled,
# so group-key columns keep their original values.
branchcombined_df = (
    clean_df
    .groupBy(*group_keys)
    .agg(
        *[
            sum(when(col("chargegrp") == grp, col("chargeamount"))).alias(grp)
            for grp in CHARGE_GROUP_COLS
        ],
        *[
            sum(when(col("discountgrp") == grp, col("discountamount"))).alias(grp)
            for grp in DISCOUNT_GROUP_COLS
        ],
    )
    .fillna(0, subset=REQUIRED_AMOUNT_COLS)
)


## Ensure the required amount columns exist

In [0]:
# Any required amount column that the pivot did not produce (no row carried that group)
# is added as a constant 0 so the aggregation expressions can always reference it.
missing_amount_cols = [
    name for name in REQUIRED_AMOUNT_COLS if name not in branchcombined_df.columns
]
for name in missing_amount_cols:
    branchcombined_df = branchcombined_df.withColumn(name, lit(0))

## Order each pawn ticket's transactions by date and assign a sequence number (`rn`)

In [0]:
# Tag family: the aggregation treats all L01 tags as one population (newfrrnwpt*) and all L02
# tags as another (closedpt*), so tickets are de-duplicated within a family.
tag_family = (
    when(col("tag").isin(L01_TAGS), lit("L01"))
    .when(col("tag").isin(L02_TAGS), lit("L02"))
    .otherwise(col("tag"))
)

# rn == 1 must mark exactly one row per (day, branch, pawn ticket, tag family). That is the same
# population countDistinct("pawnticketno") counts per day/branch. Partitioning by pawnticketno
# alone would number rows across the whole reporting window. A ticket opened earlier in the window
# would then have rn > 1 on its closing row, so its principal would drop out of closedptprn even
# though closedptcnt counts the ticket. loanid breaks ties on equal transactiondate, so the
# numbering is deterministic.
w = (
    Window
    .partitionBy(to_date("transactiondate"), "branchcode", "pawnticketno", tag_family)
    .orderBy("transactiondate", "loanid")
)

branchcombined_df = branchcombined_df.withColumn("rn", row_number().over(w))

## Aggregate per branch per day

In [0]:
# Daily grain: transactiondate truncated to a calendar date (keeping its name), plus branchcode.
daily_keys = [to_date("transactiondate").alias("transactiondate"), "branchcode"]

result_df = branchcombined_df.groupBy(*daily_keys).agg(*agg_exprs)

## Add financial metrics

In [0]:
def _add_columns(names):
    """Left-associative sum ``names[0] + names[1] + ...`` of the named columns."""
    total = col(names[0])
    for name in names[1:]:
        total = total + col(name)
    return total


def _amount_cols(prefix):
    """Lower-cased required amount columns whose name starts with ``prefix`` (e.g. "c01_")."""
    return [c.lower() for c in REQUIRED_AMOUNT_COLS if c.lower().startswith(prefix)]


# Derived metrics, added in this order (later entries reference earlier ones).
metric_columns = [
    # NOTE(review): divides by totalcnt, the row count of the day/branch group, not a
    # distinct pawn-ticket count, despite the "perpt" name — confirm intent.
    ("aveprnperpt", when(col("totalcnt") > 0, col("totalprn") / col("totalcnt")).otherwise(lit(0))),
    # Net change: L01 (new / from-renewal) minus L02 (closed) pawn tickets.
    ("grosscnt", col("newfrrnwptcnt") - col("closedptcnt")),
    ("grossprn", col("newfrrnwptprn") - col("closedptprn")),
    # Charges: per group, then overall.
    ("total_01charges", _add_columns(_amount_cols("c01_"))),
    ("total_02charges", _add_columns(_amount_cols("c02_"))),
    ("total_charges", col("total_01charges") + col("total_02charges")),
    # Discounts: per group, then overall.
    ("total_01discounts", _add_columns(_amount_cols("d01_"))),
    ("total_02discounts", _add_columns(_amount_cols("d02_"))),
    ("total_discounts", col("total_01discounts") + col("total_02discounts")),
    # Gross income = charges minus discounts.
    ("grossincome01", col("total_01charges") - col("total_01discounts")),
    ("grossincome02", col("total_02charges") - col("total_02discounts")),
    ("grossincometotal", col("grossincome01") + col("grossincome02")),
]

for metric_name, metric_expr in metric_columns:
    result_df = result_df.withColumn(metric_name, metric_expr)

## Round amounts to 2 decimal places

In [0]:
def round_numeric_columns(df, columns, scale=2):
    """Round each listed column of ``df`` to ``scale`` decimal places.

    Names in ``columns`` that ``df`` does not have are skipped, so one shared list can be
    applied to frames whose schemas differ.

    Args:
        df: Spark DataFrame to transform.
        columns: iterable of column names to round.
        scale: number of decimal places passed to Spark's ``round``.

    Returns:
        The DataFrame with every present listed column replaced by its rounded value
        (``df`` itself when none of the names are present).
    """
    targets = [name for name in columns if name in df.columns]
    for name in targets:
        df = df.withColumn(name, spark_round(col(name), scale))
    return df

# Round every amount/ratio column listed in ROUND_2DP_COLS to 2 decimal places.
result_df = round_numeric_columns(result_df, columns=ROUND_2DP_COLS, scale=2)

# Write to gold: branch daily aggregates

In [0]:
# Fully qualify the target so the write lands in the same catalog as the silver reads
# (workspace.silver.*) and the check query below (workspace.gold.*); an unqualified
# "gold.branchdailyaggregates" resolves against whatever catalog is current for the session.
# NOTE(review): mode("overwrite") replaces the whole table with only the COVERED_DATE window.
# If the table is meant to accumulate periods, consider a selective overwrite
# (e.g. Delta's replaceWhere on transactiondate) — confirm the intended retention.
result_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("workspace.gold.branchdailyaggregates")

In [0]:
%sql
-- Inspect the gold branch daily aggregates, per branch in date order.
SELECT *
FROM workspace.gold.branchdailyaggregates
ORDER BY branchcode, transactiondate;