# Dial Command Generator

Fill in the variables in the next cell to generate the commands for creating a dial spec and applying it. The cell only prints the commands; it does not run them.


In [39]:
from pathlib import Path

# ==== EDIT THESE ====
# === User-configurable paths and version below ===

# Input JSON source for dials (original data)
input_json = Path(r"C:\Git\LMSimData\data\STACR\stacr_v1.8.1.json")
# Path where the generated spec will be stored
spec_path = Path("dial\\stacr_v1.8.1_all_dials_spec.json")
# Output JSON file to write updated dials to (set to None to overwrite input)
output_json = Path(r"C:\Git\LMSimData\data\STACR\stacr_v1.8.1.json")
# Version string for the updated output; set to None or "" to auto-increment from input
version = "v1.8.1"


generate_only_dials = True
generate_group_by_model = True
generate_verbose_targets = False
# ==== END EDIT ====

# Script invocation shared by both printed commands.
UPDATE_DIALS_CMD = "python dial/update_dials.py"


def build_generate_cmd(spec_path, input_json, flags=()):
    """Return the command line that generates a dial spec.

    Args:
        spec_path: Destination of the spec file; rendered inside double quotes.
        input_json: Source JSON file; rendered inside double quotes.
        flags: Extra ``--generate-*`` switches, inserted verbatim before ``--input``.

    The parts are joined with single spaces, so an empty ``flags`` sequence
    does not leave a doubled space in the middle of the command.
    """
    parts = [
        UPDATE_DIALS_CMD,
        "--generate-spec",
        f'"{spec_path}"',
        *flags,
        "--input",
        f'"{input_json}"',
    ]
    return " ".join(parts)


def build_apply_cmd(spec_path, output_json=None, version=None):
    """Return the command line that applies a dial spec.

    ``--output`` and ``--version`` are appended only when the corresponding
    argument is truthy (so ``None`` and ``""`` both omit the option).
    """
    parts = [UPDATE_DIALS_CMD, "--spec", f'"{spec_path}"']
    if output_json:
        parts += ["--output", f'"{output_json}"']
    if version:
        parts += ["--version", f'"{version}"']
    return " ".join(parts)


# Map each optional switch to the toggle that enables it; order is preserved.
flag_switches = {
    "--generate-only-dials": generate_only_dials,
    "--generate-group-by-model": generate_group_by_model,
    "--generate-verbose-targets": generate_verbose_targets,
}
flags = [switch for switch, enabled in flag_switches.items() if enabled]

generate_cmd = build_generate_cmd(spec_path, input_json, flags)
apply_cmd = build_apply_cmd(spec_path, output_json, version)

# The commands are only printed here; nothing is executed by this cell.
print("Generate spec:\n" + generate_cmd)
print("\nApply spec:\n" + apply_cmd)


Generate spec:
python dial/update_dials.py --generate-spec "dial\stacr_v1.8.1_all_dials_spec.json" --generate-only-dials --generate-group-by-model --input "C:\Git\LMSimData\data\STACR\stacr_v1.8.1.json"

Apply spec:
python dial/update_dials.py --spec "dial\stacr_v1.8.1_all_dials_spec.json" --output "C:\Git\LMSimData\data\STACR\stacr_v1.8.1.json" --version "v1.8.1"


In [34]:
%load_ext autoreload
%autoreload 2

import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt 
import seaborn as sns 
import plotly.express as px 
import plotly.graph_objects as go 
import plotly.io as pio 

from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter
from openpyxl.formatting.rule import ColorScaleRule


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [35]:
from dial_utils import (
    dial_schedule as dial,
    find_latest_tracking_file,
    extract_summary_rows,
    normalize_error_window,
    select_error_columns,
)


# 1.017730482, 1.44474884


# stacr 0.958507478 1.55186727



In [33]:
# Quick check of the schedule helper: `dial` is `dial_utils.dial_schedule`.
# NOTE(review): the output recorded in this notebook prints as a schedule string
# that holds 0.4x and then steps up to 1x; the exact meaning of the argument is
# not visible here -- confirm in dial_utils.
res = dial(0.4)
print(res)

