In [None]:
# Cell 1 — Imports + hyperparams (edit here)
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from matplotlib.colors import ListedColormap


cfg = {
    # Population / tape
    "B": 2048,            # population size
    "K": 8,             # bins per variable
    "Np": 6,             # program cells per bin
    "Vars": 4,           # cart-pole vars
    # EM33
    "T": 24,             # internal CA steps per control step
    # Decode → force
    "force_scale": 9.0,
    "force_bias_b": 3.0,
    "force_noise_std": 0.15,   # N(0, std) added to force
    "u_max": 10.0,
    # Cart-pole physics
    "dt": 0.02,
    "n_substeps": 1,
    "gravity": 9.8,
    "masscart": 1.0,
    "masspole": 0.1,
    "length": 0.5,       # half-length
    "x_threshold": 2.4,
    "theta_threshold_rad": 12 * np.pi / 180.0,
    # GA
    "GA_interval": 10,   # control steps between refills
    "p_rule_mut": 0.02,
    "p_state_mut": 0.005,   # only on P-zone
    "crossover_rate": 0.01,
    # Obs discretization (per var min/max, same K for all)
    "obs_min": np.array([-2.4, -3.0, -0.2095, -3.5], dtype=np.float32),
    "obs_max": np.array([ 2.4,  3.0,  0.2095,  3.5], dtype=np.float32),
    # Init env noise
    "init_state_std": np.array([0.05, 0.05, 0.02, 0.05], dtype=np.float32),
    # Run
    "n_control_steps": 40000,
}

rng = np.random.default_rng()  # true randomness

In [None]:
# Cell 2 — Indices & layout
K, Np, Vars = cfg["K"], cfg["Np"], cfg["Vars"]
Lvar = K * (1 + Np)
N = Vars * Lvar
cfg["N"] = N

region_starts = np.array([v * Lvar for v in range(Vars)], dtype=np.int64)  # (Vars,)
I_idx = (region_starts[:, None] + np.arange(K, dtype=np.int64)[None, :] * (1 + Np))  # (Vars,K)
P_offsets = 1 + np.arange(Np, dtype=np.int64)[None, None, :]                              # (1,1,Np)
P_idx = (region_starts[:, None, None] + np.arange(K, dtype=np.int64)[None, :, None] * (1 + Np) + P_offsets)  # (Vars,K,Np)

L_idx = (I_idx - 1) % N   # (Vars,K)
R_idx = (I_idx + 1) % N   # (Vars,K)

P_mask = np.zeros(N, dtype=bool)  # (N,)
P_mask[P_idx.reshape(-1)] = True

# Flatten helpers
I_flat = I_idx.reshape(-1)         # (Vars*K,)
L_flat = L_idx.reshape(-1)         # (Vars*K,)
R_flat = R_idx.reshape(-1)         # (Vars*K,)

In [None]:
# Cell 3 — Init population
B = cfg["B"]

# Rules R: (B,27), values in {0,1,2}, enforce 000->0 at index 0
R = rng.integers(0, 3, size=(B, 27), dtype=np.int8)
R[:, 0] = 0

# CA state S: (B,N) in {0,1,2}; start mostly zeros, light noise on P-zone
S = np.zeros((B, N), dtype=np.int8)
p_init = 0.02
mut_mask = (rng.random((B, N)) < p_init) & P_mask[None, :]
S[mut_mask] = rng.integers(1, 3, size=mut_mask.sum(), dtype=np.int8)  # avoid 0

# Env E: (B,4)
E = rng.normal(0.0, cfg["init_state_std"], size=(B, 4)).astype(np.float32)

# Alive mask + survival steps
alive = np.ones(B, dtype=bool)     # (B,)
steps_alive = np.zeros(B, dtype=np.int32)

# Accumulators for averages over time × batch
RL_stats = {"sum_L": 0.0, "sum_R": 0.0, "sum_u": 0.0, "n": 0}
best = {"steps": -1, "idx": None, "R": None, "S": None}


In [None]:
# Cell 4 — Helpers: discretize, encode, evolve, decode

def discretize_bins(E, obs_min, obs_max, K):
    z = (E - obs_min[None, :]) / (obs_max - obs_min)[None, :]
    z = np.clip(z, 0.0, 0.999999)
    return (z * K).astype(np.int32)  # (B,4) in [0..K-1]

