## Conjugate Gradient Method

In [None]:
import numpy as np
import matplotlib.image as mpimg
from scipy.ndimage import gaussian_filter


def laplace_dirichlet(u: np.ndarray) -> np.ndarray:
    """Apply the 5-point Laplacian stencil to a 2-D array.

    The array is surrounded by a one-pixel frame of zeros before the stencil
    is evaluated, so neighbours that fall outside ``u`` contribute 0.

    Returns an array with the same shape as ``u`` holding
    ``-4*u[i, j] + u[i-1, j] + u[i+1, j] + u[i, j-1] + u[i, j+1]``.
    """
    padded = np.pad(u, ((1, 1), (1, 1)), mode="constant", constant_values=0)

    centre = padded[1:-1, 1:-1]
    above = padded[0:-2, 1:-1]
    below = padded[2:, 1:-1]
    left = padded[1:-1, 0:-2]
    right = padded[1:-1, 2:]

    return -4.0 * centre + above + below + left + right

def M_matrix(u, v, Ix, Iy, lam):
    """Apply the coupled optical-flow operator to the pair ``(u, v)``.

    Each output component is a pointwise 2x2 data term built from ``Ix`` and
    ``Iy`` minus ``lam`` times the zero-padded Laplacian of that component.

    Returns the tuple ``(Mu, Mv)``.
    """
    smooth_u = laplace_dirichlet(u)
    Mu = (Ix * Ix) * u + (Ix * Iy) * v - lam * smooth_u

    smooth_v = laplace_dirichlet(v)
    Mv = (Ix * Iy) * u + (Iy * Iy) * v - lam * smooth_v

    return Mu, Mv

def compute_derivatives(I0, I1):
    """Return spatial derivatives ``(Ix, Iy)`` averaged over the two frames.

    For each frame a forward difference is taken along the axis; the last
    column/row reuses the difference between the final two entries. ``Ix``
    differentiates along axis 1, ``Iy`` along axis 0, and each is the mean of
    the per-frame results.
    """

    def _one_sided_diff(img, axis):
        # Work on views with the target axis moved last, so one code path
        # serves both directions; writes go through to `grad`.
        grad = np.zeros_like(img)
        src = np.moveaxis(img, axis, -1)
        dst = np.moveaxis(grad, axis, -1)
        dst[..., :-1] = src[..., 1:] - src[..., :-1]
        dst[..., -1] = src[..., -1] - src[..., -2]
        return grad

    Ix = 0.5 * (_one_sided_diff(I0, 1) + _one_sided_diff(I1, 1))
    Iy = 0.5 * (_one_sided_diff(I0, 0) + _one_sided_diff(I1, 0))
    return Ix, Iy


def OF_cg(u0, v0, Ix, Iy, reg, rhsu, rhsv, tol=1e-8, maxit=2000, verbose=False):
    """Solve the coupled optical-flow system with (unpreconditioned) CG.

    The operator is applied matrix-free through ``M_matrix(., ., Ix, Iy, reg)``.

    Parameters
    ----------
    u0, v0 : initial guess for the two flow components (copied, not modified).
    Ix, Iy : coefficient arrays forwarded to ``M_matrix``.
    reg : weight forwarded to ``M_matrix`` as its ``lam`` argument.
    rhsu, rhsv : right-hand side of the system.
    tol : stop once ``||r_k|| / ||r_0||`` drops below this value.
    maxit : maximum number of iterations.
    verbose : print the relative residual every 10 iterations and at exit.

    Returns
    -------
    ``(u, v, info)`` where ``info`` has the keys ``"it"`` (iterations
    completed), ``"relres"`` (last relative residual) and ``"converged"``.
    """

    def dot2(a1, a2, b1, b2):
        # Inner product on the stacked unknown (u, v).
        return float(np.sum(a1 * b1) + np.sum(a2 * b2))

    u, v = u0.copy(), v0.copy()
    Au, Av = M_matrix(u, v, Ix, Iy, reg)
    ru, rv = rhsu - Au, rhsv - Av

    rr = dot2(ru, rv, ru, rv)
    norm_r0 = np.sqrt(rr)
    if norm_r0 == 0.0:
        return u, v, {"it": 0, "relres": 0.0, "converged": True}

    pu, pv = ru.copy(), rv.copy()
    relres, it = 1.0, 0

    for k in range(1, maxit + 1):
        Apu, Apv = M_matrix(pu, pv, Ix, Iy, reg)
        denom = dot2(pu, pv, Apu, Apv)
        if denom <= 0.0:
            # dot2 returns Python floats, so dividing by a zero curvature
            # would raise ZeroDivisionError; stop with the current iterate.
            if verbose:
                print("CG breakdown: non-positive (p,Ap) at iter", k)
            break

        alpha = rr / denom
        u += alpha * pu
        v += alpha * pv
        ru -= alpha * Apu
        rv -= alpha * Apv

        # One residual inner product serves both the stopping test and beta.
        rr_new = dot2(ru, rv, ru, rv)
        relres = np.sqrt(rr_new) / norm_r0
        if verbose and (k % 10 == 0 or relres < tol):
            print(f"it={k:4d} relres={relres:.3e}")
        if relres < tol:
            it = k
            break

        beta = rr_new / rr
        pu, pv = ru + beta * pu, rv + beta * pv
        rr = rr_new
        it = k

    return u, v, {"it": it, "relres": relres, "converged": relres < tol}

