In [1]:
#pip install pennylane

# Imports

In [2]:
import math
import random
import numpy as np
import pandas as pd
from pandas.core.dtypes.dtypes import CategoricalDtype
import sklearn
from sklearn.datasets import make_circles, make_s_curve, make_swiss_roll
from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer
from sklearn.compose import make_column_selector as selector
from sklearn.preprocessing import MinMaxScaler, PowerTransformer,\
                            QuantileTransformer, KBinsDiscretizer
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression
import pennylane as qml
from pennylane import numpy as pnp
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# Pytorch imports
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

# Set the random seed for reproducibility
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [3]:
# Enable CUDA device if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

#Load data

In [4]:
"""
Circles dataset: 2d binary classification dataset that is
challenging to certain algorithms (e.g. centroid-based
clustering or linear classification).
"""
X, y = make_circles(
    n_samples=1000,
    noise=0.01,
    random_state=seed
    )
df = pd.DataFrame(
    X,
    columns=['v1','v2']).merge(
        pd.DataFrame(y, columns=['y']),
        left_index=True,
        right_index=True
)
df['y'] = df['y'].astype('category')

In [5]:
"""
Swiss roll dataset. Ref: S. Marsland, “Machine Learning:
An Algorithmic Perspective”, 2nd edition, Chapter 6, 2014.
"""
"""X, y = make_swiss_roll(
    n_samples=1500,
    noise=0.01,
    random_state=seed
    )
df = pd.DataFrame(
    X,
    columns=['v1','v2','v3']).merge(
        pd.DataFrame(y, columns=['y']),
        left_index=True,
        right_index=True
)"""

"X, y = make_swiss_roll(\n    n_samples=1500,\n    noise=0.01,\n    random_state=seed\n    )\ndf = pd.DataFrame(\n    X,\n    columns=['v1','v2','v3']).merge(\n        pd.DataFrame(y, columns=['y']),\n        left_index=True,\n        right_index=True\n)"

## Input data transformation

In [6]:
def columns_type(df: pd.DataFrame):
    """
    Args:
        df(pd.DataFrame):
            input data

    Returns:
        two lists with numerical and
        categorical column names.
    """
    numerical_columns_selector = selector(
        dtype_exclude=CategoricalDtype
        )
    categorical_columns_selector = selector(
        dtype_include=CategoricalDtype
        )
    numerical_columns = numerical_columns_selector(df)
    categorical_columns = categorical_columns_selector(df)
    return numerical_columns, categorical_columns


def transform_input_data(
    df: pd.DataFrame,
    numerical_columns: list,
    categorical_columns: list,
    ):
    """
    Args:
        df(pd.DataFrame):
            input data
        numerical_columns:
            list with numerical columns names
        categorical_columns:
            list with categorical columns name

    Returns:
        prepared dataframe for input"""
    categorical_scaler = make_pipeline(
        MinMaxScaler(
            feature_range=(0, 1),
            )
        )
    numerical_scaler = make_pipeline(
        #QuantileTransformer(
        #    n_quantiles=100,
        #    output_distribution='normal',
        #   ),
        MinMaxScaler(
            feature_range=(0, 1),
            )
        )
    scaled_X = df.copy()
    for cat_c in categorical_columns:
      scaled_X[cat_c] = categorical_scaler.fit_transform(
          np.array(df[cat_c]).reshape(-1, 1)
          ).flatten()

    for num_c in numerical_columns:
      scaled_X[num_c] = numerical_scaler.fit_transform(
          np.array(df[num_c]).reshape(-1, 1)
          ).flatten()

    return np.array(scaled_X),\
                    categorical_scaler, numerical_scaler


