## __Chapter 5. KMeans clustering으로 네트워크 이상 탐지하기__

* Data Loading (490만 개의 연결, 38개의 수치형 특징)

In [1]:
# !wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz
# !gzip -d kddcup.data.gz
# !wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names

In [2]:
with open('kddcup.names') as f :
    col_info = f.read()

In [3]:
print(col_info[:250])

back,buffer_overflow,ftp_write,guess_passwd,imap,ipsweep,land,loadmodule,multihop,neptune,nmap,normal,perl,phf,pod,portsweep,rootkit,satan,smurf,spy,teardrop,warezclient,warezmaster.
duration: continuous.
protocol_type: symbolic.
service: symbolic.
f


In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

datapath = './'
DRIVER_MEMORY = "8g"

spark = SparkSession.builder.appName('ch05')\
    .master('local[*]')\
    .config('spark.sql.warehouse.dir', datapath)\
    .config("spark.driver.memory", DRIVER_MEMORY)\
    .getOrCreate()

In [5]:
data = spark.read\
    .option('inferSchema', True)\
    .option('header', False)\
    .csv('kddcup.data')\
    .toDF(
        "duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
        "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell", "su_attempted", "num_root", "num_file_creations",
        "num_shells", "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login", "count", "srv_count",
        "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
        "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
        "dst_host_same_srv_rate", "dst_host_diff_srv_rate", "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
        "dst_host_serror_rate", "dst_host_srv_serror_rate", "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
        "label")

In [6]:
print((data.count(), len(data.columns)))

(4898431, 42)


* EDA : 기존 label의 분포 확인

In [7]:
# https://stackoverflow.com/questions/52283751/calculating-percentage-of-total-count-for-groupby-using-pyspark/52285108
import pyspark.sql.functions as func
data.groupBy('label').agg(
    (F.count('label')).alias('count'),
    func.round((F.count('label') / data.count()), 3).alias('percentage')
).orderBy(F.col('count').desc()).show(n=5)

+--------+-------+----------+
|   label|  count|percentage|
+--------+-------+----------+
|  smurf.|2807886|     0.573|
|neptune.|1072017|     0.219|
| normal.| 972781|     0.199|
|  satan.|  15892|     0.003|
|ipsweep.|  12481|     0.003|
+--------+-------+----------+
only showing top 5 rows



### __5.5. 첫 번째 군집화하기__

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.feature import VectorAssembler

In [9]:
# Remove categorical data
numericOnly = data.drop('protocol_type', 'service', 'flag')
print(len(data.columns), '->', len(numericOnly.columns))
numericOnly.cache()

42 -> 39


DataFrame[duration: int, src_bytes: int, dst_bytes: int, land: int, wrong_fragment: int, urgent: int, hot: int, num_failed_logins: int, logged_in: int, num_compromised: int, root_shell: int, su_attempted: int, num_root: int, num_file_creations: int, num_shells: int, num_access_files: int, num_outbound_cmds: int, is_host_login: int, is_guest_login: int, count: int, srv_count: int, serror_rate: double, srv_serror_rate: double, rerror_rate: double, srv_rerror_rate: double, same_srv_rate: double, diff_srv_rate: double, srv_diff_host_rate: double, dst_host_count: int, dst_host_srv_count: int, dst_host_same_srv_rate: double, dst_host_diff_srv_rate: double, dst_host_same_src_port_rate: double, dst_host_srv_diff_host_rate: double, dst_host_serror_rate: double, dst_host_srv_serror_rate: double, dst_host_rerror_rate: double, dst_host_srv_rerror_rate: double, label: string]

In [10]:
%%time
assembler = VectorAssembler()\
    .setInputCols([c for c in numericOnly.columns if c != 'label'])\
    .setOutputCol('featureVector')

kmeans = KMeans()\
    .setPredictionCol('cluster')\
    .setFeaturesCol('featureVector')

pipeline = Pipeline().setStages([assembler, kmeans])
pipelineModel = pipeline.fit(numericOnly)

CPU times: user 14.2 ms, sys: 46.1 ms, total: 60.3 ms
Wall time: 56.6 s


In [11]:
kmeansModel = pipelineModel.stages[-1]
type(kmeansModel)

pyspark.ml.clustering.KMeansModel

In [12]:
print(type(kmeansModel.clusterCenters()))

<class 'list'>


In [13]:
# k=2로 군집화되었다는 뜻
import numpy as np
import sys
np.set_printoptions(threshold=sys.maxsize)

for i in kmeansModel.clusterCenters() :
    print(i)

[4.83401949e+01 1.83462155e+03 8.26203190e+02 5.71611720e-06
 6.48779303e-04 7.96173468e-06 1.24376586e-02 3.20510858e-05
 1.43529049e-01 8.08830584e-03 6.81851124e-05 3.67464677e-05
 1.29349608e-02 1.18874823e-03 7.43095237e-05 1.02114351e-03
 0.00000000e+00 4.08294086e-07 8.35165553e-04 3.34973508e+02
 2.95267146e+02 1.77970317e-01 1.78036989e-01 5.76648988e-02
 5.77299094e-02 7.89884132e-01 2.11796106e-02 2.82608101e-02
 2.32981078e+02 1.89214283e+02 7.53713390e-01 3.07109788e-02
 6.05051931e-01 6.46410789e-03 1.78091184e-01 1.77885898e-01
 5.79276115e-02 5.76592214e-02]
[1.0999000e+04 0.0000000e+00 1.3099374e+09 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 1.0000000e+00
 1.0000000e+00 0.0000000e+00 0.0000000e+00 1.0000000e+00 1.0000000e+00
 1.0000000e+00 0.0000000e+00 0.0000000e+00 2.5500000e+02 1.00000

* 군집화 결과를 직관적으로 이해하기 위해, 각 군집이 포함하는 레이블을 집계해보는 것이 좋겠다.

In [14]:
withCluster = pipelineModel.transform(numericOnly)

In [15]:
withCluster\
    .select('cluster', 'label')\
    .groupBy('cluster', 'label')\
    .count()\
    .orderBy(F.col('cluster'), F.col('label').desc())\
    .show(5)

+-------+------------+-------+
|cluster|       label|  count|
+-------+------------+-------+
|      0|warezmaster.|     20|
|      0|warezclient.|   1020|
|      0|   teardrop.|    979|
|      0|        spy.|      2|
|      0|      smurf.|2807886|
+-------+------------+-------+
only showing top 5 rows



In [16]:
from pyspark.sql.functions import countDistinct

withCluster\
    .select('cluster', 'label')\
    .groupBy('cluster')\
    .agg(countDistinct("label")).show()
# --> 군집1에는 단 하나의 레이블만이 존재한다.

+-------+------------+
|cluster|count(label)|
+-------+------------+
|      1|           1|
|      0|          23|
+-------+------------+



In [17]:
withCluster.filter('cluster==1').count()

1

--> 군집1에 단 한 건의 데이터만 존재함을 확인함. 의미있는 군집이 형성되지 않았음.  
  
    

### __5.6. K 선정하기__

* 각 데이터 포인트가, 가장 가까운 군집의 중심에 가까울수록 잘 군집화되었다.  
    --> 유클리드 거리 기준. KMeansModel.computeCost 메서드가 제곱 거리의 평균을 구해줌.  
    --> 그러나, spark3.0.1에서 해당 메서드는 Deprecated됨.  

(As-is) KMeansModel.computeCost
Note Deprecated in 3.0.0. It will be removed in future versions.   
Use ClusteringEvaluator instead. You can also get the cost on the training dataset in the summary.

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.ClusteringEvaluator    
    
class pyspark.ml.evaluation.ClusteringEvaluator( predictionCol='prediction' , featuresCol='features' , metricName='silhouette' , distanceMeasure='squaredEuclidean')

_____
#### ※ Silhouette value

Silhouette은 클러스터의 유효성을 확인하는 척도입니다.  
값의 범위는 -1 $\leq$ Silhouette(i) $\leq$ 1 이며,  
1에 가까운 값은, 클러스터의 포인트가 같은 클러스터의 다른 포인트에 가깝고, 다른 클러스터의 포인트에서 멀리 떨어져 있음을 의미합니다.  
보통 실루엣 지표가 0.5보다 크면 군집 결과가 타당한 것으로 평가하게 된다고 합니다.

We now define a silhouette (value) of **one data point** *i*

![Silhouette](https://wikimedia.org/api/rest_v1/media/math/render/svg/3d80ab22fb291b347b2d9dc3cc7cd614f6b15479)

Silhouette coefficient : for the maximum value of __the mean *s(i)*__ over all data of the entire dataset.

![Silhouette_coefficient](https://wikimedia.org/api/rest_v1/media/math/render/svg/cf1bf4fc02c4dde69ad981bba92e1829f92dd648)  
____

In [18]:
from pyspark.sql import DataFrame
from pyspark.ml.evaluation import ClusteringEvaluator

def clusteringScore0(data, k) :
    assembler = VectorAssembler()\
        .setInputCols([c for c in data.columns if c != 'label'])\
        .setOutputCol('featureVector')

    kmeans = KMeans()\
        .setSeed(123)\
        .setK(k)\
        .setPredictionCol('cluster')\
        .setFeaturesCol('featureVector')

    pipeline = Pipeline().setStages([assembler, kmeans])
    kmeansModel = pipeline.fit(data).stages[-1]
    
    predictions = pipeline.fit(data).transform(data)
    centers = kmeansModel.clusterCenters()
    
    evaluator = ClusteringEvaluator(predictionCol='cluster', featuresCol='featureVector')
    silhouette = evaluator.evaluate(predictions)
    
    return predictions, centers, silhouette

In [19]:
%%time

all_preds = []
print("[Step0] Silhouette with squared euclidean distance :: ")

for k in range(20, 100+1, 20) :
    predictions, centers, silhouette = clusteringScore0(numericOnly, k)
    all_preds.append((k, predictions))
    print('\t', (k, silhouette))

[Step0] Silhouette with squared euclidean distance :: 
	 (20, 0.9999387403931099)
	 (40, -0.6072042735190857)
	 (60, -0.5609990691653991)
	 (80, -0.09664689300403269)
	 (100, -0.03644915104694833)
CPU times: user 476 ms, sys: 319 ms, total: 795 ms
Wall time: 12min 2s


### 5.7. 시각화하기

In [20]:
pred20 = all_preds[0][1]
pred20.select('cluster').groupBy('cluster').count().orderBy(F.col('count').desc()).toPandas().T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17
cluster,17,0,10,11,12,9,13,15,1,6,16,3,5,4,8,7,14,2
count,4897898,244,203,63,4,4,3,2,1,1,1,1,1,1,1,1,1,1


In [21]:
pred100 = all_preds[-1][1]
pred100.select('cluster').groupBy('cluster').count().orderBy(F.col('count').desc()).toPandas().T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,61,62,63,64,65,66,67,68,69,70
cluster,29,16,68,65,11,32,67,58,0,31,...,25,70,62,56,33,14,42,2,30,18
count,2375696,1707725,527953,183853,67885,18816,10272,3564,1462,490,...,1,1,1,1,1,1,1,1,1,1


(100개 군집으로 나눴을 때) 개별 군집에 가장 많은 레이블은 무엇일까?   
    --> cluster29: smurf,  cluster16: neptune,   cluster65: normal     
    --> smurf, neptune은 비교적 군집 특징이 뚜렷한가봄.   

In [22]:
# 29번 군집에는 smurf가 제일 많다.
sample100_cluster29 = pred100.filter('cluster==29')
sample100_cluster29.select('label').groupBy('label').count().orderBy(F.col('count').desc()).toPandas().T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
label,smurf.,normal.,pod.,warezclient.,satan.,back.,portsweep.,imap.,multihop.,neptune.
count,2280163,95193,259,63,7,6,2,1,1,1


In [23]:
# 16번 군집에는 neptune이 제일 많다. 
# EDA때 neptune이 약 107만건 이었던걸 감안하면, 16번 군집은 Neptune을 잘 군집화했다고 볼 수 있을 것 같다.
sample100_cluster16 = pred100.filter('cluster==16')
sample100_cluster16.select('label').groupBy('label').count().orderBy(F.col('count').desc()).toPandas().T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
label,neptune.,normal.,satan.,ipsweep.,portsweep.,nmap.,teardrop.,warezclient.,smurf.,guess_passwd.,land.,imap.,ftp_write.,rootkit.,pod.,loadmodule.,warezmaster.,buffer_overflow.,multihop.,spy.
count,1072016,593710,15883,12480,9517,2316,979,590,122,52,21,9,6,6,5,5,2,2,2,2


In [24]:
# 65번 군집에는 normal이 제일 많다. 하지만 29, 16번 군집에도 그 수가 많다...
sample100_cluster65 = pred100.filter('cluster==65')
sample100_cluster65.select('label').groupBy('label').count().orderBy(F.col('count').desc()).toPandas().T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
label,normal.,warezclient.,buffer_overflow.,loadmodule.,perl.,warezmaster.,rootkit.,guess_passwd.,satan.,ftp_write.
count,183529,285,26,3,3,2,2,1,1,1


In [33]:
pred100.columns[-5:]

['dst_host_rerror_rate',
 'dst_host_srv_rerror_rate',
 'label',
 'featureVector',
 'cluster']

In [114]:
from pyspark.ml.feature import PCA
df_pca = PCA()\
    .setInputCol("featureVector")\
    .setOutputCol("pcaFeatures")\
    .setK(2)\
    .fit(pred100)\
    .transform(pred100)\
    .select('pcaFeatures', 'label', 'cluster')
print((df_pca.count(), len(df_pca.columns)))
print(df_pca.columns)
df_pca.cache()

(4898431, 3)
['pcaFeatures', 'label', 'cluster']


DataFrame[pcaFeatures: vector, label: string, cluster: int]

In [115]:
# PCA결과로 얻은 DenseVector를 해체함.
for i in range(2) :
    to_array = F.udf(lambda v: v.toArray().tolist()[i])
    new_colname = 'v{}'.format(i+1)
    df_pca = df_pca.withColumn(new_colname, to_array(F.col('pcaFeatures')))
df_pca.columns

['pcaFeatures', 'label', 'cluster', 'v1', 'v2']

In [116]:
pdf_pca = df_pca.toPandas()
pdf_pca.shape

(4898431, 5)

In [None]:
%matplotlib inline
%time
pdf_pca.plot(kind='scatter', x='v1', y='v2', c='cluster', colormap='viridis')

In [None]:
colors = ["#7fc97f", "#beaed4", "#fdc086", "#ffff99", "#386cb0", "#f0027f"]

In [None]:
ax.scatter(finalDf.loc[indicesToKeep, 'principal component 1']
               , finalDf.loc[indicesToKeep, 'principal component 2']
               , c = color
               , s = 50)

In [None]:
for label, color in zip(labels, colors):
    indicesToKeep = finalDataFrame['label'] == label
    ax.scatter(finalDataFrame.loc[indicesToKeep, 'principal component 1']
               , finalDataFrame.loc[indicesToKeep, 'principal component 2']
               , c = color
               , s = 30)

ax.legend(labels)

In [None]:
# from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler, MinMaxScaler, PCA
from pyspark.ml.feature import PCA

def pca_figure(pred_df, input_col, k):
    pca = PCA(k=k, inputCol=input_col, outputCol="features_pca")
    pca_model = pca.fit(pred_df)
    pcaDf = pca_model.transform(predDf).select("features_pca", "prediction")
    
    pca_df = pcaDf.toPandas()
    
    if k == 2:
        dataX = []
        dataY = []
        for vec in pca_df.values:
            dataX.extend([vec[0][0]])
            dataY.extend([vec[0][1]])
        sns.scatterplot(dataX, dataY, hue=pca_df['prediction'])
        plt.show()
    
    elif k == 3:
        dataX = []
        dataY = []
        dataZ = []
        for vec in pca_df.values:
            dataX.extend([vec[0][0]])
            dataY.extend([vec[0][1]])
            dataZ.extend([vec[0][2]])

        fig = plt.figure(figsize=(10, 5))
        ax = fig.add_subplot(111, projection='3d') # Axe3D object
        p = ax.scatter(dataX, dataY, dataZ, c=pca_df['prediction'], s=20, alpha=0.5)
        fig.colorbar(p)
        plt.show()
    else:
        pass

### 5.8. 정규화 (StandardScaler)

In [25]:
from pyspark.ml.feature import StandardScaler

def clusteringScore2(data, k) :
    assembler = VectorAssembler()\
        .setInputCols([c for c in data.columns if c != 'label'])\
        .setOutputCol('featureVector')
    
    scaler = StandardScaler()\
        .setInputCol('featureVector')\
        .setOutputCol('scaledFeatureVector')\
        .setWithStd(True)\
        .setWithMean(False)

    kmeans = KMeans()\
        .setSeed(123)\
        .setK(k)\
        .setPredictionCol('cluster')\
        .setFeaturesCol('scaledFeatureVector')\
        .setMaxIter(40)\
        .setTol(1.0e-5)

    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    predictions = pipeline.fit(data).transform(data)
    
    # 전체 거리 제곱의 평균(비용)을 구한다.
    # return kmeansModel.computeCost(assembler.transform(data)) / data.count()
    evaluator = ClusteringEvaluator(predictionCol='cluster', featuresCol='featureVector')
    silhouette = evaluator.evaluate(predictions)
    return silhouette

In [26]:
%%time
print("Silhouette with squared euclidean distance :: ")
for k in range(20, 101, 20) :
    print((k, clusteringScore2(numericOnly, k)))

Silhouette with squared euclidean distance :: 
(20, 0.10287107799394214)
(40, 0.2877090047241465)
(60, 0.29655580360408745)
(80, 0.3051650244355804)
(100, 0.3382389268645949)
CPU times: user 523 ms, sys: 341 ms, total: 864 ms
Wall time: 13min 30s


### 5.9. 범주형 변수 One-Hot-Encoding

먼저 StringIndexer를 사용해 정수 인덱스로 변환하고,  다시 정수 인덱스를 OHE 벡터로 인코딩한다.

In [27]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def oneHotPipeline(inputColname) :
    indexer = StringIndexer()\
        .setInputCol(inputColname)\
        .setOutputCol(inputColname + "_indexed")
    encoder = OneHotEncoder()\
        .setInputCol(inputColname + "_indexed")\
        .setOutputCol(inputColname + "_vec")
    pipeline = Pipeline().setStages([indexer, encoder])
    return pipeline, inputColname + "_vec"
    

def clusteringScore3(data, k) :
    protoTypeEncoder, protoTypeVecCol = oneHotPipeline("protocol_type")
    serviceEncoder, serviceVecCol = oneHotPipeline("service")
    flagEncoder, flagVecCol = oneHotPipeline("flag")

    assembleCols = set(data.columns)\
        - set(["label", "protocol_type", "service", "flag"])\
        .union(set([protoTypeVecCol, serviceVecCol, flagVecCol]))
    
    assembler = VectorAssembler()\
        .setInputCols(list(assembleCols))\
        .setOutputCol('featureVector')
    
    scaler = StandardScaler()\
        .setInputCol('featureVector')\
        .setOutputCol('scaledFeatureVector')\
        .setWithStd(True)\
        .setWithMean(False)

    kmeans = KMeans()\
        .setSeed(123)\
        .setK(k)\
        .setPredictionCol('cluster')\
        .setFeaturesCol('scaledFeatureVector')\
        .setMaxIter(40)\
        .setTol(1.0e-5)

    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    predictions = pipeline.fit(data).transform(data)
    
    # 전체 거리 제곱의 평균(비용)을 구한다.
    # return kmeansModel.computeCost(assembler.transform(data)) / data.count()
    evaluator = ClusteringEvaluator(predictionCol='cluster', featuresCol='featureVector')
    silhouette = evaluator.evaluate(predictions)
    return silhouette

In [28]:
%%time
print("Silhouette with squared euclidean distance :: ")
for k in range(60, 200, 40) :
    print((k, clusteringScore3(numericOnly, k)))

Silhouette with squared euclidean distance :: 
(60, 0.2965558036040874)
(100, 0.33823892686459495)
(140, 0.1361259737154779)
(180, 0.18680049117949526)
CPU times: user 489 ms, sys: 273 ms, total: 762 ms
Wall time: 17min 6s


### 5.10. 엔트로피와 함께 레이블 활용하기

In [29]:
import math
def entropy(counts) :
    values = [c for c in counts if c > 0]
    n = sum([float(c) for c in counts if c > 0])
    values.map(lambda x : -math.log(p))

In [30]:
clusterLabel = 

SyntaxError: invalid syntax (<ipython-input-30-af9e326e4577>, line 1)