<a href="https://colab.research.google.com/github/Marcin19721205/BasicTrainingPython/blob/main/SiecLSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Stworzenie i wizualizacja obiektu

In [25]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go

# ============================================================
# PARAMETRY SIMULACJI / OBIEKTU
# ============================================================
N           = 8000     # liczba kroków czasowych
dt          = 0.1      # krok czasowy [s]
t           = np.arange(N) * dt

K_proc      = 1.0       # wzmocnienie obiektu
zeta        = 0.4       # współczynnik tłumienia (0<zeta<1 → tłumienie)
omega_n     = 0.15      # częstość naturalna [rad/s] (~1/T)
T_dom       = 1.0 / omega_n   # umowna "stała czasowa" dla opóźnienia

delay_ratio = 0.8       # ułamek stałej czasowej
delay_steps = max(1, int(delay_ratio * T_dom / dt))

print(f"T_dom ≈ {T_dom:.3f} s")
print(f"Delay steps = {delay_steps} (≈ {delay_ratio*100:.1f}% T_dom)")

# ============================================================
# RNG + SZUM: persystentny, skośny, z outlierami
# ============================================================
rng = np.random.default_rng(42)

def generate_persistent_skewed_noise(N,
                                     phi=0.9,
                                     base_sigma=0.3,
                                     skew_factor=0.6,
                                     p_out=0.01,
                                     out_scale=4.0):
    eps  = rng.normal(0.0, base_sigma, size=N)
    noise = np.zeros(N)
    for k in range(1, N):
        noise[k] = phi * noise[k-1] + eps[k]

    skew_part = np.exp(skew_factor * noise)
    skew_part = skew_part - np.mean(skew_part)
    noise_skewed = noise + 0.7 * skew_part

    mask_out = rng.random(N) < p_out
    jumps    = rng.normal(0, out_scale * base_sigma, size=N)
    noise_skewed[mask_out] += jumps[mask_out]

    return noise_skewed

noise = generate_persistent_skewed_noise(N)

# ============================================================
# SYGNAŁ STERUJĄCY MV:
#  - start z MV = 0
#  - skok 0 -> 50% i trzymanie przez ~6*T_dom (faza STEP)
#  - potem PRBS skalowany do T_dom (faza PRBS)
# ============================================================
MV = np.zeros(N)

# czas ustalania: 6 * T_dom (w próbkach)
settle_steps = int(6 * T_dom / dt)
settle_steps = min(settle_steps, N)

# pierwszy skok 0 -> 50%
if settle_steps > 1:
    MV[0] = 0.0
    MV[1:settle_steps] = 50.0
else:
    MV[:] = 50.0

# parametry PRBS w krotnościach T_dom
prbs_min_T_factor = 0.5
prbs_max_T_factor = 2.0

prbs_min_len = max(1, int(prbs_min_T_factor * T_dom / dt))
prbs_max_len = max(prbs_min_len + 1, int(prbs_max_T_factor * T_dom / dt))

prbs_low     = 10.0
prbs_high    = 90.0

current_level = prbs_high
k = settle_steps

while k < N:
    seg_len = int(rng.integers(prbs_min_len, prbs_max_len + 1))
    end_idx = min(N, k + seg_len)
    MV[k:end_idx] = current_level
    current_level = prbs_low if current_level == prbs_high else prbs_high
    k = end_idx

print(f"PRBS segment length in samples: [{prbs_min_len}, {prbs_max_len}]")
print(f"PRBS segment length in seconds: [{prbs_min_len*dt:.2f}, {prbs_max_len*dt:.2f}]")

# ============================================================
# MODEL 2 RZĘDU Z TŁUMIENIEM + OPÓŹNIENIE
# ============================================================
x1 = np.zeros(N)  # CV
x2 = np.zeros(N)  # pochodna

