/
NaiveBayes.scala
510 lines (446 loc) · 19 KB
/
NaiveBayes.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.classification
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConverters._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
 * Model for Naive Bayes Classifiers.
 *
 * @param labels list of labels
 * @param pi log of class priors, whose dimension is C, number of labels
 * @param theta log of class conditional probabilities, whose dimension is C-by-D,
 *              where D is number of features
 * @param modelType the type of NB model that was fit; either "multinomial" or "bernoulli"
 */
@Since("0.9.0")
class NaiveBayesModel private[spark] (
    @Since("1.0.0") val labels: Array[Double],
    @Since("0.9.0") val pi: Array[Double],
    @Since("0.9.0") val theta: Array[Array[Double]],
    @Since("1.4.0") val modelType: String)
  extends ClassificationModel with Serializable with Saveable {

  import NaiveBayes.{Bernoulli, Multinomial, supportedModelTypes}

  // Linear-algebra views of the log-priors and log-conditionals used by the scoring methods.
  private val piVector = new DenseVector(pi)
  // `theta.flatten` lays the values out one class (row) after another; the trailing `true`
  // is the flag passed to DenseMatrix for that layout.
  private val thetaMatrix = new DenseMatrix(labels.length, theta(0).length, theta.flatten, true)

  /** Builds a model without an explicit model type; the type defaults to multinomial. */
  private[mllib] def this(labels: Array[Double], pi: Array[Double], theta: Array[Array[Double]]) =
    this(labels, pi, theta, NaiveBayes.Multinomial)

  /** A Java-friendly constructor that takes three Iterable parameters. */
  private[mllib] def this(
      labels: JIterable[Double],
      pi: JIterable[Double],
      theta: JIterable[JIterable[Double]]) =
    this(labels.asScala.toArray, pi.asScala.toArray, theta.asScala.toArray.map(_.asScala.toArray))

  require(supportedModelTypes.contains(modelType),
    s"Invalid modelType $modelType. Supported modelTypes are $supportedModelTypes.")

  // Bernoulli scoring requires log(condprob) if 1, log(1-condprob) if 0:
  //   score(c) = pi(c) + sum_j [ x_j * theta(c)(j) + (1 - x_j) * log(1 - exp(theta(c)(j))) ]
  //            = pi(c) + sum_j x_j * (theta(c)(j) - negTheta(c)(j)) + sum_j negTheta(c)(j)
  // This precomputes negTheta = log(1.0 - exp(theta)) and its per-class sum, which are used
  // for the linear algebra application of this condition (in the predict functions).
  // log1p(-exp(theta)) is used instead of log(1.0 - exp(theta)) because it keeps precision
  // when exp(theta) is very small, where `1.0 - exp(theta)` would round to 1.0.
  private val (thetaMinusNegTheta, negThetaSum) = modelType match {
    case Multinomial => (None, None)
    case Bernoulli =>
      val negTheta = thetaMatrix.map(value => math.log1p(-math.exp(value)))
      val ones = new DenseVector(Array.fill(thetaMatrix.numCols) {1.0})
      val thetaMinusNegTheta = thetaMatrix.map { value =>
        value - math.log1p(-math.exp(value))
      }
      // Multiplying by a vector of ones sums each row of negTheta.
      (Option(thetaMinusNegTheta), Option(negTheta.multiply(ones)))
    case _ =>
      // This should never happen.
      throw new UnknownError(s"Invalid modelType: $modelType.")
  }

  /**
   * Predict labels for the given data set using the model trained.
   *
   * The model is broadcast once and each partition is scored with the broadcast copy.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Double] where each entry contains the predicted label
   */
  @Since("1.0.0")
  override def predict(testData: RDD[Vector]): RDD[Double] = {
    val bcModel = testData.context.broadcast(this)
    testData.mapPartitions { iter =>
      val model = bcModel.value
      iter.map(model.predict)
    }
  }

  /**
   * Predict the label of a single data point: the label whose unnormalized log-posterior
   * is the largest.
   *
   * @param testData array representing a single data point
   * @return predicted label from the trained model
   */
  @Since("1.0.0")
  override def predict(testData: Vector): Double = {
    modelType match {
      case Multinomial =>
        labels(multinomialCalculation(testData).argmax)
      case Bernoulli =>
        labels(bernoulliCalculation(testData).argmax)
    }
  }

  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Vector] where each entry contains the predicted posterior class probabilities,
   *         in the same order as class labels
   */
  @Since("1.5.0")
  def predictProbabilities(testData: RDD[Vector]): RDD[Vector] = {
    val bcModel = testData.context.broadcast(this)
    testData.mapPartitions { iter =>
      val model = bcModel.value
      iter.map(model.predictProbabilities)
    }
  }

  /**
   * Predict posterior class probabilities for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return predicted posterior class probabilities from the trained model,
   *         in the same order as class labels
   */
  @Since("1.5.0")
  def predictProbabilities(testData: Vector): Vector = {
    modelType match {
      case Multinomial =>
        posteriorProbabilities(multinomialCalculation(testData))
      case Bernoulli =>
        posteriorProbabilities(bernoulliCalculation(testData))
    }
  }

  /** Unnormalized multinomial log-posteriors: theta * x + pi. */
  private def multinomialCalculation(testData: Vector) = {
    val prob = thetaMatrix.multiply(testData)
    BLAS.axpy(1.0, piVector, prob)
    prob
  }

  /**
   * Unnormalized Bernoulli log-posteriors: (theta - negTheta) * x + pi + rowSum(negTheta).
   *
   * Throws a SparkException when any active entry of `testData` is neither 0 nor 1.
   */
  private def bernoulliCalculation(testData: Vector) = {
    testData.foreachActive((_, value) =>
      if (value != 0.0 && value != 1.0) {
        throw new SparkException(
          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
      }
    )
    // Both options are defined whenever modelType is Bernoulli (see the precomputation above).
    val prob = thetaMinusNegTheta.get.multiply(testData)
    BLAS.axpy(1.0, piVector, prob)
    BLAS.axpy(1.0, negThetaSum.get, prob)
    prob
  }

  /**
   * Turns log-posteriors into normalized probabilities. The maximum is subtracted before
   * exponentiating so that the largest term is exp(0) = 1 and the sum cannot underflow to 0.
   */
  private def posteriorProbabilities(logProb: DenseVector) = {
    val logProbArray = logProb.toArray
    val maxLog = logProbArray.max
    val scaledProbs = logProbArray.map(lp => math.exp(lp - maxLog))
    val probSum = scaledProbs.sum
    new DenseVector(scaledProbs.map(_ / probSum))
  }

  /** Saves this model under `path` using the 2.0 format (which also stores `modelType`). */
  @Since("1.3.0")
  override def save(sc: SparkContext, path: String): Unit = {
    val data = NaiveBayesModel.SaveLoadV2_0.Data(labels, pi, theta, modelType)
    NaiveBayesModel.SaveLoadV2_0.save(sc, path, data)
  }

  override protected def formatVersion: String = "2.0"
}
/**
 * Loader for [[NaiveBayesModel]] instances saved in format version 1.0
 * (labels, pi, theta) or 2.0 (labels, pi, theta, modelType).
 */
