# 使用整数训练加密前馈神经网络
Iris是一个非常著名的数据集，它包含了150个样本，分为3类，每类50个样本，每个样本包含4个属性，分别是花萼长度、花萼宽度、花瓣长度、花瓣宽度，目标是根据这4个属性预测鸢尾花的类别。

## 导入依赖库

In [50]:
import numpy
from concrete import fhe
from concrete.ml.quantization import QuantizedArray
from concrete.ml.quantization.quantizers import(
    QuantizedArray,
    MinMaxQuantizationStats,
)
from concrete.ml.quantization.quantized_ops import (
    QuantizationOptions,
)
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder

## 查找表非线性激活函数

## 加载数据集

In [51]:
# Load the Iris dataset.
iris = datasets.load_iris()
X = iris.data
y = iris.target

# One-hot encode the class labels.
# scikit-learn renamed the `sparse` keyword to `sparse_output` and later
# removed the old spelling, so try the new keyword first and fall back to the
# old one for older installations.
try:
    encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    encoder = OneHotEncoder(sparse=False)
y_one_hot = encoder.fit_transform(y.reshape(-1, 1))

# Standardise the features.
# NOTE(review): the scaler is fitted on the full dataset before the split
# below, so test-set statistics influence the training features.
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Split into training and test sets.
X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.2, random_state=42)


## 定义NumPy版的FFNN模型

