In [31]:
import requests
from typing import Dict, Any, Optional

BASE_URL = "https://berghain.challenges.listenlabs.ai/"  # ← replace with the real host
PLAYER_ID = "5d7389b8-683e-453b-a35f-6d21b1b5e94e"
SCENARIO = 2  # choose 1, 2, or 3

def create_new_game(scenario: int, player_id: str) -> Dict[str, Any]:
    url = f"{BASE_URL}/new-game"
    res = requests.get(url, params={"scenario": scenario, "playerId": player_id}, timeout=30)
    res.raise_for_status()
    data = res.json()
    print("New game:", data["gameId"])
    return data

In [32]:
new_game = create_new_game(SCENARIO, PLAYER_ID)
game_id = new_game["gameId"]
constraints = new_game.get("constraints", [])
stats = new_game.get("attributeStatistics", {})
print(constraints)
print(stats)

New game: f7f1690a-b41c-41a1-a8e8-589607b99340
[{'attribute': 'techno_lover', 'minCount': 650}, {'attribute': 'well_connected', 'minCount': 450}, {'attribute': 'creative', 'minCount': 300}, {'attribute': 'berlin_local', 'minCount': 750}]
{'relativeFrequencies': {'techno_lover': 0.6265000000000001, 'well_connected': 0.4700000000000001, 'creative': 0.06227, 'berlin_local': 0.398}, 'correlations': {'techno_lover': {'techno_lover': 1, 'well_connected': -0.4696169332674324, 'creative': 0.09463317039891586, 'berlin_local': -0.6549403815606182}, 'well_connected': {'techno_lover': -0.4696169332674324, 'well_connected': 1, 'creative': 0.14197259140471485, 'berlin_local': 0.5724067808436452}, 'creative': {'techno_lover': 0.09463317039891586, 'well_connected': 0.14197259140471485, 'creative': 1, 'berlin_local': 0.14446459505650772}, 'berlin_local': {'techno_lover': -0.6549403815606182, 'well_connected': 0.5724067808436452, 'creative': 0.14446459505650772, 'berlin_local': 1}}}


In [33]:
from dataclasses import dataclass
import math
import numpy as np
from typing import Optional, Tuple, Callable, Dict


# ----------------------------- Utilities ---------------------------------- #

def _ceil_int(x: float) -> int:
    return int(math.ceil(x))


def _clamp(x: float, lo: float, hi: float) -> float:
    return max(lo, min(hi, x))


def _norm_ppf(p: np.ndarray) -> np.ndarray:
    """Fast inverse CDF for standard normal (Acklam's approximation).

    Works for scalar or array-like `p` in (0, 1). Values are clipped to
    [1e-12, 1-1e-12] for numerical stability.
    """
    # Coefficients in rational approximations
    a = np.array([
        -3.969683028665376e+01,
         2.209460984245205e+02,
        -2.759285104469687e+02,
         1.383577518672690e+02,
        -3.066479806614716e+01,
         2.506628277459239e+00,
    ])
    b = np.array([
        -5.447609879822406e+01,
         1.615858368580409e+02,
        -1.556989798598866e+02,
         6.680131188771972e+01,
        -1.328068155288572e+01,
    ])
    c = np.array([
        -7.784894002430293e-03,
        -3.223964580411365e-01,
        -2.400758277161838e+00,
        -2.549732539343734e+00,
         4.374664141464968e+00,
         2.938163982698783e+00,
    ])
    d = np.array([
         7.784695709041462e-03,
         3.224671290700398e-01,
         2.445134137142996e+00,
         3.754408661907416e+00,
    ])

    p = np.asarray(p, dtype=float)
    p = np.clip(p, 1e-12, 1 - 1e-12)

    # Define break-points
    plow = 0.02425
    phigh = 1 - plow

    z = np.empty_like(p)

    # lower region
    mask = p < plow
    if np.any(mask):
        q = np.sqrt(-2 * np.log(p[mask]))
        z[mask] = (((((c[0]*q + c[1])*q + c[2])*q + c[3])*q + c[4])*q + c[5]) / \
                  ((((d[0]*q + d[1])*q + d[2])*q + d[3])*q + 1)
        z[mask] = -z[mask]

    # upper region
    mask = p > phigh
    if np.any(mask):
        q = np.sqrt(-2 * np.log(1 - p[mask]))
        z[mask] = (((((c[0]*q + c[1])*q + c[2])*q + c[3])*q + c[4])*q + c[5]) / \
                  ((((d[0]*q + d[1])*q + d[2])*q + d[3])*q + 1)

    # central region
    mask = (p >= plow) & (p <= phigh)
    if np.any(mask):
        q = p[mask] - 0.5
        r = q*q
        z[mask] = (((((a[0]*r + a[1])*r + a[2])*r + a[3])*r + a[4])*r + a[5]) * q / \
                  (((((b[0]*r + b[1])*r + b[2])*r + b[3])*r + b[4])*r + 1)

    return z


