In [3]:
// Reuse the SQLContext backing the notebook's existing SparkSession instead of the
// deprecated `new SQLContext(sc)` constructor. The `sqlC` name is kept because its
// implicits provide the `$"col"` syntax and the tuple encoders used in later cells.
val sqlC = spark.sqlContext
import sqlC.implicits._

// kddcup.data_10_percent_corrected is read without a header row, so the column names are
// supplied here in file order (names follow the KDD Cup 1999 feature list). Types are
// inferred from the data. The final column holds the connection label
// (e.g. "normal." or an attack name such as "smurf.").
val data = spark.read.
    option("inferSchema",true).
    option("header",false).
    csv("kddcup.data_10_percent_corrected").
    toDF("duration","protocol_type","service","flag",
         "src_bytes","dst_bytes","land","wrong_fragment","urgent",
         "hot","num_failed_logins","logged_in","num_compromised",
         "root_shell","su_attempted","num_root","num_file_creations",
         "num_shells","num_access_files","num_outbound_cmds",
         "is_host_login","is_guest_login","count","srv_count",
         "serror_rate","srv_serror_rate","rerror_rate","srv_rerror_rate",
         "same_srv_rate","diff_srv_rate","srv_diff_host_rate",
         "dst_host_count","dst_host_srv_count",
         "dst_host_same_srv_rate","dst_host_diff_srv_rate",
         "dst_host_same_src_port_rate","dst_host_srv_diff_host_rate",
         "dst_host_serror_rate","dst_host_srv_serror_rate",
         "dst_host_rerror_rate","dst_host_srv_rerror_rate",
         "label")

sqlC = org.apache.spark.sql.SQLContext@54442443
data = [duration: int, protocol_type: string ... 40 more fields]


lastException: Throwable = null


[duration: int, protocol_type: string ... 40 more fields]

In [4]:
// Keep the parsed rows around: every later cell re-scans `data`. (Dataset.cache() is persist().)
data.persist()

[duration: int, protocol_type: string ... 40 more fields]

In [7]:
// Class balance: number of connections per label, most frequent first.
data.groupBy("label").count().orderBy($"count".desc).show(25)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [8]:
val numericOnly = {
    // The three string-typed categorical columns cannot be fed to VectorAssembler as-is.
    val categorical = Seq("protocol_type", "service", "flag")
    data.drop(categorical: _*).cache()
}

numericOnly = [duration: int, src_bytes: int ... 37 more fields]


[duration: int, src_bytes: int ... 37 more fields]

In [12]:
import org.apache.spark.ml.{PipelineModel, Pipeline}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.{OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random

In [14]:
// Baseline model: pack every numeric column except the label into one vector and cluster it.
// No setK call, so the KMeans estimator's default k is used. The seed is drawn at random.
val assembler = new VectorAssembler().
    setInputCols(numericOnly.columns.filterNot(_ == "label")).
    setOutputCol("featureVector")

val kmeans = new KMeans().
    setSeed(Random.nextLong()).
    setPredictionCol("cluster").
    setFeaturesCol("featureVector")

val pipeline = new Pipeline().setStages(Array(assembler, kmeans))
val pipelineModel = pipeline.fit(numericOnly)
// The k-means stage is the final stage of the fitted pipeline.
val kmeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]

assembler = vecAssembler_b62752ec8ffe
kmeans = kmeans_2d0c8d1c9d03
pipeline = pipeline_b31ac5416eee
pipelineModel = pipeline_b31ac5416eee
kmeansModel = kmeans_2d0c8d1c9d03


kmeans_2d0c8d1c9d03

In [15]:
// One line per centroid, in the assembler's feature order.
for (center <- kmeansModel.clusterCenters) println(center)

[47.979395571029514,1622.078830816566,868.5341828266062,4.453261001578883E-5,0.006432937937735314,1.4169466823205539E-5,0.03451682118132869,1.5181571596291647E-4,0.14824703453301485,0.01021213716043885,1.1133152503947209E-4,3.6435771831099954E-5,0.011351767134933808,0.0010829521072021374,1.0930731549329986E-4,0.0010080563539937655,0.0,0.0,0.0013865835391279706,332.2862475203433,292.9071434354884,0.17668541759442902,0.17660780940042928,0.05743309987449906,0.05771839196793657,0.7915488441768395,0.020981640419422482,0.028996862475204364,232.4707319541719,188.6660459090725,0.7537812031907427,0.030905611108822645,0.6019355289260995,0.006683514837456327,0.1767539573296518,0.17644162179667539,0.05811762681672781,0.05741111695882666]
[2.0,6.9337564E8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,57.0,3.0,0.79,0.67,0.21,0.33,0.05,0.39,0.0,255.0,3.0,0.01,0.09,0.22,0.0,0.18,0.67,0.05,0.33]


