In [None]:
# run_tournament_v2_debug_fixed.py
"""
Run the full Farkle strategy tournament with multiprocessing,
queue-based logging, and resumable checkpoints.
"""
from __future__ import annotations

import logging
import logging.handlers
import multiprocessing as mp
import pickle
import sys
import time
from collections import Counter
from pathlib import Path
from typing import Tuple

import numpy as np

from farkle.simulation import generate_strategy_grid, _play_game
from farkle.stats import games_for_power


def short_vars(d: dict, max_len: int = 100) -> str:
    s = repr(d)
    return s if len(s) <= max_len else s[:max_len] + "…"

# ──────────────────────────────────────────────────────────────────────────────
# 0.--- Globals & tuning knobs
CHUNKSIZE       = 100          # tasks sent to each worker at a time
PROCESSES       = 16
MAXTASKS        = 50              # fork a fresh worker after this many tasks
REPORT_EVERY    = 100_000         # progress print interval
CHECKPOINT_FILE = Path("win_counter.chk")

# ──────────────────────────────────────────────────────────────────────────────
# 1.--- Set up root logger and handler (no debug() calls here yet)
DEBUG_FIRST_N = 10000   # change once; used by the filter below

class FirstNFilter(logging.Filter):
    """
    Let every unique call-site (file, lineno) speak `n` times, then silence it.
    Works no matter how tight the surrounding loop is.
    """
    def __init__(self, n: int = DEBUG_FIRST_N):
        super().__init__()
        self.n = n
        self.seen: Counter[Tuple[str, int]] = Counter()

    def filter(self, record: logging.LogRecord) -> bool:
        key = (record.pathname, record.lineno)
        self.seen[key] += 1
        return self.seen[key] <= self.n

# configure the root logger
root = logging.getLogger()             # grab root so *all* libraries inherit it
root.setLevel(logging.DEBUG)           # we really want to see DEBUG

handler = logging.StreamHandler(sys.stdout)  # print to stdout
handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)-5s %(filename)s:%(lineno)d | %(message)s",
        datefmt="%H:%M:%S"))
handler.addFilter(FirstNFilter(DEBUG_FIRST_N))

root.handlers[:] = [handler]            # replace any default handlers

# now that the handler is fully set up, we can safely grab “tournament” logger
log = logging.getLogger("tournament")
# No log.debug(...) here at module import time.

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s|%(processName)s|%(levelname)s|%(message)s",
    datefmt="%H:%M:%S"
)

# ──────────────────────────────────────────────────────────────────────────────
# 2.--- Strategy grid and powered sample size
strategies, meta = generate_strategy_grid()
meta["str_repr"] = [str(s) for s in strategies]

n_games_per_player = games_for_power(
    n_strategies=len(strategies),
    delta=0.03, alpha=0.025, power=0.90,
    method="bh", pairwise=True,
)
total_tasks = len(strategies) * n_games_per_player // 5  # == total games

log.info("n_games_per_player = %s, total tasks = %s", n_games_per_player, total_tasks)

# ──────────────────────────────────────────────────────────────────────────────


# ──────────────────────────────────────────────────────────────────────────────
# 5.--- Worker function
def _one(task):
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("reached _one  vars=%s", vars_snippet)
    seed, idxs = task
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("unpacked task in _one  vars=%s", vars_snippet)
    table = [strategies[i] for i in idxs]
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("constructed table in _one  vars=%s", vars_snippet)
    row = _play_game(seed, table, 10_000)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("completed _play_game in _one  vars=%s", vars_snippet)
    win = str(row[f"{row['winner']}_strategy"])
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("computed win in _one  vars=%s", vars_snippet)
    return win

# ──────────────────────────────────────────────────────────────────────────────
# 3.–6. Combined, memory-light task stream with resume support

def task_stream(already_done: int = 0):
    """
    Lazily yields (seed, idxs) for each game:
      - rep in [0..n_games_per_player)
      - permute strategies
      - take groups of 5 as one table
    Skips the first `already_done` tables in O(1), not by calling next() repeatedly.
    """
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("reached task_stream:  vars=%s", vars_snippet)
    num_strats = len(strategies)
    tables_per_rep = num_strats // 5

    # figure out where to start
    start_rep, start_table = divmod(already_done, tables_per_rep)

    perm_rng = np.random.default_rng(999)       # RNG for the permutations
    seed_rng = np.random.default_rng(1234)      # RNG for the per‐game seeds
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("assigned task_stream variables vars=%s", vars_snippet)

    for rep in range(start_rep, n_games_per_player):
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("entered loop in task_stream   vars=%s", vars_snippet)
        perm = perm_rng.permutation(num_strats)   # one new random order
        # if this is our first rep, skip up to start_table
        first_j = 5 * (start_table if rep == start_rep else 0)

        for j in range(first_j, num_strats, 5):
            # stop if not a full group of 5
            if j + 5 > num_strats:
                break
            idxs = tuple(int(x) for x in perm[j : j + 5])
            seed = int(seed_rng.integers(2**32))
            yield (seed, idxs)


# ──────────────────────────────────────────────────────────────────────────────
# 7.--- Checkpoint helpers
def save_checkpoint(counter: Counter, done: int):
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("reached save_checkpoint  vars=%s", vars_snippet)
    with CHECKPOINT_FILE.open("wb") as f:
        pickle.dump({"done": done, "counter": dict(counter)}, f)
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("pickle.dump completed in save_checkpoint  vars=%s", vars_snippet)
    log.info("Checkpoint saved at %s tasks.", done)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("exiting save_checkpoint  vars=%s", vars_snippet)

