In [None]:
import math
from typing import List, Tuple, Dict, Any

def _percentile(a: List[float], p: float) -> float:
    """Непараметрический персентиль (линейная интерполяция). p в [0,1]."""
    if not a:
        return 0.0
    a_sorted = sorted(a)
    k = (len(a_sorted) - 1) * p
    f = math.floor(k)
    c = math.ceil(k)
    if f == c:
        return float(a_sorted[int(k)])
    return a_sorted[f] * (c - k) + a_sorted[c] * (k - f)

def _weighted_quantile(values: List[float], q: float, weights: List[float]) -> float:
    """
    Взвешенный квантиль q в [0,1]. Если веса нулевые/пусто, падение в невзвешенный персентиль.
    """
    if not values:
        return 0.0
    if not weights or sum(weights) <= 0:
        return _percentile(values, q)
    # Сортируем по значениям
    pairs = sorted(zip(values, weights), key=lambda x: x[0])
    vals = [v for v, _ in pairs]
    wts = [max(0.0, w) for _, w in pairs]
    total = sum(wts)
    target = q * total
    cum = 0.0
    for i, (v, w) in enumerate(zip(vals, wts)):
        prev = cum
        cum += w
        if cum >= target:
            # Линейная интерполяция внутри отрезка (если нужно)
            if w == 0:
                return float(v)
            frac = (target - prev) / w
            if i == 0:
                return float(v)
            return vals[i-1] * (1 - frac) + v * frac
    return float(vals[-1])

def _alpha_from_half_life(h: float) -> float:
    """Перевод половинного периода (в днях) в коэффициент EWMA."""
    h = max(1e-6, float(h))
    return 1.0 - 2.0 ** (-1.0 / h)

