In [60]:
import numpy as np
import pandas as pd
import pennylane as qml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

# === Dataset ===
def make_staircase_dataset(n_samples=1000, random_state=42):
    rng = np.random.default_rng(random_state)

    # Create three uncorrelated signals
    s0 = rng.normal(0, 1, n_samples)
    s1 = rng.normal(0, 1, n_samples)
    s2 = rng.normal(0, 1, n_samples)

    # Generate label from a thresholded linear combination
    raw_score = 1.0 * s0 + 0.8 * s1 + 0.5 * s2
    y = (raw_score > 0).astype(int)

    # Create f0, f1, f2 from those same signals + added noise
    f0 = s0 + rng.normal(0, 0.3, n_samples)
    f1 = s1 + rng.normal(0, 0.5, n_samples)
    f2 = s2 + rng.normal(0, 0.8, n_samples)

    # Add 3 pure noise features
    f3_5 = rng.normal(0, 1.0, size=(n_samples, 3))

    X = np.column_stack([f0, f1, f2, f3_5])
    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(6)])
    df["target"] = y
    return df


# === Angle‐Encoded VQC ===
class FarhiAngleVQC:
    def __init__(self, coeffs):
        """
        coeffs: list of feature coefficients, e.g. [1.0, 0.8, 0.5, 0.0, 0.0, 0.0]
        """
        self.coeffs = np.array(coeffs, dtype=float)
        self.num_qubits = len(self.coeffs)
        wires = list(range(self.num_qubits + 1))  # +1 for readout
        self.dev = qml.device("default.qubit", wires=wires)
        self.opt = qml.optimize.AdamOptimizer(0.1)
        self.num_layers = 1

        # Random init
        np.random.seed(0)
        self.weights = qml.numpy.array(
            (np.pi / 2) * np.random.uniform(-1, 1, size=self.num_layers * self.num_qubits),
            requires_grad=True,
        )
        self.bias = qml.numpy.array(0.0, requires_grad=True)

        self._initialize_circuit()

    def state_preparation(self, x):
        """Angle‐encode each feature x[j] via RX(coeff[j] * x[j] / 2)."""
        for j in range(self.num_qubits):
            #qml.RX(self.coeffs[j] * x[j] / 2, wires=j)
            qml.RX(self.coeffs[j] * np.pi * x[j], wires=j)
        qml.PauliX(wires=self.num_qubits)

    def relationship_unitary(self, x):
        """
        Apply pairwise controlled‐RZ interactions
        weighted by coeff_i * coeff_j.
        """
        for i in range(self.num_qubits):
            for j in range(i + 1, self.num_qubits):
                #angle = self.coeffs[i] * self.coeffs[j]
                #qml.ctrl(qml.RZ, control=i)(angle, wires=j)
                angle = (self.coeffs[i] * self.coeffs[j] * x[i] * x[j]) / 2
                qml.ctrl(qml.RZ, control=i)(angle, wires=j)

    def _initialize_circuit(self):
        @qml.qnode(self.dev, interface="autograd")
        def circuit(weights, x):
            # 1) encode input
            self.state_preparation(x)
        
            # 2) entangle features
            self.relationship_unitary(x)
        
            # 3) variational controlled RXs into readout qubit
            for layer in range(self.num_layers):
                for j in range(self.num_qubits):
                    idx = layer*self.num_qubits + j
                    qml.ctrl(qml.RX, control=j)(-weights[idx], wires=self.num_qubits)
        
            # 4) measurement
            return qml.expval(qml.PauliY(wires=self.num_qubits))


        self.circuit = circuit

    def variational_classifier(self, weights, bias, x):
        return 0.5 * self.circuit(weights, x) + bias

    def cost(self, weights, bias, X, Y):
        preds = qml.numpy.array([
            0.5 * (qml.numpy.tanh(self.variational_classifier(weights, bias, x)) + 1)
            for x in X
        ])
        return qml.numpy.mean((qml.numpy.array(Y) - preds) ** 2)

    # def fit(self, X_train, y_train, num_epochs=50, learning_rate=0.1, batch_size=48):
    #     X = np.array(X_train, dtype=np.float64)
    #     Y = np.array(y_train)
    #     self.opt = qml.optimize.AdamOptimizer(learning_rate)

    #     for _ in range(num_epochs):
    #         idx = np.random.choice(len(X), min(batch_size, len(X)), replace=False)
    #         Xb, Yb = X[idx], Y[idx]
    #         self.weights, self.bias = self.opt.step(
    #             lambda ws, b: self.cost(ws, b, Xb, Yb),
    #             self.weights, self.bias
    #         )
    #     return self

    def fit(self, X_train, y_train, num_epochs=50, learning_rate=0.05, batch_size=48):
        X = np.array(X_train, dtype=np.float64)
        Y = np.array(y_train)

        self.opt = qml.optimize.AdamOptimizer(learning_rate)
        for _ in range(num_epochs):
            idx = np.random.choice(len(X), min(batch_size, len(X)), replace=False)
            X_batch, Y_batch = X[idx], Y[idx]
            self.weights, self.bias = self.opt.step(
                lambda ws, b: self.cost(ws, b, X_batch, Y_batch),
                self.weights, self.bias
            )
        return self

    def predict(self, X):
        X = np.array(X, dtype=np.float64)
        return np.array([
            1 if self.variational_classifier(self.weights, self.bias, x) > 0 else 0
            for x in X
        ])







