In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical, Beta
from scipy.linalg import expm

# ---------------------------
# Helper Functions (as provided)
# ---------------------------

def coupling_operator_with_phase(i, j, dim, phi):
    op = np.zeros((dim, dim), dtype=complex)
    op[i, j] = np.exp(+1j * phi)
    op[j, i] = np.exp(-1j * phi)
    return op

def pulse_duration_for_fraction(f, Omega):
    theta = np.pi * np.array(f)
    return theta / Omega if Omega != 0 else 0.0

def unitary(couplings, rabi_freqs, fractions, fixed_phase_flags, dim):
    U_seq = np.eye(dim, dtype=complex)
    for (levels, Omega, frac, fix_pflag) in zip(couplings, rabi_freqs, fractions, fixed_phase_flags):
        i, j = levels
        phi_fixed = fix_pflag * np.pi
        total_phase = phi_fixed
        H_op = coupling_operator_with_phase(i, j, dim, total_phase)
        H_coupling = 0.5 * Omega * H_op
        t_pulse = pulse_duration_for_fraction(frac, Omega)
        U_pulse = expm(-1j * H_coupling * t_pulse)
        U_seq = U_pulse @ U_seq
    return U_seq

def fix_couplings_and_phases(couplings, fixed_phase_flags):
    new_couplings = []
    new_fixed_phase_flags = []
    for (cpl, phase_flag) in zip(couplings, fixed_phase_flags):
        i, j = cpl
        if i != 0 and j == 0:
            cpl_fixed = (0, i)
            phase_flag_fixed = phase_flag + 1.0
        else:
            cpl_fixed = cpl
            phase_flag_fixed = phase_flag
        new_couplings.append(cpl_fixed)
        new_fixed_phase_flags.append(phase_flag_fixed)
    return new_couplings, new_fixed_phase_flags

def compute_fidelity(U_target, U_pred, dim):
    # Fidelity measure using the normalized absolute trace overlap.
    fid = np.abs(np.trace(np.conjugate(U_target.T) @ U_pred)) / dim
    return fid

# ---------------------------
# Problem Setup Parameters
# ---------------------------

dim = 5                  # Hilbert space dimension
sequence_length = 7      # Number of pulses in the sequence
max_fraction = 1.5       # Maximum value for the 'fraction' parameter
# For the RL agent, we fix rabi frequencies to 1
rabi_freqs = [1] * sequence_length

# We define a list of allowed couplings (discrete choices) for the policy.
allowed_couplings = [(0,1), (0,2), (0,3), (0,4), (1,0), (2,0), (3,0), (4,0)]
num_allowed_couplings = len(allowed_couplings)

# ---------------------------
# Dataset Generation Functions
# ---------------------------
def generate_random_sequence():
    """Generates a random pulse sequence using uniform sampling.
       Returns:
          couplings: list of tuples (each chosen from allowed_couplings)
          fractions: list of continuous parameters in [0, max_fraction]
          fixed_phase_flags: list of binary values (0 or 1)
    """
    couplings = []
    fractions = []
    fixed_phase_flags = []
    for _ in range(sequence_length):
        # Choose a random allowed coupling (an index into allowed_couplings)
        idx = np.random.randint(0, num_allowed_couplings)
        couplings.append(allowed_couplings[idx])
        # Sample a random fraction uniformly
        frac = np.random.uniform(0.0, max_fraction)
        fractions.append(frac)
        # Sample a binary fixed-phase flag
        fix_flag = np.random.randint(0, 2)
        fixed_phase_flags.append(fix_flag)
        
    # Apply the fix function (if applicable)
    couplings, fixed_phase_flags = fix_couplings_and_phases(couplings, fixed_phase_flags)
    return couplings, fractions, fixed_phase_flags

def generate_random_target_unitary():
    """Generates a target unitary by sampling a random pulse sequence.
       Returns:
          U_target (np.ndarray): the resulting unitary matrix.
          parameters: the pulse parameters used (for reference).
    """
    couplings, fractions, fixed_phase_flags = generate_random_sequence()
    U_target = unitary(couplings, rabi_freqs, fractions, fixed_phase_flags, dim)
    return U_target, (couplings, fractions, fixed_phase_flags)

