# 04 - Pricing engine: margin vs turnover

This notebook implements an executive-style pricing engine:

- Iterate over all country / currency / direction / channel segments.
- For each segment, analyse the relationship between margin_rate and daily turnover (in EUR).
- Fit a polynomial regression to approximate the "volume vs margin" curve.
- Simulate Gross Profit (GP) across a margin grid and identify the margin that maximises GP.
- Use bootstrap to estimate uncertainty around the optimal margin.
- Export one PDF scatter plot per segment, with:
  - Observed daily points (margin vs turnover in EUR)
  - Fitted turnover curve
  - Optimal margin marker (max GP)
  - Bootstrap band for the optimal margin.
- Generate a summary CSV with recommended margins and expected GP uplift.

The goal is to demonstrate a realistic pricing analysis workflow using synthetic data.


In [57]:
# ============================================
# Imports and global configuration
# ============================================

import os
import warnings
import numpy as np
import pandas as pd

import plotly.graph_objects as go
import plotly.io as pio

from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score

warnings.filterwarnings("ignore")

# Input synthetic dataset (generated in 01_data_simulation.ipynb)
DATA_PATH = "../data/synthetic_pricing_daily.csv"

# Root folder for pricing engine outputs
OUTPUT_ROOT = "../output/pricing_engine"

# Segment dimensions to iterate over
SEGMENT_DIMENSIONS = ["country", "currency", "direction", "channel"]

# Polynomial degrees to test for turnover = f(margin_rate)
CANDIDATE_DEGREES = [1, 2, 3, 4, 5, 6]

# Minimum number of daily observations required per segment
MIN_OBSERVATIONS = 100

# Margin bounds used for simulation grid (safety clamp)
MARGIN_LOWER_BOUND = 0.10
MARGIN_UPPER_BOUND = 0.10

# Number of points in the margin grid
MARGIN_GRID_POINTS = 80

# Bootstrap configuration
USE_BOOTSTRAP = True
N_BOOTSTRAP = 500
BOOTSTRAP_LOWER_PCT = 10
BOOTSTRAP_UPPER_PCT = 90

# Random seed for reproducibility
RANDOM_STATE = 42

os.makedirs(OUTPUT_ROOT, exist_ok=True)

print("Configuration loaded.")

Configuration loaded.


In [58]:
# ============================================
# Load dataset
# ============================================

df = pd.read_csv(DATA_PATH)

print("Dataset loaded.")
print("Shape:", df.shape)
df.head()

Dataset loaded.
Shape: (87600, 17)


Unnamed: 0,date,country,branch,channel,currency,direction,pax,transactions,hit_rate,ATV,ATV_eur,turnover,turnover_eur,margin_rate,gp,gp_eur,gp_pct
0,2025-01-01,UK,UK_BR_1,Airport,USD,BUY,8.680953,5.992303,0.690282,584.031181,584.031181,3499.691705,3499.691705,0.114133,399.430382,399.430382,0.114133
1,2025-01-02,UK,UK_BR_1,Airport,USD,BUY,11.693377,7.831966,0.669778,555.65384,555.65384,4351.861892,4351.861892,0.098343,427.974082,427.974082,0.098343
2,2025-01-03,UK,UK_BR_1,Airport,USD,BUY,10.785967,6.899122,0.639639,590.087078,590.087078,4071.082724,4071.082724,0.111882,455.482473,455.482473,0.111882
3,2025-01-04,UK,UK_BR_1,Airport,USD,BUY,12.613527,10.393314,0.823982,570.959926,570.959926,5934.165825,5934.165825,0.112462,667.369432,667.369432,0.112462
4,2025-01-05,UK,UK_BR_1,Airport,USD,BUY,9.722016,6.866644,0.706298,590.042343,590.042343,4051.611003,4051.611003,0.114581,464.238676,464.238676,0.114581


In [59]:
# ============================================
# Helper: fit polynomial turnover model
# ============================================

