# RDD Basics
---

In Spark, the main abstraction is the resilient distributed dataset, or RDD. An RDD is a lower-level, immutable object that holds our data. In this lesson, we take a deep dive into the RDD API.

In particular, we look at the following transformations and actions on RDDs:

method|transformation or action
-|-
map()|transformation
mapPartitions()|transformation
mapPartitionsWithIndex()|transformation
filter()|transformation
flatMap()|transformation
reduceByKey()|transformation
groupByKey()|transformation
first()|action
take()|action
takeSample()|action
takeOrdered()|action
collect()|action
count()|action
countByValue()|action
reduce()|action
top()|action


## *Resources*
[PySpark RDD](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

![](spark.png)

## Create some data

In [1]:
rdd = spark.sparkContext.parallelize(range(25)) # range replaces Python 2's xrange

In [2]:
type(rdd)

pyspark.rdd.PipelinedRDD

In [3]:
rdd.toDebugString()


'(4) PythonRDD[1] at RDD at PythonRDD.scala:48 []\n |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 []'

In [4]:
rdd.getNumPartitions()

4

In [5]:
rdd.take(5)

[0, 1, 2, 3, 4]

We can use `.collect()` to view the entire RDD, but this is only sensible when the RDD (or the subset we collect) is small enough to fit comfortably in the driver's memory.

## Transformations
Our first transformation is perhaps the most common operation we will use on RDDs: `map()`. It works like Python's built-in `map()`: it applies a function to each element in a collection (here, the collection is an RDD).

In [6]:
def addOne(x):
    return x + 1

In [7]:
newRDD = rdd.map(addOne) # Nothing happens yet -- lazy eval

In [8]:
newRDD.take(5) # Calling an action, in this case `take()`, actually results in the computation

[1, 2, 3, 4, 5]

In [9]:
rdd.map(addOne).take(5)

[1, 2, 3, 4, 5]

In [10]:
rdd.take(5) # RDD is immutable

[0, 1, 2, 3, 4]

Once an action is called on the mapped RDD, Spark launches a stage of tasks, one per partition. Each task applies the function to every element of its partition and produces a new partition.
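The sketch below models this behavior in plain Python rather than in Spark. It assumes an RDD can be pictured as a list of partitions; `lazy_map` and `take_n` are hypothetical helpers (not part of PySpark) standing in for a transformation and an action. It only illustrates the idea of lazy evaluation and is not how Spark is implemented.

```python
from itertools import chain, islice

# Model an RDD as a list of partitions (an assumption for illustration;
# real partitions live on executors, not in one Python list).
partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

calls = []  # records every element the mapped function actually sees


def add_one(x):
    calls.append(x)
    return x + 1


def lazy_map(func, parts):
    """Return one generator per partition; no element is processed yet."""
    return [(func(x) for x in part) for part in parts]


def take_n(parts, n):
    """Pull only the first n elements, partition by partition."""
    return list(islice(chain.from_iterable(parts), n))


mapped = lazy_map(add_one, partitions)
calls_before_action = len(calls)  # still 0: the "transformation" was lazy
first_five = take_n(mapped, 5)    # the "action" triggers the computation
```

After `take_n` runs, `calls` holds only the five elements that were needed, and `partitions` is unchanged, which mirrors the immutability shown above.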

## Collect
We can view the RDD in its entirety with `collect()`, but keep in mind that this literally collects all the data to the driver, and it will fail if the driver does not have enough memory.

In [11]:
rdd.collect()

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24]

## Count
`count()` counts the items in each partition, sums those counts, and returns the total.

In [12]:
rdd.count()

25
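The two steps behind `count()` can be sketched in plain Python. The partition boundaries below are made up for illustration; how Spark actually splits the 25 elements across its 4 partitions is an implementation detail.

```python
# Hypothetical split of the 25 elements into 4 chunks.
data = list(range(25))
partitions = [data[0:6], data[6:12], data[12:18], data[18:25]]

# Step 1: each "task" counts its own partition independently.
per_partition_counts = [sum(1 for _ in part) for part in partitions]

# Step 2: the "driver" adds up the partial counts.
total = sum(per_partition_counts)
```

Only the small per-partition counts travel to the driver, not the elements themselves.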

## Filter
Like Python's built-in `filter()`, Spark's `filter()` lets us subset the data.

In [13]:
def sub(x):
    # Keep only elements greater than 10
    return x > 10

In [14]:
smallRDD = rdd.filter(sub)

In [15]:
smallRDD.collect() # to view it

[11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]

We can just as easily use lambda functions (and in most cases it's preferable).

In [16]:
rdd.filter(lambda x: x > 10).collect()

[11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]

## First

In [17]:
rdd.first()

0

## Top

In [18]:
rdd.top(10) # 10 largest elements

[24, 23, 22, 21, 20, 19, 18, 17, 16, 15]

## Take ordered
`takeOrdered()` returns the smallest elements in ascending order by default. We can reverse the order by passing a `key` function.

In [19]:
rdd.takeOrdered(10) # 10 smallest elements

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
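In PySpark, the order can be reversed with a call such as `rdd.takeOrdered(10, key=lambda x: -x)`. The plain-Python sketch below shows the same idea with `heapq.nsmallest`; `take_ordered` is a hypothetical helper and the partition boundaries are assumed for illustration.

```python
import heapq

data = list(range(25))
partitions = [data[0:6], data[6:12], data[12:18], data[18:25]]


def take_ordered(parts, num, key=None):
    """Keep the num smallest items of each partition, then merge them."""
    candidates = []
    for part in parts:
        candidates.extend(heapq.nsmallest(num, part, key=key))
    return heapq.nsmallest(num, candidates, key=key)


smallest = take_ordered(partitions, 3)                   # ascending order
largest = take_ordered(partitions, 3, key=lambda x: -x)  # reversed via key
```

Because each partition contributes at most `num` candidates, only a small amount of data needs to be merged at the end.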

## Reduce
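`reduce()` combines the elements of an RDD into a single value by applying a function of two arguments. Each partition is first reduced locally and the partial results are then combined, which keeps data movement low. For the result to be well defined, the function must be commutative and associative.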

In [20]:
smallRDD.reduce(lambda x,y: x+y)

245

In [21]:
smallRDD.collect()

[11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
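To see why the function passed to `reduce()` should not depend on how the data is partitioned, here is a plain-Python sketch. `partitioned_reduce` is a hypothetical helper that reduces each partition and then reduces the partial results; the two partition layouts are made up for illustration.

```python
from functools import reduce
from operator import add, sub

values = list(range(11, 25))  # same elements as smallRDD


def partitioned_reduce(func, parts):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in parts if part]
    return reduce(func, partials)


two_parts = [values[:7], values[7:]]
four_parts = [values[0:3], values[3:7], values[7:10], values[10:]]

# Addition gives the same answer however the data is split.
sum_two = partitioned_reduce(add, two_parts)
sum_four = partitioned_reduce(add, four_parts)

# Subtraction is not associative, so the answer depends on the split.
diff_two = partitioned_reduce(sub, two_parts)
diff_four = partitioned_reduce(sub, four_parts)
```

Both sums agree with the 245 shown above, while the two subtraction results differ from each other.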

## Sample
We can sample with `takeSample()`. Note that we can sample with or without replacement and set a random seed for reproducibility.

In [22]:
smallRDD.takeSample(withReplacement=True, num=10, seed=0)

[21, 17, 24, 19, 16, 12, 11, 20, 11, 13]
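The effect of the two options can be illustrated with Python's `random` module. This is only an analogy: `take_sample` is a hypothetical helper, and its output will not match the numbers Spark produces for the same seed.

```python
import random

values = list(range(11, 25))  # same elements as smallRDD


def take_sample(items, with_replacement, num, seed):
    """Draw num items using a private, seeded random generator."""
    rng = random.Random(seed)
    if with_replacement:
        # The same item may be drawn more than once.
        return [rng.choice(items) for _ in range(num)]
    # Each item can be drawn at most once.
    return rng.sample(items, num)


with_repl = take_sample(values, True, 10, seed=0)
with_repl_again = take_sample(values, True, 10, seed=0)
without_repl = take_sample(values, False, 10, seed=0)
```

Repeating a call with the same seed returns the same sample, and sampling without replacement never repeats an element.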

## Count by value

In [23]:
smallRDD.countByValue()

defaultdict(int,
            {11: 1,
             12: 1,
             13: 1,
             14: 1,
             15: 1,
             16: 1,
             17: 1,
             18: 1,
             19: 1,
             20: 1,
             21: 1,
             22: 1,
             23: 1,
             24: 1})
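Every value in `smallRDD` is unique, so each count above is 1. The plain-Python sketch below uses made-up data with repeats, split into two hypothetical partitions, to show the general idea of counting per partition and merging the partial counts.

```python
from collections import Counter

# Hypothetical data with repeats, split into two made-up partitions.
partitions = [["a", "b", "a"], ["b", "c", "a"]]

# Count within each partition, then merge the partial counts.
partials = [Counter(part) for part in partitions]
counts = sum(partials, Counter())
```

As with `collect()`, the resulting dictionary lives on the driver, so this is best suited to data with a modest number of distinct values.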

## flatMap
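`flatMap()` is like `map()`, except that the function returns an iterable for each element, and the results are flattened into a single RDD. Each input element can therefore produce zero or more output elements (think of flattening a nested array with `np.ravel()`).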

In [24]:
bears = ['stella', 'princeton', 'blitz', 'stella']

In [25]:
brdd = spark.sparkContext.parallelize(bears)

In [26]:
brdd.map(lambda x: (x + " is a lovely cat",)).collect()

[('stella is a lovely cat',),
 ('princeton is a lovely cat',),
 ('blitz is a lovely cat',),
 ('stella is a lovely cat',)]

In [27]:
brdd.flatMap(lambda x: (x + " is a lovely cat",)).collect() # notice the reduction in the shape

['stella is a lovely cat',
 'princeton is a lovely cat',
 'blitz is a lovely cat',
 'stella is a lovely cat']
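The "zero or more elements" behavior is easier to see when the function returns a varying number of items. The plain-Python sketch below uses made-up input lines; `flat_map` is a hypothetical helper that mimics the flattening step with `itertools.chain`.

```python
from itertools import chain

lines = ["stella blitz", "", "princeton"]


def flat_map(func, items):
    """Apply func to each item and concatenate the resulting iterables."""
    return list(chain.from_iterable(func(item) for item in items))


mapped = [line.split() for line in lines]               # one list per input
flattened = flat_map(lambda line: line.split(), lines)  # one flat list
```

The empty line maps to an empty list, so it contributes nothing to the flattened result.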

## Pair RDD
A pair RDD is an RDD whose elements are `(key, value)` tuples. Below we build one from `brdd` by pairing each name with its length.

In [28]:
newbrdd = brdd.map(lambda x: (x, len(x)))

In [29]:
newbrdd.collect()

[('stella', 6), ('princeton', 9), ('blitz', 5), ('stella', 6)]

## Grouping by key
We can use `groupByKey()` or `reduceByKey()`; we will see later why `reduceByKey()` is much more efficient.

In [30]:
newbrdd.groupByKey().map(lambda kv: (kv[0], sum(kv[1]))).collect() # every value is shuffled, then summed per key

[('blitz', 5), ('stella', 12), ('princeton', 9)]

In [31]:
newbrdd.reduceByKey(lambda x,y: x+y).collect() # values are combined within each partition before the shuffle

[('blitz', 5), ('stella', 12), ('princeton', 9)]
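The difference between the two approaches can be sketched in plain Python by counting how many records would cross the shuffle. Both helpers below are hypothetical, and the partition layout is assumed for illustration (both `stella` records are placed in the same partition); this is a simplified model, not Spark's implementation.

```python
from collections import defaultdict

# The pairs from newbrdd, split into two hypothetical partitions.
partitions = [
    [("stella", 6), ("stella", 6)],
    [("princeton", 9), ("blitz", 5)],
]


def shuffle_group_by_key(parts):
    """groupByKey-style: every (key, value) record crosses the shuffle."""
    shuffled = [pair for part in parts for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    totals = {key: sum(vals) for key, vals in grouped.items()}
    return totals, len(shuffled)


def shuffle_reduce_by_key(parts, func):
    """reduceByKey-style: combine within each partition before the shuffle."""
    shuffled = []
    for part in parts:
        combined = {}
        for key, value in part:
            combined[key] = func(combined[key], value) if key in combined else value
        shuffled.extend(combined.items())
    totals = {}
    for key, value in shuffled:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals, len(shuffled)


grouped_totals, grouped_records = shuffle_group_by_key(partitions)
reduced_totals, reduced_records = shuffle_reduce_by_key(partitions, lambda x, y: x + y)
```

Both approaches produce the same totals, but in this layout the `reduceByKey`-style version moves three records instead of four. The saving grows with the number of repeated keys per partition.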

## sortBy

In [32]:
newbrdd.reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1], ascending=False).collect()

[('stella', 12), ('princeton', 9), ('blitz', 5)]

## Chaining syntax

In [33]:
(newbrdd
 .reduceByKey(lambda x,y: x+y)
 .sortBy(lambda x: x[1], ascending=False)
 .collect())

[('stella', 12), ('princeton', 9), ('blitz', 5)]