source: https://www.codementor.io/@jadianes/spark-python-rdd-basics-du107x2ra


## Creating a Spark context, sc, in a local mode


In [3]:
import pyspark
sc = pyspark.SparkContext('local[*]')


## RDD Creation
### Getting the Data Files

In [4]:
import urllib.request
f = urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

### Creating a RDD from a file


In [5]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [6]:
raw_data.count()


494021

In [5]:
raw_data.take(5)


['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

### Creating and RDD using parallelize

In [6]:
a = range(100)
    
data = sc.parallelize(a)
data.count()


100

In [7]:
data.take(5)

[0, 1, 2, 3, 4]

### The filter Transformation

In [8]:
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)


In [14]:
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print("There are {} 'normal' interactions".format(normal_count))
print("Count completed in {} seconds".format(round(tt, 3)))


There are 97278 'normal' interactions
Count completed in 1.083 seconds


### The map Transformation

In [16]:
from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print ("Parse completed in {} seconds".format(round(tt,3)))
pprint(head_rows[0])

Parse completed in 0.044 seconds
['0',
 'tcp',
 'http',
 'SF',
 '181',
 '5450',
 '0',
 '0',
 '0',
 '0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '8',
 '8',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '1.00',
 '0.00',
 '0.00',
 '9',
 '9',
 '1.00',
 '0.00',
 '0.11',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 'normal.']


In [17]:
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print ("Parse completed in {} seconds".format(round(tt,3)))

Parse completed in 1.918 seconds


### Using map with Predefined Functions

In [18]:
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

('normal.',
 ['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'])


### The collect Action

In [19]:
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print ("Data collected in {} seconds".format(round(tt,3)))

Data collected in 3.907 seconds


In [21]:
# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print ("Data collected in {} seconds".format(round(tt,3)))
print ("There are {} 'normal' interactions".format(normal_count))

Data collected in 3.495 seconds
There are 97278 'normal' interactions


## Sampling RDDs
### Getting the Data and Creating the RDD

In [7]:
f = urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

### The sample Transformation

In [8]:
raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print ("Sample size is {} of {}".format(sample_size, total_size))

Sample size is 489957 of 4898431


In [9]:
from time import time

# transformations to be applied
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print ("The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio,3)) )
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.199
Count done in 11.517 seconds


### calculating the ratio without sampling

In [10]:
# transformations to be applied
raw_data_items = raw_data.map(lambda x: x.split(","))
normal_tags = raw_data_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

normal_ratio = normal_tags_count / float(total_size)
print ("The ratio of 'normal' interactions is {}".format(round(normal_ratio,3)) )
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.199
Count done in 22.412 seconds


### The takeSample Action

In [11]:
t0 = time()
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample if "normal." in x]
tt = time() - t0

normal_sample_size = len(normal_data_sample)

normal_ratio = normal_sample_size / 400000.0
print ("The ratio of 'normal' interactions is {}".format(normal_ratio))
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.1988025
Count done in 19.84 seconds


## Set Operations on RDDs
### Getting the Data and Creating the RDD

In [13]:
f = urllib.request.urlretrieve  ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

### Getting Attack Interactions Using substract

In [14]:
normal_raw_data = raw_data.filter(lambda x: "normal." in x)
attack_raw_data = raw_data.subtract(normal_raw_data)

from time import time

# count all
t0 = time()
raw_data_count = raw_data.count()
tt = time() - t0
print ("All count in {} secs".format(round(tt,3)))


All count in 0.909 secs


In [15]:
# count normal
t0 = time()
normal_raw_data_count = normal_raw_data.count()
tt = time() - t0
print ("Normal count in {} secs".format(round(tt,3)))

Normal count in 1.079 secs


In [16]:
# count attacks
t0 = time()
attack_raw_data_count = attack_raw_data.count()
tt = time() - t0
print ("Attack count in {} secs".format(round(tt,3)))

print ("There are {} normal interactions and {} attacks, \
from a total of {} interactions".format(normal_raw_data_count,attack_raw_data_count,raw_data_count))

Attack count in 5.229 secs
There are 97278 normal interactions and 396743 attacks, from a total of 494021 interactions


### Protocol and Service Combinations Using cartesian

In [38]:
csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

['tcp', 'udp', 'icmp']

In [39]:
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()

['http',
 'smtp',
 'finger',
 'domain_u',
 'auth',
 'telnet',
 'ftp',
 'eco_i',
 'ntp_u',
 'ecr_i',
 'other',
 'private',
 'pop_3',
 'ftp_data',
 'rje',
 'time',
 'mtp',
 'link',
 'remote_job',
 'gopher',
 'ssh',
 'name',
 'whois',
 'domain',
 'login',
 'imap4',
 'daytime',
 'ctf',
 'nntp',
 'shell',
 'IRC',
 'nnsp',
 'http_443',
 'exec',
 'printer',
 'efs',
 'courier',
 'uucp',
 'klogin',
 'kshell',
 'echo',
 'discard',
 'systat',
 'supdup',
 'iso_tsap',
 'hostnames',
 'csnet_ns',
 'pop_2',
 'sunrpc',
 'uucp_path',
 'netbios_ns',
 'netbios_ssn',
 'netbios_dgm',
 'sql_net',
 'vmnet',
 'bgp',
 'Z39_50',
 'ldap',
 'netstat',
 'urh_i',
 'X11',
 'urp_i',
 'pm_dump',
 'tftp_u',
 'tim_i',
 'red_i']

In [40]:
product = protocols.cartesian(services).collect()
print ("There are {} combinations of protocol X service".format(len(product)))

There are 198 combinations of protocol X service