def transform_synthetic_data(
    df: pd.DataFrame,
    x_synthetic: np.array,
    categorical_columns: list,
    numerical_columns: list,
    categorical_scaler: sklearn.pipeline.Pipeline,
    numerical_scaler: sklearn.pipeline.Pipeline
    ):
    """
    Args:
        df(pd.DataFrame):
            input data
        x_synthetic(np.array):
            data generated by GAN network
        categorical_columns(list):
            list with categorical columns name
        numerical_columns:(list)
                list with numerical columns names
        categorical_scaler(sklearn.pipeline.Pipeline):
                scikit learn transfomed used for
                input data preparation for categorical
                features
        numerical_scaler(sklearn.pipeline.Pipeline):
                scikit learn transfomed used for
                input data preparation for numerical
                features
    Returns:
        Synthetic dataframe (pd.DataFrame)"""

    newdf = pd.DataFrame(
        x_synthetic,
        columns=df.columns
        )
    for cat_c in categorical_columns:
      if np.unique(df[cat_c]).shape[0] > 1:
        newdf[cat_c] = categorical_scaler.inverse_transform(
            np.array(
                newdf[cat_c]).reshape(-1,1)
                )
        kbins = KBinsDiscretizer(
              n_bins=np.unique(df[cat_c]).shape[0],
              encode='ordinal',
              strategy='uniform'
              )
        newdf[cat_c] = kbins.fit_transform(
            np.array(newdf[cat_c]).reshape(-1,1)
            ).astype(int)
        newdf[cat_c]= newdf[cat_c].astype('category')
      else:
        pass

      if np.unique(df[cat_c]).shape[0] == 2:
        newdf[cat_c].replace(
            [
                np.unique(newdf[cat_c])[0],
                np.unique(newdf[cat_c])[1]
             ],
            [
                np.unique(df[cat_c])[0],
                np.unique(df[cat_c])[1]
             ],
            inplace=True
            )
      else:
        pass

    for num_c in numerical_columns:
      newdf[num_c] = numerical_scaler.inverse_transform(
          np.array(
              newdf[num_c]).reshape(-1,1)
              ).flatten().astype('float64')
      mima_scaler = MinMaxScaler(
                feature_range=(
                    df[num_c].min(),
                    df[num_c].max()
                    )
                )
      newdf[num_c] = mima_scaler.fit_transform(
              np.array(
                  newdf[num_c]).reshape(-1, 1)
                  ).flatten().astype('float64')

    return newdf