def load_checkpoint() -> tuple[int, Counter]:
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("reached load_checkpoint  vars=%s", vars_snippet)
    if not CHECKPOINT_FILE.exists():
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("load_checkpoint found no checkpoint file  vars=%s", vars_snippet)
        return 0, Counter()
    with CHECKPOINT_FILE.open("rb") as f:
        data = pickle.load(f)
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("pickle.load completed in load_checkpoint  vars=%s", vars_snippet)
    log.info("Checkpoint loaded: %s tasks finished previously.", data["done"])
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("exiting load_checkpoint with data  vars=%s", vars_snippet)
    return data["done"], Counter(data["counter"])

# ──────────────────────────────────────────────────────────────────────────────
# 8.--- Multiprocessing-friendly logging (single queue sink)
def _configure_worker_logging(queue):
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("reached _configure_worker_logging  vars=%s", vars_snippet)
    qh = logging.handlers.QueueHandler(queue)
    root = logging.getLogger()
    root.handlers.clear()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("configured worker logging handlers  vars=%s", vars_snippet)

def _listener_process(queue):
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("reached _listener_process  vars=%s", vars_snippet)
    h = logging.StreamHandler(sys.stdout)
    fmt = logging.Formatter("%(asctime)s|%(processName)s|%(levelname)s|%(message)s", "%H:%M:%S")
    h.setFormatter(fmt)
    root = logging.getLogger()
    root.addHandler(h)
    root.setLevel(logging.DEBUG)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("listener configured its handler  vars=%s", vars_snippet)
    while True:
        record = queue.get()
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("listener received record  vars=%s", vars_snippet)
        if record is None:
            vars_snippet = short_vars(locals(), 100)                 
            log.debug("listener received sentinel None  vars=%s", vars_snippet)
            break
        root.handle(record)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("listener exiting  vars=%s", vars_snippet)

# ──────────────────────────────────────────────────────────────────────────────
def main():
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("reached main  vars=%s", vars_snippet)
    already_done, win_counter = load_checkpoint()
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("after load_checkpoint in main  vars=%s", vars_snippet)

    if already_done >= total_tasks:
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("main early exit condition met  vars=%s", vars_snippet)
        log.warning("All %s tasks are already complete. Nothing to do!", total_tasks)
        return

    log_queue = mp.Queue()
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("created log_queue in main  vars=%s", vars_snippet)
    listener = mp.Process(target=_listener_process, args=(log_queue,), daemon=True)
    listener.start()
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("started listener process in main  vars=%s", vars_snippet)

    start_time = time.perf_counter()
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("recorded start_time in main  vars=%s", vars_snippet)
    done = already_done
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("initialized done counter in main  vars=%s", vars_snippet)

    with mp.Pool(
        processes=PROCESSES,
        maxtasksperchild=MAXTASKS,
        initializer=_configure_worker_logging,
        initargs=(log_queue,),
    ) as pool:
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("entered Pool context in main  vars=%s", vars_snippet)
        log.info("Pool started with %d workers.", PROCESSES)
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("logged pool start info in main  vars=%s", vars_snippet)
        stream = task_stream(already_done)
        vars_snippet = short_vars(locals(), 100)                 
        log.debug("created stream iterator in main  vars=%s", vars_snippet)

        for win in pool.imap_unordered(_one, stream, chunksize=CHUNKSIZE):
            vars_snippet = short_vars(locals(), 100)                 
            log.debug("received win from pool.imap_unordered  vars=%s", vars_snippet)
            win_counter[win] += 1
            vars_snippet = short_vars(locals(), 100)                 
            log.debug("updated win_counter in main loop  vars=%s", vars_snippet)
            done += 1
            vars_snippet = short_vars(locals(), 100)                 
            log.debug("incremented done in main loop  vars=%s", vars_snippet)

            if done % REPORT_EVERY == 0 or done == total_tasks:
                vars_snippet = short_vars(locals(), 100)                 
                log.debug("about to compute progress in main loop  vars=%s", vars_snippet)
                pct = 100 * done / total_tasks
                hrs = (time.perf_counter() - start_time) / 3600
                log.info("[%10d / %10d]  %6.2f %%  %6.2f h elapsed", done, total_tasks, pct, hrs)
                vars_snippet = short_vars(locals(), 100)                 
                log.debug("about to save checkpoint in main loop  vars=%s", vars_snippet)
                save_checkpoint(win_counter, done)
                vars_snippet = short_vars(locals(), 100)                 
                log.debug("saved checkpoint in main loop  vars=%s", vars_snippet)

        vars_snippet = short_vars(locals(), 100)                 
        log.debug("exited for loop in main  vars=%s", vars_snippet)

    # tell listener to finish
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("sending sentinel to listener  vars=%s", vars_snippet)
    log_queue.put(None)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("sent sentinel to listener  vars=%s", vars_snippet)
    listener.join()
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("listener joined in main  vars=%s", vars_snippet)

    # final dump + CSV
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("about to save final checkpoint  vars=%s", vars_snippet)
    save_checkpoint(win_counter, done)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("saved final checkpoint  vars=%s", vars_snippet)

    summary = (
        meta[["strategy_idx", "str_repr"]]
        .assign(wincount=lambda df: df["str_repr"].map(win_counter).fillna(0).astype("int32"))
        .sort_values("strategy_idx")[["strategy_idx", "wincount"]]
    )
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("constructed summary DataFrame  vars=%s", vars_snippet)
    summary.to_csv("wincounts.csv", index=False)
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("wrote wincounts.csv  vars=%s", vars_snippet)
    log.info("Finished!  CSV written with final results.")
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("exiting main  vars=%s", vars_snippet)

# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # At this point, the handler + root logger are fully configured.
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("about to call main from __main__  vars=%s", vars_snippet)
    main()
    vars_snippet = short_vars(locals(), 100)                 
    log.debug("returned from main in __main__  vars=%s", vars_snippet)


16:45:16 INFO  2510037422.py:89 | n_games_per_player = 10223, total tasks = 16683936
16:45:16 DEBUG 2510037422.py:322 | about to call main from __main__  vars={'__name__': '__main__', '__doc__': '\nRun the full Farkle strategy tournament with multiprocessing,…
16:45:16 DEBUG 2510037422.py:221 | reached main  vars={}
16:45:16 DEBUG 2510037422.py:168 | reached load_checkpoint  vars={}
16:45:16 DEBUG 2510037422.py:171 | load_checkpoint found no checkpoint file  vars={'vars_snippet': '{}'}
16:45:16 DEBUG 2510037422.py:224 | after load_checkpoint in main  vars={'vars_snippet': '{}', 'already_done': 0, 'win_counter': Counter()}
16:45:16 DEBUG 2510037422.py:234 | created log_queue in main  vars={'vars_snippet': "{'vars_snippet': '{}', 'already_done': 0, 'win_counter': Counter()}", 'already_don…
16:45:16 DEBUG 2510037422.py:238 | started listener process in main  vars={'vars_snippet': '{\'vars_snippet\': "{\'vars_snippet\': \'{}\', \'already_done\': 0, \'win_counter\…
16:45:16 DEBUG 2510037422

In [2]:
import threading, multiprocessing as mp
from collections import Counter
from pathlib import Path
import numpy as np
import pickle
import time
from __future__ import annotations

import logging
import logging.handlers
import multiprocessing as mp
import pickle
import sys
import time
from collections import Counter
from pathlib import Path
from typing import Tuple

import numpy as np

from farkle.simulation import generate_strategy_grid, _play_game
from farkle.stats import games_for_power

# ──────────────────────────────────────────────────────────────────────────────
# Globals
CHUNKSIZE       = 6
QUEUE_MAXSIZE   = 100
PROCESSES       = 16
REPORT_every    = 100_000
CHECKPOINT_FILE = Path("win_counter.chk")

# ──────────────────────────────────────────────────────────────────────────────
def producer(task_q, n_games_per_player, num_strats):
    """Thread #1: generate (seed, idxs) and put into task_q, up to QUEUE_MAXSIZE."""
    perm_rng = np.random.default_rng(999)
    seed_rng = np.random.default_rng(1234)

    for rep in range(n_games_per_player):
        perm = perm_rng.permutation(num_strats)
        # slice into 5‐player tables
        for j in range(0, num_strats, 5):
            if j + 5 > num_strats:
                break
            seed = int(seed_rng.integers(2**32))
            task_q.put((seed, tuple(int(x) for x in perm[j:j+5])))
    # signal end to workers
    for _ in range(PROCESSES):
        task_q.put(None)


def worker(task_q, result_q):
    """N processes: pull tasks 6 at a time, run games, push wins."""
    from farkle.simulation import _play_game  # worker imports
    while True:
        batch = []
        # gather up to CHUNKSIZE tasks
        for _ in range(CHUNKSIZE):
            task = task_q.get()
            if task is None:
                # re‐signal None so other workers see it
                task_q.put(None)
                break
            batch.append(task)

        if not batch:
            break

        # run each game in this batch
        for seed, idxs in batch:
            # reconstruct strategies list and play
            row = _play_game(seed, [strategies[i] for i in idxs], 10_000)
            win = str(row[f"{row['winner']}_strategy"])
            result_q.put(win)
    # when finished, let collector know
    result_q.put(None)


def collector(result_q, total_tasks):
    """Thread #2: pull wins, update counter, save checkpoints & CSV."""
    win_counter = Counter()
    done = 0
    active_workers = PROCESSES
    start = time.perf_counter()

    while active_workers:
        msg = result_q.get()
        if msg is None:
            active_workers -= 1
            continue
        win_counter[msg] += 1
        done += 1

        if done % REPORT_every == 0 or done == total_tasks:
            hrs = (time.perf_counter() - start)/3600
            print(f"[{done}/{total_tasks}] {hrs:.2f} h elapsed")
            with CHECKPOINT_FILE.open("wb") as f:
                pickle.dump({"done": done, "counter": dict(win_counter)}, f)

    # all workers done
    # final CSV
    import pandas as pd
    summary = (
      pd.DataFrame({"strategy_idx": range(num_strats),
                    "str_repr": [str(s) for s in strategies]})
        .assign(wincount=lambda df: df["str_repr"].map(win_counter).fillna(0).astype(int))
        .sort_values("strategy_idx")
    )
    summary.to_csv("wincounts.csv", index=False)
    print("Finished! CSV written.")

# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    strategies, meta = generate_strategy_grid()
    num_strats      = len(strategies)
    n_games         = games_for_power(...)
    total_tasks     = num_strats * n_games // 5

    # Queues
    task_q   = mp.Queue(maxsize=QUEUE_MAXSIZE)
    result_q = mp.Queue()

    # Start producer thread
    prod_thread = threading.Thread(target=producer,
                                   args=(task_q, n_games, num_strats),
                                   daemon=True)
    prod_thread.start()

    # Start collector thread
    coll_thread = threading.Thread(target=collector,
                                   args=(result_q, total_tasks),
                                   daemon=True)
    coll_thread.start()

    # Spawn worker processes
    processes = []
    for _ in range(PROCESSES):
        p = mp.Process(target=worker, args=(task_q, result_q))
        p.start()
        processes.append(p)

    # Wait for everyone to finish
    for p in processes:
        p.join()
    coll_thread.join()
    print("All done.")

TypeError: unsupported operand type(s) for +: 'ellipsis' and 'int'

In [None]:
import threading
import multiprocessing as mp
from collections import Counter
from pathlib import Path
import pickle
import time

import numpy as np
import pandas as pd

from farkle.simulation import generate_strategy_grid
from farkle.stats import games_for_power

# ──────────────────────────────────────────────────────────────────────────────
# Globals
CHUNKSIZE       = 6
QUEUE_MAXSIZE   = 100
PROCESSES       = 16
REPORT_EVERY    = 100_000
CHECKPOINT_FILE = Path("win_counter.chk")


def producer(task_q: mp.Queue, n_games_per_player: int, num_strats: int) -> None:
    """Thread #1: generate (seed, idxs) and put into task_q, up to QUEUE_MAXSIZE."""
    perm_rng = np.random.default_rng(999)
    seed_rng = np.random.default_rng(1234)

    for rep in range(n_games_per_player):
        perm = perm_rng.permutation(num_strats)
        for j in range(0, num_strats, 5):
            if j + 5 > num_strats:
                break
            seed = int(seed_rng.integers(2**32))
            task_q.put((seed, tuple(int(x) for x in perm[j : j + 5])))
    # signal end to workers
    for _ in range(PROCESSES):
        task_q.put(None)


def worker(task_q: mp.Queue, result_q: mp.Queue) -> None:
    """Process: pull tasks CHUNKSIZE at a time, run games, push wins."""
    from farkle.simulation import _play_game  # worker import

    while True:
        batch = []
        for _ in range(CHUNKSIZE):
            task = task_q.get()
            if task is None:
                task_q.put(None)
                break
            batch.append(task)

        if not batch:
            break

        for seed, idxs in batch:
            row = _play_game(seed, [strategies[i] for i in idxs], 10_000)
            win = str(row[f"{row['winner']}_strategy"])
            result_q.put(win)
    # signal collector that this worker is done
    result_q.put(None)


def collector(result_q: mp.Queue, total_tasks: int) -> None:
    """Thread #2: pull wins, update counter, save checkpoints & CSV."""
    win_counter = Counter()
    done = 0
    active_workers = PROCESSES
    start = time.perf_counter()

    while active_workers:
        msg = result_q.get()
        if msg is None:
            active_workers -= 1
            continue
        win_counter[msg] += 1
        done += 1

        if done % REPORT_EVERY == 0 or done == total_tasks:
            hrs = (time.perf_counter() - start) / 3600
            print(f"[{done}/{total_tasks}] {hrs:.2f} h elapsed")
            with CHECKPOINT_FILE.open("wb") as f:
                pickle.dump({"done": done, "counter": dict(win_counter)}, f)

    # all workers done → final CSV
    summary = (
        pd.DataFrame({
            "strategy_idx": range(num_strats),
            "str_repr": [str(s) for s in strategies]
        })
        .assign(wincount=lambda df: df["str_repr"].map(win_counter).fillna(0).astype(int))
        .sort_values("strategy_idx")
    )
    summary.to_csv("wincounts.csv", index=False)
    print("Finished! CSV written.")


if __name__ == "__main__":
    # Prepare strategy grid and determine total tasks
    strategies, meta = generate_strategy_grid()
    num_strats = len(strategies)
    n_games_per_player = games_for_power(
        n_strategies=num_strats,
        delta=0.03, alpha=0.025, power=0.90,
        method="bh", pairwise=True,
    )
    total_tasks = num_strats * n_games_per_player // 5

    # Queues
    task_q = mp.Queue(maxsize=QUEUE_MAXSIZE)
    result_q = mp.Queue()

    # Start producer thread
    prod_thread = threading.Thread(
        target=producer,
        args=(task_q, n_games_per_player, num_strats),
        daemon=True
    )
    prod_thread.start()

    # Start collector thread
    coll_thread = threading.Thread(
        target=collector,
        args=(result_q, total_tasks),
        daemon=True
    )
    coll_thread.start()

    # Spawn worker processes
    processes = []
    for _ in range(PROCESSES):
        p = mp.Process(target=worker, args=(task_q, result_q))
        p.start()
        processes.append(p)

    # Wait for workers and collector to finish
    for p in processes:
        p.join()
    coll_thread.join()
    print("All done.")

In [None]:
#!/usr/bin/env python3
"""
Run the full Farkle strategy tournament with threading, multiprocessing,
producer–worker–collector pattern, queue-based logging, and resumable checkpoints.
"""
from __future__ import annotations

import logging
import logging.handlers
import threading
import multiprocessing as mp
import pickle
import sys
import time
from collections import Counter
from pathlib import Path
from typing import Tuple

import numpy as np
import pandas as pd

from farkle.simulation import generate_strategy_grid
from farkle.stats import games_for_power

# ──────────────────────────────────────────────────────────────────────────────
def short_vars(d: dict, max_len: int = 100) -> str:
    s = repr(d)
    return s if len(s) <= max_len else s[:max_len] + "…"

# ──────────────────────────────────────────────────────────────────────────────
# 0.--- Globals & tuning knobs
CHUNKSIZE       = 6           # tasks sent to each worker at a time
QUEUE_MAXSIZE   = 100         # max tasks buffered
PROCESSES       = 16
REPORT_EVERY    = 100_000     # progress print interval
CHECKPOINT_FILE = Path("win_counter.chk")

# ──────────────────────────────────────────────────────────────────────────────
# 1.--- Set up root logger and handler
DEBUG_FIRST_N = 10000   # change once; used by the filter below

class FirstNFilter(logging.Filter):
    """
    Let every unique call-site (file, lineno) speak `n` times, then silence it.
    """
    def __init__(self, n: int = DEBUG_FIRST_N):
        super().__init__()
        self.n = n
        self.seen: Counter[Tuple[str, int]] = Counter()

    def filter(self, record: logging.LogRecord) -> bool:
        key = (record.pathname, record.lineno)
        self.seen[key] += 1
        return self.seen[key] <= self.n

# configure the root logger
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)-5s %(filename)s:%(lineno)d | %(message)s",
    datefmt="%H:%M:%S"
))
handler.addFilter(FirstNFilter(DEBUG_FIRST_N))
root.handlers[:] = [handler]