def order_qty(
    last_30_days: List[float],
    on_hand: float,
    on_order_arrivals_by_L: float,
    L: int,
    R: int,
    service_level: float = 0.95,
    h: float = 5.0,
    k: int = 5,
    m: int = 15,
    r_down: float = 0.7,
    h_after_break: float = 3.0,
    quantile_cap: float = 0.90,
    use_weighted_median: bool = False,
    return_details: bool = True,
) -> Tuple[int, Dict[str, Any]]:
    """
    Рассчитывает количество к заказу по политике "под уровень" с учётом нелинейного спроса.
    Шаги: winsorize, EWMA, детектор снижения режима, страховой запас.

    Параметры:
      last_30_days: продажи/день за последние 30 дней (список длиной <= 30, допускаются нули).
      on_hand: текущий складской остаток (шт).
      on_order_arrivals_by_L: уже заказано и приедет в течение L дней (шт).
      L: срок поставки (дни).
      R: период пересмотра (дни), обычно 1.
      service_level: целевой уровень сервиса (напр. 0.95 → z≈1.65).
      h: половинный период EWMA (чувствительность к последней неделе). Типично 5–7.
      k, m: окна детектора смены режима (последние k против предыдущих m).
      r_down: порог падения (если recent/prev < r_down → обнуляем состояние на низкий режим).
      h_after_break: временно более короткий half-life после детекта падения.
      quantile_cap: верхний квантиль для обрезки всплесков (winsorize), напр. 0.90.
      use_weighted_median: если True — использовать экспоненциально-взвешенную медиану вместо EWMA.
      return_details: если True, вернуть словарь с промежуточными расчётами.

    Возвращает:
      (q, details), где q — округлённое количество к заказу (int),
      details — словарь с диагностикой (уровень λ, дисперсия, прогноз, SS, S и т.п.).
    """
    # Санитация и обрезка входа
    d_raw = [max(0.0, float(x)) for x in last_30_days][-30:]
    if not d_raw:
        d_raw = [0.0]

    # 1) Winsorize по верхнему квантилю
    cap = _percentile(d_raw, max(0.0, min(1.0, quantile_cap)))
    d = [min(x, cap) for x in d_raw]

    # 2) EWMA: уровень λ и дисперсия остатков s^2
    alpha = _alpha_from_half_life(h)
    gamma = alpha
    # Инициализация λ и s2: среднее/дисперсия по первым min(5, len(d)) точкам
    init_n = min(5, len(d))
    lam = sum(d[:init_n]) / init_n
    # Несмещённость нам не критична: берём простую дисперсию
    mean0 = lam
    s2 = sum((x - mean0) ** 2 for x in d[:init_n]) / max(1, init_n)

    for x in d:
        e = x - lam
        s2 = gamma * (e * e) + (1.0 - gamma) * s2
        lam = alpha * x + (1.0 - alpha) * lam

    # 3) Детектор смены режима (падение спроса)
    mu_recent = mu_prev = ratio = None
    broke_regime = False
    if len(d) >= k + m and k > 0 and m > 0:
        mu_recent = sum(d[-k:]) / k
        mu_prev   = sum(d[-(k + m):-k]) / m
        if mu_prev > 0:
            ratio = mu_recent / mu_prev
            if ratio < r_down:
                # Перезапуск уровня на недавний режим
                lam = mu_recent
                alpha = _alpha_from_half_life(h_after_break)
                s2 = max(s2 * 0.5, 1e-6)
                broke_regime = True

    # (Опция) экспоненциально-взвешенная медиана вместо EWMA
    if use_weighted_median:
        ages = list(range(len(d)))[::-1]  # 0 для последнего дня
        weights = [0.5 ** (age / max(1e-6, h)) for age in ages]
        lam = _weighted_quantile(d, 0.5, weights)

    # 4) Заказ "под уровень"
    H = int(L) + int(R)
    H = max(1, H)
    z_table = {0.90: 1.28, 0.95: 1.65, 0.975: 1.96, 0.99: 2.33}
    # Берём ближайшее известное z или дефолт 1.65
    z = min(z_table.items(), key=lambda kv: abs(kv[0] - service_level))[1]

    demand_forecast = lam * H
    safety_stock = z * math.sqrt(max(s2, 1e-12)) * math.sqrt(H)
    S = demand_forecast + safety_stock

    q = max(0.0, S - float(on_hand) - float(on_order_arrivals_by_L))
    q_rounded = int(round(q))

    details = {
        "lambda_level": lam,
        "variance_s2": s2,
        "alpha": alpha,
        "cap_quantile": quantile_cap,
        "cap_value": cap,
        "detector": {
            "broke_regime": broke_regime,
            "k": k, "m": m, "r_down": r_down,
            "mu_recent": mu_recent, "mu_prev": mu_prev, "ratio": ratio,
        },
        "h": h, "h_after_break": h_after_break,
        "H": H, "z": z,
        "demand_forecast_H": demand_forecast,
        "safety_stock": safety_stock,
        "target_S": S,
        "on_hand": float(on_hand),
        "on_order_arrivals_by_L": float(on_order_arrivals_by_L),
        "q_continuous": q,
    }

    return (q_rounded, details) if return_details else (q_rounded, {})


In [None]:

last_30 = [
    26, 25, 28, 20, 22, 9, 19, 15, 9, 11, 26, 37, 32, 37, 35, 24, 23, 21, 40, 21, 11, 14, 17, 22, 27, 25, 25, 29, 24, 17
]
last_30.reverse()
on_hand = 1860

In [None]:
last_30 = [
    26, 33, 36, 56, 66, 51, 51, 121, 217, 296, 263, 313, 285, 188, 237, 265, 202, 139, 202, 156, 185, 185, 123, 132, 87, 108, 50, 64, 70, 68
]
last_30.reverse()
# on_hand = 1484
on_hand = 17

In [None]:
# Продажи за 30 дней: низко → пик в середине → снова низко
# last_30 = []
# on_hand = 1860
on_order_arrivals_by_L = 0
L = 7   # доставка 7 дней
R = 1   # проверяем каждый день

q, info = order_qty(
    last_30, on_hand, on_order_arrivals_by_L, L, R,
    service_level=0.95,
    h=5, k=5, m=15, r_down=0.7, h_after_break=3,
    quantile_cap=0.90,
    use_weighted_median=False,   # включите True, если хотите медиану
    return_details=True
)

print("Количество к заказу:", q)
print("Ключевые детали:", {k: info[k] for k in ["lambda_level","demand_forecast_H","safety_stock","target_S"]})
print("Детектор смены режима:", info["detector"])