In [24]:
import pandas as pd
import sqlite3
import gc

In [18]:
# ------------------------------------------------------------------
# Data source
# Only the location of the SQLite file is set here; the connection
# itself is opened by the cell that builds the report.
# ------------------------------------------------------------------
db_path = "E:/cbs.db"

# ------------------------------------------------------------------
# Report settings
# ------------------------------------------------------------------
# Workbook the report is written to (one sheet per month + "Summary").
output_file = "overall_levy_report_to_may_2025.xlsx"

# Period codes to report on, in the order the sheets should appear.
months = [
    "JAN",
    "FEB",
    "MAR",
    "APR",
    "MAY",
]

# Product codes and transaction codes used to filter the entries
# (bound to the PRODUCT and TRN_CODE filters of the monthly query).
products = [
    "VCWR", "SECW", "CWAT", "CHWL", "CAAU", "CQWL",
    "CWOF", "VCWN", "CWRM", "COWL", "DAWM",
]
trn_codes = [
    "V01", "S01", "A01", "095", "728",
    "002", "VON", "M08", "116", "D26",
]

In [19]:
# Workbook that holds the levy brackets, and the sheet to read from it.
# pandas treats an integer sheet_name as a zero-based position, so 1
# selects the second sheet of the workbook.
LEVIES_EXCEL_PATH, LEVIES_SHEET_NAME = "levy_bracket.xlsx", 1

# Column names of the levy table: bracket lower bound, bracket upper
# bound, and the levy charged for amounts inside that bracket.
LEVIES_MIN_COL, LEVIES_MAX_COL, LEVIES_LEVY_COL = (
    "min_amount",
    "max_amount",
    "levy",
)

In [20]:
# Read the levy bracket table from the configured workbook and sheet.
lev = pd.read_excel(
    LEVIES_EXCEL_PATH,
    sheet_name=LEVIES_SHEET_NAME,
)

# Show the last rows of the table so the top brackets can be checked by eye.
print("Levies preview:")
display(lev.tail())

Levies preview:


Unnamed: 0,min_amount,max_amount,levy
17,700000,799999.0,1700
18,800000,899999.0,1750
19,900000,1000000.0,1776
20,1000001,3000000.0,1875
21,3000001,1e+48,2000


