In [None]:
# === ES CONFIG ===
import os, time, glob, csv
from pathlib import Path
import numpy as np

# Paths (same layout you used for HC)
DATA_DIR    = "../lab2"        # folder with *.npy
RESULTS_DIR = "../results"     # will write es_results.csv here
os.makedirs(RESULTS_DIR, exist_ok=True)

# Reproducibility
SEED = 42
rng  = np.random.default_rng(SEED)

# ES hyperparameters (your chosen "Option B")
MU               = 50     # parents
LAMBDA           = 100    # offspring per generation
GENERATIONS      = 300
TOURNAMENT_SIZE  = 3
MUTATION_RATE    = 0.20   # prob an offspring gets a swap mutation
ES_MODE          = "(mu,lambda)"   # switch to "(mu,lambda)" to run comma selection

# Initial population mix
INIT_GREEDY_FRAC = 0.7    # 70% greedy seeds, 30% random seeds
ES_P_RANDOM      = 0.10   # ε for greedy seeding (random step prob)

# Optional tiny local search per offspring (kept OFF for speed)
USE_2OPT_LOCAL   = False
MAX_2OPT_CHECKS  = 2000

print("ES ready:", ES_MODE, "| μ=", MU, "λ=", LAMBDA)


ES ready: (mu+lambda) | μ= 50 λ= 100


In [2]:
def tour_length(D: np.ndarray, tour: np.ndarray) -> float:
    """Closed tour cost: sum D[i, i+1] + wrap to start (works for negative costs too)."""
    return float(D[tour, np.roll(tour, -1)].sum())

def canonicalize_start_zero(t: np.ndarray) -> np.ndarray:
    """Rotate so that city 0 is first, and pick lexicographically smaller between forward/reverse."""
    n  = t.size
    i0 = int(np.where(t == 0)[0][0]) if 0 in t else 0
    fwd = np.roll(t, -i0)
    rev = np.roll(t[::-1], -(n - 1 - i0))
    return fwd if tuple(fwd) < tuple(rev) else rev

def greedy_seed(D: np.ndarray, rng: np.random.Generator, p_random: float = 0.10) -> np.ndarray:
    """ε-greedy nearest-neighbor permutation."""
    n = D.shape[0]
    start = int(rng.integers(0, n))
    unvisited = set(range(n))
    t = [start]; unvisited.remove(start)
    while unvisited:
        last = t[-1]
        if rng.random() < p_random:
            nxt = int(rng.choice(list(unvisited)))
        else:
            cand = list(unvisited)
            nxt  = cand[int(np.argmin(D[last, cand]))]
        t.append(nxt); unvisited.remove(nxt)
    return canonicalize_start_zero(np.asarray(t, dtype=int))

