From 649e87f9e7dd44ba90d6a855549e29febbfbb1c5 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Sun, 6 Sep 2015 19:40:48 +0100 Subject: [PATCH 1/7] Added @since tags to mllib.clustering --- python/pyspark/__init__.py | 20 ++++++++++ python/pyspark/mllib/clustering.py | 61 +++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 8475dfb1c6ad0..76ee3b1895e9f 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -67,6 +67,26 @@ def deco(f): # for back compatibility from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row + +def since(version): + """ + A decorator that annotates a function to append the version of Spark the function was added. + """ + import re + indent_p = re.compile(r'\n( +)') + + def deco(f): + if f.__doc__ is None: + f.__doc__ = ".. versionadded:: {}".format(version) + return f + else: + indents = indent_p.findall(f.__doc__) + indent = ' ' * (min(len(m) for m in indents) if indents else 0) + f.__doc__ = "{}\n\n{}.. versionadded:: {}".format(f.__doc__.rstrip(), indent, version) + return f + return deco + + __all__ = [ "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast", "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer", diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 6964a45db2493..96a6c3b1963d5 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -28,7 +28,7 @@ from collections import namedtuple -from pyspark import SparkContext +from pyspark import SparkContext, since from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector @@ -96,21 +96,26 @@ class KMeansModel(Saveable, Loader): ... initialModel = KMeansModel([(-1000.0,-1000.0),(5.0,5.0),(1000.0,1000.0)])) >>> model.clusterCenters [array([-1000., -1000.]), array([ 5., 5.]), array([ 1000., 1000.])] + + .. versionadded:: 0.9.1 """ def __init__(self, centers): self.centers = centers @property + @since('1.0.0') def clusterCenters(self): """Get the cluster centers, represented as a list of NumPy arrays.""" return self.centers @property + @since('1.4.0') def k(self): """Total number of clusters.""" return len(self.centers) + @since('0.9.1') def predict(self, x): """Find the cluster to which x belongs in this model.""" best = 0 @@ -126,6 +131,7 @@ def predict(self, x): best_distance = distance return best + @since('1.4.0') def computeCost(self, rdd): """ Return the K-means cost (sum of squared distances of points to @@ -135,20 +141,26 @@ def computeCost(self, rdd): [_convert_to_vector(c) for c in self.centers]) return cost + @since('1.4.0') def save(self, sc, path): java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers]) java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers) java_model.save(sc._jsc.sc(), path) @classmethod + @since('1.4.0') def load(cls, sc, path): java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel.load(sc._jsc.sc(), path) return KMeansModel(_java2py(sc, java_model.clusterCenters())) class KMeans(object): + """ + .. versionadded:: 0.9.1 + """ @classmethod + @since('0.9.1') def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None): """Train a k-means clustering model.""" @@ -222,9 +234,12 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): True >>> labels[3]==labels[4] True + + .. versionadded:: 1.3.0 """ @property + @since('1.4.0') def weights(self): """ Weights for each Gaussian distribution in the mixture, where weights[i] is @@ -233,6 +248,7 @@ def weights(self): return array(self.call("weights")) @property + @since('1.4.0') def gaussians(self): """ Array of MultivariateGaussian where gaussians[i] represents @@ -243,10 +259,12 @@ def gaussians(self): for gaussian in zip(*self.call("gaussians"))] @property + @since('1.4.0') def k(self): """Number of gaussians in mixture.""" return len(self.weights) + @since('1.3.0') def predict(self, x): """ Find the cluster to which the points in 'x' has maximum membership @@ -262,6 +280,7 @@ def predict(self, x): raise TypeError("x should be represented by an RDD, " "but got %s." % type(x)) + @since('1.3.0') def predictSoft(self, x): """ Find the membership of each point in 'x' to all mixture components. @@ -279,6 +298,7 @@ def predictSoft(self, x): "but got %s." % type(x)) @classmethod + @since('1.5.0') def load(cls, sc, path): """Load the GaussianMixtureModel from disk. @@ -302,8 +322,11 @@ class GaussianMixture(object): :param maxIterations: Number of iterations. Default to 100 :param seed: Random Seed :param initialModel: GaussianMixtureModel for initializing learning + + .. versionadded:: 1.3.0 """ @classmethod + @since('1.3.0') def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None): """Train a Gaussian Mixture clustering model.""" initialModelWeights = None @@ -358,15 +381,19 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): ... rmtree(path) ... except OSError: ... pass + + .. versionadded:: 1.5.0 """ @property + @since('1.5.0') def k(self): """ Returns the number of clusters. """ return self.call("k") + @since('1.5.0') def assignments(self): """ Returns the cluster assignments of this model. @@ -375,6 +402,7 @@ def assignments(self): lambda x: (PowerIterationClustering.Assignment(*x))) @classmethod + @since('1.5.0') def load(cls, sc, path): model = cls._load_java(sc, path) wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model) @@ -390,9 +418,12 @@ class PowerIterationClustering(object): From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data. + + .. versionadded:: 1.5.0 """ @classmethod + @since('1.5.0') def train(cls, rdd, k, maxIterations=100, initMode="random"): """ :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the @@ -415,6 +446,8 @@ def train(cls, rdd, k, maxIterations=100, initMode="random"): class Assignment(namedtuple("Assignment", ["id", "cluster"])): """ Represents an (id, cluster) tuple. + + .. versionadded:: 1.5.0 """ @@ -474,17 +507,21 @@ class StreamingKMeansModel(KMeansModel): 0 >>> stkm.predict([1.5, 1.5]) 1 + + .. versionadded:: 1.5.0 """ def __init__(self, clusterCenters, clusterWeights): super(StreamingKMeansModel, self).__init__(centers=clusterCenters) self._clusterWeights = list(clusterWeights) @property + @since('1.5.0') def clusterWeights(self): """Return the cluster weights.""" return self._clusterWeights @ignore_unicode_prefix + @since('1.5.0') def update(self, data, decayFactor, timeUnit): """Update the centroids, according to data @@ -523,6 +560,8 @@ class StreamingKMeans(object): :param decayFactor: float, forgetfulness of the previous centroids. :param timeUnit: can be "batches" or "points". If points, then the decayfactor is raised to the power of no. of new points. + + .. versionadded:: 1.5.0 """ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._k = k @@ -533,10 +572,12 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._timeUnit = timeUnit self._model = None + @since('1.5.0') def latestModel(self): """Return the latest model""" return self._model + @since('1.5.0') def _validate(self, dstream): if self._model is None: raise ValueError( @@ -547,16 +588,19 @@ def _validate(self, dstream): "Expected dstream to be of type DStream, " "got type %s" % type(dstream)) + @since('1.5.0') def setK(self, k): """Set number of clusters.""" self._k = k return self + @since('1.5.0') def setDecayFactor(self, decayFactor): """Set decay factor.""" self._decayFactor = decayFactor return self + @since('1.5.0') def setHalfLife(self, halfLife, timeUnit): """ Set number of batches after which the centroids of that @@ -566,6 +610,7 @@ def setHalfLife(self, halfLife, timeUnit): self._decayFactor = exp(log(0.5) / halfLife) return self + @since('1.5.0') def setInitialCenters(self, centers, weights): """ Set initial centers. Should be set before calling trainOn. @@ -573,6 +618,7 @@ def setInitialCenters(self, centers, weights): self._model = StreamingKMeansModel(centers, weights) return self + @since('1.5.0') def setRandomCenters(self, dim, weight, seed): """ Set the initial centres to be random samples from @@ -584,6 +630,7 @@ def setRandomCenters(self, dim, weight, seed): self._model = StreamingKMeansModel(clusterCenters, clusterWeights) return self + @since('1.5.0') def trainOn(self, dstream): """Train the model on the incoming dstream.""" self._validate(dstream) @@ -593,6 +640,7 @@ def update(rdd): dstream.foreachRDD(update) + @since('1.5.0') def predictOn(self, dstream): """ Make predictions on a dstream. @@ -601,6 +649,7 @@ def predictOn(self, dstream): self._validate(dstream) return dstream.map(lambda x: self._model.predict(x)) + @since('1.5.0') def predictOnValues(self, dstream): """ Make predictions on a keyed dstream. @@ -649,16 +698,21 @@ class LDAModel(JavaModelWrapper): ... rmtree(path) ... except OSError: ... pass + + .. versionadded:: 1.5.0 """ + @since('1.5.0') def topicsMatrix(self): """Inferred topics, where each topic is represented by a distribution over terms.""" return self.call("topicsMatrix").toArray() + @since('1.5.0') def vocabSize(self): """Vocabulary size (number of terms or terms in the vocabulary)""" return self.call("vocabSize") + @since('1.5.0') def save(self, sc, path): """Save the LDAModel on to disk. @@ -672,6 +726,7 @@ def save(self, sc, path): self._java_model.save(sc._jsc.sc(), path) @classmethod + @since('1.5.0') def load(cls, sc, path): """Load the LDAModel from disk. @@ -688,8 +743,12 @@ def load(cls, sc, path): class LDA(object): + """ + .. versionadded:: 1.5.0 + """ @classmethod + @since('1.5.0') def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"): """Train a LDA model. From a1cf249e18cb3472ad08ae1eafc0d36e8b6ad417 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Mon, 7 Sep 2015 12:00:41 +0100 Subject: [PATCH 2/7] Removed @since to show inherited docstring / from private method --- python/pyspark/mllib/clustering.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 96a6c3b1963d5..c682e5557f993 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -141,7 +141,6 @@ def computeCost(self, rdd): [_convert_to_vector(c) for c in self.centers]) return cost - @since('1.4.0') def save(self, sc, path): java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers]) java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers) @@ -577,7 +576,6 @@ def latestModel(self): """Return the latest model""" return self._model - @since('1.5.0') def _validate(self, dstream): if self._model is None: raise ValueError( From 914f8307ac02c7fea453684e23d21d22fe955ece Mon Sep 17 00:00:00 2001 From: noelsmith Date: Thu, 10 Sep 2015 22:05:25 +0100 Subject: [PATCH 3/7] Rebased to include SPARK-10373 --- python/pyspark/__init__.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 76ee3b1895e9f..8475dfb1c6ad0 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -67,26 +67,6 @@ def deco(f): # for back compatibility from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row - -def since(version): - """ - A decorator that annotates a function to append the version of Spark the function was added. - """ - import re - indent_p = re.compile(r'\n( +)') - - def deco(f): - if f.__doc__ is None: - f.__doc__ = ".. versionadded:: {}".format(version) - return f - else: - indents = indent_p.findall(f.__doc__) - indent = ' ' * (min(len(m) for m in indents) if indents else 0) - f.__doc__ = "{}\n\n{}.. versionadded:: {}".format(f.__doc__.rstrip(), indent, version) - return f - return deco - - __all__ = [ "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast", "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer", From 86f72d6a5c6b5e8bd96266eb6e5789c2686c2aa6 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Thu, 10 Sep 2015 22:17:30 +0100 Subject: [PATCH 4/7] Added missing docstrings. Removed maintenance part from version numbers --- python/pyspark/mllib/clustering.py | 98 ++++++++++++++++-------------- 1 file changed, 54 insertions(+), 44 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index c682e5557f993..5f359828bdc1f 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -97,25 +97,25 @@ class KMeansModel(Saveable, Loader): >>> model.clusterCenters [array([-1000., -1000.]), array([ 5., 5.]), array([ 1000., 1000.])] - .. versionadded:: 0.9.1 + .. versionadded:: 0.9 """ def __init__(self, centers): self.centers = centers @property - @since('1.0.0') + @since(1.0) def clusterCenters(self): """Get the cluster centers, represented as a list of NumPy arrays.""" return self.centers @property - @since('1.4.0') + @since(1.4) def k(self): """Total number of clusters.""" return len(self.centers) - @since('0.9.1') + @since(0.9) def predict(self, x): """Find the cluster to which x belongs in this model.""" best = 0 @@ -131,7 +131,7 @@ def predict(self, x): best_distance = distance return best - @since('1.4.0') + @since(1.4) def computeCost(self, rdd): """ Return the K-means cost (sum of squared distances of points to @@ -141,25 +141,32 @@ def computeCost(self, rdd): [_convert_to_vector(c) for c in self.centers]) return cost + @since(1.4) def save(self, sc, path): + """ + Save this model to the given path. + """ java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers]) java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers) java_model.save(sc._jsc.sc(), path) @classmethod - @since('1.4.0') + @since(1.4) def load(cls, sc, path): + """ + Load a model from the given path. + """ java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel.load(sc._jsc.sc(), path) return KMeansModel(_java2py(sc, java_model.clusterCenters())) class KMeans(object): """ - .. versionadded:: 0.9.1 + .. versionadded:: 0.9 """ @classmethod - @since('0.9.1') + @since(0.9) def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None): """Train a k-means clustering model.""" @@ -234,11 +241,11 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): >>> labels[3]==labels[4] True - .. versionadded:: 1.3.0 + .. versionadded:: 1.3 """ @property - @since('1.4.0') + @since(1.4) def weights(self): """ Weights for each Gaussian distribution in the mixture, where weights[i] is @@ -247,7 +254,7 @@ def weights(self): return array(self.call("weights")) @property - @since('1.4.0') + @since(1.4) def gaussians(self): """ Array of MultivariateGaussian where gaussians[i] represents @@ -258,12 +265,12 @@ def gaussians(self): for gaussian in zip(*self.call("gaussians"))] @property - @since('1.4.0') + @since(1.4) def k(self): """Number of gaussians in mixture.""" return len(self.weights) - @since('1.3.0') + @since(1.3) def predict(self, x): """ Find the cluster to which the points in 'x' has maximum membership @@ -279,7 +286,7 @@ def predict(self, x): raise TypeError("x should be represented by an RDD, " "but got %s." % type(x)) - @since('1.3.0') + @since(1.3) def predictSoft(self, x): """ Find the membership of each point in 'x' to all mixture components. @@ -297,7 +304,7 @@ def predictSoft(self, x): "but got %s." % type(x)) @classmethod - @since('1.5.0') + @since(1.5) def load(cls, sc, path): """Load the GaussianMixtureModel from disk. @@ -322,10 +329,10 @@ class GaussianMixture(object): :param seed: Random Seed :param initialModel: GaussianMixtureModel for initializing learning - .. versionadded:: 1.3.0 + .. versionadded:: 1.3 """ @classmethod - @since('1.3.0') + @since(1.3) def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None): """Train a Gaussian Mixture clustering model.""" initialModelWeights = None @@ -381,18 +388,18 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): ... except OSError: ... pass - .. versionadded:: 1.5.0 + .. versionadded:: 1.5 """ @property - @since('1.5.0') + @since(1.5) def k(self): """ Returns the number of clusters. """ return self.call("k") - @since('1.5.0') + @since(1.5) def assignments(self): """ Returns the cluster assignments of this model. @@ -401,8 +408,11 @@ def assignments(self): lambda x: (PowerIterationClustering.Assignment(*x))) @classmethod - @since('1.5.0') + @since(1.5) def load(cls, sc, path): + """ + Load a model from the given path. + """ model = cls._load_java(sc, path) wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model) return PowerIterationClusteringModel(wrapper) @@ -418,11 +428,11 @@ class PowerIterationClustering(object): dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data. - .. versionadded:: 1.5.0 + .. versionadded:: 1.5 """ @classmethod - @since('1.5.0') + @since(1.5) def train(cls, rdd, k, maxIterations=100, initMode="random"): """ :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the @@ -446,7 +456,7 @@ class Assignment(namedtuple("Assignment", ["id", "cluster"])): """ Represents an (id, cluster) tuple. - .. versionadded:: 1.5.0 + .. versionadded:: 1.5 """ @@ -507,20 +517,20 @@ class StreamingKMeansModel(KMeansModel): >>> stkm.predict([1.5, 1.5]) 1 - .. versionadded:: 1.5.0 + .. versionadded:: 1.5 """ def __init__(self, clusterCenters, clusterWeights): super(StreamingKMeansModel, self).__init__(centers=clusterCenters) self._clusterWeights = list(clusterWeights) @property - @since('1.5.0') + @since(1.5) def clusterWeights(self): """Return the cluster weights.""" return self._clusterWeights @ignore_unicode_prefix - @since('1.5.0') + @since(1.5) def update(self, data, decayFactor, timeUnit): """Update the centroids, according to data @@ -560,7 +570,7 @@ class StreamingKMeans(object): :param timeUnit: can be "batches" or "points". If points, then the decayfactor is raised to the power of no. of new points. - .. versionadded:: 1.5.0 + .. versionadded:: 1.5 """ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._k = k @@ -571,7 +581,7 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._timeUnit = timeUnit self._model = None - @since('1.5.0') + @since(1.5) def latestModel(self): """Return the latest model""" return self._model @@ -586,19 +596,19 @@ def _validate(self, dstream): "Expected dstream to be of type DStream, " "got type %s" % type(dstream)) - @since('1.5.0') + @since(1.5) def setK(self, k): """Set number of clusters.""" self._k = k return self - @since('1.5.0') + @since(1.5) def setDecayFactor(self, decayFactor): """Set decay factor.""" self._decayFactor = decayFactor return self - @since('1.5.0') + @since(1.5) def setHalfLife(self, halfLife, timeUnit): """ Set number of batches after which the centroids of that @@ -608,7 +618,7 @@ def setHalfLife(self, halfLife, timeUnit): self._decayFactor = exp(log(0.5) / halfLife) return self - @since('1.5.0') + @since(1.5) def setInitialCenters(self, centers, weights): """ Set initial centers. Should be set before calling trainOn. @@ -616,7 +626,7 @@ def setInitialCenters(self, centers, weights): self._model = StreamingKMeansModel(centers, weights) return self - @since('1.5.0') + @since(1.5) def setRandomCenters(self, dim, weight, seed): """ Set the initial centres to be random samples from @@ -628,7 +638,7 @@ def setRandomCenters(self, dim, weight, seed): self._model = StreamingKMeansModel(clusterCenters, clusterWeights) return self - @since('1.5.0') + @since(1.5) def trainOn(self, dstream): """Train the model on the incoming dstream.""" self._validate(dstream) @@ -638,7 +648,7 @@ def update(rdd): dstream.foreachRDD(update) - @since('1.5.0') + @since(1.5) def predictOn(self, dstream): """ Make predictions on a dstream. @@ -647,7 +657,7 @@ def predictOn(self, dstream): self._validate(dstream) return dstream.map(lambda x: self._model.predict(x)) - @since('1.5.0') + @since(1.5) def predictOnValues(self, dstream): """ Make predictions on a keyed dstream. @@ -697,20 +707,20 @@ class LDAModel(JavaModelWrapper): ... except OSError: ... pass - .. versionadded:: 1.5.0 + .. versionadded:: 1.5 """ - @since('1.5.0') + @since(1.5) def topicsMatrix(self): """Inferred topics, where each topic is represented by a distribution over terms.""" return self.call("topicsMatrix").toArray() - @since('1.5.0') + @since(1.5) def vocabSize(self): """Vocabulary size (number of terms or terms in the vocabulary)""" return self.call("vocabSize") - @since('1.5.0') + @since(1.5) def save(self, sc, path): """Save the LDAModel on to disk. @@ -724,7 +734,7 @@ def save(self, sc, path): self._java_model.save(sc._jsc.sc(), path) @classmethod - @since('1.5.0') + @since(1.5) def load(cls, sc, path): """Load the LDAModel from disk. @@ -742,11 +752,11 @@ def load(cls, sc, path): class LDA(object): """ - .. versionadded:: 1.5.0 + .. versionadded:: 1.5 """ @classmethod - @since('1.5.0') + @since(1.5) def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"): """Train a LDA model. From c8da1cc304c2266ee86ccb76c649d2c075271f70 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Mon, 14 Sep 2015 21:57:19 +0100 Subject: [PATCH 5/7] Reinstated 3-part version numbers --- python/pyspark/mllib/clustering.py | 88 +++++++++++++++--------------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 5f359828bdc1f..d194fa0cda4c4 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -97,25 +97,25 @@ class KMeansModel(Saveable, Loader): >>> model.clusterCenters [array([-1000., -1000.]), array([ 5., 5.]), array([ 1000., 1000.])] - .. versionadded:: 0.9 + .. versionadded:: 0.9.1 """ def __init__(self, centers): self.centers = centers @property - @since(1.0) + @since('1.0.0') def clusterCenters(self): """Get the cluster centers, represented as a list of NumPy arrays.""" return self.centers @property - @since(1.4) + @since('1.4.0') def k(self): """Total number of clusters.""" return len(self.centers) - @since(0.9) + @since('0.9.1') def predict(self, x): """Find the cluster to which x belongs in this model.""" best = 0 @@ -131,7 +131,7 @@ def predict(self, x): best_distance = distance return best - @since(1.4) + @since('1.4.0') def computeCost(self, rdd): """ Return the K-means cost (sum of squared distances of points to @@ -151,7 +151,7 @@ def save(self, sc, path): java_model.save(sc._jsc.sc(), path) @classmethod - @since(1.4) + @since('1.4.0') def load(cls, sc, path): """ Load a model from the given path. @@ -162,11 +162,11 @@ def load(cls, sc, path): class KMeans(object): """ - .. versionadded:: 0.9 + .. versionadded:: 0.9.1 """ @classmethod - @since(0.9) + @since('0.9.1') def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None): """Train a k-means clustering model.""" @@ -241,11 +241,11 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): >>> labels[3]==labels[4] True - .. versionadded:: 1.3 + .. versionadded:: 1.3.0 """ @property - @since(1.4) + @since('1.4.0') def weights(self): """ Weights for each Gaussian distribution in the mixture, where weights[i] is @@ -254,7 +254,7 @@ def weights(self): return array(self.call("weights")) @property - @since(1.4) + @since('1.4.0') def gaussians(self): """ Array of MultivariateGaussian where gaussians[i] represents @@ -265,12 +265,12 @@ def gaussians(self): for gaussian in zip(*self.call("gaussians"))] @property - @since(1.4) + @since('1.4.0') def k(self): """Number of gaussians in mixture.""" return len(self.weights) - @since(1.3) + @since('1.3.0') def predict(self, x): """ Find the cluster to which the points in 'x' has maximum membership @@ -286,7 +286,7 @@ def predict(self, x): raise TypeError("x should be represented by an RDD, " "but got %s." % type(x)) - @since(1.3) + @since('1.3.0') def predictSoft(self, x): """ Find the membership of each point in 'x' to all mixture components. @@ -304,7 +304,7 @@ def predictSoft(self, x): "but got %s." % type(x)) @classmethod - @since(1.5) + @since('1.5.0') def load(cls, sc, path): """Load the GaussianMixtureModel from disk. @@ -329,10 +329,10 @@ class GaussianMixture(object): :param seed: Random Seed :param initialModel: GaussianMixtureModel for initializing learning - .. versionadded:: 1.3 + .. versionadded:: 1.3.0 """ @classmethod - @since(1.3) + @since('1.3.0') def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None): """Train a Gaussian Mixture clustering model.""" initialModelWeights = None @@ -388,18 +388,18 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): ... except OSError: ... pass - .. versionadded:: 1.5 + .. versionadded:: 1.5.0 """ @property - @since(1.5) + @since('1.5.0') def k(self): """ Returns the number of clusters. """ return self.call("k") - @since(1.5) + @since('1.5.0') def assignments(self): """ Returns the cluster assignments of this model. @@ -408,7 +408,7 @@ def assignments(self): lambda x: (PowerIterationClustering.Assignment(*x))) @classmethod - @since(1.5) + @since('1.5.0') def load(cls, sc, path): """ Load a model from the given path. @@ -428,11 +428,11 @@ class PowerIterationClustering(object): dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data. - .. versionadded:: 1.5 + .. versionadded:: 1.5.0 """ @classmethod - @since(1.5) + @since('1.5.0') def train(cls, rdd, k, maxIterations=100, initMode="random"): """ :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the @@ -456,7 +456,7 @@ class Assignment(namedtuple("Assignment", ["id", "cluster"])): """ Represents an (id, cluster) tuple. - .. versionadded:: 1.5 + .. versionadded:: 1.5.0 """ @@ -517,20 +517,20 @@ class StreamingKMeansModel(KMeansModel): >>> stkm.predict([1.5, 1.5]) 1 - .. versionadded:: 1.5 + .. versionadded:: 1.5.0 """ def __init__(self, clusterCenters, clusterWeights): super(StreamingKMeansModel, self).__init__(centers=clusterCenters) self._clusterWeights = list(clusterWeights) @property - @since(1.5) + @since('1.5.0') def clusterWeights(self): """Return the cluster weights.""" return self._clusterWeights @ignore_unicode_prefix - @since(1.5) + @since('1.5.0') def update(self, data, decayFactor, timeUnit): """Update the centroids, according to data @@ -570,7 +570,7 @@ class StreamingKMeans(object): :param timeUnit: can be "batches" or "points". If points, then the decayfactor is raised to the power of no. of new points. - .. versionadded:: 1.5 + .. versionadded:: 1.5.0 """ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._k = k @@ -581,7 +581,7 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._timeUnit = timeUnit self._model = None - @since(1.5) + @since('1.5.0') def latestModel(self): """Return the latest model""" return self._model @@ -596,19 +596,19 @@ def _validate(self, dstream): "Expected dstream to be of type DStream, " "got type %s" % type(dstream)) - @since(1.5) + @since('1.5.0') def setK(self, k): """Set number of clusters.""" self._k = k return self - @since(1.5) + @since('1.5.0') def setDecayFactor(self, decayFactor): """Set decay factor.""" self._decayFactor = decayFactor return self - @since(1.5) + @since('1.5.0') def setHalfLife(self, halfLife, timeUnit): """ Set number of batches after which the centroids of that @@ -618,7 +618,7 @@ def setHalfLife(self, halfLife, timeUnit): self._decayFactor = exp(log(0.5) / halfLife) return self - @since(1.5) + @since('1.5.0') def setInitialCenters(self, centers, weights): """ Set initial centers. Should be set before calling trainOn. @@ -626,7 +626,7 @@ def setInitialCenters(self, centers, weights): self._model = StreamingKMeansModel(centers, weights) return self - @since(1.5) + @since('1.5.0') def setRandomCenters(self, dim, weight, seed): """ Set the initial centres to be random samples from @@ -638,7 +638,7 @@ def setRandomCenters(self, dim, weight, seed): self._model = StreamingKMeansModel(clusterCenters, clusterWeights) return self - @since(1.5) + @since('1.5.0') def trainOn(self, dstream): """Train the model on the incoming dstream.""" self._validate(dstream) @@ -648,7 +648,7 @@ def update(rdd): dstream.foreachRDD(update) - @since(1.5) + @since('1.5.0') def predictOn(self, dstream): """ Make predictions on a dstream. @@ -657,7 +657,7 @@ def predictOn(self, dstream): self._validate(dstream) return dstream.map(lambda x: self._model.predict(x)) - @since(1.5) + @since('1.5.0') def predictOnValues(self, dstream): """ Make predictions on a keyed dstream. @@ -707,20 +707,20 @@ class LDAModel(JavaModelWrapper): ... except OSError: ... pass - .. versionadded:: 1.5 + .. versionadded:: 1.5.0 """ - @since(1.5) + @since('1.5.0') def topicsMatrix(self): """Inferred topics, where each topic is represented by a distribution over terms.""" return self.call("topicsMatrix").toArray() - @since(1.5) + @since('1.5.0') def vocabSize(self): """Vocabulary size (number of terms or terms in the vocabulary)""" return self.call("vocabSize") - @since(1.5) + @since('1.5.0') def save(self, sc, path): """Save the LDAModel on to disk. @@ -734,7 +734,7 @@ def save(self, sc, path): self._java_model.save(sc._jsc.sc(), path) @classmethod - @since(1.5) + @since('1.5.0') def load(cls, sc, path): """Load the LDAModel from disk. @@ -752,11 +752,11 @@ def load(cls, sc, path): class LDA(object): """ - .. versionadded:: 1.5 + .. versionadded:: 1.5.0 """ @classmethod - @since(1.5) + @since('1.5.0') def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"): """Train a LDA model. From 7925db7213ce3c34f1fd438273931d03686d5425 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Tue, 15 Sep 2015 08:07:05 +0100 Subject: [PATCH 6/7] Replacing 0.9.1 with 0.9.0 --- python/pyspark/mllib/clustering.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index d194fa0cda4c4..e15ba2ec184f2 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -97,7 +97,7 @@ class KMeansModel(Saveable, Loader): >>> model.clusterCenters [array([-1000., -1000.]), array([ 5., 5.]), array([ 1000., 1000.])] - .. versionadded:: 0.9.1 + .. versionadded:: 0.9.0 """ def __init__(self, centers): @@ -115,7 +115,7 @@ def k(self): """Total number of clusters.""" return len(self.centers) - @since('0.9.1') + @since('0.9.0') def predict(self, x): """Find the cluster to which x belongs in this model.""" best = 0 @@ -162,11 +162,11 @@ def load(cls, sc, path): class KMeans(object): """ - .. versionadded:: 0.9.1 + .. versionadded:: 0.9.0 """ @classmethod - @since('0.9.1') + @since('0.9.0') def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None): """Train a k-means clustering model.""" From 4a2fbe06165f1395b65a508dfd2fa42721bf892e Mon Sep 17 00:00:00 2001 From: noelsmith Date: Wed, 21 Oct 2015 20:23:07 +0100 Subject: [PATCH 7/7] Fixed version number on KMeansModel.save() --- python/pyspark/mllib/clustering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index e15ba2ec184f2..c451df17cf264 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -141,7 +141,7 @@ def computeCost(self, rdd): [_convert_to_vector(c) for c in self.centers]) return cost - @since(1.4) + @since('1.4.0') def save(self, sc, path): """ Save this model to the given path.