for k in range(1, N):
    if k >= delay_steps:
        u_delayed = MV[k - delay_steps]
    else:
        u_delayed = MV[0]

    dx1 = x2[k-1]
    dx2 = -2.0 * zeta * omega_n * x2[k-1] - (omega_n**2) * x1[k-1] \
          + K_proc * (omega_n**2) * u_delayed

    x1[k] = x1[k-1] + dt * dx1
    x2[k] = x2[k-1] + dt * dx2

CV_clean = x1
CV = CV_clean + noise

# ============================================================
# FAZA TESTU: STEP / PRBS
# ============================================================
phase = np.empty(N, dtype=object)
phase[:settle_steps] = "STEP"
phase[settle_steps:] = "PRBS"

# można też mieć wersję numeryczną, jeśli chcesz pod ML:
# phase_id = np.zeros(N, dtype=int)
# phase_id[settle_steps:] = 1

# ============================================================
# DATAFRAME
# ============================================================
df = pd.DataFrame({
    "t":        t,
    "MV":       MV,
    "CV":       CV,
    "CV_clean": CV_clean,
    "noise":    noise,
    "phase":    phase
    # "phase_id": phase_id,   # jeśli jednak wolisz int
})

print(df.head())
print(df["phase"].value_counts())



# ============================================================
# WIZUALIZACJA
# ============================================================
fig1 = go.Figure()
fig1.add_trace(go.Scatter(x=df["t"], y=df["MV"],
                          mode="lines", name="MV (sterowanie)"))
fig1.add_trace(go.Scatter(x=df["t"], y=df["CV"],
                          mode="lines", name="CV (z szumem)"))
fig1.add_trace(go.Scatter(x=df["t"], y=df["CV_clean"],
                          mode="lines", name="CV_clean", line=dict(dash="dot")))

# pionowa linia w miejscu przejścia STEP -> PRBS
fig1.add_vline(x=settle_steps * dt,
               line=dict(dash="dash"),
               annotation_text="START PRBS",
               annotation_position="top right")

fig1.update_layout(
    title="Obiekt 2 rzędu z opóźnieniem + szum (STEP + PRBS, z fazą)",
    xaxis_title="t [s]",
    yaxis_title="Wartość",
)
fig1.show()

fig2 = go.Figure()
fig2.add_trace(go.Scatter(x=df["t"], y=df["noise"],
                          mode="lines", name="noise"))
fig2.update_layout(
    title="Szum (persystentny, skośny, z outlierami)",
    xaxis_title="t [s]",
    yaxis_title="noise"
)
fig2.show()


T_dom ≈ 6.667 s
Delay steps = 53 (≈ 80.0% T_dom)
PRBS segment length in samples: [33, 133]
PRBS segment length in seconds: [3.30, 13.30]
     t    MV        CV  CV_clean     noise phase
0  0.0   0.0 -0.051452       0.0 -0.051452  STEP
1  0.1  50.0 -0.482951       0.0 -0.482951  STEP
2  0.2  50.0 -0.130104       0.0 -0.130104  STEP
3  0.3  50.0  0.285207       0.0  0.285207  STEP
4  0.4  50.0 -0.569418       0.0 -0.569418  STEP
phase
PRBS    7600
STEP     400
Name: count, dtype: int64


Identyfikacja obiektu - dekonowulcja w dziedzinie częstotliwości

In [26]:
# ============================================================
# IDENTYFIKACJA DEAD TIME METODĄ DEKONWOLUCJI (FFT)
# u(t) = MV, y(t) = CV_clean
# ============================================================

# 1) bierzemy tylko fragment PRBS (lepsza stacjonarność)
mask_prbs = (df["phase"] == "PRBS")
u = df.loc[mask_prbs, "MV"].values.astype(float)       # wejście
y = df.loc[mask_prbs, "CV_clean"].values.astype(float) # wyjście "bez szumu"

# opcjonalnie: odjęcie średniej (żeby DC nie dominowało w FFT)
u = u - u.mean()
y = y - y.mean()

N_id = len(u)
print(f"Liczba próbek do identyfikacji: {N_id}")