# ---------------------------
# Policy Network Definition
# ---------------------------
class PolicyNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, sequence_length, num_couplings):
        """
        input_dim: dimension of flattened target unitary (real and imaginary parts concatenated)
        hidden_dim: hidden layer size
        sequence_length: number of pulses (time steps)
        num_couplings: number of allowed coupling choices.
        """
        super(PolicyNetwork, self).__init__()
        self.sequence_length = sequence_length
        self.num_couplings = num_couplings
        
        # A simple MLP that outputs parameters for each step.
        # Total output per time step: coupling logits (num_couplings) + fixed-phase logits (2) + fraction parameters (2 for Beta: raw_alpha, raw_beta)
        self.output_per_step = num_couplings + 2 + 2  
        total_output_dim = sequence_length * self.output_per_step
        
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, total_output_dim)
        self.relu = nn.ReLU()
        # For numerical stability in Beta parameters
        self.softplus = nn.Softplus()
        
    def forward(self, x):
        """
        x: tensor of shape (batch_size, input_dim)
        Returns for each sample in batch: a list of actions per step and the sum of their log probabilities.
        Each action is a tuple: (coupling_index, fraction, fixed_phase_flag)
        """
        batch_size = x.size(0)
        out = self.relu(self.fc1(x))
        out = self.fc2(out)
        # Reshape output: (batch_size, sequence_length, output_per_step)
        out = out.view(batch_size, self.sequence_length, self.output_per_step)
        
        actions_batch = []  # list with length batch_size; each element is a list (length sequence_length) of actions
        log_probs = []
        
        for b in range(batch_size):
            actions = []
            log_prob_sum = 0
            # Process each time step
            for t in range(self.sequence_length):
                step_out = out[b, t]
                # Split the outputs for the step:
                coupling_logits = step_out[0:self.num_couplings]            # shape: (num_couplings,)
                fixed_logits    = step_out[self.num_couplings:self.num_couplings+2]  # binary decision logits
                fraction_raw    = step_out[self.num_couplings+2:self.num_couplings+4]  # for Beta parameters
                
                # Sample coupling from a categorical distribution.
                coupling_dist = Categorical(logits=coupling_logits)
                coupling_index = coupling_dist.sample()
                log_prob_sum += coupling_dist.log_prob(coupling_index)
                
                # Sample fixed-phase flag from binary categorical distribution.
                fixed_dist = Categorical(logits=fixed_logits)
                fixed_phase_flag = fixed_dist.sample()
                log_prob_sum += fixed_dist.log_prob(fixed_phase_flag)
                
                # For fraction, we use a Beta distribution. To ensure positive concentration parameters, use softplus.
                # Add 1 to avoid too-small parameters.
                alpha = self.softplus(fraction_raw[0]) + 1.0
                beta_param = self.softplus(fraction_raw[1]) + 1.0
                fraction_dist = Beta(alpha, beta_param)
                fraction_sample = fraction_dist.rsample()  # using rsample for a reparameterized sample if needed
                # Scale the sample to our desired range [0, max_fraction]
                fraction = fraction_sample * max_fraction
                log_prob_sum += fraction_dist.log_prob(fraction_sample)
                
                # Append the actions (convert tensor values to Python numbers)
                actions.append({
                    'coupling': int(coupling_index.item()),
                    'fraction': fraction.item(),
                    'fixed_phase_flag': int(fixed_phase_flag.item())
                })
            actions_batch.append(actions)
            log_probs.append(log_prob_sum)
        
        # Return tensors: here we assume batch_size=1 for simplicity; otherwise further batching is needed.
        return actions_batch, torch.stack(log_probs)

# ---------------------------
# Helper: Convert Target Unitary to Network Input
# ---------------------------
def target_unitary_to_tensor(U):
    """
    Convert U (a complex matrix) into a real tensor that concatenates the flattened real and imaginary parts.
    """
    U_real = np.real(U).flatten()
    U_imag = np.imag(U).flatten()
    U_concat = np.concatenate([U_real, U_imag])
    return torch.tensor(U_concat, dtype=torch.float32)

# ---------------------------
# Training Loop (REINFORCE)
# ---------------------------
def train_policy(num_episodes=1000, lr=1e-3, print_every=100):
    # The input dimension is: dim*dim*2 (real & imag parts)
    input_dim = dim * dim * 2  
    hidden_dim = 128
    policy_net = PolicyNetwork(input_dim, hidden_dim, sequence_length, num_allowed_couplings)
    optimizer = optim.Adam(policy_net.parameters(), lr=lr)
    
    # For a simple baseline, we use a running average reward.
    baseline = None
    reward_history = []
    
    for episode in range(num_episodes):
        # Generate a target unitary (the "desired" evolution)
        U_target, true_params = generate_random_target_unitary()
        # Convert target unitary to network input format.
        target_tensor = target_unitary_to_tensor(U_target).unsqueeze(0)  # shape: (1, input_dim)
        
        # Forward pass: sample a candidate pulse sequence.
        actions_batch, log_prob = policy_net(target_tensor)
        # We assume a single sample (batch_size=1).
        actions = actions_batch[0]
        
        # Build candidate sequence parameters from actions:
        # For couplings, use the selected coupling from allowed_couplings.
        candidate_couplings = []
        candidate_fractions = []
        candidate_fixed_flags = []
        for action in actions:
            # Get the coupling pair from the allowed list using the sampled index.
            candidate_couplings.append(allowed_couplings[action['coupling']])
            candidate_fractions.append(action['fraction'])
            candidate_fixed_flags.append(action['fixed_phase_flag'])
        
        # Optionally, fix the couplings and phases (as in your helper function)
        candidate_couplings, candidate_fixed_flags = fix_couplings_and_phases(candidate_couplings, candidate_fixed_flags)
        U_pred = unitary(candidate_couplings, rabi_freqs, candidate_fractions, candidate_fixed_flags, dim)
        
        # Compute reward based on fidelity.
        reward = compute_fidelity(U_target, U_pred, dim)
        reward_history.append(reward)
        
        # Update running baseline (simple average)
        if baseline is None:
            baseline = reward
        else:
            baseline = 0.99 * baseline + 0.01 * reward
        
        # REINFORCE loss: we want to maximize reward, so minimize -log_prob * (reward - baseline)
        loss = -log_prob * (reward - baseline)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (episode + 1) % print_every == 0:
            avg_reward = np.mean(reward_history[-print_every:])
            print(f"Episode {episode+1}, Loss: {loss.item():.4f}, Reward: {reward:.4f}, Avg Reward: {avg_reward:.4f}")
    
    return policy_net

