In [None]:
import pennylane as qml
from pennylane import numpy as np
import numpy as onp
from functools import partial

        
class QFM():
    def __init__(self, input_samples, output_samples, n_ancilla=3, num_time_steps=3, depth_per_time_step=3):
        """Quantum Flow Model (QFM) for learning time evolution.

        Args:
            input_samples (array[float]): Array of shape (num_samples, dim_Hilbert_space) with initial states.
            output_samples (array[float]): Array of shape (num_samples, dim_Hilbert_space) with target states.
            num_time_steps (int): Number of time steps to model.
            depth_per_time_step (int): Depth of the PQC for each time step.
        """
        
        self.input_samples = input_samples
        self.output_samples = output_samples
        self.num_time_steps = num_time_steps
        self.depth_per_time_step = depth_per_time_step
        
        assert input_samples.shape[0] == output_samples.shape[0], "Input and output samples must have the same number of samples."
        assert input_samples.shape[1] == output_samples.shape[1], "Input and output samples must have the same number of features."

        self.n_input = int(np.log2(input_samples.shape[1]))
        self.n_output = int(np.log2(input_samples.shape[1]))

        self.n_ancilla = n_ancilla
        
        self.num_wires = 2 * self.n_input + self.n_ancilla + 1 # add ancilla qubits to approximate non-unitary evolution
        
        
        # Define wire indices
        self.wires_sys = range(self.n_input)
        self.wires_anc = range(self.n_input, self.n_input + self.n_ancilla)
        self.wires_all = range(self.n_input + self.n_ancilla)
        self.wires_target = range(self.n_input + self.n_ancilla, self.num_wires-1)
        self.swap_test_wires = [self.num_wires-1]  # Ancilla wire for swap test
        
        # Initialize parameters for the PQC
        self.parameters = np.random.normal(-0.1, 0.1, 
                                            (num_time_steps, depth_per_time_step, self.num_wires, 3), 
                                            requires_grad=True)
        
        # Define the quantum device
        self.dev = qml.device("lightning.qubit", wires=self.num_wires) #+n_input + 1  for swap-test
        
        # Define the quantum node
        @qml.qnode(self.dev, interface='autograd',  diff_method='adjoint')
        def circuit(params, t, x, y):
            '''
            Quantum circuit for a single time step.
            Args:
                params (array[float]): Parameters for the PQC.
                t (int): Current time step index.
                x (array[float]): Input state vector.
                y (array[float]): Target state vector.
            Returns:
                float: Expectation value from the swap test.
            '''
            qml.templates.MottonenStatePreparation(x, wires=self.wires_sys)
            qml.templates.MottonenStatePreparation(y, wires=self.wires_target) 
            self.PQC(params, time_step_index=t, depth=self.depth_per_time_step)
            
            # Swap test
            qml.Hadamard(wires=self.num_wires-1)
            for i in range(self.n_input):
                qml.CSWAP([self.num_wires-1, self.wires_sys[i], self.wires_target[i]])
            qml.Hadamard(wires=self.num_wires-1)
            
            return qml.expval(qml.PauliZ(self.num_wires-1)) 
        self.circuit_per_time_step = circuit
        
        
    def PQC(self, parameters, time_step_index, depth=3):
        """Parameterized Quantum Circuit (PQC) for time evolution.

        Args:
            parameters (array[float]): Array of shape (depth, len(wires), 2) containing rotation angles.
            wires (list[int]): List of wire indices to apply the circuit on.
            time_step_index (int): Index of the current time step.
            depth (int): Number of layers in the PQC.
        """
        wires = range(self.n_input + self.n_ancilla)
        
        
        for depth_index in range(depth):
            for wire in wires:
                qml.RX(parameters[time_step_index, depth_index, wire, 0], wires=wire)
                qml.RY(parameters[time_step_index, depth_index, wire, 1], wires=wire)
                qml.RZ(parameters[time_step_index, depth_index, wire, 2], wires=wire)
            for i in range(1, len(wires)):
                qml.CNOT(wires=[wires[i], wires[i - 1]])
            qml.CNOT(wires=[wires[0], wires[-1]])
    
    def Interpolate(self, input, output, t_array):
        '''Limear interpolation between input and output states.
        Args:
            input (array[float]): Array of shape (num_samples, num_features) with initial states.
            output (array[float]): Array of shape (num_samples, num_features) with target states.
            t_array (array[float]): Array of time steps between 0 and 1.
        Returns:
            array[float]: Array of shape (len(t_array), num_samples, num_features) with interpolated states.
        '''
        targets = np.zeros((len(t_array), input.shape[0], input.shape[1]))
        
        for i in range(len(t_array)):
            targets[i, :, :] = (1 - t_array[i]) * input + t_array[i] * output
        return targets
        
    
    def cost(self, params, t_index, x_batch, y_batch):
        loss = 0
        for x, y in zip(x_batch, y_batch):
            loss += 2 * self.circuit_per_time_step(params, t_index, x, y)
        loss /= len(x_batch)
        return loss
    
    def fit(self, epochs=20, batch_size=5, learning_rate=0.01):
        """Train the QFM using the provided input and output samples.

        Args:
            epochs (int): Number of training epochs.
            batch_size (int): Size of each training batch.
            learning_rate (float): Learning rate for the optimizer.
        """
        opt = qml.QNGOptimizer() 
        #qml.AdamOptimizer(stepsize=learning_rate)
        
        # iterpolate targets for each time step
        num_samples = self.input_samples.shape[0]
        targets = self.Interpolate(self.input_samples, self.output_samples, np.linspace(0, 1, self.num_time_steps))
        
        # initialize input
        input = self.input_samples
        
        # Define cost function for optimization

        
        for t_index in range(self.num_time_steps):
            print(f"Training time step {t_index+1}/{self.num_time_steps}")
            target = targets[t_index]
            for epoch in range(epochs):
                if epoch % 10 == 0 or epoch == epochs - 1:
                    print(f" Epoch {epoch+1}/{epochs}")
                # Shuffle the data
                indices = np.random.permutation(num_samples)
                input_shuffled = input[indices]
                target_shuffled = target[indices]
                
                for start in range(0, num_samples, batch_size):
                    end = start + batch_size
                    x_batch = input_shuffled[start:end]
                    y_batch = target_shuffled[start:end]
                    
                    if len(x_batch) == 0:
                        continue  # skip empty batch
                    
                    x = np.sum(x_batch, axis=0) / len(x_batch)
                    y = np.sum(y_batch, axis=0) / len(y_batch)

                    # cost_fn = partial(self.circuit_per_time_step, t=t_index, x=x, y=y)
                    @qml.qnode(self.dev, interface='autograd', diff_method='adjoint')
                    def cost(params):
                        return self.circuit_per_time_step(params, t_index, x, y)
                    self.parameters = opt.step(cost, self.parameters)
            
            loss = 0 
            for x, y in zip(x_batch, y_batch):
                loss += 2 * self.circuit_per_time_step(self.parameters, t_index, x, y)
                loss /= len(x_batch)
            print(f" Cost: {loss:.6f} for time step {t_index+1}" )



