In [None]:
import os
import zipfile
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yaml 

# Load & prepare web logs (dw)

# 1. Extract the web log ZIP file
# 2. Load all .txt files and concatenate into dw
# 3. Merge Variation (Test/Control) from experiment file
# 4. Merge client age from demo file

# Paths to input files
# ZIP_PATH = "/home/rafael/Área de Trabalho/CUROS IRONHACK/SEMANA 6/PROJETO 2/vanguard-ab-test/data/raw/df_final_web_data_pt_.zip"
# EXPERIMENT_PATH = "/home/rafael/Área de Trabalho/CUROS IRONHACK/SEMANA 6/PROJETO 2/vanguard-ab-test/data/raw/df_final_experiment_clients.txt"
# DEMO_PATH = "/home/rafael/Área de Trabalho/CUROS IRONHACK/SEMANA 6/PROJETO 2/vanguard-ab-test/data/raw/df_final_demo.txt"
# EXTRACT_DIR = "/home/rafael/Área de Trabalho/CUROS IRONHACK/SEMANA 6/PROJETO 2/vanguard-ab-test/data/raw/webdata_extracted"

try:
    with open("../config.yaml", "r") as file:
        config = yaml.safe_load(file)
except:
    print("Yaml configuration file not found!")

EXP_PATH = config['input_data']['file2']
DEMO_PATH = config['input_data']['file1']
ZIP_PATH  = config['input_data']['file5']
EXTRACT_DIR = "../data/raw/rawwebdata_extracted"



In [None]:

# Extract ZIP
os.makedirs(EXTRACT_DIR, exist_ok=True)
with zipfile.ZipFile(ZIP_PATH, "r") as z:
    z.extractall(EXTRACT_DIR)

# Load and concatenate all .txt files
web_files = [f for f in os.listdir(EXTRACT_DIR) if f.endswith(".txt")]
frames = [
    pd.read_csv(os.path.join(EXTRACT_DIR, f), parse_dates=["date_time"]) 
    for f in web_files
]
dw = pd.concat(frames, ignore_index=True)
print(f"✅ Loaded web logs: {len(dw):,} rows from {len(web_files)} files")
print("Columns in dw (before merge):", dw.columns.tolist())

# Merge Variation (CSV file)
exp_clients = pd.read_csv(EXP_PATH, sep=",")
print("Columns in exp_clients:", exp_clients.columns.tolist())
exp_clients = exp_clients[["client_id", "Variation"]]
dw = dw.merge(exp_clients, on="client_id", how="inner")
print(f"✅ Variation merged. Unique groups: {dw['Variation'].unique()}")

# Merge client age (CSV file)

df_demo = pd.read_csv(DEMO_PATH, sep=",")
print("Columns in df_demo:", df_demo.columns.tolist())
df_demo = df_demo[["client_id", "clnt_age"]]
dw = dw.merge(df_demo, on="client_id", how="left")
print(f"✅ Age merged. Non-null ages: {dw['clnt_age'].notna().sum():,}")
print("Columns in dw (final):", dw.columns.tolist()[:10], "...")
print(dw.head(3))

#Create age_band column from clnt_age (if not exists)
if "age_band" not in dw.columns:
    # Convert age to numeric (just in case)
    dw["clnt_age"] = pd.to_numeric(dw["clnt_age"], errors="coerce")
    # Remove rows with missing age to avoid NaN groups
    dw = dw[dw["clnt_age"].notna()].copy()
    # Define age bins (10–19, ..., 80+)
    age_bins = list(range(10, 101, 10))
    age_labels = [f"{i}-{i+9}" for i in bins[:-1]]
    # Create categorical age_band column
    dw["age_band"] = pd.cut(
        dw["clnt_age"],
        bins=age_bins,
        labels=age_labels,
        right=False
    )
    print(":white_tick: Age band column created successfully!")
    print(dw[["clnt_age", "age_band"]].head(10))

In [None]:
dw

In [None]:
# 1: Define process step order

# Map each step to a numeric order, so we can detect when a user moves BACKWARD (from step_2 to step_1).

step_order = {
    "start": 0,
    "step_1": 1,
    "step_2": 2,
    "step_3": 3,
    "confirm": 4
}