def fit_turnover_model(df_segment, degrees):
    """
    Fit polynomial models turnover_eur = f(margin_rate) and select the best degree by R².

    Parameters
    ----------
    df_segment : pd.DataFrame
        Segment data with columns 'margin_rate' and 'turnover_eur'.
    degrees : list[int]
        Polynomial degrees to test.

    Returns
    -------
    best_degree : int
    best_model : LinearRegression
    best_poly : PolynomialFeatures
    best_r2 : float
    """
    df_mod = df_segment.copy()
    df_mod = df_mod[(df_mod["turnover_eur"] > 0) & (df_mod["margin_rate"] > 0)]

    if df_mod.empty:
        return None, None, None, None

    X = df_mod[["margin_rate"]].values
    y = df_mod["turnover_eur"].values

    best_degree = None
    best_model = None
    best_poly = None
    best_r2 = -np.inf

    for deg in degrees:
        poly = PolynomialFeatures(degree=deg, include_bias=False)
        X_poly = poly.fit_transform(X)

        model = LinearRegression()
        model.fit(X_poly, y)

        y_pred = model.predict(X_poly)
        r2 = r2_score(y, y_pred)

        if r2 > best_r2:
            best_r2 = r2
            best_degree = deg
            best_model = model
            best_poly = poly

    return best_degree, best_model, best_poly, best_r2


def simulate_gp_curve(df_segment, model, poly,
                      margin_lower, margin_upper,
                      grid_points=80):
    """
    Simulate turnover and GP across a margin grid.

    Parameters
    ----------
    df_segment : pd.DataFrame
        Segment data with 'ATV_eur' used for average ticket.
    model : LinearRegression
    poly : PolynomialFeatures
    margin_lower, margin_upper : float
        Bounds for the margin_rate grid.
    grid_points : int
        Number of points in the margin grid.

    Returns
    -------
    margin_grid : np.ndarray
    turnover_pred : np.ndarray
    gp_pred : np.ndarray
    """
    observed_min = df_segment["margin_rate"].min()
    observed_max = df_segment["margin_rate"].max()

    lower = max(margin_lower, observed_min)
    upper = min(margin_upper, observed_max)

    margin_grid = np.linspace(lower, upper, grid_points)
    X_grid = margin_grid.reshape(-1, 1)
    X_grid_poly = poly.transform(X_grid)

    turnover_pred = model.predict(X_grid_poly)
    turnover_pred = np.maximum(turnover_pred, 0.0)

    gp_pred = turnover_pred * margin_grid

    return margin_grid, turnover_pred, gp_pred


In [60]:
# ============================================
# Helper: bootstrap for optimal margin
# ============================================

def bootstrap_optimal_margin(df_segment,
                             degrees,
                             n_bootstrap,
                             margin_lower,
                             margin_upper,
                             grid_points=80,
                             random_state=42):
    """
    Bootstrap the optimal margin (max GP) across resampled datasets.

    Returns
    -------
    margins_opt : np.ndarray
        Array of optimal margin values across bootstrap samples.
    """
    if n_bootstrap <= 0:
        return np.array([])

    df_mod = df_segment.copy()
    df_mod = df_mod[(df_mod["turnover_eur"] > 0) & (df_mod["margin_rate"] > 0)]

    n = len(df_mod)
    if n == 0:
        return np.array([])

    rng = np.random.default_rng(random_state)
    margins_opt = []

    for _ in range(n_bootstrap):
        idx = rng.integers(0, n, size=n)
        sample = df_mod.iloc[idx]

        best_degree, model, poly, _ = fit_turnover_model(sample, degrees)
        if model is None:
            continue

        margin_grid, turnover_pred, gp_pred = simulate_gp_curve(
            sample, model, poly,
            margin_lower, margin_upper,
            grid_points=grid_points
        )

        if gp_pred.size == 0:
            continue

        i_max = int(np.argmax(gp_pred))
        margins_opt.append(float(margin_grid[i_max]))

    return np.array(margins_opt)