# ---------------------------
# Testing the Trained Policy
# ---------------------------
def test_policy(policy_net, num_tests=10):
    test_rewards = []
    for i in range(num_tests):
        U_target, true_params = generate_random_target_unitary()
        target_tensor = target_unitary_to_tensor(U_target).unsqueeze(0)
        actions_batch, _ = policy_net(target_tensor)
        actions = actions_batch[0]
        candidate_couplings = []
        candidate_fractions = []
        candidate_fixed_flags = []
        for action in actions:
            candidate_couplings.append(allowed_couplings[action['coupling']])
            candidate_fractions.append(action['fraction'])
            candidate_fixed_flags.append(action['fixed_phase_flag'])
        candidate_couplings, candidate_fixed_flags = fix_couplings_and_phases(candidate_couplings, candidate_fixed_flags)
        U_pred = unitary(candidate_couplings, rabi_freqs, candidate_fractions, candidate_fixed_flags, dim)
        reward = compute_fidelity(U_target, U_pred, dim)
        test_rewards.append(reward)
        print(f"Test {i+1}: Fidelity = {reward:.4f}")
    print(f"Average test fidelity: {np.mean(test_rewards):.4f}")

# ---------------------------
# Main Execution
# ---------------------------
if __name__ == '__main__':
    # Train the policy network using our RL approach.
    policy_net = train_policy(num_episodes=1000, lr=1e-3, print_every=100)
    # Evaluate on a test set.
    test_policy(policy_net, num_tests=10)


In [5]:
pip install torch

Note: you may need to restart the kernel to use updated packages.


In [None]:
pip uninstall torch

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical, Beta
from scipy.linalg import expm

# ---------------------------
# Helper Functions (as provided)
# ---------------------------

def coupling_operator_with_phase(i, j, dim, phi):
    op = np.zeros((dim, dim), dtype=complex)
    op[i, j] = np.exp(+1j * phi)
    op[j, i] = np.exp(-1j * phi)
    return op

def pulse_duration_for_fraction(f, Omega):
    theta = np.pi * np.array(f)
    return theta / Omega if Omega != 0 else 0.0

def unitary(couplings, rabi_freqs, fractions, fixed_phase_flags, dim):
    U_seq = np.eye(dim, dtype=complex)
    for (levels, Omega, frac, fix_pflag) in zip(couplings, rabi_freqs, fractions, fixed_phase_flags):
        i, j = levels
        phi_fixed = fix_pflag * np.pi
        total_phase = phi_fixed
        H_op = coupling_operator_with_phase(i, j, dim, total_phase)
        H_coupling = 0.5 * Omega * H_op
        t_pulse = pulse_duration_for_fraction(frac, Omega)
        U_pulse = expm(-1j * H_coupling * t_pulse)
        U_seq = U_pulse @ U_seq
    return U_seq

def fix_couplings_and_phases(couplings, fixed_phase_flags):
    new_couplings = []
    new_fixed_phase_flags = []
    for (cpl, phase_flag) in zip(couplings, fixed_phase_flags):
        i, j = cpl
        if i != 0 and j == 0:
            cpl_fixed = (0, i)
            phase_flag_fixed = phase_flag + 1.0
        else:
            cpl_fixed = cpl
            phase_flag_fixed = phase_flag
        new_couplings.append(cpl_fixed)
        new_fixed_phase_flags.append(phase_flag_fixed)
    return new_couplings, new_fixed_phase_flags

def compute_fidelity(U_target, U_pred, dim):
    # Fidelity measure using the normalized absolute trace overlap.
    fid = np.abs(np.trace(np.conjugate(U_target.T) @ U_pred)) / dim
    return fid

# ---------------------------
# Problem Setup Parameters
# ---------------------------

dim = 5                  # Hilbert space dimension
sequence_length = 7      # Number of pulses in the sequence
max_fraction = 2    # Maximum value for the 'fraction' parameter
# For the RL agent, we fix rabi frequencies to 1 for every pulse.
rabi_freqs = [1] * sequence_length

# Define a list of allowed couplings (discrete choices) for the policy.
allowed_couplings = [(0, 1), (0, 2), (0, 3), (0, 4)]
num_allowed_couplings = len(allowed_couplings)

# ---------------------------
# Dataset Generation Functions
# ---------------------------
def generate_random_sequence():
    """Generates a random pulse sequence using uniform sampling.
       Returns:
          couplings: list of tuples (each chosen from allowed_couplings)
          fractions: list of continuous parameters in [0, max_fraction]
          fixed_phase_flags: list of binary values (0 or 1)
    """
    couplings = []
    fractions = []
    fixed_phase_flags = []
    for _ in range(sequence_length):
        # Randomly choose a coupling from the allowed list.
        idx = np.random.randint(0, num_allowed_couplings)
        couplings.append(allowed_couplings[idx])
        # Sample a random fraction uniformly.
        frac = np.random.uniform(0.0, max_fraction)
        fractions.append(frac)
        # Sample a binary fixed-phase flag.
        fix_flag = np.random.randint(0, 2)
        fixed_phase_flags.append(fix_flag)
        
    # Apply the fix function (if applicable)
    couplings, fixed_phase_flags = fix_couplings_and_phases(couplings, fixed_phase_flags)
    return couplings, fractions, fixed_phase_flags