In [22]:
def process_month(conn, lev, month, products, trn_codes, writer):
    """Write one month's per-account levy sheet and return its per-product totals.

    The function queries the debit entries for ``month``, drops every
    transaction whose ``TRN_REF_NO`` is not unique, assigns a levy to each
    remaining row, aggregates per account (with one extra column per product
    description), appends a ``TOTAL`` row and writes the result to a sheet
    named ``month``.

    Parameters
    ----------
    conn
        Open database connection handed to ``pd.read_sql_query``.
    lev
        Levy bracket table, passed through unchanged to
        ``assign_levy_by_ranges``.
    month
        Value bound to the ``PERIOD_CODE`` filter; also used as the sheet
        name and as the name of the value column in the returned frame.
    products, trn_codes
        Sequences (list, tuple, ...) of codes bound to the ``PRODUCT`` and
        ``TRN_CODE`` ``IN (...)`` filters.
    writer
        Excel writer handed to ``DataFrame.to_excel``.

    Returns
    -------
    pandas.DataFrame
        Two columns: ``PRODUCT_DESC`` and ``<month>`` (sum of
        ``levy_assigned`` for that product description).

    Notes
    -----
    * ``assign_levy_by_ranges`` must already be defined in the kernel and
      must return a frame containing a ``levy_assigned`` column; it is not
      defined in any of the visible notebook cells.
    * NOTE(review): rows whose product description is NULL (no match in the
      ``CSTM_PRODUCT`` left join) are counted in ``total_levy`` but are
      dropped by the default ``dropna=True`` of the product-level groupbys,
      so product columns may not add up to ``total_levy`` -- confirm whether
      that is acceptable.
    """
    ACC_COL = "AC_NO"
    CLASS_COL = "ACCOUNT_CLASS"
    PROD_DESC_COL = "PRODUCT_DESC"

    # Materialise the filters so any sequence type (not only list) can be
    # passed; the original list concatenation failed for tuples.
    products = list(products)
    trn_codes = list(trn_codes)
    product_marks = ",".join(["?"] * len(products))
    trn_code_marks = ",".join(["?"] * len(trn_codes))

    # Step 1: Query for that month. Only the "?" placeholders are
    # interpolated into the SQL text; every value is bound as a parameter.
    query = f"""
    SELECT
        x.AC_BRANCH,
        x.AC_NO,
        acc.ACCOUNT_CLASS,
        x.DRCR_IND,
        x.EXCH_RATE,
        x.FINANCIAL_CYCLE,
        x.LCY_AMOUNT,
        x.PERIOD_CODE,
        x.PRODUCT,
        p.PRODUCT_DESCRIPTION AS PRODUCT_DESC,
        x.TRN_CODE,
        s.TRN_DESC AS TRN_DSC,
        x.TRN_DT,
        x.TRN_REF_NO,
        x.VALUE_DT
    FROM
        ACVWS_ALL_AC_ENTRIES_ACRJRNAL_2025 x
    LEFT JOIN
        STTM_TRN_CODE s ON x.TRN_CODE = s.TRN_CODE
    LEFT JOIN
        CSTM_PRODUCT p ON x.PRODUCT = p.PRODUCT_CODE
    LEFT JOIN
        STTM_CUST_ACCOUNT acc ON x.AC_NO = acc.CUST_AC_NO
    WHERE x.PRODUCT IN ({product_marks})
      AND x.TRN_CODE IN ({trn_code_marks})
      AND x.PERIOD_CODE = ?
      AND x.CUST_GL = 'A'
      AND x.DRCR_IND = 'D'
    """
    params = [*products, *trn_codes, month]
    tx = pd.read_sql_query(query, conn, params=params)

    print(f"{month}: Loaded {len(tx)} transactions")

    # Step 2: keep only rows whose TRN_REF_NO occurs exactly once.
    # NOTE(review): a reference that appears more than once is removed
    # entirely (every copy), not collapsed to a single row -- confirm this
    # is the intended business rule.
    ref_occurrences = tx.groupby("TRN_REF_NO")["TRN_REF_NO"].transform("count")
    tx = tx[ref_occurrences == 1]
    print(f"{month}: {len(tx)} unique transactions remain")

    # Step 3: assign levy (adds the 'levy_assigned' column used below)
    tx = assign_levy_by_ranges(tx, lev, amount_col="LCY_AMOUNT")

    # Step 4: aggregate per account ...
    summary = (
        tx.groupby(ACC_COL, dropna=False)
        .agg(
            account_class=(CLASS_COL, "first"),
            total_levy=('levy_assigned', 'sum'),
            total_amount=('LCY_AMOUNT', 'sum'),
            tx_count=('LCY_AMOUNT', 'count')
        )
        .reset_index()
    )

    # ... and spread the levy per product description into columns.
    product_pivot = (
        tx.groupby([ACC_COL, PROD_DESC_COL])
        .agg(product_levy=('levy_assigned', 'sum'))
        .reset_index()
        .pivot_table(
            index=ACC_COL,
            columns=PROD_DESC_COL,
            values="product_levy",
            fill_value=0
        )
        .reset_index()
    )
    product_cols = [col for col in product_pivot.columns if col != ACC_COL]

    summary_full = summary.merge(product_pivot, on=ACC_COL, how="left")

    # Accounts present in the summary but absent from the pivot come out of
    # the left merge with NaN product cells; show them as 0, consistent with
    # the pivot's own fill_value=0.
    if product_cols:
        summary_full[product_cols] = summary_full[product_cols].fillna(0)

    # Totals row
    totals_dict = {
        ACC_COL: "TOTAL",
        "account_class": "--",
        "total_levy": summary_full["total_levy"].sum(),
        "total_amount": summary_full["total_amount"].sum(),
        "tx_count": summary_full["tx_count"].sum()
    }
    totals_dict.update({col: summary_full[col].sum() for col in product_cols})

    totals = pd.DataFrame([totals_dict])
    summary_with_products = pd.concat([summary_full, totals], ignore_index=True)

    # Save monthly sheet
    summary_with_products.to_excel(writer, sheet_name=month, index=False)

    # Keep product-level totals for summary sheet
    monthly_product_totals = (
        tx.groupby(PROD_DESC_COL)['levy_assigned']
        .sum()
        .rename(month)
        .reset_index()
    )

    # Free memory before the next month is loaded
    del tx, summary, product_pivot, summary_full, summary_with_products
    gc.collect()

    return monthly_product_totals

In [25]:
# Build the workbook: one sheet per month plus a "Summary" sheet with the
# levy per product description across all months.
conn = sqlite3.connect(db_path)

# try/finally guarantees the connection is closed even if a month's query,
# the levy assignment or the Excel write raises part-way through.
try:
    all_months_data = []

    with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
        for m in months:
            product_totals = process_month(
                conn=conn,
                lev=lev,
                month=m,
                products=products,
                trn_codes=trn_codes,
                writer=writer
            )
            all_months_data.append(product_totals)

        # Combine into summary pivot: outer merge so a product that only
        # appears in some months is still listed.
        summary_df = all_months_data[0]
        for df in all_months_data[1:]:
            summary_df = summary_df.merge(df, on="PRODUCT_DESC", how="outer")

        # Fill NaN with 0 (product had no levy in that month)
        summary_df = summary_df.fillna(0)

        # Grand total row
        totals_row = {"PRODUCT_DESC": "TOTAL"}
        for m in months:
            totals_row[m] = summary_df[m].sum()
        summary_df = pd.concat([summary_df, pd.DataFrame([totals_row])], ignore_index=True)

        # Save summary sheet
        summary_df.to_excel(writer, sheet_name="Summary", index=False)
finally:
    conn.close()

# Only reached when every sheet was written successfully.
print(f"Excel report created: {output_file}")

JAN: Loaded 92917 transactions
JAN: 91141 unique transactions remain
FEB: Loaded 90428 transactions
FEB: 89286 unique transactions remain
MAR: Loaded 98735 transactions
MAR: 95891 unique transactions remain
APR: Loaded 83394 transactions
APR: 82330 unique transactions remain
MAY: Loaded 91170 transactions
MAY: 90118 unique transactions remain
Excel report created: overall_levy_report_to_may_2025.xlsx