In [None]:
# 2: Function to count backward moves per visit

# For each visit (visit_id):
# 1. Sort events by date_time
# 2. Map steps to numeric order
# 3. Count decreases in the sequence (backward moves)

def count_backward_moves(df_visit):
    df_visit = df_visit.sort_values("date_time")
    step_nums = df_visit["process_step"].map(step_order).dropna().tolist()
    backward_moves = sum(
        1 for i in range(1, len(step_nums)) if step_nums[i] < step_nums[i - 1]
    )
    return backward_moves

In [None]:
# 3: Count backward moves for each visit

# Group by visit_id and Variation (Test/Control) to count how many backward moves each visit has.

error_counts = (
    dw.groupby(["visit_id", "Variation"])
    .apply(count_backward_moves, include_groups=False)
    .reset_index(name="num_backward_moves")
)

# whether the visit had at least one backward move
# error_counts_age["had_error"] = error_counts_age["num_backward_moves"] > 0
error_counts["had_error"] = error_counts["num_backward_moves"] > 0


In [None]:
error_counts

In [None]:
# 4: Error rate summary by group

# - Average number of backward moves per visit
# - Share of visits with at least one backward move

# Average number of backward moves per visit
error_rate_by_group = (
    error_counts.groupby("Variation")["num_backward_moves"]
    .mean()
    .reset_index(name="avg_backward_moves_per_visit")
)

# Share of visits with at least one backward move
error_counts["had_error"] = error_counts["num_backward_moves"] > 0
error_share_by_group = (
    error_counts.groupby("Variation")["had_error"]
    .mean()
    .reset_index(name="share_visits_with_error")
)

print("=== Error rate summary by group (Test vs Control) ===")
display(error_rate_by_group)
display(error_share_by_group)

In [None]:
# 5: Add age bands for age-based analysis

# Assign each visit an age band based on clnt_age.

bins = list(range(10, 101, 10))
labels = [f"{i}-{i+9}" for i in bins[:-1]]

# Create mapping of visit_id -> age_band
age_map = (
    dw[["visit_id", "clnt_age"]]
    .drop_duplicates()
    .assign(age_band=lambda d: pd.cut(d["clnt_age"], bins=bins, labels=labels, right=False))
)

# Merge age band into error counts
error_counts_age = error_counts.merge(age_map, on="visit_id", how="left")
error_counts_age

In [None]:
# 6: Error rate by age band & group — with formatted outputs

# For each (Variation, age_band):
# - Average number of backward moves per visit
# - Share of visits with at least one backward move

error_rate_by_age = (
    error_counts_age.groupby(["Variation", "age_band"], observed=True)["num_backward_moves"]
    .mean()
    .reset_index(name="avg_backward_moves_per_visit")
)

error_share_by_age = (
    error_counts_age.groupby(["Variation", "age_band"], observed=True)["had_error"]
    .mean()
    .reset_index(name="share_visits_with_error")
)

# Merge the two summaries for convenience
error_summary_by_age = error_rate_by_age.merge(
    error_share_by_age,
    on=["Variation", "age_band"],
    how="left"
)

# --- Formatting section ---
# Round numeric values for analysis
error_summary_by_age["avg_backward_moves_per_visit"] = (
    error_summary_by_age["avg_backward_moves_per_visit"].round(3)
)
error_summary_by_age["share_visits_with_error"] = (
    error_summary_by_age["share_visits_with_error"].round(3)
)

# Add formatted string columns for display
error_summary_by_age["avg_backward_moves_per_visit_fmt"] = (
    error_summary_by_age["avg_backward_moves_per_visit"]
    .apply(lambda x: f"{x*100:.1f}%")
)

error_summary_by_age["share_visits_with_error_fmt"] = (
    error_summary_by_age["share_visits_with_error"]
    .apply(lambda x: f"{x*100:.1f}%")
)

# --- Final display ---
print("=== Error Summary by Age Band & Group ===")
display(error_summary_by_age)


In [None]:
error_summary_by_age.to_csv("../data/clean/error_by_age_df.csv", index=False)

In [None]:
# 7: Plot error metrics by age band & group (Test vs Control)

import matplotlib.pyplot as plt
import numpy as np

