In [13]:
# Cell 1: Import all tools and setup environment
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import sys
import os

# Add src to path
sys.path.insert(0, 'src')

# Import compatibility and suppress warnings
from utils.compatibility import apply_all_compatibility_patches
from utils.warning_suppression import setup_clean_environment

# Apply patches and clean environment
apply_all_compatibility_patches()
setup_clean_environment()

# Import quantum libraries
import strawberryfields as sf
from strawberryfields import ops

# Import our loss functions
from utils.quantum_bimodal_loss import QuantumBimodalLoss

print("  Environment Setup Complete")
print(f"   TensorFlow: {tf.__version__}")
print(f"   Strawberry Fields: {sf.__version__}")
print(f"   NumPy: {np.__version__}")

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print("Ready for quantum experiments!")


APPLYING COMPATIBILITY PATCHES
1. Applying NumPy 2.0+ compatibility...
  NumPy version: 2.0.2
  Applying NumPy 2.0+ compatibility patches...
    ✓ NumPy aliases already available

2. Applying SciPy 1.14+ compatibility...
  SciPy version: 1.15.3
    ✓ SciPy simps function already available

3. Applying TensorFlow compatibility...
  TensorFlow version: 2.18.0
    ⚠ No GPU detected, using CPU

4. Applying Strawberry Fields compatibility...
  SciPy version: 1.15.3
    ✓ SciPy simps function already available
  Strawberry Fields version: 0.23.0
    ✓ Strawberry Fields imports successfully
    ✓ Strawberry Fields Program creation works

5. Applying general compatibility...

✓ ALL COMPATIBILITY PATCHES APPLIED SUCCESSFULLY

Validating patches...
Validating compatibility patches...
    ✓ NumPy compatibility validated
    ✓ SciPy compatibility validated
    ✓ Strawberry Fields compatibility validated
    ✓ TensorFlow compatibility validated

✓ All compatibility patches validated successfully!
C

In [14]:
# Cell 2: Quantum Neural Network Foundation (COMPLETE FIXED VERSION)