0.4x for 48 0.4x for 1 0.426x for 1 0.452x for 1 0.478x for 1 0.504x for 1 0.53x for 1 0.557x for 1 0.583x for 1 0.609x for 1 0.635x for 1 0.661x for 1 0.687x for 1 0.713x for 1 0.739x for 1 0.765x for 1 0.791x for 1 0.817x for 1 0.843x for 1 0.87x for 1 0.896x for 1 0.922x for 1 0.948x for 1 0.974x for 1 1.0x for 1 1x


In [36]:
from pathlib import Path

# === CONFIGURATION ===
# Deal type passed to the tracking-file lookup, e.g. JUMBO, STACR, CAS, HE, NONQM
DEALTYPE = "CAS"

# Bucket section to summarise: 'WAC', 'AGE', 'FICO', or any other bucket type in your data
BUCKET_TYPE = "WAC"

# Error window to report: '3M', '6M', '12M' (None shows all error columns)
ERROR_WINDOW = "6M"

# Sheets with NO bucket sections: every Status row is collected from these instead
STATUS_SHEETS = {"M30", "M60", "M90P", "M270P", "FCLS", "REO"}

# Sheets skipped entirely
EXCLUDE_SHEETS = {"CDR", "CPR"}

# Report label -> base directory searched for the latest tracking file
BASE_DIRS = {
    "Dialed": Path(r"R:\QR\Resi_shared\tracking\Dialed"),
    "Undialed": Path(r"R:\QR\Resi_shared\tracking\Undialed"),
}

# Report label -> explicit workbook path; None means "use the latest file"
MANUAL_EXCEL_PATHS = {
    "Dialed": None,
    "Undialed": None,
}
# =====================

# Rebuilt from scratch on every run so re-executing the cell never duplicates rows.
summary_rows = []

for report_label, base_dir in BASE_DIRS.items():
    # A truthy manual override wins; otherwise fall back to the latest file lookup.
    manual_override = MANUAL_EXCEL_PATHS.get(report_label)
    excel_path = (
        Path(manual_override)
        if manual_override
        else find_latest_tracking_file(DEALTYPE, base_dir)
    )

    # NOTE(review): assumes find_latest_tracking_file returns a Path-like object;
    # a None return would fail on .exists() below -- confirm in dial_utils.
    if not excel_path.exists():
        raise FileNotFoundError(f"Excel file not found: {excel_path}")

    print(f"[{report_label}] Using file: {excel_path}")

    rows = extract_summary_rows(
        excel_path=excel_path,
        bucket_type=BUCKET_TYPE,
        status_sheets=STATUS_SHEETS,
        exclude_sheets=EXCLUDE_SHEETS,
        verbose=True,
    )

    # Tag each row with the report it came from so both reports can share one frame.
    for summary_row in rows:
        summary_row["Report"] = report_label

    summary_rows.extend(rows)


[Dialed] Using file: R:\QR\Resi_shared\tracking\Dialed\tracking_CAS_V1_8_1_CRT_20260209.xlsx
All sheet names: ['CPR', 'CDR', 'CtP', 'CtM30', 'M30tC', 'M30', 'M60', 'M90P', 'M270P', 'FCLS', 'REO']
'Status' row found in sheet 'CtP' (row 2).
  Found WAC 'ALL AVG' at row 19 in sheet 'CtP'
'Status' row found in sheet 'CtM30' (row 2).
  Found WAC 'ALL AVG' at row 19 in sheet 'CtM30'
'Status' row found in sheet 'M30tC' (row 2).
'Status' row found in sheet 'M30' (row 2).
  Collecting all Status rows in sheet 'M30'
'Status' row found in sheet 'M60' (row 2).
  Collecting all Status rows in sheet 'M60'
'Status' row found in sheet 'M90P' (row 2).
  Collecting all Status rows in sheet 'M90P'
'Status' row found in sheet 'M270P' (row 2).
  Collecting all Status rows in sheet 'M270P'
'Status' row found in sheet 'FCLS' (row 2).
  Collecting all Status rows in sheet 'FCLS'
'Status' row found in sheet 'REO' (row 2).
  Collecting all Status rows in sheet 'REO'
[Undialed] Using file: R:\QR\Resi_shared\trac

In [76]:
from pathlib import Path


