In [2]:
import pennylane as qml
import numpy as np
from itertools import combinations
from typing import List, Optional, Sequence, Tuple
from sklearn.linear_model import Ridge
from sklearn.metrics import r2_score, mean_squared_error
from copy import deepcopy
from tqdm.auto import tqdm
import time

In [3]:
def generate_damped_oscillator_data(n_train=10, n_test=4, n_points=26, seed=1):
    rng = np.random.Generator(np.random.PCG64(seed))
    t = np.linspace(0, 26, num=n_points)
    n_total = n_train + n_test
    oscis = 0.5 + rng.standard_normal(n_total) * 0.4
    data = np.zeros((n_total, n_points))
    for j in range(n_total):
        omega = oscis[j]
        data[j, :] = (
            np.exp(-0.05 * t) * 
            (np.cos(omega * t) + np.cos(omega * t / np.sqrt(2))) * 30 +
            rng.standard_normal(n_points) * 0.3
        )
    return data

In [4]:
def _build_bitmasks(num_qubits, offset=0):
    masks = []
    for q in range(num_qubits):
        masks.append(1 << (offset + q))
    for i, j in combinations(range(num_qubits), 2):
        masks.append((1 << (offset + i)) | (1 << (offset + j)))
    return masks

def calc_observables(probs, bitmasks):
    exps = np.zeros(len(bitmasks), dtype=float)
    for basis_index, prob in enumerate(probs):
        if prob == 0.0:
            continue
        for obs_idx, mask in enumerate(bitmasks):
            parity = bin(mask & basis_index).count("1") % 2
            sign = -1.0 if parity else 1.0
            exps[obs_idx] += sign * prob
    return exps

def apply_amplitude_damping(probs, gamma_vec):
    damped = probs.astype(float, copy=True)
    if damped.size == 0:
        return damped
    indices = np.arange(damped.size, dtype=np.int64)
    for qubit, gamma in enumerate(gamma_vec):
        gamma = float(np.clip(gamma, 0.0, 1.0))
        if gamma <= 0.0:
            continue
        mask = ((indices >> qubit) & 1).astype(bool)
        if not mask.any():
            continue
        decay = gamma * damped[mask]
        damped[mask] -= decay
        targets = indices[mask] & ~(1 << qubit)
        np.add.at(damped, targets, decay)
    return damped

def build_params(f_b, arccos_zs):
    params = [f_b]
    params.extend(arccos_zs[:-1])
    params.extend(list(arccos_zs) * 4)
    return params

In [5]:
def apply_channel_coupling(observables_stack, coupling_pairs, coupling_strength):
    if not coupling_pairs or coupling_strength <= 0.0:
        return observables_stack
    mixed = observables_stack.copy()
    for i, j in coupling_pairs:
        oi = observables_stack[i]
        oj = observables_stack[j]
        mixed[i] = (1.0 - coupling_strength) * oi + coupling_strength * oj
        mixed[j] = (1.0 - coupling_strength) * oj + coupling_strength * oi
    return mixed

def qrc_circuit_independent(params, num_qubits):
    param_idx = 0
    for i in range(num_qubits):
        qml.RY(params[param_idx], wires=i)
        param_idx += 1
    for i in range(num_qubits):
        for j in range(i + 1, num_qubits):
            qml.CNOT(wires=[i, j])
    for i in range(num_qubits):
        qml.RY(params[param_idx], wires=i)
        param_idx += 1
    for i in range(num_qubits):
        for j in range(i + 1, num_qubits):
            qml.CNOT(wires=[i, j])
    for i in range(num_qubits):
        qml.RY(params[param_idx], wires=i)
        param_idx += 1
    for i in range(num_qubits):
        qml.RY(params[param_idx], wires=i)
        param_idx += 1
    for i in range(num_qubits):
        for j in range(i + 1, num_qubits):
            qml.CNOT(wires=[i, j])
    for i in range(num_qubits):
        qml.RY(params[param_idx], wires=i)
        param_idx += 1
    return qml.probs(wires=range(num_qubits))

