Erkennung von Herzversagen
===
Eine der vielen Möglichkeiten, in welchen personenbezogene Daten verarbeitet werden ist im Gesundheitswesen. So können zum Beispiel Modelle aus dem Maschinellen Lernen eingesetzt werden, um Krebs oder andere Krankheiten zu erkennen, welche aber wiederum persönliche Informationen des Patienten benötigen. Als Anwendungsfall wird in diesem Beispiel die Erkennung von Herzversagen anhand von verschiedenen Merkmalen getestet.<br />
https://www.kaggle.com/datasets/fedesoriano/heart-failure-prediction

In [1]:
from util.heNet import Network, FullyConnectedLayer, ActivationLayer
from util.heNet import square, square_prime, sigmoid, sigmoid_prime, binary_cross_entropy, binary_cross_entropy_prime
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import tenseal as ts

heart_csv = pd.read_csv("../data/heart.csv")

heart_dataframe = pd.get_dummies(heart_csv)

y = heart_dataframe["HeartDisease"]
heart_dataframe.drop(["HeartDisease"], axis = 1, inplace = True, errors = "ignore")

Vergleich mit einer PyTorch Architektur
---
Um die HE freundlichen Implementierung zu vergleichen mit einer etablierten Bibliothek wird ein Netz mit PyTorch gebaut und evaluiert.

In [2]:
import torch
import torch.nn as nn

class TorchNet(nn.Module):
  def __init__(self):
    super(TorchNet, self).__init__()
    self.one = nn.Linear(20, 40)
    self.output = nn.Linear(40, 1)

  def forward(self, x):
    x = self.one(x)
    x = torch.sigmoid(x)
    x = self.output(x)
    x = torch.sigmoid(x)
    return x

HE Architektur Vergleich
---
Die hier erstellte Architektur besitzt 2 Schichten mit einer Quadratischen Aktivierungsfunktionen und einer Sigmoid Aktivierungsfunktion am Ende für das Training. Die Dimension der einzelnen Schichten nach der Eingabeschicht wurde niedrig gehalten, um die Größe der verarbeiteten Zahlen zu verringern. Ansonsten würden die Quadratischen Aktivierungsfunktionen zu einem Problem werden.

In [3]:
test_net = Network(debug=None)
test_net.add(FullyConnectedLayer(20, 10))
test_net.add(ActivationLayer(square, square_prime))
test_net.add(FullyConnectedLayer(10, 1))
test_net.add(ActivationLayer(sigmoid, sigmoid_prime))

Cross Validation
---
Nun können beide Architekturen miteinander verglichen werden indem beide die selbe Cross Validation durchlaufen.

In [4]:
from CrossValidation import cross_validate_he_torch

he_acc_sum, torch_acc_sum = cross_validate_he_torch(he_net=test_net, torch=TorchNet, heart_dataframe=heart_dataframe, label=y)

print(f"Cross Validation HE Architektur --> {round(np.array(he_acc_sum).mean(),3)}")
print(f"Cross Validation PyTorch Architektur --> {round(np.array(torch_acc_sum).mean(),3)}")

Cross Validation HE Architektur --> 0.855
Cross Validation PyTorch Architektur --> 0.865


Training eines HE freundlichen Modells
===

Da die durchschnittliche Genauigkeit ähnlich zur Genauigkeit des PyTorch Modells ist wird nun erneut ein HE freundliches Netz trainiert, welches eingesetzt wird für die Inferenzen.

In [3]:
def toNp(value_list):
    return np.array([[data] for data in value_list])

X_train, X_test, y_train, y_test = train_test_split(heart_dataframe, y, test_size=0.25) # TODO

scaler = MinMaxScaler()

train_scaler = scaler.fit(X_train)

X_test_normal = train_scaler.transform(X_test)
X_train = train_scaler.transform(X_train)
X_test = train_scaler.transform(X_test)

X_train = toNp(X_train)
X_test = toNp(X_test)
y_train = toNp(toNp(y_train.to_numpy()))
y_test =  toNp(toNp(y_test.to_numpy()))

In [65]:
net = Network(1)
net.add(FullyConnectedLayer(20, 10))
net.add(ActivationLayer(square, square_prime))
net.add(FullyConnectedLayer(10, 1))
net.add(ActivationLayer(sigmoid, sigmoid_prime))

net.use(binary_cross_entropy, binary_cross_entropy_prime)
net.fit(X_train, y_train, epochs=10, minibatch=8, learning_rate=0.001)

out = net.predict(X_test)
correct = 0
for idx,_ in enumerate(out):
    result = 1 if np.squeeze(out[idx]) > 0.5 else 0
    if result == np.squeeze(y_test[idx]):
        correct = correct + 1
print(f"Im Testdatensatz wurden {correct} von {len(out)} richtig klassifiziert --> {round(correct/len(out),3)}")

epoch 1/10
epoch 2/10
epoch 3/10
epoch 4/10
epoch 5/10
epoch 6/10
epoch 7/10
epoch 8/10
epoch 9/10
epoch 10/10
Im Testdatensatz wurden 198 von 230 richtig klassifiziert --> 0.861


