# SparkContext and RDD basics

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

### Import libraries

In [0]:
pip install pyspark

In [0]:
from pyspark import SparkContext
import numpy as np

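## Initialize a `SparkContext` (the main entry point to the cluster)
**Note the `4` in the `master` argument: `local[4]` runs Spark locally with 4 worker threads (cores) for this `SparkContext` object.** If the environment already provides a context, as a Databricks notebook does, creating a second one most likely fails with a `ValueError`, which is what the traceback below records; `print(sc)` then reports the pre-existing `local[8]` context.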

In [0]:
sc=SparkContext(master="local[4]")

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<command-3704061629036228> in <module>
----> 1 sc=SparkContext(master="local[4]")

/databricks/spark/python/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    143                 " is not allowed as it is a security risk.")
    144 
--> 145         SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
    146

In [0]:
print(sc)

<SparkContext master=local[8] appName=Databricks Shell>


### Generate a list of random integers

In [0]:
lst=np.random.randint(0,10,20)

In [0]:
print(lst)

[3 6 4 0 1 0 1 6 5 1 7 4 6 8 6 3 7 3 2 2]


### Parallelize the list - this is the main operation toward distributed computing

In [0]:
A=sc.parallelize(lst)

### What did we just do? We created an RDD. What is an RDD?
![](https://i.stack.imgur.com/cwrMN.png)
As introduced at the top, an RDD is a **fault-tolerant collection of elements that can be operated on in parallel**. The `SparkContext` manages the distributed data over the worker nodes through the cluster manager.

Of the two ways to create RDDs (parallelizing an existing collection in the driver program, or referencing a dataset in an external storage system), we used the former: `sc.parallelize(lst)` split the local array `lst` into partitions.

### `A` is a PySpark RDD object; we cannot access its elements directly

In [0]:
type(A)

Out[7]: pyspark.rdd.RDD

In [0]:
A

Out[8]: ParallelCollectionRDD[0] at readRDDFromInputStream at PythonRDD.scala:413

### The opposite of parallelization: `collect` gathers all the distributed elements and returns them to the driver (head node). <br><br>Note: this is slow and pulls the whole dataset into the driver's memory, so use it sparingly.

In [0]:
A.collect()

Out[9]: [3, 6, 4, 0, 1, 0, 1, 6, 5, 1, 7, 4, 6, 8, 6, 3, 7, 3, 2, 2]

### How were the partitions created? Use the `glom` method, which gathers the elements of each partition into a list

In [0]:
A.glom().collect()

Out[10]: [[3, 6], [4, 0], [1, 0], [1, 6, 5, 1], [7, 4], [6, 8], [6, 3], [7, 3, 2, 2]]
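The eight sublists above have uneven sizes (2, 2, 2, 4, ...). The following plain-Python sketch reproduces that layout without Spark. It rests on an assumption inferred from the output, not on a statement of Spark's internals: elements are first grouped into batches of `len(data) // num_slices`, and the batches are then dealt out to the slices. The function name `emulate_parallelize` and the `max_batch` parameter are hypothetical.

```python
def emulate_parallelize(data, num_slices, max_batch=1024):
    """Plain-Python model (an assumption) of how a local list is split into partitions."""
    data = list(data)
    # Group elements into fixed-size batches first.
    batch_size = max(1, min(len(data) // num_slices, max_batch))
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    n = len(batches)
    partitions = []
    for i in range(num_slices):
        # Slice i receives batches [i*n/k, (i+1)*n/k), using integer division.
        start = i * n // num_slices
        end = (i + 1) * n // num_slices
        partitions.append([x for batch in batches[start:end] for x in batch])
    return partitions


lst = [3, 6, 4, 0, 1, 0, 1, 6, 5, 1, 7, 4, 6, 8, 6, 3, 7, 3, 2, 2]
eight = emulate_parallelize(lst, 8)  # same layout as the glom output above
four = emulate_parallelize(lst, 4)   # four partitions of five elements
two = emulate_parallelize(lst, 2)    # two partitions of ten elements
```

In PySpark itself, the number of partitions can also be requested explicitly through the second argument of `sc.parallelize` (`numSlices`) instead of relying on the context's default parallelism.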

### Now stop the `SparkContext`, reinitialize it with 2 cores, and see what happens when you repeat the process!

In [0]:
sc.stop()

In [0]:
sc=SparkContext(master="local[2]")

In [0]:
A = sc.parallelize(lst)

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<command-3704061629036246> in <module>
----> 1 A = sc.parallelize(lst)

NameError: name 'lst' is not defined

In [0]:
A.glom().collect()

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<command-3704061629036247> in <module>
----> 1 A.glom().collect()

NameError: name 'A' is not defined

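**With `local[2]`, the RDD is distributed over two partitions (chunks), not four!** The recorded run above stopped with a `NameError` because `lst` was not defined in that session, so the two-partition output is not shown.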

So, let's redo the process with 4 cores again.

In [0]:
sc.stop()

In [0]:
sc = SparkContext(master="local[4]")

In [0]:
A = sc.parallelize(lst)

## Basic operations
### `count` the elements

In [0]:
A.count()

### The first element (`first`) and the first few elements (`take`)

In [0]:
A.first()

Out[26]: 5

In [0]:
A.take(4)

Out[27]: [5, 9, 2, 8]

### Removing duplicates: Get another RDD with only the `distinct` elements

The method `RDD.distinct()` returns a new RDD that contains the distinct elements of the source RDD.

**NOTE**: This operation requires a **shuffle** in order to detect duplication across partitions. **So, it is a slow operation.**

In [0]:
A_distinct=A.distinct()

In [0]:
A_distinct.collect()

Out[29]: [8, 4, 5, 9, 1, 2, 6, 7, 3]
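To see why `distinct` needs a shuffle, here is a plain-Python sketch of the idea, not Spark's implementation: duplicates are first removed inside each partition, then every value is routed to a target partition chosen by its hash, so equal values from different partitions meet in the same place. The function name `distinct_like` and the four-way split of the values are assumptions for illustration.

```python
def distinct_like(partitions, num_partitions=None):
    """Emulate distinct(): local de-duplication, then a hash-based shuffle."""
    num_partitions = num_partitions or len(partitions)
    # Step 1: remove duplicates inside each partition (no data movement).
    local = [set(part) for part in partitions]
    # Step 2: the shuffle. Each value moves to the partition given by its hash,
    # so copies of the same value always land together and collapse into one.
    shuffled = [set() for _ in range(num_partitions)]
    for part in local:
        for x in part:
            shuffled[hash(x) % num_partitions].add(x)
    return [sorted(part) for part in shuffled]


values = [5, 9, 2, 8, 7, 8, 4, 8, 1, 9, 1, 1, 6, 6, 3, 9, 6, 1, 7, 5]
partitions = [values[i:i + 5] for i in range(0, len(values), 5)]
result = distinct_like(partitions)
flat = sorted(x for part in result for x in part)
```

The order of the collected result depends on how values are routed, which is why the output above is not sorted.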

### To sum all the elements, use the `reduce` method

In [0]:
A.reduce(lambda x,y:x+y)

Out[30]: 106

### Or use the `sum` method directly

In [0]:
A.sum()

Out[31]: 106

### Or use the `fold` method, which aggregates the elements of each partition starting from a neutral "zero" value, and then combines the results of all the partitions

In [0]:
A.fold(0,lambda x,y:x+y)

Out[25]: 106
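The two-stage aggregation of `fold` can be sketched in plain Python. This is an emulation under stated assumptions, not Spark code: the values are those `A.collect()` prints later in the notebook, split here into four partitions of five, and `fold_like` is a hypothetical helper name. The zero value is applied once per partition and once more when the partition results are merged, so it must be neutral for the operation.

```python
from functools import reduce
from operator import add


def fold_like(partitions, zero, op):
    """Emulate fold(): fold each partition from `zero`, then fold the partial results."""
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)


values = [5, 9, 2, 8, 7, 8, 4, 8, 1, 9, 1, 1, 6, 6, 3, 9, 6, 1, 7, 5]
partitions = [values[i:i + 5] for i in range(0, len(values), 5)]

total = fold_like(partitions, 0, add)    # neutral zero: same total as sum(values)
shifted = fold_like(partitions, 1, add)  # non-neutral zero is added 4 + 1 times
```

With a zero value of 1 the result is inflated by the number of partitions plus one, which is why 0 is the right choice for addition.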

### Finding the maximum element with `reduce`

In [0]:
A.reduce(lambda x,y: x if x > y else y)

Out[32]: 9

### Finding the longest word using `reduce`

In [0]:
words = 'These are some of the best Macintosh computers ever'.split(' ')
wordRDD = sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)

Out[33]: 'computers'

## Functions/filtering over RDD
### Use `filter` to return a new RDD with elements satisfying a given predicate (lambda expression)

In [0]:
# Return RDD with elements divisible by 3
A.filter(lambda x:x%3==0).collect()

Out[34]: [9, 9, 6, 6, 3, 9, 6]

### Lambda functions are short and sweet but we can write regular Python functions to use with `reduce`

In [0]:
def largerThan(x,y):
    """
    Returns the longer of two words; on a tie in length, returns the one that sorts first lexicographically
    """
    if len(x)> len(y):
        return x
    elif len(y) > len(x):
        return y
    else:
        if x < y: return x
        else: return y

In [0]:
wordRDD.reduce(largerThan)

Out[36]: 'Macintosh'
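The two reducers disagree because "Macintosh" and "computers" have the same length and each function breaks the tie differently. The sketch below replays both with `functools.reduce` on a plain list; the names `last_longest` and `larger_than` are illustrative stand-ins for the lambda and for `largerThan`. It also checks commutativity, which matters because Spark documents that the function passed to `reduce` should be commutative and associative.

```python
from functools import reduce

words = 'These are some of the best Macintosh computers ever'.split(' ')


def last_longest(w, v):
    # Same logic as the lambda: on a tie in length, keep the later word.
    return w if len(w) > len(v) else v


def larger_than(x, y):
    # Same logic as largerThan: on a tie, keep the lexicographically smaller word.
    if len(x) != len(y):
        return x if len(x) > len(y) else y
    return min(x, y)


by_lambda = reduce(last_longest, words)
by_function = reduce(larger_than, words)

# larger_than gives the same answer whatever the argument order; last_longest does not.
tie = ('Macintosh', 'computers')
commutative_function = larger_than(*tie) == larger_than(*reversed(tie))
commutative_lambda = last_longest(*tie) == last_longest(*reversed(tie))
```

Uppercase letters sort before lowercase ones in Python string comparison, so "Macintosh" wins the tie in `larger_than`.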

## Sampling an RDD
* RDDs are often very large.
* **Aggregates, such as averages, can be approximated efficiently by using a sample.** This often comes in handy for operations on extremely large datasets, where a sample can tell a lot about the patterns and descriptive statistics of the data.
* Sampling is done in parallel and requires limited computation.

The method `RDD.sample(withReplacement, p)` generates a sample of the elements of the RDD, where
- `withReplacement` is a boolean flag indicating whether an element in the RDD can be sampled more than once.
- `p` (named `fraction` in the PySpark signature) is the probability of accepting each element into the sample. Because elements are accepted independently within each partition, the number of elements in the sample changes from sample to sample.

In [0]:
# get a sample whose expected size is m
# Note that the size of the sample is different in different runs
m=5
n=20
print('sample1=',A.sample(False,m/n).collect()) 
print('sample2=',A.sample(False,m/n).collect())
print('sample3=',A.sample(False,m/n).collect())
print('sample4=',A.sample(False,m/n).collect())

sample1= [2, 8, 8, 1, 9, 5]
sample2= [7, 9, 6, 3, 1]
sample3= [9, 8, 9, 1, 6, 5]
sample4= [9, 2, 6, 6, 1]


### Things to note and think about
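* Each time you run the previous cell, you get a different sample, and therefore a different estimate of any aggregate computed from it
* The accuracy of the estimate is determined by the expected size of the sample, $n \cdot p$. Here, the probability is $p=\frac{m}{n}$, so the expected size is $m$
* See how the error changes as you vary $p$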
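The last point can be tried without a cluster. The sketch below is a plain-Python stand-in for sampling without replacement, under the assumption that each element is kept independently with probability `p`; `bernoulli_sample`, `mean_or_nan`, and the seed are hypothetical choices. It estimates the mean from samples at several values of `p` and records the absolute error against the true mean.

```python
import random


def bernoulli_sample(values, p, rng):
    """Keep each element independently with probability p."""
    return [x for x in values if rng.random() < p]


def mean_or_nan(xs):
    return sum(xs) / len(xs) if xs else float('nan')


values = [5, 9, 2, 8, 7, 8, 4, 8, 1, 9, 1, 1, 6, 6, 3, 9, 6, 1, 7, 5]
true_mean = mean_or_nan(values)

rng = random.Random(0)  # fixed seed only so that reruns are repeatable
errors = {}
for p in (0.25, 0.5, 0.75, 1.0):
    sample = bernoulli_sample(values, p, rng)
    # The sample size varies from run to run; its expected value is len(values) * p.
    errors[p] = abs(mean_or_nan(sample) - true_mean)
```

At `p = 1.0` every element is kept and the error is zero; smaller values of `p` trade accuracy for less data.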

## Basic statistics

In [0]:
print("Maximum: ",A.max())
print("Minimum: ",A.min())
print("Mean (average): ",A.mean())
print("Standard deviation: ",A.stdev())

Maximum:  9
Minimum:  1
Mean (average):  5.3
Standard deviation:  2.8478061731796283


In [0]:
A.stats()

Out[33]: (count: 20, mean: 5.3, stdev: 2.8478061731796283, max: 9.0, min: 1.0)
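Statistics such as the mean and standard deviation can be computed per partition and then merged, so the data never has to be gathered in one place. The sketch below shows one common way to do this (merging count, mean, and sum of squared deviations); it is an illustration with hypothetical helper names, not a quotation of Spark's implementation. It uses the values of `A` as printed by `A.collect()` elsewhere in the notebook and reproduces the mean and population standard deviation printed in the first cell of this section.

```python
import math
from functools import reduce


def partition_stats(values):
    """Return (count, mean, sum of squared deviations) for one partition."""
    n = len(values)
    mean = sum(values) / n if n else 0.0
    m2 = sum((x - mean) ** 2 for x in values)
    return n, mean, m2


def merge_stats(a, b):
    """Combine the summaries of two partitions without revisiting the data."""
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    if n == 0:
        return 0, 0.0, 0.0
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta * delta * n_a * n_b / n
    return n, mean, m2


values = [5, 9, 2, 8, 7, 8, 4, 8, 1, 9, 1, 1, 6, 6, 3, 9, 6, 1, 7, 5]
partitions = [values[i:i + 5] for i in range(0, len(values), 5)]

count, mean, m2 = reduce(merge_stats, (partition_stats(p) for p in partitions))
stdev = math.sqrt(m2 / count)  # population standard deviation (divides by n)
```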

## Mapping
### `map` operation with _lambda_ function

In [0]:
B=A.map(lambda x:x*x)

In [0]:
B.collect()

Out[35]: [25, 81, 4, 64, 49, 64, 16, 64, 1, 81, 1, 1, 36, 36, 9, 81, 36, 1, 49, 25]

### `map` operation with regular Python function

In [0]:
def square_if_odd(x):
    if x%2==1:
        return x*x
    else:
        return x

In [0]:
A.map(square_if_odd).collect()

Out[43]: [25, 81, 2, 8, 49, 8, 4, 8, 1, 81, 1, 1, 6, 6, 9, 81, 6, 1, 49, 25]

### The `flatMap` method returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results

In [0]:
A.flatMap(lambda x:(x,x*x)).collect()

Out[44]: [5,
 25,
 9,
 81,
 2,
 4,
 8,
 64,
 7,
 49,
 8,
 64,
 4,
 16,
 8,
 64,
 1,
 1,
 9,
 81,
 1,
 1,
 1,
 1,
 6,
 36,
 6,
 36,
 3,
 9,
 9,
 81,
 6,
 36,
 1,
 1,
 7,
 49,
 5,
 25]
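The difference between `map` and `flatMap` is easiest to see side by side. This plain-Python sketch (helper names `map_like` and `flat_map_like` are hypothetical) applies the same function both ways to the first three values of `A`: `map` keeps one output item per input, here a tuple, while `flatMap` splices the items of each returned tuple into a single flat sequence.

```python
from itertools import chain


def map_like(values, f):
    """One output element per input element."""
    return [f(x) for x in values]


def flat_map_like(values, f):
    """Apply f, then concatenate the iterables it returns."""
    return list(chain.from_iterable(f(x) for x in values))


values = [5, 9, 2]
pairs = map_like(values, lambda x: (x, x * x))      # list of tuples
flat = flat_map_like(values, lambda x: (x, x * x))  # flat list of numbers
```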

## Grouping and binning
### `groupBy` returns an RDD of `(key, iterable)` pairs, grouping the elements by the key that a given function computes for each of them

In [0]:
result=A.groupBy(lambda x:x%2).collect()
print(A.collect())
#print(sorted(result[0][1]))
sorted([(x, sorted(y)) for (x, y) in result])

[5, 9, 2, 8, 7, 8, 4, 8, 1, 9, 1, 1, 6, 6, 3, 9, 6, 1, 7, 5]


Out[52]: [(0, [2, 4, 6, 6, 6, 8, 8, 8]), (1, [1, 1, 1, 1, 3, 5, 5, 7, 7, 9, 9, 9])]
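For readers who want to check the grouping by hand, the same result can be produced with a dictionary in plain Python. `group_by_like` is a hypothetical helper written for this illustration; it is not how Spark distributes the work, only what the grouped output means.

```python
from collections import defaultdict


def group_by_like(values, key_func):
    """Collect values into lists keyed by key_func(value)."""
    groups = defaultdict(list)
    for x in values:
        groups[key_func(x)].append(x)
    return groups


values = [5, 9, 2, 8, 7, 8, 4, 8, 1, 9, 1, 1, 6, 6, 3, 9, 6, 1, 7, 5]
groups = group_by_like(values, lambda x: x % 2)
result = sorted((key, sorted(members)) for key, members in groups.items())
```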

### The `histogram` method takes a list of bucket boundaries and returns a tuple of the boundaries and the number of elements in each bucket

In [0]:
A.histogram([x for x in range(0,100,10)])

Out[54]: ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [20, 0, 0, 0, 0, 0, 0, 0, 0])
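All twenty values fall into the first bucket because every element of `A` is below 10. The plain-Python sketch below emulates the bucketing rule as documented for `RDD.histogram`: each bucket includes its lower bound and excludes its upper bound, except the last bucket, which includes both. The helper name `histogram_like` and the finer boundaries `[0, 5, 10]` are illustrative choices.

```python
from bisect import bisect_right


def histogram_like(values, buckets):
    """Count values per bucket: [b0, b1), [b1, b2), ..., [b_{k-1}, b_k]."""
    counts = [0] * (len(buckets) - 1)
    lo, hi = buckets[0], buckets[-1]
    for x in values:
        if x < lo or x > hi:
            continue  # values outside the boundaries are not counted
        if x == hi:
            idx = len(counts) - 1  # the last bucket is closed on the right
        else:
            idx = bisect_right(buckets, x) - 1
        counts[idx] += 1
    return buckets, counts


values = [5, 9, 2, 8, 7, 8, 4, 8, 1, 9, 1, 1, 6, 6, 3, 9, 6, 1, 7, 5]
coarse = histogram_like(values, list(range(0, 100, 10)))
fine = histogram_like(values, [0, 5, 10])
```

Boundaries that match the range of the data, such as `[0, 5, 10]`, give a more informative picture than the wide buckets used above.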

## Set operations
### Create smaller RDDs to demonstrate joint operations

In [0]:
lst1=np.random.randint(0,10,3)
C=sc.parallelize(lst1)
lst2=np.random.randint(10,20,3)
D=sc.parallelize(lst2)
print("C:",C.collect())
print("D:",D.collect())

C: [4, 0, 2]
D: [13, 14, 11]


### `C+D` gives the union of the two RDDs (a concatenation that keeps duplicates, unlike a set union), not the element-wise sum

In [0]:
(C+D).collect()

Out[57]: [4, 0, 2, 13, 14, 11]
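Because `C` and `D` above share no values, the output does not reveal how duplicates are treated. The plain-Python sketch below uses two small hypothetical lists with an overlap to contrast three things: concatenation, which is what an RDD union amounts to, a true set union, and the element-wise sum that NumPy arrays would give.

```python
import numpy as np

c = [4, 0, 2]
d = [2, 4, 11]  # hypothetical values that overlap with c

concatenated = c + d                              # union-like: duplicates are kept
set_union = sorted(set(c) | set(d))               # set union: duplicates removed
elementwise = (np.array(c) + np.array(d)).tolist()  # what `+` means for NumPy arrays
```

In PySpark, chaining `distinct()` after the union is one way to obtain set-union behavior.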

### `cartesian` gives the Cartesian product: every pair of one element from each RDD (as tuples)

In [0]:
C.cartesian(D).collect()

Out[58]: [(4, 13),
 (4, 14),
 (4, 11),
 (0, 13),
 (0, 14),
 (0, 11),
 (2, 13),
 (2, 14),
 (2, 11)]

### The `intersection` and `subtract` methods return an RDD of the set intersection and subtraction (difference)

In [0]:
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()

Out[59]: [1, 2, 3]

In [0]:
rdd1.subtract(rdd2).collect()

Out[60]: [10, 4, 5]
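The meaning of both operations can be checked in plain Python. In this sketch the helper names are hypothetical and the ordering is a choice made for readability, since the order of a collected RDD is not guaranteed: `intersection_like` returns each common value once, and `subtract_like` keeps every element of the first list whose value does not occur in the second.

```python
def intersection_like(a, b):
    """Values present in both inputs, each reported once."""
    return sorted(set(a) & set(b))


def subtract_like(a, b):
    """Elements of a whose value does not occur in b (duplicates in a are kept)."""
    exclude = set(b)
    return [x for x in a if x not in exclude]


list1 = [1, 10, 2, 3, 4, 5]
list2 = [1, 6, 2, 3, 7, 8]

common = intersection_like(list1, list2)
only_in_first = subtract_like(list1, list2)
```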

### Stop the `SparkContext` at the end

In [0]:
sc.stop()