From 4be44237d7f204209db1b51ec9f77725656c8904 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Thu, 25 Jun 2015 00:05:16 +0800
Subject: [PATCH 1/2] Python support for Power Iteration Clustering

---
 ...PowerIterationClusteringModelWrapper.scala | 33 +++++++
 .../mllib/api/python/PythonMLLibAPI.scala     | 21 ++++
 python/pyspark/mllib/clustering.py            | 98 ++++++++++++++++++-
 3 files changed, 149 insertions(+), 3 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
new file mode 100644
index 0000000000000..b4f3aad8d94b3
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.clustering.PowerIterationClusteringModel
+
+/**
+ * A wrapper of PowerIterationClusteringModel to provide a helper method for Python.
+ */
+private[python] class PowerIterationClusteringModelWrapper(model: PowerIterationClusteringModel)
+  extends PowerIterationClusteringModel(model.k, model.assignments) {
+
+  def getAssignments: RDD[(Long, Int)] = {
+    SerDe.fromTuple2RDD(model.assignments.map(x => (x.id, x.cluster)))
+      .asInstanceOf[RDD[(Long, Int)]]
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index c4bea7c2cad4f..fd87b58533fd9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -406,6 +406,27 @@ private[python] class PythonMLLibAPI extends Serializable {
     model.predictSoft(data).map(Vectors.dense)
   }
 
+  /**
+   * Java stub for Python mllib PowerIterationClustering.run(). This stub returns a
+   * handle to the Java object instead of the content of the Java object. Extra care
+   * needs to be taken in the Python code to ensure it gets freed on exit; see the
+   * Py4J documentation.
+   */
+  def trainPowerIterationClusteringModel(
+      data: JavaRDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      initMode: String): PowerIterationClusteringModel = {
+
+    val pic = new PowerIterationClustering()
+      .setK(k)
+      .setMaxIterations(maxIterations)
+      .setInitializationMode(initMode)
+
+    val model = pic.run(data.rdd.map(v => (v(0).toLong, v(1).toLong, v(2))))
+    new PowerIterationClusteringModelWrapper(model)
+  }
+
   /**
    * Java stub for Python mllib ALS.train(). This stub returns a handle
    * to the Java object instead of the content of the Java object. Extra care
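
A note on the "handle" pattern the Javadoc above refers to: the JVM-side model is returned to Python as a Py4J reference, which JavaModelWrapper from pyspark.mllib.common holds and eventually releases. The sketch below is a paraphrase of the 1.4-era wrapper from memory, not the verbatim source; treat the method bodies as illustrative.

    # Sketch of the handle-holding pattern; paraphrased, not the exact source.
    from pyspark import SparkContext
    from pyspark.mllib.common import callJavaFunc

    class JavaModelWrapper(object):
        def __init__(self, java_model):
            self._sc = SparkContext._active_spark_context
            self._java_model = java_model  # the Py4J handle to the JVM object

        def __del__(self):
            # Detach the JVM-side object when the Python wrapper is garbage
            # collected -- the "freed on exit" care the Javadoc asks for.
            self._sc._gateway.detach(self._java_model)

        def call(self, name, *a):
            # Invoke a method on the JVM object via Py4J and convert the result.
            return callJavaFunc(self._sc, getattr(self._java_model, name), *a)

The new PowerIterationClusteringModel in the next file extends this wrapper, so model.k and model.assignments() are forwarded to the Scala side through call().
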
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index e6ef72942ce77..565f574e43dd4 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -25,15 +25,18 @@
 
 from numpy import array, random, tile
 
+from collections import namedtuple
+
 from pyspark import SparkContext
 from pyspark.rdd import RDD, ignore_unicode_prefix
-from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
 from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
 from pyspark.mllib.stat.distribution import MultivariateGaussian
-from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
 from pyspark.streaming import DStream
 
 __all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+           'PowerIterationClusteringModel', 'PowerIterationClustering',
            'StreamingKMeans', 'StreamingKMeansModel']
 
 
@@ -272,6 +275,94 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia
         return GaussianMixtureModel(weight, mvg_obj)
 
 
+class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
+
+    """
+    .. note:: Experimental
+
+    Model produced by [[PowerIterationClustering]].
+
+    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
+    ...     (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)]
+    >>> rdd = sc.parallelize(data, 2)
+    >>> model = PowerIterationClustering.train(rdd, 2, 100)
+    >>> model.k
+    2
+    >>> sorted(model.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> import os, tempfile
+    >>> path = tempfile.mkdtemp()
+    >>> model.save(sc, path)
+    >>> sameModel = PowerIterationClusteringModel.load(sc, path)
+    >>> sameModel.k
+    2
+    >>> sorted(sameModel.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> from shutil import rmtree
+    >>> try:
+    ...     rmtree(path)
+    ... except OSError:
+    ...     pass
+    """
+
+    @property
+    def k(self):
+        """
+        Returns the number of clusters.
+        """
+        return self.call("k")
+
+    def assignments(self):
+        """
+        Returns the cluster assignments of this model.
+        """
+        return self.call("getAssignments").map(
+            lambda x: (PowerIterationClustering.Assignment(x[0], x[1])))
+
+    @classmethod
+    def load(cls, sc, path):
+        model = cls._load_java(sc, path)
+        wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model)
+        return PowerIterationClusteringModel(wrapper)
+
+
+class PowerIterationClustering(object):
+    """
+    .. note:: Experimental
+
+    Power Iteration Clustering (PIC), a scalable graph clustering algorithm
+    developed by [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]].
+    From the abstract: PIC finds a very low-dimensional embedding of a
+    dataset using truncated power iteration on a normalized pair-wise
+    similarity matrix of the data.
+    """
+
+    @classmethod
+    def train(cls, rdd, k, maxIterations=100, initMode="random"):
+        """
+        :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the
+            affinity matrix, which is the matrix A in the PIC paper.
+            The similarity s,,ij,, must be nonnegative.
+            This is a symmetric matrix and hence s,,ij,, = s,,ji,,.
+            For any (i, j) with nonzero similarity, there should be
+            either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input.
+            Tuples with i = j are ignored, because we assume
+            s,,ij,, = 0.0.
+        :param k: Number of clusters.
+        :param maxIterations: Maximum number of iterations of the
+            PIC algorithm.
+        :param initMode: Initialization mode ("random" or "degree").
+        """
+        model = callMLlibFunc("trainPowerIterationClusteringModel",
+                              rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
+        return PowerIterationClusteringModel(model)
+
+    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
+        """
+        Represents an (id, cluster) tuple.
+        """
+
+
 class StreamingKMeansModel(KMeansModel):
     """
     .. note:: Experimental
@@ -466,7 +557,8 @@ def predictOnValues(self, dstream):
 
 def _test():
     import doctest
-    globs = globals().copy()
+    import pyspark.mllib.clustering
+    globs = pyspark.mllib.clustering.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
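
Before the review-feedback commit below, it is worth making the affinity-matrix contract in train() concrete. The following sketch is illustrative only and not part of the patch: the points, sigma, and the gaussian_similarity helper are invented for the example, and sc is assumed to be an active SparkContext as in the doctests.

    # Build a nonnegative, symmetric affinity RDD from 2-D points with a
    # Gaussian kernel, then cluster it with the new Python API.
    from math import exp

    points = {0: (1.0, 1.0), 1: (1.2, 0.9), 2: (5.0, 5.0), 3: (5.1, 4.8)}
    sigma = 1.0  # kernel width, chosen arbitrarily for the sketch

    def gaussian_similarity(p, q):
        d2 = (p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2
        return exp(-d2 / (2.0 * sigma ** 2))  # always nonnegative

    # Emit each undirected pair once as (i, j, s_ij); per the docstring,
    # either direction suffices, and i = j tuples would be ignored anyway.
    edges = [(i, j, gaussian_similarity(points[i], points[j]))
             for i in points for j in points if i < j]

    model = PowerIterationClustering.train(sc.parallelize(edges), k=2)
    print(sorted(model.assignments().collect()))

The second commit below tightens the same API surface in response to review comments.
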
From 6b03d826c5ba1bf24145ff1a689bad906a92ab2c Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Sun, 28 Jun 2015 15:51:42 +0800
Subject: [PATCH 2/2] address comments

---
 .../api/python/PowerIterationClusteringModelWrapper.scala  | 5 ++---
 .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 6 ++++++
 python/pyspark/mllib/clustering.py                         | 2 +-
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
index b4f3aad8d94b3..bc6041b221732 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
@@ -26,8 +26,7 @@ import org.apache.spark.mllib.clustering.PowerIterationClusteringModel
 private[python] class PowerIterationClusteringModelWrapper(model: PowerIterationClusteringModel)
   extends PowerIterationClusteringModel(model.k, model.assignments) {
 
-  def getAssignments: RDD[(Long, Int)] = {
-    SerDe.fromTuple2RDD(model.assignments.map(x => (x.id, x.cluster)))
-      .asInstanceOf[RDD[(Long, Int)]]
+  def getAssignments: RDD[Array[Any]] = {
+    model.assignments.map(x => Array(x.id, x.cluster))
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index fd87b58533fd9..269fa38b7b151 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -411,6 +411,12 @@ private[python] class PythonMLLibAPI extends Serializable {
    * handle to the Java object instead of the content of the Java object. Extra care
    * needs to be taken in the Python code to ensure it gets freed on exit; see the
    * Py4J documentation.
+   * @param data an RDD of (i, j, s,,ij,,) tuples representing the affinity matrix.
+   * @param k number of clusters.
+   * @param maxIterations maximum number of iterations of the power iteration loop.
+   * @param initMode the initialization mode. This can be either "random" to use
+   *                 a random vector as vertex properties, or "degree" to use
+   *                 normalized sum similarities. Default: random.
    */
   def trainPowerIterationClusteringModel(
       data: JavaRDD[Vector],
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 565f574e43dd4..83b0f390d8b87 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -317,7 +317,7 @@ def assignments(self):
         Returns the cluster assignments of this model.
         """
         return self.call("getAssignments").map(
-            lambda x: (PowerIterationClustering.Assignment(x[0], x[1])))
+            lambda x: (PowerIterationClustering.Assignment(*x)))
 
     @classmethod
     def load(cls, sc, path):
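
After this second commit, getAssignments ships each assignment across Py4J as a two-element array and the Python lambda unpacks it with Assignment(*x), so callers still see (id, cluster) namedtuples. A closing usage sketch, again with invented edge data and an assumed active SparkContext sc:

    # Group vertex ids by cluster label; Assignment's namedtuple fields
    # make this a plain attribute access.
    data = sc.parallelize([(0, 1, 1.0), (0, 2, 1.0), (1, 2, 1.0),
                           (3, 4, 1.0), (4, 5, 1.0), (3, 5, 1.0),
                           (2, 3, 0.1)])  # weak link between the two triangles
    model = PowerIterationClustering.train(data, k=2, maxIterations=40,
                                           initMode="degree")  # per the new @param doc
    by_cluster = (model.assignments()
                  .map(lambda a: (a.cluster, a.id))
                  .groupByKey()
                  .mapValues(sorted))
    print(sorted(by_cluster.collect()))
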