In [30]:
from datetime import date
from typing import Optional
import pandas as pd


def get_run_month(base_date: Optional[date] = None) -> pd.Timestamp:
    """
    Return the last calendar day of the run month.

    - If no base_date is provided, use today's date and return the last day of the previous month.
    - If a base_date is provided, return the last day of that month.
    """
    if base_date is None:
        today = pd.Timestamp(date.today())
        return (today.replace(day=1) - pd.DateOffset(days=1)).normalize()

    base_ts = pd.Timestamp(base_date)
    run_month = base_ts + pd.offsets.MonthEnd(0)
    return run_month.normalize()


def resolve_etl_dates(
    run_month: pd.Timestamp,
    source_client=None,
    source_query: Optional[str] = None,
) -> dict:
    """
    Derive all ETL-related date variables.

    Includes:
    - current_date
    - run_month (last day of selected month)
    - month_id (e.g., '202405')
    - calendar_start (13 months window start)
    - calendar_end (run_month)
    - Optional: source_start and source_end from external query
    """
    calendar_start = run_month.replace(day=1)# - pd.DateOffset(months=1)
    calendar_end = run_month  # last day of run_month

    date_vars = {
        "current_date": str(date.today()),
        "run_month": str(run_month.date()),
        "month_id": run_month.strftime("%Y%m"),
        "calendar_start": str(calendar_start.date()),
        "calendar_end": str(calendar_end.date()),
    }

    if source_client and source_query:
        row = next(source_client.query(source_query).result())
        date_vars["source_start"] = str(row["source_start"])
        date_vars["source_end"] = str(row["source_end"])

    return date_vars


# month = pd.Timestamp("2025-03-30")
# runMonth = get_run_month(month)
runMonth = get_run_month()
etl_dates = resolve_etl_dates(runMonth,None,None)

print(runMonth)
# print(etl_dates)
etl_dates

2025-04-30 00:00:00


{'current_date': '2025-05-11',
 'run_month': '2025-04-30',
 'month_id': '202504',
 'calendar_start': '2025-04-01',
 'calendar_end': '2025-04-30'}

In [37]:
from datetime import date
from typing import Optional
import pandas as pd


def get_run_month(base_date: Optional[date] = None) -> pd.Timestamp:
    """
    Determine the run month for the ETL job.

    If no base_date is provided:
        - Returns the last day of the previous month from today.
    If base_date is provided:
        - Returns the last day of the month that contains base_date.

    Args:
        base_date (Optional[date]): Optional override date to determine the run month.

    Returns:
        pd.Timestamp: The last calendar day of the run month.
    """
    if base_date is None:
        today = pd.Timestamp(date.today())
        return (today.replace(day=1) - pd.DateOffset(days=1)).normalize()
    base_ts = pd.Timestamp(base_date)
    return (base_ts + pd.offsets.MonthEnd(0)).normalize()


def resolve_etl_dates(
    run_month: pd.Timestamp,
    source_client=None,
    source_query: Optional[str] = None,
) -> dict:
    """
    Resolve all derived date variables for the ETL process.

    Includes:
        - current_date: Today's date
        - run_month: Last day of the run month
        - month_id: 'YYYYMM' string of the run month
        - calendar_start_1m / calendar_end_1m: Base month (run month)
        - calendar_start_2m...13m: Prior months up to 12 months before run month
        - source_start / source_end: Optional start/end dates from control table via BigQuery

    Args:
        run_month (pd.Timestamp): The resolved run month (last day of a calendar month).
        source_client (Optional[bigquery.Client]): BigQuery client for querying control table.
        source_query (Optional[str]): SQL query to return source_start and source_end.

    Returns:
        dict[str, str]: Dictionary of derived date variables formatted as 'YYYY-MM-DD' or 'YYYYMM'.
    """
    date_vars = {
        "current_date": str(date.today()),
        "run_month": str(run_month.date()),
        "month_id": run_month.strftime("%Y%m"),
        "calendar_start_1m": str(run_month.replace(day=1).date()),
        "calendar_end_1m": str(run_month.date()),
    }

    for i in range(2, 14):  # calendar_start_2m to calendar_start_13m
        prior_month = run_month - pd.DateOffset(months=i - 1)
        start_key = f"calendar_start_{i}m"
        end_key = f"calendar_end_{i}m"
        date_vars[start_key] = str(prior_month.replace(day=1).date())
        date_vars[end_key] = str((prior_month + pd.offsets.MonthEnd(0)).date())

    if source_client and source_query:
        row = next(source_client.query(source_query).result())
        date_vars["source_start"] = str(row["source_start"])
        date_vars["source_end"] = str(row["source_end"])

    return date_vars


