
# Simplified pipeline (debug-friendly)

You said the **2D polynomial fit looks good**, so we simplify:

1. Capture webcam frames → resize to `TARGET_SIZE = 4·2^k`.
2. Compute **polynomial reconstruction** `G_poly[t]` for all frames and verify its error vs the raw resized frames.
3. For a **small subset of frames** (e.g., first 5), build the **true 2D gated MPS** from the polynomial patch coefficients and verify MPS quality vs `G_poly[t]`.
4. Compute deltas in *image space*:
   \[
   \Delta_t = G_{\text{poly}}[t] - G_{\text{poly}}[t-1]
   \]
5. Reconstruct by cumulative sum:
   \[
   G_{\text{cum}}[t] = G_{\text{poly}}[0] + \sum_{\tau=1}^{t}\Delta_\tau
   \]
   and plot the **error** `G_cum[t] - G_poly[t]` (should be ~0 up to floating-point).

This avoids mixing “poly-fit error” with “MPS error” and makes failures obvious.


In [None]:
import sys
import numpy as np
import matplotlib.pyplot as plt
import cv2
from itertools import product
import quimb.tensor as qtn

# Uploaded code
sys.path.insert(0, "/mnt/data")
from generate_tt import generate_mps

from mps_2d_encoding_tools import (
    break_up_list,
    convert_to_indices,
    fit_bicubic_to_image,
    reconstruct_from_xy,
    reconstruct_image_from_coefficients,
)


In [None]:
# -----------------------------
# Webcam capture
# -----------------------------
DEVICE_INDEX = 0
N_FRAMES = 30
TARGET_SIZE = 64        # must be 4 * 2^k (64, 128, 256, ...)
SHOW_LIVE = True

cap = cv2.VideoCapture(DEVICE_INDEX)
if not cap.isOpened():
    raise RuntimeError("Could not open webcam. Try DEVICE_INDEX=1, or check OS permissions.")