def generate_random_target_unitary():
    """Generates a target unitary by sampling a random pulse sequence.
       Returns:
          U_target (np.ndarray): the resulting unitary matrix.
          parameters: the pulse parameters used (for reference).
    """
    couplings, fractions, fixed_phase_flags = generate_random_sequence()
    U_target = unitary(couplings, rabi_freqs, fractions, fixed_phase_flags, dim)
    return U_target, (couplings, fractions, fixed_phase_flags)

# ---------------------------
# LSTM-Based Policy Network Definition
# ---------------------------
class PolicyNetworkLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, sequence_length, num_couplings):
        """
        input_dim: Dimension of the flattened target unitary (real and imaginary parts concatenated)
        hidden_dim: Hidden layer size for both the encoder and LSTM
        sequence_length: Number of pulses (time steps)
        num_couplings: Number of allowed coupling choices.
        """
        super(PolicyNetworkLSTM, self).__init__()
        self.sequence_length = sequence_length
        self.num_couplings = num_couplings
        # Each time step will output:
        # - Coupling logits (num_couplings)
        # - Fixed-phase logits (2)
        # - Fraction parameters (2 for Beta: raw_alpha, raw_beta)
        self.output_per_step = num_couplings + 2 + 2

        # Encoder: Encode the target unitary into a hidden representation.
        self.encoder = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()

        # LSTM decoder: receives a learned start token at each time step.
        self.lstm = nn.LSTM(input_size=hidden_dim, hidden_size=hidden_dim, batch_first=True)
        # Learned start token; shape: (1, hidden_dim) which will be repeated.
        self.start_token = nn.Parameter(torch.zeros(1, hidden_dim))
        
        # Decoder to produce output for each time step from the LSTM hidden state.
        self.decoder = nn.Linear(hidden_dim, self.output_per_step)
        self.softplus = nn.Softplus()
    
    def forward(self, x):
        """
        x: Tensor of shape (batch_size, input_dim) representing the target unitary.
        Returns for each sample in the batch: a list of actions per time step and the sum of their log probabilities.
        Each action is a dictionary with keys: 'coupling', 'fraction', and 'fixed_phase_flag'.
        """
        batch_size = x.size(0)
        # Encode the target unitary.
        encoded = self.relu(self.encoder(x))  # shape: (batch_size, hidden_dim)
        # Use the encoded representation as the initial hidden state.
        h0 = encoded.unsqueeze(0)  # shape: (1, batch_size, hidden_dim)
        c0 = torch.zeros_like(h0)  # initialize cell state to zeros
        
        # Prepare LSTM input: we use the same start token at each time step.
        # Create tokens with shape: (batch_size, sequence_length, hidden_dim)
        tokens = self.start_token.expand(batch_size, self.sequence_length, -1)
        
        # Generate the sequence from the LSTM.
        lstm_out, _ = self.lstm(tokens, (h0, c0))  # shape: (batch_size, sequence_length, hidden_dim)
        # Project LSTM outputs to the desired output shape.
        decoded = self.decoder(lstm_out)  # shape: (batch_size, sequence_length, output_per_step)
        
        actions_batch = []  # List to hold actions per sample in the batch.
        log_probs = []      # List for the sum of log probabilities for each sample.
        
        for b in range(batch_size):
            actions = []
            log_prob_sum = 0
            # Process each time step.
            for t in range(self.sequence_length):
                step_out = decoded[b, t]
                # Split outputs into parts.
                coupling_logits = step_out[0:self.num_couplings]  # Discrete coupling selection.
                fixed_logits = step_out[self.num_couplings:self.num_couplings+2]  # Fixed-phase binary decision.
                fraction_raw = step_out[self.num_couplings+2:self.num_couplings+4]  # For Beta distribution parameters.
                
                # Sample from the coupling distribution.
                coupling_dist = Categorical(logits=coupling_logits)
                coupling_index = coupling_dist.sample()
                log_prob_sum = log_prob_sum + coupling_dist.log_prob(coupling_index)
                
                # Sample from the fixed-phase distribution.
                fixed_dist = Categorical(logits=fixed_logits)
                fixed_phase_flag = fixed_dist.sample()
                log_prob_sum = log_prob_sum + fixed_dist.log_prob(fixed_phase_flag)
                
                # Sample the fraction from a Beta distribution.
                alpha = self.softplus(fraction_raw[0]) + 1.0
                beta_param = self.softplus(fraction_raw[1]) + 1.0
                fraction_dist = Beta(alpha, beta_param)
                fraction_sample = fraction_dist.rsample()  # Use rsample for reparameterization.
                # Scale to the desired range [0, max_fraction].
                fraction = fraction_sample * max_fraction
                log_prob_sum = log_prob_sum + fraction_dist.log_prob(fraction_sample)
                
                # Append the action.
                actions.append({
                    'coupling': int(coupling_index.item()),
                    'fraction': fraction.item(),
                    'fixed_phase_flag': int(fixed_phase_flag.item())
                })
            actions_batch.append(actions)
            log_probs.append(log_prob_sum)
        
        return actions_batch, torch.stack(log_probs)