def em_encode(S, bin_idx, I_idx, alive):
    ai = np.where(alive)[0]
    S[np.ix_(ai, I_flat)] = 0
    for vi in range(4):
        pos = I_idx[vi, bin_idx[ai, vi]]  # (n_alive,)
        S[ai, pos] = 2

def em_evolve(S, R, T):
    for _ in range(T):
        left   = np.roll(S,  1, axis=1)  # (B,N)
        center = S
        right  = np.roll(S, -1, axis=1)
        idx = (left * 9 + center * 3 + right).astype(np.int16)  # (B,N)
        S = R[np.arange(S.shape[0])[:, None], idx]              # (B,N), int8
    return S

def em_decode(S, L_idx, R_idx, cfg, rng):
    A = (S == 2)  # (B,N)
    Lsum = 0
    Rsum = 0
    for vi in range(4):
        Lsum += A[:, L_idx[vi]].sum(axis=1)  # (B,)
        Rsum += A[:, R_idx[vi]].sum(axis=1)
    Lsum = Lsum.astype(np.float32)
    Rsum = Rsum.astype(np.float32)
    u = cfg["force_scale"] * ( (Rsum - Lsum) / (Rsum + Lsum + cfg["force_bias_b"]) )
    u += rng.normal(0.0, cfg["force_noise_std"], size=u.shape)  # robustness
    u = np.clip(u, -cfg["u_max"], cfg["u_max"]).astype(np.float32)  # (B,)
    return u, Lsum, Rsum


In [None]:
# Cell 5 — Cart-pole physics (vectorized)

def cartpole_step(E, u, alive, cfg):
    g  = cfg["gravity"]
    mc = cfg["masscart"]
    mp = cfg["masspole"]
    l  = cfg["length"]
    dt = cfg["dt"]
    ns = cfg["n_substeps"]

    x, x_dot, th, th_dot = E[:,0], E[:,1], E[:,2], E[:,3]

    for _ in range(ns):
        force = u
        costh = np.cos(th)
        sinth = np.sin(th)
        temp = (force + mp * l * th_dot**2 * sinth) / (mc + mp)
        thacc = (g * sinth - costh * temp) / (l * (4.0/3.0 - mp * costh**2 / (mc + mp)))
        xacc  = temp - mp * l * thacc * costh / (mc + mp)

        x      = x + dt * x_dot
        x_dot  = x_dot + dt * xacc
        th     = th + dt * th_dot
        th_dot = th_dot + dt * thacc

    E[:,0], E[:,1], E[:,2], E[:,3] = x, x_dot, th, th_dot

    # Termination
    dead = (np.abs(E[:,0]) > cfg["x_threshold"]) | (np.abs(E[:,2]) > cfg["theta_threshold_rad"])
    dead = dead & alive
    return dead  # (B,)

In [None]:
# Cell 6 — GA refill (crossover on full tape; I-cells get reset in encode)
def ga_refill(S, R, E, alive, steps_alive, P_mask, cfg, rng):
    B, N = S.shape
    dead_idx = np.where(~alive)[0]
    n_new = dead_idx.size
    if n_new == 0:
        return

    parents = np.where(alive)[0]
    if parents.size == 0:
            raise RuntimeError("GA refill: no alive parents available to sample from")
    else:
        pA = rng.choice(parents, size=n_new, replace=True)
        pB = rng.choice(parents, size=n_new, replace=True)
        # Rule crossover
        m_r = rng.random((n_new, 27)) < cfg["crossover_rate"]
        R_new = np.where(m_r, R[pA], R[pB]).copy()
        R_new[:,0] = 0
        # Rule mutation (skip idx 0)
        if cfg["p_rule_mut"] > 0.0:
            mut_r = rng.random((n_new, 27)) < cfg["p_rule_mut"]
            mut_r[:,0] = False  # keep rule 000→0 fixed
            cand = rng.integers(0, 3, size=(n_new, 27), dtype=np.int8)
            R_new[mut_r] = cand[mut_r]
        # State crossover (full tape)
        m_s = rng.random((n_new, N)) < cfg["crossover_rate"]
        S_new = np.where(m_s, S[pA], S[pB]).copy()
        # State mutation on P-zone
        if cfg["p_state_mut"] > 0.0:
            mut_s = (rng.random((n_new, N)) < cfg["p_state_mut"]) & P_mask[None, :]
            candS = rng.integers(0, 3, size=(n_new, N), dtype=np.int8)
            S_new[mut_s] = candS[mut_s]

    # Insert children
    S[dead_idx] = S_new
    R[dead_idx] = R_new
    E[dead_idx] = rng.normal(0.0, cfg["init_state_std"], size=(n_new,4)).astype(np.float32)
    alive[dead_idx] = True
    steps_alive[dead_idx] = 0