# tournament logger
log = logging.getLogger("tournament")
log.setLevel(logging.DEBUG)

# ──────────────────────────────────────────────────────────────────────────────
# 2.--- Producer: enqueue tasks

def producer(task_q: mp.Queue, n_games_per_player: int, num_strats: int) -> None:
    vars_snippet = short_vars(locals(), 100)
    log.debug("reached producer start vars=%s", vars_snippet)
    perm_rng = np.random.default_rng(999)
    seed_rng = np.random.default_rng(1234)
    vars_snippet = short_vars(locals(), 100)
    log.debug("initialized producer RNGs vars=%s", vars_snippet)

    for rep in range(n_games_per_player):
        vars_snippet = short_vars(locals(), 100)
        log.debug("producer loop rep=%d vars=%s", rep, vars_snippet)
        perm = perm_rng.permutation(num_strats)
        for j in range(0, num_strats, 5):
            vars_snippet = short_vars(locals(), 100)
            # log.debug("producer inner loop j=%d vars=%s", j, vars_snippet)
            if j + 5 > num_strats:
                log.debug("producer breaking inner loop at j=%d", j)
                break
            seed = int(seed_rng.integers(2**32))
            task = (seed, tuple(int(x) for x in perm[j : j + 5]))
            task_q.put(task)
    vars_snippet = short_vars(locals(), 100)
    log.debug("producer signaling end vars=%s", vars_snippet)
    for _ in range(PROCESSES):
        task_q.put(None)
    log.debug("producer done")

