In [23]:
from pyspark import SparkContext
from collections import OrderedDict
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.feature import StandardScaler
from collections import OrderedDict
# Spark entry point shared by every cell below (master URL "local[2]").
sc = SparkContext(master="local[2]", appName="First Spark App")

In [24]:
# Raw dataset as an RDD of text lines; the path is relative to the working directory.
data = sc.textFile("kddcup.data")

In [123]:
print("Counting all different labels")

# The label is the last comma-separated field of every record.
labels = data.map(lambda row: row.strip().split(",")[-1])
label_counts = labels.countByValue()

# Rank the (label, count) pairs from most to least frequent and keep that order.
sorted_labels = OrderedDict(
    sorted(label_counts.items(), key=lambda pair: pair[1], reverse=True)
)
for label in sorted_labels:
    print(label, sorted_labels[label])

Counting all different labels
smurf. 2807886
neptune. 1072017
normal. 972781
satan. 15892
ipsweep. 12481
portsweep. 10413
nmap. 2316
back. 2203
warezclient. 1020
teardrop. 979
pod. 264
guess_passwd. 53
buffer_overflow. 30
land. 21
warezmaster. 20
imap. 12
rootkit. 10
loadmodule. 9
ftp_write. 8
multihop. 7
phf. 4
perl. 3
spy. 2


In [125]:
def parse_interaction(line):
    """
    Parses a network data interaction.

    The line is a comma-separated record. Surrounding whitespace (including a
    trailing carriage return or newline) is stripped first so that it cannot
    end up inside the label, matching how the label-counting cell reads labels.

    Fields 1-3 are skipped and the last field is treated as the label.
    NOTE(review): fields 1-3 are presumably the non-numeric columns of the
    dataset -- confirm against the data schema.

    :param line: one raw text record.
    :return: tuple ``(label, features)`` where ``label`` is the last field and
        ``features`` is a numpy array of floats built from field 0 followed by
        fields 4 up to (but excluding) the label.
    :raises ValueError: if any retained field cannot be converted to float.
    """
    line_split = line.strip().split(",")
    # Keep field 0, drop fields 1-3, and exclude the trailing label.
    clean_line_split = [line_split[0]] + line_split[4:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))


def distance(a, b):
    """
    Euclidean distance between two numeric RDDs.

    The two RDDs are zipped element by element, each pair is reduced to its
    squared difference, the squares are summed, and the square root of that
    sum is returned.
    """
    def difference(pair):
        return pair[0] - pair[1]

    def square(value):
        return value * value

    sum_of_squares = (
        a.zip(b)
        .map(difference)
        .map(square)
        .reduce(lambda total, term: total + term)
    )
    return sqrt(sum_of_squares)

def dist_to_centroid(datum, clusters):
    """
    Euclidean distance from a point to the centre of the cluster it is
    assigned to.

    ``clusters.predict(datum)`` supplies the index of the assigned cluster,
    which is then looked up in ``clusters.centers``.
    """
    assigned = clusters.predict(datum)
    offset = clusters.centers[assigned] - datum
    squared_components = [component**2 for component in offset]
    return sqrt(sum(squared_components))

def clustering_score(data, k, seed=None):
    """
    Trains a k-means model with k clusters and scores it by the mean distance
    of every point to its assigned centroid (lower is better).

    The deprecated ``runs`` argument is no longer passed to ``KMeans.train``.
    NOTE(review): per the Spark docs it has no effect since Spark 2.0 and is
    not accepted by newer releases -- confirm against the Spark version in use.

    :param data: RDD of numeric feature vectors to cluster.
    :param k: number of clusters.
    :param seed: optional seed for the random centroid initialisation. With the
        default ``None`` repeated calls for the same k can return different
        scores; pass a fixed value for reproducible results.
    :return: tuple ``(k, model, score)``.
    """
    clusters = KMeans.train(
        data, k, maxIterations=10, initializationMode="random", seed=seed
    )
    score = data.map(lambda datum: dist_to_centroid(datum, clusters)).mean()
    print ("Clustering score for k=%(k)d is %(score)f" % {"k": k, "score": score})
    return (k, clusters, score)

In [127]:
print("Parsing dataset...")

# Every record becomes a (label, feature array) pair.
parsed_data = data.map(parse_interaction)

# Only the feature arrays are used downstream; mark them for caching since
# they are read more than once.
parsed_data_values = parsed_data.values()
parsed_data_values.cache()

Parsing dataset...


In [130]:
print("Successfully imported Spark Modules")
print("Standardizing data...")

# Fit the scaler (both mean and std options enabled) on the parsed feature
# vectors, then apply it to the same vectors.
standardizer = StandardScaler(withMean=True, withStd=True)
standardizer_model = standardizer.fit(parsed_data_values)
standardized_data_values = standardizer_model.transform(parsed_data_values)

Successfully imported Spark Modules
Standardizing data...


In [152]:
# Evaluate values of k from 10 to 15
max_k=15
print ("Calculating total in within cluster distance for different k values (10 to %(max_k)d):" % {"max_k": max_k})
# Build a real list instead of a lazy `map` object: in Python 3 `map` defers
# all training until something iterates it and can be consumed only once, so
# a later `min(scores, ...)` would both trigger the work and leave `scores`
# empty for any re-run.
scores = [clustering_score(standardized_data_values, k) for k in range(10, max_k + 1)]

Calculating total in within cluster distance for different k values (10 to 15):


In [154]:
# Pick the k whose result has the lowest score (third element of each tuple)
best_result = min(scores, key=lambda result: result[2])
min_k = best_result[0]
print("Best k value is %(best_k)d" % {"best_k": min_k})



Clustering score for k=10 is 1.035337
Clustering score for k=11 is 1.062859
Clustering score for k=12 is 0.846474
Clustering score for k=13 is 0.938197
Clustering score for k=14 is 0.775179
Clustering score for k=15 is 0.877925
Best k value is 14


In [161]:
print ("Obtaining clustering result sample for k=%(min_k)d..." % {"min_k": min_k})
# Train the model for the chosen k with a direct call. This no longer rebinds
# `scores`, so the results of the k sweep stay available.
# Note: this is a fresh training run with random initialisation, so its score
# can differ from the one recorded for the same k during the sweep.
best_model = clustering_score(standardized_data_values, min_k)[1]
# Take the 5% sample (without replacement) first, so cluster prediction and
# string formatting are only done for rows that are actually kept.
# Each output line is "<cluster index>,<comma-separated feature values>".
cluster_assignments_sample = (
    standardized_data_values
    .sample(False, 0.05)
    .map(lambda datum: str(best_model.predict(datum)) + "," + ",".join(map(str, datum)))
)

Obtaining clustering result sample for k=14...




Clustering score for k=14 is 0.883399


In [167]:
# Bare expression: the notebook echoes the RDD object's repr, not its rows.
cluster_assignments_sample

PythonRDD[854] at RDD at PythonRDD.scala:48

In [None]:
# Write the sampled cluster assignments out as text under `output_dir`
output_dir = "sample_standardized"
print("Saving sample to file...")
cluster_assignments_sample.saveAsTextFile(output_dir)
print("DONE!")

Saving sample to file...
