# From log-units SHAP to natural units

In [None]:
%pip install shap

In [None]:
from functools import partial
import numpy as np
import pandas as pd

from sklearn.datasets import fetch_openml
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    mean_tweedie_deviance,
)


def load_mtpl2(n_samples=None):
    """Fetch the French Motor Third-Party Liability Claims dataset.

    Parameters
    ----------
    n_samples: int, default=None
      number of samples to select (for faster run time). Full dataset has
      678013 samples.
    """
    # freMTPL2freq dataset from https://www.openml.org/d/41214
    df_freq = fetch_openml(data_id=41214, as_frame=True).data
    df_freq["IDpol"] = df_freq["IDpol"].astype(int)
    df_freq.set_index("IDpol", inplace=True)

    # freMTPL2sev dataset from https://www.openml.org/d/41215
    df_sev = fetch_openml(data_id=41215, as_frame=True).data

    # sum ClaimAmount over identical IDs
    df_sev = df_sev.groupby("IDpol").sum()

    df = df_freq.join(df_sev, how="left")
    df["ClaimAmount"] = df["ClaimAmount"].fillna(0)

    # unquote string fields
    for column_name in df.columns[[t is object for t in df.dtypes.values]]:
        df[column_name] = df[column_name].str.strip("'")
    return df.iloc[:n_samples]


def plot_obs_pred(
    df,
    feature,
    weight,
    observed,
    predicted,
    y_label=None,
    title=None,
    ax=None,
    fill_legend=False,
):
    """Plot observed and predicted - aggregated per feature level.

    Parameters
    ----------
    df : DataFrame
        input data
    feature: str
        a column name of df for the feature to be plotted
    weight : str
        column name of df with the values of weights or exposure
    observed : str
        a column name of df with the observed target
    predicted : DataFrame
        a dataframe, with the same index as df, with the predicted target
    fill_legend : bool, default=False
        whether to show fill_between legend
    """
    # aggregate observed and predicted variables by feature level
    df_ = df.loc[:, [feature, weight]].copy()
    df_["observed"] = df[observed] * df[weight]
    df_["predicted"] = predicted * df[weight]
    df_ = (
        df_.groupby([feature])[[weight, "observed", "predicted"]]
        .sum()
        .assign(observed=lambda x: x["observed"] / x[weight])
        .assign(predicted=lambda x: x["predicted"] / x[weight])
    )

    ax = df_.loc[:, ["observed", "predicted"]].plot(style=".", ax=ax)
    y_max = df_.loc[:, ["observed", "predicted"]].values.max() * 0.8
    p2 = ax.fill_between(
        df_.index,
        0,
        y_max * df_[weight] / df_[weight].values.max(),
        color="g",
        alpha=0.1,
    )
    if fill_legend:
        ax.legend([p2], ["{} distribution".format(feature)])
    ax.set(
        ylabel=y_label if y_label is not None else None,
        title=title if title is not None else "Train: Observed vs Predicted",
    )


def score_estimator(
    estimator,
    X_train,
    X_test,
    df_train,
    df_test,
    target,
    weights,
    tweedie_powers=None,
):
    """Evaluate an estimator on train and test sets with different metrics"""

    metrics = [
        ("D² explained", None),  # Use default scorer if it exists
        ("mean abs. error", mean_absolute_error),
        ("mean squared error", mean_squared_error),
    ]
    if tweedie_powers:
        metrics += [
            (
                "mean Tweedie dev p={:.4f}".format(power),
                partial(mean_tweedie_deviance, power=power),
            )
            for power in tweedie_powers
        ]

    res = []
    for subset_label, X, df in [
        ("train", X_train, df_train),
        ("test", X_test, df_test),
    ]:
        y, _weights = df[target], df[weights]
        for score_label, metric in metrics:
            if isinstance(estimator, tuple) and len(estimator) == 2:
                # Score the model consisting of the product of frequency and
                # severity models.
                est_freq, est_sev = estimator
                y_pred = est_freq.predict(X) * est_sev.predict(X)
            else:
                y_pred = estimator.predict(X)

            if metric is None:
                if not hasattr(estimator, "score"):
                    continue
                score = estimator.score(X, y, sample_weight=_weights)
            else:
                score = metric(y, y_pred, sample_weight=_weights)

            res.append({"subset": subset_label, "metric": score_label, "score": score})

    res = (
        pd.DataFrame(res)
        .set_index(["metric", "subset"])
        .score.unstack(-1)
        .round(4)
        .loc[:, ["train", "test"]]
    )
    return res

In [2]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import (
    FunctionTransformer,
    KBinsDiscretizer,
    OneHotEncoder,
    StandardScaler,
)

df = load_mtpl2()