def _nearest_psd(corr: np.ndarray, eps: float = 1e-9) -> np.ndarray:
    """Project a symmetric matrix to the nearest positive semidefinite matrix
    by clipping negative eigenvalues, then renormalize diagonal to 1.
    """
    B = (corr + corr.T) / 2
    vals, vecs = np.linalg.eigh(B)
    vals = np.maximum(vals, eps)
    B_psd = (vecs * vals) @ vecs.T
    # Normalize to correlation (diag = 1)
    d = np.sqrt(np.clip(np.diag(B_psd), eps, None))
    B_corr = B_psd / d[:, None] / d[None, :]
    return (B_corr + B_corr.T) / 2


def build_correlation_from_cov(p: np.ndarray, cov: np.ndarray) -> np.ndarray:
    """Convert Bernoulli covariance to correlation matrix, then project to PSD.

    cov_ij = Corr_ij * sqrt(p_i(1-p_i) p_j(1-p_j))
    => Corr_ij = cov_ij / sqrt(var_i var_j)
    """
    p = np.asarray(p, dtype=float)
    var = p * (1 - p)
    denom = np.sqrt(np.maximum(var, 1e-12))
    corr = cov / (denom[:, None] * denom[None, :])
    # Clamp numerical range and set diag to 1
    np.fill_diagonal(corr, 1.0)
    corr = np.clip(corr, -0.999, 0.999)
    return _nearest_psd(corr)


class ArrivalSampler:
    """Sampler for future arrivals.

    If `corr` is provided, use a Gaussian copula with the given correlation
    matrix to couple attributes; otherwise sample attributes independently.
    """
    def __init__(self, p: np.ndarray, corr: Optional[np.ndarray] = None, seed: Optional[int] = None):
        self.p = np.asarray(p, dtype=float)
        self.m = self.p.size
        self.rng = np.random.default_rng(seed)
        self.use_copula = corr is not None
        if self.use_copula:
            corr = np.asarray(corr, dtype=float)
            # Project to PSD & ensure correlation-like properties
            self.corr = _nearest_psd(corr)
            self.thresholds = _norm_ppf(self.p)
            # Cholesky may still fail if near-singular; fall back to eigh
            try:
                self.L = np.linalg.cholesky(self.corr)
                self._mvnorm = self._mvnorm_chol
            except np.linalg.LinAlgError:
                vals, vecs = np.linalg.eigh(self.corr)
                vals = np.clip(vals, 0.0, None)
                self.L_ev = vecs @ np.diag(np.sqrt(vals))
                self._mvnorm = self._mvnorm_eig
        else:
            self.corr = None

    def _mvnorm_chol(self, n: int = 1) -> np.ndarray:
        z = self.rng.standard_normal(size=(n, self.m))
        return z @ self.L.T

    def _mvnorm_eig(self, n: int = 1) -> np.ndarray:
        z = self.rng.standard_normal(size=(n, self.m))
        return z @ self.L_ev.T

    def sample(self, n: int = 1) -> np.ndarray:
        if not self.use_copula:
            # independent Bernoulli
            u = self.rng.random(size=(n, self.m))
            return (u < self.p).astype(np.int8)
        else:
            z = self._mvnorm(n)
            # Dichotomize at per-dimension thresholds to get Bernoulli
            return (z <= self.thresholds).astype(np.int8)


# --------------------------- Solver classes -------------------------------- #

@dataclass
class SolverConfig:
    alphas: np.ndarray            # shape (m,)
    p: np.ndarray                 # shape (m,), P(x_k=1)
    N: int = 1000                 # total acceptances target
    reject_budget: int = 20000    # max rejections allowed (stopping condition in sim)
    # If you pass `cov` we convert it to correlation; if you have `corr` already
    # you can pass it directly and leave cov=None.
    cov: Optional[np.ndarray] = None    # shape (m,m) Bernoulli covariance
    corr: Optional[np.ndarray] = None   # shape (m,m) Pearson correlation
    risk_tolerance: float = 0.01        # max failure prob in rollout (chance constraint)
    sims: int = 0                       # 0 => deterministic-only; >0 enables rollout lookahead
    seed: Optional[int] = None


