# Лабораторная работа №4. Единая математическая структура в разнородных системах.
## Задача 1. Задача от эндокринолога. 

«У меня есть пациенты с нарушением регуляции сахара в крови. Иногда у них случаются резкие приступы гипогликемии. Я хочу понять,
по какому принципу организм в норме управляет выбросом глюкозы из печени, чтобы смоделировать сбой этого процесса. Помогите мне
формализовать эту систему. Какие параметры мне нужно измерить у пациента?» 

Ограничения:  
• Уровень глюкозы можно измерить только дискретно (анализы крови)  
• Нельзя напрямую измерить "сигнал печени"о выбросе глюкозы  
• Индивидуальные вариации параметров у разных пациентов достигают
30-40%  
• Время реакции системы зависит от состояния пациента  

Выберем основные параметры системы:  
•$G(t)$ - концентрация глюкозы в крови в момент времени $t$ (ммоль/л);  
•$G_{crit}$ - критический (пороговый) уровень глюкозы, при достижении которого печень получает сигнал на вырос (ммоль/л);  
•$G_{inf}$  - естественный уровень глюкозы;  
•$B$ - фиксированный объем глюкозы, который печень вырасывает в кровь по сигналу (ммоль);   
•$\tau$ - время реакции системы (задержка между достижением порога и фактическим выросом глюкозы) (мин или ч);  
•$k$ - скорость потребления глюкозы тканями организма (ммоль/л в час).

Основные гипотезы:  
1. Организм стремится поддерживать уровень глюкозы в некотором рабочем диапазоне.  
2. При падении концентрации глюкозы до порогового уровня $G_{crit}$ активируется механизм экстренного пополнения.  
3. Пополнение представляет собой мгновенный выброс фиксированного количества глюкозы $B$.  
4. В промежутках между выбросами глюкоза потребляется тканями организма с постоянной скоростью $k$.  
5. Между выбросами концентрация глюкозы в крови меняется непрерывно.   
6. В начальный момент времени концентрация глюкозы в крови - $C_0$.   

Рассмотрим скорость изменения количетства глюкозы в крови в течение некоторого промежутка времени:

$\frac{ΔG}{Δt} = (0 - k)(G - G_{inf})$   (приток - отток)*(релаксация к  равновесию)  

Выполним предельный переход:

$lim_{Δt \to 0} \frac{ΔG}{Δt} =lim_{Δt \to 0} (-k(G - G_{inf}))$ 


$\frac{dG}{dt} = -k(G - G_{inf})$ - дифференциальное уравнение, описывающее поведение системы между выбросами глюкозы.

Начальные условия: $G(t_0)=G_0$.

Решение ДУ: $G(t)=G_{inf}+(G_0 - G_{inf})e^{-k(t-t_0)}$


Однако, исходя из поставленных гипотез, непрервной модель является только между выбросами глюкозы, которые имеют дискретную структуру.


Условие выброса: $G(t) \le G~crit~$.  
Правило пересчета: $G(t^+_{выбр.})=G(t^-_{выбр.}) + B$, где $t_{выбр.}$ - момент времени, в который происходит выброс глюкозы.

Программная реализация модели, для которой были взяты следующие начальные оценки параметров:  
• $k≈0.01-0.05 мин^{-1}$;  
• $B≈0.5–3 ммоль/л$;  
• $G_{inf}≈3-5 ммоль/л$;  
• $G_{crit}≈4-6 ммоль/л$;  
• $\tau≈5-30 мин$.  
Будем считать, что вариабельность параметров составляет примерно 35%.  
При этом для реализации модели генерируются аналоги проведенных измерений уровня глюкозы (решение уравнения с некоторым шумом, который симулирует погрешность измерений).

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL._imaging import display
from scipy.optimize import least_squares

try:
    from caas_jupyter_tools import display_dataframe_to_user
    _have_display = True
except Exception:
    _have_display = False

np.random.seed(42)

# принятые согласно выдвинутым приближениям значения истинных параметров
true_params = {
    'k': 0.03,
    'G_inf': 3.2,
    'B': 1.1,
    'G_th': 4.9,
    'tau': 10.0
}

# Межиндивидуальная вариабельность
patient_variation = 0.35
patient_params = {k: v * (1 + patient_variation * (2 * np.random.rand() - 1)) for k, v in true_params.items()}


# Решение модели (симуляция пациента, на данных которого строится модель)
def simulate_patient(params, t_max=600, dt=0.5, sample_dt=15.0, noise_sd=0.12):
    """
    Что делает функция:
      - Непрерывно интегрируется dG/dt = -k*(G-G_inf)
      - При достижении G <= G_th планируется выброс(bolus) через tau минут
      - В момент выброса создается мгновенный скачок +B
      - Возвращает временной ряд, дискретные замеры с шумом и истинные значения в точках сэмплов
    """
    k = params['k']
    G_inf = params['G_inf']
    B = params['B']
    G_th = params['G_th']
    tau = params['tau']

    n_steps = int(np.ceil(t_max / dt)) + 1
    times = np.linspace(0, t_max, n_steps)
    G = np.zeros_like(times)

    G[0] = max(6.0, G_th + 0.5)

    pending_bolus_times = []  # очередь временных моментов выброса глюкозы

    for i in range(1, len(times)):
        t = times[i]
        # если пришло время выброса, применяем к предыдущему значению (упрощение)
        while pending_bolus_times and pending_bolus_times[0] <= t + 1e-9:
            # имитируем выброс — прибавляем B к последнему известному значению
            G[i - 1] += B
            pending_bolus_times.pop(0)
        # интегрируем экспоненциальное убывание
        G[i] = G[i - 1] + dt * (-k * (G[i - 1] - G_inf))
        # условие генерации сигнала
        if G[i] <= G_th:
            t_bolus = t + tau
            # смотрим на наличие ближайшего выброса, чтобы не создавать повторных
            if not pending_bolus_times or pending_bolus_times[-1] < t_bolus - 1e-6:
                pending_bolus_times.append(t_bolus)
                pending_bolus_times.sort()

    # Дискретные измерения и шум
    sample_times = np.arange(0, times[-1] + 1e-9, sample_dt)
    sample_indices = np.searchsorted(times, sample_times)
    true_samples = G[sample_indices]
    noisy_samples = true_samples + np.random.normal(scale=noise_sd, size=true_samples.shape)
    return times, G, sample_times, noisy_samples, true_samples

# x0 = np.array([0.025, 3.5, 1.6, 5.0, 8.0])
times0, G_initial, sample_times, true_samples, measurements_noisy = simulate_patient({
    'k': true_params['k'], 'G_inf': true_params['G_inf'], 'B': true_params['B'], 'G_th': true_params['G_th'],
    'tau': true_params['tau'], 'G0': 6.0
})

plt.figure(figsize=(12, 5))
plt.plot(times0, G_initial, label="Модель с исходными параметрами", linewidth=2)
plt.scatter(sample_times, measurements_noisy, color="red", label="Измерения", zorder=5)
plt.xlabel("Время, мин")
plt.ylabel("Глюкоза, ммоль/л")
plt.title("График модели")
plt.grid(True)
plt.legend()
plt.show()

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_1.png)

Далее оптимизируем модель с помощью метода наименьших квадратов. То есть найдем такие значения параметров $θ = (k, G_{inf}, B, G_{crit}, \tau)$, чтобы функционал:  
$J(\theta)=\sum_{j} (G_{исх.}(t_j;\theta)-G_{опт.}(t_j)) \to min$

In [None]:
#Оптимизация модели
def simulate_given_params(theta, t_max=480, dt=0.5, sample_dt=15.0):
    """theta = [k, G_inf, B, G_th, tau]"""
    params = {'k': theta[0], 'G_inf': theta[1], 'B': theta[2], 'G_th': theta[3], 'tau': theta[4]}
    _, _, sample_times, _, true = simulate_patient(params, t_max=t_max, dt=dt, sample_dt=sample_dt, noise_sd=0.0)
    return sample_times, true

def residuals(theta, observed_samples, t_max=480, dt=0.5, sample_dt=15.0):
    # штрафуем невалидные (<=0) параметры
    if np.any(np.array(theta) <= 0):
        return np.ones_like(observed_samples) * 1e3
    sample_times, model_samples = simulate_given_params(theta, t_max=t_max, dt=dt, sample_dt=sample_dt)
    return model_samples - observed_samples


#Симуляция "пациента"
times, G_trace, sample_times, measurements_noisy, measurements_true = simulate_patient(
    patient_params, t_max=480, dt=0.5, sample_dt=15.0, noise_sd=0.12)

