<a href="https://colab.research.google.com/github/georgejoo/FinalCode/blob/master/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field

# =============================================================================
#  CONFIGURATION
# =============================================================================
FS            = 20_000      # Hz
DT            = 1.0 / FS
POLE_PAIRS    = 5

# Kalman
Q_SLOW        = 1e-9
Q_FAST        = 0.01        # FIXED -- do NOT set to R*100
R             = 0.1         # scale: 0.1 * (20 / RPM)
JUMP_THRESH   = 0.25        # V -- enter fast mode
EXIT_THRESH   = 0.08        # V -- exit fast mode  (~4 * NOISE_STD)
SETTLE_STEPS  = 200         # samples -- 10 ms calm window

# PLL (Type-2, critically damped)
BW_HZ         = 8.0
ZETA          = 0.7071
_WN           = 2 * np.pi * BW_HZ
PLL_K1        = 2 * ZETA * _WN     # proportional
PLL_K2        = _WN ** 2           # integral

# Amplitude estimator
GAIN_TAU      = 2.0         # s
GAIN_A_MIN    = 0.3
GAIN_A_MAX    = 3.0
NORM_CLAMP    = 1.5

# Speed output smoother (display only -- NOT in PLL loop)
SPEED_TAU_OUT = 0.05        # s

# =============================================================================
#  SCALAR KALMAN BIAS ESTIMATOR
# =============================================================================
@dataclass
class KalmanBias:
    """
    Scalar Kalman filter for per-channel DC bias estimation.
    One independent instance per sensor channel.

    State   x = b  [V]
    Process     b[k] = b[k-1] + w,   w ~ N(0, Q)
    Measurement z[k] = y[k] - A*trig(theta_hat) = b_true + leakage + noise

    Q_FAST is FIXED at 0.01 -- decoupled from R.
        Setting Q_FAST = R*100 breaks fast-mode convergence for large R
        because P never settles when Q_FAST >> P_ss.

    R controls slow-mode tau_eff:
        tau_eff = 1 / (K_ss * Fs)
        Rule: tau_eff >> T_elec = 1 / f_elec
        At 20 RPM (f_elec=1.67 Hz, T_elec=600 ms): R=0.1 -> tau_eff=500 ms

    Adaptive Q state machine (per channel, independent):
        slow -> fast : |innovation| > JUMP_THRESH  (step detected)
        fast -> slow : |innovation| < EXIT_THRESH  for SETTLE_STEPS samples
    """
    dt:            float = DT
    Q_slow:        float = Q_SLOW
    Q_fast:        float = Q_FAST
    R:             float = R
    jump_thresh:   float = JUMP_THRESH
    exit_thresh:   float = EXIT_THRESH
    settle_steps:  int   = SETTLE_STEPS

    x:          float = field(default=0.0,   init=False)
    P:          float = field(default=1.0,   init=False)
    K:          float = field(default=0.0,   init=False)
    innov:      float = field(default=0.0,   init=False)
    fast_mode:  bool  = field(default=False, init=False)
    calm_count: int   = field(default=0,     init=False)

    def reset(self):
        self.x = 0.0; self.P = 1.0; self.K = 0.0; self.innov = 0.0
        self.fast_mode = False; self.calm_count = 0

    def update(self, y: float, A: float, trig_val: float) -> float:
        """
        y        : raw sensor sample              [V]
        A        : channel amplitude gain         [V/V]
        trig_val : sin(theta_hat) or cos(theta_hat) [-]
        Returns  : bias estimate                  [V]
        """
        Q       = self.Q_fast if self.fast_mode else self.Q_slow
        P_prior = self.P + Q

        z          = y - A * trig_val
        self.innov = z - self.x

        if not self.fast_mode:
            if abs(self.innov) > self.jump_thresh:
                self.fast_mode = True;  self.calm_count = 0
        else:
            if abs(self.innov) < self.exit_thresh:
                self.calm_count += 1
                if self.calm_count >= self.settle_steps:
                    self.fast_mode = False;  self.calm_count = 0
            else:
                self.calm_count = 0

        self.K = P_prior / (P_prior + self.R)
        self.x = self.x + self.K * self.innov
        self.P = (1.0 - self.K) * P_prior
        return self.x

