## Configure your environment variables

```bash
# Executable PySpark should launch as the driver front end; replace /path/to
# with the location of your own Anaconda installation.
export PYSPARK_DRIVER_PYTHON=/path/to/anaconda3/bin/jupyter
# Arguments passed to that executable: run a notebook server on port 8880
# without opening a browser window.
# NOTE: ip='*' makes the server listen on every network interface; narrow it
# (e.g. to 127.0.0.1) when the machine sits on an untrusted network.
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.open_browser=False --NotebookApp.ip='*' --NotebookApp.port=8880"
```

# Sampling/filtering RDDs to pick out relevant data points

In [None]:
import urllib.request

In [None]:
# Download the gzipped KDD Cup 1999 data set into the working directory.
# urlretrieve returns a (local_filename, headers) tuple, stored in `f`.
f = urllib.request.urlretrieve(
    url='https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz',
    filename="kddcup.data.gz",
)

In [None]:
# Show the (local_filename, headers) tuple returned by urlretrieve.
f

In [1]:
# Display `sc`. It is not created in the cells shown here; presumably it is the
# SparkContext supplied by the PySpark-launched kernel — confirm it exists
# before running the cells below.
sc

In [1]:
# Load the downloaded archive as an RDD with one element per line of text.
raw_data = sc.textFile("./kddcup.data.gz")

In [3]:
# Keep only the raw lines whose text contains the "normal." label.
contains_normal = raw_data.filter(
    lambda record: "normal." in record
)

In [None]:
# Count the lines that passed the "normal." filter.
contains_normal.count()

In [None]:
# Turn every comma-separated line into a list of its field strings.
split_file = raw_data.map(
    lambda record: record.split(',')
)

In [None]:
# Preview a handful of split rows. collect() would pull every row of the full
# data set into the driver and flood the notebook output, so take() is used to
# inspect only the first few records.
split_file.take(5)

In [13]:
from time import time

In [2]:
# Draw a ~10% sample of the raw lines without replacement (seed 42 for
# repeatability).
sampled = raw_data.sample(False, 0.1, 42)
# Split each sampled line into fields and keep the rows labelled "normal.".
# After split(",") every element is a list, so `"normal" in x` would be an
# exact list-membership test that never matches the "normal." label (note the
# trailing dot). Compare the last field, which holds the label, instead.
contains_normal_sample = (
    sampled
    .map(lambda x: x.split(","))
    .filter(lambda fields: fields[-1] == "normal.")
)

In [6]:
# Measure how long counting the filtered sample takes.
t0 = time()
# count() is the action that actually runs the sample/map/filter pipeline.
num_sampled = contains_normal_sample.count()
duration = time() - t0  # elapsed wall-clock seconds

In [7]:
# Elapsed seconds for the count over the sampled RDD.
duration

26.59442687034607

In [8]:
# Same pipeline as the sampled run, but over the full data set, so the two
# timings can be compared.
# After split(",") every element is a list, so `"normal" in x` would be an
# exact list-membership test that never matches the "normal." label (note the
# trailing dot). Compare the last field, which holds the label, instead.
contains_normal = (
    raw_data
    .map(lambda x: x.split(","))
    .filter(lambda fields: fields[-1] == "normal.")
)
t0 = time()
# count() is the action that actually runs the map/filter pipeline.
# The name num_sampled is kept as-is even though this counts the full data set.
num_sampled = contains_normal.count()
duration = time() - t0  # elapsed wall-clock seconds

In [9]:
# Elapsed seconds for the count over the full (unsampled) RDD.
duration

52.91870903968811

In [10]:
# Pull 10 random lines (no replacement, seed 42) into a plain Python list on
# the driver, then filter and split them locally without Spark.
data_in_memory = raw_data.takeSample(withReplacement=False, num=10, seed=42)
contains_normal_py = [
    row.split(",")
    for row in data_in_memory
    if "normal" in row  # substring test on the raw line text
]

In [11]:
# How many of the 10 locally sampled lines contain "normal".
len(contains_normal_py)

1

In [None]:
# Subset of the sampled lines whose text contains the "normal." label.
normal_sample = sampled.filter(
    lambda record: "normal." in record
)