#Начальное приближение и границы
#x0 = np.array([0.025, 3.5, 1.6, 5.0, 8.0])  #стартовые оценки: [k, G_inf, B, G_th, tau]
x0 = np.array([true_params['k'], true_params['G_inf'], true_params['B'],  true_params['G_th'],  true_params['tau']])
lower_bounds = [1e-4, 1.0, 0.1, 3.0, 1.0]
upper_bounds = [0.2, 8.0, 6.0, 8.0, 60.0]

res = least_squares(residuals, x0, bounds=(lower_bounds, upper_bounds),
                    args=(measurements_noisy, 480, 0.5, 15.0),
                    verbose=2, xtol=1e-6, ftol=1e-6, max_nfev=200)

fitted = res.x
fitted_params = {'k': fitted[0], 'G_inf': fitted[1], 'B': fitted[2], 'G_th': fitted[3], 'tau': fitted[4]}

times_fit, G_fit, sample_times_fit, noisy_fit, true_fit = simulate_patient(fitted_params, t_max=480, dt=0.5, sample_dt=15.0, noise_sd=0.0)

#Вывод результатов
print("\nВыбранные параметры:")
for k,v in patient_params.items():
    print(f"  {k} = {v:.6f}")
print("\nНайденные параметры:")
for k,v in fitted_params.items():
    print(f"  {k} = {v:.6f}")

#График
plt.figure(figsize=(11,6))
plt.plot(times, G_trace, label='Исходная', linewidth=1)
plt.plot(times_fit, G_fit, label='Оптимизированная модель', linewidth=1)
plt.scatter(sample_times, measurements_noisy, label='Зашумленные измерения', marker='o')
plt.xlabel('Время, мин.')
plt.ylabel('Глюкоза, ммоль/л')
plt.legend()
plt.title('Модель выброса глюкозы после оптимизации')
plt.grid(True)
plt.tight_layout()
plt.savefig('glucose_bolus_fit.png', dpi=150)
plt.show()

#Таблица сравнения (в точках измерений)
df = pd.DataFrame({
    'Время': sample_times,
    'Измеренный шум': measurements_noisy,
    'Исходная модель': measurements_true,
    'Оптимизировання модель': np.interp(sample_times, times_fit, G_fit)
})
if _have_display:
    display_dataframe_to_user("Сравнение измерений и модели", df)
else:
    print("\nПервые строки сравнения измерений и модели:")
    print(df.head())

## результаты

In [None]:
D:\Mathem_model\Attempt\Scripts\python.exe D:\Mathem_model\L1.py 
   Iteration     Total nfev        Cost      Cost reduction    Step norm     Optimality   
       0              1         8.9041e+02                                    2.32e+09    
       1              2         8.9041e+02      0.00e+00       0.00e+00       2.32e+09    
`xtol` termination condition is satisfied.
Function evaluations 2, initial cost 8.9041e+02, final cost 8.9041e+02, first-order optimality 2.32e+09.

Выбранные параметры:
  k = 0.027365
  G_inf = 4.209600
  B = 1.278635
  G_th = 5.238399
  tau = 7.592130

Найденные параметры:
  k = 0.030000
  G_inf = 3.200000
  B = 1.100000
  G_th = 4.900000
  tau = 10.000000


![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_2.png)

In [None]:
Первые строки сравнения измерений и модели:
   Время  Измеренный шум  Исходная модель  Оптимизировання модель
0    0.0        5.894329         6.000000                6.000000
1   15.0        5.258202         5.393868                4.979283
2   30.0       11.229673        11.213542               12.682263
3   45.0       19.026905        18.957051               18.492092
4   60.0       14.070896        13.964366               12.917484

## ковариационная оценка параметров

In [None]:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from numpy.linalg import inv, LinAlgError
from scipy.optimize import least_squares

# ковариационная оценка параметров
param_names = ['k','G_inf','B','G_th','tau']
m = len(measurements_noisy)
p = len(fitted)

resid = res.fun
ssq = np.sum(resid**2)
dof = max(1, m - p)
sigma2 = ssq / dof

J = res.jac
try:
    JTJ_inv = inv(J.T @ J)
    cov_theta = sigma2 * JTJ_inv
    se = np.sqrt(np.abs(np.diag(cov_theta)))
    ci_lower_cov = fitted - 1.96 * se
    ci_upper_cov = fitted + 1.96 * se
except:
    se = ci_lower_cov = ci_upper_cov = np.full(p, np.nan)

df_cov = pd.DataFrame({
    'Параметр': param_names,
    'Оценка': fitted,
    'Станд. ошибка (ков.)': se,
    'Нижн. граница 95% ДИ (ков.)': ci_lower_cov,
    'Верхн. граница 95% ДИ (ков.)': ci_upper_cov
})


n_boot = 60
boots = []
fitted_model_samples = simulate_given_params(fitted, 480, 0.5, 15.0)[1]
residuals_obs = measurements_noisy - fitted_model_samples

print(f"\nВыполняю бутстрэп ({n_boot} прогонов)...")

for _ in range(n_boot):
    resamp = np.random.choice(residuals_obs, size=len(residuals_obs), replace=True)
    meas_b = fitted_model_samples + resamp
    try:
        r_b = least_squares(
            residuals, x0, bounds=(lower_bounds, upper_bounds),
            args=(meas_b,480,0.5,15.0),
            max_nfev=200
        )
        boots.append(r_b.x)
    except:
        pass

boots = np.array(boots)
if len(boots) > 0:
    df_boot = pd.DataFrame({
        'Параметр': param_names,
        'Среднее (bootstrap)': boots.mean(axis=0),
        'Станд. откл. (bootstrap)': boots.std(axis=0),
        'Нижн. граница 95% ДИ (bootstrap)': np.percentile(boots, 2.5, axis=0),
        'Верхн. граница 95% ДИ (bootstrap)': np.percentile(boots, 97.5, axis=0)
    })
else:
    df_boot = pd.DataFrame()


def compute_metrics_trace(times, G, B_est, threshold=3.5):
    dt = times[1] - times[0]
    return (
        float(np.min(G)),
        float((G < threshold).sum() * dt),
        int((np.diff(G) > 0.7 * B_est).sum())
    )

# норма
times_n, G_n, *_ = simulate_patient(fitted_params,480,0.5,1.0,0.0)
minG_n, time_below_n, bolus_n = compute_metrics_trace(times_n, G_n, fitted_params['B'])

# критическая
params_c = fitted_params.copy()
params_c['k'] *= 1.5
params_c['tau'] *= 1.5
times_c, G_c, *_ = simulate_patient(params_c,480,0.5,1.0,0.0)
minG_c, time_below_c, bolus_c = compute_metrics_trace(times_c, G_c, params_c['B'])

# Пиковая нагрузка
def simulate_peak(params):
    t_max = 480; dt = 0.5
    times = np.arange(0, t_max+dt, dt)
    G = np.zeros_like(times)
    G[0] = max(6.0, params['G_th'] + 0.5)
    for i in range(1, len(times)):
        spike = 3.0 if 90 < times[i] < 150 else 1.0
        G[i] = G[i-1] + dt*(-params['k']*spike*(G[i-1]-params['G_inf']))
    return times, G

times_p, G_p = simulate_peak(fitted_params)
minG_p, time_below_p, bolus_p = compute_metrics_trace(times_p, G_p, fitted_params['B'])

print("\n=== Ковариационная оценка параметров ===")
def display(df_cov):
    pass

display(df_cov)
print(df_cov)

print("\n=== Bootstrap-оценки параметров ===")
pd.set_option('display.max_columns', None)
display(df_boot)
print(df_boot)


print("\n=== Верификация сценариев ===")
print(f"Норма: минимум глюкозы = {minG_n:.2f}, время ниже порога = {time_below_n:.1f} мин, болюсов = {bolus_n}")
print(f"Критическая: минимум = {minG_c:.2f}, ниже порога = {time_below_c:.1f} мин, болюсов = {bolus_c}")
print(f"Пиковая нагрузка: минимум = {minG_p:.2f}, ниже порога = {time_below_p:.1f} мин, болюсов = {bolus_p}")


plt.figure(figsize=(10,4))
plt.plot(times, G_trace, label='Глюкоза пациента (истинная)')
plt.scatter(sample_times, measurements_noisy, s=20, c='red', label='Измерения с шумом')
plt.plot(times, simulate_patient(fitted_params,480,0.5,1.0,0.0)[1], '--', label='Модель с параметрами (после оптимизации)')
plt.axhline(3.5, color='k', linestyle=':', label='Порог гипогликемии 3.5 ммоль/л')
plt.xlabel('Время (мин)')
plt.ylabel('Глюкоза (ммоль/л)')
plt.title('Подгонка модели')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10,4))
plt.plot(times_n, G_n, label='Норма')
plt.plot(times_c, G_c, label='Критическая ситуация')
plt.plot(times_p, G_p, label='Пиковая нагрузка')
plt.axhline(3.5, linestyle=':', color='k', label='Порог гипогликемии')
plt.xlabel('Время (мин)')
plt.ylabel('Глюкоза (ммоль/л)')
plt.title('Верификация сценариев')
plt.legend()
plt.grid(True)
plt.show()