In [61]:
# === Evaluation ===
def evaluate_model(model, df, features, label):
    X = df[[f"f{i}" for i in features]].values
    y = df["target"].values

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    X_train, X_test, y_train, y_test = train_test_split(
        X_scaled, y, test_size=0.2, random_state=42
    )

    model.fit(X_train, y_train, num_epochs=10, learning_rate=0.1)
    y_pred_train = model.predict(X_train)
    y_pred_test = model.predict(X_test)

    acc_train = accuracy_score(y_train, y_pred_train)
    acc_test = accuracy_score(y_test, y_pred_test)
    print(f"{label:<20} | Train: {acc_train:.4f} | Test: {acc_test:.4f}")

In [62]:
# === Sanity Check ===
def run_sanity_check():
    print("\n🔍 Running sanity checks on staircase_dataset...\n")
    df = make_staircase_dataset()

    # extend signal coeffs [1.0, 0.8, 0.5] to match 6 features by zeroing noise
    base_coeffs = [1.0, 0.8, 0.5] + [0.0] * 3  # now length == 6

    feature_sets = [
        ([0],       "f0"),
        ([0, 1],    "f0 + f1"),
        ([0, 2],    "f0 + f2"),
        ([0, 3],    "f0 + f3 (noise)"),
        ([0, 4],    "f0 + f4 (noise)"),
        ([0, 5],    "f0 + f5 (noise)"),
        ([0, 1, 2], "f0 + f1 + f2"),
    ]

    print("=== Angle‐Encoded VQC ===")
    for features, label in feature_sets:
        coeffs = [base_coeffs[i] for i in features]
        model = FarhiAngleVQC(coeffs)
        evaluate_model(model, df, features, label)


if __name__ == "__main__":
    run_sanity_check()


🔍 Running sanity checks on staircase_dataset...

=== Angle‐Encoded VQC ===
f0                   | Train: 0.4975 | Test: 0.5300
f0 + f1              | Train: 0.4788 | Test: 0.5050
f0 + f2              | Train: 0.5050 | Test: 0.5650
f0 + f3 (noise)      | Train: 0.4875 | Test: 0.5350
f0 + f4 (noise)      | Train: 0.4875 | Test: 0.5350
f0 + f5 (noise)      | Train: 0.4875 | Test: 0.5350
f0 + f1 + f2         | Train: 0.5050 | Test: 0.5000


In [21]:
import numpy as np
import pennylane as qml

def test_state_preparation_single_feature():
    """Test that state_preparation applies RX(x/2) and flips the readout qubit."""
    x = np.array([np.pi], dtype=float)                # use a real NumPy array
    vqc = FarhiAngleVQC(coeffs=[1.0])

    dev = qml.device("default.qubit", wires=[0, 1])

    @qml.qnode(dev, interface="autograd")             # set the interface explicitly
    def circuit(x):
        vqc.state_preparation(x)
        return qml.state()

    state = circuit(x)

    # RX(pi/2) on qubit 0 → cos(pi/4)|0> - i sin(pi/4)|1>
    expected_feature = np.array([np.cos(np.pi/4), -1j * np.sin(np.pi/4)])
    # Readout qubit flipped to |1>
    expected = np.kron(expected_feature, np.array([0, 1]))

    assert np.allclose(state, expected, atol=1e-6), (
        f"Got {state}, but expected {expected}"
    )
    print("✅ test_state_preparation_single_feature passed.")

