# What is Apache Spark?

**Apache Spark is a cluster computing platform designed to be fast and general-purpose.**

- On the speed side, Spark extends the popular MapReduce model to efficiently support more types of computations, including interactive queries and stream processing.

- One of the main features Spark offers for speed is the ability to run computations in memory, but the system is also more efficient than MapReduce for complex applications running on disk.

- At its core, Spark is a **“computational engine”** that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines, or a computing cluster.




<img src = "images/apache-spark-vs-hadoop-mapreduce.jpg">

## Apache Spark: Data Science tasks

- Data scientists job includes experience with SQL, statistics, predictive modeling (machine learning), and programming, usually in Python, Matlab, or R. Data scientists also have experience with techniques necessary to transform data into formats that can be analyzed for insights (sometimes referred to as data wrangling)


- Spark’s speed and simple APIs can make their life bit easy, and its built-in libraries mean that many algorithms are available out of the box.

- The Spark shell makes it easy to do interactive data analysis using Python or Scala.

- Spark SQL also has a separate SQL shell that can be used to do data exploration using SQL, or Spark SQL can be used as part of a regular Spark program or in the Spark shell.

- Machine learning and data analysis is supported through the MLLib libraries or you can load your own models too.

- Spark provides a simple way to parallelize these applications across clusters, and hides the complexity of distributed systems programming, network communication, and fault tolerance

## Storage layers for Spark

- Spark can create distributed datasets from any file stored in the Hadoop distributed filesystem (HDFS) or other storage systems supported by the Hadoop APIs (including local filesystem, Amazon S3, Cassandra, Hive, HBase, etc.).

*Note: Spark does not require Hadoop; it simply has support for storage systems implementing the Hadoop APIs.*

## Installation

1. Visit Apache Spark Downloads page

    http://spark.apache.org/downloads.html
    

