# BBO Weekly Replay Notebook

This notebook **replays your optimisation week-by-week**.

For each week *t*, it:
1. Fits a GP surrogate for each function using data up to week *t*
2. Proposes a candidate query for week *t+1* using EI/UCB
3. Reports the **predicted mean/std** at that proposed point
4. Also generates a **heuristic** trust-region / rollback / freeze suggestion

It outputs a tidy table you can export to CSV and use in your report.

## Notes
- This is meant as a *transparent reconstruction tool*.
- Your actual weekly decisions in chat sometimes used heuristics, so comparing GP vs heuristic is useful.


In [None]:
import re
import numpy as np
import pandas as pd

from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern, WhiteKernel, ConstantKernel

np.set_printoptions(suppress=True, precision=6)


## 1) Load & parse history

We support two formats:
1. Explicit labels: `F3: 0.930000-0.620000-0.880000`
2. Plain vectors: 8 lines per week, in order F1..F8, repeating.

Outputs are assumed scalar; if a vector appears, we take the first value.


In [None]:
# Labels of the eight functions, in the order they are expected within a week.
FUNC_NAMES = [f"F{i}" for i in range(1, 9)]
# Input dimensionality of each function.
DIMS = {"F1":2,"F2":2,"F3":3,"F4":4,"F5":4,"F6":5,"F7":6,"F8":8}

# A whole line made of `d.dddddd` values joined by '-'.
# A single value is accepted too, so that scalar output lines are recognised
# (previously at least two values were required and scalar lines were dropped).
vec_pat = re.compile(r"^[0-9]\.[0-9]{6}(?:-[0-9]\.[0-9]{6})*$")
# A labelled record anywhere in the text: `F1`..`F8`, an optional ':' or '=',
# then between one and eight `d.dddddd` values joined by '-'.
fx_pat  = re.compile(r"\b(F[1-8])\b\s*[:=]?\s*([0-9]\.[0-9]{6}(?:-[0-9]\.[0-9]{6}){0,7})")

def parse_vector(s: str) -> np.ndarray:
    """Turn a dash-separated string such as ``"0.93-0.62"`` into a float array.

    Surrounding whitespace is stripped first; empty tokens (for example the
    ones produced by a leading or trailing dash) are skipped.
    """
    tokens = filter(None, s.strip().split("-"))
    return np.array(list(map(float, tokens)), dtype=float)

def parse_file_to_sequence(text: str):
    """Extract ``(function_label, vector_string)`` pairs in file order.

    Labelled records (matched by ``fx_pat``) take precedence: if the text
    contains at least one, only those are returned. Otherwise every non-blank
    line that fully matches ``vec_pat`` is kept and labels are assigned by
    position, cycling through F1..F8.
    """
    labelled = [(hit.group(1), hit.group(2)) for hit in fx_pat.finditer(text)]
    if labelled:
        return labelled

    # No labels anywhere: fall back to positional labelling of plain vectors.
    stripped = (raw.strip() for raw in text.splitlines())
    vectors = [ln for ln in stripped if ln and vec_pat.match(ln)]
    return [(FUNC_NAMES[pos % 8], vec) for pos, vec in enumerate(vectors)]

def load_weekly_history(inputs_path: str, outputs_path: str):
    """Read the input and output history files and group them by week.

    Both files are parsed with ``parse_file_to_sequence`` and paired record by
    record; every consecutive run of 8 pairs forms one week.

    Args:
        inputs_path: UTF-8 text file holding the submitted query vectors.
        outputs_path: UTF-8 text file holding the observed outputs.

    Returns:
        A list with one dict per week mapping the function label (taken from
        the inputs file) to ``(x, y)``, where ``x`` is the input vector and
        ``y`` is the first value of the output record.

    Data problems are reported with a printed warning rather than an
    exception, so a partially usable history still loads:
      * the two files yield a different number of records (extras dropped),
      * the record count is not a multiple of 8 (trailing partial week dropped),
      * an input label differs from the output label at the same position,
      * a week does not contain all of F1..F8.
    """
    with open(inputs_path, "r", encoding="utf-8") as f:
        inp_seq = parse_file_to_sequence(f.read())
    with open(outputs_path, "r", encoding="utf-8") as f:
        out_seq = parse_file_to_sequence(f.read())

    # Pair records one-to-one; surplus records on either side cannot be used.
    n = min(len(inp_seq), len(out_seq))
    if len(inp_seq) != len(out_seq):
        print(
            f"Warning: {len(inp_seq)} input records vs {len(out_seq)} output records; "
            f"using the first {n} of each."
        )

    # Every week has 8 functions, so only whole groups of 8 are kept.
    if n % 8 != 0:
        print(f"Warning: total records {n} not multiple of 8; truncating to full weeks.")
        n = (n // 8) * 8

    weeks = []
    for start in range(0, n, 8):
        week_no = start // 8 + 1
        week_data = {}
        pairs = zip(inp_seq[start:start + 8], out_seq[start:start + 8])
        for (fx_i, x_str), (fx_o, y_str) in pairs:
            if fx_i != fx_o:
                # Sequence order is still trusted, but the mismatch is surfaced.
                print(
                    f"Warning: week {week_no}: input label {fx_i} paired with "
                    f"output label {fx_o}; keeping {fx_i}."
                )
            x = parse_vector(x_str)
            y = float(parse_vector(y_str)[0])  # outputs are scalar: first value only
            week_data[fx_i] = (x, y)

        missing = [fx for fx in FUNC_NAMES if fx not in week_data]
        if missing:
            print(f"Warning: week {week_no} has no record for {', '.join(missing)}.")
        weeks.append(week_data)
    return weeks

# Example:
# weeks = load_weekly_history("inputs.txt", "outputs.txt")
# print(len(weeks), "weeks loaded")

# Example:
# weeks = load_weekly_history("inputs.txt", "outputs.txt")
# print(len(weeks), "weeks loaded")


## 2) GP surrogate + acquisitions (EI/UCB)
We fit a GP on data up to week *t* and propose a point for week *t+1* by maximising EI (or UCB) over random candidates.

To avoid extra dependencies, EI evaluates the standard Normal CDF through the error function (`erf`) instead of importing `scipy.stats`.


In [None]:
def fit_gp(X: np.ndarray, y: np.ndarray, seed: int = 0) -> GaussianProcessRegressor:
    """Fit a GP on standardised targets and remember the scaling constants.

    ``y`` is centred and scaled by hand (``normalize_y`` stays off); the mean
    and standard deviation used are stored on the returned model as
    ``_y_mean`` / ``_y_std`` so that ``gp_predict`` can undo the scaling.
    The kernel is ``Constant * Matern(nu=2.5, one length scale per input
    dimension) + White``.
    """
    n_obs = len(y)
    center = y.mean() if n_obs else 0.0
    scale = y.std() if n_obs > 1 else 1.0
    if not scale > 1e-8:
        # Constant (or single) target: avoid dividing by ~0.
        scale = 1.0
    targets = (y - center) / scale

    amplitude = ConstantKernel(1.0, (1e-3, 1e3))
    smooth = Matern(length_scale=np.ones(X.shape[1]), nu=2.5)
    noise = WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-8, 1e-1))
    kernel = amplitude * smooth + noise

    settings = dict(
        kernel=kernel,
        alpha=0.0,
        normalize_y=False,
        n_restarts_optimizer=5,
        random_state=seed,
    )
    gp = GaussianProcessRegressor(**settings)
    gp.fit(X, targets)
    gp._y_mean = center
    gp._y_std = scale
    return gp

def gp_predict(gp: GaussianProcessRegressor, Xcand: np.ndarray):
    """Predict mean and standard deviation at ``Xcand`` in original y units.

    The model predicts in standardised space; the ``_y_mean`` / ``_y_std``
    attributes attached by ``fit_gp`` are used to map the result back.
    """
    z_mean, z_std = gp.predict(Xcand, return_std=True)
    scale = gp._y_std
    return z_mean * scale + gp._y_mean, z_std * scale

def ucb(mu: np.ndarray, std: np.ndarray, beta: float = 2.0):
    """Upper confidence bound: the mean plus ``beta`` standard deviations."""
    exploration_bonus = beta * std
    return mu + exploration_bonus

def expected_improvement(mu: np.ndarray, std: np.ndarray, best: float, xi: float = 0.01):
    """Expected improvement (maximisation) of each candidate over ``best``.

    With ``imp = mu - best - xi`` and ``Z = imp / std`` the score is
    ``imp * Phi(Z) + std * phi(Z)``, where ``Phi`` / ``phi`` are the standard
    Normal CDF and PDF.

    Args:
        mu: Predicted means of the candidates.
        std: Predicted standard deviations, same shape as ``mu``.
        best: Best objective value observed so far.
        xi: Margin subtracted from the improvement.

    Returns:
        Float array shaped like ``mu``. Candidates whose ``std`` is at most
        1e-12 score exactly 0.
    """
    # NumPy has no `erf`; vectorise the standard-library one instead.
    from math import erf

    mu = np.asarray(mu, dtype=float)
    std = np.asarray(std, dtype=float)
    has_spread = std > 1e-12

    imp = mu - best - xi
    # Divide only where std is meaningful; elsewhere Z stays 0 and is masked below.
    Z = np.divide(imp, std, out=np.zeros_like(imp), where=has_spread)
    pdf = np.exp(-0.5 * Z**2) / np.sqrt(2.0 * np.pi)
    # `otypes` keeps np.vectorize working on empty inputs.
    cdf = 0.5 * (1.0 + np.vectorize(erf, otypes=[float])(Z / np.sqrt(2.0)))

    ei = imp * cdf + std * pdf
    return np.where(has_spread, ei, 0.0)

def propose_next_gp(X: np.ndarray, y: np.ndarray, acquisition: str = "EI",
                    n_candidates: int = 20000, seed: int = 0, beta: float = 2.0, xi: float = 0.01):
    """Propose the next query by maximising an acquisition over random points.

    With fewer than ``max(3, d + 1)`` observations no model is fitted and a
    uniformly random point is returned with NaN predictions. Otherwise a GP is
    fitted, ``n_candidates`` uniform points in ``[0, 1)^d`` are scored with EI
    (when ``acquisition`` equals "EI", case-insensitively) or with UCB (any
    other value), and the top-scoring point is returned.

    Returns:
        ``(x, info)`` where ``info`` holds the predicted mean/std at ``x``
        plus, on the GP path, the incumbent value and the acquisition name.
    """
    n_dims = X.shape[1]
    rng = np.random.default_rng(seed)

    min_points = max(3, n_dims + 1)
    if len(y) < min_points:
        fallback = rng.random(n_dims)
        return fallback, {"reason":"too_few_points_random", "pred_mu": np.nan, "pred_std": np.nan}

    model = fit_gp(X, y, seed=seed)
    pool = rng.random((n_candidates, n_dims))
    pool_mu, pool_std = gp_predict(model, pool)
    incumbent = float(np.max(y))

    use_ei = acquisition.upper() == "EI"
    if use_ei:
        scores = expected_improvement(pool_mu, pool_std, best=incumbent, xi=xi)
    else:
        scores = ucb(pool_mu, pool_std, beta=beta)

    winner = pool[int(np.argmax(scores))]
    # Re-evaluate the model at the selected point for the reported numbers.
    win_mu, win_std = gp_predict(model, winner.reshape(1, -1))
    summary = {
        "pred_mu": float(win_mu[0]),
        "pred_std": float(win_std[0]),
        "best_so_far": incumbent,
        "acq": acquisition,
    }
    return winner, summary


## 3) Heuristic proposer
This is a lightweight approximation of the decision rules we often used:
- If the function is flat: freeze at 0.5
- If last point regressed a lot: rollback to best
- Else: small perturbation around best (trust region)


In [None]:
def propose_next_heuristic(X: np.ndarray, y: np.ndarray, step: float = 0.01, seed: int = 0):
    """Rule-based proposal: freeze, roll back, or nudge the best point.

    Rules, checked in order:
      1. No observations -> uniformly random point.
      2. Observed range below 1e-6 -> the centre point (all 0.5).
      3. At least two observations and the latest one is worse than the best
         by more than 25% of the observed range -> the best point itself.
      4. Otherwise -> the best point with one coordinate (``d <= 3``) or two
         coordinates (``d > 3``) moved by ``+/- step`` and clipped to
         ``[0, 0.999999]``.

    Returns:
        ``(x, info)`` where ``info["reason"]`` names the rule that fired and,
        except for rule 1, ``info["best"]`` is the best observed value.
    """
    n_dims = X.shape[1]
    rng = np.random.default_rng(seed)

    if len(y) == 0:
        return rng.random(n_dims), {"reason":"no_data_random"}

    spread = float(np.max(y) - np.min(y))
    top = int(np.argmax(y))
    incumbent_x = X[top].copy()
    incumbent_y = float(y[top])

    if spread < 1e-6:
        centre = np.full(n_dims, 0.5)
        return centre, {"reason":"flat_freeze_center", "best": incumbent_y}

    regressed = len(y) >= 2 and (incumbent_y - float(y[-1])) > 0.25 * max(1e-6, spread)
    if regressed:
        return incumbent_x, {"reason":"rollback_to_best", "best": incumbent_y}

    # Trust-region step: pick the axes first, then draw one sign per axis.
    n_moves = 2 if n_dims > 3 else 1
    candidate = incumbent_x.copy()
    for axis in rng.choice(n_dims, size=n_moves, replace=False):
        moved = candidate[axis] + rng.choice([-1.0, 1.0]) * step
        candidate[axis] = np.clip(moved, 0.0, 0.999999)
    return candidate, {"reason":"trust_region_perturb", "best": incumbent_y}


## 4) Weekly replay table
For each week *t*, we train on weeks `1..t` and propose a point for week `t+1`.

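`build_replay_table` always reports both proposals side by side, so you can compare them directly:
- the GP suggestion, scored with `acquisition='EI'` (any other value, e.g. `'UCB'`, uses UCB)
- the heuristic suggestion, together with the rule that produced it.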


In [None]:
def fmt_vec(x: np.ndarray) -> str:
    """Render a vector as dash-joined values with six decimals.

    Every value is first clipped to ``[0, 0.999999]``.
    """
    bounded = np.clip(x, 0.0, 0.999999)
    return "-".join(format(value, ".6f") for value in bounded)

def build_replay_table(weeks, acquisition="EI", n_candidates=20000, beta=2.0, xi=0.01):
    """Replay the history week by week and tabulate the proposals.

    After adding the observations of week ``t`` (1-based), one GP proposal and
    one heuristic proposal are generated per function for week ``t + 1``.
    No proposal is made after the final week.

    Args:
        weeks: List of per-week dicts ``{function: (x, y)}`` as produced by
            ``load_weekly_history``.
        acquisition: Passed to ``propose_next_gp`` ("EI", otherwise UCB).
        n_candidates: Number of random candidates scored per GP proposal.
        beta: UCB exploration weight.
        xi: EI improvement margin.

    Returns:
        A DataFrame with one row per (training week, function). The columns
        are always present, even when there are fewer than two weeks and the
        table has no rows, so downstream merges keep working.
    """
    columns = [
        "train_up_to_week",
        "predict_for_week",
        "function",
        "best_so_far",
        "GP_suggestion",
        "GP_pred_mu",
        "GP_pred_std",
        "Heuristic_suggestion",
        "Heuristic_reason",
    ]
    rows = []
    n_weeks = len(weeks)

    # Observations accumulated so far, per function.
    X_hist = {fx: [] for fx in FUNC_NAMES}
    y_hist = {fx: [] for fx in FUNC_NAMES}

    for t, week in enumerate(weeks):  # week index 0..n_weeks-1
        for fx in FUNC_NAMES:
            x_obs, y_obs = week[fx]
            X_hist[fx].append(x_obs)
            y_hist[fx].append(y_obs)

        # The last week has no successor to propose for.
        if t == n_weeks - 1:
            break

        for fx_idx, fx in enumerate(FUNC_NAMES):
            X = np.vstack(X_hist[fx])
            y = np.array(y_hist[fx], dtype=float)

            d = DIMS[fx]
            # Seed from (week, function position). The built-in hash() of a str
            # is salted per interpreter run, which made replays irreproducible.
            seed = 1000 * t + 10 * fx_idx

            x_gp, info_gp = propose_next_gp(
                X, y,
                acquisition=acquisition,
                n_candidates=n_candidates,
                seed=seed,
                beta=beta,
                xi=xi,
            )

            # Smaller trust-region step for the higher-dimensional functions.
            step = 0.005 if d >= 4 else 0.01
            x_h, info_h = propose_next_heuristic(X, y, step=step, seed=seed + 7)

            rows.append({
                "train_up_to_week": t + 1,
                "predict_for_week": t + 2,
                "function": fx,
                "best_so_far": float(np.max(y)),
                "GP_suggestion": fmt_vec(x_gp),
                "GP_pred_mu": info_gp.get("pred_mu", np.nan),
                "GP_pred_std": info_gp.get("pred_std", np.nan),
                "Heuristic_suggestion": fmt_vec(x_h),
                "Heuristic_reason": info_h.get("reason", ""),
            })

    return pd.DataFrame(rows, columns=columns)

# --- Run ---
# Paths of the raw history files read by load_weekly_history.
INPUTS_PATH = "inputs.txt"
OUTPUTS_PATH = "outputs.txt"
weeks = load_weekly_history(INPUTS_PATH, OUTPUTS_PATH)

# Replay all weeks with EI, scoring 25000 random candidates per GP proposal.
replay_df = build_replay_table(weeks, acquisition="EI", n_candidates=25000, xi=0.01)
# Notebook preview: the first 16 rows (8 functions x the first two training weeks).
replay_df.head(16)


## 5) Compare suggestions to what you actually submitted
This helps you audit how close the GP/heuristic recommendations were to your real submissions.


In [None]:
def actual_submissions_table(weeks):
    """Flatten the weekly history into one row per (week, function).

    Args:
        weeks: List of per-week dicts ``{function: (x, y)}``; weeks are
            numbered from 1 in list order.

    Returns:
        A DataFrame with columns ``week``, ``function``, ``submitted_x``
        (formatted with ``fmt_vec``) and ``observed_y``. The columns are
        present even when ``weeks`` is empty, so the table can still be merged.
    """
    columns = ["week", "function", "submitted_x", "observed_y"]
    rows = [
        {
            "week": week_no,
            "function": fx,
            "submitted_x": fmt_vec(wk[fx][0]),
            "observed_y": float(wk[fx][1]),
        }
        for week_no, wk in enumerate(weeks, start=1)
        for fx in FUNC_NAMES
    ]
    return pd.DataFrame(rows, columns=columns)

actual_df = actual_submissions_table(weeks)

# Merge: for week t, compare the actual submission with the suggestion made
# after training on weeks up to t-1. In replay_df that suggestion is the row
# whose `predict_for_week` equals t, so join on that column.
# The left join keeps every actual submission; rows without a matching
# suggestion get NaN in the replay columns.
merged = actual_df.merge(
    replay_df,
    left_on=["week","function"],
    right_on=["predict_for_week","function"],
    how="left"
)

# Notebook preview of the first 16 merged rows.
merged.head(16)


## 6) Export to CSV (optional)
You can export the replay table and merged comparison to CSV.


In [None]:
# Uncomment the next two lines to write the tables to CSV (without the index column).
# replay_df.to_csv("replay_suggestions.csv", index=False)
# merged.to_csv("replay_vs_actual.csv", index=False)

# Quick size check of both tables.
print("Rows in replay_df:", len(replay_df))
print("Rows in merged:", len(merged))
