Merge ckks in FLServer LR Aggregator (intel-analytics#7)
* update ckks in FLServer LR Aggregator

* some changes

* support ckks at scala FL Server

* add

* add

* fix and add ut

* fix and add ut

* add support for customized client module and divide example to 2 parties

* add comments
Litchilitchy committed Sep 23, 2022
1 parent f39805b commit a948a81
Showing 20 changed files with 806 additions and 41 deletions.
@@ -24,8 +24,8 @@ class CAddTable(val ckksCommonPtr: Long) {
 //    Log4Error.invalidInputError(input.size().sameElements(target.size()),
 //      s"input size should be equal to target size, but got input size: ${input.size().toList}," +
 //      s" target size: ${target.size().toList}")
-    var ckksOutput = ckks.cadd(ckksCommonPtr, input(0), input(1))
-    if (input.size > 2) {
+    var ckksOutput = input(0)
+    if (input.size > 1) {
       (2 to input.size).foreach{i =>
         ckksOutput = ckks.cadd(ckksCommonPtr, ckksOutput, input(i))
       }
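The changed hunk seeds the encrypted sum from input(0) alone and folds the remaining ciphertexts onto it. A hedged sketch of that fold, assuming input is a plain 0-indexed Array[Array[Byte]] of CKKS ciphertexts and reusing the ckks handle and ckksCommonPtr already in scope in CAddTable; under that assumption the remaining elements are indices 1 until input.size (the committed 2 to input.size would skip input(1)):

    // Hedged sketch, not the committed code: sum all ciphertexts onto input(0).
    def sumCiphertexts(input: Array[Array[Byte]]): Array[Byte] = {
      var out = input(0)
      (1 until input.size).foreach { i =>
        out = ckks.cadd(ckksCommonPtr, out, input(i)) // encrypted add: two ciphertexts in, one out
      }
      out
    }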
11 changes: 10 additions & 1 deletion scala/ppml/scripts/vfl_split_dataset.py
@@ -36,18 +36,27 @@ def vfl_split_dataset(file_name, num_pieces, has_rowkey_index):
         print(f"data has {len(sample)} columns")
         csv_reader = reader(read_obj)
 
+        # use following lines if you want to explicitly specify some columns
+        # col_list = []
+        # col_list.append([i for i in range(7)])
+        # col_list.append([i for i in range(7, 15)])
+
         writer_list = []
         col_idx_list = []
         for i in range(num_pieces):
             piece_file_name = file_name.split('/')[-1].split('.')[0]
             writer_list.append(writer(open(f"{piece_file_name}-{i}.csv", "w"), delimiter=','))
             if has_rowkey_index:
                 col_idx_list.append([0] + [i for i in range(1 + i, len(sample), num_pieces)])
+                # col_idx_list.append([0] + col_list[i])
             else:
                 col_idx_list.append([i for i in range(i, len(sample), num_pieces)])
+                # col_idx_list.append(col_list[i])
 
-        for i, row in enumerate(csv_reader):
+        for i, row in enumerate(csv_reader):
             row = np.array(row)
+            if len(row) == 0:
+                continue
             for j in range(num_pieces):
                 writer_list[j].writerow(row[np.array(col_idx_list[j])])
 
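The script deals columns round-robin: party i takes every num_pieces-th column starting at column i, and when has_rowkey_index is set, column 0 is prepended to every piece. A small worked check of that index arithmetic, written in Scala to match the rest of the changeset, with hypothetical values (15 columns, 2 parties):

    // Hypothetical check of the round-robin column split.
    val numCols = 15
    val numPieces = 2

    // With a rowkey: every party keeps column 0, then strides over the rest.
    val withRowkey = (0 until numPieces).map { p =>
      0 +: (1 + p until numCols by numPieces)
    }
    // party 0 -> 0, 1, 3, 5, 7, 9, 11, 13
    // party 1 -> 0, 2, 4, 6, 8, 10, 12, 14

    // Without a rowkey: plain striding from column p.
    val withoutRowkey = (0 until numPieces).map { p =>
      p until numCols by numPieces
    }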
@@ -67,6 +67,9 @@ class FLClient(val _args: Array[String]) extends GrpcClientBase(_args) {
     fgbostStub = new FGBoostStub(channel, clientUUID)
   }
 
+  def initCkks(secret: Array[Array[Byte]]): Unit = {
+    nnStub = new NNStub(channel, clientUUID, secret)
+  }
   override def shutdown(): Unit = {
     try channel.shutdown.awaitTermination(5, TimeUnit.SECONDS)
     catch {
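initCkks swaps the default NNStub for one that carries the CKKS secret. A hedged client-side sketch, assuming the secret bytes reach each party out of band and that the usual GrpcClientBase build() step applies; loadSharedSecret() is a hypothetical helper:

    val secret: Array[Array[Byte]] = loadSharedSecret() // hypothetical: however the secret is distributed
    val flClient = new FLClient(Array.empty[String])
    flClient.build()          // assumed GrpcClientBase lifecycle call
    flClient.initCkks(secret) // replaces nnStub with a CKKS-aware one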
@@ -51,6 +51,7 @@ class FLServer private[ppml](val _args: Array[String] = null) extends GrpcServer
   configPath = "ppml-conf.yaml"
   var clientNum: Int = 1
   val fgBoostConfig = new FLConfig()
+  val nnService = new NNServiceImpl(clientNum)
   parseConfig()
 
   def setClientNum(clientNum: Int): Unit = {
@@ -80,7 +81,10 @@
   }
   def addService(): Unit = {
     serverServices.add(new PSIServiceImpl(clientNum))
-    serverServices.add(new NNServiceImpl(clientNum))
+    serverServices.add(nnService)
     serverServices.add(new FGBoostServiceImpl(clientNum, fgBoostConfig))
   }
+  def setCkksAggregator(secret: Array[Array[Byte]]): Unit = {
+    nnService.initCkksAggregator(secret)
+  }
 }
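Because nnService is now a field rather than constructed inside addService, the CKKS aggregator can be installed before the gRPC services come up. A hedged startup sketch, assuming the usual build()/start() lifecycle of the gRPC base class and the same secret the clients pass to initCkks:

    val flServer = new FLServer()
    flServer.setClientNum(2)
    flServer.setCkksAggregator(secret) // must match the clients' CKKS secret
    flServer.build()                   // assumed GrpcServerBase lifecycle
    flServer.start()
    flServer.blockUntilShutdown()      // assumed blocking call on the base class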
@@ -46,6 +46,11 @@ abstract class NNModel() {
       VFLTensorUtils.featureLabelToMiniBatch(xTrain, yTrain, batchSize),
       VFLTensorUtils.featureLabelToMiniBatch(xValidate, yValidate, batchSize))
   }
+
+  def trainStep(xTrain: Activity,
+                yTrain: Activity): Unit = {
+    estimator.trainStep(xTrain, yTrain)
+  }
   /**
    *
    * @param trainData DataFrame of training data
@@ -117,6 +122,10 @@
   def predict(x: Tensor[Float], batchSize: Int = 4): Array[Activity] = {
     estimator.predict(VFLTensorUtils.featureLabelToMiniBatch(x, null, batchSize))
   }
+
+  def predictStep(x: Activity): Activity = {
+    estimator.predict(x)
+  }
   /**
    *
    * @param data DataFrame of prediction data
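trainStep and predictStep expose single-minibatch control to callers that run their own data loop, unlike train() and predict(), which wrap the data in a LocalDataSet. A minimal sketch, assuming a concrete NNModel subclass bound to lr and an illustrative feature width:

    import com.intel.analytics.bigdl.dllib.tensor.Tensor

    val x = Tensor[Float](4, 3049).rand() // one minibatch of features (shapes illustrative)
    val y = Tensor[Float](4, 1).rand()    // matching labels
    lr.trainStep(x, y)                    // one forward/backward exchange with the FL server
    val out = lr.predictStep(x)           // one forward pass, no dataset wrapping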
@@ -16,8 +16,11 @@
 
 package com.intel.analytics.bigdl.ppml.fl.algorithms
 
+import com.intel.analytics.bigdl.Module
 import com.intel.analytics.bigdl.dllib.nn.{Linear, Sequential}
 import com.intel.analytics.bigdl.dllib.optim.Adam
+import com.intel.analytics.bigdl.dllib.tensor.Tensor
+import com.intel.analytics.bigdl.dllib.utils.Log4Error
 import com.intel.analytics.bigdl.ppml.fl.NNModel
 import com.intel.analytics.bigdl.ppml.fl.nn.VFLNNEstimator
 import com.intel.analytics.bigdl.ppml.fl.utils.FLClientClosable
@@ -27,9 +30,15 @@ import com.intel.analytics.bigdl.ppml.fl.utils.FLClientClosable
  * @param featureNum
  * @param learningRate
  */
-class VFLLogisticRegression(featureNum: Int,
-                            learningRate: Float = 0.005f) extends NNModel() {
-  val model = Sequential[Float]().add(Linear(featureNum, 1))
+class VFLLogisticRegression(featureNum: Int = -1,
+                            learningRate: Float = 0.005f,
+                            customModel: Module[Float] = null) extends NNModel() {
+  Log4Error.invalidInputError(featureNum != -1 || customModel != null,
+    "Either featureNum or customModel should be provided")
+  val clientModule = if (customModel == null) {
+    Linear[Float](featureNum, 1)
+  } else customModel
+  val model = Sequential[Float]().add(clientModule)
   override val estimator = new VFLNNEstimator(
     "vfl_logistic_regression", model, new Adam(learningRate))
 
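The new customModel parameter lets a party plug in its own client-side module instead of the default Linear(featureNum, 1); the two-party example below does exactly this with a SparseLinear. A construction sketch taken straight from the changed signature:

    // Default path: a dense Linear(featureNum, 1) is built internally.
    val lrDefault = new VFLLogisticRegression(featureNum = 3049, learningRate = 0.005f)

    // Custom path: featureNum may be omitted when a module is supplied.
    val sparse = SparseLinear[Float](3049, 1)
    val lrCustom = new VFLLogisticRegression(learningRate = 0.005f, customModel = sparse)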
@@ -0,0 +1,44 @@
/*
 * Copyright 2016 The BigDL Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.intel.analytics.bigdl.ppml.fl.algorithms

import com.intel.analytics.bigdl.Module
import com.intel.analytics.bigdl.dllib.nn.{Linear, Sequential}
import com.intel.analytics.bigdl.dllib.optim.Adam
import com.intel.analytics.bigdl.dllib.utils.Log4Error
import com.intel.analytics.bigdl.ppml.fl.NNModel
import com.intel.analytics.bigdl.ppml.fl.nn.VFLNNEstimator
import com.intel.analytics.bigdl.ppml.fl.utils.FLClientClosable

/**
 * VFL Logistic Regression with CKKS
 * @param featureNum
 * @param learningRate
 */
class VFLLogisticRegressionCkks(featureNum: Int = -1,
                                learningRate: Float = 0.005f,
                                customModel: Module[Float] = null) extends NNModel() {
  Log4Error.invalidInputError(featureNum != -1 || customModel != null,
    "Either featureNum or customModel should be provided")
  val clientModule = if (customModel == null) {
    Linear[Float](featureNum, 1)
  } else customModel
  val model = Sequential[Float]().add(clientModule)
  override val estimator = new VFLNNEstimator(
    "vfl_logistic_regression_ckks", model, new Adam(learningRate))

}
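The CKKS variant mirrors the plaintext class; the visible difference is the algorithm name handed to VFLNNEstimator ("vfl_logistic_regression_ckks"), which presumably selects the server-side CKKS aggregator installed by setCkksAggregator. A hedged construction sketch:

    // Requires the CKKS secret to be installed on both client (initCkks)
    // and server (setCkksAggregator) before training.
    val lrCkks = new VFLLogisticRegressionCkks(featureNum = 3049, learningRate = 0.005f)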
@@ -18,6 +18,7 @@ package com.intel.analytics.bigdl.ppml.fl.base
 
 import com.intel.analytics.bigdl.dllib.feature.dataset.{LocalDataSet, MiniBatch}
 import com.intel.analytics.bigdl.dllib.nn.abstractnn.Activity
+import com.intel.analytics.bigdl.dllib.tensor.Tensor
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -27,11 +28,15 @@ trait Estimator {
   def getEvaluateResults(): Map[String, Array[Float]] = {
     evaluateResults.map(v => (v._1, v._2.toArray)).toMap
   }
+  def trainStep(input: Activity,
+                target: Activity): Unit
+
   def train(endEpoch: Int,
             trainDataSet: LocalDataSet[MiniBatch[Float]],
             valDataSet: LocalDataSet[MiniBatch[Float]]): Any
 
   def evaluate(dataSet: LocalDataSet[MiniBatch[Float]])
 
+  def predict(input: Activity): Activity
   def predict(dataSet: LocalDataSet[MiniBatch[Float]]): Array[Activity]
 }
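The trait now pairs per-dataset entry points with per-step ones. A hypothetical skeleton, not from the commit, just to show what a concrete implementation must supply (evaluateResults is assumed to come from the trait itself):

    class MyEstimator extends Estimator {
      override def trainStep(input: Activity, target: Activity): Unit = ???
      override def train(endEpoch: Int,
                         trainDataSet: LocalDataSet[MiniBatch[Float]],
                         valDataSet: LocalDataSet[MiniBatch[Float]]): Any = ???
      override def evaluate(dataSet: LocalDataSet[MiniBatch[Float]]): Unit = ???
      override def predict(input: Activity): Activity = ???
      override def predict(dataSet: LocalDataSet[MiniBatch[Float]]): Array[Activity] = ???
    }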
@@ -0,0 +1,79 @@
package com.intel.analytics.bigdl.ppml.fl.example.ckks

import com.intel.analytics.bigdl.dllib.NNContext
import com.intel.analytics.bigdl.dllib.nn.SparseLinear
import com.intel.analytics.bigdl.dllib.utils.{Log4Error, RandomGenerator}
import com.intel.analytics.bigdl.ppml.fl.NNModel
import com.intel.analytics.bigdl.ppml.fl.algorithms.{VFLLogisticRegression, VFLLogisticRegressionCkks}
import com.intel.analytics.bigdl.ppml.fl.utils.FlContextForTest
import org.apache.spark.sql.SparkSession

class Client(trainDataPath: String,
             testDataPath: String,
             clientId: Int,
             appName: String) extends Thread {
  override def run(): Unit = {
    val testFlContext = new FlContextForTest()
    testFlContext.initFLContext(clientId.toString, "localhost:8980")
    val sqlContext = SparkSession.builder().getOrCreate()
    val pre = new DataPreprocessing(sqlContext, trainDataPath, testDataPath, clientId)
    val (trainDataset, validationDataset) = pre.loadCensusData()

    val numFeature = 3049

    RandomGenerator.RNG.setSeed(2L)
    val linear = SparseLinear[Float](numFeature, 1)

    val lr: NNModel = appName match {
      case "dllib" => new VFLLogisticRegression(learningRate = 0.001f, customModel = linear)
      case "ckks" => new VFLLogisticRegressionCkks(learningRate = 0.001f, customModel = linear)
      case _ => throw new Error()
    }


    val epochNum = 20
    var accTime: Long = 0
    (0 until epochNum).foreach { epoch =>
      trainDataset.shuffle()
      val trainData = trainDataset.toLocal().data(false)
      while (trainData.hasNext) {
        val miniBatch = trainData.next()
        val input = miniBatch.getInput()
        val currentBs = input.toTensor[Float].size(1)
        val target = miniBatch.getTarget()
        val dllibStart = System.nanoTime()
        lr.trainStep(input, target)
        accTime += System.nanoTime() - dllibStart
      }
      println(s"$appName Time: " + accTime / 1e9)
    }

    linear.evaluate()
    val evalData = validationDataset.toLocal().data(false)
    var accDllib = 0
    while (evalData.hasNext) {
      val miniBatch = evalData.next()
      val input = miniBatch.getInput()
      val currentBs = input.toTensor[Float].size(1)
      val target = miniBatch.getTarget().toTensor[Float]
      val predict = lr.predictStep(input)
      println(s"Predicting $appName")
      (0 until currentBs).foreach { i =>
        val dllibPre = predict.toTensor[Float].valueAt(i)
        val t = target.valueAt(i + 1, 1)
        if (t == 0) {
          if (dllibPre <= 0.5) {
            accDllib += 1
          }
        } else {
          if (dllibPre > 0.5) {
            accDllib += 1
          }
        }
        // println(t + " " + dllibPre + " " + ckksPre)
      }
    }
    println(s"$appName predict correct: $accDllib")

  }
}
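Per the commit message, the example runs as two parties, each holding one vertical slice produced by vfl_split_dataset.py. A hedged launcher sketch, assuming an FLServer with clientNum = 2 and the CKKS aggregator already listening on localhost:8980; the CSV names are hypothetical stand-ins for the split script's output:

    val party1 = new Client("census-0.csv", "census-test-0.csv", 1, "ckks")
    val party2 = new Client("census-1.csv", "census-test-1.csv", 2, "ckks")
    party1.start()
    party2.start()
    party1.join()
    party2.join()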
