# Chapter 5: Anomaly Detection in Network Traffic with K-means Clustering
http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

In [1]:
import pyspark
from pprint import pprint

sc = pyspark.SparkContext(appName="kmeans")
sc

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local").appName("kmeans").getOrCreate()

### A First Take on Clustering

In [3]:
dataWithoutHeader = spark.read.option('inferSchema', 'true') \
                            .option('header', 'false') \
                            .csv('kddcup.data_10_percent_corrected')
# 10% sampling dataser --> 500k rows

In [4]:
data = dataWithoutHeader.toDF(
"duration", "protocol_type", "service", "flag",
"src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
"hot", "num_failed_logins", "logged_in", "num_compromised",
"root_shell", "su_attempted", "num_root", "num_file_creations",
"num_shells", "num_access_files", "num_outbound_cmds",
"is_host_login", "is_guest_login", "count", "srv_count",
"serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
"same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
"dst_host_count", "dst_host_srv_count",
"dst_host_same_srv_rate", "dst_host_diff_srv_rate",
"dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
"dst_host_serror_rate", "dst_host_srv_serror_rate",
"dst_host_rerror_rate", "dst_host_srv_rerror_rate",
"label")

In [5]:
data.select("label").groupBy("label").count().orderBy("count", ascending=False).show(25)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.feature import VectorAssembler

In [7]:
# numericOnly = data.drop("protocol_type", "service", "flag").dropna().cache()
numericOnly = data.drop("protocol_type", "service", "flag").cache()

In [8]:
cols = numericOnly.columns
assembler = VectorAssembler() \
    .setInputCols(cols.remove("label"))\
    .setInputCols(["dst_host_rerror_rate", "dst_host_srv_rerror_rate"])\
    .setOutputCol("featureVector")

In [9]:
kmeans = KMeans() \
    .setPredictionCol("cluster") \
    .setFeaturesCol("featureVector")

In [10]:
pipeline = Pipeline().setStages([assembler, kmeans]) # stages가 뭐임 -> transformer, estimator
# pipeline = Pipeline(stages=[assembler, kmeans])
pipelineModel = pipeline.fit(numericOnly)
kmeansModel = pipelineModel.stages[-1]

kmeansModel.clusterCenters() # k=2

[array([0.00190743, 0.00097509]), array([0.9805854 , 0.98359484])]

In [11]:
withCluster = pipelineModel.transform(numericOnly)

In [12]:
withCluster.select("cluster", "label") \
    .groupBy("cluster", "label").count() \
    .orderBy(["cluster", "count"], ascending=[1, 0]) \
    .show(25)

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|         normal.| 91744|
|      0|        neptune.| 86744|
|      0|           back.|  2191|
|      0|        ipsweep.|  1160|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|          satan.|   359|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|      portsweep.|    37|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|   guess_passwd.|     2|
|      0|            spy.|     2|
|      1|        neptune.| 20457|
|      1|         normal.|  5534|
+-------+----------------+------+
only showing t

### Choosing k

In [13]:
import random

def clusteringScore0(data, k): # (data: DataFrame, k: Int): Double 
    cols = data.columns.copy()
    cols.remove("label")

    assembler = VectorAssembler() \
        .setInputCols(cols) \
        .setOutputCol("featureVector")
        
    kmeans = KMeans() \
        .setSeed(random.randint(0,1000)) \
        .setK(k) \
        .setPredictionCol("cluster") \
        .setFeaturesCol("featureVector")
    
    pipeline = Pipeline().setStages([assembler, kmeans])
    kmeansModel = pipeline.fit(data).stages[-1]
    return kmeansModel.computeCost(assembler.transform(data)) / data.count()

In [14]:
scores0 = map(lambda x: (x, clusteringScore0(numericOnly, x)) ,range(20, 101, 20))
list(scores0)

[(20, 33186397.422698718),
 (40, 33483863.119649567),
 (60, 34134989.49093766),
 (80, 15127527.320445474),
 (100, 14357179.118356757)]

In [15]:
def clusteringScore1(data, k): # (data: DataFrame, k: Int): Double 
    cols = data.columns.copy()
    cols.remove("label")

    assembler = VectorAssembler() \
        .setInputCols(cols) \
        .setOutputCol("featureVector")
        
    kmeans = KMeans() \
        .setSeed(random.randint(0,1000)) \
        .setK(k) \
        .setMaxIter(40) \
        .setTol(1.0e-5) \
        .setPredictionCol("cluster") \
        .setFeaturesCol("featureVector")
    
    pipeline = Pipeline().setStages([assembler, kmeans])
    kmeansModel = pipeline.fit(data).stages[-1]
    return kmeansModel.computeCost(assembler.transform(data)) / data.count()

In [17]:
scores1 = map(lambda x: (x, clusteringScore1(numericOnly, x)) ,range(20, 101, 20))
list(scores1)

[(20, 34478047.18040431),
 (40, 34001097.514237694),
 (60, 23406981.74675835),
 (80, 25109958.64245254),
 (100, 3628639.6452119444)]

### Visualization with SparkR
> R code는 내용이 같으므로 생략

### Feature Normalization