In [16]:
val withCluster = pipelineModel.transform(numericOnly)
// How the true labels spread across the predicted clusters, largest groups first per cluster.
withCluster.
    groupBy("cluster","label").
    count().
    orderBy($"cluster",$"count".desc).
    show(25)

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|        neptune.|107201|
|      0|         normal.| 97278|
|      0|           back.|  2203|
|      0|          satan.|  1589|
|      0|        ipsweep.|  1247|
|      0|      portsweep.|  1039|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|   guess_passwd.|    53|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|            spy.|     2|
|      1|      portsweep.|     1|
+-------+----------------+------+



withCluster = [duration: int, src_bytes: int ... 39 more fields]


[duration: int, src_bytes: int ... 39 more fields]

In [20]:
/** Fits k-means on the raw (unscaled) non-label columns of `data` and scores the fit.
  *
  * @param data rows to cluster; every column except "label" must be numeric
  * @param k    number of clusters
  * @return k-means cost (sum of squared distances to the nearest centroid) divided by the
  *         row count, i.e. the mean squared distance. The seed is random, so results vary per call.
  */
def clusteringScore0(data: DataFrame, k: Int): Double = {
    val featureCols = data.columns.filterNot(_ == "label")
    val assembler = new VectorAssembler()
        .setInputCols(featureCols)
        .setOutputCol("featureVector")
    val kmeans = new KMeans()
        .setSeed(Random.nextLong())
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("featureVector")

    val fitted = new Pipeline().setStages(Array(assembler, kmeans)).fit(data)
    val model = fitted.stages.last.asInstanceOf[KMeansModel]
    val totalCost = model.computeCost(assembler.transform(data))
    totalCost / data.count()
}

lastException: Throwable = null
clusteringScore0: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [21]:
(20 to 100 by 20).map(k => (k, clusteringScore0(numericOnly, k))).foreach(println)

(20,5.3363665885195054E7)
(40,2.443378891667313E7)
(60,2.45724894745384E7)
(80,1.0038948916729508E7)
(100,4.282691789256643E7)


In [27]:
/** Like clusteringScore0, but lets k-means run longer and converge more tightly
  * (up to 40 iterations, tolerance 1e-5).
  *
  * @param data rows to cluster; every column except "label" must be numeric
  * @param k    number of clusters
  * @return mean squared distance from each row to its nearest centroid (random seed per call)
  */
def clusteringScore1(data: DataFrame, k: Int): Double = {
    val featureCols = data.columns.filterNot(_ == "label")
    val assembler = new VectorAssembler()
        .setInputCols(featureCols)
        .setOutputCol("featureVector")
    val kmeans = new KMeans()
        .setSeed(Random.nextLong())
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("featureVector")
        .setMaxIter(40)
        .setTol(1.0e-5)

    val fitted = new Pipeline().setStages(Array(assembler, kmeans)).fit(data)
    val model = fitted.stages.last.asInstanceOf[KMeansModel]
    val totalCost = model.computeCost(assembler.transform(data))
    totalCost / data.count()
}

lastException: Throwable = null
clusteringScore1: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [28]:
(20 to 100 by 20).map(k => (k, clusteringScore1(numericOnly, k))).foreach(println)

(20,3.891557673306432E7)
(40,6.988909526868746E7)
(60,1.8769449653698936E7)
(80,3.4060347885287456E7)
(100,3957021.7452403726)


In [39]:
/** Scores k-means on standardized features. Each non-label column is divided by its
  * standard deviation (no mean-centering) before clustering, so large-magnitude columns
  * such as byte counts no longer dominate the distance.
  *
  * @param data rows to cluster; every column except "label" must be numeric
  * @param k    number of clusters
  * @return mean squared distance (in scaled space) from each row to its nearest centroid
  */
def clusteringScore2(data: DataFrame, k: Int): Double = {
    val featureCols = data.columns.filterNot(_ == "label")
    val assembler = new VectorAssembler()
        .setInputCols(featureCols)
        .setOutputCol("featureVector")
    val scaler = new StandardScaler()
        .setInputCol("featureVector")
        .setOutputCol("scaledFeatureVector")
        .setWithStd(true)
        .setWithMean(false)
    val kmeans = new KMeans()
        .setSeed(Random.nextLong())
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("scaledFeatureVector")
        .setMaxIter(40)
        .setTol(1.0e-5)

    val fitted = new Pipeline().setStages(Array(assembler, scaler, kmeans)).fit(data)
    // The cost is measured on the pipeline's own output so it sees the scaled vectors.
    val scored = fitted.transform(data)
    val model = fitted.stages.last.asInstanceOf[KMeansModel]
    model.computeCost(scored) / data.count()
}

lastException: Throwable = null
clusteringScore2: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [40]:
(60 to 270 by 30).map(k => (k, clusteringScore2(numericOnly, k))).foreach(println)

