# Week 6 - MapReduce and Apache Spark

**Objectives**: Today we are going to work with Apache Spark on AWS. Spark is one of the most popular "big data" platforms and supports both batch processing, like Hadoop, and streaming. We will also explore the basics of the MapReduce programming model that was the basis for Apache Hadoop. Today we will:
  
* Review MapReduce
* Review the conceptual foundation of MapReduce with parallel Python
* Review Spark and its place in the "big data" technology ecosystem
* Set up our Spark environment including PySpark
* Read data from S3 into an RDD
* Conduct some analyses using Spark

# MapReduce and Apache Hadoop

The one technology that is most associated with "big data" is the MapReduce programming model and its open source framework [Hadoop](https://hadoop.apache.org/). This model uses a functional style to enable computing operations to scale across very large datasets. The functional style of programming enables networked applications to operate over large clusters of independent computers while still maintaining the integrity of the results. MapReduce is designed to work with very large datasets by using clusters that might have thousands of independent, low-cost computers acting as the worker nodes. Unlike traditional systems which required more central capacity in terms of memory or processing (vertical scalability), MapReduce supports a decentralized model that can scale by adding more worker nodes (horizontal scalability). 

<img src="https://raw.githubusercontent.com/azbones/big_data/master/images/map_reduce.png">
(source: http://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf)
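The map, shuffle, and reduce stages described above can be sketched in plain Python. This is only a single-process illustration of the programming model, not how Hadoop is implemented; the sample strings and the function names <code>map_phase</code>, <code>shuffle</code>, and <code>reduce_phase</code> are made up for this example.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical input: each string stands in for a split of a large dataset
documents = ["the whale the sea", "the ship", "sea and whale"]

def map_phase(doc):
    # Map: emit one (key, value) pair per word
    return [(word, 1) for word in doc.split()]

def shuffle(pairs):
    # Shuffle: group all values that share the same key
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(key, values):
    # Reduce: collapse the values for one key into a single result
    return key, reduce(lambda a, b: a + b, values)

mapped = [pair for doc in documents for pair in map_phase(doc)]
grouped = shuffle(mapped)
word_counts = dict(reduce_phase(k, v) for k, v in grouped.items())
print(word_counts)
```

Because each call to <code>map_phase</code> depends only on its own input, the calls could run on different machines; only the shuffle step needs to bring matching keys together.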

While MapReduce and Hadoop have been among the hottest analytics technologies over the past five years, there are signs that the technology may be suffering from what Gartner terms ["the trough of disillusionment"](http://www.gartner.com/technology/research/methodologies/hype-cycle.jsp).

In December of 2015, the Wall Street Journal said:

>Researcher Gartner Inc. says Hadoop adoption [remains low](http://blogs.wsj.com/cio/2015/05/13/hadoop-corporate-adoption-remains-low-gartner/) as firms struggle to articulate Hadoop’s business value and overcome a shortage of workers who have the skills to use it. A survey of 284 global IT and business leaders in May found more than half had no plans to invest in Hadoop. Adoption could grow with the use of [tools based on SQL](http://blogs.wsj.com/cio/2015/03/31/corporate-hadoop-adoption-is-growing-barclays-report-says/), a query language that corporate IT shops know well, Barclays analyst Raimo Lenschow said earlier this year.

(source: http://blogs.wsj.com/cio/2015/12/11/cio-explainer-what-is-hadoop/)

While firms in general may be struggling to adopt Hadoop and MapReduce, there is little doubt that the programming model continues to be important for large-scale data processing, especially at Internet firms. Newer technologies like Spark were built on the foundation that MapReduce laid.

# Serial Execution Through One Process

To demonstrate the conceptual foundations of MapReduce, we will use TextBlob to count noun phrases using a single-process serial approach and then use the IPython Parallel library to conduct the same analysis using several workers in parallel. We will use the IPython magic command <code>%time</code> to capture the time of execution for each version.
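The <code>%time</code> magic is only available inside IPython and Jupyter. If you want to run the comparison from a plain Python script, a small helper along these lines gives a comparable wall-clock measurement. The helper name <code>timed</code> and the stand-in workload are assumptions for illustration.

```python
from timeit import default_timer

def timed(func, *args):
    # Return the function's result together with the elapsed wall-clock seconds
    start = default_timer()
    result = func(*args)
    return result, default_timer() - start

# Hypothetical workload standing in for the noun phrase count
total, seconds = timed(sum, range(1000000))
print("Result {} computed in {:.4f} s".format(total, seconds))
```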

In [None]:
# Open the Moby Dick file and read its full text into a string


import codecs
with codecs.open('./datasets/moby_full','r',encoding='utf8') as f:
    text = f.read()

We will create a TextBlob object with the full text of Moby Dick.

In [None]:
# Now, we create a TextBlob object from the file 

from textblob import TextBlob

full_text = TextBlob(text)

In [None]:
# See the text in the object

full_text

We will count the noun phrases in the TextBlob using the <code>np_counts</code> method.

In [None]:
# Count noun phrases. Note it will take some time depending on your computer.

%time serial = full_text.np_counts

Next, we will derive some statistics from the count we conducted.

In [None]:
# print() with a single argument works on both Python 2 and Python 3
print('Number of distinct noun phrases is {}'.format(len(serial)))
print('Sum of noun phrase counts is {}'.format(sum(serial.values())))
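To make the two statistics concrete, here is a small sketch on a hand-made dictionary with the same phrase-to-count shape as the result above. The phrases and counts in <code>sample_counts</code> are invented for illustration and are not taken from the book.

```python
# Hypothetical counts in the same phrase -> count shape as the serial result
sample_counts = {"white whale": 3, "captain ahab": 2, "sperm whale": 1}

# len() counts the distinct phrases (the dictionary keys)
distinct_phrases = len(sample_counts)

# Summing the values counts every occurrence of every phrase
total_occurrences = sum(sample_counts.values())

# Sorting by count shows the most frequent phrases first
most_common = sorted(sample_counts.items(), key=lambda item: item[1], reverse=True)

print("{} distinct phrases, {} occurrences".format(distinct_phrases, total_occurrences))
print(most_common[0])
```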

# Parallel Execution Through Four Worker Processes

For the next example, we are going to use [IPython Parallel](http://ipyparallel.readthedocs.org/) to conduct the same analysis using a process similar to that of MapReduce. While we will be executing this on our individual computers, the same code with minor changes could be used across different physical devices as a cluster. 

From the command line, start the IPython worker processes in your working directory:

<code>ipcluster start -n 4</code>

In [None]:
# Start IPython Parallel in notebook and check for workers

from ipyparallel import Client
c = Client()


In [None]:
# Now, let's check the client to make sure all four workers have started

print('These are the currently active worker ids: {}'.format(c.ids))

To use each of these workers in parallel, we need an object that acts as a multiplexer. In IPython Parallel, a DirectView is an object which allows interactive access to these worker processes. In the next code block, we will assign all the workers to a view using a slice approach. 

In [None]:
# Assign all workers to a view

dview=c[:]

In order to make this example easier to understand, we split the Moby Dick text into four equal parts which are contained in the datasets folder and have the following names.

In [None]:
text_list = ['moby25a', 'moby25b', 'moby25c', 'moby25d']

We will use a DirectView [decorator](http://simeonfranklin.com/blog/2012/jul/1/python-decorators-in-12-steps/) with the @ symbol to make a standard serial function work on a cluster of IPython processes in parallel. 

In [None]:
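@dview.parallel(block=True)
def read_texts_parallel(text):
    # Imports go inside the function so they are executed on each worker
    from textblob import TextBlob
    import codecs
    # text[0] is the first file name in the sequence this call receives
    with codecs.open('./datasets/{}'.format(text[0]),'r',encoding='utf8') as f:
        contents = f.read()
    blob = TextBlob(contents)
    counts = blob.np_counts
    # Convert the counts to a plain dict before returning them
    return dict(counts)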
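The <code>text[0]</code> indexing suggests that each worker receives a slice of the input list rather than a single file name. The sketch below assumes that behavior and imitates it with a hypothetical, purely serial decorator called <code>chunked</code>; it is a stand-in for illustration and not the IPython Parallel implementation.

```python
def chunked(n_workers):
    # Hypothetical stand-in for a parallel decorator: it splits the input
    # into at most n_workers slices and calls the function once per slice
    def decorator(func):
        def wrapper(items):
            if not items:
                return []
            size = -(-len(items) // n_workers)  # ceiling division
            chunks = [items[i:i + size] for i in range(0, len(items), size)]
            return [func(chunk) for chunk in chunks]
        return wrapper
    return decorator

@chunked(n_workers=4)
def first_name_in_chunk(chunk):
    # Mirrors the text[0] indexing: only the first item of each slice is used
    return chunk[0]

four_files = first_name_in_chunk(['moby25a', 'moby25b', 'moby25c', 'moby25d'])
eight_files = first_name_in_chunk(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
print(four_files)
print(eight_files)
```

With four names and four workers every slice holds exactly one name, so nothing is lost. With eight names each slice holds two, and indexing only the first item would silently skip half of the files, which is why the example keeps the number of files equal to the number of workers.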

Next, we will create a function that:

* Maps the individual texts (an iterable list) into the parallel function
* Reduces the results returned by the workers into a single combined count of noun phrases

<code>Counter</code> is a dictionary subclass for counting items. Adding two <code>Counter</code> objects sums the counts of the keys they share.

We will then run the combined map and reduce function using the list of text sections from Moby Dick to compare run times.


In [None]:
from collections import Counter
# reduce is a builtin on Python 2 but must be imported on Python 3
from functools import reduce

def map_reduce(texts):
    # This effectively maps the iterable list of texts to the function on each worker
    mapped_text = read_texts_parallel(texts)
    # This takes the returned map results and combines them in the notebook process
    reduced = reduce(lambda x, y:Counter(x) + Counter(y), mapped_text)
    return reduced


%time map_reduced = map_reduce(text_list)
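The reduce step can be tried on its own without a cluster. The sketch below merges three hand-made dictionaries, which stand in for the per-worker results, first with the same pairwise <code>reduce</code> pattern and then with <code>Counter.update</code>, an alternative that avoids creating a new <code>Counter</code> at every step. The sample phrases and counts are invented.

```python
from collections import Counter
from functools import reduce

# Hypothetical per-worker results in the same phrase -> count shape
partial_counts = [
    {"white whale": 2, "captain ahab": 1},
    {"white whale": 1, "sperm whale": 4},
    {"captain ahab": 3},
]

# Pairwise merge: each step adds two Counters and returns a new one
pairwise = reduce(lambda x, y: Counter(x) + Counter(y), partial_counts)

# In-place merge: one Counter accumulates every partial result
in_place = Counter()
for counts in partial_counts:
    in_place.update(counts)

print(pairwise)
print(pairwise == in_place)
```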

Let's see if the noun phrase counts were the same as the earlier serial process with a single file.

In [None]:
print('Number of distinct noun phrases is {}'.format(len(map_reduced)))
print('Sum of noun phrase counts is {}'.format(sum(map_reduced.values())))

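Finally, we will use Python's built-in <code>set</code> type (which replaces the older [sets module](https://docs.python.org/2/library/sets.html)) to find the noun phrases that appear in the serial result but not in the parallel one.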

In [None]:
set(serial).difference(set(map_reduced))
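Note that <code>difference</code> only looks in one direction and only compares keys. A fuller comparison might also check phrases that appear only in the parallel result and phrases whose counts differ. The sketch below shows this on two hand-made dictionaries; the names <code>serial_counts</code> and <code>parallel_counts</code> and their contents are hypothetical.

```python
# Hypothetical results standing in for the serial and parallel counts
serial_counts = {"white whale": 3, "captain ahab": 2, "old man": 1}
parallel_counts = {"white whale": 3, "captain ahab": 1, "sea": 1}

serial_keys = set(serial_counts)
parallel_keys = set(parallel_counts)

# Phrases found by one approach but not the other
only_serial = serial_keys - parallel_keys
only_parallel = parallel_keys - serial_keys

# Symmetric difference: phrases found by exactly one approach
in_one_only = serial_keys ^ parallel_keys

# Phrases found by both approaches but with different counts
changed = {p for p in serial_keys & parallel_keys
           if serial_counts[p] != parallel_counts[p]}

print(only_serial, only_parallel, changed)
```

Small differences are plausible when a text is split into parts, since a phrase that straddles a split point may be cut in two.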