In [143]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col,mean,max,min,sum,log

In [2]:
spark = SparkSession.builder.appName("k-means").config("spark.executor.memory", "5g").config("spark.driver.memory", "5g").master("local[*]").getOrCreate()

#### one line of data means a TCP connection, sending several bytes and receiving several bytes.

In [7]:
#schema
rawschema = '''
duration: continuous.
protocol_type: symbolic.
service: symbolic.
flag: symbolic.
src_bytes: continuous.
dst_bytes: continuous.
land: symbolic.
wrong_fragment: continuous.
urgent: continuous.
hot: continuous.
num_failed_logins: continuous.
logged_in: symbolic.
num_compromised: continuous.
root_shell: continuous.
su_attempted: continuous.
num_root: continuous.
num_file_creations: continuous.
num_shells: continuous.
num_access_files: continuous.
num_outbound_cmds: continuous.
is_host_login: symbolic.
is_guest_login: symbolic.
count: continuous.
srv_count: continuous.
serror_rate: continuous.
srv_serror_rate: continuous.
rerror_rate: continuous.
srv_rerror_rate: continuous.
same_srv_rate: continuous.
diff_srv_rate: continuous.
srv_diff_host_rate: continuous.
dst_host_count: continuous.
dst_host_srv_count: continuous.
dst_host_same_srv_rate: continuous.
dst_host_diff_srv_rate: continuous.
dst_host_same_src_port_rate: continuous.
dst_host_srv_diff_host_rate: continuous.
dst_host_serror_rate: continuous.
dst_host_srv_serror_rate: continuous.
dst_host_rerror_rate: continuous.
dst_host_srv_rerror_rate: continuous.
'''

In [52]:
import re
schema = re.sub(r': (\w+.\n)',',',rawschema).strip()

In [53]:
schema = list(schema.split(','))

In [55]:
schema[-1] = 'attack_type'

In [37]:
rawdata = spark.read.csv('./kddcup.data.corrected',inferSchema=True)

In [39]:
rawdata.columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13',
 '_c14',
 '_c15',
 '_c16',
 '_c17',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24',
 '_c25',
 '_c26',
 '_c27',
 '_c28',
 '_c29',
 '_c30',
 '_c31',
 '_c32',
 '_c33',
 '_c34',
 '_c35',
 '_c36',
 '_c37',
 '_c38',
 '_c39',
 '_c40',
 '_c41']

In [56]:
def rename_cols(df,schema):
    for i,column in enumerate(df.columns):
        new_column = schema[i]
        df = df.withColumnRenamed(column, new_column)
    return df

In [57]:
rawdata = rename_cols(rawdata,schema)

In [58]:
rawdata.head()

Row(duration=0, protocol_type='tcp', service='http', flag='SF', src_bytes=215, dst_bytes=45076, land=0, wrong_fragment=0, urgent=0, hot=0, num_failed_logins=0, logged_in=1, num_compromised=0, root_shell=0, su_attempted=0, num_root=0, num_file_creations=0, num_shells=0, num_access_files=0, num_outbound_cmds=0, is_host_login=0, is_guest_login=0, count=1, srv_count=1, serror_rate=0.0, srv_serror_rate=0.0, rerror_rate=0.0, srv_rerror_rate=0.0, same_srv_rate=1.0, diff_srv_rate=0.0, srv_diff_host_rate=0.0, dst_host_count=0, dst_host_srv_count=0, dst_host_same_srv_rate=0.0, dst_host_diff_srv_rate=0.0, dst_host_same_src_port_rate=0.0, dst_host_srv_diff_host_rate=0.0, dst_host_serror_rate=0.0, dst_host_srv_serror_rate=0.0, dst_host_rerror_rate=0.0, dst_host_srv_rerror_rate=0.0, attack_type='normal.')

In [62]:
from pyspark.sql.functions import col

#### Different attacks by their names

In [64]:
rawdata.groupBy('attack_type').count().sort(col('count'),ascending=False).show()

+----------------+-------+
|     attack_type|  count|
+----------------+-------+
|          smurf.|2807886|
|        neptune.|1072017|
|         normal.| 972781|
|          satan.|  15892|
|        ipsweep.|  12481|
|      portsweep.|  10413|
|           nmap.|   2316|
|           back.|   2203|
|    warezclient.|   1020|
|       teardrop.|    979|
|            pod.|    264|
|   guess_passwd.|     53|
|buffer_overflow.|     30|
|           land.|     21|
|    warezmaster.|     20|
|           imap.|     12|
|        rootkit.|     10|
|     loadmodule.|      9|
|      ftp_write.|      8|
|       multihop.|      7|
+----------------+-------+
only showing top 20 rows