# ──────────────────────────────────────────────────────────────────────────────
# 3.--- Worker processes: consume tasks

def worker(task_q: mp.Queue, result_q: mp.Queue) -> None:
    vars_snippet = short_vars(locals(), 100)
    log.debug("reached worker start vars=%s", vars_snippet)
    from farkle.simulation import _play_game  # delay import in worker

    while True:
        batch: list[tuple[int, tuple[int, ...]]] = []
        for _ in range(CHUNKSIZE):
            task = task_q.get()
            vars_snippet = short_vars(locals(), 100)
            log.debug("worker got task vars=%s", vars_snippet)
            if task is None:
                log.debug("worker received None sentinel")
                task_q.put(None)
                break
            batch.append(task)
        vars_snippet = short_vars(locals(), 100)
        log.debug("worker batch size=%d vars=%s", len(batch), vars_snippet)
        if not batch:
            log.debug("worker breaking - empty batch")
            break

        for seed, idxs in batch:
            vars_snippet = short_vars(locals(), 100)
            log.debug("worker playing game seed=%d idxs=%s vars=%s", seed, idxs, vars_snippet)
            row = _play_game(seed, [strategies[i] for i in idxs], 10_000)
            win = str(row[f"{row['winner']}_strategy"])
            result_q.put(win)
    vars_snippet = short_vars(locals(), 100)
    log.debug("worker sending None to collector vars=%s", vars_snippet)
    result_q.put(None)

# ──────────────────────────────────────────────────────────────────────────────
# 4.--- Collector: tally wins, checkpoints, and CSV

def collector(result_q: mp.Queue, total_tasks: int) -> None:
    vars_snippet = short_vars(locals(), 100)
    log.debug("reached collector start vars=%s", vars_snippet)
    win_counter = Counter()
    done = 0
    active_workers = PROCESSES
    start = time.perf_counter()

    while active_workers:
        msg = result_q.get()
        vars_snippet = short_vars(locals(), 100)
        log.debug("collector got msg vars=%s", vars_snippet)
        if msg is None:
            active_workers -= 1
            log.debug("collector worker done, active_workers=%d", active_workers)
            continue
        win_counter[msg] += 1
        done += 1

        if done % REPORT_EVERY == 0 or done == total_tasks:
            hrs = (time.perf_counter() - start) / 3600
            log.info("[%d/%d] %6.2f h elapsed", done, total_tasks, hrs)
            with CHECKPOINT_FILE.open("wb") as f:
                pickle.dump({"done": done, "counter": dict(win_counter)}, f)
            log.debug("collector saved checkpoint at done=%d", done)

    vars_snippet = short_vars(locals(), 100)
    log.debug("collector constructing final summary vars=%s", vars_snippet)
    summary = (
        pd.DataFrame({
            "strategy_idx": range(num_strats),
            "str_repr": [str(s) for s in strategies]
        })
        .assign(wincount=lambda df: df["str_repr"].map(win_counter).fillna(0).astype(int))
        .sort_values("strategy_idx")
    )
    log.debug("collector writing final CSV")
    summary.to_csv("wincounts.csv", index=False)
    log.info("Finished! CSV written.")

# ──────────────────────────────────────────────────────────────────────────────
# 5.--- Main entrypoint