# =============================================================================
#  KALMAN BIAS + TYPE-2 PLL
# =============================================================================
class KalmanPLL:
    """
    Kalman Bias Estimator + Type-2 PLL.
    No external angle required -- theta_hat derived entirely from y_s, y_c.

    Loop order each sample
    â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€
    1. kf_s.update(y_s, a_s, sin(theta_hat))  -> b_s
       kf_c.update(y_c, a_c, cos(theta_hat))  -> b_c   (independent)
    2. s_n = clip((y_s - b_s) / a_s, +-NORM_CLAMP)
       c_n = clip((y_c - b_c) / a_c, +-NORM_CLAMP)
    3. phi_err = s_n*cos(theta_hat) - c_n*sin(theta_hat)  (cross-product)
    4. w_hat     += dt * K2 * phi_err
       theta_hat += dt * (w_hat + K1 * phi_err)
    5. a_s, a_c updated via sin^2/cos^2-weighted magnitude error
    """

    def __init__(self,
                 dt:          float = DT,
                 pole_pairs:  int   = POLE_PAIRS,
                 BW_hz:       float = BW_HZ,
                 zeta:        float = ZETA,
                 R_kf:        float = R,
                 rpm_nominal: float = 20.0,
                 y_s0:        float = 0.0,
                 y_c0:        float = 1.0):

        self.dt   = dt
        self.pp   = pole_pairs
        self.kf_s = KalmanBias(dt=dt, R=R_kf)
        self.kf_c = KalmanBias(dt=dt, R=R_kf)
        self.b_s = self.b_c = 0.0
        self.a_s = self.a_c = 1.0
        self.tau_gain = GAIN_TAU
        self.A_MIN    = GAIN_A_MIN
        self.A_MAX    = GAIN_A_MAX

        self.theta_hat = np.arctan2(y_s0, y_c0) % (2 * np.pi)
        self.w_hat     = rpm_nominal / 60.0 * pole_pairs * 2 * np.pi

        wn = 2 * np.pi * BW_hz
        self.k1 = 2 * zeta * wn    # proportional
        self.k2 = wn ** 2          # integral

        self._w_filt   = self.w_hat
        self.tau_w_out = SPEED_TAU_OUT
        self.rpm_out   = rpm_nominal
        self.phi_err   = 0.0

    def reset(self, y_s0=0.0, y_c0=1.0, rpm_nominal=20.0):
        self.kf_s.reset();  self.kf_c.reset()
        self.b_s = self.b_c = 0.0
        self.a_s = self.a_c = 1.0
        self.theta_hat = np.arctan2(y_s0, y_c0) % (2 * np.pi)
        self.w_hat = rpm_nominal / 60.0 * self.pp * 2 * np.pi
        self._w_filt = self.w_hat
        self.rpm_out = rpm_nominal
        self.phi_err = 0.0

    def step(self, y_s: float, y_c: float) -> dict:
        """
        Process one sample pair.

        Returns dict:
            theta, rpm, b_s, b_c, a_s, a_c, K_s, K_c, mode_s, mode_c, phi_err
        """
        # 1. Trig at current angle
        s_h = np.sin(self.theta_hat)
        c_h = np.cos(self.theta_hat)

        # 2. Kalman bias -- independent per channel
        self.b_s = self.kf_s.update(y_s, self.a_s, s_h)
        self.b_c = self.kf_c.update(y_c, self.a_c, c_h)

        # 3. Normalise + clamp
        a_s_s = max(self.a_s, self.A_MIN)
        a_c_s = max(self.a_c, self.A_MIN)
        s_n = float(np.clip((y_s - self.b_s) / a_s_s, -NORM_CLAMP,  NORM_CLAMP))
        c_n = float(np.clip((y_c - self.b_c) / a_c_s, -NORM_CLAMP,  NORM_CLAMP))

        # 4. Phase detector (cross-product)
        self.phi_err = s_n * c_h - c_n * s_h

        # 5. Type-2 PLL
        self.w_hat     += self.dt * self.k2 * self.phi_err
        self.theta_hat += self.dt * (self.w_hat + self.k1 * self.phi_err)
        self.theta_hat %= (2 * np.pi)

        # 6. Amplitude gain (sin^2/cos^2 weighting)
        mag_err   = np.sqrt(s_n**2 + c_n**2) - 1.0
        self.a_s += (self.dt / self.tau_gain) * mag_err * s_h**2
        self.a_c += (self.dt / self.tau_gain) * mag_err * c_h**2
        self.a_s  = float(np.clip(self.a_s, self.A_MIN, self.A_MAX))
        self.a_c  = float(np.clip(self.a_c, self.A_MIN, self.A_MAX))

        # 7. Speed output smoother (display only)
        alp           = self.dt / (self.tau_w_out + self.dt)
        self._w_filt += alp * (self.w_hat - self._w_filt)
        self.rpm_out  = self._w_filt / self.pp * 60.0 / (2 * np.pi)

        return dict(
            theta   = self.theta_hat,
            rpm     = self.rpm_out,
            b_s     = self.b_s,       b_c     = self.b_c,
            a_s     = self.a_s,       a_c     = self.a_c,
            K_s     = self.kf_s.K,    K_c     = self.kf_c.K,
            mode_s  = int(self.kf_s.fast_mode),
            mode_c  = int(self.kf_c.fast_mode),
            phi_err = self.phi_err,
        )