# Correct for unreasonable observations (that might be data error)
# and a few exceptionally large claim amounts
df["ClaimNb"] = df["ClaimNb"].clip(upper=4)
df["Exposure"] = df["Exposure"].clip(upper=1)
df["ClaimAmount"] = df["ClaimAmount"].clip(upper=200000)
# If the claim amount is 0, then we do not count it as a claim. The loss function
# used by the severity model needs strictly positive claim amounts. This way
# frequency and severity are more consistent with each other.
df.loc[(df["ClaimAmount"] == 0) & (df["ClaimNb"] >= 1), "ClaimNb"] = 0

log_scale_transformer = make_pipeline(
    FunctionTransformer(func=np.log), StandardScaler()
)

column_trans = ColumnTransformer(
    [
        (
            "binned_numeric",
            KBinsDiscretizer(
                n_bins=10, quantile_method="averaged_inverted_cdf", random_state=0
            ),
            ["VehAge", "DrivAge"],
        ),
        (
            "onehot_categorical",
            OneHotEncoder(),
            ["VehBrand", "VehPower", "VehGas", "Region", "Area"],
        ),
        ("passthrough_numeric", "passthrough", ["BonusMalus"]),
        ("log_scaled_numeric", log_scale_transformer, ["Density"]),
    ],
    remainder="drop",
)
X = column_trans.fit_transform(df)

# Insurances companies are interested in modeling the Pure Premium, that is
# the expected total claim amount per unit of exposure for each policyholder
# in their portfolio:
df["PurePremium"] = df["ClaimAmount"] / df["Exposure"]

# This can be indirectly approximated by a 2-step modeling: the product of the
# Frequency times the average claim amount per claim:
df["Frequency"] = df["ClaimNb"] / df["Exposure"]
df["AvgClaimAmount"] = df["ClaimAmount"] / np.fmax(df["ClaimNb"], 1)

with pd.option_context("display.max_columns", 15):
    print(df[df.ClaimAmount > 0].head())

       ClaimNb  Exposure Area  VehPower  VehAge  DrivAge  BonusMalus VehBrand  \
IDpol                                                                           
139          1      0.75    F         7       1       61          50      B12   
190          1      0.14    B        12       5       50          60      B12   
414          1      0.14    E         4       0       36          85      B12   
424          2      0.62    F        10       0       51         100      B12   
463          1      0.31    A         5       0       45          50      B12   

          VehGas  Density Region  ClaimAmount   PurePremium  Frequency  \
IDpol                                                                    
139    'Regular'    27000    R11       303.00    404.000000   1.333333   
190     'Diesel'       56    R25      1981.84  14156.000000   7.142857   
414    'Regular'     4792    R11      1456.55  10403.928571   7.142857   
424    'Regular'    27000    R11     10834.00  17474.193548   

In [3]:
from sklearn.model_selection import train_test_split

df_train, df_test, X_train, X_test = train_test_split(df, X, random_state=0)

In [6]:
from sklearn.linear_model import TweedieRegressor

glm_pure_premium = TweedieRegressor(power=1.9, alpha=0.1, solver="newton-cholesky")
glm_pure_premium.fit(
    X_train, df_train["PurePremium"], sample_weight=df_train["Exposure"]
)

tweedie_powers = [1.1, 1.25, 1.5, 1.7]

scores_glm_pure_premium = score_estimator(
    glm_pure_premium,
    X_train,
    X_test,
    df_train,
    df_test,
    target="PurePremium",
    weights="Exposure",
    tweedie_powers=tweedie_powers,
)

scores = pd.concat(
    [scores_glm_pure_premium],
    axis=1,
    sort=True,
    keys=("Product Model", "TweedieRegressor"),
)
print("Evaluation of the Product Model and the Tweedie Regressor on target PurePremium")
with pd.option_context("display.expand_frame_repr", False):
    print(scores)

Evaluation of the Product Model and the Tweedie Regressor on target PurePremium
                          Product Model              