In [61]:
# ============================================
# Helper: create scatter plot and export PDF
# ============================================

def create_pricing_scatter(
    df_segment,
    margin_grid,
    turnover_pred,
    margin_opt,
    gp_opt,
    bootstrap_margins,
    summary_row
):
    """
    Create a Plotly scatter for a given segment:

    - X: margin_rate (GP%)
    - Y: turnover_eur
    - Color: gp_eur
    - Fitted turnover curve
    - Optimal margin marker (max GP)
    - Bootstrap band (10th–90th percentile) if available
    """

    country = summary_row["country"]
    currency = summary_row["currency"]
    direction = summary_row["direction"]
    channel = summary_row["channel"]

    seg_title = (
        f"Turnover vs Margin – Optimal GP<br>"
        f"Country={country} | Channel={channel} | Currency={currency} | Direction={direction}"
    )

    fig = go.Figure()

    # Scatter of observed points
    fig.add_trace(
        go.Scatter(
            x=df_segment["margin_rate"],
            y=df_segment["turnover_eur"],
            mode="markers",
            name="Observed days",
            marker=dict(
                size=6,
                color=df_segment["gp_eur"],
                colorscale="Viridis",
                showscale=True,
                colorbar=dict(title="Daily GP (EUR)")
            ),
            text=df_segment["date"].astype(str),
            hovertemplate=(
                "date=%{text}<br>"
                "margin_rate=%{x:.3f}<br>"
                "turnover_eur=%{y:,.0f} €<extra></extra>"
            )
        )
    )

    # Fitted turnover curve
    fig.add_trace(
        go.Scatter(
            x=margin_grid,
            y=turnover_pred,
            mode="lines",
            name="Fitted turnover curve",
            line=dict(width=3)
        )
    )

    # Optimal margin marker
    fig.add_trace(
        go.Scatter(
            x=[margin_opt],
            y=[gp_opt / margin_opt if margin_opt > 0 else 0],
            mode="markers+text",
            name="Optimal margin (max GP)",
            marker=dict(size=10, symbol="diamond"),
            text=[f"Opt margin={margin_opt:.3f}"],
            textposition="top center",
            hovertemplate=(
                "Optimal margin=%{x:.3f}<br>"
                "Implied turnover=%{y:,.0f} €<extra></extra>"
            )
        )
    )

    shapes = []
    annotations = []

    # Bootstrap band
    if bootstrap_margins is not None and len(bootstrap_margins) > 0:
        q_low = np.percentile(bootstrap_margins, BOOTSTRAP_LOWER_PCT)
        q_high = np.percentile(bootstrap_margins, BOOTSTRAP_UPPER_PCT)

        shapes.append(
            dict(
                type="rect",
                xref="x",
                yref="paper",
                x0=float(q_low),
                x1=float(q_high),
                y0=0,
                y1=1,
                fillcolor="rgba(0, 0, 200, 0.10)",
                line=dict(width=0),
                layer="below"
            )
        )

        annotations.append(
            dict(
                x=(q_low + q_high) / 2.0,
                y=1.02,
                xref="x",
                yref="paper",
                text="Bootstrap band (10th–90th percentile)",
                showarrow=False,
                font=dict(size=10)
            )
        )

    fig.update_layout(
        title=seg_title,
        xaxis_title="margin_rate (GP%)",
        yaxis_title="Daily turnover (EUR)",
        shapes=shapes,
        annotations=annotations,
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=-0.2,
            xanchor="center",
            x=0.5
        ),
        template="plotly_white"
    )

    # Folder structure for PDFs:
    # ../output/pricing_engine/country=UK/currency=USD/direction=BUY/channel=Airport.pdf
    segment_folder = os.path.join(
        OUTPUT_ROOT,
        f"country={country}",
        f"currency={currency}",
        f"direction={direction}"
    )
    os.makedirs(segment_folder, exist_ok=True)

    pdf_filename = f"channel={channel}.pdf".replace(" ", "")
    html_filename = f"channel={channel}.html".replace(" ", "")

    pdf_path = os.path.join(segment_folder, pdf_filename)
    html_path = os.path.join(segment_folder, html_filename)

    # Export PDF (requires kaleido)
    try:
        fig.write_image(pdf_path, format="pdf")
    except Exception as e:
        print(f"Could not export PDF for segment {country}-{currency}-{direction}-{channel}: {e}")

    # Also export interactive HTML
    fig.write_html(html_path)

    # Optionally show inside the notebook for debugging
    # fig.show()