if __name__ == "__main__":
    vars_snippet = short_vars(locals(), 100)
    log.debug("reached __main__ start vars=%s", vars_snippet)

    strategies, meta = generate_strategy_grid()
    num_strats = len(strategies)
    n_games_per_player = games_for_power(
        n_strategies=num_strats,
        delta=0.03, alpha=0.025, power=0.90,
        method="bh", pairwise=True,
    )
    total_tasks = num_strats * n_games_per_player // 5
    log.debug("computed grid vars=%s", short_vars(locals(), 100))

    task_q = mp.Queue(maxsize=QUEUE_MAXSIZE)
    result_q = mp.Queue()
    log.debug("queues created vars=%s", short_vars(locals(), 100))

    prod_thread = threading.Thread(
        target=producer,
        args=(task_q, n_games_per_player, num_strats),
        daemon=True
    )
    prod_thread.start()
    log.debug("producer thread started")

    coll_thread = threading.Thread(
        target=collector,
        args=(result_q, total_tasks),
        daemon=True
    )
    coll_thread.start()
    log.debug("collector thread started")

    processes: list[mp.Process] = []
    for _ in range(PROCESSES):
        p = mp.Process(target=worker, args=(task_q, result_q))
        p.start()
        processes.append(p)
    log.debug("worker processes started count=%d", len(processes))

    for p in processes:
        p.join()
    coll_thread.join()
    log.debug("all processes and collector joined")
    log.info("All done.")