def plot_error_metric(df, value_col, title, ylabel):
    """
    Plot a grouped bar chart for the given error metric.
    - df: error_summary_by_age DataFrame
    - value_col: name of the numeric column to plot
    - title: plot title
    - ylabel: y-axis label
    """
    # Pivot to have Test & Control as columns, age_band as index
    pivot = df.pivot(index="age_band", columns="Variation", values=value_col)
    pivot = pivot.reindex(index=sorted(pivot.index, key=lambda x: str(x)))  # ensure age order

    x = np.arange(len(pivot.index))
    width = 0.35

    fig, ax = plt.subplots(figsize=(10, 5))
    ax.bar(x - width/2, pivot["Test"], width, label="Test")
    ax.bar(x + width/2, pivot["Control"], width, label="Control")

    ax.set_xlabel("Age band")
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.set_xticks(x)
    ax.set_xticklabels(pivot.index, rotation=45, ha="right")
    ax.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# --- Plot 1: Average backward moves per visit ---
plot_error_metric(
    error_summary_by_age,
    "avg_backward_moves_per_visit",
    "Average Backward Moves per Visit by Age Band & Group",
    "Average backward moves per visit"
)

In [None]:
# --- Plot 2: Share of visits with at least one backward move ---
plot_error_metric(
    error_summary_by_age,
    "share_visits_with_error",
    "Share of Visits with Error by Age Band & Group",
    "Share of visits with error"
)

In [None]:
# Separate the data for Test and Control groups-- Two sample test
df_test = error_counts_age.loc[error_counts_age["Variation"] == "Test", "num_backward_moves"]
df_control = error_counts_age.loc[error_counts_age["Variation"] == "Control", "num_backward_moves"]
# Compute sample means
test_mean = df_test.mean()
control_mean = df_control.mean()
# Compute sample standard deviations (ddof=1 for sample std)
test_std = df_test.std(ddof=1)
control_std = df_control.std(ddof=1)
# Sample sizes
n_test = len(df_test)
n_control = len(df_control)
# Compute t-statistic manually
t_statistic = (test_mean - control_mean) / np.sqrt( (test_std**2 / n_test) + (control_std**2 / n_control) )
t_statistic


