In [1]:
# Google Colab specific code for mounting Google Drive
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# # Define the directory path on your Google Drive
# # Replace 'Your_directory' with the actual directory
# directory = '/content/drive/My Drive/Colab Notebooks/ML4GST/'

# # Now use this directory for reading and writing data
# data_template_filename = directory + "dataset.txt"
# gst_dir = directory + "test_gst_dir"

Mounted at /content/drive


In [2]:
%cd /content/drive/MyDrive/Colab Notebooks/ML4GST_v2

/content/drive/MyDrive/Colab Notebooks/ML4GST_v2


In [3]:
pip install pygsti

Collecting pygsti
  Downloading pyGSTi-0.9.11.2-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (17.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.2/17.2 MB[0m [31m20.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pygsti
Successfully installed pygsti-0.9.11.2


In [34]:
import pygsti
import pygsti.algorithms.fiducialselection as fidsel
import pygsti.algorithms.germselection as germsel
import numpy as np
import pandas as pd

In [35]:
def easy_PTM_depol_channel(depol_mat):
  PTM_depol = (1-depol_mat)*np.eye(4)
  PTM_depol[0,0] = 1
  return PTM_depol

In [36]:
easy_PTM_depol_channel(0.1)

array([[1. , 0. , 0. , 0. ],
       [0. , 0.9, 0. , 0. ],
       [0. , 0. , 0.9, 0. ],
       [0. , 0. , 0. , 0.9]])

In [37]:
def pauli_matrices():
    """Return the Pauli matrices including identity."""
    I = np.eye(2, dtype=complex)
    X = np.array([[0, 1], [1, 0]], dtype=complex)
    Y = np.array([[0, -1j], [1j, 0]], dtype=complex)
    Z = np.array([[1, 0], [0, -1]], dtype=complex)
    return [I, X, Y, Z]

def compute_ideal_ptm(unitary):
    """Compute the ideal PTM from a given unitary."""
    paulis = pauli_matrices()
    ptm_ideal = np.zeros((4, 4), dtype=complex)

    for i in range(4):
        for j in range(4):
            ptm_ideal[i, j] = 0.5 * np.trace(np.dot(paulis[i], np.dot(unitary, np.dot(paulis[j], np.conjugate(unitary.T)))))
    return ptm_ideal

def general_custom_gate(theta, delta, depol_amt, gate):
  # Parameters
  # theta = np.pi / 2  # Example theta (45 degrees)
  # delta = 0.1  # Over-rotational error in radians
  # depolarizing_error = 0.01  # Depolarizing error rate

  # Calculate PTM for ideal Rx(theta + delta) rotation including the over-rotational error
  unitary_rx_adjusted = np.cos((theta + delta) / 2) * np.eye(2) - 1j * np.sin((theta + delta) / 2) * pauli_matrices()[gate]
  ptm_adjusted_rx = compute_ideal_ptm(unitary_rx_adjusted)

  # Calculate combined PTM with depolarizing error
  ptm = np.dot(easy_PTM_depol_channel(depol_amt), ptm_adjusted_rx)

  return ptm.real

# print('ptm_adjusted_rx: \n', np.round(ptm_adjusted_rx,5))
# print('final ptm: \n', np.round(ptm,5))


In [38]:
np.round(general_custom_gate(theta=np.pi/2, delta=0.0, depol_amt=0.0, gate=1))

array([[ 1.,  0.,  0.,  0.],
       [ 0.,  1.,  0.,  0.],
       [ 0.,  0.,  0., -1.],
       [ 0.,  0.,  1.,  0.]])

In [39]:
# Define the Pauli Transfer Matrices for the gates
# I = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])
# X_pi_4 = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, np.cos(np.pi/4), -np.sin(np.pi/4)], [0, 0, np.sin(np.pi/4), np.cos(np.pi/4)]])
# Y_pi_2 = np.array([[1, 0, 0, 0], [0, np.cos(np.pi/2), 0, np.sin(np.pi/2)], [0, 0, 1, 0], [0, -np.sin(np.pi/2), 0, np.cos(np.pi/2)]])

# Create the explicit model
ideal_target_model = pygsti.models.create_explicit_model_from_expressions(
    [('Q0',)], ['Gi', 'Gx', 'Gy'],
    ["I(Q0)", "X(pi/2,Q0)", "Y(pi/2,Q0)"])

class MyXPi2Operator(pygsti.modelmembers.operations.DenseOperator):
    def __init__(self):
        #initialize with no noise
        super(MyXPi2Operator,self).__init__(np.identity(4,'d'), 'pp', "densitymx") # this is *super*-operator, so "densitymx"
        self.from_vector([0, 0.1])

    @property
    def num_params(self):
        return 2 # we have two parameters

    def to_vector(self):
        return np.array([self.depol_amt, self.over_rotation],'d') #our parameter vector

    def from_vector(self, v, close=False, dirty_value=True):
        #initialize from parameter vector v
        self.depol_amt = v[0]
        self.over_rotation = v[1]

        # print(f'depol_amt: {self.depol_amt}, over_rotation: {self.over_rotation}')

        # theta = (np.pi/4 + self.over_rotation)/2
        # a = 1.0-self.depol_amt
        # b = a*2*np.cos(theta)*np.sin(theta)
        # c = a*(np.sin(theta)**2 - np.cos(theta)**2)

        # print(f'a: {a}, b: {b}, c: {c}')

        # ._ptr is a member of DenseOperator and is a numpy array that is
        # the dense Pauli transfer matrix of this operator
        # Technical note: use [:,:] instead of direct assignment so id of self._ptr doesn't change
        # self._ptr[:,:] = np.array([[1,   0,   0,   0],
        #                           [0,   a,   0,   0],
        #                           [0,   0,   c,  -b],
        #                           [0,   0,   b,   c]],'d')
        self._ptr[:,:] = np.array(general_custom_gate(theta=np.pi/2, delta=0.1, depol_amt=0.01, gate=1), 'd')

        general_custom_gate
        self.dirty = dirty_value  # mark that parameter vector may have changed

    def transform(self, S):
        # Update self with inverse(S) * self * S (used in gauge optimization)
        raise NotImplementedError("MyXPi2Operator cannot be transformed!")



class MyYPi2Operator(pygsti.modelmembers.operations.DenseOperator):
    def __init__(self):
        #initialize with no noise
        super(MyYPi2Operator,self).__init__(np.identity(4,'d'), 'pp', "densitymx") # this is *super*-operator, so "densitymx"
        self.from_vector([0, 0.1])

    @property
    def num_params(self):
        return 2 # we have two parameters

    def to_vector(self):
        return np.array([self.depol_amt, self.over_rotation],'d') #our parameter vector

    def from_vector(self, v, close=False, dirty_value=True):
        #initialize from parameter vector v
        self.depol_amt = v[0]
        self.over_rotation = v[1]

        # print(f'depol_amt: {self.depol_amt}, over_rotation: {self.over_rotation}')

        # theta = (np.pi/4 + self.over_rotation)/2
        # a = 1.0-self.depol_amt
        # b = a*2*np.cos(theta)*np.sin(theta)
        # c = a*(np.sin(theta)**2 - np.cos(theta)**2)

        # print(f'a: {a}, b: {b}, c: {c}')

        # ._ptr is a member of DenseOperator and is a numpy array that is
        # the dense Pauli transfer matrix of this operator
        # Technical note: use [:,:] instead of direct assignment so id of self._ptr doesn't change
        # self._ptr[:,:] = np.array([[1,   0,   0,   0],
        #                           [0,   a,   0,   0],
        #                           [0,   0,   c,  -b],
        #                           [0,   0,   b,   c]],'d')
        self._ptr[:,:] = np.array(general_custom_gate(theta=np.pi/2, delta=0.15, depol_amt=0.01, gate=2), 'd')

        general_custom_gate
        self.dirty = dirty_value  # mark that parameter vector may have changed

    def transform(self, S):
        # Update self with inverse(S) * self * S (used in gauge optimization)
        raise NotImplementedError("MyXPi2Operator cannot be transformed!")

import copy
target_model = copy.deepcopy(ideal_target_model)
target_model.operations[('Gx')] = MyXPi2Operator()
target_model.operations[('Gy')] = MyYPi2Operator()
print('target_model: \n', target_model)
print('ideal_target_model: \n', ideal_target_model)



target_model: 
 rho0 = FullState with dimension 4
 0.71   0   0 0.71


Mdefault = UnconstrainedPOVM with effect vectors:
0: FullPOVMEffect with dimension 4
 0.71   0   0 0.71

1: FullPOVMEffect with dimension 4
 0.71   0   0-0.71



Gi = 
FullArbitraryOp with shape (4, 4)
 1.00   0   0   0
   0 1.00   0   0
   0   0 1.00   0
   0   0   0 1.00


Gx = 
MyXPi2Operator with shape (4, 4)
 1.00   0   0   0
   0 0.99   0   0
   0   0-0.10-0.99
   0   0 0.99-0.10


Gy = 
MyYPi2Operator with shape (4, 4)
 1.00   0   0   0
   0-0.15   0 0.98
   0   0 0.99   0
   0-0.98   0-0.15




ideal_target_model: 
 rho0 = FullState with dimension 4
 0.71   0   0 0.71


Mdefault = UnconstrainedPOVM with effect vectors:
0: FullPOVMEffect with dimension 4
 0.71   0   0 0.71

1: FullPOVMEffect with dimension 4
 0.71   0   0-0.71



Gi = 
FullArbitraryOp with shape (4, 4)
 1.00   0   0   0
   0 1.00   0   0
   0   0 1.00   0
   0   0   0 1.00


Gx = 
FullArbitraryOp with shape (4, 4)
 1.00   0   0   0
   0 1.00 

In [40]:
# Automatic selection of fiducials and germs using "laissez-faire" method
prepFiducials, measFiducials = fidsel.find_fiducials(ideal_target_model)
germs = germsel.find_germs(ideal_target_model, seed = 1234)

Initial Length Available Fiducial List: 7
Length Available Fiducial List Dropped Identities and Duplicates: 7
Using GRASP algorithm.
Preparation fiducials:
['{}', 'Gx', 'Gy', 'GxGx']
Score: 31.99999999999997
Measurement fiducials:
['{}', 'Gx', 'Gy']
Score: 9.999999999999996
Initial Length Available Germ List: 196
Length Available Germ List After Deduping: 24
Length Available Germ List After Dropping Random Fraction: 24
Length Available Germ List After Adding Back In Forced Germs: 24
Memory estimate of 0.0 GB for all-Jac mode.
Memory estimate of 0.0 GB for single-Jac mode.
Using greedy algorithm.
Constructed germ set:
['Gi', 'Gx', 'Gy', 'GxGy', 'GiGxGx', 'GiGyGyGx', 'GiGxGyGx', 'GiGyGxGx', 'GxGxGxGy', 'GiGxGyGxGx']
Score: major=-34.0 minor=494007.94737039437, N: 34


In [41]:
print(f'prepFiducials: {prepFiducials} \n measFiducials: {measFiducials} \n germs: {germs}')

prepFiducials: [Circuit({}), Circuit(Gx), Circuit(Gy), Circuit(GxGx)] 
 measFiducials: [Circuit({}), Circuit(Gx), Circuit(Gy)] 
 germs: [Circuit(Gi), Circuit(Gx), Circuit(Gy), Circuit(GxGy), Circuit(GiGxGx), Circuit(GiGyGyGx), Circuit(GiGxGyGx), Circuit(GiGyGxGx), Circuit(GxGxGxGy), Circuit(GiGxGyGxGx)]


In [42]:
# Generate a list of circuits using the long-sequence gate set tomography (LSGST) method
maxLengths = [2**n for n in range(5)]

listOfExperiments = pygsti.circuits.create_lsgst_circuits(
    target_model, prepFiducials, measFiducials, germs, maxLengths)

# Simulate the probability outcomes of these circuits
ds = pygsti.data.simulate_data(target_model, listOfExperiments, num_samples=1000,
                                            sample_error="binomial", seed=1234)
# print(ds)

pygsti.io.write_dataset("Custom_1Q_XYI_dataset_abc.txt", ds, outcome_label_order=['0','1'])

# Convert the probabilities to a DataFrame and save to a CSV file

In [None]:
import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Lambda
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam

# Load the sorted data directly from the modified CSV
df_sorted = pd.read_csv('Sorted_Encoded_Padded_Probabilities.csv')

def prepare_data(df_part):
    # Extracting features and labels
    X = df_part['Padded'].apply(lambda x: [int(xi) for xi in x.strip('[]').split()]).to_list()
    y = df_part[['Prob1', 'Prob2']].values

    # Convert to numpy arrays
    X = np.array(X)
    y = np.array(y)

    return X, y

X, y = prepare_data(df_sorted)

# Create new input data
X_new = [X, y]

# Split the original input (X[0]) and target labels (y) into training and test sets
X_train_0, X_test_0, y_train, y_test = train_test_split(X_new[0], y, test_size=0.2, random_state=42)

# Manually combine the split y labels into the X data
X_train = [np.array(X_train_0), np.array(y_train)]
X_test = [np.array(X_test_0), np.array(y_test)]

# Convert y data to numpy arrays
y_train = np.array(y_train)
y_test = np.array(y_test)



In [None]:
X_train[0]

array([[2, 2, 2, ..., 0, 0, 0],
       [2, 1, 0, ..., 0, 0, 0],
       [3, 2, 2, ..., 0, 0, 0],
       ...,
       [2, 3, 2, ..., 0, 0, 0],
       [2, 2, 3, ..., 0, 0, 0],
       [2, 1, 1, ..., 0, 0, 0]])

In [None]:
X_train[0][0]

array([2, 2, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [None]:
X_train[1]

array([[0.49 , 0.51 ],
       [0.492, 0.508],
       [0.526, 0.474],
       [0.227, 0.773],
       [0.   , 1.   ],
       [0.922, 0.078],
       [0.513, 0.487],
       [0.519, 0.481],
       [0.501, 0.499],
       [0.715, 0.285],
       [0.474, 0.526],
       [0.969, 0.031],
       [0.41 , 0.59 ],
       [0.508, 0.492],
       [0.519, 0.481],
       [0.586, 0.414],
       [0.95 , 0.05 ],
       [0.498, 0.502],
       [0.491, 0.509],
       [0.644, 0.356],
       [0.475, 0.525],
       [0.491, 0.509],
       [0.594, 0.406],
       [0.294, 0.706],
       [0.668, 0.332],
       [1.   , 0.   ],
       [0.525, 0.475],
       [0.5  , 0.5  ],
       [0.403, 0.597],
       [0.   , 1.   ],
       [0.501, 0.499],
       [0.418, 0.582],
       [0.511, 0.489],
       [0.323, 0.677],
       [0.   , 1.   ],
       [0.383, 0.617],
       [0.037, 0.963],
       [0.983, 0.017],
       [0.821, 0.179],
       [0.036, 0.964],
       [1.   , 0.   ],
       [0.052, 0.948],
       [0.486, 0.514],
       [0.1

In [None]:
X_train[1][0]

array([0.49, 0.51])

In [None]:
class CustomLearningRateScheduler(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, initial_learning_rate, decay_steps, decay_rate):
        super().__init__()
        self.initial_learning_rate = initial_learning_rate
        self.decay_steps = decay_steps
        self.decay_rate = decay_rate

    def __call__(self, step):
        step = tf.cast(step, dtype=tf.float32)
        return self.initial_learning_rate / (1 + self.decay_rate * step / self.decay_steps)

    def get_config(self):
        return {
            "initial_learning_rate": self.initial_learning_rate,
            "decay_steps": self.decay_steps,
            "decay_rate": self.decay_rate
        }


# class CustomLearningRateScheduler(tf.keras.optimizers.schedules.LearningRateSchedule):

#   def __init__(self, initial_learning_rate):
#     self.initial_learning_rate = initial_learning_rate

#   def __call__(self, step):
#      return self.initial_learning_rate / (step + 1)

# Define model
# model = Sequential([
#     Dense(20, activation='relu', input_shape=(len(X_train[0]),)),
#     Dense(128, activation='relu'),
#     Dense(64, activation='relu'),
#     Dense(2, activation='sigmoid'),  # Sigmoid function outputs in the range [0, 1]
# ])
# model = Sequential([
#     Dense(20, activation='relu', input_shape=(len(X_train[0]),)),
#     Dense(128, activation='relu'),
#     Dense(64, activation='relu'),
#     Dense(2, activation='relu'),
# ])

# # Scale model output to range [0, 2pi]
# scaling_layer = Lambda(lambda x: x * 2 * math.pi)
# model.add(scaling_layer)

# Define model architecture
input_X = Input(shape=(len(X_train[0][0]),), name='input_X')
input_y = Input(shape=(len(X_train[1][0]),), name='input_y')

# Original branch
x1 = Dense(20, activation='relu')(input_X)
x1 = Dense(128, activation='relu')(x1)

# New branch
x2 = Dense(2, activation='relu')(input_y)
x2 = Dense(64, activation='relu')(x2)

# Concatenate branches
merged = Concatenate()([x1, x2])
merged = Dense(64, activation='relu')(merged)
merged = Dense(2, activation='relu')(merged)

# Create and compile model
model = Model(inputs=[input_X, input_y], outputs=merged)

# Compile model

initial_learning_rate = 1e-3
decay_steps = 1000
decay_rate = 0.01
lr_schedule = CustomLearningRateScheduler(initial_learning_rate, decay_steps, decay_rate)
# lr_schedule = CustomLearningRateScheduler(initial_learning_rate)
optimizer = Adam(learning_rate=lr_schedule)

model.compile(optimizer=optimizer, loss='mean_squared_error')

# Custom X gate
def custom_X(depol_amt, over_rotation):

  # print(f'depol_amt: {self.depol_amt}, over_rotation: {self.over_rotation}')

  theta = (math.pi/4 + over_rotation)/2
  a = 1.0-depol_amt
  b = a*2*tf.math.cos(theta)*tf.math.sin(theta)
  c = a*(tf.math.sin(theta)**2 - tf.math.cos(theta)**2)

  # print(f'a: {a}, b: {b}, c: {c}')

  # ._ptr is a member of DenseOperator and is a numpy array that is
  # the dense Pauli transfer matrix of this operator
  # Technical note: use [:,:] instead of direct assignment so id of self._ptr doesn't change
  custom_X_arr = tf.convert_to_tensor([[1,   0,   0,   0],
                            [0,   a,   0,   0],
                            [0,   0,   c,  -b],
                            [0,   0,   b,   c]], dtype=tf.float32)

  return custom_X_arr


# Define gate application function
def apply_gate(state, depol_amt, over_rotation, label):
    # Construct arrays using NumPy
    # Define gates in PTM form
    I_np = np.array([[1, 0, 0, 0],
                    [0, 1, 0, 0],
                    [0, 0, 1, 0],
                    [0, 0, 0, 1]], dtype=np.float32)  #Gi

    Y_pi_2_np = np.array([[1, 0, 0, 0],
                          [0, np.cos(np.pi/2), 0, np.sin(np.pi/2)],
                          [0, 0, 1, 0],
                          [0, -np.sin(np.pi/2), 0, np.cos(np.pi/2)]], dtype=np.float32) #Gy

    # Normalized State corresponding to |0⟩ in Pauli basis
    # state_np = np.array([1/np.sqrt(2), 0, 0, 1/np.sqrt(2)], dtype=np.float32)

    # Convert NumPy arrays to TensorFlow tensors
    I = tf.convert_to_tensor(I_np)
    Y_pi_2 = tf.convert_to_tensor(Y_pi_2_np)
    # state = tf.convert_to_tensor(state_np)

    # X_theta = tf.convert_to_tensor([[1, 0, 0, 0], [0, tf.math.cos(theta_value), 0, tf.math.sin(theta_value)],
    #                        [0, 0, 1, 0], [0, -tf.math.sin(theta_value), 0, tf.math.cos(theta_value)]], dtype=tf.float32)  # Gx

    X_theta = custom_X(depol_amt, over_rotation)
    # print('current label: ', label)

    if label == 1:
        return tf.linalg.matmul(X_theta, tf.reshape(state, [-1, 1]))
    elif label == 2:
        return tf.linalg.matmul(Y_pi_2, tf.reshape(state, [-1, 1]))
    elif label == 3:
        return tf.linalg.matmul(I, tf.reshape(state, [-1, 1]))
    else:
        return state  # If label is 0, don't apply any gate


# def apply_gate_sequence(single_gate_sequence):
#     # Initialize state in Pauli basis
#     state = tf.convert_to_tensor([1/np.sqrt(2), 0, 0, 1/np.sqrt(2)], dtype=tf.float32)

#   # Apply each gate in the sequence
#     # print('model(single_gate_sequence[tf.newaxis, :]) ->', model(single_gate_sequence[tf.newaxis, :]))
#     depol_amt, over_rotation = tf.squeeze(model(single_gate_sequence[tf.newaxis, :])) # Predict depolar_error, over_rotation for the current gate sequence
#     # print('theta_value: ', theta_value)
#     # depol_amt = tf.clip_by_value(tf.squeeze(depol_amt), 0, 0.1)
#     # over_rotation = tf.clip_by_value(tf.squeeze(over_rotation), 0, 0.1)
#     # print(f"depol_amt: {depol_amt}, over_rotation: {over_rotation}")
#     # print('squeezed theta_value: ', theta_value)
#     for i in range(tf.shape(single_gate_sequence)[0]):
#       if single_gate_sequence[i] == 0:
#         break
#       # print('tf.shape(single_gate_sequence): ', tf.shape(single_gate_sequence))
#       # print('tf.shape(single_gate_sequence[0]): ', tf.shape(single_gate_sequence)[0])
#       # print('single_gate_sequence[i]: ', single_gate_sequence[i])
#       state = apply_gate(state, depol_amt, over_rotation, single_gate_sequence[i])
#       # print('current state: ', state)

#     return state

def apply_gate_sequence(single_gate_sequence, single_y_label):
    # Initialize state in Pauli basis
    state = tf.convert_to_tensor([1/np.sqrt(2), 0, 0, 1/np.sqrt(2)], dtype=tf.float32)

    # print("Shape of single_gate_sequence:", tf.shape(single_gate_sequence))
    # print("Shape of single_y_label:", tf.shape(single_y_label))
    # print("Shape of model input single_gate_sequence:", tf.shape(single_gate_sequence[tf.newaxis, :]))
    # print("Shape of model input single_gate_sequence:", tf.shape(single_y_label[tf.newaxis, :]))


    # Apply each gate in the sequence
    depol_amt, over_rotation = tf.squeeze(model([single_gate_sequence[tf.newaxis, :], single_y_label[tf.newaxis, :]])) # Predict depolar_error, over_rotation for the current gate sequence

    for i in range(tf.shape(single_gate_sequence)[0]):
        if single_gate_sequence[i] == 0:
            break
        state = apply_gate(state, depol_amt, over_rotation, single_gate_sequence[i])

    return state



def compute_probabilities(ptm_vector):
    # PTM representations for |0> and |1> states
    ptm_0 = tf.constant([1, 0, 0, 1], dtype=tf.float32)
    ptm_1 = tf.constant([1, 0, 0, -1], dtype=tf.float32)
    # ptm_0 = tf.convert_to_tensor([1, 0, 0, 1], dtype=tf.float32)
    # ptm_1 = tf.convert_to_tensor([1, 0, 0, -1], dtype=tf.float32)

    # Normalize the vectors
    ptm_vector = tf.squeeze(tf.linalg.l2_normalize(ptm_vector))
    ptm_0 = tf.linalg.l2_normalize(ptm_0)
    ptm_1 = tf.linalg.l2_normalize(ptm_1)

    # Compute dot products
    prob_0 = tf.tensordot(ptm_vector, ptm_0, axes=1)
    prob_1 = tf.tensordot(ptm_vector, ptm_1, axes=1)

    return tf.stack([prob_0, prob_1])


# Define loss function
loss_fn = MeanSquaredError()

# Define training loop
# def train_step(X, y):
#     with tf.GradientTape() as tape:
#       batched_final_states = []
#       batched_probabilities = []

#       # Process each sequence in the batch individually
#       for i in range(tf.shape(X)[0]):
#           single_sequence = tf.gather(X, i, axis=0)
#           final_state = apply_gate_sequence(single_sequence)
#           probabilities = compute_probabilities(final_state)
#           batched_final_states.append(final_state)
#           # print('batched_final_states: ', batched_final_states)
#           batched_probabilities.append(probabilities)
#           # print('batched_probabilities: ', batched_probabilities)

#       batched_final_states = tf.stack(batched_final_states)
#       # print('batched_final_states: ', batched_final_states)
#       # print('batched_final_states.shape: ', batched_final_states.shape)
#       batched_probabilities = tf.stack(batched_probabilities)
#       # print('batched_probabilities: ', batched_probabilities)
#       # print('batched_probabilities.shape: ', batched_probabilities.shape)


#       loss = loss_fn(y, batched_probabilities)
#       # print('loss: ', loss)

#     grads = tape.gradient(loss, model.trainable_weights)
#     optimizer.apply_gradients(zip(grads, model.trainable_weights))
#     return loss

def train_step(X, y):
    with tf.GradientTape() as tape:
        batched_final_states = []
        batched_probabilities = []

        # print("Shape of X[0]:", tf.shape(X[0]))
        # print("Shape of X[1]:", tf.shape(X[1]))
        # print("Shape of y:", tf.shape(y))

        # Process each sequence in the batch individually
        # print('tf.shape(X[0])[0]:', tf.shape(X[0])[0])
        for i in range(tf.shape(X[0])[0]):
            single_sequence = tf.gather(X[0], i, axis=0)
            single_y_label = tf.gather(X[1], i, axis=0)
            final_state = apply_gate_sequence(single_sequence, single_y_label)
            probabilities = compute_probabilities(final_state)
            batched_final_states.append(final_state)
            batched_probabilities.append(probabilities)

        batched_final_states = tf.stack(batched_final_states)
        batched_probabilities = tf.stack(batched_probabilities)

        loss = loss_fn(y, batched_probabilities)

    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss


# Define validation loop (similar to training loop but without gradients)
# def validate_step(X, y, print_results = False):
#     batched_final_states = []
#     batched_probabilities = []
#     # batched_thetas = []
#     batched_errors = []

#     for i in range(tf.shape(X)[0]):
#         single_sequence = tf.gather(X, i, axis=0)
#         final_state = apply_gate_sequence(single_sequence)
#         probabilities = compute_probabilities(final_state)
#         batched_final_states.append(final_state)
#         batched_probabilities.append(probabilities)
#         if print_results == True:
#             theta_value = tf.squeeze(model(single_sequence[tf.newaxis, :])) # Predict theta for the current gate
#             depol_amt, over_rotation = tf.squeeze(model(single_sequence[tf.newaxis, :])) # Predict depolar_error, over_rotation for the current gate sequence
#             # batched_thetas.append(theta_value)
#             batched_errors.append(tf.stack([depol_amt, over_rotation]))


#     batched_final_states = tf.stack(batched_final_states)
#     batched_probabilities = tf.stack(batched_probabilities)
#     loss = loss_fn(y, batched_probabilities)
#     if print_results == True:
#       # print('batched_thetas: ', batched_thetas)
#       print('batched_errors: ', batched_errors)
#     return loss

def validate_step(X, y, print_results = False):
    batched_final_states = []
    batched_probabilities = []
    batched_errors = []

    for i in range(tf.shape(X[0])[0]):
        single_sequence = tf.gather(X[0], i, axis=0)
        single_y_label = tf.gather(X[1], i, axis=0)
        final_state = apply_gate_sequence(single_sequence, single_y_label)
        probabilities = compute_probabilities(final_state)
        batched_final_states.append(final_state)
        batched_probabilities.append(probabilities)
        if print_results == True:
            depol_amt, over_rotation = tf.squeeze(model([single_sequence[tf.newaxis, :], single_y_label[tf.newaxis, :]])) # Predict depolar_error, over_rotation for the current gate sequence
            batched_errors.append(tf.stack([depol_amt, over_rotation]))

    batched_final_states = tf.stack(batched_final_states)
    batched_probabilities = tf.stack(batched_probabilities)
    loss = loss_fn(y, batched_probabilities)
    if print_results == True:
        print('batched_errors: ', batched_errors)
    return loss


In [None]:
# Add these lines at the top of your code
import os

# Directory to save the model and weights
model_save_dir = "saved_models"
if not os.path.exists(model_save_dir):
    os.makedirs(model_save_dir)

In [None]:
# Training loop
EPOCHS = 100
BATCH_SIZE = 64
num_parts = 3
part_size = len(df_sorted) // num_parts

total_epochs_elapsed = 0  # Counter for total number of epochs elapsed

# Lists to store the mean train and validation losses for each epoch across all parts
all_train_losses = []
all_val_losses = []

for part in range(num_parts):
    # Determine the dataset subset for the current part
    end_idx = (part + 1) * part_size
    X_subset, y_subset = prepare_data(df_sorted.iloc[:end_idx])

    # Convert X_subset and y_subset to a single numpy array
    X_subset = np.array(X_subset)
    X_subset = [X_subset, y_subset]
    y_subset = np.array(y_subset)


    for epoch in range(EPOCHS):
        train_losses_per_epoch = []
        val_losses_per_epoch = []

        for i in range(0, len(X_subset[0]), BATCH_SIZE):
            # print('len of X_subset[0][i:i+BATCH_SIZE]: ', len(X_subset[0][i:i+BATCH_SIZE]))
            X_batch = [X_subset[0][i:i+BATCH_SIZE], X_subset[1][i:i+BATCH_SIZE]]
            y_batch = y_subset[i:i+BATCH_SIZE]

            # Training and validation steps remain unchanged
            train_loss = train_step(X_batch, y_batch)
            val_loss = validate_step(X_test, y_test)
            train_losses_per_epoch.append(train_loss)
            val_losses_per_epoch.append(val_loss)

        mean_train_loss = np.mean(train_losses_per_epoch)
        mean_val_loss = np.mean(val_losses_per_epoch)

        # Store the mean losses for this epoch
        all_train_losses.append(mean_train_loss)
        all_val_losses.append(mean_val_loss)

        total_epochs_elapsed += 1
        print(f"Part: {part+1}/{num_parts}, Epoch: {epoch+1}/{EPOCHS}, Total Epochs: {total_epochs_elapsed}, Train Loss: {mean_train_loss}, Validation Loss: {mean_val_loss}")

        if total_epochs_elapsed % 50 == 0:
          model_path = os.path.join(model_save_dir, f"model_epoch_{total_epochs_elapsed}.h5")
          weights_path = os.path.join(model_save_dir, f"weights_epoch_{total_epochs_elapsed}.h5")
          model.save(model_path)
          model.save_weights(weights_path)

          # model_path = os.path.join(model_save_dir, f"model_epoch_{total_epochs_elapsed}")
          # weights_path = os.path.join(model_save_dir, f"weights_epoch_{total_epochs_elapsed}")

          # # Save model in SavedModel format
          # model.save(model_path, save_format="tf")

          # # Save weights in SavedModel format
          # model.save_weights(weights_path, save_format="tf")

          print(f"Model and weights saved at epoch {total_epochs_elapsed}")

        if mean_train_loss <= 1e-6 or mean_val_loss <= 1e-6:
            print(f"Train Loss: {mean_train_loss} <= 1e-6, skipping to next stage of training")
            break


Part: 1/3, Epoch: 1/100, Total Epochs: 1, Train Loss: 0.0037216488271951675, Validation Loss: 0.007291092537343502
Part: 1/3, Epoch: 2/100, Total Epochs: 2, Train Loss: 0.0036276611499488354, Validation Loss: 0.007020854856818914
Part: 1/3, Epoch: 3/100, Total Epochs: 3, Train Loss: 0.003313925117254257, Validation Loss: 0.0040528541430830956
Part: 1/3, Epoch: 4/100, Total Epochs: 4, Train Loss: 0.001822200370952487, Validation Loss: 0.005419028922915459
Part: 1/3, Epoch: 5/100, Total Epochs: 5, Train Loss: 0.001263399375602603, Validation Loss: 0.0039192866533994675
Part: 1/3, Epoch: 6/100, Total Epochs: 6, Train Loss: 0.000793521641753614, Validation Loss: 0.0038001658394932747
Part: 1/3, Epoch: 7/100, Total Epochs: 7, Train Loss: 0.0007786608766764402, Validation Loss: 0.003943103365600109
Part: 1/3, Epoch: 8/100, Total Epochs: 8, Train Loss: 0.00064138974994421, Validation Loss: 0.0033824550919234753
Part: 1/3, Epoch: 9/100, Total Epochs: 9, Train Loss: 0.0005249122623354197, Valid

In [None]:
model.save_weights('./checkpoints/my_checkpoint')

NameError: ignored

In [None]:
model.save('./saved_model')

In [None]:
model.load_weights('./saved_models/weights_epoch_200.h5')

In [None]:
model = tf.keras.models.load_model('./saved_models/model_epoch_200.h5')

ValueError: ignored

In [None]:
# Plotting training and validation losses
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.plot(all_train_losses, label='Training Loss')
plt.plot(all_val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Losses')
plt.show()

NameError: ignored

<Figure size 1200x600 with 0 Axes>

In [None]:
# Convert the recorded losses into a DataFrame
loss_df = pd.DataFrame({
    'Epoch': list(range(1, len(all_train_losses) + 1)),
    'Training_Loss': all_train_losses,
    'Validation_Loss': all_val_losses
})

# Save the DataFrame to a CSV file
loss_df.to_csv('losses.csv', index=False)


In [None]:
# Function to sample part of the training data and evaluate model's performance
def sample_and_evaluate(num_samples=20):
    indices = np.random.choice(len(X_train[0]), size=num_samples, replace=False)
    # indices = [i for i in range(10)]
    sampled_X = [X_train[0][indices], X_train[1][indices]]
    sampled_y = y_train[indices]
    loss = validate_step(sampled_X, sampled_y, print_results = True)
    print(f"Loss on sampled data: {loss.numpy()}")
    return loss.numpy()

In [None]:
X_train[0][0]

array([2, 2, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [None]:
sample_and_evaluate()

batched_errors:  [<tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0.09584745, 0.        ], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0., 0.], dtype=float32)>, <tf.Tensor: shape=(2,), dtype=

0.033717684

In [None]:
tf.shape(X_train)[0]

<tf.Tensor: shape=(), dtype=int32, numpy=272>

In [None]:
apply_gate(np.array([1/np.sqrt(2), 0, 0, 1/np.sqrt(2)], dtype=np.float32), 0.5, 0)

current label:  0


array([0.70710677, 0.        , 0.        , 0.70710677], dtype=float32)

In [None]:
test_vector = np.array(
    [[ 0.70710677],
     [-0.7064972 ],
     [ 0.        ],
     [-0.02935636]], dtype=np.float32)

In [None]:
test_vector.shape

(4, 1)

In [None]:
tf.squeeze(test_vector).shape

TensorShape([4])

In [None]:
compute_probabilities(tf.constant([1, 0, 0, 1], dtype=tf.float32))

(<tf.Tensor: shape=(), dtype=float32, numpy=0.99999994>,
 <tf.Tensor: shape=(), dtype=float32, numpy=0.0>)

In [None]:
compute_probabilities(tf.constant([1, 0, 0, -1], dtype=tf.float32))

(<tf.Tensor: shape=(), dtype=float32, numpy=0.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=0.99999994>)

In [None]:
# for i in range (tf.shape(X_train[:1])[0]):
#   single_sequence = tf.gather(X_train, i, axis=0)
#   print('single_sequence: ', single_sequence)
#   final_state = apply_gate_sequence(single_sequence)
#   print('final_state: ', final_state)
#   probabilities = compute_probabilities(final_state)
#   print('probabilities: ',probabilities )

In [None]:
len(X_train[0])

20

In [None]:
debug = tf.convert_to_tensor([[1, 0, 0, 0], [0, tf.math.cos(0.5), 0, tf.math.sin(0.5)],
                           [0, 0, 1, 0], [0, -tf.math.sin(0.5), 0, tf.math.cos(0.5)]], dtype=tf.float32)

In [None]:
debug

<tf.Tensor: shape=(4, 4), dtype=float32, numpy=
array([[ 1.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.87758255,  0.        ,  0.47942555],
       [ 0.        ,  0.        ,  1.        ,  0.        ],
       [ 0.        , -0.47942555,  0.        ,  0.87758255]],
      dtype=float32)>