# Cluster analysis example

### Importing MLlib libraries 

In [1]:
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

### Read data and pre-processing

The Iris flower data set or Fisher's Iris data set is a multivariate data set introduced by Ronald Fisher in his 1936 paper "The use of multiple measurements in taxonomic problems as an example of linear discriminant analysis".
The data set consists of 50 samples from each of three species of Iris (Iris setosa, Iris virginica and Iris versicolor). Four features were measured from each sample: the length and the width of the sepals and petals, in centimetres.

In [2]:
val rawData = sc.textFile("data/iris.csv")

In [3]:
rawData.take(5).foreach(println)

5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa


The following  code splits the CSV lines into columns and removes the final column. The remaining values are converted to an array of numeric values (Double objects), and emitted with the final label column in a tuple.

In [4]:
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)
val labelsAndData = rawData.map { line =>
      val buffer = line.split(',').toBuffer
      val label = buffer.remove(buffer.length - 1)
      val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
      (label, vector)
                                }
val data = labelsAndData.values.cache()

(Iris-setosa,50)
(Iris-virginica,50)
(Iris-versicolor,50)


### Perform cluster analysis

k-means is one of the most commonly used clustering algorithms that clusters the data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called kmeans||

In [5]:
val numClusters = 2
val numIterations = 1
val model = KMeans.train(data,numClusters,numIterations)

In [6]:
model.clusterCenters.foreach(println)

[6.273737373737372,2.8757575757575764,4.925252525252526,1.6818181818181817]
[5.007843137254901,3.409803921568628,1.4921568627450983,0.2627450980392156]


### Apply cluster model

The following code uses the model to assign each observation to a cluster, counts occurrences of cluster and label pairs, and prints them.

In [7]:
val clusterLabelCount = labelsAndData.map { case (label, datum) =>
    val cluster = model.predict(datum)
    (cluster, label)}.countByValue()
    clusterLabelCount.toSeq.sorted.foreach { case ((cluster, label), count) =>
    println(f"$cluster%1s$label%18s$count%8s")
}

0   Iris-versicolor      47
0    Iris-virginica      50
1       Iris-setosa      50
1   Iris-versicolor       3


### Write sample and total result in a directory hadoop style and coalesce all in one file

In [8]:
val sample = data.map(datum => model.predict(datum) + "," +datum.toArray.mkString(",")).sample(false,0.05)
val total = data.map(datum => model.predict(datum) + "," +datum.toArray.mkString(","))
total.saveAsTextFile("results/total2")
total.coalesce(1).saveAsTextFile("results/sample_total2")

### Choice of K

A clustering could be considered good if each data point were near to its closest centroid. So, we define a Euclidean distance function, and a function that returns the distance from a data point to its nearest cluster’s centroid. From this, it’s possible to define a function that measures the average distance to centroid, for a model built with a given k.
This is an internal quality measure.

In [9]:
def distance(a: Vector, b: Vector) =math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)
def distToCentroid(datum: Vector, model: KMeansModel) = {
      val cluster = model.predict(datum)
      val centroid = model.clusterCenters(cluster)
      distance(centroid, datum)
}   
def clusteringScore(data: RDD[Vector], k: Int): Double = {
      val kmeans = new KMeans()
      kmeans.setK(k)
      kmeans.setRuns(30)
      kmeans.setEpsilon(1.0e-6)
      val model = kmeans.run(data)
      data.map(datum => distToCentroid(datum, model)).mean()
}


Evaluate score values for different k from 1 to 10.

In [10]:
(1 to 10 by 1).map(k => (k, clusteringScore(data, k))).foreach(println)

(1,1.9440683605553895)
(2,0.855577769526653)
(3,0.6480304904934434)
(4,0.5573847727333313)
(5,0.5096521951118287)
(6,0.4670733963396439)
(7,0.4381125733411079)
(8,0.4108698632459714)
(9,0.3961705272186211)
(10,0.388696499714614)


### Data Normalization