frames_bgr = []
print("Capturing... (press 'q' to stop early)")
for t in range(N_FRAMES):
    ok, frame = cap.read()
    if not ok:
        print(f"Frame read failed at t={t}")
        break
    frames_bgr.append(frame)
    if SHOW_LIVE:
        cv2.imshow("Live webcam (press q to stop)", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()
print(f"Captured {len(frames_bgr)} frames.")


In [None]:
def preprocess_frames(frames_bgr, target_size):
    out = []
    for fr in frames_bgr:
        gray = cv2.cvtColor(fr, cv2.COLOR_BGR2GRAY)
        gray = cv2.resize(gray, (target_size, target_size), interpolation=cv2.INTER_AREA)
        out.append(gray.astype(np.float32) / 255.0)
    return np.stack(out, axis=0)

G = preprocess_frames(frames_bgr, TARGET_SIZE)
T, H, W = G.shape
print("G:", G.shape, "range:", (float(G.min()), float(G.max())))


In [None]:
# Validate TARGET_SIZE = 4 * 2^k and define (k, n)
if TARGET_SIZE % 4 != 0:
    raise ValueError("TARGET_SIZE must be divisible by 4 (bicubic 4x4 patches).")
N = TARGET_SIZE // 4
if (N & (N-1)) != 0:
    raise ValueError("TARGET_SIZE/4 must be power of two (TARGET_SIZE = 4*2^k).")
k = int(np.log2(N))
n = k + 2
assert 2**n == TARGET_SIZE
print(f"Derived: k={k}, n={n}, image size={2**n}")


In [None]:
# -----------------------------
# 1) Polynomial fit for ALL frames + verification
# -----------------------------
G_poly = np.zeros_like(G, dtype=np.float32)
coeffs_list = []   # store (coefficients, patch_coeffs) per frame

for t in range(T):
    coefficients, _ = fit_bicubic_to_image(G[t], k, show_plot=False)
    _, patch_coeffs = reconstruct_image_from_coefficients(coefficients, k)
    # Reconstruct polynomial image (fast, no MPS)
    # patch_coeffs is dict (i,j)->A; reconstruct_image_from_coefficients already returns an image too,
    # but we rely on the function output image from the helper here:
    img_poly, _ = reconstruct_image_from_coefficients(coefficients, k)
    G_poly[t] = np.clip(img_poly.astype(np.float32), 0.0, 1.0)
    coeffs_list.append((coefficients, patch_coeffs))

def psnr(a, b, eps=1e-12):
    mse = float(np.mean((a-b)**2))
    return 10.0 * np.log10(1.0 / max(mse, eps))

psnr_poly = np.array([psnr(G[t], G_poly[t]) for t in range(T)], dtype=float)
print("Poly fit PSNR mean/min:", float(psnr_poly.mean()), float(psnr_poly.min()))


In [None]:
# Show a few poly recon comparisons
def show_poly(t):
    plt.figure(figsize=(10,3))
    ax1 = plt.subplot(1,3,1); ax1.imshow(G[t], cmap="gray", vmin=0, vmax=1); ax1.set_title(f"G[{t}]"); ax1.axis("off")
    ax2 = plt.subplot(1,3,2); ax2.imshow(G_poly[t], cmap="gray", vmin=0, vmax=1); ax2.set_title("G_poly"); ax2.axis("off")
    ax3 = plt.subplot(1,3,3); ax3.imshow(np.abs(G_poly[t]-G[t]), cmap="gray"); ax3.set_title("|poly - raw|"); ax3.axis("off")
    plt.tight_layout(); plt.show()

for t in [0, T//3, 2*T//3, T-1]:
    if 0 <= t < T:
        show_poly(t)

plt.figure()
plt.plot(psnr_poly)
plt.xlabel("frame t"); plt.ylabel("PSNR (dB)")
plt.title("Polynomial fit quality: PSNR(G_poly vs raw resized G)")
plt.grid(True)
plt.show()


In [None]:
# -----------------------------
# 2) Build TRUE MPS for a SMALL SUBSET of frames, verify vs G_poly
# -----------------------------
MAX_FRAMES_MPS = min(5, T)   # keep small; global MPS sum is expensive

CUTOFF_MPS = 1e-8   # compression cutoff used when summing patch MPSs
c, d = 0.0, 1.0     # domain bounds for generate_mps (matches your driver defaults)

def bit_flip(v: int) -> int:
    return abs(1 - v)

def break_pairs_from_generate_mps(mps):
    pairs = break_up_list([core.data for core in mps])  # pairs of (xcore, ycore)
    x_cores, y_cores = [], []
    for xc, yc in pairs:
        x_cores.append(xc)
        y_cores.append(yc)
    return x_cores, y_cores

def get_2d_mps_from_patch(A, n, k, c, d, bits):
    # same gating pattern as your driver
    x_bits = [bit_flip(x) for x in bits[0::2]]
    y_bits = [bit_flip(y) for y in bits[1::2]]

    mps = generate_mps(A, n, c, d)
    x_cores, y_cores = break_pairs_from_generate_mps(mps)

    x_cores[0][:, :, x_bits[0]] = np.zeros((16, 2))
    y_cores[0][:, y_bits[0]] = np.zeros(2)
    for i in range(1, k):
        x_cores[i][:, :, x_bits[i]] = np.zeros((16, 16))
        y_cores[i][:, :, y_bits[i]] = np.zeros((16, 16))

    new_cores = reconstruct_from_xy(xs=x_cores, ys=y_cores)
    return qtn.MatrixProductState(new_cores)

def add_2d_mps(mps_list, cutoff):
    total = mps_list[0]
    for m in mps_list[1:]:
        total.add_MPS(m, inplace=True, compress=True, cutoff=cutoff)
    return total

# Dense->image mapping using convert_to_indices (interleaved bits)
def int_to_bits(i, L):
    return [(i >> (L-1-b)) & 1 for b in range(L)]

def dense_to_image_interleaved(arr, n):
    size = 2**n
    img = np.zeros((size, size), dtype=np.float32)
    L = 2*n
    for idx in range(arr.shape[0]):
        bits = int_to_bits(idx, L)
        ix, iy = convert_to_indices(bits)
        img[ix, iy] = float(np.real(arr[idx]))
    return img

def scale_match(ref2d, img_hat):
    denom = float(np.sum(img_hat * img_hat))
    if denom <= 0:
        return img_hat
    alpha = float(np.sum(ref2d * img_hat)) / denom
    return alpha * img_hat

def mps_storage_kb(M, bytes_per_entry=4):
    arrays = [t.data for t in M.tensors]
    total_entries = int(sum(np.prod(a.shape) for a in arrays))
    return total_entries * bytes_per_entry / 1024.0

G_mps = np.zeros((MAX_FRAMES_MPS, H, W), dtype=np.float32)
psnr_mps_vs_poly = np.zeros(MAX_FRAMES_MPS, dtype=float)
kb_mps = np.zeros(MAX_FRAMES_MPS, dtype=float)
maxbond = np.zeros(MAX_FRAMES_MPS, dtype=int)

for t in range(MAX_FRAMES_MPS):
    _, patch_coeffs = coeffs_list[t]
    mps_list = []
    for bits in product([0,1], repeat=2*k):
        i, j = convert_to_indices(bits)
        A = patch_coeffs[(i, j)]
        mps_list.append(get_2d_mps_from_patch(A, n=n, k=k, c=c, d=d, bits=bits))
    Mtot = add_2d_mps(mps_list, cutoff=CUTOFF_MPS)
    # NOTE: do NOT normalize (keeps intensity scale meaningful)

    arr = np.array(Mtot.to_dense()).reshape(-1)
    img_hat = dense_to_image_interleaved(arr, n=n)
    img_hat = scale_match(G_poly[t], img_hat)  # keep from going black

    G_mps[t] = np.clip(img_hat, 0.0, 1.0).astype(np.float32)
    psnr_mps_vs_poly[t] = psnr(G_poly[t], G_mps[t])
    kb_mps[t] = mps_storage_kb(Mtot, bytes_per_entry=4)
    maxbond[t] = int(Mtot.max_bond())

    print(f"MPS frame {t}: PSNR(MPS vs poly)={psnr_mps_vs_poly[t]:.2f} dB, kb≈{kb_mps[t]:.1f}, maxbond={maxbond[t]}")

print("Mean PSNR(MPS vs poly) over subset:", float(psnr_mps_vs_poly.mean()))


In [None]:
# Visualize subset MPS vs poly (and error)
def show_mps(t):
    plt.figure(figsize=(10,3))
    ax1 = plt.subplot(1,3,1); ax1.imshow(G_poly[t], cmap="gray", vmin=0, vmax=1); ax1.set_title("G_poly"); ax1.axis("off")
    ax2 = plt.subplot(1,3,2); ax2.imshow(G_mps[t], cmap="gray", vmin=0, vmax=1); ax2.set_title("G_mps"); ax2.axis("off")
    ax3 = plt.subplot(1,3,3); ax3.imshow(np.abs(G_mps[t]-G_poly[t]), cmap="gray"); ax3.set_title("|mps - poly|"); ax3.axis("off")
    plt.tight_layout(); plt.show()

for t in range(MAX_FRAMES_MPS):
    show_mps(t)

plt.figure()
plt.plot(psnr_mps_vs_poly)
plt.xlabel("frame t"); plt.ylabel("PSNR (dB)")
plt.title("MPS quality vs polynomial image (subset)")
plt.grid(True)
plt.show()


In [None]:
# -----------------------------
# 3) Compute deltas from POLY images and verify cumulative identity
# -----------------------------
Delta = np.zeros_like(G_poly, dtype=np.float32)
Delta[1:] = G_poly[1:] - G_poly[:-1]

G_cum = np.zeros_like(G_poly, dtype=np.float32)
G_cum[0] = G_poly[0]
G_cum[1:] = G_poly[0] + np.cumsum(Delta[1:], axis=0)

err = G_cum - G_poly
err_mse = np.mean(err**2, axis=(1,2))
print("Cumsum identity check (poly): MSE min/mean/max =",
      float(err_mse.min()), float(err_mse.mean()), float(err_mse.max()))


In [None]:
# Plot some delta frames with symmetric contrast
idxs = np.linspace(1, T-1, num=min(6, max(1,T-1)), dtype=int)
v = float(np.percentile(np.abs(Delta[1:]), 99)) if T > 1 else 1.0
v = max(v, 1e-6)

plt.figure(figsize=(12, 2.8))
for kk, t in enumerate(idxs):
    ax = plt.subplot(1, len(idxs), kk+1)
    ax.imshow(Delta[t], cmap="gray", vmin=-v, vmax=v)
    ax.set_title(f"Δ[{t}]")
    ax.axis("off")
plt.tight_layout()
plt.show()

# Plot cumulative reconstruction error images for a few times
def show_cum_err(t):
    plt.figure(figsize=(10,3))
    ax1=plt.subplot(1,3,1); ax1.imshow(G_poly[t], cmap="gray", vmin=0, vmax=1); ax1.set_title("G_poly"); ax1.axis("off")
    ax2=plt.subplot(1,3,2); ax2.imshow(np.clip(G_cum[t],0,1), cmap="gray", vmin=0, vmax=1); ax2.set_title("G_cum"); ax2.axis("off")
    ax3=plt.subplot(1,3,3); ax3.imshow(np.abs(err[t]), cmap="gray"); ax3.set_title("|G_cum - G_poly|"); ax3.axis("off")
    plt.tight_layout(); plt.show()

for t in [0, T//3, 2*T//3, T-1]:
    if 0 <= t < T:
        show_cum_err(t)

plt.figure()
plt.plot(err_mse)
plt.xlabel("frame t"); plt.ylabel("MSE")
plt.title("Error of identity: ||G_poly[0] + ΣΔ - G_poly[t]||^2 (should be ~0)")
plt.grid(True)
plt.show()
