# Introduction to Data Science – Parallel Computing 
*COMP 5360 / MATH 4100, University of Utah, http://datasciencecourse.net/* 

In this lecture, we'll discuss parallel computing. We'll first lok at the reasons for parallel computing and then looks at some data-science specific parallel applications such as MapReduce and Spark. 

**Further reading:**

[J. Lin and C. Dyer, Data-Intensive Text Processing with MapReduce (2010)](https://lintool.github.io/MapReduceAlgorithms/MapReduce-book-final.pdf)


## What is parallelism?

Traditional algorithms use serial computation, which means that instructions are processed one after the others. These algorithms are easy to implement.

**Parallelism** is the approach of executing multiple threads simultaneously. This can be applied to anything. For example, it is common that GUI applications have a thread for the user interface and a separate thread for any back-end processing (saving a file, running a computation), which ensures that the user interface stays responsive while the back-end processing is active. 

**Parallel computing** is when a computational task is broken into smaller subtasks, which are processed simultaneously and indpendently. 

So why do we need parallelism? Speed! The benefits of parallel computing are that either larger tasks can be completed that couldn't be completed in reasonable time on a single CPU, or that tasks can be completed faster. 

## Group Exercise



```
R1: 19 18 7 13 17
R2: 19 18 18 11 7 
R3: 9 1 5 10 13
R4: 20 7 17 17 6 
R5: 14 18 5 13 14
R6: 19 2 5 11 1  
```

## The Rise of Parallelism

Parellism has always been important, but it is becoming increasingly relevant. Two decades ago, we could rely on increasing CPU speeds to make computation faster and faster. [Moore's law](https://en.wikipedia.org/wiki/Moore%27s_law) states that the number of transistors in a chip doubles approximately every two years. And more transistors (naively) equals more computing power.

![Moore's Law](moores_law.png)

Up to about 2004 this also correlated with an increase in speed for a single CPU core by increasing it's clock speed (number of instructions computed per second). However, since then it hasn't been possible to further increase clock speed for various technical reasons. 

![Clock Speed](clock_speed.png)

Now, instead of making individual CPUs faster, we are adding more CPUs on a single Chip. However, having multiple processors doesn't make a single thread faster. Hence, we increasingly need parallelism to leverage the speed advantages. 

On a larger scale, we can also distribute workload amonst multiple computers, e.g., in a computer clusters. Also, GPUs (Graphics Processing Units) are now used for a wide variety of tasks, because of their ability to run many processes in parallel. 

**A Programmers Perspective.** Question: How do you make your program run faster? 

Answer before 2004: Wait 6 months and buy a new computer. 

Answer after 2004: You need to write parallel software.

**Example 1:** To speed up the addition of an array of numbers, we can leverage parallelism:

![GPGPU Example](gpgpu.png)
*Source: Kohei KaiGai*


**Example 2:** To use cross validation to search for optimal parameters, you could have put each parameter tested on a different computer. See the `n_jobs` parameter in the scikit-learn command [*GridSearchCV*](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html).

### Parallel Thinking

 * Decompose work into pieces that can safely be performed in parallel
 * Assign work to processors
 * Managing communication/synchronization between the processors so that it does not limit speedup. 


### Exercise 2

In a group, develop conceptual strategies to parallelize these operations for the following tasks. Think about how to split the input and how to merge the data. 

 * Multiply all elements of a large array by 4.
 * Calcualte the mean of a very large array.
 * For a tabular dataset, group by a categoical dimension and find the minimum of each group.

Which of these operations is better to parallelize? 

### Hurdles

On the other hand, parallel computing is not easy. There are a large number of low-level programming aspects that must be handled. For example, one must consider  
- Partitioning input data
- Shared memory (Open Multiprocessing (OpenMP)) or distributed memory (Message Passing Interface (MPI)) architectures
- Scheduling execution
- Handling failures
- Interprocessor communication

There are a lot of difficult Computer Science questions here! For example, [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law) gives the theoretical speedup of a process depending on how much of the task can be parallelized. 

![Amdahl's Law](amdahls_law.png)

*Source: Daniels220, CC BY-SA 3.0, https://commons.wikimedia.org/w/index.php?curid=6678551*

## Basic Parallelism in Python

Python has support for low-level parallelism; see the 
[python documentation](https://docs.python.org/3.6/library/concurrency.html), 
[multiprocessing](https://docs.python.org/3.6/library/multiprocessing.html),
[ipyparallel](https://ipyparallel.readthedocs.io/en/latest/index.html), 
[joblib](https://pypi.python.org/pypi/joblib), 
*etc*...

We'll use the multiproceesing library for an example. It uses lower level process spawning API and provides true parellelism by spawning sub processes. It uses the multiple processor cores.

In [16]:
import time
import threading
import multiprocessing
from numpy import arange

Here is a naive function to evaluate whether a function is prime or not.

In [38]:

def isprime(n):
    prime = True
    for i in arange(2, n):
        if n/i % 1 == 0:
            return False            
    return prime

This function will call the function isprime for integers 1 to max_number. This will be carried out serially one number at a time.

In [39]:
def prime_serial_test(max_number):
    [isprime(i) for i in range(max_number)]

This function will create a pool of processes and will call `isprime` on integers from `1` to `max_number`. Multiple numbers will be processed at the same time, depending on the number of processes and the number of CPUs in your computer. 

The number of processes spawned in the pool must be less than or equal to number of cores in CPU for this to make sense. Using more processes than CPUs reduces the performance rather than improving it.

The critical function here is `pool.map`, which takes a function (`isprime`), an iterable data structure (the array returned by range), and a chunksize (optional). It takes the process of the pool and then gives it chunks of data that are then passed to the `isprime` function. 

In [65]:
def prime_parallel_test(max_number):
    # create multiple sub-processes
    pool = multiprocessing.Pool(processes = 4)
    # pass a chunk of numbers into each process as it becomes available
    # the chunksize defines how many are put in in batch
    pool.map(isprime, range(max_number), chunksize=100)
    # pool.close() will terminate all the subprocesses once their allocated work is done.
    pool.close()
    # pool.join() makes the main processes wait for subprocesses to complete
    pool.join()

In [66]:
%time prime_serial_test(20000)

CPU times: user 11.9 s, sys: 51.5 ms, total: 12 s
Wall time: 12 s


In [67]:
%time prime_parallel_test(20000)

CPU times: user 28.2 ms, sys: 21.3 ms, total: 49.5 ms
Wall time: 3.67 s


Here are the results on a machine with 4 physical cores (8 - logical cores due to hyperthreading).

| Condition                         | Time     |
| :---------------------------------|  --------|
| Serial                            | 11.9 sec |
| Parallel  (with 2 processes)      | 6.07 sec |
| Parallel  (with 4 processes)      | 3.49 sec | 
| Parallel  (with 8 processes)      | 3.61 sec |
| Parallel  (with 16 processes)     | 3.64 sec |

Here we calculate the sum of all pairwise products of an array: 

In [80]:
import random
big_array = [random.random() for _ in range(5000)]
print (big_array[:10])

# This function multiplies each element of vector X with element y
# and returns sum of the products.
def sum_products(X, y):
    return sum(x*y for x in X) 

[0.5233250317622361, 0.14321663192869138, 0.9554937128974341, 0.48605940530912706, 0.0885312335113213, 0.3022338711843523, 0.9331123487264593, 0.06107177632849614, 0.56063664635997, 0.879535690243721]


In [81]:
# This function calls sum_products for each element of vector X with X itself.
def pairwise_sum(X):
    return sum(sum_products(X,y) for y in X)

# Serial call for pairwise sum.
%time results = pairwise_sum(big_array)
print(results)

CPU times: user 1.37 s, sys: 15.5 ms, total: 1.39 s
Wall time: 1.38 s
6336802.429491523


In [82]:
# This function wraps the sum_products function.
# pool.map can only take a function and one argument for the function.
# To pass two arguments we turn them into a single tuple.
# We pass it to sum_products using *args which is variable arguments in python,
# this automatically unfolds the tuple into the two arguments needed by sum_products
def wrap_sum_products(args):
    return sum_products(*args)


pool = multiprocessing.Pool(processes=4)
# Notice how we pass the list to parallel_pairwise_sum with each element of 
# the list being a tuple as discussed in above comment.
%time sub_results = pool.map(wrap_sum_products, ((big_array, y) for y in big_array))
print("The first 10 results of individual workers:", sub_results[0:10])
# This is a serial step
results = sum(sub_results)
print("The Sum", results)

CPU times: user 8.76 ms, sys: 4.71 ms, total: 13.5 ms
Wall time: 365 ms
The first 10 results of individual workers: [1317.3664287916924, 360.51931686155626, 2405.264919306379, 1223.557644083063, 222.85972932462417, 760.8135124002831, 2348.924297322143, 153.73602063504748, 1411.2909794854609, 2214.055741514053]
The Sum 6336802.429491523


## MapReduce

MapReduce is a programming model for  distributed computations. It's designed for processinging large data sets (think ~ 1 terabyte of data) and is essentially a parallel and distributed algorithm. You can easily deploy MapReduce programs on clusters, it will take care of a lot of problems associated with clusters, such as individual nodes dying, etc.  

**History:**

1. Developed by Google, but built on previously-developed ideas
+  Apache Hadoop is an open source implemenation 
+ implemented in Java
+ There are several Python interfaces to Hadoop, including MrJob, etc.... Unfortunately, these don't yet work nicely with Jupyter notebook.

We'll be using [MrJob](https://pythonhosted.org/mrjob/) and have taken and modified examples from there. 

**Concept:**

Data is mapped into key-value pairs. The key is a unique identifier of the object, and then the value can store anything.

1. Mapping: 
Typically, the initial format of the key-value pair is very rough. The data is just stored, but
not processed much yet, and in any particular order. It may contain many irrelevant parts of information (for
the current task). So the mapping phase is in charge of getting it into the right format. Keep in mind, this
data set is very large, so it is stored once, but may be used for many different purposes.
The mapping phase takes a file, and converts it into another set of key-value pairs. The values generally
contain the data of current interest, and the keys are the identifiers we care about.

2. Shuffling: 
The output of the maping step is typically (roughly) as large as the original data, so it also cannot all fit on
one machine. The shuffle step puts all key-value pairs with the same key on one machine (if they can all
fit). 

3. Reducing: 
The reduce step aggregates the data by keys using some useful (application specific) function. This data is now all in the same location – we have locality.

![](map_reduce.png)


-See Lecture 14 of [Harvard CS109 notes](https://drive.google.com/drive/folders/0BxYkKyLxfsNVd0xicUVDS1dIS0k) and the accompanying video [here](http://cs109.github.io/2015/pages/videos.html).

Let's looks at a practical example. Note the use of the [`yield`](https://docs.python.org/3/reference/expressions.html#yieldexpr ) keyword. 

You'll have to install MRJob: 

```
pip install mrjob 
```

We want to count how many words, characters, and lines are in a text that is stored in a file. Note that his will not execute in the notebook, you'll have to run it on the command line. 

```python
#!/usr/bin/env python3
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    # Takes one line from a file passed as a command line parameter
    # and yields three key-value pairs for chars, words and lines. 
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    # Reduces the values associated with each key by summing them. 
    # Creates sums for chars, words and lines.
    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
```

The code is available in the file [word_frequency.py](word_frequency.py). Here's the (sanitized) output if we run this on Moby Dick, which is stored in `pg2701.txt`. 

```
$python word_frequency.py pg2701.txt
"words"	215135
"lines"	22108
"chars"	1213077
```

**Now let's count the number of specific words in a text.** Here is our model for splitting, mapping, shuffling, and reducing: 

![](mapreducewc.png)

Here is the implementation of this idea, which you can also find in [count_words.py](count_words.py).

```python
#!/usr/bin/env python3
from mrjob.job import MRJob
import re

class MRCountWords(MRJob):

    # The mapper creates a key for every word and a value of 1
    # Input is a line of text. We ignore keys _
    def mapper(self, _, line):
        for word in line.split():
            # regular expression to remove all non-word characters
            word = re.sub("[^a-zA-Z]+", "", word)
            # make it lowercase and yield the word as key and 1 as value
            yield word.lower(), 1

    # The reducer sums over all the values of the keys
    def reducer(self, word, occurences):
        yield word, sum(occurences)

if __name__ == '__main__':
    MRCountWords.run()
```

We run this on the text of "The Cat in the Hat" by Dr. Suess:

```
python count_words.py cat_in_the_hat.txt 
```
And get this output:

```
"seuss"	2
"seven"	2
"sew"	2
"sews"	9
"shake"	2
"shame"	4
"she"	7
"sheep"	2
"shine"	1
"ship"	4
"shocking"	1
"shoe"	2
"shoes"	2
"shook"	4
"should"	15
"shove"	1
"show"	4
"shut"	2
"sick"	2
"side"	2
"silly"	1
"simply"	2
...
```

[Google Ngram viewer](https://books.google.com/ngrams) is an example where map reduce word freuqencies becomes useful.

### Combiners

This is fine, but we leave a lot of work for the shuffle and sort step. We can do better, by doing a "local reduce", which is known as combiners. Combiners work just like reducers, but are executed locally on a single compute node, whereas reducers are executed globally. 


![](combine.png)

Note that the combiner has to have the same method signature as the reducer. Sometimes they can be the same, as in this example:

```python
from mrjob.job import MRJob
import re

class MRCountWords(MRJob):

    def mapper(self, _, line):
        for word in line.split():
            word = re.sub("[^a-zA-Z]+", "", word)
            yield word.lower(), 1

    # New: the combiner. It has the same implementation as the reducer
    # but is execued on the local node only. 
    def combiner(self, word, occurences):
        yield word, sum(occurences)
            
    def reducer(self, word, occurences):
        yield word, sum(occurences)

if __name__ == '__main__':
    MRCountWords.run()
```

Here is code that finds the most frequent word in a text. See [most_common.py](most_common.py). The key here is that we run two MapReduce steps. 

```python
#!/usr/bin/env python3
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# we store the regular expression in an object
WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):

    # here we explicitly define the steps to take. We have two steps, the first
    # has a mapper, combiner and reducer, the second only a reducer.
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        num_occurences = sum(counts)
        # we are not yielding a key, so that all outputs are treated together
        yield None, (num_occurences, word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)

if __name__ == '__main__':
    MRMostUsedWord.run()
```

### Running MapReduce on servers

We've so far only run MapReduce locally and in a single process. MRJob, however, has support for a variety of different architectures. 

Here is how you run it locally but in multiple processes and simulating Hadoop features: 
```
python most_common.py -r local pg2701.txt
```

Here is a command that would run it on Amazon Elastic MapReduce [if properly set up](https://pythonhosted.org/mrjob/guides/emr-quickstart.html)

```
python my_job.py -r emr s3://my-inputs/input.txt
```

### Exercise 3:
How would you use MapReduce to find anagrams? Sketch a strategy.

## Spark

Spark can be thought of as  MapReduce 2.0

- In memory as opposed to disk
- Data can be cached in memory or disk for future use
- 100x faster than Hadoop MapReduce in memory or 10x faster on disk
- resilient distributed dataset (RDD)
- Python, Java, and Scala interfaces
- [apache-spark](http://spark.apache.org/) can be used in python through [findspark](https://github.com/minrk/findspark)
- Easier than Hadoop while being functional, runs a general DAG

For more, see Lecture 15 of [Harvard CS109 notes](https://drive.google.com/drive/folders/0BxYkKyLxfsNVd0xicUVDS1dIS0k) 
and accompanying [notebook](https://github.com/cs109/2015/blob/master/Lectures/15b-Spark.ipynb)