## результаты:

In [None]:

Выполняю бутстрэп (60 прогонов)...

=== Ковариационная оценка параметров ===
  Параметр  Оценка  ...  Нижн. граница 95% ДИ (ков.)  Верхн. граница 95% ДИ (ков.)
0        k    0.03  ...                          NaN                           NaN
1    G_inf    3.20  ...                          NaN                           NaN
2        B    1.10  ...                          NaN                           NaN
3     G_th    4.90  ...                          NaN                           NaN
4      tau   10.00  ...                          NaN                           NaN

[5 rows x 5 columns]

=== Bootstrap-оценки параметров ===
  Параметр  Среднее (bootstrap)  Станд. откл. (bootstrap)  \
0        k             0.030166              1.518043e-03   
1    G_inf             3.213150              1.345409e-01   
2        B             1.095788              2.532222e-02   
3     G_th             4.900000              8.881784e-16   
4      tau            10.000000              8.670056e-08   

   Нижн. граница 95% ДИ (bootstrap)  Верхн. граница 95% ДИ (bootstrap)  
0                          0.029882                           0.030507  
1                          3.199958                           3.201038  
2                          1.046402                           1.101441  
3                          4.900000                           4.900000  
4                         10.000000                          10.000000  

=== Верификация сценариев ===
Норма: минимум глюкозы = 4.48, время ниже порога = 0.0 мин, болюсов = 100
Критическая: минимум = 4.09, ниже порога = 0.0 мин, болюсов = 90
Пиковая нагрузка: минимум = 3.20, ниже порога = 406.5 мин, болюсов = 0

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_3.png)

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_4.png)

# Задача 2. Задача от нейробиолога.

«Мы изучаем, как нейроны поддерживают непрерывную передачу сигналов, не истощая запас нейромедиаторов. Есть гипотеза, что существует некий "порог тревоги при котором запускается пополнение. Постройте модель, которая объяснила бы, как нейрон "решает что пора синтезировать новую порцию медиатора, и почему передача не прерывается.»

Подсказка 1: Нейрон не может позволить себе полностью исчерпать запас нейромедиатора — это парализует коммуникацию. Мы видим, что процесс синтеза новой порции запускается заблаговременно, когда запас ещё есть. Интересно, что время на подготовку "партии" довольно велико и постоянно.
Подсказка 2: Рассмотрите систему с постоянным временем "производства"нового ресурса и пороговым значением для запуска этого производства.

Ограничения:
• Невозможно измерить точный запас нейромедиатора in vivo.
• Можно оценивать только косвенные показатели (ЭЭГ, МРТ).
• Время синтеза может варьироваться в зависимости от состояния нейрона.
• Размер "партии"зависит от активности предыдущих циклов.

Основные гипотезы:

Запас нейромедиатора уменьшается непрерывно пропорционально активности (чем выше активность, тем быстрее расход).
Когда запас спадает до порогового значения, то нейрон заблаговременно сигналит о необходимости новой партии.
Время синтеза новой партии постоянно.
По окончании подготовки накапливается новая партия, которая мгновенно пополняет запас, при этос размер партии напрямую зависит от текущего запаса.
Замеряемые данные имеют шум (воздействие аппаратов ЭЭГ или МРТ).

Основные параметры системы:  
• $N(t)$ - запас нейромедиатора в момент времени $t$;  
• $A(t) \ge 0$ - активность мозга в момент времени $t$ (представляет собой расход нейромедиатора в конкрентном временном отсчете);  
• $N_{порог.}$ - пороговое значение запаса нейромедиатора;  
• $B_n$ - партия нейромедиатора для пополнения,  
$B_n = B_{нач.}(1 + \alpha C_{норм.})$, где
$B_{нач.}$ - начальный объем пополнения, $C_{норм.}$ - нормированная величина потребления, накопленного с прошлого пополнения.  
• $P(t)$ - значение зашумленных данных.  
• $k>0$ - коэффициент расхода медиатора на единицу активности.    
• $\tau$ - время подготовки синтеза новой партии.   
• $\alpha \ge 0$ - коэффициент адаптации размера партии к предшествующей активности.    
• $s>0$ - масштаб дискретных наблюдений $P(t)$.    
• $\epsilon$ - шум наблюдения. 



Рассмотрим изменение запаса нейромедиатора в некоторый промежуток времени $\Delta t$:

$\Delta N(t) = -k \Delta A(t) \Delta t$, где $k$ - коэффициент расхода на единицу активности  

В таком случае скорость изменения:

$\frac{\Delta N(t)}{\Delta t} = -k \Delta A(t)$  

Выполним предельный переход:  

$lim_{Δt \to 0} \frac{\Delta N(t)}{\Delta t} = -k A(t) $

Получаем дифференциальное уравнение:  

$\frac{dN(t)}{dt} = -k A(t)$ для $t\not\in{t_n}$, где $t_n$ - моменты выброса. 

При этом в моменты времени $t_n$ происходит мгновенный выброс нейромедиатора:

$N(t^+_n) = N(t^-_n) + B_n$.  

Сигнал на синтез формируется при первом времени $t_{sig}: N(t_{sig}) \le N_{порог.}$, т.е. $t_n = t_{sig} + \tau$  

При этом для накопленного потребления $C(t)$ имеем:  

$\frac{dC(t)}{dt} = k A(t)$ и при каждом $t_n$: $C(t^+_n)=0$  

$C_{норм.} = \frac{C(t^-_n)}{C_{max}}$

Полученные наблюдения опишем следующим образом:

$P(t_j) = s N(t_j) + \epsilon_j$

Оптимизация будет вестись, аналогично предыдущей задаче, методом наименьших квадратов по параметрам: $k$, $B_{нач.}$, $N_{порог.}$, $\tau$, $\alpha$, $s$.  

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL._imaging import display
from scipy.optimize import least_squares

np.random.seed(42)

# генерация активности мозга
def generate_activity(t_array, base_rate=1.0, amp=0.7, freq=0.02, noise_sd=0.2):
    raw = base_rate + amp * np.sin(2 * np.pi * freq * t_array) + noise_sd * np.random.randn(len(t_array))
    return np.maximum(0.0, raw)

# модель изменения количества нейромедиатора
def simulate_neuron(params, t_max=550.0, dt=0.5, sample_dt=10.0, noise_sd=0.03, activity=None):
    k = params['k']
    N_inf = params.get('N_inf', 0.0)
    B_base = params['B_base']
    N_th = params['N_th']
    tau = params['tau']
    alpha = params.get('alpha', 0.0)
    N0 = params.get('N0', 1.0)
    s = params.get('s', 1.0)

    times = np.arange(0.0, t_max + dt / 2, dt)
    n_steps = len(times)
    N = np.zeros(n_steps)
    N[0] = N0

    # очередь запланированных окончаний синтеза (времена, когда партия готова)
    scheduled = []
    # накопленное потребление с последнего пополнения
    consumption_since_last = 0.0

    # активность A(t)
    if activity is None:
        activity = generate_activity(times, base_rate=1.0, amp=0.9, freq=0.02, noise_sd=0.15)
    else:
        activity = np.array(activity)
    # верхняя оценка возможного потребления для нормировки накопления
    max_possible_consumption = (k * np.sum(activity) * dt) + 1e-9

    for i in range(1, n_steps):
        t = times[i - 1]
        A = activity[i - 1]

        # если запас опустился до порога — сигналим о запуске синтеза (один сигнал до готовности)
        if N[i - 1] <= N_th and (not scheduled or scheduled[-1] < t - 1e-6):
            t_bolus_ready = t + tau
            scheduled.append(t_bolus_ready)

        # применяем готовые партии (если время пришло)
        bolus_sum = 0.0
        new_sched = []
        for bt in scheduled:
            if bt <= t + dt / 2:
                # адаптивный размер партии: в пропорции к накопленному потреблению
                C_norm = min(1.0, consumption_since_last / (max_possible_consumption + 1e-12))
                Bn = B_base * (1.0 + alpha * C_norm)
                bolus_sum += Bn
                # после применения обнуляем накопление
                consumption_since_last = 0.0
            else:
                new_sched.append(bt)
        scheduled = new_sched

        # расход за шаг
        dN_consume = k * A * dt
        consumption_since_last += dN_consume

        # обновление запаса (не опускаем ниже N_inf)
        N[i] = max(N_inf, N[i - 1] - dN_consume + bolus_sum)

    # дискретные наблюдения прокси P = s * N + шум
    sample_times = np.arange(0.0, times[-1] + 1e-9, sample_dt)
    sample_indices = np.searchsorted(times, sample_times)
    true_samples = N[sample_indices]
    measurements_noisy = s * true_samples + np.random.normal(scale=noise_sd, size=true_samples.shape)

    return times, N, sample_times, measurements_noisy, true_samples, activity