@Since("1.3.0")
object NaiveBayesModel extends Loader[NaiveBayesModel] {

  import org.apache.spark.mllib.util.Loader._

  /** Save/load support for format 2.0, which stores the model type alongside the parameters. */
  private[mllib] object SaveLoadV2_0 {

    def thisFormatVersion: String = "2.0"

    /** Hard-code class name string in case it changes in the future */
    def thisClassName: String = "org.apache.spark.mllib.classification.NaiveBayesModel"

    /** Model data for model import/export */
    case class Data(
        labels: Array[Double],
        pi: Array[Double],
        theta: Array[Array[Double]],
        modelType: String)

    /** Writes the JSON metadata and a single-partition Parquet file holding `data`. */
    def save(sc: SparkContext, path: String, data: Data): Unit = {
      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

      // Create JSON metadata.
      val metadata = compact(render(
        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
          ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))

      // Create Parquet data.
      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
    }

    /** Reads the single model row stored under `path` and rebuilds the model. */
    @Since("1.3.0")
    def load(sc: SparkContext, path: String): NaiveBayesModel = {
      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
      // Load Parquet data.
      val dataRDD = spark.read.parquet(dataPath(path))
      // Check schema explicitly since erasure makes it hard to use match-case for checking.
      checkSchema[Data](dataRDD.schema)
      val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)
      assert(dataArray.length == 1, s"Unable to load NaiveBayesModel data from: ${dataPath(path)}")
      val data = dataArray(0)
      val labels = data.getAs[Seq[Double]](0).toArray
      val pi = data.getAs[Seq[Double]](1).toArray
      val theta = data.getAs[Seq[Seq[Double]]](2).map(_.toArray).toArray
      val modelType = data.getString(3)
      new NaiveBayesModel(labels, pi, theta, modelType)
    }

  }

  /** Save/load support for format 1.0, which has no model type column. */
  private[mllib] object SaveLoadV1_0 {

    def thisFormatVersion: String = "1.0"

    /** Hard-code class name string in case it changes in the future */
    def thisClassName: String = "org.apache.spark.mllib.classification.NaiveBayesModel"

    /** Model data for model import/export */
    case class Data(
        labels: Array[Double],
        pi: Array[Double],
        theta: Array[Array[Double]])

    /** Writes the JSON metadata and a single-partition Parquet file holding `data`. */
    def save(sc: SparkContext, path: String, data: Data): Unit = {
      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

      // Create JSON metadata.
      val metadata = compact(render(
        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
          ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))

      // Create Parquet data.
      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
    }

    /**
     * Reads the single model row stored under `path`. The three-argument model constructor
     * is used, so the resulting model is multinomial.
     */
    def load(sc: SparkContext, path: String): NaiveBayesModel = {
      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
      // Load Parquet data.
      val dataRDD = spark.read.parquet(dataPath(path))
      // Check schema explicitly since erasure makes it hard to use match-case for checking.
      checkSchema[Data](dataRDD.schema)
      val dataArray = dataRDD.select("labels", "pi", "theta").take(1)
      assert(dataArray.length == 1, s"Unable to load NaiveBayesModel data from: ${dataPath(path)}")
      val data = dataArray(0)
      val labels = data.getAs[Seq[Double]](0).toArray
      val pi = data.getAs[Seq[Double]](1).toArray
      val theta = data.getAs[Seq[Seq[Double]]](2).map(_.toArray).toArray
      new NaiveBayesModel(labels, pi, theta)
    }
  }

  /**
   * Loads a model saved under `path`, dispatching on the (class name, format version) pair
   * found in the metadata, then checks that the loaded arrays agree with the numFeatures and
   * numClasses recorded in the metadata.
   *
   * @param sc Spark context used for loading
   * @param path path that was given to `save`
   * @return the loaded model
   */
  override def load(sc: SparkContext, path: String): NaiveBayesModel = {
    val (loadedClassName, version, metadata) = loadMetadata(sc, path)
    val classNameV1_0 = SaveLoadV1_0.thisClassName
    val classNameV2_0 = SaveLoadV2_0.thisClassName
    val (model, numFeatures, numClasses) = (loadedClassName, version) match {
      case (className, "1.0") if className == classNameV1_0 =>
        val (numFeatures, numClasses) = ClassificationModel.getNumFeaturesClasses(metadata)
        val model = SaveLoadV1_0.load(sc, path)
        (model, numFeatures, numClasses)
      case (className, "2.0") if className == classNameV2_0 =>
        val (numFeatures, numClasses) = ClassificationModel.getNumFeaturesClasses(metadata)
        val model = SaveLoadV2_0.load(sc, path)
        (model, numFeatures, numClasses)
      case _ => throw new Exception(
        // List every (className, version) pair handled above so the message is accurate.
        s"NaiveBayesModel.load did not recognize model with (className, format version): " +
        s"($loadedClassName, $version).  Supported:\n" +
        s"  ($classNameV1_0, 1.0)\n" +
        s"  ($classNameV2_0, 2.0)")
    }
    assert(model.pi.length == numClasses,
      s"NaiveBayesModel.load expected $numClasses classes," +
        s" but class priors vector pi had ${model.pi.length} elements")
    assert(model.theta.length == numClasses,
      s"NaiveBayesModel.load expected $numClasses classes," +
        s" but class conditionals array theta had ${model.theta.length} elements")
    assert(model.theta.forall(_.length == numFeatures),
      s"NaiveBayesModel.load expected $numFeatures features," +
        s" but class conditionals array theta had elements of size:" +
        s" ${model.theta.map(_.length).mkString(",")}")
    model
  }
}
/**
 * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
 *
 * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
 * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
 * document classification. By making every vector a 0-1 vector, it can also be used as
 * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). The input feature values must be nonnegative.
 */
