In [None]:
#Closed-Loop Prediction：
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from sklearn.preprocessing import StandardScaler

desktop    = Path.home() / "Desktop"
excel_path = desktop / "data2.xlsx"
df = pd.read_excel(excel_path, sheet_name="Sheet1", header=None)

u_drive   = df.iloc[2:, 0].astype(float).to_numpy()
angle_rad = df.iloc[2:, 1].astype(float).to_numpy()
T = len(u_drive)
import numpy as np

angle_rad = np.mod(angle_rad, 2*np.pi)

dt   = 0.01
time = np.arange(T) * dt

sin_ang = np.sin(angle_rad)
cos_ang = np.cos(angle_rad)
U = np.stack([sin_ang, cos_ang, u_drive], axis=1)  # (T,3)
Y = angle_rad.reshape(-1,1)    

idx_split = min(int(800.0/dt), T)
U_train, U_test = U[:idx_split], U[idx_split:]
Y_train, Y_test = Y[:idx_split], Y[idx_split:]
time_test       = time[idx_split:]

scaler_U  = StandardScaler().fit(U_train)
scaler_Y  = StandardScaler().fit(Y_train)
U_train_s = scaler_U.transform(U_train)
Y_train_s = scaler_Y.transform(Y_train)

U_test_s  = scaler_U.transform(U_test)
               
class ESN:
    def __init__(self, in_dim, res_dim, out_dim=1,
                 spectral_radius=1.00, leaking_rate=0.4,
                 sparsity=0.1, ridge_param=1e-8, random_state=None):
        rs = np.random.RandomState(random_state)
        self.alpha = leaking_rate
        self.Win = rs.uniform(-1,1,(res_dim, in_dim))
        W = rs.uniform(-1,1,(res_dim, res_dim))
        W[rs.rand(*W.shape) > sparsity] = 0
        W *= spectral_radius / np.max(np.abs(np.linalg.eigvals(W)))
        self.W = W
        self.ridge = ridge_param
        self.res_dim = res_dim               
    def _update(self, x, u):
        x_new = np.tanh(self.Win.dot(u) + self.W.dot(x))
        return (1 - self.alpha)*x + self.alpha*x_new

    def fit(self, U, Y, washout=86):
        T = U.shape[0]
        X = np.zeros((T-washout, self.res_dim))
        x = np.zeros(self.res_dim)
        for t in range(T):
            x = self._update(x, U[t])
            if t>=washout:
                X[t-washout] = x
        Yt = Y[washout:]
        self.Wout = np.linalg.solve(
            X.T.dot(X) + self.ridge*np.eye(self.res_dim),
            X.T.dot(Yt)
        ).T

    def closed_loop_predict(self, u_drive, angle0, scaler_U, scaler_Y, washout=100):
        
        T = len(u_drive)
       
        x = np.zeros(self.res_dim)
       
        for t in range(washout):
            sin0, cos0 = np.sin(angle0), np.cos(angle0)
            u_phys = np.array([sin0, cos0, u_drive[t]])
            u_s    = scaler_U.transform(u_phys.reshape(1,-1))[0]
            x      = self._update(x, u_s)
            angle0 = angle_rad[t]  

        Y_pred_s = np.zeros((T-washout,1))
        for t in range(washout, T):
           
            y_s = self.Wout.dot(x)
            Y_pred_s[t-washout] = y_s

            y_phys   = scaler_Y.scale_[0] * y_s + scaler_Y.mean_[0]
            angle_p  = y_phys if np.isscalar(y_phys) else y_phys[0]

            u_phys   = np.array([np.sin(angle_p), np.cos(angle_p),
                                  u_drive[t] if t < T else u_drive[-1]])
            u_s      = scaler_U.transform(u_phys.reshape(1,-1))[0]

            x = self._update(x, u_s)

        Y_pred = Y_pred_s[:,0] * scaler_Y.scale_[0] + scaler_Y.mean_[0]
        return Y_pred

esn = ESN(in_dim=3, res_dim=500, out_dim=1,
          spectral_radius=1.0, leaking_rate=0.4,
          sparsity=0.5, ridge_param=1e-8)

esn.fit(U_train_s, Y_train_s, washout=washout)

Y_pred = esn.closed_loop_predict(
    u_drive=u_drive[idx_split:],       
    angle0=angle_rad[idx_split-1],     
    scaler_U=scaler_U,
    scaler_Y=scaler_Y,
    washout=washout
)

time_pred = time[idx_split+washout:]

angle_true = angle_rad[idx_split+washout:]

plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

plt.figure(figsize=(10,4))
plt.plot(time,          angle_rad, label='真实 转角', color='blue', alpha=0.5)
plt.plot(time_pred,     Y_pred,     '--', label='闭环预测 转角', color='orange')
plt.axvline(time[idx_split], color='gray', linestyle=':')
plt.xlabel('时间 (s)')
plt.ylabel('转角 (rad)')
plt.title('ESN 闭环预测：真实 vs. 预测')
plt.legend()
plt.tight_layout()
plt.show()

plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

plt.figure(figsize=(10,4))
plt.plot(time,          angle_rad, label='真实 转角', color='blue', alpha=0.5)
plt.plot(time_pred,     Y_pred,     '--', label='闭环预测 转角', color='orange')
plt.axvline(time[idx_split], color='gray', linestyle=':')
plt.xlabel('时间 (s)')
plt.ylabel('转角 (rad)')
plt.title('ESN 闭环预测：真实 vs. 预测')
plt.legend()
plt.tight_layout()
plt.show()

err = (Y_pred - angle_true + np.pi) % (2 * np.pi) - np.pi
err = np.abs(err)


epsilon = 1

exceed_idxs = np.where(err > epsilon)[0]
if len(exceed_idxs) > 0:
    t_star_idx = exceed_idxs[0]
    t_star     = time_pred[t_star_idx]
    print(f"预测误差首次超过 ε={epsilon:.3f} 的索引: {t_star_idx}, 对应时间 t* = {t_star:.3f} s")
else:
    print(f"在整个预测时间段中，误差始终未超过 ε={epsilon:.3f}")

mask = (time_pred >= 800) & (time_pred <= 850)
plt.figure(figsize=(10,3))
plt.plot(time_pred[mask], err[mask], color='red', label='绝对误差 |ŷ - y|')
plt.axhline(epsilon, color='gray', linestyle='--', label=f'阈值 ε={epsilon}')
if len(exceed_idxs) > 0 and (time_pred[t_star_idx] >= 800) and (time_pred[t_star_idx] <= 1000):
    plt.axvline(time_pred[t_star_idx], color='black', linestyle=':', label=f't* ≈ {time_pred[t_star_idx]:.2f}s')
plt.xlabel('时间 (s)')
plt.ylabel('绝对误差 (rad)')
plt.title('闭环预测误差随时间演化 (800~1000s)')
plt.legend()
plt.tight_layout()
plt.show()

M_fit = 200  
idxs_fit = np.arange(min(M_fit, len(err)))
t_fit = time_pred[idxs_fit]
e_fit = err[idxs_fit]

eps_clip = 1e-8
e_fit_clipped = np.clip(e_fit, eps_clip, None)
log_e_fit = np.log(e_fit_clipped)


coef = np.polyfit(t_fit, log_e_fit, 1)
lambda_est = coef[0]   
intercept_est = coef[1]

print(f"线性拟合所得 λ_max ≈ {lambda_est:.4f}，可预测时间尺度 T_λ = 1/λ ≈ {1/lambda_est:.2f} s")

plt.figure(figsize=(10,3))
plt.plot(t_fit, log_e_fit, '.', label='ln(误差)')
plt.plot(t_fit, lambda_est * t_fit + intercept_est, '--', color='black', label=f'拟合直线: λ={lambda_est:.4f}')
plt.xlabel('时间 (s)')
plt.ylabel('ln(误差)')
plt.title('闭环预测误差指数增长初期拟合 (Lyapunov 指数 估计)')
plt.legend()
plt.tight_layout()
plt.show()

sin_pred = np.sin(Y_pred)
cos_pred = np.cos(Y_pred)
sin_true = np.sin(angle_true)
cos_true = np.cos(angle_true)

plt.figure(figsize=(10,4))
plt.plot(time_pred, sin_true, label='真实 sin(转角)', color='green', alpha=0.7)
plt.plot(time_pred, sin_pred, '--', label='闭环预测 sin(转角)', color='orange', alpha=0.7)
plt.xlabel('时间 (s)')
plt.ylabel('sin(转角)')
plt.title('ESN 闭环预测：sin(转角) 真实 vs. 预测')
plt.legend()
plt.tight_layout()
plt.show()

plt.figure(figsize=(10,4))
plt.plot(time_pred, cos_true, label='真实 cos(转角)', color='purple', alpha=0.7)
plt.plot(time_pred, cos_pred, '--', label='闭环预测 cos(转角)', color='red', alpha=0.7)
plt.xlabel('时间 (s)')
plt.ylabel('cos(转角)')
plt.title('ESN 闭环预测：cos(转角) 真实 vs. 预测')
plt.legend()
plt.tight_layout()
plt.show()



In [None]:


window_sec = 10
dt = 0.01
washout = 86

N = U_train_s.shape[0]
val_frac = 0.2
idx_val_start = int((1-val_frac) * N)
U_fit, U_val = U_train_s[:idx_val_start], U_train_s[idx_val_start:]
Y_fit, Y_val = Y_train_s[:idx_val_start], Y_train_s[idx_val_start:]


u_drive_val = u_drive[idx_val_start:idx_val_start+len(U_val)]
angle_rad_val = angle_rad[idx_val_start:idx_val_start+len(U_val)]

window_steps = int(window_sec / dt)
u_drive_win = u_drive_val[:window_steps]
angle_rad_win = angle_rad_val[:window_steps]

dims = [390,400,500,1000]
results = []

