## Cooperative Pricing Model 

### Imports and Configs

In [None]:
import pandas as pd
import numpy as np

from sklearn.model_selection import GroupShuffleSplit
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.ensemble import RandomForestRegressor

from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error



In [None]:
# Default random seed for the train/test splits and both models trained below.
SEED = 42

# Pricing-scheme columns used as numeric features of the car-month model.
# Note that "dal_pct" sits in this list alongside the *_flag columns.
FLAGS = [
    "gratis_uren_flag", "woonwerkuren_flag", "daluren_flag",
    "urenstaffel_flag", "dagelijks_maximum_flag",
    "km_staffel_flag", "ritstaffel_flag", "dal_pct"
]
# Cost table keyed by cost category (e.g. "B1"). The pricing functions read the
# keys "lease", "technique", "other" and "km" from each entry.
# The real figures are private, so the dict is left empty here: fill it in before
# calling the pricing functions, which raise ValueError for a missing category.
COSTS = {}

### Load Data and Preprocessing

In [None]:
# Update the path if needed.


# CSV obtained after merging the cooperative information with the user and car data.
# The data is private, so this placeholder must be replaced with a real path.
DATA_PATH = "/path/to/the/dataset"
df_master = pd.read_csv(DATA_PATH)
df_master.shape


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# df_master is already loaded
df = df_master.copy()

# Finished rides only
df = df[df["status"] == "FINISHED"].copy()

# Basic cleaning: category labels as trimmed, upper-case strings
df["voertuigcategorie"] = (
    df["voertuigcategorie"]
    .astype(str)
    .str.strip()
    .str.upper()
)

# Mean price per km (and number of non-null prices) per raw vehicle category,
# most expensive category first
price_by_cat = (
    df.groupby("voertuigcategorie", as_index=False)
      .agg(
          mean_price_km=("prijs_km", "mean"),
          n=("prijs_km", "count")
      )
      .sort_values("mean_price_km", ascending=False)
)

# Bar chart of the mean price per km by category
plt.figure()
plt.bar(price_by_cat["voertuigcategorie"], price_by_cat["mean_price_km"])
plt.xticks(rotation=90)
plt.ylabel("Mean price per km (€)")
plt.title("Mean price per km by voertuigcategorie")
plt.tight_layout()
plt.show()

# Show the underlying table as the cell output
price_by_cat


In [None]:
import pandas as pd
import numpy as np

def rebuild_cost_cat(df_master: pd.DataFrame) -> pd.DataFrame:
    """
    Derive a clean 'cost_cat' column from 'voertuigcategorie'.

    Steps:
      - normalise the category labels (string, stripped, upper-case) so that
        variants such as " b3" and "B3" collapse into one label
      - remove rows whose category is EXB2 or EXB3
      - map every remaining category to its cost category
      - drop 'cost_cat_x', 'cost_cat_y' (when present) and 'voertuigcategorie'

    The input frame is not modified.

    Returns:
      New DataFrame with the column 'cost_cat'.

    Raises:
      ValueError: if a category remains that has no entry in the mapping.
    """
    excluded_categories = {"EXB2", "EXB3"}
    category_to_cost_cat = {
        "B1": "B1",
        "B2": "B2",
        "B3": "B3",
        "B4": "Z",
        "C": "B1",
        "D": "B1",
        "E": "Z",
        "Z1": "Z",
        "V1": "B2",
        "V2": "B2",
        "V3": "B3",
    }

    # Normalised labels replace the raw column on a fresh frame
    labels = df_master["voertuigcategorie"].astype(str).str.strip().str.upper()
    out = df_master.assign(voertuigcategorie=labels)

    # Excluded categories are removed before mapping, so they never count as unmapped
    keep_row = ~out["voertuigcategorie"].isin(excluded_categories)
    out = out[keep_row].copy()

    out["cost_cat"] = out["voertuigcategorie"].map(category_to_cost_cat)

    # Fail loudly instead of carrying NaN cost categories downstream
    unmapped = out.loc[out["cost_cat"].isna(), "voertuigcategorie"].unique()
    if len(unmapped) > 0:
        raise ValueError(
            f"Unmapped voertuigcategorie values found: {list(unmapped)}. "
            "Add them to the mapping or decide to drop them."
        )

    # Superseded columns: merge leftovers plus the raw category itself
    obsolete = [
        col
        for col in ("cost_cat_x", "cost_cat_y", "voertuigcategorie")
        if col in out.columns
    ]
    return out.drop(columns=obsolete)


# Rebuild cost_cat on the raw data
df_clean = rebuild_cost_cat(df_master)

# Number of rows per rebuilt cost category
print(df_clean["cost_cat"].value_counts())


In [None]:
# Columns removed by preprocess_master before modelling (each is dropped only if present)
cols_to_drop = [
    "gemeente", "id_naam", "ritbegin",
    "bruto_prijs_uren", "brutoprijs_km", "bruto_prijs_rit",
    "netto_prijs_rit", "netto_per_uur",
    "cost_cat_x", "cost_cat_y", "cooperatie"
]

In [None]:
def preprocess_master(df_master: pd.DataFrame) -> pd.DataFrame:
    """
    Preprocess raw dataset:
    - Keep only rides with status == "FINISHED" (if column exists)
    - Parse 'maand' to datetime
    - Drop rows missing critical columns used for training
    """
    df = df_master.copy()

    # Keep only finished rides
    if "status" in df.columns:
        df = df[df["status"] == "FINISHED"].copy()

    #change of month to data format 
    if "maand" in df.columns:
        df["maand"] = pd.to_datetime(df["maand"], errors="coerce")
    # 3) Drop not used columns 
    df = df.drop(columns=[c for c in cols_to_drop if c in df.columns], errors="ignore")


    # Drop rows missing critical fields
    critical = ["cooperatie_clean", "cost_cat", "id_hh", "kenteken", "uren_gereserveerd", "km", "maand"]
    df = df.dropna(subset=[c for c in critical if c in df.columns])


    return df

# Preprocess the frame returned by rebuild_cost_cat (not the raw df_master), so the
# rebuilt `cost_cat` column and the EXB2/EXB3 exclusion reach the aggregation steps.
df = preprocess_master(df_clean)


### Model Preparation 