def _export_dial_ratio_workbook(df, output_path, sheet_name="Dial_Ratio"):
    """Write ``df`` to ``output_path`` as one styled Excel sheet.

    The frame is written with pandas, then reopened with openpyxl to apply:
    a bold, centred header row; a frozen header and an auto-filter over the
    used range; column widths sized from the header and the first 50 values
    (clamped to 10-22); per-column number formats; and a three-colour scale
    on "Dial Diff (New - Current)" with the midpoint pinned at 0.

    An empty frame is tolerated: widths fall back to the header length and
    the colour scale is skipped because there are no data rows.
    """
    df.to_excel(output_path, index=False, sheet_name=sheet_name)

    wb = load_workbook(output_path)
    ws = wb[sheet_name]

    # Header styling
    header_font = Font(bold=True)
    for cell in ws[1]:
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center")

    # Freeze top row and add filter
    ws.freeze_panes = "A2"
    ws.auto_filter.ref = f"A1:{get_column_letter(ws.max_column)}{ws.max_row}"

    # Column widths. max() receives a list so it still works when the frame has
    # no rows (a lone positional int would raise TypeError).
    for col_idx, col_name in enumerate(df.columns, start=1):
        sample = df[col_name].astype(str).head(50)
        max_len = max([len(str(col_name)), *(len(v) for v in sample)])
        ws.column_dimensions[get_column_letter(col_idx)].width = min(22, max(10, max_len + 2))

    # Number formats
    num_fmt = {
        "Avg Bal": "#,##0",
        "Loan Num": "#,##0",
        "MV(MM)": "#,##0.00",
        "WALA": "0.00",
        "WAC": "0.00",
        "FICO": "0",
        "OCLTV": "0.00",
        "Model_Dialed": "0.000000",
        "Model_Undialed": "0.000000",
        "Derived_Dial": "0.0000",
        "Actual_Dialed": "0.000000",
        "Proposed_Dial": "0.0000",
        "Dial Diff (New - Current)": "0.0000",
    }
    for col_idx, col_name in enumerate(df.columns, start=1):
        if col_name in num_fmt:
            fmt = num_fmt[col_name]
            for row in ws.iter_rows(min_row=2, max_row=ws.max_row, min_col=col_idx, max_col=col_idx):
                for cell in row:
                    cell.number_format = fmt

    # Conditional formatting for Dial Diff (New - Current) with 0 midpoint
    diff_col = "Dial Diff (New - Current)"
    if diff_col in df.columns and ws.max_row >= 2:
        diff_idx = df.columns.get_loc(diff_col) + 1
        col_letter = get_column_letter(diff_idx)
        data_range = f"{col_letter}2:{col_letter}{ws.max_row}"
        color_rule = ColorScaleRule(
            start_type="min",
            start_color="F8696B",
            mid_type="num",
            mid_value=0,
            mid_color="FFEB84",
            end_type="max",
            end_color="63BE7B",
        )
        ws.conditional_formatting.add(data_range, color_rule)

    wb.save(output_path)


# Create DataFrame from collected summary rows
# (Bucket "ALL AVG" rows + Status rows from status-only sheets)
df_bucket_summary = pd.DataFrame(summary_rows)

