# DFA: сравнение CPU vs GPU (CuPy)

В ноутбуке сравниваются **скорость** и **численная согласованность** расчёта DFA для трёх вариантов:

- **CPU**: `backend="cpu"` (возможен multiprocessing через `processes`).
- **GPU (fast)**: `StatTools.analysis.dfa.dfa`, `backend="gpu"`.
- **GPU (one-branch)**: `StatTools.analysis.dfa_one_branch.dfa`, `backend="gpu"`.

Результаты представлены в виде:
- таймингов и ускорения для нескольких размеров датасета;
- графиков флуктуационной функции в log–log координатах;
- графиков относительной ошибки относительно CPU.

In [6]:
import os
import platform
import sys
import time
from multiprocessing import cpu_count
from pathlib import Path

import numpy as np
import matplotlib.pyplot as plt

_cwd = Path.cwd().resolve()
_root = _cwd
while _root != _root.parent and not (_root / "StatTools").exists():
    _root = _root.parent
if (_root / "StatTools").exists() and str(_root) not in sys.path:
    sys.path.insert(0, str(_root))

from StatTools.analysis.dfa import dfa
from StatTools.analysis.dfa_one_branch import dfa as dfa_one_branch
from StatTools.generators import generate_fbn

import cupy as cp

%matplotlib inline
np.random.seed(42)

print("CWD =", _cwd)
print("PROJECT_ROOT =", _root)


CWD = E:\Python\FluctuationAnalysisTools\research\CUDA_acceleration
PROJECT_ROOT = E:\Python\FluctuationAnalysisTools


In [7]:
def env_info() -> dict:
    info = {
        "python": platform.python_version(),
        "platform": platform.platform(),
        "processor": platform.processor(),
        "cpu_count": cpu_count(),
        "numpy": np.__version__,
    }

    try:
        info["cupy"] = cp.__version__
        dev = cp.cuda.runtime.getDeviceProperties(0)
        info["gpu_name"] = dev.get("name", b"?").decode("utf-8", errors="ignore")
        info["gpu_total_mem_GB"] = float(dev.get("totalGlobalMem", 0)) / (1024**3)
    except Exception as e:
        info["cupy"] = "<error>"
        info["gpu_error"] = repr(e)

    return info


def print_env_info():
    info = env_info()
    for k, v in info.items():
        print(f"{k}: {v}")


print_env_info()


python: 3.14.2
platform: Windows-10-10.0.19045-SP0
processor: Intel64 Family 6 Model 183 Stepping 1, GenuineIntel
cpu_count: 32
numpy: 2.4.1
cupy: 13.6.0
gpu_name: NVIDIA GeForce RTX 5080
gpu_total_mem_GB: 15.92047119140625


## Настройки эксперимента

Ниже — три набора данных для сравнения:

- `benchmark`: умеренного размера.
- `large`: большой датасет.
- `very_large`: очень большой датасет (может быть тяжёлым по памяти/времени).

In [8]:
DEGREE = 2
N_INTEGRAL = 1

CPU_PROCESSES = cpu_count()

N_RUNS_BENCHMARK = 3

cases = [
    {"name": "benchmark", "n_series": 10, "length": 2**14, "n_runs": N_RUNS_BENCHMARK},
    {"name": "large", "n_series": 50, "length": 2**15, "n_runs": 1},
    {"name": "very_large", "n_series": 100, "length": 2**16, "n_runs": 1},
]

cases

[{'name': 'benchmark', 'n_series': 10, 'length': 16384, 'n_runs': 3},
 {'name': 'large', 'n_series': 50, 'length': 32768, 'n_runs': 1},
 {'name': 'very_large', 'n_series': 100, 'length': 65536, 'n_runs': 1}]

In [9]:
def make_dataset(n_series: int, length: int, hursts=(0.5, 1.0, 1.5), seed: int = 42) -> np.ndarray:
    """Генерация набора временных рядов (как в тесте): смесь H из hursts."""
    rng = np.random.default_rng(seed)

    data_list = []
    per_h = int(np.ceil(n_series / len(hursts)))
    for h in hursts:
        for _ in range(per_h):
            series = generate_fbn(hurst=h, length=length, method="kasdin").flatten()
            if series.shape[0] > 0:
                j = int(rng.integers(0, series.shape[0]))
                series = np.roll(series, j)
            data_list.append(series)

    data = np.array(data_list[:n_series], dtype=float)
    assert data.shape == (n_series, length)
    return data