In [None]:
# 3) Model 1 Table (User-Month)
# Aggregation level: `(cooperatie_clean, cost_cat, id_hh, maand)`
def build_user_month(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate rides to one row per (cooperatie_clean, cost_cat, id_hh, maand).

    Output columns, besides the four keys:
      - total_hours: sum of uren_gereserveerd (target of the user model)
      - total_rides: ride count
      - mean_start_hour, mean_km: per-ride averages
      - coop_size_month: distinct id_hh per cooperative and month
      - rides_per_user: copy of total_rides

    Rows with total_hours <= 0 are removed at the end.
    """
    group_keys = ["cooperatie_clean", "cost_cat", "id_hh", "maand"]
    named_aggs = {
        "total_hours": ("uren_gereserveerd", "sum"),
        "total_rides": ("kenteken", "count"),
        "mean_start_hour": ("start_hour", "mean"),
        "mean_km": ("km", "mean"),
    }
    table = df.groupby(group_keys, as_index=False).agg(**named_aggs)

    # Cooperative size is counted on the aggregated table, before the
    # positive-target filter below
    users_in_coop_month = (
        table.groupby(["cooperatie_clean", "maand"])["id_hh"].transform("nunique")
    )
    table = table.assign(
        coop_size_month=users_in_coop_month,
        rides_per_user=table["total_rides"],
    )

    has_positive_hours = table["total_hours"] > 0
    return table[has_positive_hours].copy()

# Build the user-month table and preview its first rows
user_month = build_user_month(df)
user_month.head()


#4) Model 2 Table (Car-Month)
# Aggregation level: `(cooperatie_clean, cost_cat, kenteken, maand)`


def build_car_month(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build car-month aggregation table for predicting total_hours per car-month.

    One row per (cooperatie_clean, cost_cat, kenteken, maand) with:
      - total_hours: sum of uren_gereserveerd (target of the car model)
      - total_rides, total_km, mean_start_hour
      - coop_size_month: distinct cars (kenteken) per cooperative and month
      - one column per entry of FLAGS, holding the ride-level mean of that column
        for the row's (cooperatie_clean, cost_cat)

    Any FLAGS column missing from `df` is treated as 0.0. The input frame is
    left untouched. Rows with total_hours <= 0 are removed at the end.
    """
    # Add missing flag columns on a new frame (DataFrame.assign returns a copy)
    # instead of writing them into the caller's DataFrame.
    missing_flags = {c: 0.0 for c in FLAGS if c not in df.columns}
    if missing_flags:
        df = df.assign(**missing_flags)

    car_month = (
        df.groupby(["cooperatie_clean", "cost_cat", "kenteken", "maand"], as_index=False)
          .agg(
              total_hours=("uren_gereserveerd", "sum"),
              total_rides=("kenteken", "count"),
              total_km=("km", "sum"),
              mean_start_hour=("start_hour", "mean"),
          )
    )

    # number of cars per coop-month (computed before the positive-target filter)
    car_month["coop_size_month"] = (
        car_month.groupby(["cooperatie_clean", "maand"])["kenteken"].transform("nunique")
    )

    # mean flags per coop+cat, taken over the ride-level rows
    flags_ref = (
        df[["cooperatie_clean", "cost_cat"] + FLAGS]
          .groupby(["cooperatie_clean", "cost_cat"], as_index=False)
          .mean(numeric_only=True)
    )

    car_month = car_month.merge(flags_ref, on=["cooperatie_clean", "cost_cat"], how="left")

    # keep positive target
    car_month = car_month[car_month["total_hours"] > 0].copy()

    return car_month

# Build the car-month table and preview its first rows
car_month = build_car_month(df)
car_month.head()

### Training 

In [None]:
# Train Model 1 (RandomForest)

def train_user_model(user_month: pd.DataFrame, seed: int = SEED):
    """
    Fit the user-month hours regressor (RandomForest) and score it on a hold-out split.

    The 80/20 split is grouped by user (id_hh), so all rows of one user end up
    on the same side of the split.

    Returns:
      (fitted Pipeline, dict with "mae", "r2" and "rmse" measured on the test split)
    """
    categorical = ["cost_cat", "cooperatie_clean"]
    numeric = [
        "total_rides", "mean_start_hour", "mean_km",
        "coop_size_month", "rides_per_user",
    ]
    features = categorical + numeric

    X = user_month[features]
    y = user_month["total_hours"]

    # Single grouped shuffle split
    gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=seed)
    train_rows, test_rows = next(gss.split(X, y, groups=user_month["id_hh"]))
    X_train, y_train = X.iloc[train_rows], y.iloc[train_rows]
    X_test, y_test = X.iloc[test_rows], y.iloc[test_rows]

    # One-hot encode the categoricals (unseen levels are ignored at predict time);
    # numeric gaps are filled with the median
    encoder = OneHotEncoder(handle_unknown="ignore")
    numeric_prep = Pipeline([("imp", SimpleImputer(strategy="median"))])
    column_prep = ColumnTransformer(
        transformers=[
            ("cat", encoder, categorical),
            ("num", numeric_prep, numeric),
        ],
        remainder="drop",
    )

    forest = RandomForestRegressor(
        n_estimators=500,
        min_samples_leaf=3,
        max_depth=15,
        random_state=seed,
        n_jobs=-1,
    )

    model = Pipeline(steps=[("prep", column_prep), ("rf", forest)])
    model.fit(X_train, y_train)

    pred = model.predict(X_test)
    residuals = y_test - pred
    metrics = {
        "mae": float(mean_absolute_error(y_test, pred)),
        "r2": float(r2_score(y_test, pred)),
        "rmse": float(np.sqrt(np.mean(residuals ** 2))),
    }
    return model, metrics

# Train the user-hours model and show its hold-out metrics
model_user_hours, user_metrics = train_user_model(user_month)
user_metrics

In [None]:
# Train Model 2 (XGBoost)

