In [1]:
from pathlib import Path
import numpy as np
import pandas as pd

In [2]:
DATA_DIR = Path("..") / "data"

In [3]:
WS_THRESHOLD = 0.10        # ±10%
HEALTHY_THRESHOLD = 0.95   # 3-month avg of (Actual_adj/Plan) >= 95%
HEALTHY_WINDOW = 3
P_SALES_DRIVEN = {5:0.80, 3:0.65, 1:0.50}
SEED = 1337
rng = np.random.default_rng(SEED)

### Data Loading

In [4]:
def read_csv_flex(path: Path) -> pd.DataFrame:
    try:
        df = pd.read_csv(path)
        if df.shape[1] == 1:
            df = pd.read_csv(path, sep=";")
    except Exception:
        df = pd.read_csv(path, sep=";")
    return df

org = read_csv_flex(DATA_DIR/"org_hierarchy.csv")
accounts = read_csv_flex(DATA_DIR/"accounts_dim.csv")
fact = read_csv_flex(DATA_DIR/"sales_monthly.csv")

fact["date"] = pd.to_datetime(fact["date"], errors="coerce")
fact = fact.sort_values(["account_id","date"]).reset_index(drop=True)

# start with Actual as Actual_adj (we will modify per rules)
fact["actual_adj"] = fact["actual_revenue"].astype(float)
fact["windfall_flag"] = 0
fact["shortfall_flag"] = 0
fact["sales_driven"] = 0
fact["healthy_prev3"] = 0

accounts = accounts.set_index("account_id")
fact.head(3)


Unnamed: 0,date,year,month,quarter,country_id,area_id,salesperson_id,account_id,tier,plan_revenue,actual_revenue,last_year_revenue,windfall_flag,shortfall_flag,sales_driven,healthy_prev3,actual_adj
0,2024-01-01,2024,1,Q1,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,8941.951131,9210.898894,7440.401123,0,0,0,0,9210.898894
1,2024-02-01,2024,2,Q1,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,8941.951131,8761.233837,7440.401123,0,0,0,0,8761.233837
2,2024-03-01,2024,3,Q1,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,10219.372722,11954.465374,8503.315569,0,0,0,0,11954.465374


### Logic

In [5]:
def rolling_health(prev_rows: pd.DataFrame) -> bool:
    """
    prev_rows: rows t-1..t-k with columns actual_adj, plan_revenue
    Returns True if 3-month avg (or available months) of actual_adj/plan >= 0.95
    """
    vals = []
    for _, r in prev_rows.tail(HEALTHY_WINDOW).iterrows():
        pr = r["plan_revenue"]
        if pr > 0:
            vals.append(r["actual_adj"] / pr)
    if len(vals) == 0:
        return False
    return (np.nanmean(vals) >= HEALTHY_THRESHOLD)

In [6]:
def apply_ws_for_account(grp: pd.DataFrame, is_new: bool) -> pd.DataFrame:
    grp = grp.copy().reset_index(drop=True)
    first_nonzero_seen = False

    for t in range(len(grp)):
        if t == 0:
            continue

        prev_actual = grp.loc[t-1, "actual_revenue"]
        # skip W/S if previous actual is zero and not a NEW account
        if prev_actual == 0 and not is_new:
            continue

        # NEW: first month with non-zero actual is treated as sales-driven (no W/S)
        if is_new and not first_nonzero_seen and grp.loc[t,"actual_revenue"] > 0:
            first_nonzero_seen = True
            continue

        # health over previous months (using actual_adj so far)
        healthy = rolling_health(grp.iloc[:t][["actual_adj","plan_revenue"]])
        grp.loc[t,"healthy_prev3"] = int(healthy)

        # compute delta MoM
        prev = prev_actual if prev_actual != 0 else 1.0
        delta = (grp.loc[t,"actual_revenue"] - prev_actual) / prev
        is_wind = delta >= WS_THRESHOLD
        is_short = delta <= -WS_THRESHOLD
        if not (is_wind or is_short):
            continue

        tier = int(grp.loc[t,"tier"])
        sales_driven = (np.random.default_rng(SEED + hash(grp.loc[t, "account_id"]) % 100000).random()
                        < P_SALES_DRIVEN[tier])
        grp.loc[t,"sales_driven"] = int(sales_driven)

        plan_t = grp.loc[t,"plan_revenue"]
        actual_t = grp.loc[t,"actual_revenue"]

        if is_wind:
            grp.loc[t,"windfall_flag"] = 1
            if not sales_driven:
                # cap to Plan but don't "help" if already under Plan
                grp.loc[t,"actual_adj"] = min(actual_t, plan_t) if plan_t > 0 else actual_t
        else:
            grp.loc[t,"shortfall_flag"] = 1
            if not sales_driven and healthy and plan_t > 0:
                # rescue healthy accounts to Plan
                grp.loc[t,"actual_adj"] = max(actual_t, plan_t)

    return grp

