# Imports

In [1]:
import math
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import pennylane as qml
import glob

# Pytorch imports
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

In [2]:
import qiskit
qiskit.__version__

'0.19.2'

In [4]:
from qiskit import Aer
import pennylane_qiskit

# Dataset Loader

In [5]:
class MoleculesLoader(torch.utils.data.Dataset):
    def __init__(self, csv_file: str, transform=None) -> None:
        self.csv = csv_file
        self.atom_dict = {"H" : 1, "C" : 2, "O" : 3, "N" : 4}
        self.transform = transform
        self.df = self.process()
        self._normalize()

    def __len__(self) -> int:
        return len(self.df)

    def __getitem__(self, idx) -> list:
        if torch.is_tensor(idx): idx = idx.tolist()
        return self.df[idx]

    def process(self) -> None:
        tmp_df = self.csv #not actually csv file but array
        df = [None] * len(tmp_df)
        for i in range(len(tmp_df)):
            mol_df = pd.read_csv(tmp_df[i])
            tmp_vec = [None] * len(mol_df)
            for j in range(len(mol_df)):
                data = mol_df.iloc[j]
                atom, x, y, z = data["atom"], data["x"], data["y"], data["z"]
                tmp_vec[j] = (self.atom_dict[atom], x, y, z)
            df[i] = tmp_vec
        return df

    def _normalize(self):
        maxlen = max([len(x) for x in self.df])
        self.size = maxlen
        for i in range(len(self.df)):
            while len(self.df[i]) != maxlen:
                self.df[i].append([0, 0.0, 0.0, 0.0])

In [6]:
csv_list = list()
for filename in glob.glob("C:/Users/Diptanshu/Desktop/ASDRP - Quantum/QGAN/xyz_dataset/Chem/Test/*.csv"):
    csv_list.append(filename)
print(len(csv_list))

30


In [7]:
Loaded = MoleculesLoader(csv_list)

In [8]:
Loaded.df[0]

[(2, -0.0126981359, 1.0858041578, 0.0080009957999999),
 (1, 0.002150416, -0.0060313176, 0.0019761204),
 (1, 1.0117308433, 1.4637511618, 0.0002765748),
 (1, -0.540815069, 1.4475266138, -0.8766437152),
 (1, -0.5238136345000001, 1.4379326443, 0.9063972942),
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0],
 [0, 0.0, 0.0, 0.0]]

In [9]:
Loaded.size

14

# Generator

In [282]:
num_of_atoms = 1
bits_per_atom = 2
bits_per_coord = 6
sign_bit = 1
n_qubits = num_of_atoms*(bits_per_atom+3*(sign_bit+bits_per_coord))

print(f"Total number of qubits per atom: {int(num_of_total_bits/num_of_atoms)}")

q_depth = 1  # Depth of the parameterised quantum circuit

Total number of qubits per atom: 23


In [283]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") #NTS: use GPU runtime
dev = qml.device('default.qubit', wires=n_qubits)

In [284]:
device

device(type='cpu')

Uncomment the comments to add more complexity and/or parametrized gates

In [285]:
@qml.qnode(dev, interface="torch", diff_method="parameter-shift")
def quantum_circuit(noise, weights):
    weights = weights.reshape(q_depth, n_qubits)
    
    # Superposition and Entanglement
    for i in range(n_qubits):
        qml.Hadamard(wires=i)
        qml.CNOT(wires=[i, (i+1) % n_qubits])

    # Initialise latent vectors using noise
    for i in range(n_qubits):
        qml.RY(noise[i], wires=i)

    # Repeated layer
    """
    for i in range(q_depth):
        # Parameterised layer
        for y in range(n_qubits):
            qml.RY(weights[i][y], wires=y)
            # Optional: more parameters
            #qml.RX(weights[i][y], wires=y)
            #qml.RZ(weights[i][y], wires=y)

        # Control Z gates
        for y in range(n_qubits - 1):
            qml.CZ(wires=[y, y + 1])
    """

    # Return measurements on the Z basis
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