resolve_etl_dates(get_run_month(pd.Timestamp("2025-02-28")))

{'current_date': '2025-05-11',
 'run_month': '2025-02-28',
 'month_id': '202502',
 'calendar_start_1m': '2025-02-01',
 'calendar_end_1m': '2025-02-28',
 'calendar_start_2m': '2025-01-01',
 'calendar_end_2m': '2025-01-31',
 'calendar_start_3m': '2024-12-01',
 'calendar_end_3m': '2024-12-31',
 'calendar_start_4m': '2024-11-01',
 'calendar_end_4m': '2024-11-30',
 'calendar_start_5m': '2024-10-01',
 'calendar_end_5m': '2024-10-31',
 'calendar_start_6m': '2024-09-01',
 'calendar_end_6m': '2024-09-30',
 'calendar_start_7m': '2024-08-01',
 'calendar_end_7m': '2024-08-31',
 'calendar_start_8m': '2024-07-01',
 'calendar_end_8m': '2024-07-31',
 'calendar_start_9m': '2024-06-01',
 'calendar_end_9m': '2024-06-30',
 'calendar_start_10m': '2024-05-01',
 'calendar_end_10m': '2024-05-31',
 'calendar_start_11m': '2024-04-01',
 'calendar_end_11m': '2024-04-30',
 'calendar_start_12m': '2024-03-01',
 'calendar_end_12m': '2024-03-31',
 'calendar_start_13m': '2024-02-01',
 'calendar_end_13m': '2024-02-29'}

In [45]:
from pathlib import Path
from src.extract.extract_t2 import run_extraction

if __name__ == "__main__":
    config_path = "src/config/t2_sources.yaml"
    output_dir = Path("output/staging")
    run_extraction(config_path, output_dir)


[local] Loading data/dummy/customer_table.csv
✅ Saved to output/staging/dummy_data/customer_table.parquet
[local] Loading data/dummy/accounts_table.csv
✅ Saved to output/staging/dummy_data/accounts_table.parquet
[local] Loading data/dummy/account_ownership_table.csv
✅ Saved to output/staging/dummy_data/account_ownership_table.parquet
[local] Loading data/dummy/credit_transaction_table.csv
✅ Saved to output/staging/dummy_data/credit_transaction_table.parquet
[local] Loading data/dummy/deposit_transaction_table.csv
✅ Saved to output/staging/dummy_data/deposit_transaction_table.parquet


In [49]:
import pyarrow.parquet as pq
from pathlib import Path
import pandas as pd

customer_table = pq.read_table("output/staging/dummy_data/customer_table.parquet")
accounts_table = pq.read_table("output/staging/dummy_data/accounts_table.parquet")
acct_own_table = pq.read_table("output/staging/dummy_data/account_ownership_table.parquet")
cc_txn_table = pq.read_table("output/staging/dummy_data/credit_transaction_table.parquet")
dep_txn_table = pq.read_table("output/staging/dummy_data/deposit_transaction_table.parquet")

# print(customer_table.to_pandas().head())
# print(accounts_table.to_pandas().head())
# print(acct_own_table.to_pandas().head())
# print(cc_txn_table.to_pandas().head())
# print(dep_txn_table.to_pandas().head())

staging_dir = Path("output/staging")

