# [Clustering4Ever](https://github.com/Clustering4Ever/Clustering4Ever) on [SparkNotebook](http://spark-notebook.io/) by [LIPN](https://lipn.univ-paris13.fr/) [A3](https://lipn.univ-paris13.fr/accueil/equipe/a3/) team

# Scala _K_-Means

In [ ]:
import smile.plot._
import scala.io.Source
import scala.collection.{immutable, mutable, parallel}
import org.clustering4ever.clustering.patchwork.PatchWork
import org.clustering4ever.scala.indic.ExternalIndexes
import org.clustering4ever.scala.indic.NmiNormalizationNature._

<console>:71: error: object indexes is not a member of package org.clustering4ever.scala
       import org.clustering4ever.scala.indexes.ExternalIndexes
                                        ^
<console>:72: error: object indexes is not a member of package org.clustering4ever.scala
       import org.clustering4ever.scala.indexes.NmiNormalizationNature._
                                        ^


## Download dataset Aggregation

In [ ]:
:sh wget -P /tmp/ http://www.clustering4ever.org/Datasets/Aggregation/aggregation.csv
:sh wget -P /tmp/ http://www.clustering4ever.org/Datasets/Aggregation/labels

--2018-11-15 22:06:36--  http://www.clustering4ever.org/Datasets/Aggregation/aggregation.csv
Resolving www.clustering4ever.org (www.clustering4ever.org)... 62.210.16.62
Connecting to www.clustering4ever.org (www.clustering4ever.org)|62.210.16.62|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8063 (7.9K) [text/csv]
Saving to: ‘/tmp/aggregation.csv.5’

     0K .......                                               100% 81.5M=0s

2018-11-15 22:06:36 (81.5 MB/s) - ‘/tmp/aggregation.csv.5’ saved [8063/8063]

:sh: Scheme missing.
--2018-11-15 22:06:36--  http://wget/
Resolving wget (wget)... failed: Name or service not known.
wget: unable to resolve host address ‘wget’
--2018-11-15 22:06:36--  http://www.clustering4ever.org/Datasets/Aggregation/labels
Reusing existing connection to www.clustering4ever.org:80.
HTTP request sent, awaiting response... 200 OK
Length: 1576 (1.5K)
Saving to: ‘/tmp/labels.6’

     0K .                                                     100

## Import data as ParArray[ArrayBuffer[Double]]

In [ ]:
val datasetSize = 100000
val dim = 2
val path = "/tmp/aggregation.csv"
val useAggregationDS = true

val groundTruePath = "/tmp/labels"

val rdd = sc.parallelize(Source.fromFile(path).getLines.map( x => mutable.ArrayBuffer(x.split(",").map(_.toDouble):_*) ).toSeq).cache
val rddArray = rdd.map(_.toArray).cache

datasetSize: Int = 100000
dim: Int = 2
path: String = /tmp/aggregation.csv
useAggregationDS: Boolean = true
groundTruePath: String = /tmp/labels
rdd: org.apache.spark.rdd.RDD[scala.collection.mutable.ArrayBuffer[Double]] = ParallelCollectionRDD[0] at parallelize at <console>:89
rddArray: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[1] at map at <console>:90


## Parameters 

In [ ]:
val epsilon = Array(1.5, 1.5)
val minPts = 1
val minCellInCluster = 20
val ratio = 0.0

epsilon: Array[Double] = Array(1.5, 1.5)
minPts: Int = 1
minCellInCluster: Int = 20
ratio: Double = 0.0


## Run and measure the algorithm time

In [ ]:
val t1 = System.nanoTime
val model = new PatchWork(epsilon, minPts, ratio, minCellInCluster).run(rddArray)
val t2 = System.nanoTime

(t2 - t1) / 1000000000D

t1: Long = 3408619110636
model: org.clustering4ever.spark.clustering.patchwork.PatchWorkModel = org.clustering4ever.spark.clustering.patchwork.PatchWorkModel@5ad1875
t2: Long = 3410150804314
res6: Double = 1.531693678


## Different ways to apply model to datasets

In [ ]:
val clusterized1 = rddArray.map( v => (model.predict(v).getID, v) ).collect

clusterized1: Array[(Int, Array[Double])] = Array((1,Array(15.55, 28.65)), (1,Array(14.9, 27.55)), (1,Array(14.45, 28.35)), (1,Array(14.15, 28.8)), (1,Array(13.75, 28.05)), (1,Array(13.35, 28.45)), (1,Array(13.0, 29.15)), (1,Array(13.45, 27.5)), (1,Array(13.6, 26.5)), (1,Array(12.8, 27.35)), (1,Array(12.4, 27.85)), (1,Array(12.3, 28.4)), (1,Array(12.2, 28.65)), (1,Array(13.4, 25.1)), (1,Array(12.95, 25.95)), (1,Array(12.9, 26.5)), (1,Array(11.85, 27.0)), (1,Array(11.35, 28.0)), (1,Array(11.15, 28.7)), (1,Array(11.25, 27.4)), (1,Array(10.75, 27.7)), (1,Array(10.5, 28.35)), (1,Array(9.65, 28.45)), (1,Array(10.25, 27.25)), (1,Array(10.75, 26.55)), (1,Array(11.7, 26.35)), (1,Array(11.6, 25.9)), (1,Array(11.9, 25.05)), (1,Array(12.6, 24.05)), (1,Array(11.9, 24.5)), (1,Array(11.1, 25.2)), (1,...

## Plot clustering results

In [ ]:
val rawData = clusterized1.map(_._2.toArray).toArray
val labels = clusterized1.map(_._1).toArray
plot(rawData, labels, '*', Palette.COLORS)

rawData: Array[Array[Double]] = Array(Array(15.55, 28.65), Array(14.9, 27.55), Array(14.45, 28.35), Array(14.15, 28.8), Array(13.75, 28.05), Array(13.35, 28.45), Array(13.0, 29.15), Array(13.45, 27.5), Array(13.6, 26.5), Array(12.8, 27.35), Array(12.4, 27.85), Array(12.3, 28.4), Array(12.2, 28.65), Array(13.4, 25.1), Array(12.95, 25.95), Array(12.9, 26.5), Array(11.85, 27.0), Array(11.35, 28.0), Array(11.15, 28.7), Array(11.25, 27.4), Array(10.75, 27.7), Array(10.5, 28.35), Array(9.65, 28.45), Array(10.25, 27.25), Array(10.75, 26.55), Array(11.7, 26.35), Array(11.6, 25.9), Array(11.9, 25.05), Array(12.6, 24.05), Array(11.9, 24.5), Array(11.1, 25.2), Array(10.55, 25.15), Array(10.05, 25.95), Array(9.35, 26.6), Array(9.3, 27.25), Array(9.2, 27.8), Array(7.5, 28.25), Array(8.55, 27.45), Ar...