In [20]:
import pennylane as qml
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import torch.nn.functional as F
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_openml
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.svm import SVC
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import time

In [21]:
data = pd.read_csv('../../../creditcard.csv')

print(data.head(10))

   Time        V1        V2        V3        V4        V5        V6        V7  \
0   0.0 -1.359807 -0.072781  2.536347  1.378155 -0.338321  0.462388  0.239599   
1   0.0  1.191857  0.266151  0.166480  0.448154  0.060018 -0.082361 -0.078803   
2   1.0 -1.358354 -1.340163  1.773209  0.379780 -0.503198  1.800499  0.791461   
3   1.0 -0.966272 -0.185226  1.792993 -0.863291 -0.010309  1.247203  0.237609   
4   2.0 -1.158233  0.877737  1.548718  0.403034 -0.407193  0.095921  0.592941   
5   2.0 -0.425966  0.960523  1.141109 -0.168252  0.420987 -0.029728  0.476201   
6   4.0  1.229658  0.141004  0.045371  1.202613  0.191881  0.272708 -0.005159   
7   7.0 -0.644269  1.417964  1.074380 -0.492199  0.948934  0.428118  1.120631   
8   7.0 -0.894286  0.286157 -0.113192 -0.271526  2.669599  3.721818  0.370145   
9   9.0 -0.338262  1.119593  1.044367 -0.222187  0.499361 -0.246761  0.651583   

         V8        V9  ...       V21       V22       V23       V24       V25  \
0  0.098698  0.363787  ... -

In [22]:
X = data.drop(['Class'], axis=1)
y = data['Class']

X = np.array(X)
y = np.array(y)

print(X.shape)
print(y.shape)

(284807, 30)
(284807,)


In [23]:
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.2, random_state=42)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

y_train = y_train.astype(int)
y_test = y_test.astype(int)

In [24]:
X_train_torch = torch.tensor(X_train, dtype=torch.float32)
y_train_torch = torch.tensor(y_train, dtype=torch.long)
X_test_torch  = torch.tensor(X_test,  dtype=torch.float32)
y_test_torch  = torch.tensor(y_test,  dtype=torch.long)

train_dataset = TensorDataset(X_train_torch, y_train_torch)
test_dataset  = TensorDataset(X_test_torch,  y_test_torch)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader  = DataLoader(test_dataset,  batch_size=32, shuffle=False)

In [25]:
def sample_from_train(X_train, y_train, sample_size_for_each_class, batch_size):
    X_train_sampled = []
    y_train_sampled = []

    for class_label in np.unique(y_train):
        class_indices = np.where(y_train == class_label)[0]
        sampled_indices = np.random.choice(class_indices, size=sample_size_for_each_class, replace=False)

        X_train_sampled.extend(X_train[sampled_indices])
        y_train_sampled.extend(y_train[sampled_indices]) 

    X_train_sampled = np.array(X_train_sampled)
    y_train_sampled = np.array(y_train_sampled)

    X_train_sampled_torch = torch.tensor(X_train_sampled, dtype=torch.float32)
    y_train_sampled_torch = torch.tensor(y_train_sampled, dtype=torch.long)

    train_sampled_dataset = TensorDataset(X_train_sampled_torch, y_train_sampled_torch)
    train_sampled_loader = DataLoader(train_sampled_dataset, batch_size=batch_size, shuffle=True)

    return train_sampled_loader  

In [30]:
class QuantumLayer():
    def __init__(self, n_qubits, n_layers):
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.dev = qml.device('lightning.gpu', wires=self.n_qubits)

    def qlayer(self, inputs, weights):
        @qml.qnode(self.dev, interface="torch")
        def circuit(inputs, weights):
            qml.AngleEmbedding(inputs, wires=list(np.arange(self.n_qubits)), rotation='X')
            qml.StronglyEntanglingLayers(weights, wires=range(self.n_qubits))
            return tuple(qml.expval(qml.PauliZ(i)) for i in range(self.n_qubits))

        return circuit(inputs, weights)  


class HybridClassifier(nn.Module):
    def __init__(self, n_qubits, n_layers, input_dim, learning_rate=1e-3, batch_size=32, epochs=3):
        super().__init__()
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.epochs = epochs
        self.quantum_layer = QuantumLayer(n_qubits, n_layers)

        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 16)
        self.fc3 = nn.Linear(16, n_qubits)
        self.scale = nn.Parameter(torch.tensor([2 * np.pi]))
        self.qnn_weights = nn.Parameter(torch.rand(n_layers, n_qubits, 3) * 1e-3)
        self.output_layer = nn.Linear(n_qubits, 2)

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.to(self.device)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = torch.tanh(x) * self.scale

        batch_size = x.size(0)
        out = torch.empty((batch_size, self.n_qubits), dtype=torch.float32, device=x.device)

        for batch_idx in range(batch_size):
            expvals = self.quantum_layer.qlayer(x[batch_idx], self.qnn_weights)
            out[batch_idx] = torch.stack(expvals)

        x = self.output_layer(out)
        return x

    def train_model(self, dataloader, loss_fn, optimizer):
        self.train()
        for _, (X, y) in enumerate(dataloader):
            X, y = X.to(self.device), y.to(self.device)
            pred = self(X)
            loss = loss_fn(pred, y)

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

    def test_model(self, dataloader, loss_fn):
        self.eval()
        num_batches = len(dataloader)
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for X, y in dataloader:
                X, y = X.to(self.device), y.to(self.device)
                pred = self(X)
                predicted_label = torch.argmax(pred, dim=1)
                correct += torch.count_nonzero(predicted_label == y)
                test_loss += loss_fn(pred, y).item()

        accuracy = (correct / (len(dataloader) * self.batch_size)) * 100
        test_loss /= num_batches
        print(f"Accuracy: {accuracy:.2f}%")
        print(f"Avg loss: {test_loss:>8f} \n")

        return test_loss

    def train_with_scheduler(self, train_loader, test_loader):
        loss_fn = nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)

        start_time = time.time()

        for epoch in range(self.epochs):
            print(f"Epoch {epoch+1}\n--------------------------")
            self.train_model(train_loader, loss_fn, optimizer)
            test_loss = self.test_model(test_loader, loss_fn)
            scheduler.step(test_loss)

        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Training took {elapsed_time/60:.2f} minutes.")

        self.test_model(test_loader, loss_fn)

