In [None]:
import os, time, math, json
from typing import Iterator, Tuple
import multiprocessing
# safe import of BrokenProcessPool
try:
    from concurrent.futures.process import BrokenProcessPool
except Exception:
    class BrokenProcessPool(Exception):
        pass
try:
    from tqdm import tqdm
    TQDM = True
except Exception:
    TQDM = False
# ---------------- Worker (top-level for multiprocessing) ----------------
def worker_compute_range(start_end: Tuple[int,int]) -> Tuple[int,int,int,int,str]:
    """
    Обробити інтервал [start, end).
    Повертає (start, end, total_steps, count, error_msg)
    error_msg == '' при успіху.
    Локальний memo реалізує "еволюцію даних" всередині воркера.
    """
    start, end = start_end
    try:
        memo = {1: 0}
        total_steps = 0
        # перебираємо кожне n в інтервалі
        for n in range(start, end):
            x = n
            path = []
            # рухаємось по градині поки не знайдемо відоме значення
            while x not in memo:
                path.append(x)
                if (x & 1) == 0:
                    x = x >> 1
                else:
                    x = 3 * x + 1
            # тепер x в memo
            known = memo[x]
            # кількість кроків для n = len(path) + known
            total_steps += len(path) + known
            # заповнюємо memo для пройденого шляху (зворотнім записом)
            cur = len(path) + known
            for v in path:
                memo[v] = cur
                cur -= 1
        return start, end, total_steps, (end - start), ''
    except Exception as e:
        return start, end, 0, 0, f"{type(e).__name__}: {e}"
# ---------------- Помічники ----------------
def chunk_ranges_gen(N:int, chunk_size:int) -> Iterator[Tuple[int,int]]:
    """Генератор інтервалів [1..N] в головному потоці."""
    start = 1
    while start <= N:
        end = min(start + chunk_size, N + 1)
        yield (start, end)
        start = end
# ---------------- Основний виконавець (multiprocessing.Pool, з fallback на sequential) ----------------
def compute_collatz(
    N: int = 10_000_000,
    chunk_size: int = 50_000,
    workers: int = None,
    use_threads: bool = False,
    checkpoint_path: str = "collatz_checkpoint.json",
    checkpoint_every: int = 10,
    maxtasksperchild: int = 100
):
    """
    Паралельний запуск (рекомендовано: процеси).
    Повертає (avg_steps, total_steps, total_count, elapsed_seconds).
    """
    if N < 1:
        raise ValueError("N має бути >= 1")
    if workers is None:
        cpu = os.cpu_count() or 1
        workers = max(1, cpu - 1)
    total_tasks = math.ceil(N / chunk_size)
    print(f"Параметри: N={N}, chunk_size={chunk_size}, total_tasks={total_tasks}, workers={workers}, use_threads={use_threads}")
    # відновлення з чекпоінту, якщо є
    resume_state = None
    if checkpoint_path and os.path.exists(checkpoint_path):
        try:
            with open(checkpoint_path, 'r') as f:
                resume_state = json.load(f)
            print("Знайдено чекпоінт. Відновлюємо прогрес:", resume_state.get('completed_tasks',0), "з", total_tasks, "задач")
        except Exception as e:
            print("Не вдалося прочитати чекпоінт:", e)
            resume_state = None
    # підготовка генератора й пропуск завершених задач при відновленні
    chunks_iter = chunk_ranges_gen(N, chunk_size)
    if resume_state:
        skip = resume_state.get('completed_tasks', 0)
        for _ in range(skip):
            try:
                next(chunks_iter)
            except StopIteration:
                break
    total_steps = 0 if resume_state is None else resume_state.get('total_steps', 0)
    total_count = 0 if resume_state is None else resume_state.get('total_count', 0)
    completed = 0 if resume_state is None else resume_state.get('completed_tasks', 0)
    start_time = time.perf_counter()
    # Використовуємо multiprocessing.Pool, який звичайно стабільніший на великих прогонах
    try:
        print(f"Запускаємо multiprocessing.Pool (processes={workers}, maxtasksperchild={maxtasksperchild}) ...")
        pool = multiprocessing.Pool(processes=workers, maxtasksperchild=maxtasksperchild)
        try:
            # imap_unordered повертає по мірі готовності результатів
            it = pool.imap_unordered(worker_compute_range, chunks_iter, chunksize=1)
            pbar = tqdm(total=total_tasks, desc="tasks", unit="task") if TQDM else None
            if pbar and resume_state:
                pbar.update(resume_state.get('completed_tasks', 0))
            for res in it:
                s_start, s_end, s_total_steps, s_count, s_err = res
                if s_err:
                    # якщо воркер вернув помилку, обробляємо цей chunk послідовно в головному процесі (fallback)
                    print(f"Помилка в воркері для chunk {s_start}-{s_end}: {s_err}. Виконуємо послідовний fallback для цього chunk'а.")
                    s_start, s_end, s_total_steps, s_count, s_err2 = worker_compute_range((s_start, s_end))
                    if s_err2:
                        print(f"Fallback також зафейлився для {s_start}-{s_end}: {s_err2}. Пропускаємо цей chunk.")
                        s_total_steps = 0
                        s_count = 0
                total_steps += int(s_total_steps)
                total_count += int(s_count)
                completed += 1
                if pbar:
                    pbar.update(1)
                # чекпоінтінг
                if checkpoint_path and (completed % checkpoint_every == 0):
                    with open(checkpoint_path, 'w') as f:
                        json.dump({
                            'total_steps': total_steps,
                            'total_count': total_count,
                            'completed_tasks': completed,
                            'N': N,
                            'chunk_size': chunk_size
                        }, f)
            if pbar:
                pbar.close()
        finally:
            pool.close()
            pool.join()
        elapsed = time.perf_counter() - start_time
        avg = total_steps / total_count if total_count else 0.0
        print("[multiprocessing.Pool] Завершено успішно.")
        return avg, total_steps, total_count, elapsed
    except Exception as e:
        # якщо Pool зламався — робимо послідовну обробку як останній fallback
        print("[multiprocessing.Pool] Exception:", type(e).__name__, e)
        print("[SEQUENTIAL FALLBACK] Починаємо послідовну обробку залишку (повільно, але гарантує завершення).")
        seq_start = time.perf_counter()
        for ch in chunks_iter:
            s_start, s_end, s_total_steps, s_count, s_err = worker_compute_range(ch)
            if s_err:
                print(f"[SEQUENTIAL] Error for chunk {ch}: {s_err} — пропускаємо.")
            total_steps += int(s_total_steps)
            total_count += int(s_count)
            completed += 1
            if checkpoint_path and (completed % checkpoint_every == 0):
                with open(checkpoint_path, 'w') as f:
                    json.dump({
                        'total_steps': total_steps,
                        'total_count': total_count,
                        'completed_tasks': completed,
                        'N': N,
                        'chunk_size': chunk_size
                    }, f)
        seq_elapsed = time.perf_counter() - seq_start
        elapsed = time.perf_counter() - start_time
        avg = total_steps / total_count if total_count else 0.0
        return avg, total_steps, total_count, elapsed