17:52:38 DEBUG 1541090730.py:184 | reached __main__ start vars={'__name__': '__main__', '__doc__': '\nRun the full Farkle strategy tournament with threading, multi…
17:52:38 DEBUG 1541090730.py:194 | computed grid vars={'__name__': '__main__', '__doc__': '\nRun the full Farkle strategy tournament with threading, multi…
17:52:38 DEBUG 1541090730.py:198 | queues created vars={'__name__': '__main__', '__doc__': '\nRun the full Farkle strategy tournament with threading, multi…
17:52:38 DEBUG 1541090730.py:76 | reached producer start vars={'task_q': <multiprocessing.queues.Queue object at 0x000001F77877E390>, 'n_games_per_player': 10223,…
17:52:38 DEBUG 1541090730.py:206 | producer thread started
17:52:38 DEBUG 1541090730.py:80 | initialized producer RNGs vars={'task_q': <multiprocessing.queues.Queue object at 0x000001F77877E390>, 'n_games_per_player': 10223,…
17:52:38 DEBUG 1541090730.py:84 | producer loop rep=0 vars={'task_q': <multiprocessing.queues.Queue object at 0x000001F77877E390>, '

In [None]:
#!/usr/bin/env python3
from __future__ import annotations
import logging, multiprocessing as mp, threading, pickle, sys, time
from collections import Counter
from pathlib import Path
from typing import List, Tuple
import numpy as np, pandas as pd
from farkle.simulation import generate_strategy_grid, _play_game
from farkle.stats import games_for_power

CHUNKSIZE = 200
FLUSH_EVERY = 1000
PROCESSES = 16
QUEUE_MAXSIZE = 50_000
REPORT_EVERY = 100_000
CHECKPOINT_FILE = Path("win_counter.chk")
DEBUG_FIRST_N = 10000

class FirstNFilter(logging.Filter):
    def __init__(self, n: int = DEBUG_FIRST_N):
        super().__init__()
        self.n = n
        self.seen: Counter[Tuple[str, int]] = Counter()
    def filter(self, record: logging.LogRecord) -> bool:
        key = (record.pathname, record.lineno)
        self.seen[key] += 1
        return self.seen[key] <= self.n

root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)-5s | %(filename)s:%(lineno)d | %(message)s",
    datefmt="%H:%M:%S"
))
handler.addFilter(FirstNFilter())
root.handlers[:] = [handler]
log = logging.getLogger("tournament")
log.setLevel(logging.DEBUG)

def producer(task_q: mp.Queue, n_games_per_player: int, num_strats: int) -> None:
    log.debug("producer start n_games_per_player=%d num_strats=%d", n_games_per_player, num_strats)
    perm_rng = np.random.default_rng(999)
    seed_rng = np.random.default_rng(1234)
    log.debug("producer RNGs initialized")
    for rep in range(n_games_per_player):
        # log.debug("producer iteration rep=%d", rep)
        perm = perm_rng.permutation(num_strats)
        for j in range(0, num_strats, 5):
            # log.debug("producer inner j=%d", j)
            if j + 5 > num_strats:
                log.debug("producer breaking inner loop at j=%d", j)
                break
            seed = int(seed_rng.integers(2**32))
            task = (seed, tuple(int(x) for x in perm[j:j+5]))
            task_q.put(task)
            # log.debug("producer enqueued task %s", task)
    for _ in range(PROCESSES):
        task_q.put(None)
        log.debug("producer sent sentinel None")
    log.debug("producer done")

def worker(strat_list: List, task_q: mp.Queue, result_q: mp.Queue) -> None:
    log.debug("worker start")
    global strategies
    strategies = strat_list
    local_counter: Counter[str] = Counter()
    processed = 0
    while True:
        batch: List[Tuple[int, Tuple[int, ...]]] = []
        sentinel = False
        for _ in range(CHUNKSIZE):
            task = task_q.get()
            log.debug("worker got task %s", task)
            if task is None:
                log.debug("worker received sentinel None")
                sentinel = True
                break
            batch.append(task)
        log.debug("worker batch size=%d", len(batch))
        if not batch and sentinel:
            log.debug("worker exiting: empty batch and sentinel")
            break
        for seed, idxs in batch:
            row = _play_game(seed, [strategies[i] for i in idxs], 10_000)
            winner_key = str(row[f"{row['winner']}_strategy"])
            local_counter[winner_key] += 1
            processed += 1
            log.debug("worker processed game %d winner=%s", processed, winner_key)
            if processed % FLUSH_EVERY == 0:
                result_q.put(local_counter)
                log.debug("worker flushed local_counter at processed=%d", processed)
                local_counter = Counter()
        if sentinel:
            log.debug("worker exiting after sentinel")
            break
    if local_counter:
        result_q.put(local_counter)
        log.debug("worker final flush local_counter")
    result_q.put(None)
    log.debug("worker sent final sentinel None")

def collector(result_q: mp.Queue) -> None:
    log.debug("collector start")
    win_counter: Counter[str] = Counter()
    done_batches = 0
    active_workers = PROCESSES
    start = time.perf_counter()
    while active_workers:
        msg = result_q.get()
        log.debug("collector got msg %s", msg)
        if msg is None:
            active_workers -= 1
            log.debug("collector worker done, active_workers=%d", active_workers)
            continue
        win_counter.update(msg)
        done_batches += 1
        log.debug("collector updated win_counter with batch, done_batches=%d", done_batches)
        if done_batches % REPORT_EVERY == 0 or active_workers == 0:
            hrs = (time.perf_counter() - start) / 3600
            log.info("[batch %d] %.2f h elapsed", done_batches, hrs)
            with CHECKPOINT_FILE.open("wb") as f:
                pickle.dump({"done": done_batches, "counter": dict(win_counter)}, f)
            log.debug("collector saved checkpoint at done_batches=%d", done_batches)
    df = pd.DataFrame({
        "strategy_idx": range(len(strategies)),
        "str_repr": [str(s) for s in strategies],
    })
    df["wincount"] = df["str_repr"].map(win_counter).fillna(0).astype(int)
    df.to_csv("wincounts.csv", index=False)
    log.info("collector CSV written")

if __name__ == "__main__":
    log.debug("main start")
    strategies, _ = generate_strategy_grid()
    num_strats = len(strategies)
    n_games_per_player = games_for_power(
        n_strategies=num_strats,
        delta=0.03,
        alpha=0.025,
        power=0.90,
        method="bh",
        pairwise=True,
    )
    total_tasks = num_strats * n_games_per_player // 5
    log.info("Grid: %d strategies, %d games/strat ⇒ %d tasks.", num_strats, n_games_per_player, total_tasks)
    ctx = mp.get_context("spawn")
    task_q = ctx.Queue(maxsize=QUEUE_MAXSIZE)
    result_q = ctx.Queue(maxsize=QUEUE_MAXSIZE)
    threading.Thread(target=producer, args=(task_q, n_games_per_player, num_strats), daemon=True).start()
    log.debug("producer thread started")
    threading.Thread(target=collector, args=(result_q,), daemon=True).start()
    log.debug("collector thread started")
    processes = [ctx.Process(target=worker, args=(strategies, task_q, result_q)) for _ in range(PROCESSES)]
    for p in processes:
        p.start()
        log.debug("worker process started pid=%s", p.pid)
    for p in processes:
        p.join()
        log.debug("worker process joined pid=%s", p.pid)
    log.info("All workers joined – tournament complete.")


18:13:21 DEBUG | 4021196756.py:134 | main start
18:13:21 INFO  | 4021196756.py:146 | Grid: 8160 strategies, 10223 games/strat ⇒ 16683936 tasks.
18:13:21 DEBUG | 4021196756.py:42 | producer start n_games_per_player=10223 num_strats=8160
18:13:21 DEBUG | 4021196756.py:151 | producer thread started
18:13:21 DEBUG | 4021196756.py:45 | producer RNGs initialized
18:13:21 DEBUG | 4021196756.py:104 | collector start
18:13:21 DEBUG | 4021196756.py:153 | collector thread started


In [None]:
#!/usr/bin/env python3
from __future__ import annotations
import logging, multiprocessing as mp, threading, pickle, sys, time
from collections import Counter
from pathlib import Path
from typing import List, Tuple
import numpy as np, pandas as pd
from farkle.simulation import generate_strategy_grid, _play_game
from farkle.stats import games_for_power

CHUNKSIZE = 200
FLUSH_EVERY = 1000
PROCESSES = 16
QUEUE_MAXSIZE = 50_000
REPORT_EVERY = 100_000

# Robust project-root detection (works in scripts, modules, notebooks)
def find_project_root() -> Path:
    try:
        start = Path(__file__).resolve()
    except NameError:
        start = Path.cwd()
    for p in (start, *start.parents):
        if (p / "pyproject.toml").is_file():  # package folder marks root
            return p
    return Path.cwd()

PROJECT_ROOT = find_project_root()
CHECKPOINT_DIR = PROJECT_ROOT / "data" / "checkpoints"
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
CHECKPOINT_FILE = CHECKPOINT_DIR / "win_counter.chk"

DEBUG_FIRST_N = 10000
class FirstNFilter(logging.Filter):
    def __init__(self, n: int = DEBUG_FIRST_N):
        super().__init__()
        self.n = n
        self.seen: Counter[Tuple[str, int]] = Counter()
    def filter(self, record: logging.LogRecord) -> bool:
        key = (record.pathname, record.lineno)
        self.seen[key] += 1
        return self.seen[key] <= self.n

root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)-5s | %(filename)s:%(lineno)d | %(message)s",
    datefmt="%H:%M:%S"
))
handler.addFilter(FirstNFilter())
root.handlers[:] = [handler]
log = logging.getLogger("tournament")
log.setLevel(logging.DEBUG)