In [74]:
# 定义一个类来表示NumPy版的FFNN模型
class NumPyFFNN:
    """Two-layer feed-forward network (linear -> ReLU -> linear) written in NumPy.

    ``forward``/``backward`` run entirely in plaintext.  The ``q_*`` methods
    quantize their float operands to signed ``n_bits`` integers, evaluate the
    integer operation in a Concrete FHE circuit with ``encrypt_run_decrypt``
    and rescale the decrypted integers back to floats.  Because every ``q_*``
    primitive decrypts its own result, the values passed between primitives
    are plaintext arrays.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        """Randomly initialise the weights (standard normal) and zero the biases."""
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.weights1 = np.random.randn(input_dim, hidden_dim)
        self.bias1 = np.zeros(hidden_dim)
        self.weights2 = np.random.randn(hidden_dim, output_dim)
        self.bias2 = np.zeros(output_dim)
        # Most recently compiled ReLU circuit (None until q_relu is called).
        self.relu_circuit = None
        # Compiled ReLU circuits keyed by (n_bits, input shape); a circuit is
        # compiled from an inputset of one fixed shape, so e.g. the training
        # batch and the test set each need their own.
        self._relu_circuits = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _random_inputset(n_bits, *shapes, samples=10000):
        """Build a random integer inputset for compiling a circuit.

        Values are drawn uniformly from the signed ``n_bits`` range
        (``[-128, 127]`` for 8 bits).  Each sample keeps the full shape of the
        corresponding operand so the compiled circuit accepts arrays of that
        shape rather than a flattened vector.

        Returns a list of arrays when one shape is given, otherwise a list of
        tuples with one array per shape.
        """
        low, high = -(2 ** (n_bits - 1)), 2 ** (n_bits - 1)
        if len(shapes) == 1:
            return [np.random.randint(low, high, size=shapes[0]) for _ in range(samples)]
        return [
            tuple(np.random.randint(low, high, size=shape) for shape in shapes)
            for _ in range(samples)
        ]

    @staticmethod
    def _quantize_shared(n_bits, *arrays):
        """Quantize ``arrays`` with one shared set of min/max statistics.

        Sharing the statistics gives every operand the same scale, which is
        what makes integer addition/subtraction of the quantized values
        meaningful.
        """
        options = QuantizationOptions(n_bits, is_symmetric=True, is_signed=True)
        # NOTE(review): the constructor argument mirrors the original call;
        # the statistics actually used come from compute_quantization_stats.
        stats = MinMaxQuantizationStats(n_bits)
        stats.compute_quantization_stats(arrays[0] if len(arrays) == 1 else np.hstack(arrays))
        return tuple(
            QuantizedArray(n_bits, array, options=options, stats=stats) for array in arrays
        )

    @staticmethod
    def _quantize_signed(n_bits, values):
        """Quantize ``values`` on their own with symmetric signed ``n_bits`` options."""
        options = QuantizationOptions(n_bits, is_symmetric=True, is_signed=True)
        return QuantizedArray(n_bits, values, is_signed=True, options=options)

    def _q_relu_apply(self, n_bits, values):
        """Return ReLU(values), evaluated on quantized integers in FHE.

        The circuit for this (n_bits, shape) pair is compiled on first use via
        ``q_relu`` and reused afterwards.
        """
        values = np.asarray(values, dtype=np.float64)
        (q_values,) = self._quantize_shared(n_bits, values)
        circuit = self._relu_circuits.get((n_bits, values.shape))
        if circuit is None:
            circuit = self.q_relu(n_bits, values)
        q_result = circuit.encrypt_run_decrypt(q_values.qvalues)
        # Symmetric quantization is requested, so integer 0 stands for real 0
        # and max(q, 0) * scale is the ReLU of the dequantized input.
        return q_result * q_values.quantizer.scale

    # ------------------------------------------------------------------
    # FHE primitives
    # ------------------------------------------------------------------
    def q_relu(self, n_bits: int, input: numpy.ndarray):
        """Compile and key an FHE ReLU circuit for arrays shaped like ``input``.

        The circuit is stored in ``self.relu_circuit`` and returned.  It
        operates on quantized integers; use ``_q_relu_apply`` to get the
        activated float values.
        """
        @fhe.compiler({"q_inputs": "encrypted"})
        def q_relu_impl(q_inputs):
            return np.maximum(q_inputs, 0)

        shape = np.shape(input)
        circuit = q_relu_impl.compile(self._random_inputset(n_bits, shape))
        circuit.keygen()
        self._relu_circuits[(n_bits, shape)] = circuit
        self.relu_circuit = circuit
        return circuit

    def q_sub(self, n_bits: int, input_0: numpy.ndarray, input_1: numpy.ndarray):
        """Return ``input_0 - input_1`` computed on encrypted quantized integers."""
        q_inputs_0, q_inputs_1 = self._quantize_shared(n_bits, input_0, input_1)

        @fhe.compiler({"q_inputs_0": "encrypted", "q_inputs_1": "encrypted"})
        def q_sub_impl(q_inputs_0, q_inputs_1):
            return q_inputs_0 - q_inputs_1

        inputset = self._random_inputset(n_bits, np.shape(input_0), np.shape(input_1))
        circuit = q_sub_impl.compile(inputset)
        q_result = circuit.encrypt_run_decrypt(q_inputs_0.qvalues, q_inputs_1.qvalues)
        # Both operands share one scale, so a single rescale recovers the floats.
        return q_result * q_inputs_0.quantizer.scale

    def q_add(self, n_bits: int, input_0: numpy.ndarray, input_1: numpy.ndarray):
        """Return ``input_0 + input_1`` computed on encrypted quantized integers."""
        q_inputs_0, q_inputs_1 = self._quantize_shared(n_bits, input_0, input_1)

        @fhe.compiler({"q_inputs_0": "encrypted", "q_inputs_1": "encrypted"})
        def q_add_impl(q_inputs_0, q_inputs_1):
            return q_inputs_0 + q_inputs_1

        inputset = self._random_inputset(n_bits, np.shape(input_0), np.shape(input_1))
        circuit = q_add_impl.compile(inputset)
        q_result = circuit.encrypt_run_decrypt(q_inputs_0.qvalues, q_inputs_1.qvalues)
        return q_result * q_inputs_0.quantizer.scale

    def q_mul(self, n_bits: int, input_0: numpy.ndarray, input_1: numpy.ndarray):
        """Return the element-wise product computed on encrypted quantized integers."""
        q_inputs_0 = self._quantize_signed(n_bits, input_0)
        q_inputs_1 = self._quantize_signed(n_bits, input_1)

        @fhe.compiler({"q_inputs_0": "encrypted", "q_inputs_1": "encrypted"})
        def q_mul_impl(q_inputs_0, q_inputs_1):
            return q_inputs_0 * q_inputs_1

        inputset = self._random_inputset(n_bits, np.shape(input_0), np.shape(input_1))
        circuit = q_mul_impl.compile(inputset)
        q_result = circuit.encrypt_run_decrypt(q_inputs_0.qvalues, q_inputs_1.qvalues)
        # Each operand has its own scale; the product carries both.
        return q_result * q_inputs_0.quantizer.scale * q_inputs_1.quantizer.scale

    def q_div(self, n_bits: int, input_0: numpy.ndarray, input_1: numpy.ndarray):
        """Return ``input_0 / input_1`` using integer floor division in FHE.

        Only ``input_0`` is encrypted; the quantized divisor is baked into the
        circuit as a constant.  The compiled circuit and the raw integer
        result are printed.
        NOTE(review): a divisor element that quantizes to 0 is not guarded.
        """
        q_inputs_0 = self._quantize_signed(n_bits, input_0)
        q_inputs_1 = self._quantize_signed(n_bits, input_1)

        @fhe.compiler({"q_inputs_0": "encrypted"})
        def q_div_impl(q_inputs_0):
            return numpy.floor_divide(q_inputs_0, q_inputs_1.qvalues)

        inputset = self._random_inputset(n_bits, np.shape(input_0))
        circuit = q_div_impl.compile(inputset)
        print(circuit)
        q_result = circuit.encrypt_run_decrypt(q_inputs_0.qvalues)
        print(q_result)
        return q_result * q_inputs_0.quantizer.scale / q_inputs_1.quantizer.scale

    def q_matmul_(
        self,
        n_bits: int,
        inputs: numpy.ndarray,
        weights: numpy.ndarray,
    ):
        """Return ``inputs @ weights`` with encrypted inputs and clear weights."""
        q_inputs = self._quantize_signed(n_bits, inputs)
        q_weights = self._quantize_signed(n_bits, weights)

        @fhe.compiler({"q_inputs": "encrypted", "q_weights": "clear"})
        def q_matmul_impl(q_inputs, q_weights):
            return q_inputs @ q_weights

        inputset = self._random_inputset(n_bits, np.shape(inputs), np.shape(weights))
        circuit = q_matmul_impl.compile(inputset)
        q_result = circuit.encrypt_run_decrypt(q_inputs.qvalues, q_weights.qvalues)
        return q_result * q_inputs.quantizer.scale * q_weights.quantizer.scale

    # ------------------------------------------------------------------
    # Plaintext network
    # ------------------------------------------------------------------
    def forward(self, x):
        """Plaintext forward pass: linear -> ReLU -> linear."""
        hidden = np.maximum(np.dot(x, self.weights1) + self.bias1, 0)
        return np.dot(hidden, self.weights2) + self.bias2

    def backward(self, x, y, learning_rate):
        """Run one plaintext SGD step on the batch and return the MSE loss.

        The reported loss averages over every element of the output, while
        the gradient below divides by the batch size only.
        """
        # Forward pass
        out1 = np.dot(x, self.weights1) + self.bias1
        out_relu = np.maximum(out1, 0)
        out2 = np.dot(out_relu, self.weights2) + self.bias2
        loss = np.mean((out2 - y) ** 2)

        # Backward pass
        delta_out2 = 2 * (out2 - y) / len(x)
        delta_weights2 = np.dot(out_relu.T, delta_out2)
        delta_bias2 = np.sum(delta_out2, axis=0)
        delta_relu_out = np.dot(delta_out2, self.weights2.T)
        # ReLU passes the gradient only where the unit was active.
        delta_out1 = np.where(out_relu <= 0, 0, delta_relu_out)
        delta_weights1 = np.dot(x.T, delta_out1)
        delta_bias1 = np.sum(delta_out1, axis=0)

        # Parameter update
        self.weights2 -= learning_rate * delta_weights2
        self.bias2 -= learning_rate * delta_bias2
        self.weights1 -= learning_rate * delta_weights1
        self.bias1 -= learning_rate * delta_bias1

        return loss

    # ------------------------------------------------------------------
    # Quantized / FHE network
    # ------------------------------------------------------------------
    def q_forward(self, x):
        """Forward pass whose matmuls and ReLU run on quantized integers in FHE."""
        n_bits = 8
        out1 = self.q_matmul_(n_bits, x, self.weights1) + self.bias1
        out1_relu = self._q_relu_apply(n_bits, out1)
        # Include the output bias so the result matches forward()/q_backward().
        return self.q_matmul_(n_bits, out1_relu, self.weights2) + self.bias2

    def q_backward(self, x, y, learning_rate):
        """Run one SGD step using the FHE primitives and return the MSE loss."""
        n_bits = 8
        # Forward pass
        out1 = self.q_matmul_(n_bits, x, self.weights1) + self.bias1
        out1_relu = self._q_relu_apply(n_bits, out1)
        out2 = self.q_matmul_(n_bits, out1_relu, self.weights2) + self.bias2
        loss = np.mean((out2 - y) ** 2)

        # Backward pass
        delta_out2 = 2 * (out2 - y) / len(x)
        delta_weights2 = self.q_matmul_(n_bits, out1_relu.T, delta_out2)
        delta_bias2 = np.sum(delta_out2, axis=0)
        delta_out1 = delta_out2 @ self.weights2.T
        # ReLU derivative: keep the gradient where the unit fired, zero it
        # elsewhere (same rule as the plaintext backward()).
        delta_out1_relu = np.where(out1_relu <= 0, 0, delta_out1)
        # Weight gradient is a matrix product x^T . delta, not element-wise.
        delta_weights1 = self.q_matmul_(n_bits, x.T, delta_out1_relu)
        delta_bias1 = np.sum(delta_out1_relu, axis=0)

        # Parameter update
        self.weights2 -= learning_rate * delta_weights2
        self.bias2 -= learning_rate * delta_bias2
        self.weights1 -= learning_rate * delta_weights1
        self.bias1 -= learning_rate * delta_bias1

        return loss


# Model hyper-parameters: layer sizes are taken from the data
# (number of feature columns in, number of one-hot columns out).
input_dim = X_train.shape[1]
hidden_dim = 64
output_dim = y_train.shape[1]

In [76]:
# Model hyper-parameters
input_dim = X_train.shape[1]
hidden_dim = 64
output_dim = y_train.shape[1]

# Create the NumPy model instance
numpy_model = NumPyFFNN(input_dim, hidden_dim, output_dim)

# Train the model
learning_rate = 0.01
num_epochs = 1000

for epoch in range(num_epochs):
    # Draw a random mini-batch of 32 training samples
    batch_indices = np.random.choice(len(X_train), 32, replace=False)
    x_batch = X_train[batch_indices]
    y_batch = y_train[batch_indices]

    # One forward + backward pass; returns the batch loss
    loss = numpy_model.backward(x_batch, y_batch, learning_rate)

    # Report the loss every 100 epochs
    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss}')

# Evaluate the model once, after training has finished.  (This used to sit
# inside the loop and re-scored the whole test set on every epoch.)
predictions = numpy_model.forward(X_test)

# Accuracy: fraction of samples whose arg-max prediction matches the label
correct = (np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)).sum()
total = len(X_test)
accuracy = correct / total * 100

print(f'Test Accuracy: {accuracy:.2f}%')


Epoch [100/1000], Loss: 0.21421576908440051
Epoch [200/1000], Loss: 0.11406602364812184
Epoch [300/1000], Loss: 0.09065339377997102
Epoch [400/1000], Loss: 0.08039673118142834
Epoch [500/1000], Loss: 0.059730365248968464
Epoch [600/1000], Loss: 0.0575281151748615
Epoch [700/1000], Loss: 0.04766828966393224
Epoch [800/1000], Loss: 0.031198193394620276
Epoch [900/1000], Loss: 0.05187602793540308
Epoch [1000/1000], Loss: 0.03550224456771644
Test Accuracy: 96.67%


## 前向传播过程
```python
out1 = self.q_matmul_(n_bits, x, self.weights1)+self.bias1 # 线性层1
out1_sigmoid = self.q_sigmoid(n_bits, out1)  # 使用Sigmoid激活函数
out2 = self.q_matmul_(n_bits, out1_sigmoid, self.weights2)+ self.bias2 # 线性层2
loss = np.mean((out2 - y) ** 2)  # 均方误差损失
```

$$[out1]=[q_x]\cdot q_{w_1}+q_{b_1}\\$$
$$[out1\_sigmoid]=sigmoid([out1])\\$$
$$[out2]=[out1\_sigmoid]\cdot q_{w_2}+q_{b_2}\\$$
$$[loss]=\frac{1}{n}\sum_{i=1}^{n}([out2]-[y])^2$$


## 反向传播过程
```python
delta_out2=self.q_div(n_bits,self.q_mul(n_bits,self.q_sub(n_bits,out2,y),2),len(x))
delta_weights2=self.q_matmul_(n_bits,out1_sigmoid.T,delta_out2)  # 密文和密文矩阵乘法不能超过16位
delta_bias2 = np.sum(delta_out2, axis=0) #delta_out2是密文，所以delta_bias2是密文
delta_out1=self.q_matmul_(n_bits,delta_out2,self.weights2.T)
delta_out1_sigmoid=self.q_mul(n_bits,self.q_mul(n_bits,delta_out1,out1_sigmoid),self.q_sub(n_bits,1,out1_sigmoid))
delta_weights1=self.q_mul(n_bits,x.T,delta_out1_sigmoid)
delta_bias1 = np.sum(delta_out1_sigmoid, axis=0)
```


$$[\delta_{out2}]=2*\frac{[out2]-[y]}{len(x)}\\$$
$$[\delta_{weights2}]=[out1\_sigmoid]^T\cdot [\delta_{out2}]\\$$
$$[\delta_{bias2}]=\sum_{i=1}^{n}[\delta_{out2}]\\$$
$$[\delta_{out1}]=[\delta_{out2}]\cdot q_{w_2}^{T}\\$$
$$[\delta_{out1\_sigmoid}]=[\delta_{out1}]\cdot [out1\_sigmoid]\cdot(1-[out1\_sigmoid])\\$$
$$[\delta_{weights1}]=[{x^T}]\cdot [\delta_{out1\_sigmoid}]\\$$
$$[\delta_{bias1}]=\sum_{i=1}^{n}[\delta_{out1\_sigmoid}]$$

```
```


 ## 更新参数过程
```python
self.weights2 -= learning_rate * delta_weights2
self.bias2 -= learning_rate * delta_bias2
self.weights1 -= learning_rate * delta_weights1
self.bias1 -= learning_rate * delta_bias1
```

$$[\boldsymbol{w}_2] = \boldsymbol{w}_2 - \text{learning\_rate} \cdot [\boldsymbol{\delta}_{\text{weights2}}]\\$$
$$[\boldsymbol{b}_2] = \boldsymbol{b}_2 - \text{learning\_rate} \cdot [\boldsymbol{\delta}_{\text{bias2}}]\\$$
$$[\boldsymbol{w}_1] = \boldsymbol{w}_1 - \text{learning\_rate} \cdot [\boldsymbol{\delta}_{\text{weights1}}]\\$$
$$[\boldsymbol{b}_1] = \boldsymbol{b}_1 - \text{learning\_rate} \cdot [\boldsymbol{\delta}_{\text{bias1}}]\\$$






## 创建NumPy模型实例

In [4]:
# Build a fresh, randomly initialised model; this rebinds `numpy_model`, so any
# previously trained instance held under that name is discarded.
numpy_model = NumPyFFNN(input_dim, hidden_dim, output_dim)

### 测试减法

In [5]:
# Compare the 8-bit quantized FHE subtraction against plain NumPy subtraction;
# the two should agree up to quantization error.
result=numpy_model.q_sub(8,np.array([-1,5,6]).astype(np.float32),np.array([1,2,3]).astype(np.float32))
print("result:",result)
print("real_result",np.array([-1,5,6])-np.array([1,2,3]))

result: [-1.98425197  3.02362205  2.97637795]
real_result [-2  3  3]


### 测试加法

In [6]:
# result=numpy_model.q_add(8,np.array([-1,5,6]).astype(np.float32),np.array([1,2,3]).astype(np.float32))
# print("result:",result)
# print("real_result",np.array([-1,5,6])+np.array([1,2,3]))

### 测试乘法

In [7]:
# result=numpy_model.q_mul(8,np.array([-1,5,6]).astype(np.float32),np.array([1,2,3]).astype(np.float32))
# print("result:",result)
# print("real_result",np.array([-1,5,6])*np.array([1,2,3]))

### 测试除法

In [8]:
# result=numpy_model.q_div(8,np.array([2,4,6]).astype(np.float32),np.array([1,2,3]).astype(np.float32))
# print("result:",result)
# print("real_result",np.array([2,4,6])/np.array([1,2,3]))

### 测试矩阵相乘

In [9]:
# matrix1 = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float64)
# matrix2 = np.array([[5.0, 6.0], [7.0, 8.0]], dtype=np.float64)
# result=numpy_model.q_matmul_(8,matrix1,matrix2)
# print("result:\n", result)
# real_result = matrix1 @ matrix2
# print("real_result:\n", real_result)

### 测试q_sigmoid

In [10]:
# Compare the quantized sigmoid against the plain sigmoid.
# NOTE(review): the NumPyFFNN class defined in this file has no `q_sigmoid` or
# `sigmoid` method (it uses ReLU); the recorded output below presumably comes
# from an earlier version of the class - confirm before re-running this cell.
print("result:",numpy_model.q_sigmoid(8,np.array([2,3]).astype(np.float32)))
print("real_result:",numpy_model.sigmoid(np.array([2,3])))

result: [0.88235294 0.95294118]
real_result: [0.88079708 0.95257413]


### 测试q_np.sum

## 训练模型

In [None]:
# Train with the quantized/FHE backward pass (q_backward) instead of the
# plaintext one.
learning_rate = 0.01
num_epochs = 1000

for epoch in range(num_epochs):
    # Draw a random mini-batch of 32 training samples
    batch_indices = np.random.choice(len(X_train), 32, replace=False)
    x_batch = X_train[batch_indices]
    y_batch = y_train[batch_indices]

    # One forward + backward pass; returns the batch loss
    loss = numpy_model.q_backward(x_batch, y_batch, learning_rate)

    # Report the loss every 100 epochs
    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss}')

## 测试模型

In [None]:
# Forward pass on the held-out test set (uses the plaintext forward(), not
# q_forward()).
predictions = numpy_model.forward(X_test)

# Accuracy: fraction of samples whose arg-max prediction matches the label
correct = (np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)).sum()
total = len(X_test)
accuracy = correct / total * 100

print(f'Test Accuracy: {accuracy:.2f}%')