# ---------------------------
# Helper: Convert Target Unitary to Network Input
# ---------------------------
def target_unitary_to_tensor(U):
    """
    Convert U (a complex matrix) into a real tensor that concatenates the flattened real and imaginary parts.
    """
    U_real = np.real(U).flatten()
    U_imag = np.imag(U).flatten()
    U_concat = np.concatenate([U_real, U_imag])
    return torch.tensor(U_concat, dtype=torch.float32)

# ---------------------------
# Training Loop (REINFORCE)
# ---------------------------
def train_policy(num_episodes=1000, lr=1e-3, print_every=100):
    # Input dimension: dim*dim*2 (for real and imaginary parts).
    input_dim = dim * dim * 2
    hidden_dim = 128
    policy_net = PolicyNetworkLSTM(input_dim, hidden_dim, sequence_length, num_allowed_couplings)
    optimizer = optim.Adam(policy_net.parameters(), lr=lr)
    
    # Running baseline for variance reduction.
    baseline = None
    reward_history = []
    
    for episode in range(num_episodes):
        # Generate a target unitary.
        U_target, true_params = generate_random_target_unitary()
        target_tensor = target_unitary_to_tensor(U_target).unsqueeze(0)  # shape: (1, input_dim)
        
        # Forward pass: sample a candidate pulse sequence.
        actions_batch, log_prob = policy_net(target_tensor)
        actions = actions_batch[0]  # Assume batch_size=1.
        
        # Build candidate sequence from actions.
        candidate_couplings = []
        candidate_fractions = []
        candidate_fixed_flags = []
        for action in actions:
            candidate_couplings.append(allowed_couplings[action['coupling']])
            candidate_fractions.append(action['fraction'])
            candidate_fixed_flags.append(action['fixed_phase_flag'])
        
        # Optionally apply the fix on couplings/phases.
        candidate_couplings, candidate_fixed_flags = fix_couplings_and_phases(candidate_couplings, candidate_fixed_flags)
        U_pred = unitary(candidate_couplings, rabi_freqs, candidate_fractions, candidate_fixed_flags, dim)
        
        # Compute reward based on fidelity.
        reward = compute_fidelity(U_target, U_pred, dim)
        reward_history.append(reward)
        
        # Update running baseline.
        if baseline is None:
            baseline = reward
        else:
            baseline = 0.99 * baseline + 0.01 * reward
        
        # REINFORCE loss: maximize reward by minimizing -log_prob * (reward - baseline)
        loss = -log_prob * (reward - baseline)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (episode + 1) % print_every == 0:
            avg_reward = np.mean(reward_history[-print_every:])
            print(f"Episode {episode+1}, Loss: {loss.item():.4f}, Reward: {reward:.4f}, Avg Reward: {avg_reward:.4f}")
    
    return policy_net

# ---------------------------
# Testing the Trained Policy
# ---------------------------
def test_policy(policy_net, num_tests=10):
    test_rewards = []
    for i in range(num_tests):
        U_target, true_params = generate_random_target_unitary()
        target_tensor = target_unitary_to_tensor(U_target).unsqueeze(0)
        actions_batch, _ = policy_net(target_tensor)
        actions = actions_batch[0]
        candidate_couplings = []
        candidate_fractions = []
        candidate_fixed_flags = []
        for action in actions:
            candidate_couplings.append(allowed_couplings[action['coupling']])
            candidate_fractions.append(action['fraction'])
            candidate_fixed_flags.append(action['fixed_phase_flag'])
        candidate_couplings, candidate_fixed_flags = fix_couplings_and_phases(candidate_couplings, candidate_fixed_flags)
        U_pred = unitary(candidate_couplings, rabi_freqs, candidate_fractions, candidate_fixed_flags, dim)
        reward = compute_fidelity(U_target, U_pred, dim)
        test_rewards.append(reward)
        print(f"Test {i+1}: Fidelity = {reward:.4f}")
    print(f"Average test fidelity: {np.mean(test_rewards):.4f}")

# ---------------------------
# Main Execution
# ---------------------------
if __name__ == '__main__':
    # Train the LSTM-based policy network using the RL approach.
    policy_net = train_policy(num_episodes=1000, lr=1e-3, print_every=100)
    # Evaluate on a test set.
    test_policy(policy_net, num_tests=10)


Episode 100, Loss: 1.8815, Reward: 0.4169, Avg Reward: 0.1936
Episode 200, Loss: 0.1494, Reward: 0.2496, Avg Reward: 0.2168
Episode 300, Loss: -0.5086, Reward: 0.1785, Avg Reward: 0.1994
Episode 400, Loss: -0.8907, Reward: 0.0374, Avg Reward: 0.2234
Episode 500, Loss: -0.1181, Reward: 0.1697, Avg Reward: 0.1978
Episode 600, Loss: 1.0598, Reward: 0.5207, Avg Reward: 0.2122
Episode 700, Loss: -0.0964, Reward: 0.1652, Avg Reward: 0.2154
Episode 800, Loss: 0.0444, Reward: 0.3038, Avg Reward: 0.2293
Episode 900, Loss: -0.5467, Reward: 0.0636, Avg Reward: 0.2190
Episode 1000, Loss: 0.1164, Reward: 0.2623, Avg Reward: 0.2331
Test 1: Fidelity = 0.0329
Test 2: Fidelity = 0.2014
Test 3: Fidelity = 0.2818
Test 4: Fidelity = 0.2214
Test 5: Fidelity = 0.2410
Test 6: Fidelity = 0.3204
Test 7: Fidelity = 0.2794
Test 8: Fidelity = 0.2094
Test 9: Fidelity = 0.6352
Test 10: Fidelity = 0.2816
Average test fidelity: 0.2704