def build_right_hand_side(Ix, Iy, It):
    """Return the right-hand side pair ``(-Ix * It, -Iy * It)``."""
    rhs_u = -Ix * It
    rhs_v = -Iy * It
    return rhs_u, rhs_v

def run_optical_flow(frame0_path, frame1_path, lam=1.0, sigma=1.0, tol=1e-8, maxit=2000):
    """Load two frames, build the optical-flow system and solve it with CG.

    Parameters
    ----------
    frame0_path, frame1_path : paths of the two image files.
    lam : regularisation weight passed to ``OF_cg``.
    sigma : standard deviation of the Gaussian pre-smoothing.
    tol, maxit : stopping parameters passed to ``OF_cg``.

    Returns
    -------
    ``(u, v, info)`` as returned by ``OF_cg``.

    Raises
    ------
    ValueError
        If the two frames do not have the same shape after grayscale
        conversion.
    """

    def _load_gray(path):
        # Each frame is converted on its own; previously the second frame's
        # conversion depended on the first frame's dimensionality.
        img = mpimg.imread(path).astype(float)
        if img.ndim == 3:
            # NOTE(review): the mean runs over every channel on axis 2, so an
            # alpha channel (if present) is averaged in too -- confirm inputs.
            img = np.mean(img, axis=2)
        return img

    I0 = _load_gray(frame0_path)
    I1 = _load_gray(frame1_path)
    if I0.shape != I1.shape:
        raise ValueError(
            f"frames must have the same shape, got {I0.shape} and {I1.shape}"
        )

    I0 = gaussian_filter(I0, sigma)
    I1 = gaussian_filter(I1, sigma)

    Ix, Iy = compute_derivatives(I0, I1)
    It = I1 - I0

    rhsu, rhsv = build_right_hand_side(Ix, Iy, It)
    u0, v0 = np.zeros_like(I0), np.zeros_like(I0)

    u, v, info = OF_cg(u0, v0, Ix, Iy, lam, rhsu, rhsv, tol, maxit, verbose=True)

    print(f"CG finished in {info['it']} iterations, relres={info['relres']:.2e}, converged={info['converged']}")
    return u, v, info


# Input frames for the demo run. These are absolute paths on the author's
# machine and must be adjusted before running elsewhere.
frame0_path = "/Users/ethanclement/Documents/NLAProject/frame10.png"
frame1_path = "/Users/ethanclement/Documents/NLAProject/frame11.png"

# Run the CG solver with default parameters and print the returned
# (u, v, info) tuple.
print(run_optical_flow(frame0_path ,frame1_path ))


it=  10 relres=1.436e+00
it=  20 relres=1.331e+00
it=  30 relres=1.210e+00
it=  40 relres=1.052e+00
it=  50 relres=8.778e-01
it=  60 relres=7.130e-01
it=  70 relres=5.967e-01
it=  80 relres=4.992e-01
it=  90 relres=4.154e-01
it= 100 relres=3.554e-01
it= 110 relres=3.082e-01
it= 120 relres=2.619e-01
it= 130 relres=2.096e-01
it= 140 relres=1.803e-01
it= 150 relres=1.507e-01
it= 160 relres=1.246e-01
it= 170 relres=9.923e-02
it= 180 relres=8.198e-02
it= 190 relres=7.041e-02
it= 200 relres=5.994e-02
it= 210 relres=5.189e-02
it= 220 relres=4.611e-02
it= 230 relres=3.973e-02
it= 240 relres=3.315e-02
it= 250 relres=2.804e-02
it= 260 relres=2.301e-02
it= 270 relres=1.887e-02
it= 280 relres=1.565e-02
it= 290 relres=1.289e-02
it= 300 relres=1.056e-02
it= 310 relres=8.685e-03
it= 320 relres=7.409e-03
it= 330 relres=6.280e-03
it= 340 relres=5.562e-03
it= 350 relres=4.698e-03
it= 360 relres=3.862e-03
it= 370 relres=3.240e-03
it= 380 relres=2.856e-03
it= 390 relres=2.458e-03
it= 400 relres=2.074e-03