subset                            train          test
metric                                               
D² explained               1.640000e-02  1.370000e-02
mean Tweedie dev p=1.1000  6.590273e+02  6.496857e+02
mean Tweedie dev p=1.2500  2.683968e+02  2.664248e+02
mean Tweedie dev p=1.5000  7.640460e+01  7.641980e+01
mean Tweedie dev p=1.7000  3.682720e+01  3.692730e+01
mean abs. error            2.740176e+02  2.731633e+02
mean squared error         3.295518e+07  3.213087e+07


  scores = pd.concat(


In [8]:
res = []
for subset_label, X, df in [
    ("train", X_train, df_train),
    ("test", X_test, df_test),
]:
    exposure = df["Exposure"].values
    res.append(
        {
            "subset": subset_label,
            "observed": df["ClaimAmount"].values.sum(),
            "predicted, tweedie, power=%.2f" % glm_pure_premium.power: np.sum(
                exposure * glm_pure_premium.predict(X)
            ),
        }
    )

print(pd.DataFrame(res).set_index("subset").T)

subset                                 train          test
observed                        3.917618e+07  1.299546e+07
predicted, tweedie, power=1.90  3.952914e+07  1.325666e+07


In [None]:
import shap
import numpy as np

# Fit the model
glm_pure_premium = TweedieRegressor(power=1.5, alpha=0.1, solver="newton-cholesky")
glm_pure_premium.fit(
    X_train,
    df_train["PurePremium"],
    sample_weight=df_train["Exposure"],
)

# Build a LinearExplainer for the log‐link η = Xβ
explainer = shap.LinearExplainer(
    glm_pure_premium,
    X_train,
)

# Get raw SHAP on η for X_test
shap_values_loglink = explainer.shap_values(X_test)
shap_multipliers = np.exp(shap_values_loglink)
base_value_log = explainer.expected_value      # E[η], scalar
mu_baseline = np.exp(base_value_log)         # E[μ], the baseline (EUR)

# For each test sample i (scikit-learn already applies the link function internally),
mu_test = glm_pure_premium.predict(X_test) 

# Reconstruct μ_i from baseline × ∏_j multipliers
mu_reconstructed = mu_baseline * np.prod(shap_multipliers, axis=1)

print(f"Reconstructed μ_i: {mu_reconstructed[:5]}")
print(f"Directly predicted μ_i: {mu_test[:5]}")

Reconstructed μ_i: [ 88.31531587  76.75336521  87.04875287 114.22201482 116.51736045]
Directly predicted μ_i: [ 88.31531587  76.75336521  87.04875287 114.22201482 116.51736045]


## From multiplicative to additive attribution

Exact additive attribution but path dependent, depends on the predictors order

In [None]:
import copy
import numpy as np

# copy the SHAP object (so you don’t overwrite the original)
shap_exp_centered = copy.deepcopy(shap_values_loglink)

# convert the baseline from log-scale to EUR:
base_value = np.exp(explainer.expected_value)
# Now base_values[i] = exp( base_eta_i ).  Usually base_values is a length-n array,
# but often all elements are identical = explainer.expected_value.  

# turn each log-SHAP φ_{ij}^{(log)} into a multiplier exp(φ_{ij}^{(log)}).  
# Then do cumulative product across features (axis=1), scaling by the baseline:
shap_exp_centered = np.cumprod(
    np.exp(shap_exp_centered), axis=1
) * base_value
#    shap_exp_centered.values[i, k] now = μ₀ × ∏_{j=1..k} exp(φ_{ij}^{(log)})
#    which is exactly “model’s prediction after including features 1..k.”

# Convert that into per-feature “additive € differences”:
target_diff = np.diff(shap_exp_centered, axis=1)
# target_diff[i, k-1] =  μ₀×∏_{j=1..k}exp(φ_{ij})  – μ₀×∏_{j=1..k-1}exp(φ_{ij})

# For the very first feature, you need Δ of (step1 – baseline):
# zero_value = np.zeros((len(base_value), 1))
first_contrib_minus_base = (
    shap_exp_centered[:, [0]]  # “μ₀×exp(φ_{i,1})”
    - base_value  # “μ₀”
)

# Rebuild shap_exp_centered.values as 
# [Δ_feature1, Δ_feature2, …, Δ_feature_p] for each row i:
shap_exp_centered = np.concatenate(
    (first_contrib_minus_base, target_diff), axis=1
)

# Sum up the per‐feature “€ contributions” for each sample:
sum_contribs = np.sum(shap_exp_centered, axis=1)  # shape = (n_test,)

# Add back the baseline (in EUR) to reconstruct the predicted pure‐premium:
reconstructed_mu = base_value + sum_contribs
# shap_exp_centered.base_values[i] should already be exp(original_log_base).

# Get the model’s predicted μ in EUR directly:
predicted_mu = glm_pure_premium.predict(X_test)  # this is exp(η_i) = μ_i

# Compare them—ideally they match up to machine‐precision:
diff = np.abs(reconstructed_mu - predicted_mu)

print("Max discrepancy (EUR):", diff.max())
if not np.allclose(reconstructed_mu, predicted_mu, rtol=1e-6, atol=1e-8):
    raise ValueError(
        "Sum of SHAP‐based contributions + baseline "
        "does not match model prediction (EUR)!"
    )

Max discrepancy (EUR): 1.1641532182693481e-10


In [None]:
# First‐order (Taylor) approximation: convert each log‐SHAP φ_{i,j} into an additive € amount

# Convert the baseline from log‐scale to EUR:
base_value_log = explainer.expected_value       
base_value_EUR = np.exp(base_value_log)        
shap_values_EUR = shap_values_loglink * base_value_EUR

# Additivity is usually not met
# μ_i ≈ base_value_EUR + sum_j Δμ_{i,j}
reconstructed_mu = base_value_EUR + np.sum(shap_values_EUR, axis=1)
print("Additivity check passed. Max discrepancy:", np.max(np.abs(mu_test - reconstructed_mu)))

Additivity check passed. Max discrepancy: 70546.10494107334