def train_car_model(car_month: pd.DataFrame, seed: int = SEED):
    """
    Fit the car-month hours regressor (XGBoost) and score it on a hold-out split.

    The 80/20 split is grouped by car (kenteken), so all rows of one car end up
    on the same side of the split.

    Returns:
      (fitted Pipeline, dict with "mae", "r2" and "rmse" measured on the test split)
    """
    categorical = ["cost_cat", "cooperatie_clean"]
    numeric = [
        "total_rides", "total_km", "mean_start_hour", "coop_size_month",
        *FLAGS,
    ]
    features = categorical + numeric

    X = car_month[features]
    y = car_month["total_hours"]

    # Single grouped shuffle split
    gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=seed)
    train_rows, test_rows = next(gss.split(X, y, groups=car_month["kenteken"]))
    X_train, y_train = X.iloc[train_rows], y.iloc[train_rows]
    X_test, y_test = X.iloc[test_rows], y.iloc[test_rows]

    # One-hot encode the categoricals (unseen levels are ignored at predict time);
    # numeric gaps are filled with the median
    encoder = OneHotEncoder(handle_unknown="ignore")
    numeric_prep = Pipeline([("imp", SimpleImputer(strategy="median"))])
    column_prep = ColumnTransformer(
        transformers=[
            ("cat", encoder, categorical),
            ("num", numeric_prep, numeric),
        ],
        remainder="drop",
    )

    booster = XGBRegressor(
        n_estimators=300,
        max_depth=4,
        learning_rate=0.02,
        subsample=0.9,
        colsample_bytree=0.9,
        objective="reg:squarederror",
        random_state=seed,
        n_jobs=-1,
    )

    model = Pipeline(steps=[("prep", column_prep), ("xgb", booster)])
    model.fit(X_train, y_train)

    pred = model.predict(X_test)
    residuals = y_test - pred
    metrics = {
        "mae": float(mean_absolute_error(y_test, pred)),
        "r2": float(r2_score(y_test, pred)),
        "rmse": float(np.sqrt(np.mean(residuals ** 2))),
    }
    return model, metrics

# Train the car-hours model and show its hold-out metrics
model_car_hours, car_metrics = train_car_model(car_month)

car_metrics


### Build New Coop Columns

In [None]:
# Used when you need a synthetic feature row for a new cooperative.

def build_typicals(user_month: pd.DataFrame, car_month: pd.DataFrame):
    """
    Compute per-cost_cat averages used to fill the feature row of a new cooperative.

    Returns a tuple of three DataFrames, each with one row per cost_cat:
      - user_typ: means of total_rides, mean_start_hour, mean_km, rides_per_user
      - car_typ: means of total_rides, total_km, mean_start_hour
      - flags_typ: means of the FLAGS columns of the car-month table
    """
    user_means = {
        name: (name, "mean")
        for name in ("total_rides", "mean_start_hour", "mean_km", "rides_per_user")
    }
    car_means = {
        name: (name, "mean")
        for name in ("total_rides", "total_km", "mean_start_hour")
    }

    user_typ = user_month.groupby("cost_cat", as_index=False).agg(**user_means)
    car_typ = car_month.groupby("cost_cat", as_index=False).agg(**car_means)

    flags_by_cat = car_month.groupby("cost_cat", as_index=False)[FLAGS]
    flags_typ = flags_by_cat.mean(numeric_only=True)

    return user_typ, car_typ, flags_typ

# Category-level typical values for users, cars and pricing flags; preview the user table
user_typ, car_typ, flags_typ = build_typicals(user_month, car_month)
user_typ.head()


### Pricing Function

- **Revenue rule**: `price_km = price_hour / 10`

**Costs**:
- Fixed costs: `(lease + technique + other) * cars_needed`
- Variable km costs: `total_km * costs[cost_cat]["km"]`
- Minus user subscription fees: `n_users * fixed_user_fee`
- Apply margin: `* (1 + margin)`

Then solve for `price_hour`, where `H` is the total predicted hours and `K` the total predicted km per month:
- `target_revenue = price_hour * (H + K/10)`



In [None]:
def recommend_pricing_new_coop(
    n_users: int,
    cost_cat: str,
    cooperatie_name: str,
    model_user_hours,
    model_car_hours,
    user_typ: pd.DataFrame,
    car_typ: pd.DataFrame,
    flags_typ: pd.DataFrame,
    costs: dict,
    margin: float = 0.20,
    hours_buffer: float = 0.15,
    fixed_user_fee: float = 15.0,
):
    """
    Recommend:
    - cars_needed based on predicted demand vs predicted capacity
    - prices with cost-plus and constraint price_km = price_hour/10

    Returns a dict with demand, supply, costs and suggested prices.
    """
    if cost_cat not in costs:
        raise ValueError(f"cost_cat '{cost_cat}' not in costs dict. Available: {list(costs.keys())}")

    ut = user_typ[user_typ["cost_cat"] == cost_cat]
    ct = car_typ[car_typ["cost_cat"] == cost_cat]
    ft = flags_typ[flags_typ["cost_cat"] == cost_cat]

    if len(ut) == 0 or len(ct) == 0:
        raise ValueError(f"cost_cat '{cost_cat}' not found in typicals.")

    ut = ut.iloc[0]
    ct = ct.iloc[0]

    if len(ft) == 0:
        flags_vals = {k: 0.0 for k in FLAGS}
        flags_vals["dal_pct"] = 50.0
    else:
        ft = ft.iloc[0]
        flags_vals = {k: float(0.0 if pd.isna(ft[k]) else ft[k]) for k in FLAGS}

    # ---- demand -----
    X_user = pd.DataFrame([{
        "cost_cat": cost_cat,
        "cooperatie_clean": cooperatie_name,
        "total_rides": float(ut["total_rides"]),
        "mean_start_hour": float(ut["mean_start_hour"]),
        "mean_km": float(ut["mean_km"]),
        "coop_size_month": int(n_users),
        "rides_per_user": float(ut["rides_per_user"]),
    }])

    hours_per_user = float(model_user_hours.predict(X_user)[0])
    hours_per_user = max(1e-6, hours_per_user)

    H = float(n_users) * hours_per_user
    K = float(n_users) * float(ut["total_rides"]) * float(ut["mean_km"])

    # ---- supply ----
    cars_guess = max(1, int(np.ceil(n_users / 10)))

    X_car = pd.DataFrame([{
        "cost_cat": cost_cat,
        "cooperatie_clean": cooperatie_name,
        "total_rides": float(ct["total_rides"]),
        "total_km": float(ct["total_km"]),
        "mean_start_hour": float(ct["mean_start_hour"]),
        "coop_size_month": cars_guess,
        **flags_vals
    }])

    hours_per_car = float(model_car_hours.predict(X_car)[0])
    hours_per_car = max(1e-6, hours_per_car)

    cars_needed = int(np.ceil((H * (1 + hours_buffer)) / hours_per_car))
    cars_needed = max(1, cars_needed)

    # ---- costs
    base = costs[cost_cat]
    fixed_cost_per_car = float(base["lease"] + base["technique"] + base["other"])
    km_cost = float(base["km"])

    premium_cost_multiplier = {"Z": 1.15}
    fixed_cost_per_car *= premium_cost_multiplier.get(cost_cat, 1.0)

    fixed_cost_total = cars_needed * fixed_cost_per_car
    variable_cost_total = K * km_cost
    total_cost = fixed_cost_total + variable_cost_total

    fee_revenue = float(n_users) * float(fixed_user_fee)
    net_cost = max(0.0, total_cost - fee_revenue)
    target_revenue = net_cost * (1 + margin)


    # ---- pricing constraint
    denom = H + (K / 10.0)
    denom = max(1e-6, denom)

    price_hour = target_revenue / denom
    price_km = price_hour / 10.0


    return {
        "cooperatie_name": cooperatie_name,
        "n_users": int(n_users),
        "cost_cat": cost_cat,

        "pred_hours_per_user_month": round(hours_per_user, 3),
        "pred_total_hours_month": round(H, 2),
        "pred_total_km_month": round(K, 2),

        "pred_hours_per_car_month": round(hours_per_car, 2),
        "cars_needed": int(cars_needed),

        "fixed_cost_per_car_eur": round(fixed_cost_per_car, 2),
        "fixed_cost_total_eur": round(fixed_cost_total, 2),
        "variable_cost_total_eur": round(variable_cost_total, 2),
        "total_cost_eur": round(total_cost, 2),

        "fixed_user_fee_eur": float(fixed_user_fee),
        "fee_revenue_eur": round(fee_revenue, 2),
        "net_cost_after_fees_eur": round(net_cost, 2),
        "target_revenue_eur": round(target_revenue, 2),

        "recommended_price_hour_eur": round(price_hour, 2),
        "recommended_price_km_eur": round(price_km, 3),

        #"margin_pct": round(margin * 100, 1),
        #"hours_buffer": hours_buffer,
    }