In [62]:
# ============================================
# Segment processing function
# ============================================

def process_segment(df_segment, segment_key_dict):
    """
    Run the full pricing analysis for a single segment.

    Parameters
    ----------
    df_segment : pd.DataFrame
        Segment data with daily records.
    segment_key_dict : dict
        Keys: country, currency, direction, channel.

    Returns
    -------
    summary_row : dict
        One row with recommended margin and uplift metrics.
    """
    df_seg = df_segment.copy()
    df_seg = df_seg[(df_seg["turnover_eur"] > 0) & (df_seg["margin_rate"] > 0)]

    seg_size = len(df_seg)
    if seg_size < MIN_OBSERVATIONS:
        return {
            **segment_key_dict,
            "n_obs": seg_size,
            "status": "insufficient_data",
            "best_degree": None,
            "r2": None,
            "avg_margin": df_seg["margin_rate"].mean() if seg_size > 0 else None,
            "recommended_margin": None,
            "base_gp_eur": df_seg["gp_eur"].sum() if seg_size > 0 else None,
            "optimal_gp_eur": None,
            "gp_uplift_abs": None,
            "gp_uplift_pct": None,
            "bootstrap_mean_margin": None,
            "bootstrap_std_margin": None
        }

    # 1) Fit turnover model
    best_degree, model, poly, best_r2 = fit_turnover_model(df_seg, CANDIDATE_DEGREES)
    if model is None:
        return {
            **segment_key_dict,
            "n_obs": seg_size,
            "status": "model_failed",
            "best_degree": None,
            "r2": None,
            "avg_margin": df_seg["margin_rate"].mean(),
            "recommended_margin": None,
            "base_gp_eur": df_seg["gp_eur"].sum(),
            "optimal_gp_eur": None,
            "gp_uplift_abs": None,
            "gp_uplift_pct": None,
            "bootstrap_mean_margin": None,
            "bootstrap_std_margin": None
        }

    # 2) Simulate GP curve
    margin_grid, turnover_pred, gp_pred = simulate_gp_curve(
        df_seg,
        model,
        poly,
        MARGIN_LOWER_BOUND,
        MARGIN_UPPER_BOUND,
        grid_points=MARGIN_GRID_POINTS
    )

    if gp_pred.size == 0:
        return {
            **segment_key_dict,
            "n_obs": seg_size,
            "status": "no_margin_range",
            "best_degree": best_degree,
            "r2": best_r2,
            "avg_margin": df_seg["margin_rate"].mean(),
            "recommended_margin": None,
            "base_gp_eur": df_seg["gp_eur"].sum(),
            "optimal_gp_eur": None,
            "gp_uplift_abs": None,
            "gp_uplift_pct": None,
            "bootstrap_mean_margin": None,
            "bootstrap_std_margin": None
        }

    idx_max = int(np.argmax(gp_pred))
    margin_opt = float(margin_grid[idx_max])
    gp_opt = float(gp_pred[idx_max])

    base_gp = float(df_seg["gp_eur"].sum())
    avg_margin = float(df_seg["margin_rate"].mean())

    gp_uplift_abs = gp_opt - base_gp
    gp_uplift_pct = (gp_opt / base_gp - 1.0) if base_gp > 0 else None

    # 3) Bootstrap
    bootstrap_margins = None
    bootstrap_mean = None
    bootstrap_std = None

    if USE_BOOTSTRAP:
        bootstrap_margins = bootstrap_optimal_margin(
            df_seg,
            CANDIDATE_DEGREES,
            N_BOOTSTRAP,
            MARGIN_LOWER_BOUND,
            MARGIN_UPPER_BOUND,
            grid_points=MARGIN_GRID_POINTS,
            random_state=RANDOM_STATE
        )
        if len(bootstrap_margins) > 0:
            bootstrap_mean = float(np.mean(bootstrap_margins))
            bootstrap_std = float(np.std(bootstrap_margins))

    # 4) Build summary row
    summary_row = {
        **segment_key_dict,
        "n_obs": seg_size,
        "status": "ok",
        "best_degree": best_degree,
        "r2": best_r2,
        "avg_margin": avg_margin,
        "recommended_margin": margin_opt,
        "base_gp_eur": base_gp,
        "optimal_gp_eur": gp_opt,
        "gp_uplift_abs": gp_uplift_abs,
        "gp_uplift_pct": gp_uplift_pct,
        "bootstrap_mean_margin": bootstrap_mean,
        "bootstrap_std_margin": bootstrap_std
    }

    # 5) Create and export scatter plot
    create_pricing_scatter(
        df_segment=df_seg,
        margin_grid=margin_grid,
        turnover_pred=turnover_pred,
        margin_opt=margin_opt,
        gp_opt=gp_opt,
        bootstrap_margins=bootstrap_margins,
        summary_row=summary_row
    )

    return summary_row