### Apply per account

In [7]:
out_parts = []
for acc_id, grp in fact.groupby("account_id", sort=False):
    is_new = bool(accounts.loc[acc_id, "is_new"]) if acc_id in accounts.index else False
    out_parts.append(apply_ws_for_account(grp, is_new))

fact_ws = pd.concat(out_parts, axis=0).sort_values(["account_id","date"]).reset_index(drop=True)
fact_ws.head(5)


Unnamed: 0,date,year,month,quarter,country_id,area_id,salesperson_id,account_id,tier,plan_revenue,actual_revenue,last_year_revenue,windfall_flag,shortfall_flag,sales_driven,healthy_prev3,actual_adj
0,2024-01-01,2024,1,Q1,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,8941.951131,9210.898894,7440.401123,0,0,0,0,9210.898894
1,2024-02-01,2024,2,Q1,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,8941.951131,8761.233837,7440.401123,0,0,0,1,8761.233837
2,2024-03-01,2024,3,Q1,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,10219.372722,11954.465374,8503.315569,1,0,1,1,11954.465374
3,2024-04-01,2024,4,Q2,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,10219.372722,11030.843159,8503.315569,0,0,0,1,11030.843159
4,2024-05-01,2024,5,Q2,PL,PL_N,SP_BIA_001,ACC_T1_SP_BIA_001_0001,1,10219.372722,11915.425923,8503.315569,0,0,0,1,11915.425923


### Rule checks

In [8]:
violations = []

# Windfall external: actual_adj <= plan when plan>0
wf_ext = fact_ws[(fact_ws["windfall_flag"]==1) & (fact_ws["sales_driven"]==0) & (fact_ws["plan_revenue"]>0)]
bad_wf = wf_ext[ wf_ext["actual_adj"] > wf_ext["plan_revenue"] + 1e-6 ]
violations.append(("windfall external capped to plan", len(bad_wf)))

# Shortfall external + healthy: actual_adj >= plan
sf_ext = fact_ws[(fact_ws["shortfall_flag"]==1) & (fact_ws["sales_driven"]==0) &
                 (fact_ws["healthy_prev3"]==1) & (fact_ws["plan_revenue"]>0)]
bad_sf = sf_ext[ sf_ext["actual_adj"] < sf_ext["plan_revenue"] - 1e-6 ]
violations.append(("shortfall external on healthy floored to plan", len(bad_sf)))

violations


[('windfall external capped to plan', 0),
 ('shortfall external on healthy floored to plan', 0)]

### Save output

In [9]:
out_path = DATA_DIR/"sales_monthly_ws.csv"
fact_ws.to_csv(out_path, index=False)
out_path


WindowsPath('../data/sales_monthly_ws.csv')

### Additional sanity checks

In [10]:
# WS
print(fact_ws[['windfall_flag','shortfall_flag']].sum())

#  windfall EXTERNAL 
print(len(fact_ws.query('windfall_flag==1 and sales_driven==0 and plan_revenue>0')))

# EXTERNAL + healthy
print(len(fact_ws.query('shortfall_flag==1 and sales_driven==0 and healthy_prev3==1 and plan_revenue>0')))


windfall_flag     2466
shortfall_flag    1682
dtype: int64
928
536


In [11]:
lost = fact_ws.query('shortfall_flag==1 and sales_driven==0 and healthy_prev3==0')
float((lost.actual_adj - lost.actual_revenue).abs().lt(1e-6).mean())  # 1.0 (100%)


1.0