In [32]:
n_qubits = 4
n_layers = 2
input_dim = len(X.T)

model = HybridClassifier(n_qubits, n_layers, input_dim, batch_size=128)

print("Training with full dataset:")
model.train_with_scheduler(train_loader, test_loader)

sampled_train_loader = sample_from_train(X_train, y_train, sample_size_for_each_class=5, batch_size=32)

print("\nTraining with sampled dataset:")
model.train_with_scheduler(sampled_train_loader, test_loader)

print("\nEvaluating on test set:")
model.test_model(test_loader, loss_fn=nn.CrossEntropyLoss())

Training with full dataset:
Epoch 1
--------------------------


KeyboardInterrupt: 

In [27]:
class ClassicalClassifier(nn.Module):
    def __init__(self, input_dim, learning_rate=1e-3, batch_size=32, epochs=20):
        super().__init__()
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.epochs = epochs
        
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 16)
        self.fc3 = nn.Linear(16, 4)
        self.output_layer = nn.Linear(4, 2)  

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.to(self.device)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.output_layer(x)
        return x

    def train_model(self, dataloader, loss_fn, optimizer):
        self.train()
        for _, (X, y) in enumerate(dataloader):
            X, y = X.to(self.device), y.to(self.device)
            pred = self(X)
            loss = loss_fn(pred, y)

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

    def test_model(self, dataloader, loss_fn):
        self.eval()
        num_batches = len(dataloader)
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for X, y in dataloader:
                X, y = X.to(self.device), y.to(self.device)
                pred = self(X)
                predicted_label = torch.argmax(pred, dim=1)
                correct += torch.count_nonzero(predicted_label == y)
                test_loss += loss_fn(pred, y).item()

        accuracy = (correct / (len(dataloader) * self.batch_size)) * 100
        test_loss /= num_batches
        print(f"Accuracy: {accuracy:.2f}%")
        print(f"Avg loss: {test_loss:>8f} \n")

        return test_loss

    def train_with_scheduler(self, train_loader, test_loader):
        loss_fn = nn.CrossEntropyLoss()
        optimizer = optim.AdamW(self.parameters(), lr=self.learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)

        start_time = time.time()

        for epoch in range(self.epochs):
            print(f"Epoch {epoch+1}\n--------------------------")
            self.train_model(train_loader, loss_fn, optimizer)
            test_loss = self.test_model(test_loader, loss_fn)
            scheduler.step(test_loss)

        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Training took {elapsed_time/60:.2f} minutes.")

        self.test_model(test_loader, loss_fn)


In [28]:
input_dim = X_train.shape[1]

model_classical = ClassicalClassifier(input_dim)

print("Training Classical Model with Full Dataset:")
model_classical.train_with_scheduler(train_loader, test_loader)

Training Classical Model with Full Dataset:
Epoch 1
--------------------------
Accuracy: 99.88%
Avg loss: 0.003286 

Epoch 2
--------------------------
Accuracy: 99.88%
Avg loss: 0.003244 

Epoch 3
--------------------------
Accuracy: 99.87%
Avg loss: 0.002997 

Epoch 4
--------------------------
Accuracy: 99.88%
Avg loss: 0.002949 

Epoch 5
--------------------------
Accuracy: 99.88%
Avg loss: 0.003171 

Epoch 6
--------------------------
Accuracy: 99.88%
Avg loss: 0.003048 

Epoch 7
--------------------------
Accuracy: 99.87%
Avg loss: 0.003106 

Epoch 8
--------------------------
Accuracy: 99.86%
Avg loss: 0.003298 

Epoch 9
--------------------------
Accuracy: 99.88%
Avg loss: 0.002998 

Epoch 10
--------------------------
Accuracy: 99.87%
Avg loss: 0.003066 

Epoch 11
--------------------------
Accuracy: 99.88%
Avg loss: 0.002956 

Epoch 12
--------------------------
Accuracy: 99.88%
Avg loss: 0.002959 

Epoch 13
--------------------------
Accuracy: 99.88%
Avg loss: 0.002906 

Epo