### Prediction

In [None]:
# Predict for a new cooperative with 100 users, all using a single category (B1).
# COSTS must contain an entry for "B1", otherwise the call raises ValueError.
result = recommend_pricing_new_coop(
    n_users=100,
    cost_cat="B1",
    cooperatie_name="NEW_COOP",
    model_user_hours=model_user_hours,
    model_car_hours=model_car_hours,
    user_typ=user_typ,
    car_typ=car_typ,
    flags_typ=flags_typ,
    costs=COSTS,
    margin=0.10,
    hours_buffer=0.05,
    fixed_user_fee=15.0,
)

# Show the recommendation as a one-row table
pd.DataFrame([result])

## Mix Category

In [None]:
def _mix_to_user_counts(n_users: int, mix: dict, normalize_mix: bool) -> dict:
    """Convert a category mix (proportions, weights or counts) into positive user counts."""
    values = np.array(list(mix.values()), dtype=float)

    # Reject inputs that previously slipped through: NaN/inf, negative shares, and an
    # all-zero mix (which the rounding adjustment turned into "all users in the
    # first category").
    if not np.all(np.isfinite(values)) or np.any(values < 0):
        raise ValueError(f"mix values must be finite and non-negative. Got {mix}.")
    sum_vals = float(values.sum())
    if sum_vals <= 0:
        raise ValueError("mix must contain at least one positive value.")

    def _from_weights() -> dict:
        props = values / sum_vals
        counts = {cat: int(round(n_users * p)) for cat, p in zip(mix.keys(), props)}
        # adjust rounding to match exactly n_users: the largest category absorbs the gap
        diff = n_users - sum(counts.values())
        if diff != 0:
            max_cat = max(counts, key=counts.get)
            counts[max_cat] += diff
        return counts

    if abs(sum_vals - 1.0) < 1e-6 or (normalize_mix and sum_vals <= 2.0):
        # proportions (or small weights that only need normalising)
        users_by_cat = _from_weights()
    else:
        # try as user counts first
        users_by_cat = {cat: int(v) for cat, v in mix.items()}
        if sum(users_by_cat.values()) != n_users:
            if not normalize_mix:
                raise ValueError(
                    f"mix counts must sum to n_users ({n_users}). Got {sum(users_by_cat.values())}."
                )
            # weights such as {"B2": 70, "Z": 30}: normalise them to proportions
            users_by_cat = _from_weights()

    # remove cats with 0 users
    users_by_cat = {cat: u for cat, u in users_by_cat.items() if u > 0}
    if len(users_by_cat) == 0:
        raise ValueError("All categories have 0 users after processing mix.")
    return users_by_cat


