In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm -rf spark-2.3.2-bin-hadoop2.7*
!wget -q http://apache.osuosl.org/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
!tar xf spark-2.3.2-bin-hadoop2.7.tgz
!pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.2-bin-hadoop2.7"

In [2]:
!rm -rf kddcup*
!wget -q http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
!gzip -d kddcup.data_10_percent.gz
!ls kddcup*

kddcup.data_10_percent


In [0]:
from pprint import pprint

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Ch05").getOrCreate()
sc = spark.sparkContext

In [0]:
dataWithoutHeader = spark.read.option('inferSchema', 'true') \
                            .option('header', 'false') \
                            .csv('kddcup.data_10_percent')

In [0]:
data = dataWithoutHeader.toDF(
"duration", "protocol_type", "service", "flag",
"src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
"hot", "num_failed_logins", "logged_in", "num_compromised",
"root_shell", "su_attempted", "num_root", "num_file_creations",
"num_shells", "num_access_files", "num_outbound_cmds",
"is_host_login", "is_guest_login", "count", "srv_count",
"serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
"same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
"dst_host_count", "dst_host_srv_count",
"dst_host_same_srv_rate", "dst_host_diff_srv_rate",
"dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
"dst_host_serror_rate", "dst_host_srv_serror_rate",
"dst_host_rerror_rate", "dst_host_srv_rerror_rate",
"label")

In [0]:
data = data.sample(0.1)

In [0]:
# numericOnly = data.drop("protocol_type", "service", "flag").dropna().cache()
numericOnly = data.drop("protocol_type", "service", "flag").cache()

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [0]:
k = 100

In [0]:
inputCols = list(numericOnly.columns)
inputCols.remove("label")

assembler = VectorAssembler() \
    .setInputCols(inputCols) \
    .setOutputCol("featureVector")

scaler = StandardScaler() \
    .setInputCol("featureVector") \
    .setOutputCol("scaledFeatureVector") \
    .setWithStd(True) \
    .setWithMean(False)

kmeans = KMeans() \
    .setSeed(42) \
    .setK(k) \
    .setMaxIter(40) \
    .setTol(1.0e-5) \
    .setPredictionCol("cluster") \
    .setFeaturesCol("scaledFeatureVector")

pipeline = Pipeline().setStages([assembler, scaler, kmeans])
pipelineModel = pipeline.fit(numericOnly)
kmeansModel = pipelineModel.stages[-1]


In [0]:
kMeansModel = pipelineModel.stages[-1]
centroids = kMeansModel.clusterCenters()
clustered = pipelineModel.transform(data)

In [17]:
import numpy as np
def sqdist(a,b):
    return float(np.sqrt(np.sum((a-b)**2, axis=0)))
    
thresholds = clustered.select("cluster", "scaledFeatureVector").rdd \
    .map(lambda x: sqdist(centroids[x[0]], np.array(x[1])))\
    .sortBy(lambda x: x, ascending=False)\
    .take(100)
threshold = thresholds[-1]
threshold

5.698832099142769

In [18]:
anomalies = clustered.select("cluster", "scaledFeatureVector", "label").rdd \
    .filter(lambda x: sqdist(centroids[x[0]], np.array(x[1])) >= threshold).toDF()
anomalies.select("cluster", "label").groupBy('label') \
    .count().orderBy("count", ascending=False).show()

+------------+-----+
|       label|count|
+------------+-----+
|     normal.|   50|
|      satan.|   21|
|        pod.|   14|
|       nmap.|    6|
|  portsweep.|    6|
|    ipsweep.|    1|
|warezclient.|    1|
|       land.|    1|
|    neptune.|    1|
+------------+-----+