In [286]:
# Returns a list of 1 and 0 based on probabilities
def partial_measure(noise, weights):
    probs = quantum_circuit(noise, weights)
    bits = ["0" if x < 0 else "1" for x in probs]
    return bits

In [287]:
tmp_noise = torch.rand(Loaded.size, n_qubits, device=device) * math.pi / 2

In [288]:
tmp_noise[0]

tensor([0.0549, 1.5541, 0.5440, 0.7885, 1.4404, 1.2232, 1.5673, 0.4475, 0.5992,
        1.3634, 0.8145, 1.3847, 1.0273, 1.0234, 0.3367, 0.4960, 1.3555, 1.4679,
        0.7767, 0.8303, 0.9031, 1.1445, 1.2700])

In [289]:
genParams = nn.ParameterList([nn.Parameter(torch.rand(q_depth * n_qubits), requires_grad=True)])

In [290]:
genParams[0]

Parameter containing:
tensor([0.4509, 0.4174, 0.9322, 0.5253, 0.4753, 0.9328, 0.8727, 0.0335, 0.5744,
        0.5046, 0.3768, 0.0931, 0.5962, 0.3038, 0.5423, 0.4647, 0.4712, 0.1290,
        0.5731, 0.5711, 0.1924, 0.1600, 0.8737], requires_grad=True)

In [291]:
probs = partial_measure(tmp_noise[0], genParams[0])

In [292]:
str_probs = ""
str_probs.join(probs)

'00011010111100011101101'

In [293]:
class QuantumGenerator(nn.Module):
    """Quantum generator class"""

    def __init__(self, q_delta=1):
        """
        Args:
            q_delta (float, optional): Spread of the random distribution for parameter initialisation.
        """

        super().__init__()

        # List of quantum parameters initialized with random values between 0 and 1 
        # nn.ParameterList = list of nn.Parameters that has random numbers between 0 to 1 in a tensor of size q_depth*n_qubits
        self.q_params = nn.ParameterList([nn.Parameter(q_delta * torch.rand(q_depth * n_qubits), requires_grad=True)])

    def forward(self, x):
        molecules = []
        for params in self.q_params:
            for elem in x:
                q_probs = partial_measure(elem, params) # q_probs is a list of 1's and 0's
                str_probs = "".join(q_probs)
                molecules.append(str_probs)
        return molecules

In [294]:
lrG = 0.3  # Learning rate for the generator
lrD = 0.01  # Learning rate for the discriminator
num_iter = 5  # Number of training iterations

In [296]:
# Trial Only

# tmp_gen = QuantumGenerator().to(device)
# tmp_out = tmp_gen.forward(tmp_noise)
# tmp_out, len(tmp_out)

# Post Generator / Pre Discriminator Processing

In [297]:
class Processing(object):
    def __init__(self, no_atom_bits: int, no_coord_bits: int) -> None:
        self.pi = np.pi
        self.atom_dict = {"00" : "H", "01" : "C", "10" : "O", "11" : "N"}
        self.num_of_atom_bits = no_atom_bits
        self.num_of_coord_bits = no_coord_bits
        self.num_of_total_bits = self.num_of_atom_bits + 3 * self.num_of_coord_bits

    def binaryToDecimal(self, binary):
        binary1 = binary
        decimal, i, n = 0, 0, 0
        while(binary != 0):
            dec = binary % 10
            decimal = decimal + dec * pow(2, i)
            binary = binary//10
            i += 1
        return decimal 
        
    def whichAtom(self, atom: str) -> str:
        try: x = self.atom_dict[atom]
        except KeyError: raise Exception(f"Key: {atom} is not in atom_dict!")
        else: return x 

    def calcDistance(self, coord_dist, num_of_qubits: int, sign) -> float:
        distance = (self.binaryToDecimal(coord_dist))/(2**(num_of_qubits-2))
        if sign == 1:
            distance *= -1
        return distance

    def atomsAndCoordinates(self, generatedVector: str) -> list:
        genVec = generatedVector
        
        interval = bits_per_atom+3*(sign_bit+bits_per_coord)
        max_molecule_length = 5
        
        for char_index in range(0, len(genVec), interval):
            substring = genVec[char_index:char_index+interval]
            
            length = len(substring)

            sign_x = 1 if (int(substring[length-3]) == 1) else 0
            sign_y = 1 if (int(substring[length-2]) == 1) else 0
            sign_z = 1 if (int(substring[length-1]) == 1) else 0

            atom_bit_str = substring[0:bits_per_atom]
            x_coord = int(substring[bits_per_atom:bits_per_atom+bits_per_coord])
            y_coord = int(substring[bits_per_atom+bits_per_coord:bits_per_atom+2*bits_per_coord])
            z_coord = int(substring[bits_per_atom+2*bits_per_coord:bits_per_atom+3*bits_per_coord])
            
            atom = self.whichAtom(atom_bit_str)
            x_dist = self.calcDistance(x_coord, bits_per_coord, sign_x)
            y_dist = self.calcDistance(y_coord, bits_per_coord, sign_y)
            z_dist = self.calcDistance(z_coord, bits_per_coord, sign_z)
            
        return [ord(atom), x_dist, y_dist, z_dist]