In [15]:
from pyspark.ml.feature import StandardScaler

def clusteringScore2(data, k): #def clusteringScore2(data: DataFrame, k: Int): Double = {
    cols = data.columns.copy()
    cols.remove("label")

    assembler = VectorAssembler() \
        .setInputCols(cols) \
        .setOutputCol("featureVector")
    
    scaler = StandardScaler() \
        .setInputCol("featureVector") \
        .setOutputCol("scaledFeatureVector") \
        .setWithStd(True) \
        .setWithMean(False)
        
    kmeans = KMeans() \
        .setSeed(random.randint(0,1000)) \
        .setK(k) \
        .setMaxIter(40) \
        .setTol(1.0e-5) \
        .setPredictionCol("cluster") \
        .setFeaturesCol("featureVector")
    
    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    kmeansModel = pipeline.fit(data).stages[-1]
    return kmeansModel.computeCost(assembler.transform(data)) / data.count()

In [19]:
scores2 = map(lambda x: (x, clusteringScore2(numericOnly, x)) ,range(60, 271, 30))
list(scores2)

[(60, 3940231.1333903004),
 (90, 34130705.46761614),
 (120, 5750760.496748073),
 (150, 2957923.03802364),
 (180, 1065087.2376975007),
 (210, 1760915.8189446821),
 (240, 9760382.727434369),
 (270, 6260249.827110213)]

### Categorical Variables

In [183]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def oneHotPipeline(inputCol): # (inputCol: String): (Pipeline, String)
    indexer = StringIndexer(inputCol=inputCol, outputCol=inputCol+"_indexed")   
    encoder = OneHotEncoder(inputCol=inputCol+"_indexed", outputCol=inputCol+"_vec")

    pipeline = Pipeline().setStages([indexer, encoder])
    return (pipeline, inputCol + "_vec")

In [184]:
def make_one_hot_df(data):
    (protoTypeEncoder, protoTypeVecCol) = oneHotPipeline("protocol_type")
    (serviceEncoder, serviceVecCol) = oneHotPipeline("service")
    (flagEncoder, flagVecCol) = oneHotPipeline("flag")
    
    pipeline = Pipeline().setStages([protoTypeEncoder, serviceEncoder, flagEncoder])
    return pipeline.fit(data).transform(data)

In [188]:
def clusteringScore3(data, k): # data: DataFrame, k: Int): Double = {
       
    cols = data.columns.copy()
    for c in ["protocol_type", "service", "flag", "protocol_type_indexed", "service_indexed", "flag_indexed", "label"]:
        cols.remove(c)
    cols.extend(["protocol_type_vec", "service_vec", "flag_vec"])

    assembler = VectorAssembler() \
        .setInputCols(cols) \
        .setOutputCol("featureVector")
    
    scaler = StandardScaler() \
        .setInputCol("featureVector") \
        .setOutputCol("scaledFeatureVector") \
        .setWithStd(True) \
        .setWithMean(False)
        
    kmeans = KMeans() \
        .setSeed(random.randint(0,1000)) \
        .setK(k) \
        .setMaxIter(40) \
        .setTol(1.0e-5) \
        .setPredictionCol("cluster") \
        .setFeaturesCol("featureVector")
    
    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    kmeansModel = pipeline.fit(data).stages[-1]
    return kmeansModel.computeCost(assembler.transform(data)) / data.count()

In [189]:
data_one_hot = make_one_hot_df(data)
scores3 = map(lambda x: (x, clusteringScore3(data_one_hot, x)), range(60, 271, 30))
list(scores3)

[(60, 26934794.934924006),
 (90, 1356914.6988293286),
 (120, 3270926.8840265777),
 (150, 4101677.2447230327),
 (180, 2953135.322722774),
 (210, 1477482.3868878032),
 (240, 5576328.503406511),
 (270, 1962455.3764015127)]

### Using Labels with Entropy

In [17]:
import math
#  Calc entropy
#  파이썬 map은 제너레이터
def entropy(counts): # (counts: iterable[int]): Double
    values = counts.filter(lambda x: x>0)
    n = values.map(float).sum()
    entropys = values.map(lambda v: calc_each_entropy(v,n))
    return entropys.sum()

def calc_each_entropy(v, n):
    p = v/n
    return -p*math.log(p)

In [87]:
clusterLabel = pipelineModel.transform(data).select(["cluster", "label"])
labels_grouped = clusterLabel.rdd.groupByKey()
labels_size = labels_grouped.mapValues(len).map(lambda x: x[1]).collect()
print(labels_size)

[465647, 28374]


In [88]:
# RDD.map안에서 다시 sc.parallelize 선언시 에러남
labels_count = labels_grouped.map(lambda x: Counter(x[1].data)).collect()
labels_count_list = list(map(lambda x: sc.parallelize(x.values()), labels_count))
weightedClusterEntropy = list(map(lambda x,y: x*entropy(y), labels_size, labels_count_list))
print(weightedClusterEntropy)

[475792.1616625872, 23870.055042170145]


In [89]:
sum(weightedClusterEntropy)/data.count()

1.0114189815913845