# генерация исходных данных
true_params = {
    'k': 0.003,  # расход на единицу активности
    'N_inf': 0.0,
    'B_base': 0.5,
    'N_th': 0.30,
    'tau': 25.0,
    'alpha': 1.5,
    'N0': 1.2,
    's': 1.1
}

t_max = 500.0
dt = 0.5
sample_dt = 10.0
noise_sd = 0.02

times, N_true, sample_times, measurements_noisy, true_samples, activity = simulate_neuron(
    true_params, t_max=t_max, dt=dt, sample_dt=sample_dt, noise_sd=noise_sd, activity=None
)

print("Сгенерированы наблюдения прокси (косвенные измерения).")
print(f"Число точек наблюдения: {len(sample_times)}; интервал: {sample_dt} мин.")


# оптимизируемый функционал
def simulate_given_theta(theta, t_max=600.0, dt=0.5, sample_dt=10.0, activity=None):
    params = {
        'k': float(theta[0]),
        'N_inf': 0.0,
        'B_base': float(theta[1]),
        'N_th': float(theta[2]),
        'tau': float(theta[3]),
        'alpha': float(theta[4]),
        'N0': 1.0,
        's': float(theta[5])
    }
    _, _, sample_times_model, measurements_model, true_samples_model, _ = simulate_neuron(
        params, t_max=t_max, dt=dt, sample_dt=sample_dt, noise_sd=0.0, activity=activity
    )
    return sample_times_model, measurements_model, true_samples_model


def residuals(theta, observed, t_max, dt, sample_dt, activity):
    theta = np.array(theta, dtype=float)
    if np.any(theta <= 0):
        return np.ones_like(observed) * 1e3
    _, model_meas, _ = simulate_given_theta(theta, t_max=t_max, dt=dt, sample_dt=sample_dt, activity=activity)
    return model_meas - observed


# начальные приближения и границы
x0 = np.array([0.0030, 0.5, 0.4, 25.0, 0.5, 0.9])
lower = [1e-5, 0.05, 0.05, 5.0, 0.0, 0.1]
upper = [0.01, 2.0, 0.9, 120.0, 3.0, 2.0]

res = least_squares(residuals, x0, bounds=(lower, upper),
                    args=(measurements_noisy, t_max, dt, sample_dt, activity),
                    verbose=2, xtol=1e-6, ftol=1e-6, max_nfev=400)

fitted = res.x
fitted_params = {
    'k': float(fitted[0]),
    'B_base': float(fitted[1]),
    'N_th': float(fitted[2]),
    'tau': float(fitted[3]),
    'alpha': float(fitted[4]),
    's': float(fitted[5])
}

# построение графиков
# модель до оптимизации
params_init = {'k': x0[0], 'N_inf': 0.0, 'B_base': x0[1], 'N_th': x0[2], 'tau': x0[3], 'alpha': x0[4], 'N0': 1.0,
               's': x0[5]}
t_init, N_init, sample_t_init, meas_init, true_init, _ = simulate_neuron(params_init, t_max=t_max, dt=dt,
                                                                         sample_dt=sample_dt, noise_sd=0.0,
                                                                         activity=activity)

# модель после оптимизации
params_fit_full = {'k': fitted_params['k'], 'N_inf': 0.0, 'B_base': fitted_params['B_base'],
                   'N_th': fitted_params['N_th'], 'tau': fitted_params['tau'], 'alpha': fitted_params['alpha'],
                   'N0': 1.0, 's': fitted_params['s']}
t_fit, N_fit, sample_t_fit, meas_fit, true_fit, _ = simulate_neuron(params_fit_full, t_max=t_max, dt=dt,
                                                                    sample_dt=sample_dt, noise_sd=0.0,
                                                                    activity=activity)

plt.figure(figsize=(12, 9))
ax1 = plt.subplot(3, 1, 1)
ax1.plot(times, activity, label="Активность A(t)")
ax1.set_ylabel("Активность (отн.)")
ax1.set_title("Входной сигнал: активность нейрона")
ax1.grid(True)
ax1.legend()

ax2 = plt.subplot(3, 1, 2)
ax2.plot(times, N_true, label="Истинный запас N(t)")
ax2.plot(t_init, N_init, '--', label="Модель до оптимизации (N)")

ax2.set_ylabel("Нормированный запас N")
ax2.grid(True)
ax2.legend()
ax3 = plt.subplot(3, 1, 3)
ax3.plot(sample_times, measurements_noisy, 'o', label="Наблюдаемые данные P(t) (шум)")
ax3.plot(sample_t_init, meas_init, '--', label="Модель наблюдений до оптимизации")
ax3.grid(True)
plt.show()

plt.figure(figsize=(12, 9))

ax1 = plt.subplot(3, 1, 1)
ax1.plot(times, activity, label="Активность A(t)")
ax1.set_ylabel("Активность (отн.)")
ax1.set_title("Входной сигнал: активность нейрона")
ax1.grid(True)
ax1.legend()

ax2 = plt.subplot(3, 1, 2)
ax2.plot(times, N_true, label="Истинный запас N(t)")
ax2.plot(t_init, N_init, '--', label="Модель до оптимизации (N)")
ax2.plot(t_fit, N_fit, '-.', label="Модель после оптимизации (N)")
ax2.set_ylabel("Нормированный запас N")
ax2.grid(True)
ax2.legend()

ax3 = plt.subplot(3, 1, 3)
ax3.plot(sample_times, measurements_noisy, 'o', label="Наблюдаемые данные P(t) (шум)")
ax3.plot(sample_t_init, meas_init, '--', label="Модель наблюдений до оптимизации")
ax3.plot(sample_t_fit, meas_fit, '-', label="Модель наблюдений после оптимизации")
ax3.set_xlabel("Время, мин")
ax3.set_ylabel("Прокси P (ед.)")
ax3.grid(True)
ax3.legend()

plt.tight_layout()
plt.show()

# cравнение параметров в виде таблицы
df = pd.DataFrame([
    {'параметр': 'k', 'исходное': true_params['k'], 'найденное': fitted_params['k']},
    {'параметр': 'B_base', 'исходное': true_params['B_base'], 'найденное': fitted_params['B_base']},
    {'параметр': 'N_th', 'исходное': true_params['N_th'], 'найденное': fitted_params['N_th']},
    {'параметр': 'tau', 'исходное': true_params['tau'], 'найденное': fitted_params['tau']},
    {'параметр': 'alpha', 'исходное': true_params['alpha'], 'найденное': fitted_params['alpha']},
    {'параметр': 's', 'исходное': true_params['s'], 'найденное': fitted_params['s']},
])
print("\nСравнение параметров (иcходное vs найденное):")
print(df.to_string(index=False))


## результаты:

In [None]:
Сгенерированы наблюдения прокси (косвенные измерения).
Число точек наблюдения: 51; интервал: 10.0 мин.
   Iteration     Total nfev        Cost      Cost reduction    Step norm     Optimality   
       0              1         4.8226e+00                                    1.13e+01    
       1              2         3.6052e+00      1.22e+00       1.22e-03       9.26e+00    
       2              3         2.2831e+00      1.32e+00       1.31e-03       8.15e+00    
       3              7         2.2831e+00      0.00e+00       0.00e+00       8.15e+00    
`xtol` termination condition is satisfied.
Function evaluations 7, initial cost 4.8226e+00, final cost 2.2831e+00, first-order optimality 8.15e+00.

Сравнение параметров (иcходное vs найденное):
параметр  исходное  найденное
       k     0.003   0.001739
  B_base     0.500   0.501603
    N_th     0.300   0.400000
     tau    25.000  25.000000
   alpha     1.500   0.500553
       s     1.100   0.901383

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_5.png)

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_6.png)

## ковариационная оценка параметров

In [None]:
from numpy.linalg import inv, LinAlgError

param_names = ['k','B_base','N_th','tau','alpha','s']
p = len(param_names)
m = len(measurements_noisy)

resid = res.fun
ssq = np.sum(resid**2)
dof = max(1, m - p)
sigma2 = ssq / dof

