# Анализ эксперимента run-pid-v1

Анализ работы PID-регулятора на реальной установке.

In [None]:
import re
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from nn_laser_stabilizer.config.config import load_config
from nn_laser_stabilizer.paths import get_experiment_dir

## Загрузка эксперимента

In [None]:
EXPERIMENT_NAME = "run-pid-v1"
EXPERIMENT_DATE = "2026-02-05"
EXPERIMENT_TIME = "12-45-24"

EXPERIMENT_DIR = get_experiment_dir(
    experiment_name=EXPERIMENT_NAME, 
    experiment_date=EXPERIMENT_DATE, 
    experiment_time=EXPERIMENT_TIME)

print(f"Директория эксперимента: {EXPERIMENT_DIR}")
print(f"Существует: {EXPERIMENT_DIR.exists()}")

In [None]:
config = load_config(EXPERIMENT_DIR / "config.yaml")

print(f"Эксперимент: {config.experiment_name}")
print(f"\nПараметры PID:")
print(f"  kp = {config.pid.kp}")
print(f"  ki = {config.pid.ki}")
print(f"  kd = {config.pid.kd}")
print(f"  dt = {config.pid.dt}")
print(f"\nУставка: {config.setpoint}")
print(f"Warmup: {config.warmup_steps} шагов, output={config.warmup_output}")

## Парсинг логов соединения

In [None]:
def parse_connection_log(log_path: Path) -> pd.DataFrame:
    send_pattern = re.compile(r"\[PHASE_SHIFTER\]\s+send:\s+control_output=(\d+)")
    read_pattern = re.compile(r"\[PHASE_SHIFTER\]\s+read:\s+process_variable=(\d+)")
    
    control_outputs = []
    process_variables = []
    
    with open(log_path, "r") as f:
        for line in f:
            send_match = send_pattern.search(line)
            if send_match:
                control_outputs.append(int(send_match.group(1)))
                continue
            
            read_match = read_pattern.search(line)
            if read_match:
                process_variables.append(int(read_match.group(1)))
    
    min_len = min(len(control_outputs), len(process_variables))
    
    return pd.DataFrame({
        "step": range(min_len),
        "control_output": control_outputs[:min_len],
        "process_variable": process_variables[:min_len],
    })

In [None]:
log_path = EXPERIMENT_DIR / "connection.log"
df = parse_connection_log(log_path)

df["setpoint"] = config.setpoint
df["error"] = df["process_variable"] - df["setpoint"]

print(f"Загружено {len(df)} шагов")
df.head(10)

## Обзор данных

In [None]:
warmup_steps = config.warmup_steps

df_warmup = df[df["step"] < warmup_steps]
df_work = df[df["step"] >= warmup_steps]

print(f"Warmup: {len(df_warmup)} шагов")
print(f"Рабочая фаза: {len(df_work)} шагов")

In [None]:
print("Статистика рабочей фазы:")
print(f"\nProcess Variable:")
print(f"  Среднее: {df_work['process_variable'].mean():.2f}")
print(f"  Std: {df_work['process_variable'].std():.2f}")
print(f"  Min: {df_work['process_variable'].min()}")
print(f"  Max: {df_work['process_variable'].max()}")

print(f"\nОшибка (error = pv - setpoint):")
print(f"  MAE: {df_work['error'].abs().mean():.2f}")
print(f"  Среднее: {df_work['error'].mean():.2f}")
print(f"  Std: {df_work['error'].std():.2f}")

print(f"\nControl Output:")
print(f"  Среднее: {df_work['control_output'].mean():.2f}")
print(f"  Std: {df_work['control_output'].std():.2f}")
print(f"  Min: {df_work['control_output'].min()}")
print(f"  Max: {df_work['control_output'].max()}")

## Визуализация

In [None]:
fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

ax1 = axes[0]
ax1.plot(df["step"], df["process_variable"], label="Process Variable", alpha=0.8, linewidth=0.5)
ax1.axhline(config.setpoint, color="red", linestyle="--", label=f"Setpoint = {config.setpoint}")
ax1.axvline(warmup_steps, color="orange", linestyle=":", alpha=0.7, label="Конец warmup")
ax1.set_ylabel("Process Variable")
ax1.legend(loc="upper right")
ax1.set_title("Process Variable vs Setpoint")
ax1.grid(True, alpha=0.3)

ax2 = axes[1]
ax2.plot(df["step"], df["control_output"], label="Control Output", alpha=0.8, linewidth=0.5, color="green")
ax2.axvline(warmup_steps, color="orange", linestyle=":", alpha=0.7)
ax2.set_ylabel("Control Output")
ax2.legend(loc="upper right")
ax2.set_title("Control Output")
ax2.grid(True, alpha=0.3)

ax3 = axes[2]
ax3.plot(df["step"], df["error"], label="Error", alpha=0.8, linewidth=0.5, color="purple")
ax3.axhline(0, color="red", linestyle="--", alpha=0.5)
ax3.axvline(warmup_steps, color="orange", linestyle=":", alpha=0.7)
ax3.set_ylabel("Error (PV - SP)")
ax3.set_xlabel("Step")
ax3.legend(loc="upper right")
ax3.set_title("Error")
ax3.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Детальный анализ рабочей фазы

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(14, 8))

ax1 = axes[0, 0]
ax1.hist(df_work["error"], bins=50, edgecolor="black", alpha=0.7)
ax1.axvline(0, color="red", linestyle="--")
ax1.set_xlabel("Error")
ax1.set_ylabel("Count")
ax1.set_title("Распределение ошибки")

ax2 = axes[0, 1]
ax2.hist(df_work["control_output"], bins=50, edgecolor="black", alpha=0.7, color="green")
ax2.set_xlabel("Control Output")
ax2.set_ylabel("Count")
ax2.set_title("Распределение управляющего воздействия")

ax3 = axes[1, 0]
ax3.scatter(df_work["process_variable"], df_work["control_output"], alpha=0.3, s=1)
ax3.set_xlabel("Process Variable")
ax3.set_ylabel("Control Output")
ax3.set_title("PV vs CO")

ax4 = axes[1, 1]
window = min(1000, len(df_work) // 10)
if window > 0:
    rolling_mae = df_work["error"].abs().rolling(window=window).mean()
    ax4.plot(df_work["step"], rolling_mae, color="purple")
    ax4.set_xlabel("Step")
    ax4.set_ylabel("MAE")
    ax4.set_title(f"Скользящее MAE (окно={window})")
    ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Анализ переходного процесса (начало рабочей фазы)

In [None]:
N_STEPS = 2000

df_transient = df[(df["step"] >= warmup_steps) & (df["step"] < warmup_steps + N_STEPS)]

fig, axes = plt.subplots(2, 1, figsize=(14, 6), sharex=True)

ax1 = axes[0]
ax1.plot(df_transient["step"], df_transient["process_variable"], linewidth=0.8)
ax1.axhline(config.setpoint, color="red", linestyle="--", label=f"Setpoint = {config.setpoint}")
ax1.set_ylabel("Process Variable")
ax1.legend()
ax1.set_title(f"Переходный процесс (первые {N_STEPS} шагов после warmup)")
ax1.grid(True, alpha=0.3)

ax2 = axes[1]
ax2.plot(df_transient["step"], df_transient["control_output"], linewidth=0.8, color="green")
ax2.set_ylabel("Control Output")
ax2.set_xlabel("Step")
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()