def qrc_circuit_entangled(params, coupling_angle, num_qubits, n_reservoirs, entangled_pair_indices):
    param_idx = 0
    total_qubits = num_qubits * n_reservoirs
    for res in range(n_reservoirs):
        offset = res * num_qubits
        for i in range(num_qubits):
            qml.RY(params[param_idx], wires=offset + i)
            param_idx += 1
        for i in range(num_qubits):
            for j in range(i + 1, num_qubits):
                qml.CNOT(wires=[offset + i, offset + j])
        for i in range(num_qubits):
            qml.RY(params[param_idx], wires=offset + i)
            param_idx += 1
        for i in range(num_qubits):
            for j in range(i + 1, num_qubits):
                qml.CNOT(wires=[offset + i, offset + j])
        for i in range(num_qubits):
            qml.RY(params[param_idx], wires=offset + i)
            param_idx += 1
        for i in range(num_qubits):
            qml.RY(params[param_idx], wires=offset + i)
            param_idx += 1
        for i in range(num_qubits):
            for j in range(i + 1, num_qubits):
                qml.CNOT(wires=[offset + i, offset + j])
        for i in range(num_qubits):
            qml.RY(params[param_idx], wires=offset + i)
            param_idx += 1
    
    if coupling_angle != 0.0 and len(entangled_pair_indices) > 0:
        pair_count = len(entangled_pair_indices) // 2
        for p in range(pair_count):
            ctrl_idx = entangled_pair_indices[2 * p]
            tgt_idx = entangled_pair_indices[2 * p + 1]
            qml.CNOT(wires=[ctrl_idx, tgt_idx])
            qml.RY(coupling_angle, wires=tgt_idx)
            qml.CNOT(wires=[ctrl_idx, tgt_idx])
    
    return qml.probs(wires=range(total_qubits))

In [6]:
class QuantumReservoirPennyLane:
    def __init__(self, num_qubits=6, n_reservoirs=3, f_bs=None, b=-0.33,
                 ridge_alpha=3e-4, warmup_steps=5, amplitude_damping=None,
                 coupling_mode="independent", coupling_strength=0.0,
                 coupling_pairs=None):
        
        self.num_qubits = num_qubits
        self.n_reservoirs = n_reservoirs
        
        if f_bs is None:
            default_fb = [0.11, 0.1375, 0.12375]
            self.f_bs = default_fb[:self.n_reservoirs]
        else:
            self.f_bs = list(f_bs)
        
        if len(self.f_bs) != self.n_reservoirs:
            raise ValueError("Length of f_bs must match n_reservoirs.")
        
        self.b = b
        self.ridge_alpha = ridge_alpha
        self.warmup_steps = warmup_steps
        self.total_qubits = self.num_qubits * self.n_reservoirs
        
        self._bitmasks_local = _build_bitmasks(self.num_qubits, offset=0)
        self.n_observables = len(self._bitmasks_local)
        self._bitmasks_entangled = [
            _build_bitmasks(self.num_qubits, offset=res * self.num_qubits)
            for res in range(self.n_reservoirs)
        ]
        
        self.last_outputs = [np.zeros(self.num_qubits) for _ in range(self.n_reservoirs)]
        self.init_qrc_states = deepcopy(self.last_outputs)
        self.amplitude_damping = amplitude_damping
        self._gamma_map = self._prepare_damping(amplitude_damping)
        
        # Coupling config
        valid = {"independent", "channel", "entangled"}
        if coupling_mode not in valid:
            raise ValueError(f"coupling_mode must be in {valid}")
        self.coupling_mode = coupling_mode
        self.coupling_strength = float(np.clip(coupling_strength, 0.0, 1.0))
        
        if self.n_reservoirs < 2:
            self._coupling_pairs = []
        else:
            if coupling_pairs is None:
                self._coupling_pairs = [(i, i + 1) for i in range(self.n_reservoirs - 1)]
            else:
                self._coupling_pairs = coupling_pairs
        
        self._entangled_pair_indices = []
        if self._coupling_pairs:
            for src, dst in self._coupling_pairs:
                ctrl = src * self.num_qubits + (self.num_qubits - 1)
                tgt = dst * self.num_qubits
                self._entangled_pair_indices.extend([ctrl, tgt])
        
        self.ridge = Ridge(alpha=self.ridge_alpha)
        self._setup_devices()
    
    def _setup_devices(self):
        try:
            self.dev_independent = qml.device('lightning.gpu', wires=self.num_qubits)
            self.dev_entangled = qml.device('lightning.gpu', wires=self.total_qubits)
            self._device_type = "lightning.gpu"
        except:
            self.dev_independent = qml.device('lightning.qubit', wires=self.num_qubits)
            self.dev_entangled = qml.device('lightning.qubit', wires=self.total_qubits)
            self._device_type = "lightning.qubit"
    
    def _prepare_damping(self, amplitude_damping: Optional[Sequence[float]]):
        if amplitude_damping is None:
            return None
        if np.isscalar(amplitude_damping):
            gamma = float(np.clip(amplitude_damping, 0.0, 1.0))
            return [np.full(self.num_qubits, gamma, dtype=float) for _ in range(self.n_reservoirs)]
        arr = np.array(amplitude_damping, dtype=float)
        if arr.ndim == 1:
            if arr.size == 1:
                gamma = float(np.clip(arr[0], 0.0, 1.0))
                return [np.full(self.num_qubits, gamma, dtype=float) for _ in range(self.n_reservoirs)]
            if arr.size == self.n_reservoirs:
                return [np.full(self.num_qubits, float(np.clip(g, 0.0, 1.0)), dtype=float) for g in arr]
            if arr.size == self.num_qubits:
                base = np.clip(arr, 0.0, 1.0).astype(float)
                return [base.copy() for _ in range(self.n_reservoirs)]
            if arr.size == self.total_qubits:
                clipped = np.clip(arr, 0.0, 1.0).astype(float)
                return [
                    clipped[i * self.num_qubits:(i + 1) * self.num_qubits].copy()
                    for i in range(self.n_reservoirs)
                ]
        if arr.ndim == 2 and arr.shape == (self.n_reservoirs, self.num_qubits):
            clipped = np.clip(arr, 0.0, 1.0).astype(float)
            return [row.copy() for row in clipped]
        raise ValueError(
            "amplitude_damping must be None, a scalar, length-n_reservoirs, "  "length-num_qubits, length-total_qubits, or (n_reservoirs, num_qubits)."
        )
         