Since Euclidean distance is used, the clusters will be influenced strongly by the magnitudes of the variables, especially by outliers. Normalizing removes this bias. We can normalize each feature by converting it to a standard score. This means subtracting the mean of the feature’s values from each value, and dividing by the standard deviation.

In [11]:
def buildNormalizationFunction(data: RDD[Vector]): (Vector => Vector) = {
    val dataAsArray = data.map(_.toArray)
    val numCols = dataAsArray.first().length
    val n = dataAsArray.count()
    val sums = dataAsArray.reduce(
      (a, b) => a.zip(b).map(t => t._1 + t._2))
    val sumSquares = dataAsArray.fold(
        new Array[Double](numCols)
      )(
        (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2)
      )
    val stdevs = sumSquares.zip(sums).map {
      case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
    }
    val means = sums.map(_ / n)

    (datum: Vector) => {
      val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
        (value, mean, stdev) =>
          if (stdev <= 0)  (value - mean) else  (value - mean) / stdev
      )
      Vectors.dense(normalizedArray)
    }
}

In [12]:
val normalizedData = data.map(buildNormalizationFunction(data)).cache()

### Choice of K with normalized data

In [13]:
(1 to 10 by 1).map(k =>
      (k, clusteringScore(normalizedData, k))).foreach(println)

(1,0.034363366543011024)
(2,0.01477149695109605)
(3,0.008824012747163202)
(4,0.007194540980690931)
(5,0.006304525208280512)
(6,0.005749320505622912)
(7,0.005135446289737331)
(8,0.004699770499438223)
(9,0.00442993146456794)
(10,0.004269386854148335)


### Definition of Entropy measure

The are different  metrics for homogeneity. Entropy is used here for illustration. A good clustering would have clusters whose collections of labels are homogeneous and so have low entropy. A weighted average of entropy can therefore be used as a cluster score.
This is an external quality measure.

In [14]:
def entropy(counts: Iterable[Int]) = {
    val values = counts.filter(_ > 0)
    val n: Double = values.sum
    values.map { v =>
    val p = v / n 
    -p * math.log(p)
    }.sum
}

In [15]:
def buildCategoricalAndLabelFunction(rawData: RDD[String]): (String => (String,Vector)) = {
val splitData = rawData.map(_.split(','))
   (line: String) => {
      val buffer = line.split(',').toBuffer 
      val label = buffer.remove(buffer.length - 1)
      val vector = buffer.map(_.toDouble)
      (label, Vectors.dense(vector.toArray))
    }
}

### Entropy for choosing K

In [16]:
def clusteringScore3(normalizedLabelsAndData: RDD[(String,Vector)], k: Int) = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    kmeans.setRuns(10)
    kmeans.setEpsilon(1.0e-6)
    val model = kmeans.run(normalizedLabelsAndData.values)
    val labelsAndClusters = normalizedLabelsAndData.mapValues(model.predict)
    val clustersAndLabels = labelsAndClusters.map(_.swap)
    val labelsInCluster = clustersAndLabels.groupByKey().values
    val labelCounts = labelsInCluster.map(_.groupBy(l => l).map(_._2.size))
    val n = normalizedLabelsAndData.count()
    labelCounts.map(m => m.sum * entropy(m)).sum / n
}

val parseFunction = buildCategoricalAndLabelFunction(rawData)
val labelsAndData = rawData.map(parseFunction)
val normalizedLabelsAndData =
      labelsAndData.mapValues(buildNormalizationFunction(labelsAndData.values)).cache()
(1 to 10 by 1).map(k =>
      (k, clusteringScore3(normalizedLabelsAndData, k))).foreach(println)
normalizedLabelsAndData.unpersist()

(1,1.0986122886681096)
(2,0.4620981203732969)
(3,0.16554309474734444)
(4,0.16368731464664396)
(5,0.12442865073449615)
(6,0.13095513392785052)
(7,0.11730722141295463)
(8,0.11730722141295463)
(9,0.11346845541210213)
(10,0.10606250990139728)


MapPartitionsRDD[1000] at mapValues at <console>:49

checked