# Model.ipynb 
A proof of use for Homomorphic Encryption. 
Code was created over several weeks of trial and error, different libraries and implementations. Began with sklearn, moved to torch. For FHE I ended up on Tenseal but I did have previous iterations with Conrete-ml by Zama AI and Phailiar cryptosystems.

# Problems
Specifically with the TenSEAL Implementation, which I decided to limit this capstone to along with the CKKS scheme. First it was the encrypted data was cast as a CKKS.Vector. The current release for most ML libraries do not have models that can use data in the 'scheme'.vector form. Was forced to write sections which would be predefined

In [17]:
import torch
import tenseal as ts
import pandas as pd
import random
from time import time
import numpy as np
import matplotlib.pyplot as plt
import sklearn
from sklearn.model_selection import train_test_split

In [None]:
# Set the random seeds for reproducibility
torch.random.manual_seed(13)
random.seed(13)

# Create indicies list, shuffles indices at random rate, and splits data into training and testing data
# meant to increase the randomness of the data
def split_train_test(x, y, test_ratio=0.2):
    idcs = [i for i in range(len(x))]
    random.shuffle(idcs)
    # delimiter between test and train data
    delim = int(len(x) * test_ratio)
    test_idcs, train_idcs = idcs[:delim], idcs[delim:]
    return x[train_idcs], y[train_idcs], x[test_idcs], y[test_idcs]

#*******************************************************************
# implementing sklearn's train_test_split
# Does only the splitting of the data
# Original split_train_test function has syntehtic testing capabilities. 
#
#    def split_train_test(x, y, test_ratio=0.2):
#       sklearn.utils.shuffle(x, y)
#       x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_ratio)
#       return x_train, y_train, x_test, y_test
#********************************************************************

# Load the data
# The data is a credit card fraud dataset, where the goal is to predict whether a transaction is fraudulent or not
# The dataset is highly imbalanced, with only 0.17% of the transactions being fraudulent
def Credit_data():
    data = pd.read_csv("payment_fraud.csv")
    # drop some features
    data = data.drop(columns=["paymentMethod"])
    # balance data
    grouped = data.groupby('label')
    data = grouped.apply(lambda x: x.sample(grouped.size().min(), random_state=13).reset_index(drop=True))
    # extract labels
    y = torch.tensor(data["label"].values).float().unsqueeze(1)
    data = data.drop(columns="label")
    # standardize data
    data = (data - data.mean()) / data.std()
    x = torch.tensor(data.values).float()
    return split_train_test(x, y)

# Generate random data to be used for synthetic data for testing
def random_data(m=1024, n=2):
    # data separable by the line `y = x`
    x_train = torch.randn(m, n)
    x_test = torch.randn(m // 2, n)
    y_train = (x_train[:, 0] >= x_train[:, 1]).float().unsqueeze(0).t()
    y_test = (x_test[:, 0] >= x_test[:, 1]).float().unsqueeze(0).t()
    return x_train, y_train, x_test, y_test
x_train, y_train, x_test, y_test = Credit_data()

print("############# Data summary #############")
print(f"x_train has shape: {x_train.shape}")
print(f"y_train has shape: {y_train.shape}")
print(f"x_test has shape: {x_test.shape}")
print(f"y_test has shape: {y_test.shape}")
print("#######################################")


############# Data summary #############
x_train has shape: torch.Size([896, 4])
y_train has shape: torch.Size([896, 1])
x_test has shape: torch.Size([224, 4])
y_test has shape: torch.Size([224, 1])
#######################################


  data = grouped.apply(lambda x: x.sample(grouped.size().min(), random_state=13).reset_index(drop=True))


In [None]:
# Deining the Logistic Regression torch NN model.
class NE_LR(torch.nn.Module):
    # n_features is the number of features in the input data    
    def __init__(self, n_features):
        super(NE_LR, self).__init__()
        # the linear layer is the logistic regression model
        # it takes n_features inputs and outputs 1 value
        self.lr = torch.nn.Linear(n_features, 1)
    
    # pass data through the model and apply sigmoid activation
    def forward(self, x):
        output = torch.sigmoid(self.lr(x))
        return output

# Define the model, optimizer and loss function
# Unencrypted training
n_features = x_train.shape[1]
model = NE_LR(n_features)
# use gradient descent with a learning_rate=1
optim = torch.optim.SGD(model.parameters(), lr=1)
# use Binary Cross Entropy Loss
# BCELoss is the loss function used for binary classification
criterion = torch.nn.BCELoss()

# train the model for 5 epochs
EPOCHS = 5
# creating timimng list to store the time taken for each epoch
times = []
def train(model, optim, criterion, x, y, epochs=EPOCHS):
    for e in range(1, epochs + 1):
        start = time()
        # set the gradients to zero
        optim.zero_grad()
        # pass the data through the model
        output = model(x)
        # calculate the loss
        loss = criterion(output, y)
        loss.backward()
        # update the weights
        optim.step()
        end = time()
        # loss is printed at each epoch
        print(f"Loss at epoch {e}: {loss.data:.4f}")
        times.append(end - start)
    return model

# Evaluate the model
model = train(model, optim, criterion, x_train, y_train)
#Calculating the accuracy of the model
def accuracy(model, x, y):
    out = model(x)
    correct = torch.abs(y - out) < 0.5
    return correct.float().mean()

print(f"\nAverage time per epoch: {int(sum(times) / len(times))} seconds")

NE_accuracy = accuracy(model, x_test, y_test)
print(f"Non-Encrypted Accuracy: {NE_accuracy:.4f}")

Loss at epoch 1: 0.6125
Loss at epoch 2: 0.5438
Loss at epoch 3: 0.5030
Loss at epoch 4: 0.4783
Loss at epoch 5: 0.4623

Average time per epoch: 0 seconds
Non-Encrypted Accuracy: 0.8036


In [None]:

class EncryptedLR:
    # Encrypted Logistic Regression model    
    def __init__(self, torch_lr):
        # extract the weights and bias from the torch model
        self.weight = torch_lr.lr.weight.data.tolist()[0]
        # extract the bias from the torch model
        self.bias = torch_lr.lr.bias.data.tolist()
        #initialize the gradient accumulators and iterations count
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0
    
    #Forward pass
    def forward(self, enc_x):
        enc_out = enc_x.dot(self.weight) + self.bias
        #Calculates linear combination of input and weight, adds bias
        enc_out = EncryptedLR.sigmoid(enc_out)
        #Applies sigmoid function
        return enc_out
    
    #Backward pass
    #Calculates the gradient of the loss w.r.t the weights and bias
    def backward(self, enc_x, enc_out, enc_y):
        out_minus_y = (enc_out - enc_y)
        #Calculates the difference between the predicted value and the true value
        self._delta_w += enc_x * out_minus_y
        #Calculates the gradient of the loss w.r.t the weights
        self._delta_b += out_minus_y
        #Calculates the gradient of the loss w.r.t the bias
        self._count += 1
        #Increment the iteration count
        
    #Update the weights and bias
    def update_parameters(self):
        if self._count == 0:
            raise RuntimeError("You should at least run one forward iteration")
        # update weights
        # We use a small regularization term to keep the output
        # of the linear layer in the range of the sigmoid approximation
        self.weight -= self._delta_w * (1 / self._count) + self.weight * 0.05
        self.bias -= self._delta_b * (1 / self._count)
        # reset gradient accumulators and iterations count
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0
        
    @staticmethod
    def sigmoid(enc_x):
        # sigmoid = 0.5 + 0.197 * x - 0.004 * x^3
        # this is a degree 3 polynomial approximation of the sigmoid function
        # it's used to keep the output of the linear layer in the range of the sigmoid approximation
        return enc_x.polyval([0.5, 0.197, 0, -0.004])
    
    def plain_accuracy(self, x_test, y_test):
    #Calculates the accuracy of the model on non-encrypted data
        # convert the weights and bias to torch tensors
        w = torch.tensor(self.weight)
        b = torch.tensor(self.bias)
        # pass the data through the linear layer
        out = torch.sigmoid(x_test.matmul(w) + b).reshape(-1, 1)
        # calculate the accuracy
        correct = torch.abs(y_test - out) < 0.5
        return correct.float().mean()    
    
    def encrypt(self, context):
    #Encrypts the weights and bias
        self.weight = ts.ckks_vector(context, self.weight)
        self.bias = ts.ckks_vector(context, self.bias)
        
    def decrypt(self):
    #Decrypts the weights and bias
        self.weight = self.weight.decrypt()
        self.bias = self.bias.decrypt()
        
    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)
    