# 2) zero-padding (żeby uniknąć zawijania splotu)
n_fft = 2**int(np.ceil(np.log2(2 * N_id)))   # najbliższa potęga 2 > 2*N
u_pad = np.zeros(n_fft)
y_pad = np.zeros(n_fft)
u_pad[:N_id] = u
y_pad[:N_id] = y

# 3) transformaty Fouriera
U = np.fft.fft(u_pad)
Y = np.fft.fft(y_pad)

# 4) dekonwolucja w dziedzinie częstotliwości:
#    H(ω) = Y(ω) / U(ω) – z lekką regularizacją, żeby nie dzielić przez ~0
eps = 1e-6 * np.max(np.abs(U))
H = Y * np.conj(U) / (np.abs(U)**2 + eps)

# 5) odpowiedź impulsowa: h[n] = IFFT{H(ω)}
h = np.fft.ifft(H).real  # interesuje nas część rzeczywista

# 6) kroimy sensowny zakres odpowiedzi impulsowej
#    (np. pierwsze 400 próbek ≈ 40 s przy dt = 0.1)
n_imp = min(400, len(h))
t_imp = np.arange(n_imp) * dt
h_trunc = h[:n_imp]

# 7) odpowiedź skokowa = całka (suma) z odpowiedzi impulsowej
step_resp = np.cumsum(h_trunc) * dt

# 8) estymacja dead time:
#    zakładamy, że odpowiedź na skok = 0 do chwili L,
#    potem startuje. Bierzemy np. próg 3,5% wartości ustalonej.
g_final = step_resp[-1]             # przybliżona wartość ustalona dla skoku 1
threshold = 0.035 * g_final          # 3,5% poziomu końcowego

idx_dead = np.argmax(step_resp > threshold)
t_dead_est = idx_dead * dt

true_dead = delay_steps * dt

print(f"Estymowany dead time z FFT:  t_dead_est ≈ {t_dead_est:.3f} s")
print(f"Prawdziwy dead time w modelu: t_dead_true ≈ {true_dead:.3f} s")
print(f"Indeks dead time: {idx_dead}, (kroki), vs. {delay_steps} z modelu")

# ============================================================
# WIZUALIZACJA: odpowiedź impulsowa i skokowa z FFT
# ============================================================

fig_imp = go.Figure()
fig_imp.add_trace(go.Scatter(x=t_imp, y=h_trunc,
                             mode="lines", name="h[n] (impulse response)"))
fig_imp.add_vline(x=true_dead,
                  line=dict(dash="dash"),
                  annotation_text="true dead time",
                  annotation_position="top right")
fig_imp.add_vline(x=t_dead_est,
                  line=dict(dash="dot"),
                  annotation_text="est dead time",
                  annotation_position="bottom right")
fig_imp.update_layout(
    title="Odpowiedź impulsowa (z dekonwolucji FFT)",
    xaxis_title="t [s]",
    yaxis_title="h(t)"
)
fig_imp.show()

fig_step = go.Figure()
fig_step.add_trace(go.Scatter(x=t_imp, y=step_resp,
                              mode="lines", name="step response (z FFT)"))
fig_step.add_hline(y=g_final, line=dict(dash="dash"),
                   annotation_text="wartość ustalona (≈ K_proc)",
                   annotation_position="top right")
fig_step.add_hline(y=threshold, line=dict(dash="dot"),
                   annotation_text="próg 2%",
                   annotation_position="bottom right")
fig_step.add_vline(x=true_dead,
                  line=dict(dash="dash"),
                  annotation_text="true dead time",
                  annotation_position="top left")
fig_step.add_vline(x=t_dead_est,
                  line=dict(dash="dot"),
                  annotation_text="est dead time",
                  annotation_position="bottom left")
fig_step.update_layout(
    title="Odpowiedź skokowa (całka z h[n]) + estymacja dead time",
    xaxis_title="t [s]",
    yaxis_title="y_step(t)"
)
fig_step.show()


Liczba próbek do identyfikacji: 7600
Estymowany dead time z FFT:  t_dead_est ≈ 6.800 s
Prawdziwy dead time w modelu: t_dead_true ≈ 5.300 s
Indeks dead time: 68, (kroki), vs. 53 z modelu