In [None]:
def show_statistical_test(statistic: float, alpha: float, n: int, distribution: str=["t-student","normal"], alternative: str=["two-sided","lower","greater"]):
    if distribution not in ["t-student","normal"]:
        raise TypeError("Sorry, only 't-student', and 'normal' distributions are acepted")
    if alternative not in ["two-sided","lower","greater"]:
        raise TypeError("Sorry, only 'two-sided', 'lower', and 'greated' are acepted valued for the alternative")
    if not isinstance(statistic, float):
        raise TypeError("Sorry, the data type for the statistic must be float")
    if not isinstance(alpha, float):
        raise TypeError("Sorry, the data type for alpha must be float")
    if not isinstance(n, int):
        raise TypeError("Sorry, the data type for n must be int")
    x_values = np.linspace(-3, 3)
    if distribution == "t-student":
        y_values = st.t.pdf(x_values, df=n-1)
        if alternative == "two-sided": # Computing the critical values
            lower_critical_value = st.t.ppf(alpha/2, df=n-1)
            upper_critical_value = st.t.ppf(1-(alpha/2), df=n-1)
            x_values1 = np.linspace(-3, lower_critical_value)
            y_values1 = st.t.pdf(x_values1, df=n-1)
            x_values2 = np.linspace(upper_critical_value, 3)
            y_values2 = st.t.pdf(x_values2, df=n-1)
        elif alternative == "lower":
            critical_value = st.t.ppf(alpha, df=n-1)
            x_values1 = np.linspace(-3, critical_value)
            y_values1 = st.t.pdf(x_values1, df=n-1)
        elif alternative == "greater":
            critical_value = st.t.ppf(1-alpha, df=n-1)
            x_values2 = np.linspace(critical_value, 3)
            y_values2 = st.t.pdf(x_values2, df=n-1)
    elif distribution == "normal":
        y_values = st.norm.pdf(x_values)
        if alternative == "two-sided": # Computing the critical values
            lower_critical_value = st.norm.ppf(alpha/2)
            upper_critical_value = st.norm.ppf(1-(alpha/2))
            x_values1 = np.linspace(-3, lower_critical_value)
            y_values1 = st.norm.pdf(x_values1)
            x_values2 = np.linspace(upper_critical_value, 3)
            y_values2 = st.norm.pdf(x_values2)
        elif alternative == "lower":
            critical_value = st.norm.ppf(alpha)
            x_values1 = np.linspace(-3, critical_value)
            y_values1 = st.norm.pdf(x_values1)
        elif alternative == "greater":
            critical_value = st.norm.ppf(1-alpha)
            x_values2 = np.linspace(critical_value, 3)
            y_values2 = st.norm.pdf(x_values2)
    df = pd.DataFrame({"x": x_values, "pdf": y_values})
    title = f"{distribution} Probability Density Function"
    fig = px.line(df, x="x", y="pdf", title=title)
    if alternative == "two-sided":
        fig.add_vline(x=lower_critical_value, line_color="red")
        fig.add_vline(x=upper_critical_value, line_color="red")
        fig.add_annotation(x=lower_critical_value,y=0,text=f"Lower critical value {lower_critical_value: .2f}",xref="x",yref="paper",yanchor="bottom")
        fig.add_annotation(x=upper_critical_value,y=0,text=f"Upper critical value {upper_critical_value: .2f}",xref="x",yref="paper",yanchor="bottom")
        fig.add_scatter(x=x_values1, y=y_values1,fill='tozeroy', mode='none' , fillcolor='red')
        fig.add_scatter(x=x_values2, y=y_values2,fill='tozeroy', mode='none' , fillcolor='red')
    elif alternative == "lower":
        fig.add_vline(x=critical_value, line_color="red")
        fig.add_annotation(x=critical_value,y=0,text=f"Critical value {critical_value: .2f}",xref="x",yref="paper",yanchor="bottom")
        fig.add_scatter(x=x_values1, y=y_values1,fill='tozeroy', mode='none' , fillcolor='red')
    elif alternative == "greater":
        fig.add_vline(x=critical_value, line_color="red")
        fig.add_annotation(x=critical_value,y=0,text=f"Critical value {critical_value: .2f}",xref="x",yref="paper",yanchor="bottom")
        fig.add_scatter(x=x_values2, y=y_values2,fill='tozeroy', mode='none' , fillcolor='red')
    fig.add_vline(x=statistic)
    fig.add_annotation(x=statistic,y=0,text=f"Statistic {statistic: .2f}",xref="x",yref="paper",yanchor="bottom")
    fig.update_layout(title_text=f'{distribution} Probability Density Function', title_x=0.5)
    fig.update_layout(showlegend=False)
    fig.show()

In [None]:
import scipy.stats as st
import plotly.express as px
show_statistical_test(t_statistic, 0.05, n_test+n_control-2, distribution="t-student", alternative="two-sided")

In [None]:
# ==========================================================
# 1) Define the funnel steps in logical order
# ==========================================================
STEP_ORDER = ["start", "step_1", "step_2", "step_3", "confirm"]
# ==========================================================
# 2) Function to compute completion rates for each group
# ==========================================================
def compute_completion_rates(df):
    """
    Computes completion rates (as %) for each step in STEP_ORDER
    separately for Test and Control groups.
    Returns a DataFrame with Variation, step, n_completed, n_start, and completion_rate.
    """
    # Number of unique visitors who started, by group
    start_counts = (
        df[df["process_step"] == "start"]
        .groupby("Variation")["visitor_id"]
        .nunique()
        .rename("n_start")
    )
    records = []
    for step in STEP_ORDER:
        # Count unique visitors who reached this step
        step_counts = (
            df[df["process_step"] == step]
            .groupby("Variation")["visitor_id"]
            .nunique()
            .rename("n_completed")
        )
        merged = pd.concat([start_counts, step_counts], axis=1)
        merged["step"] = step
        merged["completion_rate"] = (
            merged["n_completed"] / merged["n_start"] * 100
        ).round(2)
        records.append(merged.reset_index())
    return pd.concat(records, ignore_index=True)