J = res.jac
try:
    JTJ = J.T.dot(J)
    JTJ_inv = inv(JTJ)
    cov_theta = sigma2 * JTJ_inv
    se = np.sqrt(np.abs(np.diag(cov_theta)))
    ci_lo_cov = fitted - 1.96 * se
    ci_hi_cov = fitted + 1.96 * se
except Exception as e:
    cov_theta = np.full((p,p), np.nan)
    se = np.full(p, np.nan)
    ci_lo_cov = np.full(p, np.nan)
    ci_hi_cov = np.full(p, np.nan)

df_cov = pd.DataFrame({
    'Параметр': param_names,
    'Оценка (fitted)': fitted,
    'SE (ков.)': se,
    'Нижн. 95% CI (ков.)': ci_lo_cov,
    'Верхн. 95% CI (ков.)': ci_hi_cov
})
print("Ковариационная оценка (результат может быть NaN при неидентифицируемости):")
print(df_cov.round(6))

sample_times_model, fitted_model_meas, fitted_true_samples = simulate_given_theta(
    fitted, t_max=t_max, dt=dt, sample_dt=sample_dt, activity=activity
)

residuals_obs = measurements_noisy - fitted_model_meas
n_boot = 200
boots = []
print(f"\nЗапускаю bootstrap (residual resampling), N = {n_boot} (это может занять время)...")
for i in range(n_boot):
    resamp = np.random.choice(residuals_obs, size=len(residuals_obs), replace=True)
    meas_b = fitted_model_meas + resamp
    try:
        r_b = least_squares(residuals, x0, bounds=(lower, upper),
                            args=(meas_b, t_max, dt, sample_dt, activity),
                            xtol=1e-6, ftol=1e-6, max_nfev=300, verbose=0)
        boots.append(r_b.x)
    except Exception:
        continue

boots = np.array(boots)
print("Успешных бутстрэп-прогонов:", boots.shape[0])

if boots.size > 0:
    boot_mean = boots.mean(axis=0)
    boot_std = boots.std(axis=0, ddof=1)
    boot_ci_lo = np.percentile(boots, 2.5, axis=0)
    boot_ci_hi = np.percentile(boots, 97.5, axis=0)
else:
    boot_mean = boot_std = boot_ci_lo = boot_ci_hi = np.full(p, np.nan)

df_boot = pd.DataFrame({
    'Параметр': param_names,
    'Оценка (fitted)': fitted,
    'Среднее (bootstrap)': boot_mean,
    'Станд. откл. (bootstrap)': boot_std,
    'Нижн. 95%CI (boot)': boot_ci_lo,
    'Верхн. 95%CI (boot)': boot_ci_hi,
    'n_boot успешных': [boots.shape[0]]*p
})
print("\nРезультаты bootstrap:")
pd.set_option('display.max_columns', None) ///ВОТ ЭТА СТРОЧКА ОТОБРАЖАЕТ ВСЮ ТАБЛИЦУ А НЕ 1-2КОЛНКИ)))
print(df_boot.round(6))

if boots.size > 0:
    fig, axs = plt.subplots(p, 1, figsize=(8, 2.2*p), tight_layout=True)
    for j, ax in enumerate(axs):
        ax.hist(boots[:, j], bins=30, alpha=0.7, edgecolor='k')
        ax.axvline(fitted[j], color='red', linestyle='--', label='fitted')
        ax.axvline(boot_ci_lo[j], color='k', linestyle=':', label='95% CI' if j==0 else None)
        ax.axvline(boot_ci_hi[j], color='k', linestyle=':')
        ax.set_title(f"Bootstrap: {param_names[j]}")
        ax.set_xlabel('Значение')
        ax.set_ylabel('Частота')
        if j==0:
            ax.legend()
    plt.show()

def compute_metrics_from_trace(times, N_trace, params, threshold_frac=0.5):
    """
    Вернёт: min N, суммарное время ниже threshold (min), число запусков синтеза (по скачкам)
    threshold_frac — доля от N_th или абсолютный порог (если < N_th)
    """
    dt_local = times[1] - times[0]
    minN = float(N_trace.min())
    Nth = params.get('N_th', fitted_params.get('N_th', 0.0))
    threshold = Nth * threshold_frac if (Nth is not None and Nth > 0) else 0.0
    time_below = float(((N_trace < threshold).sum()) * dt_local)
    diffs = np.diff(N_trace)
    B_est = params.get('B_base', fitted_params.get('B_base', 0.0))
    bolus_count = int((diffs > 0.5 * max(1e-9, B_est)).sum())
    return {'minN': minN, 'time_below_min': time_below, 'bolus_count': bolus_count, 'threshold': threshold}

params_normal = dict(fitted_params)
times_n, N_n, *_ = simulate_neuron({**params_normal, 'N0': true_params.get('N0',1.0)}, t_max=t_max, dt=dt, sample_dt=1.0, noise_sd=0.0, activity=activity)[:2]
metrics_n = compute_metrics_from_trace(times_n, N_n, params_normal, threshold_frac=0.8)

params_crit = dict(params_normal)
params_crit['k'] = params_normal['k'] * 1.5
params_crit['tau'] = params_normal['tau'] * 1.5
params_crit['B_base'] = max(1e-6, params_normal['B_base'] * 0.6)
times_c, N_c, *_ = simulate_neuron({**params_crit, 'N0': true_params.get('N0',1.0)}, t_max=t_max, dt=dt, sample_dt=1.0, noise_sd=0.0, activity=activity)[:2]
metrics_c = compute_metrics_from_trace(times_c, N_c, params_crit, threshold_frac=0.8)

#  пиковая активация (локальный мультипликативный всплеск активности)
activity_peak = activity.copy()
# увеличим активность в окне [120, 180] мин в 4 раза
start_i = int(120.0 / dt); end_i = int(180.0 / dt)
activity_peak[start_i:end_i] *= 4.0
times_p, N_p, *_ = simulate_neuron({**params_normal, 'N0': true_params.get('N0',1.0)}, t_max=t_max, dt=dt, sample_dt=1.0, noise_sd=0.0, activity=activity_peak)[:2]
metrics_p = compute_metrics_from_trace(times_p, N_p, params_normal, threshold_frac=0.8)


print("\n=== Верификация сценариев (нейрон) ===")
print("Норма:")
print(f"  минимальный запас N = {metrics_n['minN']:.4f}")
print(f"  время ниже порога ({metrics_n['threshold']:.4f}) = {metrics_n['time_below_min']:.2f} мин")
print(f"  число запусков синтеза = {metrics_n['bolus_count']}")
print("\nКритическая ситуация:")
print(f"  минимальный запас N = {metrics_c['minN']:.4f}")
print(f"  время ниже порога ({metrics_c['threshold']:.4f}) = {metrics_c['time_below_min']:.2f} мин")
print(f"  число запусков синтеза = {metrics_c['bolus_count']}")
print("\nПиковая нагрузка:")
print(f"  минимальный запас N = {metrics_p['minN']:.4f}")
print(f"  время ниже порога ({metrics_p['threshold']:.4f}) = {metrics_p['time_below_min']:.2f} мин")
print(f"  число запусков синтеза = {metrics_p['bolus_count']}")


plt.figure(figsize=(10,4))
plt.plot(times, activity, label='Активность A(t) (исходная)')
plt.plot(times, activity_peak, '--', label='Активность A(t) (пиковая)')
plt.xlabel('Время (мин)')
plt.ylabel('Активность (отн.)')
plt.title('Активность: нормальная и пиковая')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10,4))
plt.plot(times_n, N_n, label='Норма (подогнанные параметры)')
plt.plot(times_c, N_c, label='Критическая (ухудшенные параметры)')
plt.plot(times_p, N_p, ':', label='Пиковая нагрузка (всплеск активности)')
plt.axhline(params_normal['N_th']*0.8, color='k', linestyle=':', label='Порог безопасности (0.8 * N_th)')
plt.xlabel('Время (мин)')
plt.ylabel('Запас N (отн.)')
plt.title('Траектории запаса нейромедиатора — сценарии')
plt.legend()
plt.grid(True)
plt.show()

## результаты

In [None]:
Ковариационная оценка (результат может быть NaN при неидентифицируемости):
  Параметр  Оценка (fitted)  ...  Нижн. 95% CI (ков.)  Верхн. 95% CI (ков.)
0        k         0.001739  ...                  NaN                   NaN
1   B_base         0.501603  ...                  NaN                   NaN
2     N_th         0.400000  ...                  NaN                   NaN
3      tau        25.000000  ...                  NaN                   NaN
4    alpha         0.500553  ...                  NaN                   NaN
5        s         0.901383  ...                  NaN                   NaN

[6 rows x 5 columns]

Запускаю bootstrap (residual resampling), N = 200 (это может занять время)...
Успешных бутстрэп-прогонов: 200

