### AlphaZero for Togyz Kumalak — Kaggle Notebook

This notebook sets up a lightweight, Kaggle-friendly pipeline for training an AlphaZero-style agent for Togyz Kumalak.

- **Structure**: writes modules under `src/*` in the working directory and imports them.
- **Persistence**: saves checkpoints and artifacts under `/kaggle/working/alphazero_togyz/` on Kaggle (or `./alphazero_togyz/` in the current directory when run elsewhere).
- **Config**: YAML config in `configs/default.yaml` controls self-play, model, MCTS, and training.
- **Run flow**: Setup → Build modules → (optional) edit config → Train / Evaluate.

You can run this end-to-end on a single Kaggle GPU session; it also supports checkpoint/resume between sessions.


In [4]:
# Environment + minimal deps
import os, sys, random, math
from pathlib import Path

# Kaggle sessions expose a top-level /kaggle directory; anything else is treated as a local run.
IS_KAGGLE = os.path.exists("/kaggle")
if IS_KAGGLE:
    WORK_DIR = Path("/kaggle/working/alphazero_togyz")
else:
    WORK_DIR = Path.cwd() / "alphazero_togyz"

# Project layout, all rooted at WORK_DIR.
CHECKPOINT_DIR = WORK_DIR / "checkpoints"
ARTIFACTS_DIR = WORK_DIR / "artifacts"
CONFIGS_DIR = WORK_DIR / "configs"
SRC_DIR = WORK_DIR / "src"

# Create every directory up front so later cells can write without checking.
for _project_dir in (WORK_DIR, CHECKPOINT_DIR, ARTIFACTS_DIR, CONFIGS_DIR, SRC_DIR):
    _project_dir.mkdir(parents=True, exist_ok=True)

print(dict(
    IS_KAGGLE=IS_KAGGLE,
    WORK_DIR=str(WORK_DIR),
    CHECKPOINT_DIR=str(CHECKPOINT_DIR),
))

# Install minimal extras (most are preinstalled on Kaggle)
try:
    import yaml  # type: ignore
except Exception:
    # Non-interactive install
    !pip -q install --no-input pyyaml
    import yaml  # type: ignore

try:
    import tqdm  # type: ignore
except Exception:
    !pip -q install --no-input tqdm
    import tqdm  # type: ignore

# Basic reproducibility
import numpy as np

def set_global_seed(seed: int = 42):
    """Seed Python's `random`, NumPy's legacy global RNG and, if usable, PyTorch.

    Args:
        seed: Value passed to every seeding function.

    The PyTorch part is best-effort: any failure while importing or seeding
    torch is ignored so the notebook still runs without it.
    """
    for reseed in (random.seed, np.random.seed):
        reseed(seed)
    try:
        import torch as _torch
        for reseed in (_torch.manual_seed, _torch.cuda.manual_seed_all):
            reseed(seed)
    except Exception:
        # torch is optional at this point of the notebook
        pass


set_global_seed(42)
print("Environment initialized.")


{'IS_KAGGLE': False, 'WORK_DIR': '/Users/daurenzhunussov/togyzkumalak/alphazero_togyz', 'CHECKPOINT_DIR': '/Users/daurenzhunussov/togyzkumalak/alphazero_togyz/checkpoints'}

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


ModuleNotFoundError: No module named 'yaml'

In [5]:
# Project scaffolding and import path
from pathlib import Path
import sys, os

# Create package structure
for sub in [
    SRC_DIR / "game",
    SRC_DIR / "encoding",
    SRC_DIR / "mcts",
    SRC_DIR / "nn",
    SRC_DIR / "selfplay",
    SRC_DIR / "trainer",
    SRC_DIR / "arena",
    WORK_DIR / "tests",
]:
    sub.mkdir(parents=True, exist_ok=True)
    # touch() creates the marker file when missing but, unlike write_text(""),
    # does not wipe an existing __init__.py when this cell is re-run.
    (sub / "__init__.py").touch(exist_ok=True)

# Ensure import path: packages under src/ are imported as top-level names (game, nn, ...)
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))
print("sys.path[0] =", sys.path[0])

# Utility: atomic write helper
from contextlib import contextmanager

@contextmanager
def atomic_write(path: Path, mode: str = "w"):
    """Write to `path` via a sibling temporary file that is renamed into place.

    The caller writes to the yielded file object. Only when the `with` body
    finishes without raising is the temporary file moved over `path` with
    `os.replace`, so an existing `path` is never left half-written.

    Args:
        path: Final destination of the file.
        mode: Mode passed to `open` for the temporary file.

    Yields:
        The open temporary file object.

    Raises:
        Whatever the `with` body or the underlying file operations raise; in
        that case the temporary file is removed and `path` is left untouched.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with open(tmp, mode) as f:
            yield f
    except BaseException:
        # The body (or open/close) failed: drop the partial temp file instead of
        # leaving a stray "*.tmp" next to the target, then propagate the error.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
    os.replace(tmp, path)

print("Scaffold ready.")


sys.path[0] = /Users/daurenzhunussov/togyzkumalak/alphazero_togyz/src
Scaffold ready.


In [None]:
# Generate core module files under src/
from textwrap import dedent

# Registry of generated modules: destination path -> module source text.
# The loop at the end of this cell writes each entry to disk with atomic_write.
files = {}

# nn/model.py: 1-D convolutional residual network over the board axis.
#   - ResidualBlock: conv -> BN -> ReLU -> conv -> BN, skip connection, ReLU.
#   - AlphaZero1D.forward takes x of shape (B, in_channels, board_len) and returns
#     (policy logits of shape (B, num_actions), value of shape (B,) squashed by tanh).
#   - The policy head flattens 2 * board_len features into a linear layer; the value
#     head averages over the board axis before two linear layers.
files[SRC_DIR / "nn" / "model.py"] = dedent('''
from typing import Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F


class ResidualBlock(nn.Module):
    def __init__(self, channels: int, kernel_size: int = 3):
        super().__init__()
        padding = kernel_size // 2
        self.conv1 = nn.Conv1d(channels, channels, kernel_size, padding=padding, bias=False)
        self.bn1 = nn.BatchNorm1d(channels)
        self.conv2 = nn.Conv1d(channels, channels, kernel_size, padding=padding, bias=False)
        self.bn2 = nn.BatchNorm1d(channels)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        residual = x
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = F.relu(out + residual)
        return out


class AlphaZero1D(nn.Module):
    def __init__(
        self,
        in_channels: int = 8,
        channels: int = 128,
        num_blocks: int = 8,
        board_len: int = 18,
        num_actions: int = 9,
    ) -> None:
        super().__init__()
        self.board_len = board_len
        padding = 1
        self.stem = nn.Sequential(
            nn.Conv1d(in_channels, channels, kernel_size=3, padding=padding, bias=False),
            nn.BatchNorm1d(channels),
            nn.ReLU(inplace=True),
        )
        self.blocks = nn.Sequential(*[ResidualBlock(channels, kernel_size=3) for _ in range(num_blocks)])
        # Policy head
        self.policy_conv = nn.Conv1d(channels, 2, kernel_size=3, padding=1, bias=False)
        self.policy_bn = nn.BatchNorm1d(2)
        self.policy_fc = nn.Linear(2 * board_len, num_actions)
        # Value head
        self.value_conv = nn.Conv1d(channels, channels, kernel_size=1, bias=False)
        self.value_bn = nn.BatchNorm1d(channels)
        self.value_fc1 = nn.Linear(channels, channels)
        self.value_fc2 = nn.Linear(channels, 1)

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        # x: (B, C, 18)
        h = self.stem(x)
        h = self.blocks(h)
        # policy
        p = F.relu(self.policy_bn(self.policy_conv(h)))
        p = p.view(p.size(0), -1)
        p_logits = self.policy_fc(p)
        # value
        v = F.relu(self.value_bn(self.value_conv(h)))
        v = v.mean(dim=2)  # (B, channels)
        v = F.relu(self.value_fc1(v))
        v = torch.tanh(self.value_fc2(v)).squeeze(-1)
        return p_logits, v
''')

# game/togyzkumalak.py (first version): immutable game state plus helpers.
#   - legal_moves(): local pit indices (0..8) of the side to move that hold stones.
#   - is_terminal(): a kazan reached 82, or the side to move has no stones left.
#   - winner(): 0 / 1, or None both for "not terminal" and for a draw; when nobody
#     reached 82, each side's remaining row stones are added to its kazan.
#   - apply_move() is a placeholder here and raises NotImplementedError.
# NOTE: a later cell in this notebook overwrites this same file with a version whose
# apply_move is implemented.
files[SRC_DIR / "game" / "togyzkumalak.py"] = dedent('''
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple

NUM_PITS_PER_SIDE = 9
TOTAL_PITS = NUM_PITS_PER_SIDE * 2
TOTAL_SEEDS = 162

@dataclass(frozen=True)
class TogyzKumalakState:
    pits: Tuple[int, ...]  # length 18
    kazan: Tuple[int, int]
    player: int  # 0 or 1; player to move
    tuzdyk: Tuple[Optional[int], Optional[int]]  # per-side tuzdyk on opponent row (0..8)
    move_count: int = 0

    def legal_moves(self) -> List[int]:
        start = 0 if self.player == 0 else NUM_PITS_PER_SIDE
        end = start + NUM_PITS_PER_SIDE
        return [i - start for i in range(start, end) if self.pits[i] > 0]

    def is_terminal(self) -> bool:
        if self.kazan[0] >= 82 or self.kazan[1] >= 82:
            return True
        start = 0 if self.player == 0 else NUM_PITS_PER_SIDE
        end = start + NUM_PITS_PER_SIDE
        if sum(self.pits[start:end]) == 0:
            return True
        return False

    def winner(self) -> Optional[int]:
        if not self.is_terminal():
            return None
        if self.kazan[0] >= 82:
            return 0
        if self.kazan[1] >= 82:
            return 1
        k0 = self.kazan[0] + sum(self.pits[0:NUM_PITS_PER_SIDE])
        k1 = self.kazan[1] + sum(self.pits[NUM_PITS_PER_SIDE:TOTAL_PITS])
        if k0 > k1:
            return 0
        if k1 > k0:
            return 1
        return None  # draw

    def apply_move(self, action: int) -> "TogyzKumalakState":
        raise NotImplementedError("Rules engine apply_move not yet implemented.")

    def to_fen(self) -> str:
        pits_s = ",".join(map(str, self.pits))
        kaz_s = f"{self.kazan[0]}|{self.kazan[1]}"
        t0 = -1 if self.tuzdyk[0] is None else self.tuzdyk[0]
        t1 = -1 if self.tuzdyk[1] is None else self.tuzdyk[1]
        return f"{pits_s} {kaz_s} {self.player} {t0}:{t1} {self.move_count}"


def initial_state() -> TogyzKumalakState:
    pits = tuple([9] * TOTAL_PITS)
    return TogyzKumalakState(pits=pits, kazan=(0, 0), player=0, tuzdyk=(None, None), move_count=0)
''')

# encoding/encoding.py: to_canonical() turns a state into a float32 array of shape
# (C, 18) seen from the side to move (for player 1 the two rows are swapped first).
#   - Pit channels hold raw stone counts; kazan channels are divided by TOTAL_SEEDS.
#   - The "my_tuz" channel marks the opponent's tuzdyk on the mover's row and the
#     "opp_tuz" channel marks the mover's tuzdyk on the opponent's row.
#   - C is 7 with include_phase=True (the default) and 6 without.
# NOTE(review): the model default and configs/default.yaml in this notebook use
# in_channels=8, which matches neither 7 nor 6 — confirm the intended feature set
# before feeding this encoder's output to the network.
files[SRC_DIR / "encoding" / "encoding.py"] = dedent('''
from typing import Tuple
import numpy as np
from game.togyzkumalak import TogyzKumalakState, NUM_PITS_PER_SIDE, TOTAL_PITS, TOTAL_SEEDS


def to_canonical(state: TogyzKumalakState, include_phase: bool = True) -> np.ndarray:
    """Encode state into (C, 18) canonical tensor.
    Channels: [my_pits, opp_pits, my_tuz, opp_tuz, my_kazan, opp_kazan, move_phase]
    """
    pits = np.array(state.pits, dtype=np.float32)
    if state.player == 1:
        # Flip perspective: bring current player to front
        pits = np.concatenate([pits[NUM_PITS_PER_SIDE:], pits[:NUM_PITS_PER_SIDE]], axis=0)
        t0, t1 = state.tuzdyk[1], state.tuzdyk[0]
    else:
        t0, t1 = state.tuzdyk

    my = np.zeros(18, dtype=np.float32)
    opp = np.zeros(18, dtype=np.float32)
    my[:NUM_PITS_PER_SIDE] = pits[:NUM_PITS_PER_SIDE]
    opp[NUM_PITS_PER_SIDE:] = pits[NUM_PITS_PER_SIDE:]

    my_tuz = np.zeros(18, dtype=np.float32)
    opp_tuz = np.zeros(18, dtype=np.float32)
    if t0 is not None:
        # my tuz lives on opponent row (right half in canonical)
        opp_idx = NUM_PITS_PER_SIDE + t0
        opp_tuz[opp_idx] = 1.0
    if t1 is not None:
        # opponent tuz on my row (left half)
        my_idx = t1
        my_tuz[my_idx] = 1.0

    my_k = np.full(18, float(state.kazan[state.player] / TOTAL_SEEDS), dtype=np.float32)
    opp_k = np.full(18, float(state.kazan[1 - state.player] / TOTAL_SEEDS), dtype=np.float32)

    if include_phase:
        # crude phase heuristic: normalize by 200 moves
        phase = np.full(18, min(1.0, state.move_count / 200.0), dtype=np.float32)
        feats = np.stack([my, opp, my_tuz, opp_tuz, my_k, opp_k, phase], axis=0)
    else:
        feats = np.stack([my, opp, my_tuz, opp_tuz, my_k, opp_k], axis=0)
    return feats
''')

# mcts/search.py: PUCT tree search guided by the policy/value network.
# Fixes relative to the first draft of this module:
#   - root Dirichlet noise is mixed into legal moves only (no children for illegal moves),
#   - expanded leaves are attached to the tree, so the search grows beyond depth 1,
#   - the backed-up value flips sign at every ply (two-player zero-sum),
#   - terminal positions are scored from the game result instead of the network,
#   - the number of actions is taken from the network output instead of a literal 9,
#   - evaluation errors are no longer swallowed and replaced by a uniform guess.
files[SRC_DIR / "mcts" / "search.py"] = dedent('''
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple, List
import math, random
import numpy as np
import torch

@dataclass
class MCTSNode:
    """Search-tree node reached by one move from its parent.

    total_value is accumulated from the point of view of the player who made
    the move leading into this node, so a parent can compare the value of its
    children directly when selecting.
    """
    prior: float
    visit_count: int = 0
    total_value: float = 0.0
    children: Dict[int, 'MCTSNode'] = field(default_factory=dict)

    @property
    def value(self) -> float:
        """Mean backed-up value; 0.0 for an unvisited node."""
        if self.visit_count == 0:
            return 0.0
        return self.total_value / self.visit_count


class MCTS:
    """PUCT Monte-Carlo tree search using a network for priors and leaf values."""

    def __init__(self, model, encoder_fn, c_puct: float = 1.5, device: Optional[torch.device] = None):
        self.model = model
        self.encoder_fn = encoder_fn
        self.c_puct = c_puct
        self.device = device or torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)
        self.model.eval()

    @torch.no_grad()
    def _eval(self, state) -> Tuple[np.ndarray, float]:
        """Return (softmax policy over all actions, scalar value) for the player to move."""
        x = self.encoder_fn(state)  # (C, 18)
        xt = torch.from_numpy(x).float().unsqueeze(0).to(self.device)
        logits, v = self.model(xt)
        pi = torch.softmax(logits, dim=-1).squeeze(0).cpu().numpy()
        return pi, float(v.item())

    @staticmethod
    def _legal_priors(priors: np.ndarray, legal: List[int]) -> np.ndarray:
        """Zero out illegal actions and renormalise.

        Falls back to a uniform distribution over the legal moves when the
        network put no mass on any of them.
        """
        masked = np.zeros_like(priors)
        if not legal:
            return masked
        masked[legal] = priors[legal]
        total = float(masked.sum())
        if total > 0:
            masked /= total
        else:
            masked[legal] = 1.0 / len(legal)
        return masked

    @staticmethod
    def _terminal_value(state) -> float:
        """Game result (+1 win, -1 loss, 0 draw) for the player to move in state."""
        winner = state.winner()
        if winner is None:
            return 0.0
        return 1.0 if winner == state.player else -1.0

    def _expand(self, node: MCTSNode, state) -> float:
        """Attach one child per legal move to node and return the network value of state."""
        priors, value = self._eval(state)
        legal = state.legal_moves()
        priors = self._legal_priors(priors, legal)
        for a in legal:
            node.children[a] = MCTSNode(prior=float(priors[a]))
        return value

    def _select_child(self, node: MCTSNode) -> Tuple[int, MCTSNode]:
        """Return the (action, child) pair maximising Q + U."""
        total_visits = sum(c.visit_count for c in node.children.values())
        # The +1 keeps the exploration term non-zero before any child has been
        # visited, so the very first pick already follows the priors.
        sqrt_total = math.sqrt(total_visits + 1)
        best_score = -math.inf
        best_action = None
        best_child = None
        for a, child in node.children.items():
            u = self.c_puct * child.prior * sqrt_total / (1 + child.visit_count)
            score = child.value + u
            if score > best_score:
                best_score = score
                best_action = a
                best_child = child
        return best_action, best_child

    def search(self, root_state, num_simulations: int = 100, dirichlet_alpha: float = 0.3, dirichlet_eps: float = 0.25):
        """Run num_simulations simulations from root_state.

        Returns a float32 array with one entry per action holding the
        normalised visit counts of the root children. If nothing was visited
        the result is uniform over the legal moves (uniform over all actions
        when there are no legal moves).
        """
        priors, _ = self._eval(root_state)
        num_actions = len(priors)
        legal = root_state.legal_moves()
        if not legal:
            return np.ones(num_actions, dtype=np.float32) / num_actions

        priors = self._legal_priors(priors, legal)
        # Dirichlet noise at root, restricted to legal moves
        if dirichlet_eps > 0:
            noise = np.random.dirichlet([dirichlet_alpha] * len(legal))
            for a, n in zip(legal, noise):
                priors[a] = (1 - dirichlet_eps) * priors[a] + dirichlet_eps * n
        root = MCTSNode(prior=1.0)
        for a in legal:
            root.children[a] = MCTSNode(prior=float(priors[a]))

        for _ in range(num_simulations):
            node = root
            state = root_state
            path: List[MCTSNode] = [root]
            value: Optional[float] = None

            # Selection: walk down until a node without children is reached
            while node.children:
                action, child = self._select_child(node)
                try:
                    state = state.apply_move(action)
                except NotImplementedError:
                    # Rules engine is still a stub: the position cannot advance, so
                    # credit the chosen child with the evaluation of the current
                    # position (negated here because the backup flips it back).
                    _, v = self._eval(state)
                    value = -v
                    path.append(child)
                    break
                node = child
                path.append(node)

            # Expansion / evaluation of the leaf
            if value is None:
                if state.is_terminal():
                    value = self._terminal_value(state)
                else:
                    value = self._expand(node, state)

            # Backup: value is from the point of view of the player to move at
            # the leaf; each node stores the view of the player who moved into it,
            # hence one sign flip per ply on the way up.
            for visited in reversed(path):
                value = -value
                visited.visit_count += 1
                visited.total_value += value

        visit_counts = np.zeros(num_actions, dtype=np.float32)
        for a, child in root.children.items():
            visit_counts[a] = child.visit_count
        total = visit_counts.sum()
        if total > 0:
            return visit_counts / total
        policy = np.zeros(num_actions, dtype=np.float32)
        policy[legal] = 1.0 / len(legal)
        return policy
''')

# selfplay/worker.py: placeholder — generate_self_play_games accepts any arguments
# and currently returns an empty list.
files[SRC_DIR / "selfplay" / "worker.py"] = dedent('''
from typing import Dict, List, Tuple


def generate_self_play_games(*args, **kwargs):
    """Placeholder for self-play episodes generator.
    Returns a list of (state_tensor, policy_target, value_target) triplets.
    """
    return []
''')

# trainer/loop.py: placeholder — train_loop accepts any arguments and does nothing yet.
files[SRC_DIR / "trainer" / "loop.py"] = dedent('''
from typing import Dict, Iterable


def train_loop(*args, **kwargs):
    """Placeholder training loop with AMP, clipping, and checkpointing."""
    pass
''')

# arena/eval.py: placeholder — evaluate_gating accepts any arguments and always
# returns the constant 0.5.
files[SRC_DIR / "arena" / "eval.py"] = dedent('''
from typing import Any


def evaluate_gating(*args, **kwargs) -> float:
    """Placeholder arena evaluation; returns win-rate vs. baseline."""
    return 0.5
''')

# agent.py: greedy move selection from a policy vector.
# Invalid actions are excluded with -inf instead of being multiplied by zero, so a
# masked-out action can no longer win the argmax when every valid entry is zero or
# negative (for example when the network gives all legal moves zero probability).
files[SRC_DIR / "agent.py"] = dedent('''
from typing import Optional
import numpy as np


def select_move(policy: np.ndarray, valid_mask: Optional[np.ndarray] = None) -> int:
    """Return the index of the best-scoring action, restricted to valid actions.

    Args:
        policy: One score or probability per action. The input is not modified.
        valid_mask: Optional array of the same length; entries greater than
            zero mark valid actions.

    Returns:
        The index of the highest entry of policy among the valid actions. With
        no mask, or with a mask that marks nothing as valid, the plain argmax
        of policy is returned.
    """
    scores = np.array(policy, dtype=np.float64)
    if valid_mask is None:
        return int(np.argmax(scores))
    valid = np.asarray(valid_mask) > 0
    if not valid.any():
        return int(np.argmax(scores))
    return int(np.argmax(np.where(valid, scores, -np.inf)))
''')

# Materialise every registered module on disk, reporting each path relative to WORK_DIR.
for module_path in files:
    module_source = files[module_path]
    with atomic_write(module_path) as handle:
        handle.write(module_source)
    print("Wrote", module_path.relative_to(WORK_DIR))

print("Core modules generated.")


In [None]:
# Write default config to configs/default.yaml
import yaml
from pprint import pprint

# Default hyper-parameters grouped by subsystem. Insertion order is what ends up in
# the YAML file because the dump below is done with sort_keys=False.
config = dict(
    model=dict(in_channels=8, channels=128, num_blocks=8),
    mcts=dict(c_puct=1.5, dirichlet_alpha=0.3, dirichlet_eps=0.25, simulations=160),
    selfplay=dict(temperature_moves=10, num_games_per_iter=256),
    training=dict(buffer_size=1_000_000, batch_size=512, lr=1e-3, weight_decay=1e-4),
    arena=dict(num_games=200, gate_win_rate=0.55, confidence=0.95),
    paths=dict(
        work_dir=str(WORK_DIR),
        checkpoints=str(CHECKPOINT_DIR),
        artifacts=str(ARTIFACTS_DIR),
    ),
)

# The directory is (re)created here so this cell does not depend on it still existing.
default_config_path = CONFIGS_DIR / "default.yaml"
default_config_path.parent.mkdir(parents=True, exist_ok=True)
with default_config_path.open("w") as f:
    yaml.safe_dump(config, f, sort_keys=False)

print("Wrote", default_config_path.relative_to(WORK_DIR))
pprint(config)


In [None]:
# Smoke test: import model and run dummy forward
import torch
from nn.model import AlphaZero1D

# Shallow network in eval mode: enough to check that the module imports and that the
# two heads produce the expected shapes for a batch of two random boards.
model = AlphaZero1D(in_channels=8, channels=128, num_blocks=4).eval()

x = torch.randn(2, 8, 18)
with torch.no_grad():
    logits, v = model(x)

print("policy logits:", logits.shape, "value:", v.shape)
shapes_ok = logits.shape == (2, 9) and v.shape == (2,)
assert shapes_ok, "Unexpected output shapes"


### How to use on Kaggle

1. Run the setup cells above (Environment, Scaffolding, Core modules, Config).
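2. Implement game rules in `src/game/togyzkumalak.py:apply_move` (and add unit tests under `tests/`). The version written by the Core modules cell is a placeholder that raises `NotImplementedError`; the Alpha–Beta section further down overwrites the file with a full implementation.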
3. Implement self-play episodes in `src/selfplay/worker.py` and the training loop in `src/trainer/loop.py`.
4. Wire up MCTS from `src/mcts/search.py` with the model from `src/nn/model.py` and the encoder `src/encoding/encoding.py`.
5. Start a training iteration: generate self-play, train, evaluate arena, then checkpoint under `checkpoints/`.
6. To resume next session, ensure you reload the latest checkpoint from `checkpoints/`.

Notes:
- All files are written under `/kaggle/working/alphazero_togyz/` on Kaggle, or under `./alphazero_togyz/` in the current directory when run locally.
- Tweak `configs/default.yaml` to change MCTS sims, network size, batch size, etc.
- Optional logging tools (e.g., Weights & Biases) can be added later.

Next steps (from the plan):
- Rules engine with tuzdyk and terminal states.
- Canonical state encoding, symmetry, and hashing.
- PUCT MCTS with transposition table and Dirichlet noise.
- 1D residual PyTorch model (already in place) with policy/value heads.
- Self-play worker with temperature schedule and batching.
- Replay buffer, AMP training, checkpoint/resume.
- Evaluation arena and Elo with gating.
- Kaggle GPU training loop with resume and logging.
- CLI agent running MCTS on a given position.


### Alpha–Beta Engine (No Training) — Quick Start

This section adds a strong, training-free engine using iterative-deepening alpha–beta with a simple heuristic. It plays well immediately and fits within 1 hour on Kaggle. You can tweak hyperparameters (time per move, max depth, evaluation weights) below.


In [None]:
# Implement full rules: overwrite src/game/togyzkumalak.py apply_move
from pathlib import Path
from textwrap import dedent

# Full rules module that replaces the placeholder src/game/togyzkumalak.py.
# Compared with the first draft of this string:
#   - initial_state() is included again; the file is overwritten as a whole and later
#     cells import initial_state from it,
#   - a last stone that falls into a tuzdyk no longer triggers a capture on the
#     previously sown pit,
#   - out-of-range actions raise ValueError instead of silently addressing another pit.
rules_impl = dedent('''
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple

NUM_PITS_PER_SIDE = 9
TOTAL_PITS = NUM_PITS_PER_SIDE * 2
TOTAL_SEEDS = 162

@dataclass(frozen=True)
class TogyzKumalakState:
    pits: Tuple[int, ...]  # length 18
    kazan: Tuple[int, int]
    player: int  # 0 or 1; player to move
    tuzdyk: Tuple[Optional[int], Optional[int]]  # per-side tuzdyk on opponent row (0..8)
    move_count: int = 0

    def legal_moves(self) -> List[int]:
        """Local pit indices (0..8) of the side to move that contain stones."""
        start = 0 if self.player == 0 else NUM_PITS_PER_SIDE
        end = start + NUM_PITS_PER_SIDE
        return [i - start for i in range(start, end) if self.pits[i] > 0]

    def is_terminal(self) -> bool:
        """True when a kazan holds at least 82 stones or the side to move has none to sow."""
        if self.kazan[0] >= 82 or self.kazan[1] >= 82:
            return True
        start = 0 if self.player == 0 else NUM_PITS_PER_SIDE
        end = start + NUM_PITS_PER_SIDE
        if sum(self.pits[start:end]) == 0:
            return True
        return False

    def winner(self) -> Optional[int]:
        """Winner (0 or 1); None if the game is not over or ended in a draw."""
        if not self.is_terminal():
            return None
        if self.kazan[0] >= 82:
            return 0
        if self.kazan[1] >= 82:
            return 1
        k0 = self.kazan[0] + sum(self.pits[0:NUM_PITS_PER_SIDE])
        k1 = self.kazan[1] + sum(self.pits[NUM_PITS_PER_SIDE:TOTAL_PITS])
        if k0 > k1:
            return 0
        if k1 > k0:
            return 1
        return None  # draw

    def apply_move(self, action: int) -> "TogyzKumalakState":
        """Return the state after the side to move sows its pit number action (0..8).

        Raises ValueError if action is out of range or the chosen pit is empty.
        """
        if not 0 <= action < NUM_PITS_PER_SIDE:
            raise ValueError(f"Illegal move: pit index {action} is out of range")

        pits = list(self.pits)
        k0, k1 = self.kazan
        p = self.player
        t0, t1 = self.tuzdyk

        start = 0 if p == 0 else NUM_PITS_PER_SIDE
        src = start + action
        stones = pits[src]
        if stones <= 0:
            raise ValueError("Illegal move from empty pit")

        # Absolute tuzdyk indices
        tuz_abs0 = (NUM_PITS_PER_SIDE + t0) if t0 is not None else None  # P0's tuz on P1 row
        tuz_abs1 = t1 if t1 is not None else None  # P1's tuz on P0 row

        if stones > 1:
            # The first stone stays in the starting pit, the rest are sown onwards
            pits[src] = 1
            to_sow = stones - 1
        else:
            # A single stone simply moves to the next pit
            pits[src] = 0
            to_sow = 1

        # last_idx is the pit that received the most recent stone, or None when that
        # stone went straight into a kazan through a tuzdyk (no capture possible then).
        last_idx: Optional[int] = None
        pos = src
        for _ in range(to_sow):
            pos = (pos + 1) % TOTAL_PITS
            if tuz_abs0 is not None and pos == tuz_abs0:
                k0 += 1
                last_idx = None
            elif tuz_abs1 is not None and pos == tuz_abs1:
                k1 += 1
                last_idx = None
            else:
                pits[pos] += 1
                last_idx = pos

        new_t0, new_t1 = t0, t1

        # Captures / tuzdyk creation only if last stone ended in opponent pit
        if last_idx is not None:
            if p == 0 and NUM_PITS_PER_SIDE <= last_idx < TOTAL_PITS:
                local = last_idx - NUM_PITS_PER_SIDE
                cnt = pits[last_idx]
                # Tuzdyk: exactly 3 stones, none owned yet, not the ninth pit, and
                # not the same pit number as the opponent's tuzdyk
                if cnt == 3 and new_t0 is None and local != 8 and (t1 is None or local != t1):
                    k0 += 3
                    pits[last_idx] = 0
                    new_t0 = local
                elif cnt % 2 == 0 and cnt > 0:
                    k0 += cnt
                    pits[last_idx] = 0
            elif p == 1 and 0 <= last_idx < NUM_PITS_PER_SIDE:
                local = last_idx
                cnt = pits[last_idx]
                if cnt == 3 and new_t1 is None and local != 8 and (t0 is None or local != t0):
                    k1 += 3
                    pits[last_idx] = 0
                    new_t1 = local
                elif cnt % 2 == 0 and cnt > 0:
                    k1 += cnt
                    pits[last_idx] = 0

        return TogyzKumalakState(
            pits=tuple(pits),
            kazan=(k0, k1),
            player=1 - p,
            tuzdyk=(new_t0, new_t1),
            move_count=self.move_count + 1,
        )

    def to_fen(self) -> str:
        """Compact text form: pits, kazans, side to move, tuzdyks (-1 = none), move count."""
        pits_s = ",".join(map(str, self.pits))
        kaz_s = f"{self.kazan[0]}|{self.kazan[1]}"
        t0 = -1 if self.tuzdyk[0] is None else self.tuzdyk[0]
        t1 = -1 if self.tuzdyk[1] is None else self.tuzdyk[1]
        return f"{pits_s} {kaz_s} {self.player} {t0}:{t1} {self.move_count}"


def initial_state() -> TogyzKumalakState:
    """Starting position: nine stones in every pit, empty kazans, player 0 to move."""
    pits = tuple([9] * TOTAL_PITS)
    return TogyzKumalakState(pits=pits, kazan=(0, 0), player=0, tuzdyk=(None, None), move_count=0)
''')

# Replace the placeholder rules module on disk with the full implementation above.
rules_module_path = SRC_DIR / "game" / "togyzkumalak.py"
with atomic_write(rules_module_path) as f:
    f.write(rules_impl)

print("Updated src/game/togyzkumalak.py with full apply_move.")


In [None]:
# Alpha–Beta engine (negamax + alpha-beta + transposition table)
from typing import Dict, Tuple, Optional, List
import time

# Larger than any score eval_state can produce; used as the search window bound.
INF = 10_000_000

# Feature weights for the static evaluation (all features are "mine minus opponent's").
EVAL_WEIGHTS = {
    "kazan": 1000,   # stones already banked in the kazans
    "row": 5,        # stones still lying on each side's row
    "tuz": 200,      # owning a tuzdyk
    "pits": 2,       # number of non-empty pits on each row
}


def eval_state(state) -> int:
    """Heuristic score of `state` from the point of view of the player to move.

    Reads `state.player`, `state.pits`, `state.kazan` and `state.tuzdyk` and
    returns the EVAL_WEIGHTS-weighted sum of four differences: kazan stones,
    row stones, tuzdyk ownership and number of non-empty pits.
    """
    me = state.player
    opp = 1 - me
    rows = (state.pits[0:9], state.pits[9:18])
    my_row, opp_row = rows[me], rows[opp]

    kazan_diff = state.kazan[me] - state.kazan[opp]
    stone_diff = sum(my_row) - sum(opp_row)
    tuz_diff = int(state.tuzdyk[me] is not None) - int(state.tuzdyk[opp] is not None)
    occupied_diff = sum(x > 0 for x in my_row) - sum(x > 0 for x in opp_row)

    return (
        EVAL_WEIGHTS["kazan"] * kazan_diff
        + EVAL_WEIGHTS["row"] * stone_diff
        + EVAL_WEIGHTS["tuz"] * tuz_diff
        + EVAL_WEIGHTS["pits"] * occupied_diff
    )


def order_moves(state, moves: List[int]) -> List[int]:
    """Return `moves` as a new list, fullest pit first.

    `moves` are local pit indices of the player to move; moves whose pits hold
    the same number of stones keep their relative order. The input list is
    left unchanged.
    """
    offset = 9 if state.player != 0 else 0
    ranked = list(moves)
    ranked.sort(key=lambda a: state.pits[offset + a], reverse=True)
    return ranked


# Kind of score stored in a transposition-table entry. With alpha-beta pruning a
# score is only exact when it fell strictly inside the search window; after a
# cutoff it is merely a lower bound, after a fail-low merely an upper bound.
_TT_EXACT, _TT_LOWER, _TT_UPPER = 0, 1, 2


def negamax(state, depth: int, alpha: int, beta: int, tt: Dict[str, Tuple[int,int]], deadline: float) -> Tuple[int, Optional[int]]:
    """Depth-limited negamax with alpha-beta pruning and a transposition table.

    Args:
        state: Position to search; must provide to_fen, is_terminal, winner,
            legal_moves and apply_move.
        depth: Remaining plies to search.
        alpha, beta: Search window from the point of view of the player to move.
        tt: Transposition table keyed by state.to_fen(). Entries written here are
            (depth, score, bound kind, best move); plain (depth, score) entries
            are still accepted and treated as exact scores without a move.
        deadline: time.time() value after which the search stops early.

    Returns:
        (score, best move) for the player to move. The move is None for
        terminal positions, at depth 0, when the deadline has already passed,
        or when a table entry without a stored move is reused.
    """
    if time.time() >= deadline:
        return eval_state(state), None
    if state.is_terminal():
        w = state.winner()
        if w is None:
            return 0, None
        return (INF - 1 if w == state.player else -(INF - 1)), None

    key = state.to_fen()
    alpha_in = alpha
    tt_move = None
    entry = tt.get(key)
    if entry is not None:
        d_cached, score_cached = entry[0], entry[1]
        bound = entry[2] if len(entry) > 2 else _TT_EXACT
        tt_move = entry[3] if len(entry) > 3 else None
        if d_cached >= depth:
            if bound == _TT_EXACT:
                return score_cached, tt_move
            # A bound cannot be returned as the score, but it can narrow the window.
            if bound == _TT_LOWER:
                alpha = max(alpha, score_cached)
            else:
                beta = min(beta, score_cached)
            if alpha >= beta:
                return score_cached, tt_move
    if depth == 0:
        return eval_state(state), None

    moves = list(order_moves(state, state.legal_moves()))
    if tt_move in moves:
        # Try the move remembered from an earlier search of this position first.
        moves.remove(tt_move)
        moves.insert(0, tt_move)

    best_move = None
    value = -INF
    timed_out = False
    for a in moves:
        child = state.apply_move(a)
        sc, _ = negamax(child, depth - 1, -beta, -alpha, tt, deadline)
        sc = -sc
        if sc > value:
            value = sc
            best_move = a
        if time.time() >= deadline:
            # The subtree just searched may have been cut short by the clock.
            timed_out = True
            break
        if value > alpha:
            alpha = value
        if alpha >= beta:
            break

    if not timed_out:
        # Only cache results of searches that ran to completion.
        if value <= alpha_in:
            bound = _TT_UPPER
        elif value >= beta:
            bound = _TT_LOWER
        else:
            bound = _TT_EXACT
        tt[key] = (depth, value, bound, best_move)
    return value, best_move


class ABEngine:
    """Iterative-deepening alpha-beta player.

    The transposition table lives on the instance, so it is shared between the
    depth iterations of one `choose` call and between successive calls.
    """

    def __init__(self):
        self.tt: Dict[str, Tuple[int,int]] = {}

    def choose(self, state, max_time_ms: int = 600, max_depth: int = 8) -> int:
        """Pick a move for the player to move in `state`.

        Searches depths 2..max_depth until roughly `max_time_ms` milliseconds
        have elapsed.

        Returns:
            A local pit index. The fallback is the first legal move (0 only if
            the position has no legal moves at all), so a search that yields
            no move never results in an arbitrary, possibly illegal, pit 0.
        """
        deadline = time.time() + (max_time_ms / 1000.0)
        legal = state.legal_moves()
        last_best = legal[0] if legal else 0
        # True once last_best comes from an iteration that finished in time.
        trusted = False
        for d in range(2, max_depth + 1):
            sc, bm = negamax(state, d, -INF, INF, self.tt, deadline)
            out_of_time = time.time() >= deadline
            if bm is not None and bm in legal:
                if not out_of_time:
                    last_best = bm
                    trusted = True
                elif not trusted:
                    # A partial result is still better than the blind fallback,
                    # but must not override a completed shallower search.
                    last_best = bm
            if out_of_time:
                break
        return last_best

print("Alpha–Beta engine ready.")


In [None]:
# Hyperparameters + example usage
from game.togyzkumalak import initial_state

# Search budget per move (milliseconds) and depth cap for iterative deepening
MAX_TIME_MS_OPENING = 400   # 300–600
MAX_TIME_MS_ENDGAME = 900   # 800–1200
MAX_DEPTH = 8               # 6–8 typical in Python for ~0.6s

# Evaluation weights (increase to bias behavior); updated in place so that
# eval_state picks the values up without being redefined.
EVAL_WEIGHTS.update(kazan=1000, tuz=200, row=5, pits=2)

engine = ABEngine()
state = initial_state()

# Small demo: the engine plays both sides for 10 plies
for ply in range(10):
    if ply < 30:
        budget = MAX_TIME_MS_OPENING
    else:
        budget = MAX_TIME_MS_ENDGAME
    mv = engine.choose(state, max_time_ms=budget, max_depth=MAX_DEPTH)
    state = state.apply_move(mv)

print("Demo plies executed.")


In [None]:
# Quick timing smoke test
import time
from game.togyzkumalak import initial_state

# Self-play for up to `plies` moves and report the average search time per move.
s = initial_state()
engine = ABEngine()
plies = 20
played = 0
# perf_counter is a monotonic, high-resolution clock intended for measuring intervals.
start = time.perf_counter()
for ply in range(plies):
    if s.is_terminal():
        # Game over before the planned number of plies: stop instead of asking
        # the engine for a move in a finished position.
        break
    mv = engine.choose(s, max_time_ms=300, max_depth=8)
    s = s.apply_move(mv)
    played += 1
elapsed = time.perf_counter() - start
# Average over the plies actually played; max(1, ...) guards the division.
print({"plies": played, "elapsed_sec": round(elapsed, 2), "per_ply_ms": int(1000 * elapsed / max(1, played))})
