In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans,KMeansModel
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler

In [3]:
spark = SparkSession.builder.config("spark.sql.anamolyDet.dir","file:///d:/temp").appName("anamolyDetection").getOrCreate()

In [4]:
data = spark.read.format('csv').load("kddcup.data",inferSchema=True)

In [5]:
data.head

<bound method DataFrame.head of DataFrame[_c0: int, _c1: string, _c2: string, _c3: string, _c4: int, _c5: int, _c6: int, _c7: int, _c8: int, _c9: int, _c10: int, _c11: int, _c12: int, _c13: int, _c14: int, _c15: int, _c16: int, _c17: int, _c18: int, _c19: int, _c20: int, _c21: int, _c22: int, _c23: int, _c24: double, _c25: double, _c26: double, _c27: double, _c28: double, _c29: double, _c30: double, _c31: int, _c32: int, _c33: double, _c34: double, _c35: double, _c36: double, _c37: double, _c38: double, _c39: double, _c40: double, _c41: string]>

In [6]:
dataFrame = data.toDF("duration", "protocol_type", "service", "flag",
"src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
"hot", "num_failed_logins", "logged_in", "num_compromised",
"root_shell", "su_attempted", "num_root", "num_file_creations",
"num_shells", "num_access_files", "num_outbound_cmds",
"is_host_login", "is_guest_login", "count", "srv_count",
"serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
"same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
"dst_host_count", "dst_host_srv_count",
"dst_host_same_srv_rate", "dst_host_diff_srv_rate",
"dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
"dst_host_serror_rate", "dst_host_srv_serror_rate",
"dst_host_rerror_rate", "dst_host_srv_rerror_rate",
"label")

In [7]:
dataFrame.select("label").groupBy("label").count().orderBy(col("count").desc()).show(25)

+----------------+-------+
|           label|  count|
+----------------+-------+
|          smurf.|2807886|
|        neptune.|1072017|
|         normal.| 972781|
|          satan.|  15892|
|        ipsweep.|  12481|
|      portsweep.|  10413|
|           nmap.|   2316|
|           back.|   2203|
|    warezclient.|   1020|
|       teardrop.|    979|
|            pod.|    264|
|   guess_passwd.|     53|
|buffer_overflow.|     30|
|           land.|     21|
|    warezmaster.|     20|
|           imap.|     12|
|        rootkit.|     10|
|     loadmodule.|      9|
|      ftp_write.|      8|
|       multihop.|      7|
|            phf.|      4|
|           perl.|      3|
|            spy.|      2|
+----------------+-------+



In [8]:
numericOnly = dataFrame.drop("protocol_type", "service", "flag").cache()

In [9]:
inputCols = numericOnly.columns[:38]
assembler = VectorAssembler(inputCols=inputCols,outputCol="features")

In [10]:
kmeans = KMeans()

In [11]:
pipeline = Pipeline(stages = [assembler,kmeans])
pipeModel = pipeline.fit(numericOnly)

In [12]:
kmeansModel = pipeModel.stages[-1]

In [13]:
print((20,kmeansModel.summary.trainingCost))

(20, 4.663458567021335e+18)


In [14]:
center = kmeansModel.clusterCenters()
for c in center:
    print(c)

[4.83401949e+01 1.83462155e+03 8.26203190e+02 5.71611720e-06
 6.48779303e-04 7.96173468e-06 1.24376586e-02 3.20510858e-05
 1.43529049e-01 8.08830584e-03 6.81851124e-05 3.67464677e-05
 1.29349608e-02 1.18874823e-03 7.43095237e-05 1.02114351e-03
 0.00000000e+00 4.08294086e-07 8.35165553e-04 3.34973508e+02
 2.95267146e+02 1.77970317e-01 1.78036989e-01 5.76648988e-02
 5.77299094e-02 7.89884132e-01 2.11796106e-02 2.82608101e-02
 2.32981078e+02 1.89214283e+02 7.53713390e-01 3.07109788e-02
 6.05051931e-01 6.46410789e-03 1.78091184e-01 1.77885898e-01
 5.79276115e-02 5.76592214e-02]