@Since("0.9.0")
class NaiveBayes private (
    private var lambda: Double,
    private var modelType: String) extends Serializable with Logging {

  import NaiveBayes.{Bernoulli, Multinomial}

  /** Creates a multinomial trainer with the given smoothing parameter. */
  @Since("1.4.0")
  def this(lambda: Double) = this(lambda, NaiveBayes.Multinomial)

  /** Creates a multinomial trainer with smoothing parameter 1.0. */
  @Since("0.9.0")
  def this() = this(1.0, NaiveBayes.Multinomial)

  /** Set the smoothing parameter. Default: 1.0. */
  @Since("0.9.0")
  def setLambda(lambda: Double): NaiveBayes = {
    require(lambda >= 0,
      s"Smoothing parameter must be nonnegative but got $lambda")
    this.lambda = lambda
    this
  }

  /** Get the smoothing parameter. */
  @Since("1.4.0")
  def getLambda: Double = lambda

  /**
   * Set the model type using a string (case-sensitive).
   * Supported options: "multinomial" (default) and "bernoulli".
   */
  @Since("1.4.0")
  def setModelType(modelType: String): NaiveBayes = {
    require(NaiveBayes.supportedModelTypes.contains(modelType),
      s"NaiveBayes was created with an unknown modelType: $modelType.")
    this.modelType = modelType
    this
  }

  /** Get the model type. */
  @Since("1.4.0")
  def getModelType: String = this.modelType

  /**
   * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
   *
   * Every feature vector is validated while it is aggregated: the values must be 0 or 1 for
   * the Bernoulli model and nonnegative otherwise; a SparkException is thrown from the
   * aggregation on the first vector that violates this.
   *
   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
   * @return the fitted model, with labels in ascending order
   */
  @Since("0.9.0")
  def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
      val values = v match {
        case sv: SparseVector => sv.values
        case dv: DenseVector => dv.values
      }
      if (!values.forall(_ >= 0.0)) {
        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
      }
    }

    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
      val values = v match {
        case sv: SparseVector => sv.values
        case dv: DenseVector => dv.values
      }
      if (!values.forall(x => x == 0.0 || x == 1.0)) {
        throw new SparkException(
          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
      }
    }

    // Pick the validator once so that createCombiner and mergeValue apply the same check.
    // Previously mergeValue always used the nonnegative check, so for a Bernoulli model only
    // the first vector seen for a key in a partition was held to the 0/1 constraint.
    val requireValidValues: Vector => Unit =
      if (modelType == Bernoulli) requireZeroOneBernoulliValues else requireNonnegativeValues

    // Aggregates term frequencies per label.
    // TODO: Calling combineByKey and collect creates two stages, we can implement something
    // TODO: similar to reduceByKeyLocally to save one stage.
    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
      createCombiner = (v: Vector) => {
        requireValidValues(v)
        // Copy before densifying so the accumulator never aliases the input vector.
        (1L, v.copy.toDense)
      },
      mergeValue = (c: (Long, DenseVector), v: Vector) => {
        requireValidValues(v)
        BLAS.axpy(1.0, v, c._2)
        (c._1 + 1L, c._2)
      },
      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
        BLAS.axpy(1.0, c2._2, c1._2)
        (c1._1 + c2._1, c1._2)
      }
    ).collect().sortBy(_._1)

    val numLabels = aggregated.length
    val numDocuments = aggregated.map { case (_, (n, _)) => n }.sum
    // Assumes at least one labeled point was aggregated; `head` fails on an empty result.
    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }

    val labels = new Array[Double](numLabels)
    val pi = new Array[Double](numLabels)
    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))

    // Smoothed log-prior denominator shared by all classes.
    val piLogDenom = math.log(numDocuments + numLabels * lambda)
    aggregated.zipWithIndex.foreach { case ((label, (n, sumTermFreqs)), i) =>
      labels(i) = label
      pi(i) = math.log(n + lambda) - piLogDenom
      val thetaLogDenom = modelType match {
        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
        case Bernoulli => math.log(n + 2.0 * lambda)
        case _ =>
          // This should never happen.
          throw new UnknownError(s"Invalid modelType: $modelType.")
      }
      var j = 0
      while (j < numFeatures) {
        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
        j += 1
      }
    }

    new NaiveBayesModel(labels, pi, theta, modelType)
  }
}
/**
 * Companion object holding the supported model-type names and the static `train` entry points
 * for naive Bayes.
 */
