In [1]:
# start the Spark Context
# findspark.init() has to be called first: `pyspark` must only be imported
# after this call has run.
import findspark
findspark.init()

In [2]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# Obtain the SparkSession used by every later cell (reuses an existing
# session if there is one, otherwise creates it).
spark = SparkSession.builder.getOrCreate()

K-Means

In [3]:
# k-means is one of the most commonly used clustering algorithms that clusters 
# the data points into a predefined number of clusters.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [4]:
# Loads the sample k-means data, read with the "libsvm" format, into `dataset`.
# The path is relative, so the file is looked up relative to where Spark runs.
dataset = spark.read.format("libsvm").load("sample_kmeans_data.txt")

In [5]:
# Preview the loaded rows: a numeric label and a sparse feature vector per row.
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [6]:
# Fit a k-means model with two clusters. The seed is fixed so the random
# initialisation is the same on every run.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(dataset)

In [7]:
# Make predictions: transform() returns the input rows with an added
# `prediction` column holding the cluster index assigned to each row.
predictions = model.transform(dataset)

In [8]:
# Display every row together with the cluster it was assigned to.
predictions.show()

+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.0|           (3,[],[])|         0|
|  1.0|(3,[0,1,2],[0.1,0...|         0|
|  2.0|(3,[0,1,2],[0.2,0...|         0|
|  3.0|(3,[0,1,2],[9.0,9...|         1|
|  4.0|(3,[0,1,2],[9.1,9...|         1|
|  5.0|(3,[0,1,2],[9.2,9...|         1|
+-----+--------------------+----------+



In [9]:
# Evaluate clustering by computing the Silhouette score.
# The evaluator is created with its default settings; it is applied to the
# predictions in the next cell.
evaluator = ClusteringEvaluator()

In [10]:
# The silhouette value measures how well a point fits its own cluster
# (cohesion) relative to the other clusters (separation).
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = {}".format(silhouette))

Silhouette with squared euclidean distance = 0.9997530305375207


In [11]:
# Print the heading followed by each fitted cluster centre on its own line.
centers = model.clusterCenters()
print("Cluster Centers: ", *centers, sep="\n")

Cluster Centers: 
[0.1 0.1 0.1]
[9.1 9.1 9.1]


Latent Dirichlet Allocation (LDA)

In [12]:
# LDA models each document as a mixture of topics, where each topic is a
# probability distribution over words.
from pyspark.ml.clustering import LDA

In [13]:
# Loads the sample LDA data, read with the "libsvm" format.
# NOTE: this rebinds `dataset`, replacing the k-means data loaded earlier, so
# the k-means cells must not be re-run after this point without reloading.
dataset = spark.read.format("libsvm").load("sample_lda_libsvm_data.txt")

In [14]:
# Preview the corpus rows: a label plus an 11-dimensional sparse feature vector.
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(11,[0,1,2,4,5,6,...|
|  1.0|(11,[0,1,3,4,7,10...|
|  2.0|(11,[0,1,2,5,6,8,...|
|  3.0|(11,[0,1,3,6,8,9,...|
|  4.0|(11,[0,1,2,3,4,6,...|
|  5.0|(11,[0,1,3,4,5,6,...|
|  6.0|(11,[0,1,3,6,8,9,...|
|  7.0|(11,[0,1,2,3,4,5,...|
|  8.0|(11,[0,1,3,4,5,6,...|
|  9.0|(11,[0,1,2,4,6,8,...|
| 10.0|(11,[0,1,2,3,5,6,...|
| 11.0|(11,[0,1,4,5,6,7,...|
+-----+--------------------+



In [15]:
# Trains a LDA model with 10 topics, limited to 10 iterations.
# The seed is fixed (as it is for the k-means model earlier in this notebook)
# so that the random initialisation, and therefore the reported topics, can be
# reproduced after a kernel restart instead of depending on an implicit default.
# NOTE: `model` is rebound here and no longer refers to the k-means model.
# NOTE: re-run the cells below after this change to refresh their recorded output.
lda = LDA(k=10, maxIter=10, seed=1)
model = lda.fit(dataset)

In [16]:
# Score the fitted LDA model against the same dataset it was trained on.
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: {}".format(ll))
print("The upper bound on perplexity: {}".format(lp))

The lower bound on the log likelihood of the entire corpus: -780.6088066046375
The upper bound on perplexity: 3.002341556818048


In [17]:
# Describe topics: request the 3 top-weighted terms of every topic.
# The result has one row per topic with the term indices and their weights.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
# truncate=False prints the full cell contents instead of shortening them.
topics.show(truncate=False)

The topics described by their top-weighted terms:
+-----+-----------+---------------------------------------------------------------+
|topic|termIndices|termWeights                                                    |
+-----+-----------+---------------------------------------------------------------+
|0    |[10, 6, 2] |[0.09943625304073225, 0.09656861173688167, 0.09444596832167373]|
|1    |[7, 3, 4]  |[0.10233471045862663, 0.10102649230424836, 0.09522666933010829]|
|2    |[6, 3, 10] |[0.10301447441084771, 0.0975497952238186, 0.09672088440203121] |
|3    |[5, 4, 1]  |[0.1751152052600677, 0.1355111841618985, 0.12503611536090373]  |
|4    |[6, 7, 2]  |[0.10084575392429412, 0.09929275794167297, 0.09818559610016428]|
|5    |[7, 3, 8]  |[0.10871077644897749, 0.10730095470606255, 0.10214692001601644]|
|6    |[1, 6, 9]  |[0.10389867465851156, 0.10034711060364988, 0.0989482022707671] |
|7    |[3, 10, 9] |[0.20090550775001445, 0.14359143107060254, 0.13969216127267556]|
|8    |[5, 2, 9]  |[0.1027

In [18]:
# Shows the result of label, features, and topic distribution.
# transform() returns the input rows with an added `topicDistribution` column.
transformed = model.transform(dataset)
# truncate=False prints the full vectors, which makes the table very wide.
transformed.show(truncate=False)

+-----+---------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                       |topicDistribution                                                                                                                                                                                                     |
+-----+---------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0.0  |(11,[0,1,2,4,5,6,7,10],[1.0,2.0,6.0,2.0,3.0,1.0,1.0,3.0])      |[0.0046745740897159846,0.004674496658531129,0.004674549198917468,0.9

In [19]:
# Stop the Spark session now that the notebook is finished; cells above will
# not work again until the session is recreated.
spark.stop()