Inferenz auf verschlüsselten Daten
---
Nachdem das HE freundliche Modell erstellt wurde kann dieses nun auch auf den verschlüsselten Daten getestet werden. Verwendet wird dabei der zuvor getestete Teil des Datensatz.

In [66]:
bits_scale = 26
context = ts.context(
    ts.SCHEME_TYPE.CKKS,
    poly_modulus_degree=8192,
    coeff_mod_bit_sizes=[31, bits_scale, bits_scale, bits_scale, 31]
)
context.global_scale = pow(2, bits_scale)
context.generate_galois_keys()

e_X_test = np.array([[ts.ckks_vector(context, [value]) for value in data] for data in X_test_normal])

Als nächstes wird die letzte Schicht im Modell entfernt, da die verschlüsselten Daten nicht mit der normalen Sigmoid Funktion verrechnet werden können.

In [67]:
net.layers = net.layers[:-1]

Der letzte und längste Schritt ist das eigentliche Testen der verschlüsselten Daten.

In [68]:
e_out = net.predict(e_X_test)
correct = 0
d_out = []
for idx,_ in enumerate(e_out):
    result = 1 if sigmoid(e_out[idx][0][0].decrypt()[0]) > 0.5 else 0
    d_out.append(result)
    if result == np.squeeze(y_test[idx]):
        correct = correct + 1
print(f"Im Testdatensatz wurden {correct} von {len(e_out)} richtig klassifiziert --> {round(correct/len(e_out),3)}")

Im Testdatensatz wurden 198 von 230 richtig klassifiziert --> 0.861


* Positiv: Genauigkeit zwischen unverschlüsselte und verschlüsselte Test sind gleich
* Negativ: Laufzeit hängt stark von der Architektur und Größe der verschlüsselten Objekte ab

Zum Vergleich wird noch überprüft ob jede Eingabe auch die gleiche Ausgabe produziert hat.

In [69]:
u_out = [1 if np.squeeze(value) > 0.5 else 0 for value in out]
print(u_out == d_out)

True


Training mit verschlüsselten Daten
===
Für das Training des Modells auf den verschlüsselten Daten wird der zweite Ansatz des Trainings der Linearen Regression auf verschlüsselten Daten verwendet. Das bedeutet, dass an dieser Stelle eine interaktive Variante dargestellt wird zwischen dem Datenbesitzer und den ML Experten, die das Netz trainieren.

In [70]:
from heNet import EncryptedActivationLayer, EncryptedFullyConnectedLayer

def encrypted_training(context):
    net = Network(debug=2)
    net.add(EncryptedFullyConnectedLayer(20, 10, context))
    net.add(EncryptedActivationLayer(square, square_prime, context))
    net.add(EncryptedFullyConnectedLayer(10, 1, context))
    net.add(EncryptedActivationLayer(square, square_prime, context))

    net.use(binary_cross_entropy, binary_cross_entropy_prime)

    net.crypt_fit(X_train, y_train, epochs=10, minibatch=8, learning_rate=0.0009, context=context)
    return net

def encrypted_predict(net):
    outN = net.predict(X_test)
    correct = 0
    for idx,result in enumerate(outN):
        result = 1 if result > 0.5 else 0
        if result == np.squeeze(y_test[idx]):
            correct = correct + 1
    print(f"Im Testdatensatz wurden {correct} von {len(outN)} richtig klassifiziert --> {round(correct/len(outN),3)}")

In [71]:
poly_mod_degree = 2 ** 12
bits_scale = 20

coeff_mod_bit_sizes=[40, bits_scale, 40]
context = ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes)

context.global_scale = pow(2, bits_scale)
context.generate_galois_keys()

net = encrypted_training(context)

epoch 1/10
epoch 3/10
epoch 5/10
epoch 7/10
epoch 9/10


In [72]:
encrypted_predict(net)

Im Testdatensatz wurden 193 von 230 richtig klassifiziert --> 0.839


Das Training mit niedrigen Verschlüsselungseinstellungen und erneute Verschlüsselung nach jeder verschlüsselten Multiplikation hat 116 Minuten gedauert.

In [84]:
test = Network()
test.add(FullyConnectedLayer(20, 10))
test.add(ActivationLayer(square, square_prime))
test.add(FullyConnectedLayer(10, 1))
test.add(ActivationLayer(square, square_prime))

test.use(binary_cross_entropy, binary_cross_entropy_prime)
test.fit(X_train, y_train, epochs=10, minibatch=8, learning_rate=0.0009)

out = test.predict(X_test)
correct = 0
for idx,_ in enumerate(out):
    result = 1 if np.squeeze(out[idx]) > 0.5 else 0
    if result == np.squeeze(y_test[idx]):
        correct = correct + 1
print(f"Im Testdatensatz wurden {correct} von {len(out)} richtig klassifiziert --> {round(correct/len(out),3)}")

Im Testdatensatz wurden 193 von 230 richtig klassifiziert --> 0.839


Die Genauigkeit beider Modelle ist aber nicht durch die Dauer oder den verschlüsselten Daten beeinflusst und wird nur von der Modellarchitektur zurückgehalten.