class OnlineQuotaSolver:
    def __init__(self, cfg: SolverConfig):
        self.cfg = cfg
        self.m = int(cfg.alphas.size)
        self.alphas = np.asarray(cfg.alphas, dtype=float)
        assert np.all((self.alphas >= 0) & (self.alphas <= 1)), "alphas must be in [0,1]"
        self.p = np.asarray(cfg.p, dtype=float)
        assert self.p.size == self.m, "p and alphas must have same length"
        assert np.all((self.p > 0) & (self.p < 1)), "p must be strictly between 0 and 1 for all dims"
        if cfg.corr is not None:
            corr = np.asarray(cfg.corr, dtype=float)
        elif cfg.cov is not None:
            corr = build_correlation_from_cov(self.p, np.asarray(cfg.cov, dtype=float))
        else:
            corr = None
        self.sampler = ArrivalSampler(self.p, corr=corr, seed=cfg.seed)
        # Targets
        self.N = int(cfg.N)
        self.reject_budget = int(cfg.reject_budget)
        self.t = np.array([_ceil_int(a * self.N) for a in self.alphas], dtype=int)
        self.reset()

    # ----------------------------- State ---------------------------------- #
    def reset(self):
        self.a = 0                        # accepted so far
        self.r = 0                        # rejected so far
        self.c = np.zeros(self.m, dtype=int)  # counts of ones among accepted

    @property
    def remaining_slots(self) -> int:
        return self.N - self.a

    @property
    def remaining_required(self) -> np.ndarray:
        # Remaining required ones to hit targets, lower-bounded by 0
        return np.maximum(self.t - self.c, 0)

    # --------------------------- Core logic -------------------------------- #
    def _feasible_if_accept(self, x: np.ndarray) -> Tuple[bool, np.ndarray]:
        """Check the deterministic necessary feasibility after accepting x.
        Returns (feasible, per-attribute slack_after_accept), where slack is
        (c + x + r' - t). Negative slack for any k means infeasible.
        """
        x = np.asarray(x, dtype=int)
        r_after = self.remaining_slots - 1
        slack = self.c + x + r_after - self.t
        feasible = bool(np.all(slack >= 0))
        return feasible, slack

    def _greedy_safe_accept(self, x: np.ndarray) -> bool:
        feasible, _ = self._feasible_if_accept(x)
        return feasible

    def decide(self, x: np.ndarray) -> Tuple[bool, Dict]:
        """Return (accept_bool, info_dict) for the current candidate x.

        If cfg.sims == 0: purely deterministic — accept iff accepting is
        deterministically feasible. This is *very* fast and already performs
        well when you have a large rejection budget.

        If cfg.sims > 0: run a one-step stochastic rollout to choose the
        branch (accept vs reject) with lower expected rejections subject to a
        max failure probability (chance constraint).
        """
        x = np.asarray(x, dtype=int)
        assert x.size == self.m and np.all((x == 0) | (x == 1)), "x must be a 0/1 vector of length m"

        # Fast guard: if no slots left, must reject
        if self.remaining_slots <= 0:
            return False, {"reason": "no_slots_left"}

        # Deterministic necessary condition
        feasible, slack = self._feasible_if_accept(x)
        if not feasible and self.cfg.sims == 0:
            return False, {"reason": "infeasible_if_accept", "slack": slack}
        if feasible and self.cfg.sims == 0:
            return True, {"reason": "deterministic_feasible", "slack": slack}

        # If using rollout, evaluate both branches
        # Early pruning: if infeasible to accept, only 'reject' branch is valid
        branches = ["accept", "reject"] if feasible else ["reject"]
        best_choice = None
        best_metric = None
        details = {}

        for br in branches:
            ok, success_rate, exp_rej = self._rollout_metric(x, branch=br, sims=self.cfg.sims)
            details[br] = {"success_rate": success_rate, "exp_future_rejections": exp_rej}
            if not ok:
                continue
            metric = exp_rej  # we minimize expected future rejections
            if best_metric is None or metric < best_metric:
                best_metric = metric
                best_choice = br

        if best_choice is None:
            # Neither branch met risk tolerance; fall back to deterministic guard
            return (feasible, {"reason": "fallback_deterministic", "slack": slack, "rollout": details})

        return (best_choice == "accept", {"reason": "rollout_choice", "rollout": details})

    def update(self, x: np.ndarray, accept: bool):
        x = np.asarray(x, dtype=int)
        if accept:
            assert self._feasible_if_accept(x)[0], "update called with infeasible accept; guard with decide() first"
            self.a += 1
            self.c += x
        else:
            self.r += 1

    # ---------------------------- Rollout ---------------------------------- #
    def _clone_state(self):
        return self.a, self.r, self.c.copy()

    def _restore_state(self, snapshot):
        self.a, self.r, self.c = snapshot[0], snapshot[1], snapshot[2]

    def _rollout_metric(self, x: np.ndarray, branch: str, sims: int) -> Tuple[bool, float, float]:
        """Simulate the remainder under the greedy-safe policy starting with
        either accepting or rejecting the current x. Returns:
            (ok, success_rate, expected_future_rejections)
        where `ok` is True if success_rate >= 1 - risk_tolerance.
        """
        assert branch in ("accept", "reject")
        successes = 0
        rejections_list = []
        snapshot = self._clone_state()

        for _ in range(sims):
            self._restore_state(snapshot)
            # Apply the branch decision
            if branch == "accept":
                if not self._feasible_if_accept(x)[0]:
                    # Immediate failure for this path
                    success = False
                    future_rej = 0
                    rejections_list.append(future_rej)
                    continue
                self.a += 1
                self.c += x
            else:
                self.r += 1

            # Simulate until we hit N or the reject budget (only matters if you
            # want to guard against exhausting allowed rejections)
            while self.a < self.N and self.r < self.reject_budget:
                y = self.sampler.sample(1)[0]
                if self._greedy_safe_accept(y):
                    self.a += 1
                    self.c += y
                else:
                    self.r += 1

            success = (self.a >= self.N) and np.all(self.c >= self.t)
            future_rej = max(0, self.r - snapshot[1] - (1 if branch == "reject" else 0))
            rejections_list.append(future_rej)
            if success:
                successes += 1

        self._restore_state(snapshot)
        success_rate = successes / sims if sims > 0 else 1.0
        exp_rej = float(np.mean(rejections_list)) if sims > 0 else 0.0
        ok = (success_rate >= 1 - self.cfg.risk_tolerance)
        return ok, success_rate, exp_rej