In [None]:
import numpy as np
import matplotlib.image as mpimg
from scipy.ndimage import gaussian_filter


def laplace_dirichlet(u: np.ndarray) -> np.ndarray:
    """Evaluate the 5-point Laplacian stencil on ``u`` with zero padding.

    A one-pixel border of zeros is added around ``u`` so that out-of-range
    neighbours count as 0. The result has the same shape as ``u``.
    """
    framed = np.pad(u, ((1, 1), (1, 1)), mode="constant", constant_values=0)

    mid = framed[1:-1, 1:-1]
    north = framed[0:-2, 1:-1]
    south = framed[2:, 1:-1]
    west = framed[1:-1, 0:-2]
    east = framed[1:-1, 2:]

    return -4.0 * mid + north + south + west + east


def M_matrix(u, v, Ix, Iy, lam):
    """Apply the coupled optical-flow operator to ``(u, v)``.

    The result combines the pointwise data term formed from ``Ix`` and ``Iy``
    with ``-lam`` times the zero-padded Laplacian of each component.

    Returns the tuple ``(Mu, Mv)``.
    """
    lap_u = laplace_dirichlet(u)
    Mu = (Ix * Ix) * u + (Ix * Iy) * v - lam * lap_u

    lap_v = laplace_dirichlet(v)
    Mv = (Ix * Iy) * u + (Iy * Iy) * v - lam * lap_v

    return Mu, Mv


def OF_cg(u0, v0, Ix, Iy, reg, rhsu, rhsv, tol=1e-8, maxit=2000, verbose=False):
    """Solve the coupled optical-flow system with (unpreconditioned) CG.

    The operator is applied matrix-free through ``M_matrix(., ., Ix, Iy, reg)``.

    Parameters
    ----------
    u0, v0 : initial guess for the two flow components (copied, not modified).
    Ix, Iy : coefficient arrays forwarded to ``M_matrix``.
    reg : weight forwarded to ``M_matrix`` as its ``lam`` argument.
    rhsu, rhsv : right-hand side of the system.
    tol : stop once ``||r_k|| / ||r_0||`` drops below this value.
    maxit : maximum number of iterations.
    verbose : print the relative residual every 10 iterations and at exit.

    Returns
    -------
    ``(u, v, info)`` where ``info`` has the keys ``"it"`` (iterations
    completed), ``"relres"`` (last relative residual) and ``"converged"``.
    """

    def dot2(a1, a2, b1, b2):
        # Inner product on the stacked unknown (u, v).
        return float(np.sum(a1 * b1) + np.sum(a2 * b2))

    u, v = u0.copy(), v0.copy()
    Au, Av = M_matrix(u, v, Ix, Iy, reg)
    ru, rv = rhsu - Au, rhsv - Av

    rr = dot2(ru, rv, ru, rv)
    norm_r0 = np.sqrt(rr)
    if norm_r0 == 0.0:
        return u, v, {"it": 0, "relres": 0.0, "converged": True}

    pu, pv = ru.copy(), rv.copy()
    relres, it = 1.0, 0

    for k in range(1, maxit + 1):
        Apu, Apv = M_matrix(pu, pv, Ix, Iy, reg)
        denom = dot2(pu, pv, Apu, Apv)
        if denom <= 0.0:
            # dot2 returns Python floats, so dividing by a zero curvature
            # would raise ZeroDivisionError; stop with the current iterate.
            if verbose:
                print("CG breakdown: non-positive (p,Ap) at iter", k)
            break

        alpha = rr / denom
        u += alpha * pu
        v += alpha * pv
        ru -= alpha * Apu
        rv -= alpha * Apv

        # One residual inner product serves both the stopping test and beta.
        rr_new = dot2(ru, rv, ru, rv)
        relres = np.sqrt(rr_new) / norm_r0
        if verbose and (k % 10 == 0 or relres < tol):
            print(f"it={k:4d} relres={relres:.3e}")
        if relres < tol:
            it = k
            break

        beta = rr_new / rr
        pu, pv = ru + beta * pu, rv + beta * pv
        rr = rr_new
        it = k

    return u, v, {"it": it, "relres": relres, "converged": relres < tol}