@Since("0.9.0")
object NaiveBayes {

  /** Name of the multinomial model type. */
  private[spark] val Multinomial: String = "multinomial"

  /** Name of the Bernoulli model type. */
  private[spark] val Bernoulli: String = "bernoulli"

  /** Every model type name that NaiveBayes accepts. */
  private[spark] val supportedModelTypes = Set(Multinomial, Bernoulli)

  /**
   * Fits a Multinomial NB model ([[http://tinyurl.com/lsdw6p]]) to an RDD of
   * `(label, features)` pairs, using the default smoothing parameter of 1.0.
   *
   * Multinomial NB handles all kinds of discrete data; for instance, documents converted to
   * TF-IDF vectors can be classified with it.
   *
   * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
   *              vector or a count vector.
   */
  @Since("0.9.0")
  def train(input: RDD[LabeledPoint]): NaiveBayesModel =
    new NaiveBayes().run(input)

  /**
   * Fits a Multinomial NB model ([[http://tinyurl.com/lsdw6p]]) to an RDD of
   * `(label, features)` pairs with a caller-supplied smoothing parameter.
   *
   * Multinomial NB handles all kinds of discrete data; for instance, documents converted to
   * TF-IDF vectors can be classified with it.
   *
   * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
   *              vector or a count vector.
   * @param lambda The smoothing parameter
   */
  @Since("0.9.0")
  def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
    val trainer = new NaiveBayes(lambda, Multinomial)
    trainer.run(input)
  }

  /**
   * Fits a naive Bayes model of the requested type to an RDD of `(label, features)` pairs.
   *
   * The model type is either Multinomial NB ([[http://tinyurl.com/lsdw6p]]), selected with
   * "multinomial", or Bernoulli NB ([[http://tinyurl.com/p7c96j6]]), selected with "bernoulli".
   * Multinomial NB handles discrete count data such as word counts or TF-IDF vectors of
   * documents. Bernoulli NB fits presence/absence (0-1) data: when every vector is a 0-1
   * vector and the model type is "bernoulli", the model fits and predicts as Bernoulli NB.
   *
   * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
   *              vector or a count vector.
   * @param lambda The smoothing parameter
   * @param modelType The type of NB model to fit; must be "multinomial" or "bernoulli",
   *                  otherwise an IllegalArgumentException is thrown
   */
  @Since("1.4.0")
  def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel = {
    val isSupported = supportedModelTypes.contains(modelType)
    require(isSupported, s"NaiveBayes was created with an unknown modelType: $modelType.")
    val trainer = new NaiveBayes(lambda, modelType)
    trainer.run(input)
  }
}