class QuantumNeuralNetwork:
    """
    Core quantum neural network with coherent state encoding.
    
    Key principles:
    - Single weight tensor for all parameters
    - Coherent state encoding strategy
    - Clean parameter management
    - FIXED: Proper program management to avoid locking
    - FIXED: Proper engine management
    """
    
    def __init__(self, n_modes=2, layers=2, cutoff_dim=6):
        self.n_modes = n_modes
        self.layers = layers
        self.cutoff_dim = cutoff_dim
        
        # Calculate total parameters needed
        self.params_per_layer = self._calculate_params_per_layer()
        self.total_params = self.layers * self.params_per_layer
        
        # Single weight tensor (this is key!)
        self.weights = tf.Variable(
            tf.random.normal([self.total_params], stddev=0.1),
            name="quantum_weights"
        )
        
        print(f"QNN initialized: {n_modes} modes, {layers} layers, {self.total_params} parameters")
    
    def _calculate_params_per_layer(self):
        """Calculate parameters needed per quantum layer."""
        # Interferometer parameters
        M = int(self.n_modes * (self.n_modes - 1)) + max(1, self.n_modes - 1)
        # Total: 2 interferometers + squeezing + displacement + kerr
        return 2 * M + 4 * self.n_modes
    
    def _build_quantum_program_with_input(self, input_encoding=None):
        """Build quantum program including input encoding (FIXED)."""
        # Create fresh program each time to avoid locking
        prog = sf.Program(self.n_modes)
        
        # Create symbolic parameters
        param_names = [f"p_{i}" for i in range(self.total_params)]
        sf_params = [prog.params(name) for name in param_names]
        
        # Build circuit
        with prog.context as q:
            # Apply input encoding first (if provided)
            if input_encoding is not None:
                for i, alpha in enumerate(input_encoding[:self.n_modes]):
                    ops.Dgate(tf.math.real(alpha), tf.math.imag(alpha)) | q[i]
            
            # Apply quantum layers
            for layer in range(self.layers):
                start_idx = layer * self.params_per_layer
                layer_params = sf_params[start_idx:start_idx + self.params_per_layer]
                self._quantum_layer(layer_params, q)
        
        return prog, sf_params
    
    def _quantum_layer(self, params, q):
        """Single quantum layer implementation."""
        N = self.n_modes
        M = int(N * (N - 1)) + max(1, N - 1)
        
        # Extract parameter groups
        int1 = params[:M]
        s = params[M:M+N]
        int2 = params[M+N:2*M+N]
        dr = params[2*M+N:2*M+2*N]
        dp = params[2*M+2*N:2*M+3*N]
        k = params[2*M+3*N:2*M+4*N]
        
        # Apply operations
        self._interferometer(int1, q)
        for i in range(N):
            ops.Sgate(s[i]) | q[i]
        self._interferometer(int2, q)
        for i in range(N):
            ops.Dgate(dr[i], dp[i]) | q[i]
            ops.Kgate(k[i]) | q[i]
    
    def _interferometer(self, params, q):
        """Interferometer implementation."""
        N = len(q)
        if N == 1:
            ops.Rgate(params[-1]) | q[0]
            return
        
        theta = params[:N*(N-1)//2]
        phi = params[N*(N-1)//2:N*(N-1)]
        rphi = params[-N+1:]
        
        n = 0
        for l in range(N):
            for k, (q1, q2) in enumerate(zip(q[:-1], q[1:])):
                if (l + k) % 2 != 1:
                    ops.BSgate(theta[n], phi[n]) | (q1, q2)
                    n += 1
        
        for i in range(max(1, N - 1)):
            ops.Rgate(rphi[i]) | q[i]
    
    def execute_circuit(self, input_encoding=None):
        """Execute quantum circuit with current weights (FIXED - proper engine management)."""
        # Build fresh program with input encoding
        prog, sf_params = self._build_quantum_program_with_input(input_encoding)
        
        # Create parameter mapping
        mapping = {param.name: weight for param, weight in zip(sf_params, self.weights)}
        
        # Create fresh engine for each execution (FIXED)
        eng = sf.Engine(backend="tf", backend_options={
            "cutoff_dim": self.cutoff_dim,
            "pure": True
        })
        
        # Run the program
        result = eng.run(prog, args=mapping)
        return result.state

def create_encoding_matrix(input_dim, n_modes, encoding_type='coherent'):
    """
    Create encoding matrix to transform input data to quantum encoding.
    
    Args:
        input_dim: Dimension of input data
        n_modes: Number of quantum modes
        encoding_type: Type of encoding ('coherent', 'displacement')
    
    Returns:
        Encoding matrix with no null space
    """
    if encoding_type == 'coherent':
        # For coherent states, we need complex amplitudes
        # Create matrix that maps input_dim -> 2*n_modes (real + imag parts)
        matrix = tf.Variable(
            tf.random.normal([input_dim, 2 * n_modes], stddev=0.5),
            name="encoding_matrix"
        )
    else:
        # Direct mapping
        matrix = tf.Variable(
            tf.random.normal([input_dim, n_modes], stddev=0.5),
            name="encoding_matrix"
        )
    
    return matrix

def apply_coherent_encoding(input_data, encoding_matrix):
    """
    Apply coherent state encoding to input data.
    
    Args:
        input_data: Input tensor [batch_size, input_dim]
        encoding_matrix: Encoding transformation matrix
    
    Returns:
        Complex coherent state amplitudes [batch_size, n_modes]
    """
    # Transform input
    encoded = tf.matmul(input_data, encoding_matrix)  # [batch_size, 2*n_modes]
    
    # Split into real and imaginary parts
    n_modes = encoded.shape[-1] // 2
    real_parts = encoded[:, :n_modes]
    imag_parts = encoded[:, n_modes:]
    
    # Create complex amplitudes
    coherent_amplitudes = tf.complex(real_parts, imag_parts)
    
    return coherent_amplitudes

print("Quantum Neural Network ready!")

Quantum Neural Network ready!


In [15]:
# Cell 3: Generator Class (CORRECTED)

class QuantumGenerator:
    """
    Quantum generator using PURE QuantumNeuralNetwork encoding.
    
    Architecture (consistent with discriminator):
    1. Latent input -> QuantumNeuralNetwork (feature processing)
    2. Quantum features -> QuantumNeuralNetwork (output generation)
    3. Quantum measurement -> final output
    
    Uses the QuantumNeuralNetwork class for both stages!
    """
    
    def __init__(self, latent_dim=4, n_modes=2, layers=2, cutoff_dim=6, output_dim=2):
        self.latent_dim = latent_dim
        self.output_dim = output_dim
        self.n_modes = n_modes
        
        # Stage 1: Feature processing using QuantumNeuralNetwork
        self.feature_qnn = QuantumNeuralNetwork(n_modes, layers, cutoff_dim)
        
        # Stage 2: Output generation using SAME QuantumNeuralNetwork class
        self.output_qnn = QuantumNeuralNetwork(
            n_modes=output_dim,  # One mode per output dimension
            layers=1,            # Simple output generation
            cutoff_dim=cutoff_dim
        )
        
        # Encoding matrices
        self.input_encoding = create_encoding_matrix(latent_dim, n_modes, 'coherent')
        
        # For output generation: quantum features -> coherent amplitudes
        self.output_encoding = create_encoding_matrix(n_modes, output_dim, 'coherent')
        
        print(f"  PURE QNN Generator:")
        print(f"   Stage 1: {latent_dim}D -> QNN({n_modes} modes)")
        print(f"   Stage 2: {n_modes}D -> QNN({output_dim} modes)")
        print(f"   Both stages use QuantumNeuralNetwork class!")
    
    @property
    def trainable_variables(self):
        """All quantum - no classical neural networks!"""
        return [
            self.feature_qnn.weights,
            self.output_qnn.weights,
            self.input_encoding,
            self.output_encoding
        ]
    
    def generate(self, latent_input):
        """Pure quantum generation using QuantumNeuralNetwork twice."""
        batch_size = tf.shape(latent_input)[0]
        all_outputs = []
        
        for i in range(batch_size):
            # Stage 1: Feature processing via QuantumNeuralNetwork
            coherent_amplitudes_1 = apply_coherent_encoding(
                latent_input[i:i+1], self.input_encoding
            )[0]
            
            quantum_state_1 = self.feature_qnn.execute_circuit(coherent_amplitudes_1)
            quantum_features = self._extract_quantum_features(quantum_state_1)
            
            # Stage 2: Output generation via SAME QuantumNeuralNetwork class
            coherent_amplitudes_2 = apply_coherent_encoding(
                tf.expand_dims(quantum_features, 0), self.output_encoding
            )[0]
            
            quantum_state_2 = self.output_qnn.execute_circuit(coherent_amplitudes_2)
            final_output = self._quantum_measurement_to_output(quantum_state_2)
            
            all_outputs.append(final_output)
        
        return tf.stack(all_outputs)
    
    def _extract_quantum_features(self, quantum_state):
        """Extract features from first QuantumNeuralNetwork (SINGLE measurement)."""
        ket = quantum_state.ket()
        prob_amplitudes = tf.abs(ket) ** 2
        n_vals = tf.range(self.feature_qnn.cutoff_dim, dtype=tf.float32)
        
        features = []
        for mode in range(self.n_modes):
            # Single measurement: photon number ⟨n̂⟩ (quantum mechanically correct!)
            photon_number = tf.reduce_sum(prob_amplitudes * n_vals)
            normalized_feature = tf.tanh((photon_number - self.feature_qnn.cutoff_dim/2) / 2.0)
            features.append(normalized_feature)
        
        return tf.stack(features)
    
    def _quantum_measurement_to_output(self, quantum_state):
        """Convert quantum state from second QuantumNeuralNetwork to final output."""
        ket = quantum_state.ket()
        prob_amplitudes = tf.abs(ket) ** 2
        n_vals = tf.range(self.output_qnn.cutoff_dim, dtype=tf.float32)
        
        outputs = []
        for mode in range(self.output_dim):
            # Photon number measurement per output dimension
            photon_number = tf.reduce_sum(prob_amplitudes * n_vals)
            
            # Map to desired output range (e.g., [-2, 2] for bimodal data)
            output_value = (photon_number - self.output_qnn.cutoff_dim/2) * 4.0 / self.output_qnn.cutoff_dim
            outputs.append(output_value)
        
        return tf.stack(outputs)
    
    def compute_quantum_metrics(self):
        """Metrics for both QuantumNeuralNetworks."""
        return {
            'architecture': 'Pure QuantumNeuralNetwork',
            'feature_qnn_params': self.feature_qnn.total_params,
            'output_qnn_params': self.output_qnn.total_params,
            'total_quantum_params': self.feature_qnn.total_params + self.output_qnn.total_params,
            'classical_params': 0  # Zero classical parameters!
        }

In [16]:
# Cell 4: Discriminator Class 

class QuantumDiscriminator:
    """
    Quantum discriminator using PURE QuantumNeuralNetwork encoding.
    
    Architecture:
    1. Input -> QuantumNeuralNetwork (feature extraction)
    2. Quantum features -> QuantumNeuralNetwork (classification)
    3. Quantum measurement -> probability
    
    Uses the QuantumNeuralNetwork class for both stages!
    """
    
    def __init__(self, input_dim=2, n_modes=2, layers=2, cutoff_dim=6):
        self.input_dim = input_dim
        self.n_modes = n_modes
        
        # Stage 1: Feature extraction using QuantumNeuralNetwork
        self.feature_qnn = QuantumNeuralNetwork(n_modes, layers, cutoff_dim)
        
        # Stage 2: Classification using SAME QuantumNeuralNetwork class
        self.classifier_qnn = QuantumNeuralNetwork(
            n_modes=1,  # Single mode for binary classification
            layers=1,   # Simple classification
            cutoff_dim=cutoff_dim
        )
        
        # Encoding matrices
        self.input_encoding = create_encoding_matrix(input_dim, n_modes, 'coherent')
        
        # For classification: quantum features -> coherent amplitudes
        self.classification_encoding = create_encoding_matrix(n_modes, 1, 'coherent')
        
        print(f"  PURE QNN Discriminator:")
        print(f"   Stage 1: {input_dim}D -> QNN({n_modes} modes)")
        print(f"   Stage 2: {n_modes}D -> QNN(1 mode)")
        print(f"   Both stages use QuantumNeuralNetwork class!")
    
    @property
    def trainable_variables(self):
        """All quantum - no classical neural networks!"""
        return [
            self.feature_qnn.weights,
            self.classifier_qnn.weights,
            self.input_encoding,
            self.classification_encoding
        ]
    
    def discriminate(self, input_data):
        """Pure quantum discrimination using QuantumNeuralNetwork twice."""
        batch_size = tf.shape(input_data)[0]
        all_probabilities = []
        
        for i in range(batch_size):
            # Stage 1: Feature extraction via QuantumNeuralNetwork
            coherent_amplitudes_1 = apply_coherent_encoding(
                input_data[i:i+1], self.input_encoding
            )[0]
            
            quantum_state_1 = self.feature_qnn.execute_circuit(coherent_amplitudes_1)
            quantum_features = self._extract_quantum_features(quantum_state_1)
            
            # Stage 2: Classification via SAME QuantumNeuralNetwork class
            coherent_amplitudes_2 = apply_coherent_encoding(
                tf.expand_dims(quantum_features, 0), self.classification_encoding
            )[0]
            
            quantum_state_2 = self.classifier_qnn.execute_circuit(coherent_amplitudes_2)
            probability = self._quantum_measurement_to_probability(quantum_state_2)
            
            all_probabilities.append(probability)
        
        return tf.stack(all_probabilities)
    
    def _extract_quantum_features(self, quantum_state):
        """Extract features from first QuantumNeuralNetwork."""
        ket = quantum_state.ket()
        prob_amplitudes = tf.abs(ket) ** 2
        n_vals = tf.range(self.feature_qnn.cutoff_dim, dtype=tf.float32)
        
        features = []
        for mode in range(self.n_modes):
            # Single measurement: photon number ⟨n̂⟩
            photon_number = tf.reduce_sum(prob_amplitudes * n_vals)
            normalized_feature = tf.tanh(photon_number / self.feature_qnn.cutoff_dim)
            features.append(normalized_feature)
        
        return tf.stack(features)
    
    def _quantum_measurement_to_probability(self, quantum_state):
        """Convert quantum state from second QuantumNeuralNetwork to probability."""
        ket = quantum_state.ket()
        prob_amplitudes = tf.abs(ket) ** 2
        n_vals = tf.range(self.classifier_qnn.cutoff_dim, dtype=tf.float32)
        
        # Photon number measurement on single mode
        photon_number = tf.reduce_sum(prob_amplitudes * n_vals)
        
        # Map to [0,1] probability
        probability = tf.sigmoid((photon_number - self.classifier_qnn.cutoff_dim/2) * 2.0)
        
        return tf.expand_dims(probability, 0)
    
    def compute_quantum_metrics(self):
        """Metrics for both QuantumNeuralNetworks."""
        return {
            'architecture': 'Pure QuantumNeuralNetwork',
            'feature_qnn_params': self.feature_qnn.total_params,
            'classifier_qnn_params': self.classifier_qnn.total_params,
            'total_quantum_params': self.feature_qnn.total_params + self.classifier_qnn.total_params,
            'classical_params': 0  # Zero classical parameters!
        }

print("  PURE QuantumNeuralNetwork Discriminator ready!")
print("  Uses the SAME QuantumNeuralNetwork class for both feature extraction and classification!")

  PURE QuantumNeuralNetwork Discriminator ready!
  Uses the SAME QuantumNeuralNetwork class for both feature extraction and classification!


In [18]:
# Cell 5: Assembly and Testing (FIXED)

print(" Assembling Quantum GAN Components...")

# Configuration
config = {
    'latent_dim': 4,
    'n_modes': 4,
    'layers': 2,
    'cutoff_dim': 10,
    'output_dim': 2,
    'batch_size': 4  # Small for testing
}

print(f" Configuration: {config}")

# Initialize components
generator = QuantumGenerator(
    latent_dim=config['latent_dim'],
    n_modes=config['n_modes'],
    layers=config['layers'],
    cutoff_dim=config['cutoff_dim'],
    output_dim=config['output_dim']
)

discriminator = QuantumDiscriminator(
    input_dim=config['output_dim'],
    n_modes=config['n_modes'],
    layers=config['layers'],
    cutoff_dim=config['cutoff_dim']
)

# Initialize loss function
bimodal_loss = QuantumBimodalLoss(
    mode1_center=[-1.5, -1.5],
    mode2_center=[1.5, 1.5]
)

print("\n🧪 Testing Components...")

# Initialize variables for testing
generated_samples = None
test_latent = tf.random.normal([config['batch_size'], config['latent_dim']])

# Test 1: Generator forward pass
print("\n🔄 Testing Generator:")
try:
    generated_samples = generator.generate(test_latent)
    print(f"   ✅ Generator output shape: {generated_samples.shape}")
    print(f"   📊 Sample range: [{tf.reduce_min(generated_samples):.3f}, {tf.reduce_max(generated_samples):.3f}]")
    
    # Test quantum metrics
    gen_metrics = generator.compute_quantum_metrics()
    print(f"   📈 Quantum metrics: {gen_metrics}")
    
except Exception as e:
    print(f"   ❌ Generator test failed: {e}")
    # Create fallback for testing
    generated_samples = tf.random.normal([config['batch_size'], config['output_dim']])

# Test 2: Discriminator forward pass
print("\n🔍 Testing Discriminator:")
try:
    test_data = tf.random.normal([config['batch_size'], config['output_dim']])
    discriminator_output = discriminator.discriminate(test_data)
    print(f"   ✅ Discriminator output shape: {discriminator_output.shape}")
    print(f"   📊 Probability range: [{tf.reduce_min(discriminator_output):.3f}, {tf.reduce_max(discriminator_output):.3f}]")
    
    # Test quantum metrics
    disc_metrics = discriminator.compute_quantum_metrics()
    print(f"   📈 Quantum metrics: {disc_metrics}")
    
except Exception as e:
    print(f"   ❌ Discriminator test failed: {e}")

# Test 3: Loss function (FIXED - generated_samples now defined)
print("\n📉 Testing Loss Function:")
try:
    # Create test real data (bimodal)
    real_mode1 = tf.random.normal([config['batch_size']//2, 2], mean=[-1.5, -1.5], stddev=0.2)
    real_mode2 = tf.random.normal([config['batch_size']//2, 2], mean=[1.5, 1.5], stddev=0.2)
    real_samples = tf.concat([real_mode1, real_mode2], axis=0)
    
    # Test loss (generated_samples is now defined)
    total_loss, loss_metrics = bimodal_loss(real_samples, generated_samples, generator)
    print(f"   ✅ Loss computation successful")
    print(f"   📊 Total loss: {total_loss:.4f}")
    print(f"   📋 Loss components:")
    for key, value in loss_metrics.items():
        print(f"      {key}: {value:.4f}")
    
except Exception as e:
    print(f"   ❌ Loss test failed: {e}")

# Rest of testing continues...

 Assembling Quantum GAN Components...
 Configuration: {'latent_dim': 4, 'n_modes': 4, 'layers': 2, 'cutoff_dim': 10, 'output_dim': 2, 'batch_size': 4}
QNN initialized: 4 modes, 2 layers, 92 parameters
QNN initialized: 2 modes, 1 layers, 14 parameters
  PURE QNN Generator:
   Stage 1: 4D -> QNN(4 modes)
   Stage 2: 4D -> QNN(2 modes)
   Both stages use QuantumNeuralNetwork class!
QNN initialized: 4 modes, 2 layers, 92 parameters
QNN initialized: 1 modes, 1 layers, 6 parameters
  PURE QNN Discriminator:
   Stage 1: 2D -> QNN(4 modes)
   Stage 2: 4D -> QNN(1 mode)
   Both stages use QuantumNeuralNetwork class!

🧪 Testing Components...

🔄 Testing Generator:
   ✅ Generator output shape: (4, 2)
   📊 Sample range: [-1.995, -1.992]
   📈 Quantum metrics: {'architecture': 'Pure QuantumNeuralNetwork', 'feature_qnn_params': 92, 'output_qnn_params': 14, 'total_quantum_params': 106, 'classical_params': 0}

🔍 Testing Discriminator:
   ✅ Discriminator output shape: (4, 1)
   📊 Probability range: [0.00