def build_right_hand_side(Ix, Iy, It):
    """Build the right-hand side ``(-Ix * It, -Iy * It)`` of the flow system."""
    u_component = -Ix * It
    v_component = -Iy * It
    return u_component, v_component

def run_optical_flow(frame0_path, frame1_path, lam=1.0, sigma=1.0, tol=1e-8, maxit=1000):
    """Load two frames, build the optical-flow system and solve it with CG.

    Parameters
    ----------
    frame0_path, frame1_path : paths of the two image files.
    lam : regularisation weight passed to ``OF_cg``.
    sigma : standard deviation of the Gaussian pre-smoothing.
    tol, maxit : stopping parameters passed to ``OF_cg``.

    Returns
    -------
    ``(u, v, info)`` as returned by ``OF_cg``.

    Raises
    ------
    ValueError
        If the two frames do not have the same shape after grayscale
        conversion.
    """

    def _load_gray(path):
        # Each frame is converted on its own; previously the second frame's
        # conversion depended on the first frame's dimensionality.
        img = mpimg.imread(path).astype(float)
        if img.ndim == 3:
            # NOTE(review): the mean runs over every channel on axis 2, so an
            # alpha channel (if present) is averaged in too -- confirm inputs.
            img = np.mean(img, axis=2)
        return img

    def _central_diff(img, axis):
        # Un-normalised central difference I[k+1] - I[k-1].
        # NOTE: np.roll wraps around, so border pixels are differenced
        # against the opposite image edge.
        return np.roll(img, -1, axis=axis) - np.roll(img, 1, axis=axis)

    I0 = _load_gray(frame0_path)
    I1 = _load_gray(frame1_path)
    if I0.shape != I1.shape:
        raise ValueError(
            f"frames must have the same shape, got {I0.shape} and {I1.shape}"
        )

    I0 = gaussian_filter(I0, sigma)
    I1 = gaussian_filter(I1, sigma)

    # Derivatives (central differences, h=1), averaged over the two frames:
    # the factor 0.5 averages the frames, the division by 2 is the 2h step.
    Ix = 0.5 * (_central_diff(I0, 1) + _central_diff(I1, 1)) / 2
    Iy = 0.5 * (_central_diff(I0, 0) + _central_diff(I1, 0)) / 2
    It = I1 - I0

    rhsu, rhsv = build_right_hand_side(Ix, Iy, It)
    u0, v0 = np.zeros_like(I0), np.zeros_like(I0)

    u, v, info = OF_cg(u0, v0, Ix, Iy, lam, rhsu, rhsv, tol, maxit, verbose=True)

    print(f"CG finished in {info['it']} iterations, relres={info['relres']:.2e}, converged={info['converged']}")
    return u, v, info


def _neighbor_sum(u: np.ndarray) -> np.ndarray:
    up = np.pad(u, ((1, 1), (1, 1)), mode="constant", constant_values=0)
    return (
        up[0:-2, 1:-1] + up[2:, 1:-1] + up[1:-1, 0:-2] + up[1:-1, 2:]
    )


def smoothing(u: np.ndarray,
              v: np.ndarray,
              Ix: np.ndarray,
              Iy: np.ndarray,
              reg: float,
              rhsu: np.ndarray,
              rhsv: np.ndarray,
              level: int,
              s: int) -> tuple[np.ndarray, np.ndarray]:
    """Run ``s`` sweeps of a red-black block Gauss-Seidel relaxation.

    Every pixel carries a 2x2 system coupling ``u`` and ``v``; it is solved
    in closed form with the neighbour values currently stored in ``u`` and
    ``v``. Pixels with even ``i + j`` ("red") are updated first, then the
    remaining ("black") ones.

    ``u`` and ``v`` are modified in place and also returned. ``level`` is
    accepted but not used by this implementation.
    """
    n_rows, n_cols = u.shape

    # Checkerboard colouring of the pixel grid.
    row_idx = np.arange(n_rows).reshape(-1, 1)
    col_idx = np.arange(n_cols).reshape(1, -1)
    is_red = (row_idx + col_idx) % 2 == 0
    is_black = ~is_red

    # Entries of the local 2x2 matrix; they do not change between sweeps.
    diag_u = Ix * Ix + 4.0 * reg
    diag_v = Iy * Iy + 4.0 * reg
    offdiag = Ix * Iy
    # The small shift keeps the division defined when the matrix is singular.
    det = diag_u * diag_v - offdiag * offdiag + 1e-12

    for _sweep in range(s):
        for colour in (is_red, is_black):
            # Neighbour sums are rebuilt per colour so the second half-sweep
            # sees the values written by the first.
            local_rhs_u = rhsu + reg * _neighbor_sum(u)
            local_rhs_v = rhsv + reg * _neighbor_sum(v)

            solved_u = (local_rhs_u * diag_v - offdiag * local_rhs_v) / det
            solved_v = (diag_u * local_rhs_v - offdiag * local_rhs_u) / det

            u[colour] = solved_u[colour]
            v[colour] = solved_v[colour]

    return u, v