def diff_stats(f2_cpu: np.ndarray, f2_gpu: np.ndarray) -> dict:
    abs_diff = np.abs(f2_cpu - f2_gpu)
    rel_diff = abs_diff / (np.abs(f2_cpu) + 1e-15)

    return {
        "mean_abs": float(np.mean(abs_diff)),
        "max_abs": float(np.max(abs_diff)),
        "mean_rel": float(np.mean(rel_diff)),
        "max_rel": float(np.max(rel_diff)),
    }


def reduce_f2(f2: np.ndarray, agg: str = "mean") -> np.ndarray:
    """Для 2D f2 (n_series, n_scales) вернём 1D по агрегации; для 1D вернём как есть."""
    if f2.ndim == 1:
        return f2
    if agg == "mean":
        return np.mean(f2, axis=0)
    if agg == "median":
        return np.median(f2, axis=0)
    raise ValueError(f"Unknown agg: {agg}")


def benchmark_case(
    data: np.ndarray,
    *,
    degree: int,
    n_integral: int,
    cpu_processes: int,
    n_runs: int,
    warmup: bool = True,
) -> dict:
    """Запуск DFA на CPU/GPU и замер времени."""

    if warmup:
        _ = dfa(data, degree=degree, processes=1, n_integral=n_integral, backend="cpu")
        _ = dfa(data, degree=degree, processes=1, n_integral=n_integral, backend="gpu")
        _ = dfa_one_branch(data, degree=degree, processes=1, n_integral=n_integral, backend="gpu")

    cpu_times = []
    s_cpu = f2_cpu = None
    for _ in range(n_runs):
        t0 = time.perf_counter()
        s_cpu, f2_cpu = dfa(data, degree=degree, processes=cpu_processes, n_integral=n_integral, backend="cpu")
        cpu_times.append(time.perf_counter() - t0)

    gpu_fast_times = []
    s_gpu_fast = f2_gpu_fast = None
    for _ in range(n_runs):
        t0 = time.perf_counter()
        s_gpu_fast, f2_gpu_fast = dfa(data, degree=degree, processes=1, n_integral=n_integral, backend="gpu")
        gpu_fast_times.append(time.perf_counter() - t0)

    gpu_one_branch_times = []
    s_gpu_one_branch = f2_gpu_one_branch = None
    for _ in range(n_runs):
        t0 = time.perf_counter()
        s_gpu_one_branch, f2_gpu_one_branch = dfa_one_branch(
            data,
            degree=degree,
            processes=1,
            n_integral=n_integral,
            backend="gpu",
        )
        gpu_one_branch_times.append(time.perf_counter() - t0)

    out = {
        "cpu_times": np.array(cpu_times),
        "gpu_fast_times": np.array(gpu_fast_times),
        "gpu_one_branch_times": np.array(gpu_one_branch_times),
        "s_cpu": s_cpu,
        "f2_cpu": f2_cpu,
        "s_gpu_fast": s_gpu_fast,
        "f2_gpu_fast": f2_gpu_fast,
        "s_gpu_one_branch": s_gpu_one_branch,
        "f2_gpu_one_branch": f2_gpu_one_branch,
    }

    return out


## Запуск экспериментов

Для каждого набора данных:
- генерируем датасет;
- прогреваем CPU и оба GPU-варианта;
- меряем время `n_runs` раз;
- проверяем согласованность результата;
- считаем разницу относительно CPU.

In [10]:
results = []

