## We are getting started with Apache Spark!

For the very basics, look at the following site:
* https://spark.apache.org/docs/0.9.0/python-programming-guide.html


Credit for this notebook: https://www.analyticsvidhya.com/blog/2016/09/comprehensive-introduction-to-apache-spark-rdds-dataframes-using-pyspark/
and 
https://www.analyticsvidhya.com/blog/2016/10/using-pyspark-to-perform-transformations-and-actions-on-rdd/

Other helpful resources:
* https://spark.apache.org/
* Book: Learning Spark (O'Reilly), for a lot of details:
https://www.oreilly.com/library/view/learning-spark/9781449359034/
(pdf online)
* https://www.tutorialspoint.com/pyspark/index.htm

First we create a context: this tells Python to open a connection to Spark through the PySpark API.

# Apache Spark data representations: RDD / Dataframe / Dataset

Spark has three data representations: RDD, Dataframe and Dataset. Each data representation has its own API. For example, later we are going to use ml (a library), which currently supports only the Dataframe API. A Dataframe is usually much faster than an RDD because it carries metadata (information about the structure of the data), which allows Spark to optimize the query plan. The Dataframe feature was added in Spark 1.3.

In this notebook, we will spend some time on RDDs to get you started with Apache Spark. Later, we will spend some time on Dataframes. Dataframes share some common characteristics with RDDs (transformations and actions). We are not going to talk about Datasets, as this functionality is not included in PySpark.

### RDD:
After installing and configuring PySpark, we can start programming with Spark in Python. The basic abstraction behind Spark functionality is the RDD. An RDD (Resilient Distributed Dataset) is a collection of elements that can be divided across multiple nodes in a cluster for parallel processing. It is also fault tolerant, which means it can automatically recover from failures. An RDD is immutable: once created, it cannot be changed. We can, however, apply any number of operations to it and create new RDDs by applying transformations. Here are a few things to keep in mind about RDDs.

We can apply two types of operations on RDDs:

* Transformation: an operation applied to an RDD that creates a new RDD.
* Action: an operation applied to an RDD that performs a computation and sends the result back to the driver.

Example: map (a transformation) applies a function to each element of an RDD and returns a new RDD. In contrast, reduce (an action) aggregates the elements of an RDD, for example the output of a map, by applying a function, and returns the result to the driver. (Note that reduceByKey, used later in this notebook, is a transformation.) Many more transformations and actions are listed in the Apache Spark documentation.
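As a rough illustration of the two kinds of operations, here is a plain-Python analogy. It does not use Spark, and the names `base`, `squared` and `total` are made up for this sketch. Building `squared` plays the role of a transformation, since it yields a new collection and leaves the original untouched; `reduce` plays the role of an action, since it hands back a single value.

```python
from functools import reduce

# An immutable collection standing in for an RDD (assumption for this sketch).
base = (1, 2, 3, 4)

# "Transformation": derive a new collection; base itself is not modified.
squared = tuple(x * x for x in base)

# "Action": aggregate the elements into one value that is handed back.
total = reduce(lambda a, b: a + b, squared)

print(squared)  # (1, 4, 9, 16)
print(total)    # 30
```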

RDDs and shared variables:
Parallel operations in Apache Spark can use shared variables. Whenever the driver sends a task to the executors in a cluster, a copy of each shared variable is sent along, so that the executors can use it while performing the task. Apache Spark supports two types of shared variables, Broadcast and Accumulator.

* Broadcast: a Broadcast variable keeps a read-only copy of some data on every node.
* Accumulator: an Accumulator variable is used for aggregating information across tasks.

 

### How to create an RDD in Apache Spark

Existing storage: we create an RDD from a collection that already exists in the driver program (and which we would like to parallelize). For example, converting a list that was created in the driver program to an RDD.

External sources: we create an RDD from an external source such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat.

 

### Writing a first program in Apache Spark

We have already discussed that RDDs support two types of operations, transformations and actions. Let us get down to writing our first program:

## Step 1: Create SparkContext

The first step in any Spark program is to create a SparkContext. A SparkContext is needed when we want to execute operations on a cluster: it tells Spark how and where to access the cluster, and it is the first step in connecting to it. If you are using the Spark shell, you will find that a SparkContext has already been created. Otherwise, we can create one by importing the class, initializing it and providing the configuration settings. For example:

In [1]:
from pyspark import SparkContext
sc = SparkContext()

## Step 2: Create a RDD

We can create an RDD in two ways: either from existing storage in the driver program or from external storage. Let’s create our first RDD. SparkContext has the parallelize method, which creates an RDD from an iterable (such as a list or tuple) already present in the driver program.

We can also set the number of partitions manually by passing it as a second parameter to parallelize. For example, in sc.parallelize(data, 10), data is an existing collection in the driver program and 10 is the number of partitions. If we do not pass this parameter, Spark sets the number of partitions automatically.
Let’s create the first Spark RDD, called rdd.

In [2]:
data = range(1,1000) # a python object
rdd = sc.parallelize(data)

In [3]:
rdd.collect() # returns the data
# also :rdd.take(10) what does this command do?

[1,
 2,
 3,
 4,
 5,
 ...]

(Output abridged: the full list continues up to 999.)

In [4]:
rdd.take(2) # It will print first 2 elements of rdd

[1, 2]

In [5]:
# what is the return type of the above?

RDDs support two kinds of parallel operations, transformations and actions, which were briefly discussed earlier. So let’s see how a transformation works. Remember that RDDs are immutable, so we can’t change our RDD, but we can apply a transformation to it to obtain a new one. Let’s use the map transformation as an example.

## Step 3: Map transformation.

The map transformation returns a mapped RDD by applying a function to each element of the base RDD. Let’s repeat the first step and create an RDD from an existing source, for example:

In [6]:
data = ['Hello' , 'I' , 'am', 'a', 'big', 'data', 'engineer!']
Rdd = sc.parallelize(data)

Now an RDD (named ‘Rdd’) has been created from the existing source, which is a list of strings in the driver program. We will now apply a lambda function to each element of Rdd and store the mapped (transformed) RDD of (word, 1) pairs in Rdd1.

In [7]:
Rdd1 = Rdd.map(lambda x: (x,1))

Because of lazy evaluation, the previous statement is only evaluated when we perform an action:

In [8]:
Rdd1.collect()

[('Hello', 1),
 ('I', 1),
 ('am', 1),
 ('a', 1),
 ('big', 1),
 ('data', 1),
 ('engineer!', 1)]

Nothing happened when we applied the lambda function to Rdd to define Rdd1 (we won’t see any computation happening in the cluster at that point). The computation only ran when we called collect(). This is called lazy evaluation. All transformations in Spark are lazy, which means that no computation is performed on an RDD until an action needs the result.

Spark remembers which transformation is applied to which RDD with the help of a DAG (Directed Acyclic Graph). Lazy evaluation helps Spark optimize the computation, because Spark can inspect the whole DAG before actually executing the operations on the RDD. This enables Spark to run operations more efficiently.
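Lazy evaluation can be mimicked in plain Python with a generator expression. This is only an analogy, assuming a generator is a fair stand-in for a chain of transformations; Spark itself builds and schedules a DAG, which is far more involved. The helper `to_pair` and the `calls` list are hypothetical names used for illustration.

```python
calls = []  # records every time the mapping function really runs


def to_pair(word):
    calls.append(word)
    return (word, 1)


words = ['Hello', 'I', 'am']

# Like Rdd.map(...): this only describes the work, nothing runs yet.
pairs = (to_pair(w) for w in words)
calls_before = len(calls)

# Like collect(): consuming the result triggers the computation.
result = list(pairs)
calls_after = len(calls)

print(calls_before, calls_after)  # 0 3
print(result)  # [('Hello', 1), ('I', 1), ('am', 1)]
```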

In the code above, collect() and take() are examples of actions.

Many more transformations are defined in Apache Spark. To summarize what we have seen so far:


* We can create an RDD in two different ways: from an existing source or from an external source.
* We can apply two types of operations on an RDD, namely “transformation” and “action”. All transformations on an RDD are lazy, which means that computations are not done until we apply an action.
* RDDs are immutable, i.e. we cannot change an RDD; instead we derive a new one by applying transformation(s). There are various transformations and actions that can be applied to an RDD.


### What are transformations and actions?

Spark has certain operations which can be performed on an RDD. An operation is a method that can be applied to an RDD to accomplish a certain task. RDDs support two types of operations, actions and transformations. An operation can be something as simple as sorting, filtering or summarizing data.

Let’s take a few examples to understand the concept of transformations and actions better. Let’s assume we want to develop a machine learning model on a data set. Before applying a machine learning model, we will need to perform certain tasks:

* Understand the data ( List out the number of columns in data and their type)
* Preprocess the data (Remove null value observations on data).
* Filter the data (Let’s say, we want to filter the observations corresponding to males data)
* Fill the null values in data ( Filling the null values in data by constant, mean, median, etc)
* Calculate the features in data

All the above-mentioned tasks are examples of operations. In Spark, operations are divided into two kinds: transformations and actions. Brief descriptions of both follow.

Transformation: an operation applied to an RDD that creates a new RDD. filter, groupBy and map are examples of transformations.

Action: an operation applied to an RDD that instructs Spark to perform the computation and send the result back to the driver. collect, take and count are examples of actions.

The Transformations and Actions in Apache Spark are divided into 4 major categories:

* General
* Mathematical and Statistical
* Set Theory and Relational
* Data-structure and IO

 
### Applying transformations and actions

To understand the operations, we are going to use the README text file of Apache Spark. I have already copied all of its text into a local file. Before applying operations on the README, we first need to load this file with the help of SparkContext.

In [9]:
rdd = sc.textFile("README.md")

In [10]:
rdd.take(5)

[u'# Apache Spark',
 u'',
 u'Spark is a fast and general cluster computing system for Big Data. It provides',
 u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 u'supports general computation graphs for data analysis. It also supports a']

## Transformation: map and flatMap

Q1: Convert all words in an RDD to lowercase and split the lines of a document on whitespace.

To lowercase each word of a document, we can use the map transformation. A map transformation is useful when we need to transform an RDD by applying a function to each element. So how can we use the map transformation on ‘rdd’ in our case?

Solution: Let’s work through an example. We apply a function called “Func” to each line of the document. “Func” does two things:

1. It takes a line and converts all of its words to lowercase.
2. After that, it splits the line into words on whitespace.

To do this, we first need to write “Func” and then apply it using map.

In [11]:
def Func(lines):
    lines = lines.lower()
    lines = lines.split()
    return lines
rdd1 = rdd.map(Func)

After applying the function (Func) to “rdd”, we have transformed “rdd” into “rdd1”. We can see the first 5 elements of “rdd1” by applying the take operation (which is an action).

In [12]:
rdd1.take(5) # we create a list of lists

[[u'#', u'apache', u'spark'],
 [],
 [u'spark',
  u'is',
  u'a',
  u'fast',
  u'and',
  u'general',
  u'cluster',
  u'computing',
  u'system',
  u'for',
  u'big',
  u'data.',
  u'it',
  u'provides'],
 [u'high-level',
  u'apis',
  u'in',
  u'scala,',
  u'java,',
  u'python,',
  u'and',
  u'r,',
  u'and',
  u'an',
  u'optimized',
  u'engine',
  u'that'],
 [u'supports',
  u'general',
  u'computation',
  u'graphs',
  u'for',
  u'data',
  u'analysis.',
  u'it',
  u'also',
  u'supports',
  u'a']]

We can also see that our output is not flat (it’s a nested list). To get flat output, we need to apply a transformation that flattens it. The “flatMap” transformation will help here:

The “flatMap” transformation will return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. This is the main difference between the “flatMap” and map transformations. Let’s apply a “flatMap” transformation on “rdd” , then take the result of this transformation in “rdd2” and print the result after applying this transformation.

In [13]:
rdd2 = rdd.flatMap(Func)
rdd2.take(10)

[u'#',
 u'apache',
 u'spark',
 u'spark',
 u'is',
 u'a',
 u'fast',
 u'and',
 u'general',
 u'cluster']
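The difference between map and flatMap seen above can be reproduced in plain Python. The sketch below does not use Spark; `split_lower` is a hypothetical stand-in for “Func”, and the three sample lines are made up. A list comprehension keeps one (possibly empty) list per line, as map does, while `itertools.chain.from_iterable` concatenates those lists, as flatMap does.

```python
from itertools import chain


def split_lower(line):
    # Same idea as Func: lowercase the line, then split it on whitespace.
    return line.lower().split()


lines = ['# Apache Spark', '', 'Spark is fast']

# Like rdd.map(Func): one list per input line, so the result is nested.
mapped = [split_lower(line) for line in lines]

# Like rdd.flatMap(Func): the per-line lists are concatenated.
flat = list(chain.from_iterable(mapped))

print(mapped)  # [['#', 'apache', 'spark'], [], ['spark', 'is', 'fast']]
print(flat)    # ['#', 'apache', 'spark', 'spark', 'is', 'fast']
```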

## Transformation: filter

Q2: Next, I want to remove the words which are not needed to analyze this text. We call these words “stop words”; stop words do not add much value to a text. For example, “is”, “am”, “are” and “the” are stop words.

Solution: To remove the stop words, we can use the “filter” transformation, which returns a new RDD containing only the elements that satisfy the given condition(s). Let’s apply “filter” to “rdd2”, keep the words which are not stop words, and store the result in “rdd3”. To do that:

1. We need to define the list of stop words in a variable called “stopwords” (here, I am selecting only a few stop words instead of a complete list).
2. Apply “filter” to “rdd2” (check whether each word of “rdd2” is in the “stopwords” list or not).

We can check the first 10 elements of “rdd3” by applying the take action.

In [14]:
stopwords = ['is','am','are','the','for','a', 'and', 'to']
rdd3 = rdd2.filter(lambda x: x not in stopwords)
rdd3.take(10)

[u'#',
 u'apache',
 u'spark',
 u'spark',
 u'fast',
 u'general',
 u'cluster',
 u'computing',
 u'system',
 u'big']

## Transformation: groupBy

Q3: After getting the results into rdd3, we want to group the words in rdd3 based on which letters they start with. For example, suppose I want to group each word of rdd3 based on first 3 characters.

Solution: The “groupBy”  transformation will group the data in the original RDD. It creates a set of key value pairs, where the key is output of a user function, and the value is all items for which the function yields this key.

1. We have to pass a function (in this case, I am using a lambda function) inside the “groupBy” which will take the first 3 characters of each word in “rdd3”.
2. The key is the first 3 characters and value is all the words which start with these 3 characters.

After applying “groupBy” function, we store the transformed result in “rdd4” (RDDs are immutable – remember!). To view “rdd4”, we can print first (key, value) elements in “rdd4”.

In [15]:
rdd4 = rdd3.groupBy(lambda w: w[0:3])
print([(k, list(v)) for (k, v) in rdd4.take(5)])

[(u'pre', [u'pre-built', u'prefer']), (u'oth', [u'other']), (u'rev', [u'review']), (u'onl', [u'online', u'only', u'online']), (u'abb', [u'abbreviated'])]
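To make the (key, values) structure produced by groupBy more concrete, here is a minimal plain-Python model of the same idea. It does not use Spark; `group_by` is a hypothetical helper and the word list is made up. The key function is the same slice `w[0:3]` used above.

```python
from collections import defaultdict


def group_by(items, key_func):
    # Collect every item under the key computed by key_func.
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return dict(groups)


words = ['online', 'only', 'other', 'online', 'prefer', 'pre-built']
grouped = group_by(words, lambda w: w[0:3])

print(grouped['onl'])  # ['online', 'only', 'online']
print(grouped['pre'])  # ['prefer', 'pre-built']
print(grouped['oth'])  # ['other']
```

Note that repeated words are kept in the group, just as ‘online’ appears twice in the Spark output above.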


## Transformation: groupByKey / reduceByKey

Q4: What if we want to calculate how many times each word occurs in the corpus?

Solution: We can apply the “groupByKey” / “reduceByKey” transformations on a (key,val) pair RDD. “groupByKey” groups the values for each key in the original RDD. It creates a new pair, where the original key corresponds to the collected group of values.

To use the “groupByKey” / “reduceByKey” transformations to find the frequency of each word, you can follow the steps below:

1. A (key,val) pair RDD is required. In this pair RDD, the key is the word and val is 1 for each word in the RDD (1 represents one occurrence of the word in “rdd3”).
2. To apply “groupByKey” / “reduceByKey” to “rdd3”, we first need to convert “rdd3” to a (key,val) pair RDD.

Let’s see how to convert “rdd3” to a new mapped (key,val) RDD. Then we can apply the “groupByKey” / “reduceByKey” transformations to this RDD.

In [16]:
rdd3_mapped = rdd3.map(lambda x: (x,1))
rdd3_grouped = rdd3_mapped.groupByKey()

In the above code, we first convert “rdd3” into “rdd3_mapped”, which is simply a mapped (key,val) pair RDD. Then we apply the “groupByKey” transformation to “rdd3_mapped” to group all elements by key (word), and save the result in “rdd3_grouped”. Let’s look at the first 5 elements of “rdd3_grouped”.

In [17]:
print(list((j[0], list(j[1])) for j in rdd3_grouped.take(5)))

[(u'when', [1]), (u'alternatively,', [1]), (u'"local"', [1]), (u'including', [1, 1, 1, 1]), (u'computation', [1])]


After seeing the result of the above code, I rechecked the corpus to find out how many times the word ‘spark’ occurs, and found that ‘spark’ is written more than once. Let’s filter for ‘spark’ in “rdd3”.

In [18]:
rdd3.filter(lambda x: x == 'spark').collect()

[u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark',
 u'spark']

Until now we have not calculated the frequency (count) of each word. Let’s proceed further:

In [19]:
rdd3_freq_of_words = rdd3_grouped.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)

In the above code, I first applied the “mapValues” transformation to “rdd3_grouped”. “mapValues” (only applicable to pair RDDs) is similar to map (which can be applied to any RDD), with one difference: a function passed to map on a pair RDD receives both the key and the value, whereas a function passed to “mapValues” receives only the value, and the key is left unchanged. In the code above I passed sum, which calculates the sum of the 1s (i.e. the count) for each word.

After applying “mapValues”, I want to sort the words by frequency. To do that, I first convert each (word, frequency) pair to (frequency, word), so that keys and values are interchanged. Then I sort by key in descending order (sortByKey(False)) and store the result in “rdd3_freq_of_words”. We can now see the 10 most frequent words.

In [20]:
rdd3_freq_of_words.take(10)

[(16, u'spark'),
 (9, u'##'),
 (8, u'you'),
 (7, u'run'),
 (7, u'on'),
 (7, u'can'),
 (6, u'in'),
 (5, u'using'),
 (5, u'of'),
 (4, u'including')]
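The chain groupByKey, mapValues(sum), swap and sortByKey(False) can be traced step by step in plain Python. This is a sketch on a made-up word list, not Spark code; each intermediate variable is a hypothetical stand-in for the RDD produced at that step.

```python
words = ['spark', 'you', 'spark', 'run', 'you', 'spark']

# Like rdd3.map(lambda x: (x, 1)): build (word, 1) pairs.
mapped = [(w, 1) for w in words]

# Like groupByKey(): collect the 1s of each word in a list.
grouped = {}
for word, one in mapped:
    grouped.setdefault(word, []).append(one)

# Like mapValues(sum): only the values change, the keys stay as they are.
summed = {word: sum(ones) for word, ones in grouped.items()}

# Like map(lambda x: (x[1], x[0])): swap to (frequency, word).
swapped = [(count, word) for word, count in summed.items()]

# Like sortByKey(False): sort by frequency, highest first.
ranked = sorted(swapped, key=lambda pair: pair[0], reverse=True)

print(ranked)  # [(3, 'spark'), (2, 'you'), (1, 'run')]
```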

We can also use the “reduceByKey” transformation to count the frequency of each word in a (key,value) pair RDD. Let’s see how to do this.

In [21]:
rdd3_mapped.reduceByKey(lambda x,y: x+y).map(lambda x:(x[1],x[0])).sortByKey(False).take(10)

[(16, u'spark'),
 (9, u'##'),
 (8, u'you'),
 (7, u'run'),
 (7, u'on'),
 (7, u'can'),
 (6, u'in'),
 (5, u'using'),
 (5, u'of'),
 (4, u'including')]

If we compare the results of the two transformations (“groupByKey” and “reduceByKey”), we get the same output. You may be wondering what the difference between them is. “reduceByKey” first combines the values for each key within every partition, so each partition holds only one value per key. After the shuffle, in the reduce phase, the executors apply the operation again to merge these partial results, in my case the sum (lambda x,y: x+y).

<img src="images/reduceByKey-3.png">

“groupByKey”, in contrast, does not combine the values for each key within the partitions. It shuffles all the data directly and only then merges the values for each key. “groupByKey” therefore requires much more shuffling to get the answer, so it is better to use “reduceByKey” when a large amount of data would otherwise be shuffled.

<img src="images/groupbykey.png">
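The difference in shuffle volume can be illustrated with a small plain-Python model. This is a simplified sketch, not how Spark is implemented: the two partitions are made-up lists, and `group_by_key_counts` and `reduce_by_key_counts` are hypothetical helpers that return the word counts together with the number of records that would have to be shuffled.

```python
from collections import Counter, defaultdict

# Two made-up partitions of (word, 1) pairs.
partitions = [
    [('spark', 1), ('spark', 1), ('you', 1)],
    [('spark', 1), ('you', 1), ('you', 1)],
]


def group_by_key_counts(parts):
    # groupByKey model: every single pair is shuffled, then grouped and summed.
    shuffled = [pair for part in parts for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return {k: sum(vs) for k, vs in grouped.items()}, len(shuffled)


def reduce_by_key_counts(parts):
    # reduceByKey model: combine inside each partition first, then shuffle
    # only one partial sum per key and partition.
    shuffled = []
    for part in parts:
        local = Counter()
        for key, value in part:
            local[key] += value
        shuffled.extend(local.items())
    totals = Counter()
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


group_result, group_shuffled = group_by_key_counts(partitions)
reduce_result, reduce_shuffled = reduce_by_key_counts(partitions)

print(group_result == reduce_result)    # True
print(group_shuffled, reduce_shuffled)  # 6 4
```

Both approaches give the same counts, but in this toy example the reduceByKey model moves 4 records instead of 6; the gap grows with the number of repeated keys per partition.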

## Transformation: mapPartitions

Q5: How do I perform a task (say, count the words ‘spark’ and ‘apache’ in rdd3) separately on each partition and get the output of the task for each of these partitions?
Solution: We can do this by applying the “mapPartitions” transformation. “mapPartitions” is like map, but the function runs once per partition of an RDD and receives an iterator over the elements of that partition. So, to count the occurrences of the words ‘spark’ and ‘apache’ in each partition of the RDD, you can follow these steps:

1. Create a function called “func” which will count the frequencies for these words
2. Then, pass the function defined in step1 to the “mapPartitions” transformation.


In [22]:
def func(iterator):
    count_spark = 0
    count_apache = 0
    for i in iterator:
        if i =='spark':
            count_spark = count_spark + 1
        if i == 'apache':
            count_apache = count_apache + 1
    return (count_spark,count_apache)
rdd3.mapPartitions(func).glom().collect()

[[12, 1], [4, 0]]

I have used the “glom” function, which is very useful when we want to inspect the data of each partition of an RDD. The above result shows that 12 and 1 are the counts of ‘spark’ and ‘apache’ in the first partition, and 4 and 0 are the counts of ‘spark’ and ‘apache’ in the second partition. If we don’t use the “glom” function, we won’t be able to tell the results of the partitions apart.

In [23]:
rdd3.mapPartitions(func).collect()

[12, 1, 4, 0]
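The two outputs above differ only in whether the per-partition results stay grouped. A plain-Python model of this, assuming two made-up partitions and a hypothetical `count_words` function that mirrors “func”, might look like this:

```python
def count_words(iterator):
    # Mirrors func: count 'spark' and 'apache' within one partition.
    count_spark = 0
    count_apache = 0
    for word in iterator:
        if word == 'spark':
            count_spark += 1
        if word == 'apache':
            count_apache += 1
    # The returned tuple is iterable, so it contributes two output elements.
    return (count_spark, count_apache)


# Two made-up partitions standing in for the partitions of rdd3.
partitions = [['apache', 'spark', 'spark'], ['spark', 'fast']]

# Like mapPartitions(func).glom().collect(): one list per partition.
per_partition = [list(count_words(iter(part))) for part in partitions]

# Like mapPartitions(func).collect(): the same values, flattened.
flattened = [value for part in per_partition for value in part]

print(per_partition)  # [[2, 1], [1, 0]]
print(flattened)      # [2, 1, 1, 0]
```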

## Math / Statistical Transformation

Transformation: sample

Q6: What if I want to work with a sample instead of the full data?
Solution: The “sample” transformation lets us take a sample instead of working on the full data. The sample method returns a new RDD containing a statistical sample of the original RDD.
We can pass the following arguments to the sample operation:

* “withReplacement = True” or False (to choose the sample with or without replacement)
* “fraction = x” (x = .4 means we want to choose about 40% of the data in “rdd”)
* “seed” to make the results reproducible


In [24]:
rdd3_sampled = rdd3.sample(False, 0.4, 42)
print("%d %d" % (rdd3.count(), rdd3_sampled.count()))

412 165
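The sample size above is close to, but not exactly, 40% of the data. One way to see why is to model sampling without replacement as an independent keep-or-drop decision per element. The sketch below is plain Python under that assumption and is not the Spark implementation; `bernoulli_sample` is a hypothetical helper.

```python
import random


def bernoulli_sample(items, fraction, seed):
    # Keep each element independently with probability `fraction`.
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]


data = list(range(1000))

first = bernoulli_sample(data, 0.4, 42)
second = bernoulli_sample(data, 0.4, 42)

# The same seed reproduces the same sample.
print(first == second)  # True
# The size is only approximately fraction * len(data).
print(len(first))
```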


## Set Theory / Relational Transformation
Transformation: union

Q7: What if I want to create an RDD which contains all the elements (a.k.a. the union) of two RDDs?
Solution: To do so, we can use the “union” transformation on two RDDs. In Spark, “union” returns a new RDD containing the elements of both RDDs. Please note that duplicate items are not removed from the new RDD. To illustrate this:

1. I first create two sample RDDs (say sample1, sample2) from “rdd3” by taking a 20% sample for each.
2. Then I apply the union transformation to sample1 and sample2.



In [25]:
sample1 = rdd3.sample(False,0.2,42)
sample2 = rdd3.sample(False,0.2,42)
union_of_sample1_sample2 = sample1.union(sample2)
print("%d %d %d" % (sample1.count(), sample2.count(), union_of_sample1_sample2.count()))

77 77 154


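Note: the union operation didn’t remove the duplicate elements. The union has 154 = 77 + 77 elements, even though both samples were drawn from “rdd3” with the same fraction and seed.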
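In plain-Python terms, this kind of union behaves like list concatenation rather than a mathematical set union. The sketch below uses two made-up lists in place of the sampled RDDs; the variable names are chosen for this illustration only.

```python
# Two identical made-up samples standing in for sample1 and sample2.
first_sample = ['spark', 'you', 'run']
second_sample = ['spark', 'you', 'run']

# Like sample1.union(sample2): all elements of both, duplicates kept.
combined = first_sample + second_sample

# A set-style union would instead drop the duplicates.
without_duplicates = sorted(set(combined))

print(len(combined))       # 6
print(without_duplicates)  # ['run', 'spark', 'you']
```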

## Transformation: join

Q8: What if we want to join two pair RDDs based on their key?
Solution: The “join” transformation joins two pair RDDs based on their key. To show that:

1. First create two sample (key,value) pair RDDs (“sample1”, “sample2”) from “rdd3_mapped”, the same way I did for the “union” transformation.
2. Apply the “join” transformation to “sample1” and “sample2”.


In [26]:
sample1 = rdd3_mapped.sample(False,.2,42)
sample2 = rdd3_mapped.sample(False,.2,42)
join_on_sample1_sample2 = sample1.join(sample2)
len(join_on_sample1_sample2.collect())

119
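A join can return more elements than either input when keys repeat, because every value on the left is paired with every value on the right that shares its key. The following plain-Python model of an inner join illustrates this; `inner_join` is a hypothetical helper and the two input lists are made up.

```python
def inner_join(left, right):
    # Index the right-hand values by key.
    right_index = {}
    for key, value in right:
        right_index.setdefault(key, []).append(value)
    # Pair each left value with every right value that has the same key.
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_index.get(key, [])
    ]


left = [('spark', 'a1'), ('spark', 'a2'), ('you', 'a3')]
right = [('spark', 'b1'), ('spark', 'b2'), ('run', 'b3')]

joined = inner_join(left, right)

# 'spark' occurs twice on each side, giving 2 * 2 = 4 pairs;
# 'you' and 'run' have no partner and are dropped.
print(len(joined))  # 4
print(joined[0])    # ('spark', ('a1', 'b1'))
```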

## Transformation: distinct

Q9: How do we find the distinct elements of an RDD?
Solution: We can apply the “distinct” transformation to an RDD to get its distinct elements. Let’s see how many distinct words we have in “rdd3”.

In [27]:
rdd3_distinct = rdd3.distinct()
len(rdd3_distinct.collect())

263

Some additional examples: 

In [28]:
#from pyspark import SparkContext
logFile = "./README.md"  
#sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Lines with a: 62, lines with b: 30


Can you explain what this function does exactly?

In [29]:
import random

NUM_SAMPLES = 10**6

def inside(p):
    # p is the element of the RDD; it is not used in the computation
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Pi is roughly 3.139616


Can you explain what this function does exactly?