#### using k-means only allows numeric type features, so filter the data

In [93]:
numeric = rawdata.select(rawdata.columns[0:1] + rawdata.columns[4:])

In [94]:
from pyspark.ml.clustering import KMeans

In [95]:
kmeans = KMeans(k=2)

#### transform the column into vectors

In [97]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=numeric.columns[:-1], outputCol="features")
new_df = vecAssembler.transform(numeric)

In [100]:
model = kmeans.fit(new_df)

In [107]:
model.transform(new_df).groupby('attack_type','prediction').count().show()

+----------------+----------+-------+
|     attack_type|prediction|  count|
+----------------+----------+-------+
|          satan.|         0|  15892|
|        neptune.|         0|1072017|
|         normal.|         0| 972781|
|      portsweep.|         1|      1|
|        ipsweep.|         0|  12481|
|    warezmaster.|         0|     20|
|     loadmodule.|         0|      9|
|      ftp_write.|         0|      8|
|       teardrop.|         0|    979|
|          smurf.|         0|2807886|
|            phf.|         0|      4|
|           back.|         0|   2203|
|            spy.|         0|      2|
|           nmap.|         0|   2316|
|        rootkit.|         0|     10|
|           imap.|         0|     12|
|       multihop.|         0|      7|
|      portsweep.|         0|  10412|
|            pod.|         0|    264|
|buffer_overflow.|         0|     30|
+----------------+----------+-------+
only showing top 20 rows



In [108]:
model.computeCost(new_df)

4.663458567026278e+18

#### not ideal result for k-means if k == 2, let's try different k's and evaluate every k's cost

In [111]:
for k in range(5,40,5):
    kmeans = KMeans(k=k)
    model = kmeans.fit(new_df)
    print('cost k == {0}:{1}'.format(k,model.computeCost(new_df)))

cost k == 5:1.3862573792390758e+17
cost k == 10:7309786388759979.0
cost k == 15:3709690471461066.0
cost k == 20:385018586702936.5
cost k == 25:385018586702936.5
cost k == 30:382534667588576.5
cost k == 35:225090325069400.7


In [115]:
model.transform(new_df).groupby('prediction','attack_type').count().sort(col('count'),ascending=False).show() #clearly not good

+----------+----------------+-------+
|prediction|     attack_type|  count|
+----------+----------------+-------+
|         0|          smurf.|2807886|
|         0|        neptune.|1072017|
|         0|         normal.| 971538|
|         0|          satan.|  15892|
|         0|        ipsweep.|  12481|
|         0|      portsweep.|  10402|
|         0|           nmap.|   2316|
|         0|           back.|   2203|
|         0|       teardrop.|    979|
|         0|    warezclient.|    960|
|        10|         normal.|    650|
|         0|            pod.|    264|
|        19|         normal.|    241|
|         9|         normal.|    178|
|        21|         normal.|    144|
|        14|    warezclient.|     59|
|         0|   guess_passwd.|     53|
|         0|buffer_overflow.|     30|
|         0|           land.|     21|
|        11|    warezmaster.|     15|
+----------+----------------+-------+
only showing top 20 rows



#### using entropy as a means to test whether a k is good or not

In [144]:
def entropy(v,n):
    p = v / n
    entropy = -p * log(p)
    return entropy

In [130]:
temp = model.transform(new_df).groupby('prediction','attack_type').count().sort(col('count'),ascending=False)

In [131]:
temp = temp.groupby('prediction').sum('count').sort(col('sum(count)'),ascending=False)

In [133]:
temp.show()

+----------+----------+
|prediction|sum(count)|
+----------+----------+
|         0|   4897098|
|        10|       653|
|        19|       242|
|         9|       179|
|        21|       144|
|        14|        61|
|        11|        29|
|        15|         6|
|        13|         5|
|        12|         2|
|        16|         1|
|        17|         1|
|        20|         1|
|         8|         1|
|         3|         1|
|         5|         1|
|         1|         1|
|         2|         1|
|        18|         1|
|         4|         1|
+----------+----------+
only showing top 20 rows



