Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 133 additions & 61 deletions api/app_analytics/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
GET /api/v1/environments/{env}/experiments/results/?feature=checkout_button
"""

import math
import random
from typing import Any

import numpy as np
from common.environments.permissions import VIEW_ENVIRONMENT
from django.db.models import Count, Q
from drf_spectacular.utils import OpenApiParameter, extend_schema
Expand All @@ -29,7 +30,6 @@
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from scipy import stats as scipy_stats # type: ignore[import-untyped]

from environments.identities.traits.models import Trait
from environments.models import Environment
Expand Down Expand Up @@ -146,43 +146,34 @@ def _calculate_two_variant_stats(
"sample_size_warning": "One or more variants have zero samples",
}

# Build contingency table
table = np.array(
# Build contingency table, replacing zeros with 0.5 to avoid log(0)
table = [
[
[
results[a]["conversions"],
total_a - results[a]["conversions"],
],
[
results[b]["conversions"],
total_b - results[b]["conversions"],
],
]
)
max(results[a]["conversions"], 0.5),
max(total_a - results[a]["conversions"], 0.5),
],
[
max(results[b]["conversions"], 0.5),
max(total_b - results[b]["conversions"], 0.5),
],
]

# G-test (log-likelihood ratio test)
# Replace zeros with small values to avoid division errors
table_safe = np.where(table == 0, 0.5, table)
try:
_, p_value, _, _ = scipy_stats.chi2_contingency(
table_safe, correction=True, lambda_="log-likelihood"
)
except ValueError:
p_value = 1.0
p_value = _g_test(table)

# Bayesian "Chance to Win" using Monte Carlo simulation
samples = 50000
samples_a = np.random.beta(
results[a]["conversions"] + 1,
total_a - results[a]["conversions"] + 1,
samples,
)
samples_b = np.random.beta(
results[b]["conversions"] + 1,
total_b - results[b]["conversions"] + 1,
samples,
)
chance_a_wins = float((samples_a > samples_b).mean())
n_samples = 10000
a_wins = 0
alpha_a = results[a]["conversions"] + 1
beta_a = total_a - results[a]["conversions"] + 1
alpha_b = results[b]["conversions"] + 1
beta_b = total_b - results[b]["conversions"] + 1
for _ in range(n_samples):
sa = random.betavariate(alpha_a, beta_a)
sb = random.betavariate(alpha_b, beta_b)
if sa > sb:
a_wins += 1
chance_a_wins = a_wins / n_samples

# Determine winner and calculate lift (winner vs loser)
if results[a]["rate"] > results[b]["rate"]:
Expand Down Expand Up @@ -239,41 +230,36 @@ def _calculate_multi_variant_stats(
"sample_size_warning": f"Variant '{v}' has zero samples",
}

# Build contingency table for all variants
table = np.array(
# Build contingency table, replacing zeros with 0.5 to avoid log(0)
table = [
[
[results[v]["conversions"], results[v]["total"] - results[v]["conversions"]]
for v in variants
max(results[v]["conversions"], 0.5),
max(results[v]["total"] - results[v]["conversions"], 0.5),
]
)
for v in variants
]

# G-test
table_safe = np.where(table == 0, 0.5, table)
try:
_, p_value, _, _ = scipy_stats.chi2_contingency(
table_safe, correction=True, lambda_="log-likelihood"
)
except ValueError:
p_value = 1.0
p_value = _g_test(table)

# Bayesian chance to win for each variant
samples = 50000
variant_samples = {}
for v in variants:
variant_samples[v] = np.random.beta(
# Bayesian chance to win for each variant via Monte Carlo
n_samples = 10000
params = [
(
results[v]["conversions"] + 1,
results[v]["total"] - results[v]["conversions"] + 1,
samples,
)

# Calculate chance each variant is the best
chance_to_win = {}
for v in variants:
wins = np.ones(samples, dtype=bool)
for other in variants:
if other != v:
wins &= variant_samples[v] > variant_samples[other]
chance_to_win[v] = round(float(wins.mean()), 3)
for v in variants
]
win_counts = [0] * len(variants)
for _ in range(n_samples):
draws = [random.betavariate(a, b) for a, b in params]
best_idx = max(range(len(draws)), key=lambda i: draws[i])
win_counts[best_idx] += 1

chance_to_win = {
v: round(win_counts[i] / n_samples, 3) for i, v in enumerate(variants)
}

# Find the best performing variant
best_variant = max(variants, key=lambda v: results[v]["rate"])
Expand All @@ -296,6 +282,92 @@ def _calculate_multi_variant_stats(
}


def _g_test(table: list[list[float]]) -> float:
"""
Perform a G-test (log-likelihood ratio test) on a contingency table.

Returns the p-value under the chi-squared distribution.
"""
rows = len(table)
cols = len(table[0])

row_totals = [sum(row) for row in table]
col_totals = [sum(table[r][c] for r in range(rows)) for c in range(cols)]
grand_total = sum(row_totals)

if grand_total == 0: # pragma: no cover
return 1.0

g = 0.0
for r in range(rows):
for c in range(cols):
observed = table[r][c]
expected = row_totals[r] * col_totals[c] / grand_total
if observed > 0 and expected > 0:
g += observed * math.log(observed / expected)
g *= 2

df = (rows - 1) * (cols - 1)
if df == 0: # pragma: no cover
return 1.0

return _chi2_sf(g, df)


def _chi2_sf(x: float, df: int) -> float:
"""Survival function (1 - CDF) for the chi-squared distribution."""
if x <= 0:
return 1.0
return _upper_gamma_reg(df / 2.0, x / 2.0)


def _upper_gamma_reg(a: float, x: float) -> float:
"""Regularised upper incomplete gamma function Q(a, x)."""
if x <= 0: # pragma: no cover
return 1.0
if x < a + 1:
return 1.0 - _lower_gamma_series(a, x)
return _upper_gamma_cf(a, x)


def _lower_gamma_series(a: float, x: float) -> float:
"""Regularised lower incomplete gamma P(a, x) via series expansion."""
ap = a
total = 1.0 / a
term = 1.0 / a
for _ in range(300):
ap += 1
term *= x / ap
total += term
if abs(term) < abs(total) * 1e-15:
break
return total * math.exp(-x + a * math.log(x) - math.lgamma(a))


def _upper_gamma_cf(a: float, x: float) -> float:
"""Regularised upper incomplete gamma Q(a, x) via continued fraction."""
tiny = 1e-30
b = x + 1.0 - a
c = 1.0 / tiny
d = 1.0 / b
h = d
for i in range(1, 300):
an = -i * (i - a)
b += 2.0
d = an * d + b
if abs(d) < tiny: # pragma: no cover
d = tiny
c = b + an / c
if abs(c) < tiny: # pragma: no cover
c = tiny
d = 1.0 / d
delta = d * c
h *= delta
if abs(delta - 1.0) < 1e-15:
break
return h * math.exp(-x + a * math.log(x) - math.lgamma(a))


@extend_schema(
parameters=[
OpenApiParameter(
Expand Down
Loading
Loading