def producer(task_q: mp.Queue, n_games_per_player: int, num_strats: int) -> None:
    log.debug("producer start n_games_per_player=%d num_strats=%d", n_games_per_player, num_strats)
    perm_rng = np.random.default_rng(999)
    seed_rng = np.random.default_rng(1234)
    log.debug("producer RNGs initialized")
    for rep in range(n_games_per_player):
        # log.debug("producer iteration rep=%d", rep)
        perm = perm_rng.permutation(num_strats)
        for j in range(0, num_strats, 5):
            # log.debug("producer inner j=%d", j)
            if j + 5 > num_strats:
                log.debug("producer breaking inner loop at j=%d", j)
                break
            seed = int(seed_rng.integers(2**32))
            task = (seed, tuple(int(x) for x in perm[j:j+5]))
            task_q.put(task)
            # log.debug("producer enqueued task %s", task)
    log.debug("Finished 1st producer loop")
    for _ in range(PROCESSES):
        task_q.put(None)
        log.debug("producer sent sentinel None")
    log.debug("producer done")

def worker(strat_list: List, task_q: mp.Queue, result_q: mp.Queue) -> None:
    log.debug("worker start")
    global strategies
    strategies = strat_list
    local_counter: Counter[str] = Counter()
    processed = 0
    while True:
        batch: List[Tuple[int, Tuple[int, ...]]] = []
        sentinel = False
        for _ in range(CHUNKSIZE):
            task = task_q.get()
            log.debug("worker got task %s", task)
            if task is None:
                log.debug("worker received sentinel None")
                sentinel = True
                break
            batch.append(task)
        log.debug("worker batch size=%d", len(batch))
        if not batch and sentinel:
            log.debug("worker exiting: empty batch and sentinel")
            break
        for seed, idxs in batch:
            row = _play_game(seed, [strategies[i] for i in idxs], 10_000)
            winner_key = str(row[f"{row['winner']}_strategy"])
            local_counter[winner_key] += 1
            processed += 1
            log.debug("worker processed game %d winner=%s", processed, winner_key)
            if processed % FLUSH_EVERY == 0:
                result_q.put(local_counter)
                log.debug("worker flushed local_counter at processed=%d", processed)
                local_counter = Counter()
        if sentinel:
            log.debug("worker exiting after sentinel")
            break
    if local_counter:
        result_q.put(local_counter)
        log.debug("worker final flush local_counter")
    result_q.put(None)
    log.debug("worker sent final sentinel None")

def collector(result_q: mp.Queue) -> None:
    log.debug("collector start")
    win_counter: Counter[str] = Counter()
    done_batches = 0
    active_workers = PROCESSES
    start = time.perf_counter()
    while active_workers:
        msg = result_q.get()
        log.debug("collector got msg %s", msg)
        if msg is None:
            active_workers -= 1
            log.debug("collector worker done, active_workers=%d", active_workers)
            continue
        win_counter.update(msg)
        done_batches += 1
        log.debug("collector updated win_counter with batch, done_batches=%d", done_batches)
        if done_batches % REPORT_EVERY == 0 or active_workers == 0:
            hrs = (time.perf_counter() - start) / 3600
            log.info("[batch %d] %.2f h elapsed", done_batches, hrs)
            with CHECKPOINT_FILE.open("wb") as f:
                pickle.dump({"done": done_batches, "counter": dict(win_counter)}, f)
            log.debug("collector saved checkpoint at done_batches=%d", done_batches)
    df = pd.DataFrame({
        "strategy_idx": range(len(strategies)),
        "str_repr": [str(s) for s in strategies],
    })
    df["wincount"] = df["str_repr"].map(win_counter).fillna(0).astype(int)
    df.to_csv("wincounts.csv", index=False)
    log.info("collector CSV written")

def main():
    log.debug("main start")
    strategies, _ = generate_strategy_grid()
    num_strats = len(strategies)
    n_games_per_player = games_for_power(
        n_strategies=num_strats,
        delta=0.03,
        alpha=0.025,
        power=0.90,
        method="bh",
        pairwise=True,
    )
    total_tasks = num_strats * n_games_per_player // 5
    log.info("Grid: %d strategies, %d games/strat ⇒ %d tasks.", num_strats, n_games_per_player, total_tasks)
    ctx = mp.get_context("spawn")
    task_q = ctx.Queue(maxsize=QUEUE_MAXSIZE)
    result_q = ctx.Queue(maxsize=QUEUE_MAXSIZE)
    threading.Thread(target=producer, args=(task_q, n_games_per_player, num_strats), daemon=True).start()
    log.debug("producer thread started")
    threading.Thread(target=collector, args=(result_q,), daemon=True).start()
    log.debug("collector thread started")
    processes = [ctx.Process(target=worker, args=(strategies, task_q, result_q)) for _ in range(PROCESSES)]
    for p in processes:
        p.start()
        log.debug("worker process started pid=%s", p.pid)
    for p in processes:
        p.join()
        log.debug("worker process joined pid=%s", p.pid)
    log.info("All workers joined – tournament complete.")

if __name__ == "__main__":
    main()

18:43:59 DEBUG | 2990622369.py:150 | main start
18:43:59 INFO  | 2990622369.py:162 | Grid: 8160 strategies, 10223 games/strat ⇒ 16683936 tasks.
18:43:59 DEBUG | 2990622369.py:57 | producer start n_games_per_player=10223 num_strats=8160
18:43:59 DEBUG | 2990622369.py:167 | producer thread started
18:43:59 DEBUG | 2990622369.py:60 | producer RNGs initialized
18:43:59 DEBUG | 2990622369.py:120 | collector start
18:43:59 DEBUG | 2990622369.py:169 | collector thread started
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=0
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=5
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=10
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=15
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=20
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=25
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=30
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=35
18:43:59 DEBUG | 2990622369.py:65 | producer inner j=40
18:43:59 DEBUG | 2990622