Metoda korelacyjna

In [27]:
# ============================================================
# IDENTYFIKACJA DEAD TIME METODĄ KORELACYJNĄ
# y(t) = CV_clean, u(t) = MV, tylko faza PRBS
# ============================================================

# 1) bierzemy fragment PRBS
mask_prbs = (df["phase"] == "PRBS")
u = df.loc[mask_prbs, "MV"].values.astype(float)
y = df.loc[mask_prbs, "CV_clean"].values.astype(float)

# odejmujemy średnią (żeby DC nie zdominowało korelacji)
u = u - u.mean()
y = y - y.mean()

N_id = len(u)
print(f"Liczba próbek do identyfikacji (PRBS): {N_id}")

# 2) AUTOKORELACJA R_uu[k] i KORELACJA KRZYŻOWA R_yu[k] dla k >= 0
def auto_corr_pos(x, max_lag):
    """Autokorelacja dla lagów >=0."""
    N = len(x)
    r = np.zeros(max_lag)
    for k in range(max_lag):
        r[k] = np.dot(x[k:], x[:N-k])
    return r / N

def cross_corr_pos(y, u, max_lag):
    """Korelacja krzyżowa R_yu[k] dla lagów >=0 (y opóźnione względem u)."""
    N = len(u)
    r = np.zeros(max_lag)
    for k in range(max_lag):
        r[k] = np.dot(y[k:], u[:N-k])
    return r / N