(60,1.2111267779255115)
(90,0.6653698452660044)
(120,0.46878029654614667)
(150,0.36709440581939085)
(180,0.30087886338609443)
(210,0.2606169408364967)
(240,0.2280685080569917)
(270,0.2068320386116904)


In [45]:
/** Builds a two-stage sub-pipeline that string-indexes `inputCol` and one-hot encodes the index.
  *
  * @param inputCol name of the categorical string column
  * @return the sub-pipeline and the name of the column holding the encoded vector
  *         (`<inputCol>_vec`); the intermediate index column is `<inputCol>_indexed`
  */
def oneHotPipeline(inputCol : String): (Pipeline, String) = {
    val indexedCol = s"${inputCol}_indexed"
    val vecCol = s"${inputCol}_vec"
    val indexer = new StringIndexer().setInputCol(inputCol).setOutputCol(indexedCol)
    val encoder = new OneHotEncoder().setInputCol(indexedCol).setOutputCol(vecCol)
    (new Pipeline().setStages(Array(indexer, encoder)), vecCol)
}

lastException: Throwable = null
oneHotPipeline: (inputCol: String)(org.apache.spark.ml.Pipeline, String)


In [46]:
/** Scores k-means on all columns, one-hot encoding the categorical ones first.
  * protocol_type, service and flag are replaced by their one-hot vectors. The label is
  * excluded. Everything is then scaled to unit standard deviation and clustered.
  *
  * @param data full data set including the string columns and "label"
  * @param k    number of clusters
  * @return mean squared distance (in scaled space) from each row to its nearest centroid
  */
def clusteringScore3(data: DataFrame, k: Int): Double = {
    val (protocolStage, protocolVec) = oneHotPipeline("protocol_type")
    val (serviceStage, serviceVec) = oneHotPipeline("service")
    val (flagStage, flagVec) = oneHotPipeline("flag")

    // Swap the raw string columns (and the label) for the encoded vector columns.
    val dropped = Seq("label","protocol_type","service","flag")
    val featureCols = (Set(data.columns: _*) -- dropped) ++ Seq(protocolVec, serviceVec, flagVec)

    val assembler = new VectorAssembler()
        .setInputCols(featureCols.toArray)
        .setOutputCol("featureVector")
    val scaler = new StandardScaler()
        .setInputCol("featureVector")
        .setOutputCol("scaledFeatureVector")
        .setWithStd(true)
        .setWithMean(false)
    val kmeans = new KMeans()
        .setSeed(Random.nextLong())
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("scaledFeatureVector")
        .setMaxIter(40)
        .setTol(1.0e-5)

    val stages = Array(protocolStage, serviceStage, flagStage, assembler, scaler, kmeans)
    val fitted = new Pipeline().setStages(stages).fit(data)
    val model = fitted.stages.last.asInstanceOf[KMeansModel]
    model.computeCost(fitted.transform(data)) / data.count()
}

clusteringScore3: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [47]:
(60 to 270 by 30).map(k => (k, clusteringScore3(data, k))).foreach(println)

(60,32.96466776827036)
(90,14.237032729287357)
(120,3.201837749401058)
(150,2.008937370448537)
(180,1.5282948089474215)
(210,1.19630793311748)
(240,0.9464393979592473)
(270,0.8185019426650948)


In [49]:
/** Fits the full feature pipeline plus k-means and returns the fitted model.
  * The steps are one-hot encoding of protocol_type/service/flag, assembly of all non-label
  * features, unit-std scaling, and k-means.
  *
  * @param data full data set including the string columns and "label"
  * @param k    number of clusters
  * @return the fitted PipelineModel; its last stage is the KMeansModel, and its transform
  *         output includes "scaledFeatureVector" and "cluster" columns
  */
def fitPipeline4(data: DataFrame, k: Int): PipelineModel = {
    val (protocolStage, protocolVec) = oneHotPipeline("protocol_type")
    val (serviceStage, serviceVec) = oneHotPipeline("service")
    val (flagStage, flagVec) = oneHotPipeline("flag")

    val dropped = Seq("label","protocol_type","service","flag")
    val featureCols = (Set(data.columns: _*) -- dropped) ++ Seq(protocolVec, serviceVec, flagVec)

    val assembler = new VectorAssembler()
        .setInputCols(featureCols.toArray)
        .setOutputCol("featureVector")
    val scaler = new StandardScaler()
        .setInputCol("featureVector")
        .setOutputCol("scaledFeatureVector")
        .setWithStd(true)
        .setWithMean(false)
    val kmeans = new KMeans()
        .setSeed(Random.nextLong())
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("scaledFeatureVector")
        .setMaxIter(40)
        .setTol(1.0e-5)

    val stages = Array(protocolStage, serviceStage, flagStage, assembler, scaler, kmeans)
    new Pipeline().setStages(stages).fit(data)
}