for c in cases:
    name = c["name"]
    n_series = c["n_series"]
    length = c["length"]
    n_runs = c["n_runs"]

    print("\n" + "=" * 80)
    print(f"CASE: {name} | n_series={n_series} | length={length} | total={n_series*length:,}")

    data = make_dataset(n_series=n_series, length=length, seed=42)

    out = benchmark_case(
        data,
        degree=DEGREE,
        n_integral=N_INTEGRAL,
        cpu_processes=CPU_PROCESSES,
        n_runs=n_runs,
        warmup=True,
    )

    cpu_t = out["cpu_times"]
    gpu_fast_t = out["gpu_fast_times"]
    gpu_one_t = out["gpu_one_branch_times"]

    avg_cpu = float(np.mean(cpu_t))
    std_cpu = float(np.std(cpu_t, ddof=1)) if cpu_t.size > 1 else 0.0

    avg_gpu_fast = float(np.mean(gpu_fast_t))
    std_gpu_fast = float(np.std(gpu_fast_t, ddof=1)) if gpu_fast_t.size > 1 else 0.0
    speedup_fast = avg_cpu / avg_gpu_fast

    avg_gpu_one = float(np.mean(gpu_one_t))
    std_gpu_one = float(np.std(gpu_one_t, ddof=1)) if gpu_one_t.size > 1 else 0.0
    speedup_one = avg_cpu / avg_gpu_one

    row = {
        "case": name,
        "n_series": n_series,
        "length": length,
        "total_points": n_series * length,
        "degree": DEGREE,
        "n_integral": N_INTEGRAL,
        "cpu_processes": CPU_PROCESSES,
        "cpu_avg_s": avg_cpu,
        "cpu_std_s": std_cpu,
        "gpu_fast_avg_s": avg_gpu_fast,
        "gpu_fast_std_s": std_gpu_fast,
        "speedup_fast": speedup_fast,
        "gpu_one_branch_avg_s": avg_gpu_one,
        "gpu_one_branch_std_s": std_gpu_one,
        "speedup_one_branch": speedup_one,
    }

    s_cpu, f2_cpu = out["s_cpu"], out["f2_cpu"]
    s_gpu_fast, f2_gpu_fast = out["s_gpu_fast"], out["f2_gpu_fast"]
    s_gpu_one, f2_gpu_one = out["s_gpu_one_branch"], out["f2_gpu_one_branch"]

    np.testing.assert_array_equal(s_cpu, s_gpu_fast)
    np.testing.assert_array_equal(s_cpu, s_gpu_one)

    np.testing.assert_allclose(f2_cpu, f2_gpu_fast, rtol=1e-5)
    np.testing.assert_allclose(f2_cpu, f2_gpu_one, rtol=1e-5)

    row.update({k + "_fast": v for k, v in diff_stats(f2_cpu, f2_gpu_fast).items()})
    row.update({k + "_one_branch": v for k, v in diff_stats(f2_cpu, f2_gpu_one).items()})

    results.append({"row": row, "raw": out})

    print(f"CPU: {avg_cpu:.4f} ± {std_cpu:.4f} s (runs={cpu_t.size}, processes={CPU_PROCESSES})")
    print(f"GPU (fast): {avg_gpu_fast:.4f} ± {std_gpu_fast:.4f} s (runs={gpu_fast_t.size})")
    print(f"Speedup (fast): {speedup_fast:.2f}x")
    print(f"GPU (one-branch): {avg_gpu_one:.4f} ± {std_gpu_one:.4f} s (runs={gpu_one_t.size})")
    print(f"Speedup (one-branch): {speedup_one:.2f}x")



CASE: benchmark | n_series=10 | length=16384 | total=163,840


KeyboardInterrupt: 

In [None]:
# Сводная таблица результатов
try:
    import pandas as pd

    df = pd.DataFrame([r["row"] for r in results])
    display(df)
except Exception:
    df = [r["row"] for r in results]
    df

## Графики производительности

Ниже строим графики:
- среднее время CPU/GPU по наборам данных;
- ускорение vs общее число точек.

In [None]:
labels = [r["row"]["case"] for r in results]

total_points = np.array([r["row"]["total_points"] for r in results], dtype=float)

cpu_avg = np.array([r["row"]["cpu_avg_s"] for r in results], dtype=float)
cpu_std = np.array([r["row"]["cpu_std_s"] for r in results], dtype=float)