In [None]:
# Cell 7 — One control step
def control_step(S, R, E, alive, steps_alive, cfg, rng):
    bin_idx = discretize_bins(E, cfg["obs_min"], cfg["obs_max"], cfg["K"])  # (B,4)
    em_encode(S, bin_idx, I_idx, alive)
    S[:] = em_evolve(S, R, cfg["T"])
    u, Lsum, Rsum = em_decode(S, L_idx, R_idx, cfg, rng)  # (B,), (B,), (B,)

    # Accumulate over alive only
    a = alive.astype(np.float32)
    RL_stats["sum_L"] += float((Lsum * a).sum())
    RL_stats["sum_R"] += float((Rsum * a).sum())
    RL_stats["sum_u"] += float((np.abs(u) * a).sum())
    RL_stats["n"]     += int(a.sum())

    dead_now = cartpole_step(E, u, alive, cfg)
    alive[dead_now] = False
    steps_alive[alive] += 1
    return dead_now.sum()


In [None]:
# Cell 8 — Main loop
total_deaths = 0
new_dead = 0
for t in range(cfg["n_control_steps"]):
    d = control_step(S, R, E, alive, steps_alive, cfg, rng)
    total_deaths += int(d)
    new_dead += int(d)

    if (t+1) % cfg["GA_interval"] == 0:
        ga_refill(S, R, E, alive, steps_alive, P_mask, cfg, rng)

    if (t+1) % 100 == 0:
        alive_count = int(alive.sum())
        mean_surv = steps_alive[alive].mean() if alive_count else 0.0
        avg_L = RL_stats["sum_L"] / RL_stats["n"] if RL_stats["n"] else 0.0
        avg_R = RL_stats["sum_R"] / RL_stats["n"] if RL_stats["n"] else 0.0
        avg_abs_u = RL_stats["sum_u"] / RL_stats["n"] if RL_stats["n"] else 0.0
        print(f"[step {t+1:05d}] alive={alive_count:4d}  dead={int(new_dead/100):6d}  "
              f"mean_steps_alive={mean_surv:6.2f}  avg#L={avg_L:.3f}  avg#R={avg_R:.3f}  avg|u|={avg_abs_u:.3f}")

        i = np.argmax(steps_alive[alive])
        if steps_alive[i] > best["steps"]:
            best["R"], best["S"], best["idx"], best["steps"]=R[i].copy(), S[i].copy(), i, steps_alive[i].copy()
            np.savez_compressed("best_agent.npz", R=best["R"], S=best["S"], steps=best["steps"], cfg=cfg)


        new_dead = 0
        RL_stats = {"sum_L": 0.0, "sum_R": 0.0, "sum_u": 0.0, "n": 0}


In [None]:
# Cell — Load best agent and build single-agent copies
# Try in-memory snapshot; else load from disk

if "best" in globals() and best.get("R") is not None and best.get("S") is not None:
    R1 = best["R"][None, :].copy()   # (1,27)
    S1 = best["S"][None, :].copy()   # (1,N)
    print("loading from locals")
else:
    data = np.load("best_agent.npz", allow_pickle=True)
    R1 = data["R"][None, :].copy()
    S1 = data["S"][None, :].copy()
    print("loading from disk")

E1 = rng.normal(0.0, cfg["init_state_std"], size=(1, 4)).astype(np.float32)  # (1,4)
alive1 = np.array([True], dtype=bool)
steps1 = np.array([0], dtype=np.int32)

H = 800  # control steps to visualize


In [None]:
# Cell — decode for visualization + single-agent rollout (logs CA lines, env, actions)


