# Fixed Encoder Denoiser (Adjoint-Compatible, Train Encoder Only)

In [1]:

# Imports & basic setup
import os, json, math, random
import numpy as np
import pennylane as qml
from pennylane import numpy as pnp

# Reproducibility
np.random.seed(42)
random.seed(42)

# Configuration (matches your V9: 4 qubits, 2-latent)
n_qubits = 4
n_latent = 2
signal_wires = [0, 1]   # <- where we want to reconstruct
trash_wires  = [2, 3]

dev = qml.device("default.qubit", wires=n_qubits)

# Scaling helpers (keep differentiable with pnp)
def scale_back(expvals, info):
    return (expvals + 1.0) / 2.0 * (info['scale_high'] - info['scale_low']) + info['scale_low']

def scale_to_angles(x):
    # Expect inputs normalized to [0,1] already; map to [0, pi]
    return x * np.pi

print("Device ready. n_qubits=", n_qubits, " latent=", n_latent)


Device ready. n_qubits= 4  latent= 2


In [None]:

# Load decoder parameters from your data folder (same logic as your original NB)
DATA_PATH = "qae_architectures/jacobs_examples/aintern/data/mackey_glass_n100"
data_folder = "half_qae_v9_compression_success.json"
# Probe for any folder containing the JSON
if os.path.isdir(DATA_PATH):
    for d in os.listdir(DATA_PATH):
        if os.path.isdir(os.path.join(DATA_PATH, d)):
            data_folder = d
            break
if not data_folder:
    raise RuntimeError("Could not locate data folder under " + DATA_PATH)
/Users/jacobzwoniarski/Desktop/qae_architectures/jacobs_examples/aintern/data/mackey_glass_n100/half_qae_v9_compression_success.json
with open(f"{DATA_PATH}/{data_folder}", "r") as f:
    info = json.load(f)

# Prefer the exact V9 success filename; fallback to first matching
cand = [f for f in os.listdir(f"{DATA_PATH}/{data_folder}") if f.startswith("half_qae_v9_compression_success.json")]
if not cand:
    cand = [f for f in os.listdir(f"{DATA_PATH}/{data_folder}") if f.endswith("half_qae_v9_compression_success.json")]
if not cand:
    raise RuntimeError("Could not find half_qae_v9_compression_success.json in the data folder.")

with open(os.path.join(DATA_PATH, data_folder, cand[0]), "r") as f:
    saved = json.load(f)

trained_dec_params = pnp.array(saved["dec_params"], requires_grad=False)
print("Loaded decoder params:", trained_dec_params.shape)


FileNotFoundError: [Errno 2] No such file or directory: 'qae_architectures/jacobs_examples/aintern/data/mackey_glass_n100/half_qae_v9_compression_success.json'

In [None]:

# --- Decoder template (exact V9 latent-only parameterization + light entanglement) ---
def decoder_template(params):
    n_layers = len(params) // (n_latent * 3)
    # Parametrized single-qubit rotations on latent wires only
    for layer in range(n_layers):
        for i in range(n_latent):
            idx = layer * n_latent * 3 + i * 3
            qml.RX(params[idx], wires=i)
            qml.RY(params[idx + 1], wires=i)
            qml.RZ(params[idx + 2], wires=i)
        # light entanglement between latent qubits
        if layer < n_layers - 1:
            qml.CNOT(wires=[0, 1])

    # Trash wires have fixed, *non-trainable* initialization (as in your V9)
    qml.RX(np.pi/3, wires=2);  qml.RY(np.pi/4, wires=2);  qml.RZ(np.pi/6, wires=2)
    qml.RY(-np.pi/3, wires=3); qml.RX(np.pi/5, wires=3); qml.RZ(-np.pi/4, wires=3)


In [None]:

# --- Encoder template as *parameterized adjoint* of the decoder architecture ---
# Key idea: SAME gate skeleton as the decoder, but trained parameters are free (phi).
# Using qml.adjoint on the decoder_template ensures architectural compatibility.
def encoder_template(phi):
    qml.adjoint(decoder_template)(phi)


In [None]:

# Input embedding (your V9 used RY(angle) data embedding on all 4 qubits)
def embed_input(x):
    # x expected shape (..., n_qubits) in [0,1]; we only use first n_qubits values
    for i, val in enumerate(x[:n_qubits]):
        qml.RY(scale_to_angles(val), wires=i)


In [None]:

# Compose: input -> encoder(phi) -> decoder(theta*) -> measure signal wires
@qml.qnode(dev, interface="autograd", diff_method="backprop")
def denoiser_qnode(phi, x_noisy):
    embed_input(x_noisy)
    encoder_template(phi)                     # compress
    decoder_template(trained_dec_params)      # reconstruct with fixed decoder
    return [qml.expval(qml.PauliZ(w)) for w in signal_wires]


In [None]:

# Loss function: MSE between clean targets and decoded predictions (kept as pnp ops)
def batch_loss(phi, clean_batch, noisy_batch):
    preds = []
    tgts  = []
    for c, n in zip(clean_batch, noisy_batch):
        y = pnp.array(denoiser_qnode(phi, n))  # shape (len(signal_wires),)
        y = scale_back(y, info)
        preds.append(y)
        tgts.append(pnp.array(c)[signal_wires])
    preds = pnp.stack(preds, axis=0)
    tgts  = pnp.stack(tgts, axis=0)
    return pnp.mean((preds - tgts) ** 2)