In [7]:
# Dataset class
class ArrayDataset(Dataset):
    """PyTorch dataset for an array of data"""

    def __init__(self, data_array, transform=None):
        """
        Args:
            data_array (numpy.ndarray): Array with the data.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.data_array = data_array
        self.transform = transform

    def __len__(self):
        return len(self.data_array)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        sample = self.data_array[idx]
        if self.transform:
            sample = self.transform(sample)

        return sample, 0


# Custom transform to convert numpy array to tensor
class ToTensorTransform:
    def __call__(self, sample):
        return torch.tensor(sample, dtype=torch.float32)

In [8]:
# Identify numerical and categorical columns
numerical_columns, categorical_columns = columns_type(df)

# Transform input data
transformed_data, categorical_scaler, numerical_scaler = transform_input_data(
    df, numerical_columns, categorical_columns
)

# Copy the transformed data
data = transformed_data.copy()
# Set batch size
batch_size = 1

# Create a transformation object
transform = ToTensorTransform()

# Create a dataset from the data array with the specified transformation
dataset = ArrayDataset(data_array=data, transform=transform)

# Create a data loader with the specified batch size, shuffle, and drop_last options
dataloader = DataLoader(
    dataset, batch_size=batch_size, shuffle=True, drop_last=True
    )

# Tabular Quantum GAN

## Discriminator

In [9]:
# Discriminator
class Discriminator(nn.Module):
    """Fully connected classical discriminator"""

    def __init__(self, input_dim):
        super().__init__()

        self.model = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        return self.model(x)


## Quantum circuit

In [10]:
# Quantum variables
n_qubits = 5  # Total number of qubits
n_a_qubits = 1  # Number of ancillary qubits
q_depth = 12  # Depth of the parameterised quantum circuit
n_generators = 4  # Number of subgenerators for the patch method

# Quantum simulator
dev = qml.device("lightning.qubit", wires=n_qubits)


# Quantum circuit
@qml.qnode(dev, diff_method="parameter-shift")
def quantum_circuit(noise, weights):

    weights = weights.reshape(q_depth, n_qubits)

    # Initialise latent vectors
    for i in range(n_qubits):
        qml.RY(noise[i], wires=i)

    # Repeated layer
    for i in range(q_depth):
        # Parametrized layer
        for y in range(n_qubits):
            qml.RY(weights[i][y], wires=y)
            #qml.RX(weights[i][y], wires=y)
            #qml.RZ(weights[i][y], wires=y)

        # Control Z gates
        for y in range(n_qubits - 1):
            qml.CZ(wires=[y, y + 1])

        # Include an additional entangling gate (CNOT)
        #qml.CNOT(wires=[n_qubits - 1, 0])

    return qml.probs(wires=list(range(n_qubits)))


def partial_measure(noise, weights):
    probs = quantum_circuit(noise, weights)
    probsgiven0 = probs[: (2 ** (n_qubits - n_a_qubits))]
    probsgiven0 /= torch.sum(probs)
    probsgiven = probsgiven0 / torch.max(probsgiven0)
    return probsgiven


## Patch Quantum Generator

In [11]:
# Quantum generator
class PatchQuantumGenerator(nn.Module):
    """Quantum generator class for the patch method"""

    def __init__(self, n_generators, output_dim, q_delta=1):
        super().__init__()

        self.q_params = nn.ParameterList(
            [nn.Parameter(q_delta * torch.rand(q_depth * n_qubits), requires_grad=True) for _ in range(n_generators)]
        )
        self.n_generators = n_generators
        self.output_dim = output_dim

    def forward(self, x):
        patch_size = 2 ** (n_qubits - n_a_qubits)
        total_patches = (self.output_dim + patch_size - 1) // patch_size  # Total number of patches needed
        fake = torch.Tensor(x.size(0), 0).to(device)

        for params in self.q_params:
            patches = torch.Tensor(0, patch_size).to(device)

            for elem in x:
                q_out = partial_measure(elem, params).float().unsqueeze(0)
                patches = torch.cat((patches, q_out))

            fake = torch.cat((fake, patches), 1)

        fake = fake[:, :self.output_dim]  # Ensure the output dimension matches exactly
        return fake

## Train GAN

In [12]:
# Generate the same number of samples than input data
input_dim = data.shape[1]

lrG = 0.0001
lrD = 0.0001
num_iter = 300

discriminator = Discriminator(input_dim).to(device)
generator = PatchQuantumGenerator(n_generators, input_dim).to(device)

criterion = nn.BCELoss()
optD = optim.Adam(discriminator.parameters(), lr=lrD)
optG = optim.Adam(generator.parameters(), lr=lrG)

real_labels = torch.full((batch_size,), 1.0, dtype=torch.float, device=device)
fake_labels = torch.full((batch_size,), 0.0, dtype=torch.float, device=device)

counter = 0

while True:
    for i, (data, _) in enumerate(dataloader):
        real_data = data.to(device)
        noise = torch.rand(batch_size, n_qubits, device=device) * math.pi / 2
        fake_data = generator(noise)

        discriminator.zero_grad()
        outD_real = discriminator(real_data).view(-1)
        outD_fake = discriminator(fake_data.detach()).view(-1)

        errD_real = criterion(outD_real, real_labels)
        errD_fake = criterion(outD_fake, fake_labels)
        errD_real.backward()
        errD_fake.backward()
        errD = (errD_real + errD_fake)/2
        optD.step()

        generator.zero_grad()
        outD_fake = discriminator(fake_data).view(-1)
        errG = criterion(outD_fake, real_labels)
        errG.backward()
        optG.step()

        counter += 1

        if counter % 20 == 0:
            print(f'Iteration: {counter}, Discriminator Loss: {errD:0.3f}, Generator Loss: {errG:0.3f}')

        if counter == num_iter:
            break

    if counter == num_iter:
        break

Iteration: 20, Discriminator Loss: 0.682, Generator Loss: 0.665
Iteration: 40, Discriminator Loss: 0.623, Generator Loss: 0.755
Iteration: 60, Discriminator Loss: 0.737, Generator Loss: 0.621
Iteration: 80, Discriminator Loss: 0.575, Generator Loss: 0.791
Iteration: 100, Discriminator Loss: 0.646, Generator Loss: 0.587
Iteration: 120, Discriminator Loss: 0.554, Generator Loss: 0.806
Iteration: 140, Discriminator Loss: 0.486, Generator Loss: 0.826
Iteration: 160, Discriminator Loss: 0.372, Generator Loss: 1.078
Iteration: 180, Discriminator Loss: 0.372, Generator Loss: 1.096
Iteration: 200, Discriminator Loss: 0.793, Generator Loss: 0.664
Iteration: 220, Discriminator Loss: 0.380, Generator Loss: 0.960
Iteration: 240, Discriminator Loss: 0.389, Generator Loss: 1.018
Iteration: 260, Discriminator Loss: 0.675, Generator Loss: 1.227
Iteration: 280, Discriminator Loss: 0.358, Generator Loss: 0.968
Iteration: 300, Discriminator Loss: 0.561, Generator Loss: 1.136


## Synthetic data

In [13]:
num_synthetic_samples = transformed_data.shape[0]
noise_for_synthetic = torch.rand(num_synthetic_samples, n_qubits, device=device) * math.pi / 2
synthetic_data = generator(noise_for_synthetic).detach().cpu().numpy()
df_synthetic = transform_synthetic_data(
    df=df,
    x_synthetic=synthetic_data,
    categorical_columns=categorical_columns,
    numerical_columns=numerical_columns,
    categorical_scaler=categorical_scaler,
    numerical_scaler=numerical_scaler
    )

# Plot results

In [14]:
fig = make_subplots(
    rows=1,
    cols=2,
    subplot_titles=("Original data", "Synthetic data"))

fig.add_trace(
    go.Scatter(x=df['v1'],
               y=df['v2'],
               marker_color = df['y'],
               mode='markers'),
    row=1, col=1
)

fig.update_xaxes(range=[-1.2, 1.2], row=1, col=1)
fig.update_yaxes(range=[-1.2, 1.2], row=1, col=1)
fig.update_xaxes(range=[-1.2, 1.2], row=1, col=2)
fig.update_yaxes(range=[-1.2, 1.2], row=1, col=2)

fig.add_trace(
    go.Scatter(x=df_synthetic['v1'],
               y=df_synthetic['v2'],
               marker_color = df_synthetic['y'],
               mode='markers'),
    row=1, col=2
)


fig.update_layout(height=600, width=800)
fig.show()

In [18]:
# For swiss roll problem
"""fig = go.Figure(data=[go.Scatter3d(
    x=df['v1'],
    y=df['v2'],
    z=df['v3'],
    mode='markers',
    marker_color=df['y'],
    marker_size=2)]
                )