# Run them
test_state_preparation_single_feature()


✅ test_state_preparation_single_feature passed.


In [22]:
def test_relationship_unitary_pairwise():
    """Test that relationship_unitary is the identity on |000>."""
    vqc = FarhiAngleVQC(coeffs=[1.0, 0.5])
    dev = qml.device("default.qubit", wires=[0, 1, 2])

    @qml.qnode(dev, interface="autograd")
    def circuit():
        # starts in |000>, then apply the controlled-RZ block
        vqc.relationship_unitary()
        return qml.state()

    state = circuit()

    # Expect no change: |000> → amplitude 1 in index 0
    expected = np.zeros(2**3, dtype=complex)
    expected[0] = 1.0

    assert np.allclose(state, expected, atol=1e-6), (
        f"Got {state}, but expected {expected}"
    )
    print("✅ test_relationship_unitary_pairwise passed.")

test_relationship_unitary_pairwise()

✅ test_relationship_unitary_pairwise passed.


In [23]:
import numpy as np

def test_angle_learner_basis_states():
    """
    Verify that FarhiAngleVQC can perfectly classify two orthogonal
    one‐hot inputs under angle encoding.
    """
    # two one-hot inputs; after RX(x/2):
    # [1,0] → qubit0 = RX(0/2)|0> = |0>
    # [0,1] → qubit0 = RX(1/2)|0> = cos(.5/2)|0> − i sin(.5/2)|1> 
    # (not perfectly orthogonal, but distinct)
    # Instead use full π rotation for a clean test:
    X = np.array([
        [0.0,         np.pi],  # RX(0)=I, RX(π/2)=|1>
        [0.0,         0.0],    # RX(0)=I 
    ])
    y = np.array([1, 0])

    model = FarhiAngleVQC(coeffs=[1.0, 1.0])
    init_cost  = model.cost(model.weights, model.bias, X, y)
    model.fit(X, y, num_epochs=200, learning_rate=0.1, batch_size=2)
    final_cost = model.cost(model.weights, model.bias, X, y)
    preds     = model.predict(X)

    assert final_cost < init_cost, f"Cost did not decrease: {init_cost:.4f} → {final_cost:.4f}"
    assert np.array_equal(preds, y), f"Failed to classify basis states: got {preds}"
    print("✅ Angle‐encoded basis‐state learner test passed!")
    
test_angle_learner_basis_states()

✅ Angle‐encoded basis‐state learner test passed!


In [24]:
import numpy as np
import pennylane as qml

def test_angle_encoding_stateprep(x):
    """
    Check that state_preparation(x) applies RX(x[j]/2) on each qubit j
    and flips the readout qubit to |1>.
    """
    num_qubits = len(x)
    wires = list(range(num_qubits + 1))
    dev = qml.device("default.qubit", wires=wires)

    model = FarhiAngleVQC(coeffs=[1.0]*num_qubits)

    @qml.qnode(dev)
    def circuit(x):
        model.state_preparation(x)
        return qml.state()

    state = circuit(x)

    # Build the expected multi‐qubit state:
    # For j in 0..n-1: qubit j = cos(x[j]/4)|0> - i sin(x[j]/4)|1>
    # readout qubit = |1>, so full statevector has zeros on entries with last bit=0
    amp_per_feature = [
        np.array([np.cos(xj/4), -1j*np.sin(xj/4)]) for xj in x
    ]
    # take Kronecker product of all feature amps
    feat_state = amp_per_feature[0]
    for a in amp_per_feature[1:]:
        feat_state = np.kron(feat_state, a)
    # then tensor with readout |1> = [0,1]
    expected = np.kron(feat_state, np.array([0, 1]))

    assert np.allclose(state, expected, atol=1e-6), (
        f"\nGot:\n{state}\nExpected:\n{expected}"
    )
    print(f"✅ Angle encoding stateprep test passed for x={x}")

# examples
test_angle_encoding_stateprep(np.array([np.pi, np.pi/2]))
test_angle_encoding_stateprep(np.random.rand(3) * np.pi)