def recommend_pricing_new_coop_mixed(
    n_users: int,
    mix: dict,  # e.g. {"B2": 0.7, "Z": 0.3}  (must sum ~1.0) OR {"B2": 70, "Z": 30}
    cooperatie_name: str,
    model_user_hours,
    model_car_hours,
    user_typ: pd.DataFrame,
    car_typ: pd.DataFrame,
    flags_typ: pd.DataFrame,
    costs: dict,
    margin: float = 0.20,
    hours_buffer: float = 0.15,
    fixed_user_fee: float = 15.0,
    premium_cost_multiplier: dict = None,   # e.g. {"Z": 1.15}
    normalize_mix: bool = True,             # if mix given as weights that don't sum to 1
):
    """
    Mixed-category version of the pricing recommendation.

    The mix describes how users are distributed across cost categories. Demand and
    fleet size are computed per category; a single pair of prices (hour/km) is then
    solved for the whole cooperative under the constraint price_km = price_hour / 10.

    mix can be:
      - proportions summing to ~1.0,
      - user counts summing to n_users, or
      - (with normalize_mix=True) arbitrary non-negative weights, which are
        normalised to proportions. Weights summing to at most 2.0 are always
        normalised; larger ones are normalised when they do not sum to n_users.

    Args:
      n_users: total number of users of the new cooperative; must be positive.
      mix: {cost_cat: share}; every key must exist in `costs`.
      cooperatie_name: value passed to both models as `cooperatie_clean`.
      model_user_hours, model_car_hours: fitted models exposing `.predict(DataFrame)`.
      user_typ, car_typ, flags_typ: category-level typical values (see build_typicals).
      costs: {cost_cat: {"lease", "technique", "other", "km"}}.
      margin: mark-up applied to the net cost.
      hours_buffer: extra share of demand hours added when sizing each fleet.
      fixed_user_fee: fee per user, subtracted from the cost to recover.
      premium_cost_multiplier: {cost_cat: factor} on the fixed cost per car.
        Defaults to {"Z": 1.15}.
      normalize_mix: allow weights that are neither proportions nor counts.

    Returns:
      A dict with users and cars per category, totals, costs and suggested prices.

    Raises:
      ValueError: for a non-positive n_users, an empty/invalid mix, categories
        missing from `costs` or from the typicals, or counts that do not sum to
        n_users when normalize_mix is False.
    """
    if premium_cost_multiplier is None:
        premium_cost_multiplier = {"Z": 1.15}

    # ---------- 0) validate + convert mix to user counts per category ----------
    if n_users <= 0:
        raise ValueError(f"n_users must be a positive number of users. Got {n_users}.")

    if not isinstance(mix, dict) or len(mix) == 0:
        raise ValueError("mix must be a non-empty dict, e.g. {'B2':0.7,'Z':0.3}")

    # every requested category needs a cost entry
    for cat in mix:
        if cat not in costs:
            raise ValueError(f"Category '{cat}' not in costs. Available: {list(costs.keys())}")

    users_by_cat = _mix_to_user_counts(n_users, mix, normalize_mix)

    # ---------- helper to get typical rows ----------
    def _get_typ_row(df, cost_cat, name):
        sub = df[df["cost_cat"] == cost_cat]
        if len(sub) == 0:
            raise ValueError(f"cost_cat '{cost_cat}' not found in {name}. Check your mapping/data.")
        return sub.iloc[0]

    # ---------- 1) demand per category ----------
    demand = {}
    total_H = 0.0
    total_K = 0.0

    for cat, u_cat in users_by_cat.items():
        ut = _get_typ_row(user_typ, cat, "user_typ")

        X_user = pd.DataFrame([{
            "cost_cat": cat,
            "cooperatie_clean": cooperatie_name,
            "total_rides": float(ut["total_rides"]),
            "mean_start_hour": float(ut["mean_start_hour"]),
            "mean_km": float(ut["mean_km"]),
            "coop_size_month": int(n_users),   # coop scale (global), NOT u_cat
            "rides_per_user": float(ut["rides_per_user"]),
        }])

        # Floor at a tiny positive value so later divisions stay defined
        hours_per_user = float(model_user_hours.predict(X_user)[0])
        hours_per_user = max(1e-6, hours_per_user)

        H_cat = float(u_cat) * hours_per_user
        K_cat = float(u_cat) * float(ut["total_rides"]) * float(ut["mean_km"])

        demand[cat] = {
            "users": int(u_cat),
            "hours_per_user": hours_per_user,
            "H_cat": H_cat,
            "K_cat": K_cat
        }

        total_H += H_cat
        total_K += K_cat

    # ---------- 2) supply per category -> cars needed per category ----------
    supply = {}
    total_cars = 0

    # Fleet-size feature for the car model: one car per 10 users of the whole
    # cooperative; identical for every category, so computed once.
    cars_guess = max(1, int(np.ceil(n_users / 10)))

    for cat, info in demand.items():
        ct = _get_typ_row(car_typ, cat, "car_typ")

        # Pricing flags: category averages when available, otherwise zeros with dal_pct = 50
        ft = flags_typ[flags_typ["cost_cat"] == cat]
        if len(ft) == 0:
            flags_vals = {k: 0.0 for k in FLAGS}
            flags_vals["dal_pct"] = 50.0
        else:
            ft = ft.iloc[0]
            flags_vals = {k: float(0.0 if pd.isna(ft[k]) else ft[k]) for k in FLAGS}

        X_car = pd.DataFrame([{
            "cost_cat": cat,
            "cooperatie_clean": cooperatie_name,
            "total_rides": float(ct["total_rides"]),
            "total_km": float(ct["total_km"]),
            "mean_start_hour": float(ct["mean_start_hour"]),
            "coop_size_month": cars_guess,
            **flags_vals
        }])

        hours_per_car = float(model_car_hours.predict(X_car)[0])
        hours_per_car = max(1e-6, hours_per_car)

        # every category with users gets at least one car
        cars_needed_cat = int(np.ceil((info["H_cat"] * (1 + hours_buffer)) / hours_per_car))
        cars_needed_cat = max(1, cars_needed_cat)

        supply[cat] = {
            "hours_per_car": hours_per_car,
            "cars_needed": cars_needed_cat
        }
        total_cars += cars_needed_cat

    # ---------- 3) costs (sum across categories) ----------
    fixed_cost_total = 0.0
    variable_cost_total = 0.0

    for cat, s in supply.items():
        base = costs[cat]
        fixed_cost_per_car = float(base["lease"] + base["technique"] + base["other"])
        fixed_cost_per_car *= premium_cost_multiplier.get(cat, 1.0)

        km_cost = float(base["km"])

        fixed_cost_total += s["cars_needed"] * fixed_cost_per_car
        variable_cost_total += demand[cat]["K_cat"] * km_cost

    total_cost = fixed_cost_total + variable_cost_total

    # fees reduce the cost to recover through prices (never below zero)
    fee_revenue = float(n_users) * float(fixed_user_fee)
    net_cost = max(0.0, total_cost - fee_revenue)
    target_revenue = net_cost * (1 + margin)

    # ---------- 4) pricing (global, with constraint km = hour/10) ----------
    # revenue = price_hour * total_H + (price_hour / 10) * total_K
    denom = total_H + (total_K / 10.0)
    denom = max(1e-6, denom)

    price_hour = target_revenue / denom
    price_km = price_hour / 10.0

    # ---------- 5) return ----------
    return {
        "cooperatie_name": cooperatie_name,
        "n_users": int(n_users),
        "mix_users_by_cat": users_by_cat,

        "total_H": round(total_H, 2),
        "total_K": round(total_K, 2),

        "cars_by_cat": {cat: int(supply[cat]["cars_needed"]) for cat in supply},
        "total_cars": int(total_cars),

        "fixed_cost_total_eur": round(fixed_cost_total, 2),
        "variable_cost_total_eur": round(variable_cost_total, 2),
        "total_cost_eur": round(total_cost, 2),

        "fee_revenue_eur": round(fee_revenue, 2),
        "net_cost_after_fees_eur": round(net_cost, 2),
        "target_revenue_eur": round(target_revenue, 2),

        "recommended_price_hour_eur": round(price_hour, 2),
        "recommended_price_km_eur": round(price_km, 3),

        "details_demand": demand,
        "details_supply": supply,

        "margin_pct": round(margin * 100, 1),
        "hours_buffer": hours_buffer,
    }


