
Commit

address comments
huaxingao committed Apr 27, 2018
1 parent 387d6ff commit 6d00f34
Showing 2 changed files with 156 additions and 109 deletions.
218 changes: 109 additions & 109 deletions python/pyspark/ml/clustering.py
@@ -26,7 +26,7 @@
__all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
'KMeans', 'KMeansModel',
'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']


class ClusteringSummary(JavaWrapper):
@@ -836,7 +836,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
Terminology:
- "term" = "word": an el
- "term" = "word": an element of the vocabulary
- "token": instance of a term appearing in a document
- "topic": multinomial distribution over terms representing some concept
- "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +938,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
"""
super(LDA, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +967,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
Sets params for LDA.
"""
@@ -1156,126 +1156,68 @@ def getKeepLastCheckpoint(self):
return self.getOrDefault(self.keepLastCheckpoint)


class _PowerIterationClusteringParams(JavaParams, HasMaxIter, HasPredictionCol):
"""
Params for :py:attr:`PowerIterationClustering`.
.. versionadded:: 2.4.0
"""

k = Param(Params._dummy(), "k",
"The number of clusters to create. Must be > 1.",
typeConverter=TypeConverters.toInt)
initMode = Param(Params._dummy(), "initMode",
"The initialization algorithm. This can be either " +
"'random' to use a random vector as vertex properties, or 'degree' to use " +
"a normalized sum of similarities with other vertices. Supported options: " +
"'random' and 'degree'.",
typeConverter=TypeConverters.toString)
idCol = Param(Params._dummy(), "idCol",
"Name of the input column for vertex IDs.",
typeConverter=TypeConverters.toString)
neighborsCol = Param(Params._dummy(), "neighborsCol",
"Name of the input column for neighbors in the adjacency list " +
"representation.",
typeConverter=TypeConverters.toString)
similaritiesCol = Param(Params._dummy(), "similaritiesCol",
"Name of the input column for non-negative weights (similarities) " +
"of edges between the vertex in `idCol` and each neighbor in " +
"`neighborsCol`",
typeConverter=TypeConverters.toString)

@since("2.4.0")
def getK(self):
"""
Gets the value of `k`
"""
return self.getOrDefault(self.k)

@since("2.4.0")
def getInitMode(self):
"""
Gets the value of `initMode`
"""
return self.getOrDefault(self.initMode)

@since("2.4.0")
def getIdCol(self):
"""
Gets the value of `idCol`
"""
return self.getOrDefault(self.idCol)

@since("2.4.0")
def getNeighborsCol(self):
"""
Gets the value of `neighborsCol`
"""
return self.getOrDefault(self.neighborsCol)

@since("2.4.0")
def getSimilaritiesCol(self):
"""
Gets the value of `similaritiesCol`
"""
return self.getOrDefault(self.binary)


@inherit_doc
class PowerIterationClustering(JavaTransformer, _PowerIterationClusteringParams, JavaMLReadable,
JavaMLWritable):
class PowerIterationClustering(HasMaxIter, HasPredictionCol, JavaTransformer, JavaParams,
JavaMLReadable, JavaMLWritable):
"""
Model produced by [[PowerIterationClustering]].
.. note:: Experimental
Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
<a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
PIC finds a very low-dimensional embedding of a dataset using truncated power
iteration on a normalized pair-wise similarity matrix of the data.
PIC takes an affinity matrix between items (or vertices) as input. An affinity matrix
is a symmetric matrix whose entries are non-negative similarities between items.
PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row
includes:
- :py:class:`idCol`: vertex ID
- :py:class:`neighborsCol`: neighbors of vertex in :py:class:`idCol`
- :py:class:`similaritiesCol`: non-negative weights (similarities) of edges between the
vertex in :py:class:`idCol` and each neighbor in :py:class:`neighborsCol`
PIC returns a cluster assignment for each input vertex. It appends a new column
:py:class:`predictionCol` containing the cluster assignment in :py:class:`[0,k)` for
each row (vertex).
Notes:
- [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation.
Transform runs the iterative PIC algorithm to cluster the whole input dataset.
- Input validation: This validates that similarities are non-negative but does NOT validate
that the input matrix is symmetric.
@see <a href=http://en.wikipedia.org/wiki/Spectral_clustering>
Spectral clustering (Wikipedia)</a>
>>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
>>> import math
>>> def genCircle(r, n):
... points = []
... for i in range(0, n):
... theta = 2.0 * math.pi * i / n
... points.append((r * math.cos(theta), r * math.sin(theta)))
... return points
>>> def sim(x, y):
... dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
... return math.exp(-dist / 2.0)
>>> r1 = 1.0
>>> n1 = 10
>>> r2 = 4.0
>>> n2 = 40
>>> n = n1 + n2
>>> points = genCircle(r1, n1) + genCircle(r2, n2)
>>> similarities = []
>>> for i in range (1, n):
... neighbor = []
... weight = []
... for j in range (i):
... neighbor.append((long)(j))
... weight.append(sim(points[i], points[j]))
... similarities.append([(long)(i), neighbor, weight])
>>> similarities = [((long)(1), [0], [0.5]), ((long)(2), [0, 1], [0.7,0.5]), \
((long)(3), [0, 1, 2], [0.9, 0.7, 0.5]), \
((long)(4), [0, 1, 2, 3], [1.1, 0.9, 0.7,0.5]), \
((long)(5), [0, 1, 2, 3, 4], [1.3, 1.1, 0.9, 0.7,0.5])]
>>> rdd = sc.parallelize(similarities, 2)
>>> schema = StructType([StructField("id", LongType(), False), \
StructField("neighbors", ArrayType(LongType(), False), True), \
StructField("similarities", ArrayType(DoubleType(), False), True)])
StructField("neighbors", ArrayType(LongType(), False), True), \
StructField("similarities", ArrayType(DoubleType(), False), True)])
>>> df = spark.createDataFrame(rdd, schema)
>>> pic = PowerIterationClustering()
>>> result = pic.setK(2).setMaxIter(40).transform(df)
>>> result = pic.setK(2).setMaxIter(10).transform(df)
>>> predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, result.prediction)
... .collect()]), key=lambda x: x[0])
>>> predictions[0]
(1, 1)
>>> predictions[8]
(9, 1)
>>> predictions[9]
(10, 0)
>>> predictions[20]
(21, 0)
>>> predictions[48]
(49, 0)
>>> predictions[1]
(2, 1)
>>> predictions[2]
(3, 0)
>>> predictions[3]
(4, 0)
>>> predictions[4]
(5, 0)
>>> pic_path = temp_path + "/pic"
>>> pic.save(pic_path)
>>> pic2 = PowerIterationClustering.load(pic_path)
>>> pic2.getK()
2
>>> pic2.getMaxIter()
40
10
>>> pic3 = PowerIterationClustering(k=4, initMode="degree")
>>> pic3.getIdCol()
'id'
@@ -1288,12 +1230,35 @@ class PowerIterationClustering(JavaTransformer, _PowerIterationClusteringParams,
.. versionadded:: 2.4.0
"""

k = Param(Params._dummy(), "k",
"The number of clusters to create. Must be > 1.",
typeConverter=TypeConverters.toInt)
initMode = Param(Params._dummy(), "initMode",
"The initialization algorithm. This can be either " +
"'random' to use a random vector as vertex properties, or 'degree' to use " +
"a normalized sum of similarities with other vertices. Supported options: " +
"'random' and 'degree'.",
typeConverter=TypeConverters.toString)
idCol = Param(Params._dummy(), "idCol",
"Name of the input column for vertex IDs.",
typeConverter=TypeConverters.toString)
neighborsCol = Param(Params._dummy(), "neighborsCol",
"Name of the input column for neighbors in the adjacency list " +
"representation.",
typeConverter=TypeConverters.toString)
similaritiesCol = Param(Params._dummy(), "similaritiesCol",
"Name of the input column for non-negative weights (similarities) " +
"of edges between the vertex in `idCol` and each neighbor in " +
"`neighborsCol`",
typeConverter=TypeConverters.toString)

@keyword_only
def __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
"""
__init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
"""
super(PowerIterationClustering, self).__init__()
self._java_obj = self._new_java_obj(
@@ -1309,7 +1274,7 @@ def setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="rando
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
"""
setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
Sets params for PowerIterationClustering.
"""
kwargs = self._input_kwargs
@@ -1322,34 +1287,69 @@ def setK(self, value):
"""
return self._set(k=value)

@since("2.4.0")
def getK(self):
"""
Gets the value of :py:attr:`k`.
"""
return self.getOrDefault(self.k)

@since("2.4.0")
def setInitMode(self, value):
"""
Sets the value of :py:attr:`initMode`.
"""
return self._set(initMode=value)

@since("2.4.0")
def getInitMode(self):
"""
Gets the value of :py:attr:`initMode`.
"""
return self.getOrDefault(self.initMode)

@since("2.4.0")
def setIdCol(self, value):
"""
Sets the value of :py:attr:`idCol`.
"""
return self._set(idCol=value)

@since("2.4.0")
def getIdCol(self):
"""
Gets the value of :py:attr:`idCol`.
"""
return self.getOrDefault(self.idCol)

@since("2.4.0")
def setNeighborsCol(self, value):
"""
Sets the value of :py:attr:`neighborsCol`.
"""
return self._set(neighborsCol=value)

@since("2.4.0")
def getNeighborsCol(self):
"""
Gets the value of :py:attr:`neighborsCol`.
"""
return self.getOrDefault(self.neighborsCol)

@since("2.4.0")
def setSimilaritiesCol(self, value):
"""
Sets the value of :py:attr:`similaritiesCol`.
"""
return self._set(similaritiesCol=value)

@since("2.4.0")
def getSimilaritiesCol(self):
"""
Gets the value of :py:attr:`similaritiesCol`.
"""
return self.getOrDefault(self.similaritiesCol)


if __name__ == "__main__":
import doctest
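For orientation, here is a minimal end-to-end sketch of how the new transformer could be driven from a plain SparkSession, mirroring the doctest above. It is illustrative only: the local session setup and the tiny five-row graph are assumptions of this sketch, not part of the patch.

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
from pyspark.ml.clustering import PowerIterationClustering

spark = SparkSession.builder.master("local[2]").appName("pic-sketch").getOrCreate()

# One row per vertex: its id, its neighbors, and the non-negative edge similarities.
rows = [(1, [0], [0.5]),
        (2, [0, 1], [0.7, 0.5]),
        (3, [0, 1, 2], [0.9, 0.7, 0.5]),
        (4, [0, 1, 2, 3], [1.1, 0.9, 0.7, 0.5]),
        (5, [0, 1, 2, 3, 4], [1.3, 1.1, 0.9, 0.7, 0.5])]
schema = StructType([StructField("id", LongType(), False),
                     StructField("neighbors", ArrayType(LongType(), False), True),
                     StructField("similarities", ArrayType(DoubleType(), False), True)])
df = spark.createDataFrame(rows, schema)

# transform() runs the iterative PIC algorithm over the whole input and appends
# the cluster assignment in the `prediction` column.
pic = PowerIterationClustering(k=2, maxIter=10)
pic.transform(df).select("id", "prediction").show()

spark.stop()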
47 changes: 47 additions & 0 deletions python/pyspark/ml/tests.py
@@ -1873,6 +1873,53 @@ def test_kmeans_cosine_distance(self):
self.assertTrue(result[4].prediction == result[5].prediction)


class PowerIterationClustering(SparkSessionTestCase):

def test_power_iteration_clustering(self):
from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
from pyspark.ml.clustering import PowerIterationClustering
import math

def genCircle(r, n):
points = []
for i in range(0, n):
theta = 2.0 * math.pi * i / n
points.append((r * math.cos(theta), r * math.sin(theta)))
return points

def sim(x, y):
dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
return math.exp(-dist / 2.0)

r1 = 1.0
n1 = 10
r2 = 4.0
n2 = 40
n = n1 + n2
points = genCircle(r1, n1) + genCircle(r2, n2)
similarities = []
for i in range(1, n):
neighbor = []
weight = []
for j in range(i):
neighbor.append((long)(j))
weight.append(sim(points[i], points[j]))
similarities.append([(long)(i), neighbor, weight])
rdd = self.sc.parallelize(similarities, 2)
schema = StructType([StructField("id", LongType(), False),
StructField("neighbors", ArrayType(LongType(), False), True),
StructField("similarities", ArrayType(DoubleType(), False), True)])
df = self.spark.createDataFrame(rdd, schema)
pic = PowerIterationClustering()
result = pic.setK(2).setMaxIter(40).transform(df)
predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id,
result.prediction).collect()]), key=lambda x: x[0])
for i in range(0, 8):
self.assertEqual(predictions[i], (i+1, 1))
for i in range(9, 48):
self.assertEqual(predictions[i], (i+1, 0))


class OneVsRestTests(SparkSessionTestCase):

def test_copy(self):
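As a sanity check on the geometry the new unit test relies on (a standalone sketch, independent of Spark): the test draws 10 points on a circle of radius 1 and 40 points on a circle of radius 4, and weights edges with the Gaussian similarity exp(-d^2 / 2). Neighboring points within one ring are much closer to each other than any inner/outer pair, so intra-ring similarities stay high while cross-ring similarities collapse toward zero, which is what lets PIC split the first 10 vertices from the rest.

import math

def gen_circle(r, n):
    # n evenly spaced points on a circle of radius r, as in genCircle() above.
    return [(r * math.cos(2.0 * math.pi * i / n), r * math.sin(2.0 * math.pi * i / n))
            for i in range(n)]

def sim(x, y):
    # Gaussian similarity on the squared Euclidean distance, matching sim() above.
    dist = (x[0] - y[0]) ** 2 + (x[1] - y[1]) ** 2
    return math.exp(-dist / 2.0)

inner = gen_circle(1.0, 10)
outer = gen_circle(4.0, 40)

print(sim(inner[0], inner[1]))  # adjacent points on the inner ring: ~0.83
print(sim(outer[0], outer[1]))  # adjacent points on the outer ring: ~0.82
print(sim(inner[0], outer[0]))  # nearest inner/outer pair: exp(-4.5) ~ 0.011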

