# RDD basics

#### [Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

This notebook will introduce three basic but essential Spark operations. Two of them are the *transformations* `map` and `filter`. The other is the *action* `collect`. At the same time we will introduce the concept of *persistence* in Spark.

## SparkSession

In [None]:
import pyspark
from pyspark.sql import SparkSession

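# getOrCreate() reuses the active SparkSession if one already exists
# (there is only one per application); otherwise it creates a new one
spark = SparkSession.builder.appName("RDDBasics").getOrCreate()
sc = spark.sparkContext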

## Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.

In [None]:
from pyspark import SparkFiles

url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
# Download the file to every node; SparkFiles.get() will return its local path
sc.addFile(url)

Now we can use this file to create our RDD.

In [None]:
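# SparkFiles.get() returns the local path of the file added with addFile();
# textFile() decompresses the .gz file transparently
myRDD = sc.textFile("file://" + SparkFiles.get("kddcup.data_10_percent.gz"))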

## The `filter` transformation

This transformation can be applied to an RDD to keep only the elements that satisfy a certain condition. More concretely, a function (a predicate) is evaluated on every element of the original RDD, and the resulting new RDD contains only those elements for which the function returns `True`.
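The per-element semantics of `filter` can be sketched in plain Python, without Spark. In this minimal sketch the first sample line is copied from the dataset, while the second is a hypothetical copy whose tag was changed to `smurf.` purely for illustration. The stricter `is_normal_strict` predicate, which compares only the last CSV field, is an assumption of ours and not part of the original notebook.

```python
# Plain-Python sketch of what filter does to each element (no Spark involved).
real_line = (
    "0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
    "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,"
    "0.00,0.00,0.00,0.00,0.00,normal."
)
# Hypothetical line: same fields, but the tag replaced by "smurf."
fake_line = real_line.rsplit(",", 1)[0] + ",smurf."
sample_lines = [real_line, fake_line]

def is_normal(line):
    # Same predicate as myRDD.filter(lambda x: "normal." in x)
    return "normal." in line

def is_normal_strict(line):
    # Hypothetical stricter variant: compare only the last CSV field
    return line.rsplit(",", 1)[-1] == "normal."

# Keep only the elements for which the predicate returns True
normal_lines = [line for line in sample_lines if is_normal(line)]
strict_lines = [line for line in sample_lines if is_normal_strict(line)]
print(len(normal_lines), len(strict_lines))  # 1 1
```

On this data both predicates agree; the substring test is simply the shorter one to write.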

For example, imagine we want to count how many `normal.` interactions we have in our dataset. We can filter our `myRDD` RDD as follows.

In [None]:
normal_myRDD = myRDD.filter(lambda x: "normal." in x)

Now we can count how many elements we have in the new RDD.

In [None]:
normal_myRDD.count()

Out[7]: 97278

In [None]:
myRDD.count()

Out[8]: 494021

In [None]:
from time import time
t0 = time()
normal_count = normal_myRDD.count()
tt = time() - t0
print("There are {} 'normal' interactions".format(normal_count))
print("Count completed in {} seconds".format(round(tt, 3)))

There are 97278 'normal' interactions
Count completed in 1.257 seconds


Remember from notebook 1 that we have a total of 494021 interactions in our 10 percent dataset. Here we can see that 97278 of them contain the `normal.` tag word.

Notice that we have measured the elapsed time for counting the elements in the RDD. We did this to point out that the actual (distributed) computation in Spark takes place when we execute *actions*, not *transformations*. In this case `count` is the action we execute on the RDD. We can apply as many transformations as we want to our RDD and no computation will take place until we call the first action, which in this case takes a little over a second to complete.
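The difference between transformations and actions can be illustrated with a toy model in plain Python. The `ToyRDD` class below is hypothetical and is *not* how Spark is implemented: its transformations merely record a step, and only its actions (`count`, `collect`) build a chain of lazy iterators and consume it. A list of recorded calls shows when work actually happens.

```python
# Toy model of lazy evaluation; NOT Spark's implementation.
class ToyRDD:
    def __init__(self, data, steps=()):
        self._data = data           # source elements
        self._steps = tuple(steps)  # recorded ("map" | "filter", function) pairs

    # Transformations: return a new ToyRDD and compute nothing
    def map(self, f):
        return ToyRDD(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return ToyRDD(self._data, self._steps + (("filter", pred),))

    # Internal: chain Python's lazy built-in map/filter over the source
    def _iterate(self):
        it = iter(self._data)
        for kind, f in self._steps:
            it = map(f, it) if kind == "map" else filter(f, it)
        return it

    # Actions: actually run the recorded steps
    def count(self):
        return sum(1 for _ in self._iterate())

    def collect(self):
        return list(self._iterate())


calls = []

def get_tag(line):
    calls.append(line)  # record every real invocation
    return line.split(",")[-1]

rdd = ToyRDD(["a,normal.", "b,smurf.", "c,normal."])  # hypothetical data
normal_tags = rdd.map(get_tag).filter(lambda t: t == "normal.")

calls_before = len(calls)  # 0: only transformations so far
n = normal_tags.count()    # 2: the action runs the pipeline
calls_after = len(calls)   # 3: get_tag ran once per element
print(calls_before, n, calls_after)  # 0 2 3
```

In this toy, a second action (for example `normal_tags.collect()`) runs the whole pipeline again from the source, so `get_tag` is called three more times. Avoiding that kind of recomputation is what persistence (`cache`/`persist`) is for in Spark.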

## The `map` transformation

By using the `map` transformation in Spark, we can apply a function to every element in our RDD. Python lambdas are especially expressive for this purpose.

In this case we want to parse each line of our data file as CSV. We can do this by applying a lambda function that splits each element of the RDD on commas, as follows.

In [None]:
myRDD.take(5)

Out[11]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [None]:
csv_data = myRDD.map(lambda x: x.split(","))  # Transformation: nothing is computed yet

t0 = time()
head_rows = csv_data.take(5)  # Action: runs the transformation and returns the result
tt = time() - t0
print("Parse completed in {} seconds".format(round(tt,3)))

Parse completed in 0.867 seconds


In [None]:
head_rows[0:3]

Out[13]: [['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.'],
 ['0', 'tcp', 'http', 'SF', '239', '486', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '19', '19', '1.00', '0.00', '0.05', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.'],
 ['0', 'tcp', 'http', 'SF', '235', '1337', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '29', '29', '1.00', '0.00', '0.03', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']]

Again, all the computation happens once we call the first Spark *action* (here, `take`). What if we take a lot of elements instead of just the first few?

In [None]:
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print("parse completed in {} seconds".format(round(tt, 3)))

parse completed in 1.981 seconds


We can see that it takes longer. The `map` function now has to be applied to many more elements of the RDD (100,000 instead of 5), hence the longer execution time.
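The effect of asking for more elements can be mimicked with Python's lazy built-ins: `map` over a generator does no work until elements are pulled out of it, and `itertools.islice` pulls only as many as requested. This is a rough analogy under stated assumptions (one sequence of hypothetical stand-in lines, and a hypothetical helper `take_parsed`); Spark's `take` works partition by partition, first scanning one partition and then more if needed.

```python
from itertools import islice

def take_parsed(source, n):
    """Hypothetical helper: lazily parse CSV lines and return the first n.

    Returns the parsed rows and how many times the parser actually ran.
    """
    calls = 0

    def parse(line):
        nonlocal calls
        calls += 1
        return line.split(",")

    rows = list(islice(map(parse, source), n))  # only n lines get parsed
    return rows, calls

def make_lines(total=1000):
    # Hypothetical stand-in data, generated lazily
    return ("0,tcp,http,SF,{},normal.".format(i) for i in range(total))

rows5, calls5 = take_parsed(make_lines(), 5)
rows500, calls500 = take_parsed(make_lines(), 500)
print(calls5, calls500)  # 5 500
```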

### Using `map` and predefined functions

Of course we can use predefined functions with `map`. Imagine we want to have each element in the RDD as a key-value pair where the key is the tag (e.g. *normal*) and the value is the whole list of elements that represents the row in the CSV formatted file. We could proceed as follows.

In [None]:
myRDD.take(5)

Out[17]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [None]:
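def parse_interaction(line):
    # Split a CSV line into its 42 fields
    elems = line.split(",")
    # The last field (index 41) is the interaction tag, e.g. "normal."
    tag = elems[41]
    return (tag, elems)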

key_csv_data = myRDD.map(parse_interaction)
head_rows = key_csv_data.take(5)
print(head_rows[0:2])

[('normal.', ['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']), ('normal.', ['0', 'tcp', 'http', 'SF', '239', '486', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '19', '19', '1.00', '0.00', '0.05', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.'])]


That was easy, wasn't it?

In our notebook about working with key-value pairs we will use this type of RDD to do data aggregations (e.g. count by key).
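Since `parse_interaction` is an ordinary Python function, one simple way to check it is to call it locally on a single line before passing it to `map`. This is useful because, with lazy evaluation, an exception raised inside the function (for example an `IndexError` on a line with fewer than 42 fields) would only surface once an action runs. The short malformed line below is hypothetical.

```python
def parse_interaction(line):
    # Same logic as the notebook: (tag, list of all CSV fields)
    elems = line.split(",")
    tag = elems[41]  # last of the 42 fields
    return (tag, elems)

line = (
    "0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
    "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,"
    "0.00,0.00,0.00,0.00,0.00,normal."
)
tag, fields = parse_interaction(line)
print(tag, len(fields))  # normal. 42

# A hypothetical malformed line fails here, locally, instead of inside a Spark job
try:
    parse_interaction("0,tcp,http")
    malformed_rejected = False
except IndexError:
    malformed_rejected = True
print(malformed_rejected)  # True
```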

## The `collect` action

So far we have used the actions `count` and `take`. Another basic action we need to learn is `collect`. It retrieves all the elements of the RDD **into the driver's memory** so that we can work with them locally. For this reason it has to be used with care, especially when working with large RDDs.

An example using our raw data.

In [None]:
t0 = time()
all_raw_data = myRDD.collect()
tt = time() - t0
print("Data collected in {} seconds".format(round(tt,3)))

Data collected in 2.867 seconds


In [None]:
all_raw_data[:3]

Out[22]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.']

That took longer than any other action we used before, of course. Every Spark worker node that holds a fragment (partition) of the RDD has to be coordinated in order to retrieve its part, and all the parts are then sent to the driver and concatenated into a single Python list.
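The difference between `count` and `collect` can be sketched with a toy model in which an RDD is just a list of partitions; the partition contents below are hypothetical. For `count`, each partition only needs to send back one integer, which is roughly the idea behind PySpark's `count`; for `collect`, every element travels to the driver and is concatenated into one list, which is why it is slower and uses more driver memory.

```python
# Toy model: an "RDD" as a list of partitions (hypothetical contents).
partitions = [
    ["a,normal.", "b,smurf."],
    ["c,normal."],
    ["d,neptune.", "e,normal."],
]

def toy_count(parts):
    # Each partition is counted where it lives; only one integer per
    # partition is sent back, and the driver adds them up.
    per_partition = [sum(1 for _ in part) for part in parts]
    return sum(per_partition)

def toy_collect(parts):
    # Every element of every partition is shipped to the driver and
    # concatenated into a single list held in the driver's memory.
    result = []
    for part in parts:
        result.extend(part)
    return result

print(toy_count(partitions))  # 5
everything = toy_collect(partitions)
print(len(everything))        # 5, but all elements were moved
```

Both approaches yield the same number; only `count` avoids moving the data itself.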

As a last example combining all of the above, we want to collect all the `normal` interactions as key-value pairs.

In [None]:
# parse into key-value pairs
key_csv_data = myRDD.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# Collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print("Data collected in {} seconds".format(round(tt, 3)))
print("There are {} 'normal' interactions".format(normal_count))

Data collected in 3.782 seconds
There are 97278 'normal' interactions


This count matches the previous count of `normal` interactions. However, the new procedure is more time consuming, because we retrieve all the data with `collect` and then use Python's `len` on the resulting list. Before, we were only counting the elements in the RDD with `count`, without bringing them to the driver.

In [None]:
print("Number of partitions in myRDD: " + str(myRDD.getNumPartitions()))
print("Number of partitions in key_csv_data: " + str(key_csv_data.getNumPartitions()))

Number of partitions in myRDD: 1
Number of partitions in key_csv_data: 1


In [None]:
type(myRDD)

Out[36]: pyspark.rdd.RDD