In [298]:
## Trial only 

#PRO = Processing(bits_per_atom, bits_per_coord)
#tmp_pro = [PRO.atomsAndCoordinates(out) for out in tmp_out]
#tmp_pro = torch.tensor(tmp_pro)
#tmp_pro

# Discriminator

<ul> 
    <li> Task Deadline: **COMPLETED**  </li> 
    <li> TODO: Check how MolGAN people loaded the molecules ✅   </li> 
    <li> UPDATE: Added options to save and load model   </li> 
    <li> UPDATE: Changed model architecture   </li> 
    <li> FIXED: `TypeError: linear(): argument 'input' (position 1) must be Tensor, not list` </li> 
    <li> PROBLEM: `torch.tensor()` doesn't work on lists with more than one `dtype`   </li> 
    <li> FIXED: ⬆️ use ASCII value of string `(dtype=int)`.   </li> 
    <li> UPDATE: Fixed issues with Doubles and Floats    </li> 
    <li> UPDATE: Add capabilities for dynamic-sized inputs </li> 
</ul>

In [312]:
class Discriminator(nn.Module):
    def __init__(self, data_shape):
        super(Discriminator, self).__init__()
        self.data_shape = data_shape

        self.model = nn.Sequential(
            nn.Linear(int(np.prod(self.data_shape)), 20),
            nn.Tanh(),
            nn.Linear(20, 10),
            nn.Tanh(),
            nn.Linear(10, 1),
        )

    def forward(self, mol):
        validity = self.model(mol.float())
        return validity

    def save(self, path):
        save_dict = {
            'model': self.model.state_dict(),
            'data_shape': self.data_shape,
        }
        torch.save(save_dict, path)
        return

    @staticmethod
    def load(path):
        save_dict = torch.load(path)
        D = Discriminator(save_dict['data_shape'])
        D.model.load_state_dict(save_dict["model"])

        return D

In [313]:
Loaded.size

14

In [314]:
D = Discriminator(data_shape=(1, 4))

In [315]:
D.model

Sequential(
  (0): Linear(in_features=4, out_features=20, bias=True)
  (1): Tanh()
  (2): Linear(in_features=20, out_features=10, bias=True)
  (3): Tanh()
  (4): Linear(in_features=10, out_features=1, bias=True)
)

In [316]:
D.forward(tmp_pro)

tensor([[0.5235],
        [0.5116],
        [0.5370],
        [0.5334],
        [0.5394],
        [0.5300],
        [0.5182],
        [0.5449],
        [0.5119],
        [0.5204],
        [0.5174],
        [0.5211],
        [0.5452],
        [0.5151]], grad_fn=<AddmmBackward0>)

## Training Phase

In [317]:
discriminator = Discriminator(data_shape=(1, 4)).to(device)
generator = QuantumGenerator().to(device)
processor = Processing(bits_per_atom, bits_per_coord)

In [318]:
criterion = nn.BCEWithLogitsLoss()

# Optimisers
optD = optim.Adam(discriminator.parameters(), lr=lrD)
optG = optim.Adam(generator.parameters(), lr=lrG)