✅ Angle encoding stateprep test passed for x=[3.14159265 1.57079633]
✅ Angle encoding stateprep test passed for x=[1.33095081 2.0291362  1.37472077]


In [25]:
import numpy as np

def test_angle_learner_cost_decrease():
    """
    Verify that FarhiAngleVQC’s training actually reduces the cost
    on a simple two‐sample angle‐encoded dataset.
    """
    # Choose two inputs that map to distinct rotations on qubit 1:
    # x = [0, 2π] → RX(0), RX(π)  ⇒ qubit1 flips to |1>
    X = np.array([
        [0.0, 2 * np.pi],
        [0.0, 0.0]
    ])
    # Labels: sample 0 → 1, sample 1 → 0
    y = np.array([1, 0])

    model = FarhiAngleVQC(coeffs=[1.0, 1.0])

    # compute initial cost
    init_cost = model.cost(model.weights, model.bias, X, y)

    # train
    model.fit(X, y, num_epochs=100, learning_rate=0.1, batch_size=2)

    # compute final cost
    final_cost = model.cost(model.weights, model.bias, X, y)

    assert final_cost < init_cost, (
        f"Cost did not decrease: {init_cost:.4f} → {final_cost:.4f}"
    )
    print("✅ Angle‐encoded learner cost‐decrease test passed!")

# Run the test
test_angle_learner_cost_decrease()


✅ Angle‐encoded learner cost‐decrease test passed!


In [35]:
import numpy as np

def test_cost_nonnegative_and_scalar():
    """
    The cost() method should return a single real >= 0.
    """
    # Toy 2‐sample, 2‐feature dataset
    X = np.array([[0.0, 1.0], [1.0, 0.0]])
    y = np.array([0, 1])

    model = FarhiAngleVQC(coeffs=[1.0, 1.0])
    c = model.cost(model.weights, model.bias, X, y)

    # It must be a scalar
    assert np.ndim(c) == 0, f"Cost must be a scalar, got shape {np.shape(c)}"
    # It must be nonnegative
    assert c >= 0, f"Cost must be >= 0, got {c}"
    # It shouldn’t be absurdly large for this toy problem
    assert c <= 1, f"Cost should be <= 1, got {c}"

    print("✅ test_cost_nonnegative_and_scalar passed.")

test_cost_nonnegative_and_scalar()


✅ test_cost_nonnegative_and_scalar passed.


In [36]:
# 2) Test that fit() reduces cost on a simple dataset
def test_fit_reduces_cost():
    """
    On a toy binary dataset with orthogonal angle encodings, fit() should reduce cost.
    """
    # encode two orthogonal points: [0, 0] vs [0, 2π]
    X = np.array([[0.0, 2 * np.pi], [0.0, 0.0]])
    y = np.array([1, 0])
    model = FarhiAngleVQC(coeffs=[1.0, 1.0])

    # initial cost
    init_cost = model.cost(model.weights, model.bias, X, y)

    # train
    model.fit(X, y, num_epochs=50, learning_rate=0.1, batch_size=2)

    # final cost
    final_cost = model.cost(model.weights, model.bias, X, y)
    assert final_cost < init_cost, (
        f"Cost did not decrease: {init_cost:.4f} → {final_cost:.4f}"
    )
    print("✅ test_fit_reduces_cost passed.")

test_fit_reduces_cost()


✅ test_fit_reduces_cost passed.


In [39]:
def test_predict():
    """
    After training on the toy dataset, predict() should return a 0/1 array
    of the correct shape and achieve at least random‐chance accuracy.
    """
    X = np.array([[0.0, 2 * np.pi], [0.0, 0.0]])
    y = np.array([1, 0])
    model = FarhiAngleVQC(coeffs=[1.0, 1.0])

    # train a bit longer to give the optimizer a chance
    model.fit(X, y, num_epochs=200, learning_rate=0.1, batch_size=2)

    preds = model.predict(X)
    # shape and binary values
    assert preds.shape == y.shape, f"Expected shape {y.shape}, got {preds.shape}"
    assert set(np.unique(preds)).issubset({0, 1}), f"Predictions not in {{0,1}}: {preds}"

    # require at least random‐chance accuracy (>=50%)
    acc = (preds == y).mean()
    assert acc >= 0.5, f"Accuracy {acc:.2f} below chance on toy data"
    print("✅ test_predict passed.")

test_predict()


✅ test_predict passed.
