# Introduction to Data Science – Parallel Computing 
*COMP 5360 / MATH 4100, University of Utah, http://datasciencecourse.net/* 

In this lecture, we'll discuss parallel computing. We'll first look at the reasons for parallel computing and then look at some data-science-specific parallel applications such as MapReduce and Spark.

**Further reading:**

[J. Lin and C. Dyer, Data-Intensive Text Processing with MapReduce (2010)](https://lintool.github.io/MapReduceAlgorithms/MapReduce-book-final.pdf)


## What is parallelism?

Traditional algorithms use serial computation, which means that instructions are processed one after another. Serial algorithms are comparatively easy to implement.

**Parallelism** is the approach of executing multiple threads simultaneously. This can be applied to anything. For example, it is common that GUI applications have a thread for the user interface and a separate thread for any back-end processing (saving a file, running a computation), which ensures that the user interface stays responsive while the back-end processing is active. 
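
As a minimal sketch of the GUI pattern described above, the following uses Python's `threading` module to run a slow back-end task in a second thread while the main thread keeps going. The function name `save_file` and the short sleep standing in for disk I/O are assumptions made for illustration.

```python
import threading
import time

events = []  # shared log of what happened, appended to by both threads

def save_file():
    # Stand-in for slow back-end work such as writing a large file.
    time.sleep(0.05)
    events.append("file saved")

# Start the back-end work in its own thread.
worker = threading.Thread(target=save_file)
worker.start()

# The main thread stays free, e.g. to keep a user interface responsive.
events.append("main thread still responsive")

# Wait for the back-end thread to finish before using its result.
worker.join()
print(events)
```

Threads like this mainly help with responsiveness and with waiting on I/O. For CPU-heavy work in standard CPython, separate processes are usually needed to use several cores at once.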

**Parallel computing** is when a computational task is broken into smaller subtasks, which are processed simultaneously and independently.

So why do we need parallelism? Speed! The benefits of parallel computing are that either larger tasks can be completed that couldn't be completed in reasonable time on a single CPU, or that tasks can be completed faster. 

## Group Exercise

```
R1: 19 18 7 13 17
R2: 19 18 18 11 7 
R3: 9 1 5 10 13
R4: 20 7 17 17 6 
R5: 14 18 5 13 14
R6: 19 2 5 11 1  
```

## The Rise of Parallelism

Parallelism has always been important, but it is becoming increasingly relevant. Two decades ago, we could rely on increasing CPU speeds to make computation faster and faster. [Moore's law](https://en.wikipedia.org/wiki/Moore%27s_law) states that the number of transistors in a chip doubles approximately every two years. And more transistors (naively) equals more computing power.

![Moore's Law](moores_law.png)

Up to about 2004, this also correlated with an increase in speed for a single CPU core through increases in its clock speed (the number of clock cycles per second). However, since then it hasn't been possible to increase clock speed significantly further, for various technical reasons.

![Clock Speed](clock_speed.png)

Now, instead of making individual CPUs faster, we are adding more CPU cores to a single chip. However, having multiple processors doesn't make a single thread faster. Hence, we increasingly need parallelism to take advantage of the additional computing power.

On a larger scale, we can also distribute the workload amongst multiple computers, e.g., in a computer cluster. Also, GPUs (Graphics Processing Units) are now used for a wide variety of tasks, because of their ability to run many processes in parallel.

**A Programmer's Perspective.** Question: How do you make your program run faster?

Answer before 2004: Wait 6 months and buy a new computer. 

Answer after 2004: You need to write parallel software.

**Example 1:** To speed up the addition of an array of numbers, we can leverage parallelism:

![GPGPU Example](gpgpu.png)
*Source: Kohei KaiGai*
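
One common way to parallelize such a sum is a tree (pairwise) reduction: in each round, neighboring elements are added in pairs, so all additions within a round are independent and could run at the same time. The sketch below simulates this serially in plain Python; it is not GPU code, and `tree_sum` is a hypothetical helper name introduced here.

```python
def tree_sum(values):
    """Sum a list by repeatedly adding neighboring pairs.

    All additions inside one round are independent of each other,
    so on parallel hardware each round could run in a single step.
    Returns the total and the number of rounds that were needed.
    """
    level = list(values)
    rounds = 0
    while len(level) > 1:
        # Pair up neighbors; an unpaired last element is carried over.
        paired = [level[i] + level[i + 1] for i in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            paired.append(level[-1])
        level = paired
        rounds += 1
    total = level[0] if level else 0
    return total, rounds

total, rounds = tree_sum(range(1, 9))  # the numbers 1..8
print(total, rounds)
```

For eight numbers, a serial loop needs seven additions one after another, while the pairwise scheme needs only three rounds, provided enough processors are available to do each round at once.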


**Example 2:** When using cross validation to search for optimal parameters, each parameter setting can be evaluated independently, so the evaluations can be spread over different processors (or even different computers). See the `n_jobs` parameter in the scikit-learn class [*GridSearchCV*](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html).
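
A minimal sketch of how `n_jobs` might be used, assuming scikit-learn is installed. The iris data set, the `SVC` estimator, the grid of `C` values, and the helper name `run_grid_search` are choices made here for illustration only.

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

def run_grid_search(n_jobs=1):
    """Cross-validated search over C for an SVM on the iris data."""
    X, y = load_iris(return_X_y=True)
    param_grid = {"C": [0.1, 1, 10, 100]}
    # n_jobs controls how many fits run in parallel; -1 means "use all cores".
    search = GridSearchCV(SVC(), param_grid, cv=5, n_jobs=n_jobs)
    search.fit(X, y)
    return search

if __name__ == "__main__":
    search = run_grid_search(n_jobs=-1)
    print(search.best_params_, round(search.best_score_, 3))
```

With four candidate values and five folds there are 20 independent fits, which is exactly the kind of workload that parallelizes well. On a problem this small, the overhead of starting worker processes may outweigh the gain.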

### Parallel Thinking

 * Decompose work into pieces that can safely be performed in parallel
 * Assign work to processors
 * Manage communication/synchronization between the processors so that it does not limit speedup. 
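
These three steps can be sketched for a simple task, the sum of squares of a list. This is an illustration under assumptions, not a tuned implementation: the helper names and the `map_fn` parameter are introduced here, and `map_fn` defaults to the built-in serial `map` so the same code can be run with or without a process pool.

```python
import multiprocessing

def split_into_chunks(data, n_chunks):
    """Decompose: cut data into n_chunks pieces of nearly equal size."""
    size, extra = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks

def sum_of_squares(chunk):
    """The work done independently on each piece."""
    return sum(x * x for x in chunk)

def parallel_sum_of_squares(data, n_chunks=4, map_fn=map):
    """Assign chunks to workers via map_fn, then merge the partial results."""
    partial_sums = map_fn(sum_of_squares, split_into_chunks(data, n_chunks))
    # The only communication back to the caller is one number per chunk.
    return sum(partial_sums)

if __name__ == "__main__":
    data = list(range(100_000))
    with multiprocessing.Pool(processes=4) as pool:
        print(parallel_sum_of_squares(data, n_chunks=4, map_fn=pool.map))
```

The chunks are independent, so no synchronization is needed until the final merge, which only has to add up four numbers.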


### Exercise 2

In a group, develop conceptual strategies to parallelize the following tasks. Think about how to split the input (you want to split evenly with respect to workload) and how to merge the data. 

 * Multiply all elements of a large array by 4.
 * Calculate the mean of a very large array.
 * Check all numbers from one to a hundred thousand for primality.

Which of these operations is best suited to parallelization? 

### Hurdles

On the other hand, parallel computing is not easy. There are a large number of low-level programming aspects that must be handled. For example, one must consider  
- Partitioning input data
- Shared memory (Open Multiprocessing (OpenMP)) or distributed memory (Message Passing Interface (MPI)) architectures
- Scheduling execution
- Handling failures
- Interprocessor communication

There are a lot of difficult Computer Science questions here! For example, [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law) gives the theoretical speedup of a process depending on how much of the task can be parallelized. 

![Amdahl's Law](amdahls_law.png)

*Source: Daniels220, CC BY-SA 3.0, https://commons.wikimedia.org/w/index.php?curid=6678551*
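
Amdahl's law can be written as S(n) = 1 / ((1 - p) + p / n), where p is the fraction of the task that can be parallelized and n is the number of processors. The short sketch below evaluates this formula; the function name `amdahl_speedup` and the example value p = 0.95 are chosen here for illustration.

```python
def amdahl_speedup(parallel_fraction, n_processors):
    """Theoretical speedup when a fraction p of the work is parallelizable.

    S(n) = 1 / ((1 - p) + p / n)
    """
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_processors)

# Speedup for a task that is 95% parallelizable.
for n in (2, 8, 1024):
    print(n, round(amdahl_speedup(0.95, n), 2))
```

However many processors are added, the speedup stays below 1 / (1 - p). For p = 0.95 that limit is 20, because the serial 5% of the work is never sped up.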

## Basic Parallelism in Python

Python has support for low-level parallelism; see the 
[python documentation](https://docs.python.org/3.6/library/concurrency.html), 
[multiprocessing](https://docs.python.org/3.6/library/multiprocessing.html),
[ipyparallel](https://ipyparallel.readthedocs.io/en/latest/index.html), 
[joblib](https://pypi.python.org/pypi/joblib), 
*etc*...

We'll use the multiprocessing library for an example. It uses a lower-level process-spawning API and provides true parallelism by spawning subprocesses, which lets it use multiple processor cores.

In [1]:
import time
import threading
import multiprocessing
from numpy import arange

In [2]:
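def isprime(n):
    # 0 and 1 are not prime
    if n < 2:
        return False
    prime = True
    # deliberately brute force (no early exit), so there is work to parallelize
    for i in arange(2, n):
        if n % i == 0:
            prime = False
    return prime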

This function calls `isprime` for the integers 0 to `max_number - 1`. The calls are carried out serially, one number at a time.

In [3]:
def prime_serial_test(max_number):
    [isprime(i) for i in range(max_number)]

This function creates a pool of processes and calls `isprime` on the integers from 0 to `max_number - 1`. Multiple numbers will be processed at the same time, depending on the number of processes and the number of CPUs in your computer. 

The number of processes spawned in the Pool should be less than or equal to the number of CPU cores for this to make sense. Using more processes than cores tends to reduce performance rather than improve it.
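
One way to follow this rule of thumb is to cap the number of worker processes at the number of cores the machine reports. This is a small sketch; `choose_process_count` is a hypothetical helper, and note that `os.cpu_count()` counts logical cores, which can be more than the physical cores.

```python
import os

def choose_process_count(requested):
    """Cap the requested number of worker processes at the CPU count."""
    available = os.cpu_count() or 1  # cpu_count() can return None
    return max(1, min(requested, available))

print(os.cpu_count(), choose_process_count(16))
```

The result could then be passed as the `processes` argument of `multiprocessing.Pool`.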

In [4]:
def prime_parallel_test(max_number):
    # create multiple sub-processes
    pool = multiprocessing.Pool(processes=4)
    # pass a chunk of numbers to each process as it becomes available;
    # chunksize defines how many numbers are in each batch
    pool.map(isprime, range(max_number), chunksize=100)
    # pool.close() prevents new tasks from being submitted; the worker
    # processes exit once their allocated work is done
    pool.close()
    # pool.join() makes the main process wait for the subprocesses to exit
    pool.join()

In [5]:
%time prime_serial_test(4000)

CPU times: user 4.67 s, sys: 60.2 ms, total: 4.73 s
Wall time: 4.84 s


In [6]:
%time prime_parallel_test(4000)

CPU times: user 13.8 ms, sys: 20.1 ms, total: 33.9 ms
Wall time: 1.36 s


Here are the results on a machine with 4 physical cores (8 logical cores due to hyperthreading).

| Condition                         | Time     |
| :---------------------------------|  --------|
| Serial                            | 4.23 sec |
| Parallel  (with 2 processes)      | 2.59 sec |
| Parallel  (with 3 processes)      | 2.18 sec | 
| Parallel  (with 4 processes)      | 1.88 sec |
| Parallel  (with 8 processes)      | 1.91 sec |
| Parallel  (with 16 processes)     | 2.06 sec |
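
To make these timings easier to interpret, we can compute the speedup (serial time divided by parallel time) and the efficiency (speedup divided by the number of processes). The numbers are copied from the table above; the function name `speedup_and_efficiency` is introduced here for illustration.

```python
serial_time = 4.23  # seconds, from the table above
parallel_times = {2: 2.59, 3: 2.18, 4: 1.88, 8: 1.91, 16: 2.06}

def speedup_and_efficiency(serial, parallel, n_processes):
    """Return (speedup, efficiency) for one timing measurement."""
    speedup = serial / parallel
    efficiency = speedup / n_processes
    return speedup, efficiency

for n, t in parallel_times.items():
    s, e = speedup_and_efficiency(serial_time, t, n)
    print(f"{n:2d} processes: speedup {s:.2f}, efficiency {e:.2f}")
```

With 4 processes the speedup is about 2.25, well short of the ideal 4, and adding processes beyond the 4 physical cores does not improve it further.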

Here we calculate the sum of all pairwise products: 

In [7]:
import random
big_array = [random.random() for _ in range(5000)]
big_array[:10]
# This function multiplies each element of vector X with element y
# and returns sum of the products.
def ssum(X, y):
    return sum(x*y for x in X)

# This function calls ssum for each element of vector X with X itself.
def pw_sum(X):
    return sum(ssum(X,y) for y in X)
    
# This function lets us parallelize pw_sum.
# pool.map passes only a single argument to the function it calls,
# so to pass two arguments we pack them into one tuple.
# ssum(*args) then unpacks the tuple into the two arguments of ssum.
def parallel_pw_sum(args):
    return ssum(*args)

# Serial call for pairwise sum.
%time results = pw_sum(big_array)
print(results)

pool = multiprocessing.Pool(processes=4)
# Each element of the iterable passed to pool.map is a tuple (big_array, y), as described in the comment above.
%time results = pool.map(parallel_pw_sum, ((big_array,y) for y in big_array))
print(sum(results))

CPU times: user 1.49 s, sys: 5.54 ms, total: 1.5 s
Wall time: 1.5 s
6191253.2464611
CPU times: user 11.6 ms, sys: 5.18 ms, total: 16.8 ms
Wall time: 840 ms
6191253.2464611
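
As an alternative to the tuple-unpacking wrapper, `Pool.starmap` (available since Python 3.3) unpacks each argument tuple itself. The sketch below assumes the same `ssum` as above; `pairwise_sum` and its `starmap_fn` parameter are names introduced here, with `itertools.starmap` as a serial default.

```python
import itertools
import multiprocessing
import random

def ssum(X, y):
    """Sum of x * y over all x in X."""
    return sum(x * y for x in X)

def pairwise_sum(X, starmap_fn=itertools.starmap):
    """Sum of all pairwise products, built from one ssum call per element."""
    # starmap unpacks each (X, y) tuple into the two arguments of ssum.
    return sum(starmap_fn(ssum, [(X, y) for y in X]))

if __name__ == "__main__":
    data = [random.random() for _ in range(1000)]
    with multiprocessing.Pool(processes=4) as pool:
        parallel_result = pairwise_sum(data, starmap_fn=pool.starmap)
    # The sum of all pairwise products equals the square of the plain sum.
    print(parallel_result, sum(data) ** 2)
```

The last line is a sanity check: the two printed values should agree up to floating-point rounding. Note that this approach sends a copy of the whole list to a worker for every element, so the communication cost is substantial.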


## Some Object Oriented Programming

Object-oriented programming is an important paradigm that we've largely ignored so far. See also [the documentation](https://docs.python.org/3/tutorial/classes.html).

In [9]:
# The class definition
class Box:
    # initialization
    def __init__(self):
        # this is how you initialize class members
        self.nr_objects = 0

    # a method
    def isEmpty(self):
        if self.nr_objects:
            return False
        return True

    def putObjectsIn(self, nr_objects):
        self.nr_objects += nr_objects

In [10]:
my_box = Box()
my_box.isEmpty()


True

In [11]:
my_box.putObjectsIn(3)
my_box.isEmpty()

False

### Inheritance

We can use inheritance to make a more specialized version of a class. Here, we make a smarter Box. 

In [12]:
# The parent class is passed in parentheses in the class definition
class SmartBox(Box):
    # we add a new method that is not in the original Box class
    def howManyObjects(self):
        return self.nr_objects

In [13]:
smart_box = SmartBox()
# the methods of the Box base class are available in a SmartBox
smart_box.putObjectsIn(5)
smart_box.isEmpty()

False

In [14]:
# the new method is available as well
smart_box.howManyObjects()

5

### Overriding a function

Sometimes it makes sense to replace the functionality of a base class with a specific implementation for the new class. We call this overriding: 

In [17]:
class PrintingBox(Box):
    def isEmpty(self):
        if self.nr_objects:
            print("Your Box is not empty.")
            return False
        print("Your Box is empty.")
        return True

Now the same call, `isEmpty()`, also prints something. 

In [18]:
printing_box = PrintingBox()
printing_box.isEmpty()
printing_box.putObjectsIn(9)
printing_box.isEmpty()

Your Box is empty.
Your Box is not empty.


False
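
An overriding method does not have to replace the parent's behavior completely: it can add something and then delegate to the base class with `super()`. The sketch below repeats a reduced `Box` so that it runs on its own; `LoggingBox` is a hypothetical class introduced for illustration.

```python
class Box:
    def __init__(self):
        self.nr_objects = 0

    def putObjectsIn(self, nr_objects):
        self.nr_objects += nr_objects

class LoggingBox(Box):
    # Override putObjectsIn, but reuse the parent's logic through super().
    def putObjectsIn(self, nr_objects):
        print(f"Adding {nr_objects} objects.")
        super().putObjectsIn(nr_objects)

logging_box = LoggingBox()
logging_box.putObjectsIn(2)
logging_box.putObjectsIn(3)
print(logging_box.nr_objects)
```

This keeps the counting logic in one place, so a later change to `Box.putObjectsIn` is picked up by the subclass automatically.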

### Exercise 4

Write a class `Dog` that has an array of tricks (strings). Add an `add_trick` function. Initialize a couple of dogs and add different tricks. Print which tricks each dog can do. 

## MapReduce

- Programming model for distributed computations
- Addresses large data sets (think ~1 terabyte of data)
- Parallel and distributed algorithm
- Cluster framework
- Functional

**History:**

- Developed by Google, but built on previously developed ideas
- Apache Hadoop is an open-source implementation
- Hadoop is implemented in Java
- There are several Python interfaces to Hadoop, including MRJob. Unfortunately, these don't yet work nicely with Jupyter notebooks.

**Concept:**

Data is typically stored as key-value pairs. The key is an identifier of the object that is unique (or unique up to some resolution), and the value can store anything.

1. Mapping: 
Typically, the initial format of the key-value pairs is very rough. The data is just stored, but
not processed much yet, and not in any particular order. It may contain many parts that are irrelevant (for
the current task). So the mapping phase is in charge of getting it into the right format. Keep in mind that this
data set is very large, so it is stored once, but may be used for many different purposes.
The mapping phase takes a file and converts it into another set of key-value pairs. The values generally
contain the data of current interest, and the keys are used to obtain locality in the next part.

2. Shuffling: 
The output of the map phase is typically (roughly) as large as the original data, so it also cannot all fit on
one machine. The shuffle step puts all key-value pairs with the same key on one machine (if they can all
fit). The data is redistributed and mixed up, much like the positions of the cards
when shuffling a deck of cards.

3. Reducing: 
The reduce step takes the data that has been aggregated by keys and does something useful
(application specific). This data is now all in the same location – we have locality.

![](map_reduce.png)
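
The three phases can be simulated on a single machine in plain Python. This is only a sketch of the data flow for a toy word count; the helper names `map_phase`, `shuffle_phase`, and `reduce_phase` are introduced here for illustration and are not part of Hadoop or MRJob.

```python
from collections import defaultdict

def map_phase(lines):
    """Map: turn each raw line into (key, value) pairs."""
    for line in lines:
        for word in line.lower().split():
            yield word, 1

def shuffle_phase(pairs):
    """Shuffle: collect all values that share a key in one place."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Reduce: combine the values of each key into one result."""
    return {key: sum(values) for key, values in grouped.items()}

lines = ["the cat sat", "the dog sat", "the end"]
counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(counts)
```

In a real cluster, the map and reduce calls run on many machines at once and the shuffle moves data over the network. Here everything happens in one process, which only shows what each phase computes.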


See Lecture 14 of [Harvard CS109 notes](https://drive.google.com/drive/folders/0BxYkKyLxfsNVd0xicUVDS1dIS0k) and the accompanying video [here](http://cs109.github.io/2015/pages/videos.html).

Let's look at a practical example. Note the use of the [`yield`](https://docs.python.org/3/reference/expressions.html#yieldexpr) keyword. 
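
As a brief aside on `yield`: a function that contains `yield` is a generator, which hands back values one at a time instead of building a full list first. A minimal sketch, with the hypothetical function name `count_up_to`:

```python
def count_up_to(limit):
    """A generator: produces values one at a time instead of a full list."""
    n = 1
    while n <= limit:
        yield n  # hand back one value, then pause here until the next request
        n += 1

numbers = count_up_to(3)
print(next(numbers))   # the first value
print(list(numbers))   # the remaining values
```

This is why a mapper or reducer can emit any number of key-value pairs per call: each `yield` emits one pair.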

You'll have to install MRJob: 

```
pip install mrjob 
```

Here we count how many words, characters, and lines are in a text. Note that this will not execute in the notebook.

```python
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
```

It's easier to run the above code from the console. The code is available in the file [word_frequency.py](word_frequency.py). Here's the (sanitized) output if we run this: 

```
$ python word_frequency.py pg2701.txt
"words"	215135
"lines"	22108
"chars"	1213077
```

**Now let's count the number of specific words**

![](mapreducewc.png)

```python
from mrjob.job import MRJob
import re

class MRCountWords(MRJob):

    def mapper(self, _, line):
        for word in line.split():
            # keep letters only; skip tokens that contain no letters at all
            word = re.sub("[^a-zA-Z]+", "", word)
            if word:
                yield word.lower(), 1

    def reducer(self, word, occurences):
        yield word, sum(occurences)

if __name__ == '__main__':
    MRCountWords.run()
```

This is what the output for the above code looks like: 

```
"seuss"	2
"seven"	2
"sew"	2
"sews"	9
"shake"	2
"shame"	4
"she"	7
"sheep"	2
"shine"	1
"ship"	4
"shocking"	1
"shoe"	2
"shoes"	2
"shook"	4
"should"	15
"shove"	1
"show"	4
"shut"	2
"sick"	2
"side"	2
"silly"	1
"simply"	2
...
```

[Google Ngram viewer](https://books.google.com/ngrams) is an example where MapReduce word frequencies become useful.

### Combiners

This works, but it leaves a lot of work for the shuffle and sort step. We can do better by doing a "local reduce" on each mapper's output, which is known as a combiner: 


![](combine.png)

Note that the combiner has to have the same method signature as the reducer. Sometimes they can be the same, as in this example:

```python
from mrjob.job import MRJob
import re

class MRCountWords(MRJob):

    def mapper(self, _, line):
        for word in line.split():
            # keep letters only; skip tokens that contain no letters at all
            word = re.sub("[^a-zA-Z]+", "", word)
            if word:
                yield word.lower(), 1

    # new: the combiner
    def combiner(self, word, occurences):
        yield word, sum(occurences)
            
    def reducer(self, word, occurences):
        yield word, sum(occurences)

if __name__ == '__main__':
    MRCountWords.run()
```
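
To see why a combiner helps, we can count how many key-value pairs would have to be shuffled with and without a local reduce. This is a plain-Python simulation under assumptions, not MRJob code: the two "mappers", their input lines, and the helper names `map_words` and `combine` are made up for illustration.

```python
from collections import Counter

def map_words(line):
    """Mapper output for one line: a (word, 1) pair per word."""
    return [(word, 1) for word in line.lower().split()]

def combine(pairs):
    """Local reduce on one mapper's output: one (word, count) pair per word."""
    local_counts = Counter()
    for word, count in pairs:
        local_counts[word] += count
    return list(local_counts.items())

# Each inner list stands for the lines processed by one mapper.
splits = [["a rose is a rose", "a rose"], ["is a rose a rose"]]

without_combiner = [map_words(" ".join(lines)) for lines in splits]
with_combiner = [combine(pairs) for pairs in without_combiner]

print(sum(len(p) for p in without_combiner), "pairs shuffled without a combiner")
print(sum(len(p) for p in with_combiner), "pairs shuffled with a combiner")
```

In this toy input, the 12 pairs emitted by the mappers shrink to 6 after combining, and the final counts are unchanged because addition can be applied in stages.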

### Exercise 5:
How would you use MapReduce to find anagrams?

## Spark

Spark can be thought of as MapReduce 2.0:

- In memory as opposed to disk
- Data can be cached in memory or disk for future use
- Advertised as up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk
- resilient distributed dataset (RDD)
- Python, Java, and Scala interfaces
- [apache-spark](http://spark.apache.org/) can be used in Python through [findspark](https://github.com/minrk/findspark)
- Easier than Hadoop while being functional, runs a general DAG

For more, see Lecture 15 of [Harvard CS109 notes](https://drive.google.com/drive/folders/0BxYkKyLxfsNVd0xicUVDS1dIS0k) 
and accompanying [notebook](https://github.com/cs109/2015/blob/master/Lectures/15b-Spark.ipynb)