def em_decode_vis(S, L_idx, R_idx, cfg):
    A = (S == 2)
    Lsum = 0
    Rsum = 0
    for vi in range(4):
        Lsum += A[:, L_idx[vi]].sum(axis=1)
        Rsum += A[:, R_idx[vi]].sum(axis=1)
    Lsum = Lsum.astype(np.float32)
    Rsum = Rsum.astype(np.float32)
    u = cfg["force_scale"] * ((Rsum - Lsum) / (Rsum + Lsum + cfg["force_bias_b"]))
    u += rng.normal(0.0, cfg["force_noise_std"], size=u.shape)  # robustness
    u = np.clip(u, -cfg["u_max"], cfg["u_max"]).astype(np.float32)
    return u  # (1,)

# Logs
M = np.zeros((H, cfg["N"]), dtype=np.uint8)   # CA last-state per control step (H,N)
E_hist = np.zeros((H, 4), dtype=np.float32)   # env after physics (H,4)
u_hist = np.zeros(H, dtype=np.float32)        # action per step (H,)

for t in range(H):
    if not alive1[0]:
        M[t:] = 0
        E_hist[t:] = E_hist[t-1]
        u_hist[t:] = 0.0
        print("dead at time:",t)
        break
    bins = discretize_bins(E1, cfg["obs_min"], cfg["obs_max"], cfg["K"])
    em_encode(S1, bins, I_idx, alive1)
    S1[:] = em_evolve(S1, R1, cfg["T"])
    u = em_decode_vis(S1, L_idx, R_idx, cfg)
    dead_now = cartpole_step(E1, u, alive1, cfg)
    alive1[dead_now] = False
    steps1[alive1] += 1

    M[t] = S1[0]
    E_hist[t] = E1[0]
    u_hist[t] = u[0]


In [None]:
# Cell — Save tall CA image (time ↓, space →)

cmap = ListedColormap([[1,1,1], [1,0,0], [0,0,1]])  # 0:white, 1:red, 2:blue

# Very tall figure: use imsave for memory efficiency
plt.imsave(
    "ca_raster.png",
    M,
    cmap=cmap,
    vmin=0, vmax=2,
    format="png"
)
print("Saved CA raster to ca_raster.png")


In [None]:
# Cell — Cart-pole MP4 (uses E_hist/u_hist already computed)

# Trim to the portion actually simulated before death
valid = np.where((E_hist[:,0] != 0) | (E_hist[:,2] != 0) | (u_hist != 0))[0]
T_vid = valid[-1]+1 if valid.size else 1

fig, ax = plt.subplots(figsize=(6, 3))
ax.set_xlim(-cfg["x_threshold"]*1.2, cfg["x_threshold"]*1.2)
ax.set_ylim(-0.5, 1.2)
ax.set_xlabel("x")
ax.set_ylabel("height")

cart_w, cart_h = 0.3, 0.2
y_cart = 0.0
cart = plt.Rectangle((0 - cart_w/2, y_cart - cart_h/2), cart_w, cart_h, fill=False)
ax.add_patch(cart)
pole_line, = ax.plot([], [], lw=2)
force_txt = ax.text(0.02, 0.92, "", transform=ax.transAxes)

def init():
    cart.set_xy((0 - cart_w/2, y_cart - cart_h/2))
    pole_line.set_data([], [])
    force_txt.set_text("")
    return cart, pole_line, force_txt

def animate(i):
    x, x_dot, th, th_dot = E_hist[i]
    cart.set_x(x - cart_w/2)

    pivot = np.array([x, y_cart + cart_h/2])
    tip = pivot + np.array([ np.sin(th)*cfg["length"]*2.0, np.cos(th)*cfg["length"]*2.0 ])
    pole_line.set_data([pivot[0], tip[0]], [pivot[1], tip[1]])
    force_txt.set_text(f"t={i}  u={u_hist[i]:.2f}  θ={th:.3f}")

    return cart, pole_line, force_txt

writer = animation.FFMpegWriter(fps=30, bitrate=1800)
ani = animation.FuncAnimation(fig, animate, init_func=init, frames=T_vid, blit=True)
ani.save("cartpole.mp4", writer=writer)
plt.close(fig)
print("Saved cartpole.mp4")


In [None]:
# Minimal cell to dump the whole notebook (test.ipynb) to a .txt file
import json

with open("test.ipynb", "r", encoding="utf-8") as f:
    nb = json.load(f)

with open("test.txt", "w", encoding="utf-8") as f:
    for cell in nb.get("cells", []):
        if cell.get("cell_type") == "code":
            f.write("".join(cell.get("source", [])) + "\n\n")