##  We are getting started with Apache Spark! 

For the very basics, look at the following site:
* https://spark.apache.org/docs/0.9.0/python-programming-guide.html


Credit for this notebook: https://www.analyticsvidhya.com/blog/2016/09/comprehensive-introduction-to-apache-spark-rdds-dataframes-using-pyspark/
and 
https://www.analyticsvidhya.com/blog/2016/10/using-pyspark-to-perform-transformations-and-actions-on-rdd/

Other helpful resources:
* https://spark.apache.org/
* Book: Learning Spark (O'Reilly), for a lot of details:
https://www.oreilly.com/library/view/learning-spark/9781449359034/
(pdf online)
* https://www.tutorialspoint.com/pyspark/index.htm

First we create a context: this tells Python to start a connection to Spark through the PySpark API.

# Apache Spark data representations: RDD / Dataframe / Dataset

Spark has three data representations: RDD, Dataframe, and Dataset. Each data representation has its own API. For example, later we are going to use ml (a library), which currently supports only the Dataframe API. Dataframes are usually much faster than RDDs because they carry metadata (a schema describing the data), which allows Spark to optimize the query plan. The Dataframe feature was added in Spark 1.3.

In this notebook, we will spend some time on RDDs to get you started with Apache Spark. Later, we will spend some time on Dataframes. Dataframes share some common characteristics with RDDs (transformations and actions). We are not going to talk about Datasets, as this functionality is not included in PySpark.

### RDD:
After installing and configuring PySpark, we can start programming with Spark in Python. Spark's core data abstraction is the RDD. An RDD (Resilient Distributed Dataset) is a collection of elements that can be partitioned across multiple nodes in a cluster for parallel processing. It is also fault tolerant, which means it can automatically recover from failures. An RDD is immutable: once created, it cannot be changed. We can, however, apply any number of operations to it and create new RDDs by applying transformations. Here are a few things to keep in mind about RDDs:

We can apply 2 types of operations on RDDs:

* Transformation: an operation applied to an RDD to create a new RDD.
* Action: an operation applied to an RDD that performs a computation and sends the result back to the driver.

Example: map (a transformation) applies a function to each element of an RDD and returns a new RDD. reduce (an action), in contrast, aggregates the elements of an RDD, for example the output of a map, into a single value by applying a function, and returns that value to the driver. Note that reduceByKey, despite its name, is a transformation: it returns a new RDD. Many more transformations and actions are defined in the Apache Spark documentation.
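The difference between an element-wise step and an aggregating step can be tried without a cluster. The sketch below is a plain-Python analogy using the built-in `map` and `functools.reduce`, not Spark code; the list `numbers` is made up for illustration.

```python
from functools import reduce

numbers = [1, 2, 3, 4]  # illustrative data, not from the notebook

# Transformation-like step: one output element per input element.
squares = list(map(lambda x: x * x, numbers))

# Action-like step: collapse all elements into a single value.
total = reduce(lambda a, b: a + b, squares)

print(squares)  # [1, 4, 9, 16]
print(total)    # 30
```

In Spark the same shape appears as `rdd.map(...)` followed by `rdd.reduce(...)`, with the work spread over partitions.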

RDDs use Shared Variables:
Parallel operations in Apache Spark can use shared variables. Whenever the driver sends a task to the executors in a cluster, a copy of each variable used by the task is sent along, so that the executors can use it while performing the task. Accumulator and Broadcast are the two types of shared variables supported by Apache Spark.
* Broadcast: a read-only variable that keeps a copy of the data on every node.
* Accumulator: a variable that tasks can only add to, used for aggregating information.

How to Create a RDD in Apache Spark

Existing storage: We create an RDD from a collection that already exists in the driver program and that we would like to parallelize, for example by converting a list into an RDD.

External sources: We create an RDD from an external source such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Writing a first program in Apache Spark

We have already discussed that RDDs support two types of operations: transformations and actions. Let us get down to writing our first program:

## Step 1: Create SparkContext

The first step in any Apache Spark program is to create a SparkContext. A SparkContext is needed when we want to execute operations in a cluster: it tells Spark how and where to access the cluster, and it is the first step in connecting to it. If you are using the Spark shell, you will find that one has already been created. Otherwise, we can create the SparkContext by importing, initializing and providing the configuration settings. For example:

In [None]:
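from pyspark.sql import SparkSession
# Reuses the running session (e.g. in the shell) or creates a new one.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext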

## Step 2: Create a RDD

We can create an RDD in two ways: from existing storage or from external storage. Let’s create our first RDD. SparkContext has the parallelize method, which creates an RDD from an iterable (such as a list or a tuple) already present in the driver program.

We can also pass the number of partitions as a second parameter to the parallelize method, for example sc.parallelize(data, 10), where data is an existing collection in the driver program and 10 is the number of partitions. If we do not pass this parameter, Spark sets the number of partitions automatically.
Let’s create the first Spark RDD, called rdd.

In [None]:
data = range(1,1000) # a python object
rdd = sc.parallelize(data)
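To get a feel for what "number of partitions" means, here is a plain-Python sketch that cuts a collection into contiguous, near-equal chunks. The helper `split_into_partitions` is hypothetical and the slicing rule is an assumption chosen for illustration; it is not claimed to be the rule Spark uses internally.

```python
def split_into_partitions(data, num_partitions):
    """Hypothetical helper: cut data into contiguous, near-equal chunks."""
    items = list(data)
    n = len(items)
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

# Ten made-up elements spread over three partitions.
parts = split_into_partitions(range(1, 11), 3)
print(parts)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
```

On a real RDD, `rdd.getNumPartitions()` reports how many partitions Spark chose, and `rdd.glom().collect()` shows their contents.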

In [None]:
rdd.collect() # returns the data
# also :rdd.take(10) what does this command do?

In [None]:
# what is the return type of the above?

RDDs support two kinds of parallel operations, transformations and actions, which were briefly discussed earlier. Let’s see how a transformation works. Remember that RDDs are immutable, so we can’t change our RDD, but we can apply a transformation to it to obtain a new one. The map transformation makes a good first example.

## Step 3: Map transformation.

The map transformation returns a new RDD by applying a function to each element of the base RDD. Let’s repeat the first step and create an RDD from an existing source, for example:

In [None]:
data = ['Hello' , 'I' , 'am', 'a', 'big', 'data', 'engineer!']
rdd = sc.parallelize(data)

Now an RDD (named ‘rdd’) has been created from an existing source, a list of strings in the driver program. We will now apply a lambda function to each element of rdd and store the mapped (transformed) RDD of (word, 1) pairs in rdd1.

In [None]:
rdd1 = rdd.map(lambda x: (x,1))

Because of lazy evaluation, the previous statement is only evaluated when we perform an action:

In [None]:
rdd1.collect()

Nothing happened when we applied the lambda function to rdd to define rdd1 (no computation ran in the cluster at that point). This is called lazy evaluation. All transformations in Spark are lazy, which means that no computation is performed on an RDD until an action needs the result.

Spark remembers which transformation is applied to which RDD with the help of a DAG (Directed Acyclic Graph). Lazy evaluation helps Spark optimize the computation, because Spark can inspect the whole DAG before actually executing the operations on the RDD. This enables Spark to run operations more efficiently.
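Lazy evaluation can be observed in plain Python as well, because the built-in `map` returns a lazy iterator. The sketch below is only an analogy: it shows the "define now, compute on demand" behaviour, but it does not build or optimize a DAG the way Spark does. The names `tag`, `calls` and `words` are made up for illustration.

```python
calls = []  # records when the function actually runs

def tag(word):
    calls.append(word)
    return (word, 1)

words = ['Hello', 'I', 'am']  # illustrative data

# Defining the pipeline runs nothing yet (compare: rdd.map(...)).
pending = map(tag, words)
calls_before = len(calls)

# Consuming the pipeline forces the work (compare: rdd1.collect()).
result = list(pending)
calls_after = len(calls)

print(calls_before, calls_after)  # 0 3
print(result)                     # [('Hello', 1), ('I', 1), ('am', 1)]
```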

In the code above, collect() and take() are examples of actions.

Many more transformations are defined in Apache Spark. To summarize what we have seen so far:


* We can create an RDD in two different ways: from an existing source or from an external source.
* We can apply two types of operations to RDDs, namely “transformations” and “actions”. All transformations on RDDs are lazy, which means that computations are not performed until we apply an action.
* RDDs are immutable, i.e. we cannot change an RDD; instead we derive a new one by applying transformation(s). Various transformations and actions can be applied to RDDs.


In [None]:
rdd = sc.textFile("../data/README.md")

In [None]:
rdd.take(5)

## Transformation: map and flatMap

Q1: Convert all words in an RDD to lowercase and split the lines of a document on spaces.

To convert each word of a document to lowercase, we can use the map transformation. A map transformation is useful when we need to transform an RDD by applying a function to each element. So how can we use the map transformation on ‘rdd’ in our case?

In [None]:
# TODO

After applying the function (Func) to “rdd”, we have transformed “rdd” into “rdd1”. Now we want to see its first elements:

In [None]:
# TODO

We can also see that our output is not flat (it’s a nested list). To get a flat output, we need a transformation that flattens the result. The “flatMap” transformation will help here:

The “flatMap” transformation returns a new RDD by first applying a function to all elements of the RDD and then flattening the results. This is the main difference between the “flatMap” and “map” transformations. Let’s apply “flatMap” to “rdd”, store the result in “rdd2”, and print the first elements.

In [None]:
rdd2 = rdd.flatMap(Func)
rdd2.take(10)
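The nested-versus-flat distinction can be reproduced in plain Python. The sketch below is an analogy, not Spark code: a list comprehension plays the role of map, and `itertools.chain.from_iterable` adds the flattening step that flatMap performs. The data and the helper `split_words` are made up for illustration.

```python
from itertools import chain

lines = ["big data", "apache spark rdd"]  # made-up lines

def split_words(line):
    return line.split(" ")

# map-like: one list per line, so the result is nested.
nested = [split_words(line) for line in lines]

# flatMap-like: same function, then flatten by one level.
flat = list(chain.from_iterable(split_words(line) for line in lines))

print(nested)  # [['big', 'data'], ['apache', 'spark', 'rdd']]
print(flat)    # ['big', 'data', 'apache', 'spark', 'rdd']
```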

## Transformation: filter

Q2: Next, I want to remove the words that are not necessary for analyzing this text. These words are called “stop words”; they do not add much value to a text. For example, “is”, “am”, “are” and “the” are stop words.

In [None]:
stopwords = ['is','am','are','the','for','a', 'and', 'to']
# TODO

## Transformation: groupBy

Q3: After getting the results into rdd3, we want to group the words in rdd3 based on the letters they start with. For example, suppose I want to group the words of rdd3 by their first 3 characters.

In [None]:
# TODO

## Transformation: groupByKey / reduceByKey

Q4: What if we want to calculate how many times each word occurs in the corpus?

Solution: We can apply the “groupByKey” / “reduceByKey” transformations on (key,val) pair RDD. The “groupByKey” will group the values for each key in the original RDD. It will create a new pair, where the original key corresponds to this collected group of values.

To use the “groupByKey” / “reduceByKey” transformations to find the frequency of each word, you can follow the steps below:

1. A (key,val) pair RDD is required. In this (key,val) pair RDD, the key is the word and val is 1 for each word in the RDD (the 1 stands for one occurrence of the word in “rdd3”).
2. To apply “groupByKey” / “reduceByKey” to “rdd3”, we first need to convert “rdd3” to a (key,val) pair RDD.

Let’s see how to convert “rdd3” to a new mapped (key,val) RDD. Then we can apply the “groupByKey” / “reduceByKey” transformation to this RDD.

In [33]:
# TO DO

<img src="images/reduceByKey-3.png">

“reduceByKey” first combines the values for each key within every partition and only then shuffles the partial results. “groupByKey”, in contrast, does not combine the values per key inside each partition: it shuffles all the data directly and then merges the values for each key. Because “groupByKey” requires a lot of shuffling, it is better to use “reduceByKey” when the amount of data to shuffle is large.

<img src="images/groupbykey.png">
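To make the shuffle cost concrete, here is a plain-Python simulation of the two strategies. It is a sketch under stated assumptions, not Spark's implementation: the two partitions are made up, and "shuffled" simply means the number of records that would have to leave their partition.

```python
from collections import Counter, defaultdict

# Two made-up partitions of (word, 1) pairs.
partitions = [
    [("spark", 1), ("rdd", 1), ("spark", 1)],
    [("spark", 1), ("rdd", 1), ("rdd", 1), ("rdd", 1)],
]

# groupByKey-like: every pair is shuffled, then values are merged per key.
shuffled_group = [pair for part in partitions for pair in part]
grouped = defaultdict(list)
for word, one in shuffled_group:
    grouped[word].append(one)
counts_group = {word: sum(ones) for word, ones in grouped.items()}

# reduceByKey-like: combine inside each partition first, then shuffle
# only one partial sum per key and partition.
partials = []
for part in partitions:
    local = Counter()
    for word, one in part:
        local[word] += one
    partials.extend(local.items())
counts_reduce = Counter()
for word, partial in partials:
    counts_reduce[word] += partial

print(len(shuffled_group), len(partials))    # 7 4
print(counts_group == dict(counts_reduce))   # True
```

Both strategies give the same counts, but the reduceByKey-like version moves 4 records instead of 7; the gap grows with the number of repeated keys per partition.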