In [34]:
# Create the sampling-based solver
constraints_data = [
    {'attribute': 'techno_lover', 'minCount': 650}, 
    {'attribute': 'well_connected', 'minCount': 450}, 
    {'attribute': 'creative', 'minCount': 300}, 
    {'attribute': 'berlin_local', 'minCount': 750}
]

population_data = {
    'relativeFrequencies': {
        'techno_lover': 0.6265000000000001, 
        'well_connected': 0.4700000000000001, 
        'creative': 0.06227, 
        'berlin_local': 0.398
    }, 
    'correlations': {
        'techno_lover': {'techno_lover': 1, 'well_connected': -0.4696169332674324, 'creative': 0.09463317039891586, 'berlin_local': -0.6549403815606182}, 
        'well_connected': {'techno_lover': -0.4696169332674324, 'well_connected': 1, 'creative': 0.14197259140471485, 'berlin_local': 0.5724067808436452}, 
        'creative': {'techno_lover': 0.09463317039891586, 'well_connected': 0.14197259140471485, 'creative': 1, 'berlin_local': 0.14446459505650772}, 
        'berlin_local': {'techno_lover': -0.6549403815606182, 'well_connected': 0.5724067808436452, 'creative': 0.14446459505650772, 'berlin_local': 1}
    }
}

# Map attribute names to indices
attribute_names = ['techno_lover', 'well_connected', 'creative', 'berlin_local']
attr_name_to_idx = {name: idx for idx, name in enumerate(attribute_names)}

# Create population sampler
attribute_probs = np.array([
    population_data['relativeFrequencies']['techno_lover'],
    population_data['relativeFrequencies']['well_connected'],
    population_data['relativeFrequencies']['creative'],
    population_data['relativeFrequencies']['berlin_local']
])

# Build covariance matrix from correlations
correlations = population_data['correlations']
covariance_matrix = np.zeros((4, 4))
for i, attr1 in enumerate(attribute_names):
    for j, attr2 in enumerate(attribute_names):
        correlation = correlations[attr1][attr2]
        # Convert correlation to covariance using attribute probabilities
        std_i = np.sqrt(attribute_probs[i] * (1 - attribute_probs[i]))
        std_j = np.sqrt(attribute_probs[j] * (1 - attribute_probs[j]))
        covariance_matrix[i, j] = correlation * std_i * std_j


