From 19986353b9197a84092b1d4765ae36986b8d93e8 Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Mon, 24 Mar 2014 17:40:44 +0100 Subject: [PATCH 01/10] Added Entropy Minimization Discretization. --- .../discretization/DiscretizerModel.scala | 47 ++ .../mllib/discretization/EMDDiscretizer.scala | 402 ++++++++++++++++++ .../mllib/discretization/MapAccumulator.scala | 53 +++ .../spark/mllib/discretization/Utils.scala | 54 +++ .../apache/spark/mllib/util/InfoTheory.scala | 49 +++ .../discretization/EMDDiscretizerSuite.scala | 60 +++ 6 files changed, 665 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala new file mode 100644 index 0000000000000..7b9747e690c7a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala @@ -0,0 +1,47 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.mllib.discretization + +import org.apache.spark.rdd.RDD + +trait DiscretizerModel extends Serializable { + /** + * Return the thresholds used to discretized the given feature + * + * @param feature The number of the feature to discretize + */ + def getThresholdsForFeature(feature: Int): Seq[Double] + + /** + * Return the thresholds used to discretized the given features + * + * @param features The number of the feature to discretize + */ + def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] + + /** + * Return the thresholds used to discretized the continuous features + */ + def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] + + /** + * Discretizes an RDD + */ + def discretize: RDD[_] + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala new file mode 100644 index 0000000000000..39868d16fb54e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala @@ -0,0 +1,402 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.mllib.discretization + +import scala.collection.mutable.Stack +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.InfoTheory +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.mllib.regression.LabeledPoint +import scala.collection.mutable + + +/** + * This class contains methods to discretize continuous values with the method proposed in + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993] + */ +class EMDDiscretizer private ( + @transient var data: RDD[LabeledPoint], + @transient var continousFeatures: Seq[Int], + var elementsPerPartition: Int = 18000, + var maxBins: Int = Int.MaxValue) + extends DiscretizerModel { + + private var thresholds = Map.empty[Int, Seq[Double]] + private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt } + + def this() = this(null, null) + + /** + * Sets the RDD[LabeledPoint] to be discretized + * + * @param data RDD[LabeledPoint] to be discretized + */ + def setData(data: RDD[LabeledPoint]): EMDDiscretizer = { + this.data = data + this + } + + /** + * Sets the indexes of the features to be discretized + * + * @param continuousFeatures Indexes of features to be discretized + */ + def setContinuousFeatures(continuousFeatures: Seq[Int]): EMDDiscretizer = { + this.continousFeatures = continuousFeatures + this + } + + /** + * Sets the maximum number of elements that a partition should have + * + * @param ratio Maximum number of elements for a partition + * @return The Discretizer with the parameter changed + */ + def setElementsPerPartition(ratio: Int): EMDDiscretizer = { + this.elementsPerPartition = ratio + this + } + + /** + * Sets the maximum number of discrete values + * + * @param maxBins Maximum number of discrete values + * @return The Discretizer with the parameter changed + */ + def setMaxBins(maxBins: Int): EMDDiscretizer = { + this.maxBins = maxBins + this + } + + /** + * Returns the thresholds used to discretized the given feature + * + * @param feature The number of the feature to discretize + */ + def getThresholdsForFeature(feature: Int): Seq[Double] = { + thresholds.get(feature) match { + case Some(a) => a + case None => + val featureValues = data.map({ + case LabeledPoint(label, values) => (values(feature), label.toString) + }) + val sortedValues = featureValues.sortByKey() + val initial_candidates = initialThresholds(sortedValues) + val thresholdsForFeature = this.getThresholds(initial_candidates) + this.thresholds += ((feature, thresholdsForFeature)) + thresholdsForFeature + } + } + + /** + * Returns the thresholds used to discretized the given features + * + * @param features The number of the feature to discretize + */ + def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = { + for (feature <- features diff this.thresholds.keys.toSeq) { + getThresholdsForFeature(feature) + } + + this.thresholds.filter({ features.contains(_) }) + } + + /** + * Returns the thresholds used to discretized the continuous features + */ + def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = { + for (feature <- continousFeatures diff this.thresholds.keys.toSeq) { + getThresholdsForFeature(feature) + } + + this.thresholds + } + + /** + * Calculates the initial candidate treholds for a feature + * @param data RDD of (value, label) pairs + * @return RDD of (candidate, class frequencies between last and current candidate) pairs + */ + private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = { + data.mapPartitions({ it => + var lastX = Double.NegativeInfinity + var lastY = "" + var result = Seq.empty[(Double, Map[String, Int])] + var freqs = Map.empty[String, Int] + + for ((x, y) <- it) { + if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) { + // new candidate and interval + result = ((x + lastX) / 2, freqs) +: result + freqs = freqs.empty + ((y, 1)) + } else { + // we continue on the same interval + freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1) + } + lastX = x + lastY = y + } + + // we add last element as a candidate threshold for convenience + result = (lastX, freqs) +: result + + result.reverse.toIterator + }).persist(StorageLevel.MEMORY_AND_DISK) + } + + /** + * Returns a sequence of doubles that define the intervals to make the discretization. + * + * @param candidates RDD of (value, label) pairs + */ + private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = { + + //Create queue + val stack = new mutable.Queue[((Double, Double), Option[Double])] + + //Insert first in the stack + stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None)) + var result = Seq(Double.NegativeInfinity) + + // While more elements to eval, continue + while(stack.length > 0 && result.size < this.maxBins){ + + val (bounds, lastThresh) = stack.dequeue + + var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 }) + val nCands = cands.count + if (nCands > 0) { + cands = cands.coalesce(partitions(nCands)) + + evalThresholds(cands, lastThresh) match { + case Some(th) => + result = th +: result + stack.enqueue(((bounds._1, th), Some(th))) + stack.enqueue(((th, bounds._2), Some(th))) + case None => /* criteria not fulfilled, finish */ + } + } + } + (Double.PositiveInfinity +: result).sorted + } + + /** + * Selects one final thresholds among the candidates and returns two partitions to recurse + * + * @param candidates RDD of (candidate, class frequencies between last and current candidate) + * @param lastSelected last selected threshold to avoid selecting it again + */ + private def evalThresholds( + candidates: RDD[(Double, Map[String, Int])], + lastSelected : Option[Double]) = { + + var result = candidates.map({ + case (cand, freqs) => + (cand, freqs, Seq.empty[Int], Seq.empty[Int]) + }).cache + + val numPartitions = candidates.partitions.size + val bc_numPartitions = candidates.context.broadcast(numPartitions) + + // stores accumulated freqs from left to right + val l_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator) + // stores accumulated freqs from right to left + val r_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator) + + // calculates accumulated frequencies for each candidate + (0 until numPartitions) foreach { l2r_i => + + val bc_l_total = l_total.value + val bc_r_total = r_total.value + + result = + result.mapPartitionsWithIndex({ (slice, it) => + + val l2r = slice == l2r_i + val r2l = slice == bc_numPartitions.value - 1 - l2r_i + + if (l2r && r2l) { + + // accumulate both from left to right and right to left + var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] + var accum = Map.empty[String, Int] + + for ((cand, freqs, _, r_freqs) <- it) { + accum = Utils.sumFreqMaps(accum, freqs) + val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList + partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + } + + l_total += accum + + val r2lIt = partialResult.iterator + partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] + accum = Map.empty[String, Int] + for ((cand, freqs, l_freqs, _) <- r2lIt) { + val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList + accum = Utils.sumFreqMaps(accum, freqs) + partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + } + r_total += accum + + partialResult.iterator + + } else if (l2r) { + + // accumulate freqs from left to right + var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] + var accum = Map.empty[String, Int] + + for ((cand, freqs, _, r_freqs) <- it) { + accum = Utils.sumFreqMaps(accum, freqs) + val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList + partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + } + + l_total += accum + partialResult.reverseIterator + + } else if (r2l) { + + // accumulate freqs from right to left + var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] + var accum = Map.empty[String, Int] + val r2lIt = it.toSeq.reverseIterator + + for ((cand, freqs, l_freqs, _) <- r2lIt) { + val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList + accum = Utils.sumFreqMaps(accum, freqs) + partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + } + + r_total += accum + + partialResult.iterator + + } else { + // do nothing in this iteration + it + } + }, true) // important to maintain partitions within the loop + .persist(StorageLevel.MEMORY_AND_DISK) // needed, otherwise spark repeats calculations + + result.foreachPartition({ _ => }) // Forces the iteration to be calculated + } + + // calculate h(S) + // s: number of elements + // k: number of distinct classes + // hs: entropy + + val s = l_total.value.values.reduce(_+_) + val hs = InfoTheory.entropy(l_total.value.values.toSeq, s) + val k = l_total.value.values.size + + // select best threshold according to the criteria + val final_candidates = + result.flatMap({ + case (cand, _, l_freqs, r_freqs) => + + val k1 = l_freqs.size + val s1 = if (k1 > 0) l_freqs.reduce(_ + _) else 0 + val hs1 = InfoTheory.entropy(l_freqs, s1) + + val k2 = r_freqs.size + val s2 = if (k2 > 0) r_freqs.reduce(_ + _) else 0 + val hs2 = InfoTheory.entropy(r_freqs, s2) + + val weighted_hs = (s1 * hs1 + s2 * hs2) / s + val gain = hs - weighted_hs + val delta = Utils.log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2) + var criterion = (gain - (Utils.log2(s - 1) + delta) / s) > -1e-5 + + lastSelected match { + case None => + case Some(last) => criterion = criterion && (cand != last) + } + + if (criterion) { + Seq((weighted_hs, cand)) + } else { + Seq.empty[(Double, Double)] + } + }) + + // choose best candidates and partition data to make recursive calls + if (final_candidates.count > 0) { + val selected_threshold = final_candidates.reduce({ case ((whs1, cand1), (whs2, cand2)) => + if (whs1 < whs2) (whs1, cand1) else (whs2, cand2) + })._2; + Some(selected_threshold) + } else { + None + } + + } + + /** + * Discretizes a value with a set of intervals. + * + * @param value The value to be discretized + * @param thresholds Thresholds used to asign a discrete value + */ + private def assignDiscreteValue(value: Double, thresholds: Seq[Double]) = { + var aux = thresholds.zipWithIndex + while (value > aux.head._1) aux = aux.tail + aux.head._2 + } + + /** + * Discretizes an RDD of (label, array of values) pairs. + */ + def discretize: RDD[LabeledPoint] = { + // calculate thresholds that aren't already calculated + getThresholdsForContinuousFeatures + + val bc_thresholds = this.data.context.broadcast(this.thresholds) + + // applies thresholds to discretize every continuous feature + data.map { + case LabeledPoint(label, values) => + LabeledPoint(label, + values.zipWithIndex map { + case (value, i) => + if (bc_thresholds.value.keySet contains i) { + assignDiscreteValue(value, bc_thresholds.value(i)) + } else { + value + } + }) + } + } + +} + +object EMDDiscretizer { + + def apply( + data: RDD[LabeledPoint], + continuousFeatures: Seq[Int]) + : EMDDiscretizer = { + new EMDDiscretizer(data, continuousFeatures) + } + + def apply: EMDDiscretizer = new EMDDiscretizer() + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala new file mode 100644 index 0000000000000..ec7ba6f60af26 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala @@ -0,0 +1,53 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.mllib.discretization + +import org.apache.spark.AccumulatorParam + +object MapAccumulator extends AccumulatorParam[Map[String, Int]] { + + def addInPlace(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] = { + if (map1 isEmpty) { + map2 + } else if (map2 isEmpty) { + map1 + } else { + var result = Map.empty[String, Int] + for ((y1, x1) <- map1; (y2, x2) <- map2) { + if (y1.trim() == y2.trim()) { + result += ((y1, x1 + x2)) + } + } + + (map1.keySet diff map2.keySet) foreach { y => + result += ((y, map1(y))) + } + + (map2.keySet diff map1.keySet) foreach { y => + result += ((y, map2(y))) + } + + result + } + } + + def zero(initialValue: Map[String, Int]): Map[String, Int] = { + Map.empty[String, Int] + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala new file mode 100644 index 0000000000000..64de687defa0d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala @@ -0,0 +1,54 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.mllib.discretization + +object Utils { + + implicit class MyRichSeq[T](val seq: Seq[T]) extends AnyVal { + + def apply(indexes: Seq[Int]): Option[Seq[T]] = { + if (indexes.length == 0) { + None + } else { + Some(indexes.map(i => seq(i))) + } + } + } + + def sumFreqMaps[A](map1: Map[A, Int], + map2: Map[A, Int]) = { + if (map1 isEmpty) { + map2 + } else if (map2 isEmpty) { + map1 + } else { + Map.empty[A, Int] ++ + (for ((y1, x1) <- map1; (y2, x2) <- map2 if (y1 == y2)) + yield ((y1, x1 + x2))) ++ + (for (y <- (map1.keySet diff map2.keySet)) + yield ((y, map1(y)))) ++ + (for (y <- (map2.keySet diff map1.keySet)) + yield ((y, map2(y)))) + } + } + + @inline def log2(x: Double) = { + math.log(x) / math.log(2) + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala new file mode 100644 index 0000000000000..5ed84558048fc --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala @@ -0,0 +1,49 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.mllib.util + +import org.apache.spark.mllib.discretization.Utils + +/** + * Object with some Information Theory methods. + */ +object InfoTheory { + + /** + * Calculate entropy for the given frequencies. + * + * @param freqs Frequencies of each different class + * @param n Number of elements + */ + def entropy(freqs: Seq[Int], n: Int): Double = { + freqs.aggregate(0.0)({ + case (h, q) => + h + (q.toDouble / n) * Utils.log2(q.toDouble / n) + }, { case (h1, h2) => h1 + h2 }) * -1 + } + + /** + * Calculate entropy for the given frequencies. + * + * @param freqs Frequencies of each different class + */ + def entropy(freqs: Seq[Int]): Double = { + entropy(freqs, freqs.reduce(_ + _)) + } + +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala new file mode 100644 index 0000000000000..7720cda86c1db --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala @@ -0,0 +1,60 @@ +package org.apache.spark.mllib.discretization + +import org.scalatest.FunSuite +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.regression.LabeledPoint +import scala.util.Random +import org.apache.spark.mllib.util.InfoTheory + +object EMDDiscretizerSuite { + val nFeatures = 5 + val nDatapoints = 50 + val nLabels = 3 + val nPartitions = 3 + + def generateLabeledData : Array[LabeledPoint] = + { + + val rnd = new Random(42) + val labels = Array.fill[Double](nLabels)(rnd.nextDouble) + + Array.fill[LabeledPoint](nDatapoints) { + LabeledPoint(labels(rnd.nextInt(nLabels)), + Array.fill[Double](nFeatures)(rnd.nextDouble)) + } + } +} + +class EMDDiscretizerSuite extends FunSuite with LocalSparkContext { + + test("EMD discretization") { + val rnd = new Random() + + val data = + for (i <- 1 to 99) yield + if (i <= 33) { + LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) + } else if (i <= 66) { + LabeledPoint(2.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) + } else { + LabeledPoint(3.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) + } + + val shuffledData = data.sortWith((lp1, lp2) => if (rnd.nextDouble < 0.5) true else false) + + val rdd = sc.parallelize(shuffledData, 3) + + val thresholds = EMDDiscretizer(rdd, Seq(0)).getThresholdsForFeature(0) + + val thresholdsArray = thresholds.toArray + if (math.abs(thresholdsArray(1) - 33.5) > 1.55) { + fail("Selected thresholds aren't what they should be.") + } + if (math.abs(thresholdsArray(2) - 66.5) > 1.55) { + fail("Selected thresholds aren't what they should be.") + } + } + +} \ No newline at end of file From cbf44317646d7b7f123e463189c55b191b3f12d6 Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Tue, 25 Mar 2014 11:17:10 +0100 Subject: [PATCH 02/10] Changed discretizer name to EntropyMinimizationDiscretizer --- ...la => EntropyMinimizationDiscretizer.scala} | 18 +++++++++--------- ... EntropyMinimizationDiscretizerSuite.scala} | 6 +++--- 2 files changed, 12 insertions(+), 12 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/discretization/{EMDDiscretizer.scala => EntropyMinimizationDiscretizer.scala} (95%) rename mllib/src/test/scala/org/apache/spark/mllib/discretization/{EMDDiscretizerSuite.scala => EntropyMinimizationDiscretizerSuite.scala} (87%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala similarity index 95% rename from mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala rename to mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala index 39868d16fb54e..a4b03b03f7751 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala @@ -30,7 +30,7 @@ import scala.collection.mutable * This class contains methods to discretize continuous values with the method proposed in * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993] */ -class EMDDiscretizer private ( +class EntropyMinimizationDiscretizer private ( @transient var data: RDD[LabeledPoint], @transient var continousFeatures: Seq[Int], var elementsPerPartition: Int = 18000, @@ -47,7 +47,7 @@ class EMDDiscretizer private ( * * @param data RDD[LabeledPoint] to be discretized */ - def setData(data: RDD[LabeledPoint]): EMDDiscretizer = { + def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = { this.data = data this } @@ -57,7 +57,7 @@ class EMDDiscretizer private ( * * @param continuousFeatures Indexes of features to be discretized */ - def setContinuousFeatures(continuousFeatures: Seq[Int]): EMDDiscretizer = { + def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = { this.continousFeatures = continuousFeatures this } @@ -68,7 +68,7 @@ class EMDDiscretizer private ( * @param ratio Maximum number of elements for a partition * @return The Discretizer with the parameter changed */ - def setElementsPerPartition(ratio: Int): EMDDiscretizer = { + def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = { this.elementsPerPartition = ratio this } @@ -79,7 +79,7 @@ class EMDDiscretizer private ( * @param maxBins Maximum number of discrete values * @return The Discretizer with the parameter changed */ - def setMaxBins(maxBins: Int): EMDDiscretizer = { + def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = { this.maxBins = maxBins this } @@ -388,15 +388,15 @@ class EMDDiscretizer private ( } -object EMDDiscretizer { +object EntropyMinimizationDiscretizer { def apply( data: RDD[LabeledPoint], continuousFeatures: Seq[Int]) - : EMDDiscretizer = { - new EMDDiscretizer(data, continuousFeatures) + : EntropyMinimizationDiscretizer = { + new EntropyMinimizationDiscretizer(data, continuousFeatures) } - def apply: EMDDiscretizer = new EMDDiscretizer() + def apply: EntropyMinimizationDiscretizer = new EntropyMinimizationDiscretizer() } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala similarity index 87% rename from mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala rename to mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala index 7720cda86c1db..5a8bb3ceb2e5e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala @@ -8,7 +8,7 @@ import org.apache.spark.mllib.regression.LabeledPoint import scala.util.Random import org.apache.spark.mllib.util.InfoTheory -object EMDDiscretizerSuite { +object EntropyMinimizationDiscretizerSuite { val nFeatures = 5 val nDatapoints = 50 val nLabels = 3 @@ -27,7 +27,7 @@ object EMDDiscretizerSuite { } } -class EMDDiscretizerSuite extends FunSuite with LocalSparkContext { +class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext { test("EMD discretization") { val rnd = new Random() @@ -46,7 +46,7 @@ class EMDDiscretizerSuite extends FunSuite with LocalSparkContext { val rdd = sc.parallelize(shuffledData, 3) - val thresholds = EMDDiscretizer(rdd, Seq(0)).getThresholdsForFeature(0) + val thresholds = EntropyMinimizationDiscretizer(rdd, Seq(0)).getThresholdsForFeature(0) val thresholdsArray = thresholds.toArray if (math.abs(thresholdsArray(1) - 33.5) > 1.55) { From e37cae7b55fec4fb1359629fbf36148b47f2cecb Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Tue, 25 Mar 2014 11:19:47 +0100 Subject: [PATCH 03/10] Added documentation to DiscretizerModel. --- .../apache/spark/mllib/discretization/DiscretizerModel.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala index 7b9747e690c7a..b08e7c93f8e65 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala @@ -19,6 +19,9 @@ package org.apache.spark.mllib.discretization import org.apache.spark.rdd.RDD +/** + * DiscretizerModel provides a template with the basic methods for future discretizers. + */ trait DiscretizerModel extends Serializable { /** * Return the thresholds used to discretized the given feature From 0104fe0fa693263d070865641ef847483fb2470d Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Tue, 25 Mar 2014 13:04:03 +0100 Subject: [PATCH 04/10] Fixed EntropyMinimizationDiscretizer. --- .../EntropyMinimizationDiscretizerSuite.scala | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala index 5a8bb3ceb2e5e..162b9d7fefe09 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala @@ -1,22 +1,34 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + package org.apache.spark.mllib.discretization +import scala.util.Random import org.scalatest.FunSuite -import org.apache.spark.mllib.util.LocalSparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint -import scala.util.Random -import org.apache.spark.mllib.util.InfoTheory +import org.apache.spark.mllib.util.LocalSparkContext object EntropyMinimizationDiscretizerSuite { - val nFeatures = 5 - val nDatapoints = 50 - val nLabels = 3 - val nPartitions = 3 + val nFeatures = 5 + val nDatapoints = 50 + val nLabels = 3 + val nPartitions = 3 - def generateLabeledData : Array[LabeledPoint] = - { - + def generateLabeledData : Array[LabeledPoint] = { val rnd = new Random(42) val labels = Array.fill[Double](nLabels)(rnd.nextDouble) @@ -29,11 +41,10 @@ object EntropyMinimizationDiscretizerSuite { class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext { - test("EMD discretization") { - val rnd = new Random() + test("EMD discretization") { + val rnd = new Random(13) - val data = - for (i <- 1 to 99) yield + val data = for (i <- 1 to 99) yield if (i <= 33) { LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) } else if (i <= 66) { @@ -42,7 +53,7 @@ class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContex LabeledPoint(3.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) } - val shuffledData = data.sortWith((lp1, lp2) => if (rnd.nextDouble < 0.5) true else false) + val shuffledData = data.sortWith((lp1, lp2) => rnd.nextDouble < 0.5) val rdd = sc.parallelize(shuffledData, 3) @@ -55,6 +66,6 @@ class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContex if (math.abs(thresholdsArray(2) - 66.5) > 1.55) { fail("Selected thresholds aren't what they should be.") } - } + } } \ No newline at end of file From d5c9e1c5667feb19e36cafa345047e45f3f5a21a Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Tue, 25 Mar 2014 13:22:56 +0100 Subject: [PATCH 05/10] Fixed Apache License format. --- .../discretization/DiscretizerModel.scala | 30 ++++++++--------- .../EntropyMinimizationDiscretizer.scala | 32 +++++++++---------- .../mllib/discretization/MapAccumulator.scala | 30 ++++++++--------- .../spark/mllib/discretization/Utils.scala | 30 ++++++++--------- .../apache/spark/mllib/util/InfoTheory.scala | 30 ++++++++--------- .../EntropyMinimizationDiscretizerSuite.scala | 30 ++++++++--------- 6 files changed, 91 insertions(+), 91 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala index b08e7c93f8e65..ca1bca666420c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.mllib.discretization diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala index a4b03b03f7751..5f35ee1c51781 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.mllib.discretization @@ -304,7 +304,7 @@ class EntropyMinimizationDiscretizer private ( // k: number of distinct classes // hs: entropy - val s = l_total.value.values.reduce(_+_) + val s = l_total.value.values.reduce(_ + _) val hs = InfoTheory.entropy(l_total.value.values.toSeq, s) val k = l_total.value.values.size diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala index ec7ba6f60af26..a27ab7fddf505 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.mllib.discretization diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala index 64de687defa0d..9be45af55156a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.mllib.discretization diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala index 5ed84558048fc..5324df4964934 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.mllib.util diff --git a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala index 162b9d7fefe09..79297ceb4b44d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.mllib.discretization From b456059387a6b9466386cb7c98560f6a6ea7a975 Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Tue, 25 Mar 2014 13:37:49 +0100 Subject: [PATCH 06/10] Fixed MapAccumulator. --- .../EntropyMinimizationDiscretizer.scala | 2 +- .../mllib/discretization/MapAccumulator.scala | 21 ++++--------------- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala index 5f35ee1c51781..c1ef7baed4811 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala @@ -94,7 +94,7 @@ class EntropyMinimizationDiscretizer private ( case Some(a) => a case None => val featureValues = data.map({ - case LabeledPoint(label, values) => (values(feature), label.toString) + case LabeledPoint(label, values) => (values(feature), label.toString.trim) }) val sortedValues = featureValues.sortByKey() val initial_candidates = initialThresholds(sortedValues) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala index a27ab7fddf505..75e0568026fb7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala @@ -19,31 +19,18 @@ package org.apache.spark.mllib.discretization import org.apache.spark.AccumulatorParam -object MapAccumulator extends AccumulatorParam[Map[String, Int]] { +private[discretization] object MapAccumulator extends AccumulatorParam[Map[String, Int]] { def addInPlace(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] = { + if (map1 isEmpty) { map2 } else if (map2 isEmpty) { map1 } else { - var result = Map.empty[String, Int] - for ((y1, x1) <- map1; (y2, x2) <- map2) { - if (y1.trim() == y2.trim()) { - result += ((y1, x1 + x2)) - } - } - - (map1.keySet diff map2.keySet) foreach { y => - result += ((y, map1(y))) - } - - (map2.keySet diff map1.keySet) foreach { y => - result += ((y, map2(y))) - } - - result + map2.foldLeft(map1)({ case (acc, (k,v)) => acc.updated(k, acc.getOrElse(k, 0) + 1) }) } + } def zero(initialValue: Map[String, Int]): Map[String, Int] = { From 2ea61eeb53b407cdd9dd9071a4dd795d93abf97c Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Fri, 28 Mar 2014 18:07:26 +0100 Subject: [PATCH 07/10] Changed MapAccumulator to ArrayAccumulator and made a big refactor in discretizer architecture. --- ...cumulator.scala => ArrayAccumulator.scala} | 20 +- .../discretization/DiscretizerModel.scala | 27 +- .../EntropyMinimizationDiscretizer.scala | 327 +++++++----------- .../EntropyMinimizationDiscretizerModel.scala | 82 +++++ .../{Utils.scala => InfoTheory.scala} | 50 ++- .../EntropyMinimizationDiscretizerSuite.scala | 26 +- 6 files changed, 253 insertions(+), 279 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/discretization/{MapAccumulator.scala => ArrayAccumulator.scala} (65%) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerModel.scala rename mllib/src/main/scala/org/apache/spark/mllib/discretization/{Utils.scala => InfoTheory.scala} (52%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala similarity index 65% rename from mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala rename to mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala index 75e0568026fb7..7365f18377bb0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala @@ -19,22 +19,14 @@ package org.apache.spark.mllib.discretization import org.apache.spark.AccumulatorParam -private[discretization] object MapAccumulator extends AccumulatorParam[Map[String, Int]] { - - def addInPlace(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] = { - - if (map1 isEmpty) { - map2 - } else if (map2 isEmpty) { - map1 - } else { - map2.foldLeft(map1)({ case (acc, (k,v)) => acc.updated(k, acc.getOrElse(k, 0) + 1) }) - } +private[discretization] object ArrayAccumulator extends AccumulatorParam[Array[Long]] { + def addInPlace(array1: Array[Long], array2: Array[Long]): Array[Long] = { + array2.clone } - def zero(initialValue: Map[String, Int]): Map[String, Int] = { - Map.empty[String, Int] - } + def zero(initialValue: Array[Long]): Array[Long] = { + initialValue +} } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala index ca1bca666420c..0491c103c469f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala @@ -22,29 +22,22 @@ import org.apache.spark.rdd.RDD /** * DiscretizerModel provides a template with the basic methods for future discretizers. */ -trait DiscretizerModel extends Serializable { - /** - * Return the thresholds used to discretized the given feature - * - * @param feature The number of the feature to discretize - */ - def getThresholdsForFeature(feature: Int): Seq[Double] +trait DiscretizerModel[T] extends Serializable { /** - * Return the thresholds used to discretized the given features + * Discretizes values for the given data set using the model trained. * - * @param features The number of the feature to discretize + * @param data RDD representing data points to discretize. + * @return RDD with values discretized */ - def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] + def discretize(data: RDD[T]): RDD[T] /** - * Return the thresholds used to discretized the continuous features - */ - def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] - - /** - * Discretizes an RDD + * Discretizes values for the given data set using the model trained. + * + * @param data Data point to discretize. + * @return Data point with values discretized */ - def discretize: RDD[_] + def discretize(data: T): T } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala index c1ef7baed4811..813d268c198a3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala @@ -17,143 +17,82 @@ package org.apache.spark.mllib.discretization -import scala.collection.mutable.Stack +import scala.collection.mutable import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.util.InfoTheory import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.mllib.regression.LabeledPoint -import scala.collection.mutable - /** - * This class contains methods to discretize continuous values with the method proposed in - * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993] + * This class contains methods to calculate thresholds to discretize continuous values with the + * method proposed by Fayyad and Irani in Multi-Interval Discretization of Continuous-Valued + * Attributes (1993). + * + * @param continuousFeaturesIndexes Indexes of features to be discretized. + * @param elementsPerPartition Maximum number of thresholds to treat in each RDD partition. + * @param maxBins Maximum number of bins for each discretized feature. */ class EntropyMinimizationDiscretizer private ( - @transient var data: RDD[LabeledPoint], - @transient var continousFeatures: Seq[Int], - var elementsPerPartition: Int = 18000, - var maxBins: Int = Int.MaxValue) - extends DiscretizerModel { - - private var thresholds = Map.empty[Int, Seq[Double]] - private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt } - - def this() = this(null, null) - - /** - * Sets the RDD[LabeledPoint] to be discretized - * - * @param data RDD[LabeledPoint] to be discretized - */ - def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = { - this.data = data - this - } - - /** - * Sets the indexes of the features to be discretized - * - * @param continuousFeatures Indexes of features to be discretized - */ - def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = { - this.continousFeatures = continuousFeatures - this - } + val continuousFeaturesIndexes: Seq[Int], + val elementsPerPartition: Int, + val maxBins: Int) + extends Serializable { - /** - * Sets the maximum number of elements that a partition should have - * - * @param ratio Maximum number of elements for a partition - * @return The Discretizer with the parameter changed - */ - def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = { - this.elementsPerPartition = ratio - this - } + private val partitions = { x: Long => math.ceil(x.toDouble / elementsPerPartition).toInt } + private val log2 = { x: Double => math.log(x) / math.log(2) } /** - * Sets the maximum number of discrete values - * - * @param maxBins Maximum number of discrete values - * @return The Discretizer with the parameter changed - */ - def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = { - this.maxBins = maxBins - this - } - - /** - * Returns the thresholds used to discretized the given feature - * - * @param feature The number of the feature to discretize + * Run the algorithm with the configured parameters on an input. + * @param data RDD of LabeledPoint's. + * @return A EntropyMinimizationDiscretizerModel with the thresholds to discretize. */ - def getThresholdsForFeature(feature: Int): Seq[Double] = { - thresholds.get(feature) match { - case Some(a) => a - case None => - val featureValues = data.map({ - case LabeledPoint(label, values) => (values(feature), label.toString.trim) - }) - val sortedValues = featureValues.sortByKey() - val initial_candidates = initialThresholds(sortedValues) - val thresholdsForFeature = this.getThresholds(initial_candidates) - this.thresholds += ((feature, thresholdsForFeature)) - thresholdsForFeature - } - } - - /** - * Returns the thresholds used to discretized the given features - * - * @param features The number of the feature to discretize - */ - def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = { - for (feature <- features diff this.thresholds.keys.toSeq) { - getThresholdsForFeature(feature) + def run(data: RDD[LabeledPoint]) = { + val labels2Int = data.context.broadcast(data.map(_.label).distinct.collect.zipWithIndex.toMap) + val nLabels = labels2Int.value.size + + var thresholds = Map.empty[Int, Seq[Double]] + for (i <- continuousFeaturesIndexes) { + val featureValues = data.map({ + case LabeledPoint(label, values) => (values(i), labels2Int.value(label)) + }) + val sortedValues = featureValues.sortByKey() + val initialCandidates = initialThresholds(sortedValues, nLabels) + val thresholdsForFeature = this.getThresholds(initialCandidates, nLabels) + thresholds += ((i, thresholdsForFeature)) } - this.thresholds.filter({ features.contains(_) }) - } - - /** - * Returns the thresholds used to discretized the continuous features - */ - def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = { - for (feature <- continousFeatures diff this.thresholds.keys.toSeq) { - getThresholdsForFeature(feature) - } + new EntropyMinimizationDiscretizerModel(thresholds) - this.thresholds } /** * Calculates the initial candidate treholds for a feature - * @param data RDD of (value, label) pairs - * @return RDD of (candidate, class frequencies between last and current candidate) pairs + * @param data RDD of (value, label) pairs. + * @param nLabels Number of distinct labels in the dataset. + * @return RDD of (candidate, class frequencies between last and current candidate) pairs. */ - private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = { + private def initialThresholds(data: RDD[(Double, Int)], nLabels: Int) = { data.mapPartitions({ it => var lastX = Double.NegativeInfinity - var lastY = "" - var result = Seq.empty[(Double, Map[String, Int])] - var freqs = Map.empty[String, Int] + var lastY = Int.MinValue + var result = Seq.empty[(Double, Array[Long])] + var freqs = Array.fill[Long](nLabels)(0L) for ((x, y) <- it) { if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) { // new candidate and interval result = ((x + lastX) / 2, freqs) +: result - freqs = freqs.empty + ((y, 1)) + freqs = Array.fill[Long](nLabels)(0L) + freqs(y) = 1L } else { // we continue on the same interval - freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1) + freqs(y) += 1 } lastX = x lastY = y } - // we add last element as a candidate threshold for convenience + // we add last element as a candidate threshold for convenience result = (lastX, freqs) +: result result.reverse.toIterator @@ -165,7 +104,7 @@ class EntropyMinimizationDiscretizer private ( * * @param candidates RDD of (value, label) pairs */ - private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = { + private def getThresholds(candidates: RDD[(Double, Array[Long])], nLabels: Int): Seq[Double] = { //Create queue val stack = new mutable.Queue[((Double, Double), Option[Double])] @@ -184,7 +123,7 @@ class EntropyMinimizationDiscretizer private ( if (nCands > 0) { cands = cands.coalesce(partitions(nCands)) - evalThresholds(cands, lastThresh) match { + evalThresholds(cands, lastThresh, nLabels) match { case Some(th) => result = th +: result stack.enqueue(((bounds._1, th), Some(th))) @@ -203,89 +142,89 @@ class EntropyMinimizationDiscretizer private ( * @param lastSelected last selected threshold to avoid selecting it again */ private def evalThresholds( - candidates: RDD[(Double, Map[String, Int])], - lastSelected : Option[Double]) = { + candidates: RDD[(Double, Array[Long])], + lastSelected : Option[Double], + nLabels: Int) = { var result = candidates.map({ case (cand, freqs) => - (cand, freqs, Seq.empty[Int], Seq.empty[Int]) + (cand, freqs, Array.empty[Long], Array.empty[Long]) }).cache val numPartitions = candidates.partitions.size - val bc_numPartitions = candidates.context.broadcast(numPartitions) + val bcNumPartitions = candidates.context.broadcast(numPartitions) // stores accumulated freqs from left to right - val l_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator) + val bcLeftTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator) // stores accumulated freqs from right to left - val r_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator) + val bcRightTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator) // calculates accumulated frequencies for each candidate - (0 until numPartitions) foreach { l2r_i => + (0 until numPartitions) foreach { l2rIndex => - val bc_l_total = l_total.value - val bc_r_total = r_total.value + val leftTotal = bcLeftTotal.value + val rightTotal = bcRightTotal.value result = result.mapPartitionsWithIndex({ (slice, it) => - val l2r = slice == l2r_i - val r2l = slice == bc_numPartitions.value - 1 - l2r_i + val l2r = slice == l2rIndex + val r2l = slice == bcNumPartitions.value - 1 - l2rIndex if (l2r && r2l) { // accumulate both from left to right and right to left - var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] - var accum = Map.empty[String, Int] + var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] + var accum = leftTotal - for ((cand, freqs, _, r_freqs) <- it) { - accum = Utils.sumFreqMaps(accum, freqs) - val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList - partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + for ((cand, freqs, _, rightFreqs) <- it) { + for (i <- 0 until nLabels) accum(i) += freqs(i) + partialResult = (cand, freqs, accum.clone, rightFreqs) +: partialResult } - l_total += accum + bcLeftTotal += accum val r2lIt = partialResult.iterator - partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] - accum = Map.empty[String, Int] - for ((cand, freqs, l_freqs, _) <- r2lIt) { - val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList - accum = Utils.sumFreqMaps(accum, freqs) - partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] + accum = Array.fill[Long](nLabels)(0L) + + for ((cand, freqs, leftFreqs, _) <- r2lIt) { + partialResult = (cand, freqs, leftFreqs, accum.clone) +: partialResult + for (i <- 0 until nLabels) accum(i) += freqs(i) } - r_total += accum + + bcRightTotal += accum partialResult.iterator } else if (l2r) { // accumulate freqs from left to right - var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] - var accum = Map.empty[String, Int] + var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] + val accum = leftTotal - for ((cand, freqs, _, r_freqs) <- it) { - accum = Utils.sumFreqMaps(accum, freqs) - val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList - partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + for ((cand, freqs, _, rightFreqs) <- it) { + for (i <- 0 until nLabels) accum(i) += freqs(i) + partialResult = (cand, freqs, accum.clone, rightFreqs) +: partialResult } - l_total += accum + bcLeftTotal += accum partialResult.reverseIterator } else if (r2l) { // accumulate freqs from right to left - var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])] - var accum = Map.empty[String, Int] val r2lIt = it.toSeq.reverseIterator - for ((cand, freqs, l_freqs, _) <- r2lIt) { - val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList - accum = Utils.sumFreqMaps(accum, freqs) - partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult + var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] + val accum = rightTotal + + for ((cand, freqs, leftFreqs, _) <- r2lIt) { + partialResult = (cand, freqs, leftFreqs, accum.clone) +: partialResult + for (i <- 0 until nLabels) accum(i) += freqs(i) } - r_total += accum + bcRightTotal += accum partialResult.iterator @@ -297,6 +236,7 @@ class EntropyMinimizationDiscretizer private ( .persist(StorageLevel.MEMORY_AND_DISK) // needed, otherwise spark repeats calculations result.foreachPartition({ _ => }) // Forces the iteration to be calculated + } // calculate h(S) @@ -304,27 +244,27 @@ class EntropyMinimizationDiscretizer private ( // k: number of distinct classes // hs: entropy - val s = l_total.value.values.reduce(_ + _) - val hs = InfoTheory.entropy(l_total.value.values.toSeq, s) - val k = l_total.value.values.size + val s = bcLeftTotal.value.reduce(_ + _) + val hs = InfoTheory.entropy(bcLeftTotal.value.toSeq, s) + val k = bcLeftTotal.value.filter(_ != 0).size // select best threshold according to the criteria - val final_candidates = + val finalCandidates = result.flatMap({ - case (cand, _, l_freqs, r_freqs) => + case (cand, _, leftFreqs, rightFreqs) => - val k1 = l_freqs.size - val s1 = if (k1 > 0) l_freqs.reduce(_ + _) else 0 - val hs1 = InfoTheory.entropy(l_freqs, s1) + val k1 = leftFreqs.filter(_ != 0).size + val s1 = if (k1 > 0) leftFreqs.reduce(_ + _) else 0 + val hs1 = InfoTheory.entropy(leftFreqs, s1) - val k2 = r_freqs.size - val s2 = if (k2 > 0) r_freqs.reduce(_ + _) else 0 - val hs2 = InfoTheory.entropy(r_freqs, s2) + val k2 = rightFreqs.filter(_ != 0).size + val s2 = if (k2 > 0) rightFreqs.reduce(_ + _) else 0 + val hs2 = InfoTheory.entropy(rightFreqs, s2) - val weighted_hs = (s1 * hs1 + s2 * hs2) / s - val gain = hs - weighted_hs - val delta = Utils.log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2) - var criterion = (gain - (Utils.log2(s - 1) + delta) / s) > -1e-5 + val weightedHs = (s1 * hs1 + s2 * hs2) / s + val gain = hs - weightedHs + val delta = log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2) + var criterion = (gain - (log2(s - 1) + delta) / s) > -1e-5 lastSelected match { case None => @@ -332,71 +272,46 @@ class EntropyMinimizationDiscretizer private ( } if (criterion) { - Seq((weighted_hs, cand)) + Seq((weightedHs, cand)) } else { Seq.empty[(Double, Double)] } }) // choose best candidates and partition data to make recursive calls - if (final_candidates.count > 0) { - val selected_threshold = final_candidates.reduce({ case ((whs1, cand1), (whs2, cand2)) => + if (finalCandidates.count > 0) { + val selectedThreshold = finalCandidates.reduce({ case ((whs1, cand1), (whs2, cand2)) => if (whs1 < whs2) (whs1, cand1) else (whs2, cand2) - })._2; - Some(selected_threshold) + })._2 + Some(selectedThreshold) } else { None } } - /** - * Discretizes a value with a set of intervals. - * - * @param value The value to be discretized - * @param thresholds Thresholds used to asign a discrete value - */ - private def assignDiscreteValue(value: Double, thresholds: Seq[Double]) = { - var aux = thresholds.zipWithIndex - while (value > aux.head._1) aux = aux.tail - aux.head._2 - } +} + +object EntropyMinimizationDiscretizer { /** - * Discretizes an RDD of (label, array of values) pairs. + * Train a EntropyMinimizationDiscretizerModel given an RDD of LabeledPoint's. + * @param input RDD of LabeledPoint's. + * @param continuousFeaturesIndexes Indexes of features to be discretized. + * @param maxBins Maximum number of bins for each discretized feature. + * @param elementsPerPartition Maximum number of thresholds to treat in each RDD partition. + * @return A EntropyMinimizationDiscretizerModel which has the thresholds to discretize. */ - def discretize: RDD[LabeledPoint] = { - // calculate thresholds that aren't already calculated - getThresholdsForContinuousFeatures - - val bc_thresholds = this.data.context.broadcast(this.thresholds) - - // applies thresholds to discretize every continuous feature - data.map { - case LabeledPoint(label, values) => - LabeledPoint(label, - values.zipWithIndex map { - case (value, i) => - if (bc_thresholds.value.keySet contains i) { - assignDiscreteValue(value, bc_thresholds.value(i)) - } else { - value - } - }) - } - } - -} + def train( + input: RDD[LabeledPoint], + continuousFeaturesIndexes: Seq[Int], + maxBins: Int = Int.MaxValue, + elementsPerPartition: Int = 20000) + : EntropyMinimizationDiscretizerModel = { -object EntropyMinimizationDiscretizer { + new EntropyMinimizationDiscretizer(continuousFeaturesIndexes, elementsPerPartition, maxBins) + .run(input) - def apply( - data: RDD[LabeledPoint], - continuousFeatures: Seq[Int]) - : EntropyMinimizationDiscretizer = { - new EntropyMinimizationDiscretizer(data, continuousFeatures) } - def apply: EntropyMinimizationDiscretizer = new EntropyMinimizationDiscretizer() - } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerModel.scala new file mode 100644 index 0000000000000..7f7184276fb7e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerModel.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.discretization + +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD + +/** + * This class provides the methods to discretize data with the given thresholds. + * @param thresholds Thresholds used to discretize. + */ +class EntropyMinimizationDiscretizerModel (val thresholds: Map[Int, Seq[Double]]) + extends DiscretizerModel[LabeledPoint] with Serializable { + + /** + * Discretizes values for the given data set using the model trained. + * + * @param data Data point to discretize. + * @return Data point with values discretized + */ + override def discretize(data: LabeledPoint): LabeledPoint = { + val newValues = data.features.zipWithIndex.map({ case (value, i) => + if (this.thresholds.keySet contains i) { + assignDiscreteValue(value, thresholds(i)) + } else { + value + } + }) + LabeledPoint(data.label, newValues) + } + + /** + * Discretizes values for the given data set using the model trained. + * + * @param data RDD representing data points to discretize. + * @return RDD with values discretized + */ + override def discretize(data: RDD[LabeledPoint]): RDD[LabeledPoint] = { + val bc_thresholds = data.context.broadcast(this.thresholds) + + // applies thresholds to discretize every continuous feature + data.map({ case LabeledPoint(label, values) => + val newValues = values.zipWithIndex.map({ case (value, i) => + if (bc_thresholds.value.keySet contains i) { + assignDiscreteValue(value, bc_thresholds.value(i)) + } else { + value + } + }) + LabeledPoint(label, newValues) + }) + } + + + /** + * Discretizes a value with a set of intervals. + * + * @param value The value to be discretized + * @param thresholds Thresholds used to asign a discrete value + */ + private def assignDiscreteValue(value: Double, thresholds: Seq[Double]) = { + var aux = thresholds.zipWithIndex + while (value > aux.head._1) aux = aux.tail + aux.head._2 + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/InfoTheory.scala similarity index 52% rename from mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala rename to mllib/src/main/scala/org/apache/spark/mllib/discretization/InfoTheory.scala index 9be45af55156a..76942fd00cdfa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/InfoTheory.scala @@ -17,38 +17,30 @@ package org.apache.spark.mllib.discretization -object Utils { - - implicit class MyRichSeq[T](val seq: Seq[T]) extends AnyVal { - - def apply(indexes: Seq[Int]): Option[Seq[T]] = { - if (indexes.length == 0) { - None - } else { - Some(indexes.map(i => seq(i))) - } - } - } +/** + * Object with some Information Theory methods. + */ +private[discretization] object InfoTheory { - def sumFreqMaps[A](map1: Map[A, Int], - map2: Map[A, Int]) = { - if (map1 isEmpty) { - map2 - } else if (map2 isEmpty) { - map1 - } else { - Map.empty[A, Int] ++ - (for ((y1, x1) <- map1; (y2, x2) <- map2 if (y1 == y2)) - yield ((y1, x1 + x2))) ++ - (for (y <- (map1.keySet diff map2.keySet)) - yield ((y, map1(y)))) ++ - (for (y <- (map2.keySet diff map1.keySet)) - yield ((y, map2(y)))) - } + /** + * Calculate entropy for the given frequencies. + * + * @param freqs Frequencies of each different class + * @param n Number of elements + */ + def entropy(freqs: Seq[Long], n: Long): Double = { + freqs.aggregate(0.0)({ case (h, q) => + h + (if (q == 0) 0 else (q.toDouble / n) * (math.log(q.toDouble / n) / math.log(2))) + }, { case (h1, h2) => h1 + h2 }) * -1 } - @inline def log2(x: Double) = { - math.log(x) / math.log(2) + /** + * Calculate entropy for the given frequencies. + * + * @param freqs Frequencies of each different class + */ + def entropy(freqs: Seq[Long]): Double = { + entropy(freqs, freqs.reduce(_ + _)) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala index 79297ceb4b44d..9be19b6895856 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala @@ -27,16 +27,16 @@ object EntropyMinimizationDiscretizerSuite { val nDatapoints = 50 val nLabels = 3 val nPartitions = 3 - - def generateLabeledData : Array[LabeledPoint] = { - val rnd = new Random(42) - val labels = Array.fill[Double](nLabels)(rnd.nextDouble) - - Array.fill[LabeledPoint](nDatapoints) { - LabeledPoint(labels(rnd.nextInt(nLabels)), - Array.fill[Double](nFeatures)(rnd.nextDouble)) - } - } + + def generateLabeledData: Array[LabeledPoint] = { + val rnd = new Random(42) + val labels = Array.fill[Double](nLabels)(rnd.nextDouble) + + Array.fill[LabeledPoint](nDatapoints) { + LabeledPoint(labels(rnd.nextInt(nLabels)), + Array.fill[Double](nFeatures)(rnd.nextDouble)) + } + } } class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext { @@ -56,10 +56,10 @@ class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContex val shuffledData = data.sortWith((lp1, lp2) => rnd.nextDouble < 0.5) val rdd = sc.parallelize(shuffledData, 3) - - val thresholds = EntropyMinimizationDiscretizer(rdd, Seq(0)).getThresholdsForFeature(0) + + val discretizer = EntropyMinimizationDiscretizer.train(rdd, Seq(0)) - val thresholdsArray = thresholds.toArray + val thresholdsArray = discretizer.thresholds(0).toArray if (math.abs(thresholdsArray(1) - 33.5) > 1.55) { fail("Selected thresholds aren't what they should be.") } From 0f3a518c3a90dc6f89fcf7f8c97dbfbc1cf70f47 Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Fri, 28 Mar 2014 18:10:49 +0100 Subject: [PATCH 08/10] Removed public InfoTheory --- .../apache/spark/mllib/util/InfoTheory.scala | 49 ------------------- 1 file changed, 49 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala deleted file mode 100644 index 5324df4964934..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.util - -import org.apache.spark.mllib.discretization.Utils - -/** - * Object with some Information Theory methods. - */ -object InfoTheory { - - /** - * Calculate entropy for the given frequencies. - * - * @param freqs Frequencies of each different class - * @param n Number of elements - */ - def entropy(freqs: Seq[Int], n: Int): Double = { - freqs.aggregate(0.0)({ - case (h, q) => - h + (q.toDouble / n) * Utils.log2(q.toDouble / n) - }, { case (h1, h2) => h1 + h2 }) * -1 - } - - /** - * Calculate entropy for the given frequencies. - * - * @param freqs Frequencies of each different class - */ - def entropy(freqs: Seq[Int]): Double = { - entropy(freqs, freqs.reduce(_ + _)) - } - -} From bd6ec82ea8d506b2edc202972eaf88e8b404add1 Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Mon, 31 Mar 2014 10:33:01 +0200 Subject: [PATCH 09/10] Fixed comment style. --- .../mllib/discretization/EntropyMinimizationDiscretizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala index 813d268c198a3..e0ab1947a2605 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala @@ -106,10 +106,10 @@ class EntropyMinimizationDiscretizer private ( */ private def getThresholds(candidates: RDD[(Double, Array[Long])], nLabels: Int): Seq[Double] = { - //Create queue + // Create queue val stack = new mutable.Queue[((Double, Double), Option[Double])] - //Insert first in the stack + // Insert first in the stack stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None)) var result = Seq(Double.NegativeInfinity) From 70b63e42d81f9ca9f9c48bf8435f9620c6800b75 Mon Sep 17 00:00:00 2001 From: LIDIAgroup Date: Wed, 2 Apr 2014 17:42:12 +0200 Subject: [PATCH 10/10] Simplified evalThresholds function. --- .../discretization/ArrayAccumulator.scala | 32 ----- .../EntropyMinimizationDiscretizer.scala | 131 ++++++------------ .../EntropyMinimizationDiscretizerSuite.scala | 46 +++--- 3 files changed, 68 insertions(+), 141 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala deleted file mode 100644 index 7365f18377bb0..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.discretization - -import org.apache.spark.AccumulatorParam - -private[discretization] object ArrayAccumulator extends AccumulatorParam[Array[Long]] { - - def addInPlace(array1: Array[Long], array2: Array[Long]): Array[Long] = { - array2.clone - } - - def zero(initialValue: Array[Long]): Array[Long] = { - initialValue -} - -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala index e0ab1947a2605..2b02b462e3a43 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala @@ -146,107 +146,66 @@ class EntropyMinimizationDiscretizer private ( lastSelected : Option[Double], nLabels: Int) = { - var result = candidates.map({ - case (cand, freqs) => - (cand, freqs, Array.empty[Long], Array.empty[Long]) - }).cache - val numPartitions = candidates.partitions.size - val bcNumPartitions = candidates.context.broadcast(numPartitions) - - // stores accumulated freqs from left to right - val bcLeftTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator) - // stores accumulated freqs from right to left - val bcRightTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator) - - // calculates accumulated frequencies for each candidate - (0 until numPartitions) foreach { l2rIndex => - - val leftTotal = bcLeftTotal.value - val rightTotal = bcRightTotal.value - - result = - result.mapPartitionsWithIndex({ (slice, it) => - - val l2r = slice == l2rIndex - val r2l = slice == bcNumPartitions.value - 1 - l2rIndex - - if (l2r && r2l) { - - // accumulate both from left to right and right to left - var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] - var accum = leftTotal - - for ((cand, freqs, _, rightFreqs) <- it) { - for (i <- 0 until nLabels) accum(i) += freqs(i) - partialResult = (cand, freqs, accum.clone, rightFreqs) +: partialResult - } - - bcLeftTotal += accum - - val r2lIt = partialResult.iterator - partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] - accum = Array.fill[Long](nLabels)(0L) - - for ((cand, freqs, leftFreqs, _) <- r2lIt) { - partialResult = (cand, freqs, leftFreqs, accum.clone) +: partialResult - for (i <- 0 until nLabels) accum(i) += freqs(i) - } - - bcRightTotal += accum - - partialResult.iterator - } else if (l2r) { + val sc = candidates.sparkContext - // accumulate freqs from left to right - var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] - val accum = leftTotal - - for ((cand, freqs, _, rightFreqs) <- it) { - for (i <- 0 until nLabels) accum(i) += freqs(i) - partialResult = (cand, freqs, accum.clone, rightFreqs) +: partialResult - } + // store total frequencies for each partition + val totals = sc.runJob(candidates, { case it => + val accum = Array.fill(nLabels)(0L) + for ((_, freqs) <- it) { + for (i <- 0 until nLabels) accum(i) += freqs(i) + } + accum + }: (Iterator[(Double, Array[Long])]) => Array[Long]) - bcLeftTotal += accum - partialResult.reverseIterator + val bcTotals = sc.broadcast(totals) - } else if (r2l) { + val result = + candidates.mapPartitionsWithIndex({ (slice, it) => - // accumulate freqs from right to left - val r2lIt = it.toSeq.reverseIterator + // accumulate freqs from left to right + val leftTotal = Array.fill(nLabels)(0L) + for (i <- 0 until slice) { + for (j <- 0 until nLabels) leftTotal(j) += bcTotals.value(i)(j) + } - var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] - val accum = rightTotal + var leftAccum = Seq.empty[(Double, Array[Long], Array[Long])] - for ((cand, freqs, leftFreqs, _) <- r2lIt) { - partialResult = (cand, freqs, leftFreqs, accum.clone) +: partialResult - for (i <- 0 until nLabels) accum(i) += freqs(i) - } + for ((cand, freqs) <- it) { + for (i <- 0 until nLabels) leftTotal(i) += freqs(i) + leftAccum = (cand, freqs, leftTotal.clone) +: leftAccum + } - bcRightTotal += accum + // accumulate freqs from right to left + val rightTotal = Array.fill(nLabels)(0L) + for (i <- slice + 1 until numPartitions) { + for (j <- 0 until nLabels) leftTotal(j) += bcTotals.value(i)(j) + } - partialResult.iterator + var leftAndRightAccum = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])] - } else { - // do nothing in this iteration - it - } - }, true) // important to maintain partitions within the loop - .persist(StorageLevel.MEMORY_AND_DISK) // needed, otherwise spark repeats calculations - - result.foreachPartition({ _ => }) // Forces the iteration to be calculated + for ((cand, freqs, leftFreqs) <- leftAccum) { + leftAndRightAccum = (cand, freqs, leftFreqs, rightTotal.clone) +: leftAndRightAccum + for (i <- 0 until nLabels) rightTotal(i) += freqs(i) + } - } + leftAndRightAccum.iterator + }) // calculate h(S) // s: number of elements // k: number of distinct classes // hs: entropy - val s = bcLeftTotal.value.reduce(_ + _) - val hs = InfoTheory.entropy(bcLeftTotal.value.toSeq, s) - val k = bcLeftTotal.value.filter(_ != 0).size + val total = Array.fill(nLabels)(0L) + for (partition_total <- totals) { + for (i <- 0 until nLabels) total(i) += partition_total(i) + } + + val s = total.reduce(_ + _) + val hs = InfoTheory.entropy(total.toSeq, s) + val k = total.filter(_ != 0).size // select best threshold according to the criteria val finalCandidates = @@ -262,9 +221,9 @@ class EntropyMinimizationDiscretizer private ( val hs2 = InfoTheory.entropy(rightFreqs, s2) val weightedHs = (s1 * hs1 + s2 * hs2) / s - val gain = hs - weightedHs - val delta = log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2) - var criterion = (gain - (log2(s - 1) + delta) / s) > -1e-5 + val gain = hs - weightedHs + val delta = log2(math.pow(3, k) - 2) - (k * hs - k1 * hs1 - k2 * hs2) + var criterion = (gain - (log2(s - 1) + delta) / s) > -1e-5 lastSelected match { case None => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala index 9be19b6895856..8a045429d8024 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala @@ -43,29 +43,29 @@ class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContex test("EMD discretization") { val rnd = new Random(13) - - val data = for (i <- 1 to 99) yield - if (i <= 33) { - LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) - } else if (i <= 66) { - LabeledPoint(2.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) - } else { - LabeledPoint(3.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) - } - - val shuffledData = data.sortWith((lp1, lp2) => rnd.nextDouble < 0.5) - - val rdd = sc.parallelize(shuffledData, 3) + + val data = for (i <- 1 to 99) yield + if (i <= 33) { + LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) + } else if (i <= 66) { + LabeledPoint(2.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) + } else { + LabeledPoint(3.0, Array(i.toDouble + rnd.nextDouble*2 - 1)) + } + + val shuffledData = data.sortWith((lp1, lp2) => rnd.nextDouble < 0.5) + + val rdd = sc.parallelize(shuffledData, 3) - val discretizer = EntropyMinimizationDiscretizer.train(rdd, Seq(0)) - - val thresholdsArray = discretizer.thresholds(0).toArray - if (math.abs(thresholdsArray(1) - 33.5) > 1.55) { - fail("Selected thresholds aren't what they should be.") - } - if (math.abs(thresholdsArray(2) - 66.5) > 1.55) { - fail("Selected thresholds aren't what they should be.") - } + val discretizer = EntropyMinimizationDiscretizer.train(rdd, Seq(0)) + + val thresholdsArray = discretizer.thresholds(0).toArray + if (math.abs(thresholdsArray(1) - 33.5) > 1.55) { + fail("Selected thresholds aren't what they should be.") + } + if (math.abs(thresholdsArray(2) - 66.5) > 1.55) { + fail("Selected thresholds aren't what they should be.") + } } -} \ No newline at end of file +}