## Spark Basics

This notebook will introduce you to two fundamental objects in Spark:

* The Spark Context
* The Resilient Distributed DataSet or RDD

### Spark Context
* Spark is complex distributed software.
* The Python interface to Spark is called **pyspark**.
* **SparkContext** is a Python class, defined as part of **pyspark**, which manages the communication between the user's program and Spark.

We start by creating a **SparkContext** object named **sc**. In this case we create a Spark context that runs locally with 3 worker threads, which play the role of *executors*.

In [137]:
from pyspark import SparkContext
sc = SparkContext(master="local[3]")
sc

<pyspark.context.SparkContext at 0x10855f690>

### Only one SparkContext at a time!
When you run Spark in local mode, you can have only a single context at a time. Therefore, if you want to use Spark in a second notebook, you should first stop the one you are using here. This is what the method `.stop()` is for.

In [138]:
# sc.stop() #commented out so that you don't stop your context by mistake

<h3>RDDs</h3>

<p>RDD (or Resilient Distributed DataSet) is the main novel data structure in Spark. You can think of it as a list whose elements are stored on several computers.</p>

<p><img alt="" src="Figures/SparkContextAndRDD.jpg" style="height:324px; width:900px" /></p>


The elements of each `RDD` are distributed across the **worker nodes**, which are the nodes that perform the actual computations. This notebook, however, is running on the **driver node**. As the RDD is not stored on the driver node, you cannot access it directly. The variable name `RDD` is really just a pointer to a Python object which holds the information regarding the actual location of the elements.

#### Parallelize 
* The simplest way to create an RDD.
* The call `A=sc.parallelize(L)` creates an RDD named `A` from the list `L`.
* `A` is an RDD of type `ParallelCollectionRDD`.

In [139]:
A=sc.parallelize(range(3))
A

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

#### Collect

* RDD content is distributed among all executors.
* `collect()` is the inverse of `parallelize()`.
* It collects the elements of the RDD on the driver.
* It returns a list.


In [140]:
L=A.collect()
print type(L)
print L

<type 'list'>
[0, 1, 2]


### Using `.collect()` eliminates the benefits of parallelism
It is often tempting to `.collect()` an RDD, make it into a list, and then process the list using standard Python. However, this means that you are using only the driver node to perform the computation, so you are not getting any benefit from Spark.

Using RDD operations, as described below, **will** make use of all of the computers at your disposal.

### Map
* Applies a given operation to each element of an RDD.
* The parameter is the function defining the operation.
* Returns a new RDD.
* The operation is performed in parallel on all executors.
* Each executor operates on the data **local** to it.

**Note:** Here we are using **lambda** functions. Later we will see that regular functions can also be used.

For more on lambda functions see [here](http://www.secnetix.de/olli/Python/lambda_functions.hawk)

In [141]:
A.map(lambda x: x*x).collect()

[0, 1, 4]
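
To make the bullets above concrete, here is a plain-Python sketch (no Spark involved) of what `map` followed by `collect` does conceptually. The split stored in `partitions` and the helper name `local_map` are assumptions made for illustration only; Spark decides the actual partitioning.

```python
# Plain-Python illustration only: Spark chooses the real partitioning.
partitions = [[0], [1], [2]]   # assumed split of range(3) over 3 executors

def local_map(func, partition):
    """Apply func to every element held by one (hypothetical) executor."""
    return [func(x) for x in partition]

# Each executor transforms only the partition it holds ...
mapped_partitions = [local_map(lambda x: x * x, p) for p in partitions]

# ... and collect() gathers the partitions back into one list on the driver.
collected = [x for p in mapped_partitions for x in p]
print(collected)
```

The result matches the output of the Spark cell above, but here everything runs in a single Python process.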

#### Exercise 1

1. Write a function called `mapcos` that has a single parameter, an RDD of numbers, computes the cosine of each element, and returns the results as a list.

`mapcos(A)` should produce the output
    
```
    [1.0, 0.54030..., -0.41614...]
```

**Approximate answers** The numbers above end with `...` because floating point computations always involve round-off errors, and these errors may depend on the architecture of the computer you are using. To avoid this problem, we test the answers you produce within a tolerance of 0.1%. In other words, if your answer is $a$ and the correct answer is $b$, then your answer is considered correct if
$$ \left| \frac{a-b}{a} \right| < 0.001$$

In [142]:
import numpy as np
def mapcos(A):
    return A.map(np.cos).collect()
mapcos(A)

[1.0, 0.54030230586813977, -0.41614683654714241]

In [143]:
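def very_close_lists(A,B,tol=0.001):
    ''' Check that the two first parameters are lists of equal length 
    and that corresponding elements agree within relative tolerance tol'''
    assert type(A)==list and type(B)==list
    assert len(A)==len(B)
    for i in range(len(A)):
        a=A[i]; b=B[i]
        if a==0:
            a+=1; b+=1   # shift both values to avoid dividing by zero
        r=(a-b)/a        # relative difference, checked for every pair
        assert abs(r)<tol
            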
            
very_close_lists(mapcos(sc.parallelize(range(3))),[1.0, 0.5403, -0.4161])

#### Exercise 2
Consider the following RDD: 

```
stringRDD=sc.parallelize(["Spring quarter", "Learning spark basics", "Big data analytics with Spark"])
```
Write a function called `mapwords` that has a single parameter, an RDD of strings, and returns a list of words for each string.  
`mapwords(stringRDD)` should produce the output:
    
``` 
[['Spring', 'quarter'], ['Learning', 'spark', 'basics'], ['Big', 'data', 'analytics', 'with', 'Spark']]
```

### Reduce

* Takes an RDD as input and returns a single value.
* A **reduce operator** takes **two** elements as input and returns **one** as output.
* `reduce` repeatedly applies the **reduce operator**.
* Each executor reduces the data local to it.
* The results from all executors are combined.

The simplest example of a 2-to-1 operation is the sum:

In [144]:
A.reduce(lambda x,y: x+y)

3
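
The two-stage behaviour described above (local reduction, then combination) can be sketched in plain Python with `functools.reduce`. The split in `partitions` is an assumption for illustration; in Spark the partitioning is chosen by the system.

```python
from functools import reduce

partitions = [[0, 1], [2]]   # assumed split of range(3); Spark picks the real one
add = lambda x, y: x + y

# Step 1: each executor reduces its own partition to a single value.
partial_results = [reduce(add, p) for p in partitions]

# Step 2: the partial results are combined with the same operator.
total = reduce(add, partial_results)
print(total)
```

Because addition does not care how the elements are grouped, any other split of the same elements gives the same total.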

Here is an example of a reduce operation that finds the shortest string in an RDD of strings.

In [145]:
words=['this','is','the','best','mac','ever']
wordRDD=sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)<len(v) else v)

'is'

#### Exercise 2

1. Write a `reduce` command that outputs the maximum number from a list of numbers. Your command should produce the following output on the input ```RDD=sc.parallelize([0,2,1])```

   Output: ``` 2 ```
   

2. Consider the `stringRDD` defined in the `mapwords` exercise above. Write a `reduce` command to produce a single string which is the concatenation of all the strings in `stringRDD` (with a space between each string). Your output should look like:

    Output: ``` 'Spring quarter Learning spark basics Big data analytics with Spark' ```

### Using regular functions instead of lambda functions

* Lambda functions are short and sweet.
* But sometimes it is hard to fit the logic into one line.
* We can use full-fledged functions instead.

Suppose we want to find the longest word in the list and, when several words share the maximal length, the one that comes last in lexicographical order.

We could achieve that as follows:

In [146]:
def largerThan(x,y):
    if len(x)>len(y): return x
    elif len(y)>len(x): return y
    else:  #lengths are equal, compare lexicographically
        if x>y: 
            return x
        else: 
            return y
        
wordRDD.reduce(largerThan)

'this'

#### Exercise 3

1. Consider the following RDD:

    ``` listRDD=sc.parallelize([[3,4],[2,1],[7,9]]) ```
 
     Write a regular function and use it with the `reduce` command to output the maximum element over all the lists. Your output should look like:
     
     Output: ```[9]```
     
     (Note: The output is a list containing a single number rather than just a single number)

<h3>Reduce operations <strong>must not depend on the order</strong></h3>

<ul>
	<li>Order of operands should not matter</li>
	<li>Order of application of reduce operator should not matter</li>
</ul>

<p>Multiplication and summation are good:</p>

<h1>&nbsp; &nbsp; &nbsp; &nbsp; 1 + 3 + 5 + 2 &nbsp; &nbsp; &nbsp;5 + 3 + 1 + 2 &nbsp;</h1>


<p>Division and subtraction are bad:</p>

<h1>&nbsp; &nbsp; &nbsp; &nbsp; 1 - 3 - 5 - 2 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;1 - 3 - 5 - 2 &nbsp;</h1>


### Why must reordering not change the result?

You can think about the reduce operation as a binary tree where the leaves are the elements of the list and the root is the final result. Each triplet of the form (parent, child1, child2) corresponds to a single application of the reduce function. 

The order in which the reduce operation is applied is **determined at run time** and depends on how the RDD is partitioned across the cluster.
There are many different orders to apply the reduce operation. 

If we want the input RDD to uniquely determine the reduced value, **all evaluation orders must yield the same final result**. In addition, the order of the elements in the list must not change the result. In particular, reversing the order of the operands in a reduce function must not change the outcome. 

For example the arithmetic operations multiply `*` and add `+` can be used in a reduce, but the operations subtract `-` and divide `/` should not.

Doing so will not raise an error, but the result is unpredictable.

Here is an example.  
Which of the following orders was executed?
* $$((1-3)-5)-2$$ or
* $$(1-3)-(5-2)$$

In [147]:
B=sc.parallelize([1,3,5,2])
B.reduce(lambda x,y: x-y)

-5
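
The dependence on partitioning can be reproduced without Spark. The sketch below uses a hypothetical helper, `reduce_by_partition`, that reduces each partition locally and then combines the partial results; the two splits of the list are assumptions chosen to match the two orders listed above.

```python
from functools import reduce

def reduce_by_partition(op, partitions):
    """Reduce each partition locally, then combine the partial results."""
    partials = [reduce(op, p) for p in partitions]
    return reduce(op, partials)

sub = lambda x, y: x - y
add = lambda x, y: x + y

one_partition = [[1, 3, 5, 2]]       # evaluates ((1-3)-5)-2
two_partitions = [[1, 3], [5, 2]]    # evaluates (1-3)-(5-2)

sub_one = reduce_by_partition(sub, one_partition)
sub_two = reduce_by_partition(sub, two_partitions)
add_one = reduce_by_partition(add, one_partition)
add_two = reduce_by_partition(add, two_partitions)

print("subtract: %d vs %d" % (sub_one, sub_two))
print("add:      %d vs %d" % (add_one, add_two))
```

Subtraction gives -9 for the single partition and -5 for the two-way split, while addition gives 11 in both cases.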

### Combining operations

The method `map` takes as input an RDD and returns an RDD. Similarly the method `reduce` takes as input an RDD and returns a single element. 

We can combine map and reduce operations to perform more complex operations.

Suppose we want to compute the sum of the squares
$$ \sum_{i=1}^n x_i^2 $$
where the elements $x_i$ are stored in an RDD.

Traditional syntax: 
* perform the map
* store the intermediate result in a variable
* perform the reduce

In [148]:
#separate commands
Squares=B.map(lambda x:x*x)
Squares.reduce(lambda x,y:x+y)

39

Or we can combine them into a single cascaded command

In [149]:
#cascaded commands
B.map(lambda x:x*x)\
   .reduce(lambda x,y:x+y)

39

These two expressions are equivalent. We might expect that the first form, where the commands are separate, is the more basic one, and that the cascaded commands are translated into something that corresponds to the separate commands.

It turns out that the opposite is true: it is the cascaded form that is closer to what is actually executed, and Spark identifies cascading operations even when they are expressed in a non-cascaded way.

The explanation of this surprising behaviour is related to lazy evaluation: transformations such as `map` are not computed until an action such as `reduce` asks for a result. This is explained in the [Spark programming guide/RDD operations](http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations).
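
As a rough intuition for why the separate and cascaded forms end up doing the same work, here is a toy plain-Python class. `LazyPipeline` is a hypothetical stand-in written for this illustration, not part of pyspark, and it ignores partitioning entirely: `map` only records the function, and nothing is computed until `reduce` is called.

```python
from functools import reduce

class LazyPipeline(object):
    """Toy stand-in for an RDD: map() only records work, reduce() runs it."""
    def __init__(self, data, funcs=()):
        self.data = data
        self.funcs = tuple(funcs)

    def map(self, func):
        # Transformation: nothing is computed, the function is just remembered.
        return LazyPipeline(self.data, self.funcs + (func,))

    def reduce(self, op):
        # Action: apply all recorded functions in one pass, then combine.
        def apply_all(x):
            for f in self.funcs:
                x = f(x)
            return x
        return reduce(op, (apply_all(x) for x in self.data))

B_local = LazyPipeline([1, 3, 5, 2])
squares = B_local.map(lambda x: x * x)      # no squares computed yet
separate = squares.reduce(lambda x, y: x + y)
cascaded = B_local.map(lambda x: x * x).reduce(lambda x, y: x + y)
print("separate=%d cascaded=%d" % (separate, cascaded))
```

Storing the intermediate object in a variable changes nothing about when the squares are computed, which is why both forms behave identically.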

### An instructive mistake
Here is another way to compute the sum of the squares using a single reduce command. What is wrong with it?

In [150]:
C=sc.parallelize([1,1,1])
C.reduce(lambda x,y: x*x+y*y)


5

<h1>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 1 &nbsp; &nbsp; 1 &nbsp; &nbsp; 1<br />
&nbsp;</h1>
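
One way to investigate is to replay the computation step by step in plain Python. The sketch below uses `functools.reduce` on an ordinary list, which applies the operator left to right; Spark may group the elements differently, but the same issue appears.

```python
from functools import reduce

values = [1, 1, 1]

# The operator squares both inputs, so a partial result is squared again:
# step 1: 1*1 + 1*1 = 2, step 2: 2*2 + 1*1 = 5
wrong = reduce(lambda x, y: x * x + y * y, values)

# Squaring each element once with map, then adding, gives the intended sum.
right = reduce(lambda x, y: x + y, map(lambda x: x * x, values))

print("wrong=%d right=%d" % (wrong, right))
```

The first argument of a reduce operator may already be a partial result rather than an original element, so per-element transformations belong in a `map`.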


#### Exercise 4.

1. Consider the listRDD given in Exercise 3. Find the sum of maximum numbers of all lists. Your output should be:

    Output: ``` 15 ```

### Getting information about an RDD
RDDs typically have hundreds of thousands of elements. It usually makes no sense to print out the content of a whole RDD. Here are some ways to get manageable amounts of information about an RDD.

In [151]:
n=1000000
B=sc.parallelize([0,0,1,0]*(n/4))

In [152]:
#find the number of elements in the RDD
B.count()

1000000

In [153]:
# get the first few elements of an RDD
print 'first element=',B.first()
print 'first 5 elements = ',B.take(5)

first element= 0
first 5 elements =  [0, 0, 1, 0, 0]


#### Sampling an RDD
* RDDs are often very large.
* Aggregates, such as averages, can be approximated efficiently by using a sample.
* Sampling is done in parallel and it keeps the data local.

The method `RDD.sample(withReplacement,p)` generates a sample of the elements of the RDD, where
- `withReplacement` is a boolean flag indicating whether or not an element in the RDD can be sampled more than once.
- `p` is the probability of accepting each element into the sample. Note that because elements are accepted at random, and the sampling is performed independently in each partition, the number of elements in the sample changes from sample to sample.
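
To see why the sample size varies, here is a plain-Python sketch of sampling without replacement in which each element is kept independently with probability `p`. The helper name `bernoulli_sample`, the fixed seed, and the smaller data size are assumptions made for this illustration; it is not Spark's implementation.

```python
import random

def bernoulli_sample(data, p, rng):
    """Keep each element independently with probability p (no replacement)."""
    return [x for x in data if rng.random() < p]

rng = random.Random(0)        # fixed seed so the run is repeatable
data = [0, 0, 1, 0] * 2500    # 10,000 elements, a quarter of them equal to 1
p = 0.01

# Draw five samples and record how many elements each one contains.
sizes = [len(bernoulli_sample(data, p, rng)) for _ in range(5)]
print(sizes)
```

The expected sample size is `len(data) * p = 100`, but the individual sizes scatter around that value.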

In [154]:
# get a sample whose expected size is m
m=5.
B.sample(False,m/n).collect()

[0, 0, 1]

In [155]:
#compute the average exactly:
exact=B.reduce(lambda x,y:x+y)/(n+0.0)
print 'exact average', exact
#compute the average by sampling 1% of the elements
p=0.01
approx=B.sample(False,p).reduce(lambda x,y:x+y)/(n*p)
print 'approximate average',approx
print 'error=',approx-exact


exact average 0.25
approximate average 0.2462
error= -0.0038


#### Things to note and think about
* Each time you run the previous cell, you get a different estimate
* The accuracy of the estimate is determined by the size of the sample $n*p$
* See how the error changes as you vary $p$
* Can you give a formula that relates the variance of the estimate to $(p*n)$ ? (The answer is in the Probability and statistics course).

#### Filtering an RDD
The method `RDD.filter(func)` returns a new dataset formed by selecting those elements of the source on which `func` returns true.


In [156]:
# How many positive numbers?
B.filter(lambda n: n > 0).count()

250000

#### Exercise 5

1. Write a `filter` command to output elements whose cosine is positive. Your command should produce the following output on ` RDD=sc.parallelize([0,2,1]) `:

    ` [0,1] `
    
    
2. Write a `filter` command to output all words whose length is greater than or equal to 4. Your command should produce the following output on ` wordRDD=sc.parallelize(['this','is','the','best','mac','ever']) `:

    ` ['this', 'best', 'ever'] `

#### Remove duplicate elements in RDD
The method `RDD.distinct(numPartitions=None)` returns a new dataset that contains the distinct elements of the source dataset.

* The number of partitions is specified through the **numPartitions** argument. Each of these partitions is potentially on a different machine.


In [157]:
# Remove duplicate element in DuplicateRDD, we get distinct RDD
DuplicateRDD = sc.parallelize([1,1,2,2,3,3])
DistinctRDD = DuplicateRDD.distinct()
DistinctRDD.collect()


[3, 1, 2]

#### flatMap an RDD
The method `RDD.flatMap(func)` is similar to `map`, but each input item can be mapped to 0 or more output items (so `func` should return a sequence, for example a list, rather than a single item).

In [158]:
text=["you are my sunshine","my only sunshine"]
text_file = sc.parallelize(text)
# map each line in text to a list of words
print 'map:',text_file.map(lambda line: line.split(" ")).collect()
# create a single list of words by combining the words from all of the lines
print 'flatmap:',text_file.flatMap(lambda line: line.split(" ")).collect()

map: [['you', 'are', 'my', 'sunshine'], ['my', 'only', 'sunshine']]
flatmap: ['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']
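
The relationship between the two results can be reproduced in plain Python: `flatMap` behaves like a `map` followed by concatenating the resulting lists. The sketch below uses `itertools.chain.from_iterable` for the concatenation and runs without Spark.

```python
from itertools import chain

text = ["you are my sunshine", "my only sunshine"]
split = lambda line: line.split(" ")

mapped = [split(line) for line in text]            # one list of words per line
flat_mapped = list(chain.from_iterable(mapped))    # the lists concatenated into one

print(mapped)
print(flat_mapped)
```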


#### Exercise 6

1. Write a `flatMap` command to collect all the elements from 1 to x for each element x in a list. Your command should produce the following output on `RDD=sc.parallelize([2,3,5])`:

    ``` [1, 2, 1, 2, 3, 1, 2, 3, 4, 5] ```

### Set operations
In this part, we explore set operations in pyspark, including **union**, **intersection**, **subtract**, and **cartesian**.

In [159]:
rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize([1, 3, 4, 5])


1. union(other)
 * Return the union of this RDD and another one. Duplicate elements are kept.
 * Example: `rdd1.union(rdd2).collect()`

2. intersection(other)
 * Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally.

In [160]:
rdd1.intersection(rdd2).collect()

[1, 3]

3. subtract(other, numPartitions=None)
 * Return each value in self that is not contained in other.

In [161]:
rdd1.subtract(rdd2).collect()

[2]

4. cartesian(other)
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where **a** is in **self** and **b** is in **other**.

In [162]:
print rdd1.cartesian(rdd2).collect()

[(1, 1), (1, 3), (1, 4), (1, 5), (1, 1), (1, 3), (1, 4), (1, 5), (2, 1), (3, 1), (2, 3), (3, 3), (2, 4), (2, 5), (3, 4), (3, 5)]
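
For comparison, the four operations have close analogues on ordinary Python lists. The sketch below is a plain-Python approximation under the assumption that element order does not matter; Spark may return the elements in a different order, as the cartesian output above shows.

```python
from itertools import product

list1 = [1, 1, 2, 3]
list2 = [1, 3, 4, 5]
in_list2 = set(list2)

union = list1 + list2                               # duplicates are kept
intersection = sorted(set(list1) & in_list2)        # duplicates are removed
subtract = [x for x in list1 if x not in in_list2]  # values of list1 absent from list2
cartesian = list(product(list1, list2))             # every pair (a, b)

print(union)
print(intersection)
print(subtract)
print(len(cartesian))
```

The cartesian product contains `len(list1) * len(list2) = 16` pairs, the same number as in the Spark output.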


#### Exercise 7

Consider the following RDDs: 

` RDD1=sc.parallelize(["spark basics", "big data analysis", "spring"]) `

` RDD2=sc.parallelize(["spark using pyspark", "big data"]) `

Use the set operations to produce the following outputs:

* ` ['spark', 'basics', 'big', 'data', 'analysis', 'spring', 'spark', 'using', 'pyspark', 'big', 'data'] `
* ` ['data', 'big', 'spark'] `
* ` ['spring', 'analysis', 'basics'] `
* ` [('spark', 'spark'), ('spark', 'using'), ('spark', 'pyspark'), ('basics', 'spark'), ('basics', 'using'), ('basics', 'pyspark'), ('spark', 'big'), ('spark', 'data'), ('basics', 'big'), ('basics', 'data'), ('big', 'spark'), ('big', 'using'), ('big', 'pyspark'), ('data', 'spark'), ('analysis', 'spark'), ('data', 'using'), ('data', 'pyspark'), ('analysis', 'using'), ('analysis', 'pyspark'), ('spring', 'spark'), ('spring', 'using'), ('spring', 'pyspark'), ('big', 'big'), ('big', 'data'), ('data', 'big'), ('analysis', 'big'), ('data', 'data'), ('analysis', 'data'), ('spring', 'big'), ('spring', 'data')] `