In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import GTSRB
import pennylane as qml
from pennylane import numpy as np
import torch.nn.functional as F

# Enhanced Data Augmentation
transform_train = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.RandomRotation(10),  # Random rotation
    transforms.RandomHorizontalFlip(p=0.5),  # Random horizontal flip
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # Color jittering
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

transform_test = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load datasets with enhanced augmentation
train_dataset = GTSRB(root="data", split="train", 
                      transform=transform_train, download=True)

# Split training data into train and validation
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

test_dataset = GTSRB(root="data", split="test", 
                     transform=transform_test, download=True)

# DataLoaders with improved parameters
train_loader = DataLoader(train_dataset, batch_size=64, 
                          shuffle=True, pin_memory=True, 
                          num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=64, 
                        shuffle=False, pin_memory=True, 
                        num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=64, 
                         shuffle=False, pin_memory=True, 
                         num_workers=4)

Using device: cuda


In [3]:
def variational_circuit(inputs, weights):
    """
    Enhanced quantum circuit with more flexible encoding
    """
    n_qubits = len(inputs)
    dev = qml.device("default.qubit", wires=n_qubits)

    @qml.qnode(dev)
    def circuit(inputs, weights):
        # Convert inputs to numpy if they're tensors
        inputs = inputs.cpu().numpy() if torch.is_tensor(inputs) else inputs
        weights = weights.cpu().numpy() if torch.is_tensor(weights) else weights
        
        # More sophisticated input encoding
        for i in range(n_qubits):
            # Angle encoding with multiple rotation gates
            qml.RX(inputs[i], wires=i)
            qml.RY(inputs[i] * 0.5, wires=i)
        
        # More complex entanglement strategy
        for layer in range(weights.shape[0]):
            # Entangling layer
            for i in range(n_qubits):
                qml.RZ(weights[layer, i, 0], wires=i)
                qml.RY(weights[layer, i, 1], wires=i)
            
            # Controlled rotations for entanglement
            for i in range(n_qubits - 1):
                qml.CNOT(wires=[i, i+1])
        
        # Multi-observable measurement
        return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

    return circuit(inputs, weights)

In [4]:
class HybridModel(nn.Module):
    def __init__(self, n_qubits, n_layers, n_classes):
        super(HybridModel, self).__init__()
        self.n_qubits = n_qubits
        
        # Classical preprocessing layers
        self.preprocess = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten()
        )
        
        # Determine input size for fc1
        with torch.no_grad():
            test_input = torch.zeros(1, 3, 32, 32)
            fc_input_size = self.preprocess(test_input).size(1)
        
        # Quantum-related parameters
        self.q_weights = nn.Parameter(torch.randn(
            n_layers, n_qubits, 2, device=device, dtype=torch.float32))
        
        # Linear layers with dropout for regularization
        self.fc1 = nn.Linear(fc_input_size, n_qubits).to(device)
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(n_qubits, n_classes).to(device)

    def forward(self, x):
        # Ensure input is on GPU
        x = x.to(device)
        
        # Classical preprocessing
        x = self.preprocess(x)
        x = self.fc1(x)
        x = self.dropout(x)
        
        batch_size = x.size(0)
        quantum_output = []
        
        for i in range(batch_size):
            # Detach and convert to numpy, then back to tensor
            q_out = variational_circuit(
                x[i].detach().cpu().numpy(), 
                self.q_weights.detach().cpu().numpy()
            )
            # Convert back to GPU tensor
            q_out = torch.tensor(q_out, dtype=torch.float32, device=device)
            quantum_output.append(q_out)
        
        x = torch.stack(quantum_output)
        x = self.fc2(x)
        return x


In [5]:
class EarlyStopping:
    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')
        self.delta = delta

    def __call__(self, val_loss, model):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), 'checkpoint.pt')
        self.val_loss_min = val_loss

In [6]:
# Setup
n_qubits = 8  # Increased number of qubits
n_layers = 4  # More quantum layers
all_labels = [label for _, label in train_dataset]
n_classes = len(set(all_labels))

# Initialize model on GPU
model = HybridModel(n_qubits=n_qubits, n_layers=n_layers, n_classes=n_classes).to(device)

# Loss and Optimizer with Learning Rate Scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.01, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

# Early Stopping
early_stopping = EarlyStopping(patience=10, verbose=True)

# Training Loop with Validation
epochs = 20
best_val_accuracy = 0.0

for epoch in range(epochs):
    # Training Phase
    model.train()
    train_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
    
    # Validation Phase
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    val_accuracy = 100 * correct / total
    val_loss /= len(val_loader)
    train_loss /= len(train_loader)
    
    print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")
    
    # Learning rate scheduling
    scheduler.step(val_loss)
    
    # Early stopping
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping triggered!")
        break

Epoch 1: Train Loss: 3.4834, Val Loss: 3.4307, Val Accuracy: 8.39%
Validation loss decreased (inf --> 3.430696). Saving model ...
Epoch 2: Train Loss: 3.4228, Val Loss: 3.4096, Val Accuracy: 9.80%
Validation loss decreased (3.430696 --> 3.409583). Saving model ...
Epoch 3: Train Loss: 3.4138, Val Loss: 3.3960, Val Accuracy: 9.98%
Validation loss decreased (3.409583 --> 3.396049). Saving model ...
Epoch 4: Train Loss: 3.4081, Val Loss: 3.3933, Val Accuracy: 9.44%
Validation loss decreased (3.396049 --> 3.393328). Saving model ...
Epoch 5: Train Loss: 3.4015, Val Loss: 3.3876, Val Accuracy: 10.19%
Validation loss decreased (3.393328 --> 3.387639). Saving model ...
Epoch 6: Train Loss: 3.3983, Val Loss: 3.3861, Val Accuracy: 9.52%
Validation loss decreased (3.387639 --> 3.386054). Saving model ...
Epoch 7: Train Loss: 3.3954, Val Loss: 3.3734, Val Accuracy: 9.37%
Validation loss decreased (3.386054 --> 3.373435). Saving model ...
Epoch 8: Train Loss: 3.3964, Val Loss: 3.3750, Val Accuracy

In [7]:
model.load_state_dict(torch.load('checkpoint.pt'))

  model.load_state_dict(torch.load('checkpoint.pt'))


<All keys matched successfully>

In [8]:
# Final Evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"Final Accuracy on the test set: {accuracy:.2f}%")

Final Accuracy on the test set: 9.88%