# ==========================================================
# 3) Apply function to the main web log DataFrame `dw`
# ==========================================================
completion_rates = compute_completion_rates(dw)
print("=== Completion Rates by Step and Group ===")
display(completion_rates)
# ==========================================================
# 4) Plot completion rate per step - side by side bars
# ==========================================================
def plot_completion_by_step(df):
    """
    Plots side-by-side bars of completion rates for each step by Variation.
    """
    pivot = df.pivot(index="step", columns="Variation", values="completion_rate")
    pivot = pivot.reindex(index=STEP_ORDER)  # keep logical order
    x = range(len(pivot.index))
    width = 0.35
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar([i - width/2 for i in x], pivot["Test"], width, label="Test")
    ax.bar([i + width/2 for i in x], pivot["Control"], width, label="Control")
    ax.set_xticks(x)
    ax.set_xticklabels(pivot.index, rotation=45)
    ax.set_ylabel("Completion rate (%)")
    ax.set_title("Step-by-Step Completion Rate: Test vs Control")
    ax.legend()
    plt.tight_layout()
    plt.show()
plot_completion_by_step(completion_rates)
# ==========================================================
# 5) Global completion rate (Start → Confirm)
# ==========================================================
global_completion = completion_rates[completion_rates["step"] == "confirm"].copy()
print("=== Global Completion Rate (Start → Confirm) ===")
display(global_completion)
# ==========================================================
# 6) Plot global completion comparison
# ==========================================================
fig, ax = plt.subplots(figsize=(6, 5))
ax.bar(global_completion["Variation"], global_completion["completion_rate"], color=["orange", "blue"])
ax.set_ylabel("Completion rate (%)")
ax.set_title("Global Completion Rate (Start → Confirm)")
for i, val in enumerate(global_completion["completion_rate"]):
    ax.text(i, val + 1, f"{val:.1f}%", ha="center", va="bottom")
plt.tight_layout()
plt.show()

In [None]:
df

In [None]:
# 7) Completion rate by step and age band
# ==========================================================
def compute_completion_by_age(df):
    """
    Computes completion rates per step grouped by (Variation, age_band).
    Returns a DataFrame with Variation, age_band, step, n_completed, n_start, completion_rate.
    """
    records = []
    # Count how many visitors started in each (Variation, age_band)
    start_counts = (
        df[df["process_step"] == "start"]
        .groupby(["Variation", "age_band"],observed =True)["visitor_id"]
        .nunique()
        .rename("n_start")
    )
    for step in STEP_ORDER:
        step_counts = (
            df[df["process_step"] == step]
            .groupby(["Variation", "age_band"], observed = True)["visitor_id"]
            .nunique()
            .rename("n_completed")
        )
        merged = pd.concat([start_counts, step_counts], axis=1)
        merged["step"] = step
        merged["completion_rate"] = (
            merged["n_completed"] / merged["n_start"] * 100
        ).round(2)
        records.append(merged.reset_index())
    return pd.concat(records, ignore_index=True)
completion_by_age = compute_completion_by_age(dw)
print("=== Completion rate by step, group, and age band ===")
display(completion_by_age.head(53))
display(completion_by_age.tail(37))

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
# Make sure we're using Seaborn's style
sns.set(style="whitegrid")
def plot_completion_by_age(df, step):
    """
    Plot completion rates for a specific step, comparing Test vs Control by age band.
    """
    subset = df[df["step"] == step].copy()
    subset = subset.sort_values("age_band")
    plt.figure(figsize=(12,6))
    sns.barplot(
        data=subset,
        x="age_band",
        y="completion_rate",
        hue="Variation",
        palette={"Test":"orange","Control":"steelblue"}
    )
    plt.title(f"Completion Rate by Age Band – {step}", fontsize=14)
    plt.xlabel("Age band")
    plt.ylabel("Completion rate (%)")
    plt.xticks(rotation=45)
    plt.legend(title="Group", loc="best")
    plt.tight_layout()
    plt.show()
# Plot for each step in the funnel
for step in STEP_ORDER:
    plot_completion_by_age(completion_by_age, step)

#This graph shows, for each individual stage, the completion rate by age group—comparing the Test and Control groups side by side.