fitPipeline4: (data: org.apache.spark.sql.DataFrame, k: Int)org.apache.spark.ml.PipelineModel


In [50]:
// Fit the encoded + scaled model with k = 120 and tabulate labels per cluster.
val pipelineModel = fitPipeline4(data, 120)
val countByClusterLabel = pipelineModel.
    transform(data).
    groupBy("cluster","label").
    count().
    orderBy("cluster","label")
countByClusterLabel.show()

+-------+----------+------+
|cluster|     label| count|
+-------+----------+------+
|      0|  ipsweep.|     4|
|      0|     nmap.|     1|
|      0|   normal.|   340|
|      0|      pod.|     5|
|      0|portsweep.|     1|
|      0|    smurf.|280787|
|      1|  neptune.|   101|
|      2|  ipsweep.|     1|
|      2|  neptune.|   112|
|      2|   normal.|     3|
|      3|  neptune.|   102|
|      3|portsweep.|     1|
|      3|    satan.|     1|
|      4|  neptune.|   113|
|      4|portsweep.|     2|
|      5|  ipsweep.|    76|
|      5|     nmap.|     3|
|      5|   normal.|   382|
|      5|    satan.|     2|
|      6|  neptune.|   115|
+-------+----------+------+
only showing top 20 rows



pipelineModel = pipeline_c4e612f0807d
countByClusterLabel = [cluster: int, label: string ... 1 more field]


[cluster: int, label: string ... 1 more field]

In [58]:
val kMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
val centroids = kMeansModel.clusterCenters

// `clustered` is consumed by three actions below: the threshold query, the anomaly filter
// and the final take. Cache it so the whole encode/assemble/scale/predict pipeline is not
// recomputed for each one.
val clustered = pipelineModel.transform(data).cache()

// Anomaly threshold: squared distance to the assigned centroid of the 100th-farthest row.
// With fewer than 100 rows this is the smallest distance, so every row is flagged. With no
// rows there is no distance at all, so +infinity is used and nothing is flagged, instead of
// `.last` throwing on an empty array.
val threshold = clustered.
    select("cluster","scaledFeatureVector").as[(Int, Vector)].
    map{ case(cluster, vec) => Vectors.sqdist(centroids(cluster), vec)}.
    orderBy($"value".desc).take(100).lastOption.getOrElse(Double.PositiveInfinity)

// Keep rows at least as far from their centroid as the threshold, projected back to the
// input columns (the pipeline's intermediate columns are dropped).
val originalCols = data.columns
val anomalies = clustered.filter { row =>
    val cluster = row.getAs[Int]("cluster")
    val vec = row.getAs[Vector]("scaledFeatureVector")
    Vectors.sqdist(centroids(cluster), vec) >= threshold
}.select(originalCols.head, originalCols.tail:_*)

// Print the first anomaly, if any. first() would throw when the result is empty.
anomalies.take(1).foreach(println)

[79,tcp,telnet,SF,281,1301,0,0,0,2,0,1,1,1,0,0,4,2,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1,10,1.0,0.0,1.0,0.3,0.0,0.0,0.0,0.1,loadmodule.]


kMeansModel = kmeans_0a959cd62f0e
centroids = Array([0.0,0.0,2.0222889823054575,0.0,0.0,2.433658221513732,2.4016487867831544,2.3758859206593397,0.0,0.0,1.3193002770647952E-4,0.0,9.458144850048677E-4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.934505073162457,1.2170765964802329E-4,0.0,3.6448876692729094E-5,2.0558942261166684,1.2918784089810097E-9,2.5760614800967034,3.5648142143689766E-4,0.0,0.0,0.0,7.091508801488769E-5,2.0770440220807695,0.0,2.362126925887523,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.01964580493419,0.0,0.0,0.0,8.881764236910917E-5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...


lastException: Throwable = null


[[0.0,0.0,2.0222889823054575,0.0,0.0,2.433658221513732,2.4016487867831544,2.3758859206593397,0.0,0.0,1.3193002770647952E-4,0.0,9.458144850048677E-4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.934505073162457,1.2170765964802329E-4,0.0,3.6448876692729094E-5,2.0558942261166684,1.2918784089810097E-9,2.5760614800967034,3.5648142143689766E-4,0.0,0.0,0.0,7.091508801488769E-5,2.0770440220807695,0.0,2.362126925887523,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.01964580493419,0.0,0.0,0.0,8.881764236910917E-5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0], [0.0,0.0,0.0,2.0553630063997925,0.0,0.09858063476700704,0.09766496561693887,1.0620642088083503,0.8174091731305806,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.132505985357395,0.0,2.1308290011650617,0.752762205269101,0.0,0.0,0.0,3.938504933232349,0.8158168785458