# Create constraints vector
t = np.array([
    constraints_data[0]['minCount']/1000,  # techno_lover: 650
    constraints_data[1]['minCount']/1000,  # well_connected: 450
    constraints_data[2]['minCount']/1000,  # creative: 300
    constraints_data[3]['minCount']/1000   # berlin_local: 750
])

# Create solver configuration
cfg = SolverConfig(
    p=attribute_probs,
    alphas=t,                 # Constraint thresholds
    risk_tolerance=0.05,  # 5% risk tolerance
    sims=100,  # Number of simulations per decision
    reject_budget=20000, # Maximum rejections allowed
    N=1000,              # Target number to accept
    cov=covariance_matrix,
)

# Create the sampling solver
solver = OnlineQuotaSolver(cfg=cfg)

# Wrapper to convert API format to solver format
class SolverWrapper:
    def __init__(self, solver, attribute_names):
        self.solver = solver
        self.attribute_names = attribute_names
    
    def decide(self, attrs_dict):
        # Convert API format to solver format
        x = np.array([attrs_dict.get(name, False) for name in self.attribute_names], dtype=int)
        accept, _ = self.solver.decide(x)
        self.solver.update(x, accept)
        return accept

policy = SolverWrapper(solver, attribute_names)


In [35]:
def decide_and_next(game_id: str, person_index: int, accept: Optional[bool] = None) -> Dict[str, Any]:
    url = f"{BASE_URL}/decide-and-next"
    params = {"gameId": game_id, "personIndex": person_index}
    if accept is not None:
        params["accept"] = str(accept).lower()  # true/false in query string
    res = requests.get(url, params=params, timeout=30)
    res.raise_for_status()
    return res.json()

In [36]:
state = decide_and_next(game_id, person_index=0)
if state.get("status") == "failed":
    raise RuntimeError(f"Game failed: {state.get('reason')}")
print(state)

{'status': 'running', 'admittedCount': 0, 'rejectedCount': 0, 'nextPerson': {'personIndex': 0, 'attributes': {'techno_lover': True, 'well_connected': False, 'creative': False, 'berlin_local': False}}}


In [37]:
admitted = state.get("admittedCount", 0)
rejected = state.get("rejectedCount", 0)
next_person = state.get("nextPerson")

In [38]:
while state.get("status") == "running" and next_person:
    idx = next_person["personIndex"]
    attrs = next_person["attributes"]  # { attributeId: bool }
    accept = policy.decide(attrs)
    state = decide_and_next(game_id, person_index=idx, accept=accept)
    status = state.get("status")
    if status == "failed":
        raise RuntimeError(f"Game failed: {state.get('reason')}")

    admitted = state.get("admittedCount", admitted)
    rejected = state.get("rejectedCount", rejected)
    next_person = state.get("nextPerson")

    print(f"Person {idx}: accept={accept} | admitted={admitted} rejected={rejected}")

    # 5) done
if state.get("status") == "completed":
    print("Game completed.")
    print("Final rejected count:", state.get("rejectedCount"))
else:
    print("Game ended with state:", state)

Person 0: accept=False | admitted=0 rejected=1
Person 1: accept=True | admitted=1 rejected=1
Person 2: accept=True | admitted=2 rejected=1
Person 3: accept=True | admitted=3 rejected=1
Person 4: accept=True | admitted=4 rejected=1
Person 5: accept=True | admitted=5 rejected=1
Person 6: accept=True | admitted=6 rejected=1
Person 7: accept=True | admitted=7 rejected=1
Person 8: accept=True | admitted=8 rejected=1
Person 9: accept=True | admitted=9 rejected=1
Person 10: accept=True | admitted=10 rejected=1
Person 11: accept=True | admitted=11 rejected=1
Person 12: accept=True | admitted=12 rejected=1
Person 13: accept=True | admitted=13 rejected=1
Person 14: accept=True | admitted=14 rejected=1
Person 15: accept=True | admitted=15 rejected=1
Person 16: accept=True | admitted=16 rejected=1
Person 17: accept=True | admitted=17 rejected=1
Person 18: accept=True | admitted=18 rejected=1
Person 19: accept=True | admitted=19 rejected=1
Person 20: accept=True | admitted=20 rejected=1
Person 21: 

RuntimeError: Game failed: Venue full but constraints not met: techno_lover: 633/650 (need 17 more), creative: 75/300 (need 225 more), berlin_local: 406/750 (need 344 more)