def plot_funnel_by_age(df, variation):
    """
    Line plot showing step-by-step completion funnel for each age band, within a given Variation group.
    """
    subset = df[df["Variation"] == variation]
    pivot = subset.pivot_table(
        index="step",
        columns="age_band",
        values="completion_rate"
    ).reindex(index=STEP_ORDER)
    plt.figure(figsize=(14,7))
    for col in pivot.columns:
        plt.plot(pivot.index, pivot[col], marker="o", label=str(col))
    plt.title(f"Step-by-Step Completion Funnel by Age Band — {variation}", fontsize=14)
    plt.xlabel("Step")
    plt.ylabel("Completion rate (%)")
    plt.xticks(rotation=30)
    plt.legend(title="Age band", bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, linestyle="--", alpha=0.5)
    plt.tight_layout()
    plt.show()
# Plot funnels for Test and Control separately
plot_funnel_by_age(completion_by_age, "Test")
plot_funnel_by_age(completion_by_age, "Control")

#Here, we've turned the rates per step into a "funnel line," showing how the rate changes from Start → Confirm.

# Filter only the 'confirm' step for the final completion rate
global_by_age = completion_by_age[completion_by_age["step"] == "confirm"].copy()
plt.figure(figsize=(12,6))
sns.barplot(
    data=global_by_age,
    x="age_band",
    y="completion_rate",
    hue="Variation",
    palette={"Test":"orange","Control":"steelblue"}
)
plt.title("Global Completion Rate (Start → Confirm) by Age Band", fontsize=14)
plt.xlabel("Age band")
plt.ylabel("Completion rate (%)")
plt.xticks(rotation=45)
plt.legend(title="Group", loc="best")
# Add value labels on top of bars
for i, row in global_by_age.iterrows():
    plt.text(
        x=i%len(global_by_age["age_band"].unique()),
        y=row["completion_rate"] + 1,
        s=f"{row['completion_rate']:.1f}%",
        ha="center",
        va="bottom",
        fontsize=9
    )
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
from math import erf
# --------------------------------------------------------
# Normal cumulative distribution function (CDF)
# Used to calculate p-values for z-test without SciPy
# --------------------------------------------------------
def normal_cdf(z):
    return 0.5 * (1 + erf(z / np.sqrt(2)))
# --------------------------------------------------------
# Two-proportion Z-test (two-sided)
# Compares two proportions (e.g., Test vs Control completion rates)
# --------------------------------------------------------
def two_prop_ztest(x1, n1, x2, n2):
    if n1 == 0 or n2 == 0:
        return np.nan, np.nan
    p1 = x1 / n1
    p2 = x2 / n2
    p_pool = (x1 + x2) / (n1 + n2)
    se = np.sqrt(p_pool * (1 - p_pool) * (1/n1 + 1/n2))
    # If standard error is zero, handle special cases
    if se == 0:
        if p1 == p2:
            return 0.0, 1.0
        return np.nan, np.nan
    z = (p1 - p2) / se
    p = 2 * (1 - normal_cdf(abs(z)))  # two-sided
    return z, p
# --------------------------------------------------------
# Convert p-value to a significance marker using α = 0.05
# * = significant, ns = not significant
# --------------------------------------------------------
def p_to_star_0_05(p):
    if np.isnan(p):
        return "n/a"
    return "*" if p < 0.05 else "ns"

In [None]:
def ztests_stepwise_overall(completion_rates):
    """
    Run two-proportion Z-tests for each step (Test vs Control),
    using the total number of users who started vs. completed each step.
    """
    test = completion_rates.query("Variation == 'Test'")[["step","n_completed","n_start"]]
    ctrl = completion_rates.query("Variation == 'Control'")[["step","n_completed","n_start"]]
    merged = test.merge(ctrl, on="step", suffixes=("_test","_ctrl"), how="inner")
    out = []
    for _, r in merged.iterrows():
        x1, n1 = int(r["n_completed_test"]), int(r["n_start_test"])
        x2, n2 = int(r["n_completed_ctrl"]), int(r["n_start_ctrl"])
        z, p = two_prop_ztest(x1, n1, x2, n2)
        out.append({
            "step": r["step"],
            "p_test": x1/n1 if n1 else np.nan,
            "p_ctrl": x2/n2 if n2 else np.nan,
            "z_stat": z,
            "p_value": p,
            "sig": p_to_star_0_05(p)
        })
    return pd.DataFrame(out)
