# Spark notebook basics

**SparkContext**: our way of communicating with the Spark system

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
sc=SparkContext(master='local[4]')
print(sc)

<SparkContext master=local[4] appName=pyspark-shell>


The `master='local[4]'` argument runs Spark locally in my notebook using 4 workers (I have 4 cores, so there is one worker per core)

Only one SparkContext can be active at a time; it is designed for a single user. Before creating a new SparkContext, stop the current one.

In [3]:
# sc.stop() # stop current SparkContext

**RDD** (Resilient Distributed Dataset): a list of elements stored across several computers

**Parallelize**: the simplest way of creating an RDD. The result is of type `PythonRDD`

In [4]:
A=sc.parallelize(range(3))
A

PythonRDD[1] at RDD at PythonRDD.scala:48

**Collect**: the content of an RDD is distributed among the executors. `collect()` is the inverse of `parallelize()`: it gathers the elements of the RDD and returns them as a list

In [5]:
L=A.collect()
print(type(L))
print(L)

<class 'list'>
[0, 1, 2]


Using collect eliminates the benefits of parallelism!!

It is often tempting to `.collect()` an RDD into a list and then process it using standard Python. However, this means only the head node performs the computation, so you do not benefit from Spark. Using RDD operations lets you use all the computers at your disposal

**Map**: applies an operation to each element of the RDD. The parameter is the function defining the operation. Returns a new RDD. The operation is performed in parallel on all executors; each executor operates on its local data

In [6]:
A.map(lambda x: x*x).collect()

[0, 1, 4]

**Reduce**: takes an RDD and returns a single value. The reduce operator takes two elements as input and returns one as output, and it is applied repeatedly. Each executor reduces the data local to it, then the results from all executors are combined

In [7]:
A.reduce(lambda x,y: x+y)

3
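
The two-stage behaviour described above (local reduction, then combination) can be sketched in plain Python without Spark. This is only an illustration: `simulated_reduce` and the split into two partitions are hypothetical, not Spark's actual implementation.

```python
from functools import reduce


def simulated_reduce(partitions, op):
    """Reduce each non-empty partition locally, then combine the partial results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)


# Hypothetical split of range(3) across two executors.
partitions = [[0], [1, 2]]
total = simulated_reduce(partitions, lambda x, y: x + y)
print(total)  # 3
```

For addition the split does not matter: any grouping of the elements gives the same total.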

In [8]:
# finds the shortest string
words=['this','is','the','best','thinkpad','ever!']
wordsRDD=sc.parallelize(words)
wordsRDD.reduce(lambda w,v: w if len(w)<len(v) else v)

'is'

In [9]:
# bad reduce operation
B=sc.parallelize([1,3,5,2])
B.reduce(lambda x,y: x-y)

-9

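Subtraction is not an operation in which the order doesn't matter: $x-y$ is different from $y-x$, and $(x-y)-z$ is different from $x-(y-z)$. The result therefore depends on how the elements are grouped. Which of the following was computed: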

$$((1-3)-5)-2$$

or

$$(1-3)-(5-2)$$
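
A quick check in plain Python (no Spark needed) shows that the two groupings give different answers. The split into the partitions `[1, 3]` and `[5, 2]` is an assumption made for illustration; the actual split depends on how Spark partitions the data.

```python
from functools import reduce


def subtract(x, y):
    return x - y


data = [1, 3, 5, 2]

# One partition: strictly left to right, ((1-3)-5)-2
single = reduce(subtract, data)

# Two assumed partitions [1, 3] and [5, 2]: (1-3)-(5-2)
partials = [reduce(subtract, part) for part in ([1, 3], [5, 2])]
split = reduce(subtract, partials)

print(single, split)  # -9 -5
```

An operator passed to `reduce` should give the same result under any grouping and ordering, as addition does.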


Using regular functions instead of lambda functions: lambda functions are short and sweet, but sometimes an operation is hard to express in one line. In that case we can use a full-fledged function instead

Suppose we want to find the last word in lexicographical order among the longest words in the list

In [10]:
def largerThan(x,y):
    if len(x)>len(y):
        return x
    elif len(y)>len(x):
        return y
    else:
        if x>y:
            return x
        else:
            return y

In [11]:
wordsRDD.reduce(largerThan)

'thinkpad'

# Changing number of workers

**Effect of changing the number of workers**: when you initialize a SparkContext, you can specify the number of workers. The usual choice is one worker per core, but the number can be smaller or larger than that

In [12]:
from time import time
from pyspark import SparkContext
sc.stop()
for j in range(1,10):
    sc = SparkContext(master='local[%d]'%(j))
    tic=time()
    for i in range(10):
        sc.parallelize([1,2]*100000).reduce(lambda x,y:x+y)
    print('%2d executors, time=%4.3f'%(j,time()-tic))
    sc.stop()

 1 executors, time=2.357
 2 executors, time=1.957
 3 executors, time=2.202
 4 executors, time=2.323
 5 executors, time=1.890
 6 executors, time=2.287
 7 executors, time=2.358
 8 executors, time=2.808
 9 executors, time=2.976


# Execution plans, lazy evaluation, and caching

Suppose our task is to compute $\sum^n_{i=1}x^2_i$. The standard (busy, or eager) way to do this is: i) calculate the square of each element; ii) sum the squares. This requires storing all intermediate results!

**Lazy evaluation**: postpone computing the squares until the result is needed. There is no need to store intermediate results, and the data is scanned once rather than twice. We create an RDD with one million elements to amplify the effects of lazy evaluation and caching:

In [13]:
%%time
sc=SparkContext()
RDD=sc.parallelize(range(1000000))

CPU times: user 17.6 ms, sys: 0 ns, total: 17.6 ms
Wall time: 141 ms


Define a computation: the role of the function `taketime` is to consume CPU cycles

In [14]:
from math import cos
def taketime(i):
    [cos(j) for j in range(100)]
    return cos(i)

In [15]:
%%time
taketime(1)

CPU times: user 130 µs, sys: 10 µs, total: 140 µs
Wall time: 148 µs


0.5403023058681398

Time units
* 1 second = 1000 milliseconds ($ms$)
* 1 millisecond = 1000 microseconds ($\mu s$)
* 1 microsecond = 1000 nanoseconds ($ns$)

Clock rate: one cycle of a 3GHz CPU takes $\frac{1}{3} ns$

`taketime(1000)` takes about $25\mu s=75,000$ clock cycles
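
The arithmetic can be checked with a few lines of Python. The per-call cost is read here as 25 microseconds, which is the value consistent with the 75,000-cycle figure at 3 GHz; the variable names are illustrative only.

```python
cycles_per_ns = 3        # a 3 GHz clock completes 3 cycles per nanosecond
call_ns = 25_000         # about 25 microseconds per call, expressed in nanoseconds
n_calls = 1_000_000      # one call per element of the RDD

cycles_per_call = call_ns * cycles_per_ns
total_seconds = n_calls * call_ns / 1e9

print(cycles_per_call)   # 75000
print(total_seconds)     # 25.0
```

So a single worker applying `taketime` to one million elements would be expected to need roughly 25 seconds.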

**Map operation**

In [16]:
%%time
Interm=RDD.map(lambda x: taketime(x))

CPU times: user 32 µs, sys: 3 µs, total: 35 µs
Wall time: 41.2 µs


How come so fast??

We expected this map operation to take $1000000\times 25\mu s=25$ seconds

Why did the previous cell take only ~40 microseconds??

Because no computation was done! The cell defined an execution plan, but did not execute it yet

**Execution plans**: at this point the variable `Interm` doesn't point to an actual data structure. Instead, it points to an execution plan expressed as a **dependence graph**. The dependence graph defines how the RDDs are computed from each other. The dependence graph associated with an RDD can be printed out using the method `toDebugString()`

In [17]:
print(Interm.toDebugString().decode())

(4) PythonRDD[1] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175 []


**Interm** (4) PythonRDD[1] at RDD at PythonRDD.scala:48 [], the (4) corresponds to the number of partitions

**RDD** ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

So, at this point, only the left blocks of the plan have been declared (the parallelize and the map), but not the reduce

**Actual execution**: the `reduce` command has to produce an actual output, so Spark has to actually execute the `map` and the `reduce`. Some real computation needs to be done, which takes seconds rather than microseconds, depending on the machine used and on its load

In [18]:
%%time
print('out=',Interm.reduce(lambda x,y:x+y))

out= -0.2887054679684464
CPU times: user 15.6 ms, sys: 4 ms, total: 19.6 ms
Wall time: 15.7 s


**How come so fast?**

We expected the map operation alone to take 25 seconds, yet map+reduce together took only about 16 seconds. Why?

Because we have 4 workers rather than one, and because the measurement of a single iteration of `taketime` is an overestimate

**Executing a different calculation based on the same plan**: the plan defined by `Interm` might need to be executed more than once. For example, compute the number of map outputs that are larger than zero

In [19]:
%%time
print('out=',Interm.filter(lambda x:x>0).count())

out= 500000
CPU times: user 12.4 ms, sys: 18 µs, total: 12.4 ms
Wall time: 13.3 s


**The price of not materializing**: the runtime is similar to that of the reduce. Because the intermediate results in `Interm` have not been saved in memory (materialized), they need to be recomputed

The middle block is executed twice, once for each final step

    RDD                     Interm
parallelize(range(1000000)) -> map(taketime()) -> reduce(lambda x,y: x+y)       -> number

                                                             \
                                                             filter(lambda x: x>0).count()   -> number
                                                             
**Caching intermediate results**: we sometimes want to keep the intermediate results in memory so that we can reuse them later without recalculating. This will reduce the running time, at the cost of requiring more memory.

The method `cache()` indicates that the RDD generated in this plan should be stored in memory. Note that this is a **plan to cache**. The actual caching will be done only when the final result is needed

In [20]:
%%time
Interm=RDD.map(lambda x: taketime(x)).cache()

CPU times: user 5.89 ms, sys: 363 µs, total: 6.25 ms
Wall time: 13.2 ms


**Plan to cache**: the definition of `Interm` is almost the same as before. However, the plan corresponding to `Interm` is more elaborate and contains information about how the intermediate results will be cached and replicated. Note that `PythonRDD[4]` is now `Memory Serialized 1x Replicated`

In [21]:
print(Interm.toDebugString().decode())

(4) PythonRDD[4] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175 [Memory Serialized 1x Replicated]


**Creating the cache**: the following command executes the first map-reduce command **and** caches the result of the `map` command in memory

In [22]:
%%time
print('out=',Interm.reduce(lambda x,y: x+y))

out= -0.2887054679684464
CPU times: user 7.42 ms, sys: 4 ms, total: 11.4 ms
Wall time: 12.2 s


**Using the cache**: this time `Interm` is cached. Therefore the second use of `Interm` is much faster than when we didn't use the cache: about 0.3 seconds instead of about 13 seconds

In [23]:
%%time
print('out=',Interm.filter(lambda x: x>0).count())

out= 500000
CPU times: user 8.83 ms, sys: 57 µs, total: 8.89 ms
Wall time: 288 ms
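
The difference between recomputing and caching can be imitated in plain Python by counting how often the mapped function is evaluated. This is a toy sketch under stated assumptions: `LazyMap`, `count_if`, and the `calls` counter are hypothetical names, and nothing here reflects how Spark stores cached partitions.

```python
from functools import reduce
from math import cos


class LazyMap:
    """Toy stand-in for a mapped RDD: it stores a plan and computes on demand."""

    def __init__(self, source, func, cache=False):
        self.source = source
        self.func = func
        self.use_cache = cache
        self._cached = None   # materialized results, if any
        self.calls = 0        # how many times func has been evaluated

    def _values(self):
        if self._cached is not None:
            return self._cached            # reuse the materialized results
        values = []
        for x in self.source:
            self.calls += 1
            values.append(self.func(x))
        if self.use_cache:
            self._cached = values          # materialize on the first action
        return values

    def reduce(self, op):
        return reduce(op, self._values())

    def count_if(self, pred):
        return sum(1 for v in self._values() if pred(v))


plain = LazyMap(range(1000), cos)
cached = LazyMap(range(1000), cos, cache=True)
for plan in (plain, cached):
    plan.reduce(lambda x, y: x + y)     # first action
    plan.count_if(lambda v: v > 0)      # second action on the same plan
print(plain.calls, cached.calls)  # 2000 1000
```

Without the cache, every action re-runs the map; with it, the map runs once and the second action only reads stored values, at the cost of keeping them in memory.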


# Partitions and glomming

When you create an RDD, you can specify the number of partitions. The default is the number of workers you defined when you set up the `SparkContext`

In [24]:
A=sc.parallelize(range(100000))
print(A.getNumPartitions())

4


In [25]:
# you can repartition an RDD into a different number
D=A.repartition(10)
print(D.getNumPartitions())

10


In [26]:
# we can also define the number of partitions when creating the RDD
A=sc.parallelize(range(100000), numSlices=10)
print(A.getNumPartitions())

10


* Why is the number of partitions important?

Partitions define the unit of work that an executor operates on. You should have at least as many partitions as you have workers (otherwise some workers will be idle). Sometimes smaller partitions allow more parallelization

**Repartition for load balance**

Suppose we start with 10 partitions, all with exactly the same number of elements

In [27]:
A=sc.parallelize(range(100000))\
    .map(lambda x: (x,x)).partitionBy(10)
print(A.glom().map(len).collect()) # how many elements are in each part

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]


Suppose we want to use `filter()` to select some of the elements in `A`

Some partitions might have more elements remaining than others

In [28]:
# select 20% of the entries (keys divisible by 5)
B=A.filter(lambda pair: pair[0] % 5 == 0)
# get the number of elements in each partition
print(B.glom().map(len).collect())

[10000, 0, 0, 0, 0, 10000, 0, 0, 0, 0]


Future operations on `B` will use only two workers. The other workers will do nothing because their partitions are empty.

To fix this situation we need to repartition the RDD. One way to do that is to repartition using a new key

The method `.partitionBy(k)` expects a `(key,value)` RDD and partitions it into `k` partitions. With integer keys, as here, the element `(key,value)` is placed into partition number `key % k`

In [29]:
C=B.map(lambda pair: (pair[1]//10, pair[1])).partitionBy(10) # integer division keeps the keys integers
print(C.glom().map(len).collect())

[2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000]
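
The `key % k` placement rule, the imbalance after filtering, and the repair with a new key can all be reproduced in plain Python. This is a sketch of the rule as stated above for integer keys; `partition_by` and `sizes` are hypothetical helpers, not Spark functions.

```python
def partition_by(pairs, k):
    """Place each (key, value) pair into bucket key % k (integer keys assumed)."""
    buckets = [[] for _ in range(k)]
    for key, value in pairs:
        buckets[key % k].append((key, value))
    return buckets


def sizes(buckets):
    return [len(bucket) for bucket in buckets]


# Ten balanced partitions keyed by x.
A = partition_by(((x, x) for x in range(100000)), 10)

# Filtering works inside each partition, so the partitioning is unchanged.
B = [[pair for pair in bucket if pair[0] % 5 == 0] for bucket in A]

# Re-key by value // 10 (integer division) and partition again.
C = partition_by(((v // 10, v) for bucket in B for _, v in bucket), 10)

print(sizes(A))
print(sizes(B))
print(sizes(C))
```

All surviving keys are multiples of 5, so they can only land in buckets 0 and 5; dividing by 10 spreads the new keys evenly over all ten residues.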


Another approach is to use random partitioning using `repartition(k)`

An advantage of random partitioning is that it does not require defining a key. A disadvantage of random partitioning is that you have no control on the partitioning

In [30]:
C=B.repartition(10)
print(C.glom().map(len).collect())

[2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000]


**glom()**

In general, Spark does not allow a worker to refer to specific elements of the RDD. This keeps the language clean, but can be a major limitation

`glom()` transforms each partition into a single sequence of its elements (in PySpark, a Python list) and creates an RDD of these sequences, one per partition.

Workers can then refer to elements of a partition by index. The RDD itself is still immutable: assigning to an element does not change the source RDD

Now we can understand the command used above to count the number of elements in each partition.

We use `glom()` to make each partition into a list

We use `len` on each partition to get the length of the list (the size of the partition)

We `collect` the results to print them out
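
The three steps can be mimicked in plain Python. Here `glom` is a hypothetical stand-in that turns each partition (any iterable) into one list, and the three partitions are made-up toy data.

```python
def glom(partitions):
    """Turn each partition into one list; the result has one list per partition."""
    return [list(part) for part in partitions]


# Toy partitions, given as iterators to stress that elements are not indexable yet.
partitions = [iter([0, 10, 20]), iter([]), iter([5, 15])]

glommed = glom(partitions)
partition_sizes = [len(part) for part in glommed]
print(partition_sizes)  # [3, 0, 2]
print(glommed[0][1])    # 10
```

Once a partition is a list, indexing such as `glommed[0][1]` becomes possible, which is what the more elaborate example relies on.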

A more elaborate example:

In [31]:
def getPartitionInfo(G):
    # G holds the (key, value) pairs of one partition
    d=0
    if len(G) > 1:
        # sum of absolute differences between consecutive values
        for i in range(len(G)-1):
            d += abs(G[i+1][1] - G[i][1])
        return (G[0][0], len(G), d)
    else:
        return None
output=B.glom().map(getPartitionInfo).collect()
print(output)

[(0, 10000, 99990), None, None, None, None, (5, 10000, 99990), None, None, None, None]


# Spark basics part II

**Chaining**

We can chain transformations and actions to create a computation **pipeline**: suppose we want to compute the sum of squares

In [32]:
# sequential syntax
B=sc.parallelize(range(4))
Squares=B.map(lambda x: x*x)
Squares.reduce(lambda x,y: x+y)

14

In [33]:
# cascaded syntax
B.map(lambda x: x*x)\
    .reduce(lambda x,y: x+y)

14

Both syntaxes mean exactly the same thing. The only difference is that in the sequential syntax the intermediate RDD has a name, _Squares_, whereas in the cascaded syntax the intermediate RDD is _anonymous_. The execution is identical

**Sequential execution** means: perform the map, store the resulting RDD in memory, then perform the reduce. The disadvantages are that the intermediate result requires memory space, and that two scans of memory (of `B` and of `Squares`) double the cache misses

**Pipelined execution** performs the whole computation in a single pass: for each element of `B`, compute the square and feed it to the reduce operation. The advantages are that less memory is required (the intermediate result is not stored) and that it is faster (only one pass through the input RDD)

**Lazy evaluation**: this pipelined evaluation is called lazy evaluation. It is lazy because computing the square is not executed immediately; instead, execution is delayed as long as possible so that several commands are executed in a single pass. The delayed commands are organized in an **execution plan**
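
Python generators give a small-scale picture of pipelined, lazy evaluation. This is an analogy only, not how Spark is implemented; `squares`, `total`, and the `log` list are hypothetical names used to record the order of operations.

```python
def squares(values, log):
    """Lazily yield the square of each value, logging when each one is computed."""
    for x in values:
        log.append(f"square {x}")
        yield x * x


def total(values, log):
    """Consume the values one by one and add them up."""
    acc = 0
    for v in values:
        log.append(f"add {v}")
        acc += v
    return acc


log = []
plan = squares(range(4), log)   # defines the computation, runs nothing
print(log)                      # []
result = total(plan, log)       # the "action" pulls elements through the pipeline
print(result)                   # 14
print(log[:4])                  # ['square 0', 'add 0', 'square 1', 'add 1']
```

The log shows that squaring and adding are interleaved element by element: the list of squares is never stored, and the input is scanned once.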

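**an instructive mistake**: here is an attempt to compute the sum of squares with a single reduce command. It returns 8 instead of the correct value 6, because the reduce function also squares the partial results: $(1^2+1^2)^2+2^2=8$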

In [34]:
C=sc.parallelize([1,1,2])
C.reduce(lambda x,y: x*x+y*y)

8
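
The same mistake can be reproduced with `functools.reduce` in plain Python, next to a correct version that squares first and then sums. The sketch evaluates left to right over a single list, which is an assumption; Spark may group the elements differently.

```python
from functools import reduce

C = [1, 1, 2]

# Wrong: partial results are squared again at every step.
# (1*1 + 1*1) -> 2, then (2*2 + 2*2) -> 8
wrong = reduce(lambda x, y: x * x + y * y, C)

# Right: square each element first (the map), then add (the reduce).
right = reduce(lambda x, y: x + y, (x * x for x in C))

print(wrong, right)  # 8 6
```

The reduce operator must accept its own outputs as inputs, so the squaring belongs in a separate `map` step.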

**getting information about an RDD**

printing the whole RDD is not an option when it is large, so we query it instead

In [35]:
n=1000000
B=sc.parallelize([1,2,3,4]*int(n/4))

In [36]:
# find the number of elements in the RDD

B.count()

1000000

In [37]:
# get first elements of RDD

print('first element', B.first())
print('first 5 elements', B.take(5))

first element 1
first 5 elements [1, 2, 3, 4, 1]


**sampling from an RDD**

aggregates such as the average can be approximated efficiently by using a sample; sampling is done in parallel and requires limited computation

In [38]:
# get a sample whose expected size is m
# note that the actual size of the sample differs from run to run

m=5.
print('sample1 = ', B.sample(False, m/n).collect())
print('sample2 = ', B.sample(False, m/n).collect())

sample1 =  [4, 3, 4, 1, 3, 3]
sample2 =  [1, 4, 3, 2, 2]
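
One way to see why the sample size varies is to assume that each element is kept independently with probability `fraction`. The sketch below implements that assumption in plain Python; `bernoulli_sample` is a hypothetical helper, not the Spark sampler.

```python
import random


def bernoulli_sample(values, fraction, seed=None):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [v for v in values if rng.random() < fraction]


data = [1, 2, 3, 4] * 250          # 1000 elements
m = 5                              # expected sample size

# Ten samples with different seeds: the sizes scatter around m.
sample_sizes = [len(bernoulli_sample(data, m / len(data), seed=s)) for s in range(10)]
print(sample_sizes)
```

Because each element is decided separately, only the expected size is fixed, which matches the different lengths of `sample1` and `sample2`.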


**filtering** an RDD: selecting those elements of the source on which a function returns `True`

In [39]:
print('number of elements in B that are > 3', B.filter(lambda n: n>3).count())

number of elements in B that are > 3 250000


**removing duplicate elements of an RDD**: the method `RDD.distinct()` returns a new dataset that contains the distinct elements of the source dataset. This operation requires a **shuffle** in order to detect duplication across partitions

In [40]:
DuplicateRDD=sc.parallelize([1,1,2,2,3,3])
print('DuplicateRDD=', DuplicateRDD.collect())
print('DistinctRDD=', DuplicateRDD.distinct().collect())

DuplicateRDD= [1, 1, 2, 2, 3, 3]
DistinctRDD= [1, 2, 3]


**flatMap an RDD** is similar to `map`, but each input item can be mapped to 0 or more output items (so a function should return a sequence rather than a single item)

In [41]:
text=['you are my sunshine', 'my only sunshine']
text_file=sc.parallelize(text)

# map each line in text to a list of words
print('map:', text_file.map(lambda line: line.split(' ')).collect())

# create a single list of words by combining the words from all lines
print('flatmap:', text_file.flatMap(lambda line: line.split(' ')).collect())

map: [['you', 'are', 'my', 'sunshine'], ['my', 'only', 'sunshine']]
flatmap: ['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']
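
The difference between the two results has a direct plain-Python counterpart: `map` corresponds to a list comprehension that keeps one list per line, and `flatMap` to flattening those lists with `itertools.chain.from_iterable`. This is an analogy on local lists, not Spark code.

```python
from itertools import chain

text = ['you are my sunshine', 'my only sunshine']

# map: one output item (a list of words) per input line
mapped = [line.split(' ') for line in text]

# flatMap: the per-line lists are concatenated into one flat sequence
flat = list(chain.from_iterable(line.split(' ') for line in text))

print(mapped)
print(flat)
```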


**set operations**: union, intersection, subtract, cartesian in Spark

In [42]:
rdd1=sc.parallelize([1,1,2,3])
rdd2=sc.parallelize([1,3,4,5])

In [43]:
# union
rdd2=sc.parallelize(['a','b',1])
print('rdd1:', rdd1.collect())
print('rdd2:', rdd2.collect())
print('union as bags:', rdd1.union(rdd2).collect())
print('union as sets:', rdd1.union(rdd2).distinct().collect())

rdd1: [1, 1, 2, 3]
rdd2: ['a', 'b', 1]
union as bags: [1, 1, 2, 3, 'a', 'b', 1]
union as sets: [1, 'a', 2, 3, 'b']


In [44]:
# intersection
rdd2=sc.parallelize([1,1,2,5])
print('rdd1:', rdd1.collect())
print('rdd2:', rdd2.collect())
print('intersection:', rdd1.intersection(rdd2).collect())

rdd1: [1, 1, 2, 3]
rdd2: [1, 1, 2, 5]
intersection: [1, 2]


In [45]:
# subtract
rdd2=sc.parallelize([1,1,2,5])
print('rdd1:', rdd1.collect())
print('rdd2:', rdd2.collect())
print('rdd1 subtract rdd2:', rdd1.subtract(rdd2).collect())

rdd1: [1, 1, 2, 3]
rdd2: [1, 1, 2, 5]
rdd1 subtract rdd2: [3]


In [46]:
# cartesian
rdd2=sc.parallelize(['a','b'])
print('rdd1:', rdd1.collect())
print('rdd2:', rdd2.collect())
print('rdd1 cartesian rdd2:', rdd1.cartesian(rdd2).collect())

rdd1: [1, 1, 2, 3]
rdd2: ['a', 'b']
rdd1 cartesian rdd2: [(1, 'a'), (1, 'b'), (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]
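
The semantics seen in the outputs above can be summarized with local Python lists. This is only a sketch of the observed behaviour (union keeps duplicates, intersection removes them, subtract drops every element that appears in the other collection); the order of elements returned by Spark may differ.

```python
from itertools import product

rdd1 = [1, 1, 2, 3]
rdd2 = [1, 1, 2, 5]
other = set(rdd2)

union_as_bag = rdd1 + rdd2                         # duplicates are kept
union_as_set = sorted(set(union_as_bag))           # like union followed by distinct
intersection = sorted(set(rdd1) & other)           # duplicates are removed
subtract = [x for x in rdd1 if x not in other]     # every match is dropped
cartesian = list(product(rdd1, ['a', 'b']))        # all pairs, duplicates included

print(union_as_bag)
print(union_as_set)
print(intersection)
print(subtract)
print(len(cartesian))
```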


# Word count

Counting the number of occurrences of words in a text is a popular first exercise using map-reduce

The task: the input consists of words separated by spaces; the output is a sorted list of words and their counts. We will use the book Moby Dick as our input.

**Define an RDD that will read the file**

Execution of the read is lazy: the file has been opened, but reading starts only when a stage is executed

In [48]:
%%time
text_file=sc.textFile('Data/mobydick.txt')
type(text_file)

CPU times: user 2.94 ms, sys: 0 ns, total: 2.94 ms
Wall time: 273 ms


**Steps for counting the words**

Split lines by spaces, map each `word` to `(word,1)`, and count the number of occurrences of each word

In [52]:
%%time
words=text_file.flatMap(lambda line: line.split(' '))
not_empty=words.filter(lambda x: x != '')
key_values=not_empty.map(lambda word: (word, 1))
counts=key_values.reduceByKey(lambda a,b: a+b)

CPU times: user 727 µs, sys: 11.1 ms, total: 11.9 ms
Wall time: 25.3 ms
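
The same four stages can be traced on a tiny input in plain Python. The two lines of text are toy data (with a deliberate double space to produce an empty string), and the dictionary loop is a local stand-in for `reduceByKey`, not its actual implementation.

```python
from collections import defaultdict

lines = ['you are my sunshine', 'my only  sunshine']   # toy input, note the double space

words = (w for line in lines for w in line.split(' '))  # flatMap
not_empty = (w for w in words if w != '')               # filter
key_values = ((w, 1) for w in not_empty)                # map to (word, 1)

counts = defaultdict(int)                               # reduceByKey with a+b
for word, one in key_values:
    counts[word] += one

print(dict(counts))
# {'you': 1, 'are': 1, 'my': 2, 'sunshine': 2, 'only': 1}
```

The first three stages are generators, so, as in the Spark version, nothing is computed until the final loop consumes them.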


**The execution plan**

In the last cell we defined the execution plan, but we haven't started to execute it. Preparing the plan took ~25 ms (which is non-trivial), but much less than the time to execute it. Let's have a look at the execution plan:

In [57]:
print(counts.toDebugString().decode())

(2) PythonRDD[100] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[99] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[98] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[97] at reduceByKey at <timed exec>:4 []
    |  PythonRDD[96] at reduceByKey at <timed exec>:4 []
    |  Data/mobydick.txt MapPartitionsRDD[85] at textFile at NativeMethodAccessorImpl.java:0 []
    |  Data/mobydick.txt HadoopRDD[84] at textFile at NativeMethodAccessorImpl.java:0 []


**Execution**

Now the lazy execution model finally performs some actual work, which takes a significant amount of time

In [58]:
%%time
Count=counts.count() # Count = the number of different words
Sum=counts.map(lambda x: x[1]).reduce(lambda x,y: x+y)
print('Different words=%5.0f, total words=%6.0f, mean number of occurrences per word=%4.2f' % (Count, Sum, float(Sum)/Count))

Different words=19840, total words=115314, mean number of occurrences per word=5.81
CPU times: user 16 ms, sys: 3.67 ms, total: 19.7 ms
Wall time: 116 ms


# Finding most common words

`counts`: RDD with 19840 pairs of the form `(word, count)`

Find the 5 most frequent words

**Method 1**: `collect` and `sort` on head node

**Method 2**: pure Spark, `collect` only at the end

**Method 1**

In [59]:
%%time
C=counts.collect()

CPU times: user 14 ms, sys: 3.87 ms, total: 17.9 ms
Wall time: 96.8 ms


In [62]:
%%time
C.sort(key=lambda x: x[1])
print('most common words \n'+'\n'.join(['%s:\t%d' % c for c in reversed(C[-5:])]))

most common words 
the:	6611
of:	3460
and:	2969
a:	2466
to:	2339
CPU times: user 6.72 ms, sys: 0 ns, total: 6.72 ms
Wall time: 6.57 ms


**Method 2**

In [68]:
%%time
# step 1: split, clean and map

word_pairs=text_file.flatMap(lambda x: x.split(' '))\
                    .filter(lambda x: x != '')\
                    .map(lambda word: (word,1))

CPU times: user 41 µs, sys: 4 µs, total: 45 µs
Wall time: 49.6 µs


In [69]:
%%time
# step 2: count occurrences of each word

counts=word_pairs.reduceByKey(lambda x,y: x+y)

CPU times: user 15.6 ms, sys: 303 µs, total: 15.9 ms
Wall time: 29.2 ms


In [70]:
%%time
# step 3: reverse (word, count) to (count, word) and sort by key

reverse_counts=counts.map(lambda x: (x[1], x[0]))
sorted_counts=reverse_counts.sortByKey(ascending=False)

CPU times: user 31 ms, sys: 967 µs, total: 31.9 ms
Wall time: 443 ms


**full execution plan**

But we have yet to execute it! So far this plan has not read a single byte of Moby Dick

In [71]:
print(counts.toDebugString().decode())

(2) PythonRDD[112] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[111] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[110] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[109] at reduceByKey at <timed exec>:3 []
    |  PythonRDD[108] at reduceByKey at <timed exec>:3 []
    |  Data/mobydick.txt MapPartitionsRDD[85] at textFile at NativeMethodAccessorImpl.java:0 []
    |  Data/mobydick.txt HadoopRDD[84] at textFile at NativeMethodAccessorImpl.java:0 []


In [72]:
%%time

D=sorted_counts.take(5)
print('most common words\n'+'\n'.join(['%d:\t%s' % c for c in D ]))

most common words
6611:	the
3460:	of
2969:	and
2466:	a
2339:	to
CPU times: user 11.3 ms, sys: 289 µs, total: 11.6 ms
Wall time: 228 ms