In [None]:
# Mixed-category run with a single category: the mix {"B1": 1} sums to 1, so it is
# read as a 100% share and all 124 users are assigned to B1.
res = recommend_pricing_new_coop_mixed(
    n_users=124,
    mix={"B1": 1},
    cooperatie_name="NEW_COOP",
    model_user_hours=model_user_hours,
    model_car_hours=model_car_hours,
    user_typ=user_typ,
    car_typ=car_typ,
    flags_typ=flags_typ,
    costs=COSTS,
    margin=0.1,
    hours_buffer=0.05,
    fixed_user_fee=15.0,
)
# Show only the fleet split and the recommended prices
pd.DataFrame([res])[["mix_users_by_cat","cars_by_cat","total_cars","recommended_price_hour_eur","recommended_price_km_eur"]]


## Calculating the price after discount

In [None]:
import numpy as np
import random

# ============================================================
# DISCOUNTS (hardcoded catalog)
# ============================================================
# Keys are the rule identifiers checked by revenue_after_discounts;
# "title" and "desc" are the human-readable texts for each rule.
DISCOUNT_CATALOG = {
    "work_hours_50pct": {
        "title": "50% off during work hours",
        "desc": "If the ride happens during work hours, apply a 50% discount to the hourly price."
    },
    "night_free": {
        "title": "Night hours are free (00:00–07:00)",
        "desc": "Usage between 00:00 and 07:00 is free (hourly charge = €0)."
    },
    "over_50h_price_cap_1eur": {
        "title": "Price cap after 50h/month",
        "desc": "If monthly usage exceeds 50 hours, the hourly price becomes €1/h (cap) for the hours above 50."
    },
    "tiered_over_50_33_over_100_66": {
        "title": "Tiered discount on hours (monthly)",
        "desc": "33% discount on hours above 50h/month, and 66% discount on hours above 100h/month."
    },
    "flat_25h_minus_25eur": {
        "title": "€25 discount after 25h/month",
        "desc": "If monthly usage exceeds 25 hours, apply a €25 flat discount on the monthly bill."
    },
    "over_50h_50pct": {
        "title": "50% discount after 50h/month",
        "desc": "If monthly usage exceeds 50 hours, apply a 50% discount on the hourly price for hours above 50."
    },
}

# ============================================================
# 1) Revenue under selected discounts (keeps price_km = price_hour/10)
# ============================================================
def revenue_after_discounts(
    price_hour: float,
    H_total: float,
    K_total: float,
    selected_rules: list[str],
    share_work_hours: float = 0.45,
    share_night_hours: float = 0.08,
) -> float:
    """
    Compute monthly revenue given a base hourly price, total hours (H_total),
    total km (K_total), and a set of discount rules.

    Assumptions:
      - price_km is always tied to price_hour: price_km = price_hour / 10
      - Hours are split into: night / work / other via the shares.
      - Discounts apply at an aggregated (cooperative) monthly level: the hour
        thresholds (25h / 50h / 100h) are compared against H_total.
      - The volume rules (cap, tiers, 50% above 50h) only touch work + other hours;
        night hours are never part of them, whether or not they are free.

    Rules are applied in a fixed order: night_free, work_hours_50pct,
    over_50h_price_cap_1eur, tiered_over_50_33_over_100_66, over_50h_50pct,
    flat_25h_minus_25eur. Names not listed here are ignored.

    Returns:
      Total revenue (hours + km - flat discount), never below 0.
    """
    price_hour = float(price_hour)
    price_km = price_hour / 10.0

    # Split hours into segments
    H_night = H_total * share_night_hours
    H_work = H_total * share_work_hours
    H_other = max(0.0, H_total - H_night - H_work)

    # Base hourly revenue (before discounts)
    rev_hour_night = H_night * price_hour
    rev_hour_work = H_work * price_hour
    rev_hour_other = H_other * price_hour

    flat_bill_discount = 0.0

    # --- Rule: night hours free
    if "night_free" in selected_rules:
        rev_hour_night = 0.0

    # --- Rule: 50% off during work hours
    if "work_hours_50pct" in selected_rules:
        rev_hour_work *= 0.5

    # Hours subject to the volume rules below: work + other (night is left out)
    H_billable = H_work + H_other
    H_billable = max(1e-9, H_billable)

    # Helper: current avg price on billable hours after time-of-day discounts
    current_billable_rev = rev_hour_work + rev_hour_other
    current_avg_price_billable = current_billable_rev / H_billable

    # For tier rules, approximate: apply them on billable hours proportionally
    billable_share = H_billable / max(1e-9, H_total)

    # --- Rule: price cap after 50h/month -> hours above 50 billed at most €1/h
    if "over_50h_price_cap_1eur" in selected_rules and H_total > 50:
        H_above = H_total - 50
        H_first = H_total - H_above

        H_first_billable = H_first * billable_share
        H_above_billable = H_above * billable_share

        # A cap may only lower the price: when the current average is already
        # below €1/h, billing the excess hours at €1/h would raise revenue.
        capped_price = min(1.0, current_avg_price_billable)

        rev_billable_new = (
            H_first_billable * current_avg_price_billable
            + H_above_billable * capped_price
        )

        # Redistribute back to work/other proportional to their shares
        work_share = H_work / max(1e-9, H_billable)
        rev_hour_work = rev_billable_new * work_share
        rev_hour_other = rev_billable_new * (1.0 - work_share)

        # update avg
        current_billable_rev = rev_hour_work + rev_hour_other
        current_avg_price_billable = current_billable_rev / H_billable

    # --- Rule: tiered discount 33% above 50, 66% above 100
    if "tiered_over_50_33_over_100_66" in selected_rules and H_total > 50:
        H_above_50 = max(0.0, H_total - 50)
        H_above_100 = max(0.0, H_total - 100)
        H_50_to_100 = max(0.0, H_above_50 - H_above_100)

        H_50_to_100_billable = H_50_to_100 * billable_share
        H_above_100_billable = H_above_100 * billable_share

        # revenue lost = discount% * price * hours
        rev_lost = (
            H_50_to_100_billable * current_avg_price_billable * 0.33
            + H_above_100_billable * current_avg_price_billable * 0.66
        )

        work_share = H_work / max(1e-9, H_billable)
        rev_hour_work = max(0.0, rev_hour_work - rev_lost * work_share)
        rev_hour_other = max(0.0, rev_hour_other - rev_lost * (1.0 - work_share))

        # update avg
        current_billable_rev = rev_hour_work + rev_hour_other
        current_avg_price_billable = current_billable_rev / H_billable

    # --- Rule: 50% discount on hours above 50
    if "over_50h_50pct" in selected_rules and H_total > 50:
        H_above = H_total - 50
        H_above_billable = H_above * billable_share

        rev_lost = H_above_billable * current_avg_price_billable * 0.5

        work_share = H_work / max(1e-9, H_billable)
        rev_hour_work = max(0.0, rev_hour_work - rev_lost * work_share)
        rev_hour_other = max(0.0, rev_hour_other - rev_lost * (1.0 - work_share))

    # --- Rule: flat €25 discount if >25h/month
    if "flat_25h_minus_25eur" in selected_rules and H_total > 25:
        flat_bill_discount += 25.0

    # Total revenue = hour revenue + km revenue - flat discount
    rev_hour = max(0.0, rev_hour_night + rev_hour_work + rev_hour_other)
    rev_km = K_total * price_km

    total_rev = max(0.0, rev_hour + rev_km - flat_bill_discount)
    return float(total_rev)