# Run stepwise Z-tests (overall)
z_overall = ztests_stepwise_overall(completion_rates)
print("=== Stepwise Z-tests (α=0.05) ===")
display(z_overall)

In [None]:
# Filter the 'confirm' step for global completion comparison
z_global = z_overall.loc[z_overall["step"] == "confirm"].copy()
print("=== Global Z-test (Start → Confirm) ===")
display(z_global)

In [None]:
def ztests_stepwise_by_age(completion_by_age):
    """
    Run two-proportion Z-tests for each (age_band, step),
    comparing Test vs Control groups.
    """
    test = completion_by_age.query("Variation == 'Test'")[["age_band","step","n_completed","n_start"]]
    ctrl = completion_by_age.query("Variation == 'Control'")[["age_band","step","n_completed","n_start"]]
    merged = test.merge(ctrl, on=["age_band","step"], suffixes=("_test","_ctrl"), how="inner")
    out = []
    for _, r in merged.iterrows():
        x1, n1 = int(r["n_completed_test"]), int(r["n_start_test"])
        x2, n2 = int(r["n_completed_ctrl"]), int(r["n_start_ctrl"])
        z, p = two_prop_ztest(x1, n1, x2, n2)
        out.append({
            "age_band": r["age_band"],
            "step": r["step"],
            "p_test": x1/n1 if n1 else np.nan,
            "p_ctrl": x2/n2 if n2 else np.nan,
            "z_stat": z,
            "p_value": p,
            "sig": p_to_star_0_05(p)
        })
    return pd.DataFrame(out)
# Run the stepwise Z-tests per age band
z_by_age = ztests_stepwise_by_age(completion_by_age)
print("=== Stepwise Z-tests by age band (α=0.05) ===")
display(z_by_age.head(20))

In [None]:
import matplotlib.pyplot as plt
import numpy as np
def plot_completion_by_step_with_sig(crates, z_overall, step_order):
    """
    Plot step-by-step completion rates (Test vs Control),
    with significance markers above each step.
    """
    pivot = crates.pivot(index="step", columns="Variation", values="completion_rate").reindex(step_order)
    x = np.arange(len(pivot.index))
    width = 0.35
    fig, ax = plt.subplots(figsize=(10,6))
    ax.bar(x - width/2, pivot["Test"], width, label="Test")
    ax.bar(x + width/2, pivot["Control"], width, label="Control")
    # Add significance stars
    star_map = dict(zip(z_overall["step"], z_overall["sig"]))
    for i, step in enumerate(pivot.index):
        sig = star_map.get(step, "")
        y = np.nanmax([pivot.loc[step,"Test"], pivot.loc[step,"Control"]])
        ax.text(i, y + 1.5, sig, ha="center", va="bottom", fontsize=12, fontweight="bold")
    ax.set_xticks(x)
    ax.set_xticklabels(pivot.index, rotation=45)
    ax.set_ylabel("Completion rate (%)")
    ax.set_title("Step-by-Step Completion Rate — Test vs Control (α=0.05)")
    ax.legend()
    plt.tight_layout()
    plt.show()

In [None]:
# Plot overall step-by-step completion with significance
plot_completion_by_step_with_sig(completion_rates, z_overall, STEP_ORDER)

In [None]:
def plot_global_with_sig(crates, z_global):
    """
    Plot global completion rate (Start → Confirm) for Test vs Control,
    with significance annotation above bars.
    """
    sub = crates[crates["step"]=="confirm"].copy()
    fig, ax = plt.subplots(figsize=(6,5))
    ax.bar(sub["Variation"], sub["completion_rate"])
    ax.set_ylabel("Completion rate (%)")
    ax.set_title("Global Completion Rate (Start → Confirm)")
    # Add significance marker
    if not z_global.empty and np.isfinite(z_global["p_value"].iloc[0]):
        sig = z_global["sig"].iloc[0]
        y = sub["completion_rate"].max()
        ax.text(0.5, y + 1.5, sig, ha="center", va="bottom", fontsize=12, fontweight="bold")
    # Add labels on bars
    for i, row in sub.iterrows():
        ax.text(i, row["completion_rate"] + 0.8,
                f"{row['completion_rate']:.1f}%", ha="center", va="bottom")
    plt.tight_layout()
    plt.show()