In [None]:
# parameters
# the degree of the polynomial modulus
poly_mod_degree = 8192
# the bit-length of the modulus chain
coeff_mod_bit_sizes = [40, 21, 21, 21, 21, 21, 21, 40]
# create TenSEALContext
enc_training = ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes)
# generate keys
enc_training.global_scale = 2 ** 21
enc_training.generate_galois_keys()

t_start = time()
enc_x_train = [ts.ckks_vector(enc_training, x.tolist()) for x in x_train]
enc_y_train = [ts.ckks_vector(enc_training, y.tolist()) for y in y_train]
t_end = time()
print(f"Encryption of the training_set took {int(t_end - t_start)} seconds")

Encryption of the training_set took 18 seconds


In [None]:
# create the encrypted model
ELR = EncryptedLR(NE_LR(n_features))
accuracy = ELR.plain_accuracy(x_test, y_test)
print(f"Accuracy at epoch #0 is {accuracy}")
# train the encrypted model
times = []
for epoch in range(EPOCHS):
    ELR.encrypt(enc_training)
    
    t_start = time()
    for enc_x, enc_y in zip(enc_x_train, enc_y_train):
        # forward pass
        enc_out = ELR.forward(enc_x)
        # backward pass
        ELR.backward(enc_x, enc_out, enc_y)
    ELR.update_parameters()
    t_end = time()
    times.append(t_end - t_start)
    # decrypt the model and calculate the accuracy
    ELR.decrypt()
    EN_accuracy = ELR.plain_accuracy(x_test, y_test)
    print(f"Accuracy at epoch #{epoch + 1} is {EN_accuracy:.4f}")
    print(f"Loss at epoch #{epoch + 1} is {(1 - EN_accuracy):.4f}")


print(f"\nAverage time per epoch: {int(sum(times) / len(times))} seconds")
print(f"Accuracy {EN_accuracy:.4f}")

diff_accuracy = NE_accuracy - EN_accuracy
print(f"Difference between plain and encrypted accuracies: {diff_accuracy:.4f}")

Accuracy at epoch #0 is 0.3392857015132904
Accuracy at epoch #1 is 0.8482
Loss at epoch #1 is 0.1518
Accuracy at epoch #2 is 0.8839
Loss at epoch #2 is 0.1161
Accuracy at epoch #3 is 0.8527
Loss at epoch #3 is 0.1473
Accuracy at epoch #4 is 0.7991
Loss at epoch #4 is 0.2009
Accuracy at epoch #5 is 0.7589
Loss at epoch #5 is 0.2411

Average time per epoch: 62 seconds
Accuracy 0.7589
Difference between plain and encrypted accuracies: 0.0446