Результаты bootstrap:
  Параметр  Оценка (fitted)  Среднее (bootstrap)  Станд. откл. (bootstrap)  \
0        k         0.001739             0.001727                  0.000410   
1   B_base         0.501603             0.501403                  0.016091   
2     N_th         0.400000             0.401533                  0.018666   
3      tau        25.000000            25.412651                  3.076174   
4    alpha         0.500553             0.499752                  0.009709   
5        s         0.901383             0.905194                  0.017930   

   Нижн. 95%CI (boot)  Верхн. 95%CI (boot)  n_boot успешных  
0            0.001433             0.003000              200  
1            0.496441             0.502201              200  
2            0.400000             0.400477              200  
3           25.000000            25.039019              200  
4            0.498449             0.500764              200  
5            0.900000             0.971746              200  

=== Верификация сценариев (нейрон) ===
Норма:
  минимальный запас N = 0.3362
  время ниже порога (0.3200) = 0.00 мин
  число запусков синтеза = 1

Критическая ситуация:
  минимальный запас N = 0.2967
  время ниже порога (0.3200) = 39.50 мин
  число запусков синтеза = 2

Пиковая нагрузка:
  минимальный запас N = 0.3696
  время ниже порога (0.3200) = 0.00 мин
  число запусков синтеза = 1

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_7.png)

# Задача 3. Задача от DevOps-инженера.

«Наш сервис периодически "ложится"во время всплесков трафика, потому что автоматическое добавление серверов срабатывает слишком медленно. Нам нужна модель, которая бы не реагировала на текущую нагрузку, а предсказывала, когда нам не хватит мощностей, и запускала масштабирование ЗАРАНЕЕ. Что это за модель и какие параметры нам нужно в ней настроить?»
Подсказка 1: Реагировать на 100% загрузку серверов — уже поздно. Нам нужно правило, которое срабатывает при достижении некоего "предупредительного" уровня и заказывает фиксированный пакет дополнительных мощностей. Учтите, что их развёртывание занимает предсказуемое время.
Подсказка 2: Сформулируйте правило вида: "Если свободная мощность < X, то заказать Y единиц мощности, которые будут доступны через время T".
Ограничения:
• Время развёртывания серверов непостоянно (зависит от нагрузки обла- ка).
• Стоимость простоя и избыточной мощности трудно оценить количе- ственно.
• Нагрузка имеет сильно случайный характер.
• Критический уровень должен подбираться с учитом "сезонных"колебаний.

Основные гипотезы:  
1. Входная нагрузка и степень загрузки серверов непрерывны между всплесками трафика.  
2. Пропускная способность одного сервера - постоянная величина.  
3. В качестве основого управляющего параметра будет рассматриваться свободная мощность сервера.  
4. Время развертывания сервера ненулевое.  
5. Масштабирование является дискретным процессом.  
6. Доступное количество серверов в моменты времени между скачками постоянны.

Ключевые параметры:  
1. $L(t)$ - входная нагрузка сервера (запр.).  
2. $\mu$ - пропускная способность одного сервера (запр./мин).  
3. $R(t)=\frac{L(t)}{\mu}$ - потребное количество серверов в момент времени $t$ (серв.).  
4. $K(t)$ - доступное кол-во серверов в момент времени $t$.   
5. $u(t)=\frac{R(t)}{K(t)}$ - степень загрузки системы в момент времени $t$.  
6. $X(t)=K(t)-R(t)$ - свободная мощность системы.  
$X(t)>0$ - имеется запас по мощности, система работает без ошибок;  
$X(t)=0$ - система работает в предельном режиме;  
$X(t) < 0$ - система деградирует, появляются ошибки, таймауты.  
7. $\tau>0$ - время развертывания сервера (сек).  
8. $B$ - объем увеличения нагрузки (числа серверов) при скачке трафика.
9. $X_{th}$ - пороговое значение свободной мощности.   


Рассмотрим изменение свободной мощности системы в момент времени $t$:  

$X(t) = K(t) - R(t)=K(t) - \frac{L(t)}{\mu}$     
($K(0)=K_0,   R(0)=R_0,  X(0)=X_0=K_0 - R_0$)

Продифференцируем равенство по переменной $t$:

$\frac{dX}{dt} = \frac{dK}{dt} - \frac{dL}{\mu dt}$  

Так как $K(t)=const$ между скачками трафика, то $\frac{dK}{dt} = 0$, следовательно имеем:  



$\frac{dX}{dt} = -\frac{1}{\mu} \frac{dL}{dt}$   


Определим момент подачи запроса: $t_{signal}:  X(t_{signal}) \le X_{th}$.  

В таком случае время подачи новых серверов: $t_{deploy} = t_{signal} + \tau$.  

Тогда правило дискретного скачка можно определить следующим образом:

$X(t^+_{deploy}) = X(t^-_{deploy}) + B$


Оптимизация будет проводить аналогично предыдущим задачам методом наименьших квадратов для параметров: $\Theta = [X_{th}, \tau, B, K_0]$

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import least_squares
import pandas as pd
from numpy.linalg import inv, LinAlgError
from scipy.optimize import least_squares
from scipy.stats import chi2

np.random.seed(42)

def gen_traffic(T=24*60, dt=1.0, baseline=1100, peak_amp=600, noise_scale=120, spike_prob=0.003):
    times = np.arange(0, T, dt)
    daily = peak_amp * np.sin(2*np.pi*times/1440.0)
    spikes = np.zeros_like(times)
    for i in range(len(times)):
        if np.random.rand() < spike_prob:
            length = np.random.randint(5, 30)
            spikes[i:i+length] += np.random.uniform(2000, 10000)
    noise = noise_scale * np.random.randn(len(times))
    L = np.maximum(0.0, baseline + daily + spikes + noise)
    return times, L

def sample_deploy_time(mu_tau=8.0, sigma_tau=3.0):
    val = np.random.normal(mu_tau, sigma_tau)
    return max(1.0, val)

def simulate_service(times, L, mu_per_server=120.0, X_th=5.0, B=2, init_servers=10,
                     deploy_mu=8.0, deploy_sigma=3.0, max_parallel=10, deterministic_tau=False):
    dt = times[1] - times[0]
    n = len(times)
    R = L / mu_per_server
    K = np.zeros(n)
    K[0] = init_servers
    pending = []
    util = np.zeros(n)
    X_series = np.zeros(n)
    downtime = 0.0
    for i, t in enumerate(times):
        arrivals = [p for p in pending if p[0] <= t + 1e-9]
        if arrivals:
            for p in arrivals:
                K[i-1 if i>0 else 0] += p[1]
            pending = [p for p in pending if p not in arrivals]
        if i > 0:
            K[i] = K[i-1]
        X = K[i] - R[i]
        X_series[i] = X
        util[i] = min(1.0, R[i] / max(1e-6, K[i]))
        if R[i] > K[i]:
            downtime += dt
        total_pending = sum([p[1] for p in pending])
        if X <= X_th and (len(pending) < max_parallel):
            if deterministic_tau:
                tau = deploy_mu
            else:
                tau = sample_deploy_time(deploy_mu, deploy_sigma)
            arrival = t + tau
            pending.append((arrival, int(round(B))))
    return {'times': times, 'L': L, 'R': R, 'K': K, 'X': X_series, 'util': util, 'downtime': downtime}

# Генерация данных и подгонка
times, L = gen_traffic()
true_params = {'mu_per_server': 120.0, 'X_th': 7.0, 'B': 4, 'init_servers': 11, 'deploy_mu': 8.0, 'deploy_sigma': 3.0}
out_true = simulate_service(times, L, mu_per_server=true_params['mu_per_server'],
                            X_th=true_params['X_th'], B=true_params['B'],
                            init_servers=true_params['init_servers'],
                            deploy_mu=true_params['deploy_mu'], deploy_sigma=true_params['deploy_sigma'],
                            deterministic_tau=False)
sample_dt = 5.0
sample_times = np.arange(0, times[-1]+1e-9, sample_dt)
sample_idx = np.searchsorted(times, sample_times)
util_true_samples = out_true['util'][sample_idx]
noise_sd = 0.02
util_noisy = np.clip(util_true_samples + np.random.normal(scale=noise_sd, size=util_true_samples.shape), 0.0, 1.0)

def simulate_policy_for_theta(theta, times, L, sample_idx, mu_per_server_known=120.0):
    X_th, B, tau, init_servers = theta
    out = simulate_service(times, L, mu_per_server=mu_per_server_known,
                           X_th=float(X_th), B=max(1,int(round(B))), init_servers=max(1,int(round(init_servers))),
                           deploy_mu=float(tau), deploy_sigma=0.0, deterministic_tau=True, max_parallel=100)
    util_samples = out['util'][sample_idx]
    return util_samples