In [1]:
pip install torch scipy

Collecting torch
  Downloading torch-2.6.0-cp312-none-macosx_11_0_arm64.whl.metadata (28 kB)
Collecting sympy==1.13.1 (from torch)
  Downloading sympy-1.13.1-py3-none-any.whl.metadata (12 kB)
Downloading torch-2.6.0-cp312-none-macosx_11_0_arm64.whl (66.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.5/66.5 MB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading sympy-1.13.1-py3-none-any.whl (6.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.2/6.2 MB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: sympy, torch
  Attempting uninstall: sympy
    Found existing installation: sympy 1.13.2
    Uninstalling sympy-1.13.2:
      Successfully uninstalled sympy-1.13.2
Successfully installed sympy-1.13.1 torch-2.6.0
Note: you may need to restart the kernel to use updated packages.


In [9]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical, Beta
from scipy.linalg import expm

# ---------------------------
# Helper Functions (as provided)
# ---------------------------

def coupling_operator_with_phase(i, j, dim, phi):
    op = np.zeros((dim, dim), dtype=complex)
    op[i, j] = np.exp(+1j * phi)
    op[j, i] = np.exp(-1j * phi)
    return op

def pulse_duration_for_fraction(f, Omega):
    theta = np.pi * np.array(f)
    return theta / Omega if Omega != 0 else 0.0

def unitary(couplings, rabi_freqs, fractions, fixed_phase_flags, dim):
    U_seq = np.eye(dim, dtype=complex)
    for (levels, Omega, frac, fix_pflag) in zip(couplings, rabi_freqs, fractions, fixed_phase_flags):
        i, j = levels
        phi_fixed = fix_pflag * np.pi
        total_phase = phi_fixed
        H_op = coupling_operator_with_phase(i, j, dim, total_phase)
        H_coupling = 0.5 * Omega * H_op
        t_pulse = pulse_duration_for_fraction(frac, Omega)
        U_pulse = expm(-1j * H_coupling * t_pulse)
        U_seq = U_pulse @ U_seq
    return U_seq

def fix_couplings_and_phases(couplings, fixed_phase_flags):
    new_couplings = []
    new_fixed_phase_flags = []
    for (cpl, phase_flag) in zip(couplings, fixed_phase_flags):
        i, j = cpl
        if i != 0 and j == 0:
            cpl_fixed = (0, i)
            phase_flag_fixed = phase_flag + 1.0
        else:
            cpl_fixed = cpl
            phase_flag_fixed = phase_flag
        new_couplings.append(cpl_fixed)
        new_fixed_phase_flags.append(phase_flag_fixed)
    return new_couplings, new_fixed_phase_flags

def compute_fidelity(U_target, U_pred, dim):
    # Fidelity measure using the normalized absolute trace overlap.
    fid = np.abs(np.trace(np.conjugate(U_target.T) @ U_pred)) / dim
    return fid

# ---------------------------
# Problem Setup Parameters
# ---------------------------

dim = 5                  # Hilbert space dimension
sequence_length = 7      # Number of pulses in the sequence
max_fraction = 2       # Maximum value for the 'fraction' parameter
# For the RL agent, we fix rabi frequencies to 1 for every pulse.
rabi_freqs = [1] * sequence_length

# Define a list of allowed couplings (discrete choices) for the policy.
allowed_couplings = [(0, 1), (0, 2), (0, 3), (0, 4)]
num_allowed_couplings = len(allowed_couplings)

# ---------------------------
# Dataset Generation Functions
# ---------------------------
def generate_random_sequence():
    """Generates a random pulse sequence using uniform sampling.
       Returns:
          couplings: list of tuples (each chosen from allowed_couplings)
          fractions: list of continuous parameters in [0, max_fraction]
          fixed_phase_flags: list of binary values (0 or 1)
    """
    couplings = []
    fractions = []
    fixed_phase_flags = []
    for _ in range(sequence_length):
        # Randomly choose a coupling from the allowed list.
        idx = np.random.randint(0, num_allowed_couplings)
        couplings.append(allowed_couplings[idx])
        # Sample a random fraction uniformly.
        frac = np.random.uniform(0.0, max_fraction)
        fractions.append(frac)
        # Sample a binary fixed-phase flag.
        fix_flag = np.random.randint(0, 2)
        fixed_phase_flags.append(fix_flag)
        
    # Apply the fix function (if applicable)
    couplings, fixed_phase_flags = fix_couplings_and_phases(couplings, fixed_phase_flags)
    return couplings, fractions, fixed_phase_flags

def generate_random_target_unitary():
    """Generates a target unitary by sampling a random pulse sequence.
       Returns:
          U_target (np.ndarray): the resulting unitary matrix.
          parameters: the pulse parameters used (for reference).
    """
    couplings, fractions, fixed_phase_flags = generate_random_sequence()
    U_target = unitary(couplings, rabi_freqs, fractions, fixed_phase_flags, dim)
    return U_target, (couplings, fractions, fixed_phase_flags)

# ---------------------------
# LSTM-Based Policy Network Definition
# ---------------------------
class PolicyNetworkLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, sequence_length, num_couplings):
        """
        input_dim: Dimension of the flattened target unitary (real and imaginary parts concatenated)
        hidden_dim: Hidden layer size for both the encoder and LSTM
        sequence_length: Number of pulses (time steps)
        num_couplings: Number of allowed coupling choices.
        """
        super(PolicyNetworkLSTM, self).__init__()
        self.sequence_length = sequence_length
        self.num_couplings = num_couplings
        # Each time step will output:
        # - Coupling logits (num_couplings)
        # - Fixed-phase logits (2)
        # - Fraction parameters (2 for Beta: raw_alpha, raw_beta)
        self.output_per_step = num_couplings + 2 + 2

        # Encoder: Encode the target unitary into a hidden representation.
        self.encoder = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()

        # LSTM decoder: receives a learned start token at each time step.
        self.lstm = nn.LSTM(input_size=hidden_dim, hidden_size=hidden_dim, batch_first=True)
        # Learned start token; shape: (1, hidden_dim) which will be repeated.
        self.start_token = nn.Parameter(torch.zeros(1, hidden_dim))
        
        # Decoder to produce output for each time step from the LSTM hidden state.
        self.decoder = nn.Linear(hidden_dim, self.output_per_step)
        self.softplus = nn.Softplus()
    
    def forward(self, x):
        """
        x: Tensor of shape (batch_size, input_dim) representing the target unitary.
        Returns for each sample in the batch: a list of actions per time step and the sum of their log probabilities.
        Each action is a dictionary with keys: 'coupling', 'fraction', and 'fixed_phase_flag'.
        """
        batch_size = x.size(0)
        # Encode the target unitary.
        encoded = self.relu(self.encoder(x))  # shape: (batch_size, hidden_dim)
        # Use the encoded representation as the initial hidden state.
        h0 = encoded.unsqueeze(0)  # shape: (1, batch_size, hidden_dim)
        c0 = torch.zeros_like(h0)  # initialize cell state to zeros
        
        # Prepare LSTM input: we use the same start token at each time step.
        # Create tokens with shape: (batch_size, sequence_length, hidden_dim)
        tokens = self.start_token.expand(batch_size, self.sequence_length, -1)
        
        # Generate the sequence from the LSTM.
        lstm_out, _ = self.lstm(tokens, (h0, c0))  # shape: (batch_size, sequence_length, hidden_dim)
        # Project LSTM outputs to the desired output shape.
        decoded = self.decoder(lstm_out)  # shape: (batch_size, sequence_length, output_per_step)
        
        actions_batch = []  # List to hold actions per sample in the batch.
        log_probs = []      # List for the sum of log probabilities for each sample.
        
        for b in range(batch_size):
            actions = []
            log_prob_sum = 0
            # Process each time step.
            for t in range(self.sequence_length):
                step_out = decoded[b, t]
                # Split outputs into parts.
                coupling_logits = step_out[0:self.num_couplings]  # Discrete coupling selection.
                fixed_logits = step_out[self.num_couplings:self.num_couplings+2]  # Fixed-phase binary decision.
                fraction_raw = step_out[self.num_couplings+2:self.num_couplings+4]  # For Beta distribution parameters.
                
                # Sample from the coupling distribution.
                coupling_dist = Categorical(logits=coupling_logits)
                coupling_index = coupling_dist.sample()
                log_prob_sum = log_prob_sum + coupling_dist.log_prob(coupling_index)
                
                # Sample from the fixed-phase distribution.
                fixed_dist = Categorical(logits=fixed_logits)
                fixed_phase_flag = fixed_dist.sample()
                log_prob_sum = log_prob_sum + fixed_dist.log_prob(fixed_phase_flag)
                
                # Sample the fraction from a Beta distribution.
                alpha = self.softplus(fraction_raw[0]) + 1.0
                beta_param = self.softplus(fraction_raw[1]) + 1.0
                fraction_dist = Beta(alpha, beta_param)
                fraction_sample = fraction_dist.rsample()  # Use rsample for reparameterization.
                # Scale to the desired range [0, max_fraction].
                fraction = fraction_sample * max_fraction
                log_prob_sum = log_prob_sum + fraction_dist.log_prob(fraction_sample)
                
                # Append the action.
                actions.append({
                    'coupling': int(coupling_index.item()),
                    'fraction': fraction.item(),
                    'fixed_phase_flag': int(fixed_phase_flag.item())
                })
            actions_batch.append(actions)
            log_probs.append(log_prob_sum)
        
        return actions_batch, torch.stack(log_probs)

# ---------------------------
# Helper: Convert Target Unitary to Network Input
# ---------------------------
def target_unitary_to_tensor(U):
    """
    Convert U (a complex matrix) into a real tensor that concatenates the flattened real and imaginary parts.
    """
    U_real = np.real(U).flatten()
    U_imag = np.imag(U).flatten()
    U_concat = np.concatenate([U_real, U_imag])
    return torch.tensor(U_concat, dtype=torch.float32)

# ---------------------------
# Training Loop (REINFORCE)
# ---------------------------
def train_policy(num_episodes=1000, lr=1e-3, print_every=100):
    # Input dimension: dim*dim*2 (for real and imaginary parts).
    input_dim = dim * dim * 2
    hidden_dim = 128
    policy_net = PolicyNetworkLSTM(input_dim, hidden_dim, sequence_length, num_allowed_couplings)
    optimizer = optim.Adam(policy_net.parameters(), lr=lr)
    
    # Running baseline for variance reduction.
    baseline = None
    reward_history = []
    
    for episode in range(num_episodes):
        # Generate a target unitary.
        U_target, true_params = generate_random_target_unitary()
        target_tensor = target_unitary_to_tensor(U_target).unsqueeze(0)  # shape: (1, input_dim)
        
        # Forward pass: sample a candidate pulse sequence.
        actions_batch, log_prob = policy_net(target_tensor)
        actions = actions_batch[0]  # Assume batch_size=1.
        
        # Build candidate sequence from actions.
        candidate_couplings = []
        candidate_fractions = []
        candidate_fixed_flags = []
        for action in actions:
            candidate_couplings.append(allowed_couplings[action['coupling']])
            candidate_fractions.append(action['fraction'])
            candidate_fixed_flags.append(action['fixed_phase_flag'])
        
        # Optionally apply the fix on couplings/phases.
        candidate_couplings, candidate_fixed_flags = fix_couplings_and_phases(candidate_couplings, candidate_fixed_flags)
        U_pred = unitary(candidate_couplings, rabi_freqs, candidate_fractions, candidate_fixed_flags, dim)
        
        # Compute reward based on fidelity.
        reward = compute_fidelity(U_target, U_pred, dim)
        reward_history.append(reward)
        
        # Update running baseline.
        if baseline is None:
            baseline = reward
        else:
            baseline = 0.99 * baseline + 0.01 * reward
        
        # REINFORCE loss: maximize reward by minimizing -log_prob * (reward - baseline)
        loss = -log_prob * (reward - baseline)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (episode + 1) % print_every == 0:
            avg_reward = np.mean(reward_history[-print_every:])
            print(f"Episode {episode+1}, Loss: {loss.item():.4f}, Reward: {reward:.4f}, Avg Reward: {avg_reward:.4f}")
    
    return policy_net

# ---------------------------
# Testing the Trained Policy
# ---------------------------
def test_policy(policy_net, num_tests=10):
    test_rewards = []
    for i in range(num_tests):
        U_target, true_params = generate_random_target_unitary()
        target_tensor = target_unitary_to_tensor(U_target).unsqueeze(0)
        actions_batch, _ = policy_net(target_tensor)
        actions = actions_batch[0]
        candidate_couplings = []
        candidate_fractions = []
        candidate_fixed_flags = []
        for action in actions:
            candidate_couplings.append(allowed_couplings[action['coupling']])
            candidate_fractions.append(action['fraction'])
            candidate_fixed_flags.append(action['fixed_phase_flag'])
        candidate_couplings, candidate_fixed_flags = fix_couplings_and_phases(candidate_couplings, candidate_fixed_flags)
        U_pred = unitary(candidate_couplings, rabi_freqs, candidate_fractions, candidate_fixed_flags, dim)
        reward = compute_fidelity(U_target, U_pred, dim)
        test_rewards.append(reward)
        print(f"Test {i+1}: Fidelity = {reward:.4f}")
    print(f"Average test fidelity: {np.mean(test_rewards):.4f}")

# ---------------------------
# Main Execution
# ---------------------------
if __name__ == '__main__':
    # Train the LSTM-based policy network using the RL approach.
    policy_net = train_policy(num_episodes=1000000, lr=1e-2, print_every=1000)
    # Evaluate on a test set.
    test_policy(policy_net, num_tests=10)


Episode 1000, Loss: 0.0012, Reward: 0.1923, Avg Reward: 0.2294
Episode 2000, Loss: 0.0013, Reward: 0.1060, Avg Reward: 0.2294
Episode 3000, Loss: 0.0001, Reward: 0.2621, Avg Reward: 0.2384
Episode 4000, Loss: -0.0001, Reward: 0.2055, Avg Reward: 0.2357
Episode 5000, Loss: -3.3969, Reward: 0.6592, Avg Reward: 0.2371
Episode 6000, Loss: 0.8371, Reward: 0.0722, Avg Reward: 0.2394
Episode 7000, Loss: -0.1080, Reward: 0.1910, Avg Reward: 0.2324
Episode 8000, Loss: -0.0922, Reward: 0.2708, Avg Reward: 0.2333
Episode 9000, Loss: 0.0000, Reward: 0.2554, Avg Reward: 0.2322
Episode 10000, Loss: -0.0000, Reward: 0.0262, Avg Reward: 0.2323
Episode 11000, Loss: -0.2179, Reward: 0.0219, Avg Reward: 0.2373
Episode 12000, Loss: -0.0000, Reward: 0.0721, Avg Reward: 0.2413
Episode 13000, Loss: 0.0000, Reward: 0.6465, Avg Reward: 0.2398
Episode 14000, Loss: 0.0000, Reward: 0.1343, Avg Reward: 0.2326
Episode 15000, Loss: -0.0000, Reward: 0.1369, Avg Reward: 0.2288
Episode 16000, Loss: -0.0000, Reward: 0.7

ValueError: Expected parameter logits (Tensor of shape (4,)) of distribution Categorical(logits: torch.Size([4])) to satisfy the constraint IndependentConstraint(Real(), 1), but found invalid values:
tensor([nan, nan, nan, nan], grad_fn=<SubBackward0>)