batch_size = 14

real_labels = torch.full((batch_size,), 1.0, dtype=torch.float, device=device)
fake_labels = torch.full((batch_size,), 0.0, dtype=torch.float, device=device)

# Fixed noise allows us to visually track the generated MOLECULES throughout training
fixed_noise = torch.rand(Loaded.size, n_qubits, device=device) * math.pi / 2

# Collect generator's molecules
results = []

In [319]:
print(real_labels,'\n',fake_labels)

tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]) 
 tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])


In [320]:
fixed_noise.shape

torch.Size([14, 23])

In [321]:
#x = generator.forward(fixed_noise)

In [322]:
#x

In [323]:
num_iter = 4
counter = 0

In [328]:
num_of_samples = len(Loaded)

for sample in range(num_of_samples):
    data = Loaded.df[sample]
    data = torch.tensor(data, dtype=torch.double)
    
    # Data from molecule dataset for training the discriminator
    real_data = data.to(device)

    # Noise follwing a uniform distribution in range [0,pi/2)
    noise = torch.rand(Loaded.size, n_qubits, device=device) * math.pi / 2
    fake_data = generator(noise)
    fake_data = [processor.atomsAndCoordinates(out) for out in fake_data]
    fake_data = torch.tensor(fake_data).to(device)
    results += fake_data

    # Training the discriminator
    discriminator.zero_grad()
    outD_real = discriminator(real_data).view(-1)
    outD_fake = discriminator(fake_data).view(-1)
    #outD_fake = [abs(breh) for breh in outD_fake]
    #outD_real = [abs(breh) for breh in outD_real]

    errD_real = criterion(torch.tensor(outD_real).to(device), real_labels)
    errD_fake = criterion(torch.tensor(outD_fake).to(device), fake_labels)

    #for j in range(5):
        #print(f"outD_real is {torch.tensor([outD_real[j]])}\n outD_fake is {torch.tensor([outD_fake[j]])}")
    #    errD_real += criterion(torch.tensor([abs(outD_real[j])]).to(device), real_labels)
    #    errD_fake += criterion(torch.tensor([abs(outD_fake[j])]).to(device), fake_labels)
    
    errD_fake /= batch_size
    errD_real /= batch_size

    # Propagate gradients
    #errD_real.backward()
    #errD_fake.backward()

    errD = errD_real + errD_fake
    optD.step()

    # Training the generator
    generator.zero_grad()
    outD_fake = discriminator(fake_data).view(-1)
    #outD_fake = [abs(breh) for breh in outD_fake]
    errG = criterion(torch.tensor(outD_fake).to(device), real_labels)
    errG /= batch_size
    #errG.backward()
    optG.step()

    counter += 1

    # Show loss values
    if counter % 1 == 0:
        print(f'Iteration: {counter}, Discriminator Loss: {errD:0.6f}, Generator Loss: {errG:0.6f}')
        #random print statements because yes

    if counter == num_iter:
        break

  errD_real = criterion(torch.tensor(outD_real).to(device), real_labels)
  errD_fake = criterion(torch.tensor(outD_fake).to(device), fake_labels)
  errG = criterion(torch.tensor(outD_fake).to(device), real_labels)


Iteration: 1, Discriminator Loss: 0.103710, Generator Loss: 0.044771
Iteration: 2, Discriminator Loss: 0.104683, Generator Loss: 0.044497
Iteration: 3, Discriminator Loss: 0.103958, Generator Loss: 0.044187
Iteration: 4, Discriminator Loss: 0.103982, Generator Loss: 0.044793


In [404]:
molecules = []
for idx in range(len(results)):
    molecules.append(results[idx].tolist())

In [405]:
np_arr = np.array(molecules)

df = pd.DataFrame(np_arr, columns = ['atom','x','y','z'])

In [406]:
for idx in range(len(df)):
    #print(df['atom'][idx])
    df['atom'][idx] = chr(int(df['atom'][idx]))

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['atom'][idx] = chr(int(df['atom'][idx]))


In [408]:
df.to_csv('fake_mol.csv')