From 6bcb09ba21bf5af6a5662e719abd0eb488c3a6c1 Mon Sep 17 00:00:00 2001 From: Xiaoqing Wang Date: Tue, 18 Aug 2015 07:02:26 +0800 Subject: [PATCH 1/3] SPARK-8918 Add @since tags to mllib.clustering --- .../clustering/GaussianMixtureModel.scala | 35 +++++++- .../spark/mllib/clustering/KMeans.scala | 7 ++ .../spark/mllib/clustering/KMeansModel.scala | 34 ++++++-- .../apache/spark/mllib/clustering/LDA.scala | 67 ++++++++++++--- .../spark/mllib/clustering/LDAOptimizer.scala | 12 ++- .../clustering/PowerIterationClustering.scala | 18 ++++ .../mllib/clustering/StreamingKMeans.scala | 86 +++++++++++++++---- 7 files changed, 224 insertions(+), 35 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 76aeebd703d4e..f5da4ce79385b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -43,6 +43,7 @@ import org.apache.spark.sql.{SQLContext, Row} * the weight for Gaussian i, and weights.sum == 1 * @param gaussians Array of MultivariateGaussian where gaussians(i) represents * the Multivariate Gaussian (Normal) Distribution for Gaussian i + * @since 1.3.0 */ @Experimental class GaussianMixtureModel( @@ -53,32 +54,48 @@ class GaussianMixtureModel( override protected def formatVersion = "1.0" + /** + * @since 1.4.0 + */ override def save(sc: SparkContext, path: String): Unit = { GaussianMixtureModel.SaveLoadV1_0.save(sc, path, weights, gaussians) } - /** Number of gaussians in mixture */ + /** + * Number of gaussians in mixture + * @since 1.3.0 + */ def k: Int = weights.length - /** Maps given points to their cluster indices. */ + /** + * Maps given points to their cluster indices. + * @since 1.3.0 + */ def predict(points: RDD[Vector]): RDD[Int] = { val responsibilityMatrix = predictSoft(points) responsibilityMatrix.map(r => r.indexOf(r.max)) } - /** Maps given point to its cluster index. */ + /** + * Maps given point to its cluster index. + * @since 1.4.0 + */ def predict(point: Vector): Int = { val r = computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k) r.indexOf(r.max) } - /** Java-friendly version of [[predict()]] */ + /** + * Java-friendly version of [[predict()]] + * @since 1.4.0 + */ def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] /** * Given the input vectors, return the membership value of each vector * to all mixture components. + * @since 1.3.0 */ def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = { val sc = points.sparkContext @@ -91,6 +108,7 @@ class GaussianMixtureModel( /** * Given the input vector, return the membership values to all mixture components. 
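For orientation on the GaussianMixtureModel methods annotated above, here is a minimal usage sketch. It assumes an existing SparkContext `sc`, uses the companion GaussianMixture trainer class (which is not part of this patch), and the toy data, k, and seed are illustrative only.

{{{
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

// Toy data with two well-separated groups (illustrative values).
val data = sc.parallelize(Seq(
  Vectors.dense(0.1, 0.2), Vectors.dense(0.3, 0.1),
  Vectors.dense(9.0, 8.0), Vectors.dense(8.0, 9.0)))

// Fit a 2-component Gaussian mixture and query the resulting model.
val gmm = new GaussianMixture().setK(2).setSeed(42L).run(data)
val hardAssignments = gmm.predict(data)       // RDD[Int]: index of the most likely component
val softAssignments = gmm.predictSoft(data)   // RDD[Array[Double]]: per-component memberships
gmm.gaussians.zip(gmm.weights).foreach { case (g, w) =>
  println(s"weight=$w, mu=${g.mu}, sigma=${g.sigma}")
}
}}}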
+ * @since 1.4.0 */ def predictSoft(point: Vector): Array[Double] = { computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k) @@ -115,6 +133,9 @@ class GaussianMixtureModel( } } +/** + * @since 1.4.0 + */ @Experimental object GaussianMixtureModel extends Loader[GaussianMixtureModel] { @@ -147,6 +168,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path)) } + /** + * @since 1.4.0 + */ def load(sc: SparkContext, path: String): GaussianMixtureModel = { val dataPath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) @@ -165,6 +189,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { } } + /** + * @since 1.4.0 + */ override def load(sc: SparkContext, path: String) : GaussianMixtureModel = { val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path) implicit val formats = DefaultFormats diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 0a65403f4ec95..c74de4d139a8c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -430,11 +430,14 @@ class KMeans private ( /** * Top-level methods for calling K-means clustering. + * @since 0.8.0 */ object KMeans { // Initialization mode names + /** @since 0.8.0 */ val RANDOM = "random" + /** @since 0.8.0 */ val K_MEANS_PARALLEL = "k-means||" /** @@ -446,6 +449,7 @@ object KMeans { * @param runs number of parallel runs, defaults to 1. The best model is returned. * @param initializationMode initialization model, either "random" or "k-means||" (default). * @param seed random seed value for cluster initialization + * @since 1.3.0 */ def train( data: RDD[Vector], @@ -470,6 +474,7 @@ object KMeans { * @param maxIterations max number of iterations * @param runs number of parallel runs, defaults to 1. The best model is returned. * @param initializationMode initialization model, either "random" or "k-means||" (default). + * @since 0.8.0 */ def train( data: RDD[Vector], @@ -486,6 +491,7 @@ object KMeans { /** * Trains a k-means model using specified parameters and the default values for unspecified. + * @since 0.8.0 */ def train( data: RDD[Vector], @@ -496,6 +502,7 @@ object KMeans { /** * Trains a k-means model using specified parameters and the default values for unspecified. + * @since 0.8.0 */ def train( data: RDD[Vector], diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 96359024fa228..9805fafd31425 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -34,35 +34,52 @@ import org.apache.spark.sql.Row /** * A clustering model for K-means. Each point belongs to the cluster with the closest center. + * @since 0.8.0 */ class KMeansModel ( val clusterCenters: Array[Vector]) extends Saveable with Serializable with PMMLExportable { - /** A Java-friendly constructor that takes an Iterable of Vectors. */ + /** + * A Java-friendly constructor that takes an Iterable of Vectors. + * @since 1.4.0 + */ def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray) - /** Total number of clusters. */ + /** + * Total number of clusters. 
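The train overloads above are the usual entry point for K-means. A minimal sketch, assuming an existing SparkContext `sc`; the points and parameter values are illustrative.

{{{
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.2)))

// k = 2, 20 iterations, a single run, k-means|| initialization.
val kmeansModel = KMeans.train(points, 2, 20, 1, KMeans.K_MEANS_PARALLEL)
kmeansModel.clusterCenters.foreach(println)
}}}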
+ * @since 0.8.0 + */ def k: Int = clusterCenters.length - /** Returns the cluster index that a given point belongs to. */ + /** + * Returns the cluster index that a given point belongs to. + * @since 0.8.0 + */ def predict(point: Vector): Int = { KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1 } - /** Maps given points to their cluster indices. */ + /** + * Maps given points to their cluster indices. + * @since 1.0.0 + */ def predict(points: RDD[Vector]): RDD[Int] = { val centersWithNorm = clusterCentersWithNorm val bcCentersWithNorm = points.context.broadcast(centersWithNorm) points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1) } - /** Maps given points to their cluster indices. */ + /** + * Maps given points to their cluster indices. + * @since 1.0.0 + */ def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] /** * Return the K-means cost (sum of squared distances of points to their nearest center) for this * model on the given data. + * @since 0.8.0 */ def computeCost(data: RDD[Vector]): Double = { val centersWithNorm = clusterCentersWithNorm @@ -73,13 +90,20 @@ class KMeansModel ( private def clusterCentersWithNorm: Iterable[VectorWithNorm] = clusterCenters.map(new VectorWithNorm(_)) + /** + * @since 1.4.0 + */ override def save(sc: SparkContext, path: String): Unit = { KMeansModel.SaveLoadV1_0.save(sc, this, path) } + /** @since 1.4.0 */ override protected def formatVersion: String = "1.0" } +/** + * @since 1.4.0 + */ object KMeansModel extends Loader[KMeansModel] { override def load(sc: SparkContext, path: String): KMeansModel = { KMeansModel.SaveLoadV1_0.load(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 0fc9b1ac4d716..02b7b881c4613 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -43,6 +43,7 @@ import org.apache.spark.util.Utils * * @see [[http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation Latent Dirichlet allocation * (Wikipedia)]] + * @since 1.3.0 */ @Experimental class LDA private ( @@ -60,12 +61,15 @@ class LDA private ( /** * Number of topics to infer. I.e., the number of soft cluster centers. + * + * @since 1.3.0 */ def getK: Int = k /** * Number of topics to infer. I.e., the number of soft cluster centers. * (default = 10) + * @since 1.3.0 */ def setK(k: Int): this.type = { require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k") @@ -78,6 +82,7 @@ class LDA private ( * distributions over topics ("theta"). * * This is the parameter to a Dirichlet distribution. + * @since 1.5.0 */ def getAsymmetricDocConcentration: Vector = this.docConcentration @@ -87,6 +92,7 @@ class LDA private ( * * This method assumes the Dirichlet distribution is symmetric and can be described by a single * [[Double]] parameter. It should fail if docConcentration is asymmetric. + * @since 1.3.0 */ def getDocConcentration: Double = { val parameter = docConcentration(0) @@ -121,6 +127,7 @@ class LDA private ( * - Values should be >= 0 * - default = uniformly (1.0 / k), following the implementation from * [[https://github.com/Blei-Lab/onlineldavb]]. 
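To round out the KMeansModel methods annotated above (predict, computeCost, save/load), a short sketch; `points` is the training RDD from the previous sketch and the save path is a placeholder.

{{{
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

val model: KMeansModel = KMeans.train(points, 2, 20)
val cluster = model.predict(Vectors.dense(0.05, 0.05))  // index of the closest center
val wssse = model.computeCost(points)                   // within-set sum of squared errors

// Persist and reload the model (placeholder path).
model.save(sc, "/tmp/kmeans-model")
val restored = KMeansModel.load(sc, "/tmp/kmeans-model")
}}}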
+ * @since 1.5.0 */ def setDocConcentration(docConcentration: Vector): this.type = { require(docConcentration.size > 0, "docConcentration must have > 0 elements") @@ -128,22 +135,37 @@ class LDA private ( this } - /** Replicates a [[Double]] docConcentration to create a symmetric prior. */ + /** + * Replicates a [[Double]] docConcentration to create a symmetric prior. + * @since 1.3.0 + */ def setDocConcentration(docConcentration: Double): this.type = { this.docConcentration = Vectors.dense(docConcentration) this } - /** Alias for [[getAsymmetricDocConcentration]] */ + /** + * Alias for [[getAsymmetricDocConcentration]] + * @since 1.5.0 + */ def getAsymmetricAlpha: Vector = getAsymmetricDocConcentration - /** Alias for [[getDocConcentration]] */ + /** + * Alias for [[getDocConcentration]] + * @since 1.3.0 + */ def getAlpha: Double = getDocConcentration - /** Alias for [[setDocConcentration()]] */ + /** + * Alias for [[setDocConcentration()]] + * @since 1.5.0 + */ def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha) - /** Alias for [[setDocConcentration()]] */ + /** + * Alias for [[setDocConcentration()]] + * @since 1.3.0 + */ def setAlpha(alpha: Double): this.type = setDocConcentration(alpha) /** @@ -154,6 +176,7 @@ class LDA private ( * * Note: The topics' distributions over terms are called "beta" in the original LDA paper * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009. + * @since 1.3.0 */ def getTopicConcentration: Double = this.topicConcentration @@ -178,36 +201,51 @@ class LDA private ( * - Value should be >= 0 * - default = (1.0 / k), following the implementation from * [[https://github.com/Blei-Lab/onlineldavb]]. + * @since 1.3.0 */ def setTopicConcentration(topicConcentration: Double): this.type = { this.topicConcentration = topicConcentration this } - /** Alias for [[getTopicConcentration]] */ + /** + * Alias for [[getTopicConcentration]] + * @since 1.3.0 + */ def getBeta: Double = getTopicConcentration - /** Alias for [[setTopicConcentration()]] */ + /** + * Alias for [[setTopicConcentration()]] + * @since 1.3.0 + */ def setBeta(beta: Double): this.type = setTopicConcentration(beta) /** * Maximum number of iterations for learning. + * @since 1.3.0 */ def getMaxIterations: Int = maxIterations /** * Maximum number of iterations for learning. * (default = 20) + * @since 1.3.0 */ def setMaxIterations(maxIterations: Int): this.type = { this.maxIterations = maxIterations this } - /** Random seed */ + /** + * Random seed + * @since 1.3.0 + */ def getSeed: Long = seed - /** Random seed */ + /** + * Random seed + * @since 1.3.0 + */ def setSeed(seed: Long): this.type = { this.seed = seed this @@ -215,6 +253,7 @@ class LDA private ( /** * Period (in iterations) between checkpoints. + * @since 1.3.0 */ def getCheckpointInterval: Int = checkpointInterval @@ -225,6 +264,7 @@ class LDA private ( * [[org.apache.spark.SparkContext]], this setting is ignored. 
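The LDA setters above are all chainable. A configuration sketch with symmetric priors; the concrete values are illustrative, not recommendations.

{{{
import org.apache.spark.mllib.clustering.LDA

val lda = new LDA()
  .setK(10)                  // number of topics
  .setAlpha(1.1)             // symmetric document-topic (docConcentration) prior
  .setBeta(1.1)              // topic-word (topicConcentration) prior
  .setMaxIterations(50)
  .setSeed(13L)
  .setCheckpointInterval(10)
}}}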
* * @see [[org.apache.spark.SparkContext#setCheckpointDir]] + * @since 1.3.0 */ def setCheckpointInterval(checkpointInterval: Int): this.type = { this.checkpointInterval = checkpointInterval @@ -236,6 +276,7 @@ class LDA private ( * :: DeveloperApi :: * * LDAOptimizer used to perform the actual calculation + * @since 1.4.0 */ @DeveloperApi def getOptimizer: LDAOptimizer = ldaOptimizer @@ -244,6 +285,7 @@ class LDA private ( * :: DeveloperApi :: * * LDAOptimizer used to perform the actual calculation (default = EMLDAOptimizer) + * @since 1.4.0 */ @DeveloperApi def setOptimizer(optimizer: LDAOptimizer): this.type = { @@ -254,6 +296,7 @@ class LDA private ( /** * Set the LDAOptimizer used to perform the actual calculation by algorithm name. * Currently "em", "online" are supported. + * @since 1.4.0 */ def setOptimizer(optimizerName: String): this.type = { this.ldaOptimizer = @@ -274,6 +317,7 @@ class LDA private ( * (where the vocabulary size is the length of the vector). * Document IDs must be unique and >= 0. * @return Inferred LDA model + * @since 1.3.0 */ def run(documents: RDD[(Long, Vector)]): LDAModel = { val state = ldaOptimizer.initialize(documents, this) @@ -289,7 +333,10 @@ class LDA private ( state.getLDAModel(iterationTimes) } - /** Java-friendly version of [[run()]] */ + /** + * Java-friendly version of [[run()]] + * @since 1.3.0 + */ def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = { run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index a0008f9c99ad7..360241c8081ac 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -35,6 +35,7 @@ import org.apache.spark.rdd.RDD * * An LDAOptimizer specifies which optimization/learning/inference algorithm to use, and it can * hold optimizer-specific parameters for users to set. + * @since 1.4.0 */ @DeveloperApi sealed trait LDAOptimizer { @@ -73,7 +74,7 @@ sealed trait LDAOptimizer { * - Paper which clearly explains several algorithms, including EM: * Asuncion, Welling, Smyth, and Teh. * "On Smoothing and Inference for Topic Models." UAI, 2009. - * + * @since 1.4.0 */ @DeveloperApi final class EMLDAOptimizer extends LDAOptimizer { @@ -225,6 +226,7 @@ final class EMLDAOptimizer extends LDAOptimizer { * * Original Online LDA paper: * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. + * @since 1.4.0 */ @DeveloperApi final class OnlineLDAOptimizer extends LDAOptimizer { @@ -274,6 +276,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { /** * A (positive) learning parameter that downweights early iterations. Larger values make early * iterations count less. + * @since 1.4.0 */ def getTau0: Double = this.tau0 @@ -281,6 +284,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { * A (positive) learning parameter that downweights early iterations. Larger values make early * iterations count less. * Default: 1024, following the original Online LDA paper. 
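Putting run() together with a toy bag-of-words corpus; `sc` is an existing SparkContext and the term-count vectors are illustrative. The default optimizer is EM, as noted above; describeTopics is defined on LDAModel outside this excerpt.

{{{
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// (docId, termCounts) pairs; vocabulary size 4 in this toy corpus.
val corpus: RDD[(Long, Vector)] = sc.parallelize(Seq(
  (0L, Vectors.dense(1.0, 2.0, 0.0, 0.0)),
  (1L, Vectors.dense(0.0, 0.0, 3.0, 1.0))))

val ldaModel = new LDA().setK(2).setMaxIterations(20).run(corpus)
ldaModel.describeTopics(maxTermsPerTopic = 3).zipWithIndex.foreach {
  case ((terms, weights), topic) =>
    println(s"topic $topic: " + terms.zip(weights).mkString(", "))
}
}}}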
+ * @since 1.4.0 */ def setTau0(tau0: Double): this.type = { require(tau0 > 0, s"LDA tau0 must be positive, but was set to $tau0") @@ -290,6 +294,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { /** * Learning rate: exponential decay rate + * @since 1.4.0 */ def getKappa: Double = this.kappa @@ -297,6 +302,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { * Learning rate: exponential decay rate---should be between * (0.5, 1.0] to guarantee asymptotic convergence. * Default: 0.51, based on the original Online LDA paper. + * @since 1.4.0 */ def setKappa(kappa: Double): this.type = { require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa") @@ -306,6 +312,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { /** * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration + * @since 1.4.0 */ def getMiniBatchFraction: Double = this.miniBatchFraction @@ -318,6 +325,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { * maxIterations * miniBatchFraction >= 1. * * Default: 0.05, i.e., 5% of total documents. + * @since 1.4.0 */ def setMiniBatchFraction(miniBatchFraction: Double): this.type = { require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0, @@ -329,6 +337,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { /** * Optimize alpha, indicates whether alpha (Dirichlet parameter for document-topic distribution) * will be optimized during training. + * @since 1.5.0 */ def getOptimzeAlpha: Boolean = this.optimizeAlpha @@ -336,6 +345,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { * Sets whether to optimize alpha parameter during training. * * Default: false + * @since 1.5.0 */ def setOptimzeAlpha(optimizeAlpha: Boolean): this.type = { this.optimizeAlpha = optimizeAlpha diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 407e43a024a2e..9ad9643a695e8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -39,12 +39,16 @@ import org.apache.spark.{Logging, SparkContext, SparkException} * * @param k number of clusters * @param assignments an RDD of clustering [[PowerIterationClustering#Assignment]]s + * @since 1.3.0 */ @Experimental class PowerIterationClusteringModel( val k: Int, val assignments: RDD[PowerIterationClustering.Assignment]) extends Saveable with Serializable { + /** + * @since 1.4.0 + */ override def save(sc: SparkContext, path: String): Unit = { PowerIterationClusteringModel.SaveLoadV1_0.save(sc, this, path) } @@ -52,6 +56,9 @@ class PowerIterationClusteringModel( override protected def formatVersion: String = "1.0" } +/** + * @since 1.4.0 + */ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] { override def load(sc: SparkContext, path: String): PowerIterationClusteringModel = { PowerIterationClusteringModel.SaveLoadV1_0.load(sc, path) @@ -65,6 +72,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode private[clustering] val thisClassName = "org.apache.spark.mllib.clustering.PowerIterationClusteringModel" + /** + * @since 1.4.0 + */ def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = { val sqlContext = new SQLContext(sc) import sqlContext.implicits._ @@ -77,6 +87,9 @@ object PowerIterationClusteringModel 
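A sketch of wiring the online optimizer parameters described above into LDA; the values mirror the stated defaults, and `corpus` is the RDD[(Long, Vector)] from the previous sketch.

{{{
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

val onlineOptimizer = new OnlineLDAOptimizer()
  .setTau0(1024.0)             // downweights early iterations
  .setKappa(0.51)              // exponential decay rate in (0.5, 1.0]
  .setMiniBatchFraction(0.05)  // fraction of documents sampled per iteration

val onlineModel = new LDA()
  .setK(10)
  .setMaxIterations(30)
  .setOptimizer(onlineOptimizer)
  .run(corpus)
}}}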
extends Loader[PowerIterationClusteringMode dataRDD.write.parquet(Loader.dataPath(path)) } + /** + * @since 1.4.0 + */ def load(sc: SparkContext, path: String): PowerIterationClusteringModel = { implicit val formats = DefaultFormats val sqlContext = new SQLContext(sc) @@ -164,6 +177,7 @@ class PowerIterationClustering private[clustering] ( * assume s,,ij,, = 0.0. * * @return a [[PowerIterationClusteringModel]] that contains the clustering result + * @since 1.5.0 */ def run(graph: Graph[Double, Double]): PowerIterationClusteringModel = { val w = normalize(graph) @@ -221,6 +235,9 @@ class PowerIterationClustering private[clustering] ( } } +/** + * @since 1.3.0 + */ @Experimental object PowerIterationClustering extends Logging { @@ -229,6 +246,7 @@ object PowerIterationClustering extends Logging { * Cluster assignment. * @param id node id * @param cluster assigned cluster id + * @since 1.3.0 */ @Experimental case class Assignment(id: Long, cluster: Int) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index d9b34cec64894..329a4f2dfe09c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -63,6 +63,7 @@ import org.apache.spark.util.random.XORShiftRandom * such that at time t + h the discount applied to the data from t is 0.5. * The definition remains the same whether the time unit is given * as batches or points. + * @since 1.2.0 * */ @Experimental @@ -70,22 +71,38 @@ class StreamingKMeansModel( override val clusterCenters: Array[Vector], val clusterWeights: Array[Double]) extends KMeansModel(clusterCenters) with Logging { - /** Perform a k-means update on a batch of data. */ + /** + * Perform a k-means update on a batch of data. + * @since 1.2.0 + */ def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = { - // find nearest cluster to each point + /** + * find nearest cluster to each point + * @since 1.2.0 + */ val closest = data.map(point => (this.predict(point), (point, 1L))) - // get sums and counts for updating each cluster + /** + * get sums and counts for updating each cluster + * @since 1.2.0 + */ val mergeContribs: ((Vector, Long), (Vector, Long)) => (Vector, Long) = (p1, p2) => { BLAS.axpy(1.0, p2._1, p1._1) (p1._1, p1._2 + p2._2) } val dim = clusterCenters(0).size + + /** + * @since 1.2.0 + */ val pointStats: Array[(Int, (Vector, Long))] = closest .aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs) .collect() + /** + * @since 1.2.0 + */ val discount = timeUnit match { case StreamingKMeans.BATCHES => decayFactor case StreamingKMeans.POINTS => @@ -95,10 +112,16 @@ class StreamingKMeansModel( math.pow(decayFactor, numNewPoints) } - // apply discount to weights + /** + * apply discount to weights + * @since 1.2.0 + */ BLAS.scal(discount, Vectors.dense(clusterWeights)) - // implement update rule + /** + * implement update rule + * @since 1.2.0 + */ pointStats.foreach { case (label, (sum, count)) => val centroid = clusterCenters(label) @@ -118,7 +141,10 @@ class StreamingKMeansModel( logInfo(s"Cluster $label updated with weight $updatedWeight and centroid: $display") } - // Check whether the smallest cluster is dying. If so, split the largest cluster. + /** + * Check whether the smallest cluster is dying. If so, split the largest cluster. 
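Besides the graph-based run shown above, PowerIterationClustering also accepts an RDD of (srcId, dstId, similarity) triples (that overload is not shown in this excerpt). A small sketch with a hypothetical affinity list:

{{{
import org.apache.spark.mllib.clustering.PowerIterationClustering

// Hypothetical pairwise similarities: (srcId, dstId, similarity >= 0).
val similarities = sc.parallelize(Seq(
  (0L, 1L, 0.9), (1L, 2L, 0.9), (2L, 3L, 0.1), (3L, 4L, 0.9)))

val picModel = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(20)
  .run(similarities)

picModel.assignments.collect().foreach { a =>
  println(s"node ${a.id} -> cluster ${a.cluster}")
}
}}}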
+ * @since 1.2.0 + */ val weightsWithIndex = clusterWeights.view.zipWithIndex val (maxWeight, largest) = weightsWithIndex.maxBy(_._1) val (minWeight, smallest) = weightsWithIndex.minBy(_._1) @@ -161,6 +187,7 @@ class StreamingKMeansModel( * .setRandomCenters(5, 100.0) * .trainOn(DStream) * }}} + * @since 1.2.0 */ @Experimental class StreamingKMeans( @@ -168,23 +195,33 @@ class StreamingKMeans( var decayFactor: Double, var timeUnit: String) extends Logging with Serializable { + /** @since 1.2.0 */ def this() = this(2, 1.0, StreamingKMeans.BATCHES) - + /** @since 1.2.0 */ protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) - /** Set the number of clusters. */ + /** + * Set the number of clusters. + * @since 1.2.0 + */ def setK(k: Int): this.type = { this.k = k this } - /** Set the decay factor directly (for forgetful algorithms). */ + /** + * Set the decay factor directly (for forgetful algorithms). + * @since 1.2.0 + */ def setDecayFactor(a: Double): this.type = { this.decayFactor = a this } - /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */ + /** + * Set the half life and time unit ("batches" or "points") for forgetful algorithms. + * @since 1.2.0 + */ def setHalfLife(halfLife: Double, timeUnit: String): this.type = { if (timeUnit != StreamingKMeans.BATCHES && timeUnit != StreamingKMeans.POINTS) { throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit) @@ -195,7 +232,10 @@ class StreamingKMeans( this } - /** Specify initial centers directly. */ + /** + * Specify initial centers directly. + * @since 1.2.0 + */ def setInitialCenters(centers: Array[Vector], weights: Array[Double]): this.type = { model = new StreamingKMeansModel(centers, weights) this @@ -207,6 +247,7 @@ class StreamingKMeans( * @param dim Number of dimensions * @param weight Weight for each center * @param seed Random seed + * @since 1.2.0 */ def setRandomCenters(dim: Int, weight: Double, seed: Long = Utils.random.nextLong): this.type = { val random = new XORShiftRandom(seed) @@ -216,7 +257,10 @@ class StreamingKMeans( this } - /** Return the latest model. */ + /** + * Return the latest model. + * @since 1.2.0 + */ def latestModel(): StreamingKMeansModel = { model } @@ -228,6 +272,7 @@ class StreamingKMeans( * and updates the model using each batch of data from the stream. * * @param data DStream containing vector data + * @since 1.2.0 */ def trainOn(data: DStream[Vector]) { assertInitialized() @@ -236,7 +281,10 @@ class StreamingKMeans( } } - /** Java-friendly version of `trainOn`. */ + /** + * Java-friendly version of `trainOn`. + * @since 1.4.0 + */ def trainOn(data: JavaDStream[Vector]): Unit = trainOn(data.dstream) /** @@ -244,13 +292,17 @@ class StreamingKMeans( * * @param data DStream containing vector data * @return DStream containing predictions + * @since 1.2.0 */ def predictOn(data: DStream[Vector]): DStream[Int] = { assertInitialized() data.map(model.predict) } - /** Java-friendly version of `predictOn`. */ + /** + * Java-friendly version of `predictOn`. 
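Following the pattern from the StreamingKMeans class documentation above, a sketch of training and scoring on streams; `trainingStream` and `labeledStream` are assumed DStreams built from an existing StreamingContext, and the dimension, k, and decay factor are illustrative.

{{{
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

def wireStreams(
    trainingStream: DStream[Vector],
    labeledStream: DStream[(String, Vector)]): Unit = {
  val skm = new StreamingKMeans()
    .setK(3)
    .setDecayFactor(1.0)                      // 1.0 = weight all past data equally
    .setRandomCenters(dim = 2, weight = 0.0)  // dim must match the input vectors
  skm.trainOn(trainingStream)                 // update centers on every batch
  skm.predictOnValues(labeledStream).print()  // (key, clusterIndex) pairs
}
}}}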
+ * @since 1.4.0 + */ def predictOn(data: JavaDStream[Vector]): JavaDStream[java.lang.Integer] = { JavaDStream.fromDStream(predictOn(data.dstream).asInstanceOf[DStream[java.lang.Integer]]) } @@ -261,13 +313,17 @@ class StreamingKMeans( * @param data DStream containing (key, feature vector) pairs * @tparam K key type * @return DStream containing the input keys and the predictions as values + * @since 1.2.0 */ def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = { assertInitialized() data.mapValues(model.predict) } - /** Java-friendly version of `predictOnValues`. */ + /** + * Java-friendly version of `predictOnValues`. + * @since 1.4.0 + */ def predictOnValues[K]( data: JavaPairDStream[K, Vector]): JavaPairDStream[K, java.lang.Integer] = { implicit val tag = fakeClassTag[K] From c679b975cf2fe5d463bee4ca665dced7504af107 Mon Sep 17 00:00:00 2001 From: Xiaoqing Wang Date: Tue, 18 Aug 2015 21:55:58 +0800 Subject: [PATCH 2/3] update code style,tab replaced by blank --- .../clustering/GaussianMixtureModel.scala | 42 +++++++++---------- .../spark/mllib/clustering/KMeansModel.scala | 8 ++-- .../clustering/PowerIterationClustering.scala | 18 ++++---- .../mllib/clustering/StreamingKMeans.scala | 20 ++++----- 4 files changed, 44 insertions(+), 44 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index f5da4ce79385b..52cff49d75deb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -54,41 +54,41 @@ class GaussianMixtureModel( override protected def formatVersion = "1.0" - /** - * @since 1.4.0 - */ + /** + * @since 1.4.0 + */ override def save(sc: SparkContext, path: String): Unit = { GaussianMixtureModel.SaveLoadV1_0.save(sc, path, weights, gaussians) } /** - * Number of gaussians in mixture - * @since 1.3.0 - */ + * Number of gaussians in mixture + * @since 1.3.0 + */ def k: Int = weights.length /** - * Maps given points to their cluster indices. - * @since 1.3.0 - */ + * Maps given points to their cluster indices. + * @since 1.3.0 + */ def predict(points: RDD[Vector]): RDD[Int] = { val responsibilityMatrix = predictSoft(points) responsibilityMatrix.map(r => r.indexOf(r.max)) } /** - * Maps given point to its cluster index. - * @since 1.4.0 - */ + * Maps given point to its cluster index. 
+ * @since 1.4.0 + */ def predict(point: Vector): Int = { val r = computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k) r.indexOf(r.max) } /** - * Java-friendly version of [[predict()]] - * @since 1.4.0 - */ + * Java-friendly version of [[predict()]] + * @since 1.4.0 + */ def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] @@ -168,9 +168,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path)) } - /** - * @since 1.4.0 - */ + /** + * @since 1.4.0 + */ def load(sc: SparkContext, path: String): GaussianMixtureModel = { val dataPath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) @@ -189,9 +189,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { } } - /** - * @since 1.4.0 - */ + /** + * @since 1.4.0 + */ override def load(sc: SparkContext, path: String) : GaussianMixtureModel = { val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path) implicit val formats = DefaultFormats diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 9805fafd31425..4a8be2cfda262 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -90,14 +90,14 @@ class KMeansModel ( private def clusterCentersWithNorm: Iterable[VectorWithNorm] = clusterCenters.map(new VectorWithNorm(_)) - /** - * @since 1.4.0 - */ + /** + * @since 1.4.0 + */ override def save(sc: SparkContext, path: String): Unit = { KMeansModel.SaveLoadV1_0.save(sc, this, path) } - /** @since 1.4.0 */ + /** @since 1.4.0 */ override protected def formatVersion: String = "1.0" } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 9ad9643a695e8..66ec544d07917 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -46,9 +46,9 @@ class PowerIterationClusteringModel( val k: Int, val assignments: RDD[PowerIterationClustering.Assignment]) extends Saveable with Serializable { - /** - * @since 1.4.0 - */ + /** + * @since 1.4.0 + */ override def save(sc: SparkContext, path: String): Unit = { PowerIterationClusteringModel.SaveLoadV1_0.save(sc, this, path) } @@ -72,9 +72,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode private[clustering] val thisClassName = "org.apache.spark.mllib.clustering.PowerIterationClusteringModel" - /** - * @since 1.4.0 - */ + /** + * @since 1.4.0 + */ def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = { val sqlContext = new SQLContext(sc) import sqlContext.implicits._ @@ -87,9 +87,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode dataRDD.write.parquet(Loader.dataPath(path)) } - /** - * @since 1.4.0 - */ + /** + * @since 1.4.0 + */ def load(sc: SparkContext, path: String): PowerIterationClusteringModel = { implicit val formats = DefaultFormats val sqlContext = new SQLContext(sc) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 329a4f2dfe09c..1e661e479dff4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -94,15 +94,15 @@ class StreamingKMeansModel( val dim = clusterCenters(0).size /** - * @since 1.2.0 - */ + * @since 1.2.0 + */ val pointStats: Array[(Int, (Vector, Long))] = closest .aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs) .collect() - /** - * @since 1.2.0 - */ + /** + * @since 1.2.0 + */ val discount = timeUnit match { case StreamingKMeans.BATCHES => decayFactor case StreamingKMeans.POINTS => @@ -143,8 +143,8 @@ class StreamingKMeansModel( /** * Check whether the smallest cluster is dying. If so, split the largest cluster. - * @since 1.2.0 - */ + * @since 1.2.0 + */ val weightsWithIndex = clusterWeights.view.zipWithIndex val (maxWeight, largest) = weightsWithIndex.maxBy(_._1) val (minWeight, smallest) = weightsWithIndex.minBy(_._1) @@ -195,9 +195,9 @@ class StreamingKMeans( var decayFactor: Double, var timeUnit: String) extends Logging with Serializable { - /** @since 1.2.0 */ + /** @since 1.2.0 */ def this() = this(2, 1.0, StreamingKMeans.BATCHES) - /** @since 1.2.0 */ + /** @since 1.2.0 */ protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) /** @@ -319,7 +319,7 @@ class StreamingKMeans( assertInitialized() data.mapValues(model.predict) } - + /** * Java-friendly version of `predictOnValues`. * @since 1.4.0 From e430de9fa576ae8752f7d90b342026ff84fab8b5 Mon Sep 17 00:00:00 2001 From: Xiaoqing Wang Date: Tue, 18 Aug 2015 22:45:41 +0800 Subject: [PATCH 3/3] update code style : delete the Whitespace at end of line --- .../clustering/GaussianMixtureModel.scala | 8 +++---- .../spark/mllib/clustering/KMeansModel.scala | 10 ++++----- .../apache/spark/mllib/clustering/LDA.scala | 22 +++++++++---------- .../mllib/clustering/StreamingKMeans.scala | 22 +++++++++---------- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 52cff49d75deb..05c52002fe922 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -62,13 +62,13 @@ class GaussianMixtureModel( } /** - * Number of gaussians in mixture + * Number of gaussians in mixture * @since 1.3.0 */ def k: Int = weights.length /** - * Maps given points to their cluster indices. + * Maps given points to their cluster indices. * @since 1.3.0 */ def predict(points: RDD[Vector]): RDD[Int] = { @@ -77,7 +77,7 @@ class GaussianMixtureModel( } /** - * Maps given point to its cluster index. + * Maps given point to its cluster index. 
* @since 1.4.0 */ def predict(point: Vector): Int = { @@ -86,7 +86,7 @@ class GaussianMixtureModel( } /** - * Java-friendly version of [[predict()]] + * Java-friendly version of [[predict()]] * @since 1.4.0 */ def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 4a8be2cfda262..a184d6af438b5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -40,19 +40,19 @@ class KMeansModel ( val clusterCenters: Array[Vector]) extends Saveable with Serializable with PMMLExportable { /** - * A Java-friendly constructor that takes an Iterable of Vectors. + * A Java-friendly constructor that takes an Iterable of Vectors. * @since 1.4.0 */ def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray) /** - * Total number of clusters. + * Total number of clusters. * @since 0.8.0 */ def k: Int = clusterCenters.length /** - * Returns the cluster index that a given point belongs to. + * Returns the cluster index that a given point belongs to. * @since 0.8.0 */ def predict(point: Vector): Int = { @@ -60,7 +60,7 @@ class KMeansModel ( } /** - * Maps given points to their cluster indices. + * Maps given points to their cluster indices. * @since 1.0.0 */ def predict(points: RDD[Vector]): RDD[Int] = { @@ -70,7 +70,7 @@ class KMeansModel ( } /** - * Maps given points to their cluster indices. + * Maps given points to their cluster indices. * @since 1.0.0 */ def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 02b7b881c4613..d7c811a69df76 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -135,9 +135,9 @@ class LDA private ( this } - /** + /** * Replicates a [[Double]] docConcentration to create a symmetric prior. 
- * @since 1.3.0 + * @since 1.3.0 */ def setDocConcentration(docConcentration: Double): this.type = { this.docConcentration = Vectors.dense(docConcentration) @@ -145,25 +145,25 @@ class LDA private ( } /** - * Alias for [[getAsymmetricDocConcentration]] + * Alias for [[getAsymmetricDocConcentration]] * @since 1.5.0 */ def getAsymmetricAlpha: Vector = getAsymmetricDocConcentration /** - * Alias for [[getDocConcentration]] + * Alias for [[getDocConcentration]] * @since 1.3.0 */ def getAlpha: Double = getDocConcentration /** - * Alias for [[setDocConcentration()]] + * Alias for [[setDocConcentration()]] * @since 1.5.0 */ def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha) /** - * Alias for [[setDocConcentration()]] + * Alias for [[setDocConcentration()]] * @since 1.3.0 */ def setAlpha(alpha: Double): this.type = setDocConcentration(alpha) @@ -209,13 +209,13 @@ class LDA private ( } /** - * Alias for [[getTopicConcentration]] + * Alias for [[getTopicConcentration]] * @since 1.3.0 */ def getBeta: Double = getTopicConcentration /** - * Alias for [[setTopicConcentration()]] + * Alias for [[setTopicConcentration()]] * @since 1.3.0 */ def setBeta(beta: Double): this.type = setTopicConcentration(beta) @@ -237,13 +237,13 @@ class LDA private ( } /** - * Random seed + * Random seed * @since 1.3.0 */ def getSeed: Long = seed /** - * Random seed + * Random seed * @since 1.3.0 */ def setSeed(seed: Long): this.type = { @@ -334,7 +334,7 @@ class LDA private ( } /** - * Java-friendly version of [[run()]] + * Java-friendly version of [[run()]] * @since 1.3.0 */ def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 1e661e479dff4..af748198f046c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -72,7 +72,7 @@ class StreamingKMeansModel( val clusterWeights: Array[Double]) extends KMeansModel(clusterCenters) with Logging { /** - * Perform a k-means update on a batch of data. + * Perform a k-means update on a batch of data. * @since 1.2.0 */ def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = { @@ -92,7 +92,7 @@ class StreamingKMeansModel( (p1._1, p1._2 + p2._2) } val dim = clusterCenters(0).size - + /** * @since 1.2.0 */ @@ -201,7 +201,7 @@ class StreamingKMeans( protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) /** - * Set the number of clusters. + * Set the number of clusters. * @since 1.2.0 */ def setK(k: Int): this.type = { @@ -210,7 +210,7 @@ class StreamingKMeans( } /** - * Set the decay factor directly (for forgetful algorithms). + * Set the decay factor directly (for forgetful algorithms). * @since 1.2.0 */ def setDecayFactor(a: Double): this.type = { @@ -219,7 +219,7 @@ class StreamingKMeans( } /** - * Set the half life and time unit ("batches" or "points") for forgetful algorithms. + * Set the half life and time unit ("batches" or "points") for forgetful algorithms. * @since 1.2.0 */ def setHalfLife(halfLife: Double, timeUnit: String): this.type = { @@ -233,7 +233,7 @@ class StreamingKMeans( } /** - * Specify initial centers directly. + * Specify initial centers directly. 
* @since 1.2.0 */ def setInitialCenters(centers: Array[Vector], weights: Array[Double]): this.type = { @@ -258,7 +258,7 @@ class StreamingKMeans( } /** - * Return the latest model. + * Return the latest model. * @since 1.2.0 */ def latestModel(): StreamingKMeansModel = { @@ -282,7 +282,7 @@ class StreamingKMeans( } /** - * Java-friendly version of `trainOn`. + * Java-friendly version of `trainOn`. * @since 1.4.0 */ def trainOn(data: JavaDStream[Vector]): Unit = trainOn(data.dstream) @@ -300,7 +300,7 @@ class StreamingKMeans( } /** - * Java-friendly version of `predictOn`. + * Java-friendly version of `predictOn`. * @since 1.4.0 */ def predictOn(data: JavaDStream[Vector]): JavaDStream[java.lang.Integer] = { @@ -319,9 +319,9 @@ class StreamingKMeans( assertInitialized() data.mapValues(model.predict) } - + /** - * Java-friendly version of `predictOnValues`. + * Java-friendly version of `predictOnValues`. * @since 1.4.0 */ def predictOnValues[K](