# =============================================================================
#  SIMULATION + PLOT  (runs when executed as a script: python kalman_pll.py)
# =============================================================================
if __name__ == "__main__":

    T_TOTAL   = 8.0;   NOISE_STD = 0.02
    GAIN_S    = 1.2;   GAIN_C    = 0.8
    RPM       = 20.0
    BS_BASE   = 0.3;   BC_BASE   = 0.3
    STEP_AMP  = 0.8
    STEP_S_T  = 3.0;   STEP_C_T  = 5.5

    rng = np.random.default_rng(42)
    t   = np.arange(0.0, T_TOTAL, DT);  N = len(t)
    w_elec     = RPM / 60.0 * POLE_PAIRS * 2 * np.pi
    theta_true = (w_elec * t) % (2 * np.pi)
    T_ELEC     = 1.0 / (w_elec / (2 * np.pi))

    bs_true = np.where(t < STEP_S_T, BS_BASE, BS_BASE + STEP_AMP)
    bc_true = np.where(t < STEP_C_T, BC_BASE, BC_BASE + STEP_AMP)
    y_s = GAIN_S * np.sin(theta_true) + bs_true + rng.normal(0, NOISE_STD, N)
    y_c = GAIN_C * np.cos(theta_true) + bc_true + rng.normal(0, NOISE_STD, N)

    # Run estimator
    sys = KalmanPLL(rpm_nominal=RPM, y_s0=y_s[0], y_c0=y_c[0])
    out = [sys.step(y_s[i], y_c[i]) for i in range(N)]

    theta_hat = np.array([o["theta"]   for o in out])
    rpm_out   = np.array([o["rpm"]     for o in out])
    b_s_hat   = np.array([o["b_s"]     for o in out])
    b_c_hat   = np.array([o["b_c"]     for o in out])
    a_s_hat   = np.array([o["a_s"]     for o in out])
    a_c_hat   = np.array([o["a_c"]     for o in out])
    K_s       = np.array([o["K_s"]     for o in out])
    mode_s    = np.array([o["mode_s"]  for o in out])
    mode_c    = np.array([o["mode_c"]  for o in out])
    phi_err   = np.array([o["phi_err"] for o in out])

    def wrap_err(a, b):
        return np.degrees((a - b + np.pi) % (2 * np.pi) - np.pi)

    theta_naive = np.arctan2(y_s, y_c) % (2 * np.pi)
    err_theta   = wrap_err(theta_hat, theta_true)
    err_naive   = wrap_err(theta_naive, theta_true)
    err_bs      = b_s_hat - bs_true
    err_bc      = b_c_hat - bc_true

    win_conv  = (t > 4.5) & (t < 5.4)
    i_s = np.argmax(t >= STEP_S_T);  i_c = np.argmax(t >= STEP_C_T)
    settle_s = np.argmax(np.abs(b_s_hat[i_s:] - (BS_BASE+STEP_AMP)) < 0.05)*DT*1000
    settle_c = np.argmax(np.abs(b_c_hat[i_c:] - (BC_BASE+STEP_AMP)) < 0.05)*DT*1000

    print("=" * 65)
    print(f"  RPM={RPM}  f_elec={w_elec/(2*np.pi):.2f}Hz  T_elec={T_ELEC*1e3:.0f}ms")
    print(f"  R={R}  tau_eff={1/(K_s[win_conv].mean()*FS)*1e3:.0f}ms  Q_fast={Q_FAST}")
    print()
    print(f"  Angle -- naive std:     {err_naive.std():.2f} deg")
    print(f"  Angle -- KF+PLL std:    {err_theta[win_conv].std():.2f} deg")
    print()
    print(f"  B_s ripple (converged): {err_bs[win_conv].std()*1e3:.2f} mV")
    print(f"  B_s settle after step:  {settle_s:.1f} ms")
    print(f"  B_s final (true 1.1V):  {b_s_hat[t>7].mean():.4f} V")
    print()
    print(f"  B_c ripple (converged): {err_bc[win_conv].std()*1e3:.2f} mV")
    print(f"  B_c settle after step:  {settle_c:.1f} ms")
    print(f"  B_c final (true 1.1V):  {b_c_hat[t>7.5].mean():.4f} V")
    print()
    print(f"  a_s={a_s_hat[t>7].mean():.4f} (true={GAIN_S})  "
          f"a_c={a_c_hat[t>7].mean():.4f} (true={GAIN_C})")
    print("=" * 65)

    # â”€â”€ Plot â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€
    plt.rcParams.update({
        "figure.facecolor": "#0f1117", "axes.facecolor": "#0f1117",
        "axes.edgecolor": "#3a3a3a",   "axes.labelcolor": "#cccccc",
        "xtick.color": "#888888",      "ytick.color": "#888888",
        "text.color": "#eeeeee",       "grid.color": "#252525",
        "grid.linestyle": "--",        "grid.linewidth": 0.5,
        "lines.linewidth": 1.6,        "font.size": 9.5,
        "axes.titlesize": 10,          "axes.titlepad": 7,
        "legend.fontsize": 8.5,        "legend.framealpha": 0.15,
    })

    DS = 10; sl = slice(None, None, DS); t_p = t[sl]
    C_TS = "#EF553B"; C_TC = "#636EFA"
    C_ES = "#FFA15A"; C_EC = "#19D3F3"
    C_NA = "#666666"; C_PL = "#00CC96"
    C_RS = "#FF6692"; C_RC = "#AB63FA"

    fig, axes = plt.subplots(4, 2, figsize=(13, 16),
        gridspec_kw=dict(hspace=0.52, wspace=0.30,
                         left=0.07, right=0.97, top=0.93, bottom=0.04))
    fig.suptitle(
        "Kalman Bias + Type-2 PLL  --  No External Angle  --  20 RPM\n"
        "R=0.1  Q_fast=0.01  Q_slow=1e-9  BW=8Hz  zeta=0.707\n"
        "B_s +800mV @ t=3s   |   B_c +800mV @ t=5.5s",
        fontsize=10.5, y=0.97)

    def shade(ax, t0, c):
        ax.axvspan(t0, T_TOTAL, alpha=0.05, color=c)
        ax.axvline(t0, color=c, lw=1.2, ls=":", alpha=0.85)

    ax_ang, ax_ae = axes[0]
    ax_bs,  ax_bc = axes[1]
    ax_es,  ax_ec = axes[2]
    ax_rpm, ax_ph = axes[3]

    # Angle
    ax_ang.plot(t_p, np.degrees(theta_true[sl]), "--", color="#888", lw=2.0,
                label="True", alpha=0.7)
    ax_ang.plot(t_p, np.degrees(theta_naive[sl]), "-", color=C_NA, lw=1.0,
                label="Naive atan2", alpha=0.55)
    ax_ang.plot(t_p, np.degrees(theta_hat[sl]), "-", color=C_PL, lw=1.6,
                label="KF + PLL")
    ax_ang.set_title("Angle Estimate"); ax_ang.set_ylabel("deg")
    ax_ang.set_xlabel("Time (s)"); ax_ang.legend(loc="upper left")
    ax_ang.grid(True); ax_ang.set_xlim(0, T_TOTAL)

    # Angle error
    shade(ax_ae, STEP_S_T, C_TS); shade(ax_ae, STEP_C_T, C_TC)
    ax_ae.plot(t_p, err_naive[sl], "-", color=C_NA, lw=1.0,
               label="Naive", alpha=0.55)
    ax_ae.plot(t_p, err_theta[sl], "-", color=C_PL, lw=1.5, label="KF+PLL")
    ax_ae.axhline(5, color="red", lw=1.0, ls="--", alpha=0.4, label="+-5 deg")
    ax_ae.axhline(-5, color="red", lw=1.0, ls="--", alpha=0.4)
    ax_ae.fill_between(t_p, -5, 5, alpha=0.05, color="#00ff88")
    ax_ae.set_ylim(-80, 80)
    ax_ae.set_title(f"Angle Error  (SS: {err_theta[win_conv].std():.2f} deg std)")
    ax_ae.set_ylabel("deg"); ax_ae.set_xlabel("Time (s)")
    ax_ae.legend(loc="upper right"); ax_ae.grid(True); ax_ae.set_xlim(0, T_TOTAL)

    # Bias sine
    shade(ax_bs, STEP_S_T, C_TS)
    ax_bs.plot(t_p, bs_true[sl], "--", color=C_TS, lw=2.2, label="True B_s", alpha=0.85)
    ax_bs.plot(t_p, b_s_hat[sl], "-", color=C_ES, lw=1.6, label="KF Est. B_s")
    ax_bs.annotate("+800mV perm.",
        xy=(STEP_S_T, BS_BASE+STEP_AMP), xytext=(STEP_S_T+0.4, 0.72),
        color=C_TS, fontsize=8, arrowprops=dict(arrowstyle="->", color=C_TS, lw=0.8))
    ax_bs.set_title(f"Sine Bias  (settle={settle_s:.0f}ms  "
                    f"ripple={err_bs[win_conv].std()*1e3:.1f}mV)")
    ax_bs.set_ylabel("V"); ax_bs.set_xlabel("Time (s)")
    ax_bs.legend(loc="upper left"); ax_bs.grid(True); ax_bs.set_xlim(0, T_TOTAL)

    # Bias cosine
    shade(ax_bc, STEP_C_T, C_TC)
    ax_bc.plot(t_p, bc_true[sl], "--", color=C_TC, lw=2.2, label="True B_c", alpha=0.85)
    ax_bc.plot(t_p, b_c_hat[sl], "-", color=C_EC, lw=1.6, label="KF Est. B_c")
    ax_bc.annotate("+800mV perm.",
        xy=(STEP_C_T, BC_BASE+STEP_AMP), xytext=(STEP_C_T+0.2, 0.72),
        color=C_TC, fontsize=8, arrowprops=dict(arrowstyle="->", color=C_TC, lw=0.8))
    ax_bc.set_title(f"Cosine Bias  (settle={settle_c:.0f}ms  "
                    f"ripple={err_bc[win_conv].std()*1e3:.1f}mV)")
    ax_bc.set_ylabel("V"); ax_bc.set_xlabel("Time (s)")
    ax_bc.legend(loc="upper left"); ax_bc.grid(True); ax_bc.set_xlim(0, T_TOTAL)

    # Bias errors
    shade(ax_es, STEP_S_T, C_TS)
    ax_es.plot(t_p, err_bs[sl]*1e3, "-", color=C_RS, lw=1.1, label="B_s error")
    ax_es.axhline(20, color="red", lw=1.0, ls="--", alpha=0.4, label="+-20 mV")
    ax_es.axhline(-20, color="red", lw=1.0, ls="--", alpha=0.4)
    ax_es.fill_between(t_p, -20, 20, alpha=0.05, color="#00ff88")
    ax_es.set_ylim(-100, 920); ax_es.set_title("Sine Bias Error (mV)")
    ax_es.set_ylabel("mV"); ax_es.set_xlabel("Time (s)")
    ax_es.legend(loc="upper right"); ax_es.grid(True); ax_es.set_xlim(0, T_TOTAL)

    shade(ax_ec, STEP_C_T, C_TC)
    ax_ec.plot(t_p, err_bc[sl]*1e3, "-", color=C_RC, lw=1.1, label="B_c error")
    ax_ec.axhline(20, color="red", lw=1.0, ls="--", alpha=0.4, label="+-20 mV")
    ax_ec.axhline(-20, color="red", lw=1.0, ls="--", alpha=0.4)
    ax_ec.fill_between(t_p, -20, 20, alpha=0.05, color="#00ff88")
    ax_ec.set_ylim(-100, 920); ax_ec.set_title("Cosine Bias Error (mV)")
    ax_ec.set_ylabel("mV"); ax_ec.set_xlabel("Time (s)")
    ax_ec.legend(loc="upper right"); ax_ec.grid(True); ax_ec.set_xlim(0, T_TOTAL)

    # Speed
    ax_rpm.plot(t_p, np.full_like(t_p, RPM), "--", color="#888", lw=2.0,
                label="True RPM", alpha=0.7)
    ax_rpm.plot(t_p, rpm_out[sl], "-", color=C_PL, lw=1.6, label="PLL RPM")
    ax_rpm.set_ylim(0, 40); ax_rpm.set_title("Speed Estimate (PLL)")
    ax_rpm.set_ylabel("RPM"); ax_rpm.set_xlabel("Time (s)")
    ax_rpm.legend(loc="upper right"); ax_rpm.grid(True); ax_rpm.set_xlim(0, T_TOTAL)

    # Phase error + KF mode
    shade(ax_ph, STEP_S_T, C_TS); shade(ax_ph, STEP_C_T, C_TC)
    ax_ph.plot(t_p, phi_err[sl], "-", color=C_ES, lw=0.9, label="phi_err", alpha=0.85)
    ax_ph.axhline(0, color="#888", lw=1.0, ls="--", alpha=0.5)
    ax_ph.set_ylim(-0.5, 0.5)
    ax_K = ax_ph.twinx()
    ax_K.fill_between(t_p, mode_s[sl], 0, alpha=0.25, color=C_ES, label="Fast S")
    ax_K.fill_between(t_p, mode_c[sl], 0, alpha=0.18, color=C_EC, label="Fast C")
    ax_K.set_yticks([0, 1]); ax_K.set_yticklabels(["slow", "fast"], color="#888", fontsize=8)
    ax_K.set_ylim(-0.1, 2.5)
    l1, b1 = ax_ph.get_legend_handles_labels()
    l2, b2 = ax_K.get_legend_handles_labels()
    ax_ph.legend(l1+l2, b1+b2, loc="upper right", fontsize=8)
    ax_ph.set_title("PLL Phase Error + KF Mode")
    ax_ph.set_ylabel("phi_err"); ax_ph.set_xlabel("Time (s)")
    ax_ph.grid(True)
    plt.show()