# First Lesson:
## RDD: Resilient Distributed Datasets

In [3]:
# Download the KDD Cup 1999 "10 percent" dataset (a gzip-compressed file).
# The download is skipped when the file is already present, so re-running the
# notebook top to bottom does not fetch the same archive again.
import os
import urllib.request

DATA_URL = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
DATA_FILE = "kddcup.data_10_percent.gz"

if not os.path.exists(DATA_FILE):
    # Download to a temporary name first and rename only on success, so an
    # interrupted download never leaves a truncated DATA_FILE behind that the
    # existence check above would then accept as complete.
    partial_path = DATA_FILE + ".part"
    urllib.request.urlretrieve(DATA_URL, partial_path)
    os.replace(partial_path, DATA_FILE)

In [6]:
# Spark setup: every later cell depends on `sc` and `raw_data` defined here.
# -------------------
import os

import findspark
# Locate the Spark installation so `import pyspark` works. SPARK_HOME is used
# when set; otherwise fall back to the original install location.
findspark.init(os.environ.get("SPARK_HOME", "/usr/local/spark"))
import pyspark
from pyspark import SparkContext
# getOrCreate() reuses a SparkContext that is already running, so re-running
# this cell does not fail because a context already exists.
sc = SparkContext.getOrCreate()
# -------------------------
# Spark textFile handles compressed data
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
# We have now created an RDD

In [7]:
# count() is an action, so this is where Spark actually reads the file and
# returns the number of lines in the RDD.
raw_data.count()

494021

In [8]:
# take(5) is an action that returns the first 5 lines of the RDD as a local
# Python list of strings.
raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [11]:
# Another way to create an RDD is to parallelize an already existing Python collection.
a = range(100)

data = sc.parallelize(a)
# Only a cell's last expression is displayed, so a bare `data.count()` here
# would be computed and then silently discarded; check it explicitly instead.
assert data.count() == len(a)
data.take(5)

[0, 1, 2, 3, 4]

## RDD Basics: map, filter, collect

In [12]:
# Transformation (lazy): keep only the lines that contain the 'normal.' label.
# Nothing runs yet; the count() in the next cell triggers the computation.
normal_raw_data = raw_data.filter(lambda line: 'normal.' in line)

In [17]:
# Count the filtered RDD and time the action.
# Spark only performs the actual (distributed) computation when an action such
# as 'count' or 'collect' runs; transformations like 'filter' and 'map' are just
# recorded until then, so the elapsed time below includes the filtering work.
from time import time

t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0

report_lines = [
    "There are {} 'normal' interactions".format(normal_count),
    "Count completed in {} seconds".format(round(tt,3)),
]
for report_line in report_lines:
    print(report_line)

There are 97278 'normal' interactions
Count completed in 2.609 seconds


In [19]:
# Treat each line as a CSV record: the `map` transformation splits every
# element of the RDD into its comma-separated fields.
from pprint import pprint

csv_data = raw_data.map(lambda line: line.split(","))

# `map` is lazy, so the timing measures the take() action that pulls back
# the first 5 parsed rows.
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0

print("Parse completed in {} seconds".format(round(tt, 3)))
pprint(head_rows[0])

Parse completed in 0.073 seconds
['0',
 'tcp',
 'http',
 'SF',
 '181',
 '5450',
 '0',
 '0',
 '0',
 '0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '8',
 '8',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '1.00',
 '0.00',
 '0.00',
 '9',
 '9',
 '1.00',
 '0.00',
 '0.11',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 'normal.']


In [None]:
# collect() is an action that copies every element of the RDD into local
# memory as a Python list. Use it with care, especially on large RDDs.
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0

print("Data collected in {} seconds".format(round(tt, 3)))

# Sampling RDDs
## sample (transformation) and takeSample (action)

In [21]:
# sample() is a lazy transformation with up to three parameters:
#   withReplacement - whether an element may be picked more than once,
#   fraction        - the expected sample size as a fraction of the RDD,
#   seed            - optional random seed, fixed here for reproducibility.
raw_data_sample = raw_data.sample(withReplacement=False, fraction=0.1, seed=1234)

# Two count() actions: one for the full RDD and one for the sampled RDD.
total_size = raw_data.count()
sample_size = raw_data_sample.count()
print("Sample size is {} of {}".format(sample_size, total_size))

Sample size is 49493 of 494021


In [None]:
# Approximate the proportion of 'normal.' interactions using only the sample.

# Transformations (lazy): split each sampled line into its fields, then keep
# the rows in which one of the fields is exactly "normal.".
raw_data_sample_items = raw_data_sample.map(lambda line: line.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda fields: "normal." in fields)

# Action, timed: counting is what actually runs the pipeline above.
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

# `sample_size` is the sampled-RDD count from the sampling cell.
sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print("The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio, 3)))
print("Count done in {} seconds".format(round(tt, 3)))

In [None]:
# Same proportion computed over the whole RDD (no sampling), for comparison
# with the sampled estimate above.

# Transformations (lazy): split each line into fields, keep the rows in which
# one of the fields is exactly "normal.".
raw_data_items = raw_data.map(lambda line: line.split(","))
normal_tags = raw_data_items.filter(lambda fields: "normal." in fields)

# Action, timed: count() triggers the full pipeline.
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

# `total_size` is the full-RDD count from the sampling cell.
normal_ratio = normal_tags_count / float(total_size)
print("The ratio of 'normal' interactions is {}".format(round(normal_ratio, 3)))
print("Count done in {} seconds".format(round(tt, 3)))

In [None]:
# takeSample() is an action: it returns a random sample of a fixed number of
# elements as a local Python list, which non-Spark libraries can then use.
# The list gets its own name so the `raw_data_sample` RDD created by the
# sampling cell is not replaced by a plain list (cells that call `.map` on
# that RDD would otherwise fail when re-run out of order).
TAKE_SAMPLE_SIZE = 400000  # number of elements requested from takeSample

t0 = time()
raw_data_sample_local = raw_data.takeSample(False, TAKE_SAMPLE_SIZE, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample_local if "normal." in x]
tt = time() - t0

normal_sample_size = len(normal_data_sample)

# Divide by the number of rows actually returned: without replacement,
# takeSample returns fewer than requested when the RDD is smaller than
# TAKE_SAMPLE_SIZE. Guard against an empty sample to avoid dividing by zero.
returned_size = len(raw_data_sample_local)
normal_ratio = normal_sample_size / float(returned_size) if returned_size else 0.0
print("The ratio of 'normal' interactions is {}".format(normal_ratio))
print("Count done in {} seconds".format(round(tt,3)))