2. Select following options
    1. Choose a Spark release: **2.2.x** or greater (I'll be using 2.2.1)
    2. Choose a package type:  **Pre-built for Apache Hadoop 2.7 and later**
    3. Download Spark: **spark-2.2.1-bin-hadoop2.7.tgz**

    Download that tar compressed file to your local machine.

3. After downloading the compressed file, unzip it to desired location:

    `$ tar -xvzf spark-2.2.1-bin-hadoop2.7.tgz -C /home/prakshi/spark/`

4. Setting up the environment for Spark:

    To set up environment variable:
    
    Add following lines to your `~/.bashrc`

    ```bash
    export SPARK_HOME=/home/prakshi/tools/spark
    export PATH=$SPARK_HOME/bin:$PATH
    ```
    Make sure you change the path in `SPARK_HOME` as per your spark software file are located.
    Reload your `~/.bashrc` file using:

    ```
    $ source ~/.bashrc
    ```

5. That's all! Spark has been set-up. Try running `pyspark` command to use Spark from Python.


## Pyspark in Jupyter Notebook

Two methods to do so.

1. Configure PySpark driver
    Update PySpark driver environment variables: add these lines to your ~/.bashrc (or ~/.zshrc) file.

    ```bash
    export PYSPARK_DRIVER_PYTHON=jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
    ```
    
    Restart your terminal and launch PySpark again:

    `$ pyspark`

    Now, this command should start a Jupyter Notebook in your web browser.

2. Using `findspark` module
    
    findSpark package is not specific to Jupyter Notebook, you can use this trick in your favorite IDE too.

    To install findspark:

    `$ pip install findspark`

    Irrespective of Jupyter notebook/Python script all you need to do to use spark is add following line in your code:

    ```python
    import findspark
    findspark.init()
    ```

# RDD: Resilient distributed dataset

**Table of Contents**

1.1 What are RDDs?

1.2 Introduction to the problem

1.3 Creating RDDs

1.4 Operations on RDDs

## 1.1 What are RDDs?

-  RDDs stands for Resilient Distributed Dataset.

- RDD is the **core abstraction** in Apache Spark.
(In case you are thinking, What does abstraction mean in programming? then go through this excellent expalnation over stackoverflow - [Abstraction](https://stackoverflow.com/questions/21220155/what-does-abstraction-mean-in-programming)

- An RDD is simply a **immutable distributed collection of elements**. 

- The name captures two important properties:
    - **Resilient** means that we must be able to withstand failures and complete an ongoing computation.
    - **Distributed** means that we must account for multiple machines having a subset of data. Formally, RDD is a read-only, partitioned collection of records

- In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

- The data inside a spark application is read into the form of RDDs and then Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

- RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

## 1.2 Introduction to the problem

### PageRank

We live in a computer era. Internet is part of our everyday lives and information is only a click away. Just open your favorite search engine, like Google, AltaVista, Yahoo, type in the key words, and the search engine will display the pages relevant for your search. But how does a search engine really work?

The usefulness of a search engine depends on the relevance of the result set it gives back. There may of course be millions of web pages that include a particular word or phrase; however some of them will be more relevant, popular, or authoritative than others. A user does not have the ability or patience to scan through all pages that contain the given query words. One expects the relevant pages to be displayed within the top 20-30 pages returned by the search engine. Modern search engines employ methods of ranking the results to provide the "best" results first that are more elaborate than just plain text ranking.

So, our aim is to **Rank the webpages to provide best search results.**

For this, we will use Page Rank Algorithm. The idea that Page Rank introduce is that, the importance of any web page can be judged by looking at the pages that link to it. Let's take an example to understand the algorithm better:

<img src = "images/pagerank.jpg">


### How the Algorithm Works?

The PageRank algorithm outputs a probability distribution that represents the likelihood that a person randomly clicking on web links will arrive at a particular web page. If we run the PageRank program with the input data file and indicate 20 iterations we shall get the following output: 

 ```
 url_4 has rank: 1.3705281840649928.
 url_2 has rank: 0.4613200524321036.
 url_3 has rank: 0.7323900229505396.
 url_1 has rank: 1.4357617405523626.
 ```
The results clearly indicates that URL_1 has the highest page rank followed by URL_4 and then URL_3 & last URL_2. The algorithm works in the following manner:

If a URL (page) is referenced the most by other URLs then its rank increases, because being referenced means that it is important which is the case of URL_1.
If an important URL like URL_1 references other URLs like URL_4 this will increase the destination’s ranking.



## Introduction to the Dataset

Throughout the entire concept, we will be using MEDLINE records (abstracts in the life sciences domain). 
The links represent content-similarity links, i.e., pairs of abstracts that are similar in the words they contain. For example, consider pmid (unique identifier in the MEDLINE collection) [8709207](https://www.ncbi.nlm.nih.gov/pubmed/8709207). See the "Related Links" panel on the right hand side of the browser? The data provided above represent instances of graphs defined by such links.

The files are tab-delimited adjacency list representations of the link between these medical records and hence their webpages. The first token on each line represents the unique id of the source node, and the rest of the tokens represent the target nodes (i.e., outlinks from the source node). If a node does not have any outlinks, its corresponding line will contain only one token (the source node id).


### Let's get started

**Treat**: The function implementing the algorithm logic will be provided :) 

**Task**: Read the dataset into RDDs and implement a simpler version of PageRank in PySpark.



## 1.3 Creating RDDs

Spark provides two ways to create RDDs:

- Parallelizing a collection in your driver program.
- Loading an external dataset

### Parallelizing a collection in your driver program:

Create RDDs using parallelize() method on existing iterable or collection in your driver program:

# Spark Context

If you have configured Apache Spark on your machine successfully then go ahead and type command `pyspark` to open up Ipython notebook in your browser.

Since we initiated from `pyspark` command, instance of `SparkContext` will be available to us. 
If you have followed another method then use `findspark`. Initiating `SparkContext` in that case would be as follows:

```python
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
```

> you may need to pass `spark_home` path to `init` method call

Example:

`findspark.init(spark_home='path_to_spark')`

In [1]:
### Test to see if you have access to `sc` variable which is `SparkContext` instance

In [2]:
sc

<img src = "images/sc.jpeg">

## Role of a Spark Context

**Spark Context is the entry point. Like a key to your car.**

- Tells Sparks how to access a cluster

- Allocates Executors

-  The context, living in your driver program, coordinates sets of processes on the cluster to run your application.

- The context keeps track of live executors by sending heartbeat messages periodically. 


- Third, the context may perform dynamic resource allocation if the cluster manager permits. This increases cluster utilization in shared environments by proper scheduling of multiple applications according to their resource demands.

**Example 1:**

In the sense, if you want to compute a complex aggregation on spark, you need to distribute the task in the cluster.

Spark context is the gateway to a Spark Cluster.

**Example 2:**

If you have a dataset ( simple CSV/TXT file) and want computations on this data, you want all the worker nodes to have access to this data. Use the spark context to broadcast this file to all the nodes.

- It allows your Spark Application to access Spark Cluster with the help of Resource Manager. The resource manager can be one of these three- Spark Standalone, YARN, Apache Mesos

In [3]:
import re
import sys
from operator import add

from pyspark.sql import SparkSession

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']

# splitting into 4 slices
wordsRDD = sc.parallelize(wordsList, 4)

# Print out the type of wordsRDD
print(type(wordsRDD))

# Print number of partitions in your RDD
num_partitions = wordsRDD.getNumPartitions()
print("Number of partitions in your RDD is  %s" % (num_partitions))


<class 'pyspark.rdd.RDD'>
Number of partitions in your RDD is  4


### Loading an external dataset

**Two methods:** These methods takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI).
    - `sc.textFile()`: Creates a RDD with each line as an element. 
    - `sc.wholeTextFiles()` :  Creates a PairRDD with the key being the file name with a path.
I'll be using a similar dataset to explain the concepts and same operations can be applied on the medline dataset to achieve solution to our problem.

**Example dataset** - `data/example_pager.txt`

**Medline dataset** - `data/medline_pager.txt`


In [22]:
# Using sc.textfile()

url_rdd = sc.textFile("data/sample_pager.txt")
url_rdd.collect()

[u'url_1\turl_4',
 u'url_2\turl_1',
 u'url_3\turl_2\turl_1',
 u'url_4\turl_3\turl_1']

Similarly, create a RDD from Medline dataset

In [5]:
med_rdd = sc.textFile("data/medline_pager.txt")

Check number of partitions in your RDD

In [None]:
med_num_partitions = med_rdd.getNumPartitions()

print ("Number of partitions in your Medline RDD is  %s" % (med_num_partitions))

## 1.4 Operations on RDDs

From the Spark Programming Guide:

>RDDs support two types of operations: **transformations**, which create a new dataset from an existing one, and **actions**, which return a value to the driver program after running a computation on the dataset.

### Transformation on RDDs

- Transformations are kind of operations which will transform your RDD data from one form to another. And when you apply this operation on any RDD, you will get a new RDD with transformed data

- Operations like map, filter, flatMap are transformations.

- **Note:** when you apply the transformation on any RDD it will not perform the operation immediately. It will create a DAG(Directed Acyclic Graph) using the applied operation, source RDD and function used for transformation. And it will keep on building this graph using the references till you apply any action operation on the last lined up RDD. That is why the transformation in Spark are lazy.


### Actions on RDDs
- This kind of operation will also give you another RDD but this operation will trigger all the lined up transformation on the base RDD (or in the DAG) and than execute the action operation on the last RDD. 
- Operations like collect, take, count, first, saveAsTextFile are actions


Let's have a look to data in your RDD currently looks like. We will use `take()` method for this:

**`take(n)`**:

The action take(n) returns n number of elements from RDD. 

In [8]:
url_rdd.collect()

[[u'url_1', u'url_4'],
 [u'url_2', u'url_1'],
 [u'url_3', u'url_2', u'url_1'],
 [u'url_4', u'url_3', u'url_1']]

Check how your medline RDD looks like

In [7]:
med_rdd.take(2)

[u'9627181\t10027417\t8618855\t9562469\t12135350\t8980023',
 u'9470136\t9427340\t8747858\t7891871\t10208640\t7992850']

So this RDD contains on each line, first element as page name and rest list of all neighbors and everything tab seperated.

**Generate a web system as an RDD as page name, neighbor page name from the current RDD (page name, list of all neighbors).**

- For this, we have to call transformations on our current RDD. First let's create a simple list out of each line removing tabs.

**`map():`**
The map() transformation takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD.


**`filter():`**
The filter() transformation takes in a function and returns an RDD that only has elements that pass the filter() function.

In [23]:
url_rdd = url_rdd.map(lambda line: line.split('\t'))
url_rdd.take(4)

[[u'url_1', u'url_4'],
 [u'url_2', u'url_1'],
 [u'url_3', u'url_2', u'url_1'],
 [u'url_4', u'url_3', u'url_1']]

Next, let's call a customize function to get the RDD in the required format. Note here we want to produce multiple output elements for each input element. The operation to do this is called **flatMap()**.

**flatMap():** Similar to map, it returns a new RDD by applying a function to each element of the RDD, but output is flattened.

In [24]:
def ungroup_weblinks(each_pager):
    page_main = each_pager[0]
    final_list = []
    for i in range(len(each_pager)-1):
        final_list.append([page_main, each_pager[i+1]])
    return final_list


url_links_rdd = url_rdd.flatMap(lambda line: ungroup_weblinks(line))

url_links_rdd.collect()


[[u'url_1', u'url_4'],
 [u'url_2', u'url_1'],
 [u'url_3', u'url_2'],
 [u'url_3', u'url_1'],
 [u'url_4', u'url_3'],
 [u'url_4', u'url_1']]

As, you'll see the ouput is flattened as expected.

Do the same operation on `med_rdd`.

**Our workflow to solve this problem will be like as shown below:**


<img src = "images/pgalgo.jpg">

**Let's start with creating the links RDD**

### Part 1: Create url link Paired RDD

#### PairedRDDs: Working with Key/Value pairs


**Paired RDD**: Pair RDD is just a way of referring to an RDD containing key/value pairs, i.e. tuples of data.

- Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network. For example, pair RDDs have a reduceByKey() method that can aggregate data separately for each key, and a join() method that can merge two RDDs together by grouping elements with the same key. 

- We will create a paired RDD from our existing RDD in the next steps

**Now, Generate the websystem as an RDD of tuples (page, list of neighbors) from the `url_links_rdd` i.e create a PairedRDD**

Spark provides us with a direct method to group the elements in a RDD by using `groupByKey()/reduceByKey()` method.

**groupByKey()**:  We can apply the “groupByKey” / “reduceByKey” transformations on (key,val) pair RDD. The “groupByKey” will group the values for each key in the original RDD. It will create a new pair, where the original key corresponds to this collected group of values.



In [25]:
def parseNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    return urls[0], urls[1]


url_links_rdd = url_links_rdd.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
# pg.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().mapValues(list).collect()

**Let's check if we got the expected RDD!**

In [26]:
# url_links_rdd now contains tuples (page, list of neighbors)
url_links_rdd.mapValues(list).collect()



[(u'url_3', [u'url_1', u'url_2']),
 (u'url_1', [u'url_4']),
 (u'url_4', [u'url_3', u'url_1']),
 (u'url_2', [u'url_1'])]

We used `cache()` method above. **Can you tell why?**

Hint - Links RDD will be called multiple times in our solution

Answer - When you run a spark transformation via an action (count, print, foreach), then, and only then is your graph being materialized and in your case the file is being consumed. RDD.cache purpose it to make sure that the result of sc.textFile("data.txt") is available in memory and isn't needed to be read over again. In our case we want transformed url link rdd to cached.

When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

In [14]:
# Checking keys of a PairedRDD
url_links_rdd.keys()

PythonRDD[25] at RDD at PythonRDD.scala:48

**Transformations are again grouped into two types:**

- **Narrow transformation:** RDD operations like map, union, filter can operate on a single partition and map the data of that partition to resulting single partition. These kind of operations which maps data from one to one partition are referred as Narrow operations. Narrow operations doesn’t required to distribute the data across the partitions.


- **Wide transformation** -RDD operations like groupByKey, distinct, join may require to map the data across the partitions in new RDD. These kind of operations which maps data from one to many partitions are referred as Wide operations. Narrow operations doesn’t required to distribute the data across the partitions. In most of the cases Wide operations distribute the data across the partitions. These considered to be more costly than narrow operations due to data shuffling.

<img src= "images/transformations.jpg">

**Note: Wide transformations results in stages**


Data materialization occurs in a few places. When reading, shuffling, or passing data to an action. This is where the distinction between **narrow and wide dependencies** comes up. 
Narrow dependencies allow pipelining, a state of flow is straightforward. While wide dependencies forbid pipelining by requiring a shuffle. 

### Part 2: Populating the Ranks Data - Initial Seeds

The code below creates "ranks0" - a key/value pair RDD by taking the key (URL) from the links RDD and assigning the value = 1.0 to it. Ranks0 is the initial ranks RDD and it is populated with the seed number 1.0 (please see diagram below). In the 3rd part of the program we shall see how this ranks RDD is recalculated at each iteration and eventually converges, after 20 iterations, into the PageRank probability scores mentioned previously.

In [27]:
ranks = url_links_rdd.map(lambda url_neighbors: (url_neighbors[0], 1.0))

In [28]:
ranks.take(2)

[(u'url_3', 1.0), (u'url_1', 1.0)]

### Part 3: Looping and Calculating Contributions & Recalculating Ranks


This part is the heart of the PageRank algorithm. In each iteration, the contributions are calculated and the ranks are recalculated based on those contributions. The algorithm has 4 steps: 

1- Start the algorithm with each page at rank 1 

2- Calculate URL contribution: contrib = rank/size 

3- Set each URL new rank = 0.15 + 0.85 x contrib 

4- Iterate to step 2 with the new rank

We will apply the formula given above to calculate the PageRanks

**Transformations on a Paired RDD**

**`mapValues():`** 

- When we use map() with a Pair RDD, we get access to both Key & value. There are times we might only be interested in accessing the value(& not key). In those case, we can use mapValues() instead of map().

- Apply a function to each value of a pair RDD without changing the key.

**`reduceByKey()`**:
- Combine values with the same key.

- It is a wide operation as it shuffles data from multiple partitions and creates another RDD

- Before sending data across the partitions, it also merges the data locally using the same associative function for optimized data shuffling

**`join()`**:

- Some of the most useful operations we get with keyed data comes from using it together with other keyed data. Joining data together is probably one of the most common operations on a pair RDD.

- Perform an inner join between two RDDs.

- This takes in 2 Pair RDDs, and returns 1 Pair Rdd whose keys are present in both input RDDs. 

In [29]:
def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)


In [30]:
url_links_rdd.join(ranks).collect()

[(u'url_3', (<pyspark.resultiterable.ResultIterable at 0x7f009cbc55d0>, 1.0)),
 (u'url_2', (<pyspark.resultiterable.ResultIterable at 0x7f009cbb9d90>, 1.0)),
 (u'url_1', (<pyspark.resultiterable.ResultIterable at 0x7f009cbb9f10>, 1.0)),
 (u'url_4', (<pyspark.resultiterable.ResultIterable at 0x7f009cbb98d0>, 1.0))]

In [16]:
no_of_iteration = 2
for iteration in range(no_of_iteration):
    # Calculates URL contributions to the rank of other URLs.
    contribs = url_links_rdd.join(ranks).flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

In [17]:
for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))
    
#The higher the rank, more relevant is the page.

url_3 has rank: 0.575.
url_2 has rank: 0.394375.
url_1 has rank: 1.308125.
url_4 has rank: 1.7225.


In [18]:
data = ranks.collect()
import matplotlib.pyplot as plt
x, y = zip(*data)
plt.plot(x, y)
plt.show()

<Figure size 640x480 with 1 Axes>

## Concluding the operations we performed:

<img src = "images/pagerank_conclusion.jpg">