for dim in dims:
    esn = ESN(in_dim=3, res_dim=dim, out_dim=1,
              spectral_radius=2.0, leaking_rate=0.7,
              sparsity=0.6, ridge_param=1e-10, random_state=42)
    esn.fit(U_fit, Y_fit, washout=washout)
    
    Y_pred = esn.closed_loop_predict(
        u_drive=u_drive_win,
        angle0=angle_rad_win[0],
        scaler_U=scaler_U,
        scaler_Y=scaler_Y,
        washout=washout
    )
    
    angle_true = angle_rad_win[washout:]
    
    from sklearn.metrics import mean_squared_error
    mse = mean_squared_error(angle_true, Y_pred)
    print(f"res_dim={dim:<4}  10s窗口闭环MSE={mse:.4f}")
    results.append((dim, mse))

import pandas as pd
df_res = pd.DataFrame(results, columns=['res_dim','MSE'])
print(df_res)

import matplotlib.pyplot as plt
plt.plot(df_res['res_dim'], df_res['MSE'], '-o')
plt.xlabel('Reservoir Dimension')
plt.ylabel('10s Closed-loop MSE')
plt.title('Closed-loop 10s MSE vs Reservoir Dimension')
plt.grid(True)
plt.show()

In [None]:

step = 10
n = min(len(time_pred[::step]), len(sin_true[::step]), len(sin_pred[::step]), len(cos_true[::step]), len(cos_pred[::step]))
x_sub = time_pred[::step][:n]
sin_true_sub = sin_true[::step][:n]
sin_pred_sub = sin_pred[::step][:n]
cos_true_sub = cos_true[::step][:n]
cos_pred_sub = cos_pred[::step][:n]

plt.figure(figsize=(10,4))
plt.plot(x_sub, sin_true_sub,
         '--', color='black', linewidth=0.5, alpha=0.7,
         label='正弦真实 (subsampled)')
plt.plot(x_sub, sin_pred_sub,
         '--', color='orange', linewidth=0.5, alpha=0.7,
         label='闭环正弦预测 (subsampled)')
plt.axvline(time[idx_split], color='gray', linestyle=':')
plt.xlabel('时间 (s)')
plt.ylim(-1, 1)
plt.ylabel('sin(转角)')
plt.title('ESN 预测：sin(转角) 真实 vs. 预测')
plt.legend()
plt.tight_layout()
plt.show()

plt.figure(figsize=(10,4))
plt.plot(x_sub, cos_true_sub,
         '--', color='blue', linewidth=0.5, alpha=0.7,
         label='余弦真实 (subsampled)')
plt.plot(x_sub, cos_pred_sub,
         '--', color='green', linewidth=0.5, alpha=0.7,
         label='闭环余弦预测 (subsampled)')
plt.axvline(time[idx_split], color='gray', linestyle=':')
plt.xlabel('时间 (s)')
plt.ylim(-1, 1)
plt.ylabel('cos(转角)')
plt.title('ESN 预测：cos(转角) 真实 vs. 预测')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

sin_pred = np.sin(Y_pred)
cos_pred = np.cos(Y_pred)
sin_true = np.sin(angle_true)
cos_true = np.cos(angle_true)

window_sec = 10      
window_steps = int(window_sec / dt)
n_steps = len(time_pred)
n_windows = int(np.ceil(n_steps / window_steps))

out_dir = Path("prediction_plots")
out_dir.mkdir(exist_ok=True)

for i in range(n_windows):
    start = i * window_steps
    end   = min(start + window_steps, n_steps)
    t_win = time_pred[start:end]
    sin_pred_win = sin_pred[start:end]
    sin_true_win = sin_true[start:end]
    cos_pred_win = cos_pred[start:end]
    cos_true_win = cos_true[start:end]

    plt.figure(figsize=(6,3))
    plt.plot(t_win, sin_true_win, '-', color='blue', label='真实 sin(转角)')
    plt.plot(t_win, sin_pred_win, '--', color='orange', label='预测 sin(转角)')
    plt.xlabel('时间 (s)')
    plt.ylabel('sin(转角)')
    plt.ylim(-1, 1)
    plt.title(f'sin(转角)窗口 {i+1}: {t_win[0]:.1f}s–{t_win[-1]:.1f}s')
    plt.legend(loc='upper right')
    plt.tight_layout()
    plt.show()
   
    plt.figure(figsize=(6,3))
    plt.plot(t_win, cos_true_win, '-', color='green', label='真实 cos(转角)')
    plt.plot(t_win, cos_pred_win, '--', color='red', label='预测 cos(转角)')
    plt.xlabel('时间 (s)')
    plt.ylabel('cos(转角)')
    plt.ylim(-1, 1)
    plt.title(f'cos(转角)窗口 {i+1}: {t_win[0]:.1f}s–{t_win[-1]:.1f}s')
    plt.legend(loc='upper right')
    plt.tight_layout()
    plt.show()
    # plt.savefig(out_dir / f'cos_window_{i+1}.png')  