# ---------------- Виконання ----------------
if __name__ == "__main__":
    try:
        multiprocessing.freeze_support()
    except Exception:
        pass
    # --------- Налаштування (змініть під вашу машину при потребі) ----------
    N = 10_000_000
    CHUNK_SIZE = 100_000
    CPU = os.cpu_count() or 1
    WORKERS = 4
    USE_THREADS = True
    CHECKPOINT = "collatz_checkpoint.json"
    CHECKPOINT_EVERY = 10
    MAxTASKS = 100
    print("Починаємо обчислення (Ctrl+C для переривання). Параметри:")
    print(f" N={N}, CHUNK_SIZE={CHUNK_SIZE}, WORKERS={WORKERS}, USE_THREADS={USE_THREADS}, CHECKPOINT={CHECKPOINT}")
    t0 = time.perf_counter()
    avg, tot_steps, tot_count, elapsed = compute_collatz(
        N=N,
        chunk_size=CHUNK_SIZE,
        workers=WORKERS,
        use_threads=USE_THREADS,
        checkpoint_path=CHECKPOINT,
        checkpoint_every=CHECKPOINT_EVERY,
        maxtasksperchild=MAxTASKS
    )
    t_total = time.perf_counter() - t0
    print("\n=== РЕЗУЛЬТАТ ===")
    print(f"Оброблено чисел: {tot_count:,}")
    print(f"Загальна сума кроків: {tot_steps:,}")
    print(f"Середня кількість кроків на число: {avg:.6f}")
    print(f"Час виконання воркерів (приблизно): {elapsed:.2f} с")
    print(f"Повний час (wall-clock): {t_total:.2f} с")

Починаємо обчислення (Ctrl+C для переривання). Параметри:
 N=10000000, CHUNK_SIZE=100000, WORKERS=4, USE_THREADS=True, CHECKPOINT=collatz_checkpoint.json
Параметри: N=10000000, chunk_size=100000, total_tasks=100, workers=4, use_threads=True
Запускаємо multiprocessing.Pool (processes=4, maxtasksperchild=100) ...


tasks:   0%|          | 0/100 [00:00<?, ?task/s]