# Plot global completion Test vs Control
plot_global_with_sig(completion_rates, z_global)

In [None]:
def plot_step_by_age_with_sig(cby_age, z_by_age, step):
    """
    Plot completion rates for a single step,
    broken down by age band and group (Test vs Control),
    with significance markers for each age band.
    """
    sub = cby_age[cby_age["step"]==step].copy()
    age_order = sorted(sub["age_band"].dropna().unique(), key=lambda x: str(x))
    piv = sub.pivot(index="age_band", columns="Variation", values="completion_rate").reindex(age_order)
    x = np.arange(len(age_order))
    width = 0.38
    fig, ax = plt.subplots(figsize=(12,6))
    ax.bar(x - width/2, piv["Test"], width, label="Test", color="orange")
    ax.bar(x + width/2, piv["Control"], width, label="Control", color="steelblue")
    # Add significance markers
    zb = z_by_age[z_by_age["step"]==step].copy()
    star_map = dict(zip(zb["age_band"].astype(str), zb["sig"]))
    for i, band in enumerate(age_order):
        sig = star_map.get(str(band), "")
        y = np.nanmax([piv.loc[band,"Test"], piv.loc[band,"Control"]])
        ax.text(i, y + 1.5, sig, ha="center", va="bottom", fontsize=11, fontweight="bold")
    ax.set_xticks(x)
    ax.set_xticklabels(age_order, rotation=45)
    ax.set_ylabel("Completion rate (%)")
    ax.set_title(f"Completion Rate by Age Band — {step} (α=0.05)")
    ax.legend()
    plt.tight_layout()
    plt.show()
# Generate a plot per step for all age bands
for step in STEP_ORDER:
    plot_step_by_age_with_sig(completion_by_age, z_by_age, step)

In [None]:
#  Z-tests by Age Band (Step-by-Step)
# ==========================================================
# This block compares Test vs Control completion rates
# for each process step, broken down by age_band.
# It uses two-proportion z-tests (α = 0.05).
# Then it formats the numeric output for better readability.
def ztests_stepwise_by_age(completion_by_age):
    """
    Run two-proportion Z-tests for each (age_band, step),
    comparing Test vs Control groups.
    """
    test = completion_by_age.query("Variation == 'Test'")[["age_band","step","n_completed","n_start"]]
    ctrl = completion_by_age.query("Variation == 'Control'")[["age_band","step","n_completed","n_start"]]
    merged = test.merge(ctrl, on=["age_band","step"], suffixes=("_test","_ctrl"), how="inner")
    out = []
    for _, r in merged.iterrows():
        x1, n1 = int(r["n_completed_test"]), int(r["n_start_test"])
        x2, n2 = int(r["n_completed_ctrl"]), int(r["n_start_ctrl"])
        z, p = two_prop_ztest(x1, n1, x2, n2)
        out.append({
            "age_band": r["age_band"],
            "step": r["step"],
            "p_test": x1/n1 if n1 else np.nan,
            "p_ctrl": x2/n2 if n2 else np.nan,
            "z_stat": z,
            "p_value": p,
            "sig": p_to_star_0_05(p)
        })
    return pd.DataFrame(out)
# Run the stepwise Z-tests per age band
z_by_age = ztests_stepwise_by_age(completion_by_age)
# ==========================================================
# Format numeric columns for clearer visualization
# ==========================================================
z_by_age_fmt = z_by_age.copy()
# Format proportions to 2 decimal places
z_by_age_fmt["p_test"] = z_by_age_fmt["p_test"].map(lambda x: f"{x:.2f}")
z_by_age_fmt["p_ctrl"] = z_by_age_fmt["p_ctrl"].map(lambda x: f"{x:.2f}")
# Format z-statistics to 3 decimal places
z_by_age_fmt["z_stat"] = z_by_age_fmt["z_stat"].map(lambda x: f"{x:.3f}")
# Format p-values: scientific notation if <0.001, else 3 decimal places
z_by_age_fmt["p_value"] = z_by_age_fmt["p_value"].apply(
    lambda x: f"{x:.3e}" if x < 0.001 else f"{x:.3f}"
)
print("=== Stepwise Z-tests by age band (α=0.05, formatted) ===")
display(z_by_age_fmt.head(20))