In [1]:
import torch

import numpy as np
import pandas as pd
import tenseal as ts
import base64

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.preprocessing import StandardScaler

from time import time
from tqdm import tqdm_notebook

In [2]:
def write_data(file_name, file_content):
    if type(file_content) == bytes:
        file_content = base64.b64encode(file_content)
        
    with open(file_name, 'wb') as f:
        f.write(file_content)

def read_data(file_name):
    with open(file_name, 'rb') as f:
        file_content = f.read()
    
    return base64.b64decode(file_content)

In [3]:
context = ts.context_from(read_data(file_name="D:/data/customer/secret_context.txt"))

In [4]:
weight = ts.lazy_ckks_vector_from(read_data(file_name="D:/data/customer/weight.txt"))
bias = ts.lazy_ckks_vector_from(read_data(file_name="D:/data/customer/bias.txt"))

In [5]:
weight.link_context(context)
bias.link_context(context)
weight = weight.decrypt()
bias = bias.decrypt()


In [6]:
# Linear Regression Model that supports training
# but several settings have to be set baiscally and tough

class LR:
    def __init__(self, weight, bias):
        # we have to change the shape: 1xN -> N
        self.weight = weight  
        self.bias = bias
        
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0

    def forward(self, enc_x):
        # forward propagation
        enc_out = enc_x.dot(self.weight) + self.bias
        enc_out = LR.sigmoid(enc_out)
        
        return enc_out
    
    def backward(self, enc_x, enc_out, enc_y):
        # backward propagation
        out_minus_y = (enc_out - enc_y)
        self._delta_w += enc_x * out_minus_y
        self._delta_b += out_minus_y
        self._count += 1
        
    def update_parameters(self):
        # optimizer
        if self._count == 0:
            raise RuntimeError("You should at least run one forward iteration")
        
        self.weight -= self._delta_w * (1 / self._count) + self.weight * 0.05
        # "self.weight * 0.05" means l2 regularization
        # it helps keep value between [-5, 5] (sigmoid)
        self.bias -= self._delta_b * (1 / self._count)
        
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0

    @staticmethod
    def sigmoid(enc_x):
        # sigmoid function approximated between [-5, 5]
        return enc_x.polyval([0.5, 0.197, 0, -0.004])

    def plain_accuracy(self, x_test, y_test):
        # accuracy for regression
        w = torch.tensor(self.weight).double().reshape(-1, 1)
        b = torch.tensor(self.bias).double()
        x_test = x_test.double()
        result = x_test.matmul(w) + b
        out = torch.sigmoid(result)

        correct = torch.abs(y_test - out) < 0.5
        # if the loss under 0.5 -> correct
        return correct.float().mean()

    def encrypt(self, context):
        # encrypt the model before training
        self.weight = ts.ckks_vector(context, self.weight)
        self.bias = ts.ckks_vector(context, self.bias)

    def decrypt(self, key):
        # decrypt the model after training
        self.weight = self.weight.decrypt(key)
        self.bias = self.bias.decrypt(key)

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

In [7]:
decrypted_lr = LR(weight, bias)

In [8]:
# Loading the dataset
df = pd.read_csv("data/hmeq.csv", sep=",")

# Replacement of NaN variables
df['MORTDUE'].fillna(value=df['MORTDUE'].mean(), inplace=True)
df['VALUE'].fillna(value=df['VALUE'].mean(), inplace=True)
df["REASON"].fillna(value="DebtCon", inplace=True)
df["JOB"].fillna(value="Other", inplace=True)
df["YOJ"].fillna(value=df['YOJ'].mean(), inplace=True)
df["DEROG"].fillna(value=0, inplace=True)
df["DELINQ"].fillna(value=0, inplace=True)
df['CLAGE'].fillna(value=df['CLAGE'].mean(), inplace=True)
df['NINQ'].fillna(value=df['NINQ'].mean(), inplace=True)
df['CLNO'].fillna(value=df['CLNO'].mean(), inplace=True)
df['CLNO'].fillna(value=df['CLNO'].mean(), inplace=True)
df['DEBTINC'].fillna(value=df['DEBTINC'].mean(), inplace=True)
df.drop(columns=["BAD", "JOB", "REASON"])

# Checking if there is anything left out
assert np.array_equal(df.isnull().sum(), [0] * len(df.isnull().sum()))

x_basic = df.drop(columns=["BAD", "JOB", "REASON"])
y = df["BAD"]

scaler = StandardScaler()
x_scaled = scaler.fit_transform(np.array(x_basic))
y_scaled = scaler.fit_transform(np.array(y).reshape(-1, 1))

outliers = []
temp = x_scaled.T
for i in range(10):
    outliers = outliers + np.where(abs(temp[i]) > 5)[0].tolist()
outliers = np.unique(np.array(outliers))

x_scaled = np.delete(x_scaled, outliers, axis=0)
y_scaled = np.delete(y_scaled, outliers, axis=0)

x_scaled = np.round(x_scaled, 3)
y_scaled = np.round(y_scaled, 3)

x_train, x_test, y_train, y_test = train_test_split(x_scaled, 
                                                    y_scaled, 
                                                    test_size=0.2, 
                                                    shuffle=True, 
                                                    stratify=y_scaled, 
                                                    random_state=42)

In [9]:
x_test = torch.tensor(x_test).double()
y_test = torch.tensor(y_test).double()

accuracy = decrypted_lr.plain_accuracy(x_test, y_test)
print(accuracy)

tensor(0.5213)