if len(df_bucket_summary) > 0:
    # Preferred leading columns; only those present in the data are kept.
    # Adjust column names based on what's actually in your Excel
    display_cols = ['Report', 'Sheet', 'Bucket_Type', 'Status', 'Transition', 'Bucket', 'Avg Bal', 'Loan Num', 'MV(MM)', 'WALA', 'WAC', 'FICO', 'OCLTV']

    available_cols = [col for col in display_cols if col in df_bucket_summary.columns]
    other_cols = [col for col in df_bucket_summary.columns if col not in display_cols]

    # Select error columns by the requested window (3M/6M/12M)
    error_window = normalize_error_window(ERROR_WINDOW)
    error_cols = select_error_columns(df_bucket_summary.columns, error_window)

    # Compute Dial / Actual / Model if error columns exist
    computed_cols = []
    if error_window:
        abs_col = f"{error_window} Error Abs"
        ratio_col = f"{error_window} Error Ratio"
        if abs_col in df_bucket_summary.columns and ratio_col in df_bucket_summary.columns:
            ratio_vals = df_bucket_summary[ratio_col]
            # A ratio of exactly 1 would divide by zero; use NaN for those rows instead.
            denom = (ratio_vals - 1).where((ratio_vals - 1) != 0, np.nan)
            # NOTE(review): this inversion is consistent with
            # Error Abs = Model - Actual and Error Ratio = Model / Actual;
            # confirm that convention against the tracking workbook.
            df_bucket_summary["Dial"] = 1 / ratio_vals
            df_bucket_summary["Actual"] = df_bucket_summary[abs_col] / denom
            df_bucket_summary["Model"] = df_bucket_summary["Actual"] * ratio_vals
            computed_cols = ["Dial", "Actual", "Model"]
        else:
            print(f"Warning: Missing '{abs_col}' or '{ratio_col}' for Dial/Actual/Model.")

    if error_window and error_cols:
        final_cols = available_cols + error_cols + computed_cols
    else:
        if error_window:
            # List the window prefixes that do exist to help pick a valid ERROR_WINDOW.
            available_windows = sorted({
                str(c).upper().replace(" ", "").split("ERROR")[0]
                for c in df_bucket_summary.columns
                if "ERROR" in str(c).upper()
            })
            if available_windows:
                print(f"Warning: No '{error_window}' error columns found. Available: {available_windows}")
            else:
                print("Warning: No error columns found in this sheet. Showing all columns instead.")
        final_cols = available_cols + other_cols + computed_cols

    # De-duplicate while preserving order
    seen = set()
    final_cols = [c for c in final_cols if not (c in seen or seen.add(c))]

    df_bucket_summary = df_bucket_summary[final_cols]

    # NOTE(review): only the header is printed here; df_bucket_summary itself is
    # not displayed by this cell.
    print("\n=== Summary ===")
    if error_window and error_cols:
        print(f"(Showing error window: {error_window})")

    # Optional: also show derived/current model dial (Model_Dialed / Model_Undialed)
    if "Report" in df_bucket_summary.columns and "Model" in df_bucket_summary.columns:
        key_cols = [c for c in ["Sheet", "Bucket_Type", "Status", "Transition", "Bucket"] if c in df_bucket_summary.columns]

        # Normalize report labels and make NaNs mergeable in keys
        df_norm = df_bucket_summary.copy()
        df_norm["Report_Norm"] = df_norm["Report"].astype(str).str.strip().str.lower()

        # Use a sentinel for NaNs in key columns to avoid merge drop
        sentinel = "__NA__"
        for col in key_cols:
            df_norm[col] = df_norm[col].fillna(sentinel)

        ref_cols = ["Avg Bal", "Loan Num", "MV(MM)", "WALA", "WAC", "FICO", "OCLTV"]
        ref_cols = [c for c in ref_cols if c in df_norm.columns]

        # Reference columns are taken from the Dialed report only.
        dialed_cols = ["Model"] + (["Actual"] if "Actual" in df_norm.columns else []) + ref_cols
        dialed_df = df_norm[df_norm["Report_Norm"] == "dialed"][key_cols + dialed_cols].rename(
            columns={"Model": "Model_Dialed", "Actual": "Actual_Dialed"}
        )
        undialed_df = df_norm[df_norm["Report_Norm"] == "undialed"][key_cols + ["Model"]].rename(
            columns={"Model": "Model_Undialed"}
        )

        if len(dialed_df) > 0 and len(undialed_df) > 0:
            df_dial_ratio = dialed_df.merge(undialed_df, on=key_cols, how="inner")
            df_dial_ratio["Derived_Dial"] = df_dial_ratio["Model_Dialed"] / df_dial_ratio["Model_Undialed"]

            if "Actual_Dialed" in df_dial_ratio.columns:
                df_dial_ratio["Proposed_Dial"] = df_dial_ratio["Actual_Dialed"] / df_dial_ratio["Model_Undialed"]
                df_dial_ratio["Dial Diff (New - Current)"] = df_dial_ratio["Proposed_Dial"] - df_dial_ratio["Derived_Dial"]
            else:
                df_dial_ratio["Proposed_Dial"] = np.nan
                print("Warning: 'Actual' not found; Proposed_Dial set to NaN.")

            # Restore NaNs in key columns for display
            for col in key_cols:
                df_dial_ratio[col] = df_dial_ratio[col].replace(sentinel, np.nan)

            # Reorder columns for final output
            output_cols = key_cols + ref_cols + [
                "Model_Dialed",
                "Model_Undialed",
                "Derived_Dial",
                "Actual_Dialed",
                "Proposed_Dial",
                "Dial Diff (New - Current)",
            ]
            output_cols = [c for c in output_cols if c in df_dial_ratio.columns]
            df_dial_ratio = df_dial_ratio[output_cols]

            print("\n=== Derived Dial (Model_Dialed / Model_Undialed) ===")

            if df_dial_ratio.empty:
                # Both reports have rows but none share the same key values. Skip the
                # export so an existing workbook is not replaced by a header-only sheet.
                print(
                    "Warning: Dialed and Undialed reports have no rows with matching "
                    f"keys {key_cols}; nothing to display or export."
                )
            else:
                # Apply formatting for display
                format_map = {
                    "Avg Bal": "{:,.0f}",
                    "Loan Num": "{:,.0f}",
                    "MV(MM)": "{:,.2f}",
                    "WALA": "{:.2f}",
                    "WAC": "{:.2f}",
                    "FICO": "{:.0f}",
                    "OCLTV": "{:.2f}",
                    "Model_Dialed": "{:.6f}",
                    "Model_Undialed": "{:.6f}",
                    "Derived_Dial": "{:.4f}",
                    "Actual_Dialed": "{:.6f}",
                    "Proposed_Dial": "{:.4f}",
                    "Dial Diff (New - Current)": "{:.4f}",
                }
                display(df_dial_ratio.style.format(format_map))

                # Export to Excel with formatting and conditional formatting
                output_dir = Path("dial/outputs")
                output_dir.mkdir(parents=True, exist_ok=True)
                window_label = error_window or "ALL"
                output_path = output_dir / f"dial_ratio_{DEALTYPE}_{window_label}.xlsx"

                _export_dial_ratio_workbook(df_dial_ratio, output_path)
                print(f"Saved formatted Excel: {output_path}")
        else:
            available_reports = sorted(df_norm["Report_Norm"].dropna().unique().tolist())
            print(f"Warning: Missing Dialed/Undialed reports for derived dial. Available: {available_reports}")