In [None]:

# Parameter initialization: start from the true adjoint of decoder (good starting point)
# For single-qubit Euler angles RX(a)RY(b)RZ(c), the inverse is RZ(-c) RY(-b) RX(-a).
# However, because we built encoder via qml.adjoint(decoder_template)(phi),
# the *matching* seed is simply phi = trained_dec_params (up to sign handled by adjoint).
phi0 = pnp.array(trained_dec_params, requires_grad=True)
print("Encoder params initialized from decoder (adjoint-compatible).")


In [None]:

# Optional: simple noise model for augmentation
def ts_add_noise(y, noise_level=0.05):
    rng = info['scale_high'] - info['scale_low']
    noise = noise_level * rng * np.random.randn(*np.array(y).shape)
    return pnp.array(y) + noise

# Prepare a tiny synthetic demo if your variables are not already in memory.
# If you *already* have: pure_train_windows, pure_val_windows, set USE_DEMO = False.
USE_DEMO = 'pure_train_windows' not in globals()
if USE_DEMO:
    print("No dataset detected in memory. Creating a tiny synthetic demo (sine).")
    xs = np.linspace(0, 2*np.pi, n_qubits)
    clean_series = [0.5 + 0.5*np.sin(xs + phi) for phi in np.linspace(0, 2*np.pi, 128)]
    pure_train_windows = np.array(clean_series[:96])
    pure_val_windows   = np.array(clean_series[96:])
else:
    print("Using in-memory dataset: pure_train_windows / pure_val_windows")

print("Train windows shape:", np.array(pure_train_windows).shape)


In [None]:

# Sanity check: with phi=trained_dec_params (ideal adjoint), the pipeline should be close to identity on signal_wires
def sanity_identity(phi):
    xs = np.linspace(0, 2*np.pi, 16)
    clean = [0.5 + 0.5*np.sin(xs + a) for a in np.linspace(0, 2*np.pi, 24)]
    errs = []
    for w in clean:
        y = pnp.array(denoiser_qnode(phi, w))
        y = scale_back(y, info)
        tgt = pnp.array(w)[signal_wires]
        errs.append(pnp.mean((y - tgt)**2))
    return float(pnp.mean(pnp.stack(errs)))

mse_id = sanity_identity(trained_dec_params)
mse_init = sanity_identity(phi0)
print(f"Sanity MSE with phi=decoder params (ideal adjoint seed): {mse_id:.6f}")
print(f"Sanity MSE with phi0 (our init): {mse_init:.6f}")


In [None]:

# Training loop (Adam on autograd)
opt = qml.AdamOptimizer(stepsize=0.004)
n_epochs = 60
batch_size = 32

def iterate_minibatches(X, B):
    idx = np.random.permutation(len(X))
    for i in range(0, len(X), B):
        yield idx[i:i+B]

phi = phi0
best_val = float("inf")
best_phi = None
patience = 10
no_improve = 0

for epoch in range(n_epochs):
    # Training
    train_cost = 0.0; batches = 0
    for idx in iterate_minibatches(pure_train_windows, batch_size):
        clean_batch = pure_train_windows[idx]
        noisy_batch = [ts_add_noise(w, np.random.uniform(0.03, 0.08)) for w in clean_batch]
        def loss_fn(p): return batch_loss(p, clean_batch, noisy_batch)
        phi, cost = opt.step_and_cost(loss_fn, phi)
        train_cost += cost; batches += 1
    train_cost /= max(batches, 1)

    # Validation (no noise)
    val_preds = []
    val_tgts  = []
    for w in pure_val_windows:
        y = pnp.array(denoiser_qnode(phi, w))
        y = scale_back(y, info)
        val_preds.append(y)
        val_tgts.append(pnp.array(w)[signal_wires])
    val_preds = pnp.stack(val_preds, axis=0)
    val_tgts  = pnp.stack(val_tgts,  axis=0)
    val_cost = float(pnp.mean((val_preds - val_tgts)**2))

    # Simple scheduler / early stop
    if val_cost + 1e-6 < best_val:
        best_val = val_cost
        best_phi = phi
        no_improve = 0
    else:
        no_improve += 1
        if no_improve >= patience:
            print("Early stopping.")
            break

    print(f"Epoch {epoch:03d} | train {train_cost:.6f} | val {val_cost:.6f}")

phi_trained = best_phi if best_phi is not None else phi


In [None]:

# Save the trained encoder parameters
out = {
    "encoder_params": list(map(float, np.array(phi_trained))),
    "n_qubits": n_qubits,
    "n_latent": n_latent,
    "signal_wires": signal_wires,
    "trash_wires": trash_wires,
}
out_path = os.path.join(DATA_PATH, data_folder, "adjoint_encoder_qae_model_trained.json")
with open(out_path, "w") as f:
    json.dump(out, f, indent=2)
print("Saved:", out_path)
