In [None]:
# Bootstrap (CI-safe)
# tags: [bootstrap]
import os, random, numpy as np
import ts2eg

SEED = 0
random.seed(SEED)
np.random.seed(SEED)

# Downshift knob: set TS2EG_CI=1 in CI to keep runtime small.
TS2EG_CI = os.getenv("TS2EG_CI") == "1"


# EGT demo: from multivariate time series to ESS

This notebook shows an end-to-end run:
1. Load an N×T multivariate series (players × time).
2. Build a **nonnegative strategy basis** $S$ via a simple NMF.
3. Estimate a $k\times k$ payoff operator $A$ with `estimate_A_from_series`.
4. Search for Nash/ESS with `find_ESS`.

**How to use:**
- If you have a CSV, set `CSV_PATH` below. CSV should be wide format (columns = players).
- Otherwise this will generate a structured synthetic dataset (common mode + contrasts).


In [None]:
# --- Canonical imports (ts2eg only) ---
import ts2eg as gm
from ts2eg.core import nmf_on_X, growth_payoffs, estimate_A_from_series, find_ESS
try:
    from ts2eg import extensions as ext
except Exception:
    ext = None  # optional


In [None]:
# --- Parameters ---
CSV_PATH = ''  # e.g., '/path/to/your_multivariate_timeseries.csv'
K = 3          # number of strategies
RIDGE = 1e-3   # ridge for A-estimation
TOL = 1e-8     # ESS tolerance
MAX_SUPPORT = K


In [None]:
if CSV_PATH and os.path.exists(CSV_PATH):
    df = pd.read_csv(CSV_PATH)
    # Keep numeric columns only
    df = df.select_dtypes(include=['number']).dropna()
    X = df.values.T  # N x T
    print(f'Loaded real dataset with shape N x T = {X.shape}')
else:
    # Synthetic fallback (structured): common mode + orthogonal contrasts + noise
    rng = np.random.default_rng(0)
    N, T = 4, 400
    t = np.arange(T)
    common = 0.5 * np.sin(2*np.pi*t/50)
    contrasts = np.vstack([
        0.8*np.sin(2*np.pi*(t+10)/30),
        0.5*np.cos(2*np.pi*(t+5)/40),
        -0.3*np.sin(2*np.pi*(t+3)/60),
        0.2*np.cos(2*np.pi*(t+7)/25),
    ])
    X = (common + contrasts + 0.2*rng.standard_normal((N, T)))
    print(f'No CSV found. Using synthetic structured data N x T = {X.shape}')

# Standardize per player (zero mean, unit variance)
X = (X - X.mean(axis=1, keepdims=True)) / (X.std(axis=1, keepdims=True) + 1e-12)
N, T = X.shape
print('N, T =', N, T)

## Nonnegative strategy basis via simple NMF
We shift/scale to nonnegative and use multiplicative updates to get $S\in\mathbb{R}_{\ge 0}^{N\times K}$.

In [None]:
def nmf_multiplicative(V, r=3, iters=500, seed=0):
    rng = np.random.default_rng(seed)
    # V ~ W H, with W: N x r, H: r x T
    N, T = V.shape
    W = np.maximum(rng.random((N, r)), 1e-6)
    H = np.maximum(rng.random((r, T)), 1e-6)
    for _ in range(iters):
        # Update H
        WH = W @ H + 1e-12
        H *= (W.T @ (V / WH)) / (W.T @ np.ones_like(V) + 1e-12)
        # Update W
        WH = W @ H + 1e-12
        W *= ((V / WH) @ H.T) / (np.ones_like(V) @ H.T + 1e-12)
        # Avoid zeros
        W = np.maximum(W, 1e-12)
        H = np.maximum(H, 1e-12)
    return W, H

# Shift and scale X to [0,1] per player for NMF
X_min = X.min(axis=1, keepdims=True)
X_rng = X.max(axis=1, keepdims=True) - X_min + 1e-9
X_pos = (X - X_min) / X_rng

S, H = nmf_multiplicative(X_pos, r=K, iters=300, seed=0)
print('S shape:', S.shape, 'H shape:', H.shape)
# Normalize columns of S (unit norm)
S = S / (np.linalg.norm(S, axis=0, keepdims=True) + 1e-12)

## Estimate A and search ESS
We use the series itself as a placeholder payoff field `v` for demonstration. In your project, set `v`
to the per-player payoff series from Option A (profile-conditioned means) or Option B (VAR gains).

In [None]:
# Ensure strategy basis S exists before estimation (and ensure X first)
import numpy as _np
if 'X' not in globals():
    if 'v_growth' in globals():
        X = _np.asarray(v_growth, dtype=float)
    elif 'counts' in globals():
        X = _np.asarray(counts, dtype=float)
    else:
        raise NameError("X is undefined; expected v_growth or counts earlier in the notebook.")
try:
    S  # noqa: F821
except NameError:
    try:
        K = int(globals().get('K', 3))
    except Exception:
        K = 3
    S, H = nmf_on_X(X, k=K, iters=50, seed=1, normalize='l2')


In [None]:
# Ensure strategy basis S exists before estimation
try:
    _ = S  # noqa: F821
except NameError:
    try:
        K = int(globals().get('K', 3))
    except Exception:
        K = 3
    S, H = nmf_on_X(X, k=K, iters=50, seed=1, normalize='l2')


In [None]:
v = X.copy()  # Placeholder: demo uses the standardized series as payoff field
est = estimate_A_from_series(S, X, v, k=K, ridge=RIDGE)
A = est['A']
print('A (k x k):\n', np.round(A, 4))
print('R^2 fit to centered strategy signals:', round(est['R2'], 3))

ess_list = find_ESS(A, tol=TOL, max_support=MAX_SUPPORT)
n_nash = sum(1 for r in ess_list if r['is_nash'])
n_ess = sum(1 for r in ess_list if r['is_ess'])
print('Nash candidates:', n_nash, ' ESS found:', n_ess)
for r in ess_list:
    if r['is_ess']:
        print('ESS support:', r['support'], ' x* =', np.round(r['x'], 4))

## Plot inferred strategy memberships $x(t)$

In [None]:
Xk = est['Xk']  # k x T
plt.figure(figsize=(8, 3))
for i in range(K):
    plt.plot(Xk[i], label=f'x_{i+1}')
plt.title('Strategy memberships over time')
plt.xlabel('time')
plt.ylabel('membership')
plt.legend()
plt.show()

## Save artifacts
Saves `A.csv` and `Xk.csv` in the working directory for inspection.

In [None]:
import pandas as pd
pd.DataFrame(A).to_csv('A.csv', index=False)
pd.DataFrame(Xk).to_csv('Xk.csv', index=False)
print('Wrote A.csv and Xk.csv')