def residual(u: np.ndarray,
             v: np.ndarray,
             Ix: np.ndarray,
             Iy: np.ndarray,
             reg: float,
             rhsu: np.ndarray,
             rhsv: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Return the residual pair ``(rhsu, rhsv) - M_matrix(u, v, Ix, Iy, reg)``."""
    applied_u, applied_v = M_matrix(u, v, Ix, Iy, reg)
    res_u = rhsu - applied_u
    res_v = rhsv - applied_v
    return res_u, res_v


def _restrict2(x: np.ndarray) -> np.ndarray:
    m, n = x.shape
    m2, n2 = m // 2, n // 2
    return 0.25 * (
        x[0:2*m2:2, 0:2*n2:2] + x[1:2*m2:2, 0:2*n2:2]
        + x[0:2*m2:2, 1:2*n2:2] + x[1:2*m2:2, 1:2*n2:2]
    )


def restriction(rhu: np.ndarray,
                rhv: np.ndarray,
                Ix: np.ndarray,
                Iy: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Transfer both residual components and both coefficient fields to the
    next coarser grid with ``_restrict2``.

    Returns ``(r2hu, r2hv, Ix2h, Iy2h)`` in that order.
    """
    coarse_res_u, coarse_res_v, coarse_Ix, coarse_Iy = (
        _restrict2(field) for field in (rhu, rhv, Ix, Iy)
    )
    return coarse_res_u, coarse_res_v, coarse_Ix, coarse_Iy


def _prolong2(x: np.ndarray) -> np.ndarray:
    m2, n2 = x.shape
    out = np.zeros((2 * m2, 2 * n2), dtype=x.dtype)
    out[0::2, 0::2] = x
    out[1::2, 0::2] = x
    out[0::2, 1::2] = x
    out[1::2, 1::2] = x
    return out


def prolongation(e2hu: np.ndarray,
                 e2hv: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Interpolate both coarse-grid correction components to the next finer
    grid with ``_prolong2``."""
    fine_u = _prolong2(e2hu)
    fine_v = _prolong2(e2hv)
    return fine_u, fine_v


def V_cycle(u0: np.ndarray,
            v0: np.ndarray,
            Ix: np.ndarray,
            Iy: np.ndarray,
            reg: float,
            rhsu: np.ndarray,
            rhsv: np.ndarray,
            s1: int,
            s2: int,
            level: int,
            max_level: int) -> tuple[np.ndarray, np.ndarray]:
    """One recursive V-cycle for the coupled optical-flow system.

    Parameters
    ----------
    u0, v0 : current iterate on this level (copied, not modified).
    Ix, Iy : coefficient fields on this level.
    reg : regularisation weight; passed unchanged to every coarser level.
    rhsu, rhsv : right-hand side on this level.
    s1, s2 : number of pre- and post-smoothing sweeps.
    level : index of the current level (0 is the finest).
    max_level : total number of levels; the grid below level
        ``max_level - 2`` is solved with ``OF_cg`` instead of recursing.

    Returns
    -------
    The improved iterate ``(u, v)`` on this level.

    NOTE(review): ``reg`` is reused as-is on coarser grids and the coarse data
    term is built from restricted ``Ix``/``Iy``; a rediscretised coarse
    operator would normally account for the doubled mesh width -- confirm
    this is intended.
    """
    # 1) Pre-smoothing
    u, v = smoothing(u0.copy(), v0.copy(), Ix, Iy, reg, rhsu, rhsv, level, s1)

    # 2) Residual
    rhu, rhv = residual(u, v, Ix, Iy, reg, rhsu, rhsv)

    # 3) Restrict residuals and coefficients
    r2hu, r2hv, Ix2h, Iy2h = restriction(rhu, rhv, Ix, Iy)

    # 4) Coarse-grid correction (solve A e = r on coarse grid)
    if level + 1 >= max_level - 1:
        # Coarsest level: solve with CG
        z0u = np.zeros_like(r2hu)
        z0v = np.zeros_like(r2hv)
        e2hu, e2hv, _ = OF_cg(z0u, z0v, Ix2h, Iy2h, reg, r2hu, r2hv,
                               tol=1e-8, maxit=1000, verbose=False)
    else:
        e2hu, e2hv = V_cycle(np.zeros_like(r2hu), np.zeros_like(r2hv),
                              Ix2h, Iy2h, reg, r2hu, r2hv, s1, s2, level + 1, max_level)

    # 5) Prolongate and correct
    ehu, ehv = prolongation(e2hu, e2hv)

    # Restriction drops a trailing row/column when a dimension is odd, so the
    # prolongated correction can be one short of the fine grid. Pad with zeros
    # (no coarse correction there; the smoother still updates those pixels)
    # instead of failing on a shape mismatch.
    missing_rows = u.shape[0] - ehu.shape[0]
    missing_cols = u.shape[1] - ehu.shape[1]
    if missing_rows or missing_cols:
        pad_width = ((0, missing_rows), (0, missing_cols))
        ehu = np.pad(ehu, pad_width, mode="constant", constant_values=0)
        ehv = np.pad(ehv, pad_width, mode="constant", constant_values=0)

    u += ehu
    v += ehv

    # 6) Post-smoothing
    u, v = smoothing(u, v, Ix, Iy, reg, rhsu, rhsv, level, s2)
    return u, v


def MG_solve(u0: np.ndarray,
             v0: np.ndarray,
             Ix: np.ndarray,
             Iy: np.ndarray,
             reg: float,
             rhsu: np.ndarray,
             rhsv: np.ndarray,
             levels: int = 4,
             s1: int = 2,
             s2: int = 2,
             max_cycles: int = 50,
             tol: float = 1e-8,
             verbose: bool = False) -> tuple[np.ndarray, np.ndarray, dict]:
    """Iterate V-cycles until the relative residual falls below ``tol``.

    Starting from copies of ``u0``/``v0``, ``V_cycle`` is applied up to
    ``max_cycles`` times with ``max_level=levels``. After every cycle the
    residual norm is compared with the initial one.

    Returns ``(u, v, info)`` where ``info`` holds ``"cycles"``, ``"relres"``
    and ``"converged"``.
    """

    def _residual_norm(cur_u, cur_v):
        # Euclidean norm of the stacked residual (r_u, r_v).
        res_u, res_v = residual(cur_u, cur_v, Ix, Iy, reg, rhsu, rhsv)
        return float(np.sqrt(np.sum(res_u * res_u) + np.sum(res_v * res_v)))

    u, v = u0.copy(), v0.copy()

    nr0 = _residual_norm(u, v)
    if nr0 == 0.0:
        # The initial guess already solves the system.
        return u, v, {"cycles": 0, "relres": 0.0, "converged": True}

    relres = 1.0
    for cycle in range(1, max_cycles + 1):
        u, v = V_cycle(u, v, Ix, Iy, reg, rhsu, rhsv, s1, s2, level=0, max_level=levels)
        relres = _residual_norm(u, v) / nr0
        if verbose:
            print(f"V{cycle:03d}: relres={relres:.3e}")
        if relres < tol:
            return u, v, {"cycles": cycle, "relres": relres, "converged": True}

    # Cycle budget exhausted.
    return u, v, {"cycles": max_cycles, "relres": relres, "converged": relres < tol}


def run_optical_flow_mg(frame0_path: str,
                        frame1_path: str,
                        lam: float = 1.0,
                        sigma: float = 1.0,
                        levels: int = 4,
                        s1: int = 2,
                        s2: int = 2,
                        max_cycles: int = 50,
                        tol: float = 1e-8):
    """Driver that loads frames, builds derivatives, and solves via multigrid.

    Parameters
    ----------
    frame0_path, frame1_path : paths of the two image files.
    lam : regularisation weight passed to ``MG_solve``.
    sigma : standard deviation of the Gaussian pre-smoothing.
    levels, s1, s2, max_cycles, tol : forwarded to ``MG_solve``.

    Returns
    -------
    ``(u, v, info)`` as returned by ``MG_solve``.

    Raises
    ------
    ValueError
        If the two frames do not have the same shape after grayscale
        conversion.
    """

    def _load_gray(path):
        # Each frame is converted on its own; previously the second frame's
        # conversion depended on the first frame's dimensionality.
        img = mpimg.imread(path).astype(float)
        if img.ndim == 3:
            # NOTE(review): the mean runs over every channel on axis 2, so an
            # alpha channel (if present) is averaged in too -- confirm inputs.
            img = np.mean(img, axis=2)
        return img

    def _central_diff(img, axis):
        # Un-normalised central difference I[k+1] - I[k-1].
        # NOTE: np.roll wraps around, so border pixels are differenced
        # against the opposite image edge.
        return np.roll(img, -1, axis=axis) - np.roll(img, 1, axis=axis)

    I0 = _load_gray(frame0_path)
    I1 = _load_gray(frame1_path)
    if I0.shape != I1.shape:
        raise ValueError(
            f"frames must have the same shape, got {I0.shape} and {I1.shape}"
        )

    I0 = gaussian_filter(I0, sigma)
    I1 = gaussian_filter(I1, sigma)

    # Simple symmetric (across frames) central differences (h=1): the factor
    # 0.5 averages the two frames, the division by 2 is the 2h step.
    Ix = 0.5 * (_central_diff(I0, 1) + _central_diff(I1, 1)) / 2.0
    Iy = 0.5 * (_central_diff(I0, 0) + _central_diff(I1, 0)) / 2.0
    It = I1 - I0

    rhsu, rhsv = build_right_hand_side(Ix, Iy, It)
    u0 = np.zeros_like(I0)
    v0 = np.zeros_like(I0)

    u, v, info = MG_solve(u0, v0, Ix, Iy, lam, rhsu, rhsv,
                           levels=levels, s1=s1, s2=s2,
                           max_cycles=max_cycles, tol=tol, verbose=True)

    print(f"MG finished in {info['cycles']} cycles, relres={info['relres']:.2e}, converged={info['converged']}")
    return u, v, info

# ------------------------------------------------------------
# PCG: Conjugate Gradient with Multigrid V-cycle preconditioner
# ------------------------------------------------------------

def MG_preconditioner(ru: np.ndarray,
                      rv: np.ndarray,
                      Ix: np.ndarray,
                      Iy: np.ndarray,
                      reg: float,
                      levels: int = 4,
                      s1: int = 1,
                      s2: int = 1) -> tuple[np.ndarray, np.ndarray]:
    """Return ``(z_u, z_v)``, the result of one V-cycle applied to ``M z = r``.

    The cycle always starts from a zero guess and uses the residual
    ``(ru, rv)`` as its right-hand side, with ``level=0`` and
    ``max_level=levels``. ``s1``/``s2`` are the pre-/post-smoothing sweep
    counts handed to ``V_cycle``.
    """
    zero_u = np.zeros_like(ru)
    zero_v = np.zeros_like(rv)
    return V_cycle(zero_u, zero_v, Ix, Iy, reg, ru, rv,
                   s1=s1, s2=s2, level=0, max_level=levels)


def OF_pcg(u0: np.ndarray,
           v0: np.ndarray,
           Ix: np.ndarray,
           Iy: np.ndarray,
           reg: float,
           rhsu: np.ndarray,
           rhsv: np.ndarray,
           tol: float = 1e-8,
           maxit: int = 2000,
           levels: int = 4,
           s1: int = 1,
           s2: int = 1,
           verbose: bool = False) -> tuple[np.ndarray, np.ndarray, dict]:
    """Preconditioned CG for the optical flow system.

    The operator is applied through ``M_matrix``; the preconditioner is one
    call to ``MG_preconditioner`` (a single V-cycle from a zero guess) per
    iteration.

    Parameters
    ----------
    u0, v0 : initial guess (copied, not modified).
    Ix, Iy, reg : coefficients of the system, forwarded to ``M_matrix`` and
        ``MG_preconditioner``.
    rhsu, rhsv : right-hand side.
    tol : stop once ``||r_k|| / ||r_0||`` drops below this value.
    maxit : maximum number of iterations.
    levels, s1, s2 : V-cycle parameters forwarded to ``MG_preconditioner``.
    verbose : print progress every 10 iterations, at convergence and on
        breakdown.

    Returns
    -------
    ``(u, v, info)`` where ``info`` has the keys ``"it"``, ``"relres"`` and
    ``"converged"``.
    """
    # Helper inner product over coupled (u,v)
    def dot2(a1, a2, b1, b2):
        return float(np.sum(a1 * b1) + np.sum(a2 * b2))

    # Initial guess
    u = u0.copy()
    v = v0.copy()

    # Initial residual r0 = b - A x0
    Au, Av = M_matrix(u, v, Ix, Iy, reg)
    ru = rhsu - Au
    rv = rhsv - Av

    # Exit before paying for a V-cycle when the guess is already exact.
    norm_r0 = np.sqrt(dot2(ru, rv, ru, rv))
    if norm_r0 == 0.0:
        return u, v, {"it": 0, "relres": 0.0, "converged": True}

    # Preconditioned residual z0 = P^{-1} r0 via one V-cycle
    zu, zv = MG_preconditioner(ru, rv, Ix, Iy, reg,
                               levels=levels, s1=s1, s2=s2)
    rz_old = dot2(ru, rv, zu, zv)

    pu, pv = zu.copy(), zv.copy()
    relres = 1.0
    it = 0

    for k in range(1, maxit + 1):
        if rz_old == 0.0:
            # (r, z) = 0 with r != 0: the step would be empty and the next
            # beta = rz_new / rz_old would raise ZeroDivisionError.
            if verbose:
                print("PCG breakdown: zero (r,z) at iter", k)
            break

        # Matrix-vector product A p_k
        Apu, Apv = M_matrix(pu, pv, Ix, Iy, reg)

        denom = dot2(pu, pv, Apu, Apv)
        if denom <= 0.0:
            if verbose:
                print("PCG breakdown: non-positive (p,Ap) at iter", k)
            break

        alpha = rz_old / denom

        # x_{k+1}
        u += alpha * pu
        v += alpha * pv

        # r_{k+1}
        ru -= alpha * Apu
        rv -= alpha * Apv

        # Check convergence
        relres = np.sqrt(dot2(ru, rv, ru, rv)) / norm_r0
        if verbose and (k % 10 == 0 or relres < tol):
            print(f"it={k:4d} PCG relres={relres:.3e}")
        if relres < tol:
            it = k
            break

        # z_{k+1} = P^{-1} r_{k+1}
        zu, zv = MG_preconditioner(ru, rv, Ix, Iy, reg,
                                   levels=levels, s1=s1, s2=s2)

        rz_new = dot2(ru, rv, zu, zv)
        beta = rz_new / rz_old
        rz_old = rz_new

        # p_{k+1}
        pu = zu + beta * pu
        pv = zv + beta * pv

        it = k

    info = {"it": it, "relres": relres, "converged": relres < tol}
    return u, v, info

# Run PCG with default parameters and print the returned (u, v, info) tuple.
# NOTE(review): u0, v0, Ix, Iy, reg, rhsu and rhsv are not defined at module
# level in the visible code -- presumably they come from notebook state set
# elsewhere; confirm before running this cell on its own.
print(OF_pcg(u0,v0,Ix,Iy,reg,rhsu,rhsv))

(array([[-2.08127673e-02, -1.88247851e-02, -1.02006710e-02, ...,
        -5.64269868e-06, -1.06682712e-05, -9.65178611e-06],
       [-1.96206979e-02, -1.95824232e-02, -1.23194063e-02, ...,
        -7.12307316e-06, -2.06115901e-05, -2.21977611e-05],
       [-1.02542948e-02, -1.21823243e-02, -9.54229271e-03, ...,
        -9.35813802e-07, -1.86332261e-05, -2.51319048e-05],
       ...,
       [-6.31673019e-05, -1.24656229e-04, -1.81578422e-04, ...,
        -1.19952653e-04, -7.79084437e-05, -3.80955834e-05],
       [-4.17788674e-05, -8.25290921e-05, -1.20930228e-04, ...,
        -8.43641417e-05, -5.50651875e-05, -2.72437956e-05],
       [-2.09443697e-05, -4.29451818e-05, -6.59295180e-05, ...,
        -4.48310484e-05, -2.98313343e-05, -1.52767041e-05]],
      shape=(480, 640)), array([[-1.35118876e-02, -1.75846606e-02, -1.03261774e-02, ...,
         3.31561722e-05,  2.47317784e-05,  1.43457452e-05],
       [-1.12227173e-02, -1.52008792e-02, -1.06443823e-02, ...,
         5.71191532e-05,  4.5