In [190]:
def fitPipeline4(data, k): # (data: DataFrame, k: Int): PipelineModel
    
    cols = data.columns.copy()
    for c in ["protocol_type", "service", "flag", "protocol_type_indexed", "service_indexed", "flag_indexed", "label"]:
        cols.remove(c)
    cols.extend(["protocol_type_vec", "service_vec", "flag_vec"])

    assembler = VectorAssembler() \
        .setInputCols(cols) \
        .setOutputCol("featureVector")
    
    scaler = StandardScaler() \
        .setInputCol("featureVector") \
        .setOutputCol("scaledFeatureVector") \
        .setWithStd(True) \
        .setWithMean(False)
        
    kmeans = KMeans() \
        .setSeed(random.randint(0,1000)) \
        .setK(k) \
        .setMaxIter(40) \
        .setTol(1.0e-5) \
        .setPredictionCol("cluster") \
        .setFeaturesCol("featureVector")
    
    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    return pipeline.fit(data)    

In [197]:
def clusteringScore4(data, k): # (data: DataFrame, k: Int): Double 
    pipelineModel = fitPipeline4(data, k)

    # Predict cluster for each datum
    clusterLabel = pipelineModel.transform(data).select(["cluster", "label"])
    labels_grouped = clusterLabel.rdd.groupByKey()
    labels_size = labels_grouped.mapValues(len).map(lambda x: x[1]).collect()

    # Extract collections of labels, per cluster
    labels_count = labels_grouped.map(lambda x: Counter(x[1].data)).collect()
    labels_count_list = list(map(lambda x: sc.parallelize(x.values()), labels_count))
    weightedClusterEntropy = list(map(lambda x,y: x*entropy(y), labels_size, labels_count_list))


    # Average entropy weighted by cluster size    
    return sum(weightedClusterEntropy)/data.count()

In [192]:
# 매우 오래 걸림 노트북에서 안하는걸 추천
# 혹은 데이터 샘플링 추천
scores4 = map(lambda x: (x, clusteringScore4(data_one_hot, x)), range(60, 271, 30))
list(scores4)

KeyboardInterrupt: 

In [198]:
clusteringScore4(data_one_hot, 20) # 너무 오래거려서 k=20인 경우만 테스트

1.0784599434573783

### Clustering in Action

In [236]:
pipelineModel = fitPipeline4(data_one_hot, 60) # 결과가 모두 다를거고 여기선 k=60이 최적이라는 가정하에 계산
countByClusterLabel = pipelineModel.transform(data_one_hot) \
    .select("cluster", "label") \
    .groupBy("cluster", "label").count() \
    .orderBy(["cluster", "label"])
countByClusterLabel.show(200)

# 전체 데이터를 다 쓰지 않아서인지 결과가 좋지 않습니다

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|           back.|     4|
|      0|buffer_overflow.|    21|
|      0|      ftp_write.|     7|
|      0|   guess_passwd.|    53|
|      0|           imap.|    10|
|      0|        ipsweep.|  1247|
|      0|           land.|    21|
|      0|     loadmodule.|     6|
|      0|       multihop.|     3|
|      0|        neptune.|107201|
|      0|           nmap.|   231|
|      0|         normal.| 84249|
|      0|           perl.|     3|
|      0|            pod.|   264|
|      0|      portsweep.|  1039|
|      0|        rootkit.|     7|
|      0|          satan.|  1589|
|      0|          smurf.|280790|
|      0|            spy.|     2|
|      0|       teardrop.|   979|
|      0|    warezclient.|   960|
|      0|    warezmaster.|     3|
|      1|      portsweep.|     1|
|      2|    warezclient.|    59|
|      3|         normal.|     1|
|      3|    warezmaster.|    15|
|      4|     

In [237]:
kMeansModel = pipelineModel.stages[-1]
centroids = kMeansModel.clusterCenters()
clustered = pipelineModel.transform(data_one_hot)

In [310]:
import numpy as np
def sqdist(a,b):  return float(np.sqrt(np.sum((a-b)**2, axis=0)))
    
thresholds = clustered.select("cluster", "scaledFeatureVector").rdd \
    .map(lambda x: sqdist(centroids[x[0]], np.array(x[1]))).collect()
threshold = sorted(thresholds)[50]
threshold

855.0507798357681

In [296]:
samples = clustered.sample(0.01) # 너무 오래걸려서  1%만 샘플링
samples.count()

5032

In [306]:
anomalies = samples.select("cluster", "scaledFeatureVector", "label").rdd \
    .filter(lambda x: sqdist(centroids[x[0]], np.array(x[1])) >= threshold).toDF()

In [309]:
anomalies.select("cluster", "label").groupBy('label') \
    .count().orderBy("count", ascending=False).show()

+----------------+-----+
|           label|count|
+----------------+-----+
|          smurf.| 2856|
|        neptune.| 1097|
|         normal.|  995|
|           back.|   21|
|       teardrop.|   15|
|        ipsweep.|   13|
|      portsweep.|   12|
|          satan.|    9|
|    warezclient.|    7|
|            pod.|    2|
|           nmap.|    2|
|buffer_overflow.|    2|
|   guess_passwd.|    1|
+----------------+-----+