def random_seed(D: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Uniform random permutation."""
    n = D.shape[0]
    t = np.arange(n, dtype=int)
    rng.shuffle(t)
    return canonicalize_start_zero(t)


In [3]:
def tournament_select(costs: np.ndarray, k: int, tau: int, rng: np.random.Generator) -> np.ndarray:
    """Return k parent indices via tau-way tournaments (lower cost wins)."""
    n = costs.size
    out = np.empty(k, dtype=int)
    for i in range(k):
        cand = rng.integers(0, n, size=tau)
        best = cand[np.argmin(costs[cand])]
        out[i] = best
    return out

def ox_crossover(p1: np.ndarray, p2: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Order Crossover (OX) — returns feasible permutation child."""
    n = p1.size
    a = int(rng.integers(0, n-1))
    b = int(rng.integers(a+1, n))
    child = -np.ones(n, dtype=int)
    # copy slice from p1
    child[a:b] = p1[a:b]
    used = set(child[a:b].tolist())
    # fill from p2 in order
    cur = b % n
    for x in np.concatenate([p2[b:], p2[:b]]):
        if x not in used:
            child[cur] = x
            used.add(x)
            cur = (cur + 1) % n
    return child

def swap_mutation(t: np.ndarray, rng: np.random.Generator) -> None:
    """In-place swap of two random positions."""
    n = t.size
    i = int(rng.integers(0, n))
    j = int(rng.integers(0, n-1))
    if j >= i: j += 1
    t[i], t[j] = t[j], t[i]

# Optional tiny 2-opt local search (first-improvement with a small budget)
def two_opt_local(D: np.ndarray, tour: np.ndarray, max_checks: int, rng: np.random.Generator) -> None:
    n = tour.size
    checks = 0
    while checks < max_checks:
        i = int(rng.integers(0, n-2))
        low = i + 2
        hi  = (n - 1) if i == 0 else n
        if low >= hi:
            checks += 1
            continue
        k = int(rng.integers(low, hi))
        a, b = tour[i], tour[(i+1) % n]
        c, d = tour[k], tour[(k+1) % n]
        delta = (D[a, c] + D[b, d]) - (D[a, b] + D[c, d])
        if delta < 0:
            tour[i+1:k+1] = tour[i+1:k+1][::-1]
        checks += 1


In [4]:
def es_solve(D: np.ndarray,
             mu: int = MU,
             lam: int = LAMBDA,
             generations: int = GENERATIONS,
             tau: int = TOURNAMENT_SIZE,
             mutation_rate: float = MUTATION_RATE,
             init_greedy_frac: float = INIT_GREEDY_FRAC,
             p_random: float = ES_P_RANDOM,
             mode: str = ES_MODE,
             use_2opt_local: bool = USE_2OPT_LOCAL,
             max_2opt_checks: int = MAX_2OPT_CHECKS,
             rng: np.random.Generator = rng):
    """
    Return best_tour, best_cost, history (list of best costs).
    """
    n = D.shape[0]
    # --- init population
    pop = np.empty((mu, n), dtype=int)
    n_greedy = int(round(mu * init_greedy_frac))
    for i in range(mu):
        if i < n_greedy:
            pop[i] = greedy_seed(D, rng, p_random=p_random)
        else:
            pop[i] = random_seed(D, rng)
    costs = np.array([tour_length(D, t) for t in pop], dtype=float)

    best_idx = int(np.argmin(costs))
    best_tour = pop[best_idx].copy()
    best_cost = float(costs[best_idx])
    history = [best_cost]

    for g in range(generations):
        # --- parent selection (indices into current pop)
        # we need 2*lam parents (two per offspring)
        parent_idx = tournament_select(costs, k=2*lam, tau=tau, rng=rng).reshape(lam, 2)

        # --- offspring
        off = np.empty((lam, n), dtype=int)
        for k in range(lam):
            p1 = pop[parent_idx[k,0]]
            p2 = pop[parent_idx[k,1]]
            child = ox_crossover(p1, p2, rng)
            if rng.random() < mutation_rate:
                swap_mutation(child, rng)
            if use_2opt_local and max_2opt_checks > 0:
                two_opt_local(D, child, max_2opt_checks, rng)
            off[k] = canonicalize_start_zero(child)

        off_costs = np.array([tour_length(D, t) for t in off], dtype=float)

        # --- survivor selection
        if mode == "(mu+lambda)":
            cat = np.vstack([pop, off])
            cat_costs = np.concatenate([costs, off_costs])
            order = np.argsort(cat_costs)[:mu]
            pop = cat[order]
            costs = cat_costs[order]
        elif mode == "(mu,lambda)":
            order = np.argsort(off_costs)[:mu]
            pop = off[order]
            costs = off_costs[order]
        else:
            raise ValueError("ES_MODE must be '(mu+lambda)' or '(mu,lambda)'")

        # --- track global best
        if costs[0] < best_cost:
            best_cost = float(costs[0])
            best_tour = pop[0].copy()
        history.append(best_cost)

    return best_tour, best_cost, history


In [5]:
test_path = os.path.join(DATA_DIR, "test_problem.npy")
if os.path.exists(test_path):
    D = np.load(test_path)
    t0 = time.time()
    bt, bc, hist = es_solve(D)
    dt = time.time() - t0
    print(f"[ES] test_problem.npy  n={D.shape[0]}  best_cost={bc:.3f}  time={dt:.2f}s  iters={len(hist)-1}")
else:
    print("No test_problem.npy found; skipping quick check.")


[ES] test_problem.npy  n=20  best_cost=3006.200  time=1.29s  iters=300


In [None]:
def list_instances(data_dir: str) -> list[str]:
    files = glob.glob(os.path.join(data_dir, "*.npy"))
    # stable sort: by (prefix, numeric suffix if present)
    def key_fn(p):
        name = os.path.splitext(os.path.basename(p))[0]
        import re
        m = re.match(r"(.*?)(\d+)$", name)
        if m:
            return (m.group(1), int(m.group(2)))
        return (name, -1)
    return sorted(files, key=key_fn)

rows = []
t_batch0 = time.time()
for f in list_instances(DATA_DIR):
    D = np.load(f)
    t0 = time.time()
    bt, bc, hist = es_solve(D)
    dt = time.time() - t0
    rows.append({
        "file": os.path.basename(f),
        "n": D.shape[0],
        "best_cost": bc,
        "generations": GENERATIONS,
        "mu": MU,
        "lambda": LAMBDA,
        "tau": TOURNAMENT_SIZE,
        "mutation_rate": MUTATION_RATE,
        "init_greedy_frac": INIT_GREEDY_FRAC,
        "es_mode": ES_MODE,
        "use_2opt_local": int(USE_2OPT_LOCAL),
        "time_seconds": round(dt, 3)
    })
    print(f"[ES] {os.path.basename(f):>18} | n={D.shape[0]:4d} | best={bc:12.3f} | {dt:.2f}s")

out_csv = os.path.join(RESULTS_DIR, "es_comma.csv")
if rows:
    with open(out_csv, "w", newline="", encoding="utf-8") as fp:
        writer = csv.DictWriter(fp, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    print(f"\nSaved {len(rows)} rows to {out_csv}  (batch {time.time()-t_batch0:.1f}s)")
else:
    print("No results written; rows is empty.")


[ES]   problem_g_10.npy | n=  10 | best=    1497.664 | 1.19s
[ES]   problem_g_20.npy | n=  20 | best=    1755.515 | 1.28s
[ES]   problem_g_50.npy | n=  50 | best=    3268.538 | 1.46s
[ES]  problem_g_100.npy | n= 100 | best=    5240.010 | 1.79s
[ES]  problem_g_200.npy | n= 200 | best=    7982.550 | 2.38s
[ES]  problem_g_500.npy | n= 500 | best=   16900.436 | 4.65s
[ES] problem_g_1000.npy | n=1000 | best=   34847.472 | 10.41s
[ES]  problem_r1_10.npy | n=  10 | best=     184.273 | 1.17s
[ES]  problem_r1_20.npy | n=  20 | best=     392.255 | 1.25s
[ES]  problem_r1_50.npy | n=  50 | best=     599.129 | 1.44s
[ES] problem_r1_100.npy | n= 100 | best=     915.655 | 1.78s
[ES] problem_r1_200.npy | n= 200 | best=    1583.334 | 2.44s
[ES] problem_r1_500.npy | n= 500 | best=    2895.879 | 4.80s
[ES] problem_r1_1000.npy | n=1000 | best=    6131.336 | 8.75s
[ES]  problem_r2_10.npy | n=  10 | best=    -411.702 | 1.22s
[ES]  problem_r2_20.npy | n=  20 | best=    -796.863 | 1.28s
[ES]  problem_r2_50.np