[1.0999000e+04 0.0000000e+00 1.3099374e+09 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 1.0000000e+00
 1.0000000e+00 0.0000000e+00 0.0000000e+00 1.0000000e+00 1.0000000e+00
 1.0000000e+00 0.0000000e+00 0.0000000e+00 2.5500000e+02 1.00000

In [15]:
withCluster = pipeModel.transform(numericOnly)

In [16]:
withCluster.select('prediction','label').groupBy('prediction','label').count().orderBy('prediction',col('count').desc()).show(25)

+----------+----------------+-------+
|prediction|           label|  count|
+----------+----------------+-------+
|         0|          smurf.|2807886|
|         0|        neptune.|1072017|
|         0|         normal.| 972781|
|         0|          satan.|  15892|
|         0|        ipsweep.|  12481|
|         0|      portsweep.|  10412|
|         0|           nmap.|   2316|
|         0|           back.|   2203|
|         0|    warezclient.|   1020|
|         0|       teardrop.|    979|
|         0|            pod.|    264|
|         0|   guess_passwd.|     53|
|         0|buffer_overflow.|     30|
|         0|           land.|     21|
|         0|    warezmaster.|     20|
|         0|           imap.|     12|
|         0|        rootkit.|     10|
|         0|     loadmodule.|      9|
|         0|      ftp_write.|      8|
|         0|       multihop.|      7|
|         0|            phf.|      4|
|         0|           perl.|      3|
|         0|            spy.|      2|
|         1|

In [17]:
def meanScore(k,df):
    inputCol = df.columns[:38]
    assembler = VectorAssembler(inputCols=inputCols,outputCol="features")
    kmeans = KMeans().setK(k).setMaxIter(40).setTol(1.0e-5)
    pipeModel2 = Pipeline(stages=[assembler,kmeans])
    kmeansModel = pipeModel2.fit(df).stages[1]
    out = float(kmeansModel.summary.trainingCost)/float(df.count())
    return(out)
    

In [18]:
def scaledScore(k,df):
    inputCol = df.columns[:38]
    assembler = VectorAssembler(inputCols=inputCols,outputCol="featureVector")
    scaler = StandardScaler().setInputCol("featureVector").setOutputCol("features")
    kmeans = KMeans().setK(k).setMaxIter(40).setTol(1.0e-5)
    pipeModel3 = Pipeline(stages=[assembler,scaler,kmeans])
    kmeansModel = pipeModel3.fit(df).stages[-1]
    out = float(kmeansModel.summary.trainingCost)/float(df.count())
    return(out)

In [19]:
for k in range(60,271,30):
    sc = scaledScore(k,numericOnly)
    print((k,sc))

(60, 1.1122258658942799)
(90, 0.7453323919680306)
(120, 0.5041179085729937)
(150, 0.4184469605999362)
(180, 0.33656674592411784)
(210, 0.2949430578830955)
(240, 0.24827669334114305)
(270, 0.21641754295220192)


In [23]:
assembler = VectorAssembler(inputCols=inputCols,outputCol="featureVector")
scaler = StandardScaler().setInputCol("featureVector").setOutputCol("features")
kmeansFinal = KMeans().setK(80)
pipeFinal = Pipeline(stages=[assembler,scaler,kmeansFinal])

In [24]:
model = pipeFinal.fit(numericOnly)
withCluster = model.transform(numericOnly)
withCluster.select('prediction','label').groupBy('prediction','label').count().orderBy('prediction',col('count').desc()).show(25)

+----------+----------------+-------+
|prediction|           label|  count|
+----------+----------------+-------+
|         0|          smurf.|2805819|
|         0|         normal.|    335|
|         1|         normal.|   2133|
|         1|    warezclient.|     19|
|         1|      ftp_write.|      2|
|         1|       multihop.|      2|
|         1|   guess_passwd.|      1|
|         1|    warezmaster.|      1|
|         2|        neptune.| 103897|
|         2|      portsweep.|    644|
|         2|         normal.|    378|
|         2|          satan.|      3|
|         3|         normal.|      2|
|         4|         normal.|   7802|
|         4|    warezclient.|      7|
|         4|      portsweep.|      3|
|         5|          satan.|  13983|
|         5|      portsweep.|   1742|
|         5|         normal.|      3|
|         6|         normal.|      1|
|         7|         normal.|    220|
|         7|buffer_overflow.|     18|
|         7|            phf.|      4|
|         7|