In [7]:
# Attach evolve methods
def _evolve_independent(self, input_value):
    observables = []
    qnode = qml.QNode(lambda p: qrc_circuit_independent(p, self.num_qubits), 
                      self.dev_independent)
    
    for res_idx in range(self.n_reservoirs):
        zs = np.clip(self.last_outputs[res_idx], -1.0, 1.0)
        f_b = input_value * self.b * self.f_bs[res_idx]
        arccos_zs = np.arccos(zs) * self.b
        params_list = build_params(f_b, arccos_zs)
        probs = qnode(params_list)
        if self._gamma_map is not None:
            probs = apply_amplitude_damping(probs, self._gamma_map[res_idx])
        observables.append(calc_observables(probs, self._bitmasks_local))
    
    observables_stack = np.stack(observables)
    if self.coupling_mode == "channel":
        observables_stack = apply_channel_coupling(observables_stack, 
                                                   self._coupling_pairs,
                                                   self.coupling_strength)
    for res_idx in range(self.n_reservoirs):
        self.last_outputs[res_idx] = observables_stack[res_idx, :self.num_qubits]
    
    return observables_stack.reshape(-1)

def _evolve_entangled(self, input_value):
    param_chunks = []
    for res_idx in range(self.n_reservoirs):
        zs = np.clip(self.last_outputs[res_idx], -1.0, 1.0)
        f_b = input_value * self.b * self.f_bs[res_idx]
        arccos_zs = np.arccos(zs) * self.b
        param_chunks.append(build_params(f_b, arccos_zs))
    
    params_flat = np.concatenate(param_chunks)
    coupling_angle = float(self.coupling_strength) * np.pi
    
    qnode = qml.QNode(
        lambda p, a: qrc_circuit_entangled(p, a, self.num_qubits, 
                                          self.n_reservoirs, 
                                          self._entangled_pair_indices),
        self.dev_entangled
    )
    probs = qnode(params_flat, coupling_angle)
    
    if self._gamma_map is not None:
        gamma_flat = np.concatenate(self._gamma_map)
        probs = apply_amplitude_damping(probs, gamma_flat)
    
    observables = []
    for res_idx, bitmasks in enumerate(self._bitmasks_entangled):
        obs = calc_observables(probs, bitmasks)
        self.last_outputs[res_idx] = obs[:self.num_qubits]
        observables.append(obs)
    
    return np.concatenate(observables)

QuantumReservoirPennyLane._evolve_independent = _evolve_independent
QuantumReservoirPennyLane._evolve_entangled = _evolve_entangled

def evolve_qrc(self, input_value):
    if self.coupling_mode == "entangled":
        return self._evolve_entangled(input_value)
    return self._evolve_independent(input_value)

QuantumReservoirPennyLane.evolve_qrc = evolve_qrc

In [8]:
def reset_reservoirs(self):
    self.last_outputs = deepcopy(self.init_qrc_states)

