# [Clustering4Ever](https://github.com/Clustering4Ever/Clustering4Ever) on [SparkNotebook](http://spark-notebook.io/) by [LIPN](https://lipn.univ-paris13.fr/) [A3](https://lipn.univ-paris13.fr/accueil/equipe/a3/) team

# Spark _K_-Means

In [ ]:
import smile.plot._
import scala.io.Source
import scala.collection.{mutable, immutable}
import clustering4ever.spark.clustering.kmeans.KMeans
import clustering4ever.math.distances.scalar.{Euclidean, Cosine, Minkowski}
import clustering4ever.spark.indexes.ExternalIndexes
import clustering4ever.scala.indexes.NmiNormalizationNature._
import clustering4ever.util.SparkImplicits._
import clustering4ever.scala.clusterizables.RealClusterizable
import clustering4ever.scala.vectorizables.RealVector

import smile.plot._
import scala.io.Source
import scala.collection.{mutable, immutable}
import clustering4ever.spark.clustering.kmeans.KMeans
import clustering4ever.math.distances.scalar.{Euclidean, Cosine, Minkowski}
import clustering4ever.spark.indexes.ExternalIndexes
import clustering4ever.scala.indexes.NmiNormalizationNature._
import clustering4ever.util.SparkImplicits._
import clustering4ever.scala.clusterizables.RealClusterizable
import clustering4ever.scala.vectorizables.RealVector


## Download dataset Aggregation

In [ ]:
:sh wget -P /tmp/ http://www.clustering4ever.org/Datasets/Aggregation/aggregation.csv
:sh wget -P /tmp/ http://www.clustering4ever.org/Datasets/Aggregation/labels

--2018-09-09 23:23:30--  http://www.clustering4ever.org/Datasets/Aggregation/aggregation.csv
Resolving www.clustering4ever.org (www.clustering4ever.org)... 62.210.16.62
Connecting to www.clustering4ever.org (www.clustering4ever.org)|62.210.16.62|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8063 (7.9K) [text/csv]
Saving to: ‘/tmp/aggregation.csv.15’

     0K .......                                               100%  919K=0.009s

2018-09-09 23:23:30 (919 KB/s) - ‘/tmp/aggregation.csv.15’ saved [8063/8063]

:sh: Scheme missing.
--2018-09-09 23:23:30--  http://wget/
Resolving wget (wget)... failed: Name or service not known.
wget: unable to resolve host address ‘wget’
--2018-09-09 23:23:30--  http://www.clustering4ever.org/Datasets/Aggregation/labels
Reusing existing connection to www.clustering4ever.org:80.
HTTP request sent, awaiting response... 200 OK
Length: 1576 (1.5K)
Saving to: ‘/tmp/labels.17’

     0K .                                                  

## Import data as Array[Array[Double]]

In [ ]:
val datasetSize = 500000
val dim = 4
val useAggregationDS = true
val dp = 16

val path = "/tmp/aggregation.csv"
val rawRdd = if( useAggregationDS ) sc.textFile(path, dp).map( x => mutable.ArrayBuffer(x.split(",").map(_.toDouble):_*)) else sc.parallelize(List.fill(datasetSize)(mutable.ArrayBuffer.fill(dim)(scala.util.Random.nextDouble)), dp)
val rdd = rawRdd.zipWithIndex.map{ case (v, id) => new RealClusterizable(id, new RealVector(v)) }.cache
val labelsPath = "/tmp/labels"

rdd.count

datasetSize: Int = 500000
dim: Int = 4
useAggregationDS: Boolean = true
dp: Int = 16
path: String = /tmp/aggregation.csv
rawRdd: org.apache.spark.rdd.RDD[scala.collection.mutable.ArrayBuffer[Double]] = MapPartitionsRDD[2] at map at <console>:89
rdd: org.apache.spark.rdd.RDD[clustering4ever.scala.clusterizables.RealClusterizable[Long,scala.collection.mutable.ArrayBuffer[Double],scala.collection.mutable.ArrayBuffer[Double]]] = MapPartitionsRDD[4] at map at <console>:90
labelsPath: String = /tmp/labels
res4: Long = 788


## Parameters 

In [ ]:
val k = 7
val iterMax = 40
val epsilon = 0.5

k: Int = 7
iterMax: Int = 40
epsilon: Double = 0.5


## Run and measure the algorithm time, you can give a well prepared RDD[Clusterizable] or use implicit conversion and gives RDD[Seq[Double]]