# ============================================================
# 2) Pick exactly 2 random discounts (optionally forbidding some pairs)
# ============================================================
def pick_two_discounts(
    seed: int = 42,
    forbid_pairs: set[tuple[str, str]] | None = None
) -> list[str]:
    """
    Pick 2 distinct discount keys from DISCOUNT_CATALOG, reproducibly for a given seed.

    forbid_pairs are order-insensitive, e.g. {("a","b"),("c","d")}; a drawn pair
    that is forbidden is rejected and another one is drawn.

    Raises:
      ValueError: if no allowed pair exists (fewer than two keys in the catalog,
        or every possible pair is forbidden).
    """
    rng = random.Random(seed)
    keys = list(DISCOUNT_CATALOG.keys())

    # Normalise the forbidden pairs so ("a", "b") and ("b", "a") match
    forbid = set()
    if forbid_pairs:
        for a, b in forbid_pairs:
            forbid.add(tuple(sorted((a, b))))

    # Make sure the rejection loop below can terminate: previously, forbidding
    # every pair made it spin forever.
    has_allowed_pair = any(
        tuple(sorted((first, second))) not in forbid
        for i, first in enumerate(keys)
        for second in keys[i + 1:]
    )
    if not has_allowed_pair:
        raise ValueError("No allowed pair of discounts: every combination is forbidden.")

    # Rejection sampling keeps the same draws per seed as before
    while True:
        a, b = rng.sample(keys, 2)
        pair = tuple(sorted((a, b)))
        if pair not in forbid:
            return [a, b]

# ============================================================
# 3) Evaluate discounts on the BASE price (no re-solving price)
#    -> returns effective price + revenue drop
# ============================================================
def evaluate_discounts_on_base_price(
    base_price_hour: float,
    H_total: float,
    K_total: float,
    selected_rules: list[str],
    share_work_hours: float = 0.45,
    share_night_hours: float = 0.08,
) -> dict:
    """
    Apply the selected discount rules at a fixed base hourly price and
    summarise their effect (the base price itself is not re-solved).

    Assumes the km price is tied to the hourly price (price_km = price_hour/10),
    so total volume is expressed in hour-equivalents as H_total + K_total/10.

    Returns a dict with:
      - discounted_revenue: value returned by revenue_after_discounts
      - effective_price_hour: discounted_revenue / (H_total + K_total/10)
      - effective_price_km: effective_price_hour / 10
      - revenue_drop_pct: percentage drop versus the undiscounted revenue
        base_price_hour * (H_total + K_total/10); 0.0 when that revenue is
        numerically zero.
    """
    hourly_price = float(base_price_hour)

    revenue = revenue_after_discounts(
        price_hour=hourly_price,
        H_total=H_total,
        K_total=K_total,
        selected_rules=selected_rules,
        share_work_hours=share_work_hours,
        share_night_hours=share_night_hours,
    )

    # Hour-equivalent volume, clamped away from zero to keep the division safe.
    volume = max(1e-9, H_total + (K_total / 10.0))
    price_per_hour = revenue / volume
    price_per_km = price_per_hour / 10.0

    # Revenue that the same volume would yield without any discount.
    undiscounted = hourly_price * volume
    if undiscounted <= 1e-9:
        drop_pct = 0.0
    else:
        drop_pct = (1.0 - revenue / undiscounted) * 100.0

    return {
        "discounted_revenue": float(revenue),
        "effective_price_hour": float(price_per_hour),
        "effective_price_km": float(price_per_km),
        "revenue_drop_pct": float(drop_pct),
    }

