In [1]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("data cleaning").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Loading train and test datasets

In [2]:
rdd_train = sc.textFile("/data_serving/train_data.gz")
rdd_test = sc.textFile("/data_serving/test_data.gz")

# Repartition and persist rdds in the cache

In [3]:
# change number of partition for train data as it includes milion rows
rdd_train = rdd_train.repartition(30) # shuffle all data
print(sc.defaultParallelism)
print("Number of partition for train data: ", rdd_train.getNumPartitions())
print("Number of partition for test data: ", rdd_test.getNumPartitions())
# store in cache
rdd_train.persist()
rdd_test.persist()

4
Number of partition for train data:  30
Number of partition for test data:  1


/data_serving/test_data.gz MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

# Count total rows in dataset

In [4]:
print("total rows in dataset: ", rdd_train.count())
print("total rows in test dataset: ", rdd_test.count())

total rows in dataset:  4898431
total rows in test dataset:  311029


# Taking 10 random samples of dataset

### train_data

In [5]:
rdd_train.takeSample(withReplacement=False, num=10, seed=42)

['Log-- 0 tcp-http | SF { 316 } 2698 remote 0 0 0 0 login_success 0 not_root_obtained 0 0 0 0 1 0 not_host_login not_guest_login 21:21 0.00 0.00 0.00 0.00 1.00 0.00 0.00 (255:244) 0.96 0.02 0.01 0.00 0.00 0.00 0.00 0.00 <label:normal.>',
 'Log-- 0 icmp-ecr_i | SF { 1032 } 0 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 511:511 0.00 0.00 0.00 0.00 1.00 0.00 0.00 (255:255) 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 <label:smurf.>',
 'Log-- 0 icmp-ecr_i | SF { 1032 } 0 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 511:511 0.00 0.00 0.00 0.00 1.00 0.00 0.00 (255:255) 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 <label:smurf.>',
 'Log-- 0 tcp-http | SF { 214 } 1941 remote 0 0 0 0 login_success 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 9:9 0.00 0.00 0.00 0.00 1.00 0.00 0.00 (9:255) 1.00 0.00 0.11 0.02 0.00 0.00 0.00 0.00 <label:normal.>',
 'Log-- 0 icmp-ecr_i | SF { 520 } 0 remote 0 0 0 0 l

### test_data

In [6]:
rdd_test.takeSample(withReplacement=False, num=10, seed=42)

['Log-- 0 tcp-private | REJ { 0 } 0 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 240:6 0.00 0.00 1.00 1.00 0.03 0.06 0.00 (255:6) 0.02 0.07 0.00 0.00 0.00 0.00 1.00 1.00 <label:neptune.>',
 'Log-- 0 icmp-ecr_i | SF { 520 } 0 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 510:510 0.00 0.00 0.00 0.00 1.00 0.00 0.00 (255:255) 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 <label:smurf.>',
 'Log-- 0 tcp-private | REJ { 0 } 0 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 273:13 0.00 0.00 1.00 1.00 0.05 0.06 0.00 (255:13) 0.05 0.07 0.00 0.00 0.00 0.00 1.00 1.00 <label:neptune.>',
 'Log-- 0 icmp-ecr_i | SF { 520 } 0 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 509:509 0.00 0.00 0.00 0.00 1.00 0.00 0.00 (255:255) 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 <label:smurf.>',
 'Log-- 0 icmp-ecr_i | SF { 1032 } 0 remote 0 0 0 0 lo

# Metadata


## name of features


In [7]:
feature_names = []
with open("/home/ubuntu/kddcup.names.txt") as f:
  lines = f.readlines()
  for line in lines:
    line = line.strip()
    if line.startswith("back") or line == "":
        continue
    try:

        name = line.split(":")[0]
        feature_names.append(name)
    except:
      pass
feature_names.append("label")
len(feature_names)

42

In [8]:
feature_names

['duration',
 'protocol_type',
 'service',
 'flag',
 'src_bytes',
 'dst_bytes',
 'land',
 'wrong_fragment',
 'urgent',
 'hot',
 'num_failed_logins',
 'logged_in',
 'num_compromised',
 'root_shell',
 'su_attempted',
 'num_root',
 'num_file_creations',
 'num_shells',
 'num_access_files',
 'num_outbound_cmds',
 'is_host_login',
 'is_guest_login',
 'count',
 'srv_count',
 'serror_rate',
 'srv_serror_rate',
 'rerror_rate',
 'srv_rerror_rate',
 'same_srv_rate',
 'diff_srv_rate',
 'srv_diff_host_rate',
 'dst_host_count',
 'dst_host_srv_count',
 'dst_host_same_srv_rate',
 'dst_host_diff_srv_rate',
 'dst_host_same_src_port_rate',
 'dst_host_srv_diff_host_rate',
 'dst_host_serror_rate',
 'dst_host_srv_serror_rate',
 'dst_host_rerror_rate',
 'dst_host_srv_rerror_rate',
 'label']

## attack types

In [9]:
name_type_attack = []
type_of_attack = set()

with open("/home/ubuntu/training_attack_types.txt") as f:
  lines = f.readlines()
  for line in lines:
    line = line.strip()
    try:
      name, attack = line.split(" ")
      name_type_attack.append(name)
      type_of_attack.add(attack)
    except:
      pass
type_of_attack.add("normal")
print(type_of_attack)
print(name_type_attack)

{'dos', 'r2l', 'probe', 'u2r', 'normal'}
['back', 'buffer_overflow', 'ftp_write', 'guess_passwd', 'imap', 'ipsweep', 'land', 'loadmodule', 'multihop', 'neptune', 'nmap', 'perl', 'phf', 'pod', 'portsweep', 'rootkit', 'satan', 'smurf', 'spy', 'teardrop', 'warezclient', 'warezmaster']


# Data cleaning

example row: **'Log-- 0 icmp-ecr_i | SF { 1032 } 0 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 511:511 0.00 0.00 0.00 0.00 1.00 0.00 0.00 (255:255) 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 <label:smurf.>'**

* Remove **log--** from each row
* Remove **-** between **icmp** (protocol_type) and **ecr_i** (service). These are two seperate features according to name of features.
* remove character **|**
* remove all **{}** and **()** .
* According to name of features, some of the features are boolean (0 or 1). Use map/filter/distinct/etc to check which features are boolean and replace with 0 or 1.
* remove **:** between numbers. According to name of features, each of them is a seperate feature.
* remove **label** and extra characters for the last feature.
* Add comma between features

### Remove **log--**

In [10]:
rdd_train = rdd_train.map(lambda x: x.replace("Log--", "").strip())
rdd_test = rdd_test.map(lambda x: x.replace("Log--", "").strip())

In [11]:
rdd_train.take(1)

['0 tcp-http | SF { 303 } 784 remote 0 0 0 0 login_success 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 5:6 0.00 0.00 0.00 0.00 1.00 0.00 0.33 (137:255) 1.00 0.00 0.01 0.02 0.00 0.00 0.00 0.00 <label:normal.>']

In [12]:
rdd_test.take(1)

['0 udp-domain_u | SF { 44 } 44 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 2:4 0.00 0.00 0.00 0.00 1.00 0.00 0.50 (255:236) 0.93 0.01 0.01 0.00 0.00 0.00 0.00 0.00 <label:normal.>']

### Remove **-** between **icmp** (protocol_type) and **ecr_i** (service). Remove character **|**

In [13]:
# solution 1
def remove_dash(x):
    try:
        x = x.split(" ")
        protocl_type, service = x[1].split("-")
        x[1] = protocl_type
        x[2] = service
        return " ".join(x)
    except:
        print("An error occured")
        return " ".join(x)

rdd_train = rdd_train.map(lambda x: remove_dash(x))
rdd_test = rdd_test.map(lambda x: remove_dash(x))
# solution 2
# rdd = rdd.map(lambda x: x.replace("|", "").strip())
# rdd = rdd.map(lambda x: x.replace("-", ""))

In [14]:
rdd_train.take(1)

['0 tcp http SF { 303 } 784 remote 0 0 0 0 login_success 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 5:6 0.00 0.00 0.00 0.00 1.00 0.00 0.33 (137:255) 1.00 0.00 0.01 0.02 0.00 0.00 0.00 0.00 <label:normal.>']

In [15]:
rdd_test.take(1)

['0 udp domain_u SF { 44 } 44 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 2:4 0.00 0.00 0.00 0.00 1.00 0.00 0.50 (255:236) 0.93 0.01 0.01 0.00 0.00 0.00 0.00 0.00 <label:normal.>']

### remove all **{}** and **()** .

In [16]:
rdd_train = rdd_train.map(lambda x: x.replace("{", "").replace("}", "").replace("(", "").replace(")", ""))
rdd_test = rdd_test.map(lambda x: x.replace("{", "").replace("}", "").replace("(", "").replace(")", ""))

In [17]:
# remove two spaces
rdd_train = rdd_train.map(lambda x: x.strip().replace("  ", " "))
rdd_test = rdd_test.map(lambda x: x.strip().replace("  ", " "))

In [18]:
rdd_train.take(1)

['0 tcp http SF 303 784 remote 0 0 0 0 login_success 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 5:6 0.00 0.00 0.00 0.00 1.00 0.00 0.33 137:255 1.00 0.00 0.01 0.02 0.00 0.00 0.00 0.00 <label:normal.>']

In [19]:
rdd_test.take(1)

['0 udp domain_u SF 44 44 remote 0 0 0 0 login_failed 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 2:4 0.00 0.00 0.00 0.00 1.00 0.00 0.50 255:236 0.93 0.01 0.01 0.00 0.00 0.00 0.00 0.00 <label:normal.>']

### Features with boolean type

Columns with indices (6, 11, 13, 20, 21) seem to have only two possible values.

In [20]:
%%time
# Check columns [6, 11, 13, 20, 21] have only two unique values
rdd_train.map(lambda x: x.split(" ")).flatMap(lambda x: [(i, x[i]) for i in [6, 11, 13, 20, 21]]).distinct().countByKey()

CPU times: user 24.4 ms, sys: 6.15 ms, total: 30.6 ms
Wall time: 15.8 s


defaultdict(int, {20: 2, 21: 2, 11: 2, 6: 2, 13: 2})

In [21]:
%%time
# Check columns [6, 11, 13, 20, 21] have only two unique values
rdd_test.map(lambda x: x.split(" ")).flatMap(lambda x: [(i, x[i]) for i in [6, 11, 13, 20, 21]]).distinct().countByKey()

CPU times: user 12 ms, sys: 1.12 ms, total: 13.1 ms
Wall time: 3.87 s


defaultdict(int, {6: 2, 11: 2, 13: 2, 20: 2, 21: 2})

### Replace with 0 and 1

In [22]:
# example row: '0 tcp http SF 213 1415 remote 0 0 0 0 login_success 0 not_root_obtained 0 0 0 0 0 0 not_host_login not_guest_login 2:14 0.00 0.00 0.00 0.00 1.00 0.00 0.14 27:33 1.00 0.00 0.04 0.06 0.00 0.00 0.00 0.00 <label:normal.>'

def replace_with_0_and_1(x):
    x = x.split(" ")
    x[6] = str(0) if x[6] == "remote" else str(1)
    x[11] = str(1) if x[11] == "login_success" else str(0)
    x[13] = str(0) if x[13] == "not_root_obtained" else str(1)
    x[20] = str(0) if x[20] == "not_host_login" else str(1)
    x[21] = str(0) if x[21] == "not_guest_login" else str(1)
    return " ".join(x)



rdd_train_replaced = rdd_train.map(lambda x: replace_with_0_and_1(x))
rdd_train_replaced.persist()
rdd_test_replaced = rdd_test.map(lambda x: replace_with_0_and_1(x))
rdd_test_replaced.persist()

PythonRDD[32] at RDD at PythonRDD.scala:53

In [23]:
rdd_train_replaced.take(1)

['0 tcp http SF 303 784 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 5:6 0.00 0.00 0.00 0.00 1.00 0.00 0.33 137:255 1.00 0.00 0.01 0.02 0.00 0.00 0.00 0.00 <label:normal.>']

In [24]:
rdd_test_replaced.take(1)

['0 udp domain_u SF 44 44 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2:4 0.00 0.00 0.00 0.00 1.00 0.00 0.50 255:236 0.93 0.01 0.01 0.00 0.00 0.00 0.00 0.00 <label:normal.>']

### Remove all **(:)** for each row

In [25]:
rdd_train_replaced = rdd_train_replaced.map(lambda x: x.replace(":", " "))
rdd_test_replaced = rdd_test_replaced.map(lambda x: x.replace(":", " "))

In [26]:
rdd_train_replaced.take(1)

['0 tcp http SF 303 784 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 5 6 0.00 0.00 0.00 0.00 1.00 0.00 0.33 137 255 1.00 0.00 0.01 0.02 0.00 0.00 0.00 0.00 <label normal.>']

In [27]:
rdd_test_replaced.take(1)

['0 udp domain_u SF 44 44 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 4 0.00 0.00 0.00 0.00 1.00 0.00 0.50 255 236 0.93 0.01 0.01 0.00 0.00 0.00 0.00 0.00 <label normal.>']

### Remove **label** and extra characters around last feature

In [28]:
import re

rdd_train_replaced = rdd_train_replaced.map(lambda row: re.sub(r'<label\s+(\w+)\.>', r'\1', row))
rdd_test_replaced = rdd_test_replaced.map(lambda row: re.sub(r'<label\s+(\w+)\.>', r'\1', row))

In [29]:
# train data
rdd_train_replaced.takeSample(False, 10)

['0 tcp efs REJ 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 149 20 0.00 0.00 1.00 1.00 0.13 0.05 0.00 255 20 0.08 0.06 0.00 0.00 0.00 0.00 1.00 1.00 neptune',
 '0 tcp http SF 252 176 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 2 0.00 0.00 0.00 0.00 1.00 0.00 1.00 255 255 1.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 normal',
 '0 icmp ecr_i SF 1032 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 511 511 0.00 0.00 0.00 0.00 1.00 0.00 0.00 255 255 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 smurf',
 '0 tcp private REJ 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 212 6 0.00 0.00 1.00 1.00 0.03 0.07 0.00 255 6 0.02 0.07 0.00 0.00 0.00 0.00 1.00 1.00 neptune',
 '0 udp private SF 105 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0.00 0.00 0.00 0.00 1.00 0.00 0.00 255 241 0.95 0.01 0.00 0.00 0.00 0.00 0.00 0.00 normal',
 '0 tcp http REJ 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0.00 0.00 1.00 1.00 1.00 0.00 0.00 53 210 1.00 0.00 0.02 0.10 0.00 0.00 1.00 1.00 normal',
 '0 tcp private S0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 121 20 1.00 1.00 0.00 0.00 0.17 

In [30]:
# test data
rdd_test_replaced.takeSample(False, 10)

['0 icmp ecr_i SF 1032 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 511 511 0.00 0.00 0.00 0.00 1.00 0.00 0.00 255 255 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 smurf',
 '0 tcp http SF 218 8899 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 1 0.00 0.00 0.00 0.00 1.00 0.00 0.00 213 255 1.00 0.00 0.00 0.02 0.00 0.00 0.00 0.00 normal',
 '0 tcp private REJ 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 143 10 0.00 0.00 1.00 1.00 0.07 0.06 0.00 255 10 0.04 0.05 0.00 0.00 0.00 0.00 1.00 1.00 neptune',
 '0 tcp private REJ 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 199 4 0.02 0.00 0.98 1.00 0.02 0.07 0.00 255 4 0.02 0.07 0.00 0.00 0.01 0.00 0.99 1.00 neptune',
 '0 icmp ecr_i SF 520 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 511 511 0.00 0.00 0.00 0.00 1.00 0.00 0.00 255 255 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 smurf',
 '0 icmp ecr_i SF 520 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 511 511 0.00 0.00 0.00 0.00 1.00 0.00 0.00 255 255 1.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 smurf',
 '0 icmp ecr_i SF 1032 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 511 511 0.00 0.0

### Add comma between features

In [31]:
rdd_train_replaced = rdd_train_replaced.map(lambda x: x.replace(" ", ","))
rdd_test_replaced = rdd_test_replaced.map(lambda x: x.replace(" ", ","))

In [32]:
rdd_train_replaced.take(1)

['0,tcp,http,SF,303,784,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,5,6,0.00,0.00,0.00,0.00,1.00,0.00,0.33,137,255,1.00,0.00,0.01,0.02,0.00,0.00,0.00,0.00,normal']

In [33]:
rdd_test_replaced.take(1)

['0,udp,domain_u,SF,44,44,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,4,0.00,0.00,0.00,0.00,1.00,0.00,0.50,255,236,0.93,0.01,0.01,0.00,0.00,0.00,0.00,0.00,normal']

## Count amount of each label

### train dataset

In [34]:
labels_train = rdd_train_replaced.map(lambda row: row.split(",")[-1])

label_train_pairs = labels_train.map(lambda label: (label, 1))

label_train_counts = label_train_pairs.reduceByKey(lambda x, y: x + y)

result_train = label_train_counts.collect()

for label, count in result_train:
    print(f"Label: {label} ------ Count: {count}")

Label: portsweep ------ Count: 10413
Label: neptune ------ Count: 1072017
Label: pod ------ Count: 264
Label: ftp_write ------ Count: 8
Label: teardrop ------ Count: 979
Label: perl ------ Count: 3
Label: warezclient ------ Count: 1020
Label: warezmaster ------ Count: 20
Label: guess_passwd ------ Count: 53
Label: smurf ------ Count: 2807886
Label: ipsweep ------ Count: 12481
Label: spy ------ Count: 2
Label: nmap ------ Count: 2316
Label: loadmodule ------ Count: 9
Label: normal ------ Count: 972781
Label: rootkit ------ Count: 10
Label: multihop ------ Count: 7
Label: back ------ Count: 2203
Label: land ------ Count: 21
Label: imap ------ Count: 12
Label: satan ------ Count: 15892
Label: phf ------ Count: 4
Label: buffer_overflow ------ Count: 30


### test dataset

In [35]:
labels_test = rdd_test_replaced.map(lambda row: row.split(",")[-1])

label_test_pairs = labels_test.map(lambda label: (label, 1))

label_test_counts = label_test_pairs.reduceByKey(lambda x, y: x + y)

result_test = label_test_counts.collect()

for label, count in result_test:
    print(f"Label: {label} ------ Count: {count}")

Label: normal ------ Count: 60593
Label: snmpgetattack ------ Count: 7741
Label: smurf ------ Count: 164091
Label: ipsweep ------ Count: 306
Label: saint ------ Count: 736
Label: portsweep ------ Count: 354
Label: apache2 ------ Count: 794
Label: warezmaster ------ Count: 1602
Label: guess_passwd ------ Count: 4367
Label: satan ------ Count: 1633
Label: processtable ------ Count: 759
Label: mscan ------ Count: 1053
Label: httptunnel ------ Count: 158
Label: neptune ------ Count: 58001
Label: mailbomb ------ Count: 5000
Label: rootkit ------ Count: 13
Label: pod ------ Count: 87
Label: phf ------ Count: 2
Label: snmpguess ------ Count: 2406
Label: teardrop ------ Count: 12
Label: back ------ Count: 1098
Label: multihop ------ Count: 18
Label: xterm ------ Count: 13
Label: nmap ------ Count: 84
Label: xlock ------ Count: 9
Label: xsnoop ------ Count: 4
Label: named ------ Count: 17
Label: buffer_overflow ------ Count: 22
Label: worm ------ Count: 2
Label: sendmail ------ Count: 17
Label:

### Labels in test and train data dont match. Test data has additional labels which are not in train data. 
### We remove these extra labels in test data and keep only labels that exist in both dataset.

In [36]:
name_type_attack.append("normal")
def filter_labels(x):
    label = x.split(",")[-1]
    if label in name_type_attack:
        return True
    else:
        return False

test_data_filtered = rdd_test_replaced.filter(lambda x: filter_labels(x))

In [37]:
test_data_filtered.takeSample(False, 10)

['0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,510,510,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,254,1.00,0.01,1.00,0.00,0.00,0.00,0.00,0.00,smurf',
 '0,tcp,private,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,134,18,1.00,1.00,0.00,0.00,0.13,0.05,0.00,255,18,0.07,0.05,0.00,0.00,1.00,1.00,0.00,0.00,neptune',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf',
 '0,icmp,ecr_i,SF,520,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf',
 '0,tcp,http,SF,252,1483,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,159,255,1.00,0.00,0.01,0.02,0.00,0.00,0.00,0.00,normal',
 '0,tcp,http,SF,289,1311,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,10,11,0.00,0.

# Save cleaned dataset

The following code should not be run as already the datasets are saved. 
These code are just for illustration.

In [None]:
one_partition_train = train_data_replaced.coalesce(1)

output_path = "/data_serving/cleaned_kddcup.data.gz"

one_partition_train.saveAsTextFile(output_path, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

In [56]:
one_partition_test = test_data_filtered.coalesce(1)

output_path = "/data_serving/cleaned_test.data.gz"

one_partition_test.saveAsTextFile(output_path, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")