# długość szukanej odpowiedzi impulsowej (w próbkach)
L_imp = 200        # np. 200 próbek = 20 s przy dt=0.1
L_imp = min(L_imp, N_id // 2)  # zabezpieczenie

R_uu = auto_corr_pos(u, L_imp)
R_yu = cross_corr_pos(y, u, L_imp)

# 3) Układ równań:
#    R_yu[k] ≈ Σ_{j=0}^{L_imp-1} g[j] * R_uu[k-j], k=0..L_imp-1
#    → A g = b, A – macierz Toeplitza z R_uu
A = np.zeros((L_imp, L_imp))
for k in range(L_imp):
    for j in range(L_imp):
        if j <= k:
            A[k, j] = R_uu[k-j]
        else:
            A[k, j] = 0.0

b = R_yu.copy()

# 4) Rozwiązanie w sensie najmniejszych kwadratów (na wypadek szumu)
g_est, *_ = np.linalg.lstsq(A, b, rcond=None)

# 5) Odpowiedź impulsowa i skokowa
h_corr = g_est             # estymowana odpowiedź impulsowa
t_imp_corr = np.arange(L_imp) * dt

step_corr = np.cumsum(h_corr) * dt
g_final_corr = step_corr[-1]          # wartość ustalona
threshold_corr = 0.02 * g_final_corr  # 2% wartości końcowej

# 6) Dead time z odpowiedzi skokowej z korelacji
idx_dead_corr = np.argmax(step_corr > threshold_corr)
t_dead_corr = idx_dead_corr * dt
true_dead = delay_steps * dt

print(f"[Korelacja] Estymowany dead time:   {t_dead_corr:.3f} s")
print(f"[Model]     Prawdziwy dead time:    {true_dead:.3f} s")
print(f"Idx dead (corr vs model): {idx_dead_corr} vs {delay_steps}")

# 7) Dodatkowo: 'proste' oszacowanie z maksimum R_yu[k]
idx_max_Ryu = np.argmax(np.abs(R_yu))
t_lag_max = idx_max_Ryu * dt
print(f"[Korelacja] Lag max |R_yu| (proste oszacowanie): {t_lag_max:.3f} s")

# ============================================================
# WIZUALIZACJA – odpowiedź impulsowa i skokowa z korelacji
# ============================================================

fig_imp_corr = go.Figure()
fig_imp_corr.add_trace(go.Scatter(x=t_imp_corr, y=h_corr,
                                  mode="lines", name="h_corr[n] (impulse)"))
fig_imp_corr.add_vline(x=true_dead,
                       line=dict(dash="dash"),
                       annotation_text="true dead time",
                       annotation_position="top right")
fig_imp_corr.add_vline(x=t_dead_corr,
                       line=dict(dash="dot"),
                       annotation_text="est dead time (corr)",
                       annotation_position="bottom right")
fig_imp_corr.update_layout(
    title="Odpowiedź impulsowa (metoda korelacyjna)",
    xaxis_title="t [s]",
    yaxis_title="h_corr(t)"
)
fig_imp_corr.show()

fig_step_corr = go.Figure()
fig_step_corr.add_trace(go.Scatter(x=t_imp_corr, y=step_corr,
                                   mode="lines", name="step (corr)"))
fig_step_corr.add_hline(y=g_final_corr, line=dict(dash="dash"),
                        annotation_text="wartość ustalona",
                        annotation_position="top right")
fig_step_corr.add_hline(y=threshold_corr, line=dict(dash="dot"),
                        annotation_text="próg 2%",
                        annotation_position="bottom right")
fig_step_corr.add_vline(x=true_dead,
                        line=dict(dash="dash"),
                        annotation_text="true dead time",
                        annotation_position="top left")
fig_step_corr.add_vline(x=t_dead_corr,
                        line=dict(dash="dot"),
                        annotation_text="est dead time (corr)",
                        annotation_position="bottom left")
fig_step_corr.update_layout(
    title="Odpowiedź skokowa z metody korelacyjnej + estymacja dead time",
    xaxis_title="t [s]",
    yaxis_title="y_step_corr(t)"
)
fig_step_corr.show()


Liczba próbek do identyfikacji (PRBS): 7600
[Korelacja] Estymowany dead time:   14.200 s
[Model]     Prawdziwy dead time:    5.300 s
Idx dead (corr vs model): 142 vs 53
[Korelacja] Lag max |R_yu| (proste oszacowanie): 13.200 s


Ustalenie horyzontu predykcji z dead time

In [28]:
import numpy as np

# ============================================================
# KROK 1: HORYZONT PREDYKCJI = DEAD TIME
# ============================================================

h_dead_steps = delay_steps          # horyzont w próbkach
h_dead_time  = h_dead_steps * dt    # horyzont w sekundach

print(f"Horyzont predykcji: {h_dead_steps} próbek ≈ {h_dead_time:.3f} s")

# ============================================================
# KROK 2: SYGNAŁ DOCELOWY (pseudoCV) = CV(t + L)
# ============================================================
# Uwaga: używam CV_clean jako "idealnego" CV bez opóźnienia.
# Jeśli chcesz uczyć sieć na zaszumionych etykietach, zamień na df['CV'].

# 3a) wybór fragmentu do ML – na razie tylko PRBS (bardziej informatywny)
USE_ONLY_PRBS = True

if USE_ONLY_PRBS:
    df_ml = df[df["phase"] == "PRBS"].copy()
else:
    df_ml = df.copy()

df_ml = df_ml.reset_index(drop=True)

# 3b) tworzymy kolumnę CV_target przesuniętą o -h_dead_steps
df_ml["CV_target"] = df_ml["CV_clean"].shift(-h_dead_steps)

# obcinamy końcówkę, gdzie brak etykiet
df_ml = df_ml.iloc[:-h_dead_steps].copy()
df_ml.reset_index(drop=True, inplace=True)

print("df_ml columns:", df_ml.columns)
print(df_ml.head())

# ============================================================
# KROK 3: OKNA CZASOWE POD LSTM
# ============================================================
# Wybieramy cechy wejściowe (na razie: MV + CV z zaszumionego pomiaru)
feature_cols = ["MV", "CV"]   # można później rozszerzyć
target_col   = "CV_target"

window_len = 50  # liczba próbek historii na wejściu

data   = df_ml[feature_cols].values   # shape (N_ml, n_features)
target = df_ml[target_col].values     # shape (N_ml,)

N_ml, n_features = data.shape
print(f"Liczba próbek ML (po obcięciu): {N_ml}, liczba cech: {n_features}")

X_list = []
y_list = []

# dla indeksu t bierzemy okno [t-window_len+1 .. t] jako wejście,
# a etykietą jest CV_target[t] = CV_clean[t + h_dead_steps]
for t_idx in range(window_len - 1, N_ml):
    X_window = data[t_idx - window_len + 1 : t_idx + 1, :]  # (window_len, n_features)
    y_value  = target[t_idx]

    X_list.append(X_window)
    y_list.append(y_value)

X = np.stack(X_list)   # (n_samples, window_len, n_features)
y = np.array(y_list)   # (n_samples,)

print(f"X shape: {X.shape}  (n_samples, window_len, n_features)")
print(f"y shape: {y.shape}  (n_samples,)")

# ============================================================
# KROK 4: PODZIAŁ NA TRAIN / VAL / TEST (CZASOWO)
# ============================================================

n_samples = X.shape[0]
train_end = int(0.6 * n_samples)
val_end   = int(0.8 * n_samples)

X_train, y_train = X[:train_end],        y[:train_end]
X_val,   y_val   = X[train_end:val_end], y[train_end:val_end]
X_test,  y_test  = X[val_end:],          y[val_end:]

print(f"Train samples: {X_train.shape[0]}")
print(f"Val   samples: {X_val.shape[0]}")
print(f"Test  samples: {X_test.shape[0]}")

# ============================================================
# KROK 5: NORMALIZACJA (NA PODSTAWIE TRAIN)
# ============================================================

# spłaszczamy wymiar czasowy, żeby policzyć statystyki po cechach
X_train_flat = X_train.reshape(-1, n_features)

feat_mean = X_train_flat.mean(axis=0)
feat_std  = X_train_flat.std(axis=0) + 1e-8   # żeby nie dzielić przez 0

def normalize_X(X, mean, std):
    return (X - mean) / std

X_train_norm = normalize_X(X_train, feat_mean, feat_std)
X_val_norm   = normalize_X(X_val,   feat_mean, feat_std)
X_test_norm  = normalize_X(X_test,  feat_mean, feat_std)

# (opcjonalnie) normalizacja y – często pomaga w regresji
y_mean = y_train.mean()
y_std  = y_train.std() + 1e-8

y_train_norm = (y_train - y_mean) / y_std
y_val_norm   = (y_val   - y_mean) / y_std
y_test_norm  = (y_test  - y_mean) / y_std

print("feat_mean:", feat_mean)
print("feat_std :", feat_std)
print("y_mean:", y_mean, " y_std:", y_std)

# Na tym etapie masz:
#  - X_train_norm, y_train_norm
#  - X_val_norm,   y_val_norm
#  - X_test_norm,  y_test_norm
# gotowe do podania do modelu LSTM w Keras.


Horyzont predykcji: 53 próbek ≈ 5.300 s
df_ml columns: Index(['t', 'MV', 'CV', 'CV_clean', 'noise', 'phase', 'CV_target'], dtype='object')
      t    MV         CV   CV_clean     noise phase  CV_target
0  40.0  90.0  51.939735  52.230852 -0.291118  PRBS  47.974337
1  40.1  90.0  51.941011  52.126433 -0.185422  PRBS  47.931070
2  40.2  90.0  52.202633  52.022765  0.179868  PRBS  47.897778
3  40.3  90.0  51.905934  51.919863 -0.013929  PRBS  47.874351
4  40.4  90.0  52.026234  51.817740  0.208494  PRBS  47.860678
Liczba próbek ML (po obcięciu): 7547, liczba cech: 2
X shape: (7498, 50, 2)  (n_samples, window_len, n_features)
y shape: (7498,)  (n_samples,)
Train samples: 4498
Val   samples: 1500
Test  samples: 1500
feat_mean: [53.05913739 52.8585122 ]
feat_std : [39.88284944 14.91300027]
y_mean: 53.04567484866972  y_std: 14.84184846473113