else:
    print("\nNo rows found.")


=== Summary ===
(Showing error window: 6M)

=== Derived Dial (Model_Dialed / Model_Undialed) ===


Unnamed: 0,Sheet,Bucket_Type,Status,Transition,Bucket,Avg Bal,Loan Num,MV(MM),WALA,WAC,FICO,OCLTV,Model_Dialed,Model_Undialed,Derived_Dial,Actual_Dialed,Proposed_Dial,Dial Diff (New - Current)
0,CtP,WAC,All Current,,ALL AVG,303331,250000,75832.63,39.94,4.67,758,82.14,0.496493,0.575894,0.8621,0.702498,1.2198,0.3577
1,CtM30,WAC,All Current,,ALL AVG,303331,250000,75832.63,39.94,4.67,758,82.14,0.551746,0.432484,1.2758,0.530445,1.2265,-0.0493
2,M30,STATUS,M30,M30tC,,251206,62331,15657.93,58.68,4.34,717,83.96,36.101347,45.818707,0.7879,35.281392,0.77,-0.0179
3,M30,STATUS,M30,M30tM60,,251206,62331,15657.93,58.68,4.34,717,83.96,18.902806,13.007621,1.4532,18.516018,1.4235,-0.0297
4,M30,STATUS,M30,M30tP,,251206,62331,15657.93,58.68,4.34,717,83.96,0.811998,1.377596,0.5894,0.910981,0.6613,0.0719
5,M60,STATUS,M60,M60tC,,259329,15823,4103.36,57.93,4.39,714,84.97,15.963073,16.292502,0.9798,14.461158,0.8876,-0.0922
6,M60,STATUS,M60,M60tM30,,259329,15823,4103.36,57.93,4.39,714,84.97,13.350601,13.34902,1.0001,13.35032,1.0001,-0.0
7,M60,STATUS,M60,M60tM90,,259329,15823,4103.36,57.93,4.39,714,84.97,37.493401,37.498514,0.9999,38.029024,1.0141,0.0143
8,M60,STATUS,M60,M60tP,,259329,15823,4103.36,57.93,4.39,714,84.97,1.103088,0.993599,1.1102,1.159414,1.1669,0.0567
9,M60,STATUS,M60,M60tD,,259329,15823,4103.36,57.93,4.39,714,84.97,0.018511,0.029834,0.6205,0.011711,0.3925,-0.2279


Saved formatted Excel: dial\outputs\dial_ratio_CAS_6M.xlsx