def train(self, train_data):
    all_states, all_targets = [], []
    for series_idx in tqdm(range(train_data.shape[0]), desc="Training", leave=False):
        series = train_data[series_idx]
        self.reset_reservoirs()
        for _ in range(self.warmup_steps):
            _ = self.evolve_qrc(series[0])
        for t in range(len(series) - 1):
            state = self.evolve_qrc(series[t])
            if not np.all(np.isfinite(state)):
                state = np.nan_to_num(state, nan=0.0)
            all_states.append(state)
            all_targets.append(series[t + 1])
    
    X = np.array(all_states)
    y = np.array(all_targets)
    self.ridge.fit(X, y)
    print(f"Training complete: {len(all_states)} samples, {X.shape[1]} features")

def predict(self, test_data, n_predict=20):
    n_test = test_data.shape[0]
    predictions = np.zeros((n_test, n_predict))
    for test_idx in tqdm(range(n_test), desc="Predicting", leave=False):
        series = test_data[test_idx]
        self.reset_reservoirs()
        for _ in range(self.warmup_steps):
            _ = self.evolve_qrc(series[0])
        for t in range(len(series) - 1):
            _ = self.evolve_qrc(series[t])
        last_value = series[-1]
        for step in range(n_predict):
            state = self.evolve_qrc(last_value)
            if not np.all(np.isfinite(state)):
                state = np.nan_to_num(state, nan=0.0)
            last_value = self.ridge.predict(state.reshape(1, -1))[0]
            predictions[test_idx, step] = last_value
    return predictions

QuantumReservoirPennyLane.reset_reservoirs = reset_reservoirs
QuantumReservoirPennyLane.train = train
QuantumReservoirPennyLane.predict = predict

In [None]:
# Damped oscillator dataset (matches benchmark)
import numpy as np

def generate_damped_oscillator_data(n_train=10, n_test=4, n_points=26, seed=1):

    rng = np.random.Generator(np.random.PCG64(seed))
    t = np.linspace(0, 26, num=n_points)
    n_total = n_train + n_test
    oscis = 0.5 + rng.standard_normal(n_total) * 0.4
    data = np.zeros((n_total, n_points))
    for j in range(n_total):
        omega = oscis[j]
        data[j, :] = (
            np.exp(-0.05 * t) *
            (np.cos(omega * t) + np.cos(omega * t / np.sqrt(2))) * 30 +
            rng.standard_normal(n_points) * 0.3
        )
    return data

data = generate_damped_oscillator_data(n_train=10, n_test=4, n_points=26, seed=1)
train_data = data[:10]
test_data = data[10:]
print(f"Train: {train_data.shape}, Test: {test_data.shape}")

Train: (10, 26), Test: (4, 26)


In [12]:
# Experiment runner on damped oscillator benchmark
from sklearn.metrics import r2_score, mean_squared_error

def run_experiment(cfg, train_data, test_data, init_points=6, n_predict=20):
    model = QuantumReservoirPennyLane(**cfg)
    model.train(train_data)
    preds = model.predict(test_data[:, :init_points], n_predict=n_predict)
    truth = test_data[:, init_points:].flatten()
    pred = preds.flatten()
    mse = mean_squared_error(truth, pred)
    rmse = float(np.sqrt(mse))
    nrmse = rmse / (np.std(truth) + 1e-12)
    r2 = r2_score(truth, pred)
    return {"mse": mse, "rmse": rmse, "nrmse": nrmse, "r2": r2, "preds": preds}

In [17]:
# Baseline configs (2 reservoirs for fair/fast compare)
base_cfg = dict(
    num_qubits=6,
    n_reservoirs=3,
    f_bs=[0.08, 0.10, 0.12],
    b=-0.40,
    warmup_steps=8,
    ridge_alpha=3e-4
)

cfg_ind = dict(base_cfg, coupling_mode="independent", coupling_strength=0.0)
cfg_chn = dict(base_cfg, coupling_mode="channel",     coupling_strength=0.20)
cfg_ent = dict(base_cfg, coupling_mode="entangled",   coupling_strength=0.20)

res_ind = run_experiment(cfg_ind, train_data, test_data, init_points=6, n_predict=20)
res_chn = run_experiment(cfg_chn, train_data, test_data, init_points=6, n_predict=20)
res_ent = run_experiment(cfg_ent, train_data, test_data, init_points=6, n_predict=20)

print(f"[independent] MSE={res_ind['mse']:.2f}  NRMSE={res_ind['nrmse']:.4f}  R²={res_ind['r2']:.4f}")
print(f"[channel 0.20] MSE={res_chn['mse']:.2f}  NRMSE={res_chn['nrmse']:.4f}  R²={res_chn['r2']:.4f}")
print(f"[entangled0.20] MSE={res_ent['mse']:.2f}  NRMSE={res_ent['nrmse']:.4f}  R²={res_ent['r2']:.4f}")

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