def residuals(theta, observed, times, L, sample_idx):
    theta = np.array(theta, dtype=float)
    if np.any(theta <= 0):
        return np.ones_like(observed) * 1e3
    sim_vals = simulate_policy_for_theta(theta, times, L, sample_idx)
    return sim_vals - observed

x0 = np.array([5.0, 2.0, 8.0, 10.0])
lower_bounds = [0.0, 1.0, 1.0, 1.0]
upper_bounds = [20.0, 10.0, 60.0, 200.0]

res = least_squares(residuals, x0, bounds=(lower_bounds, upper_bounds),
                    args=(util_noisy, times, L, sample_idx),
                    verbose=2, xtol=1e-6, ftol=1e-6, max_nfev=300)

fitted = res.x
fitted_params = {'X_th': float(fitted[0]), 'B': int(round(fitted[1])), 'tau': float(fitted[2]), 'init_servers': int(round(fitted[3]))}

out_fit = simulate_service(times, L, mu_per_server=true_params['mu_per_server'],
                           X_th=fitted_params['X_th'], B=fitted_params['B'],
                           init_servers=fitted_params['init_servers'],
                           deploy_mu=fitted_params['tau'], deploy_sigma=0.0, deterministic_tau=True)

util_fit_samples = out_fit['util'][sample_idx]


import matplotlib.pyplot as plt

plt.figure(figsize=(12,8))
ax1 = plt.subplot(3,1,1)
ax1.plot(times, L, label='Трафик L(t) (запросы/мин)')
ax1.set_ylabel('L(t)'); ax1.legend(); ax1.grid(True)

ax2 = plt.subplot(3,1,2)
ax2.plot(times, out_true['K'], label='K(t)', color='tab:green')
ax2.set_ylabel('Сервера K(t)'); ax2.legend(); ax2.grid(True)

ax3 = plt.subplot(3,1,3)
ax3.plot(sample_times, util_noisy, 'o', label='Наблюдаемая загрузка (шум)')
ax3.plot(sample_times, util_true_samples, '-', label='Истинная загрузка (сэмплы)')
ax3.set_xlabel('Время, мин'); ax3.set_ylabel('Загрузка (util = R/K)')
ax3.legend(); ax3.grid(True)
plt.tight_layout(); plt.show()


plt.figure(figsize=(12,8))
ax1 = plt.subplot(3,1,1)
ax1.plot(times, L, label='Трафик L(t) (запросы/мин)')
ax1.set_ylabel('L(t)'); ax1.legend(); ax1.grid(True)

ax2 = plt.subplot(3,1,2)
ax2.plot(times, out_true['K'], label='K(t) — истинное', color='tab:green')
ax2.plot(times, out_fit['K'], '--', label='K(t) — модель (подгонка)', color='tab:olive')
ax2.set_ylabel('Сервера K(t)'); ax2.legend(); ax2.grid(True)

ax3 = plt.subplot(3,1,3)
ax3.plot(sample_times, util_noisy, 'o', label='Наблюдаемая загрузка (шум)')
ax3.plot(sample_times, util_true_samples, '-', label='Истинная загрузка (сэмплы)')
ax3.plot(sample_times, util_fit_samples, '--', label='Модельная загрузка (подгонка)')
ax3.set_xlabel('Время, мин'); ax3.set_ylabel('Загрузка (util = R/K)')
ax3.legend(); ax3.grid(True)
plt.tight_layout(); plt.show()


df = pd.DataFrame([
    {'параметр': 'X_th', 'исходное': true_params['X_th'], 'найденное': fitted_params['X_th']},
    {'параметр': 'B', 'исходное': true_params['B'], 'найденное': fitted_params['B']},
    {'параметр': 'tau', 'исходное': true_params['deploy_mu'], 'найденное': fitted_params['tau']},
    {'параметр': 'init_servers', 'исходное': true_params['init_servers'], 'найденное': fitted_params['init_servers']},
])
print(df.to_string(index=False))
#df.to_csv('devops_params_true_vs_fitted.csv', index=False)
#print("CSV сохранён: devops_params_true_vs_fitted.csv")

## результаты

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_8.png)

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_9.png)

In [None]:
D:\Mathem_model\Attempt\Scripts\python.exe D:\Mathem_model\L1.py 
   Iteration     Total nfev        Cost      Cost reduction    Step norm     Optimality   
       0              1         3.6410e-01                                    1.10e+08    
       1              2         1.7573e-01      1.88e-01       2.36e-07       0.00e+00    
`gtol` termination condition is satisfied.
Function evaluations 2, initial cost 3.6410e-01, final cost 1.7573e-01, first-order optimality 0.00e+00.
    параметр  исходное  найденное
        X_th       7.0        5.0
           B       4.0        2.0
         tau       8.0        8.0
init_servers      11.0       10.0

## Ковариационная оценка

In [None]:
needed = ['simulate_service', 'res', 'fitted', 'fitted_params', 'util_noisy', 'times', 'L', 'sample_idx']
for name in needed:
    if name not in globals():
        raise RuntimeError(f"Требуется объект/функция '{name}' в окружении — выполните предыдущие ячейки.")

param_names = ['X_th','B','tau','init_servers']
p = len(param_names)
m = len(util_noisy)


print("1) Попытка ковариационной оценки параметров (σ^2 (J^T J)^{-1})...")
resid_vals = res.fun
ssq = np.sum(resid_vals**2)
dof = max(1, m - p)
sigma2 = ssq / dof

J = res.jac
try:
    JTJ = J.T.dot(J)
    JTJ_inv = inv(JTJ)
    cov_theta = sigma2 * JTJ_inv
    se = np.sqrt(np.abs(np.diag(cov_theta)))
    ci_lo_cov = fitted - 1.96 * se
    ci_hi_cov = fitted + 1.96 * se
except Exception:
    cov_theta = np.full((p,p), np.nan)
    se = np.full(p, np.nan)
    ci_lo_cov = np.full(p, np.nan)
    ci_hi_cov = np.full(p, np.nan)

df_cov = pd.DataFrame({
    'Параметр': param_names,
    'Оценка (fitted)': fitted,
    'SE (ков.)': se,
    'Нижн. 95% CI (ков.)': ci_lo_cov,
    'Верхн. 95% CI (ков.)': ci_hi_cov
})
print(df_cov.round(6))


print("\n2) Bootstrap (residual resampling) — оценки погрешностей (эмпирические).")

sim_model_samples = simulate_policy_for_theta(fitted, times, L, sample_idx, mu_per_server_known=true_params['mu_per_server'])
residuals_obs = util_noisy - sim_model_samples
n_boot = 200
boots = []
print(f"Запускаю bootstrap: n={n_boot} (это может занять время)...")
for i in range(n_boot):
    resamp = np.random.choice(residuals_obs, size=len(residuals_obs), replace=True)
    meas_b = sim_model_samples + resamp
    try:
        r_b = least_squares(residuals, x0, bounds=(lower_bounds, upper_bounds),
                            args=(meas_b, times, L, sample_idx),
                            xtol=1e-6, ftol=1e-6, max_nfev=400, verbose=0)
        boots.append(r_b.x)
    except Exception:
        continue

boots = np.array(boots)
print("Успешных бутстрэп-прогонов:", boots.shape[0])

if boots.size > 0:
    boot_mean = boots.mean(axis=0)
    boot_std = boots.std(axis=0, ddof=1)
    boot_ci_lo = np.percentile(boots, 2.5, axis=0)
    boot_ci_hi = np.percentile(boots, 97.5, axis=0)
else:
    boot_mean = boot_std = boot_ci_lo = boot_ci_hi = np.full(p, np.nan)

df_boot = pd.DataFrame({
    'Параметр': param_names,
    'Оценка (fitted)': fitted,
    'Среднее (bootstrap)': boot_mean,
    'Станд. откл. (bootstrap)': boot_std,
    'Нижн. 95% CI (boot)': boot_ci_lo,
    'Верхн. 95% CI (boot)': boot_ci_hi,
    'n_boot успешных': [boots.shape[0]]*p
})
print(df_boot.round(6))

if boots.size > 0:
    fig, axs = plt.subplots(p, 1, figsize=(8, 2.2*p), tight_layout=True)
    for j, ax in enumerate(axs):
        ax.hist(boots[:, j], bins=30, alpha=0.7, edgecolor='k')
        ax.axvline(fitted[j], color='red', linestyle='--', label='fitted')
        ax.axvline(boot_ci_lo[j], color='k', linestyle=':', label='95% CI' if j==0 else None)
        ax.axvline(boot_ci_hi[j], color='k', linestyle=':')
        ax.set_title(f"Bootstrap: {param_names[j]}")
        ax.set_xlabel('Значение')
        ax.set_ylabel('Частота')
        if j==0:
            ax.legend()
    plt.show()


