Distributed differential privacy on top of Apache Spark and Google's differential-privacy Java API.
This code is an experimental work-in-progress. API can change and is expected to break compatibility.
The library can be build with:
./gradlew build
The goal of this repo is to explore scalable approaches for DP primitives.
privacy-spark
wraps the Java API of differential-privacy.
Basic Laplacian Mechanism support is currently provided for BoundedMean, BoundedQuantiles, BoundedSum, Count
aggregations.
Upstream code implementation is well suited for divide-and conquer type of computations (they behave like monoids).
spark-privacy
exposes primitives as Spark UDAFs, using the typed Dataset Aggregator API.
A PrivateCount
monoid can be implemented with
class PrivateCount[T](epsilon: Double, contribution: Int) extends Aggregator[T, Count, Long] {
override def zero: Count = Count.builder()
.epsilon(epsilon)
.maxPartitionsContributed(contribution)
.build()
override def reduce(b: Count, a: T): Count = {
b.increment()
b
}
override def merge(b1: Count, b2: Count): Count = {
b1.mergeWith(b2.getSerializableSummary)
b1
}
override def finish(reduction: Count): Long = reduction.computeResult().toLong
override def bufferEncoder: Encoder[Count] = Encoders.kryo[Count]
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
Note that this implementation violates the expected Aggregator
input for typed Datasets (= a
set of Row
s). This is currently intended behaviour. UDAFs are expected to be applied to DataFrame
s by registering them with functions.udafs
.
This boundary should be made implicit in our implementation.
A PrivateCount
aggregation can be performed on a group of rows like any built-in aggregate function:
val privateCount = new PrivateCount[Long](epsilon, maxContributions)
val privateCountUdf = functions.udaf(privateCount)
dataFrame.groupBy("Day").agg(privateCountUdf($"VisitorId") as "private_count").show
Or with the equivalent SparkSQL API:
spark.udf.register("private_count", privateCountUdf)
dataFrame.registerTempTable("visitors")
sql.sql("select Day, private_count(*) from visitors group by Day")
An example of private aggregation is available under examples
.