In [None]:
# ============================================
# Main loop over all segments
# ============================================

summary_rows = []

grouped = df.groupby(SEGMENT_DIMENSIONS)

for keys, df_segment in grouped:
    segment_key_dict = dict(zip(SEGMENT_DIMENSIONS, keys))
    print("Processing segment:", segment_key_dict)

    summary_row = process_segment(df_segment, segment_key_dict)
    summary_rows.append(summary_row)

summary_df = pd.DataFrame(summary_rows)
summary_df

Processing segment: {'country': 'AU', 'currency': 'AUD', 'direction': 'BUY', 'channel': 'Airport'}
Could not export PDF for segment AU-AUD-BUY-Airport: 
Image export using the "kaleido" engine requires the kaleido package,
which can be installed using pip:
    $ pip install -U kaleido

Processing segment: {'country': 'AU', 'currency': 'AUD', 'direction': 'BUY', 'channel': 'High Street'}
Could not export PDF for segment AU-AUD-BUY-High Street: 
Image export using the "kaleido" engine requires the kaleido package,
which can be installed using pip:
    $ pip install -U kaleido

Processing segment: {'country': 'AU', 'currency': 'AUD', 'direction': 'SELL', 'channel': 'Airport'}
Could not export PDF for segment AU-AUD-SELL-Airport: 
Image export using the "kaleido" engine requires the kaleido package,
which can be installed using pip:
    $ pip install -U kaleido

Processing segment: {'country': 'AU', 'currency': 'AUD', 'direction': 'SELL', 'channel': 'High Street'}
Could not export PDF for 

In [None]:
# ============================================
# Save summary and create an executive view
# ============================================

summary_csv_path = os.path.join(OUTPUT_ROOT, "pricing_recommendations.csv")
summary_df.to_csv(summary_csv_path, index=False)

print(f"Summary exported to: {summary_csv_path}")

# Executive view: only successful segments
exec_view = (
    summary_df[summary_df["status"] == "ok"]
    .copy()
    .sort_values("gp_uplift_abs", ascending=False)
)

# Keep key columns
exec_view = exec_view[
    SEGMENT_DIMENSIONS
    + [
        "n_obs",
        "best_degree",
        "r2",
        "avg_margin",
        "recommended_margin",
        "base_gp_eur",
        "optimal_gp_eur",
        "gp_uplift_abs",
        "gp_uplift_pct",
        "bootstrap_mean_margin",
        "bootstrap_std_margin"
    ]
]

exec_view.head(20)