gpu_fast_avg = np.array([r["row"]["gpu_fast_avg_s"] for r in results], dtype=float)
gpu_fast_std = np.array([r["row"]["gpu_fast_std_s"] for r in results], dtype=float)
speedup_fast = np.array([r["row"]["speedup_fast"] for r in results], dtype=float)

gpu_one_avg = np.array([r["row"]["gpu_one_branch_avg_s"] for r in results], dtype=float)
gpu_one_std = np.array([r["row"]["gpu_one_branch_std_s"] for r in results], dtype=float)
speedup_one = np.array([r["row"]["speedup_one_branch"] for r in results], dtype=float)

fig, ax = plt.subplots(1, 2, figsize=(12, 4))

x = np.arange(len(labels))
width = 0.25
ax[0].bar(x - width, cpu_avg, width, yerr=cpu_std, label="CPU")
ax[0].bar(x, gpu_fast_avg, width, yerr=gpu_fast_std, label="GPU (fast)")
ax[0].bar(x + width, gpu_one_avg, width, yerr=gpu_one_std, label="GPU (one-branch)")
ax[0].set_xticks(x)
ax[0].set_xticklabels(labels)
ax[0].set_ylabel("Время, с")
ax[0].set_title("Время выполнения")
ax[0].legend()
ax[0].grid(True, alpha=0.3)

ax[1].plot(total_points, speedup_fast, marker="o", label="CPU / GPU (fast)")
ax[1].plot(total_points, speedup_one, marker="o", label="CPU / GPU (one-branch)")
ax[1].set_xscale("log")
ax[1].set_xlabel("Размер данных (число точек)")
ax[1].set_ylabel("Ускорение")
ax[1].set_title("Ускорение в зависимости от размера")
ax[1].legend()
ax[1].grid(True, which="both", alpha=0.3)

plt.tight_layout()
plt.show()


## Графики точности

Проверяем, что:
- значения масштабов совпадают;
- результаты совпадают с точностью `rtol=1e-5`;
- визуально сравниваем флуктуационную функцию и ошибки по масштабам (относительно CPU) для двух GPU-вариантов.

In [None]:
n = len(results)
fig, axes = plt.subplots(n, 2, figsize=(12, 4 * n))
if n == 1:
    axes = np.array([axes])

for i, r in enumerate(results):
    name = r["row"]["case"]
    s = r["raw"]["s_cpu"]
    f2_cpu = r["raw"]["f2_cpu"]
    f2_gpu_fast = r["raw"]["f2_gpu_fast"]
    f2_gpu_one = r["raw"]["f2_gpu_one_branch"]

    f_cpu_1d = reduce_f2(f2_cpu, agg="mean")
    f_fast_1d = reduce_f2(f2_gpu_fast, agg="mean")
    f_one_1d = reduce_f2(f2_gpu_one, agg="mean")

    ax0 = axes[i, 0]
    ax0.loglog(s, f_cpu_1d, marker="o", label="CPU (среднее по сериям)")
    ax0.loglog(s, f_fast_1d, marker="o", label="GPU (fast)")
    ax0.loglog(s, f_one_1d, marker="o", label="GPU (one-branch)")
    ax0.set_title(f"{name}: флуктуационная функция (CPU vs GPU)")
    ax0.set_xlabel("Масштаб окна")
    ax0.set_ylabel("Флуктуационная функция")
    ax0.grid(True, which="both", alpha=0.3)
    ax0.legend()

    ax1 = axes[i, 1]
    rel_fast = np.abs(f_cpu_1d - f_fast_1d) / (np.abs(f_cpu_1d) + 1e-15)
    rel_one = np.abs(f_cpu_1d - f_one_1d) / (np.abs(f_cpu_1d) + 1e-15)
    ax1.semilogx(s, rel_fast, marker="o", label="CPU / GPU (fast)")
    ax1.semilogx(s, rel_one, marker="o", label="CPU / GPU (one-branch)")
    ax1.set_title(f"{name}: относительная ошибка")
    ax1.set_xlabel("Масштаб окна")
    ax1.set_ylabel("Относительная ошибка")
    ax1.grid(True, which="both", alpha=0.3)
    ax1.legend()

    plt.tight_layout()
    plt.show()
