In [11]:
from pyspark import SparkContext, SparkConf
import pandas as pd
import time
# Initializing Spark
conf=SparkConf().setAppName("KDDCup_Analytics").setMaster("local[*]")
sc=SparkContext(conf=conf)

In [1]:
# How many cores in the current machine?
sc.defaultParallelism

4

In [2]:
# importing data into the environment using its URL
# Source: http://kdd.ics.uci.edu/databases/kddcup99/
import urllib.request
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

('kddcup.data.gz', <http.client.HTTPMessage at 0x7fa1bc7b5fd0>)

In [3]:
# Read file into RDD
KDDcup_rdd = sc.textFile("./kddcup.data.gz", 8)

In [4]:
# Take 5 samples to understand date
KDDcup_rdd.take(5)

['0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,162,4528,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,236,1228,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,2,2,1.00,0.00,0.50,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,233,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,3,3,1.00,0.00,0.33,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,3,3,0.00,0.00,0.00,0.00,1.00,0.00,0.00,4,4,1.00,0.00,0.25,0.00,0.00,0.00,0.00,0.00,normal.']

In [5]:
# Count the number of records
KDDcup_rdd.count()

4898431

In [5]:
# Count the number of "normal" connections
Normal_rdd = KDDcup_rdd.filter(lambda line: 'normal.' in line)
Normal_rdd.count()

972781

In [6]:
# List the name of labels
Split_rdd = KDDcup_rdd.map(lambda line: line.split(","))
Label_rdd = Split_rdd.map(lambda item: item[-1]).distinct() # Get last index for "labels"
Label_rdd.collect()

['normal.',
 'buffer_overflow.',
 'loadmodule.',
 'perl.',
 'neptune.',
 'smurf.',
 'guess_passwd.',
 'pod.',
 'teardrop.',
 'portsweep.',
 'ipsweep.',
 'land.',
 'ftp_write.',
 'back.',
 'imap.',
 'satan.',
 'phf.',
 'nmap.',
 'multihop.',
 'warezmaster.',
 'warezclient.',
 'spy.',
 'rootkit.']

In [7]:
# List the name of protocols
Protocol_rdd = Split_rdd.map(lambda item: item[1]).distinct() # Get index 1
Protocol_rdd.collect()

['tcp', 'udp', 'icmp']

In [16]:
# A function for counting labels
Labels_Count = []
def LabelCount_func(items):
  for i in items:
    Labels_Count.append(KDDcup_rdd.filter(lambda line: i in line).count())

In [17]:
# Count the number of records based on the labels
import time
start_time = time.time()

Labels_list = Label_rdd.collect()
LabelCount_func(Labels_list)

# Print out the time taken for executing the function
print("--- %s sec ---" % (time.time() - start_time))

--- 240.74376225471497 sec ---


In [13]:
print(Labels_list)
print(Labels_Count)

['normal.', 'buffer_overflow.', 'loadmodule.', 'perl.', 'neptune.', 'smurf.', 'guess_passwd.', 'pod.', 'teardrop.', 'portsweep.', 'ipsweep.', 'land.', 'ftp_write.', 'back.', 'imap.', 'satan.', 'phf.', 'nmap.', 'multihop.', 'warezmaster.', 'warezclient.', 'spy.', 'rootkit.']
[972781, 30, 9, 3, 1072017, 2807886, 53, 264, 979, 10413, 12481, 21, 8, 2203, 12, 15892, 4, 2316, 7, 20, 1020, 2, 10]


In [18]:
# Create a DataFrame to visualize all in a table format
import pandas as pd
DF_labels = pd.DataFrame({'Label': Labels_list,
                          'Count': Labels_Count})
DF_labels

Unnamed: 0,Label,Count
0,normal.,972781
1,buffer_overflow.,30
2,loadmodule.,9
3,perl.,3
4,neptune.,1072017
5,smurf.,2807886
6,guess_passwd.,53
7,pod.,264
8,teardrop.,979
9,portsweep.,10413


In [16]:
Label_rdd_KV.take(5)

[('normal.', 1),
 ('normal.', 1),
 ('normal.', 1),
 ('normal.', 1),
 ('normal.', 1)]

In [9]:
# Alternative solution: Create (Key, value) for counting Labels
import time
start_time = time.time()

Label_rdd_KV=Split_rdd.map(lambda x: (x[-1],1))
Label_rdd_Reduce = Label_rdd_KV.reduceByKey(lambda a,b: a+b)

# Print out the time taken for executing the Key-Value operation
print("--- %s sec ---" % (time.time() - start_time))

Label_rdd_Reduce.collect()

--- 0.04381275177001953 sec ---


[('normal.', 972781),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 1072017),
 ('smurf.', 2807886),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 10413),
 ('ipsweep.', 12481),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 15892),
 ('phf.', 4),
 ('nmap.', 2316),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

In [12]:
Keys = Label_rdd_Reduce.keys().collect()
Values = Label_rdd_Reduce.values().collect()

DF_labels_KV = pd.DataFrame({'Label': Keys,
                          'Count': Values})
DF_labels_KV

Unnamed: 0,Label,Count
0,normal.,972781
1,buffer_overflow.,30
2,loadmodule.,9
3,perl.,3
4,neptune.,1072017
5,smurf.,2807886
6,guess_passwd.,53
7,pod.,264
8,teardrop.,979
9,portsweep.,10413


In [19]:
# To make sure all the calculations are correct
print (DF_labels['Count'].sum())

DF_labels.sort_values(by='Count')


DF_labels.sort_values(by='Count', ascending=False)

[DF_labels.Count.min(), DF_labels.Count.max(), DF_labels.Count.mean()]

4898431


[2, 2807886, 212975.26086956522]