print("\n3) Верификация сценарием: Normal / Critical / Peak")

def compute_service_metrics(out):
    times_local = out['times']
    X = out['X']
    dt_local = times_local[1] - times_local[0]
    minX = float(np.min(X))
    time_negative = float(((X < 0).sum()) * dt_local)
    time_zero = float(((np.isclose(X, 0.0)).sum()) * dt_local)
    downtime = float(((out['R'] > out['K']).sum()) * dt_local)
    diffsK = np.diff(out['K'])
    deployments = int((diffsK > 0).sum())
    return {'minX': minX, 'time_X_lt_0_min': time_negative, 'time_X_eq_0_min': time_zero, 'downtime_min': downtime, 'deployments': deployments}


theta_fit = [fitted[0], fitted[1], fitted[2], fitted[3]]
out_normal = simulate_service(times, L, mu_per_server=true_params['mu_per_server'],
                              X_th=fitted_params['X_th'], B=fitted_params['B'],
                              init_servers=fitted_params['init_servers'],
                              deploy_mu=fitted_params['tau'], deploy_sigma=0.0, deterministic_tau=True, max_parallel=100)
metrics_normal = compute_service_metrics(out_normal)


crit = dict(fitted_params)
crit['init_servers'] = max(1, int(round(fitted_params['init_servers'] * 0.7)))
crit['tau'] = fitted_params['tau'] * 1.5
crit['B'] = max(1, int(round(fitted_params['B'] * 0.6)))
crit['X_th'] = max(0.0, fitted_params['X_th'] * 0.5)
out_crit = simulate_service(times, L, mu_per_server=true_params['mu_per_server'],
                            X_th=crit['X_th'], B=crit['B'],
                            init_servers=crit['init_servers'],
                            deploy_mu=crit['tau'], deploy_sigma=0.0, deterministic_tau=True, max_parallel=100)
metrics_critical = compute_service_metrics(out_crit)


L_peak = L.copy()
start_t = 600; end_t = 720
idx_start = np.searchsorted(times, start_t); idx_end = np.searchsorted(times, end_t)
L_peak[idx_start:idx_end] += 3 * np.max(L)
out_peak = simulate_service(times, L_peak, mu_per_server=true_params['mu_per_server'],
                            X_th=fitted_params['X_th'], B=fitted_params['B'],
                            init_servers=fitted_params['init_servers'],
                            deploy_mu=fitted_params['tau'], deploy_sigma=0.0, deterministic_tau=True, max_parallel=100)
metrics_peak = compute_service_metrics(out_peak)


print("\n=== Результаты верификации ===")
print("Норма (по найденным параметрам):")
print(f"  min X = {metrics_normal['minX']:.3f}")
print(f"  время X < 0 = {metrics_normal['time_X_lt_0_min']:.2f} мин")
print(f"  время X == 0 = {metrics_normal['time_X_eq_0_min']:.2f} мин")
print(f"  downtime (R>K) = {metrics_normal['downtime_min']:.2f} мин")
print(f"  число деплоев = {metrics_normal['deployments']}")

print("\nКритическая ситуация (ухудшенные параметры):")
print(f"  min X = {metrics_critical['minX']:.3f}")
print(f"  время X < 0 = {metrics_critical['time_X_lt_0_min']:.2f} мин")
print(f"  время X == 0 = {metrics_critical['time_X_eq_0_min']:.2f} мин")
print(f"  downtime (R>K) = {metrics_critical['downtime_min']:.2f} мин")
print(f"  число деплоев = {metrics_critical['deployments']}")

print("\nПиковая нагрузка (длинный spike в трафике):")
print(f"  min X = {metrics_peak['minX']:.3f}")
print(f"  время X < 0 = {metrics_peak['time_X_lt_0_min']:.2f} мин")
print(f"  время X == 0 = {metrics_peak['time_X_eq_0_min']:.2f} мин")
print(f"  downtime (R>K) = {metrics_peak['downtime_min']:.2f} мин")
print(f"  число деплоев = {metrics_peak['deployments']}")


plt.figure(figsize=(12,6))
plt.subplot(2,1,1)
plt.plot(times, L, label='Трафик L(t) (запросы/мин)')
plt.plot(times, L_peak, '--', label='Трафик (пиковая нагрузка)')
plt.ylabel('L(t)')
plt.legend(); plt.grid(True)

plt.subplot(2,1,2)
plt.plot(times, out_normal['X'], label='X(t) — Норма')
plt.plot(times, out_crit['X'], label='X(t) — Критическая', alpha=0.8)
plt.plot(times, out_peak['X'], label='X(t) — Пиковая нагрузка', linestyle=':')
plt.axhline(0.0, color='k', linestyle=':', label='Граница X=0 (предельный режим)')
plt.xlabel('Время (мин)')
plt.ylabel('Запас мощности X (ед.)')
plt.legend(); plt.grid(True)
plt.tight_layout(); plt.show()

plt.figure(figsize=(12,5))
plt.plot(sample_times, util_noisy, 'o', label='Наблюдаемая загрузка (шум)')
plt.plot(sample_times, sim_model_samples, '-', label='Модельная загрузка (подгонка)')
plt.plot(sample_times, util_fit_samples, '--', label='Модельная загрузка (fit output)')
plt.xlabel('Время (мин)')
plt.ylabel('Загрузка (util = R/K)')
plt.legend(); plt.grid(True); plt.show()


## результаты

![im1](https://raw.githubusercontent.com/Udav6/Mathem_model_2025/main/Fig_L4_10.png)

In [None]:
1) Попытка ковариационной оценки параметров (σ^2 (J^T J)^{-1})...
       Параметр  Оценка (fitted)  ...  Нижн. 95% CI (ков.)  Верхн. 95% CI (ков.)
0          X_th              5.0  ...                  NaN                   NaN
1             B              2.0  ...                  NaN                   NaN
2           tau              8.0  ...                  NaN                   NaN
3  init_servers             10.0  ...                  NaN                   NaN

[4 rows x 5 columns]

2) Bootstrap (residual resampling) — оценки погрешностей (эмпирические).
Запускаю bootstrap: n=200 (это может занять время)...
Успешных бутстрэп-прогонов: 200
       Параметр  Оценка (fitted)  ...  Верхн. 95% CI (boot)  n_boot успешных
0          X_th              5.0  ...                   5.0              200
1             B              2.0  ...                   2.0              200
2           tau              8.0  ...                   8.0              200
3  init_servers             10.0  ...                  10.0              200

[4 rows x 7 columns]

3) Верификация сценарием: Normal / Critical / Peak

=== Результаты верификации ===
Норма (по найденным параметрам):
  min X = -1.745
  время X < 0 = 3.00 мин
  время X == 0 = 0.00 мин
  downtime (R>K) = 2.00 мин
  число деплоев = 11

Критическая ситуация (ухудшенные параметры):
  min X = -4.745
  время X < 0 = 16.00 мин
  время X == 0 = 0.00 мин
  downtime (R>K) = 15.00 мин
  число деплоев = 17

Пиковая нагрузка (длинный spike в трафике):
  min X = -31.009
  время X < 0 = 26.00 мин
  время X == 0 = 0.00 мин
  downtime (R>K) = 24.00 мин
  число деплоев = 37
Сетка по параметрам:
 X_th: [ 0  1  2  3  4  5  6  7  8  9 10 11]
 B   : [1 2 3 4]
 tau : [ 1.    1.85  2.7   3.55  4.4   5.25  6.1   6.95  7.8   8.65  9.5  10.35
 11.2  12.05 12.9  13.75 14.6  15.45 16.3  17.15 18.  ]
 init: [ 6  7  8  9 10 11 12 13 14]

SSQ в точке fitted = 0.351458; sigma2_hat = 1.237527e-03; dof = 284

Профили параметров и допустимые диапазоны (по 2 критериям):
          param  grid_len   ssq_min  argmin rel_tol_range chi2_range  \
0          X_th        12  0.187346     6.0        (6, 9)     (6, 7)   
1             B         4  0.351458     2.0        (2, 2)     (2, 2)   
2           tau        21  0.257326     9.5        (9, 9)     (9, 9)   
3  init_servers         9  0.303558     9.0       (9, 12)     (9, 9)   

          rel_tol_idxs            chi2_idxs  
0         [6, 7, 8, 9]               [6, 7]  
1                  [2]                  [2]  
2  [9.500000117945435]  [9.500000117945435]  
3              [9, 12]                  [9]  