fig.show()"""

"fig = go.Figure(data=[go.Scatter3d(\n    x=df['v1'], \n    y=df['v2'], \n    z=df['v3'],\n    mode='markers',\n    marker_color=df['y'],\n    marker_size=2)]\n                )\nfig.show()"

In [19]:
# For swiss roll problem
"""fig = go.Figure(data=[go.Scatter3d(
    x=df_synthetic['v1'],
    y=df_synthetic['v2'],
    z=df_synthetic['v3'],
    mode='markers',
    marker_color=df_synthetic['y'],
    marker_size=2)])
fig.show()"""

"fig = go.Figure(data=[go.Scatter3d(\n    x=df_synthetic['v1'], \n    y=df_synthetic['v2'], \n    z=df_synthetic['v3'],\n    mode='markers',\n    marker_color=df_synthetic['y'],\n    marker_size=2)])\nfig.show()"

# Metrics

In [16]:
"""Cluster Analysis Measure (Woo et al. 2009)
Largevalues of Uc indicate disparities in the cluster
memberships, which in turn suggest differencesin the
distributions of the original and masked data.
References:
Woo M.-J., Reiter J. P., Oganian A., Karr A. F. Global
Measures of Data Utility for Microdata Masked for Disclosure
Limitation. J. Priv Confidentiality. 2009;1(1):111–24."""

def cluster_measure(original, synthetic):
  X = original.copy()
  Y = synthetic.copy()
  X['Synthetic'] = 0
  Y['Synthetic'] = 1
  #Stochastic sampling
  data_cluster = pd.concat([X, Y.sample(len(X))])
  kmeans = KMeans(
      n_clusters=2,
      n_init='auto',
      random_state=0
      ).fit(data_cluster)
  data_cluster['Cluster'] = kmeans.predict(
      data_cluster
      )
  r = 0
  for i in [0,1]:
    cluster = data_cluster[
        data_cluster['Cluster'] == i
        ]
    original = cluster[
        cluster['Synthetic'] == 0
        ]
    lo = len(original)
    lc = len(cluster)
    if lo > 0 and lc-lo > 0:
      r += np.square(lo/(lc-lo) - 0.5)
  Uc = np.log10(r/2)
  return Uc

"""Propensity score method
Propensity score mean-squared error pMSE (Woo et al. 2009). If
a ratio score of 0 implies the two datasets are identical. A
score of 1 implies that are totally different.

References:
Woo M.-J., Reiter J. P., Oganian A., Karr A. F. Global
Measures of Data Utility for Microdata Masked for Disclosure
Limitation. J Priv Confidentiality. 2009;1(1):111–24."""

def pmse(original, synthetic):
  X = original.copy()
  Y = synthetic.copy()
  X['Synthetic'] = 0
  Y['Synthetic'] = 1
  data = pd.concat([X, Y.sample(len(X))])
  X_f = data.drop(columns=['Synthetic'])
  y_f = data['Synthetic']
  clf = LogisticRegression(
      max_iter=1000,
      random_state=0
      ).fit(X_f, y_f)
  pMSE = np.sum(
      np.square(
          clf.predict_proba(X_f)[0:,0] - 0.5
          )
      )/len(data)
  X = X.drop(columns=['Synthetic'])
  Y = Y.drop(columns=['Synthetic'])
  return pMSE

In [17]:
print(f"""
Synthetic - Cluster measure = {cluster_measure(df, df_synthetic):.2f}
Synthetic - Propensity score mean-squared error pMSE = {pmse(df, df_synthetic):.4f}""")
print(f'Synthetic data similarity = {100*(1-pmse(df, df_synthetic)):.2f}%')


Synthetic - Cluster measure = -0.11
Synthetic - Propensity score mean-squared error pMSE = 0.0097
Synthetic data similarity = 99.03%


In [20]:
diff = df.corr() - df_synthetic.corr()
px.imshow(
    abs(diff),
    color_continuous_scale='reds',
    zmin=0, zmax=1)