[independent] MSE=101.76  NRMSE=0.7317  R²=0.4646
[channel 0.20] MSE=74.53  NRMSE=0.6262  R²=0.6079
[entangled0.20] MSE=62.42  NRMSE=0.5731  R²=0.6716


In [18]:
# Sweep coupling strengths on benchmark
strengths = [0.05, 0.10, 0.20, 0.30]
grid = []

for s in strengths:
    ch = run_experiment(dict(base_cfg, coupling_mode="channel",   coupling_strength=s),
                        train_data, test_data, 6, 20)
    en = run_experiment(dict(base_cfg, coupling_mode="entangled", coupling_strength=s),
                        train_data, test_data, 6, 20)
    grid.append(("channel", s, ch["r2"], ch["nrmse"]))
    grid.append(("entangled", s, en["r2"], en["nrmse"]))
    print(f"channel   s={s:.2f}  R²={ch['r2']:.4f}  NRMSE={ch['nrmse']:.4f}")
    print(f"entangled s={s:.2f}  R²={en['r2']:.4f}  NRMSE={en['nrmse']:.4f}")

print("\nBest per mode:")
for mode in ("channel", "entangled"):
    best = max([(s, r2, n) for (m, s, r2, n) in grid if m == mode], key=lambda x: x[1])
    print(f"{mode:9s} s={best[0]:.2f}  R²={best[1]:.4f}  NRMSE={best[2]:.4f}")

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

channel   s=0.05  R²=0.5267  NRMSE=0.6880
entangled s=0.05  R²=0.6420  NRMSE=0.5983


Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

channel   s=0.10  R²=0.5760  NRMSE=0.6511
entangled s=0.10  R²=0.6413  NRMSE=0.5989


Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

channel   s=0.20  R²=0.6079  NRMSE=0.6262
entangled s=0.20  R²=0.6716  NRMSE=0.5731


Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

Training:   0%|          | 0/10 [00:00<?, ?it/s]

Training complete: 250 samples, 63 features


Predicting:   0%|          | 0/4 [00:00<?, ?it/s]

channel   s=0.30  R²=0.5559  NRMSE=0.6664
entangled s=0.30  R²=0.6082  NRMSE=0.6259

Best per mode:
channel   s=0.20  R²=0.6079  NRMSE=0.6262
entangled s=0.20  R²=0.6716  NRMSE=0.5731


In [19]:
# Inspect best among three fixed-strength runs
def summarize(name, entry):
    return f"{name:12s}  MSE={entry['mse']:.2f}  NRMSE={entry['nrmse']:.4f}  R²={entry['r2']:.4f}"

candidates = {
    "independent": res_ind,
    "channel_0.20": res_chn,
    "ent_0.20": res_ent
}
best_name = max(candidates, key=lambda k: candidates[k]["r2"])
print("Summary:")
for k, v in candidates.items():
    print(summarize(k, v))
print(f"\nBest: {best_name}")

preds = candidates[best_name]["preds"]
truth = test_data[:, 6:]
idx = 0
steps = min(20, truth.shape[1])
print(f"\nSample {idx} (first {steps} steps)")
print("step  expected      predicted     error")
for t in range(steps):
    print(f"{t:>4}  {truth[idx, t]: .6f}  {preds[idx, t]: .6f}  {preds[idx, t]-truth[idx, t]: .6f}")

Summary:
independent   MSE=101.76  NRMSE=0.7317  R²=0.4646
channel_0.20  MSE=74.53  NRMSE=0.6262  R²=0.6079
ent_0.20      MSE=62.42  NRMSE=0.5731  R²=0.6716

Best: ent_0.20

Sample 0 (first 20 steps)
step  expected      predicted     error
   0  -35.578434  -36.168673  -0.590239
   1  -35.761099  -35.182372   0.578727
   2  -28.446608  -27.484777   0.961830
   3  -16.700774  -12.761002   3.939772
   4  -4.642688  -0.493542   4.149147
   5   6.280768   8.207246   1.926478
   6   12.866558   8.942810  -3.923747
   7   14.799790   7.352017  -7.447773
   8   12.937450   3.060487  -9.876963
   9   9.251257  -0.220156  -9.471413
  10   4.377753  -0.695366  -5.073119
  11   1.127388   0.484839  -0.642549
  12  -1.189487   1.423903   2.613390
  13  -1.179924   1.066590   2.246514
  14  -0.221376  -0.191961   0.029415
  15   1.374224  -1.733037  -3.107261
  16   2.064434  -2.941058  -5.005491
  17   2.291690  -3.631353  -5.923043
  18   1.046105  -4.042796  -5.088902
  19  -2.169868  -4.406443 