for parquet_path in staging_dir.rglob("*.parquet"):
    print(f"\n📄 File: {parquet_path.name}")
    try:
        df = pd.read_parquet(parquet_path)
        print(f"🧱 Columns: {list(df.columns)}")
        print(f"🔢 Rows: {len(df)}")
        display(df.head())
    except Exception as e:
        print(f"❌ Error reading {parquet_path.name}: {e}")



📄 File: accounts_table.parquet
🧱 Columns: ['CUST_ID', 'ACNO', 'PRODUCT_CODE', 'PRODUCT_CATEGORY', 'SEGMENT', 'IMAGE_DT']
🔢 Rows: 487


Unnamed: 0,CUST_ID,ACNO,PRODUCT_CODE,PRODUCT_CATEGORY,SEGMENT,IMAGE_DT
0,100000,505463,1004,DEP,P,2025-03-31
1,100000,505464,2003,CC,P,2025-03-31
2,100000,505465,1004,DEP,P,2025-03-31
3,100000,505466,1003,DEP,P,2025-03-31
4,100000,505467,1001,DEP,P,2025-03-31



📄 File: deposit_transaction_table.parquet
🧱 Columns: ['ACNO', 'CUST_ID', 'TRAN_TYPE', 'TRAN_DATE', 'AMOUNT', 'SEGMENT', 'IMAGE_DT']
🔢 Rows: 1250


Unnamed: 0,ACNO,CUST_ID,TRAN_TYPE,TRAN_DATE,AMOUNT,SEGMENT,IMAGE_DT
0,505463,100000,Self-to-Self Transfer,2025-03-12 19:55:00,384.75,P,2025-03-31
1,505463,100000,Customer-Initiated Credit,2025-03-07 02:31:00,644.2,P,2025-03-31
2,505463,100000,Self-to-Self Transfer,2025-03-14 08:07:00,354.37,P,2025-03-31
3,505463,100000,Customer-Initiated Credit,2025-03-13 13:10:00,363.02,P,2025-03-31
4,505463,100000,Customer-Initiated Debit,2025-03-04 02:24:00,912.46,P,2025-03-31



📄 File: credit_transaction_table.parquet
🧱 Columns: ['ACNO', 'CUST_ID', 'TRAN_TYPE', 'TRAN_DATE', 'AMOUNT', 'SEGMENT', 'IMAGE_DT']
🔢 Rows: 1213


Unnamed: 0,ACNO,CUST_ID,TRAN_TYPE,TRAN_DATE,AMOUNT,SEGMENT,IMAGE_DT
0,505464,100000,Customer-Initiated Debit,2025-03-23 02:48:00,962.92,P,2025-03-31
1,505464,100000,Customer-Initiated Credit,2025-03-12 03:38:00,315.52,P,2025-03-31
2,505464,100000,Self-to-Self Transfer,2025-03-10 01:37:00,704.45,P,2025-03-31
3,505464,100000,Self-to-Self Transfer,2025-03-20 05:10:00,92.62,P,2025-03-31
4,505464,100000,Bank Fee,2025-03-25 03:20:00,33.64,P,2025-03-31



📄 File: customer_table.parquet
🧱 Columns: ['CUST_ID', 'SEGMENT', 'IMAGE_DT']
🔢 Rows: 200


Unnamed: 0,CUST_ID,SEGMENT,IMAGE_DT
0,100000,B,2025-03-31
1,100001,P,2025-03-31
2,100002,P,2025-03-31
3,100003,S,2025-03-31
4,100004,P,2025-03-31



📄 File: account_ownership_table.parquet
🧱 Columns: ['ACNO', 'CUST_ID', 'RELATIONSHIP', 'SEGMENT', 'IMAGE_DT']
🔢 Rows: 638


Unnamed: 0,ACNO,CUST_ID,RELATIONSHIP,SEGMENT,IMAGE_DT
0,505463,100000,P,P,2025-03-31
1,505464,100000,P,P,2025-03-31
2,505465,100000,P,P,2025-03-31
3,505466,100000,P,P,2025-03-31
4,505467,100000,P,P,2025-03-31