In [ ]:
val t1 = System.currentTimeMillis
val model = KMeans.run[
  Long,
  mutable.ArrayBuffer[Double],
  mutable.ArrayBuffer[Double],
  RealClusterizable[Long, mutable.ArrayBuffer[Double], mutable.ArrayBuffer[Double]]
](
  sc,
//   rdd,
  rawRdd,
  k,
  epsilon,
  iterMax
)
val t2 = System.currentTimeMillis
(t2 - t1) / 1000D

t1: Long = 1536528232836
model: clustering4ever.spark.clustering.kmeans.KMeansModel[Long,scala.collection.mutable.ArrayBuffer[Double],scala.collection.mutable.ArrayBuffer[Double],clustering4ever.scala.clusterizables.RealClusterizable[Long,scala.collection.mutable.ArrayBuffer[Double],scala.collection.mutable.ArrayBuffer[Double]],clustering4ever.math.distances.scalar.Euclidean[scala.collection.mutable.ArrayBuffer[Double]]] = clustering4ever.spark.clustering.kmeans.KMeansModel@4ba72f9e
t2: Long = 1536528234206
res7: Double = 1.37


In [ ]:
val clusterized = model.centerPredict(rdd)
val lclusterized = clusterized.collect

clusterized: org.apache.spark.rdd.RDD[clustering4ever.scala.clusterizables.RealClusterizable[Long,scala.collection.mutable.ArrayBuffer[Double],scala.collection.mutable.ArrayBuffer[Double]]] = MapPartitionsRDD[24] at map at K-CommonsSpark.scala:153
lclusterized: Array[clustering4ever.scala.clusterizables.RealClusterizable[Long,scala.collection.mutable.ArrayBuffer[Double],scala.collection.mutable.ArrayBuffer[Double]]] = Array(RealClusterizable(0,clustering4ever.scala.vectorizables.RealVector@21f0c75a,None,Some(0)), RealClusterizable(1,clustering4ever.scala.vectorizables.RealVector@63d358e1,None,Some(0)), RealClusterizable(2,clustering4ever.scala.vectorizables.RealVector@1f449400,None,Some(0)), RealClusterizable(3,clustering4ever.scala.vectorizables.RealVector@3e7fc15c,None,Some(0)), RealC...

## Plot clustering results

In [ ]:
val rawData = lclusterized.map(_.vector.toArray).toArray
val labels = lclusterized.map( cz => cz.clusterID.get ).toArray

plot(rawData, labels, '*', Palette.COLORS)

rawData: Array[Array[Double]] = Array(Array(15.55, 28.65), Array(14.9, 27.55), Array(14.45, 28.35), Array(14.15, 28.8), Array(13.75, 28.05), Array(13.35, 28.45), Array(13.0, 29.15), Array(13.45, 27.5), Array(13.6, 26.5), Array(12.8, 27.35), Array(12.4, 27.85), Array(12.3, 28.4), Array(12.2, 28.65), Array(13.4, 25.1), Array(12.95, 25.95), Array(12.9, 26.5), Array(11.85, 27.0), Array(11.35, 28.0), Array(11.15, 28.7), Array(11.25, 27.4), Array(10.75, 27.7), Array(10.5, 28.35), Array(9.65, 28.45), Array(10.25, 27.25), Array(10.75, 26.55), Array(11.7, 26.35), Array(11.6, 25.9), Array(11.9, 25.05), Array(12.6, 24.05), Array(11.9, 24.5), Array(11.1, 25.2), Array(10.55, 25.15), Array(10.05, 25.95), Array(9.35, 26.6), Array(9.3, 27.25), Array(9.2, 27.8), Array(7.5, 28.25), Array(8.55, 27.45), Ar...

## Inspect performance metrics

In [ ]:
val trueLabels = sc.parallelize(Source.fromFile(labelsPath).getLines.map(_.toInt).toSeq)

val (_, trueLabelsFrom0) = ExternalIndexes.prepareLabels(trueLabels)

val predicts = clusterized.map(_.clusterID.get)

val trueAndPredictRDD = sc.parallelize(trueLabelsFrom0.collect.zip(predicts.collect).map{ case(gt, pred) => (gt.toInt, pred) })

val nmi = ExternalIndexes.nmi(sc, trueAndPredictRDD, SQRT)

nmi

trueLabels: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:100
trueLabelsFrom0: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[30] at map at ExternalIndexes.scala:76
predicts: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at map at <console>:104
trueAndPredictRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:106
nmi: Double = 0.8294086258162061
res12: Double = 0.8294086258162061