In [30]:
import pennylane as qml
from pennylane import numpy as np
import numpy as onp

# ====== 构造测试数据 ====== 
# 两个 2-qubit 的态：|00> 和 |11>
num_samples = 10
n_features = 2**2  # 4

# 输入态：随机扰动的 |00>
input_states = []
for _ in range(num_samples):
    state = np.array([1, 0, 0, 0], dtype=float) + 0.01 * np.random.randn(4)
    state = state / np.linalg.norm(state)
    input_states.append(state)

# 输出态：随机扰动的 |11>
output_states = []
for _ in range(num_samples):
    state = np.array([0, 0, 0, 1], dtype=float) + 0.01 * np.random.randn(4)
    state = state / np.linalg.norm(state)
    output_states.append(state)

input_states = np.stack(input_states)
output_states = np.stack(output_states)

print(input_states)
print("Input shape:", input_states.shape)
print("Output shape:", output_states.shape)

# ====== 实例化并训练 ====== 
model = QFM(
    input_samples=input_states,
    output_samples=output_states,
    n_ancilla=2,
    num_time_steps=2,
    depth_per_time_step=2
)

model.fit(epochs=5, batch_size=2, learning_rate=0.05)

# ====== 测试推理 ====== 
x_test = input_states[0]
y_test = output_states[0]
print("\nTest swap test value:", model.circuit_per_time_step(model.parameters, 0, x_test, y_test))


[[ 9.99701414e-01  1.68508498e-03  9.27651047e-03 -2.25430609e-02]
 [ 9.99839067e-01 -1.42449324e-02  2.66155318e-03 -1.05753882e-02]
 [ 9.99499579e-01 -9.60897298e-03  2.26633255e-02 -1.98653657e-02]
 [ 9.99877204e-01  6.08175052e-04  1.06739075e-02 -1.14575246e-02]
 [ 9.99884797e-01 -1.43437922e-02 -3.11348614e-03  3.86715237e-03]
 [ 9.99950979e-01 -9.40603029e-03  2.28049482e-03 -2.08953425e-03]
 [ 9.99875047e-01  1.30469008e-02 -3.76482766e-03 -8.09290851e-03]
 [ 9.99704673e-01 -1.70564107e-02 -1.33276687e-03 -1.72588823e-02]
 [ 9.99990706e-01 -3.85907695e-03  1.81676648e-03  6.29100215e-04]
 [ 9.99914430e-01 -1.02105215e-02 -8.06203035e-03 -1.37203801e-03]]
Input shape: (10, 4)
Output shape: (10, 4)
Training time step 1/2
 Epoch 1/5


TypeError: iteration over a 0-d array