# ============================================================
# 4) Sample 2 discounts BUT enforce floor:
#    effective_price_hour >= base_price_hour * (1 - margin_floor)
# ============================================================
def sample_discount_scenario_with_floor(
    base_price_hour: float,
    H_total: float,
    K_total: float,
    margin_floor: float = 0.10,      # e.g. 0.10 means "not below 90% of base"
    max_tries: int = 200,
    seed: int = 42,
    share_work_hours: float = 0.45,
    share_night_hours: float = 0.08,
    forbid_pairs: set[tuple[str, str]] | None = None,
) -> tuple[list[str], dict]:
    """
    Randomly pick pairs of discounts until the effective hourly price is not
    below the floor, where floor = base_price_hour * (1 - margin_floor).

    Up to ``max_tries`` pairs are drawn with seeds seed, seed+1, ... If none of
    them meets the floor, a second batch of ``max_tries`` pairs is drawn with
    seeds seed+1000, seed+1001, ...; the first one meeting the floor is
    returned, otherwise the one whose effective price is closest to the floor.

    Args:
        base_price_hour: Base (undiscounted) hourly price.
        H_total: Total hours passed on to the discount evaluation.
        K_total: Total kilometres passed on to the discount evaluation.
        margin_floor: Maximum tolerated relative drop of the effective price.
        max_tries: Number of pairs drawn per batch; must be at least 1.
        seed: Base seed for the draws.
        share_work_hours: Forwarded to evaluate_discounts_on_base_price.
        share_night_hours: Forwarded to evaluate_discounts_on_base_price.
        forbid_pairs: Order-insensitive pairs that must not be drawn together.

    Returns:
        (rules, disc): the two selected rule keys and the dict returned by
        evaluate_discounts_on_base_price for them.

    Raises:
        ValueError: If max_tries < 1, since no scenario could be evaluated.
    """
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1.")

    base_price_hour = float(base_price_hour)
    floor_hour = base_price_hour * (1.0 - float(margin_floor))

    def _draw(scenario_seed: int) -> tuple[list[str], dict]:
        """Draw one pair of rules with the given seed and evaluate it."""
        drawn_rules = pick_two_discounts(seed=scenario_seed, forbid_pairs=forbid_pairs)
        evaluated = evaluate_discounts_on_base_price(
            base_price_hour=base_price_hour,
            H_total=H_total,
            K_total=K_total,
            selected_rules=drawn_rules,
            share_work_hours=share_work_hours,
            share_night_hours=share_night_hours,
        )
        return drawn_rules, evaluated

    # Try repeatedly with different seeds
    for i in range(max_tries):
        rules, disc = _draw(seed + i)
        if disc["effective_price_hour"] >= floor_hour:
            return rules, disc

    # fallback: fresh seeds; accept the first scenario meeting the floor,
    # otherwise keep the one closest to the floor.
    best_rules, best_disc, best_gap = None, None, float("inf")
    for i in range(max_tries):
        rules, disc = _draw(seed + 1000 + i)
        if disc["effective_price_hour"] >= floor_hour:
            return rules, disc
        gap = abs(disc["effective_price_hour"] - floor_hour)
        # "best_disc is None" guarantees a non-None result even if gap is NaN.
        if best_disc is None or gap < best_gap:
            best_gap = gap
            best_rules, best_disc = rules, disc

    return best_rules, best_disc


def attach_random_discount_scenario(
    res: dict,
    seed: int = 42,
    forbid_pairs: set[tuple[str, str]] | None = None,
    share_work_hours: float = 0.45,
    share_night_hours: float = 0.08,
    margin_floor: float | None = None,
) -> dict:
    """
    Sample a pair of discounts for a pricing result and attach its effect.

    res must contain:
      - "recommended_price_hour_eur" (base price)
      - "total_H" and "total_K"
    and may contain:
      - "margin_pct": divided by 100 to obtain the floor, so it is expected
        to be a percentage (e.g. 15 for 15%) -- NOTE(review): confirm against
        the function that builds res.

    Floor selection, in order of precedence:
      1. the explicit ``margin_floor`` argument (a fraction, e.g. 0.15),
      2. res["margin_pct"] / 100,
      3. a default of 0.15.

    This updates res in-place (and also returns it) with:
      base_price_hour_eur, base_price_km_eur,
      discount_rules,
      discounted_revenue_eur, effective_price_hour_eur, effective_price_km_eur,
      revenue_drop_pct

    Raises:
        KeyError: If one of the required keys is missing from res.
    """
    base_price_hour = float(res["recommended_price_hour_eur"])
    H_total = float(res["total_H"])
    K_total = float(res["total_K"])

    # Floor: explicit argument first, then margin from res, else default 15%.
    if margin_floor is None:
        if "margin_pct" in res:
            margin_floor = float(res["margin_pct"]) / 100.0
        else:
            margin_floor = 0.15

    rules, disc = sample_discount_scenario_with_floor(
        base_price_hour=base_price_hour,
        H_total=H_total,
        K_total=K_total,
        margin_floor=margin_floor,
        seed=seed,
        forbid_pairs=forbid_pairs,
        share_work_hours=share_work_hours,
        share_night_hours=share_night_hours,
    )

    res.update({
        "base_price_hour_eur": round(base_price_hour, 2),
        # The km price is derived as one tenth of the hourly price.
        "base_price_km_eur": round(base_price_hour / 10.0, 3),

        "discount_rules": rules,
        "discounted_revenue_eur": round(disc["discounted_revenue"], 2),
        "effective_price_hour_eur": round(disc["effective_price_hour"], 2),
        "effective_price_km_eur": round(disc["effective_price_km"], 3),
        "revenue_drop_pct": round(disc["revenue_drop_pct"], 1),
    })

    return res


In [None]:
# Recommend base prices for a hypothetical new cooperative ("NEW_COOP") with
# 45 users, split 80% / 20% over the "B1" and "B2" entries of `mix`
# (presumably vehicle categories -- confirm against the function definition).
# The models and typical-profile objects passed in are built earlier in the
# notebook.
res = recommend_pricing_new_coop_mixed(
    n_users=45,
    mix={"B1": 0.8, "B2": 0.2},
    cooperatie_name="NEW_COOP",
    model_user_hours=model_user_hours,
    model_car_hours=model_car_hours,
    user_typ=user_typ,
    car_typ=car_typ,
    flags_typ=flags_typ,
    costs=COSTS,
    margin=0.15,
    hours_buffer=0.05,
    fixed_user_fee=15.0,
)
# Show the result as a one-row table restricted to the headline columns.
pd.DataFrame([res])[["mix_users_by_cat","cars_by_cat","total_cars","recommended_price_hour_eur","recommended_price_km_eur"]]


In [None]:

# Attach a randomly sampled pair of discounts to `res` (the dict is updated
# in place and returned). The forbidden pair prevents these two rules from
# being selected together.
res = attach_random_discount_scenario(
    res,
    seed=42,
    forbid_pairs={("over_50h_50pct", "tiered_over_50_33_over_100_66")}  # optional
)

# One-row table comparing the base (recommended) prices with the effective
# prices after discounts.
pd.DataFrame([res])[[
    "mix_users_by_cat","cars_by_cat","total_cars",
    "recommended_price_hour_eur","recommended_price_km_eur",
    "discount_rules","effective_price_hour_eur","effective_price_km_eur","revenue_drop_pct"
]]

# The recommended price is the base price before any discount is applied.
# The effective price is the resulting price after the discounts are applied.
