In [1]:
// Read the KDD Cup data file as an RDD of text lines and mark it for caching,
// since later cells reuse it. The trailing commented-out call is an optional
// 10% sample, left disabled.
val rawData = sc.textFile(
    "/docker/datasets/kddcup.data"
).cache() //.sample(false, 0.1)

In [7]:
// List every key/value pair held by the SparkContext's configuration.
sc.getConf.getAll

Array((spark.localProperties.clone,true), (spark.cores.max,8 spark.executor.memory=12g), (spark.app.name,IBM Spark Kernel), (spark.driver.host,192.168.0.8), (spark.master,local[*]), (spark.executor.id,driver), (spark.submit.deployMode,client), (spark.repl.class.uri,http://192.168.0.8:38359), (spark.fileserver.uri,http://192.168.0.8:47359), (spark.externalBlockStore.folderName,spark-6cd3add5-80e1-4e47-99e2-3a1effebf71d), (spark.driver.port,56663), (spark.app.id,local-1450171299861), (spark.jars,file:/opt/spark-kernel/lib/kernel-assembly-0.1.5-SNAPSHOT.jar))

In [8]:
// Peek at the first raw line to see the record layout.
rawData.first()

0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.

In [None]:
// Tally the records per label (the last comma-separated field of each line)
// and print the (label, count) pairs from most to least frequent.
rawData.map { line =>
    line.split(',').last
}.countByValue.toSeq.sortBy { case (_, count) => count }.reverse.foreach { entry =>
    println(entry)
}

In [None]:
import org.apache.spark.mllib.linalg._

// Parse every CSV line into a (label, feature vector) pair.
val labelsAndData = rawData.map { line =>
    val fields = line.split(',').toBuffer
    // Drop the three fields at positions 1-3; in the sample record they are the
    // non-numeric values `tcp`, `http` and `SF`, which cannot be parsed as Double.
    fields.remove(1, 3)
    // The last field is the label; removing it leaves only the numeric features.
    val label = fields.remove(fields.length - 1)
    (label, Vectors.dense(fields.map(field => field.toDouble).toArray))
}

In [None]:
// Keep only the feature vectors (labels dropped) and cache them for the
// repeated clustering runs in later cells.
val data = labelsAndData.map { case (_, vector) => vector }.cache()

## Train k-means model

In [None]:
import org.apache.spark.mllib.clustering._

// Fit k-means on the cached feature vectors using the library defaults
// (no k or other parameter is set here).
val kmeans = new KMeans()
val model = kmeans.run(data)

// Print each fitted cluster centre.
model.clusterCenters.foreach(center => println(center))

In [None]:
// Pair every record's predicted cluster index with its original label.
val clusterLabel = labelsAndData.map { case (label, datum) =>
    (model.predict(datum), label)
}

In [None]:
// Count the occurrences of each distinct (cluster, label) pair; the result is
// collected into a local map.
val clusterLabelCounts = clusterLabel.countByValue()

In [None]:
// Print one line per (cluster, label) pair, ordered by cluster and then label,
// with the label right-aligned in an 18-character column followed by its count.
for (((cluster, label), count) <- clusterLabelCounts.toSeq.sorted) {
    println(f"$cluster%1s$label%18s$count")
}

## Calculating distances

Calculating the distance between two vectors using the Euclidean distance function:
$$ distance(A, B) = \sqrt{\sum_i (A_i - B_i)^2} $$

In [None]:
import math._
import org.apache.spark.rdd._
import org.apache.spark.mllib.clustering._

// Euclidean distance between two vectors: the square root of the sum of the
// squared element-wise differences. Elements are paired with `zip`, so any
// surplus elements of a longer vector are ignored.
def distance(a: Vector, b: Vector) = {
    val squaredDiffs = for ((x, y) <- a.toArray zip b.toArray) yield pow(x - y, 2)
    sqrt(squaredDiffs.sum)
}

// Distance from a point to the centre of the cluster the model assigns it to.
def distanceToCentroid(a: Vector, model: KMeansModel) = {
    val assigned = model.clusterCenters(model.predict(a))
    distance(a, assigned)
}

// Fits a k-means model with `k` clusters on `data` and returns the mean
// distance from each point to its assigned cluster centre.
def clusterScore(data: RDD[Vector], k: Int) = {
    val fitted = new KMeans().setK(k).run(data)
    data.map(point => distanceToCentroid(point, fitted)).mean()
}

In [None]:
// Score k = 5, 10, ..., 40 on the unnormalized data and print each (k, score) pair.
(5 to 40 by 5).map { k =>
    (k, clusterScore(data, k))
}.toList.foreach(println)

## Testing on Normalized Data
Selecting a good value of *k*: a small *k* that still gives a small average distance to the nearest centroid.

In [None]:
import org.apache.spark.mllib.feature._

// Normalizer configured with p = 2.
val normalizer = new Normalizer(2.0)

// Applies the shared normalizer to a single vector.
def normalize(a: Vector) = {
    normalizer.transform(a)
}

In [None]:
// Normalize every feature vector and cache the result for the scoring runs below.
val normalizedData = data.map(vector => normalize(vector)).cache()

In [None]:
// Score k = 60, 70, ..., 120 on the normalized data and print each (k, score) pair.
// `.par` turns the range into a parallel collection, so the candidate k values
// may be evaluated concurrently.
(60 to 120 by 10).par.map { k =>
    (k, clusterScore(normalizedData, k))
}.toList.foreach(println)

## Scoring with Entropy
_

$$ entropy(v) = -\sum_i P(v_i) \ln P(v_i) $$
$$ P(v_i) = \dfrac{v_i}{\sum{v}} $$

In [None]:
import math._

// Shannon entropy, using the natural logarithm, of the distribution described
// by a collection of occurrence counts.
//
// counts: occurrence counts; entries that are zero or negative are ignored.
// Returns the sum of -p * ln(p) over the positive counts, where p is a count
// divided by the total of the positive counts. Empty input (or input with no
// positive count) yields 0.0.
def entropy(counts: Iterable[Int]): Double = {
    val positive = counts.filter(_ > 0)
    // Accumulate in Double: summing as Int first can overflow when the counts
    // add up to more than Int.MaxValue, which would make the proportions (and
    // therefore the logarithms) meaningless.
    val total = positive.foldLeft(0.0)(_ + _)
    positive.map { count =>
        val p = count / total
        -p * log(p)
    }.sum
}

In [None]:
// Fits a k-means model with `k` clusters on the vectors of `dataAndLables` and
// scores it by label entropy: for every cluster, the entropy of its label
// counts is weighted by the cluster's size; the weighted values are summed and
// divided by the total number of records.
//
// dataAndLables: pairs whose key is the label string and whose value is the
//                vector to cluster.
// k:             number of clusters to fit.
def clusterEntropyScore(dataAndLables: RDD[(String, Vector)], k: Int) = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    val model = kmeans.run(dataAndLables.values)
    // Count records per (cluster, label) with reduceByKey so the counts are
    // combined before the shuffle, instead of shipping one label string per
    // record through groupByKey and counting them in memory afterwards.
    val countsPerClusterAndLabel = dataAndLables
        .map { case (label, datum) => ((model.predict(datum), label), 1) }
        .reduceByKey(_ + _)
    // Regroup the (much smaller) count table by cluster: one collection of
    // per-label counts for each cluster.
    val labelCountsInClusters = countsPerClusterAndLabel
        .map { case ((cluster, _), count) => (cluster, count) }
        .groupByKey()
        .values
    val n = dataAndLables.count
    // m.sum is the cluster size, used as the weight of that cluster's entropy.
    labelCountsInClusters.map(m => m.sum * entropy(m)).sum / n
}

In [None]:
// Labelled, normalized vectors. Cached because every clusterEntropyScore call
// below traverses this RDD several times (k-means fitting, cluster assignment,
// record count); without caching each pass would redo the normalization.
val normalizedDataAndLabels = labelsAndData.mapValues(normalize).cache()
// Score k = 60, 70, ..., 160 on a parallel collection and print each (k, score) pair.
(60 to 160 by 10).par.map(k => k -> clusterEntropyScore(normalizedDataAndLabels, k)).toList.foreach(println)