In [None]:
# Everything in the sample that is not in normal_sample, obtained with the
# RDD set-difference operation rather than a second filter.
non_normal_sample = sampled.subtract(normal_sample)

In [3]:
# Total number of lines in the sample.
sampled.count()

490705

In [None]:
# Number of sampled lines containing "normal.".
normal_sample.count()

In [None]:
# Number of sampled lines left after subtracting normal_sample.
non_normal_sample.count()

In [29]:
# Peek at a single raw record to see the comma-separated field layout.
sampled.take(1)

['0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.']

In [24]:
# Distinct values found at field index 1 of the comma-separated records.
# NOTE(review): the outputs saved in this notebook for feature_1 are identical
# to those saved for feature_2 (index 2), while the sample record shown by
# sampled.take(1) has 'tcp' at index 1 — the saved output looks stale; re-run
# the notebook top to bottom to refresh it.
feature_1 = (
    sampled
    .map(lambda record: record.split(','))
    .map(lambda fields: fields[1])
    .distinct()
)

In [25]:
# Peek at one of the distinct index-1 values.
feature_1.take(1)

['http']

In [17]:
# Distinct values found at field index 2 of the comma-separated records.
feature_2 = (
    sampled
    .map(lambda record: record.split(","))
    .map(lambda fields: fields[2])
    .distinct()
)

In [18]:
# Peek at one of the distinct index-2 values.
feature_2.take(1)

['http']

In [26]:
# Bring both sets of distinct values back to the driver as Python lists.
f1 = feature_1.collect()
f2 = feature_2.collect()

In [27]:
# Show the collected distinct values of field index 1.
f1

['http',
 'finger',
 'auth',
 'domain_u',
 'smtp',
 'ftp',
 'telnet',
 'eco_i',
 'ntp_u',
 'ecr_i',
 'other',
 'private',
 'pop_3',
 'ftp_data',
 'daytime',
 'remote_job',
 'supdup',
 'name',
 'ssh',
 'domain',
 'gopher',
 'time',
 'rje',
 'ctf',
 'mtp',
 'X11',
 'urp_i',
 'pm_dump',
 'IRC',
 'exec',
 'bgp',
 'nnsp',
 'iso_tsap',
 'http_443',
 'login',
 'shell',
 'printer',
 'efs',
 'courier',
 'uucp',
 'kshell',
 'klogin',
 'whois',
 'echo',
 'discard',
 'systat',
 'netstat',
 'hostnames',
 'csnet_ns',
 'pop_2',
 'sunrpc',
 'uucp_path',
 'nntp',
 'netbios_ns',
 'netbios_ssn',
 'netbios_dgm',
 'imap4',
 'sql_net',
 'vmnet',
 'link',
 'Z39_50',
 'ldap',
 'urh_i',
 'tftp_u',
 'red_i',
 'tim_i']

In [28]:
# Show the collected distinct values of field index 2.
f2

['http',
 'finger',
 'auth',
 'domain_u',
 'smtp',
 'ftp',
 'telnet',
 'eco_i',
 'ntp_u',
 'ecr_i',
 'other',
 'private',
 'pop_3',
 'ftp_data',
 'daytime',
 'remote_job',
 'supdup',
 'name',
 'ssh',
 'domain',
 'gopher',
 'time',
 'rje',
 'ctf',
 'mtp',
 'X11',
 'urp_i',
 'pm_dump',
 'IRC',
 'exec',
 'bgp',
 'nnsp',
 'iso_tsap',
 'http_443',
 'login',
 'shell',
 'printer',
 'efs',
 'courier',
 'uucp',
 'kshell',
 'klogin',
 'whois',
 'echo',
 'discard',
 'systat',
 'netstat',
 'hostnames',
 'csnet_ns',
 'pop_2',
 'sunrpc',
 'uucp_path',
 'nntp',
 'netbios_ns',
 'netbios_ssn',
 'netbios_dgm',
 'imap4',
 'sql_net',
 'vmnet',
 'link',
 'Z39_50',
 'ldap',
 'urh_i',
 'tftp_u',
 'red_i',
 'tim_i']

In [None]:
# Number of (feature_1, feature_2) pairs in the Cartesian product.
# count() produces the same integer as len(...collect()) but lets Spark tally
# the pairs on the executors instead of shipping every pair to the driver.
feature_1.cartesian(feature_2).count()