In [138]:
temp2 = model.transform(new_df).groupby('prediction','attack_type').count()

In [140]:
joined = temp2.join(temp,on='prediction')

In [147]:
from pyspark.sql.functions import udf
spark.udf.register("entropy", entropy)
final = joined.withColumn('entropy',entropy(col('count'),col('sum(count)')))

In [149]:
final.sort(col('entropy'),ascending=False).show()

+----------+------------+-------+----------+--------------------+
|prediction| attack_type|  count|sum(count)|             entropy|
+----------+------------+-------+----------+--------------------+
|        15|  portsweep.|      2|         6|  0.3662040962227032|
|        11|     normal.|     14|        29|  0.3515634139723109|
|        11|warezmaster.|     15|        29| 0.34098911838841234|
|         0|    neptune.|1072017|   4897098|  0.3325442845877533|
|         0|     normal.| 971538|   4897098|   0.320900233709089|
|         0|      smurf.|2807886|   4897098|  0.3189188357605378|
|        15|     normal.|      4|         6|  0.2703100720721096|
|        14|     normal.|      2|        61| 0.11205661257748742|
|        14|warezclient.|     59|        61| 0.03224342288176915|
|         9|warezmaster.|      1|       179|  0.0289798089711774|
|        19|warezclient.|      1|       242|0.022681560851887137|
|         0|      satan.|  15892|   4897098| 0.01859681238407876|
|        1

#### calculate weighted average of entropy

In [151]:
total = final.select(sum('sum(count)'))

In [165]:
total = total.collect()[0][0]

In [171]:
sum_entropy = final.groupby('prediction').sum('entropy','count').withColumn('weighted_average',col('sum(entropy)') * (col('sum(count)'))/total).select(sum('weighted_average'))

In [173]:
sum_entropy.show()

+---------------------+
|sum(weighted_average)|
+---------------------+
|  0.04481029177741765|
+---------------------+



#### test with various k

In [175]:
for k in range(80,150,10):
    kmeans = KMeans(k=k)
    model = kmeans.fit(new_df)
    temp = model.transform(new_df).groupby('prediction','attack_type').count().sort(col('count'),ascending=False)
    temp = temp.groupby('prediction').sum('count').sort(col('sum(count)'),ascending=False)
    temp2 = model.transform(new_df).groupby('prediction','attack_type').count()
    joined = temp2.join(temp,on='prediction')
    final = joined.withColumn('entropy',entropy(col('count'),col('sum(count)')))
    total = final.select(sum('sum(count)'))
    total = total.collect()[0][0]
    sum_entropy = final.groupby('prediction').sum('entropy','count').withColumn('weighted_average',col('sum(entropy)') * (col('sum(count)'))/total).select(sum('weighted_average'))
    sum_entropy = sum_entropy.collect()[0][0]
    print('cost k == {0}:{1}'.format(k,sum_entropy))

cost k == 80:0.04434824524422834
cost k == 90:0.04434538917913914
cost k == 100:0.044246099294903876
cost k == 110:0.044246099294903876
cost k == 120:0.045606471446412555
cost k == 130:0.045406606880109696
cost k == 140:0.045406606880109696


#### k=110 seems good fit

In [176]:
kmeans = KMeans(k=110)
model = kmeans.fit(new_df)

In [180]:
model.transform(new_df).groupby('prediction','attack_type').count().sort(col('count'),ascending=False).show() #clearly not good

+----------+-------------+-------+
|prediction|  attack_type|  count|
+----------+-------------+-------+
|         0|       smurf.|2807886|
|         0|     neptune.|1072017|
|         0|      normal.| 918540|
|        32|      normal.|  51368|
|         0|       satan.|  15891|
|         0|     ipsweep.|  12480|
|         0|   portsweep.|  10402|
|         0|        nmap.|   2316|
|        16|        back.|   2189|
|        16|      normal.|   1378|
|         0|    teardrop.|    979|
|         0| warezclient.|    960|
|        30|      normal.|    686|
|         0|         pod.|    264|
|        18|      normal.|    216|
|        11|      normal.|    141|
|        13|      normal.|     97|
|        79|      normal.|     60|
|         0|guess_passwd.|     53|
|        17|      normal.|     38|
+----------+-------------+-------+
only showing top 20 rows



#### we can start building a network intrusion detection system, when we get a new data point, we try to